writer_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. // Copyright (c) 2019+ Klaus Post. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package s2
  5. import (
  6. "bytes"
  7. "fmt"
  8. "io"
  9. "math/rand"
  10. "os"
  11. "runtime"
  12. "strings"
  13. "testing"
  14. "github.com/klauspost/compress/internal/snapref"
  15. "github.com/klauspost/compress/zip"
  16. )
  17. func testOptions(_ testing.TB) map[string][]WriterOption {
  18. var testOptions = map[string][]WriterOption{
  19. "default": {WriterAddIndex()},
  20. "better": {WriterBetterCompression()},
  21. "best": {WriterBestCompression()},
  22. "none": {WriterUncompressed()},
  23. }
  24. x := make(map[string][]WriterOption)
  25. cloneAdd := func(org []WriterOption, add ...WriterOption) []WriterOption {
  26. y := make([]WriterOption, len(org)+len(add))
  27. copy(y, org)
  28. copy(y[len(org):], add)
  29. return y
  30. }
  31. for name, opt := range testOptions {
  32. x[name] = opt
  33. x[name+"-c1"] = cloneAdd(opt, WriterConcurrency(1))
  34. }
  35. testOptions = x
  36. x = make(map[string][]WriterOption)
  37. for name, opt := range testOptions {
  38. x[name] = opt
  39. if !testing.Short() {
  40. x[name+"-4k-win"] = cloneAdd(opt, WriterBlockSize(4<<10))
  41. x[name+"-4M-win"] = cloneAdd(opt, WriterBlockSize(4<<20))
  42. }
  43. }
  44. testOptions = x
  45. x = make(map[string][]WriterOption)
  46. for name, opt := range testOptions {
  47. x[name] = opt
  48. x[name+"-pad-min"] = cloneAdd(opt, WriterPadding(2), WriterPaddingSrc(zeroReader{}))
  49. if !testing.Short() {
  50. x[name+"-pad-8000"] = cloneAdd(opt, WriterPadding(8000), WriterPaddingSrc(zeroReader{}))
  51. x[name+"-pad-max"] = cloneAdd(opt, WriterPadding(4<<20), WriterPaddingSrc(zeroReader{}))
  52. }
  53. }
  54. for name, opt := range testOptions {
  55. x[name] = opt
  56. x[name+"-snappy"] = cloneAdd(opt, WriterSnappyCompat())
  57. x[name+"-custom"] = cloneAdd(opt, WriterCustomEncoder(snapref.EncodeBlockInto))
  58. }
  59. testOptions = x
  60. return testOptions
  61. }
  62. type zeroReader struct{}
  63. func (zeroReader) Read(p []byte) (int, error) {
  64. for i := range p {
  65. p[i] = 0
  66. }
  67. return len(p), nil
  68. }
  69. func TestEncoderRegression(t *testing.T) {
  70. data, err := os.ReadFile("testdata/enc_regressions.zip")
  71. if err != nil {
  72. t.Fatal(err)
  73. }
  74. zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. // Same as fuzz test...
  79. test := func(t *testing.T, data []byte) {
  80. if testing.Short() && len(data) > 10000 {
  81. t.SkipNow()
  82. }
  83. var blocksTested bool
  84. for name, opts := range testOptions(t) {
  85. t.Run(name, func(t *testing.T) {
  86. var buf bytes.Buffer
  87. dec := NewReader(nil)
  88. enc := NewWriter(&buf, opts...)
  89. if !blocksTested {
  90. comp := Encode(make([]byte, MaxEncodedLen(len(data))), data)
  91. decoded, err := Decode(nil, comp)
  92. if err != nil {
  93. t.Error(err)
  94. return
  95. }
  96. if !bytes.Equal(data, decoded) {
  97. t.Error("block decoder mismatch")
  98. return
  99. }
  100. if mel := MaxEncodedLen(len(data)); len(comp) > mel {
  101. t.Error(fmt.Errorf("MaxEncodedLen Exceed: input: %d, mel: %d, got %d", len(data), mel, len(comp)))
  102. return
  103. }
  104. comp = EncodeBetter(make([]byte, MaxEncodedLen(len(data))), data)
  105. decoded, err = Decode(nil, comp)
  106. if err != nil {
  107. t.Error(err)
  108. return
  109. }
  110. if !bytes.Equal(data, decoded) {
  111. t.Error("block decoder mismatch")
  112. return
  113. }
  114. if mel := MaxEncodedLen(len(data)); len(comp) > mel {
  115. t.Error(fmt.Errorf("MaxEncodedLen Exceed: input: %d, mel: %d, got %d", len(data), mel, len(comp)))
  116. return
  117. }
  118. comp = EncodeBest(make([]byte, MaxEncodedLen(len(data))), data)
  119. decoded, err = Decode(nil, comp)
  120. if err != nil {
  121. t.Error(err)
  122. return
  123. }
  124. if !bytes.Equal(data, decoded) {
  125. t.Error("block decoder mismatch")
  126. return
  127. }
  128. if mel := MaxEncodedLen(len(data)); len(comp) > mel {
  129. t.Error(fmt.Errorf("MaxEncodedLen Exceed: input: %d, mel: %d, got %d", len(data), mel, len(comp)))
  130. return
  131. }
  132. blocksTested = true
  133. }
  134. // Test writer.
  135. n, err := enc.Write(data)
  136. if err != nil {
  137. t.Error(err)
  138. return
  139. }
  140. if n != len(data) {
  141. t.Error(fmt.Errorf("Write: Short write, want %d, got %d", len(data), n))
  142. return
  143. }
  144. err = enc.Close()
  145. if err != nil {
  146. t.Error(err)
  147. return
  148. }
  149. // Calling close twice should not affect anything.
  150. err = enc.Close()
  151. if err != nil {
  152. t.Error(err)
  153. return
  154. }
  155. comp := buf.Bytes()
  156. if enc.pad > 0 && len(comp)%enc.pad != 0 {
  157. t.Error(fmt.Errorf("wanted size to be mutiple of %d, got size %d with remainder %d", enc.pad, len(comp), len(comp)%enc.pad))
  158. return
  159. }
  160. var got []byte
  161. if !strings.Contains(name, "-snappy") {
  162. dec.Reset(&buf)
  163. got, err = io.ReadAll(dec)
  164. } else {
  165. sdec := snapref.NewReader(&buf)
  166. got, err = io.ReadAll(sdec)
  167. }
  168. if err != nil {
  169. t.Error(err)
  170. return
  171. }
  172. if !bytes.Equal(data, got) {
  173. t.Error("block (reset) decoder mismatch")
  174. return
  175. }
  176. // Test Reset on both and use ReadFrom instead.
  177. buf.Reset()
  178. enc.Reset(&buf)
  179. n2, err := enc.ReadFrom(bytes.NewBuffer(data))
  180. if err != nil {
  181. t.Error(err)
  182. return
  183. }
  184. if n2 != int64(len(data)) {
  185. t.Error(fmt.Errorf("ReadFrom: Short read, want %d, got %d", len(data), n2))
  186. return
  187. }
  188. err = enc.Close()
  189. if err != nil {
  190. t.Error(err)
  191. return
  192. }
  193. if enc.pad > 0 && buf.Len()%enc.pad != 0 {
  194. t.Error(fmt.Errorf("wanted size to be mutiple of %d, got size %d with remainder %d", enc.pad, buf.Len(), buf.Len()%enc.pad))
  195. return
  196. }
  197. if !strings.Contains(name, "-snappy") {
  198. dec.Reset(&buf)
  199. got, err = io.ReadAll(dec)
  200. } else {
  201. sdec := snapref.NewReader(&buf)
  202. got, err = io.ReadAll(sdec)
  203. }
  204. if err != nil {
  205. t.Error(err)
  206. return
  207. }
  208. if !bytes.Equal(data, got) {
  209. t.Error("frame (reset) decoder mismatch")
  210. return
  211. }
  212. })
  213. }
  214. }
  215. for _, tt := range zr.File {
  216. if !strings.HasSuffix(t.Name(), "") {
  217. continue
  218. }
  219. t.Run(tt.Name, func(t *testing.T) {
  220. r, err := tt.Open()
  221. if err != nil {
  222. t.Error(err)
  223. return
  224. }
  225. b, err := io.ReadAll(r)
  226. if err != nil {
  227. t.Error(err)
  228. return
  229. }
  230. test(t, b[:len(b):len(b)])
  231. })
  232. }
  233. }
  234. func TestIndex(t *testing.T) {
  235. fatalErr := func(t testing.TB, err error) {
  236. if err != nil {
  237. t.Fatal(err)
  238. }
  239. }
  240. // Create a test corpus
  241. var input []byte
  242. if !testing.Short() {
  243. input = make([]byte, 10<<20)
  244. } else {
  245. input = make([]byte, 500<<10)
  246. }
  247. rng := rand.New(rand.NewSource(0xabeefcafe))
  248. rng.Read(input)
  249. // Make it compressible...
  250. for i, v := range input {
  251. input[i] = '0' + v&3
  252. }
  253. // Compress it...
  254. var buf bytes.Buffer
  255. // We use smaller blocks just for the example...
  256. enc := NewWriter(&buf, WriterBlockSize(100<<10), WriterAddIndex(), WriterBetterCompression(), WriterConcurrency(runtime.GOMAXPROCS(0)))
  257. todo := input
  258. for len(todo) > 0 {
  259. // Write random sized inputs..
  260. x := todo[:rng.Intn(1+len(todo)&65535)]
  261. if len(x) == 0 {
  262. x = todo[:1]
  263. }
  264. _, err := enc.Write(x)
  265. fatalErr(t, err)
  266. // Flush once in a while
  267. if rng.Intn(8) == 0 {
  268. err = enc.Flush()
  269. fatalErr(t, err)
  270. }
  271. todo = todo[len(x):]
  272. }
  273. // Close and also get index...
  274. idxBytes, err := enc.CloseIndex()
  275. fatalErr(t, err)
  276. if false {
  277. // Load the index.
  278. var index Index
  279. _, err = index.Load(idxBytes)
  280. fatalErr(t, err)
  281. t.Log(string(index.JSON()))
  282. }
  283. // This is our compressed stream...
  284. compressed := buf.Bytes()
  285. for wantOffset := int64(0); wantOffset < int64(len(input)); wantOffset += 65531 {
  286. t.Run(fmt.Sprintf("offset-%d", wantOffset), func(t *testing.T) {
  287. // Let's assume we want to read from uncompressed offset 'i'
  288. // and we cannot seek in input, but we have the index.
  289. want := input[wantOffset:]
  290. // Load the index.
  291. var index Index
  292. _, err = index.Load(idxBytes)
  293. fatalErr(t, err)
  294. // Find offset in file:
  295. compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
  296. fatalErr(t, err)
  297. // Offset the input to the compressed offset.
  298. // Notice how we do not provide any bytes before the offset.
  299. in := io.Reader(bytes.NewBuffer(compressed[compressedOffset:]))
  300. // When creating the decoder we must specify that it should not
  301. // expect a stream identifier at the beginning og the frame.
  302. dec := NewReader(in, ReaderIgnoreStreamIdentifier())
  303. // We now have a reader, but it will start outputting at uncompressedOffset,
  304. // and not the actual offset we want, so skip forward to that.
  305. toSkip := wantOffset - uncompressedOffset
  306. err = dec.Skip(toSkip)
  307. fatalErr(t, err)
  308. // Read the rest of the stream...
  309. got, err := io.ReadAll(dec)
  310. fatalErr(t, err)
  311. if !bytes.Equal(got, want) {
  312. t.Error("Result mismatch", wantOffset)
  313. }
  314. // Test with stream index...
  315. for i := io.SeekStart; i <= io.SeekEnd; i++ {
  316. t.Run(fmt.Sprintf("seek-%d", i), func(t *testing.T) {
  317. // Read it from a seekable stream
  318. dec = NewReader(bytes.NewReader(compressed))
  319. rs, err := dec.ReadSeeker(true, nil)
  320. fatalErr(t, err)
  321. // Read a little...
  322. var tmp = make([]byte, len(input)/2)
  323. _, err = io.ReadFull(rs, tmp[:])
  324. fatalErr(t, err)
  325. toSkip := wantOffset
  326. switch i {
  327. case io.SeekStart:
  328. case io.SeekCurrent:
  329. toSkip = wantOffset - int64(len(input)/2)
  330. case io.SeekEnd:
  331. toSkip = -(int64(len(input)) - wantOffset)
  332. }
  333. gotOffset, err := rs.Seek(toSkip, i)
  334. if gotOffset != wantOffset {
  335. t.Errorf("got offset %d, want %d", gotOffset, wantOffset)
  336. }
  337. // Read the rest of the stream...
  338. got, err := io.ReadAll(dec)
  339. fatalErr(t, err)
  340. if !bytes.Equal(got, want) {
  341. t.Error("Result mismatch", wantOffset)
  342. }
  343. })
  344. }
  345. t.Run(fmt.Sprintf("ReadAt"), func(t *testing.T) {
  346. // Read it from a seekable stream
  347. dec = NewReader(bytes.NewReader(compressed))
  348. rs, err := dec.ReadSeeker(true, nil)
  349. fatalErr(t, err)
  350. // Read a little...
  351. var tmp = make([]byte, len(input)/2)
  352. _, err = io.ReadFull(rs, tmp[:])
  353. fatalErr(t, err)
  354. wantLen := len(tmp)
  355. if wantLen+int(wantOffset) > len(input) {
  356. wantLen = len(input) - int(wantOffset)
  357. }
  358. // Read from wantOffset
  359. n, err := rs.ReadAt(tmp, wantOffset)
  360. if n != wantLen {
  361. t.Errorf("got length %d, want %d", n, wantLen)
  362. }
  363. if err != io.EOF {
  364. fatalErr(t, err)
  365. }
  366. want := want[:n]
  367. got := tmp[:n]
  368. // Read the rest of the stream...
  369. if !bytes.Equal(got, want) {
  370. t.Error("Result mismatch", wantOffset)
  371. }
  372. })
  373. })
  374. }
  375. }
  376. func TestWriterPadding(t *testing.T) {
  377. n := 100
  378. if testing.Short() {
  379. n = 5
  380. }
  381. rng := rand.New(rand.NewSource(0x1337))
  382. d := NewReader(nil)
  383. for i := 0; i < n; i++ {
  384. padding := (rng.Int() & 0xffff) + 1
  385. src := make([]byte, (rng.Int()&0xfffff)+1)
  386. for i := range src {
  387. src[i] = uint8(rng.Uint32()) & 3
  388. }
  389. var dst bytes.Buffer
  390. e := NewWriter(&dst, WriterPadding(padding))
  391. // Test the added padding is invisible.
  392. _, err := io.Copy(e, bytes.NewBuffer(src))
  393. if err != nil {
  394. t.Fatal(err)
  395. }
  396. err = e.Close()
  397. if err != nil {
  398. t.Fatal(err)
  399. }
  400. err = e.Close()
  401. if err != nil {
  402. t.Fatal(err)
  403. }
  404. if dst.Len()%padding != 0 {
  405. t.Fatalf("wanted size to be mutiple of %d, got size %d with remainder %d", padding, dst.Len(), dst.Len()%padding)
  406. }
  407. var got bytes.Buffer
  408. d.Reset(&dst)
  409. _, err = io.Copy(&got, d)
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. if !bytes.Equal(src, got.Bytes()) {
  414. t.Fatal("output mismatch")
  415. }
  416. // Try after reset
  417. dst.Reset()
  418. e.Reset(&dst)
  419. _, err = io.Copy(e, bytes.NewBuffer(src))
  420. if err != nil {
  421. t.Fatal(err)
  422. }
  423. err = e.Close()
  424. if err != nil {
  425. t.Fatal(err)
  426. }
  427. if dst.Len()%padding != 0 {
  428. t.Fatalf("wanted size to be mutiple of %d, got size %d with remainder %d", padding, dst.Len(), dst.Len()%padding)
  429. }
  430. got.Reset()
  431. d.Reset(&dst)
  432. _, err = io.Copy(&got, d)
  433. if err != nil {
  434. t.Fatal(err)
  435. }
  436. if !bytes.Equal(src, got.Bytes()) {
  437. t.Fatal("output mismatch after reset")
  438. }
  439. }
  440. }
  441. func TestBigRegularWrites(t *testing.T) {
  442. var buf [maxBlockSize * 2]byte
  443. dst := bytes.NewBuffer(nil)
  444. enc := NewWriter(dst, WriterBestCompression())
  445. max := uint8(10)
  446. if testing.Short() {
  447. max = 4
  448. }
  449. for n := uint8(0); n < max; n++ {
  450. for i := range buf[:] {
  451. buf[i] = n
  452. }
  453. // Writes may not keep a reference to the data beyond the Write call.
  454. _, err := enc.Write(buf[:])
  455. if err != nil {
  456. t.Fatal(err)
  457. }
  458. }
  459. err := enc.Close()
  460. if err != nil {
  461. t.Fatal(err)
  462. }
  463. dec := NewReader(dst)
  464. _, err = io.Copy(io.Discard, dec)
  465. if err != nil {
  466. t.Fatal(err)
  467. }
  468. }
  469. func TestBigEncodeBuffer(t *testing.T) {
  470. const blockSize = 1 << 20
  471. var buf [blockSize * 2]byte
  472. dst := bytes.NewBuffer(nil)
  473. enc := NewWriter(dst, WriterBlockSize(blockSize), WriterBestCompression())
  474. max := uint8(10)
  475. if testing.Short() {
  476. max = 4
  477. }
  478. for n := uint8(0); n < max; n++ {
  479. // Change the buffer to a new value.
  480. for i := range buf[:] {
  481. buf[i] = n
  482. }
  483. err := enc.EncodeBuffer(buf[:])
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. // We can write it again since we aren't changing it.
  488. err = enc.EncodeBuffer(buf[:])
  489. if err != nil {
  490. t.Fatal(err)
  491. }
  492. err = enc.Flush()
  493. if err != nil {
  494. t.Fatal(err)
  495. }
  496. }
  497. err := enc.Close()
  498. if err != nil {
  499. t.Fatal(err)
  500. }
  501. dec := NewReader(dst)
  502. n, err := io.Copy(io.Discard, dec)
  503. if err != nil {
  504. t.Fatal(err)
  505. }
  506. t.Log(n)
  507. }
  508. func TestBigEncodeBufferSync(t *testing.T) {
  509. const blockSize = 1 << 20
  510. var buf [blockSize * 2]byte
  511. dst := bytes.NewBuffer(nil)
  512. enc := NewWriter(dst, WriterBlockSize(blockSize), WriterConcurrency(1), WriterBestCompression())
  513. max := uint8(10)
  514. if testing.Short() {
  515. max = 2
  516. }
  517. for n := uint8(0); n < max; n++ {
  518. // Change the buffer to a new value.
  519. for i := range buf[:] {
  520. buf[i] = n
  521. }
  522. // When WriterConcurrency == 1 we can encode and reuse the buffer.
  523. err := enc.EncodeBuffer(buf[:])
  524. if err != nil {
  525. t.Fatal(err)
  526. }
  527. }
  528. err := enc.Close()
  529. if err != nil {
  530. t.Fatal(err)
  531. }
  532. dec := NewReader(dst)
  533. n, err := io.Copy(io.Discard, dec)
  534. if err != nil {
  535. t.Fatal(err)
  536. }
  537. t.Log(n)
  538. }
  539. func BenchmarkWriterRandom(b *testing.B) {
  540. rng := rand.New(rand.NewSource(1))
  541. // Make max window so we never get matches.
  542. data := make([]byte, 4<<20)
  543. for i := range data {
  544. data[i] = uint8(rng.Intn(256))
  545. }
  546. for name, opts := range testOptions(b) {
  547. w := NewWriter(io.Discard, opts...)
  548. b.Run(name, func(b *testing.B) {
  549. b.ResetTimer()
  550. b.ReportAllocs()
  551. b.SetBytes(int64(len(data)))
  552. for i := 0; i < b.N; i++ {
  553. err := w.EncodeBuffer(data)
  554. if err != nil {
  555. b.Fatal(err)
  556. }
  557. }
  558. // Flush output
  559. w.Flush()
  560. })
  561. w.Close()
  562. }
  563. }
  564. func BenchmarkIndexFind(b *testing.B) {
  565. fatalErr := func(t testing.TB, err error) {
  566. if err != nil {
  567. t.Fatal(err)
  568. }
  569. }
  570. for blocks := 1; blocks <= 65536; blocks *= 2 {
  571. if blocks == 65536 {
  572. blocks = 65535
  573. }
  574. var index Index
  575. index.reset(100)
  576. index.TotalUncompressed = int64(blocks) * 100
  577. index.TotalCompressed = int64(blocks) * 100
  578. for i := 0; i < blocks; i++ {
  579. err := index.add(int64(i*100), int64(i*100))
  580. fatalErr(b, err)
  581. }
  582. rng := rand.New(rand.NewSource(0xabeefcafe))
  583. b.Run(fmt.Sprintf("blocks-%d", len(index.info)), func(b *testing.B) {
  584. b.ResetTimer()
  585. b.ReportAllocs()
  586. const prime4bytes = 2654435761
  587. rng2 := rng.Int63()
  588. for i := 0; i < b.N; i++ {
  589. rng2 = ((rng2 + prime4bytes) * prime4bytes) >> 32
  590. // Find offset:
  591. _, _, err := index.Find(rng2 % (int64(blocks) * 100))
  592. fatalErr(b, err)
  593. }
  594. })
  595. }
  596. }