package kcp

import (
	"encoding/binary"
	"sync/atomic"
	"time"
)

const (
	IKCP_RTO_NDL     = 30  // no delay min rto
	IKCP_RTO_MIN     = 100 // normal min rto
	IKCP_RTO_DEF     = 200
	IKCP_RTO_MAX     = 60000
	IKCP_CMD_PUSH    = 81 // cmd: push data
	IKCP_CMD_ACK     = 82 // cmd: ack
	IKCP_CMD_WASK    = 83 // cmd: window probe (ask)
	IKCP_CMD_WINS    = 84 // cmd: window size (tell)
	IKCP_ASK_SEND    = 1  // need to send IKCP_CMD_WASK
	IKCP_ASK_TELL    = 2  // need to send IKCP_CMD_WINS
	IKCP_WND_SND     = 32
	IKCP_WND_RCV     = 32
	IKCP_MTU_DEF     = 1400
	IKCP_ACK_FAST    = 3
	IKCP_INTERVAL    = 100
	IKCP_OVERHEAD    = 24
	IKCP_DEADLINK    = 20
	IKCP_THRESH_INIT = 2
	IKCP_THRESH_MIN  = 2
	IKCP_PROBE_INIT  = 7000   // 7 secs to probe window size
	IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
)

// monotonic reference time point
var refTime time.Time = time.Now()

// currentMs returns the elapsed monotonic milliseconds since program startup
func currentMs() uint32 { return uint32(time.Now().Sub(refTime) / time.Millisecond) }

// output_callback is a prototype which ought to capture conn and call conn.Write
type output_callback func(buf []byte, size int)

/* encode 8 bits unsigned int */
func ikcp_encode8u(p []byte, c byte) []byte {
	p[0] = c
	return p[1:]
}

/* decode 8 bits unsigned int */
func ikcp_decode8u(p []byte, c *byte) []byte {
	*c = p[0]
	return p[1:]
}

/* encode 16 bits unsigned int (lsb) */
func ikcp_encode16u(p []byte, w uint16) []byte {
	binary.LittleEndian.PutUint16(p, w)
	return p[2:]
}

/* decode 16 bits unsigned int (lsb) */
func ikcp_decode16u(p []byte, w *uint16) []byte {
	*w = binary.LittleEndian.Uint16(p)
	return p[2:]
}

/* encode 32 bits unsigned int (lsb) */
func ikcp_encode32u(p []byte, l uint32) []byte {
	binary.LittleEndian.PutUint32(p, l)
	return p[4:]
}

/* decode 32 bits unsigned int (lsb) */
func ikcp_decode32u(p []byte, l *uint32) []byte {
	*l = binary.LittleEndian.Uint32(p)
	return p[4:]
}
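
// The encoders above return the remainder of the buffer, so header fields can
// be written or read by threading the slice through successive calls. A small
// round-trip sketch (illustrative only, not part of the original file):
//
//	buf := make([]byte, 6)
//	p := ikcp_encode32u(buf, 0x11223344) // little-endian conversation id
//	p = ikcp_encode16u(p, 32)            // window size
//
//	var conv uint32
//	var wnd uint16
//	q := ikcp_decode32u(buf, &conv)
//	q = ikcp_decode16u(q, &wnd) // conv == 0x11223344, wnd == 32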

func _imin_(a, b uint32) uint32 {
	if a <= b {
		return a
	}
	return b
}

func _imax_(a, b uint32) uint32 {
	if a >= b {
		return a
	}
	return b
}

func _ibound_(lower, middle, upper uint32) uint32 {
	return _imin_(_imax_(lower, middle), upper)
}

func _itimediff(later, earlier uint32) int32 {
	return (int32)(later - earlier)
}
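
// _itimediff relies on unsigned subtraction wrapping modulo 2^32, so the
// signed result stays correct when timestamps or sequence numbers wrap.
// Illustrative values only:
//
//	_itimediff(5, 0xFFFFFFFE) // == 7: "5" is 7 ticks after 0xFFFFFFFE
//	_itimediff(100, 200)      // == -100: the first argument is earlier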

// segment defines a KCP segment
type segment struct {
	conv     uint32
	cmd      uint8
	frg      uint8
	wnd      uint16
	ts       uint32
	sn       uint32
	una      uint32
	rto      uint32
	xmit     uint32
	resendts uint32
	fastack  uint32
	acked    uint32 // mark if the seg has been acked
	data     []byte
}

// encode a segment header into the buffer
func (seg *segment) encode(ptr []byte) []byte {
	ptr = ikcp_encode32u(ptr, seg.conv)
	ptr = ikcp_encode8u(ptr, seg.cmd)
	ptr = ikcp_encode8u(ptr, seg.frg)
	ptr = ikcp_encode16u(ptr, seg.wnd)
	ptr = ikcp_encode32u(ptr, seg.ts)
	ptr = ikcp_encode32u(ptr, seg.sn)
	ptr = ikcp_encode32u(ptr, seg.una)
	ptr = ikcp_encode32u(ptr, uint32(len(seg.data)))
	atomic.AddUint64(&DefaultSnmp.OutSegs, 1)
	return ptr
}
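
// The encoded header is exactly IKCP_OVERHEAD (24) bytes, little-endian, in
// the order of the encode calls above. Layout sketch derived from that order
// (illustrative only):
//
//	0       4    5    6      8       12      16      20          24
//	+-------+----+----+------+-------+-------+-------+-----------+
//	| conv  |cmd |frg | wnd  |  ts   |  sn   |  una  | len(data) |
//	+-------+----+----+------+-------+-------+-------+-----------+
//	                         payload of len(data) bytes follows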

// KCP defines a single KCP connection
type KCP struct {
	conv, mtu, mss, state                  uint32
	snd_una, snd_nxt, rcv_nxt              uint32
	ssthresh                               uint32
	rx_rttvar, rx_srtt                     int32
	rx_rto, rx_minrto                      uint32
	snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
	interval, ts_flush                     uint32
	nodelay, updated                       uint32
	ts_probe, probe_wait                   uint32
	dead_link, incr                        uint32

	fastresend     int32
	nocwnd, stream int32

	snd_queue []segment
	rcv_queue []segment
	snd_buf   []segment
	rcv_buf   []segment

	acklist []ackItem

	buffer   []byte
	reserved int
	output   output_callback
}

type ackItem struct {
	sn uint32
	ts uint32
}

// NewKCP creates a new KCP state machine.
//
// 'conv' must be equal between the connection peers, or else data will be silently rejected.
//
// The 'output' function will be called whenever there is data to be sent on the wire.
func NewKCP(conv uint32, output output_callback) *KCP {
	kcp := new(KCP)
	kcp.conv = conv
	kcp.snd_wnd = IKCP_WND_SND
	kcp.rcv_wnd = IKCP_WND_RCV
	kcp.rmt_wnd = IKCP_WND_RCV
	kcp.mtu = IKCP_MTU_DEF
	kcp.mss = kcp.mtu - IKCP_OVERHEAD
	kcp.buffer = make([]byte, kcp.mtu)
	kcp.rx_rto = IKCP_RTO_DEF
	kcp.rx_minrto = IKCP_RTO_MIN
	kcp.interval = IKCP_INTERVAL
	kcp.ts_flush = IKCP_INTERVAL
	kcp.ssthresh = IKCP_THRESH_INIT
	kcp.dead_link = IKCP_DEADLINK
	kcp.output = output
	return kcp
}
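
// A minimal usage sketch (illustrative only; conn stands for any transport
// writer such as a *net.UDPConn owned by the caller, not part of this file):
//
//	kcp := NewKCP(0x11223344, func(buf []byte, size int) {
//		// output_callback: push the raw datagram onto the wire
//		conn.Write(buf[:size])
//	})
//	kcp.Send([]byte("hello"))
//	kcp.Update() // drives flush(); call it periodically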

// newSegment creates a KCP segment
func (kcp *KCP) newSegment(size int) (seg segment) {
	seg.data = xmitBuf.Get().([]byte)[:size]
	return
}

// delSegment recycles a KCP segment
func (kcp *KCP) delSegment(seg *segment) {
	if seg.data != nil {
		xmitBuf.Put(seg.data)
		seg.data = nil
	}
}

// ReserveBytes keeps n bytes untouched at the beginning of the buffer;
// the output_callback function should be aware of this.
//
// Returns false if n >= mtu-IKCP_OVERHEAD or n < 0.
func (kcp *KCP) ReserveBytes(n int) bool {
	if n >= int(kcp.mtu-IKCP_OVERHEAD) || n < 0 {
		return false
	}
	kcp.reserved = n
	kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(n)
	return true
}
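
// Reserving space lets the output_callback prepend its own header (e.g. an
// FEC or crypto header) without an extra copy. A sketch under that assumption
// (headerSize, fillMyHeader and conn are hypothetical, not part of this file):
//
//	kcp := NewKCP(conv, func(buf []byte, size int) {
//		fillMyHeader(buf[:headerSize]) // KCP left these bytes untouched
//		conn.Write(buf[:size])
//	})
//	kcp.ReserveBytes(headerSize)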

// PeekSize checks the size of the next message in the recv queue
func (kcp *KCP) PeekSize() (length int) {
	if len(kcp.rcv_queue) == 0 {
		return -1
	}

	seg := &kcp.rcv_queue[0]
	if seg.frg == 0 {
		return len(seg.data)
	}

	if len(kcp.rcv_queue) < int(seg.frg+1) {
		return -1
	}

	for k := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[k]
		length += len(seg.data)
		if seg.frg == 0 {
			break
		}
	}
	return
}

// Recv reads data from the kcp state machine.
//
// Returns the number of bytes read.
//
// Returns -1 when there is no readable data.
//
// Returns -2 if len(buffer) is smaller than kcp.PeekSize().
func (kcp *KCP) Recv(buffer []byte) (n int) {
	peeksize := kcp.PeekSize()
	if peeksize < 0 {
		return -1
	}

	if peeksize > len(buffer) {
		return -2
	}

	var fast_recover bool
	if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
		fast_recover = true
	}

	// merge fragments
	count := 0
	for k := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[k]
		copy(buffer, seg.data)
		buffer = buffer[len(seg.data):]
		n += len(seg.data)
		count++
		kcp.delSegment(seg)
		if seg.frg == 0 {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
	}

	// move available data from rcv_buf -> rcv_queue
	count = 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue)+count < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}

	// fast recover
	if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
		// ready to send back IKCP_CMD_WINS in ikcp_flush
		// tell remote my window size
		kcp.probe |= IKCP_ASK_TELL
	}
	return
}
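
// A typical receive pattern, sketched for illustration (not part of the
// original file): size the buffer with PeekSize before calling Recv so that
// -2 is never returned. handle is a hypothetical consumer.
//
//	if size := kcp.PeekSize(); size > 0 {
//		buf := make([]byte, size)
//		if n := kcp.Recv(buf); n > 0 {
//			handle(buf[:n])
//		}
//	}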

// Send is user/upper level send; it returns a value below zero on error.
func (kcp *KCP) Send(buffer []byte) int {
	var count int
	if len(buffer) == 0 {
		return -1
	}

	// append to previous segment in streaming mode (if possible)
	if kcp.stream != 0 {
		n := len(kcp.snd_queue)
		if n > 0 {
			seg := &kcp.snd_queue[n-1]
			if len(seg.data) < int(kcp.mss) {
				capacity := int(kcp.mss) - len(seg.data)
				extend := capacity
				if len(buffer) < capacity {
					extend = len(buffer)
				}

				// grow slice, the underlying cap is guaranteed to
				// be larger than kcp.mss
				oldlen := len(seg.data)
				seg.data = seg.data[:oldlen+extend]
				copy(seg.data[oldlen:], buffer)
				buffer = buffer[extend:]
			}
		}

		if len(buffer) == 0 {
			return 0
		}
	}

	if len(buffer) <= int(kcp.mss) {
		count = 1
	} else {
		count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
	}

	if count > 255 {
		return -2
	}

	if count == 0 {
		count = 1
	}

	for i := 0; i < count; i++ {
		var size int
		if len(buffer) > int(kcp.mss) {
			size = int(kcp.mss)
		} else {
			size = len(buffer)
		}
		seg := kcp.newSegment(size)
		copy(seg.data, buffer[:size])
		if kcp.stream == 0 { // message mode
			seg.frg = uint8(count - i - 1)
		} else { // stream mode
			seg.frg = 0
		}
		kcp.snd_queue = append(kcp.snd_queue, seg)
		buffer = buffer[size:]
	}
	return 0
}
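
// In message mode a Send payload is split into ceil(len/mss) fragments, each
// tagged with a descending frg counter so the receiver knows how many pieces
// remain. A worked sketch with the default mtu of 1400 (mss = 1376, no
// reserved bytes), for illustration only:
//
//	kcp.Send(make([]byte, 3000))
//	// -> 3 segments: frg=2 (1376 B), frg=1 (1376 B), frg=0 (248 B)
//	// Send returns -2 if more than 255 fragments would be needed.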

func (kcp *KCP) update_ack(rtt int32) {
	// https://tools.ietf.org/html/rfc6298
	var rto uint32
	if kcp.rx_srtt == 0 {
		kcp.rx_srtt = rtt
		kcp.rx_rttvar = rtt >> 1
	} else {
		delta := rtt - kcp.rx_srtt
		kcp.rx_srtt += delta >> 3
		if delta < 0 {
			delta = -delta
		}
		if rtt < kcp.rx_srtt-kcp.rx_rttvar {
			// if the new RTT sample is below the bottom of the range of
			// what an RTT measurement is expected to be,
			// give it an 8x reduced weight versus its normal weighting
			kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 5
		} else {
			kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 2
		}
	}
	rto = uint32(kcp.rx_srtt) + _imax_(kcp.interval, uint32(kcp.rx_rttvar)<<2)
	kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
}
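
// The RTO follows RFC 6298 with KCP's fixed-point shifts. A worked sketch of
// the very first sample, assuming kcp.interval = 100 and rx_minrto = 100
// (illustrative only):
//
//	update_ack(100) // rtt = 100 ms
//	// rx_srtt   = 100
//	// rx_rttvar = 100 >> 1 = 50
//	// rto       = 100 + max(100, 50<<2) = 300
//	// rx_rto    = bound(100, 300, 60000) = 300 ms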

func (kcp *KCP) shrink_buf() {
	if len(kcp.snd_buf) > 0 {
		seg := &kcp.snd_buf[0]
		kcp.snd_una = seg.sn
	} else {
		kcp.snd_una = kcp.snd_nxt
	}
}

func (kcp *KCP) parse_ack(sn uint32) {
	if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
		return
	}

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if sn == seg.sn {
			// mark and free space, but leave the segment here,
			// and wait until `una` to delete it; then we don't
			// have to shift the segments behind it forward,
			// which is an expensive operation for a large window
			seg.acked = 1
			kcp.delSegment(seg)
			break
		}
		if _itimediff(sn, seg.sn) < 0 {
			break
		}
	}
}

func (kcp *KCP) parse_fastack(sn, ts uint32) {
	if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
		return
	}

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if _itimediff(sn, seg.sn) < 0 {
			break
		} else if sn != seg.sn && _itimediff(seg.ts, ts) <= 0 {
			seg.fastack++
		}
	}
}

func (kcp *KCP) parse_una(una uint32) {
	count := 0
	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if _itimediff(una, seg.sn) > 0 {
			kcp.delSegment(seg)
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.snd_buf = kcp.remove_front(kcp.snd_buf, count)
	}
}

// ack append
func (kcp *KCP) ack_push(sn, ts uint32) {
	kcp.acklist = append(kcp.acklist, ackItem{sn, ts})
}

// parse_data returns true if the data is repeated (already received)
func (kcp *KCP) parse_data(newseg segment) bool {
	sn := newseg.sn
	if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
		_itimediff(sn, kcp.rcv_nxt) < 0 {
		return true
	}

	n := len(kcp.rcv_buf) - 1
	insert_idx := 0
	repeat := false
	for i := n; i >= 0; i-- {
		seg := &kcp.rcv_buf[i]
		if seg.sn == sn {
			repeat = true
			break
		}
		if _itimediff(sn, seg.sn) > 0 {
			insert_idx = i + 1
			break
		}
	}

	if !repeat {
		// replicate the content if it's new
		dataCopy := xmitBuf.Get().([]byte)[:len(newseg.data)]
		copy(dataCopy, newseg.data)
		newseg.data = dataCopy

		if insert_idx == n+1 {
			kcp.rcv_buf = append(kcp.rcv_buf, newseg)
		} else {
			kcp.rcv_buf = append(kcp.rcv_buf, segment{})
			copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
			kcp.rcv_buf[insert_idx] = newseg
		}
	}

	// move available data from rcv_buf -> rcv_queue
	count := 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue)+count < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}

	return repeat
}

// Input feeds a packet into the kcp state machine.
//
// 'regular' indicates that this is a real data packet from the remote peer,
// i.e. it was not generated by the ReedSolomon codecs.
//
// 'ackNoDelay' triggers an immediate ACK, at the cost of bandwidth efficiency.
func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
	snd_una := kcp.snd_una
	if len(data) < IKCP_OVERHEAD {
		return -1
	}

	var latest uint32 // the latest ack packet
	var flag int
	var inSegs uint64

	for {
		var ts, sn, length, una, conv uint32
		var wnd uint16
		var cmd, frg uint8

		if len(data) < int(IKCP_OVERHEAD) {
			break
		}

		data = ikcp_decode32u(data, &conv)
		if conv != kcp.conv {
			return -1
		}

		data = ikcp_decode8u(data, &cmd)
		data = ikcp_decode8u(data, &frg)
		data = ikcp_decode16u(data, &wnd)
		data = ikcp_decode32u(data, &ts)
		data = ikcp_decode32u(data, &sn)
		data = ikcp_decode32u(data, &una)
		data = ikcp_decode32u(data, &length)
		if len(data) < int(length) {
			return -2
		}

		if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
			cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
			return -3
		}

		// only trust window updates from regular packets, i.e. the latest update
		if regular {
			kcp.rmt_wnd = uint32(wnd)
		}
		kcp.parse_una(una)
		kcp.shrink_buf()

		if cmd == IKCP_CMD_ACK {
			kcp.parse_ack(sn)
			kcp.parse_fastack(sn, ts)
			flag |= 1
			latest = ts
		} else if cmd == IKCP_CMD_PUSH {
			repeat := true
			if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
				kcp.ack_push(sn, ts)
				if _itimediff(sn, kcp.rcv_nxt) >= 0 {
					var seg segment
					seg.conv = conv
					seg.cmd = cmd
					seg.frg = frg
					seg.wnd = wnd
					seg.ts = ts
					seg.sn = sn
					seg.una = una
					seg.data = data[:length] // delayed data copying
					repeat = kcp.parse_data(seg)
				}
			}
			if regular && repeat {
				atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
			}
		} else if cmd == IKCP_CMD_WASK {
			// ready to send back IKCP_CMD_WINS in flush
			// tell remote my window size
			kcp.probe |= IKCP_ASK_TELL
		} else if cmd == IKCP_CMD_WINS {
			// do nothing
		} else {
			return -3
		}
		inSegs++
		data = data[length:]
	}
	atomic.AddUint64(&DefaultSnmp.InSegs, inSegs)

	// update rtt with the latest ts
	// ignore the FEC packet
	if flag != 0 && regular {
		current := currentMs()
		if _itimediff(current, latest) >= 0 {
			kcp.update_ack(_itimediff(current, latest))
		}
	}

	// cwnd update when a packet arrives
	if kcp.nocwnd == 0 {
		if _itimediff(kcp.snd_una, snd_una) > 0 {
			if kcp.cwnd < kcp.rmt_wnd {
				mss := kcp.mss
				if kcp.cwnd < kcp.ssthresh {
					kcp.cwnd++
					kcp.incr += mss
				} else {
					if kcp.incr < mss {
						kcp.incr = mss
					}
					kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
					if (kcp.cwnd+1)*mss <= kcp.incr {
						kcp.cwnd++
					}
				}
				if kcp.cwnd > kcp.rmt_wnd {
					kcp.cwnd = kcp.rmt_wnd
					kcp.incr = kcp.rmt_wnd * mss
				}
			}
		}
	}

	if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
		kcp.flush(true)
	}
	return 0
}
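
// A sketch of how Input is typically fed from a datagram socket (illustrative
// only; conn is assumed to be a *net.UDPConn owned by the caller and is not
// part of this file):
//
//	buf := make([]byte, 1500)
//	for {
//		n, err := conn.Read(buf)
//		if err != nil {
//			break
//		}
//		kcp.Input(buf[:n], true, false) // regular packet, no immediate ACK
//	}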

func (kcp *KCP) wnd_unused() uint16 {
	if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
		return uint16(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
	}
	return 0
}

// flush pending data
func (kcp *KCP) flush(ackOnly bool) uint32 {
	var seg segment
	seg.conv = kcp.conv
	seg.cmd = IKCP_CMD_ACK
	seg.wnd = kcp.wnd_unused()
	seg.una = kcp.rcv_nxt

	buffer := kcp.buffer
	ptr := buffer[kcp.reserved:] // keep n bytes untouched

	// makeSpace makes room for writing
	makeSpace := func(space int) {
		size := len(buffer) - len(ptr)
		if size+space > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer[kcp.reserved:]
		}
	}

	// flushBuffer flushes the bytes in buffer if there are any
	flushBuffer := func() {
		size := len(buffer) - len(ptr)
		if size > kcp.reserved {
			kcp.output(buffer, size)
		}
	}

	// flush acknowledges
	for i, ack := range kcp.acklist {
		makeSpace(IKCP_OVERHEAD)
		// filter jitters caused by bufferbloat
		if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
			seg.sn, seg.ts = ack.sn, ack.ts
			ptr = seg.encode(ptr)
		}
	}
	kcp.acklist = kcp.acklist[0:0]

	if ackOnly { // flush remaining ack segments
		flushBuffer()
		return kcp.interval
	}

	// probe window size (if remote window size equals zero)
	if kcp.rmt_wnd == 0 {
		current := currentMs()
		if kcp.probe_wait == 0 {
			kcp.probe_wait = IKCP_PROBE_INIT
			kcp.ts_probe = current + kcp.probe_wait
		} else {
			if _itimediff(current, kcp.ts_probe) >= 0 {
				if kcp.probe_wait < IKCP_PROBE_INIT {
					kcp.probe_wait = IKCP_PROBE_INIT
				}
				kcp.probe_wait += kcp.probe_wait / 2
				if kcp.probe_wait > IKCP_PROBE_LIMIT {
					kcp.probe_wait = IKCP_PROBE_LIMIT
				}
				kcp.ts_probe = current + kcp.probe_wait
				kcp.probe |= IKCP_ASK_SEND
			}
		}
	} else {
		kcp.ts_probe = 0
		kcp.probe_wait = 0
	}

	// flush window probing commands
	if (kcp.probe & IKCP_ASK_SEND) != 0 {
		seg.cmd = IKCP_CMD_WASK
		makeSpace(IKCP_OVERHEAD)
		ptr = seg.encode(ptr)
	}

	// flush window probing commands
	if (kcp.probe & IKCP_ASK_TELL) != 0 {
		seg.cmd = IKCP_CMD_WINS
		makeSpace(IKCP_OVERHEAD)
		ptr = seg.encode(ptr)
	}
	kcp.probe = 0

	// calculate window size
	cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
	if kcp.nocwnd == 0 {
		cwnd = _imin_(kcp.cwnd, cwnd)
	}

	// sliding window, controlled by snd_nxt && snd_una+cwnd
	newSegsCount := 0
	for k := range kcp.snd_queue {
		if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
			break
		}
		newseg := kcp.snd_queue[k]
		newseg.conv = kcp.conv
		newseg.cmd = IKCP_CMD_PUSH
		newseg.sn = kcp.snd_nxt
		kcp.snd_buf = append(kcp.snd_buf, newseg)
		kcp.snd_nxt++
		newSegsCount++
	}
	if newSegsCount > 0 {
		kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
	}

	// calculate resent
	resent := uint32(kcp.fastresend)
	if kcp.fastresend <= 0 {
		resent = 0xffffffff
	}

	// check for retransmissions
	current := currentMs()
	var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
	minrto := int32(kcp.interval)

	ref := kcp.snd_buf[:len(kcp.snd_buf)] // for bounds check elimination
	for k := range ref {
		segment := &ref[k]
		needsend := false
		if segment.acked == 1 {
			continue
		}
		if segment.xmit == 0 { // initial transmit
			needsend = true
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
		} else if _itimediff(current, segment.resendts) >= 0 { // RTO
			needsend = true
			if kcp.nodelay == 0 {
				segment.rto += kcp.rx_rto
			} else {
				segment.rto += kcp.rx_rto / 2
			}
			segment.resendts = current + segment.rto
			lost++
			lostSegs++
		} else if segment.fastack >= resent { // fast retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			fastRetransSegs++
		} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			earlyRetransSegs++
		}

		if needsend {
			current = currentMs()
			segment.xmit++
			segment.ts = current
			segment.wnd = seg.wnd
			segment.una = seg.una

			need := IKCP_OVERHEAD + len(segment.data)
			makeSpace(need)
			ptr = segment.encode(ptr)
			copy(ptr, segment.data)
			ptr = ptr[len(segment.data):]

			if segment.xmit >= kcp.dead_link {
				kcp.state = 0xFFFFFFFF
			}
		}

		// get the nearest rto
		if rto := _itimediff(segment.resendts, current); rto > 0 && rto < minrto {
			minrto = rto
		}
	}

	// flush remaining segments
	flushBuffer()

	// counter updates
	sum := lostSegs
	if lostSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
	}
	if fastRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
		sum += fastRetransSegs
	}
	if earlyRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
		sum += earlyRetransSegs
	}
	if sum > 0 {
		atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
	}

	// cwnd update
	if kcp.nocwnd == 0 {
		// update ssthresh
		// rate halving, https://tools.ietf.org/html/rfc6937
		if change > 0 {
			inflight := kcp.snd_nxt - kcp.snd_una
			kcp.ssthresh = inflight / 2
			if kcp.ssthresh < IKCP_THRESH_MIN {
				kcp.ssthresh = IKCP_THRESH_MIN
			}
			kcp.cwnd = kcp.ssthresh + resent
			kcp.incr = kcp.cwnd * kcp.mss
		}

		// congestion control, https://tools.ietf.org/html/rfc5681
		if lost > 0 {
			kcp.ssthresh = cwnd / 2
			if kcp.ssthresh < IKCP_THRESH_MIN {
				kcp.ssthresh = IKCP_THRESH_MIN
			}
			kcp.cwnd = 1
			kcp.incr = kcp.mss
		}

		if kcp.cwnd < 1 {
			kcp.cwnd = 1
			kcp.incr = kcp.mss
		}
	}
	return uint32(minrto)
}

// (deprecated)
//
// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
// Check when to call it again (when there is no Input/Send in between).
func (kcp *KCP) Update() {
	var slap int32

	current := currentMs()
	if kcp.updated == 0 {
		kcp.updated = 1
		kcp.ts_flush = current
	}

	slap = _itimediff(current, kcp.ts_flush)

	if slap >= 10000 || slap < -10000 {
		kcp.ts_flush = current
		slap = 0
	}

	if slap >= 0 {
		kcp.ts_flush += kcp.interval
		if _itimediff(current, kcp.ts_flush) >= 0 {
			kcp.ts_flush = current + kcp.interval
		}
		kcp.flush(false)
	}
}

// (deprecated)
//
// Check determines when you should invoke Update:
// it returns the time in millisec at which Update should be called, assuming
// there is no Input/Send call in the meantime. You can schedule Update for
// that time instead of calling it repeatedly.
// This is important to reduce unnecessary Update invocations; use it to
// schedule Update (e.g. when implementing an epoll-like mechanism, or to
// optimize the Update loop when handling massive numbers of kcp connections).
func (kcp *KCP) Check() uint32 {
	current := currentMs()
	ts_flush := kcp.ts_flush
	tm_flush := int32(0x7fffffff)
	tm_packet := int32(0x7fffffff)
	minimal := uint32(0)
	if kcp.updated == 0 {
		return current
	}

	if _itimediff(current, ts_flush) >= 10000 ||
		_itimediff(current, ts_flush) < -10000 {
		ts_flush = current
	}

	if _itimediff(current, ts_flush) >= 0 {
		return current
	}

	tm_flush = _itimediff(ts_flush, current)

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		diff := _itimediff(seg.resendts, current)
		if diff <= 0 {
			return current
		}
		if diff < tm_packet {
			tm_packet = diff
		}
	}

	minimal = uint32(tm_packet)
	if tm_packet >= tm_flush {
		minimal = uint32(tm_flush)
	}
	if minimal >= kcp.interval {
		minimal = kcp.interval
	}

	return current + minimal
}
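
// A sketch of a Check-driven update loop (illustrative only; a real
// implementation would also wake up when Input or Send is called):
//
//	for {
//		kcp.Update()
//		next := kcp.Check() // absolute time in ms
//		if sleep := _itimediff(next, currentMs()); sleep > 0 {
//			time.Sleep(time.Duration(sleep) * time.Millisecond)
//		}
//	}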

// SetMtu changes the MTU size, default is 1400
func (kcp *KCP) SetMtu(mtu int) int {
	if mtu < 50 || mtu < IKCP_OVERHEAD {
		return -1
	}
	if kcp.reserved >= int(kcp.mtu-IKCP_OVERHEAD) || kcp.reserved < 0 {
		return -1
	}

	buffer := make([]byte, mtu)
	if buffer == nil {
		return -2
	}
	kcp.mtu = uint32(mtu)
	kcp.mss = kcp.mtu - IKCP_OVERHEAD - uint32(kcp.reserved)
	kcp.buffer = buffer
	return 0
}

// NoDelay options:
//   fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
//   nodelay: 0:disable(default), 1:enable
//   interval: internal update timer interval in millisec, default is 100ms
//   resend: 0:disable fast resend(default), 1:enable fast resend
//   nc: 0:normal congestion control(default), 1:disable congestion control
func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int {
	if nodelay >= 0 {
		kcp.nodelay = uint32(nodelay)
		if nodelay != 0 {
			kcp.rx_minrto = IKCP_RTO_NDL
		} else {
			kcp.rx_minrto = IKCP_RTO_MIN
		}
	}
	if interval >= 0 {
		if interval > 5000 {
			interval = 5000
		} else if interval < 10 {
			interval = 10
		}
		kcp.interval = uint32(interval)
	}
	if resend >= 0 {
		kcp.fastresend = int32(resend)
	}
	if nc >= 0 {
		kcp.nocwnd = int32(nc)
	}
	return 0
}
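
// In this Go port the "fastest" setting referenced above corresponds to the
// sketch below (illustrative only): nodelay halves the per-retransmit RTO
// backoff and lowers the minimum RTO, fastresend=2 triggers fast retransmit
// after 2 out-of-order ACK spans, and nc=1 disables congestion control.
//
//	kcp.NoDelay(1, 20, 2, 1) // nodelay on, 20ms interval, fastresend=2, no cwnd
//	kcp.WndSize(128, 128)    // optionally enlarge the send/receive windows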

// WndSize sets the maximum window sizes: sndwnd=32, rcvwnd=32 by default
func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
	if sndwnd > 0 {
		kcp.snd_wnd = uint32(sndwnd)
	}
	if rcvwnd > 0 {
		kcp.rcv_wnd = uint32(rcvwnd)
	}
	return 0
}

// WaitSnd gets how many packets are waiting to be sent
func (kcp *KCP) WaitSnd() int {
	return len(kcp.snd_buf) + len(kcp.snd_queue)
}

// remove_front removes the front n elements from queue q.
// If the number of elements to remove is more than half of the capacity,
// shift the remaining elements to the front; otherwise just reslice q to q[n:],
// so that the cost of the eventual runtime.growslice stays below n/2.
func (kcp *KCP) remove_front(q []segment, n int) []segment {
	if n > cap(q)/2 {
		newn := copy(q, q[n:])
		return q[:newn]
	}
	return q[n:]
}