package udptransfer

import (
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"net"
	"time"
)

const (
	_MAX_RETRIES = 6
	_MIN_RTT     = 8
	_MIN_RTO     = 30
	_MIN_ATO     = 2
	_MAX_ATO     = 10
	_MIN_SWND    = 10
	_MAX_SWND    = 960
)

const (
	_VACK_SCHED = iota + 1
	_VACK_QUICK
	_VACK_MUST
	_VSWND_ACTIVE
	_VRETR_IMMED
)

const (
	_RETR_REST = -1
	_CLOSE     = 0xff
)

var debug int

func nodeOf(pk *packet) *qNode {
	return &qNode{packet: pk}
}

func (c *Conn) internalRecvLoop() {
	defer func() {
		// avoid sending to a closed channel when replayed data
		// packets arrive while we are shutting down.
		_ = recover()
	}()
	var buf, body []byte
	for {
		select {
		case buf = <-c.evRecv:
			if buf != nil {
				body = buf[_TH_SIZE:]
			} else { // shutdown
				return
			}
		}
		pk := new(packet)
		// keep the original buffer so it can be recycled later
		pk.buffer = buf
		unmarshall(pk, body)
		if pk.flag&_F_SACK != 0 {
			c.processSAck(pk)
			continue
		}
		if pk.flag&_F_ACK != 0 {
			c.processAck(pk)
		}
		if pk.flag&_F_DATA != 0 {
			c.insertData(pk)
		} else if pk.flag&_F_FIN != 0 {
			if pk.flag&_F_RESET != 0 {
				go c.forceShutdownWithLock()
			} else {
				go c.closeR(pk)
			}
		}
	}
}

func (c *Conn) internalSendLoop() {
	var timer = time.NewTimer(time.Duration(c.rtt) * time.Millisecond)
	for {
		select {
		case v := <-c.evSWnd:
			switch v {
			case _VRETR_IMMED:
				c.outlock.Lock()
				c.retransmit2()
				c.outlock.Unlock()
			case _VSWND_ACTIVE:
				timer.Reset(time.Duration(c.rtt) * time.Millisecond)
			case _CLOSE:
				return
			}
		case <-timer.C: // rto timer expired
			var notifySender bool
			c.outlock.Lock()
			rest, _ := c.retransmit()
			switch rest {
			case _RETR_REST, 0: // nothing left to schedule
				if c.outQ.size() > 0 {
					timer.Reset(time.Duration(c.rtt) * time.Millisecond)
				} else {
					timer.Stop()
					// avoid blocking the sender forever
					notifySender = true
				}
			default: // wake at the nearest rto deadline
				timer.Reset(time.Duration(minI64(rest, c.rtt)) * time.Millisecond)
			}
			c.outlock.Unlock()
			if notifySender {
				select {
				case c.evSend <- 1:
				default:
				}
			}
		}
	}
}

func (c *Conn) internalAckLoop() {
	// var ackTimer = time.NewTicker(time.Duration(c.ato))
	var ackTimer = time.NewTimer(time.Duration(c.ato) * time.Millisecond)
	var lastAckState byte
	for {
		var v byte
		select {
		case <-ackTimer.C:
			// may send a duplicated ack if ato > rtt
			v = _VACK_QUICK
		case v = <-c.evAck:
			ackTimer.Reset(time.Duration(c.ato) * time.Millisecond)
			state := lastAckState
			lastAckState = v
			if state != v {
				if v == _CLOSE {
					return
				}
				v = _VACK_MUST
			}
		}
		c.inlock.Lock()
		if pkAck := c.makeAck(v); pkAck != nil {
			c.internalWrite(nodeOf(pkAck))
		}
		c.inlock.Unlock()
	}
}

func (c *Conn) retransmit() (rest int64, count int32) {
	var now, rto = Now(), c.rto
	var limit = c.cwnd
	for item := c.outQ.head; item != nil && limit > 0; item = item.next {
		if item.scnt != _SENT_OK { // ACKed has scnt==-1
			diff := now - item.sent
			if diff > rto {
				// rto already passed: resend now
				c.internalWrite(item)
				count++
			} else {
				// keep searching for the nearest rto deadline
				if rest > 0 {
					rest = minI64(rest, rto-diff+1)
				} else {
					rest = rto - diff + 1
				}
				limit--
			}
		}
	}
	c.outDupCnt += int(count)
	if count > 0 {
		shrcond := (c.fastRetransmit && count > maxI32(c.cwnd>>5, 4)) || (!c.fastRetransmit && count > c.cwnd>>3)
		if shrcond && now-c.lastShrink > c.rto {
			log.Printf("shrink cwnd from=%d to=%d s/4=%d", c.cwnd, c.cwnd>>1, c.swnd>>2)
			c.lastShrink = now
			// shrink cwnd, but never below swnd/4
			if c.cwnd > c.swnd>>1 {
				c.cwnd >>= 1
			}
		}
	}
	if c.outQ.size() > 0 {
		return
	}
	return _RETR_REST, 0
}
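// The send-loop contract implied by retransmit (a sketch inferred from the
// code above and internalSendLoop):
//
//	rest == _RETR_REST  the out queue is empty; stop the timer, wake the sender
//	rest == 0           every scanned unacked packet was just retransmitted
//	rest > 0            milliseconds until the nearest pending rto deadline
//
// internalSendLoop re-arms its timer with min(rest, rtt), so a wakeup never
// drifts further than one smoothed rtt.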
func (c *Conn) retransmit2() (count int32) {
	var limit, now = minI32(c.outPending>>4, 8), Now()
	var fRtt = c.rtt
	if now-c.lastShrink > c.rto {
		fRtt += maxI64(c.rtt>>4, 1)
	} else {
		fRtt += maxI64(c.rtt>>1, 2)
	}
	for item := c.outQ.head; item != nil && count < limit; item = item.next {
		if item.scnt != _SENT_OK { // ACKed has scnt==-1
			if item.miss >= 3 && now-item.sent >= fRtt {
				item.miss = 0
				c.internalWrite(item)
				count++
			}
		}
	}
	c.fRCnt += int(count)
	c.outDupCnt += int(count)
	return
}

func (c *Conn) inputAndSend(pk *packet) error {
	item := &qNode{packet: pk}
	if c.mySeq&3 == 1 {
		c.tSlotT0 = NowNS()
	}
	c.outlock.Lock()
	// block while in-flight packets exceed cwnd;
	// in-flight includes: 1. unacked; 2. missed
	for c.outPending >= c.cwnd+c.missed {
		c.outlock.Unlock()
		if c.wtmo > 0 {
			var tmo int64
			tmo, c.wtmo = c.wtmo, 0
			select {
			case v := <-c.evSend:
				if v == _CLOSE {
					return io.EOF
				}
			case <-NewTimerChan(tmo):
				return ErrIOTimeout
			}
		} else {
			if v := <-c.evSend; v == _CLOSE {
				return io.EOF
			}
		}
		c.outlock.Lock()
	}
	c.outPending++
	c.outPkCnt++
	c.mySeq++
	pk.seq = c.mySeq
	c.outQ.appendTail(item)
	c.internalWrite(item)
	c.outlock.Unlock()
	// activate the resending timer; this send must block
	c.evSWnd <- _VSWND_ACTIVE
	if c.mySeq&3 == 0 && c.flatTraffic {
		// calculate the time error between the tslot budget and the
		// actual usage, taking the last sleep error into account.
		t1 := NowNS()
		terr := c.tSlot<<2 - c.lastSErr - (t1 - c.tSlotT0)
		// sleep for terr/2 if the actual usage undershot the
		// 4-packet tslot budget by more than 100us.
		if terr > 1e5 { // 100us
			time.Sleep(time.Duration(terr >> 1))
			c.lastSErr = maxI64(NowNS()-t1-terr, 0)
		} else {
			c.lastSErr >>= 1
		}
	}
	return nil
}

func (c *Conn) internalWrite(item *qNode) {
	if item.scnt >= 20 {
		// sending a FIN is no exception to the retry cap
		if item.flag&_F_FIN != 0 {
			c.fakeShutdown()
			c.dest = nil
			return
		} else {
			log.Println("Warn: too many retries", item)
			if c.urgent > 0 { // abort
				c.forceShutdown()
				return
			} else { // continue to retry 10 more times
				c.urgent++
				item.scnt = 10
			}
		}
	}
	// update the current and previous sent time
	item.sent, item.sent_1 = Now(), item.sent
	item.scnt++
	buf := item.marshall(c.connID)
	if debug >= 3 {
		var pkType = packetTypeNames[item.flag]
		if item.flag&_F_SACK != 0 {
			log.Printf("send %s trp=%d on=%d %x", pkType, item.seq, item.ack, buf[_AH_SIZE+4:])
		} else {
			log.Printf("send %s seq=%d ack=%d scnt=%d len=%d", pkType, item.seq, item.ack, item.scnt, len(buf)-_TH_SIZE)
		}
	}
	c.sock.WriteToUDP(buf, c.dest)
}

func (c *Conn) logAck(ack uint32) {
	c.lastAck = ack
	c.lastAckTime = Now()
}

func (c *Conn) makeLastAck() (pk *packet) {
	c.inlock.Lock()
	defer c.inlock.Unlock()
	if Now()-c.lastAckTime < c.rtt {
		return nil
	}
	pk = &packet{
		ack:  maxU32(c.lastAck, c.inQ.maxCtnSeq),
		flag: _F_ACK,
	}
	c.logAck(pk.ack)
	return
}
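// SACK payload layout shared by makeAck (writer, below) and unmarshallSAck
// (reader); a sketch reconstructed from the code:
//
//	buf[0]    TBL   bitmap word count (low byte)
//	buf[1]    SCNT  send count of the time-reference packet
//	buf[2:4]  DELAY big-endian ms elapsed since that packet arrived
//	buf[4:]   hole bitmap, len(bmap) big-endian uint64 words starting at pk.ack
//
// pk.ack is maxCtnSeq+1 (the first hole); when the in queue has no holes, a
// fake one-word bitmap {1} with tbl=1 is sent and pk.ack stays at maxCtnSeq.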
func (c *Conn) makeAck(level byte) (pk *packet) {
	now := Now()
	if level < _VACK_MUST && now-c.lastAckTime < c.ato {
		if level < _VACK_QUICK || now-c.lastAckTime < minI64(c.ato>>2, 1) {
			return
		}
	}
	// ready Q <-|
	//           |-> outQ start (or further right)
	//           |-> bitmap start
	// [predecessor] [predecessor+1] [predecessor+2] .....
	var fakeSAck bool
	var predecessor = c.inQ.maxCtnSeq
	bmap, tbl := c.inQ.makeHolesBitmap(predecessor)
	if len(bmap) <= 0 { // fake sack
		bmap = make([]uint64, 1)
		bmap[0], tbl = 1, 1
		fakeSAck = true
	}
	// head 4-byte: TBL:1 | SCNT:1 | DELAY:2
	buf := make([]byte, len(bmap)*8+4)
	pk = &packet{
		ack:     predecessor + 1,
		flag:    _F_SACK,
		payload: buf,
	}
	if fakeSAck {
		pk.ack--
	}
	buf[0] = byte(tbl)
	// record the delay relative to the time-reference packet
	if trp := c.inQ.lastIns; trp != nil {
		delayed := now - trp.sent
		if delayed < c.rtt {
			pk.seq = trp.seq
			pk.flag |= _F_TIME
			buf[1] = trp.scnt
			if delayed <= 0 {
				delayed = 1
			}
			binary.BigEndian.PutUint16(buf[2:], uint16(delayed))
		}
	}
	buf1 := buf[4:]
	for i, b := range bmap {
		binary.BigEndian.PutUint64(buf1[i*8:], b)
	}
	c.logAck(predecessor)
	return
}

func unmarshallSAck(data []byte) (bmap []uint64, tbl uint32, delayed uint16, scnt uint8) {
	if len(data) > 0 {
		bmap = make([]uint64, len(data)>>3)
	} else {
		return
	}
	tbl = uint32(data[0])
	scnt = data[1]
	delayed = binary.BigEndian.Uint16(data[2:])
	data = data[4:]
	for i := 0; i < len(bmap); i++ {
		bmap[i] = binary.BigEndian.Uint64(data[i*8:])
	}
	return
}

func calSwnd(bandwidth, rtt int64) int32 {
	w := int32(bandwidth * rtt / (8000 * _MSS))
	if w <= _MAX_SWND {
		if w >= _MIN_SWND {
			return w
		}
		return _MIN_SWND
	}
	return _MAX_SWND
}
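// A worked example for calSwnd, assuming an _MSS near 1400 bytes: with
// bandwidth = 100 Mbps and rtt = 50 ms,
//
//	w = 1e8 * 50 / (8000 * 1400) ≈ 446 packets
//
// which is the bandwidth-delay product in packets (bandwidth is in bit/s and
// rtt in ms, so the factor 8000 converts bits to bytes and ms to seconds),
// clamped into [_MIN_SWND, _MAX_SWND] = [10, 960].
//
// measure below feeds this into an estimator in the style of Jacobson/Karels:
// srtt appears to be stored scaled by 8 and mdev by 4, matching the gain-1/8
// and gain-1/4 updates.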
func (c *Conn) measure(seq uint32, delayed int64, scnt uint8) {
	target := c.outQ.get(seq)
	if target != nil {
		var lastSent int64
		switch target.scnt - scnt {
		case 0:
			// not sent again since this ack was sent out
			lastSent = target.sent
		case 1:
			// sent once more since this ack was sent out,
			// so use the previous sent time
			lastSent = target.sent_1
		default:
			// can't measure: the packet was sent too many times
			return
		}
		// real-time rtt
		rtt := Now() - lastSent - delayed
		// reject abnormal measurements:
		// 1. sample too small (below rtt/8)
		// 2. backlogged at the receiver for too long
		if rtt < maxI64(c.rtt>>3, 1) || delayed > c.rtt>>1 {
			return
		}
		// srtt: gain 1/8 (srtt is stored scaled by 8)
		err := rtt - (c.srtt >> 3)
		c.srtt += err
		c.rtt = c.srtt >> 3
		if c.rtt < _MIN_RTT {
			c.rtt = _MIN_RTT
		}
		// smoothed swnd: gain 1/8
		swnd := c.swnd<<3 - c.swnd + calSwnd(c.bandwidth, c.rtt)
		c.swnd = swnd >> 3
		c.tSlot = c.rtt * 1e6 / int64(c.swnd)
		c.ato = c.rtt >> 4
		if c.ato < _MIN_ATO {
			c.ato = _MIN_ATO
		} else if c.ato > _MAX_ATO {
			c.ato = _MAX_ATO
		}
		if err < 0 {
			err = -err
			err -= c.mdev >> 2
			if err > 0 {
				err >>= 3
			}
		} else {
			err -= c.mdev >> 2
		}
		// mdev: gain 1/4 (mdev is stored scaled by 4)
		c.mdev += err
		rto := c.rtt + maxI64(c.rtt<<1, c.mdev)
		if rto >= c.rto {
			c.rto = rto
		} else {
			c.rto = (c.rto + rto) >> 1
		}
		if c.rto < _MIN_RTO {
			c.rto = _MIN_RTO
		}
		if debug >= 1 {
			log.Printf("--- rtt=%d srtt=%d rto=%d swnd=%d", c.rtt, c.srtt, c.rto, c.swnd)
		}
	}
}

func (c *Conn) processSAck(pk *packet) {
	c.outlock.Lock()
	bmap, tbl, delayed, scnt := unmarshallSAck(pk.payload)
	if bmap == nil { // bad packet
		c.outlock.Unlock()
		return
	}
	if pk.flag&_F_TIME != 0 {
		c.measure(pk.seq, int64(delayed), scnt)
	}
	deleted, missed, continuous := c.outQ.deleteByBitmap(bmap, pk.ack, tbl)
	if deleted > 0 {
		c.ackHit(deleted, missed) // lock is released
	} else {
		c.outlock.Unlock()
	}
	if c.fastRetransmit && !continuous {
		// the peer's queue is discontinuous, so trigger fast retransmit
		if deleted == 0 {
			c.evSWnd <- _VRETR_IMMED
		} else {
			select {
			case c.evSWnd <- _VRETR_IMMED:
			default:
			}
		}
	}
	if debug >= 2 {
		log.Printf("SACK qhead=%d deleted=%d outPending=%d on=%d %016x",
			c.outQ.distanceOfHead(0), deleted, c.outPending, pk.ack, bmap)
	}
}

func (c *Conn) processAck(pk *packet) {
	c.outlock.Lock()
	if end := c.outQ.get(pk.ack); end != nil { // ack hit
		_, deleted := c.outQ.deleteBefore(end)
		c.ackHit(deleted, 0) // lock is released
		if debug >= 2 {
			log.Printf("ACK hit on=%d", pk.ack)
		}
		// special case: ack of the FIN
		if pk.seq == _FIN_ACK_SEQ {
			select {
			case c.evClose <- _S_FIN0:
			default:
			}
		}
	} else { // duplicated ack
		if debug >= 2 {
			log.Printf("ACK miss on=%d", pk.ack)
		}
		if pk.flag&_F_SYN != 0 { // the No.3 ack was lost
			if pkAck := c.makeLastAck(); pkAck != nil {
				c.internalWrite(nodeOf(pkAck))
			}
		}
		c.outlock.Unlock()
	}
}

func (c *Conn) ackHit(deleted, missed int32) {
	// must be called holding outlock, which is released here
	c.outPending -= deleted
	now := Now()
	if c.cwnd < c.swnd && now-c.lastShrink > c.rto {
		if c.cwnd < c.swnd>>1 {
			c.cwnd <<= 1
		} else {
			c.cwnd += deleted << 1
		}
	}
	if c.cwnd > c.swnd {
		c.cwnd = c.swnd
	}
	if now-c.lastRstMis > c.ato {
		c.lastRstMis = now
		c.missed = missed
	} else {
		c.missed = c.missed>>1 + missed
	}
	if qswnd := c.swnd >> 4; c.missed > qswnd {
		c.missed = qswnd
	}
	c.outlock.Unlock()
	select {
	case c.evSend <- 1:
	default:
	}
}
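// How ackHit grows the window (sketch): while cwnd < swnd/2, each acked batch
// doubles cwnd, similar to slow start; above that it grows by 2*deleted per
// batch and is capped at swnd. Growth pauses within one rto of a shrink
// (lastShrink), and retransmit halves cwnd at most once per rto, so under
// steady loss the window roughly oscillates within [swnd/4, swnd].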
func (c *Conn) insertData(pk *packet) {
	c.inlock.Lock()
	defer c.inlock.Unlock()
	exists := c.inQ.contains(pk.seq)
	// a duplicate of a queued or already-consumed packet
	// means the last ACK was lost
	if exists || pk.seq <= c.inQ.maxCtnSeq {
		// then send an ACK for the dups
		select {
		case c.evAck <- _VACK_MUST:
		default:
		}
		if debug >= 2 {
			dumpQ(fmt.Sprint("duplicated ", pk.seq), c.inQ)
		}
		c.inDupCnt++
		return
	}
	// record the current time in sent, regarded as the received time
	item := &qNode{packet: pk, sent: Now()}
	dis := c.inQ.searchInsert(item, c.lastReadSeq)
	if debug >= 3 {
		log.Printf("\t\t\trecv DATA seq=%d dis=%d maxCtn=%d lastReadSeq=%d", item.seq, dis, c.inQ.maxCtnSeq, c.lastReadSeq)
	}
	var ackState byte = _VACK_MUST
	var available bool
	switch dis {
	case 0: // impossible
		return
	case 1:
		if c.inQDirty {
			available = c.inQ.updateContinuous(item)
			if c.inQ.isWholeContinuous() { // the whole Q is ordered now
				c.inQDirty = false
			} else {
				// some holes still exist
				ackState = _VACK_QUICK
			}
		} else {
			// the ideal situation: in-order arrival
			c.inQ.maxCtnSeq = pk.seq
			available = true
			ackState = _VACK_SCHED
		}
	default:
		// an out-of-order packet: a hole occurred here
		if !c.inQDirty {
			c.inQDirty = true
		}
	}
	// count valid received packets
	c.inPkCnt++
	c.inQ.lastIns = item
	// try to notify the acker
	select {
	case c.evAck <- ackState:
	default:
	}
	if available {
		// try to notify the reader
		select {
		case c.evRead <- 1:
		default:
		}
	}
}

func (c *Conn) readInQ() bool {
	c.inlock.Lock()
	defer c.inlock.Unlock()
	// read already <-|-> expected Q
	//  [lastReadSeq] | [lastReadSeq+1] [lastReadSeq+2] ......
	if c.inQ.isEqualsHead(c.lastReadSeq+1) && c.lastReadSeq < c.inQ.maxCtnSeq {
		c.lastReadSeq = c.inQ.maxCtnSeq
		available := c.inQ.get(c.inQ.maxCtnSeq)
		available, _ = c.inQ.deleteBefore(available)
		for i := available; i != nil; i = i.next {
			c.inQReady = append(c.inQReady, i.payload...)
			// the data has been copied, so the buffer can be recycled
			bpool.Put(i.buffer)
			i.payload = nil
			i.buffer = nil
		}
		return true
	}
	return false
}

// Read should not be called concurrently.
func (c *Conn) Read(buf []byte) (nr int, err error) {
	for {
		if len(c.inQReady) > 0 {
			n := copy(buf, c.inQReady)
			c.inQReady = c.inQReady[n:]
			return n, nil
		}
		if !c.readInQ() {
			if c.rtmo > 0 {
				var tmo int64
				tmo, c.rtmo = c.rtmo, 0
				select {
				case _, y := <-c.evRead:
					if !y && len(c.inQReady) == 0 {
						return 0, io.EOF
					}
				case <-NewTimerChan(tmo):
					return 0, ErrIOTimeout
				}
			} else {
				// reply EOF only when evRead is closed
				// and inQReady is empty
				if _, y := <-c.evRead; !y && len(c.inQReady) == 0 {
					return 0, io.EOF
				}
			}
		}
	}
}

// Write should not be called concurrently.
func (c *Conn) Write(data []byte) (nr int, err error) {
	for len(data) > 0 && err == nil {
		//buf := make([]byte, _MSS+_AH_SIZE)
		buf := bpool.Get(c.mss + _AH_SIZE)
		body := buf[_TH_SIZE+_CH_SIZE:]
		n := copy(body, data)
		nr += n
		data = data[n:]
		pk := &packet{
			flag:    _F_DATA,
			payload: body[:n],
			buffer:  buf[:_AH_SIZE+n],
		}
		err = c.inputAndSend(pk)
	}
	return
}

func (c *Conn) LocalAddr() net.Addr {
	return c.sock.LocalAddr()
}

func (c *Conn) RemoteAddr() net.Addr {
	return c.dest
}

func (c *Conn) SetDeadline(t time.Time) error {
	c.SetReadDeadline(t)
	c.SetWriteDeadline(t)
	return nil
}

func (c *Conn) SetReadDeadline(t time.Time) error {
	if d := t.UnixNano()/Millisecond - Now(); d > 0 {
		c.rtmo = d
	}
	return nil
}

func (c *Conn) SetWriteDeadline(t time.Time) error {
	if d := t.UnixNano()/Millisecond - Now(); d > 0 {
		c.wtmo = d
	}
	return nil
}
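// A usage sketch (hypothetical caller; obtaining a *Conn is outside this
// file). Deadlines here are one-shot: Read and inputAndSend consume
// rtmo/wtmo when arming the timeout, so each deadline covers a single
// blocking wait rather than persisting like net.Conn deadlines do.
//
//	c.SetReadDeadline(time.Now().Add(5 * time.Second))
//	buf := make([]byte, 64<<10)
//	n, err := c.Read(buf) // ErrIOTimeout if nothing arrives in time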