bufconn.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package bufconn provides a net.Conn implemented by a buffer and related
  19. // dialing and listening functionality.
  20. package bufconn
  21. import (
  22. "context"
  23. "fmt"
  24. "io"
  25. "net"
  26. "sync"
  27. "time"
  28. )
  29. // Listener implements a net.Listener that creates local, buffered net.Conns
  30. // via its Accept and Dial method.
  31. type Listener struct {
  32. mu sync.Mutex
  33. sz int
  34. ch chan net.Conn
  35. done chan struct{}
  36. }
  37. // Implementation of net.Error providing timeout
  38. type netErrorTimeout struct {
  39. error
  40. }
  41. func (e netErrorTimeout) Timeout() bool { return true }
  42. func (e netErrorTimeout) Temporary() bool { return false }
  43. var errClosed = fmt.Errorf("closed")
  44. var errTimeout net.Error = netErrorTimeout{error: fmt.Errorf("i/o timeout")}
  45. // Listen returns a Listener that can only be contacted by its own Dialers and
  46. // creates buffered connections between the two.
  47. func Listen(sz int) *Listener {
  48. return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})}
  49. }
  50. // Accept blocks until Dial is called, then returns a net.Conn for the server
  51. // half of the connection.
  52. func (l *Listener) Accept() (net.Conn, error) {
  53. select {
  54. case <-l.done:
  55. return nil, errClosed
  56. case c := <-l.ch:
  57. return c, nil
  58. }
  59. }
  60. // Close stops the listener.
  61. func (l *Listener) Close() error {
  62. l.mu.Lock()
  63. defer l.mu.Unlock()
  64. select {
  65. case <-l.done:
  66. // Already closed.
  67. break
  68. default:
  69. close(l.done)
  70. }
  71. return nil
  72. }
  73. // Addr reports the address of the listener.
  74. func (l *Listener) Addr() net.Addr { return addr{} }
  75. // Dial creates an in-memory full-duplex network connection, unblocks Accept by
  76. // providing it the server half of the connection, and returns the client half
  77. // of the connection.
  78. func (l *Listener) Dial() (net.Conn, error) {
  79. return l.DialContext(context.Background())
  80. }
  81. // DialContext creates an in-memory full-duplex network connection, unblocks Accept by
  82. // providing it the server half of the connection, and returns the client half
  83. // of the connection. If ctx is Done, returns ctx.Err()
  84. func (l *Listener) DialContext(ctx context.Context) (net.Conn, error) {
  85. p1, p2 := newPipe(l.sz), newPipe(l.sz)
  86. select {
  87. case <-ctx.Done():
  88. return nil, ctx.Err()
  89. case <-l.done:
  90. return nil, errClosed
  91. case l.ch <- &conn{p1, p2}:
  92. return &conn{p2, p1}, nil
  93. }
  94. }
  95. type pipe struct {
  96. mu sync.Mutex
  97. // buf contains the data in the pipe. It is a ring buffer of fixed capacity,
  98. // with r and w pointing to the offset to read and write, respsectively.
  99. //
  100. // Data is read between [r, w) and written to [w, r), wrapping around the end
  101. // of the slice if necessary.
  102. //
  103. // The buffer is empty if r == len(buf), otherwise if r == w, it is full.
  104. //
  105. // w and r are always in the range [0, cap(buf)) and [0, len(buf)].
  106. buf []byte
  107. w, r int
  108. wwait sync.Cond
  109. rwait sync.Cond
  110. // Indicate that a write/read timeout has occurred
  111. wtimedout bool
  112. rtimedout bool
  113. wtimer *time.Timer
  114. rtimer *time.Timer
  115. closed bool
  116. writeClosed bool
  117. }
  118. func newPipe(sz int) *pipe {
  119. p := &pipe{buf: make([]byte, 0, sz)}
  120. p.wwait.L = &p.mu
  121. p.rwait.L = &p.mu
  122. p.wtimer = time.AfterFunc(0, func() {})
  123. p.rtimer = time.AfterFunc(0, func() {})
  124. return p
  125. }
  126. func (p *pipe) empty() bool {
  127. return p.r == len(p.buf)
  128. }
  129. func (p *pipe) full() bool {
  130. return p.r < len(p.buf) && p.r == p.w
  131. }
  132. func (p *pipe) Read(b []byte) (n int, err error) {
  133. p.mu.Lock()
  134. defer p.mu.Unlock()
  135. // Block until p has data.
  136. for {
  137. if p.closed {
  138. return 0, io.ErrClosedPipe
  139. }
  140. if !p.empty() {
  141. break
  142. }
  143. if p.writeClosed {
  144. return 0, io.EOF
  145. }
  146. if p.rtimedout {
  147. return 0, errTimeout
  148. }
  149. p.rwait.Wait()
  150. }
  151. wasFull := p.full()
  152. n = copy(b, p.buf[p.r:len(p.buf)])
  153. p.r += n
  154. if p.r == cap(p.buf) {
  155. p.r = 0
  156. p.buf = p.buf[:p.w]
  157. }
  158. // Signal a blocked writer, if any
  159. if wasFull {
  160. p.wwait.Signal()
  161. }
  162. return n, nil
  163. }
  164. func (p *pipe) Write(b []byte) (n int, err error) {
  165. p.mu.Lock()
  166. defer p.mu.Unlock()
  167. if p.closed {
  168. return 0, io.ErrClosedPipe
  169. }
  170. for len(b) > 0 {
  171. // Block until p is not full.
  172. for {
  173. if p.closed || p.writeClosed {
  174. return 0, io.ErrClosedPipe
  175. }
  176. if !p.full() {
  177. break
  178. }
  179. if p.wtimedout {
  180. return 0, errTimeout
  181. }
  182. p.wwait.Wait()
  183. }
  184. wasEmpty := p.empty()
  185. end := cap(p.buf)
  186. if p.w < p.r {
  187. end = p.r
  188. }
  189. x := copy(p.buf[p.w:end], b)
  190. b = b[x:]
  191. n += x
  192. p.w += x
  193. if p.w > len(p.buf) {
  194. p.buf = p.buf[:p.w]
  195. }
  196. if p.w == cap(p.buf) {
  197. p.w = 0
  198. }
  199. // Signal a blocked reader, if any.
  200. if wasEmpty {
  201. p.rwait.Signal()
  202. }
  203. }
  204. return n, nil
  205. }
  206. func (p *pipe) Close() error {
  207. p.mu.Lock()
  208. defer p.mu.Unlock()
  209. p.closed = true
  210. // Signal all blocked readers and writers to return an error.
  211. p.rwait.Broadcast()
  212. p.wwait.Broadcast()
  213. return nil
  214. }
  215. func (p *pipe) closeWrite() error {
  216. p.mu.Lock()
  217. defer p.mu.Unlock()
  218. p.writeClosed = true
  219. // Signal all blocked readers and writers to return an error.
  220. p.rwait.Broadcast()
  221. p.wwait.Broadcast()
  222. return nil
  223. }
  224. type conn struct {
  225. io.Reader
  226. io.Writer
  227. }
  228. func (c *conn) Close() error {
  229. err1 := c.Reader.(*pipe).Close()
  230. err2 := c.Writer.(*pipe).closeWrite()
  231. if err1 != nil {
  232. return err1
  233. }
  234. return err2
  235. }
  236. func (c *conn) SetDeadline(t time.Time) error {
  237. c.SetReadDeadline(t)
  238. c.SetWriteDeadline(t)
  239. return nil
  240. }
  241. func (c *conn) SetReadDeadline(t time.Time) error {
  242. p := c.Reader.(*pipe)
  243. p.mu.Lock()
  244. defer p.mu.Unlock()
  245. p.rtimer.Stop()
  246. p.rtimedout = false
  247. if !t.IsZero() {
  248. p.rtimer = time.AfterFunc(time.Until(t), func() {
  249. p.mu.Lock()
  250. defer p.mu.Unlock()
  251. p.rtimedout = true
  252. p.rwait.Broadcast()
  253. })
  254. }
  255. return nil
  256. }
  257. func (c *conn) SetWriteDeadline(t time.Time) error {
  258. p := c.Writer.(*pipe)
  259. p.mu.Lock()
  260. defer p.mu.Unlock()
  261. p.wtimer.Stop()
  262. p.wtimedout = false
  263. if !t.IsZero() {
  264. p.wtimer = time.AfterFunc(time.Until(t), func() {
  265. p.mu.Lock()
  266. defer p.mu.Unlock()
  267. p.wtimedout = true
  268. p.wwait.Broadcast()
  269. })
  270. }
  271. return nil
  272. }
  273. func (*conn) LocalAddr() net.Addr { return addr{} }
  274. func (*conn) RemoteAddr() net.Addr { return addr{} }
  275. type addr struct{}
  276. func (addr) Network() string { return "bufconn" }
  277. func (addr) String() string { return "bufconn" }