filechunks.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  6. "math"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  10. func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
  11. for _, c := range chunks {
  12. t := uint64(c.Offset + int64(c.Size))
  13. if size < t {
  14. size = t
  15. }
  16. }
  17. return
  18. }
  19. func FileSize(entry *filer_pb.Entry) (size uint64) {
  20. if entry == nil || entry.Attributes == nil {
  21. return 0
  22. }
  23. fileSize := entry.Attributes.FileSize
  24. if entry.RemoteEntry != nil {
  25. if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime {
  26. fileSize = maxUint64(fileSize, uint64(entry.RemoteEntry.RemoteSize))
  27. }
  28. }
  29. return maxUint64(TotalSize(entry.GetChunks()), fileSize)
  30. }
  // ETag returns the entity tag of a protobuf entry. If the entry's attributes
// carry an MD5, that digest is hex-encoded. Otherwise the tag is derived from
// the chunk ETags via ETagChunks.
//
// A nil entry or nil attributes no longer panics. Protobuf-generated getters
// return nil for a nil receiver, so such entries fall back to ETagChunks.
// This mirrors the nil guard in FileSize.
func ETag(entry *filer_pb.Entry) (etag string) {
	attr := entry.GetAttributes()
	if attr == nil || attr.Md5 == nil {
		return ETagChunks(entry.GetChunks())
	}
	return fmt.Sprintf("%x", attr.Md5)
}
  // ETagEntry returns the entity tag of a filer Entry:
//   - an entry that exists only remotely reports its remote ETag verbatim;
//   - an entry with an attribute MD5 reports that digest hex-encoded;
//   - anything else falls back to ETagChunks over the entry's chunks.
func ETagEntry(entry *Entry) (etag string) {
	switch {
	case entry.IsInRemoteOnly():
		return entry.Remote.RemoteETag
	case entry.Attr.Md5 != nil:
		return fmt.Sprintf("%x", entry.Attr.Md5)
	default:
		return ETagChunks(entry.GetChunks())
	}
}
  46. func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
  47. if len(chunks) == 1 {
  48. return fmt.Sprintf("%x", util.Base64Md5ToBytes(chunks[0].ETag))
  49. }
  50. var md5Digests [][]byte
  51. for _, c := range chunks {
  52. md5Digests = append(md5Digests, util.Base64Md5ToBytes(c.ETag))
  53. }
  54. return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks))
  55. }
  // CompactFileChunks partitions chunks into two results, keeping input order
// in both:
//   - compacted: chunks that back at least one visible interval of the file;
//   - garbage: chunks of which no byte remains visible.
//
// If the chunk manifests cannot be resolved, nothing is classified as garbage
// and every input chunk is returned in compacted. Passing the resulting nil
// interval list on would either fail or mark every chunk as garbage. A caller
// that deletes the garbage slice could then drop live data.
//
// NOTE(review): manifests are expanded when computing visibility, but a
// manifest chunk's own file id is never referenced by a visible interval. A
// manifest chunk passed here would therefore land in garbage. Callers
// presumably pass non-manifest chunks only; confirm.
func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
	visibles, err := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
	if err != nil {
		// Copy so the caller never aliases the input slice, matching the
		// freshly built slices returned on the success path.
		return append([]*filer_pb.FileChunk(nil), chunks...), nil
	}
	return SeparateGarbageChunks(visibles, chunks)
}
  61. func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk) {
  62. fileIds := make(map[string]bool)
  63. for x := visibles.Front(); x != nil; x = x.Next {
  64. interval := x.Value
  65. fileIds[interval.fileId] = true
  66. }
  67. for _, chunk := range chunks {
  68. if _, found := fileIds[chunk.GetFileIdString()]; found {
  69. compacted = append(compacted, chunk)
  70. } else {
  71. garbage = append(garbage, chunk)
  72. }
  73. }
  74. return compacted, garbage
  75. }
  76. func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileIds map[string]struct{}) {
  77. garbageFileIds = make(map[string]struct{})
  78. for x := visibles.Front(); x != nil; x = x.Next {
  79. interval := x.Value
  80. offset := interval.start - interval.offsetInChunk
  81. if start <= offset && offset+int64(interval.chunkSize) <= stop {
  82. garbageFileIds[interval.fileId] = struct{}{}
  83. }
  84. }
  85. return
  86. }
  // MinusChunks returns the chunks of as that are absent from bs, after
// resolving chunk manifests on both sides. Resolved data chunks are compared
// with data chunks, and manifest chunks with manifest chunks. The data
// differences come first in the result, followed by the manifest differences.
// A manifest-resolution error is returned as-is with a nil delta.
func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
	aData, aMeta, err := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64)
	if err != nil {
		return nil, err
	}
	bData, bMeta, err := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64)
	if err != nil {
		return nil, err
	}
	dataDelta := DoMinusChunks(aData, bData)
	metaDelta := DoMinusChunks(aMeta, bMeta)
	delta = append(delta, dataDelta...)
	delta = append(delta, metaDelta...)
	return delta, nil
}
  100. func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
  101. fileIds := make(map[string]bool)
  102. for _, interval := range bs {
  103. fileIds[interval.GetFileIdString()] = true
  104. }
  105. for _, chunk := range as {
  106. if _, found := fileIds[chunk.GetFileIdString()]; !found {
  107. delta = append(delta, chunk)
  108. }
  109. }
  110. return
  111. }
  112. func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
  113. fileIds := make(map[string]bool)
  114. for _, interval := range bs {
  115. fileIds[interval.GetFileIdString()] = true
  116. fileIds[interval.GetSourceFileId()] = true
  117. }
  118. for _, chunk := range as {
  119. _, sourceFileIdFound := fileIds[chunk.GetSourceFileId()]
  120. _, fileIdFound := fileIds[chunk.GetFileId()]
  121. if !sourceFileIdFound && !fileIdFound {
  122. delta = append(delta, chunk)
  123. }
  124. }
  125. return
  126. }
  127. type ChunkView struct {
  128. FileId string
  129. OffsetInChunk int64 // offset within the chunk
  130. ViewSize uint64
  131. ViewOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
  132. ChunkSize uint64
  133. CipherKey []byte
  134. IsGzipped bool
  135. ModifiedTsNs int64
  136. }
  137. func (cv *ChunkView) SetStartStop(start, stop int64) {
  138. cv.OffsetInChunk += start - cv.ViewOffset
  139. cv.ViewOffset = start
  140. cv.ViewSize = uint64(stop - start)
  141. }
  142. func (cv *ChunkView) Clone() IntervalValue {
  143. return &ChunkView{
  144. FileId: cv.FileId,
  145. OffsetInChunk: cv.OffsetInChunk,
  146. ViewSize: cv.ViewSize,
  147. ViewOffset: cv.ViewOffset,
  148. ChunkSize: cv.ChunkSize,
  149. CipherKey: cv.CipherKey,
  150. IsGzipped: cv.IsGzipped,
  151. ModifiedTsNs: cv.ModifiedTsNs,
  152. }
  153. }
  154. func (cv *ChunkView) IsFullChunk() bool {
  155. return cv.ViewSize == cv.ChunkSize
  156. }
  // ViewFromChunks returns the chunk views covering the file range
// [offset, offset+size) of chunks. A size of math.MaxInt64, or a size whose end
// would overflow int64, means "through the end of the file".
//
// NOTE(review): an error from manifest resolution is discarded here. Confirm
// that ViewFromVisibleIntervals tolerates the nil interval list that results.
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
	// Clamp the end the same way ViewFromVisibleIntervals does. Otherwise an
	// open-ended read (e.g. size == math.MaxInt64 with offset > 0) would pass a
	// wrapped, negative stop offset to manifest resolution.
	stop := offset + size
	if size == math.MaxInt64 || stop < offset {
		stop = math.MaxInt64
	}
	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, stop)
	return ViewFromVisibleIntervals(visibles, offset, size)
}
  // ViewFromVisibleIntervals clips each visible interval to the file range
// [offset, offset+size). It returns one ChunkView per non-empty overlap, in the
// order the intervals are visited. A size of math.MaxInt64, or an end that
// overflows int64, extends the range to math.MaxInt64.
func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
	stop := offset + size
	if size == math.MaxInt64 || stop < offset {
		stop = math.MaxInt64
	}

	chunkViews = NewIntervalList[*ChunkView]()
	for node := visibles.Front(); node != nil; node = node.Next {
		v := node.Value
		from := max(offset, v.start)
		to := min(stop, v.stop)
		if from >= to {
			continue // no overlap with the requested range
		}
		chunkViews.AppendInterval(&Interval[*ChunkView]{
			StartOffset: from,
			StopOffset:  to,
			TsNs:        v.modifiedTsNs,
			Value: &ChunkView{
				FileId:        v.fileId,
				OffsetInChunk: v.offsetInChunk + (from - v.start),
				ViewSize:      uint64(to - from),
				ViewOffset:    from,
				ChunkSize:     v.chunkSize,
				CipherKey:     v.cipherKey,
				IsGzipped:     v.isGzipped,
				ModifiedTsNs:  v.modifiedTsNs,
			},
		})
	}
	return chunkViews
}
  196. func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, chunk *filer_pb.FileChunk) {
  197. newV := &VisibleInterval{
  198. start: start,
  199. stop: stop,
  200. fileId: chunk.GetFileIdString(),
  201. modifiedTsNs: chunk.ModifiedTsNs,
  202. offsetInChunk: start - chunk.Offset, // the starting position in the chunk
  203. chunkSize: chunk.Size, // size of the chunk
  204. cipherKey: chunk.CipherKey,
  205. isGzipped: chunk.IsCompressed,
  206. }
  207. visibles.InsertInterval(start, stop, chunk.ModifiedTsNs, newV)
  208. }
  209. func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop int64, chunk *filer_pb.FileChunk) {
  210. chunkView := &ChunkView{
  211. FileId: chunk.GetFileIdString(),
  212. OffsetInChunk: start - chunk.Offset,
  213. ViewSize: uint64(stop - start),
  214. ViewOffset: start,
  215. ChunkSize: chunk.Size,
  216. CipherKey: chunk.CipherKey,
  217. IsGzipped: chunk.IsCompressed,
  218. ModifiedTsNs: chunk.ModifiedTsNs,
  219. }
  220. chunkViews.InsertInterval(start, stop, chunk.ModifiedTsNs, chunkView)
  221. }
  // NonOverlappingVisibleIntervals resolves any chunk manifests in chunks. It
// then turns the resulting data chunks into a list of VisibleIntervals via
// readResolvedChunks.
//
// startOffset and stopOffset are forwarded only to ResolveChunkManifest. The
// readResolvedChunks step always uses the bounds 0 and math.MaxInt64. If
// manifest resolution fails, visibles is nil and the error is returned.
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
	dataChunks, _, resolveErr := ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
	if resolveErr != nil {
		return nil, resolveErr
	}
	return readResolvedChunks(dataChunks, 0, math.MaxInt64), nil
}
  232. // find non-overlapping visible intervals
  233. // visible interval map to one file chunk
  234. type VisibleInterval struct {
  235. start int64
  236. stop int64
  237. modifiedTsNs int64
  238. fileId string
  239. offsetInChunk int64
  240. chunkSize uint64
  241. cipherKey []byte
  242. isGzipped bool
  243. }
  244. func (v *VisibleInterval) SetStartStop(start, stop int64) {
  245. v.offsetInChunk += start - v.start
  246. v.start, v.stop = start, stop
  247. }
  248. func (v *VisibleInterval) Clone() IntervalValue {
  249. return &VisibleInterval{
  250. start: v.start,
  251. stop: v.stop,
  252. modifiedTsNs: v.modifiedTsNs,
  253. fileId: v.fileId,
  254. offsetInChunk: v.offsetInChunk,
  255. chunkSize: v.chunkSize,
  256. cipherKey: v.cipherKey,
  257. isGzipped: v.isGzipped,
  258. }
  259. }
  // min returns the smaller of x and y. Being package-level, it shadows the
// Go 1.21+ builtin min throughout this package.
func min(x, y int64) int64 {
	if y < x {
		return y
	}
	return x
}
  // max returns the larger of x and y. Being package-level, it shadows the
// Go 1.21+ builtin max throughout this package.
func max(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}