|
|
|
@ -107,6 +107,7 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
|
|
|
|
|
|
|
|
|
|
// 将数据 打入 某个Tsdb的发送缓存队列, 具体是哪一个Tsdb 由一致性哈希 决定
|
|
|
|
|
func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
|
|
|
|
|
errCnt := 0
|
|
|
|
|
for _, item := range items {
|
|
|
|
|
tsdbItem := convert2TsdbItem(item)
|
|
|
|
|
stats.Counter.Set("tsdb.queue.push", 1)
|
|
|
|
@ -118,19 +119,18 @@ func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cnode := Config.ClusterList[node]
|
|
|
|
|
errCnt := 0
|
|
|
|
|
for _, addr := range cnode.Addrs {
|
|
|
|
|
Q := TsdbQueues[node+addr]
|
|
|
|
|
if !Q.PushFront(tsdbItem) {
|
|
|
|
|
errCnt += 1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// statistics
|
|
|
|
|
if errCnt > 0 {
|
|
|
|
|
stats.Counter.Set("tsdb.queue.err", errCnt)
|
|
|
|
|
logger.Error("Push2TsdbSendQueue err num: ", errCnt)
|
|
|
|
|
}
|
|
|
|
|
// statistics
|
|
|
|
|
if errCnt > 0 {
|
|
|
|
|
stats.Counter.Set("tsdb.queue.err", errCnt)
|
|
|
|
|
logger.Error("Push2TsdbSendQueue err num: ", errCnt)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -172,7 +172,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
|
|
|
|
|
|
|
|
|
|
if !sendOk {
|
|
|
|
|
stats.Counter.Set("points.out.judge.err", 1)
|
|
|
|
|
logger.Errorf("send judge %s fail: %v", addr, err)
|
|
|
|
|
logger.Errorf("send %v to judge %s fail: %v", judgeItems, addr, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}(addr, judgeItems, count)
|
|
|
|
@ -180,6 +180,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
|
|
|
|
|
errCnt := 0
|
|
|
|
|
for _, item := range items {
|
|
|
|
|
key := str.PK(item.Metric, item.Endpoint)
|
|
|
|
|
stras := cache.StraMap.GetByKey(key)
|
|
|
|
@ -203,11 +204,13 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
|
|
|
|
|
|
|
|
|
|
q, exists := JudgeQueues.Get(stra.JudgeInstance)
|
|
|
|
|
if exists {
|
|
|
|
|
q.PushFront(judgeItem)
|
|
|
|
|
if !q.PushFront(judgeItem) {
|
|
|
|
|
errCnt += 1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
stats.Counter.Set("judge.queue.err", errCnt)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 打到Tsdb的数据,要根据rrdtool的特定 来限制 step、counterType、timestamp
|
|
|
|
|