Browse Source

HCFS can read files

Chris Lu 6 years ago
parent
commit
4119c61df8

+ 1 - 0
.gitignore

@@ -79,3 +79,4 @@ test_data
 build
 build
 target
 target
 *.class
 *.class
+other/java/hdfs/dependency-reduced-pom.xml

+ 14 - 3
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

@@ -13,9 +13,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URI;
 
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 
 
     public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
     public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
@@ -23,6 +26,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
     public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
     public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
 
 
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
+    private static int BUFFER_SIZE = 16 * 1024 * 1024;
 
 
     private URI uri;
     private URI uri;
     private Path workingDirectory = new Path("/");
     private Path workingDirectory = new Path("/");
@@ -53,6 +57,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
         port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
         port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
         conf.setInt(FS_SEAWEED_FILER_PORT, port);
         conf.setInt(FS_SEAWEED_FILER_PORT, port);
 
 
+        conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
+
         setConf(conf);
         setConf(conf);
         this.uri = uri;
         this.uri = uri;
 
 
@@ -65,7 +71,12 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 
 
         path = qualify(path);
         path = qualify(path);
 
 
-        return null;
+        try {
+            InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
+            return new FSDataInputStream(inputStream);
+        } catch (Exception ex) {
+            return null;
+        }
     }
     }
 
 
     public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
     public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
@@ -77,7 +88,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 
 
         try {
         try {
             String replicaPlacement = String.format("%03d", replication - 1);
             String replicaPlacement = String.format("%03d", replication - 1);
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, replicaPlacement);
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
             return new FSDataOutputStream(outputStream, statistics);
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
         } catch (Exception ex) {
             return null;
             return null;
@@ -90,7 +101,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 
 
         path = qualify(path);
         path = qualify(path);
         try {
         try {
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, "");
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
             return new FSDataOutputStream(outputStream, statistics);
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
         } catch (Exception ex) {
             return null;
             return null;

+ 27 - 4
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java

@@ -1,6 +1,7 @@
 package seaweed.hdfs;
 package seaweed.hdfs;
 
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -9,7 +10,9 @@ import org.slf4j.LoggerFactory;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 import seaweedfs.client.FilerProto;
 
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
@@ -35,7 +38,6 @@ public class SeaweedFileSystemStore {
         if (isDirectory) {
         if (isDirectory) {
             p = p | 1 << 31;
             p = p | 1 << 31;
         }
         }
-        System.out.println(permission + " = " + p);
         return p;
         return p;
     }
     }
 
 
@@ -126,7 +128,7 @@ public class SeaweedFileSystemStore {
 
 
     private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
     private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
         FilerProto.FuseAttributes attributes = entry.getAttributes();
         FilerProto.FuseAttributes attributes = entry.getAttributes();
-        long length = attributes.getFileSize();
+        long length = SeaweedRead.totalSize(entry.getChunksList());
         boolean isDir = entry.getIsDirectory();
         boolean isDir = entry.getIsDirectory();
         int block_replication = 1;
         int block_replication = 1;
         int blocksize = 512;
         int blocksize = 512;
@@ -206,6 +208,7 @@ public class SeaweedFileSystemStore {
     public OutputStream createFile(final Path path,
     public OutputStream createFile(final Path path,
                                    final boolean overwrite,
                                    final boolean overwrite,
                                    FsPermission permission,
                                    FsPermission permission,
+                                   int bufferSize,
                                    String replication) throws IOException {
                                    String replication) throws IOException {
 
 
         permission = permission == null ? FsPermission.getFileDefault() : permission;
         permission = permission == null ? FsPermission.getFileDefault() : permission;
@@ -226,7 +229,7 @@ public class SeaweedFileSystemStore {
                 entry.mergeFrom(existingEntry);
                 entry.mergeFrom(existingEntry);
                 entry.getAttributesBuilder().setMtime(now);
                 entry.getAttributesBuilder().setMtime(now);
             }
             }
-            writePosition = existingEntry.getAttributes().getFileSize();
+            writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
             replication = existingEntry.getAttributes().getReplication();
             replication = existingEntry.getAttributes().getReplication();
         }
         }
         if (entry == null) {
         if (entry == null) {
@@ -243,7 +246,27 @@ public class SeaweedFileSystemStore {
                 );
                 );
         }
         }
 
 
-        return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, 16 * 1024 * 1024, replication);
+        return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
 
 
     }
     }
+
+    // Opens the file at 'path' for reading: resolves its filer entry and wraps
+    // it in a SeaweedInputStream. Throws FileNotFoundException when no entry
+    // exists for the path.
+    public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
+                                       int bufferSize) throws IOException {
+
+        LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+
+        FilerProto.Entry entry = lookupEntry(path);
+        if (entry == null) {
+            throw new FileNotFoundException("read non-exist file " + path);
+        }
+
+        // Fixed read-ahead depth for now; NOTE(review): hard-coded — consider
+        // making this configurable.
+        final int readAheadQueueDepth = 2;
+        return new SeaweedInputStream(
+            filerGrpcClient, statistics, path.toUri().getPath(), entry, bufferSize, readAheadQueueDepth);
+    }
 }
 }

+ 5 - 1
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java

@@ -6,6 +6,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 import seaweedfs.client.FilerProto;
 
 
@@ -15,6 +17,8 @@ import java.util.List;
 
 
 public class SeaweedInputStream extends FSInputStream {
 public class SeaweedInputStream extends FSInputStream {
 
 
+    private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+
     private final FilerGrpcClient filerGrpcClient;
     private final FilerGrpcClient filerGrpcClient;
     private final Statistics statistics;
     private final Statistics statistics;
     private final String path;
     private final String path;
@@ -45,7 +49,7 @@ public class SeaweedInputStream extends FSInputStream {
         this.statistics = statistics;
         this.statistics = statistics;
         this.path = path;
         this.path = path;
         this.entry = entry;
         this.entry = entry;
-        this.contentLength = entry.getAttributes().getFileSize();
+        this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
         this.bufferSize = bufferSize;
         this.bufferSize = bufferSize;
         this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
         this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
         this.readAheadEnabled = true;
         this.readAheadEnabled = true;

+ 30 - 7
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java

@@ -5,6 +5,8 @@ import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 import seaweedfs.client.FilerProto;
 
 
@@ -17,6 +19,8 @@ import java.util.Map;
 
 
 public class SeaweedRead {
 public class SeaweedRead {
 
 
+    private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
     // returns bytesRead
     // returns bytesRead
     public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
     public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
                             final long position, final byte[] buffer, final int bufferOffset,
                             final long position, final byte[] buffer, final int bufferOffset,
@@ -54,9 +58,11 @@ public class SeaweedRead {
                 HttpResponse response = client.execute(request);
                 HttpResponse response = client.execute(request);
                 HttpEntity entity = response.getEntity();
                 HttpEntity entity = response.getEntity();
 
 
-                readCount += entity.getContent().read(buffer,
-                    (int) (chunkView.logicOffset - position),
-                    (int) (chunkView.logicOffset - position + chunkView.size));
+                int len = (int) (chunkView.logicOffset - position + chunkView.size);
+                entity.getContent().read(buffer, bufferOffset, len);
+
+                LOG.debug("* read chunkView:{} length:{} position:{} bufferLength:{}", chunkView, len, position, bufferLength);
+                readCount += len;
 
 
             } catch (IOException e) {
             } catch (IOException e) {
                 e.printStackTrace();
                 e.printStackTrace();
@@ -93,11 +99,17 @@ public class SeaweedRead {
         List<VisibleInterval> newVisibles = new ArrayList<>();
         List<VisibleInterval> newVisibles = new ArrayList<>();
         List<VisibleInterval> visibles = new ArrayList<>();
         List<VisibleInterval> visibles = new ArrayList<>();
         for (FilerProto.FileChunk chunk : chunks) {
         for (FilerProto.FileChunk chunk : chunks) {
+            List<VisibleInterval> t = newVisibles;
             newVisibles = mergeIntoVisibles(visibles, newVisibles, chunk);
             newVisibles = mergeIntoVisibles(visibles, newVisibles, chunk);
-            visibles.clear();
-            List<VisibleInterval> t = visibles;
-            visibles = newVisibles;
-            newVisibles = t;
+            if (t != newVisibles) {
+                // visibles are changed in place
+            } else {
+                // newVisibles are modified
+                visibles.clear();
+                t = visibles;
+                visibles = newVisibles;
+                newVisibles = t;
+            }
         }
         }
 
 
         return visibles;
         return visibles;
@@ -168,6 +180,17 @@ public class SeaweedRead {
         return fileId;
         return fileId;
     }
     }
 
 
+    // Returns the logical file size implied by a chunk list: the furthest byte
+    // offset any chunk reaches (chunks may be unordered or overlapping, so this
+    // is a max over offset+size rather than a sum).
+    public static long totalSize(List<FilerProto.FileChunk> chunksList) {
+        long maxEnd = 0;
+        for (FilerProto.FileChunk chunk : chunksList) {
+            maxEnd = Math.max(maxEnd, chunk.getOffset() + chunk.getSize());
+        }
+        return maxEnd;
+    }
+
     public static class VisibleInterval {
     public static class VisibleInterval {
         long start;
         long start;
         long stop;
         long stop;