Commit 48aef0c5 by Mitsuhiro Tanda

fix concurrent map writes

parent f4b7b003
...@@ -86,9 +86,10 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc ...@@ -86,9 +86,10 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc
} }
func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{ results := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult), Results: make(map[string]*tsdb.QueryResult),
} }
resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries))
eg, ectx := errgroup.WithContext(ctx) eg, ectx := errgroup.WithContext(ctx)
...@@ -102,10 +103,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo ...@@ -102,10 +103,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
RefId := queryContext.Queries[i].RefId RefId := queryContext.Queries[i].RefId
query, err := parseQuery(queryContext.Queries[i].Model) query, err := parseQuery(queryContext.Queries[i].Model)
if err != nil { if err != nil {
result.Results[RefId] = &tsdb.QueryResult{ results.Results[RefId] = &tsdb.QueryResult{
Error: err, Error: err,
} }
return result, nil return results, nil
} }
query.RefId = RefId query.RefId = RefId
...@@ -118,10 +119,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo ...@@ -118,10 +119,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
} }
if query.Id == "" && query.Expression != "" { if query.Id == "" && query.Expression != "" {
result.Results[query.RefId] = &tsdb.QueryResult{ results.Results[query.RefId] = &tsdb.QueryResult{
Error: fmt.Errorf("Invalid query: id should be set if using expression"), Error: fmt.Errorf("Invalid query: id should be set if using expression"),
} }
return result, nil return results, nil
} }
eg.Go(func() error { eg.Go(func() error {
...@@ -130,12 +131,13 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo ...@@ -130,12 +131,13 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
return err return err
} }
if err != nil { if err != nil {
result.Results[query.RefId] = &tsdb.QueryResult{ resultChan <- &tsdb.QueryResult{
RefId: query.RefId,
Error: err, Error: err,
} }
return nil return nil
} }
result.Results[queryRes.RefId] = queryRes resultChan <- queryRes
return nil return nil
}) })
} }
...@@ -149,10 +151,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo ...@@ -149,10 +151,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
return err return err
} }
for _, queryRes := range queryResponses { for _, queryRes := range queryResponses {
result.Results[queryRes.RefId] = queryRes
if err != nil { if err != nil {
result.Results[queryRes.RefId].Error = err queryRes.Error = err
} }
resultChan <- queryRes
} }
return nil return nil
}) })
...@@ -162,8 +164,12 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo ...@@ -162,8 +164,12 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
if err := eg.Wait(); err != nil { if err := eg.Wait(); err != nil {
return nil, err return nil, err
} }
close(resultChan)
for result := range resultChan {
results.Results[result.RefId] = result
}
return result, nil return results, nil
} }
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) { func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment