volume_grpc_batch_delete.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package weed_server
  2. import (
  3. "context"
  4. "net/http"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. )
  11. func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
  12. resp := &volume_server_pb.BatchDeleteResponse{}
  13. now := uint64(time.Now().Unix())
  14. for _, fid := range req.FileIds {
  15. vid, id_cookie, err := operation.ParseFileId(fid)
  16. if err != nil {
  17. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  18. FileId: fid,
  19. Status: http.StatusBadRequest,
  20. Error: err.Error()})
  21. continue
  22. }
  23. n := new(needle.Needle)
  24. volumeId, _ := needle.NewVolumeId(vid)
  25. if req.SkipCookieCheck {
  26. n.Id, err = types.ParseNeedleId(id_cookie)
  27. if err != nil {
  28. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  29. FileId: fid,
  30. Status: http.StatusBadRequest,
  31. Error: err.Error()})
  32. continue
  33. }
  34. } else {
  35. n.ParsePath(id_cookie)
  36. cookie := n.Cookie
  37. if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
  38. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  39. FileId: fid,
  40. Status: http.StatusNotFound,
  41. Error: err.Error(),
  42. })
  43. continue
  44. }
  45. if n.Cookie != cookie {
  46. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  47. FileId: fid,
  48. Status: http.StatusBadRequest,
  49. Error: "File Random Cookie does not match.",
  50. })
  51. break
  52. }
  53. }
  54. if n.IsChunkedManifest() {
  55. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  56. FileId: fid,
  57. Status: http.StatusNotAcceptable,
  58. Error: "ChunkManifest: not allowed in batch delete mode.",
  59. })
  60. continue
  61. }
  62. n.LastModified = now
  63. if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
  64. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  65. FileId: fid,
  66. Status: http.StatusInternalServerError,
  67. Error: err.Error()},
  68. )
  69. } else {
  70. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  71. FileId: fid,
  72. Status: http.StatusAccepted,
  73. Size: uint32(size)},
  74. )
  75. }
  76. }
  77. return resp, nil
  78. }