123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768 |
- #include "client.h"
- #include "protocol.h"
- #include <library/cpp/clickhouse/client/base/coded.h>
- #include <library/cpp/clickhouse/client/base/compressed.h>
- #include <library/cpp/clickhouse/client/base/wire_format.h>
- #include <library/cpp/clickhouse/client/columns/factory.h>
- #include <library/cpp/openssl/io/stream.h>
- #include <util/generic/buffer.h>
- #include <util/generic/vector.h>
- #include <util/network/socket.h>
- #include <util/random/random.h>
- #include <util/stream/buffered.h>
- #include <util/stream/buffer.h>
- #include <util/stream/mem.h>
- #include <util/string/builder.h>
- #include <util/string/cast.h>
- #include <util/system/unaligned_mem.h>
- #include <contrib/libs/lz4/lz4.h>
- #include <contrib/restricted/cityhash-1.0.2/city.h>
- #define DBMS_NAME "ClickHouse"
- #define DBMS_VERSION_MAJOR 1
- #define DBMS_VERSION_MINOR 1
- #define REVISION 54126
- #define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
- #define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS 51554
- #define DBMS_MIN_REVISION_WITH_BLOCK_INFO 51903
- #define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
- #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
- #define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
- namespace NClickHouse {
- struct TClientInfo {
- ui8 IfaceType = 1; // TCP
- ui8 QueryKind;
- TString InitialUser;
- TString InitialQueryId;
- TString QuotaKey;
- TString OsUser;
- TString ClientHostname;
- TString ClientName;
- TString InitialAddress = "[::ffff:127.0.0.1]:0";
- ui64 ClientVersionMajor = 0;
- ui64 ClientVersionMinor = 0;
- ui32 ClientRevision = 0;
- };
- struct TServerInfo {
- TString Name;
- TString Timezone;
- ui64 VersionMajor;
- ui64 VersionMinor;
- ui64 Revision;
- };
- class TClient::TImpl {
- public:
- TImpl(const TClientOptions& opts);
- ~TImpl();
- void ExecuteQuery(TQuery query);
- void Insert(const TString& table_name, const TBlock& block, const TString& query_id, const TString& deduplication_token);
- void Ping();
- void ResetConnection();
- private:
- bool Handshake();
- bool ReceivePacket(ui64* server_packet = nullptr);
- void SendQuery(const TString& query, const TString& query_id, const TString& deduplication_token = "");
- void SendData(const TBlock& block);
- bool SendHello();
- bool ReadBlock(TBlock* block, TCodedInputStream* input);
- bool ReceiveHello();
- /// Reads data packet form input stream.
- bool ReceiveData();
- /// Reads exception packet form input stream.
- bool ReceiveException(bool rethrow = false);
- void WriteBlock(const TBlock& block, TCodedOutputStream* output);
- private:
- void Disconnect() {
- Socket_ = TSocket();
- }
- /// In case of network errors tries to reconnect to server and
- /// call fuc several times.
- void RetryGuard(std::function<void()> fuc);
- private:
- class EnsureNull {
- public:
- inline EnsureNull(TQueryEvents* ev, TQueryEvents** ptr)
- : ptr_(ptr)
- {
- if (ptr_) {
- *ptr_ = ev;
- }
- }
- inline ~EnsureNull() {
- if (ptr_) {
- *ptr_ = nullptr;
- }
- }
- private:
- TQueryEvents** ptr_;
- };
- const TClientOptions Options_;
- TQueryEvents* Events_;
- int Compression_ = CompressionState::Disable;
- TSocket Socket_;
- TSocketInput SocketInput_;
- TSocketOutput SocketOutput_;
- THolder<TBufferedInput> BufferedInput_;
- THolder<TBufferedOutput> BufferedOutput_;
- THolder<TOpenSslClientIO> SslClient_;
- TCodedInputStream Input_;
- TCodedOutputStream Output_;
- TServerInfo ServerInfo_;
- };
- TClient::TImpl::TImpl(const TClientOptions& opts)
- : Options_(opts)
- , Events_(nullptr)
- , Socket_(TNetworkAddress(opts.Host, opts.Port), Options_.ConnectTimeout)
- , SocketInput_(Socket_)
- , SocketOutput_(Socket_)
- {
- if (opts.UseSsl) {
- SslClient_ = MakeHolder<TOpenSslClientIO>(&SocketInput_, &SocketOutput_, opts.SslOptions);
- BufferedInput_ = MakeHolder<TBufferedInput>(SslClient_.Get());
- BufferedOutput_ = MakeHolder<TBufferedOutput>(SslClient_.Get());
- } else {
- BufferedInput_ = MakeHolder<TBufferedInput>(&SocketInput_);
- BufferedOutput_ = MakeHolder<TBufferedOutput>(&SocketOutput_);
- }
- Input_ = TCodedInputStream(BufferedInput_.Get());
- Output_ = TCodedOutputStream(BufferedOutput_.Get());
- if (Options_.RequestTimeout.Seconds()) {
- Socket_.SetSocketTimeout(Options_.RequestTimeout.Seconds());
- }
- if (!Handshake()) {
- ythrow yexception() << "fail to connect to " << Options_.Host;
- }
- if (Options_.CompressionMethod != ECompressionMethod::None) {
- Compression_ = CompressionState::Enable;
- }
- }
- TClient::TImpl::~TImpl() {
- Disconnect();
- }
- void TClient::TImpl::ExecuteQuery(TQuery query) {
- EnsureNull en(static_cast<TQueryEvents*>(&query), &Events_);
- if (Options_.PingBeforeQuery) {
- RetryGuard([this]() { Ping(); });
- }
- SendQuery(query.GetText(), query.GetId());
- ui64 server_packet = 0;
- while (ReceivePacket(&server_packet)) {
- ;
- }
- if (server_packet != ServerCodes::EndOfStream && server_packet != ServerCodes::Exception) {
- ythrow yexception() << "unexpected packet from server while receiving end of query (got: "
- << (server_packet ? ToString(server_packet) : "nothing") << ")";
- }
- }
- void TClient::TImpl::Insert(const TString& table_name, const TBlock& block, const TString& query_id, const TString& deduplication_token) {
- if (Options_.PingBeforeQuery) {
- RetryGuard([this]() { Ping(); });
- }
- TVector<TString> fields;
- fields.reserve(block.GetColumnCount());
- // Enumerate all fields
- for (TBlock::TIterator bi(block); bi.IsValid(); bi.Next()) {
- fields.push_back(bi.Name());
- }
- TStringBuilder fields_section;
- for (auto elem = fields.begin(); elem != fields.end(); ++elem) {
- if (std::distance(elem, fields.end()) == 1) {
- fields_section << *elem;
- } else {
- fields_section << *elem << ",";
- }
- }
- SendQuery("INSERT INTO " + table_name + " ( " + fields_section + " ) VALUES", query_id, deduplication_token);
- ui64 server_packet(0);
- // Receive data packet.
- while (true) {
- bool ret = ReceivePacket(&server_packet);
- if (!ret) {
- ythrow yexception() << "unable to receive data packet";
- }
- if (server_packet == ServerCodes::Data) {
- break;
- }
- if (server_packet == ServerCodes::Progress) {
- continue;
- }
- }
- // Send data.
- SendData(block);
- // Send empty block as marker of
- // end of data.
- SendData(TBlock());
- // Wait for EOS.
- ui64 eos_packet{0};
- while (ReceivePacket(&eos_packet)) {
- ;
- }
- if (eos_packet != ServerCodes::EndOfStream && eos_packet != ServerCodes::Exception
- && eos_packet != ServerCodes::Log && Options_.RethrowExceptions) {
- ythrow yexception() << "unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "
- << (eos_packet ? ToString(eos_packet) : "nothing") << ")";
- }
- }
- void TClient::TImpl::Ping() {
- TWireFormat::WriteUInt64(&Output_, ClientCodes::Ping);
- Output_.Flush();
- ui64 server_packet;
- const bool ret = ReceivePacket(&server_packet);
- if (!ret || server_packet != ServerCodes::Pong) {
- ythrow yexception() << "fail to ping server";
- }
- }
- void TClient::TImpl::ResetConnection() {
- Socket_ = TSocket(TNetworkAddress(Options_.Host, Options_.Port), Options_.ConnectTimeout);
- if (Options_.UseSsl) {
- SslClient_.Reset(new TOpenSslClientIO(&SocketInput_, &SocketOutput_, Options_.SslOptions));
- BufferedInput_.Reset(new TBufferedInput(SslClient_.Get()));
- BufferedOutput_.Reset(new TBufferedOutput(SslClient_.Get()));
- } else {
- BufferedInput_.Reset(new TBufferedInput(&SocketInput_));
- BufferedOutput_.Reset(new TBufferedOutput(&SocketOutput_));
- }
- SocketInput_ = TSocketInput(Socket_);
- SocketOutput_ = TSocketOutput(Socket_);
- Input_ = TCodedInputStream(BufferedInput_.Get());
- Output_ = TCodedOutputStream(BufferedOutput_.Get());
- if (Options_.RequestTimeout.Seconds()) {
- Socket_.SetSocketTimeout(Options_.RequestTimeout.Seconds());
- }
- if (!Handshake()) {
- ythrow yexception() << "fail to connect to " << Options_.Host;
- }
- }
- bool TClient::TImpl::Handshake() {
- if (!SendHello()) {
- return false;
- }
- if (!ReceiveHello()) {
- return false;
- }
- return true;
- }
- bool TClient::TImpl::ReceivePacket(ui64* server_packet) {
- ui64 packet_type = 0;
- if (!Input_.ReadVarint64(&packet_type)) {
- return false;
- }
- if (server_packet) {
- *server_packet = packet_type;
- }
- switch (packet_type) {
- case ServerCodes::Totals:
- case ServerCodes::Data: {
- if (!ReceiveData()) {
- ythrow yexception() << "can't read data packet from input stream";
- }
- return true;
- }
- case ServerCodes::Exception: {
- ReceiveException();
- return false;
- }
- case ServerCodes::ProfileInfo: {
- TProfile profile;
- if (!TWireFormat::ReadUInt64(&Input_, &profile.rows)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(&Input_, &profile.blocks)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(&Input_, &profile.bytes)) {
- return false;
- }
- if (!TWireFormat::ReadFixed(&Input_, &profile.applied_limit)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(&Input_, &profile.rows_before_limit)) {
- return false;
- }
- if (!TWireFormat::ReadFixed(&Input_, &profile.calculated_rows_before_limit)) {
- return false;
- }
- if (Events_) {
- Events_->OnProfile(profile);
- }
- return true;
- }
- case ServerCodes::Progress: {
- TProgress info;
- if (!TWireFormat::ReadUInt64(&Input_, &info.rows)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(&Input_, &info.bytes)) {
- return false;
- }
- if (REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) {
- if (!TWireFormat::ReadUInt64(&Input_, &info.total_rows)) {
- return false;
- }
- }
- if (Events_) {
- Events_->OnProgress(info);
- }
- return true;
- }
- case ServerCodes::Pong: {
- return true;
- }
- case ServerCodes::EndOfStream: {
- if (Events_) {
- Events_->OnFinish();
- }
- return false;
- }
- default:
- ythrow yexception() << "unimplemented " << (int)packet_type;
- break;
- }
- return false;
- }
- bool TClient::TImpl::ReadBlock(TBlock* block, TCodedInputStream* input) {
- // Additional information about block.
- if (REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
- ui64 num;
- TBlockInfo info;
- // BlockInfo
- if (!TWireFormat::ReadUInt64(input, &num)) {
- return false;
- }
- if (!TWireFormat::ReadFixed(input, &info.IsOverflows)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(input, &num)) {
- return false;
- }
- if (!TWireFormat::ReadFixed(input, &info.BucketNum)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(input, &num)) {
- return false;
- }
- // TODO use data
- }
- ui64 num_columns = 0;
- ui64 num_rows = 0;
- if (!TWireFormat::ReadUInt64(input, &num_columns)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(input, &num_rows)) {
- return false;
- }
- for (size_t i = 0; i < num_columns; ++i) {
- TString name;
- TString type;
- if (!TWireFormat::ReadString(input, &name)) {
- return false;
- }
- if (!TWireFormat::ReadString(input, &type)) {
- return false;
- }
- if (TColumnRef col = CreateColumnByType(type)) {
- if (num_rows && !col->Load(input, num_rows)) {
- ythrow yexception() << "can't load";
- }
- block->AppendColumn(name, col);
- } else {
- ythrow yexception() << "unsupported column type: " << type;
- }
- }
- return true;
- }
- bool TClient::TImpl::ReceiveData() {
- TBlock block;
- if (REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
- TString table_name;
- if (!TWireFormat::ReadString(&Input_, &table_name)) {
- return false;
- }
- }
- if (Compression_ == CompressionState::Enable) {
- TCompressedInput compressed(&Input_);
- TCodedInputStream coded(&compressed);
- if (!ReadBlock(&block, &coded)) {
- return false;
- }
- } else {
- if (!ReadBlock(&block, &Input_)) {
- return false;
- }
- }
- if (Events_) {
- Events_->OnData(block);
- }
- return true;
- }
- bool TClient::TImpl::ReceiveException(bool rethrow) {
- std::unique_ptr<TException> e(new TException);
- TException* current = e.get();
- bool exception_received = true;
- do {
- bool has_nested = false;
- if (!TWireFormat::ReadFixed(&Input_, ¤t->Code)) {
- exception_received = false;
- break;
- }
- if (!TWireFormat::ReadString(&Input_, ¤t->Name)) {
- exception_received = false;
- break;
- }
- if (!TWireFormat::ReadString(&Input_, ¤t->DisplayText)) {
- exception_received = false;
- break;
- }
- if (!TWireFormat::ReadString(&Input_, ¤t->StackTrace)) {
- exception_received = false;
- break;
- }
- if (!TWireFormat::ReadFixed(&Input_, &has_nested)) {
- exception_received = false;
- break;
- }
- if (has_nested) {
- current->Nested.reset(new TException);
- current = current->Nested.get();
- } else {
- break;
- }
- } while (true);
- if (Events_) {
- Events_->OnServerException(*e);
- }
- if (rethrow || Options_.RethrowExceptions) {
- throw TServerException(std::move(e));
- }
- return exception_received;
- }
- void TClient::TImpl::SendQuery(const TString& query, const TString& query_id, const TString& deduplication_token) {
- TWireFormat::WriteUInt64(&Output_, ClientCodes::Query);
- TWireFormat::WriteString(&Output_, query_id);
- /// Client info.
- if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) {
- TClientInfo info;
- info.QueryKind = 1;
- info.ClientName = "ClickHouse client";
- info.ClientVersionMajor = DBMS_VERSION_MAJOR;
- info.ClientVersionMinor = DBMS_VERSION_MINOR;
- info.ClientRevision = REVISION;
- TWireFormat::WriteFixed(&Output_, info.QueryKind);
- TWireFormat::WriteString(&Output_, info.InitialUser);
- TWireFormat::WriteString(&Output_, info.InitialQueryId);
- TWireFormat::WriteString(&Output_, info.InitialAddress);
- TWireFormat::WriteFixed(&Output_, info.IfaceType);
- TWireFormat::WriteString(&Output_, info.OsUser);
- TWireFormat::WriteString(&Output_, info.ClientHostname);
- TWireFormat::WriteString(&Output_, info.ClientName);
- TWireFormat::WriteUInt64(&Output_, info.ClientVersionMajor);
- TWireFormat::WriteUInt64(&Output_, info.ClientVersionMinor);
- TWireFormat::WriteUInt64(&Output_, info.ClientRevision);
- if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
- TWireFormat::WriteString(&Output_, info.QuotaKey);
- }
- if (!deduplication_token.empty()) {
- static const TString insert_deduplication_token_setting_name = "insert_deduplication_token";
- TWireFormat::WriteString(&Output_, insert_deduplication_token_setting_name);
- TWireFormat::WriteString(&Output_, deduplication_token);
- }
- TWireFormat::WriteString(&Output_, TString()); // Empty string is a marker of end SETTINGS section
- TWireFormat::WriteUInt64(&Output_, Stages::Complete);
- TWireFormat::WriteUInt64(&Output_, Compression_);
- TWireFormat::WriteString(&Output_, query);
- // Send empty block as marker of
- // end of data
- SendData(TBlock());
- Output_.Flush();
- }
- void TClient::TImpl::WriteBlock(const TBlock& block, TCodedOutputStream* output) {
- /// Дополнительная информация о блоке.
- if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
- TWireFormat::WriteUInt64(output, 1);
- TWireFormat::WriteFixed(output, block.Info().IsOverflows);
- TWireFormat::WriteUInt64(output, 2);
- TWireFormat::WriteFixed(output, block.Info().BucketNum);
- TWireFormat::WriteUInt64(output, 0);
- }
- TWireFormat::WriteUInt64(output, block.GetColumnCount());
- TWireFormat::WriteUInt64(output, block.GetRowCount());
- for (TBlock::TIterator bi(block); bi.IsValid(); bi.Next()) {
- TWireFormat::WriteString(output, bi.Name());
- TWireFormat::WriteString(output, bi.Type()->GetName());
- bi.Column()->Save(output);
- }
- }
- void TClient::TImpl::SendData(const TBlock& block) {
- TWireFormat::WriteUInt64(&Output_, ClientCodes::Data);
- if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
- TWireFormat::WriteString(&Output_, TString());
- }
- if (Compression_ == CompressionState::Enable) {
- switch (Options_.CompressionMethod) {
- case ECompressionMethod::None: {
- Y_ABORT_UNLESS(false, "invalid state");
- break;
- }
- case ECompressionMethod::LZ4: {
- TBufferOutput tmp;
- // Serialize block's data
- {
- TCodedOutputStream out(&tmp);
- WriteBlock(block, &out);
- }
- // Reserver space for data
- TBuffer buf;
- buf.Resize(9 + LZ4_compressBound(tmp.Buffer().Size()));
- // Compress data
- int size = LZ4_compress(tmp.Buffer().Data(), buf.Data() + 9, tmp.Buffer().Size());
- buf.Resize(9 + size);
- // Fill header
- ui8* p = (ui8*)buf.Data();
- // Compression method
- WriteUnaligned<ui8>(p, (ui8)0x82);
- p += 1;
- // Compressed data size with header
- WriteUnaligned<ui32>(p, (ui32)buf.Size());
- p += 4;
- // Original data size
- WriteUnaligned<ui32>(p, (ui32)tmp.Buffer().Size());
- TWireFormat::WriteFixed(&Output_, CityHash_v1_0_2::CityHash128(
- buf.Data(), buf.Size()));
- TWireFormat::WriteBytes(&Output_, buf.Data(), buf.Size());
- break;
- }
- }
- } else {
- WriteBlock(block, &Output_);
- }
- Output_.Flush();
- }
- bool TClient::TImpl::SendHello() {
- TWireFormat::WriteUInt64(&Output_, ClientCodes::Hello);
- TWireFormat::WriteString(&Output_, TString(DBMS_NAME) + " client");
- TWireFormat::WriteUInt64(&Output_, DBMS_VERSION_MAJOR);
- TWireFormat::WriteUInt64(&Output_, DBMS_VERSION_MINOR);
- TWireFormat::WriteUInt64(&Output_, REVISION);
- TWireFormat::WriteString(&Output_, Options_.DefaultDatabase);
- TWireFormat::WriteString(&Output_, Options_.User);
- TWireFormat::WriteString(&Output_, Options_.Password);
- Output_.Flush();
- return true;
- }
- bool TClient::TImpl::ReceiveHello() {
- ui64 packet_type = 0;
- if (!Input_.ReadVarint64(&packet_type)) {
- return false;
- }
- if (packet_type == ServerCodes::Hello) {
- if (!TWireFormat::ReadString(&Input_, &ServerInfo_.Name)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.VersionMajor)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.VersionMinor)) {
- return false;
- }
- if (!TWireFormat::ReadUInt64(&Input_, &ServerInfo_.Revision)) {
- return false;
- }
- if (ServerInfo_.Revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) {
- if (!TWireFormat::ReadString(&Input_, &ServerInfo_.Timezone)) {
- return false;
- }
- }
- return true;
- } else if (packet_type == ServerCodes::Exception) {
- ReceiveException(true);
- return false;
- }
- return false;
- }
- void TClient::TImpl::RetryGuard(std::function<void()> func) {
- for (int i = 0; i <= Options_.SendRetries; ++i) {
- try {
- func();
- return;
- } catch (const yexception&) {
- bool ok = true;
- try {
- Sleep(Options_.RetryTimeout);
- ResetConnection();
- } catch (...) {
- ok = false;
- }
- if (!ok) {
- throw;
- }
- }
- }
- }
- TClient::TClient(const TClientOptions& opts)
- : Options_(opts)
- , Impl_(new TImpl(opts))
- {
- }
- TClient::~TClient() {
- }
- void TClient::Execute(const TQuery& query) {
- Impl_->ExecuteQuery(query);
- }
- void TClient::Select(const TString& query, TSelectCallback cb, const TString& query_id) {
- Execute(TQuery(query, query_id).OnData(cb));
- }
- void TClient::Select(const TQuery& query) {
- Execute(query);
- }
- void TClient::Insert(const TString& table_name, const TBlock& block, const TString& query_id, const TString& deduplication_token) {
- Impl_->Insert(table_name, block, query_id, deduplication_token);
- }
- void TClient::Ping() {
- Impl_->Ping();
- }
- void TClient::ResetConnection() {
- Impl_->ResetConnection();
- }
- }
|