backoff.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright 2023 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package wait
  15. import (
  16. "math/rand"
  17. "sync"
  18. "time"
  19. "github.com/fatedier/frp/pkg/util/util"
  20. )
  // BackoffFunc is a function type with the same signature as
  // BackoffManager.Backoff, which lets a plain function be used as a
  // BackoffManager.
  type BackoffFunc func(previousDuration time.Duration, previousConditionError bool) time.Duration

  // Backoff calls the underlying function with the given arguments and returns
  // whatever it returns.
  func (fn BackoffFunc) Backoff(previousDuration time.Duration, previousConditionError bool) time.Duration {
  	next := fn(previousDuration, previousConditionError)
  	return next
  }
  // BackoffManager decides how long to wait before the next attempt of a
  // repeated operation.
  type BackoffManager interface {
  	// Backoff returns the delay to wait before the next attempt.
  	// previousDuration is the delay returned by the preceding call (zero
  	// before any delay has been produced) and previousConditionError
  	// reports whether the preceding attempt ended with an error.
  	Backoff(previousDuration time.Duration, previousConditionError bool) time.Duration
  }
  // FastBackoffOptions configures the BackoffManager created by
  // NewFastBackoffManager.
  type FastBackoffOptions struct {
  	// Duration is the delay returned on the first call and after every
  	// attempt that did not fail.
  	Duration time.Duration
  	// Factor multiplies the delay after a failed attempt. A value of 0
  	// leaves the delay unscaled.
  	Factor float64
  	// Jitter, when greater than 0, is the maximum jitter factor applied to
  	// the delay after a failed attempt.
  	Jitter float64
  	// MaxDuration, when greater than 0, caps the delay returned after a
  	// failed attempt.
  	MaxDuration time.Duration
  	// InitDurationIfFail is the base delay considered for the first failure
  	// in a run of consecutive failures.
  	InitDurationIfFail time.Duration

  	// If FastRetryCount > 0, then within the FastRetryWindow time window,
  	// the first FastRetryCount retries are performed with a delay of
  	// FastRetryDelay instead of the regular backoff delay.
  	FastRetryCount int
  	// FastRetryDelay is the base delay used for a fast retry.
  	FastRetryDelay time.Duration
  	// FastRetryJitter is the jitter factor passed to Jitter for a fast
  	// retry delay.
  	FastRetryJitter float64
  	// FastRetryWindow is the length of the window after which the fast
  	// retry counter may be reset.
  	FastRetryWindow time.Duration
  }
  41. type fastBackoffImpl struct {
  42. options FastBackoffOptions
  43. lastCalledTime time.Time
  44. consecutiveErrCount int
  45. fastRetryCutoffTime time.Time
  46. countsInFastRetryWindow int
  47. }
  48. func NewFastBackoffManager(options FastBackoffOptions) BackoffManager {
  49. return &fastBackoffImpl{
  50. options: options,
  51. countsInFastRetryWindow: 1,
  52. }
  53. }
  54. func (f *fastBackoffImpl) Backoff(previousDuration time.Duration, previousConditionError bool) time.Duration {
  55. if f.lastCalledTime.IsZero() {
  56. f.lastCalledTime = time.Now()
  57. return f.options.Duration
  58. }
  59. now := time.Now()
  60. f.lastCalledTime = now
  61. if previousConditionError {
  62. f.consecutiveErrCount++
  63. } else {
  64. f.consecutiveErrCount = 0
  65. }
  66. if f.options.FastRetryCount > 0 && previousConditionError {
  67. f.countsInFastRetryWindow++
  68. if f.countsInFastRetryWindow <= f.options.FastRetryCount {
  69. return Jitter(f.options.FastRetryDelay, f.options.FastRetryJitter)
  70. }
  71. if now.After(f.fastRetryCutoffTime) {
  72. // reset
  73. f.fastRetryCutoffTime = now.Add(f.options.FastRetryWindow)
  74. f.countsInFastRetryWindow = 0
  75. }
  76. }
  77. if previousConditionError {
  78. var duration time.Duration
  79. if f.consecutiveErrCount == 1 {
  80. duration = util.EmptyOr(f.options.InitDurationIfFail, previousDuration)
  81. } else {
  82. duration = previousDuration
  83. }
  84. duration = util.EmptyOr(duration, time.Second)
  85. if f.options.Factor != 0 {
  86. duration = time.Duration(float64(duration) * f.options.Factor)
  87. }
  88. if f.options.Jitter > 0 {
  89. duration = Jitter(duration, f.options.Jitter)
  90. }
  91. if f.options.MaxDuration > 0 && duration > f.options.MaxDuration {
  92. duration = f.options.MaxDuration
  93. }
  94. return duration
  95. }
  96. return f.options.Duration
  97. }
  98. func BackoffUntil(f func() (bool, error), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
  99. var delay time.Duration
  100. previousError := false
  101. ticker := time.NewTicker(backoff.Backoff(delay, previousError))
  102. defer ticker.Stop()
  103. for {
  104. select {
  105. case <-stopCh:
  106. return
  107. default:
  108. }
  109. if !sliding {
  110. delay = backoff.Backoff(delay, previousError)
  111. }
  112. if done, err := f(); done {
  113. return
  114. } else if err != nil {
  115. previousError = true
  116. } else {
  117. previousError = false
  118. }
  119. if sliding {
  120. delay = backoff.Backoff(delay, previousError)
  121. }
  122. ticker.Reset(delay)
  123. select {
  124. case <-stopCh:
  125. return
  126. case <-ticker.C:
  127. }
  128. }
  129. }
  // Jitter returns duration lengthened by a random amount: the result lies
  // between duration and duration + maxFactor*duration.
  //
  // Randomizing the delay keeps clients from converging on the same periodic
  // schedule. If maxFactor is zero or negative, a factor of 1.0 is used.
  func Jitter(duration time.Duration, maxFactor float64) time.Duration {
  	factor := maxFactor
  	if factor <= 0.0 {
  		factor = 1.0
  	}
  	extra := rand.Float64() * factor * float64(duration)
  	return duration + time.Duration(extra)
  }
  142. func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
  143. ff := func() (bool, error) {
  144. f()
  145. return false, nil
  146. }
  147. BackoffUntil(ff, BackoffFunc(func(time.Duration, bool) time.Duration {
  148. return period
  149. }), true, stopCh)
  150. }
  // MergeAndCloseOnAnyStopChannel returns a channel that is closed as soon as
  // any of the upstream channels is closed or delivers a value. No value is
  // ever sent on the returned channel.
  //
  // One goroutine is started per upstream, and each exits once the returned
  // channel has been closed. If no upstreams are given, the returned channel is
  // never closed.
  func MergeAndCloseOnAnyStopChannel[T any](upstreams ...<-chan T) <-chan T {
  	merged := make(chan T)
  	var once sync.Once
  	shutdown := func() { close(merged) }

  	// watch waits for its upstream to fire, or for another watcher to have
  	// closed merged already.
  	watch := func(src <-chan T) {
  		select {
  		case <-src:
  			once.Do(shutdown)
  		case <-merged:
  		}
  	}

  	for _, upstream := range upstreams {
  		go watch(upstream)
  	}
  	return merged
  }