RLE.cc 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "RLEv1.hh"
  19. #include "RLEv2.hh"
  20. #include "orc/Exceptions.hh"
  21. namespace orc {
  22. RleEncoder::~RleEncoder() {
  23. // PASS
  24. }
  25. RleDecoder::~RleDecoder() {
  26. // PASS
  27. }
  28. std::unique_ptr<RleEncoder> createRleEncoder(std::unique_ptr<BufferedOutputStream> output,
  29. bool isSigned, RleVersion version, MemoryPool&,
  30. bool alignedBitpacking) {
  31. switch (static_cast<int64_t>(version)) {
  32. case RleVersion_1:
  33. return std::make_unique<RleEncoderV1>(std::move(output), isSigned);
  34. case RleVersion_2:
  35. return std::make_unique<RleEncoderV2>(std::move(output), isSigned, alignedBitpacking);
  36. default:
  37. throw NotImplementedYet("Not implemented yet");
  38. }
  39. }
  40. std::unique_ptr<RleDecoder> createRleDecoder(std::unique_ptr<SeekableInputStream> input,
  41. bool isSigned, RleVersion version, MemoryPool& pool,
  42. ReaderMetrics* metrics) {
  43. switch (static_cast<int64_t>(version)) {
  44. case RleVersion_1:
  45. return std::make_unique<RleDecoderV1>(std::move(input), isSigned, metrics);
  46. case RleVersion_2:
  47. return std::make_unique<RleDecoderV2>(std::move(input), isSigned, pool, metrics);
  48. default:
  49. throw NotImplementedYet("Not implemented yet");
  50. }
  51. }
  52. template <typename T>
  53. void RleEncoder::add(const T* data, uint64_t numValues, const char* notNull) {
  54. for (uint64_t i = 0; i < numValues; ++i) {
  55. if (!notNull || notNull[i]) {
  56. write(static_cast<int64_t>(data[i]));
  57. }
  58. }
  59. }
  60. void RleEncoder::add(const int64_t* data, uint64_t numValues, const char* notNull) {
  61. add<int64_t>(data, numValues, notNull);
  62. }
  63. void RleEncoder::add(const int32_t* data, uint64_t numValues, const char* notNull) {
  64. add<int32_t>(data, numValues, notNull);
  65. }
  66. void RleEncoder::add(const int16_t* data, uint64_t numValues, const char* notNull) {
  67. add<int16_t>(data, numValues, notNull);
  68. }
  69. void RleEncoder::writeVslong(int64_t val) {
  70. writeVulong((val << 1) ^ (val >> 63));
  71. }
  72. void RleEncoder::writeVulong(int64_t val) {
  73. while (true) {
  74. if ((val & ~0x7f) == 0) {
  75. writeByte(static_cast<char>(val));
  76. return;
  77. } else {
  78. writeByte(static_cast<char>(0x80 | (val & 0x7f)));
  79. // cast val to unsigned so as to force 0-fill right shift
  80. val = (static_cast<uint64_t>(val) >> 7);
  81. }
  82. }
  83. }
  84. void RleEncoder::writeByte(char c) {
  85. if (bufferPosition == bufferLength) {
  86. int addedSize = 0;
  87. if (!outputStream->Next(reinterpret_cast<void**>(&buffer), &addedSize)) {
  88. throw std::bad_alloc();
  89. }
  90. bufferPosition = 0;
  91. bufferLength = static_cast<size_t>(addedSize);
  92. }
  93. buffer[bufferPosition++] = c;
  94. }
  95. void RleEncoder::recordPosition(PositionRecorder* recorder) const {
  96. uint64_t flushedSize = outputStream->getSize();
  97. uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
  98. if (outputStream->isCompressed()) {
  99. recorder->add(flushedSize);
  100. recorder->add(unflushedSize);
  101. } else {
  102. flushedSize -= static_cast<uint64_t>(bufferLength);
  103. recorder->add(flushedSize + unflushedSize);
  104. }
  105. recorder->add(static_cast<uint64_t>(numLiterals));
  106. }
  107. } // namespace orc