refactor transfer datasources for ui/judge, implement tsdb(+index) and influxdb (#246)

* refactor transfer datasources for ui/judge, implement tsdb(+index) and influxdb
* fix error string; fix import identity; refactor pushendpoint init
* fix influx queryData

Co-authored-by: wangzhiguo04 <wangzhiguo04@meicai.cn>

parent b6169ac706
commit 520dda70c0
@@ -0,0 +1,84 @@
package backend

import (
	"fmt"
	"time"

	"github.com/didi/nightingale/src/dataobj"
)

// send
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval: 50ms
	DefaultSendQueueMaxSize      = 102400                // 10.24w
	MaxSendRetry                 = 10
)

var (
	MinStep int // minimum report period, in seconds
)

type DataSource interface {
	PushEndpoint

	// query data for judge
	QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
	// query data for ui
	QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse

	// query metrics & tags
	QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp
	QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp
	QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp
	QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp

	// tsdb instance
	GetInstance(metric, endpoint string, tags map[string]string) []string
}

type PushEndpoint interface {
	// push data
	Push2Queue(items []*dataobj.MetricValue)
}

var registryDataSources map[string]DataSource
var registryPushEndpoints map[string]PushEndpoint

func init() {
	registryDataSources = make(map[string]DataSource)
	registryPushEndpoints = make(map[string]PushEndpoint)
}

// get backend datasource
// (pluginId == "" for default datasource)
func GetDataSourceFor(pluginId string) (DataSource, error) {
	if pluginId == "" {
		pluginId = defaultDataSource
	}
	if source, exists := registryDataSources[pluginId]; exists {
		return source, nil
	}
	return nil, fmt.Errorf("could not find datasource for plugin: %s", pluginId)
}

// get all push endpoints
func GetPushEndpoints() ([]PushEndpoint, error) {
	if len(registryPushEndpoints) > 0 {
		items := make([]PushEndpoint, 0, len(registryPushEndpoints))
		for _, value := range registryPushEndpoints {
			items = append(items, value)
		}
		return items, nil
	}
	return nil, fmt.Errorf("could not find any pushendpoint")
}

// a datasource can also accept pushed data, so it is registered as both
func RegisterDataSource(pluginId string, datasource DataSource) {
	registryDataSources[pluginId] = datasource
	registryPushEndpoints[pluginId] = datasource
}

func RegisterPushEndpoint(pluginId string, push PushEndpoint) {
	registryPushEndpoints[pluginId] = push
}
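For orientation, the registry above keys every plugin by name. The sketch below shows how a push-only plugin would hook in; the stdoutPushEndpoint type and its body are purely illustrative and not part of this commit — only RegisterPushEndpoint, GetPushEndpoints and dataobj.MetricValue come from the code above.

package backend

import (
	"fmt"

	"github.com/didi/nightingale/src/dataobj"
)

// stdoutPushEndpoint is a hypothetical PushEndpoint that just prints
// every metric it receives; it exists only to illustrate registration.
type stdoutPushEndpoint struct{}

func (p *stdoutPushEndpoint) Push2Queue(items []*dataobj.MetricValue) {
	for _, item := range items {
		fmt.Printf("metric=%s endpoint=%s value=%v\n", item.Metric, item.Endpoint, item.Value)
	}
}

func examplePushEndpointUsage() {
	RegisterPushEndpoint("stdout", &stdoutPushEndpoint{})

	endpoints, err := GetPushEndpoints()
	if err != nil {
		return
	}
	// transfer fans every received batch out to all registered endpoints
	for _, ep := range endpoints {
		ep.Push2Queue([]*dataobj.MetricValue{})
	}
}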
@@ -0,0 +1,187 @@
package influxdb

import (
	"time"

	"github.com/didi/nightingale/src/dataobj"
	"github.com/didi/nightingale/src/toolkits/stats"
	"github.com/influxdata/influxdb/client/v2"
	"github.com/toolkits/pkg/concurrent/semaphore"
	"github.com/toolkits/pkg/container/list"
	"github.com/toolkits/pkg/logger"
)

type InfluxdbSection struct {
	Enabled   bool   `yaml:"enabled"`
	Name      string `yaml:"name"`
	Batch     int    `yaml:"batch"`
	MaxRetry  int    `yaml:"maxRetry"`
	WorkerNum int    `yaml:"workerNum"`
	Timeout   int    `yaml:"timeout"`
	Address   string `yaml:"address"`
	Database  string `yaml:"database"`
	Username  string `yaml:"username"`
	Password  string `yaml:"password"`
	Precision string `yaml:"precision"`
}

type InfluxdbDataSource struct {
	// config
	Section               InfluxdbSection
	SendQueueMaxSize      int
	SendTaskSleepInterval time.Duration

	// send buffer queue: node -> queue_of_data
	InfluxdbQueue *list.SafeListLimited
}

func (influxdb *InfluxdbDataSource) Init() {
	// init queue
	if influxdb.Section.Enabled {
		influxdb.InfluxdbQueue = list.NewSafeListLimited(influxdb.SendQueueMaxSize)
	}

	// init send task
	influxdbConcurrent := influxdb.Section.WorkerNum
	if influxdbConcurrent < 1 {
		influxdbConcurrent = 1
	}
	go influxdb.send2InfluxdbTask(influxdbConcurrent)
}

// Push2Queue pushes raw data into the influxdb buffer queue.
func (influxdb *InfluxdbDataSource) Push2Queue(items []*dataobj.MetricValue) {
	errCnt := 0
	for _, item := range items {
		influxdbItem := influxdb.convert2InfluxdbItem(item)
		isSuccess := influxdb.InfluxdbQueue.PushFront(influxdbItem)

		if !isSuccess {
			errCnt += 1
		}
	}
	stats.Counter.Set("influxdb.queue.err", errCnt)
}

func (influxdb *InfluxdbDataSource) send2InfluxdbTask(concurrent int) {
	batch := influxdb.Section.Batch // send at most batch items per request
	retry := influxdb.Section.MaxRetry
	addr := influxdb.Section.Address
	sema := semaphore.NewSemaphore(concurrent)

	c, err := NewInfluxdbClient(influxdb.Section)
	if err != nil {
		logger.Errorf("init influxdb client fail: %v", err)
		return
	}
	// defer only after the error check: c is nil when the client fails
	defer c.Client.Close()

	for {
		items := influxdb.InfluxdbQueue.PopBackBy(batch)
		count := len(items)
		if count == 0 {
			time.Sleep(influxdb.SendTaskSleepInterval)
			continue
		}

		influxdbItems := make([]*dataobj.InfluxdbItem, count)
		for i := 0; i < count; i++ {
			influxdbItems[i] = items[i].(*dataobj.InfluxdbItem)
			stats.Counter.Set("points.out.influxdb", 1)
			logger.Debug("send to influxdb: ", influxdbItems[i])
		}

		// send with synchronous calls and bounded concurrency
		sema.Acquire()
		go func(addr string, influxdbItems []*dataobj.InfluxdbItem, count int) {
			defer sema.Release()
			sendOk := false

			var err error
			for i := 0; i < retry; i++ {
				err = c.Send(influxdbItems)
				if err == nil {
					sendOk = true
					break
				}
				logger.Warningf("send influxdb fail: %v", err)
				time.Sleep(time.Millisecond * 10)
			}

			if !sendOk {
				stats.Counter.Set("points.out.influxdb.err", count)
				logger.Errorf("send %v to influxdb %s fail: %v", influxdbItems, addr, err)
			} else {
				logger.Debugf("send to influxdb %s ok", addr)
			}
		}(addr, influxdbItems, count)
	}
}

func (influxdb *InfluxdbDataSource) convert2InfluxdbItem(d *dataobj.MetricValue) *dataobj.InfluxdbItem {
	t := dataobj.InfluxdbItem{Tags: make(map[string]string), Fields: make(map[string]interface{})}

	for k, v := range d.TagsMap {
		t.Tags[k] = v
	}
	t.Tags["endpoint"] = d.Endpoint
	t.Measurement = d.Metric
	t.Fields["value"] = d.Value
	t.Timestamp = d.Timestamp

	return &t
}

type InfluxClient struct {
	Client    client.Client
	Database  string
	Precision string
}

func NewInfluxdbClient(section InfluxdbSection) (*InfluxClient, error) {
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     section.Address,
		Username: section.Username,
		Password: section.Password,
		Timeout:  time.Millisecond * time.Duration(section.Timeout),
	})
	if err != nil {
		return nil, err
	}

	return &InfluxClient{
		Client:    c,
		Database:  section.Database,
		Precision: section.Precision,
	}, nil
}

func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error {
	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  c.Database,
		Precision: c.Precision,
	})
	if err != nil {
		logger.Error("create batch points error: ", err)
		return err
	}

	for _, item := range items {
		pt, err := client.NewPoint(item.Measurement, item.Tags, item.Fields, time.Unix(item.Timestamp, 0))
		if err != nil {
			logger.Error("create new points error: ", err)
			continue
		}
		bp.AddPoint(pt)
	}

	return c.Client.Write(bp)
}

// GetInstance returns the configured address: a single influxdb instance or influx-proxy.
func (influxdb *InfluxdbDataSource) GetInstance(metric, endpoint string, tags map[string]string) []string {
	return []string{influxdb.Section.Address}
}
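As a usage sketch, the conversion path from a transfer point to an influx point looks like this. The field names on dataobj.MetricValue are taken from the code above; the values and the wrapper function are illustrative only.

package influxdb

import (
	"fmt"

	"github.com/didi/nightingale/src/dataobj"
)

func exampleConvert() {
	ds := &InfluxdbDataSource{}

	mv := &dataobj.MetricValue{
		Metric:    "cpu.idle",
		Endpoint:  "host01",
		Timestamp: 1600000000,
		Value:     97.5,
		TagsMap:   map[string]string{"core": "0"},
	}

	// measurement=cpu.idle, tags include endpoint=host01, field value=97.5
	item := ds.convert2InfluxdbItem(mv)
	fmt.Println(item.Measurement, item.Tags, item.Fields["value"])
}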
@@ -0,0 +1,169 @@
package influxdb

import (
	"fmt"
	"strings"
	"time"

	"github.com/didi/nightingale/src/dataobj"
	"github.com/toolkits/pkg/logger"
)

type ShowSeries struct {
	Database  string
	Metric    string
	Endpoints []string
	Include   []*dataobj.TagPair
	Exclude   []*dataobj.TagPair
	Start     int64
	End       int64

	RawQuery string
}

func (query *ShowSeries) renderShow() {
	query.RawQuery = fmt.Sprintf("SHOW SERIES ON \"%s\" FROM \"%s\"", query.Database,
		query.Metric)
}

func (query *ShowSeries) renderEndpoints() {
	if len(query.Endpoints) > 0 {
		// endpoints
		endpointPart := "("
		for _, endpoint := range query.Endpoints {
			endpointPart += fmt.Sprintf(" \"endpoint\"='%s' OR", endpoint)
		}
		endpointPart = endpointPart[:len(endpointPart)-len("OR")]
		endpointPart += ")"
		query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, endpointPart)
	}
}

func (query *ShowSeries) renderInclude() {
	if len(query.Include) > 0 {
		// include
		includePart := "("
		for _, include := range query.Include {
			for _, value := range include.Values {
				includePart += fmt.Sprintf(" \"%s\"='%s' OR", include.Key, value)
			}
		}
		includePart = includePart[:len(includePart)-len("OR")]
		includePart += ")"
		if strings.Contains(query.RawQuery, "WHERE") {
			query.RawQuery = fmt.Sprintf("%s AND %s", query.RawQuery, includePart)
		} else {
			query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, includePart)
		}
	}
}

func (query *ShowSeries) renderExclude() {
	if len(query.Exclude) > 0 {
		// exclude: negate each tag pair and join with AND
		excludePart := "("
		for _, exclude := range query.Exclude {
			for _, value := range exclude.Values {
				excludePart += fmt.Sprintf(" \"%s\"!='%s' AND", exclude.Key, value)
			}
		}
		excludePart = excludePart[:len(excludePart)-len("AND")]
		excludePart += ")"
		if strings.Contains(query.RawQuery, "WHERE") {
			query.RawQuery = fmt.Sprintf("%s AND %s", query.RawQuery, excludePart)
		} else {
			query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, excludePart)
		}
	}
}

func (query *ShowSeries) renderTimeRange() {
	// time: convert epoch seconds to nanoseconds, since bare integers
	// in InfluxQL time comparisons are interpreted as nanoseconds
	if strings.Contains(query.RawQuery, "WHERE") {
		query.RawQuery = fmt.Sprintf("%s AND time >= %d AND time <= %d", query.RawQuery,
			time.Duration(query.Start)*time.Second,
			time.Duration(query.End)*time.Second)
	} else {
		query.RawQuery = fmt.Sprintf("%s WHERE time >= %d AND time <= %d", query.RawQuery,
			time.Duration(query.Start)*time.Second,
			time.Duration(query.End)*time.Second)
	}
}

type QueryData struct {
	Start     int64
	End       int64
	Metric    string
	Endpoints []string
	Tags      []string
	Step      int
	DsType    string
	GroupKey  []string // aggregation dimensions
	AggrFunc  string   // aggregation function

	RawQuery string
}

func (query *QueryData) renderSelect() {
	// select
	if query.AggrFunc != "" && len(query.GroupKey) > 0 {
		query.RawQuery = ""
	} else {
		query.RawQuery = fmt.Sprintf("SELECT \"value\" FROM \"%s\"", query.Metric)
	}
}

func (query *QueryData) renderEndpoints() {
	// where endpoint
	if len(query.Endpoints) > 0 {
		endpointPart := "("
		for _, endpoint := range query.Endpoints {
			endpointPart += fmt.Sprintf(" \"endpoint\"='%s' OR", endpoint)
		}
		endpointPart = endpointPart[:len(endpointPart)-len("OR")]
		endpointPart += ")"
		query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, endpointPart)
	}
}

func (query *QueryData) renderTags() {
	// where tags
	if len(query.Tags) > 0 {
		s := strings.Join(query.Tags, ",")
		tags, err := dataobj.SplitTagsString(s)
		if err != nil {
			logger.Warningf("split tags error, %+v", err)
			return
		}

		tagPart := "("
		for tagK, tagV := range tags {
			tagPart += fmt.Sprintf(" \"%s\"='%s' AND", tagK, tagV)
		}
		tagPart = tagPart[:len(tagPart)-len("AND")]
		tagPart += ")"

		if strings.Contains(query.RawQuery, "WHERE") {
			query.RawQuery = fmt.Sprintf("%s AND %s", query.RawQuery, tagPart)
		} else {
			query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, tagPart)
		}
	}
}

func (query *QueryData) renderTimeRange() {
	// time: same nanosecond conversion in both branches
	if strings.Contains(query.RawQuery, "WHERE") {
		query.RawQuery = fmt.Sprintf("%s AND time >= %d AND time <= %d", query.RawQuery,
			time.Duration(query.Start)*time.Second,
			time.Duration(query.End)*time.Second)
	} else {
		query.RawQuery = fmt.Sprintf("%s WHERE time >= %d AND time <= %d", query.RawQuery,
			time.Duration(query.Start)*time.Second,
			time.Duration(query.End)*time.Second)
	}
}

func (query *QueryData) renderGroupBy() {
	// group by
	if len(query.GroupKey) > 0 {
		groupByPart := strings.Join(query.GroupKey, ",")
		query.RawQuery = fmt.Sprintf("%s GROUP BY %s", query.RawQuery, groupByPart)
	}
}
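To see what these builders emit, a short sketch chaining the render steps. The output in the comment assumes the WHERE/AND handling above and a dataobj.SplitTagsString that parses "k=v" pairs; exact spacing may differ.

package influxdb

import "fmt"

func exampleRenderQuery() {
	q := &QueryData{
		Start:     1600000000,
		End:       1600003600,
		Metric:    "cpu.idle",
		Endpoints: []string{"host01"},
		Tags:      []string{"core=0"},
	}

	q.renderSelect()
	q.renderEndpoints()
	q.renderTags()
	q.renderTimeRange()

	// roughly: SELECT "value" FROM "cpu.idle" WHERE ( "endpoint"='host01' )
	//          AND ( "core"='0' ) AND time >= ... AND time <= ...
	fmt.Println(q.RawQuery)
}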
File diff suppressed because it is too large
@@ -1,166 +1,76 @@
package backend

import (
-	"github.com/toolkits/pkg/container/list"
-	"github.com/toolkits/pkg/container/set"
-	"github.com/toolkits/pkg/str"
-
-	"github.com/didi/nightingale/src/modules/transfer/cache"
-	"github.com/didi/nightingale/src/toolkits/pools"
-	"github.com/didi/nightingale/src/toolkits/report"
-	"github.com/didi/nightingale/src/toolkits/stats"
+	"github.com/didi/nightingale/src/modules/transfer/backend/influxdb"
+	"github.com/didi/nightingale/src/modules/transfer/backend/tsdb"
)

-type InfluxdbSection struct {
-	Enabled   bool   `yaml:"enabled"`
-	Batch     int    `yaml:"batch"`
-	MaxRetry  int    `yaml:"maxRetry"`
-	WorkerNum int    `yaml:"workerNum"`
-	Timeout   int    `yaml:"timeout"`
-	Address   string `yaml:"address"`
-	Database  string `yaml:"database"`
-	Username  string `yaml:"username"`
-	Password  string `yaml:"password"`
-	Precision string `yaml:"precision"`
-}
-
-type OpenTsdbSection struct {
-	Enabled     bool   `yaml:"enabled"`
-	Batch       int    `yaml:"batch"`
-	ConnTimeout int    `yaml:"connTimeout"`
-	CallTimeout int    `yaml:"callTimeout"`
-	WorkerNum   int    `yaml:"workerNum"`
-	MaxConns    int    `yaml:"maxConns"`
-	MaxIdle     int    `yaml:"maxIdle"`
-	MaxRetry    int    `yaml:"maxRetry"`
-	Address     string `yaml:"address"`
-}
-
-type KafkaSection struct {
-	Enabled      bool   `yaml:"enabled"`
-	Topic        string `yaml:"topic"`
-	BrokersPeers string `yaml:"brokersPeers"`
-	SaslUser     string `yaml:"saslUser"`
-	SaslPasswd   string `yaml:"saslPasswd"`
-	Retry        int    `yaml:"retry"`
-	KeepAlive    int64  `yaml:"keepAlive"`
-}
-
type BackendSection struct {
-	Enabled      bool   `yaml:"enabled"`
-	Batch        int    `yaml:"batch"`
-	ConnTimeout  int    `yaml:"connTimeout"`
-	CallTimeout  int    `yaml:"callTimeout"`
-	WorkerNum    int    `yaml:"workerNum"`
-	MaxConns     int    `yaml:"maxConns"`
-	MaxIdle      int    `yaml:"maxIdle"`
-	IndexTimeout int    `yaml:"indexTimeout"`
-	StraPath     string `yaml:"straPath"`
-	HbsMod       string `yaml:"hbsMod"`
-
-	Replicas    int                     `yaml:"replicas"`
-	Cluster     map[string]string       `yaml:"cluster"`
-	ClusterList map[string]*ClusterNode `json:"clusterList"`
-	Influxdb    InfluxdbSection         `yaml:"influxdb"`
-	OpenTsdb    OpenTsdbSection         `yaml:"opentsdb"`
-	Kafka       KafkaSection            `yaml:"kafka"`
-}
-
-const DefaultSendQueueMaxSize = 102400 //10.24w
-
-type ClusterNode struct {
-	Addrs []string `json:"addrs"`
+	DataSource string `yaml:"datasource"`
+	StraPath   string `yaml:"straPath"`
+
+	Judge    JudgeSection             `yaml:"judge"`
+	Tsdb     tsdb.TsdbSection         `yaml:"tsdb"`
+	Influxdb influxdb.InfluxdbSection `yaml:"influxdb"`
+	OpenTsdb OpenTsdbSection          `yaml:"opentsdb"`
+	Kafka    KafkaSection             `yaml:"kafka"`
}

var (
-	Config BackendSection
-	// consistent hash ring of service nodes: pk -> node
-	TsdbNodeRing *ConsistentHashRing
-
-	// send buffer queues: node -> queue_of_data
-	TsdbQueues    = make(map[string]*list.SafeListLimited)
-	JudgeQueues   = cache.SafeJudgeQueue{}
-	InfluxdbQueue *list.SafeListLimited
-	OpenTsdbQueue *list.SafeListLimited
-	KafkaQueue    = make(chan KafkaData, 10)
-
-	// connection pools: node_address -> connection_pool
-	TsdbConnPools          *pools.ConnPools
-	JudgeConnPools         *pools.ConnPools
-	OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper
-
-	connTimeout int32
-	callTimeout int32
+	defaultDataSource string
+	StraPath          string
+
+	tsdbDataSource       *tsdb.TsdbDataSource
+	openTSDBPushEndpoint *OpenTsdbPushEndpoint
+	influxdbDataSource   *influxdb.InfluxdbDataSource
+	kafkaPushEndpoint    *KafkaPushEndpoint
)

func Init(cfg BackendSection) {
-	Config = cfg
-	// initialize default parameters
-	connTimeout = int32(Config.ConnTimeout)
-	callTimeout = int32(Config.CallTimeout)
-
-	initHashRing()
-	initConnPools()
-	initSendQueues()
-
-	startSendTasks()
-}
-
-func initHashRing() {
-	TsdbNodeRing = NewConsistentHashRing(int32(Config.Replicas), str.KeysOfMap(Config.Cluster))
-}
-
-func initConnPools() {
-	tsdbInstances := set.NewSafeSet()
-	for _, item := range Config.ClusterList {
-		for _, addr := range item.Addrs {
-			tsdbInstances.Add(addr)
-		}
-	}
-	TsdbConnPools = pools.NewConnPools(
-		Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice(),
-	)
-
-	JudgeConnPools = pools.NewConnPools(
-		Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, GetJudges(),
-	)
-	if Config.OpenTsdb.Enabled {
-		OpenTsdbConnPoolHelper = pools.NewOpenTsdbConnPoolHelper(Config.OpenTsdb.Address, Config.OpenTsdb.MaxConns, Config.OpenTsdb.MaxIdle, Config.OpenTsdb.ConnTimeout, Config.OpenTsdb.CallTimeout)
-	}
-}
-
-func initSendQueues() {
-	for node, item := range Config.ClusterList {
-		for _, addr := range item.Addrs {
-			TsdbQueues[node+addr] = list.NewSafeListLimited(DefaultSendQueueMaxSize)
-		}
-	}
-
-	JudgeQueues = cache.NewJudgeQueue()
-	judges := GetJudges()
-	for _, judge := range judges {
-		JudgeQueues.Set(judge, list.NewSafeListLimited(DefaultSendQueueMaxSize))
-	}
-
-	if Config.Influxdb.Enabled {
-		InfluxdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize)
-	}
-
-	if Config.OpenTsdb.Enabled {
-		OpenTsdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize)
-	}
-}
-
-func GetJudges() []string {
-	var judgeInstances []string
-	instances, err := report.GetAlive("judge", Config.HbsMod)
-	if err != nil {
-		stats.Counter.Set("judge.get.err", 1)
-		return judgeInstances
-	}
-	for _, instance := range instances {
-		judgeInstance := instance.Identity + ":" + instance.RPCPort
-		judgeInstances = append(judgeInstances, judgeInstance)
-	}
-	return judgeInstances
+	defaultDataSource = cfg.DataSource
+	StraPath = cfg.StraPath
+
+	// init judge
+	InitJudge(cfg.Judge)
+
+	// init tsdb
+	if cfg.Tsdb.Enabled {
+		tsdbDataSource = &tsdb.TsdbDataSource{
+			Section:               cfg.Tsdb,
+			SendQueueMaxSize:      DefaultSendQueueMaxSize,
+			SendTaskSleepInterval: DefaultSendTaskSleepInterval,
+		}
+		tsdbDataSource.Init() // register
+		RegisterDataSource(tsdbDataSource.Section.Name, tsdbDataSource)
+	}
+
+	// init influxdb
+	if cfg.Influxdb.Enabled {
+		influxdbDataSource = &influxdb.InfluxdbDataSource{
+			Section:               cfg.Influxdb,
+			SendQueueMaxSize:      DefaultSendQueueMaxSize,
+			SendTaskSleepInterval: DefaultSendTaskSleepInterval,
+		}
+		influxdbDataSource.Init()
+		// register
+		RegisterDataSource(influxdbDataSource.Section.Name, influxdbDataSource)
+	}
+
+	// init opentsdb
+	if cfg.OpenTsdb.Enabled {
+		openTSDBPushEndpoint = &OpenTsdbPushEndpoint{
+			Section: cfg.OpenTsdb,
+		}
+		openTSDBPushEndpoint.Init()
+		// register
+		RegisterPushEndpoint(openTSDBPushEndpoint.Section.Name, openTSDBPushEndpoint)
+	}
+
+	// init kafka
+	if cfg.Kafka.Enabled {
+		kafkaPushEndpoint = &KafkaPushEndpoint{
+			Section: cfg.Kafka,
+		}
+		kafkaPushEndpoint.Init()
+		// register
+		RegisterPushEndpoint(kafkaPushEndpoint.Section.Name, kafkaPushEndpoint)
+	}
}
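Under the reworked layout, wiring the whole transfer backend reduces to filling one struct and calling Init. A minimal sketch, with placeholder names and addresses; only the section types and their fields come from this diff, the values are illustrative:

package backend

import (
	"github.com/didi/nightingale/src/modules/transfer/backend/tsdb"
)

func exampleBackendInit() {
	cfg := BackendSection{
		DataSource: "tsdb", // name of the plugin that serves queries by default
		Judge:      JudgeSection{Batch: 200, WorkerNum: 32},
		Tsdb: tsdb.TsdbSection{
			Enabled:  true,
			Name:     "tsdb",
			Replicas: 500,
			Cluster:  map[string]string{"tsdb01": "127.0.0.1:5821"}, // placeholder address
		},
	}
	Init(cfg)
}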
@@ -0,0 +1,191 @@
package backend

import (
	"time"

	"github.com/didi/nightingale/src/dataobj"
	"github.com/didi/nightingale/src/model"
	"github.com/didi/nightingale/src/modules/transfer/cache"
	"github.com/didi/nightingale/src/toolkits/pools"
	"github.com/didi/nightingale/src/toolkits/report"
	"github.com/didi/nightingale/src/toolkits/stats"
	"github.com/didi/nightingale/src/toolkits/str"
	"github.com/toolkits/pkg/concurrent/semaphore"
	"github.com/toolkits/pkg/container/list"
	"github.com/toolkits/pkg/logger"
)

type JudgeSection struct {
	Batch       int    `yaml:"batch"`
	ConnTimeout int    `yaml:"connTimeout"`
	CallTimeout int    `yaml:"callTimeout"`
	WorkerNum   int    `yaml:"workerNum"`
	MaxConns    int    `yaml:"maxConns"`
	MaxIdle     int    `yaml:"maxIdle"`
	HbsMod      string `yaml:"hbsMod"`
}

var (
	// config
	Judge JudgeSection

	// connection pools: node_address -> connection_pool
	JudgeConnPools *pools.ConnPools

	// queue
	JudgeQueues = cache.SafeJudgeQueue{}
)

func InitJudge(section JudgeSection) {
	Judge = section

	judges := GetJudges()

	// init connPool
	JudgeConnPools = pools.NewConnPools(Judge.MaxConns, Judge.MaxIdle, Judge.ConnTimeout, Judge.CallTimeout, judges)

	// init queue
	JudgeQueues = cache.NewJudgeQueue()
	for _, judgeNode := range judges {
		JudgeQueues.Set(judgeNode, list.NewSafeListLimited(DefaultSendQueueMaxSize))
	}

	// start send tasks
	judgeConcurrent := Judge.WorkerNum
	if judgeConcurrent < 1 {
		judgeConcurrent = 1
	}
	judgeQueue := JudgeQueues.GetAll()
	for instance, queue := range judgeQueue {
		go Send2JudgeTask(queue, instance, judgeConcurrent)
	}
}

func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
	batch := Judge.Batch
	sema := semaphore.NewSemaphore(concurrent)

	for {
		items := Q.PopBackBy(batch)
		count := len(items)
		if count == 0 {
			time.Sleep(DefaultSendTaskSleepInterval)
			continue
		}
		judgeItems := make([]*dataobj.JudgeItem, count)
		stats.Counter.Set("points.out.judge", count)
		for i := 0; i < count; i++ {
			judgeItems[i] = items[i].(*dataobj.JudgeItem)
			logger.Debug("send to judge: ", judgeItems[i])
		}

		sema.Acquire()
		go func(addr string, judgeItems []*dataobj.JudgeItem, count int) {
			defer sema.Release()

			resp := &dataobj.SimpleRpcResponse{}
			var err error
			sendOk := false
			for i := 0; i < MaxSendRetry; i++ {
				err = JudgeConnPools.Call(addr, "Judge.Send", judgeItems, resp)
				if err == nil {
					sendOk = true
					break
				}
				logger.Warningf("send judge %s fail: %v", addr, err)
				time.Sleep(time.Millisecond * 10)
			}

			if !sendOk {
				stats.Counter.Set("points.out.err", count)
				for _, item := range judgeItems {
					logger.Errorf("send %v to judge %s fail: %v", item, addr, err)
				}
			}
		}(addr, judgeItems, count)
	}
}

func Push2JudgeQueue(items []*dataobj.MetricValue) {
	errCnt := 0
	for _, item := range items {
		key := str.PK(item.Metric, item.Endpoint)
		stras := cache.StraMap.GetByKey(key)

		for _, stra := range stras {
			if !TagMatch(stra.Tags, item.TagsMap) {
				continue
			}
			judgeItem := &dataobj.JudgeItem{
				Endpoint:  item.Endpoint,
				Metric:    item.Metric,
				Value:     item.Value,
				Timestamp: item.Timestamp,
				DsType:    item.CounterType,
				Tags:      item.Tags,
				TagsMap:   item.TagsMap,
				Step:      int(item.Step),
				Sid:       stra.Id,
				Extra:     item.Extra,
			}

			q, exists := JudgeQueues.Get(stra.JudgeInstance)
			if exists {
				if !q.PushFront(judgeItem) {
					errCnt += 1
				}
			}
		}
	}
	stats.Counter.Set("judge.queue.err", errCnt)
}

func alignTs(ts int64, period int64) int64 {
	return ts - ts%period
}

func TagMatch(straTags []model.Tag, tag map[string]string) bool {
	for _, stag := range straTags {
		if _, exists := tag[stag.Tkey]; !exists {
			return false
		}
		var match bool
		if stag.Topt == "=" { // the point's tag value must be one of the strategy's values
			for _, v := range stag.Tval {
				if tag[stag.Tkey] == v {
					match = true
					break
				}
			}
		} else { // exclusion: any listed value is a mismatch
			match = true
			for _, v := range stag.Tval {
				if tag[stag.Tkey] == v {
					match = false
					return match
				}
			}
		}

		if !match {
			return false
		}
	}
	return true
}

func GetJudges() []string {
	var judgeInstances []string
	instances, err := report.GetAlive("judge", Judge.HbsMod)
	if err != nil {
		stats.Counter.Set("judge.get.err", 1)
		return judgeInstances
	}
	for _, instance := range instances {
		judgeInstance := instance.Identity + ":" + instance.RPCPort
		judgeInstances = append(judgeInstances, judgeInstance)
	}
	return judgeInstances
}
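TagMatch implements the strategy tag filter: with Topt "=" the point's tag value must be one of the strategy's values, and with any other operator (e.g. "!=") every listed value is excluded. A small illustration, assuming model.Tag has exactly the Tkey/Topt/Tval fields used above:

package backend

import (
	"fmt"

	"github.com/didi/nightingale/src/model"
)

func exampleTagMatch() {
	stra := []model.Tag{
		{Tkey: "region", Topt: "=", Tval: []string{"bj", "sh"}},
		{Tkey: "env", Topt: "!=", Tval: []string{"test"}},
	}

	fmt.Println(TagMatch(stra, map[string]string{"region": "bj", "env": "prod"})) // true
	fmt.Println(TagMatch(stra, map[string]string{"region": "gz", "env": "prod"})) // false: region not in {bj, sh}
	fmt.Println(TagMatch(stra, map[string]string{"region": "bj", "env": "test"})) // false: env=test is excluded
}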
@@ -0,0 +1,136 @@
package backend

import (
	"bytes"
	"time"

	"github.com/didi/nightingale/src/dataobj"
	"github.com/didi/nightingale/src/toolkits/pools"
	"github.com/didi/nightingale/src/toolkits/stats"
	"github.com/toolkits/pkg/concurrent/semaphore"
	"github.com/toolkits/pkg/container/list"
	"github.com/toolkits/pkg/logger"
)

type OpenTsdbSection struct {
	Enabled     bool   `yaml:"enabled"`
	Name        string `yaml:"name"`
	Batch       int    `yaml:"batch"`
	ConnTimeout int    `yaml:"connTimeout"`
	CallTimeout int    `yaml:"callTimeout"`
	WorkerNum   int    `yaml:"workerNum"`
	MaxConns    int    `yaml:"maxConns"`
	MaxIdle     int    `yaml:"maxIdle"`
	MaxRetry    int    `yaml:"maxRetry"`
	Address     string `yaml:"address"`
}

type OpenTsdbPushEndpoint struct {
	// config
	Section OpenTsdbSection

	OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper

	// send buffer queue: node -> queue_of_data
	OpenTsdbQueue *list.SafeListLimited
}

func (opentsdb *OpenTsdbPushEndpoint) Init() {
	// init connPool
	if opentsdb.Section.Enabled {
		opentsdb.OpenTsdbConnPoolHelper = pools.NewOpenTsdbConnPoolHelper(opentsdb.Section.Address,
			opentsdb.Section.MaxConns, opentsdb.Section.MaxIdle, opentsdb.Section.ConnTimeout,
			opentsdb.Section.CallTimeout)
	}

	// init queue
	if opentsdb.Section.Enabled {
		opentsdb.OpenTsdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize)
	}

	// start send task
	openTsdbConcurrent := opentsdb.Section.WorkerNum
	if openTsdbConcurrent < 1 {
		openTsdbConcurrent = 1
	}
	go opentsdb.send2OpenTsdbTask(openTsdbConcurrent)
}

// Push2Queue pushes raw data into the opentsdb send buffer queue.
func (opentsdb *OpenTsdbPushEndpoint) Push2Queue(items []*dataobj.MetricValue) {
	errCnt := 0
	for _, item := range items {
		tsdbItem := opentsdb.convert2OpenTsdbItem(item)
		isSuccess := opentsdb.OpenTsdbQueue.PushFront(tsdbItem)

		if !isSuccess {
			errCnt += 1
		}
	}
	stats.Counter.Set("opentsdb.queue.err", errCnt)
}

func (opentsdb *OpenTsdbPushEndpoint) send2OpenTsdbTask(concurrent int) {
	batch := opentsdb.Section.Batch // send at most batch items per request
	retry := opentsdb.Section.MaxRetry
	addr := opentsdb.Section.Address
	sema := semaphore.NewSemaphore(concurrent)

	for {
		items := opentsdb.OpenTsdbQueue.PopBackBy(batch)
		count := len(items)
		if count == 0 {
			time.Sleep(DefaultSendTaskSleepInterval)
			continue
		}
		var openTsdbBuffer bytes.Buffer

		for i := 0; i < count; i++ {
			tsdbItem := items[i].(*dataobj.OpenTsdbItem)
			openTsdbBuffer.WriteString(tsdbItem.OpenTsdbString())
			openTsdbBuffer.WriteString("\n")
			stats.Counter.Set("points.out.opentsdb", 1)
			logger.Debug("send to opentsdb: ", tsdbItem)
		}
		// send with synchronous calls and bounded concurrency
		sema.Acquire()
		go func(addr string, openTsdbBuffer bytes.Buffer, count int) {
			defer sema.Release()

			var err error
			sendOk := false
			for i := 0; i < retry; i++ {
				err = opentsdb.OpenTsdbConnPoolHelper.Send(openTsdbBuffer.Bytes())
				if err == nil {
					sendOk = true
					break
				}
				logger.Warningf("send opentsdb %s fail: %v", addr, err)
				time.Sleep(100 * time.Millisecond)
			}

			if !sendOk {
				stats.Counter.Set("points.out.opentsdb.err", count)
				for _, item := range items {
					logger.Errorf("send %v to opentsdb %s fail: %v", item, addr, err)
				}
			} else {
				logger.Debugf("send to opentsdb %s ok", addr)
			}
		}(addr, openTsdbBuffer, count)
	}
}

func (opentsdb *OpenTsdbPushEndpoint) convert2OpenTsdbItem(d *dataobj.MetricValue) *dataobj.OpenTsdbItem {
	t := dataobj.OpenTsdbItem{Tags: make(map[string]string)}

	for k, v := range d.TagsMap {
		t.Tags[k] = v
	}
	t.Tags["endpoint"] = d.Endpoint
	t.Metric = d.Metric
	t.Timestamp = d.Timestamp
	t.Value = d.Value
	return &t
}
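Each queued item is serialized by OpenTsdbItem.OpenTsdbString() and newline-joined into one payload per batch. A sketch of that batching idea; the exact line format comes from dataobj, and the OpenTSDB-style "put" line shown here is only an assumption:

package backend

import (
	"bytes"
	"fmt"
)

// buildPayload mirrors the buffering in send2OpenTsdbTask: one line per
// point, concatenated, then shipped over a pooled connection in one write.
func buildPayload(lines []string) []byte {
	var buf bytes.Buffer
	for _, line := range lines {
		buf.WriteString(line)
		buf.WriteString("\n")
	}
	return buf.Bytes()
}

func examplePayload() {
	payload := buildPayload([]string{
		"put cpu.idle 1600000000 97.5 endpoint=host01", // assumed "put" line format
	})
	fmt.Printf("%s", payload)
}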
File diff suppressed because it is too large
@@ -0,0 +1,57 @@
package tsdb

import (
	"fmt"
	"sync"
	"time"

	"github.com/didi/nightingale/src/toolkits/report"
	"github.com/didi/nightingale/src/toolkits/stats"

	"github.com/toolkits/pkg/logger"
)

var IndexList IndexAddrs

type IndexAddrs struct {
	sync.RWMutex
	Data []string
}

func (i *IndexAddrs) Set(addrs []string) {
	i.Lock()
	defer i.Unlock()
	i.Data = addrs
}

func (i *IndexAddrs) Get() []string {
	i.RLock()
	defer i.RUnlock()
	return i.Data
}

// GetIndexLoop refreshes the list of alive index instances every 9 seconds.
func GetIndexLoop() {
	t1 := time.NewTicker(9 * time.Second)
	GetIndex()
	for {
		<-t1.C
		GetIndex()
	}
}

func GetIndex() {
	instances, err := report.GetAlive("index", "monapi")
	if err != nil {
		stats.Counter.Set("get.index.err", 1)
		logger.Warningf("get index list err:%v", err)
		return
	}

	activeIndexs := []string{}
	for _, instance := range instances {
		activeIndexs = append(activeIndexs, fmt.Sprintf("%s:%s", instance.Identity, instance.HTTPPort))
	}

	IndexList.Set(activeIndexs)
}
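IndexList is read on the query path while GetIndexLoop rewrites it in the background, and the embedded RWMutex makes that swap safe. A trivial usage sketch; the addresses are placeholders:

package tsdb

import "fmt"

func exampleIndexAddrs() {
	var addrs IndexAddrs
	addrs.Set([]string{"10.0.0.1:8008", "10.0.0.2:8008"}) // writer side, as in GetIndex

	// reader side: concurrent readers only take the read lock
	for _, addr := range addrs.Get() {
		fmt.Println("query index instance:", addr)
	}
}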
File diff suppressed because it is too large
@@ -1,4 +1,4 @@
-package backend
+package tsdb

import (
	"sync"
@@ -0,0 +1,212 @@
package tsdb

import (
	"strings"
	"time"

	"github.com/toolkits/pkg/errors"

	"github.com/didi/nightingale/src/dataobj"
	"github.com/didi/nightingale/src/toolkits/pools"
	"github.com/didi/nightingale/src/toolkits/stats"
	"github.com/toolkits/pkg/concurrent/semaphore"
	"github.com/toolkits/pkg/container/list"
	"github.com/toolkits/pkg/container/set"
	"github.com/toolkits/pkg/logger"
	"github.com/toolkits/pkg/str"
)

type TsdbSection struct {
	Enabled      bool   `yaml:"enabled"`
	Name         string `yaml:"name"`
	Batch        int    `yaml:"batch"`
	ConnTimeout  int    `yaml:"connTimeout"`
	CallTimeout  int    `yaml:"callTimeout"`
	WorkerNum    int    `yaml:"workerNum"`
	MaxConns     int    `yaml:"maxConns"`
	MaxIdle      int    `yaml:"maxIdle"`
	IndexTimeout int    `yaml:"indexTimeout"`

	Replicas    int                     `yaml:"replicas"`
	Cluster     map[string]string       `yaml:"cluster"`
	ClusterList map[string]*ClusterNode `json:"clusterList"`
}

type ClusterNode struct {
	Addrs []string `json:"addrs"`
}

type TsdbDataSource struct {
	// config
	Section               TsdbSection
	SendQueueMaxSize      int
	SendTaskSleepInterval time.Duration

	// consistent hash ring of service nodes: pk -> node
	TsdbNodeRing *ConsistentHashRing

	// send buffer queues: node -> queue_of_data
	TsdbQueues map[string]*list.SafeListLimited

	// connection pools: node_address -> connection_pool
	TsdbConnPools *pools.ConnPools
}

func (tsdb *TsdbDataSource) Init() {
	// init hash ring
	tsdb.TsdbNodeRing = NewConsistentHashRing(int32(tsdb.Section.Replicas),
		str.KeysOfMap(tsdb.Section.Cluster))

	// init connPool
	tsdbInstances := set.NewSafeSet()
	for _, item := range tsdb.Section.ClusterList {
		for _, addr := range item.Addrs {
			tsdbInstances.Add(addr)
		}
	}
	tsdb.TsdbConnPools = pools.NewConnPools(
		tsdb.Section.MaxConns, tsdb.Section.MaxIdle, tsdb.Section.ConnTimeout, tsdb.Section.CallTimeout,
		tsdbInstances.ToSlice(),
	)

	// init queues
	tsdb.TsdbQueues = make(map[string]*list.SafeListLimited)
	for node, item := range tsdb.Section.ClusterList {
		for _, addr := range item.Addrs {
			tsdb.TsdbQueues[node+addr] = list.NewSafeListLimited(tsdb.SendQueueMaxSize)
		}
	}

	// start send tasks
	tsdbConcurrent := tsdb.Section.WorkerNum
	if tsdbConcurrent < 1 {
		tsdbConcurrent = 1
	}
	for node, item := range tsdb.Section.ClusterList {
		for _, addr := range item.Addrs {
			queue := tsdb.TsdbQueues[node+addr]
			go tsdb.Send2TsdbTask(queue, node, addr, tsdbConcurrent)
		}
	}

	go GetIndexLoop()
}

// Push2Queue pushes data to the send queue of the TSDB node chosen by the consistent hash ring.
func (tsdb *TsdbDataSource) Push2Queue(items []*dataobj.MetricValue) {
	errCnt := 0
	for _, item := range items {
		tsdbItem := convert2TsdbItem(item)
		stats.Counter.Set("tsdb.queue.push", 1)

		node, err := tsdb.TsdbNodeRing.GetNode(item.PK())
		if err != nil {
			logger.Warningf("get tsdb node error: %v", err)
			continue
		}

		cnode := tsdb.Section.ClusterList[node]
		for _, addr := range cnode.Addrs {
			Q := tsdb.TsdbQueues[node+addr]
			// the queue is full
			if !Q.PushFront(tsdbItem) {
				errCnt += 1
			}
		}
	}

	// statistics
	if errCnt > 0 {
		stats.Counter.Set("tsdb.queue.err", errCnt)
		logger.Error("Push2TsdbSendQueue err num: ", errCnt)
	}
}

func (tsdb *TsdbDataSource) Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) {
	batch := tsdb.Section.Batch // send at most batch items per request

	sema := semaphore.NewSemaphore(concurrent)

	for {
		items := Q.PopBackBy(batch)
		count := len(items)
		if count == 0 {
			time.Sleep(tsdb.SendTaskSleepInterval)
			continue
		}

		tsdbItems := make([]*dataobj.TsdbItem, count)
		stats.Counter.Set("points.out.tsdb", count)
		for i := 0; i < count; i++ {
			tsdbItems[i] = items[i].(*dataobj.TsdbItem)
			logger.Debug("send to tsdb->: ", tsdbItems[i])
		}

		// bounded concurrency
		sema.Acquire()
		go func(addr string, tsdbItems []*dataobj.TsdbItem, count int) {
			defer sema.Release()

			resp := &dataobj.SimpleRpcResponse{}
			var err error
			sendOk := false
			for i := 0; i < 3; i++ { // retry up to 3 times
				err = tsdb.TsdbConnPools.Call(addr, "Tsdb.Send", tsdbItems, resp)
				if err == nil {
					sendOk = true
					break
				}
				time.Sleep(time.Millisecond * 10)
			}

			if !sendOk {
				stats.Counter.Set("points.out.tsdb.err", count)
				logger.Errorf("send %v to tsdb %s:%s fail: %v", tsdbItems, node, addr, err)
			} else {
				logger.Debugf("send to tsdb %s:%s ok", node, addr)
			}
		}(addr, tsdbItems, count)
	}
}

func (tsdb *TsdbDataSource) GetInstance(metric, endpoint string, tags map[string]string) []string {
	counter, err := dataobj.GetCounter(metric, "", tags)
	errors.Dangerous(err)

	pk := dataobj.PKWithCounter(endpoint, counter)
	pools, err := tsdb.SelectPoolByPK(pk)
	addrs := make([]string, len(pools))
	for i, pool := range pools {
		addrs[i] = pool.Addr
	}
	return addrs
}

// Data sent to tsdb must constrain step, counterType and timestamp
// to what rrdtool expects.
func convert2TsdbItem(d *dataobj.MetricValue) *dataobj.TsdbItem {
	item := &dataobj.TsdbItem{
		Endpoint:  d.Endpoint,
		Metric:    d.Metric,
		Value:     d.Value,
		Timestamp: d.Timestamp,
		Tags:      d.Tags,
		TagsMap:   d.TagsMap,
		Step:      int(d.Step),
		Heartbeat: int(d.Step) * 2,
		DsType:    dataobj.GAUGE,
		Min:       "U",
		Max:       "U",
	}

	return item
}

func getTags(counter string) (tags string) {
	idx := strings.IndexAny(counter, "/")
	if idx == -1 {
		return ""
	}
	return counter[idx+1:]
}
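Routing in Push2Queue hinges on the consistent hash ring: a point's primary key selects one logical node, and the point is copied to every address of that node so any replica can serve queries. A minimal sketch of the lookup, using only fields defined above; the helper itself is illustrative:

package tsdb

import "fmt"

func exampleRouting(ds *TsdbDataSource, pk string) {
	node, err := ds.TsdbNodeRing.GetNode(pk)
	if err != nil {
		return
	}
	// every address of the chosen node receives a copy of the point
	for _, addr := range ds.Section.ClusterList[node].Addrs {
		fmt.Printf("pk %s -> node %s, addr %s\n", pk, node, addr)
	}
}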
Some files were not shown because too many files have changed in this diff