// client.cpp
  1. #include "client.h"
  2. #include "protocol.h"
  3. #include <library/cpp/clickhouse/client/base/coded.h>
  4. #include <library/cpp/clickhouse/client/base/compressed.h>
  5. #include <library/cpp/clickhouse/client/base/wire_format.h>
  6. #include <library/cpp/clickhouse/client/columns/factory.h>
  7. #include <library/cpp/openssl/io/stream.h>
  8. #include <util/generic/buffer.h>
  9. #include <util/generic/vector.h>
  10. #include <util/network/socket.h>
  11. #include <util/random/random.h>
  12. #include <util/stream/buffered.h>
  13. #include <util/stream/buffer.h>
  14. #include <util/stream/mem.h>
  15. #include <util/string/builder.h>
  16. #include <util/string/cast.h>
  17. #include <util/system/unaligned_mem.h>
  18. #include <contrib/libs/lz4/lz4.h>
  19. #include <contrib/restricted/cityhash-1.0.2/city.h>
  /// Name and version this client reports about itself in the Hello packet.
  #define DBMS_NAME          "ClickHouse"
  #define DBMS_VERSION_MAJOR 1
  #define DBMS_VERSION_MINOR 1

  /// Protocol revision announced by this client.
  #define REVISION 54126

  /// Protocol revisions compared against REVISION or against the revision
  /// reported by the server to decide which optional fields are on the wire.
  #define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES           50264
  #define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS     51554
  #define DBMS_MIN_REVISION_WITH_BLOCK_INFO                 51903
  #define DBMS_MIN_REVISION_WITH_CLIENT_INFO                54032
  #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE            54058
  #define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO   54060
  30. namespace NClickHouse {
  31. struct TClientInfo {
  32. ui8 IfaceType = 1; // TCP
  33. ui8 QueryKind;
  34. TString InitialUser;
  35. TString InitialQueryId;
  36. TString QuotaKey;
  37. TString OsUser;
  38. TString ClientHostname;
  39. TString ClientName;
  40. TString InitialAddress = "[::ffff:127.0.0.1]:0";
  41. ui64 ClientVersionMajor = 0;
  42. ui64 ClientVersionMinor = 0;
  43. ui32 ClientRevision = 0;
  44. };
  45. struct TServerInfo {
  46. TString Name;
  47. TString Timezone;
  48. ui64 VersionMajor;
  49. ui64 VersionMinor;
  50. ui64 Revision;
  51. };
  52. class TClient::TImpl {
  53. public:
  54. TImpl(const TClientOptions& opts);
  55. ~TImpl();
  56. void ExecuteQuery(TQuery query);
  57. void Insert(const TString& table_name, const TBlock& block, const TString& query_id, const TString& deduplication_token);
  58. void Ping();
  59. void ResetConnection();
  60. private:
  61. bool Handshake();
  62. bool ReceivePacket(ui64* server_packet = nullptr);
  63. void SendQuery(const TString& query, const TString& query_id, const TString& deduplication_token = "");
  64. void SendData(const TBlock& block);
  65. bool SendHello();
  66. bool ReadBlock(TBlock* block, TCodedInputStream* input);
  67. bool ReceiveHello();
  68. /// Reads data packet form input stream.
  69. bool ReceiveData();
  70. /// Reads exception packet form input stream.
  71. bool ReceiveException(bool rethrow = false);
  72. void WriteBlock(const TBlock& block, TCodedOutputStream* output);
  73. private:
  74. void Disconnect() {
  75. Socket_ = TSocket();
  76. }
  77. /// In case of network errors tries to reconnect to server and
  78. /// call fuc several times.
  79. void RetryGuard(std::function<void()> fuc);
  80. private:
  81. class EnsureNull {
  82. public:
  83. inline EnsureNull(TQueryEvents* ev, TQueryEvents** ptr)
  84. : ptr_(ptr)
  85. {
  86. if (ptr_) {
  87. *ptr_ = ev;
  88. }
  89. }
  90. inline ~EnsureNull() {
  91. if (ptr_) {
  92. *ptr_ = nullptr;
  93. }
  94. }
  95. private:
  96. TQueryEvents** ptr_;
  97. };
  98. const TClientOptions Options_;
  99. TQueryEvents* Events_;
  100. int Compression_ = CompressionState::Disable;
  101. TSocket Socket_;
  102. TSocketInput SocketInput_;
  103. TSocketOutput SocketOutput_;
  104. THolder<TBufferedInput> BufferedInput_;
  105. THolder<TBufferedOutput> BufferedOutput_;
  106. THolder<TOpenSslClientIO> SslClient_;
  107. TCodedInputStream Input_;
  108. TCodedOutputStream Output_;
  109. TServerInfo ServerInfo_;
  110. };
  111. TClient::TImpl::TImpl(const TClientOptions& opts)
  112. : Options_(opts)
  113. , Events_(nullptr)
  114. , Socket_(TNetworkAddress(opts.Host, opts.Port), Options_.ConnectTimeout)
  115. , SocketInput_(Socket_)
  116. , SocketOutput_(Socket_)
  117. {
  118. if (opts.UseSsl) {
  119. SslClient_ = MakeHolder<TOpenSslClientIO>(&SocketInput_, &SocketOutput_, opts.SslOptions);
  120. BufferedInput_ = MakeHolder<TBufferedInput>(SslClient_.Get());
  121. BufferedOutput_ = MakeHolder<TBufferedOutput>(SslClient_.Get());
  122. } else {
  123. BufferedInput_ = MakeHolder<TBufferedInput>(&SocketInput_);
  124. BufferedOutput_ = MakeHolder<TBufferedOutput>(&SocketOutput_);
  125. }
  126. Input_ = TCodedInputStream(BufferedInput_.Get());
  127. Output_ = TCodedOutputStream(BufferedOutput_.Get());
  128. if (Options_.RequestTimeout.Seconds()) {
  129. Socket_.SetSocketTimeout(Options_.RequestTimeout.Seconds());
  130. }
  131. if (!Handshake()) {
  132. ythrow yexception() << "fail to connect to " << Options_.Host;
  133. }
  134. if (Options_.CompressionMethod != ECompressionMethod::None) {
  135. Compression_ = CompressionState::Enable;
  136. }
  137. }
  138. TClient::TImpl::~TImpl() {
  139. Disconnect();
  140. }
  141. void TClient::TImpl::ExecuteQuery(TQuery query) {
  142. EnsureNull en(static_cast<TQueryEvents*>(&query), &Events_);
  143. if (Options_.PingBeforeQuery) {
  144. RetryGuard([this]() { Ping(); });
  145. }
  146. SendQuery(query.GetText(), query.GetId());
  147. ui64 server_packet = 0;
  148. while (ReceivePacket(&server_packet)) {
  149. ;
  150. }
  151. if (server_packet != ServerCodes::EndOfStream && server_packet != ServerCodes::Exception) {
  152. ythrow yexception() << "unexpected packet from server while receiving end of query (got: "
  153. << (server_packet ? ToString(server_packet) : "nothing") << ")";
  154. }
  155. }
  156. void TClient::TImpl::Insert(const TString& table_name, const TBlock& block, const TString& query_id, const TString& deduplication_token) {
  157. if (Options_.PingBeforeQuery) {
  158. RetryGuard([this]() { Ping(); });
  159. }
  160. TVector<TString> fields;
  161. fields.reserve(block.GetColumnCount());
  162. // Enumerate all fields
  163. for (TBlock::TIterator bi(block); bi.IsValid(); bi.Next()) {
  164. fields.push_back(bi.Name());
  165. }
  166. TStringBuilder fields_section;
  167. for (auto elem = fields.begin(); elem != fields.end(); ++elem) {
  168. if (std::distance(elem, fields.end()) == 1) {
  169. fields_section << *elem;
  170. } else {
  171. fields_section << *elem << ",";
  172. }
  173. }
  174. SendQuery("INSERT INTO " + table_name + " ( " + fields_section + " ) VALUES", query_id, deduplication_token);
  175. ui64 server_packet(0);
  176. // Receive data packet.
  177. while (true) {
  178. bool ret = ReceivePacket(&server_packet);
  179. if (!ret) {
  180. ythrow yexception() << "unable to receive data packet";
  181. }
  182. if (server_packet == ServerCodes::Data) {
  183. break;
  184. }
  185. if (server_packet == ServerCodes::Progress) {
  186. continue;
  187. }
  188. }
  189. // Send data.
  190. SendData(block);
  191. // Send empty block as marker of
  192. // end of data.
  193. SendData(TBlock());
  194. // Wait for EOS.
  195. ui64 eos_packet{0};
  196. while (ReceivePacket(&eos_packet)) {
  197. ;
  198. }
  199. if (eos_packet != ServerCodes::EndOfStream && eos_packet != ServerCodes::Exception
  200. && eos_packet != ServerCodes::Log && Options_.RethrowExceptions) {
  201. ythrow yexception() << "unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "
  202. << (eos_packet ? ToString(eos_packet) : "nothing") << ")";
  203. }
  204. }
  205. void TClient::TImpl::Ping() {
  206. TWireFormat::WriteUInt64(&Output_, ClientCodes::Ping);
  207. Output_.Flush();
  208. ui64 server_packet;
  209. const bool ret = ReceivePacket(&server_packet);
  210. if (!ret || server_packet != ServerCodes::Pong) {
  211. ythrow yexception() << "fail to ping server";
  212. }
  213. }
  214. void TClient::TImpl::ResetConnection() {
  215. Socket_ = TSocket(TNetworkAddress(Options_.Host, Options_.Port), Options_.ConnectTimeout);
  216. if (Options_.UseSsl) {
  217. SslClient_.Reset(new TOpenSslClientIO(&SocketInput_, &SocketOutput_, Options_.SslOptions));
  218. BufferedInput_.Reset(new TBufferedInput(SslClient_.Get()));
  219. BufferedOutput_.Reset(new TBufferedOutput(SslClient_.Get()));
  220. } else {
  221. BufferedInput_.Reset(new TBufferedInput(&SocketInput_));
  222. BufferedOutput_.Reset(new TBufferedOutput(&SocketOutput_));
  223. }
  224. SocketInput_ = TSocketInput(Socket_);
  225. SocketOutput_ = TSocketOutput(Socket_);
  226. Input_ = TCodedInputStream(BufferedInput_.Get());
  227. Output_ = TCodedOutputStream(BufferedOutput_.Get());
  228. if (Options_.RequestTimeout.Seconds()) {
  229. Socket_.SetSocketTimeout(Options_.RequestTimeout.Seconds());
  230. }
  231. if (!Handshake()) {
  232. ythrow yexception() << "fail to connect to " << Options_.Host;
  233. }
  234. }
  235. bool TClient::TImpl::Handshake() {
  236. if (!SendHello()) {
  237. return false;
  238. }
  239. if (!ReceiveHello()) {
  240. return false;
  241. }
  242. return true;
  243. }
  244. bool TClient::TImpl::ReceivePacket(ui64* server_packet) {
  245. ui64 packet_type = 0;
  246. if (!Input_.ReadVarint64(&packet_type)) {
  247. return false;
  248. }
  249. if (server_packet) {
  250. *server_packet = packet_type;
  251. }
  252. switch (packet_type) {
  253. case ServerCodes::Totals:
  254. case ServerCodes::Data: {
  255. if (!ReceiveData()) {
  256. ythrow yexception() << "can't read data packet from input stream";
  257. }
  258. return true;
  259. }
  260. case ServerCodes::Exception: {
  261. ReceiveException();
  262. return false;
  263. }
  264. case ServerCodes::ProfileInfo: {
  265. TProfile profile;
  266. if (!TWireFormat::ReadUInt64(&Input_, &profile.rows)) {
  267. return false;
  268. }
  269. if (!TWireFormat::ReadUInt64(&Input_, &profile.blocks)) {
  270. return false;
  271. }
  272. if (!TWireFormat::ReadUInt64(&Input_, &profile.bytes)) {
  273. return false;
  274. }
  275. if (!TWireFormat::ReadFixed(&Input_, &profile.applied_limit)) {
  276. return false;
  277. }
  278. if (!TWireFormat::ReadUInt64(&Input_, &profile.rows_before_limit)) {
  279. return false;
  280. }
  281. if (!TWireFormat::ReadFixed(&Input_, &profile.calculated_rows_before_limit)) {
  282. return false;
  283. }
  284. if (Events_) {
  285. Events_->OnProfile(profile);
  286. }
  287. return true;
  288. }
  289. case ServerCodes::Progress: {
  290. TProgress info;
  291. if (!TWireFormat::ReadUInt64(&Input_, &info.rows)) {
  292. return false;
  293. }
  294. if (!TWireFormat::ReadUInt64(&Input_, &info.bytes)) {
  295. return false;
  296. }
  297. if (REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) {
  298. if (!TWireFormat::ReadUInt64(&Input_, &info.total_rows)) {
  299. return false;
  300. }
  301. }
  302. if (Events_) {
  303. Events_->OnProgress(info);
  304. }
  305. return true;
  306. }
  307. case ServerCodes::Pong: {
  308. return true;
  309. }
  310. case ServerCodes::EndOfStream: {
  311. if (Events_) {
  312. Events_->OnFinish();
  313. }
  314. return false;
  315. }
  316. default:
  317. ythrow yexception() << "unimplemented " << (int)packet_type;
  318. break;
  319. }
  320. return false;
  321. }
  322. bool TClient::TImpl::ReadBlock(TBlock* block, TCodedInputStream* input) {
  323. // Additional information about block.
  324. if (REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
  325. ui64 num;
  326. TBlockInfo info;
  327. // BlockInfo
  328. if (!TWireFormat::ReadUInt64(input, &num)) {
  329. return false;
  330. }
  331. if (!TWireFormat::ReadFixed(input, &info.IsOverflows)) {
  332. return false;
  333. }
  334. if (!TWireFormat::ReadUInt64(input, &num)) {
  335. return false;
  336. }
  337. if (!TWireFormat::ReadFixed(input, &info.BucketNum)) {
  338. return false;
  339. }
  340. if (!TWireFormat::ReadUInt64(input, &num)) {
  341. return false;
  342. }
  343. // TODO use data
  344. }
  345. ui64 num_columns = 0;
  346. ui64 num_rows = 0;
  347. if (!TWireFormat::ReadUInt64(input, &num_columns)) {
  348. return false;
  349. }
  350. if (!TWireFormat::ReadUInt64(input, &num_rows)) {
  351. return false;
  352. }
  353. for (size_t i = 0; i < num_columns; ++i) {
  354. TString name;
  355. TString type;
  356. if (!TWireFormat::ReadString(input, &name)) {
  357. return false;
  358. }
  359. if (!TWireFormat::ReadString(input, &type)) {
  360. return false;
  361. }
  362. if (TColumnRef col = CreateColumnByType(type)) {
  363. if (num_rows && !col->Load(input, num_rows)) {
  364. ythrow yexception() << "can't load";
  365. }
  366. block->AppendColumn(name, col);
  367. } else {
  368. ythrow yexception() << "unsupported column type: " << type;
  369. }
  370. }
  371. return true;
  372. }
  373. bool TClient::TImpl::ReceiveData() {
  374. TBlock block;
  375. if (REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
  376. TString table_name;
  377. if (!TWireFormat::ReadString(&Input_, &table_name)) {
  378. return false;
  379. }
  380. }
  381. if (Compression_ == CompressionState::Enable) {
  382. TCompressedInput compressed(&Input_);
  383. TCodedInputStream coded(&compressed);
  384. if (!ReadBlock(&block, &coded)) {
  385. return false;
  386. }
  387. } else {
  388. if (!ReadBlock(&block, &Input_)) {
  389. return false;
  390. }
  391. }
  392. if (Events_) {
  393. Events_->OnData(block);
  394. }
  395. return true;
  396. }
  397. bool TClient::TImpl::ReceiveException(bool rethrow) {
  398. std::unique_ptr<TException> e(new TException);
  399. TException* current = e.get();
  400. bool exception_received = true;
  401. do {
  402. bool has_nested = false;
  403. if (!TWireFormat::ReadFixed(&Input_, &current->Code)) {
  404. exception_received = false;
  405. break;
  406. }
  407. if (!TWireFormat::ReadString(&Input_, &current->Name)) {
  408. exception_received = false;
  409. break;
  410. }
  411. if (!TWireFormat::ReadString(&Input_, &current->DisplayText)) {
  412. exception_received = false;
  413. break;
  414. }
  415. if (!TWireFormat::ReadString(&Input_, &current->StackTrace)) {
  416. exception_received = false;
  417. break;
  418. }
  419. if (!TWireFormat::ReadFixed(&Input_, &has_nested)) {
  420. exception_received = false;
  421. break;
  422. }
  423. if (has_nested) {
  424. current->Nested.reset(new TException);
  425. current = current->Nested.get();
  426. } else {
  427. break;
  428. }
  429. } while (true);
  430. if (Events_) {
  431. Events_->OnServerException(*e);
  432. }
  433. if (rethrow || Options_.RethrowExceptions) {
  434. throw TServerException(std::move(e));
  435. }
  436. return exception_received;
  437. }
  438. void TClient::TImpl::SendQuery(const TString& query, const TString& query_id, const TString& deduplication_token) {
  439. TWireFormat::WriteUInt64(&Output_, ClientCodes::Query);
  440. TWireFormat::WriteString(&Output_, query_id);
  441. /// Client info.
  442. if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) {
  443. TClientInfo info;
  444. info.QueryKind = 1;
  445. info.ClientName = "ClickHouse client";
  446. info.ClientVersionMajor = DBMS_VERSION_MAJOR;
  447. info.ClientVersionMinor = DBMS_VERSION_MINOR;
  448. info.ClientRevision = REVISION;
  449. TWireFormat::WriteFixed(&Output_, info.QueryKind);
  450. TWireFormat::WriteString(&Output_, info.InitialUser);
  451. TWireFormat::WriteString(&Output_, info.InitialQueryId);
  452. TWireFormat::WriteString(&Output_, info.InitialAddress);
  453. TWireFormat::WriteFixed(&Output_, info.IfaceType);
  454. TWireFormat::WriteString(&Output_, info.OsUser);
  455. TWireFormat::WriteString(&Output_, info.ClientHostname);
  456. TWireFormat::WriteString(&Output_, info.ClientName);
  457. TWireFormat::WriteUInt64(&Output_, info.ClientVersionMajor);
  458. TWireFormat::WriteUInt64(&Output_, info.ClientVersionMinor);
  459. TWireFormat::WriteUInt64(&Output_, info.ClientRevision);
  460. if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
  461. TWireFormat::WriteString(&Output_, info.QuotaKey);
  462. }
  463. if (!deduplication_token.empty()) {
  464. static const TString insert_deduplication_token_setting_name = "insert_deduplication_token";
  465. TWireFormat::WriteString(&Output_, insert_deduplication_token_setting_name);
  466. TWireFormat::WriteString(&Output_, deduplication_token);
  467. }
  468. TWireFormat::WriteString(&Output_, TString()); // Empty string is a marker of end SETTINGS section
  469. TWireFormat::WriteUInt64(&Output_, Stages::Complete);
  470. TWireFormat::WriteUInt64(&Output_, Compression_);
  471. TWireFormat::WriteString(&Output_, query);
  472. // Send empty block as marker of
  473. // end of data
  474. SendData(TBlock());
  475. Output_.Flush();
  476. }
  477. void TClient::TImpl::WriteBlock(const TBlock& block, TCodedOutputStream* output) {
  478. /// Дополнительная информация о блоке.
  479. if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
  480. TWireFormat::WriteUInt64(output, 1);
  481. TWireFormat::WriteFixed(output, block.Info().IsOverflows);
  482. TWireFormat::WriteUInt64(output, 2);
  483. TWireFormat::WriteFixed(output, block.Info().BucketNum);
  484. TWireFormat::WriteUInt64(output, 0);
  485. }
  486. TWireFormat::WriteUInt64(output, block.GetColumnCount());
  487. TWireFormat::WriteUInt64(output, block.GetRowCount());
  488. for (TBlock::TIterator bi(block); bi.IsValid(); bi.Next()) {
  489. TWireFormat::WriteString(output, bi.Name());
  490. TWireFormat::WriteString(output, bi.Type()->GetName());
  491. bi.Column()->Save(output);
  492. }
  493. }
  494. void TClient::TImpl::SendData(const TBlock& block) {
  495. TWireFormat::WriteUInt64(&Output_, ClientCodes::Data);
  496. if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
  497. TWireFormat::WriteString(&Output_, TString());
  498. }
  499. if (Compression_ == CompressionState::Enable) {
  500. switch (Options_.CompressionMethod) {
  501. case ECompressionMethod::None: {
  502. Y_ABORT_UNLESS(false, "invalid state");
  503. break;
  504. }
  505. case ECompressionMethod::LZ4: {
  506. TBufferOutput tmp;
  507. // Serialize block's data
  508. {
  509. TCodedOutputStream out(&tmp);
  510. WriteBlock(block, &out);
  511. }
  512. // Reserver space for data
  513. TBuffer buf;
  514. buf.Resize(9 + LZ4_compressBound(tmp.Buffer().Size()));
  515. // Compress data
  516. int size = LZ4_compress(tmp.Buffer().Data(), buf.Data() + 9, tmp.Buffer().Size());
  517. buf.Resize(9 + size);
  518. // Fill header
  519. ui8* p = (ui8*)buf.Data();
  520. // Compression method
  521. WriteUnaligned<ui8>(p, (ui8)0x82);
  522. p += 1;
  523. // Compressed data size with header
  524. WriteUnaligned<ui32>(p, (ui32)buf.Size());
  525. p += 4;
  526. // Original data size
  527. WriteUnaligned<ui32>(p, (ui32)tmp.Buffer().Size());
  528. TWireFormat::WriteFixed(&Output_, CityHash_v1_0_2::CityHash128(
  529. buf.Data(), buf.Size()));
  530. TWireFormat::WriteBytes(&Output_, buf.Data(), buf.Size());
  531. break;
  532. }
  533. }
  534. } else {
  535. WriteBlock(block, &Output_);
  536. }
  537. Output_.Flush();
  538. }
  539. bool TClient::TImpl::SendHello() {
  540. TWireFormat::WriteUInt64(&Output_, ClientCodes::Hello);
  541. TWireFormat::WriteString(&Output_, TString(DBMS_NAME) + " client");
  542. TWireFormat::WriteUInt64(&Output_, DBMS_VERSION_MAJOR);
  543. TWireFormat::WriteUInt64(&Output_, DBMS_VERSION_MINOR);
  544. TWireFormat::WriteUInt64(&Output_, REVISION);
  545. TWireFormat::WriteString(&Output_, Options_.DefaultDatabase);
  546. TWireFormat::WriteString(&Output_, Options_.User);
  547. TWireFormat::WriteString(&Output_, Options_.Password);
  548. Output_.Flush();
  549. return true;
  550. }
  551. bool TClient::TImpl::ReceiveHello() {
  552. ui64 packet_type = 0;
  553. if (!Input_.ReadVarint64(&packet_type)) {
  554. return false;
  555. }
  556. if (packet_type == ServerCodes::Hello) {
  557. if (!TWireFormat::ReadString(&Input_, &ServerInfo_.Name)) {
  558. return false;
  559. }
  560. if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.VersionMajor)) {
  561. return false;
  562. }
  563. if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.VersionMinor)) {
  564. return false;
  565. }
  566. if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.Revision)) {
  567. return false;
  568. }
  569. if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) {
  570. if (!TWireFormat::ReadString(&Input_, &ServerInfo_.Timezone)) {
  571. return false;
  572. }
  573. }
  574. return true;
  575. } else if (packet_type == ServerCodes::Exception) {
  576. ReceiveException(true);
  577. return false;
  578. }
  579. return false;
  580. }
  581. void TClient::TImpl::RetryGuard(std::function<void()> func) {
  582. for (int i = 0; i <= Options_.SendRetries; ++i) {
  583. try {
  584. func();
  585. return;
  586. } catch (const yexception&) {
  587. bool ok = true;
  588. try {
  589. Sleep(Options_.RetryTimeout);
  590. ResetConnection();
  591. } catch (...) {
  592. ok = false;
  593. }
  594. if (!ok) {
  595. throw;
  596. }
  597. }
  598. }
  599. }
  600. TClient::TClient(const TClientOptions& opts)
  601. : Options_(opts)
  602. , Impl_(new TImpl(opts))
  603. {
  604. }
  605. TClient::~TClient() {
  606. }
  607. void TClient::Execute(const TQuery& query) {
  608. Impl_->ExecuteQuery(query);
  609. }
  610. void TClient::Select(const TString& query, TSelectCallback cb, const TString& query_id) {
  611. Execute(TQuery(query, query_id).OnData(cb));
  612. }
  613. void TClient::Select(const TQuery& query) {
  614. Execute(query);
  615. }
  616. void TClient::Insert(const TString& table_name, const TBlock& block, const TString& query_id, const TString& deduplication_token) {
  617. Impl_->Insert(table_name, block, query_id, deduplication_token);
  618. }
  619. void TClient::Ping() {
  620. Impl_->Ping();
  621. }
  622. void TClient::ResetConnection() {
  623. Impl_->ResetConnection();
  624. }
  625. }