channel.pyx.pxi 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. _INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
  15. 'Internal gRPC call error %d. ' +
  16. 'Please report to https://github.com/grpc/grpc/issues')
  17. cdef str _call_error_metadata(metadata):
  18. return 'metadata was invalid: %s' % metadata
  19. cdef str _call_error_no_metadata(c_call_error):
  20. return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
  21. cdef str _call_error(c_call_error, metadata):
  22. if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
  23. return _call_error_metadata(metadata)
  24. else:
  25. return _call_error_no_metadata(c_call_error)
  26. cdef _check_call_error_no_metadata(c_call_error):
  27. if c_call_error != GRPC_CALL_OK:
  28. return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
  29. else:
  30. return None
  31. cdef _check_and_raise_call_error_no_metadata(c_call_error):
  32. error = _check_call_error_no_metadata(c_call_error)
  33. if error is not None:
  34. raise ValueError(error)
  35. cdef _check_call_error(c_call_error, metadata):
  36. if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
  37. return _call_error_metadata(metadata)
  38. else:
  39. return _check_call_error_no_metadata(c_call_error)
  40. cdef void _raise_call_error_no_metadata(c_call_error) except *:
  41. raise ValueError(_call_error_no_metadata(c_call_error))
  42. cdef void _raise_call_error(c_call_error, metadata) except *:
  43. raise ValueError(_call_error(c_call_error, metadata))
  44. cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
  45. grpc_completion_queue_shutdown(c_completion_queue)
  46. grpc_completion_queue_destroy(c_completion_queue)
  47. cdef class _CallState:
  48. def __cinit__(self):
  49. self.due = set()
  50. cdef class _ChannelState:
  51. def __cinit__(self):
  52. self.condition = threading.Condition()
  53. self.open = True
  54. self.integrated_call_states = {}
  55. self.segregated_call_states = set()
  56. self.connectivity_due = set()
  57. self.closed_reason = None
  58. cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
  59. cdef grpc_call_error c_call_error
  60. cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
  61. tag.prepare()
  62. cpython.Py_INCREF(tag)
  63. with nogil:
  64. c_call_error = grpc_call_start_batch(
  65. c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
  66. return c_call_error, tag
  67. cdef object _operate_from_integrated_call(
  68. _ChannelState channel_state, _CallState call_state, object operations,
  69. object user_tag):
  70. cdef grpc_call_error c_call_error
  71. cdef _BatchOperationTag tag
  72. with channel_state.condition:
  73. if call_state.due:
  74. c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
  75. if c_call_error == GRPC_CALL_OK:
  76. call_state.due.add(tag)
  77. channel_state.integrated_call_states[tag] = call_state
  78. return True
  79. else:
  80. _raise_call_error_no_metadata(c_call_error)
  81. else:
  82. return False
  83. cdef object _operate_from_segregated_call(
  84. _ChannelState channel_state, _CallState call_state, object operations,
  85. object user_tag):
  86. cdef grpc_call_error c_call_error
  87. cdef _BatchOperationTag tag
  88. with channel_state.condition:
  89. if call_state.due:
  90. c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
  91. if c_call_error == GRPC_CALL_OK:
  92. call_state.due.add(tag)
  93. return True
  94. else:
  95. _raise_call_error_no_metadata(c_call_error)
  96. else:
  97. return False
  98. cdef _cancel(
  99. _ChannelState channel_state, _CallState call_state, grpc_status_code code,
  100. str details):
  101. cdef grpc_call_error c_call_error
  102. with channel_state.condition:
  103. if call_state.due:
  104. c_call_error = grpc_call_cancel_with_status(
  105. call_state.c_call, code, _encode(details), NULL)
  106. _check_and_raise_call_error_no_metadata(c_call_error)
  107. cdef _next_call_event(
  108. _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
  109. on_success, on_failure, deadline):
  110. """Block on the next event out of the completion queue.
  111. On success, `on_success` will be invoked with the tag taken from the CQ.
  112. In the case of a failure due to an exception raised in a signal handler,
  113. `on_failure` will be invoked with no arguments. Note that this situation
  114. can only occur on the main thread.
  115. Args:
  116. channel_state: The state for the channel on which the RPC is running.
  117. c_completion_queue: The CQ which will be polled.
  118. on_success: A callable object to be invoked upon successful receipt of a
  119. tag from the CQ.
  120. on_failure: A callable object to be invoked in case a Python exception is
  121. raised from a signal handler during polling.
  122. deadline: The point after which the RPC will time out.
  123. """
  124. try:
  125. tag, event = _latent_event(c_completion_queue, deadline)
  126. # NOTE(rbellevi): This broad except enables us to clean up resources before
  127. # propagating any exceptions raised by signal handlers to the application.
  128. except:
  129. if on_failure is not None:
  130. on_failure()
  131. raise
  132. else:
  133. with channel_state.condition:
  134. on_success(tag)
  135. channel_state.condition.notify_all()
  136. return event
  137. # TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
  138. cdef void _call(
  139. _ChannelState channel_state, _CallState call_state,
  140. grpc_completion_queue *c_completion_queue, on_success, int flags, method,
  141. host, object deadline, CallCredentials credentials,
  142. object operationses_and_user_tags, object metadata,
  143. object context) except *:
  144. """Invokes an RPC.
  145. Args:
  146. channel_state: A _ChannelState with its "open" attribute set to True. RPCs
  147. may not be invoked on a closed channel.
  148. call_state: An empty _CallState to be altered (specifically assigned a
  149. c_call and having its due set populated) if the RPC invocation is
  150. successful.
  151. c_completion_queue: A grpc_completion_queue to be used for the call's
  152. operations.
  153. on_success: A behavior to be called if attempting to start operations for
  154. the call succeeds. If called the behavior will be called while holding the
  155. channel_state condition and passed the tags associated with operations
  156. that were successfully started for the call.
  157. flags: Flags to be passed to gRPC Core as part of call creation.
  158. method: The fully-qualified name of the RPC method being invoked.
  159. host: A "host" string to be passed to gRPC Core as part of call creation.
  160. deadline: A float for the deadline of the RPC, or None if the RPC is to have
  161. no deadline.
  162. credentials: A _CallCredentials for the RPC or None.
  163. operationses_and_user_tags: A sequence of length-two sequences the first
  164. element of which is a sequence of Operations and the second element of
  165. which is an object to be used as a tag. A SendInitialMetadataOperation
  166. must be present in the first element of this value.
  167. metadata: The metadata for this call.
  168. context: Context object for distributed tracing.
  169. """
  170. cdef grpc_slice method_slice
  171. cdef grpc_slice host_slice
  172. cdef grpc_slice *host_slice_ptr
  173. cdef grpc_call_credentials *c_call_credentials
  174. cdef grpc_call_error c_call_error
  175. cdef tuple error_and_wrapper_tag
  176. cdef _BatchOperationTag wrapper_tag
  177. with channel_state.condition:
  178. if channel_state.open:
  179. method_slice = _slice_from_bytes(method)
  180. if host is None:
  181. host_slice_ptr = NULL
  182. else:
  183. host_slice = _slice_from_bytes(host)
  184. host_slice_ptr = &host_slice
  185. call_state.c_call = grpc_channel_create_call(
  186. channel_state.c_channel, NULL, flags,
  187. c_completion_queue, method_slice, host_slice_ptr,
  188. _timespec_from_time(deadline), NULL)
  189. grpc_slice_unref(method_slice)
  190. if host_slice_ptr:
  191. grpc_slice_unref(host_slice)
  192. if context is not None:
  193. set_census_context_on_call(call_state, context)
  194. if credentials is not None:
  195. c_call_credentials = credentials.c()
  196. c_call_error = grpc_call_set_credentials(
  197. call_state.c_call, c_call_credentials)
  198. grpc_call_credentials_release(c_call_credentials)
  199. if c_call_error != GRPC_CALL_OK:
  200. grpc_call_unref(call_state.c_call)
  201. call_state.c_call = NULL
  202. _raise_call_error_no_metadata(c_call_error)
  203. started_tags = set()
  204. for operations, user_tag in operationses_and_user_tags:
  205. c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
  206. if c_call_error == GRPC_CALL_OK:
  207. started_tags.add(tag)
  208. else:
  209. grpc_call_cancel(call_state.c_call, NULL)
  210. grpc_call_unref(call_state.c_call)
  211. call_state.c_call = NULL
  212. _raise_call_error(c_call_error, metadata)
  213. else:
  214. call_state.due.update(started_tags)
  215. on_success(started_tags)
  216. else:
  217. raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
  218. cdef void _process_integrated_call_tag(
  219. _ChannelState state, _BatchOperationTag tag) except *:
  220. cdef _CallState call_state = state.integrated_call_states.pop(tag)
  221. call_state.due.remove(tag)
  222. if not call_state.due:
  223. grpc_call_unref(call_state.c_call)
  224. call_state.c_call = NULL
  225. cdef class IntegratedCall:
  226. def __cinit__(self, _ChannelState channel_state, _CallState call_state):
  227. self._channel_state = channel_state
  228. self._call_state = call_state
  229. def operate(self, operations, tag):
  230. return _operate_from_integrated_call(
  231. self._channel_state, self._call_state, operations, tag)
  232. def cancel(self, code, details):
  233. _cancel(self._channel_state, self._call_state, code, details)
  234. cdef IntegratedCall _integrated_call(
  235. _ChannelState state, int flags, method, host, object deadline,
  236. object metadata, CallCredentials credentials, operationses_and_user_tags,
  237. object context):
  238. call_state = _CallState()
  239. def on_success(started_tags):
  240. for started_tag in started_tags:
  241. state.integrated_call_states[started_tag] = call_state
  242. _call(
  243. state, call_state, state.c_call_completion_queue, on_success, flags,
  244. method, host, deadline, credentials, operationses_and_user_tags, metadata, context)
  245. return IntegratedCall(state, call_state)
  246. cdef object _process_segregated_call_tag(
  247. _ChannelState state, _CallState call_state,
  248. grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
  249. call_state.due.remove(tag)
  250. if not call_state.due:
  251. grpc_call_unref(call_state.c_call)
  252. call_state.c_call = NULL
  253. state.segregated_call_states.remove(call_state)
  254. _destroy_c_completion_queue(c_completion_queue)
  255. return True
  256. else:
  257. return False
  258. cdef class SegregatedCall:
  259. def __cinit__(self, _ChannelState channel_state, _CallState call_state):
  260. self._channel_state = channel_state
  261. self._call_state = call_state
  262. def operate(self, operations, tag):
  263. return _operate_from_segregated_call(
  264. self._channel_state, self._call_state, operations, tag)
  265. def cancel(self, code, details):
  266. _cancel(self._channel_state, self._call_state, code, details)
  267. def next_event(self):
  268. def on_success(tag):
  269. _process_segregated_call_tag(
  270. self._channel_state, self._call_state, self._c_completion_queue, tag)
  271. def on_failure():
  272. self._call_state.due.clear()
  273. grpc_call_unref(self._call_state.c_call)
  274. self._call_state.c_call = NULL
  275. self._channel_state.segregated_call_states.remove(self._call_state)
  276. _destroy_c_completion_queue(self._c_completion_queue)
  277. return _next_call_event(
  278. self._channel_state, self._c_completion_queue, on_success, on_failure, None)
  279. cdef SegregatedCall _segregated_call(
  280. _ChannelState state, int flags, method, host, object deadline,
  281. object metadata, CallCredentials credentials, operationses_and_user_tags,
  282. object context):
  283. cdef _CallState call_state = _CallState()
  284. cdef SegregatedCall segregated_call
  285. cdef grpc_completion_queue *c_completion_queue
  286. def on_success(started_tags):
  287. state.segregated_call_states.add(call_state)
  288. with state.condition:
  289. if state.open:
  290. c_completion_queue = (grpc_completion_queue_create_for_next(NULL))
  291. else:
  292. raise ValueError('Cannot invoke RPC on closed channel!')
  293. try:
  294. _call(
  295. state, call_state, c_completion_queue, on_success, flags, method, host,
  296. deadline, credentials, operationses_and_user_tags, metadata,
  297. context)
  298. except:
  299. _destroy_c_completion_queue(c_completion_queue)
  300. raise
  301. segregated_call = SegregatedCall(state, call_state)
  302. segregated_call._c_completion_queue = c_completion_queue
  303. return segregated_call
  304. cdef object _watch_connectivity_state(
  305. _ChannelState state, grpc_connectivity_state last_observed_state,
  306. object deadline):
  307. cdef _ConnectivityTag tag = _ConnectivityTag(object())
  308. with state.condition:
  309. if state.open:
  310. cpython.Py_INCREF(tag)
  311. grpc_channel_watch_connectivity_state(
  312. state.c_channel, last_observed_state, _timespec_from_time(deadline),
  313. state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
  314. state.connectivity_due.add(tag)
  315. else:
  316. raise ValueError('Cannot monitor channel state: %s' % state.closed_reason)
  317. completed_tag, event = _latent_event(
  318. state.c_connectivity_completion_queue, None)
  319. with state.condition:
  320. state.connectivity_due.remove(completed_tag)
  321. state.condition.notify_all()
  322. return event
  323. cdef _close(Channel channel, grpc_status_code code, object details,
  324. drain_calls):
  325. cdef _ChannelState state = channel._state
  326. cdef _CallState call_state
  327. encoded_details = _encode(details)
  328. with state.condition:
  329. if state.open:
  330. state.open = False
  331. state.closed_reason = details
  332. for call_state in set(state.integrated_call_states.values()):
  333. grpc_call_cancel_with_status(
  334. call_state.c_call, code, encoded_details, NULL)
  335. for call_state in state.segregated_call_states:
  336. grpc_call_cancel_with_status(
  337. call_state.c_call, code, encoded_details, NULL)
  338. # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
  339. # watching.
  340. if drain_calls:
  341. while not _calls_drained(state):
  342. event = channel.next_call_event()
  343. if event.completion_type == CompletionType.queue_timeout:
  344. continue
  345. event.tag(event)
  346. else:
  347. while state.integrated_call_states:
  348. state.condition.wait()
  349. while state.connectivity_due:
  350. state.condition.wait()
  351. _destroy_c_completion_queue(state.c_call_completion_queue)
  352. _destroy_c_completion_queue(state.c_connectivity_completion_queue)
  353. grpc_channel_destroy(state.c_channel)
  354. state.c_channel = NULL
  355. grpc_shutdown()
  356. state.condition.notify_all()
  357. else:
  358. # Another call to close already completed in the past or is currently
  359. # being executed in another thread.
  360. while state.c_channel != NULL:
  361. state.condition.wait()
  362. cdef _calls_drained(_ChannelState state):
  363. return not (state.integrated_call_states or state.segregated_call_states or
  364. state.connectivity_due)
  365. cdef class Channel:
  366. def __cinit__(
  367. self, bytes target, object arguments,
  368. ChannelCredentials channel_credentials):
  369. arguments = () if arguments is None else tuple(arguments)
  370. fork_handlers_and_grpc_init()
  371. self._state = _ChannelState()
  372. self._state.c_call_completion_queue = (
  373. grpc_completion_queue_create_for_next(NULL))
  374. self._state.c_connectivity_completion_queue = (
  375. grpc_completion_queue_create_for_next(NULL))
  376. self._arguments = arguments
  377. cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
  378. c_channel_credentials = (
  379. channel_credentials.c() if channel_credentials is not None
  380. else grpc_insecure_credentials_create())
  381. self._state.c_channel = grpc_channel_create(
  382. <char *>target, c_channel_credentials, channel_args.c_args())
  383. grpc_channel_credentials_release(c_channel_credentials)
  384. def target(self):
  385. cdef char *c_target
  386. with self._state.condition:
  387. c_target = grpc_channel_get_target(self._state.c_channel)
  388. target = <bytes>c_target
  389. gpr_free(c_target)
  390. return target
  391. def integrated_call(
  392. self, int flags, method, host, object deadline, object metadata,
  393. CallCredentials credentials, operationses_and_tags,
  394. object context = None):
  395. return _integrated_call(
  396. self._state, flags, method, host, deadline, metadata, credentials,
  397. operationses_and_tags, context)
  398. def next_call_event(self):
  399. def on_success(tag):
  400. if tag is not None:
  401. _process_integrated_call_tag(self._state, tag)
  402. if is_fork_support_enabled():
  403. queue_deadline = time.time() + 1.0
  404. else:
  405. queue_deadline = None
  406. # NOTE(gnossen): It is acceptable for on_failure to be None here because
  407. # failure conditions can only ever happen on the main thread and this
  408. # method is only ever invoked on the channel spin thread.
  409. return _next_call_event(self._state, self._state.c_call_completion_queue,
  410. on_success, None, queue_deadline)
  411. def segregated_call(
  412. self, int flags, method, host, object deadline, object metadata,
  413. CallCredentials credentials, operationses_and_tags,
  414. object context = None):
  415. return _segregated_call(
  416. self._state, flags, method, host, deadline, metadata, credentials,
  417. operationses_and_tags, context)
  418. def check_connectivity_state(self, bint try_to_connect):
  419. with self._state.condition:
  420. if self._state.open:
  421. return grpc_channel_check_connectivity_state(
  422. self._state.c_channel, try_to_connect)
  423. else:
  424. raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)
  425. def watch_connectivity_state(
  426. self, grpc_connectivity_state last_observed_state, object deadline):
  427. return _watch_connectivity_state(self._state, last_observed_state, deadline)
  428. def close(self, code, details):
  429. _close(self, code, details, False)
  430. def close_on_fork(self, code, details):
  431. _close(self, code, details, True)