123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- # Copyright 2016 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Shared implementation."""
- import logging
- import time
- from typing import Any, AnyStr, Callable, Optional, Union
- import grpc
- from grpc._cython import cygrpc
- from grpc._typing import DeserializingFunction
- from grpc._typing import SerializingFunction
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Lookup table from each cygrpc.ConnectivityState member to the
# grpc.ChannelConnectivity member of the same name.
CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
    cygrpc.ConnectivityState.idle:
        grpc.ChannelConnectivity.IDLE,
    cygrpc.ConnectivityState.connecting:
        grpc.ChannelConnectivity.CONNECTING,
    cygrpc.ConnectivityState.ready:
        grpc.ChannelConnectivity.READY,
    cygrpc.ConnectivityState.transient_failure:
        grpc.ChannelConnectivity.TRANSIENT_FAILURE,
    cygrpc.ConnectivityState.shutdown:
        grpc.ChannelConnectivity.SHUTDOWN,
}

# Lookup table from each cygrpc.StatusCode member to the grpc.StatusCode
# member of the same name.
CYGRPC_STATUS_CODE_TO_STATUS_CODE = {
    cygrpc.StatusCode.ok: grpc.StatusCode.OK,
    cygrpc.StatusCode.cancelled: grpc.StatusCode.CANCELLED,
    cygrpc.StatusCode.unknown: grpc.StatusCode.UNKNOWN,
    cygrpc.StatusCode.invalid_argument: grpc.StatusCode.INVALID_ARGUMENT,
    cygrpc.StatusCode.deadline_exceeded: grpc.StatusCode.DEADLINE_EXCEEDED,
    cygrpc.StatusCode.not_found: grpc.StatusCode.NOT_FOUND,
    cygrpc.StatusCode.already_exists: grpc.StatusCode.ALREADY_EXISTS,
    cygrpc.StatusCode.permission_denied: grpc.StatusCode.PERMISSION_DENIED,
    cygrpc.StatusCode.unauthenticated: grpc.StatusCode.UNAUTHENTICATED,
    cygrpc.StatusCode.resource_exhausted: grpc.StatusCode.RESOURCE_EXHAUSTED,
    cygrpc.StatusCode.failed_precondition: grpc.StatusCode.FAILED_PRECONDITION,
    cygrpc.StatusCode.aborted: grpc.StatusCode.ABORTED,
    cygrpc.StatusCode.out_of_range: grpc.StatusCode.OUT_OF_RANGE,
    cygrpc.StatusCode.unimplemented: grpc.StatusCode.UNIMPLEMENTED,
    cygrpc.StatusCode.internal: grpc.StatusCode.INTERNAL,
    cygrpc.StatusCode.unavailable: grpc.StatusCode.UNAVAILABLE,
    cygrpc.StatusCode.data_loss: grpc.StatusCode.DATA_LOSS,
}

# Inverse of CYGRPC_STATUS_CODE_TO_STATUS_CODE, derived from it so the two
# tables cannot drift apart.
STATUS_CODE_TO_CYGRPC_STATUS_CODE = {
    grpc_code: cygrpc_code
    for cygrpc_code, grpc_code in CYGRPC_STATUS_CODE_TO_STATUS_CODE.items()
}

# Longest timeout, in seconds, that wait() passes to a single wait_fn call.
MAXIMUM_WAIT_TIMEOUT = 0.1

# %-style template taking the address that could not be bound; raised as the
# RuntimeError message by validate_port_binding_result().
_ERROR_MESSAGE_PORT_BINDING_FAILED = 'Failed to bind to address %s; set ' \
    'GRPC_VERBOSITY=debug environment variable to see detailed error message.'
def encode(s: AnyStr) -> bytes:
    """Return `s` as bytes.

    A bytes argument is handed back unchanged; any other argument is encoded
    through its `encode` method with the 'utf8' codec.
    """
    return s if isinstance(s, bytes) else s.encode('utf8')
def decode(b: AnyStr) -> str:
    """Return `b` as text.

    A bytes argument is decoded as UTF-8, with undecodable bytes substituted
    by the replacement character rather than raising; any other argument is
    returned unchanged.
    """
    if not isinstance(b, bytes):
        return b
    return b.decode('utf-8', 'replace')
def _transform(message: Any, transformer: Union[SerializingFunction,
                                                DeserializingFunction, None],
               exception_message: str) -> Any:
    """Apply `transformer` to `message`, logging failures instead of raising.

    Args:
      message: The value to transform.
      transformer: A callable applied to `message`, or None to pass the
        message through untouched.
      exception_message: Text logged, together with the traceback, if
        `transformer` raises.

    Returns:
      `message` itself when `transformer` is None, the transformer's result
      when it succeeds, or None when it raised an Exception.
    """
    if transformer is None:
        return message
    try:
        transformed = transformer(message)
    except Exception:  # pylint: disable=broad-except
        # Best effort by design: the failure is reported via the log and
        # signalled to the caller as None.
        _LOGGER.exception(exception_message)
        return None
    return transformed
def serialize(message: Any, serializer: Optional[SerializingFunction]) -> bytes:
    """Run `message` through `serializer` via `_transform`.

    Returns whatever `_transform` returns: `message` unchanged when
    `serializer` is None, otherwise the serializer's output, or None if the
    serializer raised (the failure is logged, not propagated).
    """
    failure_text = 'Exception serializing message!'
    return _transform(message, serializer, failure_text)
def deserialize(serialized_message: bytes,
                deserializer: Optional[DeserializingFunction]) -> Any:
    """Run `serialized_message` through `deserializer` via `_transform`.

    Returns whatever `_transform` returns: `serialized_message` unchanged when
    `deserializer` is None, otherwise the deserializer's output, or None if
    the deserializer raised (the failure is logged, not propagated).
    """
    failure_text = 'Exception deserializing message!'
    return _transform(serialized_message, deserializer, failure_text)
def fully_qualified_method(group: str, method: str) -> str:
    """Build the method path string '/<group>/<method>'."""
    path_template = '/{}/{}'
    return path_template.format(group, method)
def _wait_once(wait_fn: Callable[..., bool], timeout: float,
               spin_cb: Optional[Callable[[], None]]):
    """Perform one bounded wait, then run the spin callback if one was given.

    Args:
      wait_fn: Callable invoked with `timeout` as a keyword argument; its
        return value is ignored.
      timeout: The value forwarded to `wait_fn`.
      spin_cb: Optional zero-argument callable invoked after `wait_fn`
        returns.
    """
    wait_fn(timeout=timeout)
    if spin_cb is None:
        return
    spin_cb()
def wait(wait_fn: Callable[..., bool],
         wait_complete_fn: Callable[[], bool],
         timeout: Optional[float] = None,
         spin_cb: Optional[Callable[[], None]] = None) -> bool:
    """Blocks waiting for an event without blocking the thread indefinitely.

    See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
    `threading.Event.wait` and `threading.Condition.wait` methods, if invoked
    without a timeout kwarg, may block the calling thread indefinitely. If the
    call is made from the main thread, this means that signal handlers may not
    run for an arbitrarily long period of time.

    This wrapper calls the supplied wait function with an arbitrary short
    timeout to ensure that no signal handler has to wait longer than
    MAXIMUM_WAIT_TIMEOUT before executing.

    Args:
      wait_fn: A callable accepting a single float-valued kwarg named
        `timeout`. This function is expected to be one of `threading.Event.wait`
        or `threading.Condition.wait`.
      wait_complete_fn: A callable taking no arguments and returning a bool.
        When this function returns true, it indicates that waiting should cease.
      timeout: An optional float-valued number of seconds after which the wait
        should cease.
      spin_cb: An optional Callable taking no arguments and returning nothing.
        This callback will be called on each iteration of the spin. This may be
        used for, e.g. work related to forking.

    Returns:
      True if a timeout was supplied and it was reached. False otherwise.
    """
    if timeout is None:
        while not wait_complete_fn():
            _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
        return False

    # Measure the deadline on the monotonic clock: wall-clock time can jump
    # when the system clock is adjusted, which would end the wait early or
    # stretch it past the requested timeout.
    deadline = time.monotonic() + timeout
    while not wait_complete_fn():
        remaining = deadline - time.monotonic()
        if remaining < 0:
            return True
        # Never sleep longer than MAXIMUM_WAIT_TIMEOUT in one go so signal
        # handlers get a chance to run between slices.
        _wait_once(wait_fn, min(remaining, MAXIMUM_WAIT_TIMEOUT), spin_cb)
    return False
def validate_port_binding_result(address: str, port: int) -> int:
    """Check the port that Core reported for a bind attempt.

    Core reports a failed bind by returning port 0 and supplies no detailed
    reason, so the failure is surfaced here as an exception to prevent
    further confusion.

    Args:
      address: The address string that was being bound.
      port: The int port returned by Core.

    Returns:
      `port`, unchanged, when it is not 0.

    Raises:
      RuntimeError: If `port` is 0.
    """
    bind_failed = port == 0
    if bind_failed:
        # No failure message is available from Core; raise with a hint on
        # how to obtain more detail.
        raise RuntimeError(_ERROR_MESSAGE_PORT_BINDING_FAILED % address)
    return port
|