Browse Source

support atomic renaming for mysql/postgres filer store

Chris Lu 6 years ago
parent
commit
97406333a5

+ 1 - 1
other/java/client/pom.xml

@@ -4,7 +4,7 @@
 
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.0.8</version>
+    <version>1.0.9</version>
 
     <parent>
         <groupId>org.sonatype.oss</groupId>

+ 85 - 56
other/java/client/src/main/java/seaweedfs/client/FilerClient.java

@@ -51,12 +51,26 @@ public class FilerClient {
         }
 
         return createEntry(
-            parent,
-            newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build()
+                parent,
+                newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build()
         );
 
     }
 
+    public boolean mv(String oldPath, String newPath) {
+
+        Path oldPathObject = Paths.get(oldPath);
+        String oldParent = oldPathObject.getParent().toString();
+        String oldName = oldPathObject.getFileName().toString();
+
+        Path newPathObject = Paths.get(newPath);
+        String newParent = newPathObject.getParent().toString();
+        String newName = newPathObject.getFileName().toString();
+
+        return atomicRenameEntry(oldParent, oldName, newParent, newName);
+
+    }
+
     public boolean rm(String path, boolean isRecursive) {
 
         Path pathObject = Paths.get(path);
@@ -64,10 +78,10 @@ public class FilerClient {
         String name = pathObject.getFileName().toString();
 
         return deleteEntry(
-            parent,
-            name,
-            true,
-            isRecursive);
+                parent,
+                name,
+                true,
+                isRecursive);
     }
 
     public boolean touch(String path, int mode) {
@@ -84,18 +98,18 @@ public class FilerClient {
         FilerProto.Entry entry = lookupEntry(parent, name);
         if (entry == null) {
             return createEntry(
-                parent,
-                newFileEntry(name, mode, uid, gid, userName, groupNames).build()
+                    parent,
+                    newFileEntry(name, mode, uid, gid, userName, groupNames).build()
             );
         }
         long now = System.currentTimeMillis() / 1000L;
         FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder()
-            .setMtime(now)
-            .setUid(uid)
-            .setGid(gid)
-            .setUserName(userName)
-            .clearGroupName()
-            .addAllGroupName(Arrays.asList(groupNames));
+                .setMtime(now)
+                .setUid(uid)
+                .setGid(gid)
+                .setUserName(userName)
+                .clearGroupName()
+                .addAllGroupName(Arrays.asList(groupNames));
         return updateEntry(parent, entry.toBuilder().setAttributes(attr).build());
     }
 
@@ -105,17 +119,17 @@ public class FilerClient {
         long now = System.currentTimeMillis() / 1000L;
 
         return FilerProto.Entry.newBuilder()
-            .setName(name)
-            .setIsDirectory(true)
-            .setAttributes(FilerProto.FuseAttributes.newBuilder()
-                .setMtime(now)
-                .setCrtime(now)
-                .setUid(uid)
-                .setGid(gid)
-                .setFileMode(mode | 1 << 31)
-                .setUserName(userName)
-                .clearGroupName()
-                .addAllGroupName(Arrays.asList(groupNames)));
+                .setName(name)
+                .setIsDirectory(true)
+                .setAttributes(FilerProto.FuseAttributes.newBuilder()
+                        .setMtime(now)
+                        .setCrtime(now)
+                        .setUid(uid)
+                        .setGid(gid)
+                        .setFileMode(mode | 1 << 31)
+                        .setUserName(userName)
+                        .clearGroupName()
+                        .addAllGroupName(Arrays.asList(groupNames)));
     }
 
     public FilerProto.Entry.Builder newFileEntry(String name, int mode,
@@ -124,17 +138,17 @@ public class FilerClient {
         long now = System.currentTimeMillis() / 1000L;
 
         return FilerProto.Entry.newBuilder()
-            .setName(name)
-            .setIsDirectory(false)
-            .setAttributes(FilerProto.FuseAttributes.newBuilder()
-                .setMtime(now)
-                .setCrtime(now)
-                .setUid(uid)
-                .setGid(gid)
-                .setFileMode(mode)
-                .setUserName(userName)
-                .clearGroupName()
-                .addAllGroupName(Arrays.asList(groupNames)));
+                .setName(name)
+                .setIsDirectory(false)
+                .setAttributes(FilerProto.FuseAttributes.newBuilder()
+                        .setMtime(now)
+                        .setCrtime(now)
+                        .setUid(uid)
+                        .setGid(gid)
+                        .setFileMode(mode)
+                        .setUserName(userName)
+                        .clearGroupName()
+                        .addAllGroupName(Arrays.asList(groupNames)));
     }
 
     public List<FilerProto.Entry> listEntries(String path) {
@@ -160,20 +174,20 @@ public class FilerClient {
 
     public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
         return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
-            .setDirectory(path)
-            .setPrefix(entryPrefix)
-            .setStartFromFileName(lastEntryName)
-            .setLimit(limit)
-            .build()).getEntriesList();
+                .setDirectory(path)
+                .setPrefix(entryPrefix)
+                .setStartFromFileName(lastEntryName)
+                .setLimit(limit)
+                .build()).getEntriesList();
     }
 
     public FilerProto.Entry lookupEntry(String directory, String entryName) {
         try {
             return filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
-                FilerProto.LookupDirectoryEntryRequest.newBuilder()
-                    .setDirectory(directory)
-                    .setName(entryName)
-                    .build()).getEntry();
+                    FilerProto.LookupDirectoryEntryRequest.newBuilder()
+                            .setDirectory(directory)
+                            .setName(entryName)
+                            .build()).getEntry();
         } catch (Exception e) {
             LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);
             return null;
@@ -184,9 +198,9 @@ public class FilerClient {
     public boolean createEntry(String parent, FilerProto.Entry entry) {
         try {
             filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
-                .setDirectory(parent)
-                .setEntry(entry)
-                .build());
+                    .setDirectory(parent)
+                    .setEntry(entry)
+                    .build());
         } catch (Exception e) {
             LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
             return false;
@@ -197,9 +211,9 @@ public class FilerClient {
     public boolean updateEntry(String parent, FilerProto.Entry entry) {
         try {
             filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
-                .setDirectory(parent)
-                .setEntry(entry)
-                .build());
+                    .setDirectory(parent)
+                    .setEntry(entry)
+                    .build());
         } catch (Exception e) {
             LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
             return false;
@@ -210,11 +224,11 @@ public class FilerClient {
     public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) {
         try {
             filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
-                .setDirectory(parent)
-                .setName(entryName)
-                .setIsDeleteData(isDeleteFileChunk)
-                .setIsRecursive(isRecursive)
-                .build());
+                    .setDirectory(parent)
+                    .setName(entryName)
+                    .setIsDeleteData(isDeleteFileChunk)
+                    .setIsRecursive(isRecursive)
+                    .build());
         } catch (Exception e) {
             LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e);
             return false;
@@ -222,4 +236,19 @@ public class FilerClient {
         return true;
     }
 
+    public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) {
+        try {
+            filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
+                    .setOldDirectory(oldParent)
+                    .setOldName(oldName)
+                    .setNewDirectory(newParent)
+                    .setNewName(newName)
+                    .build());
+        } catch (Exception e) {
+            LOG.warn("atomicRenameEntry {}/{} => {}/{}: {}", oldParent, oldName, newParent, newName, e);
+            return false;
+        }
+        return true;
+    }
+
 }

+ 13 - 0
other/java/client/src/main/proto/filer.proto

@@ -24,6 +24,9 @@ service SeaweedFiler {
     rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
     }
 
+    rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
+    }
+
     rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) {
     }
 
@@ -126,6 +129,16 @@ message DeleteEntryRequest {
 message DeleteEntryResponse {
 }
 
+message AtomicRenameEntryRequest {
+    string old_directory = 1;
+    string old_name = 2;
+    string new_directory = 3;
+    string new_name = 4;
+}
+
+message AtomicRenameEntryResponse {
+}
+
 message AssignVolumeRequest {
     int32 count = 1;
     string collection = 2;

+ 1 - 1
other/java/hdfs/pom.xml

@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <properties>
-        <seaweedfs.client.version>1.0.8</seaweedfs.client.version>
+        <seaweedfs.client.version>1.0.9</seaweedfs.client.version>
         <hadoop.version>3.1.1</hadoop.version>
     </properties>
 

+ 1 - 29
other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java

@@ -151,35 +151,7 @@ public class SeaweedFileSystemStore {
             LOG.warn("rename non-existing source: {}", source);
             return;
         }
-        LOG.warn("rename moveEntry source: {}", source);
-        moveEntry(source.getParent(), entry, destination);
-    }
-
-    private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) {
-
-        LOG.debug("moveEntry: {}/{}  => {}", oldParent, entry.getName(), destination);
-
-        FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
-        boolean isDirectoryCreated = filerClient.createEntry(getParentDirectory(destination), newEntry.build());
-
-        if (!isDirectoryCreated) {
-            return false;
-        }
-
-        if (entry.getIsDirectory()) {
-            Path entryPath = new Path(oldParent, entry.getName());
-            List<FilerProto.Entry> entries = filerClient.listEntries(entryPath.toUri().getPath());
-            for (FilerProto.Entry ent : entries) {
-                boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName()));
-                if (!isSucess) {
-                    return false;
-                }
-            }
-        }
-
-        return filerClient.deleteEntry(
-            oldParent.toUri().getPath(), entry.getName(), false, false);
-
+        filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
     }
 
     public OutputStream createFile(final Path path,

+ 39 - 5
weed/filer2/abstract_sql/abstract_sql_store.go

@@ -19,6 +19,40 @@ type AbstractSqlStore struct {
 	SqlListInclusive string
 }
 
+type TxOrDB interface {
+	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
+	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
+}
+
+func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+	tx, err := store.DB.BeginTx(ctx, nil)
+	if err != nil {
+		return ctx, err
+	}
+
+	return context.WithValue(ctx, "tx", tx), nil
+}
+func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
+	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+		return tx.Commit()
+	}
+	return nil
+}
+func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
+	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+		return tx.Rollback()
+	}
+	return nil
+}
+
+func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
+	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+		return tx
+	}
+	return store.DB
+}
+
 func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
 
 	dir, name := entry.FullPath.DirAndName()
@@ -27,7 +61,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.En
 		return fmt.Errorf("encode %s: %s", entry.FullPath, err)
 	}
 
-	res, err := store.DB.Exec(store.SqlInsert, hashToLong(dir), name, dir, meta)
+	res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta)
 	if err != nil {
 		return fmt.Errorf("insert %s: %s", entry.FullPath, err)
 	}
@@ -47,7 +81,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
 		return fmt.Errorf("encode %s: %s", entry.FullPath, err)
 	}
 
-	res, err := store.DB.Exec(store.SqlUpdate, meta, hashToLong(dir), name, dir)
+	res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir)
 	if err != nil {
 		return fmt.Errorf("update %s: %s", entry.FullPath, err)
 	}
@@ -62,7 +96,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
 func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
 
 	dir, name := fullpath.DirAndName()
-	row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir)
+	row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir)
 	var data []byte
 	if err := row.Scan(&data); err != nil {
 		return nil, filer2.ErrNotFound
@@ -82,7 +116,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
 
 	dir, name := fullpath.DirAndName()
 
-	res, err := store.DB.Exec(store.SqlDelete, hashToLong(dir), name, dir)
+	res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir)
 	if err != nil {
 		return fmt.Errorf("delete %s: %s", fullpath, err)
 	}
@@ -102,7 +136,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
 		sqlText = store.SqlListInclusive
 	}
 
-	rows, err := store.DB.Query(sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
+	rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
 	if err != nil {
 		return nil, fmt.Errorf("list %s : %v", fullpath, err)
 	}

+ 10 - 0
weed/filer2/cassandra/cassandra_store.go

@@ -40,6 +40,16 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er
 	return
 }
 
+func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error){
+	return ctx, nil
+}
+func (store *CassandraStore) CommitTransaction(ctx context.Context) error{
+	return nil
+}
+func (store *CassandraStore) RollbackTransaction(ctx context.Context) error{
+	return nil
+}
+
 func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
 
 	dir, name := entry.FullPath.DirAndName()

+ 12 - 0
weed/filer2/filer.go

@@ -57,6 +57,18 @@ func (fs *Filer) KeepConnectedToMaster() {
 	fs.MasterClient.KeepConnectedToMaster()
 }
 
+func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
+	return f.store.BeginTransaction(ctx)
+}
+
+func (f *Filer) CommitTransaction(ctx context.Context) error {
+	return f.store.CommitTransaction(ctx)
+}
+
+func (f *Filer) RollbackTransaction(ctx context.Context) error {
+	return f.store.RollbackTransaction(ctx)
+}
+
 func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
 
 	if string(entry.FullPath) == "/" {

+ 4 - 0
weed/filer2/filerstore.go

@@ -17,6 +17,10 @@ type FilerStore interface {
 	FindEntry(context.Context, FullPath) (entry *Entry, err error)
 	DeleteEntry(context.Context, FullPath) (err error)
 	ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+
+	BeginTransaction(ctx context.Context) (context.Context, error)
+	CommitTransaction(ctx context.Context) error
+	RollbackTransaction(ctx context.Context) error
 }
 
 var ErrNotFound = errors.New("filer: no entry is found in filer store")

+ 9 - 4
weed/filer2/fullpath.go

@@ -8,10 +8,7 @@ import (
 type FullPath string
 
 func NewFullPath(dir, name string) FullPath {
-	if strings.HasSuffix(dir, "/") {
-		return FullPath(dir + name)
-	}
-	return FullPath(dir + "/" + name)
+	return FullPath(dir).Child(name)
 }
 
 func (fp FullPath) DirAndName() (string, string) {
@@ -29,3 +26,11 @@ func (fp FullPath) Name() string {
 	_, name := filepath.Split(string(fp))
 	return name
 }
+
+func (fp FullPath) Child(name string) FullPath {
+	dir := string(fp)
+	if strings.HasSuffix(dir, "/") {
+		return FullPath(dir + name)
+	}
+	return FullPath(dir + "/" + name)
+}

Some files were not shown because too many files changed in this diff