timeoutCache.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. // Package cache implements caches to be used in gRPC.
  17. package cache
  18. import (
  19. "sync"
  20. "time"
  21. )
  // cacheEntry is the per-key bookkeeping record held by TimeoutCache.
  type cacheEntry struct {
  	// item is the cached value handed back to callers by Add and Remove.
  	item interface{}

  	// timer is the expiry timer that Add schedules for this entry.
  	timer *time.Timer

  	// callback is run after the entry expires, or by Clear when asked to run
  	// callbacks. To avoid deadlocks (potentially caused by lock ordering), it
  	// must only ever be invoked while the cache's mutex is NOT held.
  	callback func()

  	// deleted records that the entry was removed while its timer had already
  	// fired: removeInternal sets it when timer.Stop() reports failure, which
  	// happens when the timer fires at about the same moment as the removal.
  	// The expiry function checks it and backs off without running callback.
  	deleted bool
  }
  33. // TimeoutCache is a cache with items to be deleted after a timeout.
  34. type TimeoutCache struct {
  35. mu sync.Mutex
  36. timeout time.Duration
  37. cache map[interface{}]*cacheEntry
  38. }
  39. // NewTimeoutCache creates a TimeoutCache with the given timeout.
  40. func NewTimeoutCache(timeout time.Duration) *TimeoutCache {
  41. return &TimeoutCache{
  42. timeout: timeout,
  43. cache: make(map[interface{}]*cacheEntry),
  44. }
  45. }
  46. // Add adds an item to the cache, with the specified callback to be called when
  47. // the item is removed from the cache upon timeout. If the item is removed from
  48. // the cache using a call to Remove before the timeout expires, the callback
  49. // will not be called.
  50. //
  51. // If the Add was successful, it returns (newly added item, true). If there is
  52. // an existing entry for the specified key, the cache entry is not be updated
  53. // with the specified item and it returns (existing item, false).
  54. func (c *TimeoutCache) Add(key, item interface{}, callback func()) (interface{}, bool) {
  55. c.mu.Lock()
  56. defer c.mu.Unlock()
  57. if e, ok := c.cache[key]; ok {
  58. return e.item, false
  59. }
  60. entry := &cacheEntry{
  61. item: item,
  62. callback: callback,
  63. }
  64. entry.timer = time.AfterFunc(c.timeout, func() {
  65. c.mu.Lock()
  66. if entry.deleted {
  67. c.mu.Unlock()
  68. // Abort the delete since this has been taken care of in Remove().
  69. return
  70. }
  71. delete(c.cache, key)
  72. c.mu.Unlock()
  73. entry.callback()
  74. })
  75. c.cache[key] = entry
  76. return item, true
  77. }
  78. // Remove the item with the key from the cache.
  79. //
  80. // If the specified key exists in the cache, it returns (item associated with
  81. // key, true) and the callback associated with the item is guaranteed to be not
  82. // called. If the given key is not found in the cache, it returns (nil, false)
  83. func (c *TimeoutCache) Remove(key interface{}) (item interface{}, ok bool) {
  84. c.mu.Lock()
  85. defer c.mu.Unlock()
  86. entry, ok := c.removeInternal(key)
  87. if !ok {
  88. return nil, false
  89. }
  90. return entry.item, true
  91. }
  92. // removeInternal removes and returns the item with key.
  93. //
  94. // caller must hold c.mu.
  95. func (c *TimeoutCache) removeInternal(key interface{}) (*cacheEntry, bool) {
  96. entry, ok := c.cache[key]
  97. if !ok {
  98. return nil, false
  99. }
  100. delete(c.cache, key)
  101. if !entry.timer.Stop() {
  102. // If stop was not successful, the timer has fired (this can only happen
  103. // in a race). But the deleting function is blocked on c.mu because the
  104. // mutex was held by the caller of this function.
  105. //
  106. // Set deleted to true to abort the deleting function. When the lock is
  107. // released, the delete function will acquire the lock, check the value
  108. // of deleted and return.
  109. entry.deleted = true
  110. }
  111. return entry, true
  112. }
  113. // Clear removes all entries, and runs the callbacks if runCallback is true.
  114. func (c *TimeoutCache) Clear(runCallback bool) {
  115. var entries []*cacheEntry
  116. c.mu.Lock()
  117. for key := range c.cache {
  118. if e, ok := c.removeInternal(key); ok {
  119. entries = append(entries, e)
  120. }
  121. }
  122. c.mu.Unlock()
  123. if !runCallback {
  124. return
  125. }
  126. // removeInternal removes entries from cache, and also stops the timer, so
  127. // the callback is guaranteed to be not called. If runCallback is true,
  128. // manual execute all callbacks.
  129. for _, entry := range entries {
  130. entry.callback()
  131. }
  132. }