operation_helpers.cpp 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. #include "operation_helpers.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/common/retry_request.h>
  4. #include <yt/cpp/mapreduce/http/context.h>
  5. #include <yt/cpp/mapreduce/interface/config.h>
  6. #include <yt/cpp/mapreduce/interface/raw_client.h>
  7. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  8. #include <util/string/builder.h>
  9. #include <util/system/mutex.h>
  10. #include <util/system/rwlock.h>
  11. namespace NYT::NDetail {
  12. ////////////////////////////////////////////////////////////////////////////////
  13. ui64 RoundUpFileSize(ui64 size)
  14. {
  15. constexpr ui64 roundUpTo = 4ull << 10;
  16. return (size + roundUpTo - 1) & ~(roundUpTo - 1);
  17. }
  18. bool UseLocalModeOptimization(
  19. const IRawClientPtr& rawClient,
  20. const TClientContext& context,
  21. const IClientRetryPolicyPtr& clientRetryPolicy)
  22. {
  23. if (!context.Config->EnableLocalModeOptimization) {
  24. return false;
  25. }
  26. static THashMap<TString, bool> localModeMap;
  27. static TRWMutex mutex;
  28. {
  29. TReadGuard guard(mutex);
  30. auto it = localModeMap.find(context.ServerName);
  31. if (it != localModeMap.end()) {
  32. return it->second;
  33. }
  34. }
  35. bool isLocalMode = false;
  36. TString localModeAttr("//sys/@local_mode_fqdn");
  37. // We don't want to pollute logs with errors about failed request,
  38. // so we check if path exists before getting it.
  39. auto exists = RequestWithRetry<bool>(
  40. clientRetryPolicy->CreatePolicyForGenericRequest(),
  41. [&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
  42. return rawClient->Exists(
  43. TTransactionId(),
  44. localModeAttr,
  45. TExistsOptions().ReadFrom(EMasterReadKind::Cache));
  46. });
  47. if (exists)
  48. {
  49. auto fqdnNode = RequestWithRetry<TNode>(
  50. clientRetryPolicy->CreatePolicyForGenericRequest(),
  51. [&rawClient, &localModeAttr] (TMutationId /*mutationId*/) {
  52. return rawClient->TryGet(
  53. TTransactionId(),
  54. localModeAttr,
  55. TGetOptions().ReadFrom(EMasterReadKind::Cache));
  56. });
  57. if (!fqdnNode.IsUndefined()) {
  58. auto fqdn = fqdnNode.AsString();
  59. isLocalMode = (fqdn == TProcessState::Get()->FqdnHostName);
  60. YT_LOG_DEBUG("Checking local mode; LocalModeFqdn: %v FqdnHostName: %v IsLocalMode: %v",
  61. fqdn,
  62. TProcessState::Get()->FqdnHostName,
  63. isLocalMode ? "true" : "false");
  64. }
  65. }
  66. {
  67. TWriteGuard guard(mutex);
  68. localModeMap[context.ServerName] = isLocalMode;
  69. }
  70. return isLocalMode;
  71. }
  72. TString GetOperationWebInterfaceUrl(TStringBuf serverName, TOperationId operationId)
  73. {
  74. serverName.ChopSuffix(":80");
  75. serverName.ChopSuffix(".yt.yandex-team.ru");
  76. serverName.ChopSuffix(".yt.yandex.net");
  77. return ::TStringBuilder() << "https://yt.yandex-team.ru/" << serverName <<
  78. "/operations/" << GetGuidAsString(operationId);
  79. }
  80. ////////////////////////////////////////////////////////////////////////////////
  81. } // namespace NYT::NDetail