diagnostics.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. package prom
import (
	"fmt"
	"sort"
	"strings"

	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/source"
	prometheus "github.com/prometheus/client_golang/api"
)
  // Prometheus Metric Diagnostic IDs
const (
	// CAdvisorDiagnosticMetricID is the identifier of the metric used to determine if cAdvisor is being scraped.
	CAdvisorDiagnosticMetricID = "cadvisorMetric"

	// CAdvisorLabelDiagnosticMetricID is the identifier of the metric used to determine if cAdvisor labels are correct.
	CAdvisorLabelDiagnosticMetricID = "cadvisorLabel"

	// KSMDiagnosticMetricID is the identifier for the metric used to determine if KSM metrics are being scraped.
	KSMDiagnosticMetricID = "ksmMetric"

	// KSMVersionDiagnosticMetricID is the identifier for the metric used to determine if the KSM version is correct.
	KSMVersionDiagnosticMetricID = "ksmVersion"

	// KubecostDiagnosticMetricID is the identifier for the metric used to determine if Kubecost metrics are being scraped.
	KubecostDiagnosticMetricID = "kubecostMetric"

	// NodeExporterDiagnosticMetricID is the identifier for the metric used to determine if NodeExporter metrics are being scraped.
	NodeExporterDiagnosticMetricID = "neMetric"

	// ScrapeIntervalDiagnosticMetricID is the identifier for the metric used to determine if Prometheus has its own
	// self-scraped metrics.
	ScrapeIntervalDiagnosticMetricID = "scrapeInterval"

	// CPUThrottlingDiagnosticMetricID is the identifier for the metric used to determine if CPU throttling is being
	// applied to the cost-model container.
	CPUThrottlingDiagnosticMetricID = "cpuThrottling"

	// KubecostRecordingRuleCPUUsageID is the identifier for the query used to determine if the CPU usage recording
	// rule is set up correctly.
	KubecostRecordingRuleCPUUsageID = "kubecostRecordingRuleCPUUsage"

	// CAdvisorWorkingSetBytesMetricID is the identifier for the query used to determine if cAdvisor working set
	// bytes data is being scraped.
	CAdvisorWorkingSetBytesMetricID = "cadvisorWorkingSetBytesMetric"

	// KSMCPUCapacityMetricID is the identifier for the query used to determine if KSM CPU capacity data is being
	// scraped.
	KSMCPUCapacityMetricID = "ksmCpuCapacityMetric"

	// KSMAllocatableCPUCoresMetricID is the identifier for the query used to determine if KSM allocatable CPU core
	// data is being scraped.
	KSMAllocatableCPUCoresMetricID = "ksmAllocatableCpuCoresMetric"
)

// DocumentationBaseURL is the root of the OpenCost documentation site. Diagnostic doc links are
// built by appending an anchor or sub-path to it.
const DocumentationBaseURL = "https://www.opencost.io/docs/"
  42. // diagnostic definitions mapping holds all of the diagnostic definitions that can be used for prometheus metrics diagnostics
  43. var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnosticDefinition{
  44. CAdvisorDiagnosticMetricID: {
  45. ID: CAdvisorDiagnosticMetricID,
  46. QueryFmt: `absent_over_time(container_cpu_usage_seconds_total{%s}[5m] %s)`,
  47. Label: "cAdvisor metrics available",
  48. Description: "Determine if cAdvisor metrics are available during last 5 minutes.",
  49. DocLink: fmt.Sprintf("%s#cadvisor-metrics-available", DocumentationBaseURL),
  50. },
  51. KSMDiagnosticMetricID: {
  52. ID: KSMDiagnosticMetricID,
  53. QueryFmt: `absent_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", %s}[5m] %s)`,
  54. Label: "Kube-state-metrics available",
  55. Description: "Determine if metrics from kube-state-metrics are available during last 5 minutes.",
  56. DocLink: fmt.Sprintf("%s#kube-state-metrics-metrics-available", DocumentationBaseURL),
  57. },
  58. KubecostDiagnosticMetricID: {
  59. ID: KubecostDiagnosticMetricID,
  60. QueryFmt: `absent_over_time(node_cpu_hourly_cost{%s}[5m] %s)`,
  61. Label: "Kubecost metrics available",
  62. Description: "Determine if metrics from Kubecost are available during last 5 minutes.",
  63. },
  64. NodeExporterDiagnosticMetricID: {
  65. ID: NodeExporterDiagnosticMetricID,
  66. QueryFmt: `absent_over_time(node_cpu_seconds_total{%s}[5m] %s)`,
  67. Label: "Node-exporter metrics available",
  68. Description: "Determine if metrics from node-exporter are available during last 5 minutes.",
  69. DocLink: fmt.Sprintf("%s#node-exporter-metrics-available", DocumentationBaseURL),
  70. },
  71. CAdvisorLabelDiagnosticMetricID: {
  72. ID: CAdvisorLabelDiagnosticMetricID,
  73. QueryFmt: `absent_over_time(container_cpu_usage_seconds_total{container!="",pod!="", %s}[5m] %s)`,
  74. Label: "Expected cAdvisor labels available",
  75. Description: "Determine if expected cAdvisor labels are present during last 5 minutes.",
  76. DocLink: fmt.Sprintf("%s#cadvisor-metrics-available", DocumentationBaseURL),
  77. },
  78. KSMVersionDiagnosticMetricID: {
  79. ID: KSMVersionDiagnosticMetricID,
  80. QueryFmt: `absent_over_time(kube_persistentvolume_capacity_bytes{%s}[5m] %s)`,
  81. Label: "Expected kube-state-metrics version found",
  82. Description: "Determine if metric in required kube-state-metrics version are present during last 5 minutes.",
  83. DocLink: fmt.Sprintf("%s#expected-kube-state-metrics-version-found", DocumentationBaseURL),
  84. },
  85. ScrapeIntervalDiagnosticMetricID: {
  86. ID: ScrapeIntervalDiagnosticMetricID,
  87. QueryFmt: `absent_over_time(prometheus_target_interval_length_seconds{%s}[5m] %s)`,
  88. Label: "Expected Prometheus self-scrape metrics available",
  89. Description: "Determine if prometheus has its own self-scraped metrics during the last 5 minutes.",
  90. },
  91. CPUThrottlingDiagnosticMetricID: {
  92. ID: CPUThrottlingDiagnosticMetricID,
  93. QueryFmt: `avg(increase(container_cpu_cfs_throttled_periods_total{container="cost-model", %s}[10m] %s)) by (container_name, pod_name, namespace)
  94. / avg(increase(container_cpu_cfs_periods_total{container="cost-model",%s}[10m] %s)) by (container_name, pod_name, namespace) > 0.2`,
  95. Label: "Kubecost is not CPU throttled",
  96. Description: "Kubecost loading slowly? A kubecost component might be CPU throttled",
  97. },
  98. KubecostRecordingRuleCPUUsageID: {
  99. ID: KubecostRecordingRuleCPUUsageID,
  100. QueryFmt: `absent_over_time(kubecost_container_cpu_usage_irate{%s}[5m] %s)`,
  101. Label: "Kubecost's CPU usage recording rule is set up",
  102. Description: "If the 'kubecost_container_cpu_usage_irate' recording rule is not set up, Allocation pipeline build may put pressure on your Prometheus due to the use of a subquery.",
  103. DocLink: "https://www.opencost.io/docs/installation/prometheus",
  104. },
  105. CAdvisorWorkingSetBytesMetricID: {
  106. ID: CAdvisorWorkingSetBytesMetricID,
  107. QueryFmt: `absent_over_time(container_memory_working_set_bytes{container="cost-model", container!="POD", instance!="", %s}[5m] %s)`,
  108. Label: "cAdvisor working set bytes metrics available",
  109. Description: "Determine if cAdvisor working set bytes metrics are available during last 5 minutes.",
  110. },
  111. KSMCPUCapacityMetricID: {
  112. ID: KSMCPUCapacityMetricID,
  113. QueryFmt: `absent_over_time(kube_node_status_capacity_cpu_cores{%s}[5m] %s)`,
  114. Label: "KSM had CPU capacity during the last 5 minutes",
  115. Description: "Determine if KSM had CPU capacity during the last 5 minutes",
  116. },
  117. KSMAllocatableCPUCoresMetricID: {
  118. ID: KSMAllocatableCPUCoresMetricID,
  119. QueryFmt: `absent_over_time(kube_node_status_allocatable_cpu_cores{%s}[5m] %s)`,
  120. Label: "KSM had allocatable CPU cores during the last 5 minutes",
  121. Description: "Determine if KSM had allocatable CPU cores during the last 5 minutes",
  122. },
  123. }
  type (
	// RequestCounter is implemented by Prometheus clients that keep track of how many
	// requests they currently hold in a queue and how many they have sent outbound.
	// Callers detect support for it with a type assertion.
	RequestCounter interface {
		TotalQueuedRequests() int
		TotalOutboundRequests() int
	}

	// QueuedPromRequest describes one request that the Prometheus client has queued but not
	// yet sent: the named context that issued it, the query text, and how long it has been
	// waiting (populated from a millisecond value by GetPrometheusQueueState).
	QueuedPromRequest struct {
		Context   string `json:"context"`
		Query     string `json:"query"`
		QueueTime int64  `json:"queueTime"`
	}

	// PrometheusQueueState is a diagnostic snapshot of the Prometheus client's request queue:
	// the queued requests, the outbound count, their sum, and the configured concurrency limit.
	PrometheusQueueState struct {
		QueuedRequests      []*QueuedPromRequest `json:"queuedRequests"`
		OutboundRequests    int                  `json:"outboundRequests"`
		TotalRequests       int                  `json:"totalRequests"`
		MaxQueryConcurrency int                  `json:"maxQueryConcurrency"`
	}
)
  145. // GetPrometheusQueueState is a diagnostic function that probes the prometheus request queue and gathers
  146. // query, context, and queue statistics.
  147. func GetPrometheusQueueState(client prometheus.Client, config *OpenCostPrometheusConfig) (*PrometheusQueueState, error) {
  148. rlpc, ok := client.(*RateLimitedPrometheusClient)
  149. if !ok {
  150. return nil, fmt.Errorf("Failed to get prometheus queue state for the provided client. Must be of type RateLimitedPrometheusClient.")
  151. }
  152. outbound := rlpc.TotalOutboundRequests()
  153. requests := []*QueuedPromRequest{}
  154. rlpc.EachQueuedRequest(func(ctx string, query string, queueTimeMs int64) {
  155. requests = append(requests, &QueuedPromRequest{
  156. Context: ctx,
  157. Query: query,
  158. QueueTime: queueTimeMs,
  159. })
  160. })
  161. return &PrometheusQueueState{
  162. QueuedRequests: requests,
  163. OutboundRequests: outbound,
  164. TotalRequests: outbound + len(requests),
  165. MaxQueryConcurrency: config.ClientConfig.QueryConcurrency,
  166. }, nil
  167. }
  168. // LogPrometheusClientState logs the current state, with respect to outbound requests, if that
  169. // information is available.
  170. func LogPrometheusClientState(client prometheus.Client) {
  171. if rc, ok := client.(RequestCounter); ok {
  172. queued := rc.TotalQueuedRequests()
  173. outbound := rc.TotalOutboundRequests()
  174. total := queued + outbound
  175. log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
  176. }
  177. }
  178. // GetPrometheusMetrics returns a list of the state of Prometheus metric used by kubecost using the provided client
  179. func GetPrometheusMetrics(client prometheus.Client, config *OpenCostPrometheusConfig, offset string) PrometheusDiagnostics {
  180. ctx := NewNamedContext(client, config, DiagnosticContextName)
  181. var result []*PrometheusDiagnostic
  182. for _, definition := range diagnosticDefinitions {
  183. pd := definition.NewDiagnostic(config.ClusterFilter, offset)
  184. err := pd.executePrometheusDiagnosticQuery(ctx)
  185. // log the errror, append to results anyways, and continue
  186. if err != nil {
  187. log.Errorf("error: %s", err.Error())
  188. }
  189. result = append(result, pd)
  190. }
  191. return result
  192. }
  193. // GetPrometheusMetricsByID returns a list of the state of specific Prometheus metrics by identifier.
  194. func GetPrometheusMetricsByID(ids []string, client prometheus.Client, config *OpenCostPrometheusConfig, offset string) PrometheusDiagnostics {
  195. ctx := NewNamedContext(client, config, DiagnosticContextName)
  196. var result []*PrometheusDiagnostic
  197. for _, id := range ids {
  198. if definition, ok := diagnosticDefinitions[id]; ok {
  199. pd := definition.NewDiagnostic(config.ClusterFilter, offset)
  200. err := pd.executePrometheusDiagnosticQuery(ctx)
  201. // log the errror, append to results anyways, and continue
  202. if err != nil {
  203. log.Errorf("error: %s", err.Error())
  204. }
  205. result = append(result, pd)
  206. } else {
  207. log.Warnf("Failed to find diagnostic definition for id: %s", id)
  208. }
  209. }
  210. return result
  211. }
  212. // PrometheusDiagnostics is a PrometheusDiagnostic container with helper methods.
  213. type PrometheusDiagnostics []*PrometheusDiagnostic
  214. // HasFailure returns true if any of the diagnostic tests didn't pass.
  215. func (pd PrometheusDiagnostics) HasFailure() bool {
  216. for _, p := range pd {
  217. if !p.Passed {
  218. return true
  219. }
  220. }
  221. return false
  222. }
  223. // diagnosticDefinition is a definition of a diagnostic that can be used to create new
  224. // PrometheusDiagnostic instances using the definition's fields.
  225. type diagnosticDefinition struct {
  226. ID string
  227. QueryFmt string
  228. Label string
  229. Description string
  230. DocLink string
  231. }
  232. // NewDiagnostic creates a new PrometheusDiagnostic instance using the provided definition data.
  233. func (pdd *diagnosticDefinition) NewDiagnostic(filter string, offset string) *PrometheusDiagnostic {
  234. // FIXME: Any reasonable way to get the total number of replacements required in the query?
  235. // FIXME: All of the other queries require a single offset replace, but CPUThrottle requires two.
  236. var query string
  237. if pdd.ID == CPUThrottlingDiagnosticMetricID {
  238. query = fmt.Sprintf(pdd.QueryFmt, filter, offset, filter, offset)
  239. } else {
  240. query = fmt.Sprintf(pdd.QueryFmt, filter, offset)
  241. }
  242. return &PrometheusDiagnostic{
  243. ID: pdd.ID,
  244. Query: query,
  245. Label: pdd.Label,
  246. Description: pdd.Description,
  247. DocLink: pdd.DocLink,
  248. }
  249. }
  250. // PrometheusDiagnostic holds information about a metric and the query to ensure it is functional
  251. type PrometheusDiagnostic struct {
  252. ID string `json:"id"`
  253. Query string `json:"query"`
  254. Label string `json:"label"`
  255. Description string `json:"description"`
  256. DocLink string `json:"docLink"`
  257. Result []*source.QueryResult `json:"result"`
  258. Passed bool `json:"passed"`
  259. }
  260. // executePrometheusDiagnosticQuery executes a PrometheusDiagnostic query using the given context
  261. func (pd *PrometheusDiagnostic) executePrometheusDiagnosticQuery(ctx *Context) error {
  262. resultCh := ctx.Query(pd.Query)
  263. result, err := resultCh.Await()
  264. if err != nil {
  265. return fmt.Errorf("prometheus diagnostic %s failed with error: %s", pd.ID, err)
  266. }
  267. if result == nil {
  268. result = []*source.QueryResult{}
  269. }
  270. pd.Result = result
  271. pd.Passed = len(result) == 0
  272. return nil
  273. }