state.go 10 KB


  1. package udptransfer
  2. import (
  3. "errors"
  4. "log"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. const (
  11. _10ms = time.Millisecond * 10
  12. _100ms = time.Millisecond * 100
  13. )
  14. const (
  15. _FIN_ACK_SEQ uint32 = 0xffFF0000
  16. _INVALID_SEQ uint32 = 0xffFFffFF
  17. )
  18. var (
  19. ErrIOTimeout error = &TimeoutError{}
  20. ErrUnknown = errors.New("Unknown error")
  21. ErrInexplicableData = errors.New("Inexplicable data")
  22. ErrTooManyAttempts = errors.New("Too many attempts to connect")
  23. )
  24. type TimeoutError struct{}
  25. func (e *TimeoutError) Error() string { return "i/o timeout" }
  26. func (e *TimeoutError) Timeout() bool { return true }
  27. func (e *TimeoutError) Temporary() bool { return true }
// Conn is the state of one connection carried over a UDP socket. It groups
// the socket and peer address, the event channels used for signalling, the
// protocol counters and timers, the in/out queues, tuning parameters and
// traffic statistics.
//
// NOTE(review): state and outPending are accessed through sync/atomic in this
// file; keep the field order as is unless alignment requirements of any
// atomically accessed fields have been checked.
type Conn struct {
	sock   *net.UDPConn // UDP socket taken from the owning Endpoint (see NewConn)
	dest   *net.UDPAddr // remote peer address
	edp    *Endpoint    // owning endpoint; the conn is removed from it on shutdown
	connID connID       // 8 bytes
	// events
	evRecv  chan []byte // inbound raw packets; a nil value is sent on shutdown
	evRead  chan byte   // closed when the read side is shut down
	evSend  chan byte   // receives _CLOSE when the sender is stopped
	evSWnd  chan byte   // receives _VSWND_ACTIVE / _CLOSE
	evAck   chan byte   // receives _CLOSE to stop the ack loop
	evClose chan byte   // carries close-progress values (_S_FIN0, _S_FIN)
	// protocol state
	inlock       sync.Mutex // guards the copy from inQ into inQReady (see checkInQ)
	outlock      sync.Mutex // held while queueing/sending the FIN and during forced shutdown
	state        int32      // connection state (_S_*), read and swapped atomically
	mySeq        uint32     // local sequence number
	swnd         int32
	cwnd         int32
	missed       int32
	outPending   int32 // count of pending outbound items; read atomically
	lastAck      uint32
	lastAckTime  int64
	lastAckTime2 int64
	lastShrink   int64
	lastRstMis   int64
	ato          int64 // derived from rtt in initConnection, clamped to [_MIN_ATO, _MAX_ATO]
	rto          int64 // derived from rtt in initConnection, at least _MIN_RTO
	rtt          int64 // measured during the handshake, in the unit of Now()
	srtt         int64 // initialised to rtt<<3
	mdev         int64 // initialised to rtt<<1
	rtmo         int64
	wtmo         int64
	tSlot        int64
	tSlotT0      int64
	lastSErr     int64
	// queue
	outQ        *linkedMap
	inQ         *linkedMap
	inQReady    []byte
	inQDirty    bool
	lastReadSeq uint32 // last user read seq
	// params
	bandwidth      int64
	fastRetransmit bool
	flatTraffic    bool
	mss            int
	// statistics
	urgent    int
	inPkCnt   int
	inDupCnt  int
	outPkCnt  int
	outDupCnt int
	fRCnt     int
}
  83. func NewConn(e *Endpoint, dest *net.UDPAddr, id connID) *Conn {
  84. c := &Conn{
  85. sock: e.udpconn,
  86. dest: dest,
  87. edp: e,
  88. connID: id,
  89. evRecv: make(chan []byte, 128),
  90. evRead: make(chan byte, 1),
  91. evSWnd: make(chan byte, 2),
  92. evSend: make(chan byte, 4),
  93. evAck: make(chan byte, 1),
  94. evClose: make(chan byte, 2),
  95. outQ: newLinkedMap(_QModeOut),
  96. inQ: newLinkedMap(_QModeIn),
  97. }
  98. p := e.params
  99. c.bandwidth = p.Bandwidth
  100. c.fastRetransmit = p.FastRetransmit
  101. c.flatTraffic = p.FlatTraffic
  102. c.mss = _MSS
  103. if dest.IP.To4() == nil {
  104. // typical ipv6 header length=40
  105. c.mss -= 20
  106. }
  107. return c
  108. }
  109. func (c *Conn) initConnection(buf []byte) (err error) {
  110. if buf == nil {
  111. err = c.initDialing()
  112. } else { //server
  113. err = c.acceptConnection(buf[_TH_SIZE:])
  114. }
  115. if err != nil {
  116. return
  117. }
  118. if c.state == _S_EST1 {
  119. c.lastReadSeq = c.lastAck
  120. c.inQ.maxCtnSeq = c.lastAck
  121. c.rtt = maxI64(c.rtt, _MIN_RTT)
  122. c.mdev = c.rtt << 1
  123. c.srtt = c.rtt << 3
  124. c.rto = maxI64(c.rtt*2, _MIN_RTO)
  125. c.ato = maxI64(c.rtt>>4, _MIN_ATO)
  126. c.ato = minI64(c.ato, _MAX_ATO)
  127. // initial cwnd
  128. c.swnd = calSwnd(c.bandwidth, c.rtt) >> 1
  129. c.cwnd = 8
  130. go c.internalRecvLoop()
  131. go c.internalSendLoop()
  132. go c.internalAckLoop()
  133. if debug >= 0 {
  134. go c.internal_state()
  135. }
  136. return nil
  137. } else {
  138. return ErrUnknown
  139. }
  140. }
  141. func (c *Conn) initDialing() error {
  142. // first syn
  143. pk := &packet{
  144. seq: c.mySeq,
  145. flag: _F_SYN,
  146. }
  147. item := nodeOf(pk)
  148. var buf []byte
  149. c.state = _S_SYN0
  150. t0 := Now()
  151. for i := 0; i < _MAX_RETRIES && c.state == _S_SYN0; i++ {
  152. // send syn
  153. c.internalWrite(item)
  154. select {
  155. case buf = <-c.evRecv:
  156. c.rtt = Now() - t0
  157. c.state = _S_SYN1
  158. c.connID.setRid(buf)
  159. buf = buf[_TH_SIZE:]
  160. case <-time.After(time.Second):
  161. continue
  162. }
  163. }
  164. if c.state == _S_SYN0 {
  165. return ErrTooManyAttempts
  166. }
  167. unmarshall(pk, buf)
  168. // expected syn+ack
  169. if pk.flag == _F_SYN|_F_ACK && pk.ack == c.mySeq {
  170. if scnt := pk.scnt - 1; scnt > 0 {
  171. c.rtt -= int64(scnt) * 1e3
  172. }
  173. log.Println("rtt", c.rtt)
  174. c.state = _S_EST0
  175. // build ack3
  176. pk.scnt = 0
  177. pk.ack = pk.seq
  178. pk.flag = _F_ACK
  179. item := nodeOf(pk)
  180. // send ack3
  181. c.internalWrite(item)
  182. // update lastAck
  183. c.logAck(pk.ack)
  184. c.state = _S_EST1
  185. return nil
  186. } else {
  187. return ErrInexplicableData
  188. }
  189. }
  190. func (c *Conn) acceptConnection(buf []byte) error {
  191. var pk = new(packet)
  192. var item *qNode
  193. unmarshall(pk, buf)
  194. // expected syn
  195. if pk.flag == _F_SYN {
  196. c.state = _S_SYN1
  197. // build syn+ack
  198. pk.ack = pk.seq
  199. pk.seq = c.mySeq
  200. pk.flag |= _F_ACK
  201. // update lastAck
  202. c.logAck(pk.ack)
  203. item = nodeOf(pk)
  204. item.scnt = pk.scnt - 1
  205. } else {
  206. dumpb("Syn1 ?", buf)
  207. return ErrInexplicableData
  208. }
  209. for i := 0; i < 5 && c.state == _S_SYN1; i++ {
  210. t0 := Now()
  211. // reply syn+ack
  212. c.internalWrite(item)
  213. // recv ack3
  214. select {
  215. case buf = <-c.evRecv:
  216. c.state = _S_EST0
  217. c.rtt = Now() - t0
  218. buf = buf[_TH_SIZE:]
  219. log.Println("rtt", c.rtt)
  220. case <-time.After(time.Second):
  221. continue
  222. }
  223. }
  224. if c.state == _S_SYN1 {
  225. return ErrTooManyAttempts
  226. }
  227. pk = new(packet)
  228. unmarshall(pk, buf)
  229. // expected ack3
  230. if pk.flag == _F_ACK && pk.ack == c.mySeq {
  231. c.state = _S_EST1
  232. } else {
  233. // if ack3 lost, resend syn+ack 3-times
  234. // and drop these coming data
  235. if pk.flag&_F_DATA != 0 && pk.seq > c.lastAck {
  236. c.internalWrite(item)
  237. c.state = _S_EST1
  238. } else {
  239. dumpb("Ack3 ?", buf)
  240. return ErrInexplicableData
  241. }
  242. }
  243. return nil
  244. }
  245. // 20,20,20,20, 100,100,100,100, 1s,1s,1s,1s
  246. func selfSpinWait(fn func() bool) error {
  247. const _MAX_SPIN = 12
  248. for i := 0; i < _MAX_SPIN; i++ {
  249. if fn() {
  250. return nil
  251. } else if i <= 3 {
  252. time.Sleep(_10ms * 2)
  253. } else if i <= 7 {
  254. time.Sleep(_100ms)
  255. } else {
  256. time.Sleep(time.Second)
  257. }
  258. }
  259. return ErrIOTimeout
  260. }
  261. func (c *Conn) IsClosed() bool {
  262. return atomic.LoadInt32(&c.state) <= _S_FIN1
  263. }
  264. /*
  265. active close:
  266. 1 <- send fin-W: closeW()
  267. before sending, ensure all outQ items has beed sent out and all of them has been acked.
  268. 2 -> wait to recv ack{fin-W}
  269. then trigger closeR, including send fin-R and wait to recv ack{fin-R}
  270. passive close:
  271. -> fin:
  272. if outQ is not empty then self-spin wait.
  273. if outQ empty, send ack{fin-W} then goto closeW().
  274. */
  275. func (c *Conn) Close() (err error) {
  276. if !atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN0) {
  277. return selfSpinWait(func() bool {
  278. return atomic.LoadInt32(&c.state) == _S_FIN
  279. })
  280. }
  281. var err0 error
  282. err0 = c.closeW()
  283. // waiting for fin-2 of peer
  284. err = selfSpinWait(func() bool {
  285. select {
  286. case v := <-c.evClose:
  287. if v == _S_FIN {
  288. return true
  289. } else {
  290. time.AfterFunc(_100ms, func() { c.evClose <- v })
  291. }
  292. default:
  293. }
  294. return false
  295. })
  296. defer c.afterShutdown()
  297. if err != nil {
  298. // backup path for wait ack(finW) timeout
  299. c.closeR(nil)
  300. }
  301. if err0 != nil {
  302. return err0
  303. } else {
  304. return
  305. }
  306. }
  307. func (c *Conn) beforeCloseW() (err error) {
  308. // check outQ was empty and all has been acked.
  309. // self-spin waiting
  310. for i := 0; i < 2; i++ {
  311. err = selfSpinWait(func() bool {
  312. return atomic.LoadInt32(&c.outPending) <= 0
  313. })
  314. if err == nil {
  315. break
  316. }
  317. }
  318. // send fin, reliably
  319. c.outlock.Lock()
  320. c.mySeq++
  321. c.outPending++
  322. pk := &packet{seq: c.mySeq, flag: _F_FIN}
  323. item := nodeOf(pk)
  324. c.outQ.appendTail(item)
  325. c.internalWrite(item)
  326. c.outlock.Unlock()
  327. c.evSWnd <- _VSWND_ACTIVE
  328. return
  329. }
  330. func (c *Conn) closeW() (err error) {
  331. // close resource of sending
  332. defer c.afterCloseW()
  333. // send fin
  334. err = c.beforeCloseW()
  335. var closed bool
  336. var max = 20
  337. if c.rtt > 200 {
  338. max = int(c.rtt) / 10
  339. }
  340. // waiting for outQ means:
  341. // 1. all outQ has been acked, for passive
  342. // 2. fin has been acked, for active
  343. for i := 0; i < max && (atomic.LoadInt32(&c.outPending) > 0 || !closed); i++ {
  344. select {
  345. case v := <-c.evClose:
  346. if v == _S_FIN0 {
  347. // namely, last fin has been acked.
  348. closed = true
  349. } else {
  350. time.AfterFunc(_100ms, func() { c.evClose <- v })
  351. }
  352. case <-time.After(_100ms):
  353. }
  354. }
  355. if closed || err != nil {
  356. return
  357. } else {
  358. return ErrIOTimeout
  359. }
  360. }
// afterCloseW releases the sending side after closeW. It offers _CLOSE to
// evSend without blocking (the value is dropped if the channel is full) and
// then sends _CLOSE on evSWnd, which blocks until there is room.
func (c *Conn) afterCloseW() {
	// can't close(c.evRecv), avoid endpoint dispatch exception
	// stop pending inputAndSend
	select {
	case c.evSend <- _CLOSE:
	default:
	}
	// stop internalSendLoop
	c.evSWnd <- _CLOSE
}
  371. // called by active and passive close()
  372. func (c *Conn) afterShutdown() {
  373. // stop internalRecvLoop
  374. c.evRecv <- nil
  375. // remove registry
  376. c.edp.removeConn(c.connID, c.dest)
  377. log.Println("shutdown", c.state)
  378. }
// forceShutdownWithLock runs forceShutdown while holding outlock; the lock
// is released when it returns. Triggered by a reset.
func (c *Conn) forceShutdownWithLock() {
	c.outlock.Lock()
	defer c.outlock.Unlock()
	c.forceShutdown()
}
  385. // called by:
  386. // 1/ send exception
  387. // 2/ recv reset
  388. // drop outQ and force shutdown
  389. func (c *Conn) forceShutdown() {
  390. if atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN) {
  391. defer c.afterShutdown()
  392. // stop sender
  393. for i := 0; i < cap(c.evSend); i++ {
  394. select {
  395. case <-c.evSend:
  396. default:
  397. }
  398. }
  399. select {
  400. case c.evSend <- _CLOSE:
  401. default:
  402. }
  403. c.outQ.reset()
  404. // stop reader
  405. close(c.evRead)
  406. c.inQ.reset()
  407. // stop internalLoops
  408. c.evSWnd <- _CLOSE
  409. c.evAck <- _CLOSE
  410. //log.Println("force shutdown")
  411. }
  412. }
// fakeShutdown is used when sending the FIN failed. It offers _S_FIN0 to
// evClose without blocking, which is the value closeW treats as "the FIN has
// been acknowledged"; the value is dropped if evClose is full.
func (c *Conn) fakeShutdown() {
	select {
	case c.evClose <- _S_FIN0:
	default:
	}
}
  420. func (c *Conn) closeR(pk *packet) {
  421. var passive = true
  422. for {
  423. state := atomic.LoadInt32(&c.state)
  424. switch state {
  425. case _S_FIN:
  426. return
  427. case _S_FIN1: // multiple FIN, maybe lost
  428. c.passiveCloseReply(pk, false)
  429. return
  430. case _S_FIN0: // active close preformed
  431. passive = false
  432. }
  433. if !atomic.CompareAndSwapInt32(&c.state, state, _S_FIN1) {
  434. continue
  435. }
  436. c.passiveCloseReply(pk, true)
  437. break
  438. }
  439. // here, R is closed.
  440. // ^^^^^^^^^^^^^^^^^^^^^
  441. if passive {
  442. // passive closing call closeW contains sending fin and recv ack
  443. // may the ack of fin-2 was lost, then the closeW will timeout
  444. c.closeW()
  445. }
  446. // here, R,W both were closed.
  447. // ^^^^^^^^^^^^^^^^^^^^^
  448. atomic.StoreInt32(&c.state, _S_FIN)
  449. // stop internalAckLoop
  450. c.evAck <- _CLOSE
  451. if passive {
  452. // close evRecv within here
  453. c.afterShutdown()
  454. } else {
  455. // notify active close thread
  456. select {
  457. case c.evClose <- _S_FIN:
  458. default:
  459. }
  460. }
  461. }
  462. func (c *Conn) passiveCloseReply(pk *packet, first bool) {
  463. if pk != nil && pk.flag&_F_FIN != 0 {
  464. if first {
  465. c.checkInQ(pk)
  466. close(c.evRead)
  467. }
  468. // ack the FIN
  469. pk = &packet{seq: _FIN_ACK_SEQ, ack: pk.seq, flag: _F_ACK}
  470. item := nodeOf(pk)
  471. c.internalWrite(item)
  472. }
  473. }
  474. // check inQ ends orderly, and copy queue data to user space
  475. func (c *Conn) checkInQ(pk *packet) {
  476. if nil != selfSpinWait(func() bool {
  477. return c.inQ.maxCtnSeq+1 == pk.seq
  478. }) { // timeout for waiting inQ to finish
  479. return
  480. }
  481. c.inlock.Lock()
  482. defer c.inlock.Unlock()
  483. if c.inQ.size() > 0 {
  484. for i := c.inQ.head; i != nil; i = i.next {
  485. c.inQReady = append(c.inQReady, i.payload...)
  486. }
  487. }
  488. }