123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- package udptransfer
- import (
- "errors"
- "log"
- "net"
- "sync"
- "sync/atomic"
- "time"
- )
- const (
- _10ms = time.Millisecond * 10
- _100ms = time.Millisecond * 100
- )
- const (
- _FIN_ACK_SEQ uint32 = 0xffFF0000
- _INVALID_SEQ uint32 = 0xffFFffFF
- )
- var (
- ErrIOTimeout error = &TimeoutError{}
- ErrUnknown = errors.New("Unknown error")
- ErrInexplicableData = errors.New("Inexplicable data")
- ErrTooManyAttempts = errors.New("Too many attempts to connect")
- )
- type TimeoutError struct{}
- func (e *TimeoutError) Error() string { return "i/o timeout" }
- func (e *TimeoutError) Timeout() bool { return true }
- func (e *TimeoutError) Temporary() bool { return true }
- type Conn struct {
- sock *net.UDPConn
- dest *net.UDPAddr
- edp *Endpoint
- connID connID // 8 bytes
- // events
- evRecv chan []byte
- evRead chan byte
- evSend chan byte
- evSWnd chan byte
- evAck chan byte
- evClose chan byte
- // protocol state
- inlock sync.Mutex
- outlock sync.Mutex
- state int32
- mySeq uint32
- swnd int32
- cwnd int32
- missed int32
- outPending int32
- lastAck uint32
- lastAckTime int64
- lastAckTime2 int64
- lastShrink int64
- lastRstMis int64
- ato int64
- rto int64
- rtt int64
- srtt int64
- mdev int64
- rtmo int64
- wtmo int64
- tSlot int64
- tSlotT0 int64
- lastSErr int64
- // queue
- outQ *linkedMap
- inQ *linkedMap
- inQReady []byte
- inQDirty bool
- lastReadSeq uint32 // last user read seq
- // params
- bandwidth int64
- fastRetransmit bool
- flatTraffic bool
- mss int
- // statistics
- urgent int
- inPkCnt int
- inDupCnt int
- outPkCnt int
- outDupCnt int
- fRCnt int
- }
- func NewConn(e *Endpoint, dest *net.UDPAddr, id connID) *Conn {
- c := &Conn{
- sock: e.udpconn,
- dest: dest,
- edp: e,
- connID: id,
- evRecv: make(chan []byte, 128),
- evRead: make(chan byte, 1),
- evSWnd: make(chan byte, 2),
- evSend: make(chan byte, 4),
- evAck: make(chan byte, 1),
- evClose: make(chan byte, 2),
- outQ: newLinkedMap(_QModeOut),
- inQ: newLinkedMap(_QModeIn),
- }
- p := e.params
- c.bandwidth = p.Bandwidth
- c.fastRetransmit = p.FastRetransmit
- c.flatTraffic = p.FlatTraffic
- c.mss = _MSS
- if dest.IP.To4() == nil {
- // typical ipv6 header length=40
- c.mss -= 20
- }
- return c
- }
- func (c *Conn) initConnection(buf []byte) (err error) {
- if buf == nil {
- err = c.initDialing()
- } else { //server
- err = c.acceptConnection(buf[_TH_SIZE:])
- }
- if err != nil {
- return
- }
- if c.state == _S_EST1 {
- c.lastReadSeq = c.lastAck
- c.inQ.maxCtnSeq = c.lastAck
- c.rtt = maxI64(c.rtt, _MIN_RTT)
- c.mdev = c.rtt << 1
- c.srtt = c.rtt << 3
- c.rto = maxI64(c.rtt*2, _MIN_RTO)
- c.ato = maxI64(c.rtt>>4, _MIN_ATO)
- c.ato = minI64(c.ato, _MAX_ATO)
- // initial cwnd
- c.swnd = calSwnd(c.bandwidth, c.rtt) >> 1
- c.cwnd = 8
- go c.internalRecvLoop()
- go c.internalSendLoop()
- go c.internalAckLoop()
- if debug >= 0 {
- go c.internal_state()
- }
- return nil
- } else {
- return ErrUnknown
- }
- }
- func (c *Conn) initDialing() error {
- // first syn
- pk := &packet{
- seq: c.mySeq,
- flag: _F_SYN,
- }
- item := nodeOf(pk)
- var buf []byte
- c.state = _S_SYN0
- t0 := Now()
- for i := 0; i < _MAX_RETRIES && c.state == _S_SYN0; i++ {
- // send syn
- c.internalWrite(item)
- select {
- case buf = <-c.evRecv:
- c.rtt = Now() - t0
- c.state = _S_SYN1
- c.connID.setRid(buf)
- buf = buf[_TH_SIZE:]
- case <-time.After(time.Second):
- continue
- }
- }
- if c.state == _S_SYN0 {
- return ErrTooManyAttempts
- }
- unmarshall(pk, buf)
- // expected syn+ack
- if pk.flag == _F_SYN|_F_ACK && pk.ack == c.mySeq {
- if scnt := pk.scnt - 1; scnt > 0 {
- c.rtt -= int64(scnt) * 1e3
- }
- log.Println("rtt", c.rtt)
- c.state = _S_EST0
- // build ack3
- pk.scnt = 0
- pk.ack = pk.seq
- pk.flag = _F_ACK
- item := nodeOf(pk)
- // send ack3
- c.internalWrite(item)
- // update lastAck
- c.logAck(pk.ack)
- c.state = _S_EST1
- return nil
- } else {
- return ErrInexplicableData
- }
- }
- func (c *Conn) acceptConnection(buf []byte) error {
- var pk = new(packet)
- var item *qNode
- unmarshall(pk, buf)
- // expected syn
- if pk.flag == _F_SYN {
- c.state = _S_SYN1
- // build syn+ack
- pk.ack = pk.seq
- pk.seq = c.mySeq
- pk.flag |= _F_ACK
- // update lastAck
- c.logAck(pk.ack)
- item = nodeOf(pk)
- item.scnt = pk.scnt - 1
- } else {
- dumpb("Syn1 ?", buf)
- return ErrInexplicableData
- }
- for i := 0; i < 5 && c.state == _S_SYN1; i++ {
- t0 := Now()
- // reply syn+ack
- c.internalWrite(item)
- // recv ack3
- select {
- case buf = <-c.evRecv:
- c.state = _S_EST0
- c.rtt = Now() - t0
- buf = buf[_TH_SIZE:]
- log.Println("rtt", c.rtt)
- case <-time.After(time.Second):
- continue
- }
- }
- if c.state == _S_SYN1 {
- return ErrTooManyAttempts
- }
- pk = new(packet)
- unmarshall(pk, buf)
- // expected ack3
- if pk.flag == _F_ACK && pk.ack == c.mySeq {
- c.state = _S_EST1
- } else {
- // if ack3 lost, resend syn+ack 3-times
- // and drop these coming data
- if pk.flag&_F_DATA != 0 && pk.seq > c.lastAck {
- c.internalWrite(item)
- c.state = _S_EST1
- } else {
- dumpb("Ack3 ?", buf)
- return ErrInexplicableData
- }
- }
- return nil
- }
- // 20,20,20,20, 100,100,100,100, 1s,1s,1s,1s
- func selfSpinWait(fn func() bool) error {
- const _MAX_SPIN = 12
- for i := 0; i < _MAX_SPIN; i++ {
- if fn() {
- return nil
- } else if i <= 3 {
- time.Sleep(_10ms * 2)
- } else if i <= 7 {
- time.Sleep(_100ms)
- } else {
- time.Sleep(time.Second)
- }
- }
- return ErrIOTimeout
- }
- func (c *Conn) IsClosed() bool {
- return atomic.LoadInt32(&c.state) <= _S_FIN1
- }
- /*
- active close:
- 1 <- send fin-W: closeW()
- before sending, ensure all outQ items has beed sent out and all of them has been acked.
- 2 -> wait to recv ack{fin-W}
- then trigger closeR, including send fin-R and wait to recv ack{fin-R}
- passive close:
- -> fin:
- if outQ is not empty then self-spin wait.
- if outQ empty, send ack{fin-W} then goto closeW().
- */
- func (c *Conn) Close() (err error) {
- if !atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN0) {
- return selfSpinWait(func() bool {
- return atomic.LoadInt32(&c.state) == _S_FIN
- })
- }
- var err0 error
- err0 = c.closeW()
- // waiting for fin-2 of peer
- err = selfSpinWait(func() bool {
- select {
- case v := <-c.evClose:
- if v == _S_FIN {
- return true
- } else {
- time.AfterFunc(_100ms, func() { c.evClose <- v })
- }
- default:
- }
- return false
- })
- defer c.afterShutdown()
- if err != nil {
- // backup path for wait ack(finW) timeout
- c.closeR(nil)
- }
- if err0 != nil {
- return err0
- } else {
- return
- }
- }
- func (c *Conn) beforeCloseW() (err error) {
- // check outQ was empty and all has been acked.
- // self-spin waiting
- for i := 0; i < 2; i++ {
- err = selfSpinWait(func() bool {
- return atomic.LoadInt32(&c.outPending) <= 0
- })
- if err == nil {
- break
- }
- }
- // send fin, reliably
- c.outlock.Lock()
- c.mySeq++
- c.outPending++
- pk := &packet{seq: c.mySeq, flag: _F_FIN}
- item := nodeOf(pk)
- c.outQ.appendTail(item)
- c.internalWrite(item)
- c.outlock.Unlock()
- c.evSWnd <- _VSWND_ACTIVE
- return
- }
- func (c *Conn) closeW() (err error) {
- // close resource of sending
- defer c.afterCloseW()
- // send fin
- err = c.beforeCloseW()
- var closed bool
- var max = 20
- if c.rtt > 200 {
- max = int(c.rtt) / 10
- }
- // waiting for outQ means:
- // 1. all outQ has been acked, for passive
- // 2. fin has been acked, for active
- for i := 0; i < max && (atomic.LoadInt32(&c.outPending) > 0 || !closed); i++ {
- select {
- case v := <-c.evClose:
- if v == _S_FIN0 {
- // namely, last fin has been acked.
- closed = true
- } else {
- time.AfterFunc(_100ms, func() { c.evClose <- v })
- }
- case <-time.After(_100ms):
- }
- }
- if closed || err != nil {
- return
- } else {
- return ErrIOTimeout
- }
- }
- func (c *Conn) afterCloseW() {
- // can't close(c.evRecv), avoid endpoint dispatch exception
- // stop pending inputAndSend
- select {
- case c.evSend <- _CLOSE:
- default:
- }
- // stop internalSendLoop
- c.evSWnd <- _CLOSE
- }
- // called by active and passive close()
- func (c *Conn) afterShutdown() {
- // stop internalRecvLoop
- c.evRecv <- nil
- // remove registry
- c.edp.removeConn(c.connID, c.dest)
- log.Println("shutdown", c.state)
- }
- // trigger by reset
- func (c *Conn) forceShutdownWithLock() {
- c.outlock.Lock()
- defer c.outlock.Unlock()
- c.forceShutdown()
- }
- // called by:
- // 1/ send exception
- // 2/ recv reset
- // drop outQ and force shutdown
- func (c *Conn) forceShutdown() {
- if atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN) {
- defer c.afterShutdown()
- // stop sender
- for i := 0; i < cap(c.evSend); i++ {
- select {
- case <-c.evSend:
- default:
- }
- }
- select {
- case c.evSend <- _CLOSE:
- default:
- }
- c.outQ.reset()
- // stop reader
- close(c.evRead)
- c.inQ.reset()
- // stop internalLoops
- c.evSWnd <- _CLOSE
- c.evAck <- _CLOSE
- //log.Println("force shutdown")
- }
- }
- // for sending fin failed
- func (c *Conn) fakeShutdown() {
- select {
- case c.evClose <- _S_FIN0:
- default:
- }
- }
- func (c *Conn) closeR(pk *packet) {
- var passive = true
- for {
- state := atomic.LoadInt32(&c.state)
- switch state {
- case _S_FIN:
- return
- case _S_FIN1: // multiple FIN, maybe lost
- c.passiveCloseReply(pk, false)
- return
- case _S_FIN0: // active close preformed
- passive = false
- }
- if !atomic.CompareAndSwapInt32(&c.state, state, _S_FIN1) {
- continue
- }
- c.passiveCloseReply(pk, true)
- break
- }
- // here, R is closed.
- // ^^^^^^^^^^^^^^^^^^^^^
- if passive {
- // passive closing call closeW contains sending fin and recv ack
- // may the ack of fin-2 was lost, then the closeW will timeout
- c.closeW()
- }
- // here, R,W both were closed.
- // ^^^^^^^^^^^^^^^^^^^^^
- atomic.StoreInt32(&c.state, _S_FIN)
- // stop internalAckLoop
- c.evAck <- _CLOSE
- if passive {
- // close evRecv within here
- c.afterShutdown()
- } else {
- // notify active close thread
- select {
- case c.evClose <- _S_FIN:
- default:
- }
- }
- }
- func (c *Conn) passiveCloseReply(pk *packet, first bool) {
- if pk != nil && pk.flag&_F_FIN != 0 {
- if first {
- c.checkInQ(pk)
- close(c.evRead)
- }
- // ack the FIN
- pk = &packet{seq: _FIN_ACK_SEQ, ack: pk.seq, flag: _F_ACK}
- item := nodeOf(pk)
- c.internalWrite(item)
- }
- }
- // check inQ ends orderly, and copy queue data to user space
- func (c *Conn) checkInQ(pk *packet) {
- if nil != selfSpinWait(func() bool {
- return c.inQ.maxCtnSeq+1 == pk.seq
- }) { // timeout for waiting inQ to finish
- return
- }
- c.inlock.Lock()
- defer c.inlock.Unlock()
- if c.inQ.size() > 0 {
- for i := c.inQ.head; i != nil; i = i.next {
- c.inQReady = append(c.inQReady, i.payload...)
- }
- }
- }
|