123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package hdfs
import (
	"fmt"
	"io"
	"os"
	"path"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/colinmarc/hdfs/v2"
)
- func init() {
- remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker)
- }
// hdfsRemoteStorageMaker builds HDFS-backed remote storage clients.
type hdfsRemoteStorageMaker struct{}

// HasBucket reports whether this backend organizes data into buckets.
// HDFS has no bucket concept, so this is always false.
func (s hdfsRemoteStorageMaker) HasBucket() bool {
	return false
}
- func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
- client := &hdfsRemoteStorageClient{
- conf: conf,
- }
- options := hdfs.ClientOptions{
- Addresses: conf.HdfsNamenodes,
- UseDatanodeHostname: false,
- }
- if conf.HdfsServicePrincipalName != "" {
- var err error
- options.KerberosClient, err = getKerberosClient()
- if err != nil {
- return nil, fmt.Errorf("get kerberos authentication: %s", err)
- }
- options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName
- if conf.HdfsDataTransferProtection != "" {
- options.DataTransferProtection = conf.HdfsDataTransferProtection
- }
- } else {
- options.User = conf.HdfsUsername
- }
- c, err := hdfs.NewClient(options)
- if err != nil {
- return nil, err
- }
- client.client = c
- return client, nil
- }
// hdfsRemoteStorageClient implements remote_storage.RemoteStorageClient
// against an HDFS cluster.
type hdfsRemoteStorageClient struct {
	conf   *remote_pb.RemoteConf // the remote configuration this client was built from
	client *hdfs.Client          // connected HDFS client created by Make
}
- var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{})
- func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
- return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
- children, err := c.client.ReadDir(string(parentDir))
- if err != nil {
- return err
- }
- for _, child := range children {
- if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{
- StorageName: c.conf.Name,
- LastLocalSyncTsNs: 0,
- RemoteETag: "",
- RemoteMtime: child.ModTime().Unix(),
- RemoteSize: child.Size(),
- }); err != nil {
- return nil
- }
- }
- return nil
- }, util.FullPath(loc.Path), visitFn)
- }
- func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
- f, err := c.client.Open(loc.Path)
- if err != nil {
- return
- }
- defer f.Close()
- data = make([]byte, size)
- _, err = f.ReadAt(data, offset)
- return
- }
- func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
- return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode))
- }
- func (c *hdfsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
- return c.client.RemoveAll(loc.Path)
- }
- func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
- dirname := path.Dir(loc.Path)
- // ensure parent directory
- if err = c.client.MkdirAll(dirname, 0755); err != nil {
- return
- }
- // remove existing file
- info, err := c.client.Stat(loc.Path)
- if err == nil {
- err = c.client.Remove(loc.Path)
- if err != nil {
- return
- }
- }
- // create new file
- out, err := c.client.Create(loc.Path)
- if err != nil {
- return
- }
- cleanup := func() {
- if removeErr := c.client.Remove(loc.Path); removeErr != nil {
- glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr)
- }
- }
- if _, err = io.Copy(out, reader); err != nil {
- cleanup()
- return
- }
- if err = out.Close(); err != nil {
- cleanup()
- return
- }
- info, err = c.client.Stat(loc.Path)
- if err != nil {
- return
- }
- return &filer_pb.RemoteEntry{
- RemoteMtime: info.ModTime().Unix(),
- RemoteSize: info.Size(),
- RemoteETag: "",
- StorageName: c.conf.Name,
- }, nil
- }
- func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
- if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode {
- if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil {
- return err
- }
- }
- return nil
- }
- func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
- if err = c.client.Remove(loc.Path); err != nil {
- return fmt.Errorf("hdfs delete %s: %v", loc.Path, err)
- }
- return
- }
- func (c *hdfsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
- return
- }
- func (c *hdfsRemoteStorageClient) CreateBucket(name string) (err error) {
- return
- }
- func (c *hdfsRemoteStorageClient) DeleteBucket(name string) (err error) {
- return
- }
|