operation_helpers.cpp 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. #include "operation_helpers.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/http/context.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/http/retry_request.h>
  6. #include <yt/cpp/mapreduce/interface/config.h>
  7. #include <yt/cpp/mapreduce/interface/raw_client.h>
  8. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  9. #include <util/string/builder.h>
  10. #include <util/system/mutex.h>
  11. #include <util/system/rwlock.h>
  12. namespace NYT::NDetail {
  13. ////////////////////////////////////////////////////////////////////////////////
  14. ui64 RoundUpFileSize(ui64 size)
  15. {
  16. constexpr ui64 roundUpTo = 4ull << 10;
  17. return (size + roundUpTo - 1) & ~(roundUpTo - 1);
  18. }
  19. bool UseLocalModeOptimization(
  20. const IRawClientPtr& rawClient,
  21. const TClientContext& context,
  22. const IClientRetryPolicyPtr& clientRetryPolicy)
  23. {
  24. if (!context.Config->EnableLocalModeOptimization) {
  25. return false;
  26. }
  27. static THashMap<TString, bool> localModeMap;
  28. static TRWMutex mutex;
  29. {
  30. TReadGuard guard(mutex);
  31. auto it = localModeMap.find(context.ServerName);
  32. if (it != localModeMap.end()) {
  33. return it->second;
  34. }
  35. }
  36. bool isLocalMode = false;
  37. TString localModeAttr("//sys/@local_mode_fqdn");
  38. // We don't want to pollute logs with errors about failed request,
  39. // so we check if path exists before getting it.
  40. auto exists = RequestWithRetry<bool>(
  41. clientRetryPolicy->CreatePolicyForGenericRequest(),
  42. [&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
  43. return rawClient->Exists(
  44. TTransactionId(),
  45. localModeAttr,
  46. TExistsOptions().ReadFrom(EMasterReadKind::Cache));
  47. });
  48. if (exists)
  49. {
  50. auto fqdnNode = RequestWithRetry<TNode>(
  51. clientRetryPolicy->CreatePolicyForGenericRequest(),
  52. [&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
  53. return rawClient->TryGet(
  54. TTransactionId(),
  55. localModeAttr,
  56. TGetOptions().ReadFrom(EMasterReadKind::Cache));
  57. });
  58. if (!fqdnNode.IsUndefined()) {
  59. auto fqdn = fqdnNode.AsString();
  60. isLocalMode = (fqdn == TProcessState::Get()->FqdnHostName);
  61. YT_LOG_DEBUG("Checking local mode; LocalModeFqdn: %v FqdnHostName: %v IsLocalMode: %v",
  62. fqdn,
  63. TProcessState::Get()->FqdnHostName,
  64. isLocalMode ? "true" : "false");
  65. }
  66. }
  67. {
  68. TWriteGuard guard(mutex);
  69. localModeMap[context.ServerName] = isLocalMode;
  70. }
  71. return isLocalMode;
  72. }
  73. TString GetOperationWebInterfaceUrl(TStringBuf serverName, TOperationId operationId)
  74. {
  75. serverName.ChopSuffix(":80");
  76. serverName.ChopSuffix(".yt.yandex-team.ru");
  77. serverName.ChopSuffix(".yt.yandex.net");
  78. return ::TStringBuilder() << "https://yt.yandex-team.ru/" << serverName <<
  79. "/operations/" << GetGuidAsString(operationId);
  80. }
  81. ////////////////////////////////////////////////////////////////////////////////
  82. } // namespace NYT::NDetail