pipe.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. #include "pipe.h"
  2. #include <util/generic/yexception.h>
  3. #include <cstdio>
  4. #include <cerrno>
  5. class TPipeBase::TImpl {
  6. public:
  7. inline TImpl(const TString& command, const char* mode)
  8. : Pipe_(nullptr)
  9. {
  10. #ifndef _freebsd_
  11. if (strcmp(mode, "r+") == 0) {
  12. ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD";
  13. }
  14. #endif
  15. Pipe_ = ::popen(command.data(), mode);
  16. if (Pipe_ == nullptr) {
  17. ythrow TSystemError() << "failed to open pipe: " << command.Quote();
  18. }
  19. }
  20. inline ~TImpl() {
  21. if (Pipe_ != nullptr) {
  22. ::pclose(Pipe_);
  23. }
  24. }
  25. public:
  26. FILE* Pipe_;
  27. };
  28. TPipeBase::TPipeBase(const TString& command, const char* mode)
  29. : Impl_(new TImpl(command, mode))
  30. {
  31. }
  32. TPipeBase::~TPipeBase() = default;
  33. TPipeInput::TPipeInput(const TString& command)
  34. : TPipeBase(command, "r")
  35. {
  36. }
  37. size_t TPipeInput::DoRead(void* buf, size_t len) {
  38. if (Impl_->Pipe_ == nullptr) {
  39. return 0;
  40. }
  41. size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_);
  42. if (bytesRead == 0) {
  43. int exitStatus = ::pclose(Impl_->Pipe_);
  44. Impl_->Pipe_ = nullptr;
  45. if (exitStatus == -1) {
  46. ythrow TSystemError() << "pclose() failed";
  47. } else if (exitStatus != 0) {
  48. ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
  49. }
  50. }
  51. return bytesRead;
  52. }
  53. TPipeOutput::TPipeOutput(const TString& command)
  54. : TPipeBase(command, "w")
  55. {
  56. }
  57. void TPipeOutput::DoWrite(const void* buf, size_t len) {
  58. if (Impl_->Pipe_ == nullptr || len != ::fwrite(buf, 1, len, Impl_->Pipe_)) {
  59. ythrow TSystemError() << "fwrite failed";
  60. }
  61. }
  62. void TPipeOutput::Close() {
  63. int exitStatus = ::pclose(Impl_->Pipe_);
  64. Impl_->Pipe_ = nullptr;
  65. if (exitStatus == -1) {
  66. ythrow TSystemError() << "pclose() failed";
  67. } else if (exitStatus != 0) {
  68. ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
  69. }
  70. }
  71. TPipedBase::TPipedBase(PIPEHANDLE fd)
  72. : Handle_(fd)
  73. {
  74. }
  75. TPipedBase::~TPipedBase() {
  76. if (Handle_.IsOpen()) {
  77. Handle_.Close();
  78. }
  79. }
  80. TPipedInput::TPipedInput(PIPEHANDLE fd)
  81. : TPipedBase(fd)
  82. {
  83. }
  84. TPipedInput::~TPipedInput() = default;
  85. size_t TPipedInput::DoRead(void* buf, size_t len) {
  86. if (!Handle_.IsOpen()) {
  87. return 0;
  88. }
  89. return Handle_.Read(buf, len);
  90. }
  91. TPipedOutput::TPipedOutput(PIPEHANDLE fd)
  92. : TPipedBase(fd)
  93. {
  94. }
  95. TPipedOutput::~TPipedOutput() = default;
  96. void TPipedOutput::DoWrite(const void* buf, size_t len) {
  97. if (!Handle_.IsOpen() || static_cast<ssize_t>(len) != Handle_.Write(buf, len)) {
  98. ythrow TSystemError() << "pipe writing failed";
  99. }
  100. }