metrics.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752
  1. package prometheus
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "sort"
  8. "strings"
  9. "github.com/porter-dev/porter/internal/telemetry"
  10. v1 "k8s.io/api/core/v1"
  11. "k8s.io/client-go/kubernetes"
  12. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  13. )
// ListNGINXIngressesResponse is a list of SimpleIngress entries (name and
// namespace pairs).
type ListNGINXIngressesResponse []SimpleIngress
// GetPodMetricsRequest carries the options for a metrics query. It embeds
// QueryOpts and adds no fields of its own.
type GetPodMetricsRequest struct {
	QueryOpts
}
  18. // GetPrometheusService returns the prometheus service name. The prometheus-community/prometheus chart @ v15.5.3 uses non-FQDN labels, unlike v22.6.2. This function checks for both labels.
  19. func GetPrometheusService(clientset kubernetes.Interface) (*v1.Service, bool, error) {
  20. redundantServices, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{
  21. LabelSelector: "app=prometheus,component=server,heritage=Helm",
  22. })
  23. if err != nil {
  24. return nil, false, err
  25. }
  26. upgradedServices, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{
  27. LabelSelector: "app.kubernetes.io/component=server,app.kubernetes.io/instance=prometheus,app.kubernetes.io/managed-by=Helm",
  28. })
  29. if err != nil {
  30. return nil, false, err
  31. }
  32. if len(redundantServices.Items) > 0 {
  33. return &redundantServices.Items[0], true, nil
  34. }
  35. if len(upgradedServices.Items) > 0 {
  36. return &upgradedServices.Items[0], true, nil
  37. }
  38. return nil, false, err
  39. }
  40. // getKubeStateMetricsService returns the prometheus service name
  41. func getKubeStateMetricsService(clientset kubernetes.Interface) (*v1.Service, bool, error) {
  42. services, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{
  43. LabelSelector: "app.kubernetes.io/name=kube-state-metrics",
  44. })
  45. if err != nil {
  46. return nil, false, err
  47. }
  48. if len(services.Items) == 0 {
  49. return nil, false, nil
  50. }
  51. return &services.Items[0], true, nil
  52. }
// SimpleIngress identifies an ingress by its name and namespace.
type SimpleIngress struct {
	// Name is the ingress object's name.
	Name string `json:"name"`
	// Namespace is the namespace the ingress lives in.
	Namespace string `json:"namespace"`
}
  57. // GetIngressesWithNGINXAnnotation gets an array of names for all ingresses controlled by
  58. // NGINX
  59. func GetIngressesWithNGINXAnnotation(clientset kubernetes.Interface) ([]SimpleIngress, error) {
  60. res := make([]SimpleIngress, 0)
  61. foundMap := make(map[string]bool)
  62. v1beta1IngressList, v1beta1Err := clientset.NetworkingV1beta1().Ingresses("").List(context.TODO(), metav1.ListOptions{})
  63. v1IngressList, v1Err := clientset.NetworkingV1().Ingresses("").List(context.TODO(), metav1.ListOptions{})
  64. if v1beta1Err != nil && v1Err != nil {
  65. return nil, fmt.Errorf("List ingresses error: %s, %s", v1beta1Err.Error(), v1Err.Error())
  66. }
  67. if v1beta1Err == nil && len(v1beta1IngressList.Items) > 0 {
  68. for _, ingress := range v1beta1IngressList.Items {
  69. ingressAnn, found := ingress.ObjectMeta.Annotations["kubernetes.io/ingress.class"]
  70. uid := fmt.Sprintf("%s/%s", ingress.ObjectMeta.Namespace, ingress.ObjectMeta.Name)
  71. if _, exists := foundMap[uid]; !exists && ((found && ingressAnn == "nginx") || *ingress.Spec.IngressClassName == "nginx") {
  72. res = append(res, SimpleIngress{
  73. Name: ingress.ObjectMeta.Name,
  74. Namespace: ingress.ObjectMeta.Namespace,
  75. })
  76. foundMap[uid] = true
  77. }
  78. }
  79. }
  80. if v1Err == nil && len(v1IngressList.Items) > 0 {
  81. for _, ingress := range v1IngressList.Items {
  82. ingressAnn, found := ingress.ObjectMeta.Annotations["kubernetes.io/ingress.class"]
  83. uid := fmt.Sprintf("%s/%s", ingress.ObjectMeta.Namespace, ingress.ObjectMeta.Name)
  84. if _, exists := foundMap[uid]; !exists && ((found && ingressAnn == "nginx") || *ingress.Spec.IngressClassName == "nginx") {
  85. res = append(res, SimpleIngress{
  86. Name: ingress.ObjectMeta.Name,
  87. Namespace: ingress.ObjectMeta.Namespace,
  88. })
  89. foundMap[uid] = true
  90. }
  91. }
  92. }
  93. return res, nil
  94. }
// QueryOpts holds the options used to build and run a Prometheus range query.
type QueryOpts struct {
	// the name of the metric being queried for
	Metric string `schema:"metric"`
	// whether the generated query should be wrapped in sum(...)
	ShouldSum bool `schema:"shouldsum"`
	// the kind of the queried controller (e.g. deployment, statefulset, ingress)
	Kind string `schema:"kind"`
	// explicit pod names to match; when empty, pods are matched by a regex
	// derived from Kind and Name
	PodList []string `schema:"pods"`
	// the name of the queried controller
	Name string `schema:"name"`
	// the namespace of the queried controller
	Namespace string `schema:"namespace"`
	// start time (in unix timestamp) for prometheus results
	StartRange uint `schema:"startrange"`
	// end time (in unix timestamp) for prometheus results
	EndRange uint `schema:"endrange"`
	// the query resolution, sent to prometheus as the "step" parameter
	Resolution string `schema:"resolution"`
	// the quantile passed to histogram_quantile for latency histogram queries
	Percentile float64 `schema:"percentile"`
}
  110. func QueryPrometheus(
  111. ctx context.Context,
  112. clientset kubernetes.Interface,
  113. service *v1.Service,
  114. opts *QueryOpts,
  115. ) ([]*promParsedSingletonQuery, error) {
  116. ctx, span := telemetry.NewSpan(ctx, "query-prometheus")
  117. defer span.End()
  118. telemetry.WithAttributes(span,
  119. telemetry.AttributeKV{Key: "metric", Value: opts.Metric},
  120. telemetry.AttributeKV{Key: "should-sum", Value: opts.ShouldSum},
  121. telemetry.AttributeKV{Key: "kind", Value: opts.Kind},
  122. telemetry.AttributeKV{Key: "pod-list", Value: strings.Join(opts.PodList, ",")},
  123. telemetry.AttributeKV{Key: "name", Value: opts.Name},
  124. telemetry.AttributeKV{Key: "namespace", Value: opts.Namespace},
  125. telemetry.AttributeKV{Key: "start-range", Value: opts.StartRange},
  126. telemetry.AttributeKV{Key: "end-range", Value: opts.EndRange},
  127. telemetry.AttributeKV{Key: "range", Value: opts.EndRange - opts.StartRange},
  128. telemetry.AttributeKV{Key: "resolution", Value: opts.Resolution},
  129. telemetry.AttributeKV{Key: "percentile", Value: opts.Percentile},
  130. )
  131. if len(service.Spec.Ports) == 0 {
  132. return nil, telemetry.Error(ctx, span, nil, "prometheus service has no exposed ports to query")
  133. }
  134. selectionRegex, err := getSelectionRegex(opts.Kind, opts.Name)
  135. if err != nil {
  136. return nil, telemetry.Error(ctx, span, err, "failed to get selection regex")
  137. }
  138. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "selection-regex", Value: selectionRegex})
  139. var podSelector string
  140. if len(opts.PodList) > 0 {
  141. podSelector = fmt.Sprintf(`namespace="%s",pod=~"%s",container!="POD",container!=""`, opts.Namespace, strings.Join(opts.PodList, "|"))
  142. } else {
  143. podSelector = fmt.Sprintf(`namespace="%s",pod=~"%s",container!="POD",container!=""`, opts.Namespace, selectionRegex)
  144. }
  145. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "pod-selector", Value: podSelector})
  146. query := ""
  147. if opts.Metric == "cpu" {
  148. query = fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[5m])", podSelector)
  149. } else if opts.Metric == "memory" {
  150. query = fmt.Sprintf("container_memory_usage_bytes{%s}", podSelector)
  151. } else if opts.Metric == "network" {
  152. netPodSelector := fmt.Sprintf(`namespace="%s",pod=~"%s"`, opts.Namespace, selectionRegex)
  153. query = fmt.Sprintf("rate(container_network_receive_bytes_total{%s}[5m])", netPodSelector)
  154. } else if opts.Metric == "nginx:errors" {
  155. num := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_requests{status=~"5.*",exported_namespace="%s",ingress=~"%s"}[5m]) OR sum(rate(nginx_ingress_controller_requests{status=~"5.*",namespace="%s",ingress=~"%s"}[5m])) OR on() vector(0))`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  156. denom := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_requests{exported_namespace="%s",ingress=~"%s"}[5m]) OR sum(rate(nginx_ingress_controller_requests{namespace="%s",ingress=~"%s"}[5m])) > 0)`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  157. query = fmt.Sprintf(`%s / %s * 100 OR on() vector(0)`, num, denom)
  158. } else if opts.Metric == "nginx:latency" {
  159. num := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_request_duration_seconds_sum{exported_namespace=~"%s",ingress=~"%s"}[5m]) OR sum(rate(nginx_ingress_controller_request_duration_seconds_sum{namespace=~"%s",ingress=~"%s"}[5m])) OR on() vector(0))`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  160. denom := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_request_duration_seconds_count{exported_namespace=~"%s",ingress=~"%s"}[5m])) OR sum(rate(nginx_ingress_controller_request_duration_seconds_count{namespace=~"%s",ingress=~"%s"}[5m])))`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  161. query = fmt.Sprintf(`%s / %s OR on() vector(0)`, num, denom)
  162. } else if opts.Metric == "nginx:latency-histogram" {
  163. query = fmt.Sprintf(`histogram_quantile(%f, (sum(rate(nginx_ingress_controller_request_duration_seconds_bucket{status!="404",status!="500",exported_namespace=~"%s",ingress=~"%s"}[5m])) OR sum(rate(nginx_ingress_controller_request_duration_seconds_bucket{status!="404",status!="500",namespace=~"%s",ingress=~"%s"}[5m]))) by (le, ingress))`, opts.Percentile, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  164. } else if opts.Metric == "nginx:status" {
  165. query, err = getNginxStatusQuery(opts, selectionRegex)
  166. if err != nil {
  167. return nil, telemetry.Error(ctx, span, err, "failed to get nginx status query")
  168. }
  169. } else if opts.Metric == "cpu_hpa_threshold" {
  170. // get the name of the kube hpa metric
  171. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "spec_target_metric")
  172. cpuMetricName := getKubeCPUMetricName(clientset, service, opts)
  173. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  174. appLabel := ""
  175. if found {
  176. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  177. }
  178. query = createHPAAbsoluteCPUThresholdQuery(cpuMetricName, metricName, selectionRegex, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  179. } else if opts.Metric == "memory_hpa_threshold" {
  180. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "spec_target_metric")
  181. memMetricName := getKubeMemoryMetricName(clientset, service, opts)
  182. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  183. appLabel := ""
  184. if found {
  185. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  186. }
  187. query = createHPAAbsoluteMemoryThresholdQuery(memMetricName, metricName, selectionRegex, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  188. } else if opts.Metric == "hpa_replicas" {
  189. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "status_current_replicas")
  190. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  191. appLabel := ""
  192. if found {
  193. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  194. }
  195. query = createHPACurrentReplicasQuery(metricName, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  196. } else if opts.Metric == "replicas" {
  197. query = fmt.Sprintf(`kube_deployment_status_replicas{deployment="%s"}`, opts.Name)
  198. }
  199. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "query", Value: query})
  200. if opts.ShouldSum {
  201. query = fmt.Sprintf("sum(%s)", query)
  202. }
  203. queryParams := map[string]string{
  204. "query": query,
  205. "start": fmt.Sprintf("%d", opts.StartRange),
  206. "end": fmt.Sprintf("%d", opts.EndRange),
  207. "step": opts.Resolution,
  208. }
  209. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  210. "http",
  211. service.Name,
  212. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  213. "/api/v1/query_range",
  214. queryParams,
  215. )
  216. rawQuery, err := resp.DoRaw(ctx)
  217. if err != nil {
  218. // in this case, it's very likely that prometheus doesn't contain any data for the given labels
  219. if strings.Contains(err.Error(), "rejected our request for an unknown reason") {
  220. return []*promParsedSingletonQuery{}, nil
  221. }
  222. return nil, telemetry.Error(ctx, span, err, "failed to get raw query")
  223. }
  224. parsedQuery, err := parseQuery(rawQuery, opts.Metric)
  225. if err != nil {
  226. return nil, telemetry.Error(ctx, span, err, "failed to parse query")
  227. }
  228. return parsedQuery, nil
  229. }
  230. func getNginxStatusQuery(opts *QueryOpts, selectionRegex string) (string, error) {
  231. var queries []string
  232. // we recently changed the way labels are read into prometheus, which has removed the 'exported_' prepended to certain labels
  233. namespaceLabels := []string{"exported_namespace", "namespace"}
  234. for _, namespaceLabel := range namespaceLabels {
  235. queries = append(queries, fmt.Sprintf(`round(sum by (status_code, ingress)(label_replace(increase(nginx_ingress_controller_requests{%s=~"%s",ingress="%s",service="%s"}[2m]), "status_code", "${1}xx", "status", "(.)..")), 0.001)`, namespaceLabel, opts.Namespace, selectionRegex, opts.Name))
  236. }
  237. query := strings.Join(queries, " or ")
  238. return query, nil
  239. }
// promRawQuery is the subset of a prometheus query response body that this
// package decodes.
type promRawQuery struct {
	Data struct {
		// Result holds one entry per returned series.
		Result []struct {
			// Metric holds the series labels that are read by the parsers.
			Metric struct {
				Pod        string `json:"pod,omitempty"`
				StatusCode string `json:"status_code,omitempty"`
			} `json:"metric,omitempty"`
			// Values holds the samples of the series. The parsers read
			// element 0 of each sample as the date and element 1 as the value.
			Values [][]interface{} `json:"values"`
		} `json:"result"`
	} `json:"data"`
}
// promParsedSingletonQueryResult is a single parsed sample. Date is always
// set; of the remaining fields, only those relevant to the queried metric are
// populated and the rest are omitted from the JSON output.
type promParsedSingletonQueryResult struct {
	// Date is the first element of the raw sample.
	Date interface{} `json:"date,omitempty"`
	// CPU is set for the "cpu" and "cpu_hpa_threshold" metrics.
	CPU interface{} `json:"cpu,omitempty"`
	// Replicas is set for the "replicas" and "hpa_replicas" metrics.
	Replicas interface{} `json:"replicas,omitempty"`
	// Memory is set for the "memory" and "memory_hpa_threshold" metrics.
	Memory interface{} `json:"memory,omitempty"`
	// Bytes is set for the "network" metric.
	Bytes interface{} `json:"bytes,omitempty"`
	// ErrorPct is set for the "nginx:errors" metric.
	ErrorPct interface{} `json:"error_pct,omitempty"`
	// Latency is set for the "nginx:latency" and "nginx:latency-histogram" metrics.
	Latency interface{} `json:"latency,omitempty"`
	// StatusCode1xx through StatusCode5xx are set for the "nginx:status"
	// metric, one field per status class.
	StatusCode1xx interface{} `json:"1xx,omitempty"`
	StatusCode2xx interface{} `json:"2xx,omitempty"`
	StatusCode3xx interface{} `json:"3xx,omitempty"`
	StatusCode4xx interface{} `json:"4xx,omitempty"`
	StatusCode5xx interface{} `json:"5xx,omitempty"`
}
// promParsedSingletonQuery is the parsed form of one result series.
type promParsedSingletonQuery struct {
	// Pod is the series' "pod" label; it is empty when the series has none.
	Pod string `json:"pod,omitempty"`
	// Results holds the parsed samples of the series.
	Results []promParsedSingletonQueryResult `json:"results"`
}
  269. func parseQuery(rawQuery []byte, metric string) ([]*promParsedSingletonQuery, error) {
  270. if metric == "nginx:status" {
  271. return parseNginxStatusQuery(rawQuery)
  272. }
  273. rawQueryObj := &promRawQuery{}
  274. err := json.Unmarshal(rawQuery, rawQueryObj)
  275. if err != nil {
  276. return nil, err
  277. }
  278. res := make([]*promParsedSingletonQuery, 0)
  279. for _, result := range rawQueryObj.Data.Result {
  280. singleton := &promParsedSingletonQuery{
  281. Pod: result.Metric.Pod,
  282. }
  283. singletonResults := make([]promParsedSingletonQueryResult, 0)
  284. for _, values := range result.Values {
  285. singletonResult := &promParsedSingletonQueryResult{
  286. Date: values[0],
  287. }
  288. if metric == "cpu" {
  289. singletonResult.CPU = values[1]
  290. } else if metric == "memory" {
  291. singletonResult.Memory = values[1]
  292. } else if metric == "network" {
  293. singletonResult.Bytes = values[1]
  294. } else if metric == "nginx:errors" {
  295. singletonResult.ErrorPct = values[1]
  296. } else if metric == "cpu_hpa_threshold" {
  297. singletonResult.CPU = values[1]
  298. } else if metric == "memory_hpa_threshold" {
  299. singletonResult.Memory = values[1]
  300. } else if metric == "hpa_replicas" {
  301. singletonResult.Replicas = values[1]
  302. } else if metric == "nginx:latency" || metric == "nginx:latency-histogram" {
  303. singletonResult.Latency = values[1]
  304. } else if metric == "replicas" {
  305. singletonResult.Replicas = values[1]
  306. }
  307. singletonResults = append(singletonResults, *singletonResult)
  308. }
  309. singleton.Results = singletonResults
  310. res = append(res, singleton)
  311. }
  312. return res, nil
  313. }
  314. func parseNginxStatusQuery(rawQuery []byte) ([]*promParsedSingletonQuery, error) {
  315. rawQueryObj := &promRawQuery{}
  316. err := json.Unmarshal(rawQuery, rawQueryObj)
  317. if err != nil {
  318. return nil, err
  319. }
  320. singletonResultsByDate := make(map[string]*promParsedSingletonQueryResult, 0)
  321. keys := make([]string, 0)
  322. for _, result := range rawQueryObj.Data.Result {
  323. for _, values := range result.Values {
  324. date := values[0]
  325. dateKey := fmt.Sprintf("%v", date)
  326. if _, ok := singletonResultsByDate[dateKey]; !ok {
  327. keys = append(keys, dateKey)
  328. singletonResultsByDate[dateKey] = &promParsedSingletonQueryResult{
  329. Date: date,
  330. }
  331. }
  332. switch result.Metric.StatusCode {
  333. case "1xx":
  334. singletonResultsByDate[dateKey].StatusCode1xx = values[1]
  335. case "2xx":
  336. singletonResultsByDate[dateKey].StatusCode2xx = values[1]
  337. case "3xx":
  338. singletonResultsByDate[dateKey].StatusCode3xx = values[1]
  339. case "4xx":
  340. singletonResultsByDate[dateKey].StatusCode4xx = values[1]
  341. case "5xx":
  342. singletonResultsByDate[dateKey].StatusCode5xx = values[1]
  343. default:
  344. return nil, errors.New("invalid nginx status code")
  345. }
  346. }
  347. }
  348. sort.Strings(keys)
  349. singletonResults := make([]promParsedSingletonQueryResult, 0)
  350. for _, k := range keys {
  351. singletonResults = append(singletonResults, *singletonResultsByDate[k])
  352. }
  353. singleton := &promParsedSingletonQuery{
  354. Results: singletonResults,
  355. }
  356. res := make([]*promParsedSingletonQuery, 0)
  357. res = append(res, singleton)
  358. return res, nil
  359. }
  360. func getSelectionRegex(kind, name string) (string, error) {
  361. var suffix string
  362. switch strings.ToLower(kind) {
  363. case "deployment":
  364. suffix = "[a-z0-9]+(-[a-z0-9]+)*"
  365. case "statefulset":
  366. suffix = "[0-9]+"
  367. case "job":
  368. suffix = "[a-z0-9]+"
  369. case "cronjob":
  370. suffix = "[a-z0-9]+-[a-z0-9]+"
  371. case "ingress":
  372. return name, nil
  373. case "daemonset":
  374. suffix = "[a-z0-9]+"
  375. default:
  376. return "", fmt.Errorf("not a supported controller to query for metrics")
  377. }
  378. return fmt.Sprintf("%s-%s", name, suffix), nil
  379. }
  380. func createHPAAbsoluteCPUThresholdQuery(cpuMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName string) string {
  381. kubeMetricsPodSelectorOne := getKubeMetricsPodSelector(podSelectionRegex, namespace, "namespace")
  382. kubeMetricsPodSelectorTwo := getKubeMetricsPodSelector(podSelectionRegex, namespace, "exported_namespace")
  383. kubeMetricsHPASelectorOne := fmt.Sprintf(
  384. `%s="%s",namespace="%s",metric_name="cpu",metric_target_type="utilization"`,
  385. hpaMetricName,
  386. hpaName,
  387. namespace,
  388. )
  389. kubeMetricsHPASelectorTwo := fmt.Sprintf(
  390. `%s="%s",exported_namespace="%s",metric_name="cpu",metric_target_type="utilization"`,
  391. hpaMetricName,
  392. hpaName,
  393. namespace,
  394. )
  395. if cpuMetricName == "kube_pod_container_resource_requests" {
  396. kubeMetricsPodSelectorOne += `,resource="cpu",unit="core"`
  397. kubeMetricsPodSelectorTwo += `,resource="cpu",unit="core"`
  398. }
  399. // the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
  400. // as well
  401. if appLabel != "" {
  402. kubeMetricsPodSelectorOne += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  403. kubeMetricsPodSelectorTwo += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  404. kubeMetricsHPASelectorOne += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  405. kubeMetricsHPASelectorTwo += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  406. }
  407. requestCPUOne := fmt.Sprintf(
  408. `avg by (%s) (label_replace(%s{%s},"%s", "%s", "", ""))`,
  409. hpaMetricName,
  410. cpuMetricName,
  411. kubeMetricsPodSelectorOne,
  412. hpaMetricName,
  413. hpaName,
  414. )
  415. targetCPUUtilThresholdOne := fmt.Sprintf(
  416. `%s{%s} / 100`,
  417. metricName,
  418. kubeMetricsHPASelectorOne,
  419. )
  420. requestCPUTwo := fmt.Sprintf(
  421. `avg by (%s) (label_replace(%s{%s},"%s", "%s", "", ""))`,
  422. hpaMetricName,
  423. cpuMetricName,
  424. kubeMetricsPodSelectorTwo,
  425. hpaMetricName,
  426. hpaName,
  427. )
  428. targetCPUUtilThresholdTwo := fmt.Sprintf(
  429. `%s{%s} / 100`,
  430. metricName,
  431. kubeMetricsHPASelectorTwo,
  432. )
  433. return fmt.Sprintf(
  434. `(%s * on(%s) %s) or (%s * on(%s) %s)`,
  435. requestCPUOne, hpaMetricName, targetCPUUtilThresholdOne,
  436. requestCPUTwo, hpaMetricName, targetCPUUtilThresholdTwo,
  437. )
  438. }
  439. func createHPAAbsoluteMemoryThresholdQuery(memMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName string) string {
  440. kubeMetricsPodSelectorOne := getKubeMetricsPodSelector(podSelectionRegex, namespace, "namespace")
  441. kubeMetricsPodSelectorTwo := getKubeMetricsPodSelector(podSelectionRegex, namespace, "exported_namespace")
  442. kubeMetricsHPASelectorOne := fmt.Sprintf(
  443. `%s="%s",namespace="%s",metric_name="memory",metric_target_type="utilization"`,
  444. hpaMetricName,
  445. hpaName,
  446. namespace,
  447. )
  448. kubeMetricsHPASelectorTwo := fmt.Sprintf(
  449. `%s="%s",exported_namespace="%s",metric_name="memory",metric_target_type="utilization"`,
  450. hpaMetricName,
  451. hpaName,
  452. namespace,
  453. )
  454. if memMetricName == "kube_pod_container_resource_requests" {
  455. kubeMetricsPodSelectorOne += `,resource="memory",unit="byte"`
  456. kubeMetricsPodSelectorTwo += `,resource="memory",unit="byte"`
  457. }
  458. // the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
  459. // as well
  460. if appLabel != "" {
  461. kubeMetricsPodSelectorOne += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  462. kubeMetricsPodSelectorTwo += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  463. kubeMetricsHPASelectorOne += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  464. kubeMetricsHPASelectorTwo += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  465. }
  466. requestMemOne := fmt.Sprintf(
  467. `avg by (%s) (label_replace(%s{%s},"%s", "%s", "", ""))`,
  468. hpaMetricName,
  469. memMetricName,
  470. kubeMetricsPodSelectorOne,
  471. hpaMetricName,
  472. hpaName,
  473. )
  474. targetMemUtilThresholdOne := fmt.Sprintf(
  475. `%s{%s} / 100`,
  476. metricName,
  477. kubeMetricsHPASelectorOne,
  478. )
  479. requestMemTwo := fmt.Sprintf(
  480. `avg by (%s) (label_replace(%s{%s},"%s", "%s", "", ""))`,
  481. hpaMetricName,
  482. memMetricName,
  483. kubeMetricsPodSelectorTwo,
  484. hpaMetricName,
  485. hpaName,
  486. )
  487. targetMemUtilThresholdTwo := fmt.Sprintf(
  488. `%s{%s} / 100`,
  489. metricName,
  490. kubeMetricsHPASelectorTwo,
  491. )
  492. return fmt.Sprintf(
  493. `(%s * on(%s) %s) or (%s * on(%s) %s)`,
  494. requestMemOne, hpaMetricName, targetMemUtilThresholdOne,
  495. requestMemTwo, hpaMetricName, targetMemUtilThresholdTwo,
  496. )
  497. }
  498. func getKubeMetricsPodSelector(podSelectionRegex, namespace, namespaceLabel string) string {
  499. return fmt.Sprintf(
  500. `pod=~"%s",%s="%s",container!="POD",container!=""`,
  501. podSelectionRegex,
  502. namespaceLabel,
  503. namespace,
  504. )
  505. }
  506. func createHPACurrentReplicasQuery(metricName, hpaName, namespace, appLabel, hpaMetricName string) string {
  507. kubeMetricsHPASelectorOne := fmt.Sprintf(
  508. `%s="%s",namespace="%s"`,
  509. hpaMetricName,
  510. hpaName,
  511. namespace,
  512. )
  513. kubeMetricsHPASelectorTwo := fmt.Sprintf(
  514. `%s="%s",exported_namespace="%s"`,
  515. hpaMetricName,
  516. hpaName,
  517. namespace,
  518. )
  519. // the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
  520. // as well
  521. if appLabel != "" {
  522. kubeMetricsHPASelectorOne += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  523. kubeMetricsHPASelectorTwo += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  524. }
  525. return fmt.Sprintf(
  526. `(%s{%s}) or (%s{%s})`,
  527. metricName,
  528. kubeMetricsHPASelectorOne,
  529. metricName,
  530. kubeMetricsHPASelectorTwo,
  531. )
  532. }
// promRawValuesQuery is the decoded body of a prometheus label-values response
// (/api/v1/label/<name>/values).
type promRawValuesQuery struct {
	// Status is compared against "success" by the callers.
	Status string `json:"status"`
	// Data holds the returned label values.
	Data []string `json:"data"`
}
  537. // getKubeHPAMetricName performs a "best guess" for the name of the kube HPA metric,
  538. // which was renamed to kube_horizontalpodautoscaler... in later versions of kube-state-metrics.
  539. // we query Prometheus for a list of metric names to see if any match the new query
  540. // value, otherwise we return the deprecated name.
  541. func getKubeHPAMetricName(
  542. clientset kubernetes.Interface,
  543. service *v1.Service,
  544. opts *QueryOpts,
  545. suffix string,
  546. ) (string, string) {
  547. queryParams := map[string]string{
  548. "match[]": fmt.Sprintf("kube_horizontalpodautoscaler_%s", suffix),
  549. "start": fmt.Sprintf("%d", opts.StartRange),
  550. "end": fmt.Sprintf("%d", opts.EndRange),
  551. }
  552. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  553. "http",
  554. service.Name,
  555. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  556. "/api/v1/label/__name__/values",
  557. queryParams,
  558. )
  559. rawQuery, err := resp.DoRaw(context.TODO())
  560. if err != nil {
  561. return fmt.Sprintf("kube_hpa_%s", suffix), "hpa"
  562. }
  563. rawQueryObj := &promRawValuesQuery{}
  564. json.Unmarshal(rawQuery, rawQueryObj)
  565. if rawQueryObj.Status == "success" && len(rawQueryObj.Data) == 1 {
  566. return fmt.Sprintf("kube_horizontalpodautoscaler_%s", suffix), "horizontalpodautoscaler"
  567. }
  568. return fmt.Sprintf("kube_hpa_%s", suffix), "hpa"
  569. }
  570. func getKubeCPUMetricName(
  571. clientset kubernetes.Interface,
  572. service *v1.Service,
  573. opts *QueryOpts,
  574. ) string {
  575. queryParams := map[string]string{
  576. "match[]": "kube_pod_container_resource_requests",
  577. "start": fmt.Sprintf("%d", opts.StartRange),
  578. "end": fmt.Sprintf("%d", opts.EndRange),
  579. }
  580. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  581. "http",
  582. service.Name,
  583. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  584. "/api/v1/label/__name__/values",
  585. queryParams,
  586. )
  587. rawQuery, err := resp.DoRaw(context.TODO())
  588. if err != nil {
  589. return "kube_pod_container_resource_requests_cpu_cores"
  590. }
  591. rawQueryObj := &promRawValuesQuery{}
  592. json.Unmarshal(rawQuery, rawQueryObj)
  593. if rawQueryObj.Status == "success" && len(rawQueryObj.Data) == 1 {
  594. return "kube_pod_container_resource_requests"
  595. }
  596. return "kube_pod_container_resource_requests_cpu_cores"
  597. }
  598. func getKubeMemoryMetricName(
  599. clientset kubernetes.Interface,
  600. service *v1.Service,
  601. opts *QueryOpts,
  602. ) string {
  603. queryParams := map[string]string{
  604. "match[]": "kube_pod_container_resource_requests",
  605. "start": fmt.Sprintf("%d", opts.StartRange),
  606. "end": fmt.Sprintf("%d", opts.EndRange),
  607. }
  608. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  609. "http",
  610. service.Name,
  611. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  612. "/api/v1/label/__name__/values",
  613. queryParams,
  614. )
  615. rawQuery, err := resp.DoRaw(context.TODO())
  616. if err != nil {
  617. return "kube_pod_container_resource_requests_memory_bytes"
  618. }
  619. rawQueryObj := &promRawValuesQuery{}
  620. json.Unmarshal(rawQuery, rawQueryObj)
  621. if rawQueryObj.Status == "success" && len(rawQueryObj.Data) == 1 {
  622. return "kube_pod_container_resource_requests"
  623. }
  624. return "kube_pod_container_resource_requests_memory_bytes"
  625. }