// retryless_writer.h
  1. #pragma once
  2. #include "transaction.h"
  3. #include <yt/cpp/mapreduce/http/helpers.h>
  4. #include <yt/cpp/mapreduce/http/http.h>
  5. #include <yt/cpp/mapreduce/http/http_client.h>
  6. #include <yt/cpp/mapreduce/interface/config.h>
  7. #include <yt/cpp/mapreduce/interface/common.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/io.h>
  10. #include <yt/cpp/mapreduce/interface/tvm.h>
  11. #include <yt/cpp/mapreduce/io/helpers.h>
  12. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  13. #include <util/stream/buffered.h>
  14. namespace NYT {
  15. ////////////////////////////////////////////////////////////////////////////////
  16. class TRetrylessWriter
  17. : public TRawTableWriter
  18. {
  19. public:
  20. template <class TWriterOptions>
  21. TRetrylessWriter(
  22. const TClientContext& context,
  23. const TTransactionId& parentId,
  24. const TString& command,
  25. const TMaybe<TFormat>& format,
  26. const TRichYPath& path,
  27. size_t bufferSize,
  28. const TWriterOptions& options)
  29. : BufferSize_(bufferSize)
  30. {
  31. THttpHeader header("PUT", command);
  32. header.SetInputFormat(format);
  33. header.MergeParameters(FormIORequestParameters(path, options));
  34. header.AddTransactionId(parentId);
  35. header.SetRequestCompression(ToString(context.Config->ContentEncoding));
  36. if (context.ServiceTicketAuth) {
  37. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  38. } else {
  39. header.SetToken(context.Token);
  40. }
  41. if (context.ImpersonationUser) {
  42. header.SetImpersonationUser(*context.ImpersonationUser);
  43. }
  44. TString requestId = CreateGuidAsString();
  45. auto hostName = GetProxyForHeavyRequest(context);
  46. UpdateHeaderForProxyIfNeed(hostName, context, header);
  47. Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
  48. BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
  49. }
  50. ~TRetrylessWriter() override;
  51. void NotifyRowEnd() override;
  52. void Abort() override;
  53. size_t GetBufferMemoryUsage() const override;
  54. protected:
  55. void DoWrite(const void* buf, size_t len) override;
  56. void DoFinish() override;
  57. private:
  58. const size_t BufferSize_ = 0;
  59. bool Running_ = true;
  60. NHttpClient::IHttpRequestPtr Request_;
  61. THolder<TBufferedOutput> BufferedOutput_;
  62. };
  63. ////////////////////////////////////////////////////////////////////////////////
  64. } // namespace NYT