config.cpp 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. #include "config.h"
  2. #include "operation.h"
  3. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  4. #include <library/cpp/json/json_reader.h>
  5. #include <library/cpp/svnversion/svnversion.h>
  6. #include <library/cpp/yson/node/node_builder.h>
  7. #include <library/cpp/yson/node/node_io.h>
  8. #include <library/cpp/yson/json/yson2json_adapter.h>
  9. #include <util/string/strip.h>
  10. #include <util/folder/dirut.h>
  11. #include <util/folder/path.h>
  12. #include <util/stream/file.h>
  13. #include <util/generic/singleton.h>
  14. #include <util/string/builder.h>
  15. #include <util/string/cast.h>
  16. #include <util/string/type.h>
  17. #include <util/system/hostname.h>
  18. #include <util/system/user.h>
  19. #include <util/system/env.h>
  20. namespace NYT {
  21. ////////////////////////////////////////////////////////////////////////////////
  22. bool TConfig::GetBool(const char* var, bool defaultValue)
  23. {
  24. TString val = GetEnv(var, "");
  25. if (val.empty()) {
  26. return defaultValue;
  27. }
  28. return IsTrue(val);
  29. }
  30. int TConfig::GetInt(const char* var, int defaultValue)
  31. {
  32. int result = 0;
  33. TString val = GetEnv(var, "");
  34. if (val.empty()) {
  35. return defaultValue;
  36. }
  37. try {
  38. result = FromString<int>(val);
  39. } catch (const yexception& e) {
  40. ythrow yexception() << "Cannot parse " << var << '=' << val << " as integer: " << e.what();
  41. }
  42. return result;
  43. }
  44. TDuration TConfig::GetDuration(const char* var, TDuration defaultValue)
  45. {
  46. return TDuration::Seconds(GetInt(var, defaultValue.Seconds()));
  47. }
  48. EEncoding TConfig::GetEncoding(const char* var)
  49. {
  50. const TString encodingName = GetEnv(var, "identity");
  51. EEncoding encoding;
  52. if (TryFromString(encodingName, encoding)) {
  53. return encoding;
  54. } else {
  55. ythrow yexception() << var << ": encoding '" << encodingName << "' is not supported";
  56. }
  57. }
  58. EUploadDeduplicationMode TConfig::GetUploadingDeduplicationMode(
  59. const char* var,
  60. EUploadDeduplicationMode defaultValue)
  61. {
  62. const TString deduplicationMode = GetEnv(var, TEnumTraits<EUploadDeduplicationMode>::ToString(defaultValue));
  63. return TEnumTraits<EUploadDeduplicationMode>::FromString(deduplicationMode);
  64. }
  65. void TConfig::ValidateToken(const TString& token)
  66. {
  67. for (size_t i = 0; i < token.size(); ++i) {
  68. ui8 ch = token[i];
  69. if (ch < 0x21 || ch > 0x7e) {
  70. ythrow yexception() << "Incorrect token character '" << ch << "' at position " << i;
  71. }
  72. }
  73. }
  74. TString TConfig::LoadTokenFromFile(const TString& tokenPath)
  75. {
  76. TFsPath path(tokenPath);
  77. return path.IsFile() ? Strip(TIFStream(path).ReadAll()) : TString();
  78. }
  79. TNode TConfig::LoadJsonSpec(const TString& strSpec)
  80. {
  81. TNode spec;
  82. TStringInput input(strSpec);
  83. TNodeBuilder builder(&spec);
  84. TYson2JsonCallbacksAdapter callbacks(&builder);
  85. Y_ENSURE(NJson::ReadJson(&input, &callbacks), "Cannot parse json spec: " << strSpec);
  86. Y_ENSURE(spec.IsMap(), "Json spec is not a map");
  87. return spec;
  88. }
  89. TRichYPath TConfig::LoadApiFilePathOptions(const TString& ysonMap)
  90. {
  91. TNode attributes;
  92. try {
  93. attributes = NodeFromYsonString(ysonMap);
  94. } catch (const yexception& exc) {
  95. ythrow yexception() << "Failed to parse YT_API_FILE_PATH_OPTIONS (it must be yson map): " << exc;
  96. }
  97. TNode pathNode = "";
  98. pathNode.Attributes() = attributes;
  99. TRichYPath path;
  100. Deserialize(path, pathNode);
  101. return path;
  102. }
  103. void TConfig::LoadToken()
  104. {
  105. if (auto envToken = GetEnv("YT_TOKEN")) {
  106. Token = envToken;
  107. } else if (auto envToken = GetEnv("YT_SECURE_VAULT_YT_TOKEN")) {
  108. // If this code runs inside an vanilla peration in YT
  109. // it should not use regular environment variable `YT_TOKEN`
  110. // because it would be visible in UI.
  111. // Token should be passed via `secure_vault` parameter in operation spec.
  112. Token = envToken;
  113. } else if (auto tokenPath = GetEnv("YT_TOKEN_PATH")) {
  114. Token = LoadTokenFromFile(tokenPath);
  115. } else {
  116. Token = LoadTokenFromFile(GetHomeDir() + "/.yt/token");
  117. }
  118. ValidateToken(Token);
  119. }
  120. void TConfig::LoadSpec()
  121. {
  122. TString strSpec = GetEnv("YT_SPEC", "{}");
  123. Spec = LoadJsonSpec(strSpec);
  124. strSpec = GetEnv("YT_TABLE_WRITER", "{}");
  125. TableWriter = LoadJsonSpec(strSpec);
  126. }
  127. void TConfig::LoadTimings()
  128. {
  129. ConnectTimeout = GetDuration("YT_CONNECT_TIMEOUT",
  130. TDuration::Seconds(10));
  131. SocketTimeout = GetDuration("YT_SOCKET_TIMEOUT",
  132. GetDuration("YT_SEND_RECEIVE_TIMEOUT", // common
  133. TDuration::Seconds(60)));
  134. AddressCacheExpirationTimeout = TDuration::Minutes(15);
  135. CacheLockTimeoutPerGb = TDuration::MilliSeconds(1000.0 * 1_GB * 8 / 20_MB); // 20 Mbps = 20 MBps / 8.
  136. TxTimeout = GetDuration("YT_TX_TIMEOUT",
  137. TDuration::Seconds(120));
  138. PingTimeout = GetDuration("YT_PING_TIMEOUT",
  139. TDuration::Seconds(5));
  140. PingInterval = GetDuration("YT_PING_INTERVAL",
  141. TDuration::Seconds(5));
  142. WaitLockPollInterval = TDuration::Seconds(5);
  143. RetryInterval = GetDuration("YT_RETRY_INTERVAL",
  144. TDuration::Seconds(3));
  145. ChunkErrorsRetryInterval = GetDuration("YT_CHUNK_ERRORS_RETRY_INTERVAL",
  146. TDuration::Seconds(60));
  147. RateLimitExceededRetryInterval = GetDuration("YT_RATE_LIMIT_EXCEEDED_RETRY_INTERVAL",
  148. TDuration::Seconds(60));
  149. StartOperationRetryInterval = GetDuration("YT_START_OPERATION_RETRY_INTERVAL",
  150. TDuration::Seconds(60));
  151. HostListUpdateInterval = TDuration::Seconds(60);
  152. }
  153. void TConfig::Reset()
  154. {
  155. Hosts = GetEnv("YT_HOSTS", "hosts");
  156. Pool = GetEnv("YT_POOL");
  157. Prefix = GetEnv("YT_PREFIX");
  158. ApiVersion = GetEnv("YT_VERSION", "v3");
  159. LogLevel = GetEnv("YT_LOG_LEVEL", "error");
  160. ContentEncoding = GetEncoding("YT_CONTENT_ENCODING");
  161. AcceptEncoding = GetEncoding("YT_ACCEPT_ENCODING");
  162. GlobalTxId = GetEnv("YT_TRANSACTION", "");
  163. UseAsyncTxPinger = true;
  164. AsyncHttpClientThreads = 1;
  165. AsyncTxPingerPoolThreads = 1;
  166. ForceIpV4 = GetBool("YT_FORCE_IPV4");
  167. ForceIpV6 = GetBool("YT_FORCE_IPV6");
  168. UseHosts = GetBool("YT_USE_HOSTS", true);
  169. LoadToken();
  170. LoadSpec();
  171. LoadTimings();
  172. CacheUploadDeduplicationMode = GetUploadingDeduplicationMode("YT_UPLOAD_DEDUPLICATION", EUploadDeduplicationMode::Host);
  173. CacheUploadDeduplicationThreshold = 10_MB;
  174. RetryCount = Max(GetInt("YT_RETRY_COUNT", 10), 1);
  175. ReadRetryCount = Max(GetInt("YT_READ_RETRY_COUNT", 30), 1);
  176. StartOperationRetryCount = Max(GetInt("YT_START_OPERATION_RETRY_COUNT", 30), 1);
  177. RemoteTempFilesDirectory = GetEnv("YT_FILE_STORAGE",
  178. "//tmp/yt_wrapper/file_storage");
  179. RemoteTempTablesDirectory = GetEnv("YT_TEMP_TABLES_STORAGE",
  180. "//tmp/yt_wrapper/table_storage");
  181. RemoteTempTablesDirectory = GetEnv("YT_TEMP_DIR",
  182. RemoteTempTablesDirectory);
  183. InferTableSchema = false;
  184. UseClientProtobuf = GetBool("YT_USE_CLIENT_PROTOBUF", false);
  185. NodeReaderFormat = ENodeReaderFormat::Auto;
  186. ProtobufFormatWithDescriptors = true;
  187. MountSandboxInTmpfs = GetBool("YT_MOUNT_SANDBOX_IN_TMPFS");
  188. ApiFilePathOptions = LoadApiFilePathOptions(GetEnv("YT_API_FILE_PATH_OPTIONS", "{}"));
  189. ConnectionPoolSize = GetInt("YT_CONNECTION_POOL_SIZE", 16);
  190. TraceHttpRequestsMode = FromString<ETraceHttpRequestsMode>(to_lower(GetEnv("YT_TRACE_HTTP_REQUESTS", "never")));
  191. CommandsWithFraming = {
  192. "read_table",
  193. "get_table_columnar_statistics",
  194. "get_job_input",
  195. "concatenate",
  196. "partition_tables",
  197. };
  198. }
  199. TConfig::TConfig()
  200. {
  201. Reset();
  202. }
  203. TConfigPtr TConfig::Get()
  204. {
  205. struct TConfigHolder
  206. {
  207. TConfigHolder()
  208. : Config(::MakeIntrusive<TConfig>())
  209. { }
  210. TConfigPtr Config;
  211. };
  212. return Singleton<TConfigHolder>()->Config;
  213. }
  214. ////////////////////////////////////////////////////////////////////////////////
  215. TProcessState::TProcessState()
  216. {
  217. try {
  218. FqdnHostName = ::FQDNHostName();
  219. } catch (const yexception& e) {
  220. try {
  221. FqdnHostName = ::HostName();
  222. } catch (const yexception& e) {
  223. ythrow yexception() << "Cannot get fqdn and host name: " << e.what();
  224. }
  225. }
  226. try {
  227. UserName = ::GetUsername();
  228. } catch (const yexception& e) {
  229. #ifdef _win_
  230. ythrow yexception() << "Cannot get user name: " << e.what();
  231. #else
  232. UserName = "u" + ToString(geteuid());
  233. #endif
  234. }
  235. Pid = static_cast<int>(getpid());
  236. if (!ClientVersion) {
  237. ClientVersion = ::TStringBuilder() << "YT C++ native " << GetProgramCommitId();
  238. }
  239. }
  240. static TString CensorString(TString input)
  241. {
  242. static const TString prefix = "AQAD-";
  243. if (input.find(prefix) == TString::npos) {
  244. return input;
  245. } else {
  246. return TString(input.size(), '*');
  247. }
  248. }
  249. void TProcessState::SetCommandLine(int argc, const char* argv[])
  250. {
  251. for (int i = 0; i < argc; ++i) {
  252. CommandLine.push_back(argv[i]);
  253. CensoredCommandLine.push_back(CensorString(CommandLine.back()));
  254. }
  255. }
  256. TProcessState* TProcessState::Get()
  257. {
  258. return Singleton<TProcessState>();
  259. }
  260. ////////////////////////////////////////////////////////////////////////////////
  261. } // namespace NYT