@@ -48,13 +48,10 @@
 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
-#include "src/core/ext/filters/client_channel/resolver_registry.h"
 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
 #include "src/core/ext/filters/client_channel/retry_filter.h"
 #include "src/core/ext/filters/client_channel/subchannel.h"
 #include "src/core/ext/filters/deadline/deadline_filter.h"
-#include "src/core/ext/service_config/service_config.h"
-#include "src/core/ext/service_config/service_config_call_data.h"
 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/channel/connected_channel.h"
@@ -65,15 +62,15 @@
 #include "src/core/lib/iomgr/polling_entity.h"
 #include "src/core/lib/iomgr/work_serializer.h"
 #include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/resolver/resolver_registry.h"
+#include "src/core/lib/service_config/service_config.h"
+#include "src/core/lib/service_config/service_config_call_data.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/slice/slice_string_helpers.h"
 #include "src/core/lib/surface/channel.h"
 #include "src/core/lib/transport/connectivity_state.h"
 #include "src/core/lib/transport/error_utils.h"
-#include "src/core/lib/transport/metadata.h"
 #include "src/core/lib/transport/metadata_batch.h"
-#include "src/core/lib/transport/static_metadata.h"
-#include "src/core/lib/transport/status_metadata.h"
 
 //
 // Client channel filter
@@ -417,16 +414,11 @@ class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
   }
 
-  void ReturnResult(Resolver::Result result) override
+  void ReportResult(Resolver::Result result) override
       Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
     chand_->OnResolverResultChangedLocked(std::move(result));
   }
 
-  void ReturnError(grpc_error_handle error) override
-      Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
-    chand_->OnResolverErrorLocked(error);
-  }
-
  private:
   ClientChannel* chand_;
 };
@@ -1122,7 +1114,6 @@ ClientChannel::~ClientChannel() {
   }
   DestroyResolverAndLbPolicyLocked();
   grpc_channel_args_destroy(channel_args_);
-  GRPC_ERROR_UNREF(resolver_transient_failure_error_);
   // Stop backup polling.
   grpc_client_channel_stop_backup_polling(interested_parties_);
   grpc_pollset_set_destroy(interested_parties_);
@@ -1203,26 +1194,29 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
   //
   // We track a list of strings to eventually be concatenated and traced.
   y_absl::InlinedVector<const char*, 3> trace_strings;
-  if (result.addresses.empty() && previous_resolution_contained_addresses_) {
+  const bool resolution_contains_addresses =
+      result.addresses.ok() && !result.addresses->empty();
+  if (!resolution_contains_addresses &&
+      previous_resolution_contained_addresses_) {
     trace_strings.push_back("Address list became empty");
-  } else if (!result.addresses.empty() &&
+  } else if (resolution_contains_addresses &&
              !previous_resolution_contained_addresses_) {
     trace_strings.push_back("Address list became non-empty");
   }
-  previous_resolution_contained_addresses_ = !result.addresses.empty();
+  previous_resolution_contained_addresses_ = resolution_contains_addresses;
   TString service_config_error_string_storage;
-  if (result.service_config_error != GRPC_ERROR_NONE) {
+  if (!result.service_config.ok()) {
     service_config_error_string_storage =
-        grpc_error_std_string(result.service_config_error);
+        result.service_config.status().ToString();
     trace_strings.push_back(service_config_error_string_storage.c_str());
   }
   // Choose the service config.
   RefCountedPtr<ServiceConfig> service_config;
   RefCountedPtr<ConfigSelector> config_selector;
-  if (result.service_config_error != GRPC_ERROR_NONE) {
+  if (!result.service_config.ok()) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
       gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
-              this, grpc_error_std_string(result.service_config_error).c_str());
+              this, result.service_config.status().ToString().c_str());
     }
     // If the service config was invalid, then fallback to the
     // previously returned service config.
@@ -1236,13 +1230,13 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
       service_config = saved_service_config_;
       config_selector = saved_config_selector_;
     } else {
-      // We received an invalid service config and we don't have a
+      // We received a service config error and we don't have a
      // previous service config to fall back to. Put the channel into
       // TRANSIENT_FAILURE.
-      OnResolverErrorLocked(GRPC_ERROR_REF(result.service_config_error));
+      OnResolverErrorLocked(result.service_config.status());
       trace_strings.push_back("no valid service config");
     }
-  } else if (result.service_config == nullptr) {
+  } else if (*result.service_config == nullptr) {
     // Resolver did not return any service config.
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
       gpr_log(GPR_INFO,
@@ -1253,9 +1247,12 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
     service_config = default_service_config_;
   } else {
     // Use ServiceConfig and ConfigSelector returned by resolver.
-    service_config = result.service_config;
+    service_config = std::move(*result.service_config);
     config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
   }
+  // Note: The only case in which service_config is null here is if the
+  // resolver returned a service config error and we don't have a previous
+  // service config to fall back to.
   if (service_config != nullptr) {
     // Extract global config for client channel.
     const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
@@ -1307,28 +1304,21 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
   }
 }
 
-void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
-  if (resolver_ == nullptr) {
-    GRPC_ERROR_UNREF(error);
-    return;
-  }
+void ClientChannel::OnResolverErrorLocked(y_absl::Status status) {
+  if (resolver_ == nullptr) return;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
     gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
-            grpc_error_std_string(error).c_str());
+            status.ToString().c_str());
   }
   // If we already have an LB policy from a previous resolution
   // result, then we continue to let it set the connectivity state.
   // Otherwise, we go into TRANSIENT_FAILURE.
   if (lb_policy_ == nullptr) {
-    grpc_error_handle state_error =
-        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-            "Resolver transient failure", &error, 1);
-    y_absl::Status status = grpc_error_to_absl_status(state_error);
+    grpc_error_handle error = absl_status_to_grpc_error(status);
     {
       MutexLock lock(&resolution_mu_);
       // Update resolver transient failure.
-      GRPC_ERROR_UNREF(resolver_transient_failure_error_);
-      resolver_transient_failure_error_ = state_error;
+      resolver_transient_failure_error_ = status;
       // Process calls that were queued waiting for the resolver result.
       for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
            call = call->next) {
@@ -1340,12 +1330,12 @@ void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
         }
       }
     }
+    GRPC_ERROR_UNREF(error);
     // Update connectivity state.
     UpdateStateAndPickerLocked(
         GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
         y_absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(status));
   }
-  GRPC_ERROR_UNREF(error);
 }
 
 void ClientChannel::CreateOrUpdateLbPolicyLocked(
@@ -1356,6 +1346,7 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked(
   LoadBalancingPolicy::UpdateArgs update_args;
   update_args.addresses = std::move(result.addresses);
   update_args.config = std::move(lb_policy_config);
+  update_args.resolution_note = std::move(result.resolution_note);
   // Add health check service name to channel args.
   y_absl::InlinedVector<grpc_arg, 1> args_to_add;
   if (health_check_service_name.has_value()) {
@@ -1496,8 +1487,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
   // after releasing the lock to keep the critical section small.
   {
     MutexLock lock(&resolution_mu_);
-    GRPC_ERROR_UNREF(resolver_transient_failure_error_);
-    resolver_transient_failure_error_ = GRPC_ERROR_NONE;
+    resolver_transient_failure_error_ = y_absl::OkStatus();
     // Update service config.
     received_service_config_data_ = true;
     // Old values will be unreffed after lock is released.
@@ -2353,12 +2343,11 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
   if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
     // If the resolver returned transient failure before returning the
     // first service config, fail any non-wait_for_ready calls.
-    grpc_error_handle resolver_error = chand->resolver_transient_failure_error_;
-    if (resolver_error != GRPC_ERROR_NONE &&
-        (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
-            0) {
+    y_absl::Status resolver_error = chand->resolver_transient_failure_error_;
+    if (!resolver_error.ok() && (send_initial_metadata_flags &
+                                 GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
       MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
-      *error = GRPC_ERROR_REF(resolver_error);
+      *error = absl_status_to_grpc_error(resolver_error);
       return true;
     }
     // Either the resolver has not yet returned a result, or it has
@@ -2414,16 +2403,25 @@ void ClientChannel::CallData::CreateDynamicCall(grpc_call_element* elem) {
 class ClientChannel::LoadBalancedCall::Metadata
     : public LoadBalancingPolicy::MetadataInterface {
  public:
-  Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch)
-      : lb_call_(lb_call), batch_(batch) {}
+  explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {}
 
   void Add(y_absl::string_view key, y_absl::string_view value) override {
-    grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
-        lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
-    linked_mdelem->md = grpc_mdelem_from_slices(
-        ExternallyManagedSlice(key.data(), key.size()),
-        ExternallyManagedSlice(value.data(), value.size()));
-    GPR_ASSERT(batch_->LinkTail(linked_mdelem) == GRPC_ERROR_NONE);
+    // Gross, egregious hack to support legacy grpclb behavior.
+    // TODO(ctiller): Use a promise context for this once that plumbing is done.
+    if (key == GrpcLbClientStatsMetadata::key()) {
+      batch_->Set(
+          GrpcLbClientStatsMetadata(),
+          const_cast<GrpcLbClientStats*>(
+              reinterpret_cast<const GrpcLbClientStats*>(value.data())));
+      return;
+    }
+    batch_->Append(key, Slice::FromStaticString(value),
+                   [key](y_absl::string_view error, const Slice& value) {
+                     gpr_log(GPR_ERROR, "%s",
+                             y_absl::StrCat(error, " key:", key,
+                                            " value:", value.as_string_view())
+                                 .c_str());
+                   });
   }
 
   std::vector<std::pair<TString, TString>> TestOnlyCopyToVector()
@@ -2435,18 +2433,15 @@ class ClientChannel::LoadBalancedCall::Metadata
 
   y_absl::optional<y_absl::string_view> Lookup(y_absl::string_view key,
                                                TString* buffer) const override {
-    return batch_->GetValue(key, buffer);
+    return batch_->GetStringValue(key, buffer);
   }
 
  private:
   class Encoder {
    public:
-    void Encode(grpc_mdelem md) {
-      auto key = StringViewFromSlice(GRPC_MDKEY(md));
-      if (key != ":path") {
-        out_.emplace_back(TString(key),
-                          TString(StringViewFromSlice(GRPC_MDVALUE(md))));
-      }
+    void Encode(const Slice& key, const Slice& value) {
+      out_.emplace_back(TString(key.as_string_view()),
+                        TString(value.as_string_view()));
     }
 
     template <class Which>
@@ -2457,6 +2452,9 @@ class ClientChannel::LoadBalancedCall::Metadata
     }
 
     void Encode(GrpcTimeoutMetadata, grpc_millis) {}
+    void Encode(HttpPathMetadata, const Slice&) {}
+    void Encode(HttpMethodMetadata,
+                const typename HttpMethodMetadata::ValueType&) {}
 
     std::vector<std::pair<TString, TString>> Take() {
       return std::move(out_);
@@ -2466,7 +2464,6 @@ class ClientChannel::LoadBalancedCall::Metadata
     std::vector<std::pair<TString, TString>> out_;
   };
 
-  LoadBalancedCall* lb_call_;
   grpc_metadata_batch* batch_;
 };
 
@@ -2558,7 +2555,6 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall(
           GetCallAttemptTracer(args.context, is_transparent_retry)) {}
 
 ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
-  grpc_slice_unref_internal(path_);
   GRPC_ERROR_UNREF(cancel_error_);
   GRPC_ERROR_UNREF(failure_error_);
   if (backend_metric_data_ != nullptr) {
@@ -2877,10 +2873,8 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
     } else {
       // Get status from headers.
       const auto& md = *self->recv_trailing_metadata_;
-      const auto& fields = md.legacy_index()->named;
-      GPR_ASSERT(fields.grpc_status != nullptr);
       grpc_status_code code =
-          grpc_get_status_code_from_metadata(fields.grpc_status->md);
+          md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
       if (code != GRPC_STATUS_OK) {
         y_absl::string_view message;
         if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
@@ -2898,7 +2892,7 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
     // If the LB policy requested a callback for trailing metadata, invoke
     // the callback.
     if (self->lb_subchannel_call_tracker_ != nullptr) {
-      Metadata trailing_metadata(self, self->recv_trailing_metadata_);
+      Metadata trailing_metadata(self->recv_trailing_metadata_);
       BackendMetricAccessor backend_metric_accessor(self);
       LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
           status, &trailing_metadata, &backend_metric_accessor};
|
|
|
|
|
|
void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
|
|
|
SubchannelCall::Args call_args = {
|
|
|
- std::move(connected_subchannel_), pollent_, path_, /*start_time=*/0,
|
|
|
+ std::move(connected_subchannel_), pollent_, path_.Ref(), /*start_time=*/0,
|
|
|
deadline_, arena_,
|
|
|
// TODO(roth): When we implement hedging support, we will probably
|
|
|
// need to use a separate call context for each subchannel call.
|
|
@@ -3063,10 +3057,10 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
       send_initial_metadata.send_initial_metadata_flags;
   // Perform LB pick.
   LoadBalancingPolicy::PickArgs pick_args;
-  pick_args.path = StringViewFromSlice(path_);
+  pick_args.path = path_.as_string_view();
   LbCallState lb_call_state(this);
   pick_args.call_state = &lb_call_state;
-  Metadata initial_metadata(this, initial_metadata_batch);
+  Metadata initial_metadata(initial_metadata_batch);
   pick_args.initial_metadata = &initial_metadata;
   auto result = chand_->picker_->Pick(pick_args);
   return HandlePickResult<bool>(