1
0

backoff.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Copyright 2023 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package wait
  15. import (
  16. "math/rand"
  17. "time"
  18. "github.com/samber/lo"
  19. "github.com/fatedier/frp/pkg/util/util"
  20. )
  21. type BackoffFunc func(previousDuration time.Duration, previousConditionError bool) time.Duration
  22. func (f BackoffFunc) Backoff(previousDuration time.Duration, previousConditionError bool) time.Duration {
  23. return f(previousDuration, previousConditionError)
  24. }
  25. type BackoffManager interface {
  26. Backoff(previousDuration time.Duration, previousConditionError bool) time.Duration
  27. }
  28. type FastBackoffOptions struct {
  29. Duration time.Duration
  30. Factor float64
  31. Jitter float64
  32. MaxDuration time.Duration
  33. InitDurationIfFail time.Duration
  34. // If FastRetryCount > 0, then within the FastRetryWindow time window,
  35. // the retry will be performed with a delay of FastRetryDelay for the first FastRetryCount calls.
  36. FastRetryCount int
  37. FastRetryDelay time.Duration
  38. FastRetryJitter float64
  39. FastRetryWindow time.Duration
  40. }
  41. type fastBackoffImpl struct {
  42. options FastBackoffOptions
  43. lastCalledTime time.Time
  44. consecutiveErrCount int
  45. fastRetryCutoffTime time.Time
  46. countsInFastRetryWindow int
  47. }
  48. func NewFastBackoffManager(options FastBackoffOptions) BackoffManager {
  49. return &fastBackoffImpl{
  50. options: options,
  51. countsInFastRetryWindow: 1,
  52. }
  53. }
  54. func (f *fastBackoffImpl) Backoff(previousDuration time.Duration, previousConditionError bool) time.Duration {
  55. if f.lastCalledTime.IsZero() {
  56. f.lastCalledTime = time.Now()
  57. return f.options.Duration
  58. }
  59. now := time.Now()
  60. f.lastCalledTime = now
  61. if previousConditionError {
  62. f.consecutiveErrCount++
  63. } else {
  64. f.consecutiveErrCount = 0
  65. }
  66. if f.options.FastRetryCount > 0 && previousConditionError {
  67. f.countsInFastRetryWindow++
  68. if f.countsInFastRetryWindow <= f.options.FastRetryCount {
  69. return Jitter(f.options.FastRetryDelay, f.options.FastRetryJitter)
  70. }
  71. if now.After(f.fastRetryCutoffTime) {
  72. // reset
  73. f.fastRetryCutoffTime = now.Add(f.options.FastRetryWindow)
  74. f.countsInFastRetryWindow = 0
  75. }
  76. }
  77. if previousConditionError {
  78. var duration time.Duration
  79. if f.consecutiveErrCount == 1 {
  80. duration = util.EmptyOr(f.options.InitDurationIfFail, previousDuration)
  81. } else {
  82. duration = previousDuration
  83. }
  84. duration = util.EmptyOr(duration, time.Second)
  85. if f.options.Factor != 0 {
  86. duration = time.Duration(float64(duration) * f.options.Factor)
  87. }
  88. if f.options.Jitter > 0 {
  89. duration = Jitter(duration, f.options.Jitter)
  90. }
  91. if f.options.MaxDuration > 0 && duration > f.options.MaxDuration {
  92. duration = f.options.MaxDuration
  93. }
  94. return duration
  95. }
  96. return f.options.Duration
  97. }
  98. func BackoffUntil(f func() error, backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
  99. var delay time.Duration
  100. previousError := false
  101. ticker := time.NewTicker(backoff.Backoff(delay, previousError))
  102. defer ticker.Stop()
  103. for {
  104. select {
  105. case <-stopCh:
  106. return
  107. default:
  108. }
  109. if !sliding {
  110. delay = backoff.Backoff(delay, previousError)
  111. }
  112. if err := f(); err != nil {
  113. previousError = true
  114. } else {
  115. previousError = false
  116. }
  117. if sliding {
  118. delay = backoff.Backoff(delay, previousError)
  119. }
  120. ticker.Reset(delay)
  121. select {
  122. case <-stopCh:
  123. return
  124. default:
  125. }
  126. select {
  127. case <-stopCh:
  128. return
  129. case <-ticker.C:
  130. }
  131. }
  132. }
  133. // Jitter returns a time.Duration between duration and duration + maxFactor *
  134. // duration.
  135. //
  136. // This allows clients to avoid converging on periodic behavior. If maxFactor
  137. // is 0.0, a suggested default value will be chosen.
  138. func Jitter(duration time.Duration, maxFactor float64) time.Duration {
  139. if maxFactor <= 0.0 {
  140. maxFactor = 1.0
  141. }
  142. wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
  143. return wait
  144. }
  145. func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
  146. ff := func() error {
  147. f()
  148. return nil
  149. }
  150. BackoffUntil(ff, BackoffFunc(func(time.Duration, bool) time.Duration {
  151. return period
  152. }), true, stopCh)
  153. }
  154. func MergeAndCloseOnAnyStopChannel[T any](upstreams ...<-chan T) <-chan T {
  155. out := make(chan T)
  156. for _, upstream := range upstreams {
  157. ch := upstream
  158. go lo.Try0(func() {
  159. select {
  160. case <-ch:
  161. close(out)
  162. case <-out:
  163. }
  164. })
  165. }
  166. return out
  167. }