ratelimitedclient_test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. package prom
  import (
  	"bytes"
  	"context"
  	"errors"
  	"io"
  	"math"
  	"net/http"
  	"net/url"
  	"sync"
  	"testing"
  	"time"

  	"github.com/opencost/opencost/pkg/util"
  	"github.com/opencost/opencost/pkg/util/httputil"
  	prometheus "github.com/prometheus/client_golang/api"
  )
  // ResponseAndBody is a test object that pairs a predefined HTTP response with
  // the predefined body bytes handed back alongside it.
  type ResponseAndBody struct {
  	// Response is the canned response returned to the caller.
  	Response *http.Response
  	// Body is the canned payload returned together with Response.
  	Body []byte
  }
  22. // MockPromClient accepts a slice of responses and bodies to return on requests made.
  23. // It will cycle these responses linearly, then reset back to the first.
  24. // Also works with concurrent requests.
  25. type MockPromClient struct {
  26. sync.Mutex
  27. responses []*ResponseAndBody
  28. current int
  29. }
  30. // prometheus.Client URL()
  31. func (mpc *MockPromClient) URL(ep string, args map[string]string) *url.URL {
  32. return nil
  33. }
  34. // prometheus.Client Do
  35. func (mpc *MockPromClient) Do(context.Context, *http.Request) (*http.Response, []byte, error) {
  36. // fake latency
  37. time.Sleep(250 * time.Millisecond)
  38. mpc.Lock()
  39. defer mpc.Unlock()
  40. rnb := mpc.responses[mpc.current]
  41. mpc.current++
  42. if mpc.current >= len(mpc.responses) {
  43. mpc.current = 0
  44. }
  45. return rnb.Response, rnb.Body, nil
  46. }
  47. // Creates a new mock prometheus client
  48. func newMockPromClientWith(responses []*ResponseAndBody) prometheus.Client {
  49. return &MockPromClient{
  50. responses: responses,
  51. current: 0,
  52. }
  53. }
  54. // creates a ResponseAndBody representing a 200 status code
  55. func newSuccessfulResponse() *ResponseAndBody {
  56. body := []byte("Success")
  57. return &ResponseAndBody{
  58. Response: &http.Response{
  59. StatusCode: 200,
  60. Body: io.NopCloser(bytes.NewReader(body)),
  61. },
  62. Body: body,
  63. }
  64. }
  65. // creates a ResponseAndBody representing a 400 status code
  66. func newFailureResponse() *ResponseAndBody {
  67. body := []byte("Fail")
  68. return &ResponseAndBody{
  69. Response: &http.Response{
  70. StatusCode: 400,
  71. Body: io.NopCloser(bytes.NewReader(body)),
  72. },
  73. Body: body,
  74. }
  75. }
  76. // creates a ResponseAndBody representing a 429 status code and 'Retry-After' header
  77. func newNormalRateLimitedResponse(retryAfter string) *ResponseAndBody {
  78. body := []byte("Rate Limitted")
  79. return &ResponseAndBody{
  80. Response: &http.Response{
  81. StatusCode: 429,
  82. Header: http.Header{
  83. "Retry-After": []string{retryAfter},
  84. },
  85. Body: io.NopCloser(bytes.NewReader(body)),
  86. },
  87. Body: body,
  88. }
  89. }
  90. // creates a ResponseAndBody representing some amazon services ThrottlingException 400 status
  91. func newHackyAmazonRateLimitedResponse() *ResponseAndBody {
  92. body := []byte("<ThrottlingException>\n <Message>Rate exceeded</Message>\n</ThrottlingException>\n")
  93. return &ResponseAndBody{
  94. Response: &http.Response{
  95. StatusCode: 400,
  96. Body: io.NopCloser(bytes.NewReader(body)),
  97. },
  98. Body: body,
  99. }
  100. }
  101. func newTestRetryOpts() *RateLimitRetryOpts {
  102. return &RateLimitRetryOpts{
  103. MaxRetries: 5,
  104. DefaultRetryWait: 100 * time.Millisecond,
  105. }
  106. }
  107. func TestRateLimitedOnceAndSuccess(t *testing.T) {
  108. t.Parallel()
  109. // creates a prom client with hard coded responses for any requests that
  110. // are issued
  111. promClient := newMockPromClientWith([]*ResponseAndBody{
  112. newNormalRateLimitedResponse("2"),
  113. newSuccessfulResponse(),
  114. })
  115. client, err := NewRateLimitedClient(
  116. "TestClient",
  117. promClient,
  118. 1,
  119. nil,
  120. nil,
  121. newTestRetryOpts(),
  122. "",
  123. "",
  124. )
  125. if err != nil {
  126. t.Fatal(err)
  127. }
  128. req, err := http.NewRequest(http.MethodPost, "", nil)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. // we just need to execute this once to see retries in effect
  133. res, body, err := client.Do(context.Background(), req)
  134. if res.StatusCode != 200 {
  135. t.Fatalf("200 StatusCode expected. Got: %d", res.StatusCode)
  136. }
  137. if string(body) != "Success" {
  138. t.Fatalf("Expected 'Success' message body. Got: %s", string(body))
  139. }
  140. }
  141. func TestRateLimitedOnceAndFail(t *testing.T) {
  142. t.Parallel()
  143. // creates a prom client with hard coded responses for any requests that
  144. // are issued
  145. promClient := newMockPromClientWith([]*ResponseAndBody{
  146. newNormalRateLimitedResponse("2"),
  147. newFailureResponse(),
  148. })
  149. client, err := NewRateLimitedClient(
  150. "TestClient",
  151. promClient,
  152. 1,
  153. nil,
  154. nil,
  155. newTestRetryOpts(),
  156. "",
  157. "",
  158. )
  159. if err != nil {
  160. t.Fatal(err)
  161. }
  162. req, err := http.NewRequest(http.MethodPost, "", nil)
  163. if err != nil {
  164. t.Fatal(err)
  165. }
  166. // we just need to execute this once to see retries in effect
  167. res, body, err := client.Do(context.Background(), req)
  168. if res.StatusCode != 400 {
  169. t.Fatalf("400 StatusCode expected. Got: %d", res.StatusCode)
  170. }
  171. if string(body) != "Fail" {
  172. t.Fatalf("Expected 'fail' message body. Got: %s", string(body))
  173. }
  174. }
  175. func TestRateLimitedResponses(t *testing.T) {
  176. t.Parallel()
  177. dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
  178. // creates a prom client with hard coded responses for any requests that
  179. // are issued
  180. promClient := newMockPromClientWith([]*ResponseAndBody{
  181. newNormalRateLimitedResponse("2"),
  182. newNormalRateLimitedResponse(dateRetry),
  183. newHackyAmazonRateLimitedResponse(),
  184. newHackyAmazonRateLimitedResponse(),
  185. newNormalRateLimitedResponse("3"),
  186. })
  187. client, err := NewRateLimitedClient(
  188. "TestClient",
  189. promClient,
  190. 1,
  191. nil,
  192. nil,
  193. newTestRetryOpts(),
  194. "",
  195. "",
  196. )
  197. if err != nil {
  198. t.Fatal(err)
  199. }
  200. req, err := http.NewRequest(http.MethodPost, "", nil)
  201. if err != nil {
  202. t.Fatal(err)
  203. }
  204. // we just need to execute this once to see retries in effect
  205. _, _, err = client.Do(context.Background(), req)
  206. if err == nil {
  207. t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
  208. }
  209. rateLimitErr, ok := err.(*RateLimitedResponseError)
  210. if !ok {
  211. t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
  212. }
  213. t.Logf("%s\n", rateLimitErr.Error())
  214. // RateLimitedResponseStatus checks just ensure that wait times were close configuration
  215. rateLimitRetries := rateLimitErr.RateLimitStatus
  216. if len(rateLimitRetries) != 5 {
  217. t.Fatalf("Expected 5 retries. Got: %d", len(rateLimitRetries))
  218. }
  219. // check 2s wait after
  220. seconds := rateLimitRetries[0].WaitTime.Seconds()
  221. if !util.IsApproximately(seconds, 2.0) {
  222. t.Fatalf("Expected 2.0 seconds. Got %.2f", seconds)
  223. }
  224. // check to see if fuzzed wait time for datetime parsing
  225. seconds = rateLimitRetries[1].WaitTime.Seconds()
  226. if math.Abs(seconds-2.0) > 3.0 {
  227. t.Fatalf("Expected delta between 2s and resulting wait time to be within 3s. Seconds: %.2f, Delta: %.2f", seconds, math.Abs(seconds-2.0))
  228. }
  229. // check 1s wait
  230. seconds = rateLimitRetries[2].WaitTime.Seconds()
  231. if !util.IsApproximately(seconds, 0.4) {
  232. t.Fatalf("Expected 0.4 seconds. Got %.2f", seconds)
  233. }
  234. // check 1s wait
  235. seconds = rateLimitRetries[3].WaitTime.Seconds()
  236. if !util.IsApproximately(seconds, 0.8) {
  237. t.Fatalf("Expected 0.8 seconds. Got %.2f", seconds)
  238. }
  239. // check 3s wait
  240. seconds = rateLimitRetries[4].WaitTime.Seconds()
  241. if !util.IsApproximately(seconds, 3.0) {
  242. t.Fatalf("Expected 3.0 seconds. Got %.2f", seconds)
  243. }
  244. }
  // AssertDurationEqual fails the test immediately when actual differs from
  // expected, reporting both values in milliseconds. It is marked as a test
  // helper so the failure is attributed to the calling line.
  func AssertDurationEqual(t *testing.T, expected, actual time.Duration) {
  	t.Helper()

  	if actual != expected {
  		t.Fatalf("Expected: %dms, Got: %dms", expected.Milliseconds(), actual.Milliseconds())
  	}
  }
  250. func TestExponentialBackOff(t *testing.T) {
  251. var ExpectedResults = []time.Duration{
  252. 100 * time.Millisecond,
  253. 200 * time.Millisecond,
  254. 400 * time.Millisecond,
  255. 800 * time.Millisecond,
  256. 1600 * time.Millisecond,
  257. }
  258. w := 100 * time.Millisecond
  259. for retry := 0; retry < 5; retry++ {
  260. AssertDurationEqual(t, ExpectedResults[retry], httputil.ExponentialBackoffWaitFor(w, retry))
  261. }
  262. }
  263. func TestConcurrentRateLimiting(t *testing.T) {
  264. t.Parallel()
  265. // Set QueryConcurrency to 3 here, then add a few for total requests
  266. const QueryConcurrency = 3
  267. const TotalRequests = QueryConcurrency + 2
  268. dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
  269. // creates a prom client with hard coded responses for any requests that
  270. // are issued
  271. promClient := newMockPromClientWith([]*ResponseAndBody{
  272. newNormalRateLimitedResponse("2"),
  273. newNormalRateLimitedResponse(dateRetry),
  274. newHackyAmazonRateLimitedResponse(),
  275. newHackyAmazonRateLimitedResponse(),
  276. newNormalRateLimitedResponse("3"),
  277. })
  278. client, err := NewRateLimitedClient(
  279. "TestClient",
  280. promClient,
  281. QueryConcurrency,
  282. nil,
  283. nil,
  284. newTestRetryOpts(),
  285. "",
  286. "",
  287. )
  288. if err != nil {
  289. t.Fatal(err)
  290. }
  291. errs := make(chan error, TotalRequests)
  292. for i := 0; i < TotalRequests; i++ {
  293. go func() {
  294. req, err := http.NewRequest(http.MethodPost, "", nil)
  295. if err != nil {
  296. errs <- err
  297. return
  298. }
  299. // we just need to execute this once to see retries in effect
  300. _, _, err = client.Do(context.Background(), req)
  301. errs <- err
  302. }()
  303. }
  304. for i := 0; i < TotalRequests; i++ {
  305. err := <-errs
  306. if err == nil {
  307. t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
  308. }
  309. rateLimitErr, ok := err.(*RateLimitedResponseError)
  310. if !ok {
  311. t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
  312. }
  313. t.Logf("%s\n", rateLimitErr.Error())
  314. }
  315. }