topic_test.go 875 B

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. package server
  2. import (
  3. "github.com/stretchr/testify/require"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. )
  8. func TestTopic_CancelSubscribers(t *testing.T) {
  9. t.Parallel()
  10. subFn := func(v *visitor, msg *message) error {
  11. return nil
  12. }
  13. canceled1 := atomic.Bool{}
  14. cancelFn1 := func() {
  15. canceled1.Store(true)
  16. }
  17. canceled2 := atomic.Bool{}
  18. cancelFn2 := func() {
  19. canceled2.Store(true)
  20. }
  21. to := newTopic("mytopic")
  22. to.Subscribe(subFn, "", cancelFn1)
  23. to.Subscribe(subFn, "u_phil", cancelFn2)
  24. to.CancelSubscribers("u_phil")
  25. require.True(t, canceled1.Load())
  26. require.False(t, canceled2.Load())
  27. }
  28. func TestTopic_Keepalive(t *testing.T) {
  29. t.Parallel()
  30. to := newTopic("mytopic")
  31. to.lastAccess = time.Now().Add(-1 * time.Hour)
  32. to.Keepalive()
  33. require.True(t, to.LastAccess().Unix() >= time.Now().Unix()-2)
  34. require.True(t, to.LastAccess().Unix() <= time.Now().Unix()+2)
  35. }