metrics.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package prometheus
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. v1 "k8s.io/api/core/v1"
  8. "k8s.io/client-go/kubernetes"
  9. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  10. )
  11. // returns the prometheus service name
  12. func GetPrometheusService(clientset kubernetes.Interface) (*v1.Service, bool, error) {
  13. services, err := clientset.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{
  14. LabelSelector: "app=prometheus,component=server,heritage=Helm",
  15. })
  16. if err != nil {
  17. return nil, false, err
  18. }
  19. if len(services.Items) == 0 {
  20. return nil, false, nil
  21. }
  22. return &services.Items[0], true, nil
  23. }
  24. type QueryOpts struct {
  25. Metric string `schema:"metric"`
  26. ShouldSum bool `schema:"shouldsum"`
  27. PodList []string `schema:"pods"`
  28. Namespace string `schema:"namespace"`
  29. StartRange uint `schema:"startrange"`
  30. EndRange uint `schema:"endrange"`
  31. Resolution string `schema:"resolution"`
  32. }
  33. func QueryPrometheus(
  34. clientset kubernetes.Interface,
  35. service *v1.Service,
  36. opts *QueryOpts,
  37. ) ([]byte, error) {
  38. if len(service.Spec.Ports) == 0 {
  39. return nil, fmt.Errorf("prometheus service has no exposed ports to query")
  40. }
  41. podSelector := fmt.Sprintf(`namespace="%s",pod=~"%s",container!="POD",container!=""`, opts.Namespace, strings.Join(opts.PodList, "|"))
  42. query := ""
  43. if opts.Metric == "cpu" {
  44. query = fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[5m])", podSelector)
  45. } else if opts.Metric == "memory" {
  46. query = fmt.Sprintf("container_memory_usage_bytes{%s}", podSelector)
  47. } else if opts.Metric == "network" {
  48. netPodSelector := fmt.Sprintf(`namespace="%s",pod=~"%s",container="POD"`, opts.Namespace, strings.Join(opts.PodList, "|"))
  49. query = fmt.Sprintf("rate(container_network_receive_bytes_total{%s}[5m])", netPodSelector)
  50. }
  51. if opts.ShouldSum {
  52. query = fmt.Sprintf("sum(%s)", query)
  53. }
  54. queryParams := map[string]string{
  55. "query": query,
  56. "start": fmt.Sprintf("%d", opts.StartRange),
  57. "end": fmt.Sprintf("%d", opts.EndRange),
  58. "step": opts.Resolution,
  59. }
  60. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  61. "http",
  62. service.Name,
  63. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  64. "/api/v1/query_range",
  65. queryParams,
  66. )
  67. rawQuery, err := resp.DoRaw(context.TODO())
  68. if err != nil {
  69. return nil, err
  70. }
  71. return parseQuery(rawQuery, opts.Metric)
  72. }
  73. type promRawQuery struct {
  74. Data struct {
  75. Result []struct {
  76. Metric struct {
  77. Pod string `json:"pod,omitempty"`
  78. } `json:"metric,omitempty"`
  79. Values [][]interface{} `json:"values"`
  80. } `json:"result"`
  81. } `json:"data"`
  82. }
  83. type promParsedSingletonQueryResult struct {
  84. Date interface{} `json:"date,omitempty"`
  85. CPU interface{} `json:"cpu,omitempty"`
  86. Memory interface{} `json:"memory,omitempty"`
  87. Bytes interface{} `json:"bytes,omitempty"`
  88. }
  89. type promParsedSingletonQuery struct {
  90. Pod string `json:"pod,omitempty"`
  91. Results []promParsedSingletonQueryResult `json:"results"`
  92. }
  93. func parseQuery(rawQuery []byte, metric string) ([]byte, error) {
  94. rawQueryObj := &promRawQuery{}
  95. json.Unmarshal(rawQuery, rawQueryObj)
  96. res := make([]*promParsedSingletonQuery, 0)
  97. for _, result := range rawQueryObj.Data.Result {
  98. singleton := &promParsedSingletonQuery{
  99. Pod: result.Metric.Pod,
  100. }
  101. singletonResults := make([]promParsedSingletonQueryResult, 0)
  102. for _, values := range result.Values {
  103. singletonResult := &promParsedSingletonQueryResult{
  104. Date: values[0],
  105. }
  106. if metric == "cpu" {
  107. singletonResult.CPU = values[1]
  108. } else if metric == "memory" {
  109. singletonResult.Memory = values[1]
  110. } else if metric == "network" {
  111. singletonResult.Bytes = values[1]
  112. }
  113. singletonResults = append(singletonResults, *singletonResult)
  114. }
  115. singleton.Results = singletonResults
  116. res = append(res, singleton)
  117. }
  118. return json.Marshal(res)
  119. }