volume_tailer.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package main
  2. import (
  3. "flag"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "log"
  6. "time"
  7. "context"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/security"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  11. util2 "github.com/seaweedfs/seaweedfs/weed/util"
  12. "golang.org/x/tools/godoc/util"
  13. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  14. )
  15. var (
  16. master = flag.String("master", "localhost:9333", "master server host and port")
  17. volumeId = flag.Int("volumeId", -1, "a volume id")
  18. rewindDuration = flag.Duration("rewind", -1, "rewind back in time. -1 means from the first entry. 0 means from now.")
  19. timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds")
  20. showTextFile = flag.Bool("showTextFile", false, "display textual file content")
  21. )
  22. func main() {
  23. flag.Parse()
  24. util_http.InitGlobalHttpClient()
  25. util2.LoadSecurityConfiguration()
  26. grpcDialOption := security.LoadClientTLS(util2.GetViper(), "grpc.client")
  27. vid := needle.VolumeId(*volumeId)
  28. var sinceTimeNs int64
  29. if *rewindDuration == 0 {
  30. sinceTimeNs = time.Now().UnixNano()
  31. } else if *rewindDuration == -1 {
  32. sinceTimeNs = 0
  33. } else if *rewindDuration > 0 {
  34. sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
  35. }
  36. err := operation.TailVolume(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
  37. if n.Size == 0 {
  38. println("-", n.String())
  39. return nil
  40. } else {
  41. println("+", n.String())
  42. }
  43. if *showTextFile {
  44. data := n.Data
  45. if n.IsCompressed() {
  46. if data, err = util2.DecompressData(data); err != nil {
  47. return err
  48. }
  49. }
  50. if util.IsText(data) {
  51. println(string(data))
  52. }
  53. println("-", n.String(), "compressed", n.IsCompressed(), "original size", len(data))
  54. }
  55. return nil
  56. })
  57. if err != nil {
  58. log.Printf("Error VolumeTailSender volume %d: %v", vid, err)
  59. }
  60. }