|
|
|
@ -49,7 +49,7 @@ func NewEtcdClient(endpoints string, numPservers int, timeout time.Duration) *Et
|
|
|
|
|
// Register registers the pserver on etcd
|
|
|
|
|
//
|
|
|
|
|
// Register returns the index of the current pserver.
|
|
|
|
|
func (e *EtcdClient) Register() (int, error) {
|
|
|
|
|
func (e *EtcdClient) Register(port int) (int, error) {
|
|
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
e.externalIP, err = networkhelper.GetExternalIP()
|
|
|
|
@ -116,7 +116,7 @@ func (e *EtcdClient) Register() (int, error) {
|
|
|
|
|
for {
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
|
|
var err error
|
|
|
|
|
pserverIdx, err = e.registerPserverEtcd(ctx)
|
|
|
|
|
pserverIdx, err = e.registerPserverEtcd(ctx, port)
|
|
|
|
|
cancel()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warn(err)
|
|
|
|
@ -140,7 +140,7 @@ func (e *EtcdClient) initDesiredPservers(ctx context.Context, numPservers int) (
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// registerPserverEtcd registers pserver node on etcd using transaction.
|
|
|
|
|
func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) {
|
|
|
|
|
func (e *EtcdClient) registerPserverEtcd(ctx context.Context, port int) (int, error) {
|
|
|
|
|
var idx int
|
|
|
|
|
_, err := concurrency.NewSTM(e.etcdClient, func(c concurrency.STM) error {
|
|
|
|
|
registered := false
|
|
|
|
@ -156,8 +156,9 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) {
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
}
|
|
|
|
|
// find the first id and write info
|
|
|
|
|
c.Put(psKey, e.externalIP, clientv3.WithLease(resp.ID))
|
|
|
|
|
log.Debugf("set pserver node %s with value %s", psKey, e.externalIP)
|
|
|
|
|
pserverAddr := e.externalIP + ":" + strconv.Itoa(port)
|
|
|
|
|
c.Put(psKey, pserverAddr, clientv3.WithLease(resp.ID))
|
|
|
|
|
log.Debugf("set pserver node %s with value %s", psKey, pserverAddr)
|
|
|
|
|
ch, kaerr := e.etcdClient.KeepAlive(context.TODO(), resp.ID)
|
|
|
|
|
if kaerr != nil {
|
|
|
|
|
log.Errorf("keepalive etcd node error: %v", kaerr)
|
|
|
|
|