diff_volume_servers.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "math"
  10. "os"
  11. "strings"
  12. "github.com/chrislusf/seaweedfs/weed/util/log"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/chrislusf/seaweedfs/weed/security"
  16. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  17. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  18. "github.com/chrislusf/seaweedfs/weed/storage/types"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "google.golang.org/grpc"
  21. )
  22. var (
  23. serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against")
  24. volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers")
  25. volumeCollection = flag.String("collection", "", "the volume collection name")
  26. grpcDialOption grpc.DialOption
  27. )
  28. /*
  29. Diff the volume's files across multiple volume servers.
  30. diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
  31. Example Output:
  32. reference 127.0.0.1:8081
  33. fileId volumeServer message
  34. 5,01617c3f61 127.0.0.1:8080 wrongSize
  35. */
  36. func main() {
  37. flag.Parse()
  38. util.LoadConfiguration("security", false)
  39. grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  40. vid := uint32(*volumeId)
  41. servers := strings.Split(*serversStr, ",")
  42. if len(servers) < 2 {
  43. log.Fatalf("You must specify more than 1 server\n")
  44. }
  45. var referenceServer string
  46. var maxOffset int64
  47. allFiles := map[string]map[types.NeedleId]needleState{}
  48. for _, addr := range servers {
  49. files, offset, err := getVolumeFiles(vid, addr)
  50. if err != nil {
  51. log.Fatalf("Failed to copy idx from volume server %s\n", err)
  52. }
  53. allFiles[addr] = files
  54. if offset > maxOffset {
  55. referenceServer = addr
  56. }
  57. }
  58. same := true
  59. fmt.Println("reference", referenceServer)
  60. fmt.Println("fileId volumeServer message")
  61. for nid, n := range allFiles[referenceServer] {
  62. for addr, files := range allFiles {
  63. if addr == referenceServer {
  64. continue
  65. }
  66. var diffMsg string
  67. n2, ok := files[nid]
  68. if !ok {
  69. if n.state == stateDeleted {
  70. continue
  71. }
  72. diffMsg = "missing"
  73. } else if n2.state != n.state {
  74. switch n.state {
  75. case stateDeleted:
  76. diffMsg = "notDeleted"
  77. case statePresent:
  78. diffMsg = "deleted"
  79. }
  80. } else if n2.size != n.size {
  81. diffMsg = "wrongSize"
  82. } else {
  83. continue
  84. }
  85. same = false
  86. // fetch the needle details
  87. var id string
  88. var err error
  89. if n.state == statePresent {
  90. id, err = getNeedleFileId(vid, nid, referenceServer)
  91. } else {
  92. id, err = getNeedleFileId(vid, nid, addr)
  93. }
  94. if err != nil {
  95. log.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err)
  96. }
  97. fmt.Println(id, addr, diffMsg)
  98. }
  99. }
  100. if !same {
  101. os.Exit(1)
  102. }
  103. }
  // Needle states recorded per index entry. The values start at 1, so the
  // zero value of uint8 matches neither state.
  const (
  	// stateDeleted marks a needle whose index entry is a deletion.
  	stateDeleted uint8 = iota + 1
  	// statePresent marks a needle whose index entry points at live data.
  	statePresent
  )
  108. type needleState struct {
  109. state uint8
  110. size types.Size
  111. }
  112. func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) {
  113. var idxFile *bytes.Reader
  114. err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  115. ctx, cancel := context.WithCancel(context.Background())
  116. defer cancel()
  117. copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  118. VolumeId: v,
  119. Ext: ".idx",
  120. CompactionRevision: math.MaxUint32,
  121. StopOffset: math.MaxInt64,
  122. Collection: *volumeCollection,
  123. })
  124. if err != nil {
  125. return err
  126. }
  127. var buf bytes.Buffer
  128. for {
  129. resp, err := copyFileClient.Recv()
  130. if errors.Is(err, io.EOF) {
  131. break
  132. }
  133. if err != nil {
  134. return err
  135. }
  136. buf.Write(resp.FileContent)
  137. }
  138. idxFile = bytes.NewReader(buf.Bytes())
  139. return nil
  140. })
  141. if err != nil {
  142. return nil, 0, err
  143. }
  144. var maxOffset int64
  145. files := map[types.NeedleId]needleState{}
  146. err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
  147. if offset.IsZero() || size.IsDeleted() {
  148. files[key] = needleState{
  149. state: stateDeleted,
  150. size: size,
  151. }
  152. } else {
  153. files[key] = needleState{
  154. state: statePresent,
  155. size: size,
  156. }
  157. }
  158. if actual := offset.ToAcutalOffset(); actual > maxOffset {
  159. maxOffset = actual
  160. }
  161. return nil
  162. })
  163. if err != nil {
  164. return nil, 0, err
  165. }
  166. return files, maxOffset, nil
  167. }
  168. func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) {
  169. var id string
  170. err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  171. resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
  172. VolumeId: v,
  173. NeedleId: uint64(nid),
  174. })
  175. if err != nil {
  176. return err
  177. }
  178. id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String()
  179. return nil
  180. })
  181. return id, err
  182. }