sess.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985
  1. package kcp
  2. import (
  3. "crypto/rand"
  4. "encoding/binary"
  5. "hash/crc32"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/pkg/errors"
  11. "golang.org/x/net/ipv4"
  12. "golang.org/x/net/ipv6"
  13. )
  14. type errTimeout struct {
  15. error
  16. }
  17. func (errTimeout) Timeout() bool { return true }
  18. func (errTimeout) Temporary() bool { return true }
  19. func (errTimeout) Error() string { return "i/o timeout" }
  20. const (
  21. // 16-bytes nonce for each packet
  22. nonceSize = 16
  23. // 4-bytes packet checksum
  24. crcSize = 4
  25. // overall crypto header size
  26. cryptHeaderSize = nonceSize + crcSize
  27. // maximum packet size
  28. mtuLimit = 1500
  29. // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
  30. rxFECMulti = 3
  31. // accept backlog
  32. acceptBacklog = 128
  33. )
  34. const (
  35. errBrokenPipe = "broken pipe"
  36. errInvalidOperation = "invalid operation"
  37. )
  38. var (
  39. // a system-wide packet buffer shared among sending, receiving and FEC
  40. // to mitigate high-frequency memory allocation for packets
  41. xmitBuf sync.Pool
  42. )
  43. func init() {
  44. xmitBuf.New = func() interface{} {
  45. return make([]byte, mtuLimit)
  46. }
  47. }
  48. type (
  49. // UDPSession defines a KCP session implemented by UDP
  50. UDPSession struct {
  51. updaterIdx int // record slice index in updater
  52. conn net.PacketConn // the underlying packet connection
  53. kcp *KCP // KCP ARQ protocol
  54. l *Listener // pointing to the Listener object if it's been accepted by a Listener
  55. block BlockCrypt // block encryption object
  56. // kcp receiving is based on packets
  57. // recvbuf turns packets into stream
  58. recvbuf []byte
  59. bufptr []byte
  60. // header extended output buffer, if has header
  61. ext []byte
  62. // FEC codec
  63. fecDecoder *fecDecoder
  64. fecEncoder *fecEncoder
  65. // settings
  66. remote net.Addr // remote peer address
  67. rd time.Time // read deadline
  68. wd time.Time // write deadline
  69. headerSize int // the header size additional to a KCP frame
  70. ackNoDelay bool // send ack immediately for each incoming packet(testing purpose)
  71. writeDelay bool // delay kcp.flush() for Write() for bulk transfer
  72. dup int // duplicate udp packets(testing purpose)
  73. // notifications
  74. die chan struct{} // notify current session has Closed
  75. chReadEvent chan struct{} // notify Read() can be called without blocking
  76. chWriteEvent chan struct{} // notify Write() can be called without blocking
  77. chReadError chan error // notify PacketConn.Read() have an error
  78. chWriteError chan error // notify PacketConn.Write() have an error
  79. // nonce generator
  80. nonce Entropy
  81. isClosed bool // flag the session has Closed
  82. mu sync.Mutex
  83. }
  84. setReadBuffer interface {
  85. SetReadBuffer(bytes int) error
  86. }
  87. setWriteBuffer interface {
  88. SetWriteBuffer(bytes int) error
  89. }
  90. )
  91. // newUDPSession create a new udp session for client or server
  92. func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
  93. sess := new(UDPSession)
  94. sess.die = make(chan struct{})
  95. sess.nonce = new(nonceAES128)
  96. sess.nonce.Init()
  97. sess.chReadEvent = make(chan struct{}, 1)
  98. sess.chWriteEvent = make(chan struct{}, 1)
  99. sess.chReadError = make(chan error, 1)
  100. sess.chWriteError = make(chan error, 1)
  101. sess.remote = remote
  102. sess.conn = conn
  103. sess.l = l
  104. sess.block = block
  105. sess.recvbuf = make([]byte, mtuLimit)
  106. // FEC codec initialization
  107. sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  108. if sess.block != nil {
  109. sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
  110. } else {
  111. sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
  112. }
  113. // calculate additional header size introduced by FEC and encryption
  114. if sess.block != nil {
  115. sess.headerSize += cryptHeaderSize
  116. }
  117. if sess.fecEncoder != nil {
  118. sess.headerSize += fecHeaderSizePlus2
  119. }
  120. // we only need to allocate extended packet buffer if we have the additional header
  121. if sess.headerSize > 0 {
  122. sess.ext = make([]byte, mtuLimit)
  123. }
  124. sess.kcp = NewKCP(conv, func(buf []byte, size int) {
  125. if size >= IKCP_OVERHEAD {
  126. sess.output(buf[:size])
  127. }
  128. })
  129. sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
  130. // register current session to the global updater,
  131. // which call sess.update() periodically.
  132. updater.addSession(sess)
  133. if sess.l == nil { // it's a client connection
  134. go sess.readLoop()
  135. atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
  136. } else {
  137. atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
  138. }
  139. currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
  140. maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
  141. if currestab > maxconn {
  142. atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab)
  143. }
  144. return sess
  145. }
  146. // Read implements net.Conn
  147. func (s *UDPSession) Read(b []byte) (n int, err error) {
  148. for {
  149. s.mu.Lock()
  150. if len(s.bufptr) > 0 { // copy from buffer into b
  151. n = copy(b, s.bufptr)
  152. s.bufptr = s.bufptr[n:]
  153. s.mu.Unlock()
  154. atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
  155. return n, nil
  156. }
  157. if s.isClosed {
  158. s.mu.Unlock()
  159. return 0, errors.New(errBrokenPipe)
  160. }
  161. if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
  162. if len(b) >= size { // receive data into 'b' directly
  163. s.kcp.Recv(b)
  164. s.mu.Unlock()
  165. atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
  166. return size, nil
  167. }
  168. // if necessary resize the stream buffer to guarantee a sufficent buffer space
  169. if cap(s.recvbuf) < size {
  170. s.recvbuf = make([]byte, size)
  171. }
  172. // resize the length of recvbuf to correspond to data size
  173. s.recvbuf = s.recvbuf[:size]
  174. s.kcp.Recv(s.recvbuf)
  175. n = copy(b, s.recvbuf) // copy to 'b'
  176. s.bufptr = s.recvbuf[n:] // pointer update
  177. s.mu.Unlock()
  178. atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
  179. return n, nil
  180. }
  181. // deadline for current reading operation
  182. var timeout *time.Timer
  183. var c <-chan time.Time
  184. if !s.rd.IsZero() {
  185. if time.Now().After(s.rd) {
  186. s.mu.Unlock()
  187. return 0, errTimeout{}
  188. }
  189. delay := s.rd.Sub(time.Now())
  190. timeout = time.NewTimer(delay)
  191. c = timeout.C
  192. }
  193. s.mu.Unlock()
  194. // wait for read event or timeout
  195. select {
  196. case <-s.chReadEvent:
  197. case <-c:
  198. case <-s.die:
  199. case err = <-s.chReadError:
  200. if timeout != nil {
  201. timeout.Stop()
  202. }
  203. return n, err
  204. }
  205. if timeout != nil {
  206. timeout.Stop()
  207. }
  208. }
  209. }
  210. // Write implements net.Conn
  211. func (s *UDPSession) Write(b []byte) (n int, err error) {
  212. for {
  213. s.mu.Lock()
  214. if s.isClosed {
  215. s.mu.Unlock()
  216. return 0, errors.New(errBrokenPipe)
  217. }
  218. // controls how much data will be sent to kcp core
  219. // to prevent the memory from exhuasting
  220. if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
  221. n = len(b)
  222. for {
  223. if len(b) <= int(s.kcp.mss) {
  224. s.kcp.Send(b)
  225. break
  226. } else {
  227. s.kcp.Send(b[:s.kcp.mss])
  228. b = b[s.kcp.mss:]
  229. }
  230. }
  231. // flush immediately if the queue is full
  232. if s.kcp.WaitSnd() >= int(s.kcp.snd_wnd) || !s.writeDelay {
  233. s.kcp.flush(false)
  234. }
  235. s.mu.Unlock()
  236. atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
  237. return n, nil
  238. }
  239. // deadline for current writing operation
  240. var timeout *time.Timer
  241. var c <-chan time.Time
  242. if !s.wd.IsZero() {
  243. if time.Now().After(s.wd) {
  244. s.mu.Unlock()
  245. return 0, errTimeout{}
  246. }
  247. delay := s.wd.Sub(time.Now())
  248. timeout = time.NewTimer(delay)
  249. c = timeout.C
  250. }
  251. s.mu.Unlock()
  252. // wait for write event or timeout
  253. select {
  254. case <-s.chWriteEvent:
  255. case <-c:
  256. case <-s.die:
  257. case err = <-s.chWriteError:
  258. if timeout != nil {
  259. timeout.Stop()
  260. }
  261. return n, err
  262. }
  263. if timeout != nil {
  264. timeout.Stop()
  265. }
  266. }
  267. }
  268. // Close closes the connection.
  269. func (s *UDPSession) Close() error {
  270. // remove current session from updater & listener(if necessary)
  271. updater.removeSession(s)
  272. if s.l != nil { // notify listener
  273. s.l.closeSession(s.remote)
  274. }
  275. s.mu.Lock()
  276. defer s.mu.Unlock()
  277. if s.isClosed {
  278. return errors.New(errBrokenPipe)
  279. }
  280. close(s.die)
  281. s.isClosed = true
  282. atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
  283. if s.l == nil { // client socket close
  284. return s.conn.Close()
  285. }
  286. return nil
  287. }
  288. // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
  289. func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() }
  290. // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
  291. func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
  292. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  293. func (s *UDPSession) SetDeadline(t time.Time) error {
  294. s.mu.Lock()
  295. defer s.mu.Unlock()
  296. s.rd = t
  297. s.wd = t
  298. s.notifyReadEvent()
  299. s.notifyWriteEvent()
  300. return nil
  301. }
  302. // SetReadDeadline implements the Conn SetReadDeadline method.
  303. func (s *UDPSession) SetReadDeadline(t time.Time) error {
  304. s.mu.Lock()
  305. defer s.mu.Unlock()
  306. s.rd = t
  307. s.notifyReadEvent()
  308. return nil
  309. }
  310. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  311. func (s *UDPSession) SetWriteDeadline(t time.Time) error {
  312. s.mu.Lock()
  313. defer s.mu.Unlock()
  314. s.wd = t
  315. s.notifyWriteEvent()
  316. return nil
  317. }
  318. // SetWriteDelay delays write for bulk transfer until the next update interval
  319. func (s *UDPSession) SetWriteDelay(delay bool) {
  320. s.mu.Lock()
  321. defer s.mu.Unlock()
  322. s.writeDelay = delay
  323. }
  324. // SetWindowSize set maximum window size
  325. func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
  326. s.mu.Lock()
  327. defer s.mu.Unlock()
  328. s.kcp.WndSize(sndwnd, rcvwnd)
  329. }
  330. // SetMtu sets the maximum transmission unit(not including UDP header)
  331. func (s *UDPSession) SetMtu(mtu int) bool {
  332. if mtu > mtuLimit {
  333. return false
  334. }
  335. s.mu.Lock()
  336. defer s.mu.Unlock()
  337. s.kcp.SetMtu(mtu - s.headerSize)
  338. return true
  339. }
  340. // SetStreamMode toggles the stream mode on/off
  341. func (s *UDPSession) SetStreamMode(enable bool) {
  342. s.mu.Lock()
  343. defer s.mu.Unlock()
  344. if enable {
  345. s.kcp.stream = 1
  346. } else {
  347. s.kcp.stream = 0
  348. }
  349. }
  350. // SetACKNoDelay changes ack flush option, set true to flush ack immediately,
  351. func (s *UDPSession) SetACKNoDelay(nodelay bool) {
  352. s.mu.Lock()
  353. defer s.mu.Unlock()
  354. s.ackNoDelay = nodelay
  355. }
  356. // SetDUP duplicates udp packets for kcp output, for testing purpose only
  357. func (s *UDPSession) SetDUP(dup int) {
  358. s.mu.Lock()
  359. defer s.mu.Unlock()
  360. s.dup = dup
  361. }
  362. // SetNoDelay calls nodelay() of kcp
  363. // https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
  364. func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
  365. s.mu.Lock()
  366. defer s.mu.Unlock()
  367. s.kcp.NoDelay(nodelay, interval, resend, nc)
  368. }
  369. // SetDSCP sets the 6bit DSCP field of IP header, no effect if it's accepted from Listener
  370. func (s *UDPSession) SetDSCP(dscp int) error {
  371. s.mu.Lock()
  372. defer s.mu.Unlock()
  373. if s.l == nil {
  374. if nc, ok := s.conn.(net.Conn); ok {
  375. if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err != nil {
  376. return ipv6.NewConn(nc).SetTrafficClass(dscp)
  377. }
  378. return nil
  379. }
  380. }
  381. return errors.New(errInvalidOperation)
  382. }
  383. // SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
  384. func (s *UDPSession) SetReadBuffer(bytes int) error {
  385. s.mu.Lock()
  386. defer s.mu.Unlock()
  387. if s.l == nil {
  388. if nc, ok := s.conn.(setReadBuffer); ok {
  389. return nc.SetReadBuffer(bytes)
  390. }
  391. }
  392. return errors.New(errInvalidOperation)
  393. }
  394. // SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
  395. func (s *UDPSession) SetWriteBuffer(bytes int) error {
  396. s.mu.Lock()
  397. defer s.mu.Unlock()
  398. if s.l == nil {
  399. if nc, ok := s.conn.(setWriteBuffer); ok {
  400. return nc.SetWriteBuffer(bytes)
  401. }
  402. }
  403. return errors.New(errInvalidOperation)
  404. }
  405. // post-processing for sending a packet from kcp core
  406. // steps:
  407. // 0. Header extending
  408. // 1. FEC packet generation
  409. // 2. CRC32 integrity
  410. // 3. Encryption
  411. // 4. WriteTo kernel
  412. func (s *UDPSession) output(buf []byte) {
  413. var ecc [][]byte
  414. // 0. extend buf's header space(if necessary)
  415. ext := buf
  416. if s.headerSize > 0 {
  417. ext = s.ext[:s.headerSize+len(buf)]
  418. copy(ext[s.headerSize:], buf)
  419. }
  420. // 1. FEC encoding
  421. if s.fecEncoder != nil {
  422. ecc = s.fecEncoder.encode(ext)
  423. }
  424. // 2&3. crc32 & encryption
  425. if s.block != nil {
  426. s.nonce.Fill(ext[:nonceSize])
  427. checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
  428. binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
  429. s.block.Encrypt(ext, ext)
  430. for k := range ecc {
  431. s.nonce.Fill(ecc[k][:nonceSize])
  432. checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
  433. binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
  434. s.block.Encrypt(ecc[k], ecc[k])
  435. }
  436. }
  437. // 4. WriteTo kernel
  438. nbytes := 0
  439. npkts := 0
  440. for i := 0; i < s.dup+1; i++ {
  441. if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
  442. nbytes += n
  443. npkts++
  444. } else {
  445. s.notifyWriteError(err)
  446. }
  447. }
  448. for k := range ecc {
  449. if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
  450. nbytes += n
  451. npkts++
  452. } else {
  453. s.notifyWriteError(err)
  454. }
  455. }
  456. atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
  457. atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
  458. }
  459. // kcp update, returns interval for next calling
  460. func (s *UDPSession) update() (interval time.Duration) {
  461. s.mu.Lock()
  462. waitsnd := s.kcp.WaitSnd()
  463. interval = time.Duration(s.kcp.flush(false)) * time.Millisecond
  464. if s.kcp.WaitSnd() < waitsnd {
  465. s.notifyWriteEvent()
  466. }
  467. s.mu.Unlock()
  468. return
  469. }
  470. // GetConv gets conversation id of a session
  471. func (s *UDPSession) GetConv() uint32 { return s.kcp.conv }
  472. func (s *UDPSession) notifyReadEvent() {
  473. select {
  474. case s.chReadEvent <- struct{}{}:
  475. default:
  476. }
  477. }
  478. func (s *UDPSession) notifyWriteEvent() {
  479. select {
  480. case s.chWriteEvent <- struct{}{}:
  481. default:
  482. }
  483. }
  484. func (s *UDPSession) notifyWriteError(err error) {
  485. select {
  486. case s.chWriteError <- err:
  487. default:
  488. }
  489. }
  490. func (s *UDPSession) kcpInput(data []byte) {
  491. var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
  492. if s.fecDecoder != nil {
  493. if len(data) > fecHeaderSize { // must be larger than fec header size
  494. f := s.fecDecoder.decodeBytes(data)
  495. if f.flag == typeData || f.flag == typeFEC { // header check
  496. if f.flag == typeFEC {
  497. fecParityShards++
  498. }
  499. recovers := s.fecDecoder.decode(f)
  500. s.mu.Lock()
  501. waitsnd := s.kcp.WaitSnd()
  502. if f.flag == typeData {
  503. if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
  504. kcpInErrors++
  505. }
  506. }
  507. for _, r := range recovers {
  508. if len(r) >= 2 { // must be larger than 2bytes
  509. sz := binary.LittleEndian.Uint16(r)
  510. if int(sz) <= len(r) && sz >= 2 {
  511. if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
  512. fecRecovered++
  513. } else {
  514. kcpInErrors++
  515. }
  516. } else {
  517. fecErrs++
  518. }
  519. } else {
  520. fecErrs++
  521. }
  522. }
  523. // to notify the readers to receive the data
  524. if n := s.kcp.PeekSize(); n > 0 {
  525. s.notifyReadEvent()
  526. }
  527. // to notify the writers when queue is shorter(e.g. ACKed)
  528. if s.kcp.WaitSnd() < waitsnd {
  529. s.notifyWriteEvent()
  530. }
  531. s.mu.Unlock()
  532. } else {
  533. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  534. }
  535. } else {
  536. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  537. }
  538. } else {
  539. s.mu.Lock()
  540. waitsnd := s.kcp.WaitSnd()
  541. if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
  542. kcpInErrors++
  543. }
  544. if n := s.kcp.PeekSize(); n > 0 {
  545. s.notifyReadEvent()
  546. }
  547. if s.kcp.WaitSnd() < waitsnd {
  548. s.notifyWriteEvent()
  549. }
  550. s.mu.Unlock()
  551. }
  552. atomic.AddUint64(&DefaultSnmp.InPkts, 1)
  553. atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
  554. if fecParityShards > 0 {
  555. atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
  556. }
  557. if kcpInErrors > 0 {
  558. atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
  559. }
  560. if fecErrs > 0 {
  561. atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
  562. }
  563. if fecRecovered > 0 {
  564. atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
  565. }
  566. }
  567. // the read loop for a client session
  568. func (s *UDPSession) readLoop() {
  569. buf := make([]byte, mtuLimit)
  570. var src string
  571. for {
  572. if n, addr, err := s.conn.ReadFrom(buf); err == nil {
  573. // make sure the packet is from the same source
  574. if src == "" { // set source address
  575. src = addr.String()
  576. } else if addr.String() != src {
  577. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  578. continue
  579. }
  580. if n >= s.headerSize+IKCP_OVERHEAD {
  581. data := buf[:n]
  582. dataValid := false
  583. if s.block != nil {
  584. s.block.Decrypt(data, data)
  585. data = data[nonceSize:]
  586. checksum := crc32.ChecksumIEEE(data[crcSize:])
  587. if checksum == binary.LittleEndian.Uint32(data) {
  588. data = data[crcSize:]
  589. dataValid = true
  590. } else {
  591. atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
  592. }
  593. } else if s.block == nil {
  594. dataValid = true
  595. }
  596. if dataValid {
  597. s.kcpInput(data)
  598. }
  599. } else {
  600. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  601. }
  602. } else {
  603. s.chReadError <- err
  604. return
  605. }
  606. }
  607. }
  608. type (
  609. // Listener defines a server which will be waiting to accept incoming connections
  610. Listener struct {
  611. block BlockCrypt // block encryption
  612. dataShards int // FEC data shard
  613. parityShards int // FEC parity shard
  614. fecDecoder *fecDecoder // FEC mock initialization
  615. conn net.PacketConn // the underlying packet connection
  616. sessions map[string]*UDPSession // all sessions accepted by this Listener
  617. sessionLock sync.Mutex
  618. chAccepts chan *UDPSession // Listen() backlog
  619. chSessionClosed chan net.Addr // session close queue
  620. headerSize int // the additional header to a KCP frame
  621. die chan struct{} // notify the listener has closed
  622. rd atomic.Value // read deadline for Accept()
  623. wd atomic.Value
  624. }
  625. )
  626. // monitor incoming data for all connections of server
  627. func (l *Listener) monitor() {
  628. // a cache for session object last used
  629. var lastAddr string
  630. var lastSession *UDPSession
  631. buf := make([]byte, mtuLimit)
  632. for {
  633. if n, from, err := l.conn.ReadFrom(buf); err == nil {
  634. if n >= l.headerSize+IKCP_OVERHEAD {
  635. data := buf[:n]
  636. dataValid := false
  637. if l.block != nil {
  638. l.block.Decrypt(data, data)
  639. data = data[nonceSize:]
  640. checksum := crc32.ChecksumIEEE(data[crcSize:])
  641. if checksum == binary.LittleEndian.Uint32(data) {
  642. data = data[crcSize:]
  643. dataValid = true
  644. } else {
  645. atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
  646. }
  647. } else if l.block == nil {
  648. dataValid = true
  649. }
  650. if dataValid {
  651. addr := from.String()
  652. var s *UDPSession
  653. var ok bool
  654. // the packets received from an address always come in batch,
  655. // cache the session for next packet, without querying map.
  656. if addr == lastAddr {
  657. s, ok = lastSession, true
  658. } else {
  659. l.sessionLock.Lock()
  660. if s, ok = l.sessions[addr]; ok {
  661. lastSession = s
  662. lastAddr = addr
  663. }
  664. l.sessionLock.Unlock()
  665. }
  666. if !ok { // new session
  667. if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
  668. var conv uint32
  669. convValid := false
  670. if l.fecDecoder != nil {
  671. isfec := binary.LittleEndian.Uint16(data[4:])
  672. if isfec == typeData {
  673. conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
  674. convValid = true
  675. }
  676. } else {
  677. conv = binary.LittleEndian.Uint32(data)
  678. convValid = true
  679. }
  680. if convValid { // creates a new session only if the 'conv' field in kcp is accessible
  681. s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
  682. s.kcpInput(data)
  683. l.sessionLock.Lock()
  684. l.sessions[addr] = s
  685. l.sessionLock.Unlock()
  686. l.chAccepts <- s
  687. }
  688. }
  689. } else {
  690. s.kcpInput(data)
  691. }
  692. }
  693. } else {
  694. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  695. }
  696. } else {
  697. return
  698. }
  699. }
  700. }
  701. // SetReadBuffer sets the socket read buffer for the Listener
  702. func (l *Listener) SetReadBuffer(bytes int) error {
  703. if nc, ok := l.conn.(setReadBuffer); ok {
  704. return nc.SetReadBuffer(bytes)
  705. }
  706. return errors.New(errInvalidOperation)
  707. }
  708. // SetWriteBuffer sets the socket write buffer for the Listener
  709. func (l *Listener) SetWriteBuffer(bytes int) error {
  710. if nc, ok := l.conn.(setWriteBuffer); ok {
  711. return nc.SetWriteBuffer(bytes)
  712. }
  713. return errors.New(errInvalidOperation)
  714. }
  715. // SetDSCP sets the 6bit DSCP field of IP header
  716. func (l *Listener) SetDSCP(dscp int) error {
  717. if nc, ok := l.conn.(net.Conn); ok {
  718. if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err != nil {
  719. return ipv6.NewConn(nc).SetTrafficClass(dscp)
  720. }
  721. return nil
  722. }
  723. return errors.New(errInvalidOperation)
  724. }
  725. // Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
  726. func (l *Listener) Accept() (net.Conn, error) {
  727. return l.AcceptKCP()
  728. }
  729. // AcceptKCP accepts a KCP connection
  730. func (l *Listener) AcceptKCP() (*UDPSession, error) {
  731. var timeout <-chan time.Time
  732. if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
  733. timeout = time.After(tdeadline.Sub(time.Now()))
  734. }
  735. select {
  736. case <-timeout:
  737. return nil, &errTimeout{}
  738. case c := <-l.chAccepts:
  739. return c, nil
  740. case <-l.die:
  741. return nil, errors.New(errBrokenPipe)
  742. }
  743. }
  744. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  745. func (l *Listener) SetDeadline(t time.Time) error {
  746. l.SetReadDeadline(t)
  747. l.SetWriteDeadline(t)
  748. return nil
  749. }
  750. // SetReadDeadline implements the Conn SetReadDeadline method.
  751. func (l *Listener) SetReadDeadline(t time.Time) error {
  752. l.rd.Store(t)
  753. return nil
  754. }
  755. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  756. func (l *Listener) SetWriteDeadline(t time.Time) error {
  757. l.wd.Store(t)
  758. return nil
  759. }
  760. // Close stops listening on the UDP address. Already Accepted connections are not closed.
  761. func (l *Listener) Close() error {
  762. close(l.die)
  763. return l.conn.Close()
  764. }
  765. // closeSession notify the listener that a session has closed
  766. func (l *Listener) closeSession(remote net.Addr) (ret bool) {
  767. l.sessionLock.Lock()
  768. defer l.sessionLock.Unlock()
  769. if _, ok := l.sessions[remote.String()]; ok {
  770. delete(l.sessions, remote.String())
  771. return true
  772. }
  773. return false
  774. }
  775. // Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
  776. func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
  777. // Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
  778. func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
  779. // ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption,
  780. // dataShards, parityShards defines Reed-Solomon Erasure Coding parameters
  781. func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
  782. udpaddr, err := net.ResolveUDPAddr("udp", laddr)
  783. if err != nil {
  784. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  785. }
  786. conn, err := net.ListenUDP("udp", udpaddr)
  787. if err != nil {
  788. return nil, errors.Wrap(err, "net.ListenUDP")
  789. }
  790. return ServeConn(block, dataShards, parityShards, conn)
  791. }
  792. // ServeConn serves KCP protocol for a single packet connection.
  793. func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
  794. l := new(Listener)
  795. l.conn = conn
  796. l.sessions = make(map[string]*UDPSession)
  797. l.chAccepts = make(chan *UDPSession, acceptBacklog)
  798. l.chSessionClosed = make(chan net.Addr)
  799. l.die = make(chan struct{})
  800. l.dataShards = dataShards
  801. l.parityShards = parityShards
  802. l.block = block
  803. l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  804. // calculate header size
  805. if l.block != nil {
  806. l.headerSize += cryptHeaderSize
  807. }
  808. if l.fecDecoder != nil {
  809. l.headerSize += fecHeaderSizePlus2
  810. }
  811. go l.monitor()
  812. return l, nil
  813. }
  814. // Dial connects to the remote address "raddr" on the network "udp"
  815. func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
  816. // DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
  817. func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
  818. // network type detection
  819. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  820. if err != nil {
  821. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  822. }
  823. network := "udp4"
  824. if udpaddr.IP.To4() == nil {
  825. network = "udp"
  826. }
  827. conn, err := net.ListenUDP(network, nil)
  828. if err != nil {
  829. return nil, errors.Wrap(err, "net.DialUDP")
  830. }
  831. return NewConn(raddr, block, dataShards, parityShards, conn)
  832. }
  833. // NewConn establishes a session and talks KCP protocol over a packet connection.
  834. func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
  835. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  836. if err != nil {
  837. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  838. }
  839. var convid uint32
  840. binary.Read(rand.Reader, binary.LittleEndian, &convid)
  841. return newUDPSession(convid, dataShards, parityShards, nil, conn, udpaddr, block), nil
  842. }
  843. // monotonic reference time point
  844. var refTime time.Time = time.Now()
  845. // currentMs returns current elasped monotonic milliseconds since program startup
  846. func currentMs() uint32 { return uint32(time.Now().Sub(refTime) / time.Millisecond) }
  847. func NewConnEx(convid uint32, connected bool, raddr string, block BlockCrypt, dataShards, parityShards int, conn *net.UDPConn) (*UDPSession, error) {
  848. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  849. if err != nil {
  850. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  851. }
  852. var pConn net.PacketConn = conn
  853. if connected {
  854. pConn = &connectedUDPConn{conn}
  855. }
  856. return newUDPSession(convid, dataShards, parityShards, nil, pConn, udpaddr, block), nil
  857. }
  858. // connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
  859. // to Write syscalls that are 4 times faster on some OS'es. This should only be
  860. // used for connections that were produced by a net.Dial* call.
  861. type connectedUDPConn struct{ *net.UDPConn }
  862. // WriteTo redirects all writes to the Write syscall, which is 4 times faster.
  863. func (c *connectedUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { return c.Write(b) }