123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- package main
- import (
- "bytes"
- "context"
- "errors"
- "flag"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/storage/idx"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "io"
- "math"
- "os"
- )
- var (
- serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against")
- volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers")
- volumeCollection = flag.String("collection", "", "the volume collection name")
- grpcDialOption grpc.DialOption
- )
- /*
- Diff the volume's files across multiple volume servers.
- diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
- Example Output:
- reference 127.0.0.1:8081
- fileId volumeServer message
- 5,01617c3f61 127.0.0.1:8080 wrongSize
- */
- func main() {
- flag.Parse()
- util.LoadConfiguration("security", false)
- grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- vid := uint32(*volumeId)
- servers := pb.ServerAddresses(*serversStr).ToAddresses()
- if len(servers) < 2 {
- glog.Fatalf("You must specify more than 1 server\n")
- }
- var referenceServer pb.ServerAddress
- var maxOffset int64
- allFiles := map[pb.ServerAddress]map[types.NeedleId]needleState{}
- for _, addr := range servers {
- files, offset, err := getVolumeFiles(vid, addr)
- if err != nil {
- glog.Fatalf("Failed to copy idx from volume server %s\n", err)
- }
- allFiles[addr] = files
- if offset > maxOffset {
- referenceServer = addr
- }
- }
- same := true
- fmt.Println("reference", referenceServer)
- fmt.Println("fileId volumeServer message")
- for nid, n := range allFiles[referenceServer] {
- for addr, files := range allFiles {
- if addr == referenceServer {
- continue
- }
- var diffMsg string
- n2, ok := files[nid]
- if !ok {
- if n.state == stateDeleted {
- continue
- }
- diffMsg = "missing"
- } else if n2.state != n.state {
- switch n.state {
- case stateDeleted:
- diffMsg = "notDeleted"
- case statePresent:
- diffMsg = "deleted"
- }
- } else if n2.size != n.size {
- diffMsg = "wrongSize"
- } else {
- continue
- }
- same = false
- // fetch the needle details
- var id string
- var err error
- if n.state == statePresent {
- id, err = getNeedleFileId(vid, nid, referenceServer)
- } else {
- id, err = getNeedleFileId(vid, nid, addr)
- }
- if err != nil {
- glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err)
- }
- fmt.Println(id, addr, diffMsg)
- }
- }
- if !same {
- os.Exit(1)
- }
- }
// Needle states recorded while walking a volume index file.
const (
	// stateDeleted marks an index entry with a zero offset or a deleted size.
	stateDeleted = uint8(1)
	// statePresent marks every other index entry.
	statePresent = uint8(2)
)
- type needleState struct {
- state uint8
- size types.Size
- }
- func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleState, int64, error) {
- var idxFile *bytes.Reader
- err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: v,
- Ext: ".idx",
- CompactionRevision: math.MaxUint32,
- StopOffset: math.MaxInt64,
- Collection: *volumeCollection,
- })
- if err != nil {
- return err
- }
- var buf bytes.Buffer
- for {
- resp, err := copyFileClient.Recv()
- if errors.Is(err, io.EOF) {
- break
- }
- if err != nil {
- return err
- }
- buf.Write(resp.FileContent)
- }
- idxFile = bytes.NewReader(buf.Bytes())
- return nil
- })
- if err != nil {
- return nil, 0, err
- }
- var maxOffset int64
- files := map[types.NeedleId]needleState{}
- err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
- if offset.IsZero() || size.IsDeleted() {
- files[key] = needleState{
- state: stateDeleted,
- size: size,
- }
- } else {
- files[key] = needleState{
- state: statePresent,
- size: size,
- }
- }
- if actual := offset.ToActualOffset(); actual > maxOffset {
- maxOffset = actual
- }
- return nil
- })
- if err != nil {
- return nil, 0, err
- }
- return files, maxOffset, nil
- }
- func getNeedleFileId(v uint32, nid types.NeedleId, addr pb.ServerAddress) (string, error) {
- var id string
- err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
- resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
- VolumeId: v,
- NeedleId: uint64(nid),
- })
- if err != nil {
- return err
- }
- id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String()
- return nil
- })
- return id, err
- }
|