sem.cpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. #include "sem.h"
  2. #ifdef _win_
  3. #include <malloc.h>
  4. #elif defined(_sun)
  5. #include <alloca.h>
  6. #endif
  #include <cerrno>
  #include <cstring>
  8. #ifdef _win_
  9. #include "winint.h"
  10. #else
  11. #include <semaphore.h>
  12. #if defined(_bionic_) || defined(_darwin_) && defined(_arm_)
  13. #include <fcntl.h>
  14. #else
  15. #define USE_SYSV_SEMAPHORES // unixoids declared the standard but not implemented it...
  16. #endif
  17. #endif
  18. #ifdef USE_SYSV_SEMAPHORES
  19. #include <errno.h>
  20. #include <sys/types.h>
  21. #include <sys/ipc.h>
  22. #include <sys/sem.h>
  23. #if defined(_linux_) || defined(_sun_) || defined(_cygwin_)
  // Argument type for the fourth parameter of semctl(). This branch is compiled on
  // platforms whose system headers leave the definition of semun to the program.
  union semun {
      int val; // integer argument; used in this file with the SETVAL command
      struct semid_ds* buf;
      unsigned short* array;
  };

  // File-level instance of the union, kept for both preprocessor branches.
  union semun arg;
  29. #else
  30. union semun arg;
  31. #endif
  32. #endif
  33. #include <util/digest/city.h>
  34. #include <util/string/cast.h>
  35. #include <util/random/fast.h>
  36. #if !defined(_unix_) || defined(_darwin_)
  37. #include <util/random/random.h>
  38. #endif
  39. namespace {
  40. class TSemaphoreImpl {
  41. private:
  42. #ifdef _win_
  43. using SEMHANDLE = HANDLE;
  44. #else
  45. #ifdef USE_SYSV_SEMAPHORES
  46. using SEMHANDLE = int;
  47. #else
  48. using SEMHANDLE = sem_t*;
  49. #endif
  50. #endif
  51. SEMHANDLE Handle;
  52. public:
  53. inline TSemaphoreImpl(const char* name, ui32 max_free_count)
  54. : Handle(0)
  55. {
  56. #ifdef _win_
  57. char* key = (char*)name;
  58. if (name) {
  59. size_t len = strlen(name);
  60. key = (char*)alloca(len + 1);
  61. strcpy(key, name);
  62. if (len > MAX_PATH)
  63. *(key + MAX_PATH) = 0;
  64. char* p = key;
  65. while (*p) {
  66. if (*p == '\\')
  67. *p = '/';
  68. ++p;
  69. }
  70. }
  71. // non-blocking on init
  72. Handle = ::CreateSemaphore(0, max_free_count, max_free_count, key);
  73. #else
  74. #ifdef USE_SYSV_SEMAPHORES
  75. key_t key = TPCGMixer::Mix(CityHash64(name, strlen(name))); // 32 bit hash
  76. Handle = semget(key, 0, 0); // try to open exist semaphore
  77. if (Handle == -1) { // create new semaphore
  78. Handle = semget(key, 1, 0666 | IPC_CREAT);
  79. if (Handle != -1) {
  80. union semun arg;
  81. arg.val = max_free_count;
  82. semctl(Handle, 0, SETVAL, arg);
  83. } else {
  84. ythrow TSystemError() << "can not init sempahore";
  85. }
  86. }
  87. #else
  88. Handle = sem_open(name, O_CREAT, 0666, max_free_count);
  89. if (Handle == SEM_FAILED) {
  90. ythrow TSystemError() << "can not init sempahore";
  91. }
  92. #endif
  93. #endif
  94. }
  95. inline ~TSemaphoreImpl() {
  96. #ifdef _win_
  97. ::CloseHandle(Handle);
  98. #else
  99. #ifdef USE_SYSV_SEMAPHORES
  100. // we DO NOT want 'semctl(Handle, 0, IPC_RMID)' for multiprocess tasks;
  101. // struct sembuf ops[] = {{0, 0, IPC_NOWAIT}};
  102. // if (semop(Handle, ops, 1) != 0) // close only if semaphore's value is zero
  103. // semctl(Handle, 0, IPC_RMID);
  104. #else
  105. sem_close(Handle); // we DO NOT want sem_unlink(...)
  106. #endif
  107. #endif
  108. }
  109. inline void Release() noexcept {
  110. #ifdef _win_
  111. ::ReleaseSemaphore(Handle, 1, 0);
  112. #else
  113. #ifdef USE_SYSV_SEMAPHORES
  114. struct sembuf ops[] = {{0, 1, SEM_UNDO}};
  115. int ret = semop(Handle, ops, 1);
  116. #else
  117. int ret = sem_post(Handle);
  118. #endif
  119. Y_ABORT_UNLESS(ret == 0, "can not release semaphore");
  120. #endif
  121. }
  122. // The UNIX semaphore object does not support a timed "wait", and
  123. // hence to maintain consistancy, for win32 case we use INFINITE or 0 timeout.
  124. inline void Acquire() noexcept {
  125. #ifdef _win_
  126. Y_ABORT_UNLESS(::WaitForSingleObject(Handle, INFINITE) == WAIT_OBJECT_0, "can not acquire semaphore");
  127. #else
  128. #ifdef USE_SYSV_SEMAPHORES
  129. struct sembuf ops[] = {{0, -1, SEM_UNDO}};
  130. int ret = semop(Handle, ops, 1);
  131. #else
  132. int ret = sem_wait(Handle);
  133. #endif
  134. Y_ABORT_UNLESS(ret == 0, "can not acquire semaphore");
  135. #endif
  136. }
  137. inline bool TryAcquire() noexcept {
  138. #ifdef _win_
  139. // zero-second time-out interval
  140. // WAIT_OBJECT_0: current free count > 0
  141. // WAIT_TIMEOUT: current free count == 0
  142. return ::WaitForSingleObject(Handle, 0) == WAIT_OBJECT_0;
  143. #else
  144. #ifdef USE_SYSV_SEMAPHORES
  145. struct sembuf ops[] = {{0, -1, SEM_UNDO | IPC_NOWAIT}};
  146. int ret = semop(Handle, ops, 1);
  147. #else
  148. int ret = sem_trywait(Handle);
  149. #endif
  150. return ret == 0;
  151. #endif
  152. }
  153. };
  154. #if defined(_unix_)
  155. /*
  156. Disable errors/warnings about deprecated sem_* in Darwin
  157. */
  158. #ifdef _darwin_
  159. Y_PRAGMA_DIAGNOSTIC_PUSH
  160. Y_PRAGMA_NO_DEPRECATED
  161. #endif
  162. struct TPosixSemaphore {
  163. inline TPosixSemaphore(ui32 maxFreeCount) {
  164. if (sem_init(&S_, 0, maxFreeCount)) {
  165. ythrow TSystemError() << "can not init semaphore";
  166. }
  167. }
  168. inline ~TPosixSemaphore() {
  169. Y_ABORT_UNLESS(sem_destroy(&S_) == 0, "semaphore destroy failed");
  170. }
  171. inline void Acquire() noexcept {
  172. Y_ABORT_UNLESS(sem_wait(&S_) == 0, "semaphore acquire failed");
  173. }
  174. inline void Release() noexcept {
  175. Y_ABORT_UNLESS(sem_post(&S_) == 0, "semaphore release failed");
  176. }
  177. inline bool TryAcquire() noexcept {
  178. if (sem_trywait(&S_)) {
  179. Y_ABORT_UNLESS(errno == EAGAIN, "semaphore try wait failed");
  180. return false;
  181. }
  182. return true;
  183. }
  184. sem_t S_;
  185. };
  186. #ifdef _darwin_
  187. Y_PRAGMA_DIAGNOSTIC_POP
  188. #endif
  189. #endif
  190. } // namespace
  191. class TSemaphore::TImpl: public TSemaphoreImpl {
  192. public:
  193. inline TImpl(const char* name, ui32 maxFreeCount)
  194. : TSemaphoreImpl(name, maxFreeCount)
  195. {
  196. }
  197. };
  198. TSemaphore::TSemaphore(const char* name, ui32 maxFreeCount)
  199. : Impl_(new TImpl(name, maxFreeCount))
  200. {
  201. }
  202. TSemaphore::~TSemaphore() = default;
  203. void TSemaphore::Release() noexcept {
  204. Impl_->Release();
  205. }
  206. void TSemaphore::Acquire() noexcept {
  207. Impl_->Acquire();
  208. }
  209. bool TSemaphore::TryAcquire() noexcept {
  210. return Impl_->TryAcquire();
  211. }
  212. #if defined(_unix_) && !defined(_darwin_)
  213. class TFastSemaphore::TImpl: public TPosixSemaphore {
  214. public:
  215. inline TImpl(ui32 n)
  216. : TPosixSemaphore(n)
  217. {
  218. }
  219. };
  220. #else
  221. class TFastSemaphore::TImpl: public TString, public TSemaphoreImpl {
  222. public:
  223. inline TImpl(ui32 n)
  224. : TString(ToString(RandomNumber<ui64>()))
  225. , TSemaphoreImpl(c_str(), n)
  226. {
  227. }
  228. };
  229. #endif
  230. TFastSemaphore::TFastSemaphore(ui32 maxFreeCount)
  231. : Impl_(new TImpl(maxFreeCount))
  232. {
  233. }
  234. TFastSemaphore::~TFastSemaphore() = default;
  235. void TFastSemaphore::Release() noexcept {
  236. Impl_->Release();
  237. }
  238. void TFastSemaphore::Acquire() noexcept {
  239. Impl_->Acquire();
  240. }
  241. bool TFastSemaphore::TryAcquire() noexcept {
  242. return Impl_->TryAcquire();
  243. }