From 299122f96559376b91c3e8a0384a4d15253ec441 Mon Sep 17 00:00:00 2001 From: yubo Date: Wed, 18 Nov 2020 23:46:51 +0800 Subject: [PATCH] add nid to transfer query data/index (#411) --- src/common/dataobj/query_item.go | 6 ++++++ src/modules/transfer/backend/m3db/convert.go | 5 +++++ src/modules/transfer/backend/m3db/m3db.go | 8 ++++++-- src/modules/transfer/backend/m3db/query.go | 12 ++++++------ .../transfer/backend/m3db/test/04-query-metrics.sh | 2 +- 5 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/common/dataobj/query_item.go b/src/common/dataobj/query_item.go index 415ae92f..5347cbdd 100644 --- a/src/common/dataobj/query_item.go +++ b/src/common/dataobj/query_item.go @@ -54,6 +54,7 @@ func (resp *TsdbQueryResponse) Key() string { type EndpointsRecv struct { Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` } type MetricResp struct { @@ -62,11 +63,13 @@ type MetricResp struct { type EndpointMetricRecv struct { Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` Metrics []string `json:"metrics"` } type IndexTagkvResp struct { Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` Metric string `json:"metric"` Tagkv []*TagPair `json:"tagkv"` } @@ -78,6 +81,7 @@ type TagPair struct { type CludeRecv struct { Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` Metric string `json:"metric"` Include []*TagPair `json:"include"` Exclude []*TagPair `json:"exclude"` @@ -94,12 +98,14 @@ type XcludeResp struct { type IndexByFullTagsRecv struct { Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` Metric string `json:"metric"` Tagkv []TagPair `json:"tagkv"` } type IndexByFullTagsResp struct { Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` Metric string `json:"metric"` Tags []string `json:"tags"` Step int `json:"step"` diff --git a/src/modules/transfer/backend/m3db/convert.go b/src/modules/transfer/backend/m3db/convert.go index 65224ebd..80090b2a 100644 --- a/src/modules/transfer/backend/m3db/convert.go +++ b/src/modules/transfer/backend/m3db/convert.go @@ -75,6 +75,11 @@ func tagsIndexTagkvResp(tags *consolidators.CompleteTagsResult) *dataobj.IndexTa for i, v := range tag.Values { ret.Endpoints[i] = string(v) } + case NID_NAME: + ret.Nids = make([]string, len(tag.Values)) + for i, v := range tag.Values { + ret.Nids[i] = string(v) + } default: kv := &dataobj.TagPair{Key: string(tag.Name)} kv.Values = make([]string, len(tag.Values)) diff --git a/src/modules/transfer/backend/m3db/m3db.go b/src/modules/transfer/backend/m3db/m3db.go index b26b8a81..9734d52f 100644 --- a/src/modules/transfer/backend/m3db/m3db.go +++ b/src/modules/transfer/backend/m3db/m3db.go @@ -292,6 +292,7 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde } ret.Endpoints = input.Endpoints + ret.Nids = input.Nids tags := map[string]struct{}{} for iter.Next() { _, _, tagIter := iter.Current() @@ -446,7 +447,7 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons tagsIter := iter.Tags() tags := map[string]string{} - var metric, endpoint string + var metric, endpoint, nid string for tagsIter.Next() { tag := tagsIter.Current() @@ -455,8 +456,10 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons switch k { case METRIC_NAME: metric = v - case ENDPOINT_NAME, NID_NAME: + case ENDPOINT_NAME: endpoint = v + case NID_NAME: + nid = v default: tags[k] = v } @@ -467,6 +470,7 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons Start: iter.Start().Unix(), End: iter.End().Unix(), Endpoint: endpoint, + Nid: nid, Counter: counter, Values: values, }, nil diff --git a/src/modules/transfer/backend/m3db/query.go b/src/modules/transfer/backend/m3db/query.go index 0392d7a3..4dda3f0c 100644 --- a/src/modules/transfer/backend/m3db/query.go +++ b/src/modules/transfer/backend/m3db/query.go @@ -140,7 +140,7 @@ func metricTagsQuery(tags []string) idx.Query { func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.AggregationOptions) { nameByte := []byte(METRIC_NAME) return index.Query{idx.NewConjunctionQuery( - endpointsQuery(nil, input.Endpoints), + endpointsQuery(input.Nids, input.Endpoints), idx.NewFieldQuery(nameByte), )}, index.AggregationOptions{ @@ -158,7 +158,7 @@ func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Q // QueryTagPairs // (endpoint[0] || endpoint[1]...) && (metrics[0] || metrics[1] ... ) func (cfg M3dbSection) queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.AggregationOptions) { - q1 := endpointsQuery(nil, input.Endpoints) + q1 := endpointsQuery(input.Nids, input.Endpoints) q2 := metricsQuery(input.Metrics) return index.Query{idx.NewConjunctionQuery(q1, q2)}, @@ -179,8 +179,8 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index. query := index.Query{} q := []idx.Query{} - if len(input.Endpoints) > 0 { - q = append(q, endpointsQuery(nil, input.Endpoints)) + if len(input.Endpoints) > 0 || len(input.Nids) > 0 { + q = append(q, endpointsQuery(input.Nids, input.Endpoints)) } if input.Metric != "" { q = append(q, metricQuery(input.Metric)) @@ -214,8 +214,8 @@ func (cfg M3dbSection) queryIndexByFullTagsOptions(input dataobj.IndexByFullTags query := index.Query{} q := []idx.Query{} - if len(input.Endpoints) > 0 { - q = append(q, endpointsQuery(nil, input.Endpoints)) + if len(input.Endpoints) > 0 || len(input.Nids) > 0 { + q = append(q, endpointsQuery(input.Nids, input.Endpoints)) } if input.Metric != "" { q = append(q, metricQuery(input.Metric)) diff --git a/src/modules/transfer/backend/m3db/test/04-query-metrics.sh b/src/modules/transfer/backend/m3db/test/04-query-metrics.sh index 16e4656f..f3e34c5e 100755 --- a/src/modules/transfer/backend/m3db/test/04-query-metrics.sh +++ b/src/modules/transfer/backend/m3db/test/04-query-metrics.sh @@ -7,7 +7,7 @@ curl -X POST \ http://localhost:8008/api/index/metrics \ -d '{ - "endpoints": ["10.178.24.116"] + "endpoints": ["10.178.24.120", "10.178.25.123"] }'