123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- # Copyright 2015 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
# Message template for call errors that carry no metadata context. The single
# `%d` placeholder is filled with the call error code.
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
    'Internal gRPC call error %d. '
    'Please report to https://github.com/grpc/grpc/issues')
cdef str _call_error_metadata(metadata):
  """Builds the error message for a call rejected because of its metadata.

  Args:
    metadata: The metadata that was rejected; rendered with `%s`.

  Returns:
    A string of the form 'metadata was invalid: <metadata>'.
  """
  # The operand is wrapped in a one-element tuple on purpose: if `metadata` is
  # itself a tuple, passing it bare to `%` would unpack it as the argument
  # list, raising TypeError for any length other than one and printing only
  # the inner element for length one.
  return 'metadata was invalid: %s' % (metadata,)
cdef str _call_error_no_metadata(c_call_error):
  """Formats the generic internal-error message for a call error code.

  Args:
    c_call_error: The call error code to embed in the message.

  Returns:
    _INTERNAL_CALL_ERROR_MESSAGE_FORMAT with the code substituted in.
  """
  message = _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
  return message
cdef str _call_error(c_call_error, metadata):
  """Picks the error message matching a call error code.

  Args:
    c_call_error: The call error code being reported.
    metadata: The call's metadata; only used for the invalid-metadata code.

  Returns:
    The metadata-specific message when the code is
    GRPC_CALL_ERROR_INVALID_METADATA, otherwise the generic internal-error
    message.
  """
  if c_call_error != GRPC_CALL_ERROR_INVALID_METADATA:
    return _call_error_no_metadata(c_call_error)
  return _call_error_metadata(metadata)
cdef _check_call_error_no_metadata(c_call_error):
  """Translates a call error code into an optional error message.

  Args:
    c_call_error: The call error code to inspect.

  Returns:
    None when the code is GRPC_CALL_OK, otherwise the generic internal-error
    message for the code.
  """
  if c_call_error == GRPC_CALL_OK:
    return None
  return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
cdef _check_and_raise_call_error_no_metadata(c_call_error):
  """Raises if the given call error code is not GRPC_CALL_OK.

  Args:
    c_call_error: The call error code to check.

  Raises:
    ValueError: With the generic internal-error message, when
      `_check_call_error_no_metadata` reports a failure for the code.
  """
  message = _check_call_error_no_metadata(c_call_error)
  if message is None:
    return
  raise ValueError(message)
cdef _check_call_error(c_call_error, metadata):
  """Translates a call error code into an optional error message.

  Args:
    c_call_error: The call error code to inspect.
    metadata: The call's metadata; only used for the invalid-metadata code.

  Returns:
    The metadata-specific message for GRPC_CALL_ERROR_INVALID_METADATA;
    otherwise whatever `_check_call_error_no_metadata` returns for the code
    (None for GRPC_CALL_OK).
  """
  if c_call_error != GRPC_CALL_ERROR_INVALID_METADATA:
    return _check_call_error_no_metadata(c_call_error)
  return _call_error_metadata(metadata)
cdef void _raise_call_error_no_metadata(c_call_error) except *:
  """Unconditionally raises ValueError for the given call error code.

  Args:
    c_call_error: The call error code to report.

  Raises:
    ValueError: Always, carrying the generic internal-error message.
  """
  message = _call_error_no_metadata(c_call_error)
  raise ValueError(message)
cdef void _raise_call_error(c_call_error, metadata) except *:
  """Unconditionally raises ValueError for the given call error code.

  Args:
    c_call_error: The call error code to report.
    metadata: The call's metadata, included in the message when the code
      indicates invalid metadata.

  Raises:
    ValueError: Always, carrying the message chosen by `_call_error`.
  """
  message = _call_error(c_call_error, metadata)
  raise ValueError(message)
cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
  """Shuts down and then destroys a core completion queue.

  Args:
    c_completion_queue: The completion queue to tear down. It must not be used
      after this call.
  """
  # Shutdown is requested first, then the queue is destroyed.
  grpc_completion_queue_shutdown(c_completion_queue)
  grpc_completion_queue_destroy(c_completion_queue)
cdef class _CallState:
  """Per-call bookkeeping shared between a call object and its channel."""

  def __cinit__(self):
    # Tags of batches that were started for this call and whose completion has
    # not been processed yet. Starts empty.
    self.due = set()
cdef class _ChannelState:
  """Mutable state of a channel, guarded by `condition`."""

  def __cinit__(self):
    # A freshly created channel is open and has no recorded reason for being
    # closed.
    self.open = True
    self.closed_reason = None
    # Outstanding work, all initially empty: a mapping from in-flight tag to
    # its call state for integrated calls, the set of live segregated call
    # states, and the set of pending connectivity-watch tags.
    self.integrated_call_states = {}
    self.segregated_call_states = set()
    self.connectivity_due = set()
    # Lock and wait/notify primitive used by the functions operating on this
    # state.
    self.condition = threading.Condition()
cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
  """Starts a batch of operations on a core call.

  Args:
    c_call: The core call on which to start the batch.
    operations: The operations making up the batch.
    user_tag: The application-level tag to associate with the batch.

  Returns:
    A (grpc_call_error, _BatchOperationTag) pair: the result of
    grpc_call_start_batch and the tag object that was handed to core.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
  tag.prepare()
  # Core is handed a raw pointer to the tag, so take a reference on its behalf
  # to keep the tag alive while the batch is in flight.
  # NOTE(review): assumes the completion-queue consumer releases this
  # reference when the tag is delivered -- confirm against the event
  # interpretation code.
  cpython.Py_INCREF(tag)
  with nogil:
    c_call_error = grpc_call_start_batch(
        c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
  if c_call_error != GRPC_CALL_OK:
    # A rejected batch never yields a completion, so nothing downstream would
    # ever release the reference taken above. Drop it here so the tag is not
    # leaked; the local `tag` variable still keeps the object alive for the
    # caller.
    cpython.Py_DECREF(tag)
  return c_call_error, tag
cdef object _operate_from_integrated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts another batch on an integrated call that is still active.

  Args:
    channel_state: The state of the channel that owns the call.
    call_state: The state of the call to operate on.
    operations: The operations making up the batch.
    user_tag: The application-level tag to associate with the batch.

  Returns:
    True if the batch was started; False if the call has nothing due and so
    the batch was not attempted.

  Raises:
    ValueError: If core rejects the batch.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if not call_state.due:
      return False
    c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    if c_call_error != GRPC_CALL_OK:
      _raise_call_error_no_metadata(c_call_error)
    # Register the new tag both on the call and in the channel-wide mapping
    # from in-flight tag to call state.
    call_state.due.add(tag)
    channel_state.integrated_call_states[tag] = call_state
    return True
cdef object _operate_from_segregated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts another batch on a segregated call that is still active.

  Args:
    channel_state: The state of the channel that owns the call.
    call_state: The state of the call to operate on.
    operations: The operations making up the batch.
    user_tag: The application-level tag to associate with the batch.

  Returns:
    True if the batch was started; False if the call has nothing due and so
    the batch was not attempted.

  Raises:
    ValueError: If core rejects the batch.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if not call_state.due:
      return False
    c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    if c_call_error != GRPC_CALL_OK:
      _raise_call_error_no_metadata(c_call_error)
    call_state.due.add(tag)
    return True
cdef _cancel(
    _ChannelState channel_state, _CallState call_state, grpc_status_code code,
    str details):
  """Cancels a call with the given status if it still has batches due.

  Does nothing when the call's due set is empty.

  Args:
    channel_state: The state of the channel that owns the call.
    call_state: The state of the call to cancel.
    code: The status code to cancel with.
    details: The status details to cancel with.

  Raises:
    ValueError: If core reports an error for the cancellation.
  """
  cdef grpc_call_error c_call_error
  with channel_state.condition:
    if not call_state.due:
      return
    c_call_error = grpc_call_cancel_with_status(
        call_state.c_call, code, _encode(details), NULL)
    _check_and_raise_call_error_no_metadata(c_call_error)
cdef _next_call_event(
    _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
    on_success, on_failure, deadline):
  """Waits for the next completion-queue event and dispatches its tag.

  When an event is obtained, `on_success` is called with the event's tag while
  holding the channel condition, and all waiters on the condition are then
  notified.

  When obtaining the event raises (for example a Python exception raised from
  a signal handler, which can only occur on the main thread), `on_failure` is
  called with no arguments if it is not None, and the exception is re-raised.

  Args:
    channel_state: The state for the channel on which the RPC is running.
    c_completion_queue: The completion queue to poll.
    on_success: A callable invoked with the tag taken from the queue.
    on_failure: A callable invoked with no arguments when polling raises, or
      None.
    deadline: Passed through to `_latent_event` as the deadline for the poll.

  Returns:
    The event obtained from the completion queue.
  """
  try:
    tag, event = _latent_event(c_completion_queue, deadline)
  # NOTE(rbellevi): This broad except enables us to clean up resources before
  # propagating any exceptions raised by signal handlers to the application.
  except:
    if on_failure is not None:
      on_failure()
    raise
  # Only reached when polling succeeded: the handler above always re-raises.
  with channel_state.condition:
    on_success(tag)
    channel_state.condition.notify_all()
  return event
- # TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
cdef void _call(
    _ChannelState channel_state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, on_success, int flags, method,
    host, object deadline, CallCredentials credentials,
    object operationses_and_user_tags, object metadata,
    object context) except *:
  """Creates a core call on an open channel and starts its initial batches.

  The whole invocation runs while holding the channel condition.

  Args:
    channel_state: A _ChannelState whose "open" attribute must be True; RPCs
      may not be invoked on a closed channel.
    call_state: An empty _CallState. On success it is assigned a c_call and
      its due set is populated with the started tags.
    c_completion_queue: The grpc_completion_queue to be used for the call's
      operations.
    on_success: A behavior called (while holding the channel_state condition)
      with the set of tags of the batches that were started, once every batch
      has started successfully.
    flags: Flags to be passed to gRPC Core as part of call creation.
    method: The fully-qualified name of the RPC method being invoked.
    host: A "host" string to be passed to gRPC Core as part of call creation,
      or None.
    deadline: A float for the deadline of the RPC, or None if the RPC is to
      have no deadline.
    credentials: A CallCredentials for the RPC or None.
    operationses_and_user_tags: A sequence of length-two sequences, the first
      element of which is a sequence of Operations and the second element of
      which is an object to be used as a tag. A SendInitialMetadataOperation
      must be present in the first element of this value.
    metadata: The metadata for this call; used only for error reporting here.
    context: Context object for distributed tracing, or None.

  Raises:
    ValueError: If the channel is closed, if the call credentials cannot be
      set, or if any batch fails to start.
  """
  cdef grpc_slice method_slice
  cdef grpc_slice host_slice
  cdef grpc_slice *host_slice_ptr
  cdef grpc_call_credentials *c_call_credentials
  cdef grpc_call_error c_call_error
  with channel_state.condition:
    if not channel_state.open:
      raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)

    # Build the temporary slices needed for call creation; a NULL host pointer
    # means "no host".
    method_slice = _slice_from_bytes(method)
    host_slice_ptr = NULL
    if host is not None:
      host_slice = _slice_from_bytes(host)
      host_slice_ptr = &host_slice
    call_state.c_call = grpc_channel_create_call(
        channel_state.c_channel, NULL, flags,
        c_completion_queue, method_slice, host_slice_ptr,
        _timespec_from_time(deadline), NULL)
    # The slices are released as soon as the call has been created.
    grpc_slice_unref(method_slice)
    if host_slice_ptr != NULL:
      grpc_slice_unref(host_slice)

    if context is not None:
      set_census_context_on_call(call_state, context)

    if credentials is not None:
      c_call_credentials = credentials.c()
      c_call_error = grpc_call_set_credentials(
          call_state.c_call, c_call_credentials)
      grpc_call_credentials_release(c_call_credentials)
      if c_call_error != GRPC_CALL_OK:
        # Abandon the freshly created call before reporting the failure.
        grpc_call_unref(call_state.c_call)
        call_state.c_call = NULL
        _raise_call_error_no_metadata(c_call_error)

    started_tags = set()
    for operations, user_tag in operationses_and_user_tags:
      c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
      if c_call_error != GRPC_CALL_OK:
        # A batch failed to start: cancel and release the call, then raise.
        grpc_call_cancel(call_state.c_call, NULL)
        grpc_call_unref(call_state.c_call)
        call_state.c_call = NULL
        _raise_call_error(c_call_error, metadata)
      started_tags.add(tag)

    # Every batch started; publish the tags on the call and to the caller.
    call_state.due.update(started_tags)
    on_success(started_tags)
cdef void _process_integrated_call_tag(
    _ChannelState state, _BatchOperationTag tag) except *:
  """Records completion of an integrated call's batch.

  Removes the tag from the channel's tag-to-call mapping and from the call's
  due set, and releases the core call once nothing remains due.

  Args:
    state: The state of the channel that owns the call.
    tag: The tag of the batch that completed.
  """
  cdef _CallState call_state = state.integrated_call_states.pop(tag)
  call_state.due.remove(tag)
  if call_state.due:
    return
  # That was the last outstanding batch for this call.
  grpc_call_unref(call_state.c_call)
  call_state.c_call = NULL
cdef class IntegratedCall:
  """Handle on a call whose state is paired with its channel's state."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._call_state = call_state
    self._channel_state = channel_state

  def operate(self, operations, tag):
    """Starts a batch of operations on this call.

    Returns:
      True if the batch was started, False if the call has nothing due.
    """
    started = _operate_from_integrated_call(
        self._channel_state, self._call_state, operations, tag)
    return started

  def cancel(self, code, details):
    """Cancels this call with the given status code and details."""
    _cancel(self._channel_state, self._call_state, code, details)
cdef IntegratedCall _integrated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags,
    object context):
  """Invokes an RPC on the channel's shared call completion queue.

  Args:
    state: The state of the channel on which to invoke the RPC.
    flags: Flags passed through to `_call`.
    method: The RPC method, passed through to `_call`.
    host: The host, passed through to `_call`.
    deadline: The deadline, passed through to `_call`.
    metadata: The metadata, passed through to `_call`.
    credentials: The call credentials or None, passed through to `_call`.
    operationses_and_user_tags: The initial batches and their tags, passed
      through to `_call`.
    context: The tracing context, passed through to `_call`.

  Returns:
    An IntegratedCall wrapping the channel state and the new call state.
  """
  call_state = _CallState()

  def on_success(started_tags):
    # Associate every started tag with this call in the channel-wide mapping.
    state.integrated_call_states.update(
        dict.fromkeys(started_tags, call_state))

  _call(
      state, call_state, state.c_call_completion_queue, on_success, flags,
      method, host, deadline, credentials, operationses_and_user_tags,
      metadata, context)
  return IntegratedCall(state, call_state)
cdef object _process_segregated_call_tag(
    _ChannelState state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
  """Records completion of a segregated call's batch.

  Args:
    state: The state of the channel that owns the call.
    call_state: The state of the call whose batch completed.
    c_completion_queue: The call's own completion queue.
    tag: The tag of the batch that completed.

  Returns:
    True if that was the call's last outstanding batch, in which case the core
    call is released, the call state is removed from the channel and the
    completion queue is destroyed; False otherwise.
  """
  call_state.due.remove(tag)
  if call_state.due:
    return False
  grpc_call_unref(call_state.c_call)
  call_state.c_call = NULL
  state.segregated_call_states.remove(call_state)
  _destroy_c_completion_queue(c_completion_queue)
  return True
cdef class SegregatedCall:
  """Handle on a call that is polled through its own completion queue."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._call_state = call_state
    self._channel_state = channel_state

  def operate(self, operations, tag):
    """Starts a batch of operations on this call.

    Returns:
      True if the batch was started, False if the call has nothing due.
    """
    started = _operate_from_segregated_call(
        self._channel_state, self._call_state, operations, tag)
    return started

  def cancel(self, code, details):
    """Cancels this call with the given status code and details."""
    _cancel(self._channel_state, self._call_state, code, details)

  def next_event(self):
    """Blocks for and returns the next event from this call's queue.

    The poll is issued without a deadline. If polling raises, the call's
    resources are torn down before the exception propagates.
    """

    def _handle_tag(tag):
      _process_segregated_call_tag(
          self._channel_state, self._call_state, self._c_completion_queue, tag)

    def _tear_down():
      # Forget everything still due, release the core call, detach the call
      # from the channel and destroy its completion queue.
      self._call_state.due.clear()
      grpc_call_unref(self._call_state.c_call)
      self._call_state.c_call = NULL
      self._channel_state.segregated_call_states.remove(self._call_state)
      _destroy_c_completion_queue(self._c_completion_queue)

    return _next_call_event(
        self._channel_state, self._c_completion_queue, _handle_tag,
        _tear_down, None)
cdef SegregatedCall _segregated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags,
    object context):
  """Invokes an RPC that is polled through its own completion queue.

  Args:
    state: The state of the channel on which to invoke the RPC.
    flags: Flags passed through to `_call`.
    method: The RPC method, passed through to `_call`.
    host: The host, passed through to `_call`.
    deadline: The deadline, passed through to `_call`.
    metadata: The metadata, passed through to `_call`.
    credentials: The call credentials or None, passed through to `_call`.
    operationses_and_user_tags: The initial batches and their tags, passed
      through to `_call`.
    context: The tracing context, passed through to `_call`.

  Returns:
    A SegregatedCall bound to a newly created completion queue.

  Raises:
    ValueError: If the channel is closed or the call cannot be started.
  """
  cdef _CallState call_state = _CallState()
  cdef SegregatedCall segregated_call
  cdef grpc_completion_queue *c_completion_queue

  def on_success(started_tags):
    state.segregated_call_states.add(call_state)

  with state.condition:
    if state.open:
      c_completion_queue = (grpc_completion_queue_create_for_next(NULL))
    else:
      # Report why the channel is closed, using the same message `_call`
      # raises for this condition, instead of a message without the reason.
      raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)
  try:
    _call(
        state, call_state, c_completion_queue, on_success, flags, method, host,
        deadline, credentials, operationses_and_user_tags, metadata,
        context)
  except:
    # The queue was created solely for this call; do not leak it when the call
    # cannot be started. The exception is re-raised unchanged.
    _destroy_c_completion_queue(c_completion_queue)
    raise
  segregated_call = SegregatedCall(state, call_state)
  segregated_call._c_completion_queue = c_completion_queue
  return segregated_call
cdef object _watch_connectivity_state(
    _ChannelState state, grpc_connectivity_state last_observed_state,
    object deadline):
  """Registers a connectivity watch and blocks for a connectivity event.

  Args:
    state: The state of the channel to watch.
    last_observed_state: The connectivity state last seen by the caller.
    deadline: The deadline for the watch, converted with
      `_timespec_from_time`.

  Returns:
    The event taken from the channel's connectivity completion queue.

  Raises:
    ValueError: If the channel is closed.
  """
  cdef _ConnectivityTag tag = _ConnectivityTag(object())
  with state.condition:
    if not state.open:
      raise ValueError('Cannot monitor channel state: %s' % state.closed_reason)
    # Core is handed a raw pointer to the tag, so take a reference for it
    # before registering the watch.
    cpython.Py_INCREF(tag)
    grpc_channel_watch_connectivity_state(
        state.c_channel, last_observed_state, _timespec_from_time(deadline),
        state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
    state.connectivity_due.add(tag)
  # Poll outside the lock, with no deadline on the poll itself.
  completed_tag, event = _latent_event(
      state.c_connectivity_completion_queue, None)
  with state.condition:
    state.connectivity_due.remove(completed_tag)
    state.condition.notify_all()
  return event
cdef _close(Channel channel, grpc_status_code code, object details,
    drain_calls):
  """Closes a channel, cancelling its calls and destroying core resources.

  Args:
    channel: The Channel to close.
    code: The status code with which outstanding calls are cancelled.
    details: The status details for the cancellation; also recorded as the
      channel's closed reason.
    drain_calls: If true, this function itself pumps the call completion queue
      until all outstanding work is drained; otherwise it waits on the channel
      condition for integrated calls and connectivity watches to finish.
  """
  cdef _ChannelState state = channel._state
  cdef _CallState call_state
  encoded_details = _encode(details)
  with state.condition:
    if not state.open:
      # Another call to close already completed in the past or is currently
      # being executed in another thread.
      while state.c_channel != NULL:
        state.condition.wait()
      return

    state.open = False
    state.closed_reason = details
    # Iterate over a snapshot of the distinct integrated call states (several
    # tags may map to the same call).
    for call_state in set(state.integrated_call_states.values()):
      grpc_call_cancel_with_status(
          call_state.c_call, code, encoded_details, NULL)
    for call_state in state.segregated_call_states:
      grpc_call_cancel_with_status(
          call_state.c_call, code, encoded_details, NULL)
    # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
    # watching.

    if drain_calls:
      while not _calls_drained(state):
        event = channel.next_call_event()
        # Queue timeouts carry nothing to dispatch; just poll again.
        if event.completion_type != CompletionType.queue_timeout:
          event.tag(event)
    else:
      while state.integrated_call_states:
        state.condition.wait()
      while state.connectivity_due:
        state.condition.wait()

    _destroy_c_completion_queue(state.c_call_completion_queue)
    _destroy_c_completion_queue(state.c_connectivity_completion_queue)
    grpc_channel_destroy(state.c_channel)
    # A NULL c_channel is what concurrent closers wait for (see above).
    state.c_channel = NULL
    grpc_shutdown()
    state.condition.notify_all()
cdef _calls_drained(_ChannelState state):
  """Reports whether the channel has no outstanding work left.

  Args:
    state: The channel state to inspect.

  Returns:
    True if there are no integrated call states, no segregated call states
    and no pending connectivity watches; False otherwise.
  """
  return (not state.integrated_call_states and
          not state.segregated_call_states and
          not state.connectivity_due)
cdef class Channel:
  """A channel to a target, on which RPCs and connectivity watches are made."""

  def __cinit__(
      self, bytes target, object arguments,
      ChannelCredentials channel_credentials):
    """Creates the core channel and its two completion queues.

    Args:
      target: The target to connect to, as bytes.
      arguments: An iterable of channel arguments, or None for no arguments.
      channel_credentials: The credentials for the channel, or None to use
        insecure credentials.
    """
    arguments = () if arguments is None else tuple(arguments)
    fork_handlers_and_grpc_init()
    self._state = _ChannelState()
    self._state.c_call_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._state.c_connectivity_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._arguments = arguments
    cdef _ChannelArgs channel_args = _ChannelArgs(arguments)

    c_channel_credentials = (
        channel_credentials.c() if channel_credentials is not None
        else grpc_insecure_credentials_create())
    self._state.c_channel = grpc_channel_create(
        <char *>target, c_channel_credentials, channel_args.c_args())
    # The credentials are released right after the channel has been created.
    grpc_channel_credentials_release(c_channel_credentials)

  def target(self):
    """Returns the channel's target as bytes.

    Raises:
      ValueError: If the core channel has already been destroyed by a close.
    """
    cdef char *c_target
    with self._state.condition:
      # Closing sets c_channel to NULL after destroying the core channel;
      # handing that NULL to core is invalid, so fail cleanly instead.
      if self._state.c_channel == NULL:
        raise ValueError('Cannot get target: %s' % self._state.closed_reason)
      c_target = grpc_channel_get_target(self._state.c_channel)
      try:
        # Copy into a Python bytes object before the C string is freed.
        return <bytes>c_target
      finally:
        gpr_free(c_target)

  def integrated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags,
      object context = None):
    """Invokes an RPC on the channel's shared call completion queue.

    Returns:
      An IntegratedCall for the new RPC.
    """
    return _integrated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags, context)

  def next_call_event(self):
    """Returns the next event from the channel's call completion queue.

    When fork support is enabled the poll uses a deadline one second from now;
    otherwise it has no deadline.
    """
    def on_success(tag):
      # Events without a tag need no per-call bookkeeping.
      if tag is not None:
        _process_integrated_call_tag(self._state, tag)
    if is_fork_support_enabled():
      queue_deadline = time.time() + 1.0
    else:
      queue_deadline = None
    # NOTE(gnossen): It is acceptable for on_failure to be None here because
    # failure conditions can only ever happen on the main thread and this
    # method is only ever invoked on the channel spin thread.
    return _next_call_event(self._state, self._state.c_call_completion_queue,
                            on_success, None, queue_deadline)

  def segregated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags,
      object context = None):
    """Invokes an RPC that is polled through its own completion queue.

    Returns:
      A SegregatedCall for the new RPC.
    """
    return _segregated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags, context)

  def check_connectivity_state(self, bint try_to_connect):
    """Returns the channel's current connectivity state.

    Args:
      try_to_connect: Passed through to core's connectivity check.

    Raises:
      ValueError: If the channel is closed.
    """
    with self._state.condition:
      if not self._state.open:
        # This is not an RPC invocation, so name the operation that actually
        # failed.
        raise ValueError(
            'Cannot check connectivity state: %s' % self._state.closed_reason)
      return grpc_channel_check_connectivity_state(
          self._state.c_channel, try_to_connect)

  def watch_connectivity_state(
      self, grpc_connectivity_state last_observed_state, object deadline):
    """Registers a connectivity watch and returns the resulting event."""
    return _watch_connectivity_state(self._state, last_observed_state, deadline)

  def close(self, code, details):
    """Closes the channel, waiting for outstanding work to finish."""
    _close(self, code, details, False)

  def close_on_fork(self, code, details):
    """Closes the channel, draining outstanding call events itself."""
    _close(self, code, details, True)
|