pipe.cpp 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. #include "pipe.h"
  2. #include <util/generic/yexception.h>
  3. ssize_t TPipeHandle::Read(void* buffer, size_t byteCount) const noexcept {
  4. #ifdef _win_
  5. return recv(Fd_, (char*)buffer, byteCount, 0);
  6. #else
  7. return read(Fd_, buffer, byteCount);
  8. #endif
  9. }
  10. ssize_t TPipeHandle::Write(const void* buffer, size_t byteCount) const noexcept {
  11. #ifdef _win_
  12. return send(Fd_, (const char*)buffer, byteCount, 0);
  13. #else
  14. return write(Fd_, buffer, byteCount);
  15. #endif
  16. }
  17. bool TPipeHandle::Close() noexcept {
  18. bool ok = true;
  19. if (Fd_ != INVALID_PIPEHANDLE) {
  20. #ifdef _win_
  21. ok = closesocket(Fd_) == 0;
  22. #else
  23. ok = close(Fd_) == 0;
  24. #endif
  25. }
  26. Fd_ = INVALID_PIPEHANDLE;
  27. return ok;
  28. }
  29. void TPipeHandle::Pipe(TPipeHandle& reader, TPipeHandle& writer, EOpenMode mode) {
  30. PIPEHANDLE fds[2];
  31. #ifdef _win_
  32. int r = SocketPair(fds, false /* non-overlapped */, mode & CloseOnExec /* cloexec */);
  33. #elif defined(_linux_)
  34. int r = pipe2(fds, mode & CloseOnExec ? O_CLOEXEC : 0);
  35. #else
  36. int r = pipe(fds);
  37. #endif
  38. if (r < 0) {
  39. ythrow TFileError() << "failed to create a pipe";
  40. }
  41. #if !defined(_win_) && !defined(_linux_)
  42. // Non-atomic wrt exec
  43. if (mode & CloseOnExec) {
  44. for (int i = 0; i < 2; ++i) {
  45. int flags = fcntl(fds[i], F_GETFD, 0);
  46. if (flags < 0) {
  47. ythrow TFileError() << "failed to get flags";
  48. }
  49. int r = fcntl(fds[i], F_SETFD, flags | FD_CLOEXEC);
  50. if (r < 0) {
  51. ythrow TFileError() << "failed to set flags";
  52. }
  53. }
  54. }
  55. #endif
  56. TPipeHandle(fds[0]).Swap(reader);
  57. TPipeHandle(fds[1]).Swap(writer);
  58. }
  59. class TPipe::TImpl: public TAtomicRefCount<TImpl> {
  60. public:
  61. TImpl()
  62. : Handle_(INVALID_PIPEHANDLE)
  63. {
  64. }
  65. TImpl(PIPEHANDLE fd)
  66. : Handle_(fd)
  67. {
  68. }
  69. inline ~TImpl() {
  70. Close();
  71. }
  72. bool IsOpen() {
  73. return Handle_.IsOpen();
  74. }
  75. inline void Close() {
  76. if (!Handle_.IsOpen()) {
  77. return;
  78. }
  79. if (!Handle_.Close()) {
  80. ythrow TFileError() << "failed to close pipe";
  81. }
  82. }
  83. TPipeHandle& GetHandle() noexcept {
  84. return Handle_;
  85. }
  86. size_t Read(void* buffer, size_t count) const {
  87. ssize_t r = Handle_.Read(buffer, count);
  88. if (r < 0) {
  89. ythrow TFileError() << "failed to read from pipe";
  90. }
  91. return r;
  92. }
  93. size_t Write(const void* buffer, size_t count) const {
  94. ssize_t r = Handle_.Write(buffer, count);
  95. if (r < 0) {
  96. ythrow TFileError() << "failed to write to pipe";
  97. }
  98. return r;
  99. }
  100. private:
  101. TPipeHandle Handle_;
  102. };
  103. TPipe::TPipe()
  104. : Impl_(new TImpl)
  105. {
  106. }
  107. TPipe::TPipe(PIPEHANDLE fd)
  108. : Impl_(new TImpl(fd))
  109. {
  110. }
  111. TPipe::~TPipe() = default;
  112. void TPipe::Close() {
  113. Impl_->Close();
  114. }
  115. PIPEHANDLE TPipe::GetHandle() const noexcept {
  116. return Impl_->GetHandle();
  117. }
  118. bool TPipe::IsOpen() const noexcept {
  119. return Impl_->IsOpen();
  120. }
  121. size_t TPipe::Read(void* buf, size_t len) const {
  122. return Impl_->Read(buf, len);
  123. }
  124. size_t TPipe::Write(const void* buf, size_t len) const {
  125. return Impl_->Write(buf, len);
  126. }
  127. void TPipe::Pipe(TPipe& reader, TPipe& writer, EOpenMode mode) {
  128. TImplRef r(new TImpl());
  129. TImplRef w(new TImpl());
  130. TPipeHandle::Pipe(r->GetHandle(), w->GetHandle(), mode);
  131. r.Swap(reader.Impl_);
  132. w.Swap(writer.Impl_);
  133. }