sem.cpp 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. #include "sem.h"
  2. #ifdef _win_
  3. #include <malloc.h>
  4. #elif defined(_sun)
  5. #include <alloca.h>
  6. #endif
  7. #include <cstring>
  8. #ifdef _win_
  9. #include "winint.h"
  10. #else
  11. #include <semaphore.h>
  12. #if defined(_bionic_) || defined(_darwin_) && defined(_arm_)
  13. #include <fcntl.h>
  14. #else
  15. #define USE_SYSV_SEMAPHORES // unixoids declared the standard but not implemented it...
  16. #endif
  17. #endif
  18. #ifdef USE_SYSV_SEMAPHORES
  19. #include <errno.h>
  20. #include <sys/types.h>
  21. #include <sys/ipc.h>
  22. #include <sys/sem.h>
  23. #if defined(_linux_) || defined(_sun_) || defined(_cygwin_)
  24. union semun {
  25. int val;
  26. struct semid_ds* buf;
  27. unsigned short* array;
  28. } arg;
  29. #else
  30. union semun arg;
  31. #endif
  32. #endif
  33. #include <util/digest/city.h>
  34. #include <util/string/cast.h>
  35. #include <util/random/fast.h>
  36. #if !defined(_unix_) || defined(_darwin_)
  37. #include <util/random/random.h>
  38. #endif
  39. namespace {
  40. class TSemaphoreImpl {
  41. private:
  42. #ifdef _win_
  43. using SEMHANDLE = HANDLE;
  44. #else
  45. #ifdef USE_SYSV_SEMAPHORES
  46. using SEMHANDLE = int;
  47. #else
  48. using SEMHANDLE = sem_t*;
  49. #endif
  50. #endif
  51. SEMHANDLE Handle;
  52. public:
  53. inline TSemaphoreImpl(const char* name, ui32 max_free_count)
  54. : Handle(0)
  55. {
  56. #ifdef _win_
  57. char* key = (char*)name;
  58. if (name) {
  59. size_t len = strlen(name);
  60. key = (char*)alloca(len + 1);
  61. strcpy(key, name);
  62. if (len > MAX_PATH) {
  63. *(key + MAX_PATH) = 0;
  64. }
  65. char* p = key;
  66. while (*p) {
  67. if (*p == '\\') {
  68. *p = '/';
  69. }
  70. ++p;
  71. }
  72. }
  73. // non-blocking on init
  74. Handle = ::CreateSemaphore(0, max_free_count, max_free_count, key);
  75. #else
  76. #ifdef USE_SYSV_SEMAPHORES
  77. key_t key = TPCGMixer::Mix(CityHash64(name, strlen(name))); // 32 bit hash
  78. Handle = semget(key, 0, 0); // try to open exist semaphore
  79. if (Handle == -1) { // create new semaphore
  80. Handle = semget(key, 1, 0666 | IPC_CREAT);
  81. if (Handle != -1) {
  82. union semun arg;
  83. arg.val = max_free_count;
  84. semctl(Handle, 0, SETVAL, arg);
  85. } else {
  86. ythrow TSystemError() << "can not init sempahore";
  87. }
  88. }
  89. #else
  90. Handle = sem_open(name, O_CREAT, 0666, max_free_count);
  91. if (Handle == SEM_FAILED) {
  92. ythrow TSystemError() << "can not init sempahore";
  93. }
  94. #endif
  95. #endif
  96. }
  97. inline ~TSemaphoreImpl() {
  98. #ifdef _win_
  99. ::CloseHandle(Handle);
  100. #else
  101. #ifdef USE_SYSV_SEMAPHORES
  102. // we DO NOT want 'semctl(Handle, 0, IPC_RMID)' for multiprocess tasks;
  103. // struct sembuf ops[] = {{0, 0, IPC_NOWAIT}};
  104. // if (semop(Handle, ops, 1) != 0) // close only if semaphore's value is zero
  105. // semctl(Handle, 0, IPC_RMID);
  106. #else
  107. sem_close(Handle); // we DO NOT want sem_unlink(...)
  108. #endif
  109. #endif
  110. }
  111. inline void Release() noexcept {
  112. #ifdef _win_
  113. ::ReleaseSemaphore(Handle, 1, 0);
  114. #else
  115. #ifdef USE_SYSV_SEMAPHORES
  116. struct sembuf ops[] = {{0, 1, SEM_UNDO}};
  117. int ret = semop(Handle, ops, 1);
  118. #else
  119. int ret = sem_post(Handle);
  120. #endif
  121. Y_ABORT_UNLESS(ret == 0, "can not release semaphore");
  122. #endif
  123. }
  124. // The UNIX semaphore object does not support a timed "wait", and
  125. // hence to maintain consistancy, for win32 case we use INFINITE or 0 timeout.
  126. inline void Acquire() noexcept {
  127. #ifdef _win_
  128. Y_ABORT_UNLESS(::WaitForSingleObject(Handle, INFINITE) == WAIT_OBJECT_0, "can not acquire semaphore");
  129. #else
  130. #ifdef USE_SYSV_SEMAPHORES
  131. struct sembuf ops[] = {{0, -1, SEM_UNDO}};
  132. int ret = semop(Handle, ops, 1);
  133. #else
  134. int ret = sem_wait(Handle);
  135. #endif
  136. Y_ABORT_UNLESS(ret == 0, "can not acquire semaphore");
  137. #endif
  138. }
  139. inline bool TryAcquire() noexcept {
  140. #ifdef _win_
  141. // zero-second time-out interval
  142. // WAIT_OBJECT_0: current free count > 0
  143. // WAIT_TIMEOUT: current free count == 0
  144. return ::WaitForSingleObject(Handle, 0) == WAIT_OBJECT_0;
  145. #else
  146. #ifdef USE_SYSV_SEMAPHORES
  147. struct sembuf ops[] = {{0, -1, SEM_UNDO | IPC_NOWAIT}};
  148. int ret = semop(Handle, ops, 1);
  149. #else
  150. int ret = sem_trywait(Handle);
  151. #endif
  152. return ret == 0;
  153. #endif
  154. }
  155. };
  156. #if defined(_unix_)
  157. /*
  158. Disable errors/warnings about deprecated sem_* in Darwin
  159. */
  160. #ifdef _darwin_
  161. Y_PRAGMA_DIAGNOSTIC_PUSH
  162. Y_PRAGMA_NO_DEPRECATED
  163. #endif
  164. struct TPosixSemaphore {
  165. inline TPosixSemaphore(ui32 maxFreeCount) {
  166. if (sem_init(&S_, 0, maxFreeCount)) {
  167. ythrow TSystemError() << "can not init semaphore";
  168. }
  169. }
  170. inline ~TPosixSemaphore() {
  171. Y_ABORT_UNLESS(sem_destroy(&S_) == 0, "semaphore destroy failed");
  172. }
  173. inline void Acquire() noexcept {
  174. Y_ABORT_UNLESS(sem_wait(&S_) == 0, "semaphore acquire failed");
  175. }
  176. inline void Release() noexcept {
  177. Y_ABORT_UNLESS(sem_post(&S_) == 0, "semaphore release failed");
  178. }
  179. inline bool TryAcquire() noexcept {
  180. if (sem_trywait(&S_)) {
  181. Y_ABORT_UNLESS(errno == EAGAIN, "semaphore try wait failed");
  182. return false;
  183. }
  184. return true;
  185. }
  186. sem_t S_;
  187. };
  188. #ifdef _darwin_
  189. Y_PRAGMA_DIAGNOSTIC_POP
  190. #endif
  191. #endif
  192. } // namespace
  193. class TSemaphore::TImpl: public TSemaphoreImpl {
  194. public:
  195. inline TImpl(const char* name, ui32 maxFreeCount)
  196. : TSemaphoreImpl(name, maxFreeCount)
  197. {
  198. }
  199. };
  200. TSemaphore::TSemaphore(const char* name, ui32 maxFreeCount)
  201. : Impl_(new TImpl(name, maxFreeCount))
  202. {
  203. }
  204. TSemaphore::~TSemaphore() = default;
  205. void TSemaphore::Release() noexcept {
  206. Impl_->Release();
  207. }
  208. void TSemaphore::Acquire() noexcept {
  209. Impl_->Acquire();
  210. }
  211. bool TSemaphore::TryAcquire() noexcept {
  212. return Impl_->TryAcquire();
  213. }
  214. #if defined(_unix_) && !defined(_darwin_)
  215. class TFastSemaphore::TImpl: public TPosixSemaphore {
  216. public:
  217. inline TImpl(ui32 n)
  218. : TPosixSemaphore(n)
  219. {
  220. }
  221. };
  222. #else
  223. class TFastSemaphore::TImpl: public TString, public TSemaphoreImpl {
  224. public:
  225. inline TImpl(ui32 n)
  226. : TString(ToString(RandomNumber<ui64>()))
  227. , TSemaphoreImpl(c_str(), n)
  228. {
  229. }
  230. };
  231. #endif
  232. TFastSemaphore::TFastSemaphore(ui32 maxFreeCount)
  233. : Impl_(new TImpl(maxFreeCount))
  234. {
  235. }
  236. TFastSemaphore::~TFastSemaphore() = default;
  237. void TFastSemaphore::Release() noexcept {
  238. Impl_->Release();
  239. }
  240. void TFastSemaphore::Acquire() noexcept {
  241. Impl_->Acquire();
  242. }
  243. bool TFastSemaphore::TryAcquire() noexcept {
  244. return Impl_->TryAcquire();
  245. }