prober nginx 采集插件 (#557)

* add a method to get the Endpoint

* 增加nginx插件,修改control。支持./control build prober job这种多个参数

* 修改提示

Co-authored-by: lynxcat <lynxcatdeng@gmail.com>
master
lynxcat 5 years ago committed by GitHub
parent 211dfc62e4
commit 2d4e6bb8da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -228,30 +228,49 @@ pack()
rm -rf ~/n9e.bak.$clock
}
exec()
{
params=${@:2}
if [ ${#2} -gt 0 ]; then
for param in $params
do
$1 $param
if [ "x${mod}" = "xall" ]; then
break
fi
done
else
echo $1
$1
fi
}
case "$1" in
start)
start $2
exec start ${@:2}
;;
stop)
stop $2
exec stop ${@:2}
;;
restart)
restart $2
exec restart ${@:2}
;;
status)
status
;;
build)
build $2
exec build ${@:2}
;;
build_local)
build_local $2
exec build_local ${@:2}
;;
reload)
reload $2
exec reload ${@:2}
;;
pack)
pack $2
exec pack ${@:2}
;;
*)
usage

@ -0,0 +1,9 @@
mode: whitelist # whitelist(default),all
metrics:
- name: nginx_accepts
- name: nginx_active
- name: nginx_handled
- name: nginx_reading
- name: nginx_requests
- name: nginx_waiting
- name: nginx_writing

@ -8,6 +8,7 @@ import (
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mysql"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/redis"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/nginx"
// local
_ "github.com/didi/nightingale/src/modules/monapi/plugins/log"

@ -0,0 +1,55 @@
package nginx
import (
"fmt"
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/nginx"
)
func init() {
collector.CollectorRegister(NewCollector()) // for monapi
i18n.DictRegister(langDict)
}
type Collector struct {
*collector.BaseCollector
}
func NewCollector() *Collector {
return &Collector{BaseCollector: collector.NewBaseCollector(
"nginx",
collector.RemoteCategory,
func() collector.TelegrafPlugin { return &Rule{} },
)}
}
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"nginx status uri": "查看Nginx状态的地址",
},
}
)
type Rule struct {
Urls []string `label:"nginx status uri" json:"url,required" example:"http://localhost/status"`
}
func (p *Rule) Validate() error {
if len(p.Urls) == 0 || p.Urls[0] == "" {
return fmt.Errorf("ningx.rule.urls must be set")
}
return nil
}
func (p *Rule) TelegrafInput() (telegraf.Input, error) {
if err := p.Validate(); err != nil {
return nil, err
}
return &nginx.Nginx{
Urls: p.Urls,
}, nil
}

@ -0,0 +1,19 @@
Copyright (C) 2014 Alec Thomas
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,11 @@
# Units - Helpful unit multipliers and functions for Go
The goal of this package is to have functionality similar to the [time](http://golang.org/pkg/time/) package.
It allows for code like this:
```go
n, err := ParseBase2Bytes("1KB")
// n == 1024
n = units.Mebibyte * 512
```

@ -0,0 +1,85 @@
package units
// Base2Bytes is the old non-SI power-of-2 byte scale (1024 bytes in a kilobyte,
// etc.).
type Base2Bytes int64
// Base-2 byte units.
const (
Kibibyte Base2Bytes = 1024
KiB = Kibibyte
Mebibyte = Kibibyte * 1024
MiB = Mebibyte
Gibibyte = Mebibyte * 1024
GiB = Gibibyte
Tebibyte = Gibibyte * 1024
TiB = Tebibyte
Pebibyte = Tebibyte * 1024
PiB = Pebibyte
Exbibyte = Pebibyte * 1024
EiB = Exbibyte
)
var (
bytesUnitMap = MakeUnitMap("iB", "B", 1024)
oldBytesUnitMap = MakeUnitMap("B", "B", 1024)
)
// ParseBase2Bytes supports both iB and B in base-2 multipliers. That is, KB
// and KiB are both 1024.
// However "kB", which is the correct SI spelling of 1000 Bytes, is rejected.
func ParseBase2Bytes(s string) (Base2Bytes, error) {
n, err := ParseUnit(s, bytesUnitMap)
if err != nil {
n, err = ParseUnit(s, oldBytesUnitMap)
}
return Base2Bytes(n), err
}
func (b Base2Bytes) String() string {
return ToString(int64(b), 1024, "iB", "B")
}
var (
metricBytesUnitMap = MakeUnitMap("B", "B", 1000)
)
// MetricBytes are SI byte units (1000 bytes in a kilobyte).
type MetricBytes SI
// SI base-10 byte units.
const (
Kilobyte MetricBytes = 1000
KB = Kilobyte
Megabyte = Kilobyte * 1000
MB = Megabyte
Gigabyte = Megabyte * 1000
GB = Gigabyte
Terabyte = Gigabyte * 1000
TB = Terabyte
Petabyte = Terabyte * 1000
PB = Petabyte
Exabyte = Petabyte * 1000
EB = Exabyte
)
// ParseMetricBytes parses base-10 metric byte units. That is, KB is 1000 bytes.
func ParseMetricBytes(s string) (MetricBytes, error) {
n, err := ParseUnit(s, metricBytesUnitMap)
return MetricBytes(n), err
}
// TODO: represents 1000B as uppercase "KB", while SI standard requires "kB".
func (m MetricBytes) String() string {
return ToString(int64(m), 1000, "B", "B")
}
// ParseStrictBytes supports both iB and B suffixes for base 2 and metric,
// respectively. That is, KiB represents 1024 and kB, KB represent 1000.
func ParseStrictBytes(s string) (int64, error) {
n, err := ParseUnit(s, bytesUnitMap)
if err != nil {
n, err = ParseUnit(s, metricBytesUnitMap)
}
return int64(n), err
}

@ -0,0 +1,13 @@
// Package units provides helpful unit multipliers and functions for Go.
//
// The goal of this package is to have functionality similar to the time [1] package.
//
//
// [1] http://golang.org/pkg/time/
//
// It allows for code like this:
//
// n, err := ParseBase2Bytes("1KB")
// // n == 1024
// n = units.Mebibyte * 512
package units

@ -0,0 +1,3 @@
module github.com/alecthomas/units
require github.com/stretchr/testify v1.4.0

@ -0,0 +1,11 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

@ -0,0 +1,50 @@
package units
// SI units.
type SI int64
// SI unit multiples.
const (
Kilo SI = 1000
Mega = Kilo * 1000
Giga = Mega * 1000
Tera = Giga * 1000
Peta = Tera * 1000
Exa = Peta * 1000
)
func MakeUnitMap(suffix, shortSuffix string, scale int64) map[string]float64 {
res := map[string]float64{
shortSuffix: 1,
// see below for "k" / "K"
"M" + suffix: float64(scale * scale),
"G" + suffix: float64(scale * scale * scale),
"T" + suffix: float64(scale * scale * scale * scale),
"P" + suffix: float64(scale * scale * scale * scale * scale),
"E" + suffix: float64(scale * scale * scale * scale * scale * scale),
}
// Standard SI prefixes use lowercase "k" for kilo = 1000.
// For compatibility, and to be fool-proof, we accept both "k" and "K" in metric mode.
//
// However, official binary prefixes are always capitalized - "KiB" -
// and we specifically never parse "kB" as 1024B because:
//
// (1) people pedantic enough to use lowercase according to SI unlikely to abuse "k" to mean 1024 :-)
//
// (2) Use of capital K for 1024 was an informal tradition predating IEC prefixes:
// "The binary meaning of the kilobyte for 1024 bytes typically uses the symbol KB, with an
// uppercase letter K."
// -- https://en.wikipedia.org/wiki/Kilobyte#Base_2_(1024_bytes)
// "Capitalization of the letter K became the de facto standard for binary notation, although this
// could not be extended to higher powers, and use of the lowercase k did persist.[13][14][15]"
// -- https://en.wikipedia.org/wiki/Binary_prefix#History
// See also the extensive https://en.wikipedia.org/wiki/Timeline_of_binary_prefixes.
if scale == 1024 {
res["K"+suffix] = float64(scale)
} else {
res["k"+suffix] = float64(scale)
res["K"+suffix] = float64(scale)
}
return res
}

@ -0,0 +1,138 @@
package units
import (
"errors"
"fmt"
"strings"
)
var (
siUnits = []string{"", "K", "M", "G", "T", "P", "E"}
)
func ToString(n int64, scale int64, suffix, baseSuffix string) string {
mn := len(siUnits)
out := make([]string, mn)
for i, m := range siUnits {
if n%scale != 0 || i == 0 && n == 0 {
s := suffix
if i == 0 {
s = baseSuffix
}
out[mn-1-i] = fmt.Sprintf("%d%s%s", n%scale, m, s)
}
n /= scale
if n == 0 {
break
}
}
return strings.Join(out, "")
}
// Below code ripped straight from http://golang.org/src/pkg/time/format.go?s=33392:33438#L1123
var errLeadingInt = errors.New("units: bad [0-9]*") // never printed
// leadingInt consumes the leading [0-9]* from s.
func leadingInt(s string) (x int64, rem string, err error) {
i := 0
for ; i < len(s); i++ {
c := s[i]
if c < '0' || c > '9' {
break
}
if x >= (1<<63-10)/10 {
// overflow
return 0, "", errLeadingInt
}
x = x*10 + int64(c) - '0'
}
return x, s[i:], nil
}
func ParseUnit(s string, unitMap map[string]float64) (int64, error) {
// [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+
orig := s
f := float64(0)
neg := false
// Consume [-+]?
if s != "" {
c := s[0]
if c == '-' || c == '+' {
neg = c == '-'
s = s[1:]
}
}
// Special case: if all that is left is "0", this is zero.
if s == "0" {
return 0, nil
}
if s == "" {
return 0, errors.New("units: invalid " + orig)
}
for s != "" {
g := float64(0) // this element of the sequence
var x int64
var err error
// The next character must be [0-9.]
if !(s[0] == '.' || ('0' <= s[0] && s[0] <= '9')) {
return 0, errors.New("units: invalid " + orig)
}
// Consume [0-9]*
pl := len(s)
x, s, err = leadingInt(s)
if err != nil {
return 0, errors.New("units: invalid " + orig)
}
g = float64(x)
pre := pl != len(s) // whether we consumed anything before a period
// Consume (\.[0-9]*)?
post := false
if s != "" && s[0] == '.' {
s = s[1:]
pl := len(s)
x, s, err = leadingInt(s)
if err != nil {
return 0, errors.New("units: invalid " + orig)
}
scale := 1.0
for n := pl - len(s); n > 0; n-- {
scale *= 10
}
g += float64(x) / scale
post = pl != len(s)
}
if !pre && !post {
// no digits (e.g. ".s" or "-.s")
return 0, errors.New("units: invalid " + orig)
}
// Consume unit.
i := 0
for ; i < len(s); i++ {
c := s[i]
if c == '.' || ('0' <= c && c <= '9') {
break
}
}
u := s[:i]
s = s[i:]
unit, ok := unitMap[u]
if !ok {
return 0, errors.New("units: unknown unit " + u + " in " + orig)
}
f += g * unit
}
if neg {
f = -f
}
if f < float64(-1<<63) || f > float64(1<<63-1) {
return 0, errors.New("units: overflow parsing unit")
}
return int64(f), nil
}

@ -0,0 +1,182 @@
package internal
import (
"bufio"
"bytes"
"compress/gzip"
"errors"
"io"
)
// NewStreamContentDecoder returns a reader that will decode the stream
// according to the encoding type.
func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
switch encoding {
case "gzip":
return NewGzipReader(r)
case "identity", "":
return r, nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}
// GzipReader is similar to gzip.Reader but reads only a single gzip stream per read.
type GzipReader struct {
r io.Reader
z *gzip.Reader
endOfStream bool
}
func NewGzipReader(r io.Reader) (io.Reader, error) {
// We need a read that implements ByteReader in order to line up the next
// stream.
br := bufio.NewReader(r)
// Reads the first gzip stream header.
z, err := gzip.NewReader(br)
if err != nil {
return nil, err
}
// Prevent future calls to Read from reading the following gzip header.
z.Multistream(false)
return &GzipReader{r: br, z: z}, nil
}
func (r *GzipReader) Read(b []byte) (int, error) {
if r.endOfStream {
// Reads the next gzip header and prepares for the next stream.
err := r.z.Reset(r.r)
if err != nil {
return 0, err
}
r.z.Multistream(false)
r.endOfStream = false
}
n, err := r.z.Read(b)
// Since multistream is disabled, io.EOF indicates the end of the gzip
// sequence. On the next read we must read the next gzip header.
if err == io.EOF {
r.endOfStream = true
return n, nil
}
return n, err
}
// NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()
case "identity", "":
return NewIdentityEncoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}
// NewContentDecoder returns a ContentDecoder for the encoding type.
func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
case "gzip":
return NewGzipDecoder()
case "identity", "":
return NewIdentityDecoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}
// ContentEncoder applies a wrapper encoding to byte buffers.
type ContentEncoder interface {
Encode([]byte) ([]byte, error)
}
// GzipEncoder compresses the buffer using gzip at the default level.
type GzipEncoder struct {
writer *gzip.Writer
buf *bytes.Buffer
}
func NewGzipEncoder() (*GzipEncoder, error) {
var buf bytes.Buffer
return &GzipEncoder{
writer: gzip.NewWriter(&buf),
buf: &buf,
}, nil
}
func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
e.buf.Reset()
e.writer.Reset(e.buf)
_, err := e.writer.Write(data)
if err != nil {
return nil, err
}
err = e.writer.Close()
if err != nil {
return nil, err
}
return e.buf.Bytes(), nil
}
// IdentityEncoder is a null encoder that applies no transformation.
type IdentityEncoder struct{}
func NewIdentityEncoder() *IdentityEncoder {
return &IdentityEncoder{}
}
func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
return data, nil
}
// ContentDecoder removes a wrapper encoding from byte buffers.
type ContentDecoder interface {
Decode([]byte) ([]byte, error)
}
// GzipDecoder decompresses buffers with gzip compression.
type GzipDecoder struct {
reader *gzip.Reader
buf *bytes.Buffer
}
func NewGzipDecoder() (*GzipDecoder, error) {
return &GzipDecoder{
reader: new(gzip.Reader),
buf: new(bytes.Buffer),
}, nil
}
func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
d.reader.Reset(bytes.NewBuffer(data))
d.buf.Reset()
_, err := d.buf.ReadFrom(d.reader)
if err != nil && err != io.EOF {
return nil, err
}
err = d.reader.Close()
if err != nil {
return nil, err
}
return d.buf.Bytes(), nil
}
// IdentityDecoder is a null decoder that returns the input.
type IdentityDecoder struct{}
func NewIdentityDecoder() *IdentityDecoder {
return &IdentityDecoder{}
}
func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
return data, nil
}

@ -0,0 +1,44 @@
package internal
import (
"bytes"
"os/exec"
"time"
)
// CombinedOutputTimeout runs the given command with the given timeout and
// returns the combined output of stdout and stderr.
// If the command times out, it attempts to kill the process.
func CombinedOutputTimeout(c *exec.Cmd, timeout time.Duration) ([]byte, error) {
var b bytes.Buffer
c.Stdout = &b
c.Stderr = &b
if err := c.Start(); err != nil {
return nil, err
}
err := WaitTimeout(c, timeout)
return b.Bytes(), err
}
// StdOutputTimeout runs the given command with the given timeout and
// returns the output of stdout.
// If the command times out, it attempts to kill the process.
func StdOutputTimeout(c *exec.Cmd, timeout time.Duration) ([]byte, error) {
var b bytes.Buffer
c.Stdout = &b
c.Stderr = nil
if err := c.Start(); err != nil {
return nil, err
}
err := WaitTimeout(c, timeout)
return b.Bytes(), err
}
// RunTimeout runs the given command with the given timeout.
// If the command times out, it attempts to kill the process.
func RunTimeout(c *exec.Cmd, timeout time.Duration) error {
if err := c.Start(); err != nil {
return err
}
return WaitTimeout(c, timeout)
}

@ -0,0 +1,58 @@
// +build !windows
package internal
import (
"log"
"os/exec"
"syscall"
"time"
)
// KillGrace is the amount of time we allow a process to shutdown before
// sending a SIGKILL.
const KillGrace = 5 * time.Second
// WaitTimeout waits for the given command to finish with a timeout.
// It assumes the command has already been started.
// If the command times out, it attempts to kill the process.
func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {
var kill *time.Timer
term := time.AfterFunc(timeout, func() {
err := c.Process.Signal(syscall.SIGTERM)
if err != nil {
log.Printf("E! [agent] Error terminating process: %s", err)
return
}
kill = time.AfterFunc(KillGrace, func() {
err := c.Process.Kill()
if err != nil {
log.Printf("E! [agent] Error killing process: %s", err)
return
}
})
})
err := c.Wait()
// Shutdown all timers
if kill != nil {
kill.Stop()
}
termSent := !term.Stop()
// If the process exited without error treat it as success. This allows a
// process to do a clean shutdown on signal.
if err == nil {
return nil
}
// If SIGTERM was sent then treat any process error as a timeout.
if termSent {
return TimeoutErr
}
// Otherwise there was an error unrelated to termination.
return err
}

@ -0,0 +1,41 @@
// +build windows
package internal
import (
"log"
"os/exec"
"time"
)
// WaitTimeout waits for the given command to finish with a timeout.
// It assumes the command has already been started.
// If the command times out, it attempts to kill the process.
func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {
timer := time.AfterFunc(timeout, func() {
err := c.Process.Kill()
if err != nil {
log.Printf("E! [agent] Error killing process: %s", err)
return
}
})
err := c.Wait()
// Shutdown all timers
termSent := !timer.Stop()
// If the process exited without error treat it as success. This allows a
// process to do a clean shutdown on signal.
if err == nil {
return nil
}
// If SIGTERM was sent then treat any process error as a timeout.
if termSent {
return TimeoutErr
}
// Otherwise there was an error unrelated to termination.
return err
}

@ -0,0 +1,143 @@
package internal
import (
"crypto/subtle"
"net"
"net/http"
"net/url"
)
type BasicAuthErrorFunc func(rw http.ResponseWriter)
// AuthHandler returns a http handler that requires HTTP basic auth
// credentials to match the given username and password.
func AuthHandler(username, password, realm string, onError BasicAuthErrorFunc) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return &basicAuthHandler{
username: username,
password: password,
realm: realm,
onError: onError,
next: h,
}
}
}
type basicAuthHandler struct {
username string
password string
realm string
onError BasicAuthErrorFunc
next http.Handler
}
func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if h.username != "" || h.password != "" {
reqUsername, reqPassword, ok := req.BasicAuth()
if !ok ||
subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.username)) != 1 ||
subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.password)) != 1 {
rw.Header().Set("WWW-Authenticate", "Basic realm=\""+h.realm+"\"")
h.onError(rw)
http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return
}
}
h.next.ServeHTTP(rw, req)
}
type GenericAuthErrorFunc func(rw http.ResponseWriter)
// GenericAuthHandler returns a http handler that requires `Authorization: <credentials>`
func GenericAuthHandler(credentials string, onError GenericAuthErrorFunc) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return &genericAuthHandler{
credentials: credentials,
onError: onError,
next: h,
}
}
}
// Generic auth scheme handler - exact match on `Authorization: <credentials>`
type genericAuthHandler struct {
credentials string
onError GenericAuthErrorFunc
next http.Handler
}
func (h *genericAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if h.credentials != "" {
// Scheme checking
authorization := req.Header.Get("Authorization")
if subtle.ConstantTimeCompare([]byte(authorization), []byte(h.credentials)) != 1 {
h.onError(rw)
http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return
}
}
h.next.ServeHTTP(rw, req)
}
// ErrorFunc is a callback for writing an error response.
type ErrorFunc func(rw http.ResponseWriter, code int)
// IPRangeHandler returns a http handler that requires the remote address to be
// in the specified network.
func IPRangeHandler(network []*net.IPNet, onError ErrorFunc) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return &ipRangeHandler{
network: network,
onError: onError,
next: h,
}
}
}
type ipRangeHandler struct {
network []*net.IPNet
onError ErrorFunc
next http.Handler
}
func (h *ipRangeHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if len(h.network) == 0 {
h.next.ServeHTTP(rw, req)
return
}
remoteIPString, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
h.onError(rw, http.StatusForbidden)
return
}
remoteIP := net.ParseIP(remoteIPString)
if remoteIP == nil {
h.onError(rw, http.StatusForbidden)
return
}
for _, net := range h.network {
if net.Contains(remoteIP) {
h.next.ServeHTTP(rw, req)
return
}
}
h.onError(rw, http.StatusForbidden)
}
func OnClientError(client *http.Client, err error) {
// Close connection after a timeout error. If this is a HTTP2
// connection this ensures that next interval a new connection will be
// used and name lookup will be performed.
// https://github.com/golang/go/issues/36026
if err, ok := err.(*url.Error); ok && err.Timeout() {
client.CloseIdleConnections()
}
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,61 @@
// +build !windows
package internal
const Usage = `Telegraf, The plugin-driven server agent for collecting and reporting metrics.
Usage:
telegraf [commands|flags]
The commands & flags are:
config print out full sample configuration to stdout
version print the version to stdout
--aggregator-filter <filter> filter the aggregators to enable, separator is :
--config <file> configuration file to load
--config-directory <directory> directory containing additional *.conf files
--plugin-directory directory containing *.so files, this directory will be
searched recursively. Any Plugin found will be loaded
and namespaced.
--debug turn on debug logging
--input-filter <filter> filter the inputs to enable, separator is :
--input-list print available input plugins.
--output-filter <filter> filter the outputs to enable, separator is :
--output-list print available output plugins.
--pidfile <file> file to write our pid to
--pprof-addr <address> pprof address to listen on, don't activate pprof if empty
--processor-filter <filter> filter the processors to enable, separator is :
--quiet run in quiet mode
--section-filter filter config sections to output, separator is :
Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs'
--sample-config print out full sample configuration
--once enable once mode: gather metrics once, write them, and exit
--test enable test mode: gather metrics once and print them
--test-wait wait up to this many seconds for service
inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit
Examples:
# generate a telegraf config file:
telegraf config > telegraf.conf
# generate config with only cpu input & influxdb output plugins defined
telegraf --input-filter cpu --output-filter influxdb config
# run a single telegraf collection, outputting metrics to stdout
telegraf --config telegraf.conf --test
# run telegraf with all plugins defined in config file
telegraf --config telegraf.conf
# run telegraf, enabling the cpu & memory input, and influxdb output plugins
telegraf --config telegraf.conf --input-filter cpu:mem --output-filter influxdb
# run telegraf with pprof
telegraf --config telegraf.conf --pprof-addr localhost:6060
`

@ -0,0 +1,72 @@
// +build windows
package internal
const Usage = `Telegraf, The plugin-driven server agent for collecting and reporting metrics.
Usage:
telegraf [commands|flags]
The commands & flags are:
config print out full sample configuration to stdout
version print the version to stdout
--aggregator-filter <filter> filter the aggregators to enable, separator is :
--config <file> configuration file to load
--config-directory <directory> directory containing additional *.conf files
--debug turn on debug logging
--input-filter <filter> filter the inputs to enable, separator is :
--input-list print available input plugins.
--output-filter <filter> filter the outputs to enable, separator is :
--output-list print available output plugins.
--pidfile <file> file to write our pid to
--pprof-addr <address> pprof address to listen on, don't activate pprof if empty
--processor-filter <filter> filter the processors to enable, separator is :
--quiet run in quiet mode
--sample-config print out full sample configuration
--section-filter filter config sections to output, separator is :
Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs'
--once enable once mode: gather metrics once, write them, and exit
--test enable test mode: gather metrics once and print them
--test-wait wait up to this many seconds for service
inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit
--console run as console application (windows only)
--service <service> operate on the service (windows only)
--service-name service name (windows only)
--service-display-name service display name (windows only)
Examples:
# generate a telegraf config file:
telegraf config > telegraf.conf
# generate config with only cpu input & influxdb output plugins defined
telegraf --input-filter cpu --output-filter influxdb config
# run a single telegraf collection, outputting metrics to stdout
telegraf --config telegraf.conf --test
# run telegraf with all plugins defined in config file
telegraf --config telegraf.conf
# run telegraf, enabling the cpu & memory input, and influxdb output plugins
telegraf --config telegraf.conf --input-filter cpu:mem --output-filter influxdb
# run telegraf with pprof
telegraf --config telegraf.conf --pprof-addr localhost:6060
# run telegraf without service controller
telegraf --console install --config "C:\Program Files\Telegraf\telegraf.conf"
# install telegraf service
telegraf --service install --config "C:\Program Files\Telegraf\telegraf.conf"
# install telegraf service with custom name
telegraf --service install --service-name=my-telegraf --service-display-name="My Telegraf"
`

@ -0,0 +1,57 @@
# Nginx Input Plugin
### Configuration:
```toml
# Read Nginx's basic status information (ngx_http_stub_status_module)
[[inputs.nginx]]
## An array of Nginx stub_status URI to gather stats.
urls = ["http://localhost/server_status"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## HTTP response timeout (default: 5s)
response_timeout = "5s"
```
### Measurements & Fields:
- Measurement
- accepts
- active
- handled
- reading
- requests
- waiting
- writing
### Tags:
- All measurements have the following tags:
- port
- server
### Example Output:
Using this configuration:
```toml
[[inputs.nginx]]
## An array of Nginx stub_status URI to gather stats.
urls = ["http://localhost/status"]
```
When run with:
```sh
./telegraf --config telegraf.conf --input-filter nginx --test
```
It produces:
```
* Plugin: nginx, Collection 1
> nginx,port=80,server=localhost accepts=605i,active=2i,handled=605i,reading=0i,requests=12132i,waiting=1i,writing=1i 1456690994701784331
```

@ -0,0 +1,207 @@
package nginx
import (
"bufio"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Nginx struct {
Urls []string
ResponseTimeout internal.Duration
tls.ClientConfig
// HTTP client
client *http.Client
}
var sampleConfig = `
# An array of Nginx stub_status URI to gather stats.
urls = ["http://localhost/server_status"]
## Optional TLS Config
tls_ca = "/etc/telegraf/ca.pem"
tls_cert = "/etc/telegraf/cert.cer"
tls_key = "/etc/telegraf/key.key"
## Use TLS but skip chain & host verification
insecure_skip_verify = false
# HTTP response timeout (default: 5s)
response_timeout = "5s"
`
func (n *Nginx) SampleConfig() string {
return sampleConfig
}
func (n *Nginx) Description() string {
return "Read Nginx's basic status information (ngx_http_stub_status_module)"
}
func (n *Nginx) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Create an HTTP client that is re-used for each
// collection interval
if n.client == nil {
client, err := n.createHttpClient()
if err != nil {
return err
}
n.client = client
}
for _, u := range n.Urls {
addr, err := url.Parse(u)
if err != nil {
acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err))
continue
}
wg.Add(1)
go func(addr *url.URL) {
defer wg.Done()
acc.AddError(n.gatherUrl(addr, acc))
}(addr)
}
wg.Wait()
return nil
}
func (n *Nginx) createHttpClient() (*http.Client, error) {
tlsCfg, err := n.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
if n.ResponseTimeout.Duration < time.Second {
n.ResponseTimeout.Duration = time.Second * 5
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: n.ResponseTimeout.Duration,
}
return client, nil
}
func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := n.client.Get(addr.String())
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status)
}
r := bufio.NewReader(resp.Body)
// Active connections
_, err = r.ReadString(':')
if err != nil {
return err
}
line, err := r.ReadString('\n')
if err != nil {
return err
}
active, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64)
if err != nil {
return err
}
// Server accepts handled requests
_, err = r.ReadString('\n')
if err != nil {
return err
}
line, err = r.ReadString('\n')
if err != nil {
return err
}
data := strings.Fields(line)
accepts, err := strconv.ParseUint(data[0], 10, 64)
if err != nil {
return err
}
handled, err := strconv.ParseUint(data[1], 10, 64)
if err != nil {
return err
}
requests, err := strconv.ParseUint(data[2], 10, 64)
if err != nil {
return err
}
// Reading/Writing/Waiting
line, err = r.ReadString('\n')
if err != nil {
return err
}
data = strings.Fields(line)
reading, err := strconv.ParseUint(data[1], 10, 64)
if err != nil {
return err
}
writing, err := strconv.ParseUint(data[3], 10, 64)
if err != nil {
return err
}
waiting, err := strconv.ParseUint(data[5], 10, 64)
if err != nil {
return err
}
tags := getTags(addr)
fields := map[string]interface{}{
"active": active,
"accepts": accepts,
"handled": handled,
"requests": requests,
"reading": reading,
"writing": writing,
"waiting": waiting,
}
acc.AddFields("nginx", fields, tags)
return nil
}
// Get tag(s) for the nginx plugin
func getTags(addr *url.URL) map[string]string {
h := addr.Host
host, port, err := net.SplitHostPort(h)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}
return map[string]string{"server": host, "port": port}
}
func init() {
inputs.Add("nginx", func() telegraf.Input {
return &Nginx{}
})
}

@ -4,6 +4,8 @@ github.com/BurntSushi/toml
github.com/MichaelTJones/pcg
# github.com/Shopify/sarama v1.27.1
github.com/Shopify/sarama
# github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/alecthomas/units
# github.com/apache/thrift v0.13.0 => github.com/m3db/thrift v0.0.0-20190820191926-05b5a2227fe4
github.com/apache/thrift/lib/go/thrift
# github.com/beorn7/perks v1.0.1
@ -138,12 +140,14 @@ github.com/influxdata/influxdb/models
github.com/influxdata/influxdb/pkg/escape
# github.com/influxdata/telegraf v1.16.2
github.com/influxdata/telegraf
github.com/influxdata/telegraf/internal
github.com/influxdata/telegraf/metric
github.com/influxdata/telegraf/plugins/common/tls
github.com/influxdata/telegraf/plugins/inputs
github.com/influxdata/telegraf/plugins/inputs/mysql
github.com/influxdata/telegraf/plugins/inputs/mysql/v1
github.com/influxdata/telegraf/plugins/inputs/mysql/v2
github.com/influxdata/telegraf/plugins/inputs/nginx
github.com/influxdata/telegraf/plugins/inputs/redis
github.com/influxdata/telegraf/selfstat
github.com/influxdata/telegraf/testutil

Loading…
Cancel
Save