/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

#include "platform.h"
#include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h>     /* malloc, free */
#include <assert.h>
#include <errno.h>      /* errno */

#if defined (_MSC_VER)
#  include <sys/stat.h>
#  include <io.h>
#endif

#include "fileio_asyncio.h"
#include "fileio_common.h"

/* **********************************************************************
 *  Sparse write
 ************************************************************************/

/** AIO_fwriteSparse() :
 *  @return : storedSkips,
 *            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;  /* Buffer is expected to be malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                            strerror(errno));
        return 0;
    }

    /* avoid int overflow */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
            EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;

        /* count leading zeroes */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                            strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                            strerror(errno));
                storedSkips = 0;
    }   }   }

    return storedSkips;
}

static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    if (prefs->testMode) assert(storedSkips == 0);
    if (storedSkips>0) {
        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
            EXM_THROW(69, "Final skip error (sparse file support)");
        /* last zero must be explicitly written,
         * so that skipped bytes get implicitly translated as zeroes by the filesystem */
        {   const char lastZeroByte[1] = { 0 };
            if (fwrite(lastZeroByte, 1, 1, file) != 1)
                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
    }   }
}
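
/* Worked example (illustrative, not from the original sources): writing a
 * 64 KB block whose second half is all zeroes, with sparse support enabled.
 * The first 32 KB segment contains data, so it is written normally; the
 * second segment is entirely zero, so nothing is written and the call
 * returns storedSkips == 32 KB. At end of file, AIO_fwriteSparseEnd() turns
 * the pending skip into a hole by seeking storedSkips-1 bytes forward and
 * writing a single zero byte, which gives the file its full size:
 *
 *     unsigned skips = 0;
 *     skips = AIO_fwriteSparse(f, buf, 64 KB, prefs, skips);   // returns 32 KB
 *     AIO_fwriteSparseEnd(prefs, f, skips);
 */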

/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/

/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    return 1;
#else
    return 0;
#endif
}

/* ***********************************
 *  Generic IoPool implementation
 *************************************/

static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
    IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
    void* const buffer = malloc(bufferSize);
    if(!job || !buffer)
        EXM_THROW(101, "Allocation error : not enough memory");
    job->buffer = buffer;
    job->bufferSize = bufferSize;
    job->usedBufferSize = 0;
    job->file = NULL;
    job->ctx = ctx;
    job->offset = 0;
    return job;
}

/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for threaded IO pool.
 * Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if(prefs->asyncIO) {
        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
        assert(MAX_IO_JOBS >= 2);
        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
        ctx->threadPoolActive = 1;
        if (!ctx->threadPool)
            EXM_THROW(104, "Failed creating I/O thread pool");
    }
}

/* AIO_IOPool_init:
 * Allocates and initializes a new I/O pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
    int i;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
    ctx->availableJobsCount = ctx->totalIoJobs;
    for(i=0; i < ctx->availableJobsCount; i++) {
        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
}

/* AIO_IOPool_threadPoolActive:
 * Checks if the current operation uses the thread pool.
 * Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
    return ctx->threadPool && ctx->threadPoolActive;
}

/* AIO_IOPool_lockJobsMutex:
 * Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_unlockJobsMutex:
 * Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount < ctx->totalIoJobs);
    ctx->availableJobs[ctx->availableJobsCount++] = job;
    AIO_IOPool_unlockJobsMutex(ctx);
}

/* AIO_IOPool_join:
 * Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_setThreaded:
 * Allows (de)activating threaded mode, to be used when the expected overhead
 * of threading costs more than the expected gains. */
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
    assert(threaded == 0 || threaded == 1);
    assert(ctx != NULL);
    if(ctx->threadPoolActive != threaded) {
        AIO_IOPool_join(ctx);
        ctx->threadPoolActive = threaded;
    }
}

/* AIO_IOPool_destroy:
 * Releases a previously allocated IO pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
    int i;
    if(ctx->threadPool) {
        /* Make sure we finish all tasks and then free the resources */
        AIO_IOPool_join(ctx);
        /* Make sure we are not leaking availableJobs */
        assert(ctx->availableJobsCount == ctx->totalIoJobs);
        POOL_free(ctx->threadPool);
        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
    }
    assert(ctx->file == NULL);
    for(i=0; i<ctx->availableJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
        free(job->buffer);
        free(job);
    }
}

/* AIO_IOPool_acquireJob:
 * Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
    IOJob_t *job;
    assert(ctx->file != NULL || ctx->prefs->testMode);
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount > 0);
    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
    AIO_IOPool_unlockJobsMutex(ctx);
    job->usedBufferSize = 0;
    job->file = ctx->file;
    job->offset = 0;
    return job;
}

/* AIO_IOPool_setFile:
 * Sets the file for future jobs in the pool.
 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(ctx);
    assert(ctx->availableJobsCount == ctx->totalIoJobs);
    ctx->file = file;
}

static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
    return ctx->file;
}

/* AIO_IOPool_enqueueJob:
 * Enqueues an io job for execution.
 * The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_add(ctx->threadPool, ctx->poolFunction, job);
    else
        ctx->poolFunction(job);
}
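
/* Usage sketch (illustrative assumption, mirroring how the Write/Read pools
 * below drive the generic pool; `ctx`, `file`, `src` and `srcSize` are
 * hypothetical). A job is acquired, its buffer filled, then enqueued; in sync
 * mode poolFunction runs inline, in async mode it runs on the pool thread,
 * and in either case the job eventually returns to availableJobs through
 * AIO_IOPool_releaseIoJob():
 *
 *     AIO_IOPool_setFile(ctx, file);
 *     {   IOJob_t* const job = AIO_IOPool_acquireJob(ctx);
 *         memcpy(job->buffer, src, srcSize);
 *         job->usedBufferSize = srcSize;
 *         AIO_IOPool_enqueueJob(job);   // job must not be used after this call
 *     }
 *     AIO_IOPool_join(ctx);             // wait for all outstanding jobs
 */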

/* ***********************************
 *  WritePool implementation
 *************************************/

/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
    return AIO_IOPool_acquireJob(&ctx->base);
}

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Queues a write job for execution and acquires a new one.
 * After this call `job` points to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before the call.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
    AIO_IOPool_enqueueJob(*job);
    *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
}

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
    ctx->storedSkips = 0;
}

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
    AIO_IOPool_setFile(&ctx->base, file);
    assert(ctx->storedSkips == 0);
}

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_closeFile:
 * Ends sparse writes, closes the writePool's current file, and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
    FILE* const dstFile = ctx->base.file;
    assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
    AIO_WritePool_sparseWriteEnd(ctx);
    AIO_IOPool_setFile(&ctx->base, NULL);
    return fclose(dstFile);
}

/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;
    return ctx;
}

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes the destination file if needed. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure we finish all tasks and then free the resources */
    if(AIO_WritePool_getFile(ctx))
        AIO_WritePool_closeFile(ctx);
    AIO_IOPool_destroy(&ctx->base);
    assert(ctx->storedSkips==0);
    free(ctx);
}

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}
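
/* Usage sketch (illustrative assumption; loosely follows how fileio.c drives
 * a write pool; `prefs`, `dstFile` and produce() are hypothetical):
 *
 *     WritePoolCtx_t* const wctx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
 *     AIO_WritePool_setFile(wctx, dstFile);
 *     {   IOJob_t* job = AIO_WritePool_acquireJob(wctx);
 *         while ((job->usedBufferSize = produce(job->buffer, job->bufferSize)) > 0)
 *             AIO_WritePool_enqueueAndReacquireWriteJob(&job);  // queue + get fresh job
 *         AIO_WritePool_releaseIoJob(job);                      // last job went unused
 *     }
 *     AIO_WritePool_closeFile(wctx);   // joins, ends sparse write, fclose()
 *     AIO_WritePool_free(wctx);
 */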

/* ***********************************
 *  ReadPool implementation
 *************************************/

static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int i;
    for(i=0; i<ctx->completedJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
        AIO_IOPool_releaseIoJob(job);
    }
    ctx->completedJobsCount = 0;
}

static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching waitingOnOffset and returns it;
 * returns NULL if no such job is found.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}

/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
    return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * May block until the matching job completes. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight, continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL);  /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}

/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->reachedEof) {
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}

static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
    job->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += job->bufferSize;
    AIO_IOPool_enqueueJob(job);
}

static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while(ctx->base.availableJobsCount) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}

/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all currently enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time;
 * it will also be used as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
    if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->completedJobsCount = 0;
    ctx->currentJobHeld = NULL;

    if(ctx->base.threadPool)
        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103,"Failed creating jobCompletedCond cond");

    return ctx;
}

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes the source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    assert(n <= ctx->srcBufferLoaded);
    ctx->srcBufferLoaded -= n;
    ctx->srcBuffer += n;
}

/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
    return (IOJob_t*) ctx->currentJobHeld;
}

/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns when srcBuffer has at least the expected number of bytes loaded or when we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might hold up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t *job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;
    if(useCoalesce) {
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        ctx->srcBuffer = (U8 *) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}
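
/* Worked example (illustrative): with jobBufferSize == 64 KB, suppose 10 bytes
 * remain loaded and the caller asks for 18. The 10 leftover bytes are copied
 * to the start of coalesceBuffer, the held job is released (re-enqueuing a
 * read), and the next completed job's content is appended after them, so
 * srcBufferLoaded becomes 10 + usedBufferSize. This is why coalesceBuffer is
 * allocated at twice jobBufferSize in AIO_ReadPool_create(). */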

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with up to jobBufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all currently enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    FILE* const file = AIO_ReadPool_getFile(ctx);
    AIO_ReadPool_setFile(ctx, NULL);
    return fclose(file);
}

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}
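
/* Usage sketch (illustrative assumption; loosely follows how fileio.c
 * consumes a read pool; `prefs`, `srcFile` and consume() are hypothetical):
 *
 *     ReadPoolCtx_t* const rctx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize());
 *     AIO_ReadPool_setFile(rctx, srcFile);      // kicks off the first reads
 *     while (AIO_ReadPool_fillBuffer(rctx, 1), rctx->srcBufferLoaded > 0) {
 *         size_t const used = consume(rctx->srcBuffer, rctx->srcBufferLoaded);
 *         AIO_ReadPool_consumeBytes(rctx, used);
 *     }
 *     AIO_ReadPool_closeFile(rctx);             // joins and resets state
 *     AIO_ReadPool_free(rctx);
 */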