server.pyx.pxi 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. cdef class Server:
  15. def __cinit__(self, object arguments, bint xds):
  16. fork_handlers_and_grpc_init()
  17. self.references = []
  18. self.registered_completion_queues = []
  19. self.is_started = False
  20. self.is_shutting_down = False
  21. self.is_shutdown = False
  22. self.c_server = NULL
  23. cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
  24. self.c_server = grpc_server_create(channel_args.c_args(), NULL)
  25. cdef grpc_server_xds_status_notifier notifier
  26. notifier.on_serving_status_update = NULL
  27. notifier.user_data = NULL
  28. if xds:
  29. grpc_server_set_config_fetcher(self.c_server,
  30. grpc_server_config_fetcher_xds_create(notifier, channel_args.c_args()))
  31. self.references.append(arguments)
  32. def request_call(
  33. self, CompletionQueue call_queue not None,
  34. CompletionQueue server_queue not None, tag):
  35. if not self.is_started or self.is_shutting_down:
  36. raise ValueError("server must be started and not shutting down")
  37. if server_queue not in self.registered_completion_queues:
  38. raise ValueError("server_queue must be a registered completion queue")
  39. cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
  40. request_call_tag.prepare()
  41. cpython.Py_INCREF(request_call_tag)
  42. return grpc_server_request_call(
  43. self.c_server, &request_call_tag.call.c_call,
  44. &request_call_tag.call_details.c_details,
  45. &request_call_tag.c_invocation_metadata,
  46. call_queue.c_completion_queue, server_queue.c_completion_queue,
  47. <cpython.PyObject *>request_call_tag)
  48. def register_completion_queue(
  49. self, CompletionQueue queue not None):
  50. if self.is_started:
  51. raise ValueError("cannot register completion queues after start")
  52. with nogil:
  53. grpc_server_register_completion_queue(
  54. self.c_server, queue.c_completion_queue, NULL)
  55. self.registered_completion_queues.append(queue)
  56. def start(self, backup_queue=True):
  57. """Start the Cython gRPC Server.
  58. Args:
  59. backup_queue: a bool indicates whether to spawn a backup completion
  60. queue. In the case that no CQ is bound to the server, and the shutdown
  61. of server becomes un-observable.
  62. """
  63. if self.is_started:
  64. raise ValueError("the server has already started")
  65. if backup_queue:
  66. self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
  67. self.register_completion_queue(self.backup_shutdown_queue)
  68. self.is_started = True
  69. with nogil:
  70. grpc_server_start(self.c_server)
  71. if backup_queue:
  72. # Ensure the core has gotten a chance to do the start-up work
  73. self.backup_shutdown_queue.poll(deadline=time.time())
  74. def add_http2_port(self, bytes address,
  75. ServerCredentials server_credentials=None):
  76. address = str_to_bytes(address)
  77. self.references.append(address)
  78. cdef int result
  79. cdef char *address_c_string = address
  80. if server_credentials is not None:
  81. self.references.append(server_credentials)
  82. with nogil:
  83. result = grpc_server_add_http2_port(
  84. self.c_server, address_c_string, server_credentials.c_credentials)
  85. else:
  86. with nogil:
  87. creds = grpc_insecure_server_credentials_create()
  88. result = grpc_server_add_http2_port(self.c_server,
  89. address_c_string, creds)
  90. grpc_server_credentials_release(creds)
  91. return result
  92. cdef _c_shutdown(self, CompletionQueue queue, tag):
  93. self.is_shutting_down = True
  94. cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
  95. cpython.Py_INCREF(server_shutdown_tag)
  96. with nogil:
  97. grpc_server_shutdown_and_notify(
  98. self.c_server, queue.c_completion_queue,
  99. <cpython.PyObject *>server_shutdown_tag)
  100. def shutdown(self, CompletionQueue queue not None, tag):
  101. if queue.is_shutting_down:
  102. raise ValueError("queue must be live")
  103. elif not self.is_started:
  104. raise ValueError("the server hasn't started yet")
  105. elif self.is_shutting_down:
  106. return
  107. elif queue not in self.registered_completion_queues:
  108. raise ValueError("expected registered completion queue")
  109. else:
  110. self._c_shutdown(queue, tag)
  111. cdef notify_shutdown_complete(self):
  112. # called only after our server shutdown tag has emerged from a completion
  113. # queue.
  114. self.is_shutdown = True
  115. def cancel_all_calls(self):
  116. if not self.is_shutting_down:
  117. raise UsageError("the server must be shutting down to cancel all calls")
  118. elif self.is_shutdown:
  119. return
  120. else:
  121. with nogil:
  122. grpc_server_cancel_all_calls(self.c_server)
  123. # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
  124. # portion of this is safe to call from __dealloc__, and potentially remove
  125. # backup_shutdown_queue.
  126. def destroy(self):
  127. if self.c_server != NULL:
  128. if not self.is_started:
  129. pass
  130. elif self.is_shutdown:
  131. pass
  132. elif not self.is_shutting_down:
  133. if self.backup_shutdown_queue is None:
  134. raise InternalError('Server shutdown failed: no completion queue.')
  135. else:
  136. # the user didn't call shutdown - use our backup queue
  137. self._c_shutdown(self.backup_shutdown_queue, None)
  138. # and now we wait
  139. while not self.is_shutdown:
  140. self.backup_shutdown_queue.poll()
  141. else:
  142. # We're in the process of shutting down, but have not shutdown; can't do
  143. # much but repeatedly release the GIL and wait
  144. while not self.is_shutdown:
  145. time.sleep(0)
  146. with nogil:
  147. grpc_server_destroy(self.c_server)
  148. self.c_server = NULL
  149. def __dealloc__(self):
  150. if self.c_server == NULL:
  151. grpc_shutdown()