direct_io.cpp 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. #include "direct_io.h"
  2. #include <util/generic/singleton.h>
  3. #include <util/generic/yexception.h>
  4. #include <util/system/info.h>
  5. #include "align.h"
  6. #ifdef _linux_
  7. #include <util/string/cast.h>
  8. #include <linux/version.h>
  9. #include <sys/utsname.h>
  10. #endif
  namespace {
      // Computes, once per construction, the alignment to use for direct I/O on this host.
      //
      // On Linux the running kernel release (from uname) decides the value:
      //   * older than 2.4.10 -> 0
      //   * 2.4.10 .. 2.5.x   -> system page size
      //   * 2.6.0 and newer   -> 4096
      // On every other platform Alignment stays 0.
      struct TAlignmentCalcer {
          TAlignmentCalcer()
              : Alignment(0)
          {
  #ifdef _linux_
              utsname sysInfo;
              Y_ABORT_UNLESS(!uname(&sysInfo), "Error while call uname: %s", LastSystemErrorText());

              // Keep only the leading "digits and dots" part of the release string,
              // e.g. "5.15.0-91-generic" -> "5.15.0".
              TStringBuf release(sysInfo.release);
              release = release.substr(0, release.find_first_not_of(".0123456789"));

              // Major, minor, patch - parsed strictly in that order.
              int parts[3];
              for (int& part : parts) {
                  part = FromString<int>(release.NextTok('.'));
              }
              const int runningKernel = KERNEL_VERSION(parts[0], parts[1], parts[2]);

              if (runningKernel >= KERNEL_VERSION(2, 6, 0)) {
                  // Default alignment used to be 512, but most modern devices rely on 4k physical blocks.
                  // 4k alignment works well for both 512 and 4k blocks and doesn't require 512e support in the kernel.
                  // See IGNIETFERRO-946.
                  Alignment = 4096;
              } else if (runningKernel >= KERNEL_VERSION(2, 4, 10)) {
                  Alignment = NSystemInfo::GetPageSize();
              } else {
                  Alignment = 0;
              }
  #endif
          }

          // Alignment in bytes; 0 means "no alignment".
          size_t Alignment;
      };
  } // namespace
  40. TDirectIOBufferedFile::TDirectIOBufferedFile(const TString& path, EOpenMode oMode, size_t buflen /*= 1 << 17*/)
  41. : File(path, oMode)
  42. , Alignment(0)
  43. , DataLen(0)
  44. , ReadPosition(0)
  45. , WritePosition(0)
  46. , DirectIO(false)
  47. {
  48. if (buflen == 0) {
  49. ythrow TFileError() << "unbuffered usage is not supported";
  50. }
  51. if (oMode & Direct) {
  52. Alignment = Singleton<TAlignmentCalcer>()->Alignment;
  53. SetDirectIO(true);
  54. }
  55. WritePosition = File.GetLength();
  56. FlushedBytes = WritePosition;
  57. FlushedToDisk = FlushedBytes;
  58. BufLen = (!!Alignment) ? AlignUp(buflen, Alignment) : buflen;
  59. BufferStorage.Resize(BufLen + Alignment);
  60. Buffer = (!!Alignment) ? AlignUp(BufferStorage.Data(), Alignment) : BufferStorage.Data();
  61. }
  62. #define DIRECT_IO_FLAGS (O_DIRECT | O_SYNC)
  63. void TDirectIOBufferedFile::SetDirectIO(bool value) {
  64. #ifdef _linux_
  65. if (DirectIO == value) {
  66. return;
  67. }
  68. if (!!Alignment && value) {
  69. (void)fcntl(File.GetHandle(), F_SETFL, fcntl(File.GetHandle(), F_GETFL) | DIRECT_IO_FLAGS);
  70. } else {
  71. (void)fcntl(File.GetHandle(), F_SETFL, fcntl(File.GetHandle(), F_GETFL) & ~DIRECT_IO_FLAGS);
  72. }
  73. DirectIO = value;
  74. #else
  75. DirectIO = value;
  76. #endif
  77. }
  78. TDirectIOBufferedFile::~TDirectIOBufferedFile() {
  79. try {
  80. Finish();
  81. } catch (...) {
  82. }
  83. }
  84. void TDirectIOBufferedFile::FlushData() {
  85. WriteToFile(Buffer, DataLen, FlushedBytes);
  86. DataLen = 0;
  87. File.FlushData();
  88. }
  89. void TDirectIOBufferedFile::Finish() {
  90. FlushData();
  91. File.Flush();
  92. File.Close();
  93. }
  94. void TDirectIOBufferedFile::Write(const void* buffer, size_t byteCount) {
  95. WriteToBuffer(buffer, byteCount, DataLen);
  96. WritePosition += byteCount;
  97. }
  98. void TDirectIOBufferedFile::WriteToBuffer(const void* buf, size_t len, ui64 position) {
  99. while (len > 0) {
  100. size_t writeLen = Min<size_t>(BufLen - position, len);
  101. if (writeLen > 0) {
  102. memcpy((char*)Buffer + position, buf, writeLen);
  103. buf = (char*)buf + writeLen;
  104. len -= writeLen;
  105. DataLen = (size_t)Max(position + writeLen, (ui64)DataLen);
  106. position += writeLen;
  107. }
  108. if (DataLen == BufLen) {
  109. WriteToFile(Buffer, DataLen, FlushedBytes);
  110. DataLen = 0;
  111. position = 0;
  112. }
  113. }
  114. }
  115. void TDirectIOBufferedFile::WriteToFile(const void* buf, size_t len, ui64 position) {
  116. if (!!len) {
  117. SetDirectIO(IsAligned(buf) && IsAligned(len) && IsAligned(position));
  118. File.Pwrite(buf, len, position);
  119. FlushedBytes = Max(FlushedBytes, position + len);
  120. FlushedToDisk = Min(FlushedToDisk, position);
  121. }
  122. }
  123. size_t TDirectIOBufferedFile::PreadSafe(void* buffer, size_t byteCount, ui64 offset) {
  124. if (FlushedToDisk < offset + byteCount) {
  125. File.FlushData();
  126. FlushedToDisk = FlushedBytes;
  127. }
  128. #ifdef _linux_
  129. ssize_t bytesRead = 0;
  130. do {
  131. bytesRead = pread(File.GetHandle(), buffer, byteCount, offset);
  132. } while (bytesRead == -1 && errno == EINTR);
  133. if (bytesRead < 0) {
  134. ythrow yexception() << "error while pread file: " << LastSystemError() << "(" << LastSystemErrorText() << ")";
  135. }
  136. return bytesRead;
  137. #else
  138. return File.Pread(buffer, byteCount, offset);
  139. #endif
  140. }
  141. size_t TDirectIOBufferedFile::ReadFromFile(void* buffer, size_t byteCount, ui64 offset) {
  142. SetDirectIO(true);
  143. ui64 bytesRead = 0;
  144. while (byteCount) {
  145. if (!Alignment || IsAligned(buffer) && IsAligned(byteCount) && IsAligned(offset)) {
  146. if (const ui64 fromFile = PreadSafe(buffer, byteCount, offset)) {
  147. buffer = (char*)buffer + fromFile;
  148. byteCount -= fromFile;
  149. offset += fromFile;
  150. bytesRead += fromFile;
  151. } else {
  152. return bytesRead;
  153. }
  154. } else {
  155. break;
  156. }
  157. }
  158. if (!byteCount) {
  159. return bytesRead;
  160. }
  161. ui64 bufSize = AlignUp(Min<size_t>(BufferStorage.Size(), byteCount + (Alignment << 1)), Alignment);
  162. TBuffer readBufferStorage(bufSize + Alignment);
  163. char* readBuffer = AlignUp((char*)readBufferStorage.Data(), Alignment);
  164. while (byteCount) {
  165. ui64 begin = AlignDown(offset, (ui64)Alignment);
  166. ui64 end = AlignUp(offset + byteCount, (ui64)Alignment);
  167. ui64 toRead = Min(end - begin, bufSize);
  168. ui64 fromFile = PreadSafe(readBuffer, toRead, begin);
  169. if (!fromFile) {
  170. break;
  171. }
  172. ui64 delta = offset - begin;
  173. ui64 count = Min<ui64>(fromFile - delta, byteCount);
  174. memcpy(buffer, readBuffer + delta, count);
  175. buffer = (char*)buffer + count;
  176. byteCount -= count;
  177. offset += count;
  178. bytesRead += count;
  179. }
  180. return bytesRead;
  181. }
  182. size_t TDirectIOBufferedFile::Read(void* buffer, size_t byteCount) {
  183. size_t bytesRead = Pread(buffer, byteCount, ReadPosition);
  184. ReadPosition += bytesRead;
  185. return bytesRead;
  186. }
  187. size_t TDirectIOBufferedFile::Pread(void* buffer, size_t byteCount, ui64 offset) {
  188. if (!byteCount) {
  189. return 0;
  190. }
  191. size_t readFromFile = 0;
  192. if (offset < FlushedBytes) {
  193. readFromFile = Min<ui64>(byteCount, FlushedBytes - offset);
  194. size_t bytesRead = ReadFromFile(buffer, readFromFile, offset);
  195. if (bytesRead != readFromFile || readFromFile == byteCount) {
  196. return bytesRead;
  197. }
  198. }
  199. ui64 start = offset > FlushedBytes ? offset - FlushedBytes : 0;
  200. ui64 count = Min<ui64>(DataLen - start, byteCount - readFromFile);
  201. if (count) {
  202. memcpy((char*)buffer + readFromFile, (const char*)Buffer + start, count);
  203. }
  204. return count + readFromFile;
  205. }
  206. void TDirectIOBufferedFile::Pwrite(const void* buffer, size_t byteCount, ui64 offset) {
  207. if (offset > WritePosition) {
  208. ythrow yexception() << "cannot frite to position" << offset;
  209. }
  210. size_t writeToBufer = byteCount;
  211. size_t writeToFile = 0;
  212. if (FlushedBytes > offset) {
  213. writeToFile = Min<ui64>(byteCount, FlushedBytes - offset);
  214. WriteToFile(buffer, writeToFile, offset);
  215. writeToBufer -= writeToFile;
  216. }
  217. if (writeToBufer > 0) {
  218. ui64 bufferOffset = offset + writeToFile - FlushedBytes;
  219. WriteToBuffer((const char*)buffer + writeToFile, writeToBufer, bufferOffset);
  220. }
  221. }