diff --git a/etc/transfer.yml b/etc/transfer.yml
index 869f8089..e7a467de 100644
--- a/etc/transfer.yml
+++ b/etc/transfer.yml
@@ -4,6 +4,13 @@ backend:
   # callTimeout: 3000
   cluster:
     tsdb01: 127.0.0.1:5821
+  influxdb:
+    enabled: false
+    username: "influx"
+    password: "admin123"
+    precision: "s"
+    database: "n9e"
+    address: "http://127.0.0.1:8086"

 logger:
   dir: logs/transfer
diff --git a/src/dataobj/influxdb.go b/src/dataobj/influxdb.go
new file mode 100644
index 00000000..d329c890
--- /dev/null
+++ b/src/dataobj/influxdb.go
@@ -0,0 +1,8 @@
+package dataobj
+
+type InfluxdbItem struct {
+	Measurement string                 `json:"metric"`
+	Tags        map[string]string      `json:"tags"`
+	Fields      map[string]interface{} `json:"fields"`
+	Timestamp   int64                  `json:"timestamp"`
+}
diff --git a/src/modules/transfer/backend/init.go b/src/modules/transfer/backend/init.go
index f9737476..f8083b40 100644
--- a/src/modules/transfer/backend/init.go
+++ b/src/modules/transfer/backend/init.go
@@ -11,6 +11,19 @@ import (
 	"github.com/didi/nightingale/src/toolkits/stats"
 )

+type InfluxdbSection struct {
+	Enabled   bool   `yaml:"enabled"`
+	Batch     int    `yaml:"batch"`
+	MaxRetry  int    `yaml:"maxRetry"`
+	WorkerNum int    `yaml:"workerNum"`
+	Timeout   int    `yaml:"timeout"`
+	Address   string `yaml:"address"`
+	Database  string `yaml:"database"`
+	Username  string `yaml:"username"`
+	Password  string `yaml:"password"`
+	Precision string `yaml:"precision"`
+}
+
 type BackendSection struct {
 	Enabled      bool   `yaml:"enabled"`
 	Batch        int    `yaml:"batch"`
@@ -26,6 +39,7 @@ type BackendSection struct {
 	Replicas    int                     `yaml:"replicas"`
 	Cluster     map[string]string       `yaml:"cluster"`
 	ClusterList map[string]*ClusterNode `json:"clusterList"`
+	Influxdb    InfluxdbSection         `yaml:"influxdb"`
 }

 const DefaultSendQueueMaxSize = 102400 //10.24w
@@ -40,8 +54,9 @@ var (
 	TsdbNodeRing *ConsistentHashRing

 	// send queues: node -> queue_of_data
-	TsdbQueues  = make(map[string]*list.SafeListLimited)
-	JudgeQueues = cache.SafeJudgeQueue{}
+	TsdbQueues    = make(map[string]*list.SafeListLimited)
+	JudgeQueues   = cache.SafeJudgeQueue{}
+	InfluxdbQueue *list.SafeListLimited

 	// connection pools: node_address -> connection_pool
 	TsdbConnPools *pools.ConnPools
@@ -96,6 +111,10 @@ func initSendQueues() {
 	for _, judge := range judges {
 		JudgeQueues.Set(judge, list.NewSafeListLimited(DefaultSendQueueMaxSize))
 	}
+
+	if Config.Influxdb.Enabled {
+		InfluxdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize)
+	}
 }

 func GetJudges() []string {
diff --git a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go
index 89aa9a08..888fc60f 100644
--- a/src/modules/transfer/backend/sender.go
+++ b/src/modules/transfer/backend/sender.go
@@ -8,6 +8,7 @@ import (
 	"github.com/didi/nightingale/src/modules/transfer/cache"
 	"github.com/didi/nightingale/src/toolkits/stats"
 	"github.com/didi/nightingale/src/toolkits/str"
+	"github.com/influxdata/influxdb/client/v2"

 	"github.com/toolkits/pkg/concurrent/semaphore"
 	"github.com/toolkits/pkg/container/list"
@@ -36,6 +37,11 @@ func startSendTasks() {
 		judgeConcurrent = 1
 	}

+	influxdbConcurrent := Config.Influxdb.WorkerNum
+	if influxdbConcurrent < 1 {
+		influxdbConcurrent = 1
+	}
+
 	if Config.Enabled {
 		for node, item := range Config.ClusterList {
 			for _, addr := range item.Addrs {
@@ -51,6 +57,11 @@ func startSendTasks() {
 			go Send2JudgeTask(queue, instance, judgeConcurrent)
 		}
 	}
+
+	if Config.Influxdb.Enabled {
+		go send2InfluxdbTask(influxdbConcurrent)
+	}
 }

 func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) {
@@ -265,3 +276,134 @@ func TagMatch(straTags []model.Tag, tag map[string]string) bool {
 	}
 	return true
 }
+
+type InfluxClient struct {
+	Client    client.Client
+	Database  string
+	Precision string
+}
+
+func NewInfluxdbClient() (*InfluxClient, error) {
+	c, err := client.NewHTTPClient(client.HTTPConfig{
+		Addr:     Config.Influxdb.Address,
+		Username: Config.Influxdb.Username,
+		Password: Config.Influxdb.Password,
+		Timeout:  time.Millisecond * time.Duration(Config.Influxdb.Timeout),
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return &InfluxClient{
+		Client:    c,
+		Database:  Config.Influxdb.Database,
+		Precision: Config.Influxdb.Precision,
+	}, nil
+}
+
+func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error {
+	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
+		Database:  c.Database,
+		Precision: c.Precision,
+	})
+	if err != nil {
+		logger.Errorf("create batch points error: %v", err)
+		return err
+	}
+
+	for _, item := range items {
+		pt, err := client.NewPoint(item.Measurement, item.Tags, item.Fields, time.Unix(item.Timestamp, 0))
+		if err != nil {
+			logger.Errorf("create new point error: %v", err)
+			continue
+		}
+		bp.AddPoint(pt)
+	}
+
+	return c.Client.Write(bp)
+}
+
+// Push2InfluxdbSendQueue pushes raw metric data onto the influxdb send queue
+func Push2InfluxdbSendQueue(items []*dataobj.MetricValue) {
+	errCnt := 0
+	for _, item := range items {
+		influxdbItem := convert2InfluxdbItem(item)
+		isSuccess := InfluxdbQueue.PushFront(influxdbItem)
+		if !isSuccess {
+			errCnt += 1
+		}
+	}
+	stats.Counter.Set("influxdb.queue.err", errCnt)
+}
+
+// convert2InfluxdbItem maps a MetricValue to an InfluxdbItem:
+// metric -> measurement, endpoint -> tag, value -> field
+func convert2InfluxdbItem(d *dataobj.MetricValue) *dataobj.InfluxdbItem {
+	t := dataobj.InfluxdbItem{Tags: make(map[string]string), Fields: make(map[string]interface{})}
+
+	for k, v := range d.TagsMap {
+		t.Tags[k] = v
+	}
+	t.Tags["endpoint"] = d.Endpoint
+	t.Measurement = d.Metric
+	t.Fields["value"] = d.Value
+	t.Timestamp = d.Timestamp
+
+	return &t
+}
+
+func send2InfluxdbTask(concurrent int) {
+	batch := Config.Influxdb.Batch // max number of points per send
+	retry := Config.Influxdb.MaxRetry
+	addr := Config.Influxdb.Address
+	sema := semaphore.NewSemaphore(concurrent)
+
+	c, err := NewInfluxdbClient()
+	if err != nil {
+		logger.Errorf("init influxdb client fail: %v", err)
+		return
+	}
+	defer c.Client.Close()
+
+	for {
+		items := InfluxdbQueue.PopBackBy(batch)
+		count := len(items)
+		if count == 0 {
+			time.Sleep(DefaultSendTaskSleepInterval)
+			continue
+		}
+
+		influxdbItems := make([]*dataobj.InfluxdbItem, count)
+		for i := 0; i < count; i++ {
+			influxdbItems[i] = items[i].(*dataobj.InfluxdbItem)
+			stats.Counter.Set("points.out.influxdb", 1)
+			logger.Debug("send to influxdb: ", influxdbItems[i])
+		}
+
+		// send synchronously, with bounded concurrency
+		sema.Acquire()
+		go func(addr string, influxdbItems []*dataobj.InfluxdbItem, count int) {
+			defer sema.Release()
+			sendOk := false
+
+			var err error
+			for i := 0; i < retry; i++ {
+				err = c.Send(influxdbItems)
+				if err == nil {
+					sendOk = true
+					break
+				}
+				logger.Warningf("send influxdb fail: %v", err)
+				time.Sleep(time.Millisecond * 10)
+			}
+
+			if !sendOk {
+				stats.Counter.Set("points.out.influxdb.err", count)
+				logger.Errorf("send %v to influxdb %s fail: %v", influxdbItems, addr, err)
+			} else {
+				logger.Debugf("send to influxdb %s ok", addr)
+			}
+		}(addr, influxdbItems, count)
+	}
+}
diff --git a/src/modules/transfer/config/config.go b/src/modules/transfer/config/config.go
index f1593e56..e1207655 100644
--- a/src/modules/transfer/config/config.go
+++ b/src/modules/transfer/config/config.go
@@ -93,6 +93,15 @@ func Parse(conf string) error {
 		"hbsMod": "monapi",
 	})
+	viper.SetDefault("backend.influxdb", map[string]interface{}{
+		"enabled":   true,
+		"batch":     200,  // max number of points per send
+		"maxRetry":  3,    // number of retries
+		"workerNum": 32,
+		"maxConns":  2000, // concurrency for querying and pushing data
+		"timeout":   3000, // request timeout, in milliseconds
+	})
+
 	err = viper.Unmarshal(&Config)
 	if err != nil {
 		return fmt.Errorf("cannot read yml[%s]: %v", conf, err)
 	}
diff --git a/src/modules/transfer/rpc/push.go b/src/modules/transfer/rpc/push.go
index e54369ad..fc13ff9d 100644
--- a/src/modules/transfer/rpc/push.go
+++ b/src/modules/transfer/rpc/push.go
@@ -44,6 +44,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
 		backend.Push2JudgeSendQueue(items)
 	}

+	if backend.Config.Influxdb.Enabled {
+		backend.Push2InfluxdbSendQueue(items)
+	}
+
 	if reply.Invalid == 0 {
 		reply.Msg = "ok"
 	}
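
Note: the etc/transfer.yml example above only sets the connection-related keys, while InfluxdbSection also understands batch, maxRetry, workerNum and timeout (otherwise falling back to the viper defaults in config.go). A fuller influxdb block could look like the sketch below; the grouping and inline comments are illustrative, not taken from the shipped config:

  influxdb:
    enabled: false
    address: "http://127.0.0.1:8086"
    database: "n9e"
    username: "influx"
    password: "admin123"
    precision: "s"
    batch: 200        # max points per write, mirrors the viper default
    maxRetry: 3       # resend attempts before a batch is dropped
    workerNum: 32     # cap on concurrent send goroutines
    timeout: 3000     # HTTP client timeout in milliseconds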