|
@@ -4,6 +4,8 @@
|
|
|
|
|
|
#include <yt/yt/library/profiling/sensor.h>
|
|
|
|
|
|
+#include <yt/yt/library/tvm/service/tvm_service.h>
|
|
|
+
|
|
|
#include <yt/yt/core/rpc/grpc/channel.h>
|
|
|
|
|
|
#include <yt/yt/core/concurrency/action_queue.h>
|
|
@@ -27,11 +29,14 @@ using namespace NRpc;
|
|
|
using namespace NConcurrency;
|
|
|
using namespace NProfiling;
|
|
|
using namespace NYTree;
|
|
|
+using namespace NAuth;
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
static const NLogging::TLogger Logger{"Jaeger"};
|
|
|
static const NProfiling::TProfiler Profiler{"/tracing"};
|
|
|
+static const TString ServiceTicketMetadataName = "x-ya-service-ticket";
|
|
|
+static const TString TracingServiceAlias = "tracing";
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
@@ -85,6 +90,9 @@ void TJaegerTracerConfig::Register(TRegistrar registrar)
|
|
|
.Default();
|
|
|
registrar.Parameter("enable_pid_tag", &TThis::EnablePidTag)
|
|
|
.Default(false);
|
|
|
+
|
|
|
+ registrar.Parameter("tvm_service", &TThis::TvmService)
|
|
|
+ .Optional();
|
|
|
}
|
|
|
|
|
|
TJaegerTracerConfigPtr TJaegerTracerConfig::ApplyDynamic(const TJaegerTracerDynamicConfigPtr& dynamicConfig) const
|
|
@@ -108,6 +116,7 @@ TJaegerTracerConfigPtr TJaegerTracerConfig::ApplyDynamic(const TJaegerTracerDyna
|
|
|
config->ServiceName = ServiceName;
|
|
|
config->ProcessTags = ProcessTags;
|
|
|
config->EnablePidTag = EnablePidTag;
|
|
|
+ config->TvmService = TvmService;
|
|
|
|
|
|
config->Postprocess();
|
|
|
return config;
|
|
@@ -287,8 +296,12 @@ TJaegerChannelManager::TJaegerChannelManager()
|
|
|
, RpcTimeout_()
|
|
|
{ }
|
|
|
|
|
|
-TJaegerChannelManager::TJaegerChannelManager(const TIntrusivePtr<TJaegerTracerConfig>& config, const TString& endpoint)
|
|
|
- : Endpoint_(endpoint)
|
|
|
+TJaegerChannelManager::TJaegerChannelManager(
|
|
|
+ const TIntrusivePtr<TJaegerTracerConfig>& config,
|
|
|
+ const TString& endpoint,
|
|
|
+ const ITvmServicePtr& tvmService)
|
|
|
+ : TvmService_(tvmService)
|
|
|
+ , Endpoint_(endpoint)
|
|
|
, ReopenTime_(TInstant::Now() + config->ReconnectPeriod + RandomDuration(config->ReconnectPeriod))
|
|
|
, RpcTimeout_(config->RpcTimeout)
|
|
|
, PushedBytes_(Profiler.WithTag("endpoint", endpoint).Counter("/pushed_bytes"))
|
|
@@ -312,6 +325,12 @@ bool TJaegerChannelManager::Push(const std::vector<TSharedRef>& batches, int spa
|
|
|
req->SetEnableLegacyRpcCodecs(false);
|
|
|
req->set_batch(MergeRefsToString(batches));
|
|
|
|
|
|
+ if (TvmService_) {
|
|
|
+ auto* ticketExt = req->Header().MutableExtension(NRpc::NProto::TCustomMetadataExt::custom_metadata_ext);
|
|
|
+ ticketExt->mutable_entries()->insert(
|
|
|
+ {ServiceTicketMetadataName, TvmService_->GetServiceTicket(TracingServiceAlias)});
|
|
|
+ }
|
|
|
+
|
|
|
YT_LOG_DEBUG("Sending spans (SpanCount: %v, PayloadSize: %v, Endpoint: %v)",
|
|
|
spanCount,
|
|
|
req->batch().size(),
|
|
@@ -359,6 +378,7 @@ TJaegerTracer::TJaegerTracer(
|
|
|
BIND(&TJaegerTracer::Flush, MakeStrong(this)),
|
|
|
config->FlushPeriod))
|
|
|
, Config_(config)
|
|
|
+ , TvmService_(config->TvmService ? CreateTvmService(config->TvmService) : nullptr)
|
|
|
{
|
|
|
Profiler.AddFuncGauge("/enabled", MakeStrong(this), [this] {
|
|
|
return Config_.Acquire()->IsEnabled();
|
|
@@ -563,13 +583,13 @@ void TJaegerTracer::Flush()
|
|
|
|
|
|
auto it = CollectorChannels_.find(endpoint);
|
|
|
if (it == CollectorChannels_.end()) {
|
|
|
- it = CollectorChannels_.insert({endpoint, TJaegerChannelManager(config, endpoint)}).first;
|
|
|
+ it = CollectorChannels_.emplace(endpoint, TJaegerChannelManager(config, endpoint, TvmService_)).first;
|
|
|
}
|
|
|
|
|
|
auto& channel = it->second;
|
|
|
|
|
|
if (channel.NeedsReopen(flushStartTime)) {
|
|
|
- channel = TJaegerChannelManager(config, endpoint);
|
|
|
+ channel = TJaegerChannelManager(config, endpoint, TvmService_);
|
|
|
}
|
|
|
|
|
|
if (channel.Push(batches, spanCount)) {
|