From 3fdd61edfc482db6798fb25f10b94b0b0584732b Mon Sep 17 00:00:00 2001 From: sun763625521 <763625521@qq.com> Date: Wed, 3 Feb 2021 15:02:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Erabbitmq=E3=80=81haproxy?= =?UTF-8?q?=E7=BB=84=E4=BB=B6=E9=87=87=E9=9B=86=20(#575)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add * add prober plugin for rabbitmq * add prober plugin for haproxy Co-authored-by: root Co-authored-by: UlricQin --- src/modules/monapi/plugins/all/all.go | 2 + src/modules/monapi/plugins/haproxy/haproxy.go | 71 ++++ .../monapi/plugins/haproxy/haproxy/haproxy.go | 313 ++++++++++++++++++ .../monapi/plugins/rabbitmq/rabbitmq.go | 98 ++++++ 4 files changed, 484 insertions(+) create mode 100644 src/modules/monapi/plugins/haproxy/haproxy.go create mode 100644 src/modules/monapi/plugins/haproxy/haproxy/haproxy.go create mode 100644 src/modules/monapi/plugins/rabbitmq/rabbitmq.go diff --git a/src/modules/monapi/plugins/all/all.go b/src/modules/monapi/plugins/all/all.go index 55f6d99a..45d3c721 100644 --- a/src/modules/monapi/plugins/all/all.go +++ b/src/modules/monapi/plugins/all/all.go @@ -11,6 +11,8 @@ import ( _ "github.com/didi/nightingale/src/modules/monapi/plugins/redis" _ "github.com/didi/nightingale/src/modules/monapi/plugins/nginx" _ "github.com/didi/nightingale/src/modules/monapi/plugins/elasticsearch" + _ "github.com/didi/nightingale/src/modules/monapi/plugins/rabbitmq" + _ "github.com/didi/nightingale/src/modules/monapi/plugins/haproxy" _ "github.com/didi/nightingale/src/modules/monapi/plugins/tengine" _ "github.com/didi/nightingale/src/modules/monapi/plugins/zookeeper" diff --git a/src/modules/monapi/plugins/haproxy/haproxy.go b/src/modules/monapi/plugins/haproxy/haproxy.go new file mode 100644 index 00000000..c122543a --- /dev/null +++ b/src/modules/monapi/plugins/haproxy/haproxy.go @@ -0,0 +1,71 @@ +package haproxy + +import ( + "fmt" + "github.com/didi/nightingale/src/modules/monapi/collector" + "github.com/didi/nightingale/src/modules/monapi/plugins" + "github.com/didi/nightingale/src/toolkits/i18n" + "github.com/influxdata/telegraf" + "github.com/didi/nightingale/src/modules/monapi/plugins/haproxy/haproxy" +) + +func init() { + collector.CollectorRegister(NewHaproxyCollector()) // for monapi + i18n.DictRegister(langDict) +} + +type HaproxyCollector struct { + *collector.BaseCollector +} + +func NewHaproxyCollector() *HaproxyCollector { + return &HaproxyCollector{BaseCollector: collector.NewBaseCollector( + "haproxy", + collector.RemoteCategory, + func() collector.TelegrafPlugin { return &HaproxyRule{} }, + )} +} + +var ( + langDict = map[string]map[string]string{ + "zh": map[string]string{ + "Servers": "Servers", + "Username": "用户名", + "Password": "密码", + }, + } +) + +type HaproxyRule struct { + Servers []string `label:"Servers" json:"servers,required" example:"http://myhaproxy.com:1936/haproxy?stats"` + KeepFieldNames bool `label:"KeepFieldNames" json:"keepFieldNames" default:"false" description:"Setting this option to true results in the plugin keeping the original"` + Username string `label:"Username" json:"username" description:"specify username"` + Password string `label:"Password" json:"password" format:"password" description:"specify server password"` + + plugins.ClientConfig +} + +func (p *HaproxyRule) Validate() error { + if len(p.Servers) == 0 || p.Servers[0] == "" { + return fmt.Errorf("haproxy.rule.servers must be set") + } + return nil +} + +func (p *HaproxyRule) TelegrafInput() (telegraf.Input, error) { + if err := p.Validate(); err != nil { + return nil, err + } + + ha := &haproxy.Haproxy{ + + Servers: p.Servers, + KeepFieldNames: p.KeepFieldNames, + Username: p.Username, + Password: p.Password, + ClientConfig: p.ClientConfig.TlsClientConfig(), + } + + return ha, nil +} + diff --git a/src/modules/monapi/plugins/haproxy/haproxy/haproxy.go b/src/modules/monapi/plugins/haproxy/haproxy/haproxy.go new file mode 100644 index 00000000..3d26ec58 --- /dev/null +++ b/src/modules/monapi/plugins/haproxy/haproxy/haproxy.go @@ -0,0 +1,313 @@ +package haproxy + +import ( + "encoding/csv" + "fmt" + "io" + "net" + "net/http" + "net/url" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" + +) + +//CSV format: https://cbonte.github.io/haproxy-dconv/1.5/configuration.html#9.1 + +type Haproxy struct { + Servers []string + KeepFieldNames bool + Username string + Password string + tls.ClientConfig + + client *http.Client +} + +var sampleConfig = ` + ## An array of address to gather stats about. Specify an ip on hostname + ## with optional port. ie localhost, 10.10.3.33:1936, etc. + ## Make sure you specify the complete path to the stats endpoint + ## including the protocol, ie http://10.10.3.33:1936/haproxy?stats + + ## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats + servers = ["http://myhaproxy.com:1936/haproxy?stats"] + + ## Credentials for basic HTTP authentication + # username = "admin" + # password = "admin" + + ## You can also use local socket with standard wildcard globbing. + ## Server address not starting with 'http' will be treated as a possible + ## socket, so both examples below are valid. + # servers = ["socket:/run/haproxy/admin.sock", "/run/haproxy/*.sock"] + + ## By default, some of the fields are renamed from what haproxy calls them. + ## Setting this option to true results in the plugin keeping the original + ## field names. + # keep_field_names = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +func (r *Haproxy) SampleConfig() string { + return sampleConfig +} + +func (r *Haproxy) Description() string { + return "Read metrics of haproxy, via socket or csv stats page" +} + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (g *Haproxy) Gather(acc telegraf.Accumulator) error { + if len(g.Servers) == 0 { + return g.gatherServer("http://127.0.0.1:1936/haproxy?stats", acc) + } + + endpoints := make([]string, 0, len(g.Servers)) + + for _, endpoint := range g.Servers { + + if strings.HasPrefix(endpoint, "http") { + endpoints = append(endpoints, endpoint) + continue + } + + socketPath := getSocketAddr(endpoint) + + matches, err := filepath.Glob(socketPath) + + if err != nil { + return err + } + + if len(matches) == 0 { + endpoints = append(endpoints, socketPath) + } else { + for _, match := range matches { + endpoints = append(endpoints, match) + } + } + } + + var wg sync.WaitGroup + wg.Add(len(endpoints)) + for _, server := range endpoints { + go func(serv string) { + defer wg.Done() + if err := g.gatherServer(serv, acc); err != nil { + acc.AddError(err) + } + }(server) + } + + wg.Wait() + return nil +} + +func (g *Haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error { + socketPath := getSocketAddr(addr) + + c, err := net.Dial("unix", socketPath) + + if err != nil { + return fmt.Errorf("Could not connect to socket '%s': %s", addr, err) + } + + _, errw := c.Write([]byte("show stat\n")) + + if errw != nil { + return fmt.Errorf("Could not write to socket '%s': %s", addr, errw) + } + + return g.importCsvResult(c, acc, socketPath) +} + +func (g *Haproxy) gatherServer(addr string, acc telegraf.Accumulator) error { + if !strings.HasPrefix(addr, "http") { + return g.gatherServerSocket(addr, acc) + } + + if g.client == nil { + tlsCfg, err := g.ClientConfig.TLSConfig() + if err != nil { + return err + } + tr := &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + TLSClientConfig: tlsCfg, + } + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + g.client = client + } + + if !strings.HasSuffix(addr, ";csv") { + addr += "/;csv" + } + + u, err := url.Parse(addr) + if err != nil { + return fmt.Errorf("unable parse server address '%s': %s", addr, err) + } + + req, err := http.NewRequest("GET", addr, nil) + if err != nil { + return fmt.Errorf("unable to create new request '%s': %s", addr, err) + } + if u.User != nil { + p, _ := u.User.Password() + req.SetBasicAuth(u.User.Username(), p) + u.User = &url.Userinfo{} + addr = u.String() + } + + if g.Username != "" || g.Password != "" { + req.SetBasicAuth(g.Username, g.Password) + } + + res, err := g.client.Do(req) + if err != nil { + return fmt.Errorf("unable to connect to haproxy server '%s': %s", addr, err) + } + defer res.Body.Close() + + if res.StatusCode != 200 { + return fmt.Errorf("unable to get valid stat result from '%s', http response code : %d", addr, res.StatusCode) + } + + if err := g.importCsvResult(res.Body, acc, u.Host); err != nil { + return fmt.Errorf("unable to parse stat result from '%s': %s", addr, err) + } + + return nil +} + +func getSocketAddr(sock string) string { + socketAddr := strings.Split(sock, ":") + + if len(socketAddr) >= 2 { + return socketAddr[1] + } else { + return socketAddr[0] + } +} + +var typeNames = []string{"frontend", "backend", "server", "listener"} +var fieldRenames = map[string]string{ + "pxname": "proxy", + "svname": "sv", + "act": "active_servers", + "bck": "backup_servers", + "cli_abrt": "cli_abort", + "srv_abrt": "srv_abort", + "hrsp_1xx": "http_response.1xx", + "hrsp_2xx": "http_response.2xx", + "hrsp_3xx": "http_response.3xx", + "hrsp_4xx": "http_response.4xx", + "hrsp_5xx": "http_response.5xx", + "hrsp_other": "http_response.other", +} + +func (g *Haproxy) importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error { + csvr := csv.NewReader(r) + now := time.Now() + + headers, err := csvr.Read() + if err != nil { + return err + } + if len(headers[0]) <= 2 || headers[0][:2] != "# " { + return fmt.Errorf("did not receive standard haproxy headers") + } + headers[0] = headers[0][2:] + + for { + row, err := csvr.Read() + if err == io.EOF { + break + } + if err != nil { + return err + } + + fields := make(map[string]interface{}) + tags := map[string]string{ + "server": host, + } + + if len(row) != len(headers) { + return fmt.Errorf("number of columns does not match number of headers. headers=%d columns=%d", len(headers), len(row)) + } + for i, v := range row { + if v == "" { + continue + } + + colName := headers[i] + fieldName := colName + if !g.KeepFieldNames { + if fieldRename, ok := fieldRenames[colName]; ok { + fieldName = fieldRename + } + } + + switch colName { + case "pxname", "svname": + tags[fieldName] = v + case "type": + vi, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return fmt.Errorf("unable to parse type value '%s'", v) + } + if vi >= int64(len(typeNames)) { + return fmt.Errorf("received unknown type value: %d", vi) + } + tags[fieldName] = typeNames[vi] + case "check_desc", "agent_desc": + // do nothing. These fields are just a more verbose description of the check_status & agent_status fields + case "status", "check_status", "last_chk", "mode", "tracked", "agent_status", "last_agt", "addr", "cookie": + // these are string fields + fields[fieldName] = v + case "lastsess": + vi, err := strconv.ParseInt(v, 10, 64) + if err != nil { + //TODO log the error. And just once (per column) so we don't spam the log + continue + } + fields[fieldName] = vi + default: + vi, err := strconv.ParseUint(v, 10, 64) + if err != nil { + //TODO log the error. And just once (per column) so we don't spam the log + continue + } + fields[fieldName] = vi + } + } + acc.AddFields("haproxy", fields, tags, now) + } + return err +} + +func init() { + inputs.Add("haproxy", func() telegraf.Input { + return &Haproxy{} + }) +} + diff --git a/src/modules/monapi/plugins/rabbitmq/rabbitmq.go b/src/modules/monapi/plugins/rabbitmq/rabbitmq.go new file mode 100644 index 00000000..0844752f --- /dev/null +++ b/src/modules/monapi/plugins/rabbitmq/rabbitmq.go @@ -0,0 +1,98 @@ +package rabbitmq + +import ( + "fmt" + "time" + "reflect" + "github.com/didi/nightingale/src/modules/monapi/collector" + "github.com/didi/nightingale/src/modules/monapi/plugins" + "github.com/didi/nightingale/src/toolkits/i18n" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs/rabbitmq" +) + +func init() { + collector.CollectorRegister(NewRabbitMQCollector()) // for monapi + i18n.DictRegister(langDict) +} + +type RabbitMQCollector struct { + *collector.BaseCollector +} + +func NewRabbitMQCollector() *RabbitMQCollector { + return &RabbitMQCollector{BaseCollector: collector.NewBaseCollector( + "rabbitMQ", + collector.RemoteCategory, + func() collector.TelegrafPlugin { return &RabbitMQRule{} }, + )} +} + +var ( + langDict = map[string]map[string]string{ + "zh": map[string]string{ + "URL": "URL", + "Name": "节点名称", + "Username": "用户名", + "Password": "密码", + "header time out": "请求超时时间", + "client time out": "连接超时时间", + "nodes": "MQ节点", + "queues": "队列", + "exchanges": "Exchange交换机", + "QueueNameInclude": "包含队列", + "QueueNameExclude": "排除队列", + + }, + } +) + +type RabbitMQRule struct { + URL string `label:"URL" json:"url,required" example:"http://localhost:15672"` + Name string `label:"Name" json:"Name" description:"Tag added to rabbitmq_overview series"` + Username string `label:"Username" json:"username,required" description:"specify username"` + Password string `label:"Password" json:"password,required" format:"password" description:"specify server password"` + ResponseHeaderTimeout int `label:"header time out" json:"header_timeout" default:"3" description:"for a server's response headers after fully writing the request"` + ClientTimeout int `label:"client time out" json:"client_timeout" default:"4" description:"for a server's response headers after fully writing the request"` + Nodes []string `label:"nodes" json:"nodes" description:"A list of nodes to gather as the rabbitmq_node measurement"` + Queues []string `label:"queues" json:"queues" description:"A list of queues to gather as the rabbitmq_queue measurement"` + Exchanges []string `label:"exchanges" json:"exchanges" description:"A list of exchanges to gather as the rabbitmq_exchange measurement"` + QueueNameInclude []string `label:"queue name include" json:"queue_name_include" description:"Queues to include."` + QueueNameExclude []string `label:"queue name exclude" json:"queue_name_exclude" description:"Queues to exclude."` + FederationUpstreamInclude []string `label:"FederationUpstreamInclude" json:"federation_upstream_include" description:"exchange filters include"` + FederationUpstreamExclude []string `label:"FederationUpstreamExclude" json:"federation_upstream_exclude" description:"exchange filters exclude"` + plugins.ClientConfig +} + +func (p *RabbitMQRule) Validate() error { + if len(p.URL) == 0 || p.URL == "" { + return fmt.Errorf("rabbitmq.rule.servers must be set") + } + return nil +} + +func (p *RabbitMQRule) TelegrafInput() (telegraf.Input, error) { + if err := p.Validate(); err != nil { + return nil, err + } + + mq := &rabbitmq.RabbitMQ{ + + URL: p.URL, + Name: p.Name, + Username: p.Username, + Password: p.Password, + Nodes: p.Nodes, + Queues: p.Queues, + Exchanges: p.Exchanges, + QueueInclude: p.QueueNameInclude, + QueueExclude: p.QueueNameExclude, + ClientConfig: p.ClientConfig.TlsClientConfig(), + } + v := reflect.ValueOf(&(mq.ResponseHeaderTimeout.Duration)).Elem() + v.Set(reflect.ValueOf(time.Second * time.Duration(p.ResponseHeaderTimeout))) + v1 := reflect.ValueOf(&(mq.ClientTimeout.Duration)).Elem() + v1.Set(reflect.ValueOf(time.Second * time.Duration(p.ClientTimeout))) + return mq, nil +} +