/* thread.c — POSIX implementation of the aws_thread API (aws-c-common) */
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #if !defined(__MACH__)
  6. # define _GNU_SOURCE
  7. #endif
  8. #include <aws/common/clock.h>
  9. #include <aws/common/linked_list.h>
  10. #include <aws/common/logging.h>
  11. #include <aws/common/private/dlloads.h>
  12. #include <aws/common/private/thread_shared.h>
  13. #include <aws/common/string.h>
  14. #include <aws/common/thread.h>
  15. #include <dlfcn.h>
  16. #include <errno.h>
  17. #include <inttypes.h>
  18. #include <limits.h>
  19. #include <sched.h>
  20. #include <time.h>
  21. #include <unistd.h>
  22. #if defined(__FreeBSD__) || defined(__NetBSD__)
  23. # include <pthread_np.h>
  24. typedef cpuset_t cpu_set_t;
  25. #elif defined(__OpenBSD__)
  26. # include <pthread_np.h>
  27. #endif
  28. #if !defined(AWS_AFFINITY_METHOD)
  29. # error "Must provide a method for setting thread affinity"
  30. #endif
  31. // Possible methods for setting thread affinity
  32. #define AWS_AFFINITY_METHOD_NONE 0
  33. #define AWS_AFFINITY_METHOD_PTHREAD_ATTR 1
  34. #define AWS_AFFINITY_METHOD_PTHREAD 2
  35. // Ensure provided affinity method matches one of the supported values
  36. // clang-format off
  37. #if AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_NONE \
  38. && AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_PTHREAD_ATTR \
  39. && AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_PTHREAD
  40. // clang-format on
  41. # error "Invalid thread affinity method"
  42. #endif
/* Process-wide defaults returned by aws_default_thread_options(); callers treat this as immutable. */
static struct aws_thread_options s_default_options = {
    /* this will make sure platform default stack size is used. */
    .stack_size = 0,
    /* -1 means "no CPU affinity requested" (see the cpu_id checks in aws_thread_launch). */
    .cpu_id = -1,
    .join_strategy = AWS_TJS_MANUAL,
};
/* Node of the singly-linked list of callbacks registered via aws_thread_current_at_exit().
 * Nodes are pushed at the head and consumed in thread_fn() after the user function returns. */
struct thread_atexit_callback {
    aws_thread_atexit_fn *callback;
    void *user_data;
    /* next (earlier-registered) callback; list head lives in thread_wrapper->atexit */
    struct thread_atexit_callback *next;
};
/* Heap-allocated bookkeeping handed to the spawned thread's entry point (thread_fn). */
struct thread_wrapper {
    struct aws_allocator *allocator;
    /* linkage used when a managed thread queues itself for lazy join */
    struct aws_linked_list_node node;
    void (*func)(void *arg); /* user-supplied thread function */
    void *arg;               /* argument passed to func */
    struct thread_atexit_callback *atexit; /* head of the at-exit callback list */
    void (*call_once)(void *); /* trampoline target for aws_thread_call_once() */
    void *once_arg;
    /* thread name requested at launch; consumed (freed) at the top of thread_fn */
    struct aws_string *name;
    /*
     * The managed thread system does lazy joins on threads once finished via their wrapper. For that to work
     * we need something to join against, so we keep a by-value copy of the original thread here. The tricky part
     * is how to set the threadid/handle of this copy since the copy must be injected into the thread function before
     * the threadid/handle is known. We get around that by just querying it at the top of the wrapper thread function.
     */
    struct aws_thread thread_copy;
    /* true when a cpu_id was specified: thread_fn will attempt set_mempolicy(MPOL_PREFERRED) */
    bool membind;
};
  72. static AWS_THREAD_LOCAL struct thread_wrapper *tl_wrapper = NULL;
  73. static void s_thread_wrapper_destroy(struct thread_wrapper *wrapper) {
  74. if (!wrapper) {
  75. return;
  76. }
  77. aws_string_destroy(wrapper->name);
  78. aws_mem_release(wrapper->allocator, wrapper);
  79. }
  80. /*
  81. * thread_wrapper is platform-dependent so this function ends up being duplicated in each thread implementation
  82. */
  83. void aws_thread_join_and_free_wrapper_list(struct aws_linked_list *wrapper_list) {
  84. struct aws_linked_list_node *iter = aws_linked_list_begin(wrapper_list);
  85. while (iter != aws_linked_list_end(wrapper_list)) {
  86. struct thread_wrapper *join_thread_wrapper = AWS_CONTAINER_OF(iter, struct thread_wrapper, node);
  87. /*
  88. * Can't do a for-loop since we need to advance to the next wrapper before we free the wrapper
  89. */
  90. iter = aws_linked_list_next(iter);
  91. join_thread_wrapper->thread_copy.detach_state = AWS_THREAD_JOINABLE;
  92. aws_thread_join(&join_thread_wrapper->thread_copy);
  93. /*
  94. * This doesn't actually do anything when using posix threads, but it keeps us
  95. * in sync with the Windows version as well as the lifecycle contract we're
  96. * presenting for threads.
  97. */
  98. aws_thread_clean_up(&join_thread_wrapper->thread_copy);
  99. s_thread_wrapper_destroy(join_thread_wrapper);
  100. aws_thread_decrement_unjoined_count();
  101. }
  102. }
/* This must be called from the thread itself.
 * (only necessary for Apple, but we'll do it that way on every platform for consistency) */
static void s_set_thread_name(pthread_t thread_id, const char *name) {
#if defined(__APPLE__)
    /* Apple's pthread_setname_np() only names the calling thread, hence the 1-arg form. */
    (void)thread_id;
    pthread_setname_np(name);
#elif defined(AWS_PTHREAD_SETNAME_TAKES_2ARGS)
    pthread_setname_np(thread_id, name);
#elif defined(AWS_PTHREAD_SET_NAME_TAKES_2ARGS)
    /* pthread_set_name_np spelling — NOTE(review): presumably the BSD variant; confirm via cmake feature test */
    pthread_set_name_np(thread_id, name);
#elif defined(AWS_PTHREAD_SETNAME_TAKES_3ARGS)
    /* 3-arg variant treats `name` as a format string; NULL means no format argument */
    pthread_setname_np(thread_id, name, NULL);
#else
    /* no known thread-naming API detected on this platform; silently skip */
    (void)thread_id;
    (void)name;
#endif
}
/* pthread entry point: unwraps the thread_wrapper, applies name/NUMA policy, runs the
 * user function, then fires at-exit callbacks and disposes of the wrapper. */
static void *thread_fn(void *arg) {
    struct thread_wrapper *wrapper_ptr = arg;

    /*
     * Make sure the aws_thread copy has the right thread id stored in it.
     */
    wrapper_ptr->thread_copy.thread_id = aws_thread_current_thread_id();

    /* If there's a name, set it.
     * Then free the aws_string before we make copies of the wrapper struct */
    if (wrapper_ptr->name) {
        s_set_thread_name(wrapper_ptr->thread_copy.thread_id, aws_string_c_str(wrapper_ptr->name));
        aws_string_destroy(wrapper_ptr->name);
        wrapper_ptr->name = NULL;
    }

    /* By-value copy: for unmanaged threads the heap wrapper is freed below, before the
     * at-exit callbacks run, so everything past this point works off the stack copy. */
    struct thread_wrapper wrapper = *wrapper_ptr;
    struct aws_allocator *allocator = wrapper.allocator;
    tl_wrapper = &wrapper;

    if (wrapper.membind && g_set_mempolicy_ptr) {
        AWS_LOGF_INFO(
            AWS_LS_COMMON_THREAD,
            "a cpu affinity was specified when launching this thread and set_mempolicy() is available on this "
            "system. Setting the memory policy to MPOL_PREFERRED");

        /* if a user set a cpu id in their thread options, we're going to make sure the numa policy honors that
         * and makes sure the numa node of the cpu we launched this thread on is where memory gets allocated. However,
         * we don't want to fail the application if this fails, so make the call, and ignore the result. */
        long resp = g_set_mempolicy_ptr(AWS_MPOL_PREFERRED_ALIAS, NULL, 0);
        int errno_value = errno; /* Always cache errno before potential side-effect */
        if (resp) {
            AWS_LOGF_WARN(AWS_LS_COMMON_THREAD, "call to set_mempolicy() failed with errno %d", errno_value);
        }
    }

    /* Run the user's thread function. */
    wrapper.func(wrapper.arg);

    /*
     * Managed threads don't free the wrapper yet. The thread management system does it later after the thread
     * is joined.
     */
    bool is_managed_thread = wrapper.thread_copy.detach_state == AWS_THREAD_MANAGED;
    if (!is_managed_thread) {
        s_thread_wrapper_destroy(wrapper_ptr);
        wrapper_ptr = NULL;
    }

    /* Fire at-exit callbacks from the head (reverse registration order); each node is
     * released before its callback is invoked. */
    struct thread_atexit_callback *exit_callback_data = wrapper.atexit;
    while (exit_callback_data) {
        aws_thread_atexit_fn *exit_callback = exit_callback_data->callback;
        void *exit_callback_user_data = exit_callback_data->user_data;
        struct thread_atexit_callback *next_exit_callback_data = exit_callback_data->next;

        aws_mem_release(allocator, exit_callback_data);

        exit_callback(exit_callback_user_data);
        exit_callback_data = next_exit_callback_data;
    }

    tl_wrapper = NULL;

    /*
     * Release this thread to the managed thread system for lazy join.
     */
    if (is_managed_thread) {
        aws_thread_pending_join_add(&wrapper_ptr->node);
    }

    return NULL;
}
  178. const struct aws_thread_options *aws_default_thread_options(void) {
  179. return &s_default_options;
  180. }
  181. void aws_thread_clean_up(struct aws_thread *thread) {
  182. if (thread->detach_state == AWS_THREAD_JOINABLE) {
  183. pthread_detach(thread->thread_id);
  184. }
  185. }
  186. static void s_call_once(void) {
  187. tl_wrapper->call_once(tl_wrapper->once_arg);
  188. }
  189. void aws_thread_call_once(aws_thread_once *flag, void (*call_once)(void *), void *user_data) {
  190. // If this is a non-aws_thread, then gin up a temp thread wrapper
  191. struct thread_wrapper temp_wrapper;
  192. if (!tl_wrapper) {
  193. tl_wrapper = &temp_wrapper;
  194. }
  195. tl_wrapper->call_once = call_once;
  196. tl_wrapper->once_arg = user_data;
  197. pthread_once(flag, s_call_once);
  198. if (tl_wrapper == &temp_wrapper) {
  199. tl_wrapper = NULL;
  200. }
  201. }
  202. int aws_thread_init(struct aws_thread *thread, struct aws_allocator *allocator) {
  203. *thread = (struct aws_thread){.allocator = allocator, .detach_state = AWS_THREAD_NOT_CREATED};
  204. return AWS_OP_SUCCESS;
  205. }
/*
 * Spawns a new pthread running func(arg).
 * `options` may be NULL; when provided, stack size, cpu affinity (where the platform
 * supports it), thread name, and join strategy are applied.
 * Returns AWS_OP_SUCCESS, or raises an aws error mapped from the pthread error code.
 */
int aws_thread_launch(
    struct aws_thread *thread,
    void (*func)(void *arg),
    void *arg,
    const struct aws_thread_options *options) {

    pthread_attr_t attributes;
    pthread_attr_t *attributes_ptr = NULL; /* stays NULL when options == NULL */
    int attr_return = 0;
    struct thread_wrapper *wrapper = NULL;
    bool is_managed_thread = options != NULL && options->join_strategy == AWS_TJS_MANAGED;
    if (is_managed_thread) {
        thread->detach_state = AWS_THREAD_MANAGED;
    }

    if (options) {
        attr_return = pthread_attr_init(&attributes);
        if (attr_return) {
            goto cleanup;
        }
        attributes_ptr = &attributes;

        if (options->stack_size > PTHREAD_STACK_MIN) {
            attr_return = pthread_attr_setstacksize(attributes_ptr, options->stack_size);
            if (attr_return) {
                goto cleanup;
            }
        }

/* AFAIK you can't set thread affinity on apple platforms, and it doesn't really matter since all memory
 * NUMA or not is setup in interleave mode.
 * Thread affinity is also not supported on Android systems, and honestly, if you're running android on a NUMA
 * configuration, you've got bigger problems. */
#if AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD_ATTR
        if (options->cpu_id >= 0) {
            AWS_LOGF_INFO(
                AWS_LS_COMMON_THREAD,
                "id=%p: cpu affinity of cpu_id %d was specified, attempting to honor the value.",
                (void *)thread,
                options->cpu_id);

            cpu_set_t cpuset;
            CPU_ZERO(&cpuset);
            CPU_SET((uint32_t)options->cpu_id, &cpuset);

            attr_return = pthread_attr_setaffinity_np(attributes_ptr, sizeof(cpuset), &cpuset);
            if (attr_return) {
                AWS_LOGF_ERROR(
                    AWS_LS_COMMON_THREAD,
                    "id=%p: pthread_attr_setaffinity_np() failed with %d.",
                    (void *)thread,
                    attr_return);
                goto cleanup;
            }
        }
#endif /* AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD_ATTR */
    }

    /* Heap-allocated; ownership passes to the spawned thread (thread_fn frees or queues it). */
    wrapper = aws_mem_calloc(thread->allocator, 1, sizeof(struct thread_wrapper));
    if (options) {
        if (options->cpu_id >= 0) {
            /* tells thread_fn to attempt a NUMA memory policy bind */
            wrapper->membind = true;
        }
        if (options->name.len > 0) {
            wrapper->name = aws_string_new_from_cursor(thread->allocator, &options->name);
        }
    }

    wrapper->thread_copy = *thread;
    wrapper->allocator = thread->allocator;
    wrapper->func = func;
    wrapper->arg = arg;

    /*
     * Increment the count prior to spawning the thread. Decrement back if the create failed.
     */
    if (is_managed_thread) {
        aws_thread_increment_unjoined_count();
    }

    attr_return = pthread_create(&thread->thread_id, attributes_ptr, thread_fn, (void *)wrapper);
    if (attr_return) {
        AWS_LOGF_ERROR(AWS_LS_COMMON_THREAD, "id=%p: pthread_create() failed with %d", (void *)thread, attr_return);
        if (is_managed_thread) {
            aws_thread_decrement_unjoined_count();
        }
        goto cleanup;
    }

#if AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD
    /* If we don't have pthread_attr_setaffinity_np, we may
     * still be able to set the thread affinity after creation. */
    if (options && options->cpu_id >= 0) {
        AWS_LOGF_INFO(
            AWS_LS_COMMON_THREAD,
            "id=%p: cpu affinity of cpu_id %d was specified, attempting to honor the value.",
            (void *)thread,
            options->cpu_id);

        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET((uint32_t)options->cpu_id, &cpuset);

        /* If this fails, just warn. We can't fail anymore, the thread has already launched. */
        int setaffinity_return = pthread_setaffinity_np(thread->thread_id, sizeof(cpuset), &cpuset);
        if (setaffinity_return) {
            AWS_LOGF_WARN(
                AWS_LS_COMMON_THREAD,
                "id=%p: pthread_setaffinity_np() failed with %d. Running thread without CPU affinity.",
                (void *)thread,
                setaffinity_return);
        }
    }
#endif /* AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD */

    /*
     * Managed threads need to stay unjoinable from an external perspective. We'll handle it after thread function
     * completion.
     */
    if (is_managed_thread) {
        aws_thread_clean_up(thread);
    } else {
        thread->detach_state = AWS_THREAD_JOINABLE;
    }

cleanup:
    if (attributes_ptr) {
        pthread_attr_destroy(attributes_ptr);
    }

    if (attr_return) {
        /* wrapper is NULL (destroy is a no-op) when we failed before allocating it */
        s_thread_wrapper_destroy(wrapper);

        switch (attr_return) {
            case EINVAL:
                return aws_raise_error(AWS_ERROR_THREAD_INVALID_SETTINGS);
            case EAGAIN:
                return aws_raise_error(AWS_ERROR_THREAD_INSUFFICIENT_RESOURCE);
            case EPERM:
                return aws_raise_error(AWS_ERROR_THREAD_NO_PERMISSIONS);
            case ENOMEM:
                return aws_raise_error(AWS_ERROR_OOM);
            default:
                return aws_raise_error(AWS_ERROR_UNKNOWN);
        }
    }

    return AWS_OP_SUCCESS;
}
  337. aws_thread_id_t aws_thread_get_id(struct aws_thread *thread) {
  338. return thread->thread_id;
  339. }
  340. enum aws_thread_detach_state aws_thread_get_detach_state(struct aws_thread *thread) {
  341. return thread->detach_state;
  342. }
  343. int aws_thread_join(struct aws_thread *thread) {
  344. if (thread->detach_state == AWS_THREAD_JOINABLE) {
  345. int err_no = pthread_join(thread->thread_id, 0);
  346. if (err_no) {
  347. if (err_no == EINVAL) {
  348. return aws_raise_error(AWS_ERROR_THREAD_NOT_JOINABLE);
  349. }
  350. if (err_no == ESRCH) {
  351. return aws_raise_error(AWS_ERROR_THREAD_NO_SUCH_THREAD_ID);
  352. }
  353. if (err_no == EDEADLK) {
  354. return aws_raise_error(AWS_ERROR_THREAD_DEADLOCK_DETECTED);
  355. }
  356. }
  357. thread->detach_state = AWS_THREAD_JOIN_COMPLETED;
  358. }
  359. return AWS_OP_SUCCESS;
  360. }
  361. aws_thread_id_t aws_thread_current_thread_id(void) {
  362. return pthread_self();
  363. }
  364. bool aws_thread_thread_id_equal(aws_thread_id_t t1, aws_thread_id_t t2) {
  365. return pthread_equal(t1, t2) != 0;
  366. }
  367. void aws_thread_current_sleep(uint64_t nanos) {
  368. uint64_t nano = 0;
  369. time_t seconds = (time_t)aws_timestamp_convert(nanos, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &nano);
  370. struct timespec tm = {
  371. .tv_sec = seconds,
  372. .tv_nsec = (long)nano,
  373. };
  374. struct timespec output;
  375. nanosleep(&tm, &output);
  376. }
  377. int aws_thread_current_at_exit(aws_thread_atexit_fn *callback, void *user_data) {
  378. if (!tl_wrapper) {
  379. return aws_raise_error(AWS_ERROR_THREAD_NOT_JOINABLE);
  380. }
  381. struct thread_atexit_callback *cb = aws_mem_calloc(tl_wrapper->allocator, 1, sizeof(struct thread_atexit_callback));
  382. if (!cb) {
  383. return AWS_OP_ERR;
  384. }
  385. cb->callback = callback;
  386. cb->user_data = user_data;
  387. cb->next = tl_wrapper->atexit;
  388. tl_wrapper->atexit = cb;
  389. return AWS_OP_SUCCESS;
  390. }
  391. int aws_thread_current_name(struct aws_allocator *allocator, struct aws_string **out_name) {
  392. return aws_thread_name(allocator, aws_thread_current_thread_id(), out_name);
  393. }
  394. #define THREAD_NAME_BUFFER_SIZE 256
  395. int aws_thread_name(struct aws_allocator *allocator, aws_thread_id_t thread_id, struct aws_string **out_name) {
  396. *out_name = NULL;
  397. #if defined(AWS_PTHREAD_GETNAME_TAKES_2ARGS) || defined(AWS_PTHREAD_GETNAME_TAKES_3ARGS) || \
  398. defined(AWS_PTHREAD_GET_NAME_TAKES_2_ARGS)
  399. char name[THREAD_NAME_BUFFER_SIZE] = {0};
  400. # ifdef AWS_PTHREAD_GETNAME_TAKES_3ARGS
  401. if (pthread_getname_np(thread_id, name, THREAD_NAME_BUFFER_SIZE)) {
  402. # elif AWS_PTHREAD_GETNAME_TAKES_2ARGS
  403. if (pthread_getname_np(thread_id, name)) {
  404. # elif AWS_PTHREAD_GET_NAME_TAKES_2ARGS
  405. if (pthread_get_name_np(thread_id, name)) {
  406. # endif
  407. return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
  408. }
  409. *out_name = aws_string_new_from_c_str(allocator, name);
  410. return AWS_OP_SUCCESS;
  411. #else
  412. return aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
  413. #endif
  414. }