edf.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package wrr
  18. import (
  19. "container/heap"
  20. "sync"
  21. )
  22. // edfWrr is a struct for EDF weighted round robin implementation.
  23. type edfWrr struct {
  24. lock sync.Mutex
  25. items edfPriorityQueue
  26. currentOrderOffset uint64
  27. currentTime float64
  28. }
  29. // NewEDF creates Earliest Deadline First (EDF)
  30. // (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) implementation for weighted round robin.
  31. // Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
  32. // at current time + 1 / weight, providing weighted round robin behavior with O(log n) pick time.
  33. func NewEDF() WRR {
  34. return &edfWrr{}
  35. }
  36. // edfEntry is an internal wrapper for item that also stores weight and relative position in the queue.
  37. type edfEntry struct {
  38. deadline float64
  39. weight int64
  40. orderOffset uint64
  41. item interface{}
  42. }
  43. // edfPriorityQueue is a heap.Interface implementation for edfEntry elements.
  44. type edfPriorityQueue []*edfEntry
  45. func (pq edfPriorityQueue) Len() int { return len(pq) }
  46. func (pq edfPriorityQueue) Less(i, j int) bool {
  47. return pq[i].deadline < pq[j].deadline || pq[i].deadline == pq[j].deadline && pq[i].orderOffset < pq[j].orderOffset
  48. }
  49. func (pq edfPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
  50. func (pq *edfPriorityQueue) Push(x interface{}) {
  51. *pq = append(*pq, x.(*edfEntry))
  52. }
  53. func (pq *edfPriorityQueue) Pop() interface{} {
  54. old := *pq
  55. *pq = old[0 : len(old)-1]
  56. return old[len(old)-1]
  57. }
  58. func (edf *edfWrr) Add(item interface{}, weight int64) {
  59. edf.lock.Lock()
  60. defer edf.lock.Unlock()
  61. entry := edfEntry{
  62. deadline: edf.currentTime + 1.0/float64(weight),
  63. weight: weight,
  64. item: item,
  65. orderOffset: edf.currentOrderOffset,
  66. }
  67. edf.currentOrderOffset++
  68. heap.Push(&edf.items, &entry)
  69. }
  70. func (edf *edfWrr) Next() interface{} {
  71. edf.lock.Lock()
  72. defer edf.lock.Unlock()
  73. if len(edf.items) == 0 {
  74. return nil
  75. }
  76. item := edf.items[0]
  77. edf.currentTime = item.deadline
  78. item.deadline = edf.currentTime + 1.0/float64(item.weight)
  79. heap.Fix(&edf.items, 0)
  80. return item.item
  81. }