promparsers.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. package costmodel
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. costAnalyzerCloud "github.com/opencost/opencost/pkg/cloud"
  7. "github.com/opencost/opencost/pkg/clustercache"
  8. "github.com/opencost/opencost/pkg/env"
  9. "github.com/opencost/opencost/pkg/log"
  10. "github.com/opencost/opencost/pkg/prom"
  11. "github.com/opencost/opencost/pkg/util"
  12. )
  13. func GetPVInfoLocal(cache clustercache.ClusterCache, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
  14. toReturn := make(map[string]*PersistentVolumeClaimData)
  15. pvcs := cache.GetAllPersistentVolumeClaims()
  16. for _, pvc := range pvcs {
  17. var vals []*util.Vector
  18. vals = append(vals, &util.Vector{
  19. Timestamp: float64(time.Now().Unix()),
  20. Value: float64(pvc.Spec.Resources.Requests.Storage().Value()),
  21. })
  22. ns := pvc.Namespace
  23. pvcName := pvc.Name
  24. volumeName := pvc.Spec.VolumeName
  25. pvClass := ""
  26. if pvc.Spec.StorageClassName != nil {
  27. pvClass = *pvc.Spec.StorageClassName
  28. }
  29. clusterID := defaultClusterID
  30. key := fmt.Sprintf("%s,%s,%s", ns, pvcName, clusterID)
  31. toReturn[key] = &PersistentVolumeClaimData{
  32. Class: pvClass,
  33. Claim: pvcName,
  34. Namespace: ns,
  35. ClusterID: clusterID,
  36. VolumeName: volumeName,
  37. Values: vals,
  38. }
  39. }
  40. return toReturn, nil
  41. }
  42. // TODO niko/prom move parsing functions from costmodel.go
  43. func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
  44. toReturn := make(map[string]*PersistentVolumeClaimData)
  45. for _, val := range qrs {
  46. clusterID, err := val.GetString(env.GetPromClusterLabel())
  47. if clusterID == "" {
  48. clusterID = defaultClusterID
  49. }
  50. ns, err := val.GetString("namespace")
  51. if err != nil {
  52. return toReturn, err
  53. }
  54. pvcName, err := val.GetString("persistentvolumeclaim")
  55. if err != nil {
  56. return toReturn, err
  57. }
  58. volumeName, err := val.GetString("volumename")
  59. if err != nil {
  60. log.Debugf("Unfulfilled claim %s: volumename field does not exist in data result vector", pvcName)
  61. volumeName = ""
  62. }
  63. pvClass, err := val.GetString("storageclass")
  64. if err != nil {
  65. // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
  66. log.DedupedWarningf(5, "Storage Class not found for claim \"%s/%s\".", ns, pvcName)
  67. pvClass = ""
  68. }
  69. key := fmt.Sprintf("%s,%s,%s", ns, pvcName, clusterID)
  70. toReturn[key] = &PersistentVolumeClaimData{
  71. Class: pvClass,
  72. Claim: pvcName,
  73. Namespace: ns,
  74. ClusterID: clusterID,
  75. VolumeName: volumeName,
  76. Values: val.Values,
  77. }
  78. }
  79. return toReturn, nil
  80. }
  81. func GetPVAllocationMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
  82. toReturn := make(map[string][]*PersistentVolumeClaimData)
  83. for _, val := range qrs {
  84. clusterID, err := val.GetString(env.GetPromClusterLabel())
  85. if clusterID == "" {
  86. clusterID = defaultClusterID
  87. }
  88. ns, err := val.GetString("namespace")
  89. if err != nil {
  90. return toReturn, err
  91. }
  92. pod, err := val.GetString("pod")
  93. if err != nil {
  94. return toReturn, err
  95. }
  96. pvcName, err := val.GetString("persistentvolumeclaim")
  97. if err != nil {
  98. return toReturn, err
  99. }
  100. pvName, err := val.GetString("persistentvolume")
  101. if err != nil {
  102. log.Warnf("persistentvolume field does not exist for pv %s", pvcName) // This is possible for an unfulfilled claim
  103. continue
  104. }
  105. key := fmt.Sprintf("%s,%s,%s", ns, pod, clusterID)
  106. pvcData := &PersistentVolumeClaimData{
  107. Class: "",
  108. Claim: pvcName,
  109. Namespace: ns,
  110. ClusterID: clusterID,
  111. VolumeName: pvName,
  112. Values: val.Values,
  113. }
  114. toReturn[key] = append(toReturn[key], pvcData)
  115. }
  116. return toReturn, nil
  117. }
  118. func GetPVCostMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
  119. toReturn := make(map[string]*costAnalyzerCloud.PV)
  120. for _, val := range qrs {
  121. clusterID, err := val.GetString(env.GetPromClusterLabel())
  122. if clusterID == "" {
  123. clusterID = defaultClusterID
  124. }
  125. volumeName, err := val.GetString("volumename")
  126. if err != nil {
  127. return toReturn, err
  128. }
  129. key := fmt.Sprintf("%s,%s", volumeName, clusterID)
  130. toReturn[key] = &costAnalyzerCloud.PV{
  131. Cost: fmt.Sprintf("%f", val.Values[0].Value),
  132. }
  133. }
  134. return toReturn, nil
  135. }
  136. func GetNamespaceLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  137. toReturn := make(map[string]map[string]string)
  138. for _, val := range qrs {
  139. // We want Namespace and ClusterID for key generation purposes
  140. ns, err := val.GetString("namespace")
  141. if err != nil {
  142. return toReturn, err
  143. }
  144. clusterID, err := val.GetString(env.GetPromClusterLabel())
  145. if clusterID == "" {
  146. clusterID = defaultClusterID
  147. }
  148. nsKey := ns + "," + clusterID
  149. if nsLabels, ok := toReturn[nsKey]; ok {
  150. for k, v := range val.GetLabels() {
  151. nsLabels[k] = v // override with more recently assigned if we changed labels within the window.
  152. }
  153. } else {
  154. toReturn[nsKey] = val.GetLabels()
  155. }
  156. }
  157. return toReturn, nil
  158. }
  159. func GetPodLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  160. toReturn := make(map[string]map[string]string)
  161. for _, val := range qrs {
  162. // We want Pod, Namespace and ClusterID for key generation purposes
  163. pod, err := val.GetString("pod")
  164. if err != nil {
  165. return toReturn, err
  166. }
  167. ns, err := val.GetString("namespace")
  168. if err != nil {
  169. return toReturn, err
  170. }
  171. clusterID, err := val.GetString(env.GetPromClusterLabel())
  172. if clusterID == "" {
  173. clusterID = defaultClusterID
  174. }
  175. nsKey := ns + "," + pod + "," + clusterID
  176. if labels, ok := toReturn[nsKey]; ok {
  177. newlabels := val.GetLabels()
  178. for k, v := range newlabels {
  179. labels[k] = v
  180. }
  181. } else {
  182. toReturn[nsKey] = val.GetLabels()
  183. }
  184. }
  185. return toReturn, nil
  186. }
  187. func GetNamespaceAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  188. toReturn := make(map[string]map[string]string)
  189. for _, val := range qrs {
  190. // We want Namespace and ClusterID for key generation purposes
  191. ns, err := val.GetString("namespace")
  192. if err != nil {
  193. return toReturn, err
  194. }
  195. clusterID, err := val.GetString(env.GetPromClusterLabel())
  196. if clusterID == "" {
  197. clusterID = defaultClusterID
  198. }
  199. nsKey := ns + "," + clusterID
  200. if nsAnnotations, ok := toReturn[nsKey]; ok {
  201. for k, v := range val.GetAnnotations() {
  202. nsAnnotations[k] = v // override with more recently assigned if we changed labels within the window.
  203. }
  204. } else {
  205. toReturn[nsKey] = val.GetAnnotations()
  206. }
  207. }
  208. return toReturn, nil
  209. }
  210. func GetPodAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  211. toReturn := make(map[string]map[string]string)
  212. for _, val := range qrs {
  213. // We want Pod, Namespace and ClusterID for key generation purposes
  214. pod, err := val.GetString("pod")
  215. if err != nil {
  216. return toReturn, err
  217. }
  218. ns, err := val.GetString("namespace")
  219. if err != nil {
  220. return toReturn, err
  221. }
  222. clusterID, err := val.GetString(env.GetPromClusterLabel())
  223. if clusterID == "" {
  224. clusterID = defaultClusterID
  225. }
  226. nsKey := ns + "," + pod + "," + clusterID
  227. if labels, ok := toReturn[nsKey]; ok {
  228. for k, v := range val.GetAnnotations() {
  229. labels[k] = v
  230. }
  231. } else {
  232. toReturn[nsKey] = val.GetAnnotations()
  233. }
  234. }
  235. return toReturn, nil
  236. }
  237. func GetStatefulsetMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  238. toReturn := make(map[string]map[string]string)
  239. for _, val := range qrs {
  240. // We want Statefulset, Namespace and ClusterID for key generation purposes
  241. ss, err := val.GetString("statefulSet")
  242. if err != nil {
  243. return toReturn, err
  244. }
  245. ns, err := val.GetString("namespace")
  246. if err != nil {
  247. return toReturn, err
  248. }
  249. clusterID, err := val.GetString(env.GetPromClusterLabel())
  250. if clusterID == "" {
  251. clusterID = defaultClusterID
  252. }
  253. nsKey := ns + "," + ss + "," + clusterID
  254. toReturn[nsKey] = val.GetLabels()
  255. }
  256. return toReturn, nil
  257. }
  258. func GetPodDaemonsetsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
  259. toReturn := make(map[string]string)
  260. for _, val := range qrs {
  261. ds, err := val.GetString("owner_name")
  262. if err != nil {
  263. return toReturn, err
  264. }
  265. ns, err := val.GetString("namespace")
  266. if err != nil {
  267. return toReturn, err
  268. }
  269. clusterID, err := val.GetString(env.GetPromClusterLabel())
  270. if clusterID == "" {
  271. clusterID = defaultClusterID
  272. }
  273. pod, err := val.GetString("pod")
  274. if err != nil {
  275. return toReturn, err
  276. }
  277. nsKey := ns + "," + pod + "," + clusterID
  278. toReturn[nsKey] = ds
  279. }
  280. return toReturn, nil
  281. }
  282. func GetPodJobsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
  283. toReturn := make(map[string]string)
  284. for _, val := range qrs {
  285. ds, err := val.GetString("owner_name")
  286. if err != nil {
  287. return toReturn, err
  288. }
  289. ns, err := val.GetString("namespace")
  290. if err != nil {
  291. return toReturn, err
  292. }
  293. clusterID, err := val.GetString(env.GetPromClusterLabel())
  294. if clusterID == "" {
  295. clusterID = defaultClusterID
  296. }
  297. pod, err := val.GetString("pod")
  298. if err != nil {
  299. return toReturn, err
  300. }
  301. nsKey := ns + "," + pod + "," + clusterID
  302. toReturn[nsKey] = ds
  303. }
  304. return toReturn, nil
  305. }
  306. func GetDeploymentMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  307. toReturn := make(map[string]map[string]string)
  308. for _, val := range qrs {
  309. // We want Deployment, Namespace and ClusterID for key generation purposes
  310. deployment, err := val.GetString("deployment")
  311. if err != nil {
  312. return toReturn, err
  313. }
  314. ns, err := val.GetString("namespace")
  315. if err != nil {
  316. return toReturn, err
  317. }
  318. clusterID, err := val.GetString(env.GetPromClusterLabel())
  319. if clusterID == "" {
  320. clusterID = defaultClusterID
  321. }
  322. nsKey := ns + "," + deployment + "," + clusterID
  323. toReturn[nsKey] = val.GetLabels()
  324. }
  325. return toReturn, nil
  326. }
  327. func GetServiceSelectorLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
  328. toReturn := make(map[string]map[string]string)
  329. for _, val := range qrs {
  330. // We want Service, Namespace and ClusterID for key generation purposes
  331. service, err := val.GetString("service")
  332. if err != nil {
  333. return toReturn, err
  334. }
  335. ns, err := val.GetString("namespace")
  336. if err != nil {
  337. return toReturn, err
  338. }
  339. clusterID, err := val.GetString(env.GetPromClusterLabel())
  340. if clusterID == "" {
  341. clusterID = defaultClusterID
  342. }
  343. nsKey := ns + "," + service + "," + clusterID
  344. toReturn[nsKey] = val.GetLabels()
  345. }
  346. return toReturn, nil
  347. }
  348. func GetContainerMetricVector(qrs []*prom.QueryResult, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
  349. containerData := make(map[string][]*util.Vector)
  350. for _, val := range qrs {
  351. containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
  352. if err != nil {
  353. return nil, err
  354. }
  355. if normalize && normalizationValue != 0 {
  356. for _, v := range val.Values {
  357. v.Value = v.Value / normalizationValue
  358. }
  359. }
  360. containerData[containerMetric.Key()] = val.Values
  361. }
  362. return containerData, nil
  363. }
  364. func GetContainerMetricVectors(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*util.Vector, error) {
  365. containerData := make(map[string][]*util.Vector)
  366. for _, val := range qrs {
  367. containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
  368. if err != nil {
  369. return nil, err
  370. }
  371. containerData[containerMetric.Key()] = val.Values
  372. }
  373. return containerData, nil
  374. }
  375. func GetNormalizedContainerMetricVectors(qrs []*prom.QueryResult, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
  376. containerData := make(map[string][]*util.Vector)
  377. for _, val := range qrs {
  378. containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
  379. if err != nil {
  380. return nil, err
  381. }
  382. containerData[containerMetric.Key()] = util.NormalizeVectorByVector(val.Values, normalizationValues)
  383. }
  384. return containerData, nil
  385. }
  386. func getCost(qrs []*prom.QueryResult) (map[string][]*util.Vector, error) {
  387. toReturn := make(map[string][]*util.Vector)
  388. for _, val := range qrs {
  389. instance, err := val.GetString("node")
  390. if err != nil {
  391. return toReturn, err
  392. }
  393. toReturn[instance] = val.Values
  394. }
  395. return toReturn, nil
  396. }
  397. // TODO niko/prom retain message:
  398. // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
  399. func getNormalization(qrs []*prom.QueryResult) (float64, error) {
  400. if len(qrs) == 0 {
  401. return 0.0, prom.NoDataErr("getNormalization")
  402. }
  403. if len(qrs[0].Values) == 0 {
  404. return 0.0, prom.NoDataErr("getNormalization")
  405. }
  406. return qrs[0].Values[0].Value, nil
  407. }
  408. // TODO niko/prom retain message:
  409. // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
  410. func getNormalizations(qrs []*prom.QueryResult) ([]*util.Vector, error) {
  411. if len(qrs) == 0 {
  412. return nil, prom.NoDataErr("getNormalizations")
  413. }
  414. return qrs[0].Values, nil
  415. }
  416. func parsePodLabels(qrs []*prom.QueryResult) (map[string]map[string]string, error) {
  417. podLabels := map[string]map[string]string{}
  418. for _, result := range qrs {
  419. pod, err := result.GetString("pod")
  420. if err != nil {
  421. return podLabels, errors.New("missing pod field")
  422. }
  423. if _, ok := podLabels[pod]; ok {
  424. podLabels[pod] = result.GetLabels()
  425. } else {
  426. podLabels[pod] = map[string]string{}
  427. podLabels[pod] = result.GetLabels()
  428. }
  429. }
  430. return podLabels, nil
  431. }