metrics.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. package prometheus
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. v1 "k8s.io/api/core/v1"
  8. "k8s.io/client-go/kubernetes"
  9. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  10. )
  11. // returns the prometheus service name
  12. func GetPrometheusService(clientset kubernetes.Interface) (*v1.Service, bool, error) {
  13. services, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{
  14. LabelSelector: "app=prometheus,component=server,heritage=Helm",
  15. })
  16. if err != nil {
  17. return nil, false, err
  18. }
  19. if len(services.Items) == 0 {
  20. return nil, false, nil
  21. }
  22. return &services.Items[0], true, nil
  23. }
  24. // returns the prometheus service name
  25. func getKubeStateMetricsService(clientset kubernetes.Interface) (*v1.Service, bool, error) {
  26. services, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{
  27. LabelSelector: "app.kubernetes.io/name=kube-state-metrics",
  28. })
  29. if err != nil {
  30. return nil, false, err
  31. }
  32. if len(services.Items) == 0 {
  33. return nil, false, nil
  34. }
  35. return &services.Items[0], true, nil
  36. }
  37. type SimpleIngress struct {
  38. Name string `json:"name"`
  39. Namespace string `json:"namespace"`
  40. }
  41. // GetIngressesWithNGINXAnnotation gets an array of names for all ingresses controlled by
  42. // NGINX
  43. func GetIngressesWithNGINXAnnotation(clientset kubernetes.Interface) ([]SimpleIngress, error) {
  44. ingressList, err := clientset.NetworkingV1beta1().Ingresses("").List(context.TODO(), metav1.ListOptions{})
  45. if err != nil {
  46. return nil, err
  47. }
  48. res := make([]SimpleIngress, 0)
  49. for _, ingress := range ingressList.Items {
  50. if ingressAnn, found := ingress.ObjectMeta.Annotations["kubernetes.io/ingress.class"]; found {
  51. if ingressAnn == "nginx" {
  52. res = append(res, SimpleIngress{
  53. Name: ingress.ObjectMeta.Name,
  54. Namespace: ingress.ObjectMeta.Namespace,
  55. })
  56. }
  57. }
  58. }
  59. return res, nil
  60. }
  61. type QueryOpts struct {
  62. Metric string `schema:"metric"`
  63. ShouldSum bool `schema:"shouldsum"`
  64. Kind string `schema:"kind"`
  65. PodList []string `schema:"pods"`
  66. Name string `schema:"name"`
  67. Namespace string `schema:"namespace"`
  68. StartRange uint `schema:"startrange"`
  69. EndRange uint `schema:"endrange"`
  70. Resolution string `schema:"resolution"`
  71. }
  72. func QueryPrometheus(
  73. clientset kubernetes.Interface,
  74. service *v1.Service,
  75. opts *QueryOpts,
  76. ) ([]byte, error) {
  77. if len(service.Spec.Ports) == 0 {
  78. return nil, fmt.Errorf("prometheus service has no exposed ports to query")
  79. }
  80. podSelectionRegex, err := getPodSelectionRegex(opts.Kind, opts.Name)
  81. if err != nil {
  82. return nil, err
  83. }
  84. var podSelector string
  85. if len(opts.PodList) > 0 {
  86. podSelector = fmt.Sprintf(`namespace="%s",pod=~"%s",container!="POD",container!=""`, opts.Namespace, strings.Join(opts.PodList, "|"))
  87. } else {
  88. podSelector = fmt.Sprintf(`namespace="%s",pod=~"%s",container!="POD",container!=""`, opts.Namespace, podSelectionRegex)
  89. }
  90. query := ""
  91. if opts.Metric == "cpu" {
  92. query = fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[5m])", podSelector)
  93. } else if opts.Metric == "memory" {
  94. query = fmt.Sprintf("container_memory_usage_bytes{%s}", podSelector)
  95. } else if opts.Metric == "network" {
  96. netPodSelector := fmt.Sprintf(`namespace="%s",pod=~"%s",container="POD"`, opts.Namespace, podSelectionRegex)
  97. query = fmt.Sprintf("rate(container_network_receive_bytes_total{%s}[5m])", netPodSelector)
  98. } else if opts.Metric == "nginx:errors" {
  99. num := fmt.Sprintf(`sum(rate(nginx_ingress_controller_requests{status=~"5.*",namespace="%s",ingress=~"%s"}[5m]) OR on() vector(0))`, opts.Namespace, podSelectionRegex)
  100. denom := fmt.Sprintf(`sum(rate(nginx_ingress_controller_requests{namespace="%s",ingress=~"%s"}[5m]) > 0)`, opts.Namespace, podSelectionRegex)
  101. query = fmt.Sprintf(`%s / %s * 100 OR on() vector(0)`, num, denom)
  102. } else if opts.Metric == "cpu_hpa_threshold" {
  103. // get the name of the kube hpa metric
  104. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "spec_target_metric")
  105. cpuMetricName := getKubeCPUMetricName(clientset, service, opts)
  106. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  107. appLabel := ""
  108. if found {
  109. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  110. }
  111. query = createHPAAbsoluteCPUThresholdQuery(cpuMetricName, metricName, podSelectionRegex, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  112. } else if opts.Metric == "memory_hpa_threshold" {
  113. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "spec_target_metric")
  114. memMetricName := getKubeMemoryMetricName(clientset, service, opts)
  115. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  116. appLabel := ""
  117. if found {
  118. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  119. }
  120. query = createHPAAbsoluteMemoryThresholdQuery(memMetricName, metricName, podSelectionRegex, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  121. } else if opts.Metric == "hpa_replicas" {
  122. metricName, hpaMetricName := getKubeHPAMetricName(clientset, service, opts, "status_current_replicas")
  123. ksmSvc, found, _ := getKubeStateMetricsService(clientset)
  124. appLabel := ""
  125. if found {
  126. appLabel = ksmSvc.ObjectMeta.Labels["app.kubernetes.io/instance"]
  127. }
  128. query = createHPACurrentReplicasQuery(metricName, opts.Name, opts.Namespace, appLabel, hpaMetricName)
  129. }
  130. if opts.ShouldSum {
  131. query = fmt.Sprintf("sum(%s)", query)
  132. }
  133. fmt.Println("QUERY IS", query)
  134. queryParams := map[string]string{
  135. "query": query,
  136. "start": fmt.Sprintf("%d", opts.StartRange),
  137. "end": fmt.Sprintf("%d", opts.EndRange),
  138. "step": opts.Resolution,
  139. }
  140. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  141. "http",
  142. service.Name,
  143. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  144. "/api/v1/query_range",
  145. queryParams,
  146. )
  147. rawQuery, err := resp.DoRaw(context.TODO())
  148. if err != nil {
  149. return nil, err
  150. }
  151. return parseQuery(rawQuery, opts.Metric)
  152. }
  153. type promRawQuery struct {
  154. Data struct {
  155. Result []struct {
  156. Metric struct {
  157. Pod string `json:"pod,omitempty"`
  158. } `json:"metric,omitempty"`
  159. Values [][]interface{} `json:"values"`
  160. } `json:"result"`
  161. } `json:"data"`
  162. }
  163. type promParsedSingletonQueryResult struct {
  164. Date interface{} `json:"date,omitempty"`
  165. CPU interface{} `json:"cpu,omitempty"`
  166. Replicas interface{} `json:"replicas,omitempty"`
  167. Memory interface{} `json:"memory,omitempty"`
  168. Bytes interface{} `json:"bytes,omitempty"`
  169. ErrorPct interface{} `json:"error_pct,omitempty"`
  170. }
  171. type promParsedSingletonQuery struct {
  172. Pod string `json:"pod,omitempty"`
  173. Results []promParsedSingletonQueryResult `json:"results"`
  174. }
  175. func parseQuery(rawQuery []byte, metric string) ([]byte, error) {
  176. rawQueryObj := &promRawQuery{}
  177. json.Unmarshal(rawQuery, rawQueryObj)
  178. res := make([]*promParsedSingletonQuery, 0)
  179. for _, result := range rawQueryObj.Data.Result {
  180. singleton := &promParsedSingletonQuery{
  181. Pod: result.Metric.Pod,
  182. }
  183. singletonResults := make([]promParsedSingletonQueryResult, 0)
  184. for _, values := range result.Values {
  185. singletonResult := &promParsedSingletonQueryResult{
  186. Date: values[0],
  187. }
  188. if metric == "cpu" {
  189. singletonResult.CPU = values[1]
  190. } else if metric == "memory" {
  191. singletonResult.Memory = values[1]
  192. } else if metric == "network" {
  193. singletonResult.Bytes = values[1]
  194. } else if metric == "nginx:errors" {
  195. singletonResult.ErrorPct = values[1]
  196. } else if metric == "cpu_hpa_threshold" {
  197. singletonResult.CPU = values[1]
  198. } else if metric == "memory_hpa_threshold" {
  199. singletonResult.Memory = values[1]
  200. } else if metric == "hpa_replicas" {
  201. singletonResult.Replicas = values[1]
  202. }
  203. singletonResults = append(singletonResults, *singletonResult)
  204. }
  205. singleton.Results = singletonResults
  206. res = append(res, singleton)
  207. }
  208. return json.Marshal(res)
  209. }
  210. func getPodSelectionRegex(kind, name string) (string, error) {
  211. var suffix string
  212. switch strings.ToLower(kind) {
  213. case "deployment":
  214. suffix = "[a-z0-9]+-[a-z0-9]+"
  215. case "statefulset":
  216. suffix = "[0-9]+"
  217. case "job":
  218. suffix = "[a-z0-9]+"
  219. case "cronjob":
  220. suffix = "[a-z0-9]+-[a-z0-9]+"
  221. default:
  222. return "", fmt.Errorf("not a supported controller to query for metrics")
  223. }
  224. return fmt.Sprintf("%s-%s", name, suffix), nil
  225. }
  226. func createHPAAbsoluteCPUThresholdQuery(cpuMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName string) string {
  227. kubeMetricsPodSelector := getKubeMetricsPodSelector(podSelectionRegex, namespace)
  228. kubeMetricsHPASelector := fmt.Sprintf(
  229. `%s="%s",namespace="%s",metric_name="cpu",metric_target_type="utilization"`,
  230. hpaMetricName,
  231. hpaName,
  232. namespace,
  233. )
  234. if cpuMetricName == "kube_pod_container_resource_requests" {
  235. kubeMetricsPodSelector += `,resource="cpu",unit="core"`
  236. }
  237. // the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
  238. // as well
  239. if appLabel != "" {
  240. kubeMetricsPodSelector += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  241. kubeMetricsHPASelector += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  242. }
  243. requestCPU := fmt.Sprintf(
  244. `sum by (%s) (label_replace(%s{%s},"%s", "%s", "", ""))`,
  245. hpaMetricName,
  246. cpuMetricName,
  247. kubeMetricsPodSelector,
  248. hpaMetricName,
  249. hpaName,
  250. )
  251. targetCPUUtilThreshold := fmt.Sprintf(
  252. `%s{%s} / 100`,
  253. metricName,
  254. kubeMetricsHPASelector,
  255. )
  256. return fmt.Sprintf(`%s * on(%s) %s`, requestCPU, hpaMetricName, targetCPUUtilThreshold)
  257. }
  258. func createHPAAbsoluteMemoryThresholdQuery(memMetricName, metricName, podSelectionRegex, hpaName, namespace, appLabel, hpaMetricName string) string {
  259. kubeMetricsPodSelector := getKubeMetricsPodSelector(podSelectionRegex, namespace)
  260. kubeMetricsHPASelector := fmt.Sprintf(
  261. `%s="%s",namespace="%s",metric_name="memory",metric_target_type="utilization"`,
  262. hpaMetricName,
  263. hpaName,
  264. namespace,
  265. )
  266. if memMetricName == "kube_pod_container_resource_requests" {
  267. kubeMetricsPodSelector += `,resource="memory",unit="byte"`
  268. }
  269. // the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
  270. // as well
  271. if appLabel != "" {
  272. kubeMetricsPodSelector += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  273. kubeMetricsHPASelector += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  274. }
  275. requestMem := fmt.Sprintf(
  276. `sum by (%s) (label_replace(%s{%s},"%s", "%s", "", ""))`,
  277. hpaMetricName,
  278. memMetricName,
  279. kubeMetricsPodSelector,
  280. hpaMetricName,
  281. hpaName,
  282. )
  283. targetMemUtilThreshold := fmt.Sprintf(
  284. `%s{%s} / 100`,
  285. metricName,
  286. kubeMetricsHPASelector,
  287. )
  288. return fmt.Sprintf(`%s * on(%s) %s`, requestMem, hpaMetricName, targetMemUtilThreshold)
  289. }
  290. func getKubeMetricsPodSelector(podSelectionRegex, namespace string) string {
  291. return fmt.Sprintf(
  292. `pod=~"%s",namespace="%s",container!="POD",container!=""`,
  293. podSelectionRegex,
  294. namespace,
  295. )
  296. }
  297. func createHPACurrentReplicasQuery(metricName, hpaName, namespace, appLabel, hpaMetricName string) string {
  298. kubeMetricsHPASelector := fmt.Sprintf(
  299. `%s="%s",namespace="%s"`,
  300. hpaMetricName,
  301. hpaName,
  302. namespace,
  303. )
  304. // the kube-state-metrics queries are less prone to error if the field app_kubernetes_io_instance is matched
  305. // as well
  306. if appLabel != "" {
  307. kubeMetricsHPASelector += fmt.Sprintf(`,app_kubernetes_io_instance="%s"`, appLabel)
  308. }
  309. return fmt.Sprintf(
  310. `%s{%s}`,
  311. metricName,
  312. kubeMetricsHPASelector,
  313. )
  314. }
  315. type promRawValuesQuery struct {
  316. Status string `json:"status"`
  317. Data []string `json:"data"`
  318. }
  319. // getKubeHPAMetricName performs a "best guess" for the name of the kube HPA metric,
  320. // which was renamed to kube_horizontalpodautoscaler... in later versions of kube-state-metrics.
  321. // we query Prometheus for a list of metric names to see if any match the new query
  322. // value, otherwise we return the deprecated name.
  323. func getKubeHPAMetricName(
  324. clientset kubernetes.Interface,
  325. service *v1.Service,
  326. opts *QueryOpts,
  327. suffix string,
  328. ) (string, string) {
  329. queryParams := map[string]string{
  330. "match[]": fmt.Sprintf("kube_horizontalpodautoscaler_%s", suffix),
  331. "start": fmt.Sprintf("%d", opts.StartRange),
  332. "end": fmt.Sprintf("%d", opts.EndRange),
  333. }
  334. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  335. "http",
  336. service.Name,
  337. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  338. "/api/v1/label/__name__/values",
  339. queryParams,
  340. )
  341. rawQuery, err := resp.DoRaw(context.TODO())
  342. if err != nil {
  343. return fmt.Sprintf("kube_hpa_%s", suffix), "hpa"
  344. }
  345. rawQueryObj := &promRawValuesQuery{}
  346. json.Unmarshal(rawQuery, rawQueryObj)
  347. if rawQueryObj.Status == "success" && len(rawQueryObj.Data) == 1 {
  348. return fmt.Sprintf("kube_horizontalpodautoscaler_%s", suffix), "horizontalpodautoscaler"
  349. }
  350. return fmt.Sprintf("kube_hpa_%s", suffix), "hpa"
  351. }
  352. func getKubeCPUMetricName(
  353. clientset kubernetes.Interface,
  354. service *v1.Service,
  355. opts *QueryOpts,
  356. ) string {
  357. queryParams := map[string]string{
  358. "match[]": "kube_pod_container_resource_requests",
  359. "start": fmt.Sprintf("%d", opts.StartRange),
  360. "end": fmt.Sprintf("%d", opts.EndRange),
  361. }
  362. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  363. "http",
  364. service.Name,
  365. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  366. "/api/v1/label/__name__/values",
  367. queryParams,
  368. )
  369. rawQuery, err := resp.DoRaw(context.TODO())
  370. if err != nil {
  371. return "kube_pod_container_resource_requests_cpu_cores"
  372. }
  373. rawQueryObj := &promRawValuesQuery{}
  374. json.Unmarshal(rawQuery, rawQueryObj)
  375. if rawQueryObj.Status == "success" && len(rawQueryObj.Data) == 1 {
  376. return "kube_pod_container_resource_requests"
  377. }
  378. return "kube_pod_container_resource_requests_cpu_cores"
  379. }
  380. func getKubeMemoryMetricName(
  381. clientset kubernetes.Interface,
  382. service *v1.Service,
  383. opts *QueryOpts,
  384. ) string {
  385. queryParams := map[string]string{
  386. "match[]": "kube_pod_container_resource_requests",
  387. "start": fmt.Sprintf("%d", opts.StartRange),
  388. "end": fmt.Sprintf("%d", opts.EndRange),
  389. }
  390. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  391. "http",
  392. service.Name,
  393. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  394. "/api/v1/label/__name__/values",
  395. queryParams,
  396. )
  397. rawQuery, err := resp.DoRaw(context.TODO())
  398. if err != nil {
  399. return "kube_pod_container_resource_requests_memory_bytes"
  400. }
  401. rawQueryObj := &promRawValuesQuery{}
  402. json.Unmarshal(rawQuery, rawQueryObj)
  403. if rawQueryObj.Status == "success" && len(rawQueryObj.Data) == 1 {
  404. return "kube_pod_container_resource_requests"
  405. }
  406. return "kube_pod_container_resource_requests_memory_bytes"
  407. }