volume_grpc_batch_delete.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package weed_server
  2. import (
  3. "context"
  4. "net/http"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/operation"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  9. )
  10. func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
  11. resp := &volume_server_pb.BatchDeleteResponse{}
  12. now := uint64(time.Now().Unix())
  13. for _, fid := range req.FileIds {
  14. vid, id_cookie, err := operation.ParseFileId(fid)
  15. if err != nil {
  16. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  17. FileId: fid,
  18. Status: http.StatusBadRequest,
  19. Error: err.Error()})
  20. continue
  21. }
  22. n := new(needle.Needle)
  23. volumeId, _ := needle.NewVolumeId(vid)
  24. ecVolume, isEcVolume := vs.store.FindEcVolume(volumeId)
  25. if req.SkipCookieCheck {
  26. n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie)
  27. if err != nil {
  28. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  29. FileId: fid,
  30. Status: http.StatusBadRequest,
  31. Error: err.Error()})
  32. continue
  33. }
  34. } else {
  35. n.ParsePath(id_cookie)
  36. cookie := n.Cookie
  37. if !isEcVolume {
  38. if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
  39. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  40. FileId: fid,
  41. Status: http.StatusNotFound,
  42. Error: err.Error(),
  43. })
  44. continue
  45. }
  46. } else {
  47. if _, err := vs.store.ReadEcShardNeedle(volumeId, n, nil); err != nil {
  48. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  49. FileId: fid,
  50. Status: http.StatusNotFound,
  51. Error: err.Error(),
  52. })
  53. continue
  54. }
  55. }
  56. if n.Cookie != cookie {
  57. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  58. FileId: fid,
  59. Status: http.StatusBadRequest,
  60. Error: "File Random Cookie does not match.",
  61. })
  62. break
  63. }
  64. }
  65. if n.IsChunkedManifest() {
  66. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  67. FileId: fid,
  68. Status: http.StatusNotAcceptable,
  69. Error: "ChunkManifest: not allowed in batch delete mode.",
  70. })
  71. continue
  72. }
  73. n.LastModified = now
  74. if !isEcVolume {
  75. if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
  76. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  77. FileId: fid,
  78. Status: http.StatusInternalServerError,
  79. Error: err.Error()},
  80. )
  81. } else {
  82. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  83. FileId: fid,
  84. Status: http.StatusAccepted,
  85. Size: uint32(size)},
  86. )
  87. }
  88. } else {
  89. if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, n.Cookie); err != nil {
  90. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  91. FileId: fid,
  92. Status: http.StatusInternalServerError,
  93. Error: err.Error()},
  94. )
  95. } else {
  96. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  97. FileId: fid,
  98. Status: http.StatusAccepted,
  99. Size: uint32(size)},
  100. )
  101. }
  102. }
  103. }
  104. return resp, nil
  105. }