12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001 |
- # Copyright 2019 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Interceptors implementation of gRPC Asyncio Python."""
- from abc import ABCMeta
- from abc import abstractmethod
- import asyncio
- import collections
- import functools
- from typing import (AsyncIterable, Awaitable, Callable, Iterator, List,
- Optional, Sequence, Union)
- import grpc
- from grpc._cython import cygrpc
- from . import _base_call
- from ._call import AioRpcError
- from ._call import StreamStreamCall
- from ._call import StreamUnaryCall
- from ._call import UnaryStreamCall
- from ._call import UnaryUnaryCall
- from ._call import _API_STYLE_ERROR
- from ._call import _RPC_ALREADY_FINISHED_DETAILS
- from ._call import _RPC_HALF_CLOSED_DETAILS
- from ._metadata import Metadata
- from ._typing import DeserializingFunction
- from ._typing import DoneCallbackType
- from ._typing import RequestIterableType
- from ._typing import RequestType
- from ._typing import ResponseIterableType
- from ._typing import ResponseType
- from ._typing import SerializingFunction
- from ._utils import _timeout_to_deadline
# Details string reported by `InterceptedCall.details()` when awaiting the
# interceptors task raises `asyncio.CancelledError`.
_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!'
class ServerInterceptor(metaclass=ABCMeta):
    """Interface for intercepting incoming RPCs on the service side.

    This is an EXPERIMENTAL API.
    """

    @abstractmethod
    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails],
                               Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """Intercept an incoming RPC before it is handed to a handler.

        Args:
          continuation: A function that receives a HandlerCallDetails and
            moves on to the next interceptor in the chain, if there is one,
            or otherwise to the RPC handler lookup logic. It returns an
            RpcMethodHandler instance when the RPC is considered serviced,
            or None when it is not.
          handler_call_details: A HandlerCallDetails describing the RPC.

        Returns:
          An RpcMethodHandler that may be used to service the RPC when the
          interceptor decides to service it, or None otherwise.
        """
class ClientCallDetails(
        collections.namedtuple(
            'ClientCallDetails',
            ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready')),
        grpc.ClientCallDetails):
    """Immutable description of an RPC that is about to be invoked.

    This is an EXPERIMENTAL API.

    Args:
      method: The method name of the RPC.
      timeout: An optional duration, in seconds, allowed for the RPC.
      metadata: Optional metadata to be sent to the service-side of the RPC.
      credentials: An optional CallCredentials for the RPC.
      wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism.
    """

    # Type hints for the fields declared by the namedtuple base above.
    method: str
    timeout: Optional[float]
    metadata: Optional[Metadata]
    credentials: Optional[grpc.CallCredentials]
    wait_for_ready: Optional[bool]
class ClientInterceptor(metaclass=ABCMeta):
    """Common base class of every Aio client interceptor class."""
class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting unary-unary invocations."""

    @abstractmethod
    async def intercept_unary_unary(
            self,
            continuation: Callable[[ClientCallDetails, RequestType],
                                   UnaryUnaryCall],
            client_call_details: ClientCallDetails,
            request: RequestType) -> Union[UnaryUnaryCall, ResponseType]:
        """Intercept a unary-unary invocation asynchronously.

        Args:
          continuation: A coroutine that moves the invocation forward, either
            by running the next interceptor in the chain or by invoking the
            actual RPC on the underlying Channel. Calling it is the
            interceptor's responsibility whenever it decides the RPC should
            proceed. The interceptor can use
            `call = await continuation(client_call_details, request)`
            to continue with the RPC. `continuation` returns the call to the
            RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          An object with the RPC response.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting unary-stream invocations."""

    @abstractmethod
    async def intercept_unary_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryStreamCall],
        client_call_details: ClientCallDetails,
        request: RequestType
    ) -> Union[ResponseIterableType, UnaryStreamCall]:
        """Intercept a unary-stream invocation asynchronously.

        The implementation may return either the call object or an
        asynchronous iterator. When an asynchronous iterator is returned, it
        becomes the source of the reads performed by the caller.

        Args:
          continuation: A coroutine that moves the invocation forward, either
            by running the next interceptor in the chain or by invoking the
            actual RPC on the underlying Channel. Calling it is the
            interceptor's responsibility whenever it decides the RPC should
            proceed. The interceptor can use
            `call = await continuation(client_call_details, request)`
            to continue with the RPC. `continuation` returns the call to the
            RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting stream-unary invocations."""

    @abstractmethod
    async def intercept_stream_unary(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               StreamUnaryCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> StreamUnaryCall:
        """Intercept a stream-unary invocation asynchronously.

        Inside the interceptor, call methods such as `write`, or awaiting the
        call itself, must be used with care: the caller may expect to receive
        an untouched call, for example in order to start writing messages to
        it.

        Args:
          continuation: A coroutine that moves the invocation forward, either
            by running the next interceptor in the chain or by invoking the
            actual RPC on the underlying Channel. Calling it is the
            interceptor's responsibility whenever it decides the RPC should
            proceed. The interceptor can use
            `call = await continuation(client_call_details, request_iterator)`
            to continue with the RPC. `continuation` returns the call to the
            RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for intercepting stream-stream invocations."""

    @abstractmethod
    async def intercept_stream_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               StreamStreamCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> Union[ResponseIterableType, StreamStreamCall]:
        """Intercept a stream-stream invocation asynchronously.

        Inside the interceptor, call methods such as `write`, or awaiting the
        call itself, must be used with care: the caller may expect to receive
        an untouched call, for example in order to start writing messages to
        it.

        The implementation may return either the call object or an
        asynchronous iterator. When an asynchronous iterator is returned, it
        becomes the source of the reads performed by the caller.

        Args:
          continuation: A coroutine that moves the invocation forward, either
            by running the next interceptor in the chain or by invoking the
            actual RPC on the underlying Channel. Calling it is the
            interceptor's responsibility whenever it decides the RPC should
            proceed. The interceptor can use
            `call = await continuation(client_call_details, request_iterator)`
            to continue with the RPC. `continuation` returns the call to the
            RPC.
          client_call_details: A ClientCallDetails object describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class InterceptedCall:
    """Base implementation for all intercepted call arities.

    Interceptors might have some work to do before the RPC invocation with
    the capacity of changing the invocation parameters, and some work to do
    after the RPC invocation with the capacity for accessing to the wrapped
    `UnaryUnaryCall`.

    It handles also early and later cancellations, when the RPC has not even
    started and the execution is still held by the interceptors or when the
    RPC has finished but again the execution is still held by the interceptors.

    Once the RPC is finally executed, all methods are finally done against the
    intercepted call, being at the same time the same call returned to the
    interceptors.

    As a base class for all of the interceptors implements the logic around
    final status, metadata and cancellation.

    Every accessor follows the same pattern: the result of the interceptors
    task is obtained, and `AioRpcError` / `asyncio.CancelledError` raised by
    that task are translated into the value the accessor reports; otherwise
    the request is delegated to the call returned by the task.
    """

    _interceptors_task: asyncio.Task
    # Callbacks registered through `add_done_callback` while the interceptors
    # task is still running; the list is appended to, hence `List`.
    _pending_add_done_callbacks: List[DoneCallbackType]

    def __init__(self, interceptors_task: asyncio.Task) -> None:
        """Store the task running the interceptors and watch its completion.

        Args:
          interceptors_task: The task (or future) whose result is the call
            returned by the interceptor chain.
        """
        self._interceptors_task = interceptors_task
        self._pending_add_done_callbacks = []
        self._interceptors_task.add_done_callback(
            self._fire_or_add_pending_done_callbacks)

    def __del__(self):
        # The finalizer also runs for instances whose `__init__` never got to
        # store the task (e.g. a subclass constructor failed earlier, or the
        # object was created through `__new__` only). Cancelling then would
        # raise AttributeError from within `__del__`, so skip it.
        if getattr(self, '_interceptors_task', None) is not None:
            self.cancel()

    def _fire_or_add_pending_done_callbacks(
            self, interceptors_task: asyncio.Task) -> None:
        """Flush the callbacks queued while the interceptors were running.

        Invoked as done-callback of the interceptors task. If the task failed
        with `AioRpcError` / `asyncio.CancelledError`, or the call it returned
        is already done, the queued callbacks are invoked right away with
        this object; otherwise they are attached to the returned call.
        """
        if not self._pending_add_done_callbacks:
            return

        call_completed = False

        try:
            call = interceptors_task.result()
            if call.done():
                call_completed = True
        except (AioRpcError, asyncio.CancelledError):
            call_completed = True

        if call_completed:
            for callback in self._pending_add_done_callbacks:
                callback(self)
        else:
            for callback in self._pending_add_done_callbacks:
                # The underlying call would pass itself to the callback;
                # wrap it so that the callback receives this object instead.
                callback = functools.partial(self._wrap_add_done_callback,
                                             callback)
                call.add_done_callback(callback)

        self._pending_add_done_callbacks = []

    def _wrap_add_done_callback(self, callback: DoneCallbackType,
                                unused_call: _base_call.Call) -> None:
        """Invoke `callback` with this object, ignoring the underlying call."""
        callback(self)

    def cancel(self) -> bool:
        """Cancel the interceptors task, or the call it returned.

        Returns:
          The result of cancelling the interceptors task while it is still
          running; False when the task ended with `AioRpcError` or
          `asyncio.CancelledError`; otherwise the result of `cancel()` on the
          call returned by the task.
        """
        if not self._interceptors_task.done():
            # The intercepted call is not available yet, so fall back to the
            # generic asyncio cancellation of the task.
            return self._interceptors_task.cancel()

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            return False

        return call.cancel()

    def cancelled(self) -> bool:
        """Tell whether the intercepted RPC ended up cancelled.

        Returns:
          False while the interceptors task is running; True when the task
          was cancelled or failed with an `AioRpcError` whose code is
          CANCELLED; otherwise the value reported by the returned call.
        """
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except AioRpcError as err:
            return err.code() == grpc.StatusCode.CANCELLED
        except asyncio.CancelledError:
            return True

        return call.cancelled()

    def done(self) -> bool:
        """Tell whether the intercepted RPC has finished.

        Returns:
          False while the interceptors task is running; True when the task
          ended with `AioRpcError` or `asyncio.CancelledError`; otherwise the
          value reported by the returned call.
        """
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            return True

        return call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback` to be invoked with this object once done.

        While the interceptors task is running the callback is queued. If
        the task already failed, or the returned call is already done, the
        callback is invoked immediately. Otherwise it is attached to the
        returned call.
        """
        if not self._interceptors_task.done():
            self._pending_add_done_callbacks.append(callback)
            return

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            callback(self)
            return

        if call.done():
            callback(self)
        else:
            callback = functools.partial(self._wrap_add_done_callback, callback)
            call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        """Not supported for intercepted calls; always raises."""
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return the initial metadata, or None if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.initial_metadata()
        except asyncio.CancelledError:
            return None

        return await call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return the trailing metadata, or None if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.trailing_metadata()
        except asyncio.CancelledError:
            return None

        return await call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Return the status code; CANCELLED if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.code()
        except asyncio.CancelledError:
            return grpc.StatusCode.CANCELLED

        return await call.code()

    async def details(self) -> str:
        """Return the status details of the RPC.

        A cancelled interceptors task reports `_LOCAL_CANCELLATION_DETAILS`.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.details()
        except asyncio.CancelledError:
            return _LOCAL_CANCELLATION_DETAILS

        return await call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; '' if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.debug_error_string()
        except asyncio.CancelledError:
            return ''

        return await call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Wait for the interceptors task, then delegate to the call.

        Exceptions raised by the interceptors task are not translated here
        and propagate to the caller.
        """
        call = await self._interceptors_task
        return await call.wait_for_connection()
class _InterceptedUnaryResponseMixin:
    """Makes an intercepted call awaitable for its single response."""

    def __await__(self):
        # First wait for the interceptors to hand over the call, then wait
        # for that call itself; its result is the result of this await.
        intercepted = yield from self._interceptors_task.__await__()
        return (yield from intercepted.__await__())
class _InterceptedStreamResponseMixin:
    """Gives an intercepted call the reading side of a response stream.

    Responses can be consumed either with `async for` or with `read()`;
    both share a single lazily created async generator.
    """

    _response_aiter: Optional[AsyncIterable[ResponseType]]

    def _init_stream_response_mixin(self) -> None:
        # Is initialized later, otherwise if the iterator is not finally
        # consumed a logging warning is emitted by Asyncio.
        self._response_aiter = None

    async def _wait_for_interceptor_task_response_iterator(
            self) -> AsyncIterable[ResponseType]:
        """Yield the responses of the call returned by the interceptors."""
        call = await self._interceptors_task
        async for response in call:
            yield response

    def __aiter__(self) -> AsyncIterable[ResponseType]:
        """Return the shared response iterator, creating it on first use."""
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        return self._response_aiter

    async def read(self) -> ResponseType:
        """Read one response from the stream.

        Returns:
          The next response message, or `cygrpc.EOF` once the response
          stream is exhausted.
        """
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        try:
            return await self._response_aiter.asend(None)
        except StopAsyncIteration:
            # An exhausted generator must surface as the EOF sentinel, the
            # same way a non-intercepted call signals the end of the stream,
            # rather than leaking StopAsyncIteration to the caller.
            return cygrpc.EOF
class _InterceptedStreamRequestMixin:
    """Gives an intercepted call the writing side of a request stream.

    When the caller supplies no request iterator, the mixin builds one
    backed by a queue that `write()` and `done_writing()` feed.
    """

    _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
    _write_to_iterator_queue: Optional[asyncio.Queue]
    _status_code_task: Optional[asyncio.Task]

    # Queue item that tells the proxy iterator to stop producing requests.
    _FINISH_ITERATOR_SENTINEL = object()

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ) -> RequestIterableType:
        """Pick the request iterator that will be handed to the RPC.

        Returns:
          The iterator supplied by the caller, or, when None was given, an
          internal async generator fed through `write()`.
        """
        if request_iterator is not None:
            # The caller drives the requests; `write()` is not available.
            self._write_to_iterator_queue = None
            return request_iterator

        # We provide our own request iterator which is a proxy
        # of the future writes that will be done by the caller.
        self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
        self._write_to_iterator_async_gen = (
            self._proxy_writes_as_request_iterator())
        self._status_code_task = None
        return self._write_to_iterator_async_gen

    async def _proxy_writes_as_request_iterator(self):
        """Yield the queued requests until the finish sentinel shows up."""
        await self._interceptors_task

        while True:
            item = await self._write_to_iterator_queue.get()
            if item is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL:
                return
            yield item

    async def _write_to_iterator_queue_interruptible(self, request: RequestType,
                                                     call: InterceptedCall):
        """Queue `request`, giving up waiting as soon as `call` has a status.

        The put on the queue is raced against a task awaiting `call.code()`
        so that an abrupt termination of the call interrupts the write.
        """
        if self._status_code_task is None:
            self._status_code_task = self._loop.create_task(call.code())

        put_task = self._loop.create_task(
            self._write_to_iterator_queue.put(request))
        await asyncio.wait((put_task, self._status_code_task),
                           return_when=asyncio.FIRST_COMPLETED)

    async def write(self, request: RequestType) -> None:
        """Send one request through the internal request iterator.

        Raises:
          cygrpc.UsageError: If the caller provided its own request iterator.
          asyncio.InvalidStateError: If the RPC is already finished or the
            call is half-closed.
        """
        # Without a queue the requests come from an iterator supplied by
        # the caller, so writing explicitly is an API misuse.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except (asyncio.CancelledError, AioRpcError):
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        if call._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)

        await self._write_to_iterator_queue_interruptible(request, call)

        # The wait above also returns when the call terminated meanwhile.
        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        """
        # Without a queue the requests come from an iterator supplied by
        # the caller, so half-closing explicitly is an API misuse.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except asyncio.CancelledError:
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        finish = _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL
        await self._write_to_iterator_queue_interruptible(finish, call)
class InterceptedUnaryUnaryCall(_InterceptedUnaryResponseMixin, InterceptedCall,
                                _base_call.UnaryUnaryCall):
    """Runs a `UnaryUnaryCall` wrapped by interceptors.

    Awaiting this object is proxied to the intercepted call, which only
    happens once the interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        invocation = self._invoke(interceptors, method, timeout, metadata,
                                  credentials, wait_for_ready, request,
                                  request_serializer, response_deserializer)
        super().__init__(loop.create_task(invocation))

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
                interceptors: List[UnaryUnaryClientInterceptor],
                client_call_details: ClientCallDetails,
                request: RequestType) -> _base_call.UnaryUnaryCall:
            # End of the chain: start the actual RPC with whatever call
            # details the interceptors passed along.
            if not interceptors:
                return UnaryUnaryCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

            proceed = functools.partial(_run_interceptor, interceptors[1:])
            outcome = await interceptors[0].intercept_unary_unary(
                proceed, client_call_details, request)

            if isinstance(outcome, _base_call.UnaryUnaryCall):
                return outcome
            # The interceptor returned a bare response; wrap it so that the
            # rest of the machinery still deals with a call object.
            return UnaryUnaryCallResponse(outcome)

        details = ClientCallDetails(method, timeout, metadata, credentials,
                                    wait_for_ready)
        return await _run_interceptor(list(interceptors), details, request)

    def time_remaining(self) -> Optional[float]:
        """Not supported for intercepted calls; always raises."""
        raise NotImplementedError()
class InterceptedUnaryStreamCall(_InterceptedStreamResponseMixin,
                                 InterceptedCall, _base_call.UnaryStreamCall):
    """Used for running a `UnaryStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Annotation only: this used to be written with `=`, which stored the
    # typing object itself as a class attribute.
    _last_returned_call_from_interceptors: Optional[_base_call.UnaryStreamCall]

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryStreamClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryStreamCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[UnaryStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> _base_call.UnaryStreamCall:
            if interceptors:
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                call_or_response_iterator = await interceptors[
                    0].intercept_unary_stream(continuation, client_call_details,
                                              request)

                if isinstance(call_or_response_iterator,
                              _base_call.UnaryStreamCall):
                    self._last_returned_call_from_interceptors = (
                        call_or_response_iterator)
                else:
                    # The interceptor returned a plain async iterator: pair it
                    # with the call returned further down the chain so that
                    # the result still behaves like a call.
                    self._last_returned_call_from_interceptors = (
                        UnaryStreamCallResponseIterator(
                            self._last_returned_call_from_interceptors,
                            call_or_response_iterator))
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the actual RPC.
                self._last_returned_call_from_interceptors = UnaryStreamCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        """Not supported for intercepted calls; always raises."""
        raise NotImplementedError()
class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
                                 _InterceptedStreamRequestMixin,
                                 InterceptedCall, _base_call.StreamUnaryCall):
    """Used for running a `StreamUnaryCall` wrapped by interceptors.

    Awaiting this object is proxied to the intercepted call, which only
    happens once the interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        # May replace a missing iterator with the internal write proxy.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        # The interceptors are sliced and indexed below, so the hint is a
        # List of stream-unary interceptors, as in the sibling classes.
        async def _run_interceptor(
            interceptors: List[StreamUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamUnaryCall:
            if interceptors:
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                return await interceptors[0].intercept_stream_unary(
                    continuation, client_call_details, request_iterator)
            else:
                # End of the chain: start the actual RPC.
                return StreamUnaryCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        """Not supported for intercepted calls; always raises."""
        raise NotImplementedError()
- class InterceptedStreamStreamCall(_InterceptedStreamResponseMixin,
- _InterceptedStreamRequestMixin,
- InterceptedCall, _base_call.StreamStreamCall):
- """Used for running a `StreamStreamCall` wrapped by interceptors."""
- _loop: asyncio.AbstractEventLoop
- _channel: cygrpc.AioChannel
- _last_returned_call_from_interceptors = Optional[_base_call.UnaryStreamCall]
- # pylint: disable=too-many-arguments
- def __init__(self, interceptors: Sequence[StreamStreamClientInterceptor],
- request_iterator: Optional[RequestIterableType],
- timeout: Optional[float], metadata: Metadata,
- credentials: Optional[grpc.CallCredentials],
- wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
- method: bytes, request_serializer: SerializingFunction,
- response_deserializer: DeserializingFunction,
- loop: asyncio.AbstractEventLoop) -> None:
- self._loop = loop
- self._channel = channel
- self._init_stream_response_mixin()
- request_iterator = self._init_stream_request_mixin(request_iterator)
- self._last_returned_call_from_interceptors = None
- interceptors_task = loop.create_task(
- self._invoke(interceptors, method, timeout, metadata, credentials,
- wait_for_ready, request_iterator, request_serializer,
- response_deserializer))
- super().__init__(interceptors_task)
- # pylint: disable=too-many-arguments
async def _invoke(
        self, interceptors: Sequence[StreamStreamClientInterceptor],
        method: bytes, timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request_iterator: RequestIterableType,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction) -> StreamStreamCall:
    """Drive the RPC through the interceptor chain and return the final call.

    The call parameters are bundled into a ``ClientCallDetails`` and handed
    to the first interceptor. Each interceptor receives a continuation that
    runs the remaining interceptors; once none are left, the continuation
    builds the underlying ``StreamStreamCall``.

    Every call object produced along the way is recorded in
    ``self._last_returned_call_from_interceptors``.
    """

    async def _run_interceptor(
            interceptors: List[StreamStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
    ) -> _base_call.StreamStreamCall:
        if not interceptors:
            # End of the chain: create the real call from whatever call
            # details the interceptors passed down.
            terminal_call = StreamStreamCall(
                request_iterator,
                _timeout_to_deadline(client_call_details.timeout),
                client_call_details.metadata,
                client_call_details.credentials,
                client_call_details.wait_for_ready, self._channel,
                client_call_details.method, request_serializer,
                response_deserializer, self._loop)
            self._last_returned_call_from_interceptors = terminal_call
            return terminal_call

        continuation = functools.partial(_run_interceptor, interceptors[1:])
        current = interceptors[0]
        outcome = await current.intercept_stream_stream(
            continuation, client_call_details, request_iterator)

        if not isinstance(outcome, _base_call.StreamStreamCall):
            # The interceptor returned something other than a call object;
            # pair it with the most recently recorded call so the result
            # still exposes the call interface.
            outcome = StreamStreamCallResponseIterator(
                self._last_returned_call_from_interceptors, outcome)

        self._last_returned_call_from_interceptors = outcome
        return outcome

    call_details = ClientCallDetails(method, timeout, metadata, credentials,
                                     wait_for_ready)
    return await _run_interceptor(list(interceptors), call_details,
                                  request_iterator)
def time_remaining(self) -> Optional[float]:
    """Not supported here; always raises ``NotImplementedError``."""
    raise NotImplementedError
class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
    """Unary-unary call that is already finished and just carries a response.

    The object reports itself as done, not cancelled, and with an OK status.
    Awaiting it yields the stored response without suspending.
    """
    # Response handed back to whoever awaits this call.
    _response: ResponseType

    def __init__(self, response: ResponseType) -> None:
        self._response = response

    def __await__(self):
        # Delegating to an empty iterable turns this method into a generator
        # (as ``__await__`` requires) without ever yielding a value.
        yield from ()
        return self._response

    # --- Completion state: fixed, since the call is already finished. ---

    def done(self) -> bool:
        return True

    def cancelled(self) -> bool:
        return False

    def cancel(self) -> bool:
        # Always reports False.
        return False

    # --- Operations this class does not implement. ---

    def add_done_callback(self, unused_callback) -> None:
        raise NotImplementedError()

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

    # --- Status accessors: constant values. ---

    async def code(self) -> grpc.StatusCode:
        return grpc.StatusCode.OK

    async def details(self) -> str:
        return ''

    async def debug_error_string(self) -> Optional[str]:
        return None

    async def initial_metadata(self) -> Optional[Metadata]:
        return None

    async def trailing_metadata(self) -> Optional[Metadata]:
        return None

    async def wait_for_connection(self) -> None:
        # Intentionally a no-op.
        return None
class _StreamCallResponseIterator:
    """Pair a call object with a separate async iterable of responses.

    Cancellation, completion state, metadata and status queries are forwarded
    to the wrapped call; asynchronous iteration is served by the supplied
    response iterator instead.
    """
    # Call that answers every status/metadata/cancellation query.
    _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
    # Source of the responses exposed through ``__aiter__``.
    _response_iterator: AsyncIterable[ResponseType]

    def __init__(self, call: Union[_base_call.UnaryStreamCall,
                                   _base_call.StreamStreamCall],
                 response_iterator: AsyncIterable[ResponseType]) -> None:
        self._call = call
        self._response_iterator = response_iterator

    def __aiter__(self):
        responses = self._response_iterator
        return responses.__aiter__()

    # --- Synchronous queries, forwarded to the wrapped call. ---

    def done(self) -> bool:
        return self._call.done()

    def cancelled(self) -> bool:
        return self._call.cancelled()

    def cancel(self) -> bool:
        return self._call.cancel()

    def time_remaining(self) -> Optional[float]:
        return self._call.time_remaining()

    def add_done_callback(self, callback) -> None:
        self._call.add_done_callback(callback)

    # --- Asynchronous queries, forwarded to the wrapped call. ---

    async def code(self) -> grpc.StatusCode:
        status_code = await self._call.code()
        return status_code

    async def details(self) -> str:
        text = await self._call.details()
        return text

    async def debug_error_string(self) -> Optional[str]:
        debug_text = await self._call.debug_error_string()
        return debug_text

    async def initial_metadata(self) -> Optional[Metadata]:
        initial = await self._call.initial_metadata()
        return initial

    async def trailing_metadata(self) -> Optional[Metadata]:
        trailing = await self._call.trailing_metadata()
        return trailing

    async def wait_for_connection(self) -> None:
        # Whatever the wrapped call returns is passed through unchanged.
        outcome = await self._call.wait_for_connection()
        return outcome
class UnaryStreamCallResponseIterator(_StreamCallResponseIterator,
                                      _base_call.UnaryStreamCall):
    """UnaryStreamCall class which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        # Responses are consumed through the async iterator, so this
        # method is not expected to be called.
        raise NotImplementedError
class StreamStreamCallResponseIterator(_StreamCallResponseIterator,
                                       _base_call.StreamStreamCall):
    """StreamStreamCall class which uses an alternative response iterator."""

    @property
    def _done_writing_flag(self) -> bool:
        # Mirror the flag held by the wrapped call.
        wrapped = self._call
        return wrapped._done_writing_flag

    async def read(self) -> ResponseType:
        # Responses are consumed through the async iterator, so this
        # method is not expected to be called.
        raise NotImplementedError

    async def write(self, request: RequestType) -> None:
        # Requests travel through the async iterator provided by the
        # InterceptedStreamStreamCall, so this method is not expected
        # to be called.
        raise NotImplementedError

    async def done_writing(self) -> None:
        # Requests travel through the async iterator provided by the
        # InterceptedStreamStreamCall, so this method is not expected
        # to be called.
        raise NotImplementedError
|