retryless_writer.h 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. #pragma once
  2. #include <yt/cpp/mapreduce/http/context.h>
  3. #include <yt/cpp/mapreduce/http/helpers.h>
  4. #include <yt/cpp/mapreduce/http/http.h>
  5. #include <yt/cpp/mapreduce/http/http_client.h>
  6. #include <yt/cpp/mapreduce/http/requests.h>
  7. #include <yt/cpp/mapreduce/interface/config.h>
  8. #include <yt/cpp/mapreduce/interface/common.h>
  9. #include <yt/cpp/mapreduce/interface/config.h>
  10. #include <yt/cpp/mapreduce/interface/io.h>
  11. #include <yt/cpp/mapreduce/interface/tvm.h>
  12. #include <yt/cpp/mapreduce/io/helpers.h>
  13. #include <util/stream/buffered.h>
  14. namespace NYT {
  15. ////////////////////////////////////////////////////////////////////////////////
  16. class TRetrylessWriter
  17. : public TRawTableWriter
  18. {
  19. public:
  20. template <class TWriterOptions>
  21. TRetrylessWriter(
  22. const TClientContext& context,
  23. const TTransactionId& parentId,
  24. const TString& command,
  25. const TMaybe<TFormat>& format,
  26. const TRichYPath& path,
  27. size_t bufferSize,
  28. const TWriterOptions& options)
  29. : BufferSize_(bufferSize)
  30. , AutoFinish_(options.AutoFinish_)
  31. {
  32. THttpHeader header("PUT", command);
  33. header.SetInputFormat(format);
  34. header.MergeParameters(FormIORequestParameters(path, options));
  35. header.AddTransactionId(parentId);
  36. header.SetRequestCompression(ToString(context.Config->ContentEncoding));
  37. if (context.ServiceTicketAuth) {
  38. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  39. } else {
  40. header.SetToken(context.Token);
  41. }
  42. if (context.ImpersonationUser) {
  43. header.SetImpersonationUser(*context.ImpersonationUser);
  44. }
  45. TString requestId = CreateGuidAsString();
  46. auto hostName = GetProxyForHeavyRequest(context);
  47. UpdateHeaderForProxyIfNeed(hostName, context, header);
  48. Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
  49. BufferedOutput_ = std::make_unique<TBufferedOutput>(Request_->GetStream(), BufferSize_);
  50. }
  51. ~TRetrylessWriter() override;
  52. void NotifyRowEnd() override;
  53. void Abort() override;
  54. size_t GetBufferMemoryUsage() const override;
  55. protected:
  56. void DoWrite(const void* buf, size_t len) override;
  57. void DoFinish() override;
  58. private:
  59. const size_t BufferSize_ = 0;
  60. const bool AutoFinish_;
  61. bool Running_ = true;
  62. NHttpClient::IHttpRequestPtr Request_;
  63. std::unique_ptr<TBufferedOutput> BufferedOutput_;
  64. };
  65. ////////////////////////////////////////////////////////////////////////////////
  66. } // namespace NYT