commands.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/util"
  15. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  16. "github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
  17. )
  18. type ShellOptions struct {
  19. Masters *string
  20. GrpcDialOption grpc.DialOption
  21. // shell transient context
  22. FilerHost string
  23. FilerPort int64
  24. FilerGroup *string
  25. FilerAddress pb.ServerAddress
  26. Directory string
  27. }
  28. type CommandEnv struct {
  29. env map[string]string
  30. MasterClient *wdclient.MasterClient
  31. option *ShellOptions
  32. locker *exclusive_locks.ExclusiveLocker
  33. }
  34. func NewCommandEnv(options *ShellOptions) *CommandEnv {
  35. ce := &CommandEnv{
  36. env: make(map[string]string),
  37. MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", *pb.ServerAddresses(*options.Masters).ToServiceDiscovery()),
  38. option: options,
  39. }
  40. ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell")
  41. return ce
  42. }
  43. func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
  44. if strings.HasPrefix(input, "http") {
  45. err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more")
  46. return
  47. }
  48. if !strings.HasPrefix(input, "/") {
  49. input = util.Join(ce.option.Directory, input)
  50. }
  51. return input, err
  52. }
  53. func (ce *CommandEnv) isDirectory(path string) bool {
  54. return ce.checkDirectory(path) == nil
  55. }
  56. func (ce *CommandEnv) confirmIsLocked(args []string) error {
  57. if ce.locker.IsLocked() {
  58. return nil
  59. }
  60. ce.locker.SetMessage(fmt.Sprintf("%v", args))
  61. return fmt.Errorf("need to run \"lock\" first to continue")
  62. }
  63. func (ce *CommandEnv) isLocked() bool {
  64. if ce == nil {
  65. return true
  66. }
  67. return ce.locker.IsLocked()
  68. }
  69. func (ce *CommandEnv) checkDirectory(path string) error {
  70. dir, name := util.FullPath(path).DirAndName()
  71. exists, err := filer_pb.Exists(ce, dir, name, true)
  72. if !exists {
  73. return fmt.Errorf("%s is not a directory", path)
  74. }
  75. return err
  76. }
  77. var _ = filer_pb.FilerClient(&CommandEnv{})
  78. func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  79. return pb.WithGrpcFilerClient(streamingMode, 0, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
  80. }
  81. func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
  82. return location.Url
  83. }
  84. func (ce *CommandEnv) GetDataCenter() string {
  85. return ce.MasterClient.DataCenter
  86. }
  87. func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
  88. if strings.HasPrefix(entryPath, "http") {
  89. var u *url.URL
  90. u, err = url.Parse(entryPath)
  91. if err != nil {
  92. return
  93. }
  94. filerServer = u.Hostname()
  95. portString := u.Port()
  96. if portString != "" {
  97. filerPort, err = strconv.ParseInt(portString, 10, 32)
  98. }
  99. path = u.Path
  100. } else {
  101. err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath)
  102. }
  103. return
  104. }
  105. func findInputDirectory(args []string) (input string) {
  106. input = "."
  107. if len(args) > 0 {
  108. input = args[len(args)-1]
  109. if strings.HasPrefix(input, "-") {
  110. input = "."
  111. }
  112. }
  113. return input
  114. }
  115. func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) {
  116. err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption,
  117. func(client volume_server_pb.VolumeServerClient) error {
  118. if resp, err = client.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
  119. VolumeId: volumeId,
  120. NeedleId: uint64(needleValue.Key),
  121. Offset: needleValue.Offset.ToActualOffset(),
  122. Size: int32(needleValue.Size),
  123. }); err != nil {
  124. return err
  125. }
  126. return nil
  127. },
  128. )
  129. return
  130. }
  131. func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) {
  132. err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption,
  133. func(client volume_server_pb.VolumeServerClient) error {
  134. if resp, err = client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
  135. VolumeId: volumeId,
  136. NeedleId: uint64(needleValue.Key),
  137. }); err != nil {
  138. return err
  139. }
  140. return nil
  141. },
  142. )
  143. return
  144. }
  145. func getCollectionName(commandEnv *CommandEnv, bucket string) string {
  146. if *commandEnv.option.FilerGroup != "" {
  147. return fmt.Sprintf("%s_%s", *commandEnv.option.FilerGroup, bucket)
  148. }
  149. return bucket
  150. }