// postgres_native.go (13 KB)

  1. package postgres_store
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "path/filepath"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. _ "github.com/lib/pq"
  10. _ "path/filepath"
  11. "strings"
  12. )
// DirectoryId is the Go-side type that the `id` column of the directories
// table is scanned into.
// NOTE(review): the table declares id as BIGSERIAL (64-bit) while this type is
// int32 — confirm ids can never exceed the int32 range.
type DirectoryId int32
  14. func databaseExists(db *sql.DB, databaseName string) (bool, error) {
  15. sqlStatement := "SELECT datname from pg_database WHERE datname='%s'"
  16. row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName))
  17. var dbName string
  18. err := row.Scan(&dbName)
  19. if err != nil {
  20. if err == sql.ErrNoRows {
  21. return false, nil
  22. }
  23. return false, err
  24. }
  25. return true, nil
  26. }
  27. func createDatabase(db *sql.DB, databaseName string) error {
  28. sqlStatement := "CREATE DATABASE %s ENCODING='UTF8'"
  29. _, err := db.Exec(fmt.Sprintf(sqlStatement, databaseName))
  30. return err
  31. }
  32. func getDbConnection(conf PostgresConf) *sql.DB {
  33. _init_db.Do(func() {
  34. sqlUrl := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, "postgres", conf.SslMode)
  35. glog.V(3).Infoln("Opening postgres master database")
  36. var dbErr error
  37. _db_connection, dbErr := sql.Open("postgres", sqlUrl)
  38. if dbErr != nil {
  39. _db_connection.Close()
  40. _db_connection = nil
  41. panic(dbErr)
  42. }
  43. pingErr := _db_connection.Ping()
  44. if pingErr != nil {
  45. _db_connection.Close()
  46. _db_connection = nil
  47. panic(pingErr)
  48. }
  49. glog.V(3).Infoln("Checking to see if DB exists: ", conf.DataBase)
  50. var existsErr error
  51. dbExists, existsErr := databaseExists(_db_connection, conf.DataBase)
  52. if existsErr != nil {
  53. _db_connection.Close()
  54. _db_connection = nil
  55. panic(existsErr)
  56. }
  57. if !dbExists {
  58. glog.V(3).Infoln("Database doesn't exist. Attempting to create one: ", conf.DataBase)
  59. createErr := createDatabase(_db_connection, conf.DataBase)
  60. if createErr != nil {
  61. _db_connection.Close()
  62. _db_connection = nil
  63. panic(createErr)
  64. }
  65. }
  66. glog.V(3).Infoln("Closing master postgres database and opening configured database: ", conf.DataBase)
  67. _db_connection.Close()
  68. _db_connection = nil
  69. sqlUrl = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, conf.DataBase, conf.SslMode)
  70. _db_connection, dbErr = sql.Open("postgres", sqlUrl)
  71. if dbErr != nil {
  72. _db_connection.Close()
  73. _db_connection = nil
  74. panic(dbErr)
  75. }
  76. pingErr = _db_connection.Ping()
  77. if pingErr != nil {
  78. _db_connection.Close()
  79. _db_connection = nil
  80. panic(pingErr)
  81. }
  82. maxIdleConnections, maxOpenConnections := default_maxIdleConnections, default_maxOpenConnections
  83. if conf.MaxIdleConnections != 0 {
  84. maxIdleConnections = conf.MaxIdleConnections
  85. }
  86. if conf.MaxOpenConnections != 0 {
  87. maxOpenConnections = conf.MaxOpenConnections
  88. }
  89. _db_connection.SetMaxIdleConns(maxIdleConnections)
  90. _db_connection.SetMaxOpenConns(maxOpenConnections)
  91. })
  92. return _db_connection
  93. }
// createDirectoryTable is the fmt template for the DDL that creates the
// directories table if it does not already exist. The single %s is the table
// name. Each row stores a directory as a (directoryRoot, directoryName) pair,
// and that pair is constrained to be unique.
var createDirectoryTable = `
CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL NOT NULL,
directoryRoot VARCHAR(1024) NOT NULL DEFAULT '',
directoryName VARCHAR(1024) NOT NULL DEFAULT '',
CONSTRAINT unique_directory UNIQUE (directoryRoot, directoryName)
);
`
// createFileTable is the fmt template for the DDL that creates the files table
// if it does not already exist. It takes two %s arguments: the table name, and
// the prefix of the name of the unique constraint on (directoryPart, filePart).
// Each row maps a (directoryPart, filePart) pair to a fid, with create/update
// timestamps stored as BIGINT values.
var createFileTable = `
CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL NOT NULL,
directoryPart VARCHAR(1024) NOT NULL DEFAULT '',
filePart VARCHAR(1024) NOT NULL DEFAULT '',
fid VARCHAR(36) NOT NULL DEFAULT '',
createTime BIGINT NOT NULL DEFAULT 0,
updateTime BIGINT NOT NULL DEFAULT 0,
remark VARCHAR(20) NOT NULL DEFAULT '',
status SMALLINT NOT NULL DEFAULT '1',
PRIMARY KEY (id),
CONSTRAINT %s_unique_file UNIQUE (directoryPart, filePart)
);
`
  116. func (s *PostgresStore) createDirectoriesTable() error {
  117. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", directoriesTableName)
  118. sqlCreate := fmt.Sprintf(createDirectoryTable, directoriesTableName)
  119. stmt, err := s.db.Prepare(sqlCreate)
  120. if err != nil {
  121. return err
  122. }
  123. defer stmt.Close()
  124. _, err = stmt.Exec()
  125. if err != nil {
  126. return err
  127. }
  128. return nil
  129. }
  130. func (s *PostgresStore) createFilesTable() error {
  131. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", filesTableName)
  132. sqlCreate := fmt.Sprintf(createFileTable, filesTableName, filesTableName)
  133. stmt, err := s.db.Prepare(sqlCreate)
  134. if err != nil {
  135. return err
  136. }
  137. defer stmt.Close()
  138. _, err = stmt.Exec()
  139. if err != nil {
  140. return err
  141. }
  142. return nil
  143. }
  144. func (s *PostgresStore) query(uriPath string) (string, error) {
  145. directoryPart, filePart := filepath.Split(uriPath)
  146. sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  147. row := s.db.QueryRow(sqlStatement, directoryPart, filePart)
  148. var fid string
  149. err := row.Scan(&fid)
  150. glog.V(3).Infof("Postgres query -- looking up path '%s' and found id '%s' ", uriPath, fid)
  151. if err != nil {
  152. return "", err
  153. }
  154. return fid, nil
  155. }
  156. func (s *PostgresStore) update(uriPath string, fid string) error {
  157. directoryPart, filePart := filepath.Split(uriPath)
  158. sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE directoryPart=$3 AND filePart=$4", filesTableName)
  159. glog.V(3).Infof("Postgres query -- updating path '%s' with id '%s'", uriPath, fid)
  160. res, err := s.db.Exec(sqlStatement, fid, time.Now().Unix(), directoryPart, filePart)
  161. if err != nil {
  162. return err
  163. }
  164. _, err = res.RowsAffected()
  165. if err != nil {
  166. return err
  167. }
  168. return nil
  169. }
  170. func (s *PostgresStore) insert(uriPath string, fid string) error {
  171. directoryPart, filePart := filepath.Split(uriPath)
  172. existingId, _, _ := s.lookupDirectory(directoryPart)
  173. if existingId == 0 {
  174. s.recursiveInsertDirectory(directoryPart)
  175. }
  176. sqlStatement := fmt.Sprintf("INSERT INTO %s (directoryPart,filePart,fid,createTime) VALUES($1, $2, $3, $4)", filesTableName)
  177. glog.V(3).Infof("Postgres query -- inserting path '%s' with id '%s'", uriPath, fid)
  178. res, err := s.db.Exec(sqlStatement, directoryPart, filePart, fid, time.Now().Unix())
  179. if err != nil {
  180. return err
  181. }
  182. rows, err := res.RowsAffected()
  183. if rows != 1 {
  184. return fmt.Errorf("Postgres insert -- rows affected = %d. Expecting 1", rows)
  185. }
  186. if err != nil {
  187. return err
  188. }
  189. return nil
  190. }
  191. func (s *PostgresStore) recursiveInsertDirectory(dirPath string) {
  192. pathParts := strings.Split(dirPath, "/")
  193. var workingPath string = "/"
  194. for _, part := range pathParts {
  195. if part == "" {
  196. continue
  197. }
  198. workingPath += (part + "/")
  199. existingId, _, _ := s.lookupDirectory(workingPath)
  200. if existingId == 0 {
  201. s.insertDirectory(workingPath)
  202. }
  203. }
  204. }
  205. func (s *PostgresStore) insertDirectory(dirPath string) {
  206. pathParts := strings.Split(dirPath, "/")
  207. directoryRoot := "/"
  208. directoryName := ""
  209. if len(pathParts) > 1 {
  210. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  211. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  212. } else if len(pathParts) == 1 {
  213. directoryRoot = "/"
  214. directoryName = pathParts[0] + "/"
  215. }
  216. sqlInsertDirectoryStatement := fmt.Sprintf("INSERT INTO %s (directoryroot, directoryname) "+
  217. "SELECT $1, $2 WHERE NOT EXISTS ( SELECT id FROM %s WHERE directoryroot=$3 AND directoryname=$4 )",
  218. directoriesTableName, directoriesTableName)
  219. glog.V(4).Infof("Postgres query -- Inserting directory (if it doesn't exist) - root = %s, name = %s",
  220. directoryRoot, directoryName)
  221. _, err := s.db.Exec(sqlInsertDirectoryStatement, directoryRoot, directoryName, directoryRoot, directoryName)
  222. if err != nil {
  223. glog.V(0).Infof("Postgres query -- Error inserting directory - root = %s, name = %s: %s",
  224. directoryRoot, directoryName, err)
  225. }
  226. }
  227. func (s *PostgresStore) delete(uriPath string) error {
  228. directoryPart, filePart := filepath.Split(uriPath)
  229. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  230. glog.V(3).Infof("Postgres query -- deleting path '%s'", uriPath)
  231. res, err := s.db.Exec(sqlStatement, directoryPart, filePart)
  232. if err != nil {
  233. return err
  234. }
  235. _, err = res.RowsAffected()
  236. if err != nil {
  237. return err
  238. }
  239. return nil
  240. }
  241. func (s *PostgresStore) lookupDirectory(dirPath string) (DirectoryId, string, error) {
  242. directoryRoot, directoryName := s.mySplitPath(dirPath)
  243. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  244. row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName)
  245. var id DirectoryId
  246. var dirRoot string
  247. var dirName string
  248. err := row.Scan(&id, &dirRoot, &dirName)
  249. glog.V(3).Infof("Postgres lookupDirectory -- looking up directory '%s' and found id '%d', root '%s', name '%s' ", dirPath, id, dirRoot, dirName)
  250. if err != nil {
  251. return 0, "", err
  252. }
  253. return id, filepath.Join(dirRoot, dirName), err
  254. }
  255. func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryName, err error) {
  256. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName)
  257. rows, err := s.db.Query(sqlStatement, dirPath, limit)
  258. if err != nil {
  259. glog.V(0).Infof("Postgres findDirectories error: %s", err)
  260. }
  261. if rows != nil {
  262. defer rows.Close()
  263. for rows.Next() {
  264. var id DirectoryId
  265. var directoryRoot string
  266. var directoryName string
  267. scanErr := rows.Scan(&id, &directoryRoot, &directoryName)
  268. if scanErr != nil {
  269. err = scanErr
  270. }
  271. dirs = append(dirs, filer.DirectoryName(directoryName))
  272. }
  273. }
  274. return
  275. }
  276. func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bool {
  277. if recursive {
  278. return true
  279. }
  280. sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName)
  281. row := s.db.QueryRow(sqlStatement, dirPath+"%")
  282. var id DirectoryId
  283. err := row.Scan(&id)
  284. if err != nil {
  285. if err == sql.ErrNoRows {
  286. return true
  287. }
  288. }
  289. return false
  290. }
  291. func (s *PostgresStore) mySplitPath(dirPath string) (directoryRoot string, directoryName string) {
  292. pathParts := strings.Split(dirPath, "/")
  293. directoryRoot = "/"
  294. directoryName = ""
  295. if len(pathParts) > 1 {
  296. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  297. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  298. } else if len(pathParts) == 1 {
  299. directoryRoot = "/"
  300. directoryName = pathParts[0] + "/"
  301. }
  302. return directoryRoot, directoryName
  303. }
  304. func (s *PostgresStore) deleteDirectory(dirPath string, recursive bool) (err error) {
  305. directoryRoot, directoryName := s.mySplitPath(dirPath)
  306. // delete files
  307. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directorypart=$1", filesTableName)
  308. _, err = s.db.Exec(sqlStatement, dirPath)
  309. if err != nil {
  310. return err
  311. }
  312. // delete specific directory if it is empty or recursive delete was requested
  313. safeToDelete := s.safeToDeleteDirectory(dirPath, recursive)
  314. if safeToDelete {
  315. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  316. _, err = s.db.Exec(sqlStatement, directoryRoot, directoryName)
  317. if err != nil {
  318. return err
  319. }
  320. }
  321. if recursive {
  322. // delete descendant files
  323. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directorypart LIKE $1", filesTableName)
  324. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  325. if err != nil {
  326. return err
  327. }
  328. // delete descendant directories
  329. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot LIKE $1", directoriesTableName)
  330. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  331. if err != nil {
  332. return err
  333. }
  334. }
  335. return err
  336. }
  337. func (s *PostgresStore) findFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
  338. var rows *sql.Rows = nil
  339. if lastFileName == "" {
  340. sqlStatement :=
  341. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 ORDER BY id LIMIT $2", filesTableName)
  342. rows, err = s.db.Query(sqlStatement, dirPath, limit)
  343. } else {
  344. sqlStatement :=
  345. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 "+
  346. "AND id > (SELECT id FROM %s WHERE directoryPart=$2 AND filepart=$3) ORDER BY id LIMIT $4",
  347. filesTableName, filesTableName)
  348. _, lastFileNameName := filepath.Split(lastFileName)
  349. rows, err = s.db.Query(sqlStatement, dirPath, dirPath, lastFileNameName, limit)
  350. }
  351. if err != nil {
  352. glog.V(0).Infof("Postgres find files error: %s", err)
  353. }
  354. if rows != nil {
  355. defer rows.Close()
  356. for rows.Next() {
  357. var fid filer.FileId
  358. var directoryPart string
  359. var filePart string
  360. scanErr := rows.Scan(&fid, &directoryPart, &filePart)
  361. if scanErr != nil {
  362. err = scanErr
  363. }
  364. files = append(files, filer.FileEntry{Name: filepath.Join(directoryPart, filePart), Id: fid})
  365. if len(files) >= limit {
  366. break
  367. }
  368. }
  369. }
  370. glog.V(3).Infof("Postgres findFiles -- looking up files under '%s' and found %d files. Limit=%d, lastFileName=%s",
  371. dirPath, len(files), limit, lastFileName)
  372. return files, err
  373. }