
add SeaweedInputStream

Chris Lu · 6 years ago · commit c6a567acab

+ 137 - 0
other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+import java.util.concurrent.CountDownLatch;
+
+class ReadBuffer {
+
+  private SeaweedInputStream stream;
+  private long offset;                   // offset within the file for the buffer
+  private int length;                    // actual length, set after the buffer is filled
+  private int requestedLength;           // requested length of the read
+  private byte[] buffer;                 // the buffer itself
+  private int bufferindex = -1;          // index into the buffers array in the buffer manager
+  private ReadBufferStatus status;       // status of the buffer
+  private CountDownLatch latch = null;   // signaled when the buffer is done reading, so any client
+                                         // waiting on this buffer gets unblocked
+
+  // fields to help with eviction logic
+  private long timeStamp = 0;  // tick at which buffer became available to read
+  private boolean isFirstByteConsumed = false;
+  private boolean isLastByteConsumed = false;
+  private boolean isAnyByteConsumed = false;
+
+  public SeaweedInputStream getStream() {
+    return stream;
+  }
+
+  public void setStream(SeaweedInputStream stream) {
+    this.stream = stream;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  public void setLength(int length) {
+    this.length = length;
+  }
+
+  public int getRequestedLength() {
+    return requestedLength;
+  }
+
+  public void setRequestedLength(int requestedLength) {
+    this.requestedLength = requestedLength;
+  }
+
+  public byte[] getBuffer() {
+    return buffer;
+  }
+
+  public void setBuffer(byte[] buffer) {
+    this.buffer = buffer;
+  }
+
+  public int getBufferindex() {
+    return bufferindex;
+  }
+
+  public void setBufferindex(int bufferindex) {
+    this.bufferindex = bufferindex;
+  }
+
+  public ReadBufferStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(ReadBufferStatus status) {
+    this.status = status;
+  }
+
+  public CountDownLatch getLatch() {
+    return latch;
+  }
+
+  public void setLatch(CountDownLatch latch) {
+    this.latch = latch;
+  }
+
+  public long getTimeStamp() {
+    return timeStamp;
+  }
+
+  public void setTimeStamp(long timeStamp) {
+    this.timeStamp = timeStamp;
+  }
+
+  public boolean isFirstByteConsumed() {
+    return isFirstByteConsumed;
+  }
+
+  public void setFirstByteConsumed(boolean isFirstByteConsumed) {
+    this.isFirstByteConsumed = isFirstByteConsumed;
+  }
+
+  public boolean isLastByteConsumed() {
+    return isLastByteConsumed;
+  }
+
+  public void setLastByteConsumed(boolean isLastByteConsumed) {
+    this.isLastByteConsumed = isLastByteConsumed;
+  }
+
+  public boolean isAnyByteConsumed() {
+    return isAnyByteConsumed;
+  }
+
+  public void setAnyByteConsumed(boolean isAnyByteConsumed) {
+    this.isAnyByteConsumed = isAnyByteConsumed;
+  }
+
+}

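For context, the fields above are populated by the buffer manager before a buffer is queued. A condensed sketch of that lifecycle, mirroring queueReadAhead in ReadBufferManager (the next file in this commit); stream, requestedOffset, and requestedLength are assumed to be in scope:

    ReadBuffer buf = new ReadBuffer();
    buf.setStream(stream);
    buf.setOffset(requestedOffset);                // file offset to prefetch
    buf.setLength(0);                              // actual length is set later by doneReading()
    buf.setRequestedLength(requestedLength);       // how many bytes were asked for
    buf.setStatus(ReadBufferStatus.NOT_AVAILABLE); // not yet picked up by a worker
    buf.setLatch(new java.util.concurrent.CountDownLatch(1)); // readers block on this until the fetch completes
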
+ 394 - 0
other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java

@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package seaweed.hdfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * The read-buffer manager for {@link SeaweedInputStream}, adapted from the Azure ABFS client's ReadBufferManager.
+ */
+final class ReadBufferManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+
+  private static final int NUM_BUFFERS = 16;
+  private static final int BLOCK_SIZE = 4 * 1024 * 1024;
+  private static final int NUM_THREADS = 8;
+  private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+
+  private Thread[] threads = new Thread[NUM_THREADS];
+  private byte[][] buffers;    // array of byte[] buffers, to hold the data that is read
+  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] array that are available
+
+  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
+  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
+  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
+  private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+
+  static {
+    BUFFER_MANAGER = new ReadBufferManager();
+    BUFFER_MANAGER.init();
+  }
+
+  static ReadBufferManager getBufferManager() {
+    return BUFFER_MANAGER;
+  }
+
+  private void init() {
+    buffers = new byte[NUM_BUFFERS][];
+    for (int i = 0; i < NUM_BUFFERS; i++) {
+      buffers[i] = new byte[BLOCK_SIZE];  // the same buffers are reused; the byte arrays are never garbage collected
+      freeList.add(i);
+    }
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Thread t = new Thread(new ReadBufferWorker(i));
+      t.setDaemon(true);
+      threads[i] = t;
+      t.setName("SeaweedFS-prefetch-" + i);
+      t.start();
+    }
+    ReadBufferWorker.UNLEASH_WORKERS.countDown();
+  }
+
+  // hide instance constructor
+  private ReadBufferManager() {
+  }
+
+
+  /*
+   *
+   *  SeaweedInputStream-facing methods
+   *
+   */
+
+
+  /**
+   * {@link SeaweedInputStream} calls this method to queue read-aheads.
+   *
+   * @param stream          The {@link SeaweedInputStream} for which to do the read-ahead
+   * @param requestedOffset The offset in the file which should be read
+   * @param requestedLength The length to read
+   */
+  void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
+          stream.getPath(), requestedOffset, requestedLength);
+    }
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream, requestedOffset)) {
+        return; // already queued, do not queue again
+      }
+      if (freeList.isEmpty() && !tryEvict()) {
+        return; // no buffers available, cannot queue anything
+      }
+
+      buffer = new ReadBuffer();
+      buffer.setStream(stream);
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+
+      Integer bufferIndex = freeList.pop();  // will return a value, since we have checked size > 0 already
+
+      buffer.setBuffer(buffers[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      readAheadQueue.add(buffer);
+      notifyAll();
+    }
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
+          stream.getPath(), requestedOffset, buffer.getBufferindex());
+    }
+  }
+
+
+  /**
+   * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
+   * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
+   *
+   * @param stream   the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
+   * @return the number of bytes read
+   */
+  int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
+    // not synchronized, so have to be careful with locking
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("getBlock for file {}  position {}  thread {}",
+          stream.getPath(), position, Thread.currentThread().getName());
+    }
+
+    waitForProcess(stream, position);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
+    }
+    if (bytesRead > 0) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done read from Cache for {} position {} length {}",
+            stream.getPath(), position, bytesRead);
+      }
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
+    return 0;
+  }
+
+  /*
+   *
+   *  Internal methods
+   *
+   */
+
+  private void waitForProcess(final SeaweedInputStream stream, final long position) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      clearFromReadAheadQueue(stream, position);
+      readBuf = getFromList(inProgressList, stream, position);
+    }
+    if (readBuf != null) {         // if in in-progress queue, then block for it
+      try {
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
+              stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
+        }
+        readBuf.getLatch().await();  // blocking wait on the caller stream's thread
+        // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
+        // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+        // inProgressList. So this latch is safe to await outside the synchronized block.
+        // Awaiting it inside synchronized would result in a deadlock, since this thread would be holding the lock
+        // while waiting, so no one would be able to change any state. If this becomes more complex in the future,
+        // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("latch done for file {} buffer idx {} length {}",
+            stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
+      }
+    }
+  }
+
+  /**
+   * If any buffer in the completedReadList can be reclaimed, then reclaim it and return it to the free list.
+   * The objective is to find just one buffer - there is no advantage to evicting more than one.
+   *
+   * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+   */
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (completedReadList.size() <= 0) {
+      return false;  // there are no evict-able buffers
+    }
+
+    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    long earliestBirthday = Long.MAX_VALUE;
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.getTimeStamp() < earliestBirthday) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      }
+    }
+    if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+      return evict(nodeToEvict);
+    }
+
+    // nothing can be evicted
+    return false;
+  }
+
+  private boolean evict(final ReadBuffer buf) {
+    freeList.push(buf.getBufferindex());
+    completedReadList.remove(buf);
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
+          buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
+    }
+    return true;
+  }
+
+  private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(readAheadQueue, stream, requestedOffset)
+        || isInList(inProgressList, stream, requestedOffset)
+        || isInList(completedReadList, stream, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
+    return (getFromList(list, stream, requestedOffset) != null);
+  }
+
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (buffer.getStream() == stream) {
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
+  }
+
+  private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
+    ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
+    if (buffer != null) {
+      readAheadQueue.remove(buffer);
+      notifyAll();   // lock is held in calling method
+      freeList.push(buffer.getBufferindex());
+    }
+  }
+
+  private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
+                                         final byte[] buffer) {
+    ReadBuffer buf = getFromList(completedReadList, stream, position);
+    if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+      return 0;
+    }
+    int cursor = (int) (position - buf.getOffset());
+    int availableLengthInBuffer = buf.getLength() - cursor;
+    int lengthToCopy = Math.min(length, availableLengthInBuffer);
+    System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+    if (cursor == 0) {
+      buf.setFirstByteConsumed(true);
+    }
+    if (cursor + lengthToCopy == buf.getLength()) {
+      buf.setLastByteConsumed(true);
+    }
+    buf.setAnyByteConsumed(true);
+    return lengthToCopy;
+  }
+
+  /*
+   *
+   *  ReadBufferWorker-thread-facing methods
+   *
+   */
+
+  /**
+   * ReadBufferWorker thread calls this to get the next buffer that it should work on.
+   *
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
+   */
+  ReadBuffer getNextBlockToRead() throws InterruptedException {
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      // a plain Queue has no blocking take(), so wait manually until the queue is non-empty
+      while (readAheadQueue.isEmpty()) {
+        wait();
+      }
+      buffer = readAheadQueue.remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;            // should never happen
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      inProgressList.add(buffer);
+    }
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+          buffer.getStream().getPath(), buffer.getOffset());
+    }
+    return buffer;
+  }
+
+  /**
+   * ReadBufferWorker thread calls this method to post completion.
+   *
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
+   */
+  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(),  buffer.getOffset(), bytesActuallyRead);
+    }
+    synchronized (this) {
+      inProgressList.remove(buffer);
+      if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+        buffer.setStatus(ReadBufferStatus.AVAILABLE);
+        buffer.setTimeStamp(currentTimeMillis());
+        buffer.setLength(bytesActuallyRead);
+        completedReadList.add(buffer);
+      } else {
+        freeList.push(buffer.getBufferindex());
+        // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+      }
+    }
+    // outside the synchronized block, so that anyone woken by the latch sees safely published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
+  }
+
+  /**
+   * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+   * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
+   * making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core.
+   * Note: it is not monotonic across sockets, and even within a CPU, it's only the
+   * more recent parts which share a clock across all cores.
+   *
+   * @return current time in milliseconds
+   */
+  private long currentTimeMillis() {
+    return System.nanoTime() / 1000 / 1000;
+  }
+}

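Putting the manager's API together: a stream queues read-aheads for upcoming blocks, then checks the cache, and falls back to its own remote read on a miss. A condensed sketch of that pattern (it matches readInternal in SeaweedInputStream, later in this commit; position, length, b, stream, contentLength, bufferSize, and readAheadQueueDepth are assumed to be in scope):

    ReadBufferManager manager = ReadBufferManager.getBufferManager();
    long nextOffset = position;
    for (int i = 0; i < readAheadQueueDepth && nextOffset < contentLength; i++) {
        int nextSize = (int) Math.min((long) bufferSize, contentLength - nextOffset);
        manager.queueReadAhead(stream, nextOffset, nextSize); // no-op if already queued or no buffer is free
        nextOffset += nextSize;
    }
    int received = manager.getBlock(stream, position, length, b); // returns 0 on a cache miss
    if (received == 0) {
        received = stream.readRemote(position, b, 0, length);     // caller does its own read
    }
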
+ 29 - 0
other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+/**
+ * The status of a {@link ReadBuffer} as it moves through the ReadBufferManager's lists.
+ */
+public enum ReadBufferStatus {
+  NOT_AVAILABLE,        // buffers sitting in the readAheadQueue have this status
+  READING_IN_PROGRESS,  // reading is in progress on this buffer; it should be in the inProgressList
+  AVAILABLE,            // data is available in this buffer; it should be in the completedReadList
+  READ_FAILED           // read completed, but failed
+}

+ 70 - 0
other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+import java.util.concurrent.CountDownLatch;
+
+class ReadBufferWorker implements Runnable {
+
+  protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
+  private int id;
+
+  ReadBufferWorker(final int id) {
+    this.id = id;
+  }
+
+  /**
+   * Returns the ID of this ReadBufferWorker.
+   */
+  public int getId() {
+    return this.id;
+  }
+
+  /**
+   * Waits until a buffer becomes available in ReadAheadQueue.
+   * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
+   * Rinse and repeat. Forever.
+   */
+  public void run() {
+    try {
+      UNLEASH_WORKERS.await();
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+    ReadBuffer buffer;
+    while (true) {
+      try {
+        buffer = bufferManager.getNextBlockToRead();   // blocks, until a buffer is available for this thread
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+      if (buffer != null) {
+        try {
+          // do the actual read, from the file.
+          int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+          bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);  // post result back to ReadBufferManager
+        } catch (Exception ex) {
+          bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
+        }
+      }
+    }
+  }
+}

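For reference, the status transitions that the worker and manager drive (derived from queueReadAhead, getNextBlockToRead, and doneReading above):

    // NOT_AVAILABLE       -> set in queueReadAhead; the buffer waits in readAheadQueue
    // READING_IN_PROGRESS -> set in getNextBlockToRead when a worker picks the buffer up
    // AVAILABLE           -> set in doneReading on success; the buffer joins completedReadList
    // READ_FAILED         -> passed to doneReading on failure; the buffer index returns to freeList
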
+ 2 - 0
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

@@ -57,6 +57,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
     }
 
     public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+        path = qualify(path);
+
         return null;
     }
 

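Note that open() still returns null at this point; the new stream is not wired in yet. A hypothetical sketch of what that wiring could look like - lookupEntry and the filerGrpcClient field are assumptions, not part of this commit:

    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
        path = qualify(path);
        FilerProto.Entry entry = lookupEntry(path); // hypothetical helper: fetch metadata from the filer
        SeaweedInputStream inputStream = new SeaweedInputStream(
            filerGrpcClient, statistics, path.toUri().getPath(), entry,
            bufferSize, 4 /* readAheadQueueDepth; value is illustrative */);
        return new FSDataInputStream(inputStream);
    }
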
+ 363 - 0
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java

@@ -0,0 +1,363 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+public class SeaweedInputStream extends FSInputStream {
+
+    private final FilerGrpcClient filerGrpcClient;
+    private final Statistics statistics;
+    private final String path;
+    private final FilerProto.Entry entry;
+    private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
+    private final long contentLength;
+    private final int bufferSize; // default buffer size
+    private final int readAheadQueueDepth;         // initialized in constructor
+    private final boolean readAheadEnabled; // whether read-ahead is enabled
+
+    private byte[] buffer = null;            // will be initialized on first use
+
+    private long fCursor = 0;  // cursor of buffer within file - offset of next byte to read from remote server
+    private long fCursorAfterLastRead = -1;
+    private int bCursor = 0;   // cursor of read within buffer - offset of next byte to be returned from buffer
+    private int limit = 0;     // offset of next byte to be read into buffer from service
+                               // (i.e., upper marker + 1 of valid bytes in buffer)
+    private boolean closed = false;
+
+    public SeaweedInputStream(
+        final FilerGrpcClient filerGrpcClient,
+        final Statistics statistics,
+        final String path,
+        final FilerProto.Entry entry,
+        final int bufferSize,
+        final int readAheadQueueDepth) {
+        this.filerGrpcClient = filerGrpcClient;
+        this.statistics = statistics;
+        this.path = path;
+        this.entry = entry;
+        this.contentLength = entry.getAttributes().getFileSize();
+        this.bufferSize = bufferSize;
+        this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
+        this.readAheadEnabled = true;
+
+        this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] b = new byte[1];
+        int numberOfBytesRead = read(b, 0, 1);
+        if (numberOfBytesRead < 0) {
+            return -1;
+        } else {
+            return (b[0] & 0xFF);
+        }
+    }
+
+    @Override
+    public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+        int currentOff = off;
+        int currentLen = len;
+        int lastReadBytes;
+        int totalReadBytes = 0;
+        do {
+            lastReadBytes = readOneBlock(b, currentOff, currentLen);
+            if (lastReadBytes > 0) {
+                currentOff += lastReadBytes;
+                currentLen -= lastReadBytes;
+                totalReadBytes += lastReadBytes;
+            }
+            if (currentLen <= 0 || currentLen > b.length - currentOff) {
+                break;
+            }
+        } while (lastReadBytes > 0);
+        return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
+    }
+
+    private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+        if (closed) {
+            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+        }
+
+        Preconditions.checkNotNull(b);
+
+        if (len == 0) {
+            return 0;
+        }
+
+        if (this.available() == 0) {
+            return -1;
+        }
+
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        //If buffer is empty, then fill the buffer.
+        if (bCursor == limit) {
+            //If EOF, then return -1
+            if (fCursor >= contentLength) {
+                return -1;
+            }
+
+            long bytesRead = 0;
+            //reset buffer to initial state - i.e., throw away existing data
+            bCursor = 0;
+            limit = 0;
+            if (buffer == null) {
+                buffer = new byte[bufferSize];
+            }
+
+            // Enable readAhead when reading sequentially
+            if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+                bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
+            } else {
+                bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+            }
+
+            if (bytesRead == -1) {
+                return -1;
+            }
+
+            limit += bytesRead;
+            fCursor += bytesRead;
+            fCursorAfterLastRead = fCursor;
+        }
+
+        //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
+        //(bytes returned may be less than requested)
+        int bytesRemaining = limit - bCursor;
+        int bytesToRead = Math.min(len, bytesRemaining);
+        System.arraycopy(buffer, bCursor, b, off, bytesToRead);
+        bCursor += bytesToRead;
+        if (statistics != null) {
+            statistics.incrementBytesRead(bytesToRead);
+        }
+        return bytesToRead;
+    }
+
+
+    private int readInternal(final long position, final byte[] b, final int offset, final int length,
+                             final boolean bypassReadAhead) throws IOException {
+        if (readAheadEnabled && !bypassReadAhead) {
+            // try reading from read-ahead
+            if (offset != 0) {
+                throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
+            }
+            int receivedBytes;
+
+            // queue read-aheads
+            int numReadAheads = this.readAheadQueueDepth;
+            long nextSize;
+            long nextOffset = position;
+            while (numReadAheads > 0 && nextOffset < contentLength) {
+                nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+                ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
+                nextOffset = nextOffset + nextSize;
+                numReadAheads--;
+            }
+
+            // try reading from buffers first
+            receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
+            if (receivedBytes > 0) {
+                return receivedBytes;
+            }
+
+            // got nothing from read-ahead, do our own read now
+            receivedBytes = readRemote(position, b, offset, length);
+            return receivedBytes;
+        } else {
+            return readRemote(position, b, offset, length);
+        }
+    }
+
+    int readRemote(long position, byte[] b, int offset, int length) throws IOException {
+        if (position < 0) {
+            throw new IllegalArgumentException("attempting to read from negative offset");
+        }
+        if (position >= contentLength) {
+            return -1;  // Hadoop prefers -1 to EOFException
+        }
+        if (b == null) {
+            throw new IllegalArgumentException("null byte array passed in to read() method");
+        }
+        if (offset >= b.length) {
+            throw new IllegalArgumentException("offset greater than length of array");
+        }
+        if (length < 0) {
+            throw new IllegalArgumentException("requested read length is less than zero");
+        }
+        if (length > (b.length - offset)) {
+            throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+        }
+
+        long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
+        if (bytesRead > Integer.MAX_VALUE) {
+            throw new IOException("Unexpected Content-Length");
+        }
+        return (int) bytesRead;
+    }
+
+    /**
+     * Seek to given position in stream.
+     *
+     * @param n position to seek to
+     * @throws IOException  if there is an error
+     * @throws EOFException if attempting to seek past end of file
+     */
+    @Override
+    public synchronized void seek(long n) throws IOException {
+        if (closed) {
+            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+        }
+        if (n < 0) {
+            throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+        }
+        if (n > contentLength) {
+            throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+        }
+
+        if (n >= fCursor - limit && n <= fCursor) { // within buffer
+            bCursor = (int) (n - (fCursor - limit));
+            return;
+        }
+
+        // next read will read from here
+        fCursor = n;
+
+        //invalidate buffer
+        limit = 0;
+        bCursor = 0;
+    }
+
+    @Override
+    public synchronized long skip(long n) throws IOException {
+        if (closed) {
+            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+        }
+        long currentPos = getPos();
+        if (currentPos == contentLength) {
+            if (n > 0) {
+                throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+            }
+        }
+        long newPos = currentPos + n;
+        if (newPos < 0) {
+            newPos = 0;
+            n = newPos - currentPos;
+        }
+        if (newPos > contentLength) {
+            newPos = contentLength;
+            n = newPos - currentPos;
+        }
+        seek(newPos);
+        return n;
+    }
+
+    /**
+     * Return the size of the remaining available bytes
+     * if the size is less than or equal to {@link Integer#MAX_VALUE},
+     * otherwise, return {@link Integer#MAX_VALUE}.
+     * <p>
+     * This is to match the behavior of DFSInputStream.available(),
+     * which some clients may rely on (HBase write-ahead log reading in
+     * particular).
+     */
+    @Override
+    public synchronized int available() throws IOException {
+        if (closed) {
+            throw new IOException(
+                FSExceptionMessages.STREAM_IS_CLOSED);
+        }
+        final long remaining = this.contentLength - this.getPos();
+        return remaining <= Integer.MAX_VALUE
+            ? (int) remaining : Integer.MAX_VALUE;
+    }
+
+    /**
+     * Returns the length of the file that this stream refers to. Note that the length returned is the length
+     * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+     * they won't be reflected in the returned length.
+     *
+     * @return length of the file.
+     * @throws IOException if the stream is closed
+     */
+    public long length() throws IOException {
+        if (closed) {
+            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+        }
+        return contentLength;
+    }
+
+    /**
+     * Return the current offset from the start of the file
+     *
+     * @throws IOException throws {@link IOException} if there is an error
+     */
+    @Override
+    public synchronized long getPos() throws IOException {
+        if (closed) {
+            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+        }
+        return fCursor - limit + bCursor;
+    }
+
+    /**
+     * Seeks a different copy of the data.  Returns true if
+     * found a new source, false otherwise.
+     *
+     * @throws IOException throws {@link IOException} if there is an error
+     */
+    @Override
+    public boolean seekToNewSource(long l) throws IOException {
+        return false;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        closed = true;
+        buffer = null; // de-reference the buffer so it can be GC'ed sooner
+    }
+
+    /**
+     * Not supported by this stream. Throws {@link UnsupportedOperationException}
+     *
+     * @param readlimit ignored
+     */
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+    }
+
+    /**
+     * Not supported by this stream. Throws {@link UnsupportedOperationException}
+     */
+    @Override
+    public synchronized void reset() throws IOException {
+        throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+    }
+
+    /**
+     * Gets whether mark and reset are supported by {@code SeaweedInputStream}. Always returns false.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+}

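A minimal usage sketch of the new stream, assuming an already-constructed FilerGrpcClient, Statistics, and FilerProto.Entry (filerGrpcClient, statistics, and entry below; parameter values are illustrative):

    SeaweedInputStream in = new SeaweedInputStream(
        filerGrpcClient, statistics, "/some/path", entry,
        4 * 1024 * 1024 /* bufferSize */, 4 /* readAheadQueueDepth */);
    byte[] data = new byte[8192];
    int n = in.read(data, 0, data.length); // fills the internal buffer, then copies out
    long pos = in.getPos();                // fCursor - limit + bCursor
    in.seek(0);                            // within the buffered window, so no remote re-read
    in.close();
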
+ 0 - 1
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java

@@ -68,7 +68,6 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
 
         this.entry = entry;
 
-
     }
 
     private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {

+ 94 - 0
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java

@@ -1,14 +1,86 @@
 package seaweed.hdfs;
 
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 
 public class SeaweedRead {
 
+    // returns bytesRead
+    public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
+                            final long position, final byte[] buffer, final int bufferOffset,
+                            final int bufferLength) {
+
+        List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
+
+        FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+        for (ChunkView chunkView : chunkViews) {
+            String vid = parseVolumeId(chunkView.fileId);
+            lookupRequest.addVolumeIds(vid);
+        }
+
+        FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+            .getBlockingStub().lookupVolume(lookupRequest.build());
+
+        Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+
+        // TODO: parallelize this
+        long readCount = 0;
+        for (ChunkView chunkView : chunkViews) {
+            FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
+            if (locations.getLocationsCount() == 0) {
+                // log here!
+                return 0;
+            }
+
+            HttpClient client = HttpClientBuilder.create().build();
+            HttpGet request = new HttpGet(
+                String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+            request.setHeader("Range",
+                String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size));
+
+            try {
+                HttpResponse response = client.execute(request);
+                HttpEntity entity = response.getEntity();
+
+                readCount += entity.getContent().read(buffer,
+                    (int) (chunkView.logicOffset - position),
+                    (int) chunkView.size); // third argument is a length, not an end offset
+
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        return readCount;
+    }
+
+    private static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
+        List<ChunkView> views = new ArrayList<>();
+
+        long stop = offset + size;
+        for (VisibleInterval chunk : visibleIntervals) {
+            if (chunk.start >= stop || chunk.stop <= offset) {
+                continue; // chunk does not overlap the requested range
+            }
+            long chunkStart = Math.max(offset, chunk.start);
+            views.add(new ChunkView(
+                chunk.fileId,
+                chunkStart - chunk.start,                 // offset within the chunk
+                Math.min(chunk.stop, stop) - chunkStart,  // bytes to take from this chunk
+                chunkStart                                // logical file offset of this view
+            ));
+        }
+        return views;
+    }
+
     public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
         FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
         Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@@ -88,6 +160,14 @@ public class SeaweedRead {
         return newVisibles;
     }
 
+    public static String parseVolumeId(String fileId) {
+        int commaIndex = fileId.lastIndexOf(',');
+        if (commaIndex > 0) {
+            return fileId.substring(0, commaIndex);
+        }
+        return fileId;
+    }
+
     public static class VisibleInterval {
         long start;
         long stop;
@@ -102,4 +182,18 @@ public class SeaweedRead {
         }
     }
 
+    public static class ChunkView {
+        String fileId;
+        long offset;
+        long size;
+        long logicOffset;
+
+        public ChunkView(String fileId, long offset, long size, long logicOffset) {
+            this.fileId = fileId;
+            this.offset = offset;
+            this.size = size;
+            this.logicOffset = logicOffset;
+        }
+    }
+
 }
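A quick illustration of parseVolumeId and the inclusive Range header built in read(); the file id and volume server address are illustrative:

    String fileId = "3,01637037d6";                  // format: <volumeId>,<fileKey>
    String vid = SeaweedRead.parseVolumeId(fileId);  // "3"
    String url = String.format("http://%s/%s", "127.0.0.1:8080", fileId);
    // For a ChunkView with offset 0 and size 1024, the Range header is
    // "bytes=0-1023" - HTTP byte ranges are inclusive on both ends.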