No 'this' anymore (#135)

master
陈键冬 5 years ago committed by GitHub
parent f609a84c18
commit a45ca89ecb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -76,7 +76,7 @@ func Parse(conf string) error {
err = viper.Unmarshal(&Config)
if err != nil {
return fmt.Errorf("Unmarshal %v", err)
return fmt.Errorf("unmarshal config error:%v", err)
}
return nil

@ -13,7 +13,7 @@ import (
)
// cached时间周期
const CACHED_DURATION = 60
const cachedDuration = 60
type counterCache struct {
sync.RWMutex
@ -31,13 +31,6 @@ func init() {
go CleanLoop()
}
func (this *counterCache) AddPoint(tms int64, value float64) {
this.Lock()
tmsStr := fmt.Sprintf("%d", tms)
this.Points[tmsStr] = value
this.Unlock()
}
func PostToCache(paramPoints []*dataobj.MetricValue) {
for _, point := range paramPoints {
globalPushPoints.AddPoint(point)
@ -62,69 +55,76 @@ func GetCachedAll() string {
return string(str)
}
func (this *counterCache) GetKeys() []string {
this.RLock()
func (cc *counterCache) AddPoint(tms int64, value float64) {
cc.Lock()
tmsStr := fmt.Sprintf("%d", tms)
cc.Points[tmsStr] = value
cc.Unlock()
}
func (cc *counterCache) GetKeys() []string {
cc.RLock()
retList := make([]string, 0)
for k := range this.Points {
for k := range cc.Points {
retList = append(retList, k)
}
this.RUnlock()
cc.RUnlock()
return retList
}
func (this *counterCache) RemoveTms(tms string) {
this.Lock()
delete(this.Points, tms)
this.Unlock()
func (cc *counterCache) RemoveTms(tms string) {
cc.Lock()
delete(cc.Points, tms)
cc.Unlock()
}
func (this *pushPointsCache) AddCounter(counter string) {
this.Lock()
func (pc *pushPointsCache) AddCounter(counter string) {
pc.Lock()
tmp := new(counterCache)
tmp.Points = make(map[string]float64, 0)
this.Counters[counter] = tmp
this.Unlock()
tmp.Points = make(map[string]float64)
pc.Counters[counter] = tmp
pc.Unlock()
}
func (this *pushPointsCache) GetCounters() []string {
func (pc *pushPointsCache) GetCounters() []string {
ret := make([]string, 0)
this.RLock()
for k := range this.Counters {
pc.RLock()
for k := range pc.Counters {
ret = append(ret, k)
}
this.RUnlock()
pc.RUnlock()
return ret
}
func (this *pushPointsCache) RemoveCounter(counter string) {
this.Lock()
delete(this.Counters, counter)
this.Unlock()
func (pc *pushPointsCache) RemoveCounter(counter string) {
pc.Lock()
delete(pc.Counters, counter)
pc.Unlock()
}
func (this *pushPointsCache) GetCounterObj(key string) (*counterCache, bool) {
this.RLock()
Points, ok := this.Counters[key]
this.RUnlock()
func (pc *pushPointsCache) GetCounterObj(key string) (*counterCache, bool) {
pc.RLock()
Points, ok := pc.Counters[key]
pc.RUnlock()
return Points, ok
}
func (this *pushPointsCache) AddPoint(point *dataobj.MetricValue) {
func (pc *pushPointsCache) AddPoint(point *dataobj.MetricValue) {
counter := calcCounter(point)
if _, ok := this.GetCounterObj(counter); !ok {
this.AddCounter(counter)
if _, ok := pc.GetCounterObj(counter); !ok {
pc.AddCounter(counter)
}
counterPoints, exists := this.GetCounterObj(counter)
counterPoints, exists := pc.GetCounterObj(counter)
if exists {
counterPoints.AddPoint(point.Timestamp, point.Value)
}
}
func (this *pushPointsCache) CleanOld() {
counters := this.GetCounters()
func (pc *pushPointsCache) CleanOld() {
counters := pc.GetCounters()
for _, counter := range counters {
counterObj, exists := this.GetCounterObj(counter)
counterObj, exists := pc.GetCounterObj(counter)
if !exists {
continue
}
@ -132,20 +132,21 @@ func (this *pushPointsCache) CleanOld() {
//如果列表为空清理掉这个counter
if len(tmsList) == 0 {
this.RemoveCounter(counter)
} else {
pc.RemoveCounter(counter)
continue
}
for _, tmsStr := range tmsList {
tms, err := strconv.Atoi(tmsStr)
if err != nil {
logger.Errorf("clean cached point, atoi error : [%v]", err)
counterObj.RemoveTms(tmsStr)
} else if (time.Now().Unix() - int64(tms)) > CACHED_DURATION {
} else if (time.Now().Unix() - int64(tms)) > cachedDuration {
counterObj.RemoveTms(tmsStr)
}
}
}
}
}
func calcCounter(point *dataobj.MetricValue) string {
tagstring := dataobj.SortedTags(point.TagsMap)

File diff suppressed because it is too large Load Diff

@ -40,30 +40,6 @@ type WorkerGroup struct {
TimeFormatStrategy string
}
func (this WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64) {
return this.LatestTms, this.MaxDelay
}
func (this *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) {
latest := atomic.LoadInt64(&this.LatestTms)
if latest < tms {
swapped := atomic.CompareAndSwapInt64(&this.LatestTms, latest, tms)
if swapped {
logger.Debugf("[work group:%s][set latestTms:%d]", this.Workers[0].Mark, tms)
}
}
if delay == 0 {
return
}
newest := atomic.LoadInt64(&this.MaxDelay)
if newest < delay {
atomic.CompareAndSwapInt64(&this.MaxDelay, newest, delay)
}
}
/*
* filepathstream
*/
@ -94,6 +70,30 @@ func NewWorkerGroup(filePath string, stream chan string) *WorkerGroup {
return wg
}
func (wg WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64) {
return wg.LatestTms, wg.MaxDelay
}
func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) {
latest := atomic.LoadInt64(&wg.LatestTms)
if latest < tms {
swapped := atomic.CompareAndSwapInt64(&wg.LatestTms, latest, tms)
if swapped {
logger.Debugf("[work group:%s][set latestTms:%d]", wg.Workers[0].Mark, tms)
}
}
if delay == 0 {
return
}
newest := atomic.LoadInt64(&wg.MaxDelay)
if newest < delay {
atomic.CompareAndSwapInt64(&wg.MaxDelay, newest, delay)
}
}
func (wg *WorkerGroup) Start() {
for _, worker := range wg.Workers {
worker.Start()

Loading…
Cancel
Save