latency.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package latency provides wrappers for net.Conn, net.Listener, and
  19. // net.Dialers, designed to interoperate to inject real-world latency into
  20. // network connections.
  21. package latency
  22. import (
  23. "bytes"
  24. "context"
  25. "encoding/binary"
  26. "fmt"
  27. "io"
  28. "net"
  29. "time"
  30. )
  31. // Dialer is a function matching the signature of net.Dial.
  32. type Dialer func(network, address string) (net.Conn, error)
  33. // TimeoutDialer is a function matching the signature of net.DialTimeout.
  34. type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)
  35. // ContextDialer is a function matching the signature of
  36. // net.Dialer.DialContext.
  37. type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
  38. // Network represents a network with the given bandwidth, latency, and MTU
  39. // (Maximum Transmission Unit) configuration, and can produce wrappers of
  40. // net.Listeners, net.Conn, and various forms of dialing functions. The
  41. // Listeners and Dialers/Conns on both sides of connections must come from this
  42. // package, but need not be created from the same Network. Latency is computed
  43. // when sending (in Write), and is injected when receiving (in Read). This
  44. // allows senders' Write calls to be non-blocking, as in real-world
  45. // applications.
  46. //
  47. // Note: Latency is injected by the sender specifying the absolute time data
  48. // should be available, and the reader delaying until that time arrives to
  49. // provide the data. This package attempts to counter-act the effects of clock
  50. // drift and existing network latency by measuring the delay between the
  51. // sender's transmission time and the receiver's reception time during startup.
  52. // No attempt is made to measure the existing bandwidth of the connection.
  53. type Network struct {
  54. Kbps int // Kilobits per second; if non-positive, infinite
  55. Latency time.Duration // One-way latency (sending); if non-positive, no delay
  56. MTU int // Bytes per packet; if non-positive, infinite
  57. }
  58. var (
  59. //Local simulates local network.
  60. Local = Network{0, 0, 0}
  61. //LAN simulates local area network network.
  62. LAN = Network{100 * 1024, 2 * time.Millisecond, 1500}
  63. //WAN simulates wide area network.
  64. WAN = Network{20 * 1024, 30 * time.Millisecond, 1500}
  65. //Longhaul simulates bad network.
  66. Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000}
  67. )
  68. // Conn returns a net.Conn that wraps c and injects n's latency into that
  69. // connection. This function also imposes latency for connection creation.
  70. // If n's Latency is lower than the measured latency in c, an error is
  71. // returned.
  72. func (n *Network) Conn(c net.Conn) (net.Conn, error) {
  73. start := now()
  74. nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
  75. if err := nc.sync(); err != nil {
  76. return nil, err
  77. }
  78. sleep(start.Add(nc.delay).Sub(now()))
  79. return nc, nil
  80. }
  81. type conn struct {
  82. net.Conn
  83. network *Network
  84. readBuf *bytes.Buffer // one packet worth of data received
  85. lastSendEnd time.Time // time the previous Write should be fully on the wire
  86. delay time.Duration // desired latency - measured latency
  87. }
  88. // header is sent before all data transmitted by the application.
  89. type header struct {
  90. ReadTime int64 // Time the reader is allowed to read this packet (UnixNano)
  91. Sz int32 // Size of the data in the packet
  92. }
  93. func (c *conn) Write(p []byte) (n int, err error) {
  94. tNow := now()
  95. if c.lastSendEnd.Before(tNow) {
  96. c.lastSendEnd = tNow
  97. }
  98. for len(p) > 0 {
  99. pkt := p
  100. if c.network.MTU > 0 && len(pkt) > c.network.MTU {
  101. pkt = pkt[:c.network.MTU]
  102. p = p[c.network.MTU:]
  103. } else {
  104. p = nil
  105. }
  106. if c.network.Kbps > 0 {
  107. if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
  108. // The network is full; sleep until this packet can be sent.
  109. sleep(congestion)
  110. tNow = tNow.Add(congestion)
  111. }
  112. }
  113. c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
  114. hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
  115. if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
  116. return n, err
  117. }
  118. x, err := c.Conn.Write(pkt)
  119. n += x
  120. if err != nil {
  121. return n, err
  122. }
  123. }
  124. return n, nil
  125. }
  126. func (c *conn) Read(p []byte) (n int, err error) {
  127. if c.readBuf.Len() == 0 {
  128. var hdr header
  129. if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
  130. return 0, err
  131. }
  132. defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
  133. if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
  134. return 0, err
  135. }
  136. }
  137. // Read from readBuf.
  138. return c.readBuf.Read(p)
  139. }
  140. // sync does a handshake and then measures the latency on the network in
  141. // coordination with the other side.
  142. func (c *conn) sync() error {
  143. const (
  144. pingMsg = "syncPing"
  145. warmup = 10 // minimum number of iterations to measure latency
  146. giveUp = 50 // maximum number of iterations to measure latency
  147. accuracy = time.Millisecond // req'd accuracy to stop early
  148. goodRun = 3 // stop early if latency within accuracy this many times
  149. )
  150. type syncMsg struct {
  151. SendT int64 // Time sent. If zero, stop.
  152. RecvT int64 // Time received. If zero, fill in and respond.
  153. }
  154. // A trivial handshake
  155. if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
  156. return err
  157. }
  158. var ping [8]byte
  159. if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
  160. return err
  161. } else if string(ping[:]) != pingMsg {
  162. return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
  163. }
  164. // Both sides are alive and syncing. Calculate network delay / clock skew.
  165. att := 0
  166. good := 0
  167. var latency time.Duration
  168. localDone, remoteDone := false, false
  169. send := true
  170. for !localDone || !remoteDone {
  171. if send {
  172. if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
  173. return err
  174. }
  175. att++
  176. send = false
  177. }
  178. // Block until we get a syncMsg
  179. m := syncMsg{}
  180. if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
  181. return err
  182. }
  183. if m.RecvT == 0 {
  184. // Message initiated from other side.
  185. if m.SendT == 0 {
  186. remoteDone = true
  187. continue
  188. }
  189. // Send response.
  190. m.RecvT = now().UnixNano()
  191. if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
  192. return err
  193. }
  194. continue
  195. }
  196. lag := time.Duration(m.RecvT - m.SendT)
  197. latency += lag
  198. avgLatency := latency / time.Duration(att)
  199. if e := lag - avgLatency; e > -accuracy && e < accuracy {
  200. good++
  201. } else {
  202. good = 0
  203. }
  204. if att < giveUp && (att < warmup || good < goodRun) {
  205. send = true
  206. continue
  207. }
  208. localDone = true
  209. latency = avgLatency
  210. // Tell the other side we're done.
  211. if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
  212. return err
  213. }
  214. }
  215. if c.network.Latency <= 0 {
  216. return nil
  217. }
  218. c.delay = c.network.Latency - latency
  219. if c.delay < 0 {
  220. return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
  221. }
  222. return nil
  223. }
  224. // Listener returns a net.Listener that wraps l and injects n's latency in its
  225. // connections.
  226. func (n *Network) Listener(l net.Listener) net.Listener {
  227. return &listener{Listener: l, network: n}
  228. }
  229. type listener struct {
  230. net.Listener
  231. network *Network
  232. }
  233. func (l *listener) Accept() (net.Conn, error) {
  234. c, err := l.Listener.Accept()
  235. if err != nil {
  236. return nil, err
  237. }
  238. return l.network.Conn(c)
  239. }
  240. // Dialer returns a Dialer that wraps d and injects n's latency in its
  241. // connections. n's Latency is also injected to the connection's creation.
  242. func (n *Network) Dialer(d Dialer) Dialer {
  243. return func(network, address string) (net.Conn, error) {
  244. conn, err := d(network, address)
  245. if err != nil {
  246. return nil, err
  247. }
  248. return n.Conn(conn)
  249. }
  250. }
  251. // TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency
  252. // in its connections. n's Latency is also injected to the connection's
  253. // creation.
  254. func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
  255. return func(network, address string, timeout time.Duration) (net.Conn, error) {
  256. conn, err := d(network, address, timeout)
  257. if err != nil {
  258. return nil, err
  259. }
  260. return n.Conn(conn)
  261. }
  262. }
  263. // ContextDialer returns a ContextDialer that wraps d and injects n's latency
  264. // in its connections. n's Latency is also injected to the connection's
  265. // creation.
  266. func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
  267. return func(ctx context.Context, network, address string) (net.Conn, error) {
  268. conn, err := d(ctx, network, address)
  269. if err != nil {
  270. return nil, err
  271. }
  272. return n.Conn(conn)
  273. }
  274. }
  275. // pktTime returns the time it takes to transmit one packet of data of size b
  276. // in bytes.
  277. func (n *Network) pktTime(b int) time.Duration {
  278. if n.Kbps <= 0 {
  279. return time.Duration(0)
  280. }
  281. return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8))
  282. }
  283. // Wrappers for testing
  284. var now = time.Now
  285. var sleep = time.Sleep