conn.go 15 KB


  1. package udptransfer
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net"
  8. "time"
  9. )
// Bounds applied by the estimators in this file. c.rtt and c.ato are scaled by
// time.Millisecond when they arm timers, as are the waits derived from c.rto,
// so the time bounds below are in milliseconds.
const (
	// _MAX_RETRIES is not referenced in the visible part of this file.
	_MAX_RETRIES = 6
	// _MIN_RTT is the floor that measure applies to c.rtt.
	_MIN_RTT = 8
	// _MIN_RTO is the floor that measure applies to c.rto.
	_MIN_RTO = 30
	// _MIN_ATO and _MAX_ATO clamp c.ato, the delay used by the ack timer.
	_MIN_ATO = 2
	_MAX_ATO = 10
	// _MIN_SWND and _MAX_SWND clamp the window computed by calSwnd.
	_MIN_SWND = 10
	_MAX_SWND = 960
)

// Event values passed over the connection's channels. The _VACK_* values are
// urgency levels sent on c.evAck and compared by makeAck (higher is more
// urgent); _VSWND_ACTIVE and _VRETR_IMMED are commands sent on c.evSWnd and
// handled by internalSendLoop.
const (
	_VACK_SCHED = iota + 1
	_VACK_QUICK
	_VACK_MUST
	_VSWND_ACTIVE
	_VRETR_IMMED
)

const (
	// _RETR_REST is what retransmit returns as its rest value when the output
	// queue is empty.
	_RETR_REST = -1
	// _CLOSE is the channel value the internal loops and blocked senders
	// treat as a shutdown signal.
	_CLOSE = 0xff
)

// debug is the log verbosity: >=1 logs rtt measurements, >=2 logs ACK/SACK
// processing and duplicates, >=3 logs every packet sent and received.
var debug int
  31. func nodeOf(pk *packet) *qNode {
  32. return &qNode{packet: pk}
  33. }
  34. func (c *Conn) internalRecvLoop() {
  35. defer func() {
  36. // avoid send to closed channel while some replaying
  37. // data packets were received in shutting down.
  38. _ = recover()
  39. }()
  40. var buf, body []byte
  41. for {
  42. select {
  43. case buf = <-c.evRecv:
  44. if buf != nil {
  45. body = buf[_TH_SIZE:]
  46. } else { // shutdown
  47. return
  48. }
  49. }
  50. pk := new(packet)
  51. // keep the original buffer, so we could recycle it in future
  52. pk.buffer = buf
  53. unmarshall(pk, body)
  54. if pk.flag&_F_SACK != 0 {
  55. c.processSAck(pk)
  56. continue
  57. }
  58. if pk.flag&_F_ACK != 0 {
  59. c.processAck(pk)
  60. }
  61. if pk.flag&_F_DATA != 0 {
  62. c.insertData(pk)
  63. } else if pk.flag&_F_FIN != 0 {
  64. if pk.flag&_F_RESET != 0 {
  65. go c.forceShutdownWithLock()
  66. } else {
  67. go c.closeR(pk)
  68. }
  69. }
  70. }
  71. }
  72. func (c *Conn) internalSendLoop() {
  73. var timer = time.NewTimer(time.Duration(c.rtt) * time.Millisecond)
  74. for {
  75. select {
  76. case v := <-c.evSWnd:
  77. switch v {
  78. case _VRETR_IMMED:
  79. c.outlock.Lock()
  80. c.retransmit2()
  81. c.outlock.Unlock()
  82. case _VSWND_ACTIVE:
  83. timer.Reset(time.Duration(c.rtt) * time.Millisecond)
  84. case _CLOSE:
  85. return
  86. }
  87. case <-timer.C: // timeout yet
  88. var notifySender bool
  89. c.outlock.Lock()
  90. rest, _ := c.retransmit()
  91. switch rest {
  92. case _RETR_REST, 0: // nothing to send
  93. if c.outQ.size() > 0 {
  94. timer.Reset(time.Duration(c.rtt) * time.Millisecond)
  95. } else {
  96. timer.Stop()
  97. // avoid sender blocking
  98. notifySender = true
  99. }
  100. default: // recent rto point
  101. timer.Reset(time.Duration(minI64(rest, c.rtt)) * time.Millisecond)
  102. }
  103. c.outlock.Unlock()
  104. if notifySender {
  105. select {
  106. case c.evSend <- 1:
  107. default:
  108. }
  109. }
  110. }
  111. }
  112. }
  113. func (c *Conn) internalAckLoop() {
  114. // var ackTimer = time.NewTicker(time.Duration(c.ato))
  115. var ackTimer = time.NewTimer(time.Duration(c.ato) * time.Millisecond)
  116. var lastAckState byte
  117. for {
  118. var v byte
  119. select {
  120. case <-ackTimer.C:
  121. // may cause sending duplicated ack if ato>rtt
  122. v = _VACK_QUICK
  123. case v = <-c.evAck:
  124. ackTimer.Reset(time.Duration(c.ato) * time.Millisecond)
  125. state := lastAckState
  126. lastAckState = v
  127. if state != v {
  128. if v == _CLOSE {
  129. return
  130. }
  131. v = _VACK_MUST
  132. }
  133. }
  134. c.inlock.Lock()
  135. if pkAck := c.makeAck(v); pkAck != nil {
  136. c.internalWrite(nodeOf(pkAck))
  137. }
  138. c.inlock.Unlock()
  139. }
  140. }
  141. func (c *Conn) retransmit() (rest int64, count int32) {
  142. var now, rto = Now(), c.rto
  143. var limit = c.cwnd
  144. for item := c.outQ.head; item != nil && limit > 0; item = item.next {
  145. if item.scnt != _SENT_OK { // ACKed has scnt==-1
  146. diff := now - item.sent
  147. if diff > rto { // already rto
  148. c.internalWrite(item)
  149. count++
  150. } else {
  151. // continue search next min rto duration
  152. if rest > 0 {
  153. rest = minI64(rest, rto-diff+1)
  154. } else {
  155. rest = rto - diff + 1
  156. }
  157. limit--
  158. }
  159. }
  160. }
  161. c.outDupCnt += int(count)
  162. if count > 0 {
  163. shrcond := (c.fastRetransmit && count > maxI32(c.cwnd>>5, 4)) || (!c.fastRetransmit && count > c.cwnd>>3)
  164. if shrcond && now-c.lastShrink > c.rto {
  165. log.Printf("shrink cwnd from=%d to=%d s/4=%d", c.cwnd, c.cwnd>>1, c.swnd>>2)
  166. c.lastShrink = now
  167. // shrink cwnd and ensure cwnd >= swnd/4
  168. if c.cwnd > c.swnd>>1 {
  169. c.cwnd >>= 1
  170. }
  171. }
  172. }
  173. if c.outQ.size() > 0 {
  174. return
  175. }
  176. return _RETR_REST, 0
  177. }
  178. func (c *Conn) retransmit2() (count int32) {
  179. var limit, now = minI32(c.outPending>>4, 8), Now()
  180. var fRtt = c.rtt
  181. if now-c.lastShrink > c.rto {
  182. fRtt += maxI64(c.rtt>>4, 1)
  183. } else {
  184. fRtt += maxI64(c.rtt>>1, 2)
  185. }
  186. for item := c.outQ.head; item != nil && count < limit; item = item.next {
  187. if item.scnt != _SENT_OK { // ACKed has scnt==-1
  188. if item.miss >= 3 && now-item.sent >= fRtt {
  189. item.miss = 0
  190. c.internalWrite(item)
  191. count++
  192. }
  193. }
  194. }
  195. c.fRCnt += int(count)
  196. c.outDupCnt += int(count)
  197. return
  198. }
  199. func (c *Conn) inputAndSend(pk *packet) error {
  200. item := &qNode{packet: pk}
  201. if c.mySeq&3 == 1 {
  202. c.tSlotT0 = NowNS()
  203. }
  204. c.outlock.Lock()
  205. // inflight packets exceeds cwnd
  206. // inflight includes: 1, unacked; 2, missed
  207. for c.outPending >= c.cwnd+c.missed {
  208. c.outlock.Unlock()
  209. if c.wtmo > 0 {
  210. var tmo int64
  211. tmo, c.wtmo = c.wtmo, 0
  212. select {
  213. case v := <-c.evSend:
  214. if v == _CLOSE {
  215. return io.EOF
  216. }
  217. case <-NewTimerChan(tmo):
  218. return ErrIOTimeout
  219. }
  220. } else {
  221. if v := <-c.evSend; v == _CLOSE {
  222. return io.EOF
  223. }
  224. }
  225. c.outlock.Lock()
  226. }
  227. c.outPending++
  228. c.outPkCnt++
  229. c.mySeq++
  230. pk.seq = c.mySeq
  231. c.outQ.appendTail(item)
  232. c.internalWrite(item)
  233. c.outlock.Unlock()
  234. // active resending timer, must blocking
  235. c.evSWnd <- _VSWND_ACTIVE
  236. if c.mySeq&3 == 0 && c.flatTraffic {
  237. // calculate time error bewteen tslot with actual usage.
  238. // consider last sleep time error
  239. t1 := NowNS()
  240. terr := c.tSlot<<2 - c.lastSErr - (t1 - c.tSlotT0)
  241. // rest terr/2 if current time usage less than tslot of 100us.
  242. if terr > 1e5 { // 100us
  243. time.Sleep(time.Duration(terr >> 1))
  244. c.lastSErr = maxI64(NowNS()-t1-terr, 0)
  245. } else {
  246. c.lastSErr >>= 1
  247. }
  248. }
  249. return nil
  250. }
  251. func (c *Conn) internalWrite(item *qNode) {
  252. if item.scnt >= 20 {
  253. // no exception of sending fin
  254. if item.flag&_F_FIN != 0 {
  255. c.fakeShutdown()
  256. c.dest = nil
  257. return
  258. } else {
  259. log.Println("Warn: too many retries", item)
  260. if c.urgent > 0 { // abort
  261. c.forceShutdown()
  262. return
  263. } else { // continue to retry 10
  264. c.urgent++
  265. item.scnt = 10
  266. }
  267. }
  268. }
  269. // update current sent time and prev sent time
  270. item.sent, item.sent_1 = Now(), item.sent
  271. item.scnt++
  272. buf := item.marshall(c.connID)
  273. if debug >= 3 {
  274. var pkType = packetTypeNames[item.flag]
  275. if item.flag&_F_SACK != 0 {
  276. log.Printf("send %s trp=%d on=%d %x", pkType, item.seq, item.ack, buf[_AH_SIZE+4:])
  277. } else {
  278. log.Printf("send %s seq=%d ack=%d scnt=%d len=%d", pkType, item.seq, item.ack, item.scnt, len(buf)-_TH_SIZE)
  279. }
  280. }
  281. c.sock.WriteToUDP(buf, c.dest)
  282. }
  283. func (c *Conn) logAck(ack uint32) {
  284. c.lastAck = ack
  285. c.lastAckTime = Now()
  286. }
  287. func (c *Conn) makeLastAck() (pk *packet) {
  288. c.inlock.Lock()
  289. defer c.inlock.Unlock()
  290. if Now()-c.lastAckTime < c.rtt {
  291. return nil
  292. }
  293. pk = &packet{
  294. ack: maxU32(c.lastAck, c.inQ.maxCtnSeq),
  295. flag: _F_ACK,
  296. }
  297. c.logAck(pk.ack)
  298. return
  299. }
  300. func (c *Conn) makeAck(level byte) (pk *packet) {
  301. now := Now()
  302. if level < _VACK_MUST && now-c.lastAckTime < c.ato {
  303. if level < _VACK_QUICK || now-c.lastAckTime < minI64(c.ato>>2, 1) {
  304. return
  305. }
  306. }
  307. // ready Q <-|
  308. // |-> outQ start (or more right)
  309. // |-> bitmap start
  310. // [predecessor] [predecessor+1] [predecessor+2] .....
  311. var fakeSAck bool
  312. var predecessor = c.inQ.maxCtnSeq
  313. bmap, tbl := c.inQ.makeHolesBitmap(predecessor)
  314. if len(bmap) <= 0 { // fake sack
  315. bmap = make([]uint64, 1)
  316. bmap[0], tbl = 1, 1
  317. fakeSAck = true
  318. }
  319. // head 4-byte: TBL:1 | SCNT:1 | DELAY:2
  320. buf := make([]byte, len(bmap)*8+4)
  321. pk = &packet{
  322. ack: predecessor + 1,
  323. flag: _F_SACK,
  324. payload: buf,
  325. }
  326. if fakeSAck {
  327. pk.ack--
  328. }
  329. buf[0] = byte(tbl)
  330. // mark delayed time according to the time reference point
  331. if trp := c.inQ.lastIns; trp != nil {
  332. delayed := now - trp.sent
  333. if delayed < c.rtt {
  334. pk.seq = trp.seq
  335. pk.flag |= _F_TIME
  336. buf[1] = trp.scnt
  337. if delayed <= 0 {
  338. delayed = 1
  339. }
  340. binary.BigEndian.PutUint16(buf[2:], uint16(delayed))
  341. }
  342. }
  343. buf1 := buf[4:]
  344. for i, b := range bmap {
  345. binary.BigEndian.PutUint64(buf1[i*8:], b)
  346. }
  347. c.logAck(predecessor)
  348. return
  349. }
  350. func unmarshallSAck(data []byte) (bmap []uint64, tbl uint32, delayed uint16, scnt uint8) {
  351. if len(data) > 0 {
  352. bmap = make([]uint64, len(data)>>3)
  353. } else {
  354. return
  355. }
  356. tbl = uint32(data[0])
  357. scnt = data[1]
  358. delayed = binary.BigEndian.Uint16(data[2:])
  359. data = data[4:]
  360. for i := 0; i < len(bmap); i++ {
  361. bmap[i] = binary.BigEndian.Uint64(data[i*8:])
  362. }
  363. return
  364. }
  365. func calSwnd(bandwidth, rtt int64) int32 {
  366. w := int32(bandwidth * rtt / (8000 * _MSS))
  367. if w <= _MAX_SWND {
  368. if w >= _MIN_SWND {
  369. return w
  370. } else {
  371. return _MIN_SWND
  372. }
  373. } else {
  374. return _MAX_SWND
  375. }
  376. }
  377. func (c *Conn) measure(seq uint32, delayed int64, scnt uint8) {
  378. target := c.outQ.get(seq)
  379. if target != nil {
  380. var lastSent int64
  381. switch target.scnt - scnt {
  382. case 0:
  383. // not sent again since this ack was sent out
  384. lastSent = target.sent
  385. case 1:
  386. // sent again once since this ack was sent out
  387. // then use prev sent time
  388. lastSent = target.sent_1
  389. default:
  390. // can't measure here because the packet was sent too many times
  391. return
  392. }
  393. // real-time rtt
  394. rtt := Now() - lastSent - delayed
  395. // reject these abnormal measures:
  396. // 1. rtt too small -> rtt/8
  397. // 2. backlogging too long
  398. if rtt < maxI64(c.rtt>>3, 1) || delayed > c.rtt>>1 {
  399. return
  400. }
  401. // srtt: update 1/8
  402. err := rtt - (c.srtt >> 3)
  403. c.srtt += err
  404. c.rtt = c.srtt >> 3
  405. if c.rtt < _MIN_RTT {
  406. c.rtt = _MIN_RTT
  407. }
  408. // s-swnd: update 1/4
  409. swnd := c.swnd<<3 - c.swnd + calSwnd(c.bandwidth, c.rtt)
  410. c.swnd = swnd >> 3
  411. c.tSlot = c.rtt * 1e6 / int64(c.swnd)
  412. c.ato = c.rtt >> 4
  413. if c.ato < _MIN_ATO {
  414. c.ato = _MIN_ATO
  415. } else if c.ato > _MAX_ATO {
  416. c.ato = _MAX_ATO
  417. }
  418. if err < 0 {
  419. err = -err
  420. err -= c.mdev >> 2
  421. if err > 0 {
  422. err >>= 3
  423. }
  424. } else {
  425. err -= c.mdev >> 2
  426. }
  427. // mdev: update 1/4
  428. c.mdev += err
  429. rto := c.rtt + maxI64(c.rtt<<1, c.mdev)
  430. if rto >= c.rto {
  431. c.rto = rto
  432. } else {
  433. c.rto = (c.rto + rto) >> 1
  434. }
  435. if c.rto < _MIN_RTO {
  436. c.rto = _MIN_RTO
  437. }
  438. if debug >= 1 {
  439. log.Printf("--- rtt=%d srtt=%d rto=%d swnd=%d", c.rtt, c.srtt, c.rto, c.swnd)
  440. }
  441. }
  442. }
  443. func (c *Conn) processSAck(pk *packet) {
  444. c.outlock.Lock()
  445. bmap, tbl, delayed, scnt := unmarshallSAck(pk.payload)
  446. if bmap == nil { // bad packet
  447. c.outlock.Unlock()
  448. return
  449. }
  450. if pk.flag&_F_TIME != 0 {
  451. c.measure(pk.seq, int64(delayed), scnt)
  452. }
  453. deleted, missed, continuous := c.outQ.deleteByBitmap(bmap, pk.ack, tbl)
  454. if deleted > 0 {
  455. c.ackHit(deleted, missed)
  456. // lock is released
  457. } else {
  458. c.outlock.Unlock()
  459. }
  460. if c.fastRetransmit && !continuous {
  461. // peer Q is uncontinuous, then trigger FR
  462. if deleted == 0 {
  463. c.evSWnd <- _VRETR_IMMED
  464. } else {
  465. select {
  466. case c.evSWnd <- _VRETR_IMMED:
  467. default:
  468. }
  469. }
  470. }
  471. if debug >= 2 {
  472. log.Printf("SACK qhead=%d deleted=%d outPending=%d on=%d %016x",
  473. c.outQ.distanceOfHead(0), deleted, c.outPending, pk.ack, bmap)
  474. }
  475. }
  476. func (c *Conn) processAck(pk *packet) {
  477. c.outlock.Lock()
  478. if end := c.outQ.get(pk.ack); end != nil { // ack hit
  479. _, deleted := c.outQ.deleteBefore(end)
  480. c.ackHit(deleted, 0) // lock is released
  481. if debug >= 2 {
  482. log.Printf("ACK hit on=%d", pk.ack)
  483. }
  484. // special case: ack the FIN
  485. if pk.seq == _FIN_ACK_SEQ {
  486. select {
  487. case c.evClose <- _S_FIN0:
  488. default:
  489. }
  490. }
  491. } else { // duplicated ack
  492. if debug >= 2 {
  493. log.Printf("ACK miss on=%d", pk.ack)
  494. }
  495. if pk.flag&_F_SYN != 0 { // No.3 Ack lost
  496. if pkAck := c.makeLastAck(); pkAck != nil {
  497. c.internalWrite(nodeOf(pkAck))
  498. }
  499. }
  500. c.outlock.Unlock()
  501. }
  502. }
  503. func (c *Conn) ackHit(deleted, missed int32) {
  504. // must in outlock
  505. c.outPending -= deleted
  506. now := Now()
  507. if c.cwnd < c.swnd && now-c.lastShrink > c.rto {
  508. if c.cwnd < c.swnd>>1 {
  509. c.cwnd <<= 1
  510. } else {
  511. c.cwnd += deleted << 1
  512. }
  513. }
  514. if c.cwnd > c.swnd {
  515. c.cwnd = c.swnd
  516. }
  517. if now-c.lastRstMis > c.ato {
  518. c.lastRstMis = now
  519. c.missed = missed
  520. } else {
  521. c.missed = c.missed>>1 + missed
  522. }
  523. if qswnd := c.swnd >> 4; c.missed > qswnd {
  524. c.missed = qswnd
  525. }
  526. c.outlock.Unlock()
  527. select {
  528. case c.evSend <- 1:
  529. default:
  530. }
  531. }
  532. func (c *Conn) insertData(pk *packet) {
  533. c.inlock.Lock()
  534. defer c.inlock.Unlock()
  535. exists := c.inQ.contains(pk.seq)
  536. // duplicated with already queued or history
  537. // means: last ACK were lost
  538. if exists || pk.seq <= c.inQ.maxCtnSeq {
  539. // then send ACK for dups
  540. select {
  541. case c.evAck <- _VACK_MUST:
  542. default:
  543. }
  544. if debug >= 2 {
  545. dumpQ(fmt.Sprint("duplicated ", pk.seq), c.inQ)
  546. }
  547. c.inDupCnt++
  548. return
  549. }
  550. // record current time in sent and regard as received time
  551. item := &qNode{packet: pk, sent: Now()}
  552. dis := c.inQ.searchInsert(item, c.lastReadSeq)
  553. if debug >= 3 {
  554. log.Printf("\t\t\trecv DATA seq=%d dis=%d maxCtn=%d lastReadSeq=%d", item.seq, dis, c.inQ.maxCtnSeq, c.lastReadSeq)
  555. }
  556. var ackState byte = _VACK_MUST
  557. var available bool
  558. switch dis {
  559. case 0: // impossible
  560. return
  561. case 1:
  562. if c.inQDirty {
  563. available = c.inQ.updateContinuous(item)
  564. if c.inQ.isWholeContinuous() { // whole Q is ordered
  565. c.inQDirty = false
  566. } else { //those holes still exists.
  567. ackState = _VACK_QUICK
  568. }
  569. } else {
  570. // here is an ideal situation
  571. c.inQ.maxCtnSeq = pk.seq
  572. available = true
  573. ackState = _VACK_SCHED
  574. }
  575. default: // there is an unordered packet, hole occurred here.
  576. if !c.inQDirty {
  577. c.inQDirty = true
  578. }
  579. }
  580. // write valid received count
  581. c.inPkCnt++
  582. c.inQ.lastIns = item
  583. // try notify ack
  584. select {
  585. case c.evAck <- ackState:
  586. default:
  587. }
  588. if available { // try notify reader
  589. select {
  590. case c.evRead <- 1:
  591. default:
  592. }
  593. }
  594. }
  595. func (c *Conn) readInQ() bool {
  596. c.inlock.Lock()
  597. defer c.inlock.Unlock()
  598. // read already <-|-> expected Q
  599. // [lastReadSeq] | [lastReadSeq+1] [lastReadSeq+2] ......
  600. if c.inQ.isEqualsHead(c.lastReadSeq+1) && c.lastReadSeq < c.inQ.maxCtnSeq {
  601. c.lastReadSeq = c.inQ.maxCtnSeq
  602. availabled := c.inQ.get(c.inQ.maxCtnSeq)
  603. availabled, _ = c.inQ.deleteBefore(availabled)
  604. for i := availabled; i != nil; i = i.next {
  605. c.inQReady = append(c.inQReady, i.payload...)
  606. // data was copied, then could recycle memory
  607. bpool.Put(i.buffer)
  608. i.payload = nil
  609. i.buffer = nil
  610. }
  611. return true
  612. }
  613. return false
  614. }
  615. // should not call this function concurrently.
  616. func (c *Conn) Read(buf []byte) (nr int, err error) {
  617. for {
  618. if len(c.inQReady) > 0 {
  619. n := copy(buf, c.inQReady)
  620. c.inQReady = c.inQReady[n:]
  621. return n, nil
  622. }
  623. if !c.readInQ() {
  624. if c.rtmo > 0 {
  625. var tmo int64
  626. tmo, c.rtmo = c.rtmo, 0
  627. select {
  628. case _, y := <-c.evRead:
  629. if !y && len(c.inQReady) == 0 {
  630. return 0, io.EOF
  631. }
  632. case <-NewTimerChan(tmo):
  633. return 0, ErrIOTimeout
  634. }
  635. } else {
  636. // only when evRead is closed and inQReady is empty
  637. // then could reply eof
  638. if _, y := <-c.evRead; !y && len(c.inQReady) == 0 {
  639. return 0, io.EOF
  640. }
  641. }
  642. }
  643. }
  644. }
  645. // should not call this function concurrently.
  646. func (c *Conn) Write(data []byte) (nr int, err error) {
  647. for len(data) > 0 && err == nil {
  648. //buf := make([]byte, _MSS+_AH_SIZE)
  649. buf := bpool.Get(c.mss + _AH_SIZE)
  650. body := buf[_TH_SIZE+_CH_SIZE:]
  651. n := copy(body, data)
  652. nr += n
  653. data = data[n:]
  654. pk := &packet{flag: _F_DATA, payload: body[:n], buffer: buf[:_AH_SIZE+n]}
  655. err = c.inputAndSend(pk)
  656. }
  657. return
  658. }
  659. func (c *Conn) LocalAddr() net.Addr {
  660. return c.sock.LocalAddr()
  661. }
  662. func (c *Conn) RemoteAddr() net.Addr {
  663. return c.dest
  664. }
  665. func (c *Conn) SetDeadline(t time.Time) error {
  666. c.SetReadDeadline(t)
  667. c.SetWriteDeadline(t)
  668. return nil
  669. }
  670. func (c *Conn) SetReadDeadline(t time.Time) error {
  671. if d := t.UnixNano()/Millisecond - Now(); d > 0 {
  672. c.rtmo = d
  673. }
  674. return nil
  675. }
  676. func (c *Conn) SetWriteDeadline(t time.Time) error {
  677. if d := t.UnixNano()/Millisecond - Now(); d > 0 {
  678. c.wtmo = d
  679. }
  680. return nil
  681. }