|
|
package trans
|
|
|
|
|
|
import (
|
|
|
"time"
|
|
|
|
|
|
"github.com/didi/nightingale/v5/config"
|
|
|
"github.com/didi/nightingale/v5/judge"
|
|
|
"github.com/didi/nightingale/v5/vos"
|
|
|
"github.com/toolkits/pkg/concurrent/semaphore"
|
|
|
"github.com/toolkits/pkg/container/list"
|
|
|
"github.com/toolkits/pkg/logger"
|
|
|
)
|
|
|
|
|
|
// 多个judge实例,如果对端地址等于本地地址走内存
|
|
|
func send2JudgeTask(q *list.SafeListLimited, addr string) {
|
|
|
if config.Config.Heartbeat.LocalAddr == addr {
|
|
|
send2LocalJudge(q)
|
|
|
} else {
|
|
|
send2RemoteJudge(q, addr)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func send2LocalJudge(q *list.SafeListLimited) {
|
|
|
for {
|
|
|
items := q.PopBackBy(config.Config.Judge.ReadBatch)
|
|
|
|
|
|
count := len(items)
|
|
|
if count == 0 {
|
|
|
time.Sleep(time.Millisecond * 100)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
points := make([]*vos.MetricPoint, count)
|
|
|
for i := 0; i < count; i++ {
|
|
|
points[i] = items[i].(*vos.MetricPoint)
|
|
|
}
|
|
|
|
|
|
judge.Send(points)
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
func send2RemoteJudge(q *list.SafeListLimited, addr string) {
|
|
|
sema := semaphore.NewSemaphore(config.Config.Judge.WriterNum)
|
|
|
|
|
|
for {
|
|
|
items := q.PopBackBy(config.Config.Judge.ReadBatch)
|
|
|
count := len(items)
|
|
|
if count == 0 {
|
|
|
time.Sleep(time.Millisecond * 50)
|
|
|
if !queues.Exists(addr) {
|
|
|
// 对端实例已挂,我已经没有存在的必要了
|
|
|
logger.Infof("server instance %s dead, queue reader exiting...", addr)
|
|
|
return
|
|
|
}
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
judgeItems := make([]*vos.MetricPoint, count)
|
|
|
for i := 0; i < count; i++ {
|
|
|
judgeItems[i] = items[i].(*vos.MetricPoint)
|
|
|
}
|
|
|
|
|
|
sema.Acquire()
|
|
|
go func(addr string, judgeItems []*vos.MetricPoint, count int) {
|
|
|
defer sema.Release()
|
|
|
|
|
|
var res string
|
|
|
var err error
|
|
|
sendOk := false
|
|
|
for i := 0; i < 15; i++ {
|
|
|
err = connPools.Call(addr, "Server.PushToJudge", judgeItems, &res)
|
|
|
if err == nil {
|
|
|
sendOk = true
|
|
|
break
|
|
|
}
|
|
|
time.Sleep(time.Second)
|
|
|
}
|
|
|
|
|
|
if !sendOk {
|
|
|
for _, item := range judgeItems {
|
|
|
logger.Errorf("send %v to judge %s fail: %v", item, addr, err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}(addr, judgeItems, count)
|
|
|
}
|
|
|
}
|