// retryless_writer.h (2.5 KB)

  1. #pragma once
  2. #include <yt/cpp/mapreduce/http/helpers.h>
  3. #include <yt/cpp/mapreduce/http/http.h>
  4. #include <yt/cpp/mapreduce/http/http_client.h>
  5. #include <yt/cpp/mapreduce/interface/config.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/config.h>
  8. #include <yt/cpp/mapreduce/interface/io.h>
  9. #include <yt/cpp/mapreduce/interface/tvm.h>
  10. #include <yt/cpp/mapreduce/io/helpers.h>
  11. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  12. #include <util/stream/buffered.h>
  13. namespace NYT {
  14. ////////////////////////////////////////////////////////////////////////////////
  15. class TRetrylessWriter
  16. : public TRawTableWriter
  17. {
  18. public:
  19. template <class TWriterOptions>
  20. TRetrylessWriter(
  21. const TClientContext& context,
  22. const TTransactionId& parentId,
  23. const TString& command,
  24. const TMaybe<TFormat>& format,
  25. const TRichYPath& path,
  26. size_t bufferSize,
  27. const TWriterOptions& options)
  28. : BufferSize_(bufferSize)
  29. , AutoFinish_(options.AutoFinish_)
  30. {
  31. THttpHeader header("PUT", command);
  32. header.SetInputFormat(format);
  33. header.MergeParameters(FormIORequestParameters(path, options));
  34. header.AddTransactionId(parentId);
  35. header.SetRequestCompression(ToString(context.Config->ContentEncoding));
  36. if (context.ServiceTicketAuth) {
  37. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  38. } else {
  39. header.SetToken(context.Token);
  40. }
  41. if (context.ImpersonationUser) {
  42. header.SetImpersonationUser(*context.ImpersonationUser);
  43. }
  44. TString requestId = CreateGuidAsString();
  45. auto hostName = GetProxyForHeavyRequest(context);
  46. UpdateHeaderForProxyIfNeed(hostName, context, header);
  47. Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
  48. BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
  49. }
  50. ~TRetrylessWriter() override;
  51. void NotifyRowEnd() override;
  52. void Abort() override;
  53. size_t GetBufferMemoryUsage() const override;
  54. protected:
  55. void DoWrite(const void* buf, size_t len) override;
  56. void DoFinish() override;
  57. private:
  58. const size_t BufferSize_ = 0;
  59. const bool AutoFinish_;
  60. bool Running_ = true;
  61. NHttpClient::IHttpRequestPtr Request_;
  62. THolder<TBufferedOutput> BufferedOutput_;
  63. };
  64. ////////////////////////////////////////////////////////////////////////////////
  65. } // namespace NYT