thread.c

/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#if !defined(__MACH__)
#    define _GNU_SOURCE
#endif

#include <aws/common/clock.h>
#include <aws/common/linked_list.h>
#include <aws/common/logging.h>
#include <aws/common/private/dlloads.h>
#include <aws/common/private/thread_shared.h>
#include <aws/common/string.h>
#include <aws/common/thread.h>

#include <dlfcn.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <sched.h>
#include <time.h>
#include <unistd.h>

#if defined(__FreeBSD__) || defined(__NETBSD__)
#    include <pthread_np.h>
typedef cpuset_t cpu_set_t;
#endif

#if !defined(AWS_AFFINITY_METHOD)
#    error "Must provide a method for setting thread affinity"
#endif

// Possible methods for setting thread affinity
#define AWS_AFFINITY_METHOD_NONE 0
#define AWS_AFFINITY_METHOD_PTHREAD_ATTR 1
#define AWS_AFFINITY_METHOD_PTHREAD 2

// Ensure provided affinity method matches one of the supported values
// clang-format off
#if AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_NONE \
    && AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_PTHREAD_ATTR \
    && AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_PTHREAD
// clang-format on
#    error "Invalid thread affinity method"
#endif

static struct aws_thread_options s_default_options = {
    /* this will make sure platform default stack size is used. */
    .stack_size = 0,
    .cpu_id = -1,
    .join_strategy = AWS_TJS_MANUAL,
};

struct thread_atexit_callback {
    aws_thread_atexit_fn *callback;
    void *user_data;
    struct thread_atexit_callback *next;
};

struct thread_wrapper {
    struct aws_allocator *allocator;
    struct aws_linked_list_node node;
    void (*func)(void *arg);
    void *arg;
    struct thread_atexit_callback *atexit;
    void (*call_once)(void *);
    void *once_arg;
    struct aws_string *name;

    /*
     * The managed thread system does lazy joins on threads once finished via their wrapper. For that to work
     * we need something to join against, so we keep a by-value copy of the original thread here. The tricky part
     * is how to set the threadid/handle of this copy since the copy must be injected into the thread function before
     * the threadid/handle is known. We get around that by just querying it at the top of the wrapper thread function.
     */
    struct aws_thread thread_copy;
    bool membind;
};
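
/* Thread-local pointer to the wrapper of the currently running thread; NULL on threads
 * that were not launched through aws_thread_launch(). */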
static AWS_THREAD_LOCAL struct thread_wrapper *tl_wrapper = NULL;

static void s_thread_wrapper_destroy(struct thread_wrapper *wrapper) {
    if (!wrapper) {
        return;
    }

    aws_string_destroy(wrapper->name);
    aws_mem_release(wrapper->allocator, wrapper);
}

/*
 * thread_wrapper is platform-dependent so this function ends up being duplicated in each thread implementation
 */
void aws_thread_join_and_free_wrapper_list(struct aws_linked_list *wrapper_list) {
    struct aws_linked_list_node *iter = aws_linked_list_begin(wrapper_list);
    while (iter != aws_linked_list_end(wrapper_list)) {

        struct thread_wrapper *join_thread_wrapper = AWS_CONTAINER_OF(iter, struct thread_wrapper, node);

        /*
         * Can't do a for-loop since we need to advance to the next wrapper before we free the wrapper
         */
        iter = aws_linked_list_next(iter);

        join_thread_wrapper->thread_copy.detach_state = AWS_THREAD_JOINABLE;
        aws_thread_join(&join_thread_wrapper->thread_copy);

        /*
         * This doesn't actually do anything when using posix threads, but it keeps us
         * in sync with the Windows version as well as the lifecycle contract we're
         * presenting for threads.
         */
        aws_thread_clean_up(&join_thread_wrapper->thread_copy);

        s_thread_wrapper_destroy(join_thread_wrapper);

        aws_thread_decrement_unjoined_count();
    }
}

/* This must be called from the thread itself.
 * (only necessary for Apple, but we'll do it that way on every platform for consistency) */
static void s_set_thread_name(pthread_t thread_id, const char *name) {
#if defined(__APPLE__)
    (void)thread_id;
    pthread_setname_np(name);
#elif defined(AWS_PTHREAD_SETNAME_TAKES_2ARGS)
    pthread_setname_np(thread_id, name);
#elif defined(AWS_PTHREAD_SETNAME_TAKES_3ARGS)
    pthread_setname_np(thread_id, name, NULL);
#else
    (void)thread_id;
    (void)name;
#endif
}

static void *thread_fn(void *arg) {
    struct thread_wrapper *wrapper_ptr = arg;

    /*
     * Make sure the aws_thread copy has the right thread id stored in it.
     */
    wrapper_ptr->thread_copy.thread_id = aws_thread_current_thread_id();

    /* If there's a name, set it.
     * Then free the aws_string before we make copies of the wrapper struct */
    if (wrapper_ptr->name) {
        s_set_thread_name(wrapper_ptr->thread_copy.thread_id, aws_string_c_str(wrapper_ptr->name));
        aws_string_destroy(wrapper_ptr->name);
        wrapper_ptr->name = NULL;
    }

    struct thread_wrapper wrapper = *wrapper_ptr;
    struct aws_allocator *allocator = wrapper.allocator;
    tl_wrapper = &wrapper;

    if (wrapper.membind && g_set_mempolicy_ptr) {
        AWS_LOGF_INFO(
            AWS_LS_COMMON_THREAD,
            "a cpu affinity was specified when launching this thread and set_mempolicy() is available on this "
            "system. Setting the memory policy to MPOL_PREFERRED");

        /* If the user set a cpu id in their thread options, make sure the NUMA policy honors it so that memory
         * is allocated on the NUMA node of the cpu this thread was launched on. However, we don't want to fail
         * the application if this fails, so make the call and ignore the result. */
        long resp = g_set_mempolicy_ptr(AWS_MPOL_PREFERRED_ALIAS, NULL, 0);
        if (resp) {
            AWS_LOGF_WARN(AWS_LS_COMMON_THREAD, "call to set_mempolicy() failed with errno %d", errno);
        }
    }
    wrapper.func(wrapper.arg);

    /*
     * Managed threads don't free the wrapper yet. The thread management system does it later after the thread
     * is joined.
     */
    bool is_managed_thread = wrapper.thread_copy.detach_state == AWS_THREAD_MANAGED;
    if (!is_managed_thread) {
        s_thread_wrapper_destroy(wrapper_ptr);
        wrapper_ptr = NULL;
    }

    struct thread_atexit_callback *exit_callback_data = wrapper.atexit;
    while (exit_callback_data) {
        aws_thread_atexit_fn *exit_callback = exit_callback_data->callback;
        void *exit_callback_user_data = exit_callback_data->user_data;
        struct thread_atexit_callback *next_exit_callback_data = exit_callback_data->next;

        aws_mem_release(allocator, exit_callback_data);

        exit_callback(exit_callback_user_data);
        exit_callback_data = next_exit_callback_data;
    }

    tl_wrapper = NULL;

    /*
     * Release this thread to the managed thread system for lazy join.
     */
    if (is_managed_thread) {
        aws_thread_pending_join_add(&wrapper_ptr->node);
    }

    return NULL;
}

const struct aws_thread_options *aws_default_thread_options(void) {
    return &s_default_options;
}
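
/* Detaches the thread if it is still joinable so its resources are reclaimed when it exits.
 * Managed threads are left alone here; the managed thread system joins them later. */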
void aws_thread_clean_up(struct aws_thread *thread) {
    if (thread->detach_state == AWS_THREAD_JOINABLE) {
        pthread_detach(thread->thread_id);
    }
}

static void s_call_once(void) {
    tl_wrapper->call_once(tl_wrapper->once_arg);
}

void aws_thread_call_once(aws_thread_once *flag, void (*call_once)(void *), void *user_data) {
    // If this is a non-aws_thread, then gin up a temp thread wrapper
    struct thread_wrapper temp_wrapper;
    if (!tl_wrapper) {
        tl_wrapper = &temp_wrapper;
    }

    tl_wrapper->call_once = call_once;
    tl_wrapper->once_arg = user_data;
    pthread_once(flag, s_call_once);

    if (tl_wrapper == &temp_wrapper) {
        tl_wrapper = NULL;
    }
}
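
/* Initializes the bookkeeping struct only; no OS thread exists until aws_thread_launch() succeeds. */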
int aws_thread_init(struct aws_thread *thread, struct aws_allocator *allocator) {
    *thread = (struct aws_thread){.allocator = allocator, .detach_state = AWS_THREAD_NOT_CREATED};

    return AWS_OP_SUCCESS;
}

int aws_thread_launch(
    struct aws_thread *thread,
    void (*func)(void *arg),
    void *arg,
    const struct aws_thread_options *options) {

    pthread_attr_t attributes;
    pthread_attr_t *attributes_ptr = NULL;
    int attr_return = 0;
    struct thread_wrapper *wrapper = NULL;
    bool is_managed_thread = options != NULL && options->join_strategy == AWS_TJS_MANAGED;
    if (is_managed_thread) {
        thread->detach_state = AWS_THREAD_MANAGED;
    }

    if (options) {
        attr_return = pthread_attr_init(&attributes);

        if (attr_return) {
            goto cleanup;
        }

        attributes_ptr = &attributes;

        if (options->stack_size > PTHREAD_STACK_MIN) {
            attr_return = pthread_attr_setstacksize(attributes_ptr, options->stack_size);

            if (attr_return) {
                goto cleanup;
            }
        }

        /* AFAIK you can't set thread affinity on Apple platforms, and it doesn't really matter since all memory,
         * NUMA or not, is set up in interleave mode.
         * Thread affinity is also not supported on Android systems, and honestly, if you're running Android on a
         * NUMA configuration, you've got bigger problems. */
#if AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD_ATTR
        if (options->cpu_id >= 0) {
            AWS_LOGF_INFO(
                AWS_LS_COMMON_THREAD,
                "id=%p: cpu affinity of cpu_id %d was specified, attempting to honor the value.",
                (void *)thread,
                options->cpu_id);

            cpu_set_t cpuset;
            CPU_ZERO(&cpuset);
            CPU_SET((uint32_t)options->cpu_id, &cpuset);

            attr_return = pthread_attr_setaffinity_np(attributes_ptr, sizeof(cpuset), &cpuset);

            if (attr_return) {
                AWS_LOGF_ERROR(
                    AWS_LS_COMMON_THREAD,
                    "id=%p: pthread_attr_setaffinity_np() failed with %d.",
                    (void *)thread,
                    attr_return);
                goto cleanup;
            }
        }
#endif /* AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD_ATTR */
    }

    wrapper = aws_mem_calloc(thread->allocator, 1, sizeof(struct thread_wrapper));

    if (options) {
        if (options->cpu_id >= 0) {
            wrapper->membind = true;
        }

        if (options->name.len > 0) {
            wrapper->name = aws_string_new_from_cursor(thread->allocator, &options->name);
        }
    }

    wrapper->thread_copy = *thread;
    wrapper->allocator = thread->allocator;
    wrapper->func = func;
    wrapper->arg = arg;

    /*
     * Increment the count prior to spawning the thread. Decrement back if the create failed.
     */
    if (is_managed_thread) {
        aws_thread_increment_unjoined_count();
    }

    attr_return = pthread_create(&thread->thread_id, attributes_ptr, thread_fn, (void *)wrapper);

    if (attr_return) {
        AWS_LOGF_ERROR(AWS_LS_COMMON_THREAD, "id=%p: pthread_create() failed with %d", (void *)thread, attr_return);
        if (is_managed_thread) {
            aws_thread_decrement_unjoined_count();
        }
        goto cleanup;
    }

#if AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD
    /* If we don't have pthread_attr_setaffinity_np, we may
     * still be able to set the thread affinity after creation. */
    if (options && options->cpu_id >= 0) {
        AWS_LOGF_INFO(
            AWS_LS_COMMON_THREAD,
            "id=%p: cpu affinity of cpu_id %d was specified, attempting to honor the value.",
            (void *)thread,
            options->cpu_id);

        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET((uint32_t)options->cpu_id, &cpuset);

        /* If this fails, just warn. We can't fail anymore, the thread has already launched. */
        int setaffinity_return = pthread_setaffinity_np(thread->thread_id, sizeof(cpuset), &cpuset);
        if (setaffinity_return) {
            AWS_LOGF_WARN(
                AWS_LS_COMMON_THREAD,
                "id=%p: pthread_setaffinity_np() failed with %d. Running thread without CPU affinity.",
                (void *)thread,
                setaffinity_return);
        }
    }
#endif /* AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD */

    /*
     * Managed threads need to stay unjoinable from an external perspective. We'll handle it after thread function
     * completion.
     */
    if (is_managed_thread) {
        aws_thread_clean_up(thread);
    } else {
        thread->detach_state = AWS_THREAD_JOINABLE;
    }

cleanup:
    if (attributes_ptr) {
        pthread_attr_destroy(attributes_ptr);
    }

    if (attr_return) {
        s_thread_wrapper_destroy(wrapper);

        switch (attr_return) {
            case EINVAL:
                return aws_raise_error(AWS_ERROR_THREAD_INVALID_SETTINGS);
            case EAGAIN:
                return aws_raise_error(AWS_ERROR_THREAD_INSUFFICIENT_RESOURCE);
            case EPERM:
                return aws_raise_error(AWS_ERROR_THREAD_NO_PERMISSIONS);
            case ENOMEM:
                return aws_raise_error(AWS_ERROR_OOM);
            default:
                return aws_raise_error(AWS_ERROR_UNKNOWN);
        }
    }

    return AWS_OP_SUCCESS;
}
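
/*
 * Illustrative usage sketch (not part of this file): one way a caller might launch a managed
 * thread with the default options and leave the join to the managed thread system. `my_task`
 * is a hypothetical thread function; any valid allocator (e.g. aws_default_allocator()) works.
 *
 *     static void my_task(void *arg) {
 *         (void)arg;
 *         // thread body
 *     }
 *
 *     struct aws_thread thread;
 *     aws_thread_init(&thread, aws_default_allocator());
 *
 *     struct aws_thread_options options = *aws_default_thread_options();
 *     options.join_strategy = AWS_TJS_MANAGED;
 *
 *     if (aws_thread_launch(&thread, my_task, NULL, &options)) {
 *         // inspect aws_last_error()
 *     }
 *
 *     // No explicit aws_thread_join()/aws_thread_clean_up(): the wrapper is joined and freed
 *     // later via aws_thread_join_and_free_wrapper_list() (see above).
 */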

aws_thread_id_t aws_thread_get_id(struct aws_thread *thread) {
    return thread->thread_id;
}

enum aws_thread_detach_state aws_thread_get_detach_state(struct aws_thread *thread) {
    return thread->detach_state;
}

int aws_thread_join(struct aws_thread *thread) {
    if (thread->detach_state == AWS_THREAD_JOINABLE) {
        int err_no = pthread_join(thread->thread_id, 0);

        if (err_no) {
            if (err_no == EINVAL) {
                return aws_raise_error(AWS_ERROR_THREAD_NOT_JOINABLE);
            }
            if (err_no == ESRCH) {
                return aws_raise_error(AWS_ERROR_THREAD_NO_SUCH_THREAD_ID);
            }
            if (err_no == EDEADLK) {
                return aws_raise_error(AWS_ERROR_THREAD_DEADLOCK_DETECTED);
            }
        }

        thread->detach_state = AWS_THREAD_JOIN_COMPLETED;
    }

    return AWS_OP_SUCCESS;
}

aws_thread_id_t aws_thread_current_thread_id(void) {
    return pthread_self();
}

bool aws_thread_thread_id_equal(aws_thread_id_t t1, aws_thread_id_t t2) {
    return pthread_equal(t1, t2) != 0;
}
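
/* Note: nanosleep() can return early if interrupted by a signal; the remaining time is written
 * to `output` but not used, so the sleep is best-effort. */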
void aws_thread_current_sleep(uint64_t nanos) {
    uint64_t nano = 0;
    time_t seconds = (time_t)aws_timestamp_convert(nanos, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &nano);

    struct timespec tm = {
        .tv_sec = seconds,
        .tv_nsec = (long)nano,
    };
    struct timespec output;

    nanosleep(&tm, &output);
}
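
/* Registers a callback to run after the current thread's wrapped function returns.
 * Only valid on threads launched through aws_thread_launch() (it relies on tl_wrapper);
 * callbacks are pushed onto the front of the list, so they run in LIFO order. */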
int aws_thread_current_at_exit(aws_thread_atexit_fn *callback, void *user_data) {
    if (!tl_wrapper) {
        return aws_raise_error(AWS_ERROR_THREAD_NOT_JOINABLE);
    }

    struct thread_atexit_callback *cb =
        aws_mem_calloc(tl_wrapper->allocator, 1, sizeof(struct thread_atexit_callback));
    if (!cb) {
        return AWS_OP_ERR;
    }
    cb->callback = callback;
    cb->user_data = user_data;
    cb->next = tl_wrapper->atexit;
    tl_wrapper->atexit = cb;
    return AWS_OP_SUCCESS;
}