Commit 6d136452 by Daniel Lee

mysql: update to use ColumnTypes interface in new version

The custom code in vendor is not needed anymore and most of the
mapping code can be replaced.
parent 7bc7af6c
...@@ -5,8 +5,8 @@ import ( ...@@ -5,8 +5,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"reflect"
"strconv" "strconv"
"time" "time"
"github.com/go-sql-driver/mysql" "github.com/go-sql-driver/mysql"
...@@ -73,24 +73,36 @@ func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, ...@@ -73,24 +73,36 @@ func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows,
table.Columns[i].Text = name table.Columns[i].Text = name
} }
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
rowLimit := 1000000 rowLimit := 1000000
rowCount := 0 rowCount := 0
timeIndex := -1
// check if there is a column named time
for i, col := range columnNames {
switch col {
case "time_sec":
timeIndex = i
}
}
for ; rows.Next(); rowCount++ { for ; rows.Next(); rowCount++ {
if rowCount > rowLimit { if rowCount > rowLimit {
return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit) return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
} }
values, err := e.getTypedRowData(columnTypes, rows) values, err := e.getTypedRowData(rows)
if err != nil { if err != nil {
return err return err
} }
// for annotations, convert to epoch
if timeIndex != -1 {
switch value := values[timeIndex].(type) {
case time.Time:
values[timeIndex] = float64(value.UnixNano() / 1e9)
}
}
table.Rows = append(table.Rows, values) table.Rows = append(table.Rows, values)
} }
...@@ -99,60 +111,20 @@ func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, ...@@ -99,60 +111,20 @@ func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows,
return nil return nil
} }
func (e MysqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) { func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, error) {
types, err := rows.ColumnTypes()
if err != nil {
return nil, err
}
values := make([]interface{}, len(types)) values := make([]interface{}, len(types))
for i, stype := range types { for i := range values {
e.log.Debug("type", "type", stype) scanType := types[i].ScanType()
switch stype.DatabaseTypeName() { values[i] = reflect.New(scanType).Interface()
case mysql.FieldTypeNameTiny:
values[i] = new(int8) if types[i].DatabaseTypeName() == "BIT" {
case mysql.FieldTypeNameInt24:
values[i] = new(int32)
case mysql.FieldTypeNameShort:
values[i] = new(int16)
case mysql.FieldTypeNameVarString:
values[i] = new(string)
case mysql.FieldTypeNameVarChar:
values[i] = new(string)
case mysql.FieldTypeNameLong:
values[i] = new(int)
case mysql.FieldTypeNameLongLong:
values[i] = new(int64)
case mysql.FieldTypeNameDouble:
values[i] = new(float64)
case mysql.FieldTypeNameDecimal:
values[i] = new(float32)
case mysql.FieldTypeNameNewDecimal:
values[i] = new(float64)
case mysql.FieldTypeNameFloat:
values[i] = new(float64)
case mysql.FieldTypeNameTimestamp:
values[i] = new(time.Time)
case mysql.FieldTypeNameDateTime:
values[i] = new(time.Time)
case mysql.FieldTypeNameTime:
values[i] = new(string)
case mysql.FieldTypeNameYear:
values[i] = new(int16)
case mysql.FieldTypeNameNULL:
values[i] = nil
case mysql.FieldTypeNameBit:
values[i] = new([]byte) values[i] = new([]byte)
case mysql.FieldTypeNameBLOB:
values[i] = new(string)
case mysql.FieldTypeNameTinyBLOB:
values[i] = new(string)
case mysql.FieldTypeNameMediumBLOB:
values[i] = new(string)
case mysql.FieldTypeNameLongBLOB:
values[i] = new(string)
case mysql.FieldTypeNameString:
values[i] = new(string)
case mysql.FieldTypeNameDate:
values[i] = new(string)
default:
return nil, fmt.Errorf("Database type %s not supported", stype.DatabaseTypeName())
} }
} }
...@@ -160,14 +132,54 @@ func (e MysqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core. ...@@ -160,14 +132,54 @@ func (e MysqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.
return nil, err return nil, err
} }
for i := 0; i < len(types); i++ {
typeName := reflect.ValueOf(values[i]).Type().String()
switch typeName {
case "*sql.RawBytes":
values[i] = string(*values[i].(*sql.RawBytes))
case "*mysql.NullTime":
sqlTime := (*values[i].(*mysql.NullTime))
if sqlTime.Valid {
values[i] = sqlTime.Time
} else {
values[i] = nil
}
case "*sql.NullInt64":
nullInt64 := (*values[i].(*sql.NullInt64))
if nullInt64.Valid {
values[i] = nullInt64.Int64
} else {
values[i] = nil
}
case "*sql.NullFloat64":
nullFloat64 := (*values[i].(*sql.NullFloat64))
if nullFloat64.Valid {
values[i] = nullFloat64.Float64
} else {
values[i] = nil
}
}
if types[i].DatabaseTypeName() == "DECIMAL" {
f, err := strconv.ParseFloat(values[i].(string), 64)
if err == nil {
values[i] = f
} else {
values[i] = nil
}
}
}
return values, nil return values, nil
} }
func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error { func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
pointsBySeries := make(map[string]*tsdb.TimeSeries) pointsBySeries := make(map[string]*tsdb.TimeSeries)
seriesByQueryOrder := list.New() seriesByQueryOrder := list.New()
columnNames, err := rows.Columns()
columnNames, err := rows.Columns()
if err != nil { if err != nil {
return err return err
} }
......
...@@ -30,19 +30,19 @@ func TestMySQL(t *testing.T) { ...@@ -30,19 +30,19 @@ func TestMySQL(t *testing.T) {
defer sess.Close() defer sess.Close()
sql := "CREATE TABLE `mysql_types` (" sql := "CREATE TABLE `mysql_types` ("
sql += "`atinyint` tinyint(1)," sql += "`atinyint` tinyint(1) NOT NULL,"
sql += "`avarchar` varchar(3)," sql += "`avarchar` varchar(3) NOT NULL,"
sql += "`achar` char(3)," sql += "`achar` char(3),"
sql += "`amediumint` mediumint," sql += "`amediumint` mediumint NOT NULL,"
sql += "`asmallint` smallint," sql += "`asmallint` smallint NOT NULL,"
sql += "`abigint` bigint," sql += "`abigint` bigint NOT NULL,"
sql += "`aint` int(11)," sql += "`aint` int(11) NOT NULL,"
sql += "`adouble` double(10,2)," sql += "`adouble` double(10,2),"
sql += "`anewdecimal` decimal(10,2)," sql += "`anewdecimal` decimal(10,2),"
sql += "`afloat` float(10,2)," sql += "`afloat` float(10,2) NOT NULL,"
sql += "`atimestamp` timestamp NOT NULL," sql += "`atimestamp` timestamp NOT NULL,"
sql += "`adatetime` datetime," sql += "`adatetime` datetime NOT NULL,"
sql += "`atime` time," sql += "`atime` time NOT NULL,"
// sql += "`ayear` year," // Crashes xorm when running cleandb // sql += "`ayear` year," // Crashes xorm when running cleandb
sql += "`abit` bit(1)," sql += "`abit` bit(1),"
sql += "`atinytext` tinytext," sql += "`atinytext` tinytext,"
...@@ -55,7 +55,12 @@ func TestMySQL(t *testing.T) { ...@@ -55,7 +55,12 @@ func TestMySQL(t *testing.T) {
sql += "`alongblob` longblob," sql += "`alongblob` longblob,"
sql += "`aenum` enum('val1', 'val2')," sql += "`aenum` enum('val1', 'val2'),"
sql += "`aset` set('a', 'b', 'c', 'd')," sql += "`aset` set('a', 'b', 'c', 'd'),"
sql += "`adate` date" sql += "`adate` date,"
sql += "`time_sec` datetime(6),"
sql += "`aintnull` int(11),"
sql += "`afloatnull` float(10,2),"
sql += "`avarcharnull` varchar(3),"
sql += "`adecimalnull` decimal(10,2)"
sql += ") ENGINE=InnoDB DEFAULT CHARSET=latin1;" sql += ") ENGINE=InnoDB DEFAULT CHARSET=latin1;"
_, err := sess.Exec(sql) _, err := sess.Exec(sql)
So(err, ShouldBeNil) So(err, ShouldBeNil)
...@@ -64,11 +69,11 @@ func TestMySQL(t *testing.T) { ...@@ -64,11 +69,11 @@ func TestMySQL(t *testing.T) {
sql += "(`atinyint`, `avarchar`, `achar`, `amediumint`, `asmallint`, `abigint`, `aint`, `adouble`, " sql += "(`atinyint`, `avarchar`, `achar`, `amediumint`, `asmallint`, `abigint`, `aint`, `adouble`, "
sql += "`anewdecimal`, `afloat`, `adatetime`, `atimestamp`, `atime`, `abit`, `atinytext`, " sql += "`anewdecimal`, `afloat`, `adatetime`, `atimestamp`, `atime`, `abit`, `atinytext`, "
sql += "`atinyblob`, `atext`, `ablob`, `amediumtext`, `amediumblob`, `alongtext`, `alongblob`, " sql += "`atinyblob`, `atext`, `ablob`, `amediumtext`, `amediumblob`, `alongtext`, `alongblob`, "
sql += "`aenum`, `aset`, `adate`) " sql += "`aenum`, `aset`, `adate`, `time_sec`) "
sql += "VALUES(1, 'abc', 'def', 1, 10, 100, 1420070400, 1.11, " sql += "VALUES(1, 'abc', 'def', 1, 10, 100, 1420070400, 1.11, "
sql += "2.22, 3.33, now(), current_timestamp(), '11:11:11', 1, 'tinytext', " sql += "2.22, 3.33, now(), current_timestamp(), '11:11:11', 1, 'tinytext', "
sql += "'tinyblob', 'text', 'blob', 'mediumtext', 'mediumblob', 'longtext', 'longblob', " sql += "'tinyblob', 'text', 'blob', 'mediumtext', 'mediumblob', 'longtext', 'longblob', "
sql += "'val2', 'a,b', curdate());" sql += "'val2', 'a,b', curdate(), '2018-01-01 00:01:01.123456');"
_, err = sess.Exec(sql) _, err = sess.Exec(sql)
So(err, ShouldBeNil) So(err, ShouldBeNil)
...@@ -90,32 +95,38 @@ func TestMySQL(t *testing.T) { ...@@ -90,32 +95,38 @@ func TestMySQL(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
column := queryResult.Tables[0].Rows[0] column := queryResult.Tables[0].Rows[0]
So(*column[0].(*int8), ShouldEqual, 1) So(*column[0].(*int8), ShouldEqual, 1)
So(*column[1].(*string), ShouldEqual, "abc") So(column[1].(string), ShouldEqual, "abc")
So(*column[2].(*string), ShouldEqual, "def") So(column[2].(string), ShouldEqual, "def")
So(*column[3].(*int32), ShouldEqual, 1) So(*column[3].(*int32), ShouldEqual, 1)
So(*column[4].(*int16), ShouldEqual, 10) So(*column[4].(*int16), ShouldEqual, 10)
So(*column[5].(*int64), ShouldEqual, 100) So(*column[5].(*int64), ShouldEqual, 100)
So(*column[6].(*int), ShouldEqual, 1420070400) So(*column[6].(*int32), ShouldEqual, 1420070400)
So(*column[7].(*float64), ShouldEqual, 1.11) So(column[7].(float64), ShouldEqual, 1.11)
So(*column[8].(*float64), ShouldEqual, 2.22) So(column[8].(float64), ShouldEqual, 2.22)
So(*column[9].(*float64), ShouldEqual, 3.33) So(*column[9].(*float32), ShouldEqual, 3.33)
_, offset := time.Now().Zone() _, offset := time.Now().Zone()
So((*column[10].(*time.Time)), ShouldHappenWithin, time.Duration(10*time.Second), time.Now().Add(time.Duration(offset)*time.Second)) So(column[10].(time.Time), ShouldHappenWithin, time.Duration(10*time.Second), time.Now().Add(time.Duration(offset)*time.Second))
So(*column[11].(*time.Time), ShouldHappenWithin, time.Duration(10*time.Second), time.Now().Add(time.Duration(offset)*time.Second)) So(column[11].(time.Time), ShouldHappenWithin, time.Duration(10*time.Second), time.Now().Add(time.Duration(offset)*time.Second))
So(*column[12].(*string), ShouldEqual, "11:11:11") So(column[12].(string), ShouldEqual, "11:11:11")
So(*column[13].(*[]byte), ShouldHaveSameTypeAs, []byte{1}) So(*column[13].(*[]byte), ShouldHaveSameTypeAs, []byte{1})
So(*column[14].(*string), ShouldEqual, "tinytext") So(column[14].(string), ShouldEqual, "tinytext")
So(*column[15].(*string), ShouldEqual, "tinyblob") So(column[15].(string), ShouldEqual, "tinyblob")
So(*column[16].(*string), ShouldEqual, "text") So(column[16].(string), ShouldEqual, "text")
So(*column[17].(*string), ShouldEqual, "blob") So(column[17].(string), ShouldEqual, "blob")
So(*column[18].(*string), ShouldEqual, "mediumtext") So(column[18].(string), ShouldEqual, "mediumtext")
So(*column[19].(*string), ShouldEqual, "mediumblob") So(column[19].(string), ShouldEqual, "mediumblob")
So(*column[20].(*string), ShouldEqual, "longtext") So(column[20].(string), ShouldEqual, "longtext")
So(*column[21].(*string), ShouldEqual, "longblob") So(column[21].(string), ShouldEqual, "longblob")
So(*column[22].(*string), ShouldEqual, "val2") So(column[22].(string), ShouldEqual, "val2")
So(*column[23].(*string), ShouldEqual, "a,b") So(column[23].(string), ShouldEqual, "a,b")
So(*column[24].(*string), ShouldEqual, time.Now().Format("2006-01-02T00:00:00Z")) So(column[24].(time.Time).Format("2006-01-02T00:00:00Z"), ShouldEqual, time.Now().Format("2006-01-02T00:00:00Z"))
So(column[25].(float64), ShouldEqual, 1514764861)
So(column[26], ShouldEqual, nil)
So(column[27], ShouldEqual, nil)
So(column[28], ShouldEqual, "")
So(column[29], ShouldEqual, nil)
}) })
}) })
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment