1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021 |
- // Package kcp-go is a Reliable-UDP library for golang.
- //
- // This library intents to provide a smooth, resilient, ordered,
- // error-checked and anonymous delivery of streams over UDP packets.
- //
- // The interfaces of this package aims to be compatible with
- // net.Conn in standard library, but offers powerful features for advanced users.
- package kcp
- import (
- "crypto/rand"
- "encoding/binary"
- "hash/crc32"
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "github.com/pkg/errors"
- "golang.org/x/net/ipv4"
- "golang.org/x/net/ipv6"
- )
- const (
- // 16-bytes nonce for each packet
- nonceSize = 16
- // 4-bytes packet checksum
- crcSize = 4
- // overall crypto header size
- cryptHeaderSize = nonceSize + crcSize
- // maximum packet size
- mtuLimit = 1500
- // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
- rxFECMulti = 3
- // accept backlog
- acceptBacklog = 128
- )
- var (
- errInvalidOperation = errors.New("invalid operation")
- errTimeout = errors.New("timeout")
- )
- var (
- // a system-wide packet buffer shared among sending, receiving and FEC
- // to mitigate high-frequency memory allocation for packets
- xmitBuf sync.Pool
- )
- func init() {
- xmitBuf.New = func() interface{} {
- return make([]byte, mtuLimit)
- }
- }
- type (
- // UDPSession defines a KCP session implemented by UDP
- UDPSession struct {
- updaterIdx int // record slice index in updater
- conn net.PacketConn // the underlying packet connection
- kcp *KCP // KCP ARQ protocol
- l *Listener // pointing to the Listener object if it's been accepted by a Listener
- block BlockCrypt // block encryption object
- // kcp receiving is based on packets
- // recvbuf turns packets into stream
- recvbuf []byte
- bufptr []byte
- // FEC codec
- fecDecoder *fecDecoder
- fecEncoder *fecEncoder
- // settings
- remote net.Addr // remote peer address
- rd time.Time // read deadline
- wd time.Time // write deadline
- headerSize int // the header size additional to a KCP frame
- ackNoDelay bool // send ack immediately for each incoming packet(testing purpose)
- writeDelay bool // delay kcp.flush() for Write() for bulk transfer
- dup int // duplicate udp packets(testing purpose)
- // notifications
- die chan struct{} // notify current session has Closed
- dieOnce sync.Once
- chReadEvent chan struct{} // notify Read() can be called without blocking
- chWriteEvent chan struct{} // notify Write() can be called without blocking
- // socket error handling
- socketReadError atomic.Value
- socketWriteError atomic.Value
- chSocketReadError chan struct{}
- chSocketWriteError chan struct{}
- socketReadErrorOnce sync.Once
- socketWriteErrorOnce sync.Once
- // nonce generator
- nonce Entropy
- // packets waiting to be sent on wire
- txqueue []ipv4.Message
- xconn batchConn // for x/net
- xconnWriteError error
- mu sync.Mutex
- }
- setReadBuffer interface {
- SetReadBuffer(bytes int) error
- }
- setWriteBuffer interface {
- SetWriteBuffer(bytes int) error
- }
- )
- // newUDPSession create a new udp session for client or server
- func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
- sess := new(UDPSession)
- sess.die = make(chan struct{})
- sess.nonce = new(nonceAES128)
- sess.nonce.Init()
- sess.chReadEvent = make(chan struct{}, 1)
- sess.chWriteEvent = make(chan struct{}, 1)
- sess.chSocketReadError = make(chan struct{})
- sess.chSocketWriteError = make(chan struct{})
- sess.remote = remote
- sess.conn = conn
- sess.l = l
- sess.block = block
- sess.recvbuf = make([]byte, mtuLimit)
- // cast to writebatch conn
- if _, ok := conn.(*net.UDPConn); ok {
- addr, err := net.ResolveUDPAddr("udp", conn.LocalAddr().String())
- if err == nil {
- if addr.IP.To4() != nil {
- sess.xconn = ipv4.NewPacketConn(conn)
- } else {
- sess.xconn = ipv6.NewPacketConn(conn)
- }
- }
- }
- // FEC codec initialization
- sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
- if sess.block != nil {
- sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
- } else {
- sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
- }
- // calculate additional header size introduced by FEC and encryption
- if sess.block != nil {
- sess.headerSize += cryptHeaderSize
- }
- if sess.fecEncoder != nil {
- sess.headerSize += fecHeaderSizePlus2
- }
- sess.kcp = NewKCP(conv, func(buf []byte, size int) {
- if size >= IKCP_OVERHEAD+sess.headerSize {
- sess.output(buf[:size])
- }
- })
- sess.kcp.ReserveBytes(sess.headerSize)
- // register current session to the global updater,
- // which call sess.update() periodically.
- updater.addSession(sess)
- if sess.l == nil { // it's a client connection
- go sess.readLoop()
- atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
- } else {
- atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
- }
- currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
- maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
- if currestab > maxconn {
- atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab)
- }
- return sess
- }
- // Read implements net.Conn
- func (s *UDPSession) Read(b []byte) (n int, err error) {
- for {
- s.mu.Lock()
- if len(s.bufptr) > 0 { // copy from buffer into b
- n = copy(b, s.bufptr)
- s.bufptr = s.bufptr[n:]
- s.mu.Unlock()
- atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
- return n, nil
- }
- if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
- if len(b) >= size { // receive data into 'b' directly
- s.kcp.Recv(b)
- s.mu.Unlock()
- atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
- return size, nil
- }
- // if necessary resize the stream buffer to guarantee a sufficent buffer space
- if cap(s.recvbuf) < size {
- s.recvbuf = make([]byte, size)
- }
- // resize the length of recvbuf to correspond to data size
- s.recvbuf = s.recvbuf[:size]
- s.kcp.Recv(s.recvbuf)
- n = copy(b, s.recvbuf) // copy to 'b'
- s.bufptr = s.recvbuf[n:] // pointer update
- s.mu.Unlock()
- atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
- return n, nil
- }
- // deadline for current reading operation
- var timeout *time.Timer
- var c <-chan time.Time
- if !s.rd.IsZero() {
- if time.Now().After(s.rd) {
- s.mu.Unlock()
- return 0, errors.WithStack(errTimeout)
- }
- delay := s.rd.Sub(time.Now())
- timeout = time.NewTimer(delay)
- c = timeout.C
- }
- s.mu.Unlock()
- // wait for read event or timeout or error
- select {
- case <-s.chReadEvent:
- if timeout != nil {
- timeout.Stop()
- }
- case <-c:
- return 0, errors.WithStack(errTimeout)
- case <-s.chSocketReadError:
- return 0, s.socketReadError.Load().(error)
- case <-s.die:
- return 0, errors.WithStack(io.ErrClosedPipe)
- }
- }
- }
- // Write implements net.Conn
- func (s *UDPSession) Write(b []byte) (n int, err error) { return s.WriteBuffers([][]byte{b}) }
- // WriteBuffers write a vector of byte slices to the underlying connection
- func (s *UDPSession) WriteBuffers(v [][]byte) (n int, err error) {
- for {
- select {
- case <-s.chSocketWriteError:
- return 0, s.socketWriteError.Load().(error)
- case <-s.die:
- return 0, errors.WithStack(io.ErrClosedPipe)
- default:
- }
- s.mu.Lock()
- if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
- for _, b := range v {
- n += len(b)
- for {
- if len(b) <= int(s.kcp.mss) {
- s.kcp.Send(b)
- break
- } else {
- s.kcp.Send(b[:s.kcp.mss])
- b = b[s.kcp.mss:]
- }
- }
- }
- if s.kcp.WaitSnd() >= int(s.kcp.snd_wnd) || !s.writeDelay {
- s.kcp.flush(false)
- s.uncork()
- }
- s.mu.Unlock()
- atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
- return n, nil
- }
- var timeout *time.Timer
- var c <-chan time.Time
- if !s.wd.IsZero() {
- if time.Now().After(s.wd) {
- s.mu.Unlock()
- return 0, errors.WithStack(errTimeout)
- }
- delay := s.wd.Sub(time.Now())
- timeout = time.NewTimer(delay)
- c = timeout.C
- }
- s.mu.Unlock()
- select {
- case <-s.chWriteEvent:
- if timeout != nil {
- timeout.Stop()
- }
- case <-c:
- return 0, errors.WithStack(errTimeout)
- case <-s.chSocketWriteError:
- return 0, s.socketWriteError.Load().(error)
- case <-s.die:
- return 0, errors.WithStack(io.ErrClosedPipe)
- }
- }
- }
- // uncork sends data in txqueue if there is any
- func (s *UDPSession) uncork() {
- if len(s.txqueue) > 0 {
- s.tx(s.txqueue)
- s.txqueue = s.txqueue[:0]
- }
- return
- }
- // Close closes the connection.
- func (s *UDPSession) Close() error {
- var once bool
- s.dieOnce.Do(func() {
- close(s.die)
- once = true
- })
- if once {
- // remove from updater
- updater.removeSession(s)
- atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
- if s.l != nil { // belongs to listener
- s.l.closeSession(s.remote)
- return nil
- } else { // client socket close
- return s.conn.Close()
- }
- } else {
- return errors.WithStack(io.ErrClosedPipe)
- }
- }
- // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
- func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() }
- // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
- func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
- // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
- func (s *UDPSession) SetDeadline(t time.Time) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.rd = t
- s.wd = t
- s.notifyReadEvent()
- s.notifyWriteEvent()
- return nil
- }
- // SetReadDeadline implements the Conn SetReadDeadline method.
- func (s *UDPSession) SetReadDeadline(t time.Time) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.rd = t
- s.notifyReadEvent()
- return nil
- }
- // SetWriteDeadline implements the Conn SetWriteDeadline method.
- func (s *UDPSession) SetWriteDeadline(t time.Time) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.wd = t
- s.notifyWriteEvent()
- return nil
- }
- // SetWriteDelay delays write for bulk transfer until the next update interval
- func (s *UDPSession) SetWriteDelay(delay bool) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.writeDelay = delay
- }
- // SetWindowSize set maximum window size
- func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.kcp.WndSize(sndwnd, rcvwnd)
- }
- // SetMtu sets the maximum transmission unit(not including UDP header)
- func (s *UDPSession) SetMtu(mtu int) bool {
- if mtu > mtuLimit {
- return false
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- s.kcp.SetMtu(mtu)
- return true
- }
- // SetStreamMode toggles the stream mode on/off
- func (s *UDPSession) SetStreamMode(enable bool) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if enable {
- s.kcp.stream = 1
- } else {
- s.kcp.stream = 0
- }
- }
- // SetACKNoDelay changes ack flush option, set true to flush ack immediately,
- func (s *UDPSession) SetACKNoDelay(nodelay bool) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.ackNoDelay = nodelay
- }
- // (deprecated)
- //
- // SetDUP duplicates udp packets for kcp output.
- func (s *UDPSession) SetDUP(dup int) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.dup = dup
- }
- // SetNoDelay calls nodelay() of kcp
- // https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
- func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.kcp.NoDelay(nodelay, interval, resend, nc)
- }
- // SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header.
- //
- // It has no effect if it's accepted from Listener.
- func (s *UDPSession) SetDSCP(dscp int) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.l != nil {
- return errInvalidOperation
- }
- if nc, ok := s.conn.(net.Conn); ok {
- var succeed bool
- if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil {
- succeed = true
- }
- if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil {
- succeed = true
- }
- if succeed {
- return nil
- }
- }
- return errInvalidOperation
- }
- // SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
- func (s *UDPSession) SetReadBuffer(bytes int) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.l == nil {
- if nc, ok := s.conn.(setReadBuffer); ok {
- return nc.SetReadBuffer(bytes)
- }
- }
- return errInvalidOperation
- }
- // SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
- func (s *UDPSession) SetWriteBuffer(bytes int) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.l == nil {
- if nc, ok := s.conn.(setWriteBuffer); ok {
- return nc.SetWriteBuffer(bytes)
- }
- }
- return errInvalidOperation
- }
- // post-processing for sending a packet from kcp core
- // steps:
- // 1. FEC packet generation
- // 2. CRC32 integrity
- // 3. Encryption
- // 4. TxQueue
- func (s *UDPSession) output(buf []byte) {
- var ecc [][]byte
- // 1. FEC encoding
- if s.fecEncoder != nil {
- ecc = s.fecEncoder.encode(buf)
- }
- // 2&3. crc32 & encryption
- if s.block != nil {
- s.nonce.Fill(buf[:nonceSize])
- checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
- binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
- s.block.Encrypt(buf, buf)
- for k := range ecc {
- s.nonce.Fill(ecc[k][:nonceSize])
- checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
- binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
- s.block.Encrypt(ecc[k], ecc[k])
- }
- }
- // 4. TxQueue
- var msg ipv4.Message
- for i := 0; i < s.dup+1; i++ {
- bts := xmitBuf.Get().([]byte)[:len(buf)]
- copy(bts, buf)
- msg.Buffers = [][]byte{bts}
- msg.Addr = s.remote
- s.txqueue = append(s.txqueue, msg)
- }
- for k := range ecc {
- bts := xmitBuf.Get().([]byte)[:len(ecc[k])]
- copy(bts, ecc[k])
- msg.Buffers = [][]byte{bts}
- msg.Addr = s.remote
- s.txqueue = append(s.txqueue, msg)
- }
- }
- // kcp update, returns interval for next calling
- func (s *UDPSession) update() (interval time.Duration) {
- s.mu.Lock()
- waitsnd := s.kcp.WaitSnd()
- interval = time.Duration(s.kcp.flush(false)) * time.Millisecond
- if s.kcp.WaitSnd() < waitsnd {
- s.notifyWriteEvent()
- }
- s.uncork()
- s.mu.Unlock()
- return
- }
- // GetConv gets conversation id of a session
- func (s *UDPSession) GetConv() uint32 { return s.kcp.conv }
- func (s *UDPSession) notifyReadEvent() {
- select {
- case s.chReadEvent <- struct{}{}:
- default:
- }
- }
- func (s *UDPSession) notifyWriteEvent() {
- select {
- case s.chWriteEvent <- struct{}{}:
- default:
- }
- }
- func (s *UDPSession) notifyReadError(err error) {
- s.socketReadErrorOnce.Do(func() {
- s.socketReadError.Store(err)
- close(s.chSocketReadError)
- })
- }
- func (s *UDPSession) notifyWriteError(err error) {
- s.socketWriteErrorOnce.Do(func() {
- s.socketWriteError.Store(err)
- close(s.chSocketWriteError)
- })
- }
- // packet input stage
- func (s *UDPSession) packetInput(data []byte) {
- dataValid := false
- if s.block != nil {
- s.block.Decrypt(data, data)
- data = data[nonceSize:]
- checksum := crc32.ChecksumIEEE(data[crcSize:])
- if checksum == binary.LittleEndian.Uint32(data) {
- data = data[crcSize:]
- dataValid = true
- } else {
- atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
- }
- } else if s.block == nil {
- dataValid = true
- }
- if dataValid {
- s.kcpInput(data)
- }
- }
- func (s *UDPSession) kcpInput(data []byte) {
- var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
- if s.fecDecoder != nil {
- if len(data) > fecHeaderSize { // must be larger than fec header size
- f := fecPacket(data)
- if f.flag() == typeData || f.flag() == typeParity { // header check
- if f.flag() == typeParity {
- fecParityShards++
- }
- recovers := s.fecDecoder.decode(f)
- s.mu.Lock()
- waitsnd := s.kcp.WaitSnd()
- if f.flag() == typeData {
- if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
- kcpInErrors++
- }
- }
- for _, r := range recovers {
- if len(r) >= 2 { // must be larger than 2bytes
- sz := binary.LittleEndian.Uint16(r)
- if int(sz) <= len(r) && sz >= 2 {
- if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
- fecRecovered++
- } else {
- kcpInErrors++
- }
- } else {
- fecErrs++
- }
- } else {
- fecErrs++
- }
- // recycle the recovers
- xmitBuf.Put(r)
- }
- // to notify the readers to receive the data
- if n := s.kcp.PeekSize(); n > 0 {
- s.notifyReadEvent()
- }
- // to notify the writers when queue is shorter(e.g. ACKed)
- if s.kcp.WaitSnd() < waitsnd {
- s.notifyWriteEvent()
- }
- s.uncork()
- s.mu.Unlock()
- } else {
- atomic.AddUint64(&DefaultSnmp.InErrs, 1)
- }
- } else {
- atomic.AddUint64(&DefaultSnmp.InErrs, 1)
- }
- } else {
- s.mu.Lock()
- waitsnd := s.kcp.WaitSnd()
- if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
- kcpInErrors++
- }
- if n := s.kcp.PeekSize(); n > 0 {
- s.notifyReadEvent()
- }
- if s.kcp.WaitSnd() < waitsnd {
- s.notifyWriteEvent()
- }
- s.uncork()
- s.mu.Unlock()
- }
- atomic.AddUint64(&DefaultSnmp.InPkts, 1)
- atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
- if fecParityShards > 0 {
- atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
- }
- if kcpInErrors > 0 {
- atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
- }
- if fecErrs > 0 {
- atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
- }
- if fecRecovered > 0 {
- atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
- }
- }
- type (
- // Listener defines a server which will be waiting to accept incoming connections
- Listener struct {
- block BlockCrypt // block encryption
- dataShards int // FEC data shard
- parityShards int // FEC parity shard
- fecDecoder *fecDecoder // FEC mock initialization
- conn net.PacketConn // the underlying packet connection
- sessions map[string]*UDPSession // all sessions accepted by this Listener
- sessionLock sync.Mutex
- chAccepts chan *UDPSession // Listen() backlog
- chSessionClosed chan net.Addr // session close queue
- headerSize int // the additional header to a KCP frame
- die chan struct{} // notify the listener has closed
- dieOnce sync.Once
- // socket error handling
- socketReadError atomic.Value
- chSocketReadError chan struct{}
- socketReadErrorOnce sync.Once
- rd atomic.Value // read deadline for Accept()
- }
- )
- // packet input stage
- func (l *Listener) packetInput(data []byte, addr net.Addr) {
- dataValid := false
- if l.block != nil {
- l.block.Decrypt(data, data)
- data = data[nonceSize:]
- checksum := crc32.ChecksumIEEE(data[crcSize:])
- if checksum == binary.LittleEndian.Uint32(data) {
- data = data[crcSize:]
- dataValid = true
- } else {
- atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
- }
- } else if l.block == nil {
- dataValid = true
- }
- if dataValid {
- l.sessionLock.Lock()
- s, ok := l.sessions[addr.String()]
- l.sessionLock.Unlock()
- if !ok { // new address:port
- if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
- var conv uint32
- convValid := false
- if l.fecDecoder != nil {
- isfec := binary.LittleEndian.Uint16(data[4:])
- if isfec == typeData {
- conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
- convValid = true
- }
- } else {
- conv = binary.LittleEndian.Uint32(data)
- convValid = true
- }
- if convValid { // creates a new session only if the 'conv' field in kcp is accessible
- s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, addr, l.block)
- s.kcpInput(data)
- l.sessionLock.Lock()
- l.sessions[addr.String()] = s
- l.sessionLock.Unlock()
- l.chAccepts <- s
- }
- }
- } else {
- s.kcpInput(data)
- }
- }
- }
- func (l *Listener) notifyReadError(err error) {
- l.socketReadErrorOnce.Do(func() {
- l.socketReadError.Store(err)
- close(l.chSocketReadError)
- // propagate read error to all sessions
- l.sessionLock.Lock()
- for _, s := range l.sessions {
- s.notifyReadError(err)
- }
- l.sessionLock.Unlock()
- })
- }
- // SetReadBuffer sets the socket read buffer for the Listener
- func (l *Listener) SetReadBuffer(bytes int) error {
- if nc, ok := l.conn.(setReadBuffer); ok {
- return nc.SetReadBuffer(bytes)
- }
- return errInvalidOperation
- }
- // SetWriteBuffer sets the socket write buffer for the Listener
- func (l *Listener) SetWriteBuffer(bytes int) error {
- if nc, ok := l.conn.(setWriteBuffer); ok {
- return nc.SetWriteBuffer(bytes)
- }
- return errInvalidOperation
- }
- // SetDSCP sets the 6bit DSCP field in IPv4 header, or 8bit Traffic Class in IPv6 header.
- func (l *Listener) SetDSCP(dscp int) error {
- if nc, ok := l.conn.(net.Conn); ok {
- var succeed bool
- if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err == nil {
- succeed = true
- }
- if err := ipv6.NewConn(nc).SetTrafficClass(dscp); err == nil {
- succeed = true
- }
- if succeed {
- return nil
- }
- }
- return errInvalidOperation
- }
- // Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
- func (l *Listener) Accept() (net.Conn, error) {
- return l.AcceptKCP()
- }
- // AcceptKCP accepts a KCP connection
- func (l *Listener) AcceptKCP() (*UDPSession, error) {
- var timeout <-chan time.Time
- if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
- timeout = time.After(tdeadline.Sub(time.Now()))
- }
- select {
- case <-timeout:
- return nil, errors.WithStack(errTimeout)
- case c := <-l.chAccepts:
- return c, nil
- case <-l.chSocketReadError:
- return nil, l.socketReadError.Load().(error)
- case <-l.die:
- return nil, errors.WithStack(io.ErrClosedPipe)
- }
- }
- // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
- func (l *Listener) SetDeadline(t time.Time) error {
- l.SetReadDeadline(t)
- l.SetWriteDeadline(t)
- return nil
- }
- // SetReadDeadline implements the Conn SetReadDeadline method.
- func (l *Listener) SetReadDeadline(t time.Time) error {
- l.rd.Store(t)
- return nil
- }
- // SetWriteDeadline implements the Conn SetWriteDeadline method.
- func (l *Listener) SetWriteDeadline(t time.Time) error { return errInvalidOperation }
- // Close stops listening on the UDP address, and closes the socket
- func (l *Listener) Close() error {
- var once bool
- l.dieOnce.Do(func() {
- close(l.die)
- once = true
- })
- if once {
- return l.conn.Close()
- } else {
- return errors.WithStack(io.ErrClosedPipe)
- }
- }
- // closeSession notify the listener that a session has closed
- func (l *Listener) closeSession(remote net.Addr) (ret bool) {
- l.sessionLock.Lock()
- defer l.sessionLock.Unlock()
- if _, ok := l.sessions[remote.String()]; ok {
- delete(l.sessions, remote.String())
- return true
- }
- return false
- }
- // Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
- func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
- // Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
- func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
- // ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption.
- //
- // 'block' is the block encryption algorithm to encrypt packets.
- //
- // 'dataShards', 'parityShards' specifiy how many parity packets will be generated following the data packets.
- //
- // Check https://github.com/klauspost/reedsolomon for details
- func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
- udpaddr, err := net.ResolveUDPAddr("udp", laddr)
- if err != nil {
- return nil, errors.WithStack(err)
- }
- conn, err := net.ListenUDP("udp", udpaddr)
- if err != nil {
- return nil, errors.WithStack(err)
- }
- return ServeConn(block, dataShards, parityShards, conn)
- }
- // ServeConn serves KCP protocol for a single packet connection.
- func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
- l := new(Listener)
- l.conn = conn
- l.sessions = make(map[string]*UDPSession)
- l.chAccepts = make(chan *UDPSession, acceptBacklog)
- l.chSessionClosed = make(chan net.Addr)
- l.die = make(chan struct{})
- l.dataShards = dataShards
- l.parityShards = parityShards
- l.block = block
- l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
- l.chSocketReadError = make(chan struct{})
- // calculate header size
- if l.block != nil {
- l.headerSize += cryptHeaderSize
- }
- if l.fecDecoder != nil {
- l.headerSize += fecHeaderSizePlus2
- }
- go l.monitor()
- return l, nil
- }
- // Dial connects to the remote address "raddr" on the network "udp" without encryption and FEC
- func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
- // DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
- //
- // 'block' is the block encryption algorithm to encrypt packets.
- //
- // 'dataShards', 'parityShards' specifiy how many parity packets will be generated following the data packets.
- //
- // Check https://github.com/klauspost/reedsolomon for details
- func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
- // network type detection
- udpaddr, err := net.ResolveUDPAddr("udp", raddr)
- if err != nil {
- return nil, errors.WithStack(err)
- }
- network := "udp4"
- if udpaddr.IP.To4() == nil {
- network = "udp"
- }
- conn, err := net.ListenUDP(network, nil)
- if err != nil {
- return nil, errors.WithStack(err)
- }
- return NewConn(raddr, block, dataShards, parityShards, conn)
- }
- // NewConn3 establishes a session and talks KCP protocol over a packet connection.
- func NewConn3(convid uint32, raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
- return newUDPSession(convid, dataShards, parityShards, nil, conn, raddr, block), nil
- }
- // NewConn2 establishes a session and talks KCP protocol over a packet connection.
- func NewConn2(raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
- var convid uint32
- binary.Read(rand.Reader, binary.LittleEndian, &convid)
- return NewConn3(convid, raddr, block, dataShards, parityShards, conn)
- }
- // NewConn establishes a session and talks KCP protocol over a packet connection.
- func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
- udpaddr, err := net.ResolveUDPAddr("udp", raddr)
- if err != nil {
- return nil, errors.WithStack(err)
- }
- return NewConn2(udpaddr, block, dataShards, parityShards, conn)
- }
- func NewConnEx(convid uint32, connected bool, raddr string, block BlockCrypt, dataShards, parityShards int, conn *net.UDPConn) (*UDPSession, error) {
- udpaddr, err := net.ResolveUDPAddr("udp", raddr)
- if err != nil {
- return nil, errors.Wrap(err, "net.ResolveUDPAddr")
- }
- var pConn net.PacketConn = conn
- if connected {
- pConn = &connectedUDPConn{conn}
- }
- return newUDPSession(convid, dataShards, parityShards, nil, pConn, udpaddr, block), nil
- }
- // connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
- // to Write syscalls that are 4 times faster on some OS'es. This should only be
- // used for connections that were produced by a net.Dial* call.
- type connectedUDPConn struct{ *net.UDPConn }
- // WriteTo redirects all writes to the Write syscall, which is 4 times faster.
- func (c *connectedUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { return c.Write(b) }
|