ratelimitedclient_test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. package prom
  2. import (
  3. "bytes"
  4. "context"
  5. "io"
  6. "math"
  7. "net/http"
  8. "net/url"
  9. "sync"
  10. "testing"
  11. "time"
  12. "github.com/kubecost/cost-model/pkg/util"
  13. "github.com/kubecost/cost-model/pkg/util/httputil"
  14. prometheus "github.com/prometheus/client_golang/api"
  15. )
  16. // ResponseAndBody is just a test objet used to hold predefined responses
  17. // and response bodies
  18. type ResponseAndBody struct {
  19. Response *http.Response
  20. Body []byte
  21. }
  22. // MockPromClient accepts a slice of responses and bodies to return on requests made.
  23. // It will cycle these responses linearly, then reset back to the first.
  24. // Also works with concurrent requests.
  25. type MockPromClient struct {
  26. sync.Mutex
  27. responses []*ResponseAndBody
  28. current int
  29. }
  30. // prometheus.Client URL()
  31. func (mpc *MockPromClient) URL(ep string, args map[string]string) *url.URL {
  32. return nil
  33. }
  34. // prometheus.Client Do
  35. func (mpc *MockPromClient) Do(context.Context, *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
  36. // fake latency
  37. time.Sleep(250 * time.Millisecond)
  38. mpc.Lock()
  39. defer mpc.Unlock()
  40. rnb := mpc.responses[mpc.current]
  41. mpc.current++
  42. if mpc.current >= len(mpc.responses) {
  43. mpc.current = 0
  44. }
  45. return rnb.Response, rnb.Body, nil, nil
  46. }
  47. // Creates a new mock prometheus client
  48. func newMockPromClientWith(responses []*ResponseAndBody) prometheus.Client {
  49. return &MockPromClient{
  50. responses: responses,
  51. current: 0,
  52. }
  53. }
  54. // creates a ResponseAndBody representing a 200 status code
  55. func newSuccessfulResponse() *ResponseAndBody {
  56. body := []byte("Success")
  57. return &ResponseAndBody{
  58. Response: &http.Response{
  59. StatusCode: 200,
  60. Body: io.NopCloser(bytes.NewReader(body)),
  61. },
  62. Body: body,
  63. }
  64. }
  65. // creates a ResponseAndBody representing a 400 status code
  66. func newFailureResponse() *ResponseAndBody {
  67. body := []byte("Fail")
  68. return &ResponseAndBody{
  69. Response: &http.Response{
  70. StatusCode: 400,
  71. Body: io.NopCloser(bytes.NewReader(body)),
  72. },
  73. Body: body,
  74. }
  75. }
  76. // creates a ResponseAndBody representing a 429 status code and 'Retry-After' header
  77. func newNormalRateLimitedResponse(retryAfter string) *ResponseAndBody {
  78. body := []byte("Rate Limitted")
  79. return &ResponseAndBody{
  80. Response: &http.Response{
  81. StatusCode: 429,
  82. Header: http.Header{
  83. "Retry-After": []string{retryAfter},
  84. },
  85. Body: io.NopCloser(bytes.NewReader(body)),
  86. },
  87. Body: body,
  88. }
  89. }
  90. // creates a ResponseAndBody representing some amazon services ThrottlingException 400 status
  91. func newHackyAmazonRateLimitedResponse() *ResponseAndBody {
  92. body := []byte("<ThrottlingException>\n <Message>Rate exceeded</Message>\n</ThrottlingException>\n")
  93. return &ResponseAndBody{
  94. Response: &http.Response{
  95. StatusCode: 400,
  96. Body: io.NopCloser(bytes.NewReader(body)),
  97. },
  98. Body: body,
  99. }
  100. }
  101. func newTestRetryOpts() *RateLimitRetryOpts {
  102. return &RateLimitRetryOpts{
  103. MaxRetries: 5,
  104. DefaultRetryWait: 100 * time.Millisecond,
  105. }
  106. }
  107. func TestRateLimitedOnceAndSuccess(t *testing.T) {
  108. t.Parallel()
  109. // creates a prom client with hard coded responses for any requests that
  110. // are issued
  111. promClient := newMockPromClientWith([]*ResponseAndBody{
  112. newNormalRateLimitedResponse("2"),
  113. newSuccessfulResponse(),
  114. })
  115. client, err := NewRateLimitedClient(
  116. "TestClient",
  117. promClient,
  118. 1,
  119. nil,
  120. nil,
  121. newTestRetryOpts(),
  122. "",
  123. )
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. req, err := http.NewRequest(http.MethodPost, "", nil)
  128. if err != nil {
  129. t.Fatal(err)
  130. }
  131. // we just need to execute this once to see retries in effect
  132. res, body, _, err := client.Do(context.Background(), req)
  133. if res.StatusCode != 200 {
  134. t.Fatalf("200 StatusCode expected. Got: %d", res.StatusCode)
  135. }
  136. if string(body) != "Success" {
  137. t.Fatalf("Expected 'Success' message body. Got: %s", string(body))
  138. }
  139. }
  140. func TestRateLimitedOnceAndFail(t *testing.T) {
  141. t.Parallel()
  142. // creates a prom client with hard coded responses for any requests that
  143. // are issued
  144. promClient := newMockPromClientWith([]*ResponseAndBody{
  145. newNormalRateLimitedResponse("2"),
  146. newFailureResponse(),
  147. })
  148. client, err := NewRateLimitedClient(
  149. "TestClient",
  150. promClient,
  151. 1,
  152. nil,
  153. nil,
  154. newTestRetryOpts(),
  155. "",
  156. )
  157. if err != nil {
  158. t.Fatal(err)
  159. }
  160. req, err := http.NewRequest(http.MethodPost, "", nil)
  161. if err != nil {
  162. t.Fatal(err)
  163. }
  164. // we just need to execute this once to see retries in effect
  165. res, body, _, err := client.Do(context.Background(), req)
  166. if res.StatusCode != 400 {
  167. t.Fatalf("400 StatusCode expected. Got: %d", res.StatusCode)
  168. }
  169. if string(body) != "Fail" {
  170. t.Fatalf("Expected 'fail' message body. Got: %s", string(body))
  171. }
  172. }
  173. func TestRateLimitedResponses(t *testing.T) {
  174. t.Parallel()
  175. dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
  176. // creates a prom client with hard coded responses for any requests that
  177. // are issued
  178. promClient := newMockPromClientWith([]*ResponseAndBody{
  179. newNormalRateLimitedResponse("2"),
  180. newNormalRateLimitedResponse(dateRetry),
  181. newHackyAmazonRateLimitedResponse(),
  182. newHackyAmazonRateLimitedResponse(),
  183. newNormalRateLimitedResponse("3"),
  184. })
  185. client, err := NewRateLimitedClient(
  186. "TestClient",
  187. promClient,
  188. 1,
  189. nil,
  190. nil,
  191. newTestRetryOpts(),
  192. "",
  193. )
  194. if err != nil {
  195. t.Fatal(err)
  196. }
  197. req, err := http.NewRequest(http.MethodPost, "", nil)
  198. if err != nil {
  199. t.Fatal(err)
  200. }
  201. // we just need to execute this once to see retries in effect
  202. _, _, _, err = client.Do(context.Background(), req)
  203. if err == nil {
  204. t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
  205. }
  206. rateLimitErr, ok := err.(*RateLimitedResponseError)
  207. if !ok {
  208. t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
  209. }
  210. t.Logf("%s\n", rateLimitErr.Error())
  211. // RateLimitedResponseStatus checks just ensure that wait times were close configuration
  212. rateLimitRetries := rateLimitErr.RateLimitStatus
  213. if len(rateLimitRetries) != 5 {
  214. t.Fatalf("Expected 5 retries. Got: %d", len(rateLimitRetries))
  215. }
  216. // check 2s wait after
  217. seconds := rateLimitRetries[0].WaitTime.Seconds()
  218. if !util.IsApproximately(seconds, 2.0) {
  219. t.Fatalf("Expected 2.0 seconds. Got %.2f", seconds)
  220. }
  221. // check to see if fuzzed wait time for datetime parsing
  222. seconds = rateLimitRetries[1].WaitTime.Seconds()
  223. if math.Abs(seconds-2.0) > 3.0 {
  224. t.Fatalf("Expected delta between 2s and resulting wait time to be within 3s. Seconds: %.2f, Delta: %.2f", seconds, math.Abs(seconds-2.0))
  225. }
  226. // check 1s wait
  227. seconds = rateLimitRetries[2].WaitTime.Seconds()
  228. if !util.IsApproximately(seconds, 0.4) {
  229. t.Fatalf("Expected 0.4 seconds. Got %.2f", seconds)
  230. }
  231. // check 1s wait
  232. seconds = rateLimitRetries[3].WaitTime.Seconds()
  233. if !util.IsApproximately(seconds, 0.8) {
  234. t.Fatalf("Expected 0.8 seconds. Got %.2f", seconds)
  235. }
  236. // check 3s wait
  237. seconds = rateLimitRetries[4].WaitTime.Seconds()
  238. if !util.IsApproximately(seconds, 3.0) {
  239. t.Fatalf("Expected 3.0 seconds. Got %.2f", seconds)
  240. }
  241. }
  242. //
  243. func AssertDurationEqual(t *testing.T, expected, actual time.Duration) {
  244. if actual != expected {
  245. t.Fatalf("Expected: %dms, Got: %dms", expected.Milliseconds(), actual.Milliseconds())
  246. }
  247. }
  248. func TestExponentialBackOff(t *testing.T) {
  249. var ExpectedResults = []time.Duration{
  250. 100 * time.Millisecond,
  251. 200 * time.Millisecond,
  252. 400 * time.Millisecond,
  253. 800 * time.Millisecond,
  254. 1600 * time.Millisecond,
  255. }
  256. w := 100 * time.Millisecond
  257. for retry := 0; retry < 5; retry++ {
  258. AssertDurationEqual(t, ExpectedResults[retry], httputil.ExponentialBackoffWaitFor(w, retry))
  259. }
  260. }
  261. func TestConcurrentRateLimiting(t *testing.T) {
  262. t.Parallel()
  263. // Set QueryConcurrency to 3 here, then add a few for total requests
  264. const QueryConcurrency = 3
  265. const TotalRequests = QueryConcurrency + 2
  266. dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
  267. // creates a prom client with hard coded responses for any requests that
  268. // are issued
  269. promClient := newMockPromClientWith([]*ResponseAndBody{
  270. newNormalRateLimitedResponse("2"),
  271. newNormalRateLimitedResponse(dateRetry),
  272. newHackyAmazonRateLimitedResponse(),
  273. newHackyAmazonRateLimitedResponse(),
  274. newNormalRateLimitedResponse("3"),
  275. })
  276. client, err := NewRateLimitedClient(
  277. "TestClient",
  278. promClient,
  279. QueryConcurrency,
  280. nil,
  281. nil,
  282. newTestRetryOpts(),
  283. "",
  284. )
  285. if err != nil {
  286. t.Fatal(err)
  287. }
  288. errs := make(chan error, TotalRequests)
  289. for i := 0; i < TotalRequests; i++ {
  290. go func() {
  291. req, err := http.NewRequest(http.MethodPost, "", nil)
  292. if err != nil {
  293. errs <- err
  294. return
  295. }
  296. // we just need to execute this once to see retries in effect
  297. _, _, _, err = client.Do(context.Background(), req)
  298. errs <- err
  299. }()
  300. }
  301. for i := 0; i < TotalRequests; i++ {
  302. err := <-errs
  303. if err == nil {
  304. t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
  305. }
  306. rateLimitErr, ok := err.(*RateLimitedResponseError)
  307. if !ok {
  308. t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
  309. }
  310. t.Logf("%s\n", rateLimitErr.Error())
  311. }
  312. }