_client_adaptations.py 27 KB


  1. # Copyright 2016 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Translates gRPC's client-side API into gRPC's client-side Beta API."""
  15. import grpc
  16. from grpc import _common
  17. from grpc.beta import _metadata
  18. from grpc.beta import interfaces
  19. from grpc.framework.common import cardinality
  20. from grpc.framework.foundation import future
  21. from grpc.framework.interfaces.face import face
  22. # pylint: disable=too-many-arguments,too-many-locals,unused-argument
  23. _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
  24. grpc.StatusCode.CANCELLED:
  25. (face.Abortion.Kind.CANCELLED, face.CancellationError),
  26. grpc.StatusCode.UNKNOWN:
  27. (face.Abortion.Kind.REMOTE_FAILURE, face.RemoteError),
  28. grpc.StatusCode.DEADLINE_EXCEEDED:
  29. (face.Abortion.Kind.EXPIRED, face.ExpirationError),
  30. grpc.StatusCode.UNIMPLEMENTED:
  31. (face.Abortion.Kind.LOCAL_FAILURE, face.LocalError),
  32. }
  33. def _effective_metadata(metadata, metadata_transformer):
  34. non_none_metadata = () if metadata is None else metadata
  35. if metadata_transformer is None:
  36. return non_none_metadata
  37. else:
  38. return metadata_transformer(non_none_metadata)
  39. def _credentials(grpc_call_options):
  40. return None if grpc_call_options is None else grpc_call_options.credentials
  41. def _abortion(rpc_error_call):
  42. code = rpc_error_call.code()
  43. pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
  44. error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0]
  45. return face.Abortion(error_kind, rpc_error_call.initial_metadata(),
  46. rpc_error_call.trailing_metadata(), code,
  47. rpc_error_call.details())
  48. def _abortion_error(rpc_error_call):
  49. code = rpc_error_call.code()
  50. pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
  51. exception_class = face.AbortionError if pair is None else pair[1]
  52. return exception_class(rpc_error_call.initial_metadata(),
  53. rpc_error_call.trailing_metadata(), code,
  54. rpc_error_call.details())
  55. class _InvocationProtocolContext(interfaces.GRPCInvocationContext):
  56. def disable_next_request_compression(self):
  57. pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
  58. class _Rendezvous(future.Future, face.Call):
  59. def __init__(self, response_future, response_iterator, call):
  60. self._future = response_future
  61. self._iterator = response_iterator
  62. self._call = call
  63. def cancel(self):
  64. return self._call.cancel()
  65. def cancelled(self):
  66. return self._future.cancelled()
  67. def running(self):
  68. return self._future.running()
  69. def done(self):
  70. return self._future.done()
  71. def result(self, timeout=None):
  72. try:
  73. return self._future.result(timeout=timeout)
  74. except grpc.RpcError as rpc_error_call:
  75. raise _abortion_error(rpc_error_call)
  76. except grpc.FutureTimeoutError:
  77. raise future.TimeoutError()
  78. except grpc.FutureCancelledError:
  79. raise future.CancelledError()
  80. def exception(self, timeout=None):
  81. try:
  82. rpc_error_call = self._future.exception(timeout=timeout)
  83. if rpc_error_call is None:
  84. return None
  85. else:
  86. return _abortion_error(rpc_error_call)
  87. except grpc.FutureTimeoutError:
  88. raise future.TimeoutError()
  89. except grpc.FutureCancelledError:
  90. raise future.CancelledError()
  91. def traceback(self, timeout=None):
  92. try:
  93. return self._future.traceback(timeout=timeout)
  94. except grpc.FutureTimeoutError:
  95. raise future.TimeoutError()
  96. except grpc.FutureCancelledError:
  97. raise future.CancelledError()
  98. def add_done_callback(self, fn):
  99. self._future.add_done_callback(lambda ignored_callback: fn(self))
  100. def __iter__(self):
  101. return self
  102. def _next(self):
  103. try:
  104. return next(self._iterator)
  105. except grpc.RpcError as rpc_error_call:
  106. raise _abortion_error(rpc_error_call)
  107. def __next__(self):
  108. return self._next()
  109. def next(self):
  110. return self._next()
  111. def is_active(self):
  112. return self._call.is_active()
  113. def time_remaining(self):
  114. return self._call.time_remaining()
  115. def add_abortion_callback(self, abortion_callback):
  116. def done_callback():
  117. if self.code() is not grpc.StatusCode.OK:
  118. abortion_callback(_abortion(self._call))
  119. registered = self._call.add_callback(done_callback)
  120. return None if registered else done_callback()
  121. def protocol_context(self):
  122. return _InvocationProtocolContext()
  123. def initial_metadata(self):
  124. return _metadata.beta(self._call.initial_metadata())
  125. def terminal_metadata(self):
  126. return _metadata.beta(self._call.terminal_metadata())
  127. def code(self):
  128. return self._call.code()
  129. def details(self):
  130. return self._call.details()
  131. def _blocking_unary_unary(channel, group, method, timeout, with_call,
  132. protocol_options, metadata, metadata_transformer,
  133. request, request_serializer, response_deserializer):
  134. try:
  135. multi_callable = channel.unary_unary(
  136. _common.fully_qualified_method(group, method),
  137. request_serializer=request_serializer,
  138. response_deserializer=response_deserializer)
  139. effective_metadata = _effective_metadata(metadata, metadata_transformer)
  140. if with_call:
  141. response, call = multi_callable.with_call(
  142. request,
  143. timeout=timeout,
  144. metadata=_metadata.unbeta(effective_metadata),
  145. credentials=_credentials(protocol_options))
  146. return response, _Rendezvous(None, None, call)
  147. else:
  148. return multi_callable(request,
  149. timeout=timeout,
  150. metadata=_metadata.unbeta(effective_metadata),
  151. credentials=_credentials(protocol_options))
  152. except grpc.RpcError as rpc_error_call:
  153. raise _abortion_error(rpc_error_call)
  154. def _future_unary_unary(channel, group, method, timeout, protocol_options,
  155. metadata, metadata_transformer, request,
  156. request_serializer, response_deserializer):
  157. multi_callable = channel.unary_unary(
  158. _common.fully_qualified_method(group, method),
  159. request_serializer=request_serializer,
  160. response_deserializer=response_deserializer)
  161. effective_metadata = _effective_metadata(metadata, metadata_transformer)
  162. response_future = multi_callable.future(
  163. request,
  164. timeout=timeout,
  165. metadata=_metadata.unbeta(effective_metadata),
  166. credentials=_credentials(protocol_options))
  167. return _Rendezvous(response_future, None, response_future)
  168. def _unary_stream(channel, group, method, timeout, protocol_options, metadata,
  169. metadata_transformer, request, request_serializer,
  170. response_deserializer):
  171. multi_callable = channel.unary_stream(
  172. _common.fully_qualified_method(group, method),
  173. request_serializer=request_serializer,
  174. response_deserializer=response_deserializer)
  175. effective_metadata = _effective_metadata(metadata, metadata_transformer)
  176. response_iterator = multi_callable(
  177. request,
  178. timeout=timeout,
  179. metadata=_metadata.unbeta(effective_metadata),
  180. credentials=_credentials(protocol_options))
  181. return _Rendezvous(None, response_iterator, response_iterator)
  182. def _blocking_stream_unary(channel, group, method, timeout, with_call,
  183. protocol_options, metadata, metadata_transformer,
  184. request_iterator, request_serializer,
  185. response_deserializer):
  186. try:
  187. multi_callable = channel.stream_unary(
  188. _common.fully_qualified_method(group, method),
  189. request_serializer=request_serializer,
  190. response_deserializer=response_deserializer)
  191. effective_metadata = _effective_metadata(metadata, metadata_transformer)
  192. if with_call:
  193. response, call = multi_callable.with_call(
  194. request_iterator,
  195. timeout=timeout,
  196. metadata=_metadata.unbeta(effective_metadata),
  197. credentials=_credentials(protocol_options))
  198. return response, _Rendezvous(None, None, call)
  199. else:
  200. return multi_callable(request_iterator,
  201. timeout=timeout,
  202. metadata=_metadata.unbeta(effective_metadata),
  203. credentials=_credentials(protocol_options))
  204. except grpc.RpcError as rpc_error_call:
  205. raise _abortion_error(rpc_error_call)
  206. def _future_stream_unary(channel, group, method, timeout, protocol_options,
  207. metadata, metadata_transformer, request_iterator,
  208. request_serializer, response_deserializer):
  209. multi_callable = channel.stream_unary(
  210. _common.fully_qualified_method(group, method),
  211. request_serializer=request_serializer,
  212. response_deserializer=response_deserializer)
  213. effective_metadata = _effective_metadata(metadata, metadata_transformer)
  214. response_future = multi_callable.future(
  215. request_iterator,
  216. timeout=timeout,
  217. metadata=_metadata.unbeta(effective_metadata),
  218. credentials=_credentials(protocol_options))
  219. return _Rendezvous(response_future, None, response_future)
  220. def _stream_stream(channel, group, method, timeout, protocol_options, metadata,
  221. metadata_transformer, request_iterator, request_serializer,
  222. response_deserializer):
  223. multi_callable = channel.stream_stream(
  224. _common.fully_qualified_method(group, method),
  225. request_serializer=request_serializer,
  226. response_deserializer=response_deserializer)
  227. effective_metadata = _effective_metadata(metadata, metadata_transformer)
  228. response_iterator = multi_callable(
  229. request_iterator,
  230. timeout=timeout,
  231. metadata=_metadata.unbeta(effective_metadata),
  232. credentials=_credentials(protocol_options))
  233. return _Rendezvous(None, response_iterator, response_iterator)
  234. class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
  235. def __init__(self, channel, group, method, metadata_transformer,
  236. request_serializer, response_deserializer):
  237. self._channel = channel
  238. self._group = group
  239. self._method = method
  240. self._metadata_transformer = metadata_transformer
  241. self._request_serializer = request_serializer
  242. self._response_deserializer = response_deserializer
  243. def __call__(self,
  244. request,
  245. timeout,
  246. metadata=None,
  247. with_call=False,
  248. protocol_options=None):
  249. return _blocking_unary_unary(self._channel, self._group, self._method,
  250. timeout, with_call, protocol_options,
  251. metadata, self._metadata_transformer,
  252. request, self._request_serializer,
  253. self._response_deserializer)
  254. def future(self, request, timeout, metadata=None, protocol_options=None):
  255. return _future_unary_unary(self._channel, self._group, self._method,
  256. timeout, protocol_options, metadata,
  257. self._metadata_transformer, request,
  258. self._request_serializer,
  259. self._response_deserializer)
  260. def event(self,
  261. request,
  262. receiver,
  263. abortion_callback,
  264. timeout,
  265. metadata=None,
  266. protocol_options=None):
  267. raise NotImplementedError()
  268. class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
  269. def __init__(self, channel, group, method, metadata_transformer,
  270. request_serializer, response_deserializer):
  271. self._channel = channel
  272. self._group = group
  273. self._method = method
  274. self._metadata_transformer = metadata_transformer
  275. self._request_serializer = request_serializer
  276. self._response_deserializer = response_deserializer
  277. def __call__(self, request, timeout, metadata=None, protocol_options=None):
  278. return _unary_stream(self._channel, self._group, self._method, timeout,
  279. protocol_options, metadata,
  280. self._metadata_transformer, request,
  281. self._request_serializer,
  282. self._response_deserializer)
  283. def event(self,
  284. request,
  285. receiver,
  286. abortion_callback,
  287. timeout,
  288. metadata=None,
  289. protocol_options=None):
  290. raise NotImplementedError()
  291. class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
  292. def __init__(self, channel, group, method, metadata_transformer,
  293. request_serializer, response_deserializer):
  294. self._channel = channel
  295. self._group = group
  296. self._method = method
  297. self._metadata_transformer = metadata_transformer
  298. self._request_serializer = request_serializer
  299. self._response_deserializer = response_deserializer
  300. def __call__(self,
  301. request_iterator,
  302. timeout,
  303. metadata=None,
  304. with_call=False,
  305. protocol_options=None):
  306. return _blocking_stream_unary(self._channel, self._group, self._method,
  307. timeout, with_call, protocol_options,
  308. metadata, self._metadata_transformer,
  309. request_iterator,
  310. self._request_serializer,
  311. self._response_deserializer)
  312. def future(self,
  313. request_iterator,
  314. timeout,
  315. metadata=None,
  316. protocol_options=None):
  317. return _future_stream_unary(self._channel, self._group, self._method,
  318. timeout, protocol_options, metadata,
  319. self._metadata_transformer,
  320. request_iterator, self._request_serializer,
  321. self._response_deserializer)
  322. def event(self,
  323. receiver,
  324. abortion_callback,
  325. timeout,
  326. metadata=None,
  327. protocol_options=None):
  328. raise NotImplementedError()
  329. class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
  330. def __init__(self, channel, group, method, metadata_transformer,
  331. request_serializer, response_deserializer):
  332. self._channel = channel
  333. self._group = group
  334. self._method = method
  335. self._metadata_transformer = metadata_transformer
  336. self._request_serializer = request_serializer
  337. self._response_deserializer = response_deserializer
  338. def __call__(self,
  339. request_iterator,
  340. timeout,
  341. metadata=None,
  342. protocol_options=None):
  343. return _stream_stream(self._channel, self._group, self._method, timeout,
  344. protocol_options, metadata,
  345. self._metadata_transformer, request_iterator,
  346. self._request_serializer,
  347. self._response_deserializer)
  348. def event(self,
  349. receiver,
  350. abortion_callback,
  351. timeout,
  352. metadata=None,
  353. protocol_options=None):
  354. raise NotImplementedError()
  355. class _GenericStub(face.GenericStub):
  356. def __init__(self, channel, metadata_transformer, request_serializers,
  357. response_deserializers):
  358. self._channel = channel
  359. self._metadata_transformer = metadata_transformer
  360. self._request_serializers = request_serializers or {}
  361. self._response_deserializers = response_deserializers or {}
  362. def blocking_unary_unary(self,
  363. group,
  364. method,
  365. request,
  366. timeout,
  367. metadata=None,
  368. with_call=None,
  369. protocol_options=None):
  370. request_serializer = self._request_serializers.get((
  371. group,
  372. method,
  373. ))
  374. response_deserializer = self._response_deserializers.get((
  375. group,
  376. method,
  377. ))
  378. return _blocking_unary_unary(self._channel, group, method, timeout,
  379. with_call, protocol_options, metadata,
  380. self._metadata_transformer, request,
  381. request_serializer, response_deserializer)
  382. def future_unary_unary(self,
  383. group,
  384. method,
  385. request,
  386. timeout,
  387. metadata=None,
  388. protocol_options=None):
  389. request_serializer = self._request_serializers.get((
  390. group,
  391. method,
  392. ))
  393. response_deserializer = self._response_deserializers.get((
  394. group,
  395. method,
  396. ))
  397. return _future_unary_unary(self._channel, group, method, timeout,
  398. protocol_options, metadata,
  399. self._metadata_transformer, request,
  400. request_serializer, response_deserializer)
  401. def inline_unary_stream(self,
  402. group,
  403. method,
  404. request,
  405. timeout,
  406. metadata=None,
  407. protocol_options=None):
  408. request_serializer = self._request_serializers.get((
  409. group,
  410. method,
  411. ))
  412. response_deserializer = self._response_deserializers.get((
  413. group,
  414. method,
  415. ))
  416. return _unary_stream(self._channel, group, method, timeout,
  417. protocol_options, metadata,
  418. self._metadata_transformer, request,
  419. request_serializer, response_deserializer)
  420. def blocking_stream_unary(self,
  421. group,
  422. method,
  423. request_iterator,
  424. timeout,
  425. metadata=None,
  426. with_call=None,
  427. protocol_options=None):
  428. request_serializer = self._request_serializers.get((
  429. group,
  430. method,
  431. ))
  432. response_deserializer = self._response_deserializers.get((
  433. group,
  434. method,
  435. ))
  436. return _blocking_stream_unary(self._channel, group, method, timeout,
  437. with_call, protocol_options, metadata,
  438. self._metadata_transformer,
  439. request_iterator, request_serializer,
  440. response_deserializer)
  441. def future_stream_unary(self,
  442. group,
  443. method,
  444. request_iterator,
  445. timeout,
  446. metadata=None,
  447. protocol_options=None):
  448. request_serializer = self._request_serializers.get((
  449. group,
  450. method,
  451. ))
  452. response_deserializer = self._response_deserializers.get((
  453. group,
  454. method,
  455. ))
  456. return _future_stream_unary(self._channel, group, method, timeout,
  457. protocol_options, metadata,
  458. self._metadata_transformer,
  459. request_iterator, request_serializer,
  460. response_deserializer)
  461. def inline_stream_stream(self,
  462. group,
  463. method,
  464. request_iterator,
  465. timeout,
  466. metadata=None,
  467. protocol_options=None):
  468. request_serializer = self._request_serializers.get((
  469. group,
  470. method,
  471. ))
  472. response_deserializer = self._response_deserializers.get((
  473. group,
  474. method,
  475. ))
  476. return _stream_stream(self._channel, group, method, timeout,
  477. protocol_options, metadata,
  478. self._metadata_transformer, request_iterator,
  479. request_serializer, response_deserializer)
  480. def event_unary_unary(self,
  481. group,
  482. method,
  483. request,
  484. receiver,
  485. abortion_callback,
  486. timeout,
  487. metadata=None,
  488. protocol_options=None):
  489. raise NotImplementedError()
  490. def event_unary_stream(self,
  491. group,
  492. method,
  493. request,
  494. receiver,
  495. abortion_callback,
  496. timeout,
  497. metadata=None,
  498. protocol_options=None):
  499. raise NotImplementedError()
  500. def event_stream_unary(self,
  501. group,
  502. method,
  503. receiver,
  504. abortion_callback,
  505. timeout,
  506. metadata=None,
  507. protocol_options=None):
  508. raise NotImplementedError()
  509. def event_stream_stream(self,
  510. group,
  511. method,
  512. receiver,
  513. abortion_callback,
  514. timeout,
  515. metadata=None,
  516. protocol_options=None):
  517. raise NotImplementedError()
  518. def unary_unary(self, group, method):
  519. request_serializer = self._request_serializers.get((
  520. group,
  521. method,
  522. ))
  523. response_deserializer = self._response_deserializers.get((
  524. group,
  525. method,
  526. ))
  527. return _UnaryUnaryMultiCallable(self._channel, group, method,
  528. self._metadata_transformer,
  529. request_serializer,
  530. response_deserializer)
  531. def unary_stream(self, group, method):
  532. request_serializer = self._request_serializers.get((
  533. group,
  534. method,
  535. ))
  536. response_deserializer = self._response_deserializers.get((
  537. group,
  538. method,
  539. ))
  540. return _UnaryStreamMultiCallable(self._channel, group, method,
  541. self._metadata_transformer,
  542. request_serializer,
  543. response_deserializer)
  544. def stream_unary(self, group, method):
  545. request_serializer = self._request_serializers.get((
  546. group,
  547. method,
  548. ))
  549. response_deserializer = self._response_deserializers.get((
  550. group,
  551. method,
  552. ))
  553. return _StreamUnaryMultiCallable(self._channel, group, method,
  554. self._metadata_transformer,
  555. request_serializer,
  556. response_deserializer)
  557. def stream_stream(self, group, method):
  558. request_serializer = self._request_serializers.get((
  559. group,
  560. method,
  561. ))
  562. response_deserializer = self._response_deserializers.get((
  563. group,
  564. method,
  565. ))
  566. return _StreamStreamMultiCallable(self._channel, group, method,
  567. self._metadata_transformer,
  568. request_serializer,
  569. response_deserializer)
  570. def __enter__(self):
  571. return self
  572. def __exit__(self, exc_type, exc_val, exc_tb):
  573. return False
  574. class _DynamicStub(face.DynamicStub):
  575. def __init__(self, backing_generic_stub, group, cardinalities):
  576. self._generic_stub = backing_generic_stub
  577. self._group = group
  578. self._cardinalities = cardinalities
  579. def __getattr__(self, attr):
  580. method_cardinality = self._cardinalities.get(attr)
  581. if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
  582. return self._generic_stub.unary_unary(self._group, attr)
  583. elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
  584. return self._generic_stub.unary_stream(self._group, attr)
  585. elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
  586. return self._generic_stub.stream_unary(self._group, attr)
  587. elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
  588. return self._generic_stub.stream_stream(self._group, attr)
  589. else:
  590. raise AttributeError('_DynamicStub object has no attribute "%s"!' %
  591. attr)
  592. def __enter__(self):
  593. return self
  594. def __exit__(self, exc_type, exc_val, exc_tb):
  595. return False
  596. def generic_stub(channel, host, metadata_transformer, request_serializers,
  597. response_deserializers):
  598. return _GenericStub(channel, metadata_transformer, request_serializers,
  599. response_deserializers)
  600. def dynamic_stub(channel, service, cardinalities, host, metadata_transformer,
  601. request_serializers, response_deserializers):
  602. return _DynamicStub(
  603. _GenericStub(channel, metadata_transformer, request_serializers,
  604. response_deserializers), service, cardinalities)