fileio_asyncio.h 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. /*
  2. * Copyright (c) Meta Platforms, Inc. and affiliates.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under both the BSD-style license (found in the
  6. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  7. * in the COPYING file in the root directory of this source tree).
  8. * You may select, at your option, one of the above-listed licenses.
  9. */
  10. /*
  11. * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
  12. * Current implementation relies on having one thread that reads and one that
  13. * writes.
  14. * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
  15. * are performed serially by the appropriate worker thread.
  16. * Most systems expose better primitives to perform asynchronous IO, such as
  17. * io_uring on newer linux systems. The API is built in such a way that in the
  18. * future we could replace the threads with better solutions when available.
  19. */
  20. #ifndef ZSTD_FILEIO_ASYNCIO_H
  21. #define ZSTD_FILEIO_ASYNCIO_H
  22. #if defined (__cplusplus)
  23. extern "C" {
  24. #endif
  25. #include "../lib/common/mem.h" /* U32, U64 */
  26. #include "fileio_types.h"
  27. #include "platform.h"
  28. #include "util.h"
  29. #include "../lib/common/pool.h"
  30. #include "../lib/common/threading.h"
  /* Upper bound on the IO jobs a single pool can hold; it sizes the per-pool job arrays
   * (availableJobs in IOPoolCtx_t and completedJobs in ReadPoolCtx_t). */
  31. #define MAX_IO_JOBS (10)
  /* IOPoolCtx_t:
   * State common to both pool kinds; it is embedded as the `base` member of
   * ReadPoolCtx_t and WritePoolCtx_t. */
  32. typedef struct {
  33. /* These struct fields should be set only on creation and not changed afterwards */
  34. POOL_ctx* threadPool;
  35. int threadPoolActive;
  36. int totalIoJobs;
  37. const FIO_prefs_t* prefs;
  38. POOL_function poolFunction;
  39. /* Controls the file we currently write to, make changes only by using provided utility functions */
  40. FILE* file;
  41. /* The availableJobs and availableJobsCount fields are accessed by both the main and worker threads and should
  42. * only be mutated after locking ioJobsMutex */
  43. ZSTD_pthread_mutex_t ioJobsMutex;
  44. void* availableJobs[MAX_IO_JOBS];
  45. int availableJobsCount;
  /* NOTE(review): presumably the size of each job's buffer (cf. IOJob_t.bufferSize) - confirm in the implementation. */
  46. size_t jobBufferSize;
  47. } IOPoolCtx_t;
  /* ReadPoolCtx_t:
   * Read pool state: the shared IOPoolCtx_t base plus bookkeeping for the file currently being
   * read and the source buffer exposed to consumer code. */
  48. typedef struct {
  49. IOPoolCtx_t base;
  50. /* State regarding the currently read file */
  51. int reachedEof;
  52. U64 nextReadOffset;
  53. U64 waitingOnOffset;
  54. /* We may hold an IOJob object as needed if we actively expose its buffer. */
  55. void *currentJobHeld;
  56. /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
  57. * the first of them. Shouldn't be accessed from outside of utility functions. */
  58. U8 *coalesceBuffer;
  59. /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
  60. * change when consuming / refilling buffer. */
  61. U8 *srcBuffer;
  62. size_t srcBufferLoaded;
  63. /* We need to know what tasks completed so we can use their buffers when their time comes.
  64. * Should only be accessed after locking base.ioJobsMutex . */
  65. void* completedJobs[MAX_IO_JOBS];
  66. int completedJobsCount;
  /* NOTE(review): presumably signaled when a read job is added to completedJobs and waited on
   * together with base.ioJobsMutex - confirm in the implementation. */
  67. ZSTD_pthread_cond_t jobCompletedCond;
  68. } ReadPoolCtx_t;
  /* WritePoolCtx_t:
   * Write pool state: the shared IOPoolCtx_t base plus one write-specific counter. */
  69. typedef struct {
  70. IOPoolCtx_t base;
  /* NOTE(review): presumably the pending skip count used for sparse writes
   * (see AIO_WritePool_sparseWriteEnd) - confirm in the implementation. */
  71. unsigned storedSkips;
  72. } WritePoolCtx_t;
  /* IOJob_t:
   * A single IO job: the pool context and file it belongs to, its buffer, and how much of that
   * buffer is in use. Write jobs are obtained with AIO_WritePool_acquireJob. */
  73. typedef struct {
  74. /* These fields are automatically set and shouldn't be changed by non WritePool code.
   * NOTE(review): ReadPoolCtx_t also holds IOJob objects (currentJobHeld), so this presumably
   * applies to any code outside the IO pools - confirm. */
  75. void *ctx;
  76. FILE* file;
  77. void *buffer;
  78. size_t bufferSize;
  79. /* This field should be changed before a job is queued for execution and should contain the number
  80. * of bytes to write from the buffer. */
  81. size_t usedBufferSize;
  /* NOTE(review): presumably the file offset this job's data corresponds to
   * (cf. ReadPoolCtx_t.nextReadOffset / waitingOnOffset) - confirm in the implementation. */
  82. U64 offset;
  83. } IOJob_t;
  84. /* AIO_supported:
  85. * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
  86. int AIO_supported(void);
  87. /* AIO_WritePool_releaseIoJob:
  88. * Releases an acquired job back to the pool. Doesn't execute the job. */
  89. void AIO_WritePool_releaseIoJob(IOJob_t *job);
  90. /* AIO_WritePool_acquireJob:
  91. * Returns an available write job to be used for a future write. */
  92. IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
  93. /* AIO_WritePool_enqueueAndReacquireWriteJob:
  94. * Enqueues a write job for execution and acquires a new one.
  95. * On return, `*job` points to the newly acquired job.
  96. * Make sure to set `usedBufferSize` to the wanted length before the call.
  97. * The queued job shouldn't be used directly after queueing it. */
  98. void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
  99. /* AIO_WritePool_sparseWriteEnd:
  100. * Ends sparse writes to the current file.
  101. * Blocks on completion of all current write jobs before executing. */
  102. void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
  103. /* AIO_WritePool_setFile:
  104. * Sets the destination file for future writes in the pool.
  105. * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
  106. * Also requires ending of sparse write if a previous file was used in sparse mode. */
  107. void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
  108. /* AIO_WritePool_getFile:
  109. * Returns the file the writePool is currently set to write to. */
  110. FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
  111. /* AIO_WritePool_closeFile:
  112. * Ends sparse write and closes the writePool's current file and sets the file to NULL.
  113. * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
  114. int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
  115. /* AIO_WritePool_create:
  116. * Allocates and sets up a new write pool, including its jobs.
  117. * bufferSize should be set to the maximal buffer we want to write to at a time. */
  118. WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
  119. /* AIO_WritePool_free:
  120. * Frees and releases a writePool and its resources. Closes destination file. */
  121. void AIO_WritePool_free(WritePoolCtx_t* ctx);
  122. /* AIO_WritePool_setAsync:
  123. * Allows (de)activating async mode, to be used when the expected overhead
  124. * of asyncio costs more than the expected gains. */
  125. void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
  126. /* AIO_ReadPool_create:
  127. * Allocates and sets up a new readPool, including its jobs.
  128. * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
  129. * as our basic read size. */
  130. ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
  131. /* AIO_ReadPool_free:
  132. * Frees and releases a readPool and its resources. Closes source file. */
  133. void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
  134. /* AIO_ReadPool_setAsync:
  135. * Allows (de)activating async mode, to be used when the expected overhead
  136. * of asyncio costs more than the expected gains. */
  137. void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
  138. /* AIO_ReadPool_consumeBytes:
  139. * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
  140. void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
  141. /* AIO_ReadPool_fillBuffer:
  142. * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
  143. * Returns once srcBuffer has at least n bytes loaded or we've reached the end of the file.
  144. * Return value is the number of bytes added to the buffer.
  145. * Note that srcBuffer might have up to 2 times bufferSize bytes. */
  146. size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
  147. /* AIO_ReadPool_consumeAndRefill:
  148. * Consumes the current buffer and refills it with bufferSize bytes. */
  149. size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
  150. /* AIO_ReadPool_setFile:
  151. * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
  152. * Waits for all current enqueued tasks to complete if a previous file was set. */
  153. void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
  154. /* AIO_ReadPool_getFile:
  155. * Returns the current file set for the read pool. */
  156. FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
  157. /* AIO_ReadPool_closeFile:
  158. * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
  159. int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
  160. #if defined (__cplusplus)
  161. }
  162. #endif
  163. #endif /* ZSTD_FILEIO_ASYNCIO_H */