Commit 4afd4001 by Kyle Brandt Committed by GitHub

transform: update to use sdk with frame.labels moved to frame.[]field.labels (#20670)

sdk v0.4.0
parent 1776c11f
...@@ -31,7 +31,7 @@ require ( ...@@ -31,7 +31,7 @@ require (
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/gosimple/slug v1.4.2 github.com/gosimple/slug v1.4.2
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.2.0 github.com/grafana/grafana-plugin-sdk-go v0.4.0
github.com/hashicorp/go-hclog v0.8.0 github.com/hashicorp/go-hclog v0.8.0
github.com/hashicorp/go-plugin v1.0.1 github.com/hashicorp/go-plugin v1.0.1
github.com/hashicorp/go-version v1.1.0 github.com/hashicorp/go-version v1.1.0
......
...@@ -123,6 +123,10 @@ github.com/grafana/grafana-plugin-sdk-go v0.1.1-0.20191115194829-9bfc5937b406 h1 ...@@ -123,6 +123,10 @@ github.com/grafana/grafana-plugin-sdk-go v0.1.1-0.20191115194829-9bfc5937b406 h1
github.com/grafana/grafana-plugin-sdk-go v0.1.1-0.20191115194829-9bfc5937b406/go.mod h1:yA268OaX+C71ubT39tyACEfFwyhEzS1kbEVHUCgkKS8= github.com/grafana/grafana-plugin-sdk-go v0.1.1-0.20191115194829-9bfc5937b406/go.mod h1:yA268OaX+C71ubT39tyACEfFwyhEzS1kbEVHUCgkKS8=
github.com/grafana/grafana-plugin-sdk-go v0.2.0 h1:MgcTjCuzIkZcjb/2vCPK1RvLEHfRnQtFK7AF0W3SQm0= github.com/grafana/grafana-plugin-sdk-go v0.2.0 h1:MgcTjCuzIkZcjb/2vCPK1RvLEHfRnQtFK7AF0W3SQm0=
github.com/grafana/grafana-plugin-sdk-go v0.2.0/go.mod h1:yA268OaX+C71ubT39tyACEfFwyhEzS1kbEVHUCgkKS8= github.com/grafana/grafana-plugin-sdk-go v0.2.0/go.mod h1:yA268OaX+C71ubT39tyACEfFwyhEzS1kbEVHUCgkKS8=
github.com/grafana/grafana-plugin-sdk-go v0.3.1-0.20191125180836-d77f6ffe8e05 h1:COdehD2bs2CJ3zrGAOueGrqCOaCG/M9aYiO4y+J4MUk=
github.com/grafana/grafana-plugin-sdk-go v0.3.1-0.20191125180836-d77f6ffe8e05/go.mod h1:yA268OaX+C71ubT39tyACEfFwyhEzS1kbEVHUCgkKS8=
github.com/grafana/grafana-plugin-sdk-go v0.4.0 h1:bypT7gwGL9i584JEUQ1twcLxoUPO/60XW3VM8VYndYI=
github.com/grafana/grafana-plugin-sdk-go v0.4.0/go.mod h1:yA268OaX+C71ubT39tyACEfFwyhEzS1kbEVHUCgkKS8=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI= github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
github.com/hashicorp/go-hclog v0.8.0 h1:z3ollgGRg8RjfJH6UVBaG54R70GFd++QOkvnJH3VSBY= github.com/hashicorp/go-hclog v0.8.0 h1:z3ollgGRg8RjfJH6UVBaG54R70GFd++QOkvnJH3VSBY=
github.com/hashicorp/go-hclog v0.8.0/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-hclog v0.8.0/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
......
...@@ -13,9 +13,9 @@ func SeriesToFrame(series *TimeSeries) (*dataframe.Frame, error) { ...@@ -13,9 +13,9 @@ func SeriesToFrame(series *TimeSeries) (*dataframe.Frame, error) {
for idx, point := range series.Points { for idx, point := range series.Points {
timeVec[idx], floatVec[idx] = convertTSDBTimePoint(point) timeVec[idx], floatVec[idx] = convertTSDBTimePoint(point)
} }
frame := dataframe.New(series.Name, dataframe.Labels(series.Tags), frame := dataframe.New(series.Name,
dataframe.NewField("time", timeVec), dataframe.NewField("time", nil, timeVec),
dataframe.NewField("value", floatVec), dataframe.NewField("value", dataframe.Labels(series.Tags), floatVec),
) )
return frame, nil return frame, nil
......
...@@ -80,7 +80,8 @@ func buildArrowFields(f *Frame) ([]arrow.Field, error) { ...@@ -80,7 +80,8 @@ func buildArrowFields(f *Frame) ([]arrow.Field, error) {
} }
fieldMeta := map[string]string{ fieldMeta := map[string]string{
"name": field.Name, "name": field.Name,
"labels": field.Labels.String(),
} }
arrowFields[i] = arrow.Field{ arrowFields[i] = arrow.Field{
...@@ -146,10 +147,6 @@ func buildArrowSchema(f *Frame, fs []arrow.Field) (*arrow.Schema, error) { ...@@ -146,10 +147,6 @@ func buildArrowSchema(f *Frame, fs []arrow.Field) (*arrow.Schema, error) {
"refId": f.RefID, "refId": f.RefID,
} }
if f.Labels != nil {
tableMetaMap["labels"] = f.Labels.String()
}
tableMeta := arrow.MetadataFrom(tableMetaMap) tableMeta := arrow.MetadataFrom(tableMetaMap)
return arrow.NewSchema(fs, &tableMeta), nil return arrow.NewSchema(fs, &tableMeta), nil
...@@ -195,39 +192,27 @@ func fieldToArrow(f *Field) (arrow.DataType, bool, error) { ...@@ -195,39 +192,27 @@ func fieldToArrow(f *Field) (arrow.DataType, bool, error) {
} }
} }
// UnMarshalArrow converts a byte representation of an arrow table to a Frame func getMDKey(key string, metaData arrow.Metadata) (string, bool) {
// TODO: Break up this function. idx := metaData.FindKey(key)
func UnMarshalArrow(b []byte) (*Frame, error) { if idx < 0 {
fB := filebuffer.New(b) return "", false
fR, err := ipc.NewFileReader(fB)
if err != nil {
return nil, err
} }
defer fR.Close() return metaData.Values()[idx], true
}
schema := fR.Schema() func initializeFrameFields(schema *arrow.Schema, frame *Frame) ([]bool, error) {
metaData := schema.Metadata()
frame := &Frame{}
getMDKey := func(key string) (string, bool) {
idx := metaData.FindKey(key)
if idx < 0 {
return "", false
}
return metaData.Values()[idx], true
}
frame.Name, _ = getMDKey("name") // No need to check ok, zero value ("") is returned
frame.RefID, _ = getMDKey("refId")
if labelsAsString, ok := getMDKey("labels"); ok {
frame.Labels, err = LabelsFromString(labelsAsString)
if err != nil {
return nil, err
}
}
nullable := make([]bool, len(schema.Fields())) nullable := make([]bool, len(schema.Fields()))
for idx, field := range schema.Fields() { for idx, field := range schema.Fields() {
sdkField := &Field{ sdkField := &Field{
Name: field.Name, Name: field.Name,
} }
if labelsAsString, ok := getMDKey("labels", field.Metadata); ok {
var err error
sdkField.Labels, err = LabelsFromString(labelsAsString)
if err != nil {
return nil, err
}
}
nullable[idx] = field.Nullable nullable[idx] = field.Nullable
switch field.Type.ID() { switch field.Type.ID() {
case arrow.STRING: case arrow.STRING:
...@@ -267,18 +252,21 @@ func UnMarshalArrow(b []byte) (*Frame, error) { ...@@ -267,18 +252,21 @@ func UnMarshalArrow(b []byte) (*Frame, error) {
} }
sdkField.Vector = newTimeVector(0) sdkField.Vector = newTimeVector(0)
default: default:
return nil, fmt.Errorf("unsupported conversion from arrow to sdk type for arrow type %v", field.Type.ID().String()) return nullable, fmt.Errorf("unsupported conversion from arrow to sdk type for arrow type %v", field.Type.ID().String())
} }
frame.Fields = append(frame.Fields, sdkField) frame.Fields = append(frame.Fields, sdkField)
} }
return nullable, nil
}
func populateFrameFields(fR *ipc.FileReader, nullable []bool, frame *Frame) error {
for { for {
record, err := fR.Read() record, err := fR.Read()
if err == io.EOF { if err == io.EOF {
break break
} }
if err != nil { if err != nil {
return nil, err return err
} }
for i := 0; i < len(frame.Fields); i++ { for i := 0; i < len(frame.Fields); i++ {
col := record.Column(i) col := record.Column(i)
...@@ -374,10 +362,35 @@ func UnMarshalArrow(b []byte) (*Frame, error) { ...@@ -374,10 +362,35 @@ func UnMarshalArrow(b []byte) (*Frame, error) {
frame.Fields[i].Vector.Append(t) frame.Fields[i].Vector.Append(t)
} }
default: default:
return nil, fmt.Errorf("unsupported arrow type %s for conversion", col.DataType().ID()) return fmt.Errorf("unsupported arrow type %s for conversion", col.DataType().ID())
} }
} }
} }
return nil
}
// UnmarshalArrow converts a byte representation of an arrow table to a Frame
func UnmarshalArrow(b []byte) (*Frame, error) {
fB := filebuffer.New(b)
fR, err := ipc.NewFileReader(fB)
if err != nil {
return nil, err
}
defer fR.Close()
schema := fR.Schema()
metaData := schema.Metadata()
frame := &Frame{}
frame.Name, _ = getMDKey("name", metaData) // No need to check ok, zero value ("") is returned
frame.RefID, _ = getMDKey("refId", metaData)
nullable, err := initializeFrameFields(schema, frame)
if err != nil {
return nil, err
}
err = populateFrameFields(fR, nullable, frame)
if err != nil {
return nil, err
}
return frame, nil return frame, nil
} }
...@@ -7,17 +7,26 @@ import ( ...@@ -7,17 +7,26 @@ import (
"time" "time"
) )
// Frame represents a columnar storage with optional labels.
type Frame struct {
Name string
Fields []*Field
RefID string
}
// Field represents a column of data with a specific type. // Field represents a column of data with a specific type.
type Field struct { type Field struct {
Name string Name string
Vector Vector Vector Vector
Labels Labels
} }
// Fields is a slice of Field pointers. // Fields is a slice of Field pointers.
type Fields []*Field type Fields []*Field
// NewField returns a new instance of Field. // NewField returns a new instance of Field.
func NewField(name string, values interface{}) *Field { func NewField(name string, labels Labels, values interface{}) *Field {
var vec Vector var vec Vector
switch v := values.(type) { switch v := values.(type) {
case []int64: case []int64:
...@@ -87,6 +96,7 @@ func NewField(name string, values interface{}) *Field { ...@@ -87,6 +96,7 @@ func NewField(name string, values interface{}) *Field {
return &Field{ return &Field{
Name: name, Name: name,
Vector: vec, Vector: vec,
Labels: labels,
} }
} }
...@@ -152,10 +162,10 @@ func (l Labels) String() string { ...@@ -152,10 +162,10 @@ func (l Labels) String() string {
// LabelsFromString parses the output of Labels.String() into // LabelsFromString parses the output of Labels.String() into
// a Labels object. It probably has some flaws. // a Labels object. It probably has some flaws.
func LabelsFromString(s string) (Labels, error) { func LabelsFromString(s string) (Labels, error) {
labels := make(map[string]string)
if s == "" { if s == "" {
return labels, nil return nil, nil
} }
labels := make(map[string]string)
for _, rawKV := range strings.Split(s, ", ") { for _, rawKV := range strings.Split(s, ", ") {
kV := strings.SplitN(rawKV, "=", 2) kV := strings.SplitN(rawKV, "=", 2)
...@@ -168,20 +178,10 @@ func LabelsFromString(s string) (Labels, error) { ...@@ -168,20 +178,10 @@ func LabelsFromString(s string) (Labels, error) {
return labels, nil return labels, nil
} }
// Frame represents a columnar storage with optional labels.
type Frame struct {
Name string
Labels Labels
Fields []*Field
RefID string
}
// New returns a new instance of a Frame. // New returns a new instance of a Frame.
func New(name string, labels Labels, fields ...*Field) *Frame { func New(name string, fields ...*Field) *Frame {
return &Frame{ return &Frame{
Name: name, Name: name,
Labels: labels,
Fields: fields, Fields: fields,
} }
} }
......
...@@ -11,6 +11,7 @@ type Vector interface { ...@@ -11,6 +11,7 @@ type Vector interface {
Append(i interface{}) Append(i interface{})
At(i int) interface{} At(i int) interface{}
Len() int Len() int
PrimitiveType() VectorPType
} }
func newVector(t interface{}, n int) (v Vector) { func newVector(t interface{}, n int) (v Vector) {
...@@ -44,3 +45,38 @@ func newVector(t interface{}, n int) (v Vector) { ...@@ -44,3 +45,38 @@ func newVector(t interface{}, n int) (v Vector) {
} }
return return
} }
// VectorPType indicates the go type underlying the Vector.
type VectorPType int
const (
// VectorPTypeInt64 indicates the underlying primitive is a []int64.
VectorPTypeInt64 VectorPType = iota
// VectorPTypeNullableInt64 indicates the underlying primitive is a []*int64.
VectorPTypeNullableInt64
// VectorPTypeUint64 indicates the underlying primitive is a []uint64.
VectorPTypeUint64
// VectorPTypeNullableUInt64 indicates the underlying primitive is a []*uint64.
VectorPTypeNullableUInt64
// VectorPTypeFloat64 indicates the underlying primitive is a []float64.
VectorPTypeFloat64
// VectorPTypeNullableFloat64 indicates the underlying primitive is a []*float64.
VectorPTypeNullableFloat64
// VectorPTypeString indicates the underlying primitive is a []string.
VectorPTypeString
// VectorPTypeNullableString indicates the underlying primitive is a []*string.
VectorPTypeNullableString
// VectorPTypeBool indicates the underlying primitive is a []bool.
VectorPTypeBool
// VectorPTypeNullableBool indicates the underlying primitive is a []*bool.
VectorPTypeNullableBool
// VectorPTypeTime indicates the underlying primitive is a []time.Time.
VectorPTypeTime
// VectorPTypeNullableTime indicates the underlying primitive is a []*time.Time.
VectorPTypeNullableTime
)
...@@ -22,3 +22,7 @@ func (v *boolVector) At(i int) interface{} { ...@@ -22,3 +22,7 @@ func (v *boolVector) At(i int) interface{} {
func (v *boolVector) Len() int { func (v *boolVector) Len() int {
return len(*v) return len(*v)
} }
func (v *boolVector) PrimitiveType() VectorPType {
return VectorPTypeBool
}
\ No newline at end of file
...@@ -22,3 +22,7 @@ func (v *nullableBoolVector) At(i int) interface{} { ...@@ -22,3 +22,7 @@ func (v *nullableBoolVector) At(i int) interface{} {
func (v *nullableBoolVector) Len() int { func (v *nullableBoolVector) Len() int {
return len(*v) return len(*v)
} }
func (v *nullableBoolVector) PrimitiveType() VectorPType {
return VectorPTypeNullableBool
}
...@@ -22,3 +22,7 @@ func (v *floatVector) At(i int) interface{} { ...@@ -22,3 +22,7 @@ func (v *floatVector) At(i int) interface{} {
func (v *floatVector) Len() int { func (v *floatVector) Len() int {
return len(*v) return len(*v)
} }
func (v *floatVector) PrimitiveType() VectorPType {
return VectorPTypeFloat64
}
...@@ -18,3 +18,7 @@ func (v *nullableFloatVector) Append(val interface{}) { ...@@ -18,3 +18,7 @@ func (v *nullableFloatVector) Append(val interface{}) {
func (v *nullableFloatVector) At(i int) interface{} { return (*v)[i] } func (v *nullableFloatVector) At(i int) interface{} { return (*v)[i] }
func (v *nullableFloatVector) Len() int { return len(*v) } func (v *nullableFloatVector) Len() int { return len(*v) }
func (v *nullableFloatVector) PrimitiveType() VectorPType {
return VectorPTypeNullableFloat64
}
...@@ -22,3 +22,7 @@ func (v *intVector) At(i int) interface{} { ...@@ -22,3 +22,7 @@ func (v *intVector) At(i int) interface{} {
func (v *intVector) Len() int { func (v *intVector) Len() int {
return len(*v) return len(*v)
} }
func (v *intVector) PrimitiveType() VectorPType {
return VectorPTypeInt64
}
...@@ -18,3 +18,7 @@ func (v *nullableIntVector) Append(val interface{}) { ...@@ -18,3 +18,7 @@ func (v *nullableIntVector) Append(val interface{}) {
func (v *nullableIntVector) At(i int) interface{} { return (*v)[i] } func (v *nullableIntVector) At(i int) interface{} { return (*v)[i] }
func (v *nullableIntVector) Len() int { return len(*v) } func (v *nullableIntVector) Len() int { return len(*v) }
func (v *nullableIntVector) PrimitiveType() VectorPType {
return VectorPTypeNullableInt64
}
...@@ -21,4 +21,8 @@ func (v *stringVector) At(i int) interface{} { ...@@ -21,4 +21,8 @@ func (v *stringVector) At(i int) interface{} {
func (v *stringVector) Len() int { func (v *stringVector) Len() int {
return len(*v) return len(*v)
}
func (v *stringVector) PrimitiveType() VectorPType {
return VectorPTypeString
} }
\ No newline at end of file
...@@ -22,3 +22,7 @@ func (v *nullableStringVector) At(i int) interface{} { ...@@ -22,3 +22,7 @@ func (v *nullableStringVector) At(i int) interface{} {
func (v *nullableStringVector) Len() int { func (v *nullableStringVector) Len() int {
return len(*v) return len(*v)
} }
func (v *nullableStringVector) PrimitiveType() VectorPType {
return VectorPTypeNullableString
}
\ No newline at end of file
...@@ -26,4 +26,7 @@ func (v *timeVector) At(i int) interface{} { ...@@ -26,4 +26,7 @@ func (v *timeVector) At(i int) interface{} {
func (v *timeVector) Len() int { func (v *timeVector) Len() int {
return len(*v) return len(*v)
} }
\ No newline at end of file func (v *timeVector) PrimitiveType() VectorPType {
return VectorPTypeTime
}
\ No newline at end of file
...@@ -26,3 +26,7 @@ func (v *nullableTimeVector) At(i int) interface{} { ...@@ -26,3 +26,7 @@ func (v *nullableTimeVector) At(i int) interface{} {
func (v *nullableTimeVector) Len() int { func (v *nullableTimeVector) Len() int {
return len(*v) return len(*v)
} }
func (v *nullableTimeVector) PrimitiveType() VectorPType {
return VectorPTypeNullableTime
}
...@@ -22,3 +22,7 @@ func (v *uintVector) At(i int) interface{} { ...@@ -22,3 +22,7 @@ func (v *uintVector) At(i int) interface{} {
func (v *uintVector) Len() int { func (v *uintVector) Len() int {
return len(*v) return len(*v)
} }
func (v *uintVector) PrimitiveType() VectorPType {
return VectorPTypeUint64
}
...@@ -18,3 +18,7 @@ func (v *nullableUintVector) Append(val interface{}) { ...@@ -18,3 +18,7 @@ func (v *nullableUintVector) Append(val interface{}) {
func (v *nullableUintVector) At(i int) interface{} { return (*v)[i] } func (v *nullableUintVector) At(i int) interface{} { return (*v)[i] }
func (v *nullableUintVector) Len() int { return len(*v) } func (v *nullableUintVector) Len() int { return len(*v) }
func (v *nullableUintVector) PrimitiveType() VectorPType {
return VectorPTypeNullableUInt64
}
...@@ -152,7 +152,7 @@ func (w *grafanaAPIWrapper) QueryDatasource(ctx context.Context, orgID int64, da ...@@ -152,7 +152,7 @@ func (w *grafanaAPIWrapper) QueryDatasource(ctx context.Context, orgID int64, da
// TODO Error property etc // TODO Error property etc
dfs := make([]*dataframe.Frame, len(rawRes.Dataframes)) dfs := make([]*dataframe.Frame, len(rawRes.Dataframes))
for dfIdx, b := range rawRes.Dataframes { for dfIdx, b := range rawRes.Dataframes {
dfs[dfIdx], err = dataframe.UnMarshalArrow(b) dfs[dfIdx], err = dataframe.UnmarshalArrow(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -131,7 +131,7 @@ github.com/gosimple/slug ...@@ -131,7 +131,7 @@ github.com/gosimple/slug
# github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 # github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-model/go/datasource github.com/grafana/grafana-plugin-model/go/datasource
github.com/grafana/grafana-plugin-model/go/renderer github.com/grafana/grafana-plugin-model/go/renderer
# github.com/grafana/grafana-plugin-sdk-go v0.2.0 # github.com/grafana/grafana-plugin-sdk-go v0.4.0
github.com/grafana/grafana-plugin-sdk-go/common github.com/grafana/grafana-plugin-sdk-go/common
github.com/grafana/grafana-plugin-sdk-go/dataframe github.com/grafana/grafana-plugin-sdk-go/dataframe
github.com/grafana/grafana-plugin-sdk-go/datasource github.com/grafana/grafana-plugin-sdk-go/datasource
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment