metrics.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. package prometheus
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sort"
  7. "strings"
  8. "github.com/porter-dev/porter/internal/telemetry"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/client-go/kubernetes"
  11. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  12. )
  13. type ListNGINXIngressesResponse []SimpleIngress
  14. type GetPodMetricsRequest struct {
  15. QueryOpts
  16. }
  // GetPrometheusService looks up the Prometheus server service in the "monitoring" namespace.
// The prometheus-community/prometheus chart @ v15.5.3 labels its resources with non-FQDN labels
// (app=prometheus, component=server), unlike v22.6.2 which uses app.kubernetes.io/* labels, so
// both label sets are tried — the legacy one first.
//
// It returns the first matching service and true, or (nil, false, nil) when neither label set
// matches. The error is non-nil only when a List call fails. The second List call is skipped
// once the legacy selector has matched, so a failure there can no longer mask a found service.
func GetPrometheusService(clientset kubernetes.Interface) (*v1.Service, bool, error) {
	labelSelectors := []string{
		// prometheus-community/prometheus chart @ v15.5.3
		"app=prometheus,component=server,heritage=Helm",
		// prometheus-community/prometheus chart @ v22.6.2
		"app.kubernetes.io/component=server,app.kubernetes.io/instance=prometheus,app.kubernetes.io/managed-by=Helm",
	}

	for _, selector := range labelSelectors {
		services, err := clientset.CoreV1().Services("monitoring").List(context.TODO(), metav1.ListOptions{
			LabelSelector: selector,
		})
		if err != nil {
			return nil, false, err
		}
		if len(services.Items) > 0 {
			return &services.Items[0], true, nil
		}
	}

	return nil, false, nil
}
  // getKubeStateMetricsService returns the first service, across all namespaces, labelled
// app.kubernetes.io/name=kube-state-metrics. The boolean reports whether such a service was
// found; the error is non-nil only if listing services fails.
func getKubeStateMetricsService(clientset kubernetes.Interface) (*v1.Service, bool, error) {
	// An empty namespace lists services in every namespace.
	services, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{
		LabelSelector: "app.kubernetes.io/name=kube-state-metrics",
	})
	if err != nil {
		return nil, false, err
	}

	if len(services.Items) == 0 {
		return nil, false, nil
	}

	return &services.Items[0], true, nil
}
  // SimpleIngress identifies an ingress by name and namespace. GetIngressesWithNGINXAnnotation
// returns a list of these.
type SimpleIngress struct {
	// ingress name (metadata.name)
	Name string `json:"name"`
	// ingress namespace (metadata.namespace)
	Namespace string `json:"namespace"`
}
  // GetIngressesWithNGINXAnnotation lists ingresses in all namespaces and returns the
// namespace/name of every ingress controlled by NGINX. An ingress counts as NGINX-controlled
// if it carries the "kubernetes.io/ingress.class: nginx" annotation or sets
// spec.ingressClassName to "nginx".
//
// Both the networking.k8s.io/v1beta1 and v1 APIs are queried so that older and newer clusters
// are covered. An ingress visible through both APIs is reported only once. An error is
// returned only when both List calls fail.
func GetIngressesWithNGINXAnnotation(clientset kubernetes.Interface) ([]SimpleIngress, error) {
	res := make([]SimpleIngress, 0)
	seen := make(map[string]bool)

	// addIfNGINX records an ingress the first time it is seen, provided NGINX controls it.
	addIfNGINX := func(meta *metav1.ObjectMeta, ingressClassName *string) {
		uid := fmt.Sprintf("%s/%s", meta.Namespace, meta.Name)
		if seen[uid] || !isNGINXIngress(meta.Annotations, ingressClassName) {
			return
		}
		res = append(res, SimpleIngress{
			Name:      meta.Name,
			Namespace: meta.Namespace,
		})
		seen[uid] = true
	}

	v1beta1IngressList, v1beta1Err := clientset.NetworkingV1beta1().Ingresses("").List(context.TODO(), metav1.ListOptions{})
	v1IngressList, v1Err := clientset.NetworkingV1().Ingresses("").List(context.TODO(), metav1.ListOptions{})
	if v1beta1Err != nil && v1Err != nil {
		return nil, fmt.Errorf("List ingresses error: %s, %s", v1beta1Err.Error(), v1Err.Error())
	}

	if v1beta1Err == nil {
		for i := range v1beta1IngressList.Items {
			ingress := &v1beta1IngressList.Items[i]
			addIfNGINX(&ingress.ObjectMeta, ingress.Spec.IngressClassName)
		}
	}

	if v1Err == nil {
		for i := range v1IngressList.Items {
			ingress := &v1IngressList.Items[i]
			addIfNGINX(&ingress.ObjectMeta, ingress.Spec.IngressClassName)
		}
	}

	return res, nil
}

// isNGINXIngress reports whether an ingress with the given annotations and
// spec.ingressClassName is controlled by NGINX. ingressClassName is nil when the ingress does
// not set spec.ingressClassName; dereferencing it unguarded would panic, so in that case only
// the legacy annotation is consulted.
func isNGINXIngress(annotations map[string]string, ingressClassName *string) bool {
	if annotations["kubernetes.io/ingress.class"] == "nginx" {
		return true
	}
	return ingressClassName != nil && *ingressClassName == "nginx"
}
  // QueryOpts holds the parameters of a Prometheus metrics query built by QueryPrometheus.
// The schema tags name the request parameter each field is decoded from
// (NOTE(review): decoding happens outside this file — confirm).
type QueryOpts struct {
	// the name of the metric being queried for
	Metric string `schema:"metric"`
	// when true, QueryPrometheus wraps the built query in sum(...)
	ShouldSum bool `schema:"shouldsum"`
	// controller kind passed to getSelectionRegex (matched case-insensitively)
	Kind string `schema:"kind"`
	// explicit pod names; when non-empty they replace the kind/name-derived pod regex in the
	// container selector
	PodList []string `schema:"pods"`
	// name of the controller or ingress being queried
	Name string `schema:"name"`
	// namespace of the controller or ingress being queried
	Namespace string `schema:"namespace"`
	// start time (in unix timestamp) for prometheus results
	StartRange uint `schema:"startrange"`
	// end time (in unix timestamp) for prometheus results
	EndRange uint `schema:"endrange"`
	// range-query step, passed verbatim as the "step" query parameter
	Resolution string `schema:"resolution"`
	// quantile passed to histogram_quantile for the nginx:latency-histogram metric
	Percentile float64 `schema:"percentile"`
}
  109. func QueryPrometheus(
  110. ctx context.Context,
  111. clientset kubernetes.Interface,
  112. service *v1.Service,
  113. opts *QueryOpts,
  114. ) ([]*promParsedSingletonQuery, error) {
  115. ctx, span := telemetry.NewSpan(ctx, "query-prometheus")
  116. defer span.End()
  117. telemetry.WithAttributes(span,
  118. telemetry.AttributeKV{Key: "metric", Value: opts.Metric},
  119. telemetry.AttributeKV{Key: "should-sum", Value: opts.ShouldSum},
  120. telemetry.AttributeKV{Key: "kind", Value: opts.Kind},
  121. telemetry.AttributeKV{Key: "pod-list", Value: strings.Join(opts.PodList, ",")},
  122. telemetry.AttributeKV{Key: "name", Value: opts.Name},
  123. telemetry.AttributeKV{Key: "namespace", Value: opts.Namespace},
  124. telemetry.AttributeKV{Key: "start-range", Value: opts.StartRange},
  125. telemetry.AttributeKV{Key: "end-range", Value: opts.EndRange},
  126. telemetry.AttributeKV{Key: "range", Value: opts.EndRange - opts.StartRange},
  127. telemetry.AttributeKV{Key: "resolution", Value: opts.Resolution},
  128. telemetry.AttributeKV{Key: "percentile", Value: opts.Percentile},
  129. )
  130. if len(service.Spec.Ports) == 0 {
  131. return nil, telemetry.Error(ctx, span, nil, "prometheus service has no exposed ports to query")
  132. }
  133. selectionRegex, err := getSelectionRegex(opts.Kind, opts.Name)
  134. if err != nil {
  135. return nil, telemetry.Error(ctx, span, err, "failed to get selection regex")
  136. }
  137. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "selection-regex", Value: selectionRegex})
  138. var podSelector string
  139. if len(opts.PodList) > 0 {
  140. podSelector = fmt.Sprintf(`namespace="%s",pod=~"%s",container!="POD",container!=""`, opts.Namespace, strings.Join(opts.PodList, "|"))
  141. } else {
  142. podSelector = fmt.Sprintf(`namespace="%s",pod=~"%s",container!="POD",container!=""`, opts.Namespace, selectionRegex)
  143. }
  144. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "pod-selector", Value: podSelector})
  145. query := ""
  146. if opts.Metric == "cpu" {
  147. query = fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[5m])", podSelector)
  148. } else if opts.Metric == "memory" {
  149. query = fmt.Sprintf("container_memory_usage_bytes{%s}", podSelector)
  150. } else if opts.Metric == "network" {
  151. netPodSelector := fmt.Sprintf(`namespace="%s",pod=~"%s"`, opts.Namespace, selectionRegex)
  152. query = fmt.Sprintf("rate(container_network_receive_bytes_total{%s}[5m])", netPodSelector)
  153. } else if opts.Metric == "nginx:errors" {
  154. num := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_requests{status=~"5.*",exported_namespace="%s",ingress=~"%s"}[5m]) OR sum(rate(nginx_ingress_controller_requests{status=~"5.*",namespace="%s",ingress=~"%s"}[5m])) OR on() vector(0))`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  155. denom := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_requests{exported_namespace="%s",ingress=~"%s"}[5m]) OR sum(rate(nginx_ingress_controller_requests{namespace="%s",ingress=~"%s"}[5m])) > 0)`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  156. query = fmt.Sprintf(`%s / %s * 100 OR on() vector(0)`, num, denom)
  157. } else if opts.Metric == "nginx:latency" {
  158. num := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_request_duration_seconds_sum{exported_namespace=~"%s",ingress=~"%s"}[5m]) OR sum(rate(nginx_ingress_controller_request_duration_seconds_sum{namespace=~"%s",ingress=~"%s"}[5m])) OR on() vector(0))`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  159. denom := fmt.Sprintf(`(sum(rate(nginx_ingress_controller_request_duration_seconds_count{exported_namespace=~"%s",ingress=~"%s"}[5m])) OR sum(rate(nginx_ingress_controller_request_duration_seconds_count{namespace=~"%s",ingress=~"%s"}[5m])))`, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  160. query = fmt.Sprintf(`%s / %s OR on() vector(0)`, num, denom)
  161. } else if opts.Metric == "nginx:latency-histogram" {
  162. query = fmt.Sprintf(`histogram_quantile(%f, (sum(rate(nginx_ingress_controller_request_duration_seconds_bucket{status!="404",status!="500",exported_namespace=~"%s",ingress=~"%s"}[5m])) OR sum(rate(nginx_ingress_controller_request_duration_seconds_bucket{status!="404",status!="500",namespace=~"%s",ingress=~"%s"}[5m]))) by (le, ingress))`, opts.Percentile, opts.Namespace, selectionRegex, opts.Namespace, selectionRegex)
  163. } else if opts.Metric == "nginx:status" {
  164. query, err = getNginxStatusQuery(opts, selectionRegex)
  165. if err != nil {
  166. return nil, telemetry.Error(ctx, span, err, "failed to get nginx status query")
  167. }
  168. } else if opts.Metric == "cpu_hpa_threshold" {
  169. // get the name of the kube hpa metric
  170. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "spec_target_metric")
  171. cpuMetricName := getKubeCPUMetricName(clientset, service, opts)
  172. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  173. appLabel := ""
  174. if found {
  175. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  176. }
  177. query = createHPAAbsoluteCPUThresholdQuery(cpuMetricName, metricName, selectionRegex, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  178. } else if opts.Metric == "memory_hpa_threshold" {
  179. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "spec_target_metric")
  180. memMetricName := getKubeMemoryMetricName(clientset, service, opts)
  181. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  182. appLabel := ""
  183. if found {
  184. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  185. }
  186. query = createHPAAbsoluteMemoryThresholdQuery(memMetricName, metricName, selectionRegex, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  187. } else if opts.Metric == "hpa_replicas" {
  188. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "status_current_replicas")
  189. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  190. appLabel := ""
  191. if found {
  192. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  193. }
  194. query = createHPACurrentReplicasQuery(metricName, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  195. } else if opts.Metric == "replicas" {
  196. query = fmt.Sprintf(`kube_deployment_status_replicas{deployment="%s"}`, opts.Name)
  197. }
  198. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "query", Value: query})
  199. if opts.ShouldSum {
  200. query = fmt.Sprintf("sum(%s)", query)
  201. }
  202. queryParams := map[string]string{
  203. "query": query,
  204. "start": fmt.Sprintf("%d", opts.StartRange),
  205. "end": fmt.Sprintf("%d", opts.EndRange),
  206. "step": opts.Resolution,
  207. }
  208. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  209. "http",
  210. service.Name,
  211. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  212. "/api/v1/query_range",
  213. queryParams,
  214. )
  215. rawQuery, err := resp.DoRaw(ctx)
  216. if err != nil {
  217. // in this case, it's very likely that prometheus doesn't contain any data for the given labels
  218. if strings.Contains(err.Error(), "rejected our request for an unknown reason") {
  219. return []*promParsedSingletonQuery{}, nil
  220. }
  221. return nil, telemetry.Error(ctx, span, err, "failed to get raw query")
  222. }
  223. parsedQuery, err := parseQuery(ctx, rawQuery, opts.Metric)
  224. if err != nil {
  225. return nil, telemetry.Error(ctx, span, err, "failed to parse query")
  226. }
  227. return parsedQuery, nil
  228. }
  229. func getNginxStatusQuery(opts *QueryOpts, selectionRegex string) (string, error) {
  230. var queries []string
  231. // we recently changed the way labels are read into prometheus, which has removed the 'exported_' prepended to certain labels
  232. namespaceLabels := []string{"exported_namespace", "namespace"}
  233. for _, namespaceLabel := range namespaceLabels {
  234. queries = append(queries, fmt.Sprintf(`round(sum by (status_code, ingress)(label_replace(increase(nginx_ingress_controller_requests{%s=~"%s",ingress="%s",service="%s"}[2m]), "status_code", "${1}xx", "status", "(.)..")), 0.001)`, namespaceLabel, opts.Namespace, selectionRegex, opts.Name))
  235. }
  236. query := strings.Join(queries, " or ")
  237. return query, nil
  238. }
  type (
	// promRawQuery is the subset of a Prometheus /api/v1/query_range response that parseQuery
	// and parseNginxStatusQuery read. For each result series, it keeps the pod and status_code
	// labels and the samples. Samples are decoded generically; the parsers read element 0 as the
	// timestamp and element 1 as the value.
	promRawQuery struct {
		Data struct {
			Result []struct {
				Metric struct {
					Pod        string `json:"pod,omitempty"`
					StatusCode string `json:"status_code,omitempty"`
				} `json:"metric,omitempty"`
				Values [][]interface{} `json:"values"`
			} `json:"result"`
		} `json:"data"`
	}

	// promParsedSingletonQueryResult is one timestamped point of a parsed series. Date holds the
	// sample timestamp. Exactly which value field is set depends on the queried metric; unset
	// fields are omitted from the JSON output.
	promParsedSingletonQueryResult struct {
		Date          interface{} `json:"date,omitempty"`
		CPU           interface{} `json:"cpu,omitempty"`
		Replicas      interface{} `json:"replicas,omitempty"`
		Memory        interface{} `json:"memory,omitempty"`
		Bytes         interface{} `json:"bytes,omitempty"`
		ErrorPct      interface{} `json:"error_pct,omitempty"`
		Latency       interface{} `json:"latency,omitempty"`
		StatusCode0xx interface{} `json:"0xx,omitempty"`
		StatusCode1xx interface{} `json:"1xx,omitempty"`
		StatusCode2xx interface{} `json:"2xx,omitempty"`
		StatusCode3xx interface{} `json:"3xx,omitempty"`
		StatusCode4xx interface{} `json:"4xx,omitempty"`
		StatusCode5xx interface{} `json:"5xx,omitempty"`
	}

	// promParsedSingletonQuery is one parsed series: the pod label of the series (empty for
	// nginx:status, whose series are merged) and its points.
	promParsedSingletonQuery struct {
		Pod     string                           `json:"pod,omitempty"`
		Results []promParsedSingletonQueryResult `json:"results"`
	}
)
  // parseQuery decodes a Prometheus /api/v1/query_range response body into one
// promParsedSingletonQuery per returned series (keyed by the series' pod label).
//
// Each sample becomes a promParsedSingletonQueryResult. Date holds the sample's first element;
// the second element goes into the field that matches metric (CPU, Memory, Bytes, ErrorPct,
// Replicas or Latency). Both are kept exactly as encoding/json decoded them. nginx:status
// responses are delegated to parseNginxStatusQuery.
//
// An error is returned if the body cannot be decoded or a sample has fewer than two elements.
func parseQuery(ctx context.Context, rawQuery []byte, metric string) ([]*promParsedSingletonQuery, error) {
	ctx, span := telemetry.NewSpan(ctx, "parse-query")
	defer span.End()

	if metric == "nginx:status" {
		return parseNginxStatusQuery(ctx, rawQuery)
	}

	rawQueryObj := &promRawQuery{}
	if err := json.Unmarshal(rawQuery, rawQueryObj); err != nil {
		return nil, telemetry.Error(ctx, span, err, "failed to unmarshal raw query")
	}

	res := make([]*promParsedSingletonQuery, 0, len(rawQueryObj.Data.Result))
	for _, result := range rawQueryObj.Data.Result {
		singletonResults := make([]promParsedSingletonQueryResult, 0, len(result.Values))
		for _, values := range result.Values {
			// Guard the [timestamp, value] indexing below so a malformed sample yields an error
			// instead of a panic.
			if len(values) < 2 {
				return nil, telemetry.Error(ctx, span, nil, "unexpected sample format in prometheus response")
			}

			singletonResult := promParsedSingletonQueryResult{Date: values[0]}
			switch metric {
			case "cpu", "cpu_hpa_threshold":
				singletonResult.CPU = values[1]
			case "memory", "memory_hpa_threshold":
				singletonResult.Memory = values[1]
			case "network":
				singletonResult.Bytes = values[1]
			case "nginx:errors":
				singletonResult.ErrorPct = values[1]
			case "hpa_replicas", "replicas":
				singletonResult.Replicas = values[1]
			case "nginx:latency", "nginx:latency-histogram":
				singletonResult.Latency = values[1]
			}
			singletonResults = append(singletonResults, singletonResult)
		}

		res = append(res, &promParsedSingletonQuery{
			Pod:     result.Metric.Pod,
			Results: singletonResults,
		})
	}

	return res, nil
}
  // parseNginxStatusQuery decodes a query_range response for the nginx:status metric into a
// single promParsedSingletonQuery.
//
// Samples from all series are merged by timestamp. Each series carries a status_code label
// ("0xx".."5xx", produced by the label_replace in getNginxStatusQuery). A series' value at a
// given timestamp is stored in the matching StatusCodeNxx field of that timestamp's result.
// Results are returned in ascending numeric timestamp order.
//
// An error is returned in three cases: the body cannot be decoded, a sample has fewer than two
// elements, or a series has a status_code outside 0xx-5xx.
func parseNginxStatusQuery(ctx context.Context, rawQuery []byte) ([]*promParsedSingletonQuery, error) {
	ctx, span := telemetry.NewSpan(ctx, "parse-nginx-status-query")
	defer span.End()

	rawQueryObj := &promRawQuery{}
	if err := json.Unmarshal(rawQuery, rawQueryObj); err != nil {
		return nil, telemetry.Error(ctx, span, err, "failed to unmarshal raw query")
	}

	// byDate merges samples that share a timestamp; ordered holds each distinct timestamp once.
	byDate := make(map[string]*promParsedSingletonQueryResult)
	ordered := make([]*promParsedSingletonQueryResult, 0)
	for _, result := range rawQueryObj.Data.Result {
		for _, values := range result.Values {
			if len(values) < 2 {
				return nil, telemetry.Error(ctx, span, nil, "unexpected sample format in prometheus response")
			}

			date := values[0]
			dateKey := fmt.Sprintf("%v", date)
			entry, ok := byDate[dateKey]
			if !ok {
				entry = &promParsedSingletonQueryResult{Date: date}
				byDate[dateKey] = entry
				ordered = append(ordered, entry)
			}

			switch result.Metric.StatusCode {
			case "0xx":
				entry.StatusCode0xx = values[1]
			case "1xx":
				entry.StatusCode1xx = values[1]
			case "2xx":
				entry.StatusCode2xx = values[1]
			case "3xx":
				entry.StatusCode3xx = values[1]
			case "4xx":
				entry.StatusCode4xx = values[1]
			case "5xx":
				entry.StatusCode5xx = values[1]
			default:
				telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "status-code", Value: result.Metric.StatusCode})
				return nil, telemetry.Error(ctx, span, nil, "unknown status code")
			}
		}
	}

	// Sort by numeric timestamp, not by the %v keys. fmt renders float64 timestamps in exponent
	// form (e.g. 1690000010 -> "1.69000001e+09", 1690000015 -> "1.690000015e+09"), and those
	// strings do not sort lexicographically in chronological order.
	sort.SliceStable(ordered, func(i, j int) bool {
		return promTimestampLess(ordered[i].Date, ordered[j].Date)
	})

	singletonResults := make([]promParsedSingletonQueryResult, 0, len(ordered))
	for _, entry := range ordered {
		singletonResults = append(singletonResults, *entry)
	}

	return []*promParsedSingletonQuery{{Results: singletonResults}}, nil
}

// promTimestampLess orders two decoded sample timestamps. encoding/json decodes JSON numbers
// into float64 when the target is interface{}, so those are compared numerically. Any other
// pairing falls back to comparing the %v forms, which keeps the order deterministic.
func promTimestampLess(a, b interface{}) bool {
	af, aIsFloat := a.(float64)
	bf, bIsFloat := b.(float64)
	if aIsFloat && bIsFloat {
		return af < bf
	}
	return fmt.Sprintf("%v", a) < fmt.Sprintf("%v", b)
}
  367. func getSelectionRegex(kind, name string) (string, error) {
  368. var suffix string
  369. switch strings.ToLower(kind) {
  370. case "deployment":
  371. suffix = "[a-z0-9]+(-[a-z0-9]+)*"
  372. case "statefulset":
  373. suffix = "[0-9]+"
  374. case "job":
  375. suffix = "[a-z0-9]+"
  376. case "cronjob":
  377. suffix = "[a-z0-9]+-[a-z0-9]+"
  378. case "ingress":
  379. return name, nil
  380. case "daemonset":
  381. suffix = "[a-z0-9]+"
  382. default:
  383. return "", fmt.Errorf("not a supported controller to query for metrics")
  384. }
  385. return fmt.Sprintf("%s-%s", name, suffix), nil
  386. }
  // createHPAAbsoluteCPUThresholdQuery builds a PromQL expression for an HPA's CPU target in
// absolute terms. It is the average CPU request of the HPA's pods (cpuMetricName, filtered by
// podSelectionRegex) multiplied by the HPA's target utilization (metricName) divided by 100.
// See createHPAAbsoluteThresholdQuery for the shape of the expression.
func createHPAAbsoluteCPUThresholdQuery(cpuMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName string) string {
	return createHPAAbsoluteThresholdQuery("cpu", "core", cpuMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName)
}

// createHPAAbsoluteMemoryThresholdQuery is the memory counterpart of
// createHPAAbsoluteCPUThresholdQuery. It uses memMetricName for the pods' memory requests and
// the HPA's memory utilization target.
func createHPAAbsoluteMemoryThresholdQuery(memMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName string) string {
	return createHPAAbsoluteThresholdQuery("memory", "byte", memMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName)
}

// createHPAAbsoluteThresholdQuery builds the shared expression behind the CPU and memory
// threshold queries. Per namespace label variant ("namespace", then "exported_namespace") it
// produces
//
//	(avg by (H) (label_replace(requests{podSelector},"H", "hpaName", "", "")) * on(H) target{hpaSelector} / 100)
//
// where H is hpaMetricName, requests is requestsMetricName and target is metricName. The
// label_replace tags the averaged requests with the HPA name so that both sides join on H. The
// two variants are combined with "or".
//
// When requestsMetricName is the unified kube_pod_container_resource_requests metric, the pod
// selector also filters on resource/unit. When appLabel is non-empty, both selectors also match
// app_kubernetes_io_instance.
func createHPAAbsoluteThresholdQuery(resource, requestsUnit, requestsMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName string) string {
	// the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
	// as well
	appSelector := ""
	if appLabel != "" {
		appSelector = fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
	}

	subQuery := func(namespaceLabel string) string {
		podSelector := getKubeMetricsPodSelector(podSelectionRegex, namespace, namespaceLabel)
		if requestsMetricName == "kube_pod_container_resource_requests" {
			podSelector += fmt.Sprintf(`,resource="%s",unit="%s"`, resource, requestsUnit)
		}
		podSelector += appSelector

		hpaSelector := fmt.Sprintf(
			`%s="%s",%s="%s",metric_name="%s",metric_target_type="utilization"`,
			hpaMetricName, hpaName, namespaceLabel, namespace, resource,
		) + appSelector

		request := fmt.Sprintf(
			`avg by (%s) (label_replace(%s{%s},"%s", "%s", "", ""))`,
			hpaMetricName, requestsMetricName, podSelector, hpaMetricName, hpaName,
		)
		threshold := fmt.Sprintf(`%s{%s} / 100`, metricName, hpaSelector)

		return fmt.Sprintf(`(%s * on(%s) %s)`, request, hpaMetricName, threshold)
	}

	return subQuery("namespace") + " or " + subQuery("exported_namespace")
}

// getKubeMetricsPodSelector returns a label selector matching non-pause containers of pods
// whose name matches podSelectionRegex in the given namespace. namespaceLabel names the label
// carrying the namespace ("namespace" or "exported_namespace").
func getKubeMetricsPodSelector(podSelectionRegex, namespace, namespaceLabel string) string {
	return fmt.Sprintf(
		`pod=~"%s",%s="%s",container!="POD",container!=""`,
		podSelectionRegex,
		namespaceLabel,
		namespace,
	)
}
  513. func createHPACurrentReplicasQuery(metricName, hpaName, namespace, appLabel, hpaMetricName string) string {
  514. kubeMetricsHPASelectorOne := fmt.Sprintf(
  515. `%s="%s",namespace="%s"`,
  516. hpaMetricName,
  517. hpaName,
  518. namespace,
  519. )
  520. kubeMetricsHPASelectorTwo := fmt.Sprintf(
  521. `%s="%s",exported_namespace="%s"`,
  522. hpaMetricName,
  523. hpaName,
  524. namespace,
  525. )
  526. // the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
  527. // as well
  528. if appLabel != "" {
  529. kubeMetricsHPASelectorOne += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  530. kubeMetricsHPASelectorTwo += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  531. }
  532. return fmt.Sprintf(
  533. `(%s{%s}) or (%s{%s})`,
  534. metricName,
  535. kubeMetricsHPASelectorOne,
  536. metricName,
  537. kubeMetricsHPASelectorTwo,
  538. )
  539. }
  // promRawValuesQuery is the subset of a Prometheus /api/v1/label/__name__/values response read
// by the metric-name probes: the response status and the returned metric names.
type promRawValuesQuery struct {
	// response status; the probes only trust the data when this is "success"
	Status string `json:"status"`
	// metric names matching the probe's match[] selector
	Data []string `json:"data"`
}
  // getKubeHPAMetricName performs a "best guess" for the name of a kube-state-metrics HPA metric.
// The metric was renamed from kube_hpa_<suffix> to kube_horizontalpodautoscaler_<suffix> in
// later versions of kube-state-metrics. Prometheus is asked whether the new name exists in the
// query window; if it does, the new name is returned, otherwise the deprecated one.
//
// The second return value is the label that identifies the HPA on that metric
// ("horizontalpodautoscaler" or "hpa").
func getKubeHPAMetricName(
	clientset kubernetes.Interface,
	service *v1.Service,
	opts *QueryOpts,
	suffix string,
) (string, string) {
	currentName := fmt.Sprintf("kube_horizontalpodautoscaler_%s", suffix)
	if prometheusMetricNameExists(clientset, service, opts, currentName) {
		return currentName, "horizontalpodautoscaler"
	}
	return fmt.Sprintf("kube_hpa_%s", suffix), "hpa"
}

// getKubeCPUMetricName returns "kube_pod_container_resource_requests" if Prometheus has that
// metric in the query window. Otherwise it returns the legacy per-resource name
// "kube_pod_container_resource_requests_cpu_cores".
func getKubeCPUMetricName(
	clientset kubernetes.Interface,
	service *v1.Service,
	opts *QueryOpts,
) string {
	if prometheusMetricNameExists(clientset, service, opts, "kube_pod_container_resource_requests") {
		return "kube_pod_container_resource_requests"
	}
	return "kube_pod_container_resource_requests_cpu_cores"
}

// getKubeMemoryMetricName returns "kube_pod_container_resource_requests" if Prometheus has that
// metric in the query window. Otherwise it returns the legacy per-resource name
// "kube_pod_container_resource_requests_memory_bytes".
func getKubeMemoryMetricName(
	clientset kubernetes.Interface,
	service *v1.Service,
	opts *QueryOpts,
) string {
	if prometheusMetricNameExists(clientset, service, opts, "kube_pod_container_resource_requests") {
		return "kube_pod_container_resource_requests"
	}
	return "kube_pod_container_resource_requests_memory_bytes"
}

// prometheusMetricNameExists asks Prometheus, through the API server's service proxy, whether
// metricName has series between opts.StartRange and opts.EndRange. It queries
// /api/v1/label/__name__/values with match[]=metricName.
//
// Every failure is treated as "not found" so that callers fall back to the legacy metric name.
// This covers a service with no ports, a proxy error, an undecodable body and a non-success
// status.
func prometheusMetricNameExists(
	clientset kubernetes.Interface,
	service *v1.Service,
	opts *QueryOpts,
	metricName string,
) bool {
	if len(service.Spec.Ports) == 0 {
		return false
	}

	queryParams := map[string]string{
		"match[]": metricName,
		"start":   fmt.Sprintf("%d", opts.StartRange),
		"end":     fmt.Sprintf("%d", opts.EndRange),
	}

	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
		"http",
		service.Name,
		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
		"/api/v1/label/__name__/values",
		queryParams,
	)

	rawQuery, err := resp.DoRaw(context.TODO())
	if err != nil {
		return false
	}

	rawQueryObj := &promRawValuesQuery{}
	if err := json.Unmarshal(rawQuery, rawQueryObj); err != nil {
		return false
	}

	// match[] selects an exact name, so a hit returns exactly that one name.
	return rawQueryObj.Status == "success" && len(rawQueryObj.Data) == 1
}