ratelimitedclient_test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package prom
import (
	"bytes"
	"context"
	"errors"
	"io"
	"math"
	"net/http"
	"net/url"
	"sync"
	"testing"
	"time"

	"github.com/opencost/opencost/core/pkg/util"
	"github.com/opencost/opencost/core/pkg/util/httputil"
	prometheus "github.com/prometheus/client_golang/api"
)
  16. // ResponseAndBody is just a test objet used to hold predefined responses
  17. // and response bodies
  18. type ResponseAndBody struct {
  19. Response *http.Response
  20. Body []byte
  21. }
  22. // MockPromClient accepts a slice of responses and bodies to return on requests made.
  23. // It will cycle these responses linearly, then reset back to the first.
  24. // Also works with concurrent requests.
  25. type MockPromClient struct {
  26. sync.Mutex
  27. responses []*ResponseAndBody
  28. current int
  29. }
  30. // prometheus.Client URL()
  31. func (mpc *MockPromClient) URL(ep string, args map[string]string) *url.URL {
  32. return nil
  33. }
  34. // prometheus.Client Do
  35. func (mpc *MockPromClient) Do(context.Context, *http.Request) (*http.Response, []byte, error) {
  36. // fake latency
  37. time.Sleep(250 * time.Millisecond)
  38. mpc.Lock()
  39. defer mpc.Unlock()
  40. rnb := mpc.responses[mpc.current]
  41. mpc.current++
  42. if mpc.current >= len(mpc.responses) {
  43. mpc.current = 0
  44. }
  45. return rnb.Response, rnb.Body, nil
  46. }
  47. // Creates a new mock prometheus client
  48. func newMockPromClientWith(responses []*ResponseAndBody) prometheus.Client {
  49. return &MockPromClient{
  50. responses: responses,
  51. current: 0,
  52. }
  53. }
  54. // creates a ResponseAndBody representing a 200 status code
  55. func newSuccessfulResponse() *ResponseAndBody {
  56. body := []byte("Success")
  57. return &ResponseAndBody{
  58. Response: &http.Response{
  59. StatusCode: 200,
  60. Body: io.NopCloser(bytes.NewReader(body)),
  61. },
  62. Body: body,
  63. }
  64. }
  65. // creates a ResponseAndBody representing a 400 status code
  66. func newFailureResponse() *ResponseAndBody {
  67. body := []byte("Fail")
  68. return &ResponseAndBody{
  69. Response: &http.Response{
  70. StatusCode: 400,
  71. Body: io.NopCloser(bytes.NewReader(body)),
  72. },
  73. Body: body,
  74. }
  75. }
  76. // creates a ResponseAndBody representing a 429 status code and 'Retry-After' header
  77. func newNormalRateLimitedResponse(retryAfter string) *ResponseAndBody {
  78. body := []byte("Rate Limitted")
  79. return &ResponseAndBody{
  80. Response: &http.Response{
  81. StatusCode: 429,
  82. Header: http.Header{
  83. "Retry-After": []string{retryAfter},
  84. },
  85. Body: io.NopCloser(bytes.NewReader(body)),
  86. },
  87. Body: body,
  88. }
  89. }
  90. // creates a ResponseAndBody representing some amazon services ThrottlingException 400 status
  91. func newHackyAmazonRateLimitedResponse() *ResponseAndBody {
  92. body := []byte("<ThrottlingException>\n <Message>Rate exceeded</Message>\n</ThrottlingException>\n")
  93. return &ResponseAndBody{
  94. Response: &http.Response{
  95. StatusCode: 400,
  96. Body: io.NopCloser(bytes.NewReader(body)),
  97. },
  98. Body: body,
  99. }
  100. }
  101. func newTestRetryOpts() *RateLimitRetryOpts {
  102. return &RateLimitRetryOpts{
  103. MaxRetries: 5,
  104. DefaultRetryWait: 100 * time.Millisecond,
  105. }
  106. }
  107. func TestRateLimitedOnceAndSuccess(t *testing.T) {
  108. t.Parallel()
  109. // creates a prom client with hard coded responses for any requests that
  110. // are issued
  111. promClient := newMockPromClientWith([]*ResponseAndBody{
  112. newNormalRateLimitedResponse("2"),
  113. newSuccessfulResponse(),
  114. })
  115. client, err := NewRateLimitedClient(
  116. "TestClient",
  117. promClient,
  118. 1,
  119. nil,
  120. nil,
  121. newTestRetryOpts(),
  122. "",
  123. "",
  124. )
  125. if err != nil {
  126. t.Fatal(err)
  127. }
  128. req, err := http.NewRequest(http.MethodPost, "", nil)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. // we just need to execute this once to see retries in effect
  133. res, body, err := client.Do(context.Background(), req)
  134. if err != nil {
  135. t.Fatal(err)
  136. }
  137. if res.StatusCode != 200 {
  138. t.Fatalf("200 StatusCode expected. Got: %d", res.StatusCode)
  139. }
  140. if string(body) != "Success" {
  141. t.Fatalf("Expected 'Success' message body. Got: %s", string(body))
  142. }
  143. }
  144. func TestRateLimitedOnceAndFail(t *testing.T) {
  145. t.Parallel()
  146. // creates a prom client with hard coded responses for any requests that
  147. // are issued
  148. promClient := newMockPromClientWith([]*ResponseAndBody{
  149. newNormalRateLimitedResponse("2"),
  150. newFailureResponse(),
  151. })
  152. client, err := NewRateLimitedClient(
  153. "TestClient",
  154. promClient,
  155. 1,
  156. nil,
  157. nil,
  158. newTestRetryOpts(),
  159. "",
  160. "",
  161. )
  162. if err != nil {
  163. t.Fatal(err)
  164. }
  165. req, err := http.NewRequest(http.MethodPost, "", nil)
  166. if err != nil {
  167. t.Fatal(err)
  168. }
  169. // we just need to execute this once to see retries in effect
  170. res, body, err := client.Do(context.Background(), req)
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. if res.StatusCode != 400 {
  175. t.Fatalf("400 StatusCode expected. Got: %d", res.StatusCode)
  176. }
  177. if string(body) != "Fail" {
  178. t.Fatalf("Expected 'fail' message body. Got: %s", string(body))
  179. }
  180. }
  181. func TestRateLimitedResponses(t *testing.T) {
  182. t.Parallel()
  183. dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
  184. // creates a prom client with hard coded responses for any requests that
  185. // are issued
  186. promClient := newMockPromClientWith([]*ResponseAndBody{
  187. newNormalRateLimitedResponse("2"),
  188. newNormalRateLimitedResponse(dateRetry),
  189. newHackyAmazonRateLimitedResponse(),
  190. newHackyAmazonRateLimitedResponse(),
  191. newNormalRateLimitedResponse("3"),
  192. })
  193. client, err := NewRateLimitedClient(
  194. "TestClient",
  195. promClient,
  196. 1,
  197. nil,
  198. nil,
  199. newTestRetryOpts(),
  200. "",
  201. "",
  202. )
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. req, err := http.NewRequest(http.MethodPost, "", nil)
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. // we just need to execute this once to see retries in effect
  211. _, _, err = client.Do(context.Background(), req)
  212. if err == nil {
  213. t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
  214. }
  215. rateLimitErr, ok := err.(*RateLimitedResponseError)
  216. if !ok {
  217. t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
  218. }
  219. t.Logf("%s\n", rateLimitErr.Error())
  220. // RateLimitedResponseStatus checks just ensure that wait times were close configuration
  221. rateLimitRetries := rateLimitErr.RateLimitStatus
  222. if len(rateLimitRetries) != 5 {
  223. t.Fatalf("Expected 5 retries. Got: %d", len(rateLimitRetries))
  224. }
  225. // check 2s wait after
  226. seconds := rateLimitRetries[0].WaitTime.Seconds()
  227. if !util.IsApproximately(seconds, 2.0) {
  228. t.Fatalf("Expected 2.0 seconds. Got %.2f", seconds)
  229. }
  230. // check to see if fuzzed wait time for datetime parsing
  231. seconds = rateLimitRetries[1].WaitTime.Seconds()
  232. if math.Abs(seconds-2.0) > 3.0 {
  233. t.Fatalf("Expected delta between 2s and resulting wait time to be within 3s. Seconds: %.2f, Delta: %.2f", seconds, math.Abs(seconds-2.0))
  234. }
  235. // check 1s wait
  236. seconds = rateLimitRetries[2].WaitTime.Seconds()
  237. if !util.IsApproximately(seconds, 0.4) {
  238. t.Fatalf("Expected 0.4 seconds. Got %.2f", seconds)
  239. }
  240. // check 1s wait
  241. seconds = rateLimitRetries[3].WaitTime.Seconds()
  242. if !util.IsApproximately(seconds, 0.8) {
  243. t.Fatalf("Expected 0.8 seconds. Got %.2f", seconds)
  244. }
  245. // check 3s wait
  246. seconds = rateLimitRetries[4].WaitTime.Seconds()
  247. if !util.IsApproximately(seconds, 3.0) {
  248. t.Fatalf("Expected 3.0 seconds. Got %.2f", seconds)
  249. }
  250. }
  251. func AssertDurationEqual(t *testing.T, expected, actual time.Duration) {
  252. if actual != expected {
  253. t.Fatalf("Expected: %dms, Got: %dms", expected.Milliseconds(), actual.Milliseconds())
  254. }
  255. }
  256. func TestExponentialBackOff(t *testing.T) {
  257. var ExpectedResults = []time.Duration{
  258. 100 * time.Millisecond,
  259. 200 * time.Millisecond,
  260. 400 * time.Millisecond,
  261. 800 * time.Millisecond,
  262. 1600 * time.Millisecond,
  263. }
  264. w := 100 * time.Millisecond
  265. for retry := 0; retry < 5; retry++ {
  266. AssertDurationEqual(t, ExpectedResults[retry], httputil.ExponentialBackoffWaitFor(w, retry))
  267. }
  268. }
  269. func TestConcurrentRateLimiting(t *testing.T) {
  270. t.Parallel()
  271. // Set QueryConcurrency to 3 here, then add a few for total requests
  272. const QueryConcurrency = 3
  273. const TotalRequests = QueryConcurrency + 2
  274. dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
  275. // creates a prom client with hard coded responses for any requests that
  276. // are issued
  277. promClient := newMockPromClientWith([]*ResponseAndBody{
  278. newNormalRateLimitedResponse("2"),
  279. newNormalRateLimitedResponse(dateRetry),
  280. newHackyAmazonRateLimitedResponse(),
  281. newHackyAmazonRateLimitedResponse(),
  282. newNormalRateLimitedResponse("3"),
  283. })
  284. client, err := NewRateLimitedClient(
  285. "TestClient",
  286. promClient,
  287. QueryConcurrency,
  288. nil,
  289. nil,
  290. newTestRetryOpts(),
  291. "",
  292. "",
  293. )
  294. if err != nil {
  295. t.Fatal(err)
  296. }
  297. errs := make(chan error, TotalRequests)
  298. for i := 0; i < TotalRequests; i++ {
  299. go func() {
  300. req, err := http.NewRequest(http.MethodPost, "", nil)
  301. if err != nil {
  302. errs <- err
  303. return
  304. }
  305. // we just need to execute this once to see retries in effect
  306. _, _, err = client.Do(context.Background(), req)
  307. errs <- err
  308. }()
  309. }
  310. for i := 0; i < TotalRequests; i++ {
  311. err := <-errs
  312. if err == nil {
  313. t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
  314. }
  315. rateLimitErr, ok := err.(*RateLimitedResponseError)
  316. if !ok {
  317. t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
  318. }
  319. t.Logf("%s\n", rateLimitErr.Error())
  320. }
  321. }