|
|
|
@ -3,6 +3,7 @@ package m3db
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/didi/nightingale/src/common/dataobj"
|
|
|
|
@ -89,23 +90,31 @@ func (p *Client) Push2Queue(items []*dataobj.MetricValue) {
|
|
|
|
|
logger.Errorf("unable to get m3db session: %s", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
errCnt := 0
|
|
|
|
|
var errCnt int32
|
|
|
|
|
var (
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
|
)
|
|
|
|
|
for _, item := range items {
|
|
|
|
|
if err := session.WriteTagged(
|
|
|
|
|
p.namespaceID,
|
|
|
|
|
mvID(item),
|
|
|
|
|
ident.NewTagsIterator(mvTags(item)),
|
|
|
|
|
time.Unix(item.Timestamp, 0),
|
|
|
|
|
item.Value,
|
|
|
|
|
xtime.Second,
|
|
|
|
|
nil,
|
|
|
|
|
); err != nil {
|
|
|
|
|
logger.Errorf("unable to writeTagged: %s", err)
|
|
|
|
|
errCnt++
|
|
|
|
|
}
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func(dm *dataobj.MetricValue) {
|
|
|
|
|
err := session.WriteTagged(
|
|
|
|
|
p.namespaceID,
|
|
|
|
|
mvID(dm),
|
|
|
|
|
ident.NewTagsIterator(mvTags(dm)),
|
|
|
|
|
time.Unix(dm.Timestamp, 0),
|
|
|
|
|
dm.Value,
|
|
|
|
|
xtime.Second,
|
|
|
|
|
nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Errorf("unable to writeTagged: %s", err)
|
|
|
|
|
atomic.AddInt32(&errCnt, 1)
|
|
|
|
|
}
|
|
|
|
|
wg.Done()
|
|
|
|
|
}(item)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
stats.Counter.Set("m3db.queue.err", errCnt)
|
|
|
|
|
wg.Wait()
|
|
|
|
|
stats.Counter.Set("m3db.queue.err", int(errCnt))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryData: || (|| endpoints...) (&& tags...)
|
|
|
|
|