retryless_writer.h 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. #pragma once
  2. #include "transaction.h"
  3. #include <yt/cpp/mapreduce/http/helpers.h>
  4. #include <yt/cpp/mapreduce/http/http.h>
  5. #include <yt/cpp/mapreduce/http/http_client.h>
  6. #include <yt/cpp/mapreduce/interface/config.h>
  7. #include <yt/cpp/mapreduce/interface/common.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/io.h>
  10. #include <yt/cpp/mapreduce/interface/tvm.h>
  11. #include <yt/cpp/mapreduce/io/helpers.h>
  12. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  13. #include <util/stream/buffered.h>
  14. namespace NYT {
  15. ////////////////////////////////////////////////////////////////////////////////
  16. class TRetrylessWriter
  17. : public TRawTableWriter
  18. {
  19. public:
  20. template <class TWriterOptions>
  21. TRetrylessWriter(
  22. const TClientContext& context,
  23. const TTransactionId& parentId,
  24. const TString& command,
  25. const TMaybe<TFormat>& format,
  26. const TRichYPath& path,
  27. size_t bufferSize,
  28. const TWriterOptions& options)
  29. : BufferSize_(bufferSize)
  30. {
  31. THttpHeader header("PUT", command);
  32. header.SetInputFormat(format);
  33. header.MergeParameters(FormIORequestParameters(path, options));
  34. header.AddTransactionId(parentId);
  35. header.SetRequestCompression(ToString(context.Config->ContentEncoding));
  36. if (context.ServiceTicketAuth) {
  37. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  38. } else {
  39. header.SetToken(context.Token);
  40. }
  41. if (context.ImpersonationUser) {
  42. header.SetImpersonationUser(*context.ImpersonationUser);
  43. }
  44. TString requestId = CreateGuidAsString();
  45. auto hostName = GetProxyForHeavyRequest(context);
  46. Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
  47. BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
  48. }
  49. ~TRetrylessWriter() override;
  50. void NotifyRowEnd() override;
  51. void Abort() override;
  52. size_t GetBufferMemoryUsage() const override;
  53. protected:
  54. void DoWrite(const void* buf, size_t len) override;
  55. void DoFinish() override;
  56. private:
  57. const size_t BufferSize_ = 0;
  58. bool Running_ = true;
  59. NHttpClient::IHttpRequestPtr Request_;
  60. THolder<TBufferedOutput> BufferedOutput_;
  61. };
  62. ////////////////////////////////////////////////////////////////////////////////
  63. } // namespace NYT