cachegroup_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package cache
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. )
  7. type Obj struct {
  8. Value int
  9. }
  10. func TestGroupCacheSingleFlighting(t *testing.T) {
  11. g := NewCacheGroup[*Obj](3)
  12. factory := func() (*Obj, error) {
  13. time.Sleep(2 * time.Second)
  14. return &Obj{10}, nil
  15. }
  16. next := make(chan struct{})
  17. done := make(chan struct{})
  18. go func() {
  19. now := time.Now()
  20. o, _ := g.Do("a", func() (*Obj, error) {
  21. next <- struct{}{}
  22. return factory()
  23. })
  24. t.Logf("Took: %d ms, Obj Value: %d\n", time.Now().Sub(now).Milliseconds(), o.Value)
  25. }()
  26. go func() {
  27. <-next
  28. time.Sleep(1 * time.Second)
  29. now := time.Now()
  30. o, _ := g.Do("a", factory)
  31. delta := time.Now().Sub(now)
  32. t.Logf("Other Go Routine Took: %d ms, Obj Value: %d\n", delta.Milliseconds(), o.Value)
  33. if delta > (time.Duration(1250 * time.Millisecond)) {
  34. t.Errorf("Delta Time > 1250ms. Delta: %d, Expected 1000ms\n", delta)
  35. }
  36. done <- struct{}{}
  37. }()
  38. <-done
  39. }
  40. func TestGroupCacheAfterSingleFlighting(t *testing.T) {
  41. g := NewCacheGroup[*Obj](3)
  42. factory := func() (*Obj, error) {
  43. time.Sleep(2 * time.Second)
  44. return &Obj{10}, nil
  45. }
  46. next := make(chan struct{})
  47. done := make(chan struct{})
  48. go func() {
  49. now := time.Now()
  50. o, _ := g.Do("a", func() (*Obj, error) {
  51. next <- struct{}{}
  52. return factory()
  53. })
  54. t.Logf("Took: %d ms, Obj Value: %d\n", time.Now().Sub(now).Milliseconds(), o.Value)
  55. }()
  56. go func() {
  57. <-next
  58. // wait the full 2 seconds and then some, which will ensure we are no longer
  59. // single flighting, and should reach into the cache
  60. time.Sleep(2500 * time.Millisecond)
  61. now := time.Now()
  62. o, _ := g.Do("a", factory)
  63. delta := time.Now().Sub(now)
  64. t.Logf("Other Go Routine Took: %d ms, Obj Value: %d\n", delta.Milliseconds(), o.Value)
  65. if delta > (time.Duration(1250 * time.Millisecond)) {
  66. t.Errorf("Delta Time > 1250ms. Delta: %d, Expected 1000ms\n", delta)
  67. }
  68. done <- struct{}{}
  69. }()
  70. <-done
  71. }
  72. func TestGroupCacheMany(t *testing.T) {
  73. // Apologies this test can be difficult to follow. (Concurrent tests are hard)
  74. // The idea here is that we test a "request" that takes 1 second to return an
  75. // Obj{10} result (factory).
  76. // * To test the single flight behavior, we make a series of requests that will
  77. // happen while the initial request is in flight.
  78. // * The second half of requests will be made after the original request returns
  79. // to ensure that we pull from cache.
  80. // * The failure case is if all of these actions takes too long to execute, which
  81. // _should_ indicate a deadlock or problem with the API.
  82. g := NewCacheGroup[*Obj](3).WithExpiration(10*time.Second, 5*time.Second)
  83. factory := func() (*Obj, error) {
  84. time.Sleep(1 * time.Second)
  85. return &Obj{10}, nil
  86. }
  87. next := make(chan struct{})
  88. go func() {
  89. now := time.Now()
  90. o, _ := g.Do("a", func() (*Obj, error) {
  91. next <- struct{}{}
  92. return factory()
  93. })
  94. t.Logf("Took: %d ms, Obj Value: %d\n", time.Now().Sub(now).Milliseconds(), o.Value)
  95. }()
  96. <-next
  97. var wg sync.WaitGroup
  98. wg.Add(10)
  99. for i := 0; i < 10; i++ {
  100. go func(ii int) {
  101. t.Logf("Created Go Routine: %d\n", ii)
  102. now := time.Now()
  103. o, _ := g.Do("a", factory)
  104. delta := time.Now().Sub(now)
  105. t.Logf("Go Routine[%d] Took: %d ms, Obj Value: %d\n", ii, delta.Milliseconds(), o.Value)
  106. wg.Done()
  107. }(i)
  108. time.Sleep(250 * time.Millisecond)
  109. }
  110. select {
  111. case <-waitChannelFor(&wg):
  112. t.Logf("Successfully returned values for all requests.")
  113. case <-time.After(time.Second * 8):
  114. t.Logf("Failed to complete after 8 second timeout")
  115. }
  116. }
  117. func TestCacheGroupExpirationPolicy(t *testing.T) {
  118. g := NewCacheGroup[*Obj](3).WithExpiration(2*time.Second, time.Second)
  119. g.Do("a", func() (*Obj, error) {
  120. return &Obj{10}, nil
  121. })
  122. time.Sleep(2100 * time.Millisecond)
  123. if len(g.cache) > 0 {
  124. t.Errorf("Expected cache to be empty (expired). Cache length was: %d\n", len(g.cache))
  125. }
  126. }
  127. func TestCacheGroupMaxRollOff(t *testing.T) {
  128. g := NewCacheGroup[*Obj](3)
  129. g.Do("a", func() (*Obj, error) {
  130. return &Obj{1}, nil
  131. })
  132. g.Do("b", func() (*Obj, error) {
  133. return &Obj{1}, nil
  134. })
  135. g.Do("c", func() (*Obj, error) {
  136. return &Obj{1}, nil
  137. })
  138. g.Do("d", func() (*Obj, error) {
  139. return &Obj{1}, nil
  140. })
  141. if _, ok := g.cache["a"]; ok {
  142. t.Errorf("Expected 'a' group cache to be evicted")
  143. }
  144. }
  145. func waitChannelFor(wg *sync.WaitGroup) <-chan struct{} {
  146. ch := make(chan struct{})
  147. go func() {
  148. wg.Wait()
  149. ch <- struct{}{}
  150. }()
  151. return ch
  152. }