migrator.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package store
  2. import (
  3. "context"
  4. "database/sql"
  5. "embed"
  6. "fmt"
  7. "io/fs"
  8. "log/slog"
  9. "path/filepath"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "github.com/pkg/errors"
  14. storepb "github.com/usememos/memos/proto/gen/store"
  15. "github.com/usememos/memos/server/version"
  16. )
// migrationFS is the embedded "migration" directory. The code in this file reads
// it under "migration/<driver>/": the LATEST.sql schema file and the per-minor-version
// "<minor>/*.sql" migration scripts.
//
//go:embed migration
var migrationFS embed.FS

// seedFS is the embedded "seed" directory. The code in this file reads
// "seed/<driver>/*.sql" from it when seeding the database.
//
//go:embed seed
var seedFS embed.FS
const (
	// MigrateFileNameSplit is the separator between the patch version and the description in a migration file name.
	// For example, "1__create_table.sql". The text before the first separator is parsed as an integer patch number.
	MigrateFileNameSplit = "__"
	// LatestSchemaFileName is the name of the latest schema file.
	// This file is used to apply the latest schema when no migration history is found
	// (or when the migration history cannot be read).
	LatestSchemaFileName = "LATEST.sql"
)
  29. // Migrate applies the latest schema to the database.
  30. func (s *Store) Migrate(ctx context.Context) error {
  31. if err := s.preMigrate(ctx); err != nil {
  32. return errors.Wrap(err, "failed to pre-migrate")
  33. }
  34. if s.Profile.Mode == "prod" {
  35. migrationHistoryList, err := s.driver.FindMigrationHistoryList(ctx, &FindMigrationHistory{})
  36. if err != nil {
  37. return errors.Wrap(err, "failed to find migration history")
  38. }
  39. if len(migrationHistoryList) == 0 {
  40. return errors.Errorf("no migration history found")
  41. }
  42. migrationHistoryVersions := []string{}
  43. for _, migrationHistory := range migrationHistoryList {
  44. migrationHistoryVersions = append(migrationHistoryVersions, migrationHistory.Version)
  45. }
  46. sort.Sort(version.SortVersion(migrationHistoryVersions))
  47. latestMigrationHistoryVersion := migrationHistoryVersions[len(migrationHistoryVersions)-1]
  48. schemaVersion, err := s.GetCurrentSchemaVersion()
  49. if err != nil {
  50. return errors.Wrap(err, "failed to get current schema version")
  51. }
  52. if version.IsVersionGreaterThan(schemaVersion, latestMigrationHistoryVersion) {
  53. filePaths, err := fs.Glob(migrationFS, fmt.Sprintf("%s*/*.sql", s.getMigrationBasePath()))
  54. if err != nil {
  55. return errors.Wrap(err, "failed to read migration files")
  56. }
  57. sort.Strings(filePaths)
  58. // Start a transaction to apply the latest schema.
  59. tx, err := s.driver.GetDB().Begin()
  60. if err != nil {
  61. return errors.Wrap(err, "failed to start transaction")
  62. }
  63. defer tx.Rollback()
  64. slog.Info("start migration", slog.String("currentSchemaVersion", latestMigrationHistoryVersion), slog.String("targetSchemaVersion", schemaVersion))
  65. for _, filePath := range filePaths {
  66. fileSchemaVersion, err := s.getSchemaVersionOfMigrateScript(filePath)
  67. if err != nil {
  68. return errors.Wrap(err, "failed to get schema version of migrate script")
  69. }
  70. if version.IsVersionGreaterThan(fileSchemaVersion, latestMigrationHistoryVersion) && version.IsVersionGreaterOrEqualThan(schemaVersion, fileSchemaVersion) {
  71. bytes, err := migrationFS.ReadFile(filePath)
  72. if err != nil {
  73. return errors.Wrapf(err, "failed to read minor version migration file: %s", filePath)
  74. }
  75. stmt := string(bytes)
  76. if err := s.execute(ctx, tx, stmt); err != nil {
  77. return errors.Wrapf(err, "migrate error: %s", stmt)
  78. }
  79. }
  80. }
  81. if err := tx.Commit(); err != nil {
  82. return errors.Wrap(err, "failed to commit transaction")
  83. }
  84. slog.Info("end migrate")
  85. // Upsert the current schema version to migration_history.
  86. // TODO: retire using migration history later.
  87. if _, err = s.driver.UpsertMigrationHistory(ctx, &UpsertMigrationHistory{
  88. Version: schemaVersion,
  89. }); err != nil {
  90. return errors.Wrapf(err, "failed to upsert migration history with version: %s", schemaVersion)
  91. }
  92. if err := s.updateCurrentSchemaVersion(ctx, schemaVersion); err != nil {
  93. return errors.Wrap(err, "failed to update current schema version")
  94. }
  95. }
  96. } else if s.Profile.Mode == "demo" {
  97. // In demo mode, we should seed the database.
  98. if err := s.seed(ctx); err != nil {
  99. return errors.Wrap(err, "failed to seed")
  100. }
  101. }
  102. return nil
  103. }
  104. func (s *Store) preMigrate(ctx context.Context) error {
  105. // TODO: using schema version in basic setting instead of migration history.
  106. migrationHistoryList, err := s.driver.FindMigrationHistoryList(ctx, &FindMigrationHistory{})
  107. // If any error occurs or no migration history found, apply the latest schema.
  108. if err != nil || len(migrationHistoryList) == 0 {
  109. if err != nil {
  110. slog.Warn("failed to find migration history in pre-migrate", slog.String("error", err.Error()))
  111. }
  112. filePath := s.getMigrationBasePath() + LatestSchemaFileName
  113. bytes, err := migrationFS.ReadFile(filePath)
  114. if err != nil {
  115. return errors.Errorf("failed to read latest schema file: %s", err)
  116. }
  117. schemaVersion, err := s.GetCurrentSchemaVersion()
  118. if err != nil {
  119. return errors.Wrap(err, "failed to get current schema version")
  120. }
  121. // Start a transaction to apply the latest schema.
  122. tx, err := s.driver.GetDB().Begin()
  123. if err != nil {
  124. return errors.Wrap(err, "failed to start transaction")
  125. }
  126. defer tx.Rollback()
  127. if err := s.execute(ctx, tx, string(bytes)); err != nil {
  128. return errors.Errorf("failed to execute SQL file %s, err %s", filePath, err)
  129. }
  130. if err := tx.Commit(); err != nil {
  131. return errors.Wrap(err, "failed to commit transaction")
  132. }
  133. // TODO: using schema version in basic setting instead of migration history.
  134. if _, err := s.driver.UpsertMigrationHistory(ctx, &UpsertMigrationHistory{
  135. Version: schemaVersion,
  136. }); err != nil {
  137. return errors.Wrap(err, "failed to upsert migration history")
  138. }
  139. if err := s.updateCurrentSchemaVersion(ctx, schemaVersion); err != nil {
  140. return errors.Wrap(err, "failed to update current schema version")
  141. }
  142. }
  143. if s.Profile.Mode == "prod" {
  144. if err := s.normalizedMigrationHistoryList(ctx); err != nil {
  145. return errors.Wrap(err, "failed to normalize migration history list")
  146. }
  147. }
  148. return nil
  149. }
  150. func (s *Store) getMigrationBasePath() string {
  151. return fmt.Sprintf("migration/%s/", s.Profile.Driver)
  152. }
  153. func (s *Store) getSeedBasePath() string {
  154. return fmt.Sprintf("seed/%s/", s.Profile.Driver)
  155. }
  156. func (s *Store) seed(ctx context.Context) error {
  157. // Only seed for SQLite.
  158. if s.Profile.Driver != "sqlite" {
  159. slog.Warn("seed is only supported for SQLite")
  160. return nil
  161. }
  162. filenames, err := fs.Glob(seedFS, fmt.Sprintf("%s*.sql", s.getSeedBasePath()))
  163. if err != nil {
  164. return errors.Wrap(err, "failed to read seed files")
  165. }
  166. // Sort seed files by name. This is important to ensure that seed files are applied in order.
  167. sort.Strings(filenames)
  168. // Start a transaction to apply the seed files.
  169. tx, err := s.driver.GetDB().Begin()
  170. if err != nil {
  171. return errors.Wrap(err, "failed to start transaction")
  172. }
  173. defer tx.Rollback()
  174. // Loop over all seed files and execute them in order.
  175. for _, filename := range filenames {
  176. bytes, err := seedFS.ReadFile(filename)
  177. if err != nil {
  178. return errors.Wrapf(err, "failed to read seed file, filename=%s", filename)
  179. }
  180. if err := s.execute(ctx, tx, string(bytes)); err != nil {
  181. return errors.Wrapf(err, "seed error: %s", filename)
  182. }
  183. }
  184. return tx.Commit()
  185. }
  186. func (s *Store) GetCurrentSchemaVersion() (string, error) {
  187. currentVersion := version.GetCurrentVersion(s.Profile.Mode)
  188. minorVersion := version.GetMinorVersion(currentVersion)
  189. filePaths, err := fs.Glob(migrationFS, fmt.Sprintf("%s%s/*.sql", s.getMigrationBasePath(), minorVersion))
  190. if err != nil {
  191. return "", errors.Wrap(err, "failed to read migration files")
  192. }
  193. sort.Strings(filePaths)
  194. if len(filePaths) == 0 {
  195. return fmt.Sprintf("%s.0", minorVersion), nil
  196. }
  197. return s.getSchemaVersionOfMigrateScript(filePaths[len(filePaths)-1])
  198. }
  199. func (s *Store) getSchemaVersionOfMigrateScript(filePath string) (string, error) {
  200. // If the file is the latest schema file, return the current schema version.
  201. if strings.HasSuffix(filePath, LatestSchemaFileName) {
  202. return s.GetCurrentSchemaVersion()
  203. }
  204. normalizedPath := filepath.ToSlash(filePath)
  205. elements := strings.Split(normalizedPath, "/")
  206. if len(elements) < 2 {
  207. return "", errors.Errorf("invalid file path: %s", filePath)
  208. }
  209. minorVersion := elements[len(elements)-2]
  210. rawPatchVersion := strings.Split(elements[len(elements)-1], MigrateFileNameSplit)[0]
  211. patchVersion, err := strconv.Atoi(rawPatchVersion)
  212. if err != nil {
  213. return "", errors.Wrapf(err, "failed to convert patch version to int: %s", rawPatchVersion)
  214. }
  215. return fmt.Sprintf("%s.%d", minorVersion, patchVersion+1), nil
  216. }
  217. // execute runs a single SQL statement within a transaction.
  218. func (*Store) execute(ctx context.Context, tx *sql.Tx, stmt string) error {
  219. if _, err := tx.ExecContext(ctx, stmt); err != nil {
  220. return errors.Wrap(err, "failed to execute statement")
  221. }
  222. return nil
  223. }
  224. func (s *Store) normalizedMigrationHistoryList(ctx context.Context) error {
  225. migrationHistoryList, err := s.driver.FindMigrationHistoryList(ctx, &FindMigrationHistory{})
  226. if err != nil {
  227. return errors.Wrap(err, "failed to find migration history")
  228. }
  229. versions := []string{}
  230. for _, migrationHistory := range migrationHistoryList {
  231. versions = append(versions, migrationHistory.Version)
  232. }
  233. sort.Sort(version.SortVersion(versions))
  234. latestVersion := versions[len(versions)-1]
  235. latestMinorVersion := version.GetMinorVersion(latestVersion)
  236. // If the latest version is greater than 0.22, return.
  237. // As of 0.22, the migration history is already normalized.
  238. if version.IsVersionGreaterThan(latestMinorVersion, "0.22") {
  239. return nil
  240. }
  241. schemaVersionMap := map[string]string{}
  242. filePaths, err := fs.Glob(migrationFS, fmt.Sprintf("%s*/*.sql", s.getMigrationBasePath()))
  243. if err != nil {
  244. return errors.Wrap(err, "failed to read migration files")
  245. }
  246. sort.Strings(filePaths)
  247. for _, filePath := range filePaths {
  248. fileSchemaVersion, err := s.getSchemaVersionOfMigrateScript(filePath)
  249. if err != nil {
  250. return errors.Wrap(err, "failed to get schema version of migrate script")
  251. }
  252. schemaVersionMap[version.GetMinorVersion(fileSchemaVersion)] = fileSchemaVersion
  253. }
  254. latestSchemaVersion := schemaVersionMap[latestMinorVersion]
  255. if latestSchemaVersion == "" {
  256. return errors.Errorf("latest schema version not found")
  257. }
  258. if version.IsVersionGreaterOrEqualThan(latestVersion, latestSchemaVersion) {
  259. return nil
  260. }
  261. // Start a transaction to insert the latest schema version to migration_history.
  262. tx, err := s.driver.GetDB().Begin()
  263. if err != nil {
  264. return errors.Wrap(err, "failed to start transaction")
  265. }
  266. defer tx.Rollback()
  267. if err := s.execute(ctx, tx, fmt.Sprintf("INSERT INTO migration_history (version) VALUES ('%s')", latestSchemaVersion)); err != nil {
  268. return errors.Wrap(err, "failed to insert migration history")
  269. }
  270. return tx.Commit()
  271. }
  272. func (s *Store) updateCurrentSchemaVersion(ctx context.Context, schemaVersion string) error {
  273. workspaceBasicSetting, err := s.GetWorkspaceBasicSetting(ctx)
  274. if err != nil {
  275. return errors.Wrap(err, "failed to get workspace basic setting")
  276. }
  277. workspaceBasicSetting.SchemaVersion = schemaVersion
  278. if _, err := s.UpsertWorkspaceSetting(ctx, &storepb.WorkspaceSetting{
  279. Key: storepb.WorkspaceSettingKey_BASIC,
  280. Value: &storepb.WorkspaceSetting_BasicSetting{BasicSetting: workspaceBasicSetting},
  281. }); err != nil {
  282. return errors.Wrap(err, "failed to upsert workspace setting")
  283. }
  284. return nil
  285. }