sess.go 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021
  1. // Package kcp-go is a Reliable-UDP library for golang.
  2. //
  3. // This library intents to provide a smooth, resilient, ordered,
  4. // error-checked and anonymous delivery of streams over UDP packets.
  5. //
  6. // The interfaces of this package aims to be compatible with
  7. // net.Conn in standard library, but offers powerful features for advanced users.
  8. package kcp
  9. import (
  10. "crypto/rand"
  11. "encoding/binary"
  12. "hash/crc32"
  13. "io"
  14. "net"
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. "github.com/pkg/errors"
  19. "golang.org/x/net/ipv4"
  20. "golang.org/x/net/ipv6"
  21. )
  22. const (
  23. // 16-bytes nonce for each packet
  24. nonceSize = 16
  25. // 4-bytes packet checksum
  26. crcSize = 4
  27. // overall crypto header size
  28. cryptHeaderSize = nonceSize + crcSize
  29. // maximum packet size
  30. mtuLimit = 1500
  31. // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
  32. rxFECMulti = 3
  33. // accept backlog
  34. acceptBacklog = 128
  35. )
  36. var (
  37. errInvalidOperation = errors.New("invalid operation")
  38. errTimeout = errors.New("timeout")
  39. )
  40. var (
  41. // a system-wide packet buffer shared among sending, receiving and FEC
  42. // to mitigate high-frequency memory allocation for packets
  43. xmitBuf sync.Pool
  44. )
  45. func init() {
  46. xmitBuf.New = func() interface{} {
  47. return make([]byte, mtuLimit)
  48. }
  49. }
  50. type (
  51. // UDPSession defines a KCP session implemented by UDP
  52. UDPSession struct {
  53. updaterIdx int // record slice index in updater
  54. conn net.PacketConn // the underlying packet connection
  55. kcp *KCP // KCP ARQ protocol
  56. l *Listener // pointing to the Listener object if it's been accepted by a Listener
  57. block BlockCrypt // block encryption object
  58. // kcp receiving is based on packets
  59. // recvbuf turns packets into stream
  60. recvbuf []byte
  61. bufptr []byte
  62. // FEC codec
  63. fecDecoder *fecDecoder
  64. fecEncoder *fecEncoder
  65. // settings
  66. remote net.Addr // remote peer address
  67. rd time.Time // read deadline
  68. wd time.Time // write deadline
  69. headerSize int // the header size additional to a KCP frame
  70. ackNoDelay bool // send ack immediately for each incoming packet(testing purpose)
  71. writeDelay bool // delay kcp.flush() for Write() for bulk transfer
  72. dup int // duplicate udp packets(testing purpose)
  73. // notifications
  74. die chan struct{} // notify current session has Closed
  75. dieOnce sync.Once
  76. chReadEvent chan struct{} // notify Read() can be called without blocking
  77. chWriteEvent chan struct{} // notify Write() can be called without blocking
  78. // socket error handling
  79. socketReadError atomic.Value
  80. socketWriteError atomic.Value
  81. chSocketReadError chan struct{}
  82. chSocketWriteError chan struct{}
  83. socketReadErrorOnce sync.Once
  84. socketWriteErrorOnce sync.Once
  85. // nonce generator
  86. nonce Entropy
  87. // packets waiting to be sent on wire
  88. txqueue []ipv4.Message
  89. xconn batchConn // for x/net
  90. xconnWriteError error
  91. mu sync.Mutex
  92. }
  93. setReadBuffer interface {
  94. SetReadBuffer(bytes int) error
  95. }
  96. setWriteBuffer interface {
  97. SetWriteBuffer(bytes int) error
  98. }
  99. )
  100. // newUDPSession create a new udp session for client or server
  101. func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
  102. sess := new(UDPSession)
  103. sess.die = make(chan struct{})
  104. sess.nonce = new(nonceAES128)
  105. sess.nonce.Init()
  106. sess.chReadEvent = make(chan struct{}, 1)
  107. sess.chWriteEvent = make(chan struct{}, 1)
  108. sess.chSocketReadError = make(chan struct{})
  109. sess.chSocketWriteError = make(chan struct{})
  110. sess.remote = remote
  111. sess.conn = conn
  112. sess.l = l
  113. sess.block = block
  114. sess.recvbuf = make([]byte, mtuLimit)
  115. // cast to writebatch conn
  116. if _, ok := conn.(*net.UDPConn); ok {
  117. addr, err := net.ResolveUDPAddr("udp", conn.LocalAddr().String())
  118. if err == nil {
  119. if addr.IP.To4() != nil {
  120. sess.xconn = ipv4.NewPacketConn(conn)
  121. } else {
  122. sess.xconn = ipv6.NewPacketConn(conn)
  123. }
  124. }
  125. }
  126. // FEC codec initialization
  127. sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  128. if sess.block != nil {
  129. sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
  130. } else {
  131. sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
  132. }
  133. // calculate additional header size introduced by FEC and encryption
  134. if sess.block != nil {
  135. sess.headerSize += cryptHeaderSize
  136. }
  137. if sess.fecEncoder != nil {
  138. sess.headerSize += fecHeaderSizePlus2
  139. }
  140. sess.kcp = NewKCP(conv, func(buf []byte, size int) {
  141. if size >= IKCP_OVERHEAD+sess.headerSize {
  142. sess.output(buf[:size])
  143. }
  144. })
  145. sess.kcp.ReserveBytes(sess.headerSize)
  146. // register current session to the global updater,
  147. // which call sess.update() periodically.
  148. updater.addSession(sess)
  149. if sess.l == nil { // it's a client connection
  150. go sess.readLoop()
  151. atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
  152. } else {
  153. atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
  154. }
  155. currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
  156. maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
  157. if currestab > maxconn {
  158. atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab)
  159. }
  160. return sess
  161. }
  162. // Read implements net.Conn
  163. func (s *UDPSession) Read(b []byte) (n int, err error) {
  164. for {
  165. s.mu.Lock()
  166. if len(s.bufptr) > 0 { // copy from buffer into b
  167. n = copy(b, s.bufptr)
  168. s.bufptr = s.bufptr[n:]
  169. s.mu.Unlock()
  170. atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
  171. return n, nil
  172. }
  173. if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
  174. if len(b) >= size { // receive data into 'b' directly
  175. s.kcp.Recv(b)
  176. s.mu.Unlock()
  177. atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
  178. return size, nil
  179. }
  180. // if necessary resize the stream buffer to guarantee a sufficent buffer space
  181. if cap(s.recvbuf) < size {
  182. s.recvbuf = make([]byte, size)
  183. }
  184. // resize the length of recvbuf to correspond to data size
  185. s.recvbuf = s.recvbuf[:size]
  186. s.kcp.Recv(s.recvbuf)
  187. n = copy(b, s.recvbuf) // copy to 'b'
  188. s.bufptr = s.recvbuf[n:] // pointer update
  189. s.mu.Unlock()
  190. atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
  191. return n, nil
  192. }
  193. // deadline for current reading operation
  194. var timeout *time.Timer
  195. var c <-chan time.Time
  196. if !s.rd.IsZero() {
  197. if time.Now().After(s.rd) {
  198. s.mu.Unlock()
  199. return 0, errors.WithStack(errTimeout)
  200. }
  201. delay := s.rd.Sub(time.Now())
  202. timeout = time.NewTimer(delay)
  203. c = timeout.C
  204. }
  205. s.mu.Unlock()
  206. // wait for read event or timeout or error
  207. select {
  208. case <-s.chReadEvent:
  209. if timeout != nil {
  210. timeout.Stop()
  211. }
  212. case <-c:
  213. return 0, errors.WithStack(errTimeout)
  214. case <-s.chSocketReadError:
  215. return 0, s.socketReadError.Load().(error)
  216. case <-s.die:
  217. return 0, errors.WithStack(io.ErrClosedPipe)
  218. }
  219. }
  220. }
  221. // Write implements net.Conn
  222. func (s *UDPSession) Write(b []byte) (n int, err error) { return s.WriteBuffers([][]byte{b}) }
  223. // WriteBuffers write a vector of byte slices to the underlying connection
  224. func (s *UDPSession) WriteBuffers(v [][]byte) (n int, err error) {
  225. for {
  226. select {
  227. case <-s.chSocketWriteError:
  228. return 0, s.socketWriteError.Load().(error)
  229. case <-s.die:
  230. return 0, errors.WithStack(io.ErrClosedPipe)
  231. default:
  232. }
  233. s.mu.Lock()
  234. if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
  235. for _, b := range v {
  236. n += len(b)
  237. for {
  238. if len(b) <= int(s.kcp.mss) {
  239. s.kcp.Send(b)
  240. break
  241. } else {
  242. s.kcp.Send(b[:s.kcp.mss])
  243. b = b[s.kcp.mss:]
  244. }
  245. }
  246. }
  247. if s.kcp.WaitSnd() >= int(s.kcp.snd_wnd) || !s.writeDelay {
  248. s.kcp.flush(false)
  249. s.uncork()
  250. }
  251. s.mu.Unlock()
  252. atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
  253. return n, nil
  254. }
  255. var timeout *time.Timer
  256. var c <-chan time.Time
  257. if !s.wd.IsZero() {
  258. if time.Now().After(s.wd) {
  259. s.mu.Unlock()
  260. return 0, errors.WithStack(errTimeout)
  261. }
  262. delay := s.wd.Sub(time.Now())
  263. timeout = time.NewTimer(delay)
  264. c = timeout.C
  265. }
  266. s.mu.Unlock()
  267. select {
  268. case <-s.chWriteEvent:
  269. if timeout != nil {
  270. timeout.Stop()
  271. }
  272. case <-c:
  273. return 0, errors.WithStack(errTimeout)
  274. case <-s.chSocketWriteError:
  275. return 0, s.socketWriteError.Load().(error)
  276. case <-s.die:
  277. return 0, errors.WithStack(io.ErrClosedPipe)
  278. }
  279. }
  280. }
  281. // uncork sends data in txqueue if there is any
  282. func (s *UDPSession) uncork() {
  283. if len(s.txqueue) > 0 {
  284. s.tx(s.txqueue)
  285. s.txqueue = s.txqueue[:0]
  286. }
  287. return
  288. }
  289. // Close closes the connection.
  290. func (s *UDPSession) Close() error {
  291. var once bool
  292. s.dieOnce.Do(func() {
  293. close(s.die)
  294. once = true
  295. })
  296. if once {
  297. // remove from updater
  298. updater.removeSession(s)
  299. atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
  300. if s.l != nil { // belongs to listener
  301. s.l.closeSession(s.remote)
  302. return nil
  303. } else { // client socket close
  304. return s.conn.Close()
  305. }
  306. } else {
  307. return errors.WithStack(io.ErrClosedPipe)
  308. }
  309. }
  310. // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
  311. func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() }
  312. // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
  313. func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
  314. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  315. func (s *UDPSession) SetDeadline(t time.Time) error {
  316. s.mu.Lock()
  317. defer s.mu.Unlock()
  318. s.rd = t
  319. s.wd = t
  320. s.notifyReadEvent()
  321. s.notifyWriteEvent()
  322. return nil
  323. }
  324. // SetReadDeadline implements the Conn SetReadDeadline method.
  325. func (s *UDPSession) SetReadDeadline(t time.Time) error {
  326. s.mu.Lock()
  327. defer s.mu.Unlock()
  328. s.rd = t
  329. s.notifyReadEvent()
  330. return nil
  331. }
  332. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  333. func (s *UDPSession) SetWriteDeadline(t time.Time) error {
  334. s.mu.Lock()
  335. defer s.mu.Unlock()
  336. s.wd = t
  337. s.notifyWriteEvent()
  338. return nil
  339. }
  340. // SetWriteDelay delays write for bulk transfer until the next update interval
  341. func (s *UDPSession) SetWriteDelay(delay bool) {
  342. s.mu.Lock()
  343. defer s.mu.Unlock()
  344. s.writeDelay = delay
  345. }
  346. // SetWindowSize set maximum window size
  347. func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
  348. s.mu.Lock()
  349. defer s.mu.Unlock()
  350. s.kcp.WndSize(sndwnd, rcvwnd)
  351. }
  352. // SetMtu sets the maximum transmission unit(not including UDP header)
  353. func (s *UDPSession) SetMtu(mtu int) bool {
  354. if mtu > mtuLimit {
  355. return false
  356. }
  357. s.mu.Lock()
  358. defer s.mu.Unlock()
  359. s.kcp.SetMtu(mtu)
  360. return true
  361. }
  362. // SetStreamMode toggles the stream mode on/off
  363. func (s *UDPSession) SetStreamMode(enable bool) {
  364. s.mu.Lock()
  365. defer s.mu.Unlock()
  366. if enable {
  367. s.kcp.stream = 1
  368. } else {
  369. s.kcp.stream = 0
  370. }
  371. }
  372. // SetACKNoDelay changes ack flush option, set true to flush ack immediately,
  373. func (s *UDPSession) SetACKNoDelay(nodelay bool) {
  374. s.mu.Lock()
  375. defer s.mu.Unlock()
  376. s.ackNoDelay = nodelay
  377. }
  378. // (deprecated)
  379. //
  380. // SetDUP duplicates udp packets for kcp output.
  381. func (s *UDPSession) SetDUP(dup int) {
  382. s.mu.Lock()
  383. defer s.mu.Unlock()
  384. s.dup = dup
  385. }
  386. // SetNoDelay calls nodelay() of kcp
  387. // https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
  388. func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
  389. s.mu.Lock()
  390. defer s.mu.Unlock()
  391. s.kcp.NoDelay(nodelay, interval, resend, nc)
  392. }
  393. // SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header.
  394. //
  395. // It has no effect if it's accepted from Listener.
  396. func (s *UDPSession) SetDSCP(dscp int) error {
  397. s.mu.Lock()
  398. defer s.mu.Unlock()
  399. if s.l != nil {
  400. return errInvalidOperation
  401. }
  402. if nc, ok := s.conn.(net.Conn); ok {
  403. var succeed bool
  404. if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil {
  405. succeed = true
  406. }
  407. if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil {
  408. succeed = true
  409. }
  410. if succeed {
  411. return nil
  412. }
  413. }
  414. return errInvalidOperation
  415. }
  416. // SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
  417. func (s *UDPSession) SetReadBuffer(bytes int) error {
  418. s.mu.Lock()
  419. defer s.mu.Unlock()
  420. if s.l == nil {
  421. if nc, ok := s.conn.(setReadBuffer); ok {
  422. return nc.SetReadBuffer(bytes)
  423. }
  424. }
  425. return errInvalidOperation
  426. }
  427. // SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
  428. func (s *UDPSession) SetWriteBuffer(bytes int) error {
  429. s.mu.Lock()
  430. defer s.mu.Unlock()
  431. if s.l == nil {
  432. if nc, ok := s.conn.(setWriteBuffer); ok {
  433. return nc.SetWriteBuffer(bytes)
  434. }
  435. }
  436. return errInvalidOperation
  437. }
  438. // post-processing for sending a packet from kcp core
  439. // steps:
  440. // 1. FEC packet generation
  441. // 2. CRC32 integrity
  442. // 3. Encryption
  443. // 4. TxQueue
  444. func (s *UDPSession) output(buf []byte) {
  445. var ecc [][]byte
  446. // 1. FEC encoding
  447. if s.fecEncoder != nil {
  448. ecc = s.fecEncoder.encode(buf)
  449. }
  450. // 2&3. crc32 & encryption
  451. if s.block != nil {
  452. s.nonce.Fill(buf[:nonceSize])
  453. checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
  454. binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
  455. s.block.Encrypt(buf, buf)
  456. for k := range ecc {
  457. s.nonce.Fill(ecc[k][:nonceSize])
  458. checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
  459. binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
  460. s.block.Encrypt(ecc[k], ecc[k])
  461. }
  462. }
  463. // 4. TxQueue
  464. var msg ipv4.Message
  465. for i := 0; i < s.dup+1; i++ {
  466. bts := xmitBuf.Get().([]byte)[:len(buf)]
  467. copy(bts, buf)
  468. msg.Buffers = [][]byte{bts}
  469. msg.Addr = s.remote
  470. s.txqueue = append(s.txqueue, msg)
  471. }
  472. for k := range ecc {
  473. bts := xmitBuf.Get().([]byte)[:len(ecc[k])]
  474. copy(bts, ecc[k])
  475. msg.Buffers = [][]byte{bts}
  476. msg.Addr = s.remote
  477. s.txqueue = append(s.txqueue, msg)
  478. }
  479. }
  480. // kcp update, returns interval for next calling
  481. func (s *UDPSession) update() (interval time.Duration) {
  482. s.mu.Lock()
  483. waitsnd := s.kcp.WaitSnd()
  484. interval = time.Duration(s.kcp.flush(false)) * time.Millisecond
  485. if s.kcp.WaitSnd() < waitsnd {
  486. s.notifyWriteEvent()
  487. }
  488. s.uncork()
  489. s.mu.Unlock()
  490. return
  491. }
  492. // GetConv gets conversation id of a session
  493. func (s *UDPSession) GetConv() uint32 { return s.kcp.conv }
  494. func (s *UDPSession) notifyReadEvent() {
  495. select {
  496. case s.chReadEvent <- struct{}{}:
  497. default:
  498. }
  499. }
  500. func (s *UDPSession) notifyWriteEvent() {
  501. select {
  502. case s.chWriteEvent <- struct{}{}:
  503. default:
  504. }
  505. }
  506. func (s *UDPSession) notifyReadError(err error) {
  507. s.socketReadErrorOnce.Do(func() {
  508. s.socketReadError.Store(err)
  509. close(s.chSocketReadError)
  510. })
  511. }
  512. func (s *UDPSession) notifyWriteError(err error) {
  513. s.socketWriteErrorOnce.Do(func() {
  514. s.socketWriteError.Store(err)
  515. close(s.chSocketWriteError)
  516. })
  517. }
  518. // packet input stage
  519. func (s *UDPSession) packetInput(data []byte) {
  520. dataValid := false
  521. if s.block != nil {
  522. s.block.Decrypt(data, data)
  523. data = data[nonceSize:]
  524. checksum := crc32.ChecksumIEEE(data[crcSize:])
  525. if checksum == binary.LittleEndian.Uint32(data) {
  526. data = data[crcSize:]
  527. dataValid = true
  528. } else {
  529. atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
  530. }
  531. } else if s.block == nil {
  532. dataValid = true
  533. }
  534. if dataValid {
  535. s.kcpInput(data)
  536. }
  537. }
  538. func (s *UDPSession) kcpInput(data []byte) {
  539. var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
  540. if s.fecDecoder != nil {
  541. if len(data) > fecHeaderSize { // must be larger than fec header size
  542. f := fecPacket(data)
  543. if f.flag() == typeData || f.flag() == typeParity { // header check
  544. if f.flag() == typeParity {
  545. fecParityShards++
  546. }
  547. recovers := s.fecDecoder.decode(f)
  548. s.mu.Lock()
  549. waitsnd := s.kcp.WaitSnd()
  550. if f.flag() == typeData {
  551. if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
  552. kcpInErrors++
  553. }
  554. }
  555. for _, r := range recovers {
  556. if len(r) >= 2 { // must be larger than 2bytes
  557. sz := binary.LittleEndian.Uint16(r)
  558. if int(sz) <= len(r) && sz >= 2 {
  559. if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
  560. fecRecovered++
  561. } else {
  562. kcpInErrors++
  563. }
  564. } else {
  565. fecErrs++
  566. }
  567. } else {
  568. fecErrs++
  569. }
  570. // recycle the recovers
  571. xmitBuf.Put(r)
  572. }
  573. // to notify the readers to receive the data
  574. if n := s.kcp.PeekSize(); n > 0 {
  575. s.notifyReadEvent()
  576. }
  577. // to notify the writers when queue is shorter(e.g. ACKed)
  578. if s.kcp.WaitSnd() < waitsnd {
  579. s.notifyWriteEvent()
  580. }
  581. s.uncork()
  582. s.mu.Unlock()
  583. } else {
  584. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  585. }
  586. } else {
  587. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  588. }
  589. } else {
  590. s.mu.Lock()
  591. waitsnd := s.kcp.WaitSnd()
  592. if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
  593. kcpInErrors++
  594. }
  595. if n := s.kcp.PeekSize(); n > 0 {
  596. s.notifyReadEvent()
  597. }
  598. if s.kcp.WaitSnd() < waitsnd {
  599. s.notifyWriteEvent()
  600. }
  601. s.uncork()
  602. s.mu.Unlock()
  603. }
  604. atomic.AddUint64(&DefaultSnmp.InPkts, 1)
  605. atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
  606. if fecParityShards > 0 {
  607. atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
  608. }
  609. if kcpInErrors > 0 {
  610. atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
  611. }
  612. if fecErrs > 0 {
  613. atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
  614. }
  615. if fecRecovered > 0 {
  616. atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
  617. }
  618. }
  619. type (
  620. // Listener defines a server which will be waiting to accept incoming connections
  621. Listener struct {
  622. block BlockCrypt // block encryption
  623. dataShards int // FEC data shard
  624. parityShards int // FEC parity shard
  625. fecDecoder *fecDecoder // FEC mock initialization
  626. conn net.PacketConn // the underlying packet connection
  627. sessions map[string]*UDPSession // all sessions accepted by this Listener
  628. sessionLock sync.Mutex
  629. chAccepts chan *UDPSession // Listen() backlog
  630. chSessionClosed chan net.Addr // session close queue
  631. headerSize int // the additional header to a KCP frame
  632. die chan struct{} // notify the listener has closed
  633. dieOnce sync.Once
  634. // socket error handling
  635. socketReadError atomic.Value
  636. chSocketReadError chan struct{}
  637. socketReadErrorOnce sync.Once
  638. rd atomic.Value // read deadline for Accept()
  639. }
  640. )
  641. // packet input stage
  642. func (l *Listener) packetInput(data []byte, addr net.Addr) {
  643. dataValid := false
  644. if l.block != nil {
  645. l.block.Decrypt(data, data)
  646. data = data[nonceSize:]
  647. checksum := crc32.ChecksumIEEE(data[crcSize:])
  648. if checksum == binary.LittleEndian.Uint32(data) {
  649. data = data[crcSize:]
  650. dataValid = true
  651. } else {
  652. atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
  653. }
  654. } else if l.block == nil {
  655. dataValid = true
  656. }
  657. if dataValid {
  658. l.sessionLock.Lock()
  659. s, ok := l.sessions[addr.String()]
  660. l.sessionLock.Unlock()
  661. if !ok { // new address:port
  662. if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
  663. var conv uint32
  664. convValid := false
  665. if l.fecDecoder != nil {
  666. isfec := binary.LittleEndian.Uint16(data[4:])
  667. if isfec == typeData {
  668. conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
  669. convValid = true
  670. }
  671. } else {
  672. conv = binary.LittleEndian.Uint32(data)
  673. convValid = true
  674. }
  675. if convValid { // creates a new session only if the 'conv' field in kcp is accessible
  676. s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, addr, l.block)
  677. s.kcpInput(data)
  678. l.sessionLock.Lock()
  679. l.sessions[addr.String()] = s
  680. l.sessionLock.Unlock()
  681. l.chAccepts <- s
  682. }
  683. }
  684. } else {
  685. s.kcpInput(data)
  686. }
  687. }
  688. }
  689. func (l *Listener) notifyReadError(err error) {
  690. l.socketReadErrorOnce.Do(func() {
  691. l.socketReadError.Store(err)
  692. close(l.chSocketReadError)
  693. // propagate read error to all sessions
  694. l.sessionLock.Lock()
  695. for _, s := range l.sessions {
  696. s.notifyReadError(err)
  697. }
  698. l.sessionLock.Unlock()
  699. })
  700. }
  701. // SetReadBuffer sets the socket read buffer for the Listener
  702. func (l *Listener) SetReadBuffer(bytes int) error {
  703. if nc, ok := l.conn.(setReadBuffer); ok {
  704. return nc.SetReadBuffer(bytes)
  705. }
  706. return errInvalidOperation
  707. }
  708. // SetWriteBuffer sets the socket write buffer for the Listener
  709. func (l *Listener) SetWriteBuffer(bytes int) error {
  710. if nc, ok := l.conn.(setWriteBuffer); ok {
  711. return nc.SetWriteBuffer(bytes)
  712. }
  713. return errInvalidOperation
  714. }
  715. // SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header.
  716. func (l *Listener) SetDSCP(dscp int) error {
  717. if nc, ok := l.conn.(net.Conn); ok {
  718. var succeed bool
  719. if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil {
  720. succeed = true
  721. }
  722. if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil {
  723. succeed = true
  724. }
  725. if succeed {
  726. return nil
  727. }
  728. }
  729. return errInvalidOperation
  730. }
  731. // Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
  732. func (l *Listener) Accept() (net.Conn, error) {
  733. return l.AcceptKCP()
  734. }
  735. // AcceptKCP accepts a KCP connection
  736. func (l *Listener) AcceptKCP() (*UDPSession, error) {
  737. var timeout <-chan time.Time
  738. if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
  739. timeout = time.After(tdeadline.Sub(time.Now()))
  740. }
  741. select {
  742. case <-timeout:
  743. return nil, errors.WithStack(errTimeout)
  744. case c := <-l.chAccepts:
  745. return c, nil
  746. case <-l.chSocketReadError:
  747. return nil, l.socketReadError.Load().(error)
  748. case <-l.die:
  749. return nil, errors.WithStack(io.ErrClosedPipe)
  750. }
  751. }
  752. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  753. func (l *Listener) SetDeadline(t time.Time) error {
  754. l.SetReadDeadline(t)
  755. l.SetWriteDeadline(t)
  756. return nil
  757. }
  758. // SetReadDeadline implements the Conn SetReadDeadline method.
  759. func (l *Listener) SetReadDeadline(t time.Time) error {
  760. l.rd.Store(t)
  761. return nil
  762. }
  763. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  764. func (l *Listener) SetWriteDeadline(t time.Time) error { return errInvalidOperation }
  765. // Close stops listening on the UDP address, and closes the socket
  766. func (l *Listener) Close() error {
  767. var once bool
  768. l.dieOnce.Do(func() {
  769. close(l.die)
  770. once = true
  771. })
  772. if once {
  773. return l.conn.Close()
  774. } else {
  775. return errors.WithStack(io.ErrClosedPipe)
  776. }
  777. }
  778. // closeSession notify the listener that a session has closed
  779. func (l *Listener) closeSession(remote net.Addr) (ret bool) {
  780. l.sessionLock.Lock()
  781. defer l.sessionLock.Unlock()
  782. if _, ok := l.sessions[remote.String()]; ok {
  783. delete(l.sessions, remote.String())
  784. return true
  785. }
  786. return false
  787. }
  788. // Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
  789. func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
  790. // Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
  791. func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
  792. // ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption.
  793. //
  794. // 'block' is the block encryption algorithm to encrypt packets.
  795. //
  796. // 'dataShards', 'parityShards' specifiy how many parity packets will be generated following the data packets.
  797. //
  798. // Check https://github.com/klauspost/reedsolomon for details
  799. func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
  800. udpaddr, err := net.ResolveUDPAddr("udp", laddr)
  801. if err != nil {
  802. return nil, errors.WithStack(err)
  803. }
  804. conn, err := net.ListenUDP("udp", udpaddr)
  805. if err != nil {
  806. return nil, errors.WithStack(err)
  807. }
  808. return ServeConn(block, dataShards, parityShards, conn)
  809. }
  810. // ServeConn serves KCP protocol for a single packet connection.
  811. func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
  812. l := new(Listener)
  813. l.conn = conn
  814. l.sessions = make(map[string]*UDPSession)
  815. l.chAccepts = make(chan *UDPSession, acceptBacklog)
  816. l.chSessionClosed = make(chan net.Addr)
  817. l.die = make(chan struct{})
  818. l.dataShards = dataShards
  819. l.parityShards = parityShards
  820. l.block = block
  821. l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  822. l.chSocketReadError = make(chan struct{})
  823. // calculate header size
  824. if l.block != nil {
  825. l.headerSize += cryptHeaderSize
  826. }
  827. if l.fecDecoder != nil {
  828. l.headerSize += fecHeaderSizePlus2
  829. }
  830. go l.monitor()
  831. return l, nil
  832. }
  833. // Dial connects to the remote address "raddr" on the network "udp" without encryption and FEC
  834. func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
  835. // DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
  836. //
  837. // 'block' is the block encryption algorithm to encrypt packets.
  838. //
  839. // 'dataShards', 'parityShards' specifiy how many parity packets will be generated following the data packets.
  840. //
  841. // Check https://github.com/klauspost/reedsolomon for details
  842. func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
  843. // network type detection
  844. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  845. if err != nil {
  846. return nil, errors.WithStack(err)
  847. }
  848. network := "udp4"
  849. if udpaddr.IP.To4() == nil {
  850. network = "udp"
  851. }
  852. conn, err := net.ListenUDP(network, nil)
  853. if err != nil {
  854. return nil, errors.WithStack(err)
  855. }
  856. return NewConn(raddr, block, dataShards, parityShards, conn)
  857. }
  858. // NewConn3 establishes a session and talks KCP protocol over a packet connection.
  859. func NewConn3(convid uint32, raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
  860. return newUDPSession(convid, dataShards, parityShards, nil, conn, raddr, block), nil
  861. }
  862. // NewConn2 establishes a session and talks KCP protocol over a packet connection.
  863. func NewConn2(raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
  864. var convid uint32
  865. binary.Read(rand.Reader, binary.LittleEndian, &convid)
  866. return NewConn3(convid, raddr, block, dataShards, parityShards, conn)
  867. }
  868. // NewConn establishes a session and talks KCP protocol over a packet connection.
  869. func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
  870. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  871. if err != nil {
  872. return nil, errors.WithStack(err)
  873. }
  874. return NewConn2(udpaddr, block, dataShards, parityShards, conn)
  875. }
  876. func NewConnEx(convid uint32, connected bool, raddr string, block BlockCrypt, dataShards, parityShards int, conn *net.UDPConn) (*UDPSession, error) {
  877. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  878. if err != nil {
  879. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  880. }
  881. var pConn net.PacketConn = conn
  882. if connected {
  883. pConn = &connectedUDPConn{conn}
  884. }
  885. return newUDPSession(convid, dataShards, parityShards, nil, pConn, udpaddr, block), nil
  886. }
  887. // connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
  888. // to Write syscalls that are 4 times faster on some OS'es. This should only be
  889. // used for connections that were produced by a net.Dial* call.
  890. type connectedUDPConn struct{ *net.UDPConn }
  891. // WriteTo redirects all writes to the Write syscall, which is 4 times faster.
  892. func (c *connectedUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { return c.Write(b) }