inbox.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package mysql
  2. import (
  3. "context"
  4. "database/sql"
  5. "strings"
  6. "github.com/pkg/errors"
  7. "google.golang.org/protobuf/encoding/protojson"
  8. storepb "github.com/usememos/memos/proto/gen/store"
  9. "github.com/usememos/memos/store"
  10. )
  11. func (d *DB) CreateInbox(ctx context.Context, create *store.Inbox) (*store.Inbox, error) {
  12. messageString := "{}"
  13. if create.Message != nil {
  14. bytes, err := protojson.Marshal(create.Message)
  15. if err != nil {
  16. return nil, errors.Wrap(err, "failed to marshal inbox message")
  17. }
  18. messageString = string(bytes)
  19. }
  20. fields := []string{"`sender_id`", "`receiver_id`", "`status`", "`message`"}
  21. placeholder := []string{"?", "?", "?", "?"}
  22. args := []any{create.SenderID, create.ReceiverID, create.Status, messageString}
  23. stmt := "INSERT INTO `inbox` (" + strings.Join(fields, ", ") + ") VALUES (" + strings.Join(placeholder, ", ") + ")"
  24. result, err := d.db.ExecContext(ctx, stmt, args...)
  25. if err != nil {
  26. return nil, err
  27. }
  28. id, err := result.LastInsertId()
  29. if err != nil {
  30. return nil, err
  31. }
  32. id32 := int32(id)
  33. inbox, err := d.GetInbox(ctx, &store.FindInbox{ID: &id32})
  34. if err != nil {
  35. return nil, err
  36. }
  37. return inbox, nil
  38. }
  39. func (d *DB) ListInboxes(ctx context.Context, find *store.FindInbox) ([]*store.Inbox, error) {
  40. where, args := []string{"1 = 1"}, []any{}
  41. if find.ID != nil {
  42. where, args = append(where, "`id` = ?"), append(args, *find.ID)
  43. }
  44. if find.SenderID != nil {
  45. where, args = append(where, "`sender_id` = ?"), append(args, *find.SenderID)
  46. }
  47. if find.ReceiverID != nil {
  48. where, args = append(where, "`receiver_id` = ?"), append(args, *find.ReceiverID)
  49. }
  50. if find.Status != nil {
  51. where, args = append(where, "`status` = ?"), append(args, *find.Status)
  52. }
  53. query := "SELECT `id`, UNIX_TIMESTAMP(`created_ts`), `sender_id`, `receiver_id`, `status`, `message` FROM `inbox` WHERE " + strings.Join(where, " AND ") + " ORDER BY `created_ts` DESC"
  54. rows, err := d.db.QueryContext(ctx, query, args...)
  55. if err != nil {
  56. return nil, err
  57. }
  58. defer rows.Close()
  59. list := []*store.Inbox{}
  60. for rows.Next() {
  61. inbox := &store.Inbox{}
  62. var messageBytes []byte
  63. if err := rows.Scan(
  64. &inbox.ID,
  65. &inbox.CreatedTs,
  66. &inbox.SenderID,
  67. &inbox.ReceiverID,
  68. &inbox.Status,
  69. &messageBytes,
  70. ); err != nil {
  71. return nil, err
  72. }
  73. message := &storepb.InboxMessage{}
  74. if err := protojsonUnmarshaler.Unmarshal(messageBytes, message); err != nil {
  75. return nil, err
  76. }
  77. inbox.Message = message
  78. list = append(list, inbox)
  79. }
  80. if err := rows.Err(); err != nil {
  81. return nil, err
  82. }
  83. return list, nil
  84. }
  85. func (d *DB) GetInbox(ctx context.Context, find *store.FindInbox) (*store.Inbox, error) {
  86. list, err := d.ListInboxes(ctx, find)
  87. if err != nil {
  88. return nil, errors.Wrap(err, "failed to get inbox")
  89. }
  90. if len(list) != 1 {
  91. return nil, errors.Wrapf(nil, "unexpected inbox count: %d", len(list))
  92. }
  93. return list[0], nil
  94. }
  95. func (d *DB) UpdateInbox(ctx context.Context, update *store.UpdateInbox) (*store.Inbox, error) {
  96. set, args := []string{"`status` = ?"}, []any{update.Status.String()}
  97. args = append(args, update.ID)
  98. query := "UPDATE `inbox` SET " + strings.Join(set, ", ") + " WHERE `id` = ?"
  99. if _, err := d.db.ExecContext(ctx, query, args...); err != nil {
  100. return nil, errors.Wrap(err, "failed to update inbox")
  101. }
  102. inbox, err := d.GetInbox(ctx, &store.FindInbox{ID: &update.ID})
  103. if err != nil {
  104. return nil, err
  105. }
  106. return inbox, nil
  107. }
  108. func (d *DB) DeleteInbox(ctx context.Context, delete *store.DeleteInbox) error {
  109. result, err := d.db.ExecContext(ctx, "DELETE FROM `inbox` WHERE `id` = ?", delete.ID)
  110. if err != nil {
  111. return errors.Wrap(err, "failed to delete inbox")
  112. }
  113. if _, err := result.RowsAffected(); err != nil {
  114. return err
  115. }
  116. return nil
  117. }
  118. func vacuumInbox(ctx context.Context, tx *sql.Tx) error {
  119. stmt := "DELETE FROM `inbox` WHERE `sender_id` NOT IN (SELECT `id` FROM `user`)"
  120. _, err := tx.ExecContext(ctx, stmt)
  121. if err != nil {
  122. return err
  123. }
  124. return nil
  125. }