needle_read_write.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package storage
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  // Bit masks stored in Needle.Flags, followed by the fixed byte widths of two
  // optional fields of the version 2 needle body.
  const (
  	// FlagGzip is the bit tested by IsGzipped.
  	FlagGzip = 1 << 0
  	// FlagHasName marks that a length-prefixed name follows the flags byte.
  	FlagHasName = 1 << 1
  	// FlagHasMime marks that a length-prefixed mime type is stored.
  	FlagHasMime = 1 << 2
  	// FlagHasLastModifiedDate marks that a last-modified value is stored.
  	FlagHasLastModifiedDate = 1 << 3
  	// FlagHasTtl marks that a TTL is stored.
  	FlagHasTtl = 1 << 4
  	// FlagHasPairs marks that a length-prefixed pairs blob is stored.
  	FlagHasPairs = 1 << 5
  	// FlagIsChunkManifest is the bit tested by IsChunkedManifest.
  	FlagIsChunkManifest = 1 << 7

  	// LastModifiedBytesLength is how many bytes of the last-modified value
  	// are written to disk.
  	LastModifiedBytesLength = 5
  	// TtlBytesLength is the number of bytes a serialized TTL occupies.
  	TtlBytesLength = 2
  )
  21. func (n *Needle) DiskSize() int64 {
  22. return getActualSize(n.Size)
  23. }
  24. func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error) {
  25. if s, ok := w.(io.Seeker); ok {
  26. if end, e := s.Seek(0, 1); e == nil {
  27. defer func(s io.Seeker, off int64) {
  28. if err != nil {
  29. if _, e = s.Seek(off, 0); e != nil {
  30. glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
  31. }
  32. }
  33. }(s, end)
  34. } else {
  35. err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
  36. return
  37. }
  38. }
  39. switch version {
  40. case Version1:
  41. header := make([]byte, NeedleHeaderSize)
  42. util.Uint32toBytes(header[0:4], n.Cookie)
  43. util.Uint64toBytes(header[4:12], n.Id)
  44. n.Size = uint32(len(n.Data))
  45. size = n.Size
  46. util.Uint32toBytes(header[12:16], n.Size)
  47. if _, err = w.Write(header); err != nil {
  48. return
  49. }
  50. if _, err = w.Write(n.Data); err != nil {
  51. return
  52. }
  53. actualSize = NeedleHeaderSize + int64(n.Size)
  54. padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
  55. util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
  56. _, err = w.Write(header[0 : NeedleChecksumSize+padding])
  57. return
  58. case Version2:
  59. header := make([]byte, NeedleHeaderSize)
  60. util.Uint32toBytes(header[0:4], n.Cookie)
  61. util.Uint64toBytes(header[4:12], n.Id)
  62. n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
  63. if n.DataSize > 0 {
  64. n.Size = 4 + n.DataSize + 1
  65. if n.HasName() {
  66. n.Size = n.Size + 1 + uint32(n.NameSize)
  67. }
  68. if n.HasMime() {
  69. n.Size = n.Size + 1 + uint32(n.MimeSize)
  70. }
  71. if n.HasLastModifiedDate() {
  72. n.Size = n.Size + LastModifiedBytesLength
  73. }
  74. if n.HasTtl() {
  75. n.Size = n.Size + TtlBytesLength
  76. }
  77. if n.HasPairs() {
  78. n.Size += 2 + uint32(n.PairsSize)
  79. }
  80. } else {
  81. n.Size = 0
  82. }
  83. size = n.DataSize
  84. util.Uint32toBytes(header[12:16], n.Size)
  85. if _, err = w.Write(header); err != nil {
  86. return
  87. }
  88. if n.DataSize > 0 {
  89. util.Uint32toBytes(header[0:4], n.DataSize)
  90. if _, err = w.Write(header[0:4]); err != nil {
  91. return
  92. }
  93. if _, err = w.Write(n.Data); err != nil {
  94. return
  95. }
  96. util.Uint8toBytes(header[0:1], n.Flags)
  97. if _, err = w.Write(header[0:1]); err != nil {
  98. return
  99. }
  100. if n.HasName() {
  101. util.Uint8toBytes(header[0:1], n.NameSize)
  102. if _, err = w.Write(header[0:1]); err != nil {
  103. return
  104. }
  105. if _, err = w.Write(n.Name); err != nil {
  106. return
  107. }
  108. }
  109. if n.HasMime() {
  110. util.Uint8toBytes(header[0:1], n.MimeSize)
  111. if _, err = w.Write(header[0:1]); err != nil {
  112. return
  113. }
  114. if _, err = w.Write(n.Mime); err != nil {
  115. return
  116. }
  117. }
  118. if n.HasLastModifiedDate() {
  119. util.Uint64toBytes(header[0:8], n.LastModified)
  120. if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
  121. return
  122. }
  123. }
  124. if n.HasTtl() && n.Ttl != nil {
  125. n.Ttl.ToBytes(header[0:TtlBytesLength])
  126. if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
  127. return
  128. }
  129. }
  130. if n.HasPairs() {
  131. util.Uint16toBytes(header[0:2], n.PairsSize)
  132. if _, err = w.Write(header[0:2]); err != nil {
  133. return
  134. }
  135. if _, err = w.Write(n.Pairs); err != nil {
  136. return
  137. }
  138. }
  139. }
  140. padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
  141. util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
  142. _, err = w.Write(header[0 : NeedleChecksumSize+padding])
  143. return n.DataSize, getActualSize(n.Size), err
  144. }
  145. return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
  146. }
  147. func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error) {
  148. return getBytesForFileBlock(r, offset, int(getActualSize(size)))
  149. }
  150. func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
  151. bytes, err := ReadNeedleBlob(r, offset, size)
  152. if err != nil {
  153. return err
  154. }
  155. n.ParseNeedleHeader(bytes)
  156. if n.Size != size {
  157. return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
  158. }
  159. switch version {
  160. case Version1:
  161. n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
  162. case Version2:
  163. n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
  164. }
  165. if size == 0 {
  166. return nil
  167. }
  168. checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
  169. newChecksum := NewCRC(n.Data)
  170. if checksum != newChecksum.Value() {
  171. return errors.New("CRC error! Data On Disk Corrupted")
  172. }
  173. n.Checksum = newChecksum
  174. return nil
  175. }
  176. func (n *Needle) ParseNeedleHeader(bytes []byte) {
  177. n.Cookie = util.BytesToUint32(bytes[0:4])
  178. n.Id = util.BytesToUint64(bytes[4:12])
  179. n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
  180. }
  181. func (n *Needle) readNeedleDataVersion2(bytes []byte) {
  182. index, lenBytes := 0, len(bytes)
  183. if index < lenBytes {
  184. n.DataSize = util.BytesToUint32(bytes[index : index+4])
  185. index = index + 4
  186. if int(n.DataSize)+index > lenBytes {
  187. // this if clause is due to bug #87 and #93, fixed in v0.69
  188. // remove this clause later
  189. return
  190. }
  191. n.Data = bytes[index : index+int(n.DataSize)]
  192. index = index + int(n.DataSize)
  193. n.Flags = bytes[index]
  194. index = index + 1
  195. }
  196. if index < lenBytes && n.HasName() {
  197. n.NameSize = uint8(bytes[index])
  198. index = index + 1
  199. n.Name = bytes[index : index+int(n.NameSize)]
  200. index = index + int(n.NameSize)
  201. }
  202. if index < lenBytes && n.HasMime() {
  203. n.MimeSize = uint8(bytes[index])
  204. index = index + 1
  205. n.Mime = bytes[index : index+int(n.MimeSize)]
  206. index = index + int(n.MimeSize)
  207. }
  208. if index < lenBytes && n.HasLastModifiedDate() {
  209. n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
  210. index = index + LastModifiedBytesLength
  211. }
  212. if index < lenBytes && n.HasTtl() {
  213. n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
  214. index = index + TtlBytesLength
  215. }
  216. if index < lenBytes && n.HasPairs() {
  217. n.PairsSize = util.BytesToUint16(bytes[index : index+2])
  218. index += 2
  219. end := index + int(n.PairsSize)
  220. n.Pairs = bytes[index:end]
  221. index = end
  222. }
  223. }
  224. func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
  225. n = new(Needle)
  226. if version == Version1 || version == Version2 {
  227. bytes := make([]byte, NeedleHeaderSize)
  228. var count int
  229. count, err = r.ReadAt(bytes, offset)
  230. if count <= 0 || err != nil {
  231. return nil, 0, err
  232. }
  233. n.ParseNeedleHeader(bytes)
  234. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  235. bodyLength = n.Size + NeedleChecksumSize + padding
  236. }
  237. return
  238. }
  239. //n should be a needle already read the header
  240. //the input stream will read until next file entry
  241. func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) {
  242. if bodyLength <= 0 {
  243. return nil
  244. }
  245. switch version {
  246. case Version1:
  247. bytes := make([]byte, bodyLength)
  248. if _, err = r.ReadAt(bytes, offset); err != nil {
  249. return
  250. }
  251. n.Data = bytes[:n.Size]
  252. n.Checksum = NewCRC(n.Data)
  253. case Version2:
  254. bytes := make([]byte, bodyLength)
  255. if _, err = r.ReadAt(bytes, offset); err != nil {
  256. return
  257. }
  258. n.readNeedleDataVersion2(bytes[0:n.Size])
  259. n.Checksum = NewCRC(n.Data)
  260. default:
  261. err = fmt.Errorf("Unsupported Version! (%d)", version)
  262. }
  263. return
  264. }
  265. func (n *Needle) IsGzipped() bool {
  266. return n.Flags&FlagGzip > 0
  267. }
  268. func (n *Needle) SetGzipped() {
  269. n.Flags = n.Flags | FlagGzip
  270. }
  271. func (n *Needle) HasName() bool {
  272. return n.Flags&FlagHasName > 0
  273. }
  274. func (n *Needle) SetHasName() {
  275. n.Flags = n.Flags | FlagHasName
  276. }
  277. func (n *Needle) HasMime() bool {
  278. return n.Flags&FlagHasMime > 0
  279. }
  280. func (n *Needle) SetHasMime() {
  281. n.Flags = n.Flags | FlagHasMime
  282. }
  283. func (n *Needle) HasLastModifiedDate() bool {
  284. return n.Flags&FlagHasLastModifiedDate > 0
  285. }
  286. func (n *Needle) SetHasLastModifiedDate() {
  287. n.Flags = n.Flags | FlagHasLastModifiedDate
  288. }
  289. func (n *Needle) HasTtl() bool {
  290. return n.Flags&FlagHasTtl > 0
  291. }
  292. func (n *Needle) SetHasTtl() {
  293. n.Flags = n.Flags | FlagHasTtl
  294. }
  295. func (n *Needle) IsChunkedManifest() bool {
  296. return n.Flags&FlagIsChunkManifest > 0
  297. }
  298. func (n *Needle) SetIsChunkManifest() {
  299. n.Flags = n.Flags | FlagIsChunkManifest
  300. }
  301. func (n *Needle) HasPairs() bool {
  302. return n.Flags&FlagHasPairs != 0
  303. }
  304. func (n *Needle) SetHasPairs() {
  305. n.Flags = n.Flags | FlagHasPairs
  306. }