promparsers.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. package costmodel
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/util"
  10. costAnalyzerCloud "github.com/opencost/opencost/pkg/cloud/models"
  11. "github.com/opencost/opencost/pkg/clustercache"
  12. "github.com/opencost/opencost/pkg/env"
  13. "github.com/opencost/opencost/pkg/prom"
  14. )
var (
	// prometheusVersion stores the Prometheus server version (major.minor.patch).
	// Defaults to "0.0.0" if the version cannot be retrieved.
	// IsPrometheusVersionGTE3 parses the leading (major) component of this value.
	// NOTE(review): nothing in the visible part of this file assigns it; presumably
	// it is set elsewhere in the package once the server version is known — confirm.
	prometheusVersion = "0.0.0"
)
  20. // IsPrometheusVersionGTE3 returns true if the Prometheus server's major version
  21. // is 3 or higher.
  22. func IsPrometheusVersionGTE3() bool {
  23. if v := strings.Split(prometheusVersion, "."); len(v) > 0 {
  24. if major, err := strconv.Atoi(v[0]); err == nil && major >= 3 {
  25. return true
  26. }
  27. }
  28. return false
  29. }
  30. func GetPVInfoLocal(cache clustercache.ClusterCache, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
  31. toReturn := make(map[string]*PersistentVolumeClaimData)
  32. pvcs := cache.GetAllPersistentVolumeClaims()
  33. for _, pvc := range pvcs {
  34. var vals []*util.Vector
  35. vals = append(vals, &util.Vector{
  36. Timestamp: float64(time.Now().Unix()),
  37. Value: float64(pvc.Spec.Resources.Requests.Storage().Value()),
  38. })
  39. ns := pvc.Namespace
  40. pvcName := pvc.Name
  41. volumeName := pvc.Spec.VolumeName
  42. pvClass := ""
  43. if pvc.Spec.StorageClassName != nil {
  44. pvClass = *pvc.Spec.StorageClassName
  45. }
  46. clusterID := defaultClusterID
  47. key := fmt.Sprintf("%s,%s,%s", ns, pvcName, clusterID)
  48. toReturn[key] = &PersistentVolumeClaimData{
  49. Class: pvClass,
  50. Claim: pvcName,
  51. Namespace: ns,
  52. ClusterID: clusterID,
  53. VolumeName: volumeName,
  54. Values: vals,
  55. }
  56. }
  57. return toReturn, nil
  58. }
  59. // TODO niko/prom move parsing functions from costmodel.go
  60. func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
  61. toReturn := make(map[string]*PersistentVolumeClaimData)
  62. for _, val := range qrs {
  63. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  64. if clusterID == "" {
  65. clusterID = defaultClusterID
  66. }
  67. ns, err := val.GetString("namespace")
  68. if err != nil {
  69. return toReturn, err
  70. }
  71. pvcName, err := val.GetString("persistentvolumeclaim")
  72. if err != nil {
  73. return toReturn, err
  74. }
  75. volumeName, err := val.GetString("volumename")
  76. if err != nil {
  77. log.Debugf("Unfulfilled claim %s: volumename field does not exist in data result vector", pvcName)
  78. volumeName = ""
  79. }
  80. pvClass, err := val.GetString("storageclass")
  81. if err != nil {
  82. // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
  83. log.DedupedWarningf(5, "Storage Class not found for claim \"%s/%s\".", ns, pvcName)
  84. pvClass = ""
  85. }
  86. key := fmt.Sprintf("%s,%s,%s", ns, pvcName, clusterID)
  87. toReturn[key] = &PersistentVolumeClaimData{
  88. Class: pvClass,
  89. Claim: pvcName,
  90. Namespace: ns,
  91. ClusterID: clusterID,
  92. VolumeName: volumeName,
  93. Values: val.Values,
  94. }
  95. }
  96. return toReturn, nil
  97. }
  98. func GetPVAllocationMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
  99. toReturn := make(map[string][]*PersistentVolumeClaimData)
  100. for _, val := range qrs {
  101. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  102. if clusterID == "" {
  103. clusterID = defaultClusterID
  104. }
  105. ns, err := val.GetString("namespace")
  106. if err != nil {
  107. return toReturn, err
  108. }
  109. pod, err := val.GetString("pod")
  110. if err != nil {
  111. return toReturn, err
  112. }
  113. pvcName, err := val.GetString("persistentvolumeclaim")
  114. if err != nil {
  115. return toReturn, err
  116. }
  117. pvName, err := val.GetString("persistentvolume")
  118. if err != nil {
  119. log.Warnf("persistentvolume field does not exist for pv %s", pvcName) // This is possible for an unfulfilled claim
  120. continue
  121. }
  122. key := fmt.Sprintf("%s,%s,%s", ns, pod, clusterID)
  123. pvcData := &PersistentVolumeClaimData{
  124. Class: "",
  125. Claim: pvcName,
  126. Namespace: ns,
  127. ClusterID: clusterID,
  128. VolumeName: pvName,
  129. Values: val.Values,
  130. }
  131. toReturn[key] = append(toReturn[key], pvcData)
  132. }
  133. return toReturn, nil
  134. }
  135. func GetPVCostMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
  136. toReturn := make(map[string]*costAnalyzerCloud.PV)
  137. for _, val := range qrs {
  138. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  139. if clusterID == "" {
  140. clusterID = defaultClusterID
  141. }
  142. volumeName, err := val.GetString("volumename")
  143. if err != nil {
  144. return toReturn, err
  145. }
  146. key := fmt.Sprintf("%s,%s", volumeName, clusterID)
  147. toReturn[key] = &costAnalyzerCloud.PV{
  148. Cost: fmt.Sprintf("%f", val.Values[0].Value),
  149. }
  150. }
  151. return toReturn, nil
  152. }
  153. func GetNamespaceLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  154. toReturn := make(map[string]map[string]string)
  155. for _, val := range qrs {
  156. // We want Namespace and ClusterID for key generation purposes
  157. ns, err := val.GetString("namespace")
  158. if err != nil {
  159. return toReturn, err
  160. }
  161. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  162. if clusterID == "" {
  163. clusterID = defaultClusterID
  164. }
  165. nsKey := ns + "," + clusterID
  166. if nsLabels, ok := toReturn[nsKey]; ok {
  167. for k, v := range val.GetLabels() {
  168. nsLabels[k] = v // override with more recently assigned if we changed labels within the window.
  169. }
  170. } else {
  171. toReturn[nsKey] = val.GetLabels()
  172. }
  173. }
  174. return toReturn, nil
  175. }
  176. func GetPodLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  177. toReturn := make(map[string]map[string]string)
  178. for _, val := range qrs {
  179. // We want Pod, Namespace and ClusterID for key generation purposes
  180. pod, err := val.GetString("pod")
  181. if err != nil {
  182. return toReturn, err
  183. }
  184. ns, err := val.GetString("namespace")
  185. if err != nil {
  186. return toReturn, err
  187. }
  188. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  189. if clusterID == "" {
  190. clusterID = defaultClusterID
  191. }
  192. nsKey := ns + "," + pod + "," + clusterID
  193. if labels, ok := toReturn[nsKey]; ok {
  194. newlabels := val.GetLabels()
  195. for k, v := range newlabels {
  196. labels[k] = v
  197. }
  198. } else {
  199. toReturn[nsKey] = val.GetLabels()
  200. }
  201. }
  202. return toReturn, nil
  203. }
  204. func GetNamespaceAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  205. toReturn := make(map[string]map[string]string)
  206. for _, val := range qrs {
  207. // We want Namespace and ClusterID for key generation purposes
  208. ns, err := val.GetString("namespace")
  209. if err != nil {
  210. return toReturn, err
  211. }
  212. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  213. if clusterID == "" {
  214. clusterID = defaultClusterID
  215. }
  216. nsKey := ns + "," + clusterID
  217. if nsAnnotations, ok := toReturn[nsKey]; ok {
  218. for k, v := range val.GetAnnotations() {
  219. nsAnnotations[k] = v // override with more recently assigned if we changed labels within the window.
  220. }
  221. } else {
  222. toReturn[nsKey] = val.GetAnnotations()
  223. }
  224. }
  225. return toReturn, nil
  226. }
  227. func GetPodAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  228. toReturn := make(map[string]map[string]string)
  229. for _, val := range qrs {
  230. // We want Pod, Namespace and ClusterID for key generation purposes
  231. pod, err := val.GetString("pod")
  232. if err != nil {
  233. return toReturn, err
  234. }
  235. ns, err := val.GetString("namespace")
  236. if err != nil {
  237. return toReturn, err
  238. }
  239. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  240. if clusterID == "" {
  241. clusterID = defaultClusterID
  242. }
  243. nsKey := ns + "," + pod + "," + clusterID
  244. if labels, ok := toReturn[nsKey]; ok {
  245. for k, v := range val.GetAnnotations() {
  246. labels[k] = v
  247. }
  248. } else {
  249. toReturn[nsKey] = val.GetAnnotations()
  250. }
  251. }
  252. return toReturn, nil
  253. }
  254. func GetStatefulsetMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  255. toReturn := make(map[string]map[string]string)
  256. for _, val := range qrs {
  257. // We want Statefulset, Namespace and ClusterID for key generation purposes
  258. ss, err := val.GetString("statefulSet")
  259. if err != nil {
  260. return toReturn, err
  261. }
  262. ns, err := val.GetString("namespace")
  263. if err != nil {
  264. return toReturn, err
  265. }
  266. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  267. if clusterID == "" {
  268. clusterID = defaultClusterID
  269. }
  270. nsKey := ns + "," + ss + "," + clusterID
  271. toReturn[nsKey] = val.GetLabels()
  272. }
  273. return toReturn, nil
  274. }
  275. func GetPodDaemonsetsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
  276. toReturn := make(map[string]string)
  277. for _, val := range qrs {
  278. ds, err := val.GetString("owner_name")
  279. if err != nil {
  280. return toReturn, err
  281. }
  282. ns, err := val.GetString("namespace")
  283. if err != nil {
  284. return toReturn, err
  285. }
  286. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  287. if clusterID == "" {
  288. clusterID = defaultClusterID
  289. }
  290. pod, err := val.GetString("pod")
  291. if err != nil {
  292. return toReturn, err
  293. }
  294. nsKey := ns + "," + pod + "," + clusterID
  295. toReturn[nsKey] = ds
  296. }
  297. return toReturn, nil
  298. }
  299. func GetPodJobsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
  300. toReturn := make(map[string]string)
  301. for _, val := range qrs {
  302. ds, err := val.GetString("owner_name")
  303. if err != nil {
  304. return toReturn, err
  305. }
  306. ns, err := val.GetString("namespace")
  307. if err != nil {
  308. return toReturn, err
  309. }
  310. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  311. if clusterID == "" {
  312. clusterID = defaultClusterID
  313. }
  314. pod, err := val.GetString("pod")
  315. if err != nil {
  316. return toReturn, err
  317. }
  318. nsKey := ns + "," + pod + "," + clusterID
  319. toReturn[nsKey] = ds
  320. }
  321. return toReturn, nil
  322. }
  323. func GetDeploymentMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  324. toReturn := make(map[string]map[string]string)
  325. for _, val := range qrs {
  326. // We want Deployment, Namespace and ClusterID for key generation purposes
  327. deployment, err := val.GetString("deployment")
  328. if err != nil {
  329. return toReturn, err
  330. }
  331. ns, err := val.GetString("namespace")
  332. if err != nil {
  333. return toReturn, err
  334. }
  335. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  336. if clusterID == "" {
  337. clusterID = defaultClusterID
  338. }
  339. nsKey := ns + "," + deployment + "," + clusterID
  340. toReturn[nsKey] = val.GetLabels()
  341. }
  342. return toReturn, nil
  343. }
  344. func GetServiceSelectorLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  345. toReturn := make(map[string]map[string]string)
  346. for _, val := range qrs {
  347. // We want Service, Namespace and ClusterID for key generation purposes
  348. service, err := val.GetString("service")
  349. if err != nil {
  350. return toReturn, err
  351. }
  352. ns, err := val.GetString("namespace")
  353. if err != nil {
  354. return toReturn, err
  355. }
  356. clusterID, _ := val.GetString(env.GetPromClusterLabel())
  357. if clusterID == "" {
  358. clusterID = defaultClusterID
  359. }
  360. nsKey := ns + "," + service + "," + clusterID
  361. toReturn[nsKey] = val.GetLabels()
  362. }
  363. return toReturn, nil
  364. }
  365. func GetContainerMetricVector(qrs []*prom.QueryResult, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
  366. containerData := make(map[string][]*util.Vector)
  367. for _, val := range qrs {
  368. containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
  369. if err != nil {
  370. return nil, err
  371. }
  372. if normalize && normalizationValue != 0 {
  373. for _, v := range val.Values {
  374. v.Value = v.Value / normalizationValue
  375. }
  376. }
  377. containerData[containerMetric.Key()] = val.Values
  378. }
  379. return containerData, nil
  380. }
  381. func GetContainerMetricVectors(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*util.Vector, error) {
  382. containerData := make(map[string][]*util.Vector)
  383. for _, val := range qrs {
  384. containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
  385. if err != nil {
  386. return nil, err
  387. }
  388. containerData[containerMetric.Key()] = val.Values
  389. }
  390. return containerData, nil
  391. }
  392. func GetNormalizedContainerMetricVectors(qrs []*prom.QueryResult, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
  393. containerData := make(map[string][]*util.Vector)
  394. for _, val := range qrs {
  395. containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
  396. if err != nil {
  397. return nil, err
  398. }
  399. containerData[containerMetric.Key()] = util.NormalizeVectorByVector(val.Values, normalizationValues)
  400. }
  401. return containerData, nil
  402. }
  403. func getCost(qrs []*prom.QueryResult) (map[string][]*util.Vector, error) {
  404. toReturn := make(map[string][]*util.Vector)
  405. for _, val := range qrs {
  406. instance, err := val.GetString("node")
  407. if err != nil {
  408. return toReturn, err
  409. }
  410. toReturn[instance] = val.Values
  411. }
  412. return toReturn, nil
  413. }
  414. // TODO niko/prom retain message:
  415. // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
  416. func getNormalization(qrs []*prom.QueryResult) (float64, error) {
  417. if len(qrs) == 0 {
  418. return 0.0, prom.NoDataErr("getNormalization")
  419. }
  420. if len(qrs[0].Values) == 0 {
  421. return 0.0, prom.NoDataErr("getNormalization")
  422. }
  423. return qrs[0].Values[0].Value, nil
  424. }
  425. // TODO niko/prom retain message:
  426. // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
  427. func getNormalizations(qrs []*prom.QueryResult) ([]*util.Vector, error) {
  428. if len(qrs) == 0 {
  429. return nil, prom.NoDataErr("getNormalizations")
  430. }
  431. return qrs[0].Values, nil
  432. }
  433. func parsePodLabels(qrs []*prom.QueryResult) (map[string]map[string]string, error) {
  434. podLabels := map[string]map[string]string{}
  435. for _, result := range qrs {
  436. pod, err := result.GetString("pod")
  437. if err != nil {
  438. return podLabels, errors.New("missing pod field")
  439. }
  440. if _, ok := podLabels[pod]; ok {
  441. podLabels[pod] = result.GetLabels()
  442. } else {
  443. podLabels[pod] = map[string]string{}
  444. podLabels[pod] = result.GetLabels()
  445. }
  446. }
  447. return podLabels, nil
  448. }