server_manager.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package server
  2. import (
  3. "heckel.io/ntfy/v2/log"
  4. "heckel.io/ntfy/v2/util"
  5. "strings"
  6. )
  7. func (s *Server) execManager() {
  8. // WARNING: Make sure to only selectively lock with the mutex, and be aware that this
  9. // there is no mutex for the entire function.
  10. // Prune all the things
  11. s.pruneVisitors()
  12. s.pruneTokens()
  13. s.pruneAttachments()
  14. s.pruneMessages()
  15. s.pruneAndNotifyWebPushSubscriptions()
  16. // Message count per topic
  17. var messagesCached int
  18. messageCounts, err := s.messageCache.MessageCounts()
  19. if err != nil {
  20. log.Tag(tagManager).Err(err).Warn("Cannot get message counts")
  21. messageCounts = make(map[string]int) // Empty, so we can continue
  22. }
  23. for _, count := range messageCounts {
  24. messagesCached += count
  25. }
  26. // Remove subscriptions without subscribers
  27. var emptyTopics, subscribers int
  28. log.
  29. Tag(tagManager).
  30. Timing(func() {
  31. s.mu.Lock()
  32. defer s.mu.Unlock()
  33. for _, t := range s.topics {
  34. subs, lastAccess := t.Stats()
  35. ev := log.Tag(tagManager).With(t)
  36. if t.Stale() {
  37. if ev.IsTrace() {
  38. ev.Trace("- topic %s: Deleting stale topic (%d subscribers, accessed %s)", t.ID, subs, util.FormatTime(lastAccess))
  39. }
  40. emptyTopics++
  41. delete(s.topics, t.ID)
  42. } else {
  43. if ev.IsTrace() {
  44. ev.Trace("- topic %s: %d subscribers, accessed %s", t.ID, subs, util.FormatTime(lastAccess))
  45. }
  46. subscribers += subs
  47. }
  48. }
  49. }).
  50. Debug("Removed %d empty topic(s)", emptyTopics)
  51. // Mail stats
  52. var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
  53. if s.smtpServerBackend != nil {
  54. receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts()
  55. }
  56. var sentMailTotal, sentMailSuccess, sentMailFailure int64
  57. if s.smtpSender != nil {
  58. sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts()
  59. }
  60. // Users
  61. var usersCount int64
  62. if s.userManager != nil {
  63. usersCount, err = s.userManager.UsersCount()
  64. if err != nil {
  65. log.Tag(tagManager).Err(err).Warn("Error counting users")
  66. }
  67. }
  68. // Print stats
  69. s.mu.RLock()
  70. messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
  71. s.mu.RUnlock()
  72. // Update stats
  73. s.updateAndWriteStats(messagesCount)
  74. // Log stats
  75. log.
  76. Tag(tagManager).
  77. Fields(log.Context{
  78. "messages_published": messagesCount,
  79. "messages_cached": messagesCached,
  80. "topics_active": topicsCount,
  81. "subscribers": subscribers,
  82. "visitors": visitorsCount,
  83. "users": usersCount,
  84. "emails_received": receivedMailTotal,
  85. "emails_received_success": receivedMailSuccess,
  86. "emails_received_failure": receivedMailFailure,
  87. "emails_sent": sentMailTotal,
  88. "emails_sent_success": sentMailSuccess,
  89. "emails_sent_failure": sentMailFailure,
  90. }).
  91. Info("Server stats")
  92. mset(metricMessagesCached, messagesCached)
  93. mset(metricVisitors, visitorsCount)
  94. mset(metricUsers, usersCount)
  95. mset(metricSubscribers, subscribers)
  96. mset(metricTopics, topicsCount)
  97. }
  98. func (s *Server) pruneVisitors() {
  99. staleVisitors := 0
  100. log.
  101. Tag(tagManager).
  102. Timing(func() {
  103. s.mu.Lock()
  104. defer s.mu.Unlock()
  105. for ip, v := range s.visitors {
  106. if v.Stale() {
  107. log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
  108. delete(s.visitors, ip)
  109. staleVisitors++
  110. }
  111. }
  112. }).
  113. Field("stale_visitors", staleVisitors).
  114. Debug("Deleted %d stale visitor(s)", staleVisitors)
  115. }
  116. func (s *Server) pruneTokens() {
  117. if s.userManager != nil {
  118. log.
  119. Tag(tagManager).
  120. Timing(func() {
  121. if err := s.userManager.RemoveExpiredTokens(); err != nil {
  122. log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
  123. }
  124. if err := s.userManager.RemoveDeletedUsers(); err != nil {
  125. log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
  126. }
  127. }).
  128. Debug("Removed expired tokens and users")
  129. }
  130. }
  131. func (s *Server) pruneAttachments() {
  132. if s.fileCache == nil {
  133. return
  134. }
  135. log.
  136. Tag(tagManager).
  137. Timing(func() {
  138. ids, err := s.messageCache.AttachmentsExpired()
  139. if err != nil {
  140. log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
  141. } else if len(ids) > 0 {
  142. if log.Tag(tagManager).IsDebug() {
  143. log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
  144. }
  145. if err := s.fileCache.Remove(ids...); err != nil {
  146. log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
  147. }
  148. if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
  149. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  150. }
  151. } else {
  152. log.Tag(tagManager).Debug("No expired attachments to delete")
  153. }
  154. }).
  155. Debug("Deleted expired attachments")
  156. }
  157. func (s *Server) pruneMessages() {
  158. log.
  159. Tag(tagManager).
  160. Timing(func() {
  161. expiredMessageIDs, err := s.messageCache.MessagesExpired()
  162. if err != nil {
  163. log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
  164. } else if len(expiredMessageIDs) > 0 {
  165. if s.fileCache != nil {
  166. if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
  167. log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
  168. }
  169. }
  170. if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
  171. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  172. }
  173. } else {
  174. log.Tag(tagManager).Debug("No expired messages to delete")
  175. }
  176. }).
  177. Debug("Pruned messages")
  178. }