diff --git a/src/modules/judge/backend/query/init.go b/src/modules/judge/backend/query/init.go index a93b52da..a11a7d8a 100644 --- a/src/modules/judge/backend/query/init.go +++ b/src/modules/judge/backend/query/init.go @@ -2,10 +2,11 @@ package query import ( "github.com/didi/nightingale/src/toolkits/address" + "github.com/didi/nightingale/src/toolkits/pools" ) var ( - TransferConnPools *ConnPools = &ConnPools{} + TransferConnPools *pools.ConnPools connTimeout int32 callTimeout int32 @@ -24,6 +25,7 @@ type SeriesQuerySection struct { func Init(cfg SeriesQuerySection) { Config = cfg - TransferConnPools = CreateConnPools(Config.MaxConn, Config.MaxIdle, - Config.ConnTimeout, Config.CallTimeout, address.GetRPCAddresses("transfer")) + TransferConnPools = pools.NewConnPools( + Config.MaxConn, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, address.GetRPCAddresses("transfer"), + ) } diff --git a/src/modules/judge/backend/query/pool.go b/src/modules/judge/backend/query/pool.go deleted file mode 100644 index 06bfb1e4..00000000 --- a/src/modules/judge/backend/query/pool.go +++ /dev/null @@ -1,133 +0,0 @@ -package query - -import ( - "bufio" - "fmt" - "io" - "math/rand" - "net" - "net/rpc" - "reflect" - "sync" - "time" - - "github.com/toolkits/pkg/pool" - "github.com/ugorji/go/codec" -) - -// 每个后端backend对应一个ConnPool -type ConnPools struct { - sync.RWMutex - Pools []*pool.ConnPool - MaxConns int - MaxIdle int - ConnTimeout int - CallTimeout int -} - -func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools { - cp := &ConnPools{Pools: []*pool.ConnPool{}, MaxConns: maxConns, MaxIdle: maxIdle, - ConnTimeout: connTimeout, CallTimeout: callTimeout} - - ct := time.Duration(cp.ConnTimeout) * time.Millisecond - for _, address := range cluster { - cp.Pools = append(cp.Pools, createOnePool(address, address, ct, maxConns, maxIdle)) - } - - return cp -} - -func createOnePool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *pool.ConnPool { - p := pool.NewConnPool(name, address, maxConns, maxIdle) - p.New = func(connName string) (pool.NConn, error) { - //校验地址是否正确 - _, err := net.ResolveTCPAddr("tcp", p.Address) - if err != nil { - return nil, err - } - - conn, err := net.DialTimeout("tcp", p.Address, connTimeout) - if err != nil { - return nil, err - } - var mh codec.MsgpackHandle - mh.MapType = reflect.TypeOf(map[string]interface{}(nil)) - - var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser - io.Closer - *bufio.Reader - *bufio.Writer - }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)} - - rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh) - return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil - } - return p -} - -// 同步发送, 完成发送或超时后 才能返回 -func (cp *ConnPools) Call(method string, args interface{}, resp interface{}) error { - connPool := cp.Get() - - conn, err := connPool.Fetch() - if err != nil { - return fmt.Errorf("get connection fail: conn %v, err %v. proc: %s", conn, err, connPool.Proc()) - } - - rpcClient := conn.(RpcClient) - callTimeout := time.Duration(cp.CallTimeout) * time.Millisecond - - done := make(chan error, 1) - go func() { - done <- rpcClient.Call(method, args, resp) - }() - - select { - case <-time.After(callTimeout): - connPool.ForceClose(conn) - return fmt.Errorf("%v, call timeout", connPool.Proc()) - case err = <-done: - if err != nil { - connPool.ForceClose(conn) - err = fmt.Errorf(" call failed, err %v. proc: %s", err, connPool.Proc()) - } else { - connPool.Release(conn) - } - return err - } -} - -func (cp *ConnPools) Get() *pool.ConnPool { - cp.RLock() - defer cp.RUnlock() - i := rand.Intn(len(cp.Pools)) - - return cp.Pools[i] -} - -// RpcCient, 要实现io.Closer接口 -type RpcClient struct { - cli *rpc.Client - name string -} - -func (r RpcClient) Name() string { - return r.name -} - -func (r RpcClient) Closed() bool { - return r.cli == nil -} - -func (r RpcClient) Close() error { - if r.cli != nil { - err := r.cli.Close() - r.cli = nil - return err - } - return nil -} - -func (r RpcClient) Call(method string, args interface{}, reply interface{}) error { - return r.cli.Call(method, args, reply) -} diff --git a/src/modules/judge/backend/query/query.go b/src/modules/judge/backend/query/query.go index 2ed4c569..e09fb097 100644 --- a/src/modules/judge/backend/query/query.go +++ b/src/modules/judge/backend/query/query.go @@ -41,7 +41,7 @@ func Query(reqs []*dataobj.QueryData) ([]*dataobj.TsdbQueryResponse, error) { var resp *dataobj.QueryDataResp var err error for i := 0; i < 3; i++ { - err = TransferConnPools.Call("Transfer.Query", reqs, &resp) + err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp) if err == nil { break } diff --git a/src/modules/transfer/backend/init.go b/src/modules/transfer/backend/init.go index 38d718f8..f9737476 100644 --- a/src/modules/transfer/backend/init.go +++ b/src/modules/transfer/backend/init.go @@ -3,10 +3,10 @@ package backend import ( "github.com/toolkits/pkg/container/list" "github.com/toolkits/pkg/container/set" - "github.com/toolkits/pkg/pool" "github.com/toolkits/pkg/str" "github.com/didi/nightingale/src/modules/transfer/cache" + "github.com/didi/nightingale/src/toolkits/pools" "github.com/didi/nightingale/src/toolkits/report" "github.com/didi/nightingale/src/toolkits/stats" ) @@ -44,8 +44,8 @@ var ( JudgeQueues = cache.SafeJudgeQueue{} // 连接池 node_address -> connection_pool - TsdbConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)} - JudgeConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)} + TsdbConnPools *pools.ConnPools + JudgeConnPools *pools.ConnPools connTimeout int32 callTimeout int32 @@ -75,11 +75,13 @@ func initConnPools() { tsdbInstances.Add(addr) } } - TsdbConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle, - Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice()) + TsdbConnPools = pools.NewConnPools( + Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice(), + ) - JudgeConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle, - Config.ConnTimeout, Config.CallTimeout, GetJudges()) + JudgeConnPools = pools.NewConnPools( + Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, GetJudges(), + ) } func initSendQueues() { diff --git a/src/modules/transfer/backend/query.go b/src/modules/transfer/backend/query.go index 22da12c1..ef8658f0 100644 --- a/src/modules/transfer/backend/query.go +++ b/src/modules/transfer/backend/query.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/rand" "strings" @@ -12,6 +13,7 @@ import ( "github.com/didi/nightingale/src/dataobj" "github.com/didi/nightingale/src/modules/transfer/calc" "github.com/didi/nightingale/src/toolkits/address" + "github.com/didi/nightingale/src/toolkits/pools" "github.com/didi/nightingale/src/toolkits/stats" "github.com/toolkits/pkg/logger" @@ -235,15 +237,15 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err resp = &dataobj.TsdbQueryResponse{} pk := dataobj.PKWithCounter(para.Endpoint, para.Counter) - pools, err := SelectPoolByPK(pk) + ps, err := SelectPoolByPK(pk) if err != nil { return resp, err } - count := len(pools) + count := len(ps) for _, i := range rand.Perm(count) { - onePool := pools[i].Pool - addr := pools[i].Addr + onePool := ps[i].Pool + addr := ps[i].Addr conn, err := onePool.Fetch() if err != nil { @@ -251,7 +253,7 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err continue } - rpcConn := conn.(RpcClient) + rpcConn := conn.(pools.RpcClient) if rpcConn.Closed() { onePool.ForceClose(conn) @@ -322,21 +324,21 @@ func SelectPoolByPK(pk string) ([]Pool, error) { return []Pool{}, errors.New("node not found") } - var pools []Pool + var ps []Pool for _, addr := range nodeAddrs.Addrs { onePool, found := TsdbConnPools.Get(addr) if !found { logger.Errorf("addr %s not found", addr) continue } - pools = append(pools, Pool{Pool: onePool, Addr: addr}) + ps = append(ps, Pool{Pool: onePool, Addr: addr}) } - if len(pools) < 1 { - return pools, errors.New("addr not found") + if len(ps) < 1 { + return ps, errors.New("addr not found") } - return pools, nil + return ps, nil } func getTags(counter string) (tags string) { diff --git a/src/modules/transfer/cron/pool.go b/src/modules/transfer/cron/pool.go index bafe6e93..052a5cd4 100644 --- a/src/modules/transfer/cron/pool.go +++ b/src/modules/transfer/cron/pool.go @@ -16,6 +16,6 @@ func RebuildJudgePool() { continue } - backend.JudgeConnPools.Update(judges) + backend.JudgeConnPools.UpdatePools(judges) } } diff --git a/src/modules/tsdb/backend/rpc/init.go b/src/modules/tsdb/backend/rpc/init.go index 0d2dd549..61a8827d 100644 --- a/src/modules/tsdb/backend/rpc/init.go +++ b/src/modules/tsdb/backend/rpc/init.go @@ -1,12 +1,12 @@ package rpc import ( - "github.com/toolkits/pkg/pool" + "github.com/didi/nightingale/src/toolkits/pools" ) var ( // 连接池 node_address -> connection_pool - IndexConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)} + IndexConnPools *pools.ConnPools Config RpcClientSection ) @@ -17,12 +17,11 @@ type RpcClientSection struct { CallTimeout int `yaml:"callTimeout"` } -func Init(cfg RpcClientSection, indexs []string) { +func Init(cfg RpcClientSection, indexes []string) { Config = cfg - IndexConnPools = CreateConnPools(cfg.MaxConns, cfg.MaxIdle, - cfg.ConnTimeout, cfg.CallTimeout, indexs) + IndexConnPools = pools.NewConnPools(cfg.MaxConns, cfg.MaxIdle, cfg.ConnTimeout, cfg.CallTimeout, indexes) } -func ReNewPools(indexs []string) []string { - return IndexConnPools.UpdatePools(indexs) +func ReNewPools(indexes []string) []string { + return IndexConnPools.UpdatePools(indexes) } diff --git a/src/modules/tsdb/backend/rpc/pool.go b/src/modules/tsdb/backend/rpc/pool.go deleted file mode 100644 index c97952ca..00000000 --- a/src/modules/tsdb/backend/rpc/pool.go +++ /dev/null @@ -1,170 +0,0 @@ -package rpc - -import ( - "bufio" - "fmt" - "io" - "net" - "net/rpc" - "reflect" - "sync" - "time" - - "github.com/toolkits/pkg/pool" - - "github.com/ugorji/go/codec" -) - -// 每个后端backend对应一个ConnPool -type ConnPools struct { - sync.RWMutex - M map[string]*pool.ConnPool - MaxConns int - MaxIdle int - ConnTimeout int - CallTimeout int -} - -func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools { - cp := &ConnPools{M: make(map[string]*pool.ConnPool), MaxConns: maxConns, MaxIdle: maxIdle, - ConnTimeout: connTimeout, CallTimeout: callTimeout} - - ct := time.Duration(cp.ConnTimeout) * time.Millisecond - for _, address := range cluster { - if _, exist := cp.M[address]; exist { - continue - } - cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle) - } - - return cp -} - -func createOnePool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *pool.ConnPool { - p := pool.NewConnPool(name, address, maxConns, maxIdle) - p.New = func(connName string) (pool.NConn, error) { - //校验地址是否正确 - _, err := net.ResolveTCPAddr("tcp", p.Address) - if err != nil { - return nil, err - } - - conn, err := net.DialTimeout("tcp", p.Address, connTimeout) - if err != nil { - return nil, err - } - var mh codec.MsgpackHandle - mh.MapType = reflect.TypeOf(map[string]interface{}(nil)) - - var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser - io.Closer - *bufio.Reader - *bufio.Writer - }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)} - - rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh) - return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil - } - return p -} - -// 同步发送, 完成发送或超时后 才能返回 -func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error { - connPool, exists := this.Get(addr) - if !exists { - return fmt.Errorf("%s has no connection pool", addr) - } - - conn, err := connPool.Fetch() - if err != nil { - return fmt.Errorf("%s get connection fail: conn %v, err %v. proc: %s", addr, conn, err, connPool.Proc()) - } - - rpcClient := conn.(RpcClient) - callTimeout := time.Duration(this.CallTimeout) * time.Millisecond - - done := make(chan error, 1) - go func() { - done <- rpcClient.Call(method, args, resp) - }() - - select { - case <-time.After(callTimeout): - connPool.ForceClose(conn) - return fmt.Errorf("%s, call timeout", addr) - case err = <-done: - if err != nil { - connPool.ForceClose(conn) - err = fmt.Errorf("%s, call failed, err %v. proc: %s", addr, err, connPool.Proc()) - } else { - connPool.Release(conn) - } - return err - } -} - -func (this *ConnPools) Get(address string) (*pool.ConnPool, bool) { - this.RLock() - defer this.RUnlock() - p, exists := this.M[address] - return p, exists -} - -func (c *ConnPools) UpdatePools(addrs []string) []string { - c.Lock() - defer c.Unlock() - newAddrs := []string{} - - if len(addrs) == 0 { - c.M = make(map[string]*pool.ConnPool) - return newAddrs - } - addrMap := make(map[string]struct{}) - - ct := time.Duration(c.ConnTimeout) * time.Millisecond - for _, addr := range addrs { - addrMap[addr] = struct{}{} - _, exists := c.M[addr] - if exists { - continue - } - newAddrs = append(newAddrs, addr) - c.M[addr] = createOnePool(addr, addr, ct, c.MaxConns, c.MaxIdle) - } - - for addr := range c.M { //删除旧的地址 - if _, exists := addrMap[addr]; !exists { - delete(c.M, addr) - } - } - - return newAddrs - -} - -// RpcCient, 要实现io.Closer接口 -type RpcClient struct { - cli *rpc.Client - name string -} - -func (this RpcClient) Name() string { - return this.name -} - -func (this RpcClient) Closed() bool { - return this.cli == nil -} - -func (this RpcClient) Close() error { - if this.cli != nil { - err := this.cli.Close() - this.cli = nil - return err - } - return nil -} - -func (this RpcClient) Call(method string, args interface{}, reply interface{}) error { - return this.cli.Call(method, args, reply) -} diff --git a/src/modules/tsdb/migrate/init.go b/src/modules/tsdb/migrate/init.go index 6546ed79..cb2d6c21 100644 --- a/src/modules/tsdb/migrate/init.go +++ b/src/modules/tsdb/migrate/init.go @@ -6,8 +6,9 @@ import ( "github.com/toolkits/pkg/container/list" "github.com/toolkits/pkg/container/set" "github.com/toolkits/pkg/logger" - "github.com/toolkits/pkg/pool" "github.com/toolkits/pkg/str" + + "github.com/didi/nightingale/src/toolkits/pools" ) type MigrateSection struct { @@ -39,8 +40,8 @@ var ( NewTsdbNodeRing *ConsistentHashRing // 连接池 node_address -> connection_pool - TsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)} - NewTsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)} + TsdbConnPools *pools.ConnPools + NewTsdbConnPools *pools.ConnPools ) type QueueFilter struct { @@ -87,16 +88,18 @@ func initConnPools() { for _, addr := range Config.OldCluster { tsdbInstances.Add(addr) } - TsdbConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle, - Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice()) + TsdbConnPools = pools.NewConnPools( + Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice(), + ) // tsdb newTsdbInstances := set.NewSafeSet() for _, addr := range Config.NewCluster { newTsdbInstances.Add(addr) } - NewTsdbConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle, - Config.ConnTimeout, Config.CallTimeout, newTsdbInstances.ToSlice()) + NewTsdbConnPools = pools.NewConnPools( + Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, newTsdbInstances.ToSlice(), + ) } func initQueues() { diff --git a/src/modules/tsdb/migrate/pool.go b/src/modules/tsdb/migrate/pool.go deleted file mode 100644 index 8daf033e..00000000 --- a/src/modules/tsdb/migrate/pool.go +++ /dev/null @@ -1,137 +0,0 @@ -package migrate - -import ( - "bufio" - "fmt" - "io" - "net" - "net/rpc" - "reflect" - "sync" - "time" - - "github.com/toolkits/pkg/pool" - "github.com/ugorji/go/codec" -) - -// 每个后端backend对应一个ConnPool -type ConnPools struct { - sync.RWMutex - M map[string]*pool.ConnPool - MaxConns int - MaxIdle int - ConnTimeout int - CallTimeout int -} - -func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools { - cp := &ConnPools{M: make(map[string]*pool.ConnPool), MaxConns: maxConns, MaxIdle: maxIdle, - ConnTimeout: connTimeout, CallTimeout: callTimeout} - - ct := time.Duration(cp.ConnTimeout) * time.Millisecond - for _, address := range cluster { - if _, exist := cp.M[address]; exist { - continue - } - cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle) - } - - return cp -} - -func createOnePool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *pool.ConnPool { - p := pool.NewConnPool(name, address, maxConns, maxIdle) - p.New = func(connName string) (pool.NConn, error) { - //校验地址是否正确 - _, err := net.ResolveTCPAddr("tcp", p.Address) - if err != nil { - return nil, err - } - - conn, err := net.DialTimeout("tcp", p.Address, connTimeout) - if err != nil { - return nil, err - } - var mh codec.MsgpackHandle - mh.MapType = reflect.TypeOf(map[string]interface{}(nil)) - - var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser - io.Closer - *bufio.Reader - *bufio.Writer - }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)} - - rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh) - return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil - } - return p -} - -// 同步发送, 完成发送或超时后 才能返回 -func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error { - connPool, exists := this.Get(addr) - if !exists { - return fmt.Errorf("%s has no connection pool", addr) - } - - conn, err := connPool.Fetch() - if err != nil { - return fmt.Errorf("%s get connection fail: conn %v, err %v. proc: %s", addr, conn, err, connPool.Proc()) - } - - rpcClient := conn.(RpcClient) - callTimeout := time.Duration(this.CallTimeout) * time.Millisecond - - done := make(chan error, 1) - go func() { - done <- rpcClient.Call(method, args, resp) - }() - - select { - case <-time.After(callTimeout): - connPool.ForceClose(conn) - return fmt.Errorf("%s, call timeout", addr) - case err = <-done: - if err != nil { - connPool.ForceClose(conn) - err = fmt.Errorf("%s, call failed, err %v. proc: %s", addr, err, connPool.Proc()) - } else { - connPool.Release(conn) - } - return err - } -} - -func (this *ConnPools) Get(address string) (*pool.ConnPool, bool) { - this.RLock() - defer this.RUnlock() - p, exists := this.M[address] - return p, exists -} - -// RpcCient, 要实现io.Closer接口 -type RpcClient struct { - cli *rpc.Client - name string -} - -func (this RpcClient) Name() string { - return this.name -} - -func (this RpcClient) Closed() bool { - return this.cli == nil -} - -func (this RpcClient) Close() error { - if this.cli != nil { - err := this.cli.Close() - this.cli = nil - return err - } - return nil -} - -func (this RpcClient) Call(method string, args interface{}, reply interface{}) error { - return this.cli.Call(method, args, reply) -} diff --git a/src/modules/tsdb/migrate/query.go b/src/modules/tsdb/migrate/query.go index 19bec9fd..bd3ead2c 100644 --- a/src/modules/tsdb/migrate/query.go +++ b/src/modules/tsdb/migrate/query.go @@ -7,6 +7,7 @@ import ( "time" "github.com/didi/nightingale/src/dataobj" + "github.com/didi/nightingale/src/toolkits/pools" "github.com/didi/nightingale/src/toolkits/str" "github.com/toolkits/pkg/pool" @@ -68,19 +69,19 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err resp = &dataobj.TsdbQueryResponse{} pk := str.PK(para.Endpoint, para.Counter) - pool, addr, err := selectPoolByPK(pk) + onePool, addr, err := selectPoolByPK(pk) if err != nil { return resp, err } - conn, err := pool.Fetch() + conn, err := onePool.Fetch() if err != nil { return resp, err } - rpcConn := conn.(RpcClient) + rpcConn := conn.(pools.RpcClient) if rpcConn.Closed() { - pool.ForceClose(conn) + onePool.ForceClose(conn) return resp, errors.New("conn closed") } @@ -98,20 +99,20 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err select { case <-time.After(time.Duration(Config.CallTimeout) * time.Millisecond): - pool.ForceClose(conn) - return nil, fmt.Errorf("%s, call timeout. proc: %s", addr, pool.Proc()) + onePool.ForceClose(conn) + return nil, fmt.Errorf("%s, call timeout. proc: %s", addr, onePool.Proc()) case r := <-ch: if r.Err != nil { - pool.ForceClose(conn) - return r.Resp, fmt.Errorf("%s, call failed, err %v. proc: %s", addr, r.Err, pool.Proc()) + onePool.ForceClose(conn) + return r.Resp, fmt.Errorf("%s, call failed, err %v. proc: %s", addr, r.Err, onePool.Proc()) } else { - pool.Release(conn) + onePool.Release(conn) if len(r.Resp.Values) < 1 { r.Resp.Values = []*dataobj.RRDData{} return r.Resp, nil } - fixed := []*dataobj.RRDData{} + fixed := make([]*dataobj.RRDData, 0) for _, v := range r.Resp.Values { if v == nil || !(v.Timestamp >= start && v.Timestamp <= end) { continue @@ -136,11 +137,10 @@ func selectPoolByPK(pk string) (*pool.ConnPool, string, error) { return nil, "", errors.New("node not found") } - pool, found := TsdbConnPools.Get(addr) + onePool, found := TsdbConnPools.Get(addr) if !found { return nil, "", errors.New("addr not found") } - return pool, addr, nil - + return onePool, addr, nil } diff --git a/src/modules/transfer/backend/pool.go b/src/toolkits/pools/pools.go similarity index 54% rename from src/modules/transfer/backend/pool.go rename to src/toolkits/pools/pools.go index 4de9e87d..6df239ee 100644 --- a/src/modules/transfer/backend/pool.go +++ b/src/toolkits/pools/pools.go @@ -1,4 +1,4 @@ -package backend +package pools import ( "bufio" @@ -11,38 +11,43 @@ import ( "time" "github.com/toolkits/pkg/pool" + "github.com/ugorji/go/codec" ) -// backend -> ConnPool +// ConnPools is responsible for the Connection Pool lifecycle management. type ConnPools struct { sync.RWMutex - M map[string]*pool.ConnPool + P map[string]*pool.ConnPool MaxConns int MaxIdle int ConnTimeout int CallTimeout int } -func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools { - cp := &ConnPools{M: make(map[string]*pool.ConnPool), MaxConns: maxConns, MaxIdle: maxIdle, - ConnTimeout: connTimeout, CallTimeout: callTimeout} +func NewConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools { + cp := &ConnPools{ + P: make(map[string]*pool.ConnPool), + MaxConns: maxConns, + MaxIdle: maxIdle, + ConnTimeout: connTimeout, + CallTimeout: callTimeout, + } ct := time.Duration(cp.ConnTimeout) * time.Millisecond for _, address := range cluster { - if _, exist := cp.M[address]; exist { + if _, exist := cp.P[address]; exist { continue } - cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle) + cp.P[address] = createOnePool(address, address, ct, maxConns, maxIdle) } - return cp } func createOnePool(name, address string, connTimeout time.Duration, maxConns, maxIdle int) *pool.ConnPool { p := pool.NewConnPool(name, address, maxConns, maxIdle) p.New = func(connName string) (pool.NConn, error) { - // check address + // valid address _, err := net.ResolveTCPAddr("tcp", p.Address) if err != nil { return nil, err @@ -55,11 +60,12 @@ func createOnePool(name, address string, connTimeout time.Duration, maxConns, ma var mh codec.MsgpackHandle mh.MapType = reflect.TypeOf(map[string]interface{}(nil)) - var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser + // bufconn here is a buffered io.ReadWriteCloser + var bufconn = struct { io.Closer *bufio.Reader *bufio.Writer - }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)} + }{Closer: conn, Reader: bufio.NewReader(conn), Writer: bufio.NewWriter(conn)} rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh) return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil @@ -67,37 +73,33 @@ func createOnePool(name, address string, connTimeout time.Duration, maxConns, ma return p } -func (cp *ConnPools) Update(cluster []string) { - cp.Lock() - defer cp.Unlock() - - maxConns := Config.MaxConns - maxIdle := Config.MaxIdle - ct := time.Duration(cp.ConnTimeout) * time.Millisecond - newCluster := make(map[string]struct{}) - for _, address := range cluster { - newCluster[address] = struct{}{} - if _, exist := cp.M[address]; exist { - continue +// Call will block until request failed or timeout. +func (cp *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error { + var selectedPool *pool.ConnPool + var exists bool + + // if address is empty, we will select a available pool from cp.P randomly. + // map-range function gets random keys order every time. + if addr == "" { + for _, p := range cp.P { + if p != nil { + selectedPool = p + break + } } - cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle) - } - - // delete invalid address from cp.M - for address := range cp.M { - if _, exists := newCluster[address]; !exists { - delete(cp.M, address) + } else { + selectedPool, exists = cp.Get(addr) + if !exists { + return fmt.Errorf("%s has no connection pool", addr) } } -} -// Call will block until the request failed or timeout -func (cp *ConnPools) Call(addr, method string, args, resp interface{}) error { - connPool, exists := cp.Get(addr) - if !exists { - return fmt.Errorf("%s has no connection pool", addr) + // make sure the selected pool alive. + if selectedPool == nil { + return fmt.Errorf("no connection pool available") } + connPool := selectedPool conn, err := connPool.Fetch() if err != nil { return fmt.Errorf("%s get connection fail: conn %v, err %v. proc: %s", addr, conn, err, connPool.Proc()) @@ -129,11 +131,44 @@ func (cp *ConnPools) Call(addr, method string, args, resp interface{}) error { func (cp *ConnPools) Get(address string) (*pool.ConnPool, bool) { cp.RLock() defer cp.RUnlock() - p, exists := cp.M[address] + + p, exists := cp.P[address] return p, exists } -// RpcClient implements the io.Closer interface +func (cp *ConnPools) UpdatePools(addrs []string) []string { + cp.Lock() + defer cp.Unlock() + + newAddrs := make([]string, 0) + if len(addrs) == 0 { + cp.P = make(map[string]*pool.ConnPool) + return newAddrs + } + addrMap := make(map[string]struct{}) + + ct := time.Duration(cp.ConnTimeout) * time.Millisecond + for _, addr := range addrs { + addrMap[addr] = struct{}{} + _, exists := cp.P[addr] + if exists { + continue + } + newAddrs = append(newAddrs, addr) + cp.P[addr] = createOnePool(addr, addr, ct, cp.MaxConns, cp.MaxIdle) + } + + // remove a pool from cp.P + for addr := range cp.P { + if _, exists := addrMap[addr]; !exists { + delete(cp.P, addr) + } + } + + return newAddrs +} + +// RpcCient implements the io.Closer interface type RpcClient struct { cli *rpc.Client name string