filechunks.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package filer2
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "sort"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. )
  9. func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
  10. for _, c := range chunks {
  11. t := uint64(c.Offset + int64(c.Size))
  12. if size < t {
  13. size = t
  14. }
  15. }
  16. return
  17. }
  18. func ETag(chunks []*filer_pb.FileChunk) (etag string) {
  19. if len(chunks) == 1 {
  20. return chunks[0].ETag
  21. }
  22. h := fnv.New32a()
  23. for _, c := range chunks {
  24. h.Write([]byte(c.ETag))
  25. }
  26. return fmt.Sprintf("%x", h.Sum32())
  27. }
  28. func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
  29. visibles := NonOverlappingVisibleIntervals(chunks)
  30. fileIds := make(map[string]bool)
  31. for _, interval := range visibles {
  32. fileIds[interval.fileId] = true
  33. }
  34. for _, chunk := range chunks {
  35. if _, found := fileIds[chunk.GetFileIdString()]; found {
  36. compacted = append(compacted, chunk)
  37. } else {
  38. garbage = append(garbage, chunk)
  39. }
  40. }
  41. return
  42. }
  43. func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
  44. fileIds := make(map[string]bool)
  45. for _, interval := range bs {
  46. fileIds[interval.GetFileIdString()] = true
  47. }
  48. for _, chunk := range as {
  49. if _, found := fileIds[chunk.GetFileIdString()]; !found {
  50. delta = append(delta, chunk)
  51. }
  52. }
  53. return
  54. }
  55. type ChunkView struct {
  56. FileId string
  57. Offset int64
  58. Size uint64
  59. LogicOffset int64
  60. IsFullChunk bool
  61. }
  62. func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
  63. visibles := NonOverlappingVisibleIntervals(chunks)
  64. return ViewFromVisibleIntervals(visibles, offset, size)
  65. }
  66. func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int) (views []*ChunkView) {
  67. stop := offset + int64(size)
  68. for _, chunk := range visibles {
  69. if chunk.start <= offset && offset < chunk.stop && offset < stop {
  70. isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
  71. views = append(views, &ChunkView{
  72. FileId: chunk.fileId,
  73. Offset: offset - chunk.start, // offset is the data starting location in this file id
  74. Size: uint64(min(chunk.stop, stop) - offset),
  75. LogicOffset: offset,
  76. IsFullChunk: isFullChunk,
  77. })
  78. offset = min(chunk.stop, stop)
  79. }
  80. }
  81. return views
  82. }
  83. func logPrintf(name string, visibles []VisibleInterval) {
  84. /*
  85. log.Printf("%s len %d", name, len(visibles))
  86. for _, v := range visibles {
  87. log.Printf("%s: => %+v", name, v)
  88. }
  89. */
  90. }
  91. var bufPool = sync.Pool{
  92. New: func() interface{} {
  93. return new(VisibleInterval)
  94. },
  95. }
  96. func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
  97. newV := newVisibleInterval(
  98. chunk.Offset,
  99. chunk.Offset+int64(chunk.Size),
  100. chunk.GetFileIdString(),
  101. chunk.Mtime,
  102. true,
  103. )
  104. length := len(visibles)
  105. if length == 0 {
  106. return append(visibles, newV)
  107. }
  108. last := visibles[length-1]
  109. if last.stop <= chunk.Offset {
  110. return append(visibles, newV)
  111. }
  112. logPrintf(" before", visibles)
  113. for _, v := range visibles {
  114. if v.start < chunk.Offset && chunk.Offset < v.stop {
  115. newVisibles = append(newVisibles, newVisibleInterval(
  116. v.start,
  117. chunk.Offset,
  118. v.fileId,
  119. v.modifiedTime,
  120. false,
  121. ))
  122. }
  123. chunkStop := chunk.Offset + int64(chunk.Size)
  124. if v.start < chunkStop && chunkStop < v.stop {
  125. newVisibles = append(newVisibles, newVisibleInterval(
  126. chunkStop,
  127. v.stop,
  128. v.fileId,
  129. v.modifiedTime,
  130. false,
  131. ))
  132. }
  133. if chunkStop <= v.start || v.stop <= chunk.Offset {
  134. newVisibles = append(newVisibles, v)
  135. }
  136. }
  137. newVisibles = append(newVisibles, newV)
  138. logPrintf(" append", newVisibles)
  139. for i := len(newVisibles) - 1; i >= 0; i-- {
  140. if i > 0 && newV.start < newVisibles[i-1].start {
  141. newVisibles[i] = newVisibles[i-1]
  142. } else {
  143. newVisibles[i] = newV
  144. break
  145. }
  146. }
  147. logPrintf(" sorted", newVisibles)
  148. return newVisibles
  149. }
  150. func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
  151. sort.Slice(chunks, func(i, j int) bool {
  152. return chunks[i].Mtime < chunks[j].Mtime
  153. })
  154. var newVisibles []VisibleInterval
  155. for _, chunk := range chunks {
  156. newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
  157. t := visibles[:0]
  158. visibles = newVisibles
  159. newVisibles = t
  160. logPrintf("add", visibles)
  161. }
  162. return
  163. }
  164. // find non-overlapping visible intervals
  165. // visible interval map to one file chunk
  166. type VisibleInterval struct {
  167. start int64
  168. stop int64
  169. modifiedTime int64
  170. fileId string
  171. isFullChunk bool
  172. }
  173. func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) VisibleInterval {
  174. return VisibleInterval{
  175. start: start,
  176. stop: stop,
  177. fileId: fileId,
  178. modifiedTime: modifiedTime,
  179. isFullChunk: isFullChunk,
  180. }
  181. }
  182. func min(x, y int64) int64 {
  183. if x <= y {
  184. return x
  185. }
  186. return y
  187. }