|
@@ -281,18 +281,21 @@ func (ctl *Control) manager() {
|
|
|
ctl.Warn("[%s] start error: %s", m.ProxyName, m.Error)
|
|
|
continue
|
|
|
}
|
|
|
- oldPxy, ok := ctl.proxies[m.ProxyName]
|
|
|
- if ok {
|
|
|
- oldPxy.Close()
|
|
|
- }
|
|
|
cfg, ok := ctl.pxyCfgs[m.ProxyName]
|
|
|
if !ok {
|
|
|
// it will never go to this branch
|
|
|
ctl.Warn("[%s] no proxy conf found", m.ProxyName)
|
|
|
continue
|
|
|
}
|
|
|
+ oldPxy, ok := ctl.proxies[m.ProxyName]
|
|
|
+ if ok {
|
|
|
+ oldPxy.Close()
|
|
|
+ }
|
|
|
pxy := NewProxy(ctl, cfg)
|
|
|
- pxy.Run()
|
|
|
+ if err := pxy.Run(); err != nil {
|
|
|
+ ctl.Warn("[%s] proxy start running error: %v", m.ProxyName, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
ctl.proxies[m.ProxyName] = pxy
|
|
|
ctl.Info("[%s] start proxy success", m.ProxyName)
|
|
|
case *msg.Pong:
|
|
@@ -307,46 +310,64 @@ func (ctl *Control) controler() {
|
|
|
var err error
|
|
|
maxDelayTime := 30 * time.Second
|
|
|
delayTime := time.Second
|
|
|
+
|
|
|
+ checkInterval := 60 * time.Second
|
|
|
+ checkProxyTicker := time.NewTicker(checkInterval)
|
|
|
for {
|
|
|
- // we won't get any variable from this channel
|
|
|
- _, ok := <-ctl.closedCh
|
|
|
- if !ok {
|
|
|
- // close related channels
|
|
|
- close(ctl.readCh)
|
|
|
- close(ctl.sendCh)
|
|
|
- time.Sleep(time.Second)
|
|
|
-
|
|
|
- // loop util reconnect to server success
|
|
|
- for {
|
|
|
- ctl.Info("try to reconnect to server...")
|
|
|
- err = ctl.login()
|
|
|
- if err != nil {
|
|
|
- ctl.Warn("reconnect to server error: %v", err)
|
|
|
- time.Sleep(delayTime)
|
|
|
- delayTime = delayTime * 2
|
|
|
- if delayTime > maxDelayTime {
|
|
|
- delayTime = maxDelayTime
|
|
|
- }
|
|
|
- continue
|
|
|
+ select {
|
|
|
+ case <-checkProxyTicker.C:
|
|
|
+ // Every 60 seconds, check which proxy registered failed and reregister it to server.
|
|
|
+ for _, cfg := range ctl.pxyCfgs {
|
|
|
+ if _, exist := ctl.proxies[cfg.GetName()]; !exist {
|
|
|
+ ctl.Info("try to reregister proxy [%s]", cfg.GetName())
|
|
|
+ var newProxyMsg msg.NewProxy
|
|
|
+ cfg.UnMarshalToMsg(&newProxyMsg)
|
|
|
+ ctl.sendCh <- &newProxyMsg
|
|
|
}
|
|
|
- // reconnect success, init the delayTime
|
|
|
- delayTime = time.Second
|
|
|
- break
|
|
|
}
|
|
|
+ case _, ok := <-ctl.closedCh:
|
|
|
+ // we won't get any variable from this channel
|
|
|
+ if !ok {
|
|
|
+ // close related channels
|
|
|
+ close(ctl.readCh)
|
|
|
+ close(ctl.sendCh)
|
|
|
+ time.Sleep(time.Second)
|
|
|
+
|
|
|
+ // loop util reconnect to server success
|
|
|
+ for {
|
|
|
+ ctl.Info("try to reconnect to server...")
|
|
|
+ err = ctl.login()
|
|
|
+ if err != nil {
|
|
|
+ ctl.Warn("reconnect to server error: %v", err)
|
|
|
+ time.Sleep(delayTime)
|
|
|
+ delayTime = delayTime * 2
|
|
|
+ if delayTime > maxDelayTime {
|
|
|
+ delayTime = maxDelayTime
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // reconnect success, init the delayTime
|
|
|
+ delayTime = time.Second
|
|
|
+ break
|
|
|
+ }
|
|
|
|
|
|
- // init related channels and variables
|
|
|
- ctl.init()
|
|
|
+ // init related channels and variables
|
|
|
+ ctl.init()
|
|
|
|
|
|
- // previous work goroutines should be closed and start them here
|
|
|
- go ctl.manager()
|
|
|
- go ctl.writer()
|
|
|
- go ctl.reader()
|
|
|
+ // previous work goroutines should be closed and start them here
|
|
|
+ go ctl.manager()
|
|
|
+ go ctl.writer()
|
|
|
+ go ctl.reader()
|
|
|
|
|
|
- // send NewProxy message for all configured proxies
|
|
|
- for _, cfg := range ctl.pxyCfgs {
|
|
|
- var newProxyMsg msg.NewProxy
|
|
|
- cfg.UnMarshalToMsg(&newProxyMsg)
|
|
|
- ctl.sendCh <- &newProxyMsg
|
|
|
+ // send NewProxy message for all configured proxies
|
|
|
+ for _, cfg := range ctl.pxyCfgs {
|
|
|
+ var newProxyMsg msg.NewProxy
|
|
|
+ cfg.UnMarshalToMsg(&newProxyMsg)
|
|
|
+ ctl.sendCh <- &newProxyMsg
|
|
|
+ }
|
|
|
+
|
|
|
+ checkProxyTicker.Stop()
|
|
|
+ checkProxyTicker = time.NewTicker(checkInterval)
|
|
|
}
|
|
|
}
|
|
|
}
|