// hdfs_storage_client.go
  1. package hdfs
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  7. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "github.com/colinmarc/hdfs/v2"
  10. "io"
  11. "os"
  12. "path"
  13. )
  14. func init() {
  15. remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker)
  16. }
// hdfsRemoteStorageMaker builds HDFS-backed remote storage clients.
type hdfsRemoteStorageMaker struct{}

// HasBucket reports whether this storage kind is organized into buckets;
// HDFS is a plain filesystem namespace, so it is not.
func (s hdfsRemoteStorageMaker) HasBucket() bool {
	return false
}
  21. func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
  22. client := &hdfsRemoteStorageClient{
  23. conf: conf,
  24. }
  25. options := hdfs.ClientOptions{
  26. Addresses: conf.HdfsNamenodes,
  27. UseDatanodeHostname: false,
  28. }
  29. if conf.HdfsServicePrincipalName != "" {
  30. var err error
  31. options.KerberosClient, err = getKerberosClient()
  32. if err != nil {
  33. return nil, fmt.Errorf("get kerberos authentication: %s", err)
  34. }
  35. options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName
  36. if conf.HdfsDataTransferProtection != "" {
  37. options.DataTransferProtection = conf.HdfsDataTransferProtection
  38. }
  39. } else {
  40. options.User = conf.HdfsUsername
  41. }
  42. c, err := hdfs.NewClient(options)
  43. if err != nil {
  44. return nil, err
  45. }
  46. client.client = c
  47. return client, nil
  48. }
// hdfsRemoteStorageClient implements remote_storage.RemoteStorageClient
// against an HDFS cluster described by conf.
type hdfsRemoteStorageClient struct {
	conf   *remote_pb.RemoteConf
	client *hdfs.Client
}

// compile-time assertion that the interface is fully implemented
var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{})
  54. func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
  55. return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
  56. children, err := c.client.ReadDir(string(parentDir))
  57. if err != nil {
  58. return err
  59. }
  60. for _, child := range children {
  61. if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{
  62. StorageName: c.conf.Name,
  63. LastLocalSyncTsNs: 0,
  64. RemoteETag: "",
  65. RemoteMtime: child.ModTime().Unix(),
  66. RemoteSize: child.Size(),
  67. }); err != nil {
  68. return nil
  69. }
  70. }
  71. return nil
  72. }, util.FullPath(loc.Path), visitFn)
  73. }
  74. func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
  75. f, err := c.client.Open(loc.Path)
  76. if err != nil {
  77. return
  78. }
  79. defer f.Close()
  80. data = make([]byte, size)
  81. _, err = f.ReadAt(data, offset)
  82. return
  83. }
  84. func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
  85. return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode))
  86. }
  87. func (c *hdfsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
  88. return c.client.RemoveAll(loc.Path)
  89. }
  90. func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
  91. dirname := path.Dir(loc.Path)
  92. // ensure parent directory
  93. if err = c.client.MkdirAll(dirname, 0755); err != nil {
  94. return
  95. }
  96. // remove existing file
  97. info, err := c.client.Stat(loc.Path)
  98. if err == nil {
  99. err = c.client.Remove(loc.Path)
  100. if err != nil {
  101. return
  102. }
  103. }
  104. // create new file
  105. out, err := c.client.Create(loc.Path)
  106. if err != nil {
  107. return
  108. }
  109. cleanup := func() {
  110. if removeErr := c.client.Remove(loc.Path); removeErr != nil {
  111. glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr)
  112. }
  113. }
  114. if _, err = io.Copy(out, reader); err != nil {
  115. cleanup()
  116. return
  117. }
  118. if err = out.Close(); err != nil {
  119. cleanup()
  120. return
  121. }
  122. info, err = c.client.Stat(loc.Path)
  123. if err != nil {
  124. return
  125. }
  126. return &filer_pb.RemoteEntry{
  127. RemoteMtime: info.ModTime().Unix(),
  128. RemoteSize: info.Size(),
  129. RemoteETag: "",
  130. StorageName: c.conf.Name,
  131. }, nil
  132. }
  133. func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
  134. if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode {
  135. if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil {
  136. return err
  137. }
  138. }
  139. return nil
  140. }
  141. func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
  142. if err = c.client.Remove(loc.Path); err != nil {
  143. return fmt.Errorf("hdfs delete %s: %v", loc.Path, err)
  144. }
  145. return
  146. }
  147. func (c *hdfsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
  148. return
  149. }
  150. func (c *hdfsRemoteStorageClient) CreateBucket(name string) (err error) {
  151. return
  152. }
  153. func (c *hdfsRemoteStorageClient) DeleteBucket(name string) (err error) {
  154. return
  155. }