load_test_leveldb.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package load_test_leveldb
  2. import (
  3. "crypto/md5"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "math/rand"
  9. "os"
  10. "sync"
  11. "time"
  12. "github.com/syndtr/goleveldb/leveldb"
  13. "github.com/syndtr/goleveldb/leveldb/opt"
  14. )
  15. var (
  16. dir = flag.String("dir", "./t", "directory to store level db files")
  17. useHash = flag.Bool("isHash", false, "hash the path as the key")
  18. dbCount = flag.Int("dbCount", 1, "the number of leveldb")
  19. )
  20. func main() {
  21. flag.Parse()
  22. totalTenants := 300
  23. totalYears := 3
  24. opts := &opt.Options{
  25. BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
  26. WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
  27. CompactionTableSizeMultiplier: 4,
  28. }
  29. var dbs []*leveldb.DB
  30. var chans []chan string
  31. for d := 0; d < *dbCount; d++ {
  32. dbFolder := fmt.Sprintf("%s/%02d", *dir, d)
  33. os.MkdirAll(dbFolder, 0755)
  34. db, err := leveldb.OpenFile(dbFolder, opts)
  35. if err != nil {
  36. log.Printf("filer store open dir %s: %v", *dir, err)
  37. return
  38. }
  39. dbs = append(dbs, db)
  40. chans = append(chans, make(chan string, 1024))
  41. }
  42. var wg sync.WaitGroup
  43. for d := 0; d < *dbCount; d++ {
  44. wg.Add(1)
  45. go func(d int) {
  46. defer wg.Done()
  47. ch := chans[d]
  48. db := dbs[d]
  49. for p := range ch {
  50. if *useHash {
  51. insertAsHash(db, p)
  52. } else {
  53. insertAsFullPath(db, p)
  54. }
  55. }
  56. }(d)
  57. }
  58. counter := int64(0)
  59. lastResetTime := time.Now()
  60. r := rand.New(rand.NewSource(35))
  61. for y := 0; y < totalYears; y++ {
  62. for m := 0; m < 12; m++ {
  63. for d := 0; d < 31; d++ {
  64. for h := 0; h < 24; h++ {
  65. for min := 0; min < 60; min++ {
  66. for i := 0; i < totalTenants; i++ {
  67. p := fmt.Sprintf("tenent%03d/%4d/%02d/%02d/%02d/%02d", i, 2015+y, 1+m, 1+d, h, min)
  68. x := r.Intn(*dbCount)
  69. chans[x] <- p
  70. counter++
  71. }
  72. t := time.Now()
  73. if lastResetTime.Add(time.Second).Before(t) {
  74. p := fmt.Sprintf("%4d/%02d/%02d/%02d/%02d", 2015+y, 1+m, 1+d, h, min)
  75. fmt.Printf("%s = %4d put/sec\n", p, counter)
  76. counter = 0
  77. lastResetTime = t
  78. }
  79. }
  80. }
  81. }
  82. }
  83. }
  84. for d := 0; d < *dbCount; d++ {
  85. close(chans[d])
  86. }
  87. wg.Wait()
  88. }
  89. func insertAsFullPath(db *leveldb.DB, p string) {
  90. _, getErr := db.Get([]byte(p), nil)
  91. if getErr == leveldb.ErrNotFound {
  92. putErr := db.Put([]byte(p), []byte(p), nil)
  93. if putErr != nil {
  94. log.Printf("failed to put %s", p)
  95. }
  96. }
  97. }
  98. func insertAsHash(db *leveldb.DB, p string) {
  99. key := fmt.Sprintf("%d:%s", hashToLong(p), p)
  100. _, getErr := db.Get([]byte(key), nil)
  101. if getErr == leveldb.ErrNotFound {
  102. putErr := db.Put([]byte(key), []byte(p), nil)
  103. if putErr != nil {
  104. log.Printf("failed to put %s", p)
  105. }
  106. }
  107. }
  108. func hashToLong(dir string) (v int64) {
  109. h := md5.New()
  110. io.WriteString(h, dir)
  111. b := h.Sum(nil)
  112. v += int64(b[0])
  113. v <<= 8
  114. v += int64(b[1])
  115. v <<= 8
  116. v += int64(b[2])
  117. v <<= 8
  118. v += int64(b[3])
  119. v <<= 8
  120. v += int64(b[4])
  121. v <<= 8
  122. v += int64(b[5])
  123. v <<= 8
  124. v += int64(b[6])
  125. v <<= 8
  126. v += int64(b[7])
  127. return
  128. }