|
|
|
@ -19,6 +19,7 @@ import (
|
|
|
|
|
"github.com/prometheus/common/model"
|
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
|
|
|
"github.com/prometheus/prometheus/prompb"
|
|
|
|
|
"github.com/toolkits/pkg/logger"
|
|
|
|
|
|
|
|
|
|
"github.com/didi/nightingale/v5/vos"
|
|
|
|
|
)
|
|
|
|
@ -137,8 +138,15 @@ func remoteWritePost(c *HttpClient, req []byte) error {
|
|
|
|
|
if scanner.Scan() {
|
|
|
|
|
line = scanner.Text()
|
|
|
|
|
}
|
|
|
|
|
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
|
|
|
|
|
|
|
|
|
|
if httpResp.StatusCode == 400 {
|
|
|
|
|
//400的错误是客户端的问题,不返回给上层,输出到debug日志中
|
|
|
|
|
logger.Debugf("server returned HTTP status %s: %s req:%v", httpResp.Status, line, getSamples(req))
|
|
|
|
|
} else {
|
|
|
|
|
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if httpResp.StatusCode/100 == 5 {
|
|
|
|
|
return RecoverableError{err}
|
|
|
|
|
}
|
|
|
|
@ -160,3 +168,16 @@ func (pd *PromeDataSource) buildWriteRequest(samples []prompb.TimeSeries) ([]byt
|
|
|
|
|
compressed := snappy.Encode(nil, data)
|
|
|
|
|
return compressed, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getSamples(compressed []byte) []prompb.TimeSeries {
|
|
|
|
|
var samples []prompb.TimeSeries
|
|
|
|
|
req := &prompb.WriteRequest{
|
|
|
|
|
Timeseries: samples,
|
|
|
|
|
Metadata: nil,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
d, _ := snappy.Decode(nil, compressed)
|
|
|
|
|
proto.Unmarshal(d, req)
|
|
|
|
|
|
|
|
|
|
return req.Timeseries
|
|
|
|
|
}
|
|
|
|
|