kcp.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012
  1. // Package kcp - A Fast and Reliable ARQ Protocol
  2. package kcp
  3. import (
  4. "encoding/binary"
  5. "sync/atomic"
  6. )
// Protocol constants. Values mirror the reference C implementation of KCP so
// that both endpoints of a conversation interpret segments identically.
const (
	IKCP_RTO_NDL     = 30  // no delay min rto
	IKCP_RTO_MIN     = 100 // normal min rto
	IKCP_RTO_DEF     = 200 // default rto
	IKCP_RTO_MAX     = 60000 // upper bound for any computed rto (ms)
	IKCP_CMD_PUSH    = 81 // cmd: push data
	IKCP_CMD_ACK     = 82 // cmd: ack
	IKCP_CMD_WASK    = 83 // cmd: window probe (ask)
	IKCP_CMD_WINS    = 84 // cmd: window size (tell)
	IKCP_ASK_SEND    = 1 // need to send IKCP_CMD_WASK
	IKCP_ASK_TELL    = 2 // need to send IKCP_CMD_WINS
	IKCP_WND_SND     = 32 // default send window, in segments
	IKCP_WND_RCV     = 32 // default receive window, in segments
	IKCP_MTU_DEF     = 1400 // default MTU, in bytes
	IKCP_ACK_FAST    = 3
	IKCP_INTERVAL    = 100 // default internal flush interval (ms)
	IKCP_OVERHEAD    = 24 // per-segment wire header size, in bytes
	IKCP_DEADLINK    = 20 // retransmission count after which the link is considered dead
	IKCP_THRESH_INIT = 2 // initial slow-start threshold, in segments
	IKCP_THRESH_MIN  = 2 // minimum slow-start threshold, in segments
	IKCP_PROBE_INIT  = 7000 // 7 secs to probe window size
	IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
)
// output_callback is a prototype which ought capture conn and call conn.Write.
// buf contains exactly size bytes of assembled packet data ready to transmit.
type output_callback func(buf []byte, size int)
  32. /* encode 8 bits unsigned int */
  33. func ikcp_encode8u(p []byte, c byte) []byte {
  34. p[0] = c
  35. return p[1:]
  36. }
  37. /* decode 8 bits unsigned int */
  38. func ikcp_decode8u(p []byte, c *byte) []byte {
  39. *c = p[0]
  40. return p[1:]
  41. }
  42. /* encode 16 bits unsigned int (lsb) */
  43. func ikcp_encode16u(p []byte, w uint16) []byte {
  44. binary.LittleEndian.PutUint16(p, w)
  45. return p[2:]
  46. }
  47. /* decode 16 bits unsigned int (lsb) */
  48. func ikcp_decode16u(p []byte, w *uint16) []byte {
  49. *w = binary.LittleEndian.Uint16(p)
  50. return p[2:]
  51. }
  52. /* encode 32 bits unsigned int (lsb) */
  53. func ikcp_encode32u(p []byte, l uint32) []byte {
  54. binary.LittleEndian.PutUint32(p, l)
  55. return p[4:]
  56. }
  57. /* decode 32 bits unsigned int (lsb) */
  58. func ikcp_decode32u(p []byte, l *uint32) []byte {
  59. *l = binary.LittleEndian.Uint32(p)
  60. return p[4:]
  61. }
  62. func _imin_(a, b uint32) uint32 {
  63. if a <= b {
  64. return a
  65. }
  66. return b
  67. }
  68. func _imax_(a, b uint32) uint32 {
  69. if a >= b {
  70. return a
  71. }
  72. return b
  73. }
  74. func _ibound_(lower, middle, upper uint32) uint32 {
  75. return _imin_(_imax_(lower, middle), upper)
  76. }
  77. func _itimediff(later, earlier uint32) int32 {
  78. return (int32)(later - earlier)
  79. }
// segment defines a KCP segment
type segment struct {
	conv     uint32 // conversation id; must match on both endpoints
	cmd      uint8  // command: IKCP_CMD_PUSH/ACK/WASK/WINS
	frg      uint8  // fragments remaining after this one (0 = last fragment)
	wnd      uint16 // sender's currently available receive window
	ts       uint32 // timestamp (ms) when the segment was sent
	sn       uint32 // sequence number
	una      uint32 // sender's first unacknowledged sequence number
	rto      uint32 // current retransmission timeout for this segment (ms)
	xmit     uint32 // number of times this segment has been transmitted
	resendts uint32 // timestamp (ms) at which to retransmit
	fastack  uint32 // times this segment was skipped over by later acks
	acked    uint32 // mark if the seg has acked
	data     []byte
}
  96. // encode a segment into buffer
  97. func (seg *segment) encode(ptr []byte) []byte {
  98. ptr = ikcp_encode32u(ptr, seg.conv)
  99. ptr = ikcp_encode8u(ptr, seg.cmd)
  100. ptr = ikcp_encode8u(ptr, seg.frg)
  101. ptr = ikcp_encode16u(ptr, seg.wnd)
  102. ptr = ikcp_encode32u(ptr, seg.ts)
  103. ptr = ikcp_encode32u(ptr, seg.sn)
  104. ptr = ikcp_encode32u(ptr, seg.una)
  105. ptr = ikcp_encode32u(ptr, uint32(len(seg.data)))
  106. atomic.AddUint64(&DefaultSnmp.OutSegs, 1)
  107. return ptr
  108. }
// KCP defines a single KCP connection
type KCP struct {
	conv, mtu, mss, state uint32 // conversation id; MTU; max segment size (mtu-overhead); link state (0xFFFFFFFF after dead_link retries)
	snd_una, snd_nxt, rcv_nxt uint32 // first unacked sn; next sn to assign; next sn expected from peer
	ssthresh uint32 // slow-start threshold, in segments
	rx_rttvar, rx_srtt int32 // RTT variance and smoothed RTT (RFC 6298 estimators)
	rx_rto, rx_minrto uint32 // current retransmission timeout and its lower bound (ms)
	snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32 // send/recv/remote windows, congestion window, pending probe flags
	interval, ts_flush uint32 // flush interval (ms) and next scheduled flush timestamp
	nodelay, updated uint32 // nodelay mode flag; whether Update has been called at least once
	ts_probe, probe_wait uint32 // window-probe schedule: next probe timestamp and current wait
	dead_link, incr uint32 // per-segment xmit limit before marking the link dead; cwnd byte accumulator
	fastresend int32 // dup-ack count that triggers fast retransmit; <=0 disables
	nocwnd, stream int32 // nonzero disables congestion control; nonzero enables stream (vs message) mode
	snd_queue []segment // app data waiting to enter the send window
	rcv_queue []segment // in-order data ready for Recv
	snd_buf []segment // sent-but-unacked segments, ordered by sn
	rcv_buf []segment // out-of-order received segments, ordered by sn
	acklist []ackItem // acks pending transmission on next flush
	buffer []byte // staging buffer for outgoing packets, sized by mtu
	output output_callback
}
// ackItem records one pending acknowledgement: the sequence number to ack and
// the peer's timestamp to echo back for RTT measurement.
type ackItem struct {
	sn uint32
	ts uint32
}
  135. // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
  136. // from the same connection.
  137. func NewKCP(conv uint32, output output_callback) *KCP {
  138. kcp := new(KCP)
  139. kcp.conv = conv
  140. kcp.snd_wnd = IKCP_WND_SND
  141. kcp.rcv_wnd = IKCP_WND_RCV
  142. kcp.rmt_wnd = IKCP_WND_RCV
  143. kcp.mtu = IKCP_MTU_DEF
  144. kcp.mss = kcp.mtu - IKCP_OVERHEAD
  145. kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3)
  146. kcp.rx_rto = IKCP_RTO_DEF
  147. kcp.rx_minrto = IKCP_RTO_MIN
  148. kcp.interval = IKCP_INTERVAL
  149. kcp.ts_flush = IKCP_INTERVAL
  150. kcp.ssthresh = IKCP_THRESH_INIT
  151. kcp.dead_link = IKCP_DEADLINK
  152. kcp.output = output
  153. return kcp
  154. }
// newSegment creates a KCP segment
// whose data slice of length 'size' is drawn from the shared xmitBuf pool
// (recycled later by delSegment).
func (kcp *KCP) newSegment(size int) (seg segment) {
	seg.data = xmitBuf.Get().([]byte)[:size]
	return
}
  160. // delSegment recycles a KCP segment
  161. func (kcp *KCP) delSegment(seg *segment) {
  162. if seg.data != nil {
  163. xmitBuf.Put(seg.data)
  164. seg.data = nil
  165. }
  166. }
  167. // PeekSize checks the size of next message in the recv queue
  168. func (kcp *KCP) PeekSize() (length int) {
  169. if len(kcp.rcv_queue) == 0 {
  170. return -1
  171. }
  172. seg := &kcp.rcv_queue[0]
  173. if seg.frg == 0 {
  174. return len(seg.data)
  175. }
  176. if len(kcp.rcv_queue) < int(seg.frg+1) {
  177. return -1
  178. }
  179. for k := range kcp.rcv_queue {
  180. seg := &kcp.rcv_queue[k]
  181. length += len(seg.data)
  182. if seg.frg == 0 {
  183. break
  184. }
  185. }
  186. return
  187. }
// Recv is user/upper level recv: returns size, returns below zero for EAGAIN
//
// Error returns:
//
//	-1: receive queue is empty
//	-2: next message is incomplete (fragments still missing)
//	-3: supplied buffer is too small for the next message
func (kcp *KCP) Recv(buffer []byte) (n int) {
	if len(kcp.rcv_queue) == 0 {
		return -1
	}
	peeksize := kcp.PeekSize()
	if peeksize < 0 {
		return -2
	}
	if peeksize > len(buffer) {
		return -3
	}
	// remember whether the receive window was full before draining; if it
	// reopens we must notify the remote (fast recover, bottom of function)
	var fast_recover bool
	if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
		fast_recover = true
	}
	// merge fragment
	count := 0
	for k := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[k]
		copy(buffer, seg.data)
		buffer = buffer[len(seg.data):]
		n += len(seg.data)
		count++
		// recycle the payload; seg.frg stays readable after delSegment
		kcp.delSegment(seg)
		if seg.frg == 0 {
			break // last fragment of the message
		}
	}
	if count > 0 {
		kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
	}
	// move available data from rcv_buf -> rcv_queue
	count = 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		// only contiguous, in-window segments are promoted
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}
	// fast recover
	if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
		// ready to send back IKCP_CMD_WINS in ikcp_flush
		// tell remote my window size
		kcp.probe |= IKCP_ASK_TELL
	}
	return
}
// Send is user/upper level send, returns below zero for error
//
// Error returns:
//
//	-1: buffer is empty
//	-2: the data would need more than 255 fragments (frg is a uint8)
func (kcp *KCP) Send(buffer []byte) int {
	var count int
	if len(buffer) == 0 {
		return -1
	}
	// append to previous segment in streaming mode (if possible)
	if kcp.stream != 0 {
		n := len(kcp.snd_queue)
		if n > 0 {
			seg := &kcp.snd_queue[n-1]
			if len(seg.data) < int(kcp.mss) {
				capacity := int(kcp.mss) - len(seg.data)
				extend := capacity
				if len(buffer) < capacity {
					extend = len(buffer)
				}
				// grow slice, the underlying cap is guaranteed to
				// be larger than kcp.mss
				oldlen := len(seg.data)
				seg.data = seg.data[:oldlen+extend]
				copy(seg.data[oldlen:], buffer)
				buffer = buffer[extend:]
			}
		}
		// everything fit into the previous segment
		if len(buffer) == 0 {
			return 0
		}
	}
	// split the remainder into mss-sized fragments (ceiling division)
	if len(buffer) <= int(kcp.mss) {
		count = 1
	} else {
		count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
	}
	if count > 255 {
		return -2
	}
	if count == 0 {
		count = 1
	}
	for i := 0; i < count; i++ {
		var size int
		if len(buffer) > int(kcp.mss) {
			size = int(kcp.mss)
		} else {
			size = len(buffer)
		}
		seg := kcp.newSegment(size)
		copy(seg.data, buffer[:size])
		if kcp.stream == 0 { // message mode: frg counts down, 0 marks the last fragment
			seg.frg = uint8(count - i - 1)
		} else { // stream mode
			seg.frg = 0
		}
		kcp.snd_queue = append(kcp.snd_queue, seg)
		buffer = buffer[size:]
	}
	return 0
}
  302. func (kcp *KCP) update_ack(rtt int32) {
  303. // https://tools.ietf.org/html/rfc6298
  304. var rto uint32
  305. if kcp.rx_srtt == 0 {
  306. kcp.rx_srtt = rtt
  307. kcp.rx_rttvar = rtt >> 1
  308. } else {
  309. delta := rtt - kcp.rx_srtt
  310. kcp.rx_srtt += delta >> 3
  311. if delta < 0 {
  312. delta = -delta
  313. }
  314. if rtt < kcp.rx_srtt-kcp.rx_rttvar {
  315. // if the new RTT sample is below the bottom of the range of
  316. // what an RTT measurement is expected to be.
  317. // give an 8x reduced weight versus its normal weighting
  318. kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 5
  319. } else {
  320. kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 2
  321. }
  322. }
  323. rto = uint32(kcp.rx_srtt) + _imax_(kcp.interval, uint32(kcp.rx_rttvar)<<2)
  324. kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
  325. }
  326. func (kcp *KCP) shrink_buf() {
  327. if len(kcp.snd_buf) > 0 {
  328. seg := &kcp.snd_buf[0]
  329. kcp.snd_una = seg.sn
  330. } else {
  331. kcp.snd_una = kcp.snd_nxt
  332. }
  333. }
  334. func (kcp *KCP) parse_ack(sn uint32) {
  335. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  336. return
  337. }
  338. for k := range kcp.snd_buf {
  339. seg := &kcp.snd_buf[k]
  340. if sn == seg.sn {
  341. seg.acked = 1
  342. kcp.delSegment(seg)
  343. break
  344. }
  345. if _itimediff(sn, seg.sn) < 0 {
  346. break
  347. }
  348. }
  349. }
  350. func (kcp *KCP) parse_fastack(sn, ts uint32) {
  351. if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
  352. return
  353. }
  354. for k := range kcp.snd_buf {
  355. seg := &kcp.snd_buf[k]
  356. if _itimediff(sn, seg.sn) < 0 {
  357. break
  358. } else if sn != seg.sn && _itimediff(seg.ts, ts) <= 0 {
  359. seg.fastack++
  360. }
  361. }
  362. }
  363. func (kcp *KCP) parse_una(una uint32) {
  364. count := 0
  365. for k := range kcp.snd_buf {
  366. seg := &kcp.snd_buf[k]
  367. if _itimediff(una, seg.sn) > 0 {
  368. kcp.delSegment(seg)
  369. count++
  370. } else {
  371. break
  372. }
  373. }
  374. if count > 0 {
  375. kcp.snd_buf = kcp.remove_front(kcp.snd_buf, count)
  376. }
  377. }
  378. // ack append
  379. func (kcp *KCP) ack_push(sn, ts uint32) {
  380. kcp.acklist = append(kcp.acklist, ackItem{sn, ts})
  381. }
// returns true if data has repeated
//
// parse_data inserts newseg into rcv_buf in sequence-number order (dropping
// duplicates and out-of-window segments), then promotes any now-contiguous
// segments into rcv_queue for delivery.
func (kcp *KCP) parse_data(newseg segment) bool {
	sn := newseg.sn
	// discard anything outside the receive window [rcv_nxt, rcv_nxt+rcv_wnd)
	if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
		_itimediff(sn, kcp.rcv_nxt) < 0 {
		return true
	}
	// scan backwards: rcv_buf is sorted by sn and new data usually belongs
	// at the tail, so this is typically O(1)
	n := len(kcp.rcv_buf) - 1
	insert_idx := 0
	repeat := false
	for i := n; i >= 0; i-- {
		seg := &kcp.rcv_buf[i]
		if seg.sn == sn {
			repeat = true
			break
		}
		if _itimediff(sn, seg.sn) > 0 {
			insert_idx = i + 1
			break
		}
	}
	if !repeat {
		// replicate the content if it's new — newseg.data still aliases the
		// caller's raw packet (see Input's "delayed data copying")
		dataCopy := xmitBuf.Get().([]byte)[:len(newseg.data)]
		copy(dataCopy, newseg.data)
		newseg.data = dataCopy
		if insert_idx == n+1 {
			kcp.rcv_buf = append(kcp.rcv_buf, newseg)
		} else {
			// shift the tail right by one slot and insert in place
			kcp.rcv_buf = append(kcp.rcv_buf, segment{})
			copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
			kcp.rcv_buf[insert_idx] = newseg
		}
	}
	// move available data from rcv_buf -> rcv_queue
	count := 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}
	return repeat
}
// Input when you received a low level packet (eg. UDP packet), call it
// regular indicates a regular packet has received(not from FEC)
//
// 'ackNoDelay' forces an immediate flush of pending acks after processing.
// Error returns:
//
//	-1: packet shorter than one header, or conversation id mismatch
//	-2: declared payload length exceeds the remaining packet bytes
//	-3: unknown command byte
func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
	// snapshot snd_una to detect ack progress for the cwnd update below
	snd_una := kcp.snd_una
	if len(data) < IKCP_OVERHEAD {
		return -1
	}
	var latest uint32 // the latest ack packet
	var flag int
	var inSegs uint64
	// a single packet may carry several segments back to back
	for {
		var ts, sn, length, una, conv uint32
		var wnd uint16
		var cmd, frg uint8
		if len(data) < int(IKCP_OVERHEAD) {
			break
		}
		// decode the 24-byte little-endian segment header
		data = ikcp_decode32u(data, &conv)
		if conv != kcp.conv {
			return -1
		}
		data = ikcp_decode8u(data, &cmd)
		data = ikcp_decode8u(data, &frg)
		data = ikcp_decode16u(data, &wnd)
		data = ikcp_decode32u(data, &ts)
		data = ikcp_decode32u(data, &sn)
		data = ikcp_decode32u(data, &una)
		data = ikcp_decode32u(data, &length)
		if len(data) < int(length) {
			return -2
		}
		if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
			cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
			return -3
		}
		// only trust window updates from regular packets. i.e: latest update
		if regular {
			kcp.rmt_wnd = uint32(wnd)
		}
		// retire everything cumulatively acknowledged by the peer
		kcp.parse_una(una)
		kcp.shrink_buf()
		if cmd == IKCP_CMD_ACK {
			kcp.parse_ack(sn)
			kcp.parse_fastack(sn, ts)
			flag |= 1
			// NOTE(review): this keeps the ts of the LAST ack segment in the
			// packet, not necessarily the newest timestamp — confirm intended
			latest = ts
		} else if cmd == IKCP_CMD_PUSH {
			repeat := true
			if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
				// always ack in-window data, even duplicates
				kcp.ack_push(sn, ts)
				if _itimediff(sn, kcp.rcv_nxt) >= 0 {
					var seg segment
					seg.conv = conv
					seg.cmd = cmd
					seg.frg = frg
					seg.wnd = wnd
					seg.ts = ts
					seg.sn = sn
					seg.una = una
					seg.data = data[:length] // delayed data copying
					repeat = kcp.parse_data(seg)
				}
			}
			if regular && repeat {
				atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
			}
		} else if cmd == IKCP_CMD_WASK {
			// ready to send back IKCP_CMD_WINS in Ikcp_flush
			// tell remote my window size
			kcp.probe |= IKCP_ASK_TELL
		} else if cmd == IKCP_CMD_WINS {
			// do nothing
		} else {
			return -3
		}
		inSegs++
		data = data[length:]
	}
	atomic.AddUint64(&DefaultSnmp.InSegs, inSegs)
	// update rtt with the latest ts
	// ignore the FEC packet
	if flag != 0 && regular {
		current := currentMs()
		if _itimediff(current, latest) >= 0 {
			kcp.update_ack(_itimediff(current, latest))
		}
	}
	// cwnd update when packet arrived
	if kcp.nocwnd == 0 {
		if _itimediff(kcp.snd_una, snd_una) > 0 { // some data was acknowledged
			if kcp.cwnd < kcp.rmt_wnd {
				mss := kcp.mss
				if kcp.cwnd < kcp.ssthresh {
					// slow start: one extra segment per acked packet
					kcp.cwnd++
					kcp.incr += mss
				} else {
					// congestion avoidance: grow roughly one mss per RTT
					if kcp.incr < mss {
						kcp.incr = mss
					}
					kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
					if (kcp.cwnd+1)*mss <= kcp.incr {
						kcp.cwnd++
					}
				}
				// never grow past the remote's advertised window
				if kcp.cwnd > kcp.rmt_wnd {
					kcp.cwnd = kcp.rmt_wnd
					kcp.incr = kcp.rmt_wnd * mss
				}
			}
		}
	}
	if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
		kcp.flush(true)
	}
	return 0
}
  549. func (kcp *KCP) wnd_unused() uint16 {
  550. if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
  551. return uint16(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
  552. }
  553. return 0
  554. }
// flush pending data
//
// When ackOnly is true only queued acknowledgements are transmitted; a full
// flush additionally sends window probes, admits new data into the send
// window, and performs retransmissions. Returns the delay in ms until the
// next flush is required.
func (kcp *KCP) flush(ackOnly bool) uint32 {
	// 'seg' is a scratch header reused for ack and probe segments below
	var seg segment
	seg.conv = kcp.conv
	seg.cmd = IKCP_CMD_ACK
	seg.wnd = kcp.wnd_unused()
	seg.una = kcp.rcv_nxt
	buffer := kcp.buffer
	// flush acknowledges
	// ptr marks the unwritten tail of buffer; (len(buffer)-len(ptr)) bytes
	// have been staged so far
	ptr := buffer
	for i, ack := range kcp.acklist {
		size := len(buffer) - len(ptr)
		// emit the packet before it would overflow the MTU
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		// filter jitters caused by bufferbloat
		if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
			seg.sn, seg.ts = ack.sn, ack.ts
			ptr = seg.encode(ptr)
		}
	}
	kcp.acklist = kcp.acklist[0:0]
	if ackOnly { // flash remain ack segments
		size := len(buffer) - len(ptr)
		if size > 0 {
			kcp.output(buffer, size)
		}
		return kcp.interval
	}
	// probe window size (if remote window size equals zero)
	if kcp.rmt_wnd == 0 {
		current := currentMs()
		if kcp.probe_wait == 0 {
			kcp.probe_wait = IKCP_PROBE_INIT
			kcp.ts_probe = current + kcp.probe_wait
		} else {
			if _itimediff(current, kcp.ts_probe) >= 0 {
				if kcp.probe_wait < IKCP_PROBE_INIT {
					kcp.probe_wait = IKCP_PROBE_INIT
				}
				// back off by 1.5x, capped at IKCP_PROBE_LIMIT
				kcp.probe_wait += kcp.probe_wait / 2
				if kcp.probe_wait > IKCP_PROBE_LIMIT {
					kcp.probe_wait = IKCP_PROBE_LIMIT
				}
				kcp.ts_probe = current + kcp.probe_wait
				kcp.probe |= IKCP_ASK_SEND
			}
		}
	} else {
		// remote window reopened: reset the probe schedule
		kcp.ts_probe = 0
		kcp.probe_wait = 0
	}
	// flush window probing commands
	if (kcp.probe & IKCP_ASK_SEND) != 0 {
		seg.cmd = IKCP_CMD_WASK
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}
	// flush window probing commands
	if (kcp.probe & IKCP_ASK_TELL) != 0 {
		seg.cmd = IKCP_CMD_WINS
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}
	kcp.probe = 0
	// calculate window size
	cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
	if kcp.nocwnd == 0 {
		cwnd = _imin_(kcp.cwnd, cwnd)
	}
	// sliding window, controlled by snd_nxt && sna_una+cwnd
	newSegsCount := 0
	for k := range kcp.snd_queue {
		if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
			break // window full; remaining data stays queued
		}
		newseg := kcp.snd_queue[k]
		newseg.conv = kcp.conv
		newseg.cmd = IKCP_CMD_PUSH
		newseg.sn = kcp.snd_nxt
		kcp.snd_buf = append(kcp.snd_buf, newseg)
		kcp.snd_nxt++
		newSegsCount++
	}
	if newSegsCount > 0 {
		kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
	}
	// calculate resent
	resent := uint32(kcp.fastresend)
	if kcp.fastresend <= 0 {
		resent = 0xffffffff // fast retransmit disabled
	}
	// check for retransmissions
	current := currentMs()
	var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
	minrto := int32(kcp.interval)
	ref := kcp.snd_buf[:len(kcp.snd_buf)] // for bounds check elimination
	for k := range ref {
		segment := &ref[k]
		needsend := false
		if segment.acked == 1 {
			continue
		}
		if segment.xmit == 0 { // initial transmit
			needsend = true
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
		} else if _itimediff(current, segment.resendts) >= 0 { // RTO
			needsend = true
			if kcp.nodelay == 0 {
				segment.rto += kcp.rx_rto // double-style backoff
			} else {
				segment.rto += kcp.rx_rto / 2 // gentler backoff in nodelay mode
			}
			segment.resendts = current + segment.rto
			lost++
			lostSegs++
		} else if segment.fastack >= resent { // fast retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			fastRetransSegs++
		} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			earlyRetransSegs++
		}
		if needsend {
			current = currentMs() // time update for a blocking call
			segment.xmit++
			segment.ts = current
			segment.wnd = seg.wnd
			segment.una = seg.una
			size := len(buffer) - len(ptr)
			need := IKCP_OVERHEAD + len(segment.data)
			if size+need > int(kcp.mtu) {
				kcp.output(buffer, size)
				ptr = buffer
			}
			ptr = segment.encode(ptr)
			copy(ptr, segment.data)
			ptr = ptr[len(segment.data):]
			// too many retransmissions of one segment: mark the link dead
			if segment.xmit >= kcp.dead_link {
				kcp.state = 0xFFFFFFFF
			}
		}
		// get the nearest rto
		if rto := _itimediff(segment.resendts, current); rto > 0 && rto < minrto {
			minrto = rto
		}
	}
	// flash remain segments
	size := len(buffer) - len(ptr)
	if size > 0 {
		kcp.output(buffer, size)
	}
	// counter updates
	sum := lostSegs
	if lostSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
	}
	if fastRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
		sum += fastRetransSegs
	}
	if earlyRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
		sum += earlyRetransSegs
	}
	if sum > 0 {
		atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
	}
	// cwnd update
	if kcp.nocwnd == 0 {
		// update ssthresh
		// rate halving, https://tools.ietf.org/html/rfc6937
		if change > 0 {
			inflight := kcp.snd_nxt - kcp.snd_una
			kcp.ssthresh = inflight / 2
			if kcp.ssthresh < IKCP_THRESH_MIN {
				kcp.ssthresh = IKCP_THRESH_MIN
			}
			kcp.cwnd = kcp.ssthresh + resent
			kcp.incr = kcp.cwnd * kcp.mss
		}
		// congestion control, https://tools.ietf.org/html/rfc5681
		if lost > 0 {
			kcp.ssthresh = cwnd / 2
			if kcp.ssthresh < IKCP_THRESH_MIN {
				kcp.ssthresh = IKCP_THRESH_MIN
			}
			kcp.cwnd = 1
			kcp.incr = kcp.mss
		}
		if kcp.cwnd < 1 {
			kcp.cwnd = 1
			kcp.incr = kcp.mss
		}
	}
	return uint32(minrto)
}
  770. // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
  771. // ikcp_check when to call it again (without ikcp_input/_send calling).
  772. // 'current' - current timestamp in millisec.
  773. func (kcp *KCP) Update() {
  774. var slap int32
  775. current := currentMs()
  776. if kcp.updated == 0 {
  777. kcp.updated = 1
  778. kcp.ts_flush = current
  779. }
  780. slap = _itimediff(current, kcp.ts_flush)
  781. if slap >= 10000 || slap < -10000 {
  782. kcp.ts_flush = current
  783. slap = 0
  784. }
  785. if slap >= 0 {
  786. kcp.ts_flush += kcp.interval
  787. if _itimediff(current, kcp.ts_flush) >= 0 {
  788. kcp.ts_flush = current + kcp.interval
  789. }
  790. kcp.flush(false)
  791. }
  792. }
  793. // Check determines when should you invoke ikcp_update:
  794. // returns when you should invoke ikcp_update in millisec, if there
  795. // is no ikcp_input/_send calling. you can call ikcp_update in that
  796. // time, instead of call update repeatly.
  797. // Important to reduce unnacessary ikcp_update invoking. use it to
  798. // schedule ikcp_update (eg. implementing an epoll-like mechanism,
  799. // or optimize ikcp_update when handling massive kcp connections)
  800. func (kcp *KCP) Check() uint32 {
  801. current := currentMs()
  802. ts_flush := kcp.ts_flush
  803. tm_flush := int32(0x7fffffff)
  804. tm_packet := int32(0x7fffffff)
  805. minimal := uint32(0)
  806. if kcp.updated == 0 {
  807. return current
  808. }
  809. if _itimediff(current, ts_flush) >= 10000 ||
  810. _itimediff(current, ts_flush) < -10000 {
  811. ts_flush = current
  812. }
  813. if _itimediff(current, ts_flush) >= 0 {
  814. return current
  815. }
  816. tm_flush = _itimediff(ts_flush, current)
  817. for k := range kcp.snd_buf {
  818. seg := &kcp.snd_buf[k]
  819. diff := _itimediff(seg.resendts, current)
  820. if diff <= 0 {
  821. return current
  822. }
  823. if diff < tm_packet {
  824. tm_packet = diff
  825. }
  826. }
  827. minimal = uint32(tm_packet)
  828. if tm_packet >= tm_flush {
  829. minimal = uint32(tm_flush)
  830. }
  831. if minimal >= kcp.interval {
  832. minimal = kcp.interval
  833. }
  834. return current + minimal
  835. }
  836. // SetMtu changes MTU size, default is 1400
  837. func (kcp *KCP) SetMtu(mtu int) int {
  838. if mtu < 50 || mtu < IKCP_OVERHEAD {
  839. return -1
  840. }
  841. buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3)
  842. if buffer == nil {
  843. return -2
  844. }
  845. kcp.mtu = uint32(mtu)
  846. kcp.mss = kcp.mtu - IKCP_OVERHEAD
  847. kcp.buffer = buffer
  848. return 0
  849. }
  850. // NoDelay options
  851. // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
  852. // nodelay: 0:disable(default), 1:enable
  853. // interval: internal update timer interval in millisec, default is 100ms
  854. // resend: 0:disable fast resend(default), 1:enable fast resend
  855. // nc: 0:normal congestion control(default), 1:disable congestion control
  856. func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int {
  857. if nodelay >= 0 {
  858. kcp.nodelay = uint32(nodelay)
  859. if nodelay != 0 {
  860. kcp.rx_minrto = IKCP_RTO_NDL
  861. } else {
  862. kcp.rx_minrto = IKCP_RTO_MIN
  863. }
  864. }
  865. if interval >= 0 {
  866. if interval > 5000 {
  867. interval = 5000
  868. } else if interval < 10 {
  869. interval = 10
  870. }
  871. kcp.interval = uint32(interval)
  872. }
  873. if resend >= 0 {
  874. kcp.fastresend = int32(resend)
  875. }
  876. if nc >= 0 {
  877. kcp.nocwnd = int32(nc)
  878. }
  879. return 0
  880. }
  881. // WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default
  882. func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
  883. if sndwnd > 0 {
  884. kcp.snd_wnd = uint32(sndwnd)
  885. }
  886. if rcvwnd > 0 {
  887. kcp.rcv_wnd = uint32(rcvwnd)
  888. }
  889. return 0
  890. }
  891. // WaitSnd gets how many packet is waiting to be sent
  892. func (kcp *KCP) WaitSnd() int {
  893. return len(kcp.snd_buf) + len(kcp.snd_queue)
  894. }
  895. // remove front n elements from queue
  896. func (kcp *KCP) remove_front(q []segment, n int) []segment {
  897. newn := copy(q, q[n:])
  898. return q[:newn]
  899. }