retry_transaction.h 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. #pragma once
  2. #include <yt/cpp/mapreduce/http/retry_request.h>
  3. #include <yt/cpp/mapreduce/client/client.h>
  4. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  7. namespace NYT::NDetail {
  8. template <typename TResult>
  9. TResult RetryTransactionWithPolicy(
  10. const TClientBasePtr& client,
  11. std::function<TResult(ITransactionPtr)> func,
  12. IRequestRetryPolicyPtr retryPolicy)
  13. {
  14. if (!retryPolicy) {
  15. retryPolicy = CreateDefaultRequestRetryPolicy(client->GetContext().Config);
  16. }
  17. while (true) {
  18. try {
  19. retryPolicy->NotifyNewAttempt();
  20. auto transaction = client->StartTransaction(TStartTransactionOptions());
  21. if constexpr (std::is_same<TResult, void>::value) {
  22. func(transaction);
  23. transaction->Commit();
  24. return;
  25. } else {
  26. auto result = func(transaction);
  27. transaction->Commit();
  28. return result;
  29. }
  30. } catch (const TErrorResponse& e) {
  31. YT_LOG_ERROR("Retry failed %v - %v",
  32. e.GetError().GetMessage(),
  33. retryPolicy->GetAttemptDescription());
  34. if (!IsRetriable(e)) {
  35. throw;
  36. }
  37. auto maybeRetryTimeout = retryPolicy->OnRetriableError(e);
  38. if (maybeRetryTimeout) {
  39. TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
  40. } else {
  41. throw;
  42. }
  43. } catch (const std::exception& e) {
  44. YT_LOG_ERROR("Retry failed %v - %v",
  45. e.what(),
  46. retryPolicy->GetAttemptDescription());
  47. if (!IsRetriable(e)) {
  48. throw;
  49. }
  50. auto maybeRetryTimeout = retryPolicy->OnGenericError(e);
  51. if (maybeRetryTimeout) {
  52. TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
  53. } else {
  54. throw;
  55. }
  56. }
  57. }
  58. }
  59. } // namespace NYT::NDetail