costmodel.go 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795
  1. package costmodel
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "regexp"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/opencost/opencost/core/pkg/clusters"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/opencost"
  13. "github.com/opencost/opencost/core/pkg/source"
  14. "github.com/opencost/opencost/core/pkg/util"
  15. "github.com/opencost/opencost/core/pkg/util/promutil"
  16. costAnalyzerCloud "github.com/opencost/opencost/pkg/cloud/models"
  17. "github.com/opencost/opencost/pkg/clustercache"
  18. "github.com/opencost/opencost/pkg/env"
  19. v1 "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/labels"
  22. "golang.org/x/sync/singleflight"
  23. )
  // profileThreshold is the threshold, expressed in nanoseconds (1s), that this
  // file hands to measureTime when timing a section of work.
  const profileThreshold = 1000 * 1000 * 1000

  // unmountedPVsContainer is the synthetic container and pod name under which
  // persistent volume claims that no running pod mounts are reported.
  const unmountedPVsContainer = "unmounted-pvs"
  var (
  	// isCron recognizes Job names generated by a CronJob. Its first capture
  	// group holds the name with the trailing timestamp removed; the second
  	// holds the timestamp itself.
  	//
  	// Two suffix widths are accepted: Jobs created by batch/v1beta1 CronJobs
  	// end in a 10-digit timestamp, while Jobs created by batch/v1 CronJobs
  	// end in an 8-digit one.
  	isCron = regexp.MustCompile(`^(.+)-(\d{10}|\d{8})$`)
  )
  34. type CostModel struct {
  35. Cache clustercache.ClusterCache
  36. ClusterMap clusters.ClusterMap
  37. BatchDuration time.Duration
  38. RequestGroup *singleflight.Group
  39. DataSource source.OpenCostDataSource
  40. Provider costAnalyzerCloud.Provider
  41. pricingMetadata *costAnalyzerCloud.PricingMatchMetadata
  42. }
  43. func NewCostModel(
  44. dataSource source.OpenCostDataSource,
  45. provider costAnalyzerCloud.Provider,
  46. cache clustercache.ClusterCache,
  47. clusterMap clusters.ClusterMap,
  48. batchDuration time.Duration,
  49. ) *CostModel {
  50. // request grouping to prevent over-requesting the same data prior to caching
  51. requestGroup := new(singleflight.Group)
  52. return &CostModel{
  53. Cache: cache,
  54. ClusterMap: clusterMap,
  55. BatchDuration: batchDuration,
  56. DataSource: dataSource,
  57. Provider: provider,
  58. RequestGroup: requestGroup,
  59. }
  60. }
  61. type CostData struct {
  62. Name string `json:"name,omitempty"`
  63. PodName string `json:"podName,omitempty"`
  64. NodeName string `json:"nodeName,omitempty"`
  65. NodeData *costAnalyzerCloud.Node `json:"node,omitempty"`
  66. Namespace string `json:"namespace,omitempty"`
  67. Deployments []string `json:"deployments,omitempty"`
  68. Services []string `json:"services,omitempty"`
  69. Daemonsets []string `json:"daemonsets,omitempty"`
  70. Statefulsets []string `json:"statefulsets,omitempty"`
  71. Jobs []string `json:"jobs,omitempty"`
  72. RAMReq []*util.Vector `json:"ramreq,omitempty"`
  73. RAMUsed []*util.Vector `json:"ramused,omitempty"`
  74. RAMAllocation []*util.Vector `json:"ramallocated,omitempty"`
  75. CPUReq []*util.Vector `json:"cpureq,omitempty"`
  76. CPUUsed []*util.Vector `json:"cpuused,omitempty"`
  77. CPUAllocation []*util.Vector `json:"cpuallocated,omitempty"`
  78. GPUReq []*util.Vector `json:"gpureq,omitempty"`
  79. PVCData []*PersistentVolumeClaimData `json:"pvcData,omitempty"`
  80. NetworkData []*util.Vector `json:"network,omitempty"`
  81. Annotations map[string]string `json:"annotations,omitempty"`
  82. Labels map[string]string `json:"labels,omitempty"`
  83. NamespaceLabels map[string]string `json:"namespaceLabels,omitempty"`
  84. ClusterID string `json:"clusterId"`
  85. ClusterName string `json:"clusterName"`
  86. }
  87. func (cd *CostData) String() string {
  88. return fmt.Sprintf("\n\tName: %s; PodName: %s, NodeName: %s\n\tNamespace: %s\n\tDeployments: %s\n\tServices: %s\n\tCPU (req, used, alloc): %d, %d, %d\n\tRAM (req, used, alloc): %d, %d, %d",
  89. cd.Name, cd.PodName, cd.NodeName, cd.Namespace, strings.Join(cd.Deployments, ", "), strings.Join(cd.Services, ", "),
  90. len(cd.CPUReq), len(cd.CPUUsed), len(cd.CPUAllocation),
  91. len(cd.RAMReq), len(cd.RAMUsed), len(cd.RAMAllocation))
  92. }
  93. func (cd *CostData) GetController() (name string, kind string, hasController bool) {
  94. hasController = false
  95. if len(cd.Deployments) > 0 {
  96. name = cd.Deployments[0]
  97. kind = "deployment"
  98. hasController = true
  99. } else if len(cd.Statefulsets) > 0 {
  100. name = cd.Statefulsets[0]
  101. kind = "statefulset"
  102. hasController = true
  103. } else if len(cd.Daemonsets) > 0 {
  104. name = cd.Daemonsets[0]
  105. kind = "daemonset"
  106. hasController = true
  107. } else if len(cd.Jobs) > 0 {
  108. name = cd.Jobs[0]
  109. kind = "job"
  110. hasController = true
  111. match := isCron.FindStringSubmatch(name)
  112. if match != nil {
  113. name = match[1]
  114. }
  115. }
  116. return name, kind, hasController
  117. }
  118. func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData, error) {
  119. // Cluster ID is specific to the source cluster
  120. clusterID := env.GetClusterID()
  121. cp := cm.Provider
  122. ds := cm.DataSource
  123. grp := source.NewQueryGroup()
  124. resChRAMUsage := source.WithGroup(grp, ds.QueryRAMUsageAvg(start, end))
  125. resChCPUUsage := source.WithGroup(grp, ds.QueryCPUUsageAvg(start, end))
  126. resChNetZoneRequests := source.WithGroup(grp, ds.QueryNetZoneGiB(start, end))
  127. resChNetRegionRequests := source.WithGroup(grp, ds.QueryNetRegionGiB(start, end))
  128. resChNetInternetRequests := source.WithGroup(grp, ds.QueryNetInternetGiB(start, end))
  129. // Pull pod information from k8s API
  130. podlist := cm.Cache.GetAllPods()
  131. podDeploymentsMapping, err := getPodDeployments(cm.Cache, podlist, clusterID)
  132. if err != nil {
  133. return nil, err
  134. }
  135. podServicesMapping, err := getPodServices(cm.Cache, podlist, clusterID)
  136. if err != nil {
  137. return nil, err
  138. }
  139. namespaceLabelsMapping, err := getNamespaceLabels(cm.Cache, clusterID)
  140. if err != nil {
  141. return nil, err
  142. }
  143. namespaceAnnotationsMapping, err := getNamespaceAnnotations(cm.Cache, clusterID)
  144. if err != nil {
  145. return nil, err
  146. }
  147. // Process Prometheus query results. Handle errors using ctx.Errors.
  148. resRAMUsage, _ := resChRAMUsage.Await()
  149. resCPUUsage, _ := resChCPUUsage.Await()
  150. resNetZoneRequests, _ := resChNetZoneRequests.Await()
  151. resNetRegionRequests, _ := resChNetRegionRequests.Await()
  152. resNetInternetRequests, _ := resChNetInternetRequests.Await()
  153. // NOTE: The way we currently handle errors and warnings only early returns if there is an error. Warnings
  154. // NOTE: will not propagate unless coupled with errors.
  155. if grp.HasErrors() {
  156. // To keep the context of where the errors are occurring, we log the errors here and pass them the error
  157. // back to the caller. The caller should handle the specific case where error is an ErrorCollection
  158. for _, queryErr := range grp.Errors() {
  159. if queryErr.Error != nil {
  160. log.Errorf("ComputeCostData: Request Error: %s", queryErr.Error)
  161. }
  162. if queryErr.ParseError != nil {
  163. log.Errorf("ComputeCostData: Parsing Error: %s", queryErr.ParseError)
  164. }
  165. }
  166. // ErrorCollection is an collection of errors wrapped in a single error implementation
  167. // We opt to not return an error for the sake of running as a pure exporter.
  168. log.Warnf("ComputeCostData: continuing despite prometheus errors: %s", grp.Error())
  169. }
  170. defer measureTime(time.Now(), profileThreshold, "ComputeCostData: Processing Query Data")
  171. nodes, err := cm.GetNodeCost(cp)
  172. if err != nil {
  173. log.Warnf("GetNodeCost: no node cost model available: %s", err)
  174. return nil, err
  175. }
  176. // Unmounted PVs represent the PVs that are not mounted or tied to a volume on a container
  177. unmountedPVs := make(map[string][]*PersistentVolumeClaimData)
  178. pvClaimMapping, err := GetPVInfoLocal(cm.Cache, clusterID)
  179. if err != nil {
  180. log.Warnf("GetPVInfo: unable to get PV data: %s", err.Error())
  181. }
  182. if pvClaimMapping != nil {
  183. err = addPVData(cm.Cache, pvClaimMapping, cp)
  184. if err != nil {
  185. return nil, err
  186. }
  187. // copy claim mappings into zombies, then remove as they're discovered
  188. for k, v := range pvClaimMapping {
  189. unmountedPVs[k] = []*PersistentVolumeClaimData{v}
  190. }
  191. }
  192. networkUsageMap, err := GetNetworkUsageData(resNetZoneRequests, resNetRegionRequests, resNetInternetRequests, clusterID)
  193. if err != nil {
  194. log.Warnf("Unable to get Network Cost Data: %s", err.Error())
  195. networkUsageMap = make(map[string]*NetworkUsageData)
  196. }
  197. containerNameCost := make(map[string]*CostData)
  198. containers := make(map[string]bool)
  199. RAMUsedMap, err := GetContainerMetricVector(resRAMUsage, clusterID)
  200. if err != nil {
  201. return nil, err
  202. }
  203. for key := range RAMUsedMap {
  204. containers[key] = true
  205. }
  206. CPUUsedMap, err := GetContainerMetricVector(resCPUUsage, clusterID) // No need to normalize here, as this comes from a counter
  207. if err != nil {
  208. return nil, err
  209. }
  210. for key := range CPUUsedMap {
  211. containers[key] = true
  212. }
  213. currentContainers := make(map[string]clustercache.Pod)
  214. for _, pod := range podlist {
  215. if pod.Status.Phase != v1.PodRunning {
  216. continue
  217. }
  218. cs, err := NewContainerMetricsFromPod(pod, clusterID)
  219. if err != nil {
  220. return nil, err
  221. }
  222. for _, c := range cs {
  223. containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
  224. currentContainers[c.Key()] = *pod
  225. }
  226. }
  227. missingNodes := make(map[string]*costAnalyzerCloud.Node)
  228. missingContainers := make(map[string]*CostData)
  229. for key := range containers {
  230. if _, ok := containerNameCost[key]; ok {
  231. continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
  232. }
  233. // The _else_ case for this statement is the case in which the container has been
  234. // deleted so we have usage information but not request information. In that case,
  235. // we return partial data for CPU and RAM: only usage and not requests.
  236. if pod, ok := currentContainers[key]; ok {
  237. podName := pod.Name
  238. ns := pod.Namespace
  239. nsLabels := namespaceLabelsMapping[ns+","+clusterID]
  240. podLabels := pod.Labels
  241. if podLabels == nil {
  242. podLabels = make(map[string]string)
  243. }
  244. for k, v := range nsLabels {
  245. if _, ok := podLabels[k]; !ok {
  246. podLabels[k] = v
  247. }
  248. }
  249. nsAnnotations := namespaceAnnotationsMapping[ns+","+clusterID]
  250. podAnnotations := pod.Annotations
  251. if podAnnotations == nil {
  252. podAnnotations = make(map[string]string)
  253. }
  254. for k, v := range nsAnnotations {
  255. if _, ok := podAnnotations[k]; !ok {
  256. podAnnotations[k] = v
  257. }
  258. }
  259. nodeName := pod.Spec.NodeName
  260. var nodeData *costAnalyzerCloud.Node
  261. if _, ok := nodes[nodeName]; ok {
  262. nodeData = nodes[nodeName]
  263. }
  264. nsKey := ns + "," + clusterID
  265. var podDeployments []string
  266. if _, ok := podDeploymentsMapping[nsKey]; ok {
  267. if ds, ok := podDeploymentsMapping[nsKey][pod.Name]; ok {
  268. podDeployments = ds
  269. } else {
  270. podDeployments = []string{}
  271. }
  272. }
  273. var podPVs []*PersistentVolumeClaimData
  274. podClaims := pod.Spec.Volumes
  275. for _, vol := range podClaims {
  276. if vol.PersistentVolumeClaim != nil {
  277. name := vol.PersistentVolumeClaim.ClaimName
  278. key := ns + "," + name + "," + clusterID
  279. if pvClaim, ok := pvClaimMapping[key]; ok {
  280. pvClaim.TimesClaimed++
  281. podPVs = append(podPVs, pvClaim)
  282. // Remove entry from potential unmounted pvs
  283. delete(unmountedPVs, key)
  284. }
  285. }
  286. }
  287. var podNetCosts []*util.Vector
  288. if usage, ok := networkUsageMap[ns+","+podName+","+clusterID]; ok {
  289. netCosts, err := GetNetworkCost(usage, cp)
  290. if err != nil {
  291. log.Debugf("Error pulling network costs: %s", err.Error())
  292. } else {
  293. podNetCosts = netCosts
  294. }
  295. }
  296. var podServices []string
  297. if _, ok := podServicesMapping[nsKey]; ok {
  298. if svcs, ok := podServicesMapping[nsKey][pod.Name]; ok {
  299. podServices = svcs
  300. } else {
  301. podServices = []string{}
  302. }
  303. }
  304. for i, container := range pod.Spec.Containers {
  305. containerName := container.Name
  306. // recreate the key and look up data for this container
  307. newKey := NewContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
  308. // k8s.io/apimachinery/pkg/api/resource/amount.go and
  309. // k8s.io/apimachinery/pkg/api/resource/quantity.go for
  310. // details on the "amount" API. See
  311. // https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-types
  312. // for the units of memory and CPU.
  313. ramRequestBytes := container.Resources.Requests.Memory().Value()
  314. // Because information on container RAM & CPU requests isn't
  315. // coming from Prometheus, it won't have a timestamp associated
  316. // with it. We need to provide a timestamp.
  317. RAMReqV := []*util.Vector{
  318. {
  319. Value: float64(ramRequestBytes),
  320. Timestamp: float64(time.Now().UTC().Unix()),
  321. },
  322. }
  323. // use millicores so we can convert to cores in a float64 format
  324. cpuRequestMilliCores := container.Resources.Requests.Cpu().MilliValue()
  325. CPUReqV := []*util.Vector{
  326. {
  327. Value: float64(cpuRequestMilliCores) / 1000,
  328. Timestamp: float64(time.Now().UTC().Unix()),
  329. },
  330. }
  331. gpuReqCount := 0.0
  332. if g, ok := container.Resources.Requests["nvidia.com/gpu"]; ok {
  333. gpuReqCount = g.AsApproximateFloat64()
  334. } else if g, ok := container.Resources.Limits["nvidia.com/gpu"]; ok {
  335. gpuReqCount = g.AsApproximateFloat64()
  336. } else if g, ok := container.Resources.Requests["k8s.amazonaws.com/vgpu"]; ok {
  337. gpuReqCount = g.AsApproximateFloat64()
  338. } else if g, ok := container.Resources.Limits["k8s.amazonaws.com/vgpu"]; ok {
  339. gpuReqCount = g.AsApproximateFloat64()
  340. }
  341. GPUReqV := []*util.Vector{
  342. {
  343. Value: float64(gpuReqCount),
  344. Timestamp: float64(time.Now().UTC().Unix()),
  345. },
  346. }
  347. RAMUsedV, ok := RAMUsedMap[newKey]
  348. if !ok {
  349. log.Debug("no RAM usage for " + newKey)
  350. RAMUsedV = []*util.Vector{{}}
  351. }
  352. CPUUsedV, ok := CPUUsedMap[newKey]
  353. if !ok {
  354. log.Debug("no CPU usage for " + newKey)
  355. CPUUsedV = []*util.Vector{{}}
  356. }
  357. var pvReq []*PersistentVolumeClaimData
  358. var netReq []*util.Vector
  359. if i == 0 { // avoid duplicating by just assigning all claims to the first container.
  360. pvReq = podPVs
  361. netReq = podNetCosts
  362. }
  363. costs := &CostData{
  364. Name: containerName,
  365. PodName: podName,
  366. NodeName: nodeName,
  367. Namespace: ns,
  368. Deployments: podDeployments,
  369. Services: podServices,
  370. Daemonsets: getDaemonsetsOfPod(pod),
  371. Jobs: getJobsOfPod(pod),
  372. Statefulsets: getStatefulSetsOfPod(pod),
  373. NodeData: nodeData,
  374. RAMReq: RAMReqV,
  375. RAMUsed: RAMUsedV,
  376. CPUReq: CPUReqV,
  377. CPUUsed: CPUUsedV,
  378. GPUReq: GPUReqV,
  379. PVCData: pvReq,
  380. NetworkData: netReq,
  381. Annotations: podAnnotations,
  382. Labels: podLabels,
  383. NamespaceLabels: nsLabels,
  384. ClusterID: clusterID,
  385. ClusterName: cm.ClusterMap.NameFor(clusterID),
  386. }
  387. var cpuReq, cpuUse *util.Vector
  388. if len(costs.CPUReq) > 0 {
  389. cpuReq = costs.CPUReq[0]
  390. }
  391. if len(costs.CPUUsed) > 0 {
  392. cpuUse = costs.CPUUsed[0]
  393. }
  394. costs.CPUAllocation = getContainerAllocation(cpuReq, cpuUse, "CPU")
  395. var ramReq, ramUse *util.Vector
  396. if len(costs.RAMReq) > 0 {
  397. ramReq = costs.RAMReq[0]
  398. }
  399. if len(costs.RAMUsed) > 0 {
  400. ramUse = costs.RAMUsed[0]
  401. }
  402. costs.RAMAllocation = getContainerAllocation(ramReq, ramUse, "RAM")
  403. containerNameCost[newKey] = costs
  404. }
  405. } else {
  406. // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
  407. log.Debug("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
  408. c, err := NewContainerMetricFromKey(key)
  409. if err != nil {
  410. return nil, err
  411. }
  412. // CPU and RAM requests are obtained from the Kubernetes API.
  413. // If this case has been reached, the Kubernetes API will not
  414. // have information about the pod because it no longer exists.
  415. //
  416. // The case where this matters is minimal, mainly in environments
  417. // with very short-lived pods that over-request resources.
  418. RAMReqV := []*util.Vector{{}}
  419. CPUReqV := []*util.Vector{{}}
  420. GPUReqV := []*util.Vector{{}}
  421. RAMUsedV, ok := RAMUsedMap[key]
  422. if !ok {
  423. log.Debug("no RAM usage for " + key)
  424. RAMUsedV = []*util.Vector{{}}
  425. }
  426. CPUUsedV, ok := CPUUsedMap[key]
  427. if !ok {
  428. log.Debug("no CPU usage for " + key)
  429. CPUUsedV = []*util.Vector{{}}
  430. }
  431. node, ok := nodes[c.NodeName]
  432. if !ok {
  433. log.Debugf("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
  434. if n, ok := missingNodes[c.NodeName]; ok {
  435. node = n
  436. } else {
  437. node = &costAnalyzerCloud.Node{}
  438. missingNodes[c.NodeName] = node
  439. }
  440. }
  441. namespacelabels := namespaceLabelsMapping[c.Namespace+","+c.ClusterID]
  442. namespaceAnnotations := namespaceAnnotationsMapping[c.Namespace+","+c.ClusterID]
  443. costs := &CostData{
  444. Name: c.ContainerName,
  445. PodName: c.PodName,
  446. NodeName: c.NodeName,
  447. NodeData: node,
  448. Namespace: c.Namespace,
  449. RAMReq: RAMReqV,
  450. RAMUsed: RAMUsedV,
  451. CPUReq: CPUReqV,
  452. CPUUsed: CPUUsedV,
  453. GPUReq: GPUReqV,
  454. Annotations: namespaceAnnotations,
  455. NamespaceLabels: namespacelabels,
  456. ClusterID: c.ClusterID,
  457. ClusterName: cm.ClusterMap.NameFor(c.ClusterID),
  458. }
  459. var cpuReq, cpuUse *util.Vector
  460. if len(costs.CPUReq) > 0 {
  461. cpuReq = costs.CPUReq[0]
  462. }
  463. if len(costs.CPUUsed) > 0 {
  464. cpuUse = costs.CPUUsed[0]
  465. }
  466. costs.CPUAllocation = getContainerAllocation(cpuReq, cpuUse, "CPU")
  467. var ramReq, ramUse *util.Vector
  468. if len(costs.RAMReq) > 0 {
  469. ramReq = costs.RAMReq[0]
  470. }
  471. if len(costs.RAMUsed) > 0 {
  472. ramUse = costs.RAMUsed[0]
  473. }
  474. costs.RAMAllocation = getContainerAllocation(ramReq, ramUse, "RAM")
  475. containerNameCost[key] = costs
  476. missingContainers[key] = costs
  477. }
  478. }
  479. // Use unmounted pvs to create a mapping of "Unmounted-<Namespace>" containers
  480. // to pass along the cost data
  481. unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping, namespaceAnnotationsMapping)
  482. for k, costs := range unmounted {
  483. log.Debugf("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
  484. containerNameCost[k] = costs
  485. }
  486. err = findDeletedNodeInfo(cm.DataSource, missingNodes, start, end)
  487. if err != nil {
  488. log.Errorf("Error fetching historical node data: %s", err.Error())
  489. }
  490. err = findDeletedPodInfo(cm.DataSource, missingContainers, start, end)
  491. if err != nil {
  492. log.Errorf("Error fetching historical pod data: %s", err.Error())
  493. }
  494. return containerNameCost, err
  495. }
  496. func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string, namespaceAnnotationsMapping map[string]map[string]string) map[string]*CostData {
  497. costs := make(map[string]*CostData)
  498. if len(unmountedPVs) == 0 {
  499. return costs
  500. }
  501. for k, pv := range unmountedPVs {
  502. keyParts := strings.Split(k, ",")
  503. if len(keyParts) != 3 {
  504. log.Warnf("Unmounted PV used key with incorrect parts: %s", k)
  505. continue
  506. }
  507. ns, _, clusterID := keyParts[0], keyParts[1], keyParts[2]
  508. namespacelabels := namespaceLabelsMapping[ns+","+clusterID]
  509. namespaceAnnotations := namespaceAnnotationsMapping[ns+","+clusterID]
  510. metric := NewContainerMetricFromValues(ns, unmountedPVsContainer, unmountedPVsContainer, "", clusterID)
  511. key := metric.Key()
  512. if costData, ok := costs[key]; !ok {
  513. costs[key] = &CostData{
  514. Name: unmountedPVsContainer,
  515. PodName: unmountedPVsContainer,
  516. NodeName: "",
  517. Annotations: namespaceAnnotations,
  518. Namespace: ns,
  519. NamespaceLabels: namespacelabels,
  520. Labels: namespacelabels,
  521. ClusterID: clusterID,
  522. ClusterName: clusterMap.NameFor(clusterID),
  523. PVCData: pv,
  524. }
  525. } else {
  526. costData.PVCData = append(costData.PVCData, pv...)
  527. }
  528. }
  529. return costs
  530. }
  531. func findDeletedPodInfo(dataSource source.OpenCostDataSource, missingContainers map[string]*CostData, start, end time.Time) error {
  532. if len(missingContainers) > 0 {
  533. podLabelsResCh := dataSource.QueryPodLabels(start, end)
  534. podLabelsResult, err := podLabelsResCh.Await()
  535. if err != nil {
  536. log.Errorf("failed to parse historical pod labels: %s", err.Error())
  537. }
  538. podLabels := make(map[string]map[string]string)
  539. if podLabelsResult != nil {
  540. podLabels, err = parsePodLabels(podLabelsResult)
  541. if err != nil {
  542. log.Errorf("failed to parse historical pod labels: %s", err.Error())
  543. }
  544. }
  545. for key, costData := range missingContainers {
  546. cm, _ := NewContainerMetricFromKey(key)
  547. labels, ok := podLabels[cm.PodName]
  548. if !ok {
  549. labels = make(map[string]string)
  550. }
  551. for k, v := range costData.NamespaceLabels {
  552. labels[k] = v
  553. }
  554. costData.Labels = labels
  555. }
  556. }
  557. return nil
  558. }
  559. func findDeletedNodeInfo(dataSource source.OpenCostDataSource, missingNodes map[string]*costAnalyzerCloud.Node, start, end time.Time) error {
  560. if len(missingNodes) > 0 {
  561. defer measureTime(time.Now(), profileThreshold, "Finding Deleted Node Info")
  562. grp := source.NewQueryGroup()
  563. cpuCostResCh := source.WithGroup(grp, dataSource.QueryNodeCPUPricePerHr(start, end))
  564. ramCostResCh := source.WithGroup(grp, dataSource.QueryNodeRAMPricePerGiBHr(start, end))
  565. gpuCostResCh := source.WithGroup(grp, dataSource.QueryNodeGPUPricePerHr(start, end))
  566. cpuCostRes, _ := cpuCostResCh.Await()
  567. ramCostRes, _ := ramCostResCh.Await()
  568. gpuCostRes, _ := gpuCostResCh.Await()
  569. if grp.HasErrors() {
  570. return grp.Error()
  571. }
  572. cpuCosts, err := getCost(cpuCostRes, cpuCostNode, cpuCostData)
  573. if err != nil {
  574. return err
  575. }
  576. ramCosts, err := getCost(ramCostRes, ramCostNode, ramCostData)
  577. if err != nil {
  578. return err
  579. }
  580. gpuCosts, err := getCost(gpuCostRes, gpuCostNode, gpuCostData)
  581. if err != nil {
  582. return err
  583. }
  584. if len(cpuCosts) == 0 {
  585. log.Infof("Kubecost prometheus metrics not currently available. Ingest this server's /metrics endpoint to get that data.")
  586. }
  587. for node, costv := range cpuCosts {
  588. if _, ok := missingNodes[node]; ok {
  589. missingNodes[node].VCPUCost = fmt.Sprintf("%f", costv[0].Value)
  590. } else {
  591. log.DedupedWarningf(5, "Node `%s` in prometheus but not k8s api", node)
  592. }
  593. }
  594. for node, costv := range ramCosts {
  595. if _, ok := missingNodes[node]; ok {
  596. missingNodes[node].RAMCost = fmt.Sprintf("%f", costv[0].Value)
  597. }
  598. }
  599. for node, costv := range gpuCosts {
  600. if _, ok := missingNodes[node]; ok {
  601. missingNodes[node].GPUCost = fmt.Sprintf("%f", costv[0].Value)
  602. }
  603. }
  604. }
  605. return nil
  606. }
  607. // getContainerAllocation takes the max between request and usage. This function
  608. // returns a slice containing a single element describing the container's
  609. // allocation.
  610. //
  611. // Additionally, the timestamp of the allocation will be the highest value
  612. // timestamp between the two vectors. This mitigates situations where
  613. // Timestamp=0. This should have no effect on the metrics emitted by the
  614. // CostModelMetricsEmitter
  615. func getContainerAllocation(req *util.Vector, used *util.Vector, allocationType string) []*util.Vector {
  616. var result []*util.Vector
  617. if req != nil && used != nil {
  618. x1 := req.Value
  619. if math.IsNaN(x1) {
  620. log.Debugf("NaN value found during %s allocation calculation for requests.", allocationType)
  621. x1 = 0.0
  622. }
  623. y1 := used.Value
  624. if math.IsNaN(y1) {
  625. log.Debugf("NaN value found during %s allocation calculation for used.", allocationType)
  626. y1 = 0.0
  627. }
  628. result = []*util.Vector{
  629. {
  630. Value: math.Max(x1, y1),
  631. Timestamp: math.Max(req.Timestamp, used.Timestamp),
  632. },
  633. }
  634. if result[0].Value == 0 && result[0].Timestamp == 0 {
  635. log.Debugf("No request or usage data found during %s allocation calculation. Setting allocation to 0.", allocationType)
  636. }
  637. } else if req != nil {
  638. result = []*util.Vector{
  639. {
  640. Value: req.Value,
  641. Timestamp: req.Timestamp,
  642. },
  643. }
  644. } else if used != nil {
  645. result = []*util.Vector{
  646. {
  647. Value: used.Value,
  648. Timestamp: used.Timestamp,
  649. },
  650. }
  651. } else {
  652. log.Debugf("No request or usage data found during %s allocation calculation. Setting allocation to 0.", allocationType)
  653. result = []*util.Vector{
  654. {
  655. Value: 0,
  656. Timestamp: float64(time.Now().UTC().Unix()),
  657. },
  658. }
  659. }
  660. return result
  661. }
  662. func addPVData(cache clustercache.ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
  663. cfg, err := cloud.GetConfig()
  664. if err != nil {
  665. return err
  666. }
  667. // Pull a region from the first node
  668. var defaultRegion string
  669. nodeList := cache.GetAllNodes()
  670. if len(nodeList) > 0 {
  671. defaultRegion, _ = util.GetRegion(nodeList[0].Labels)
  672. }
  673. storageClasses := cache.GetAllStorageClasses()
  674. storageClassMap := make(map[string]map[string]string)
  675. for _, storageClass := range storageClasses {
  676. params := storageClass.Parameters
  677. storageClassMap[storageClass.Name] = params
  678. if storageClass.Annotations["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.Annotations["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  679. storageClassMap["default"] = params
  680. storageClassMap[""] = params
  681. }
  682. }
  683. pvs := cache.GetAllPersistentVolumes()
  684. pvMap := make(map[string]*costAnalyzerCloud.PV)
  685. for _, pv := range pvs {
  686. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  687. if !ok {
  688. log.Debugf("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  689. }
  690. var region string
  691. if r, ok := util.GetRegion(pv.Labels); ok {
  692. region = r
  693. } else {
  694. region = defaultRegion
  695. }
  696. cacPv := &costAnalyzerCloud.PV{
  697. Class: pv.Spec.StorageClassName,
  698. Region: region,
  699. Parameters: parameters,
  700. }
  701. err := GetPVCost(cacPv, pv, cloud, region)
  702. if err != nil {
  703. return err
  704. }
  705. pvMap[pv.Name] = cacPv
  706. }
  707. for _, pvc := range pvClaimMapping {
  708. if vol, ok := pvMap[pvc.VolumeName]; ok {
  709. pvc.Volume = vol
  710. } else {
  711. log.Debugf("PV not found, using default")
  712. pvc.Volume = &costAnalyzerCloud.PV{
  713. Cost: cfg.Storage,
  714. }
  715. }
  716. }
  717. return nil
  718. }
  719. func GetPVCost(pv *costAnalyzerCloud.PV, kpv *clustercache.PersistentVolume, cp costAnalyzerCloud.Provider, defaultRegion string) error {
  720. cfg, err := cp.GetConfig()
  721. if err != nil {
  722. return err
  723. }
  724. key := cp.GetPVKey(kpv, pv.Parameters, defaultRegion)
  725. pv.ProviderID = key.ID()
  726. pvWithCost, err := cp.PVPricing(key)
  727. if err != nil {
  728. pv.Cost = cfg.Storage
  729. return err
  730. }
  731. if pvWithCost == nil || pvWithCost.Cost == "" {
  732. pv.Cost = cfg.Storage
  733. return nil // set default cost
  734. }
  735. pv.Cost = pvWithCost.Cost
  736. return nil
  737. }
  738. func (cm *CostModel) GetPricingSourceCounts() (*costAnalyzerCloud.PricingMatchMetadata, error) {
  739. if cm.pricingMetadata != nil {
  740. return cm.pricingMetadata, nil
  741. } else {
  742. return nil, fmt.Errorf("Node costs not yet calculated")
  743. }
  744. }
  745. func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
  746. cfg, err := cp.GetConfig()
  747. if err != nil {
  748. return nil, err
  749. }
  750. nodeList := cm.Cache.GetAllNodes()
  751. nodes := make(map[string]*costAnalyzerCloud.Node)
  752. pmd := &costAnalyzerCloud.PricingMatchMetadata{
  753. TotalNodes: 0,
  754. PricingTypeCounts: make(map[costAnalyzerCloud.PricingType]int),
  755. }
  756. for _, n := range nodeList {
  757. name := n.Name
  758. nodeLabels := n.Labels
  759. nodeLabels["providerID"] = n.SpecProviderID
  760. pmd.TotalNodes++
  761. cnode, _, err := cp.NodePricing(cp.GetKey(nodeLabels, n))
  762. if err != nil {
  763. log.Infof("Error getting node pricing. Error: %s", err.Error())
  764. if cnode != nil {
  765. nodes[name] = cnode
  766. continue
  767. } else {
  768. cnode = &costAnalyzerCloud.Node{
  769. VCPUCost: cfg.CPU,
  770. RAMCost: cfg.RAM,
  771. }
  772. }
  773. }
  774. if _, ok := pmd.PricingTypeCounts[cnode.PricingType]; ok {
  775. pmd.PricingTypeCounts[cnode.PricingType]++
  776. } else {
  777. pmd.PricingTypeCounts[cnode.PricingType] = 1
  778. }
  779. // newCnode builds upon cnode but populates/overrides certain fields.
  780. // cnode was populated leveraging cloud provider public pricing APIs.
  781. newCnode := *cnode
  782. if newCnode.InstanceType == "" {
  783. it, _ := util.GetInstanceType(n.Labels)
  784. newCnode.InstanceType = it
  785. }
  786. if newCnode.Region == "" {
  787. region, _ := util.GetRegion(n.Labels)
  788. newCnode.Region = region
  789. }
  790. if newCnode.ArchType == "" {
  791. arch, _ := util.GetArchType(n.Labels)
  792. newCnode.ArchType = arch
  793. }
  794. newCnode.ProviderID = n.SpecProviderID
  795. var cpu float64
  796. if newCnode.VCPU == "" {
  797. cpu = float64(n.Status.Capacity.Cpu().Value())
  798. newCnode.VCPU = n.Status.Capacity.Cpu().String()
  799. } else {
  800. cpu, err = strconv.ParseFloat(newCnode.VCPU, 64)
  801. if err != nil {
  802. log.Warnf("parsing VCPU value: \"%s\" as float64", newCnode.VCPU)
  803. }
  804. }
  805. if math.IsNaN(cpu) {
  806. log.Warnf("cpu parsed as NaN. Setting to 0.")
  807. cpu = 0
  808. }
  809. var ram float64
  810. if newCnode.RAM == "" {
  811. newCnode.RAM = n.Status.Capacity.Memory().String()
  812. }
  813. ram = float64(n.Status.Capacity.Memory().Value())
  814. if math.IsNaN(ram) {
  815. log.Warnf("ram parsed as NaN. Setting to 0.")
  816. ram = 0
  817. }
  818. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  819. gpuc, err := strconv.ParseFloat(newCnode.GPU, 64)
  820. if err != nil {
  821. gpuc = 0.0
  822. }
  823. // The k8s API will often report more accurate results for GPU count
  824. // than cloud provider public pricing APIs. If found, override the
  825. // original value.
  826. gpuOverride, vgpuOverride, err := getGPUCount(cm.Cache, n)
  827. if err != nil {
  828. log.Warnf("Unable to get GPUCount for node %s: %s", n.Name, err.Error())
  829. }
  830. if gpuOverride > 0 {
  831. newCnode.GPU = fmt.Sprintf("%f", gpuOverride)
  832. gpuc = gpuOverride
  833. }
  834. if vgpuOverride > 0 {
  835. newCnode.VGPU = fmt.Sprintf("%f", vgpuOverride)
  836. }
  837. // Special case for SUSE rancher, since it won't behave with normal
  838. // calculations, courtesy of the instance type not being "real" (a
  839. // recognizable AWS instance type.)
  840. if newCnode.InstanceType == "rke2" {
  841. log.Infof(
  842. "Found a SUSE Rancher node %s, defaulting and skipping math",
  843. cp.GetKey(nodeLabels, n).Features(),
  844. )
  845. defaultCPUCorePrice, err := strconv.ParseFloat(cfg.CPU, 64)
  846. if err != nil {
  847. log.Errorf("Could not parse default cpu price")
  848. defaultCPUCorePrice = 0
  849. }
  850. if math.IsNaN(defaultCPUCorePrice) {
  851. log.Warnf("defaultCPU parsed as NaN. Setting to 0.")
  852. defaultCPUCorePrice = 0
  853. }
  854. // Some customers may want GPU pricing to be determined by the labels affixed to their nodes. GpuPricing
  855. // passes the node's labels to the provider, which then cross-references them with the labels that the
  856. // provider knows to have label-specific costs associated with them, and returns that cost. See CSVProvider
  857. // for an example implementation.
  858. var gpuPrice float64
  859. gpuPricing, err := cp.GpuPricing(nodeLabels)
  860. if err != nil {
  861. log.Errorf("Could not determine custom GPU pricing: %s", err)
  862. gpuPrice = 0
  863. } else if len(gpuPricing) > 0 {
  864. gpuPrice, err = strconv.ParseFloat(gpuPricing, 64)
  865. if err != nil {
  866. log.Errorf("Could not parse custom GPU pricing: %s", err)
  867. gpuPrice = 0
  868. } else if math.IsNaN(gpuPrice) {
  869. log.Warnf("Custom GPU pricing parsed as NaN. Setting to 0.")
  870. gpuPrice = 0
  871. } else {
  872. log.Infof("Using custom GPU pricing for node \"%s\": %f", name, gpuPrice)
  873. }
  874. } else {
  875. gpuPrice, err = strconv.ParseFloat(cfg.GPU, 64)
  876. if err != nil {
  877. log.Errorf("Could not parse default gpu price")
  878. gpuPrice = 0
  879. }
  880. if math.IsNaN(gpuPrice) {
  881. log.Warnf("defaultGPU parsed as NaN. Setting to 0.")
  882. gpuPrice = 0
  883. }
  884. }
  885. defaultRAMPrice, err := strconv.ParseFloat(cfg.RAM, 64)
  886. if err != nil {
  887. log.Errorf("Could not parse default ram price")
  888. defaultRAMPrice = 0
  889. }
  890. if math.IsNaN(defaultRAMPrice) {
  891. log.Warnf("defaultRAM parsed as NaN. Setting to 0.")
  892. defaultRAMPrice = 0
  893. }
  894. defaultGPUPrice, err := strconv.ParseFloat(cfg.GPU, 64)
  895. if err != nil {
  896. log.Errorf("Could not parse default gpu price")
  897. defaultGPUPrice = 0
  898. }
  899. if math.IsNaN(defaultGPUPrice) {
  900. log.Warnf("defaultGPU parsed as NaN. Setting to 0.")
  901. defaultGPUPrice = 0
  902. }
  903. // Just say no to doing the ratios!
  904. cpuCost := defaultCPUCorePrice * cpu
  905. gpuCost := gpuPrice * gpuc
  906. ramCost := defaultRAMPrice * ram
  907. nodeCost := cpuCost + gpuCost + ramCost
  908. newCnode.Cost = fmt.Sprintf("%f", nodeCost)
  909. newCnode.VCPUCost = fmt.Sprintf("%f", defaultCPUCorePrice)
  910. newCnode.GPUCost = fmt.Sprintf("%f", gpuPrice)
  911. newCnode.RAMCost = fmt.Sprintf("%f", defaultRAMPrice)
  912. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  913. } else if newCnode.GPU != "" && newCnode.GPUCost == "" {
  914. // was the big thing to investigate. All the funky ratio math
  915. // we were doing was messing with their default pricing. for SUSE Rancher.
  916. // We reach this when a GPU is detected on a node, but no cost for
  917. // the GPU is defined in the OnDemand pricing. Calculate ratios of
  918. // CPU to RAM and GPU to RAM costs, then distribute the total node
  919. // cost among the CPU, RAM, and GPU.
  920. log.Tracef("GPU without cost found for %s, calculating...", cp.GetKey(nodeLabels, n).Features())
  921. // Some customers may want GPU pricing to be determined by the labels affixed to their nodes. GpuPricing
  922. // passes the node's labels to the provider, which then cross-references them with the labels that the
  923. // provider knows to have label-specific costs associated with them, and returns that cost. See CSVProvider
  924. // for an example implementation.
  925. gpuPricing, err := cp.GpuPricing(nodeLabels)
  926. if err != nil {
  927. log.Errorf("Could not determine custom GPU pricing: %s", err)
  928. } else if len(gpuPricing) > 0 {
  929. newCnode.GPUCost = gpuPricing
  930. log.Infof("Using custom GPU pricing for node \"%s\": %s", name, gpuPricing)
  931. }
  932. if newCnode.GPUCost == "" {
  933. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  934. if err != nil {
  935. log.Errorf("Could not parse default cpu price")
  936. defaultCPU = 0
  937. }
  938. if math.IsNaN(defaultCPU) {
  939. log.Warnf("defaultCPU parsed as NaN. Setting to 0.")
  940. defaultCPU = 0
  941. }
  942. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  943. if err != nil {
  944. log.Errorf("Could not parse default ram price")
  945. defaultRAM = 0
  946. }
  947. if math.IsNaN(defaultRAM) {
  948. log.Warnf("defaultRAM parsed as NaN. Setting to 0.")
  949. defaultRAM = 0
  950. }
  951. defaultGPU, err := strconv.ParseFloat(cfg.GPU, 64)
  952. if err != nil {
  953. log.Errorf("Could not parse default gpu price")
  954. defaultGPU = 0
  955. }
  956. if math.IsNaN(defaultGPU) {
  957. log.Warnf("defaultGPU parsed as NaN. Setting to 0.")
  958. defaultGPU = 0
  959. }
  960. cpuToRAMRatio := defaultCPU / defaultRAM
  961. if math.IsNaN(cpuToRAMRatio) {
  962. log.Warnf("cpuToRAMRatio[defaultCPU: %f / defaultRAM: %f] is NaN. Setting to 10.", defaultCPU, defaultRAM)
  963. cpuToRAMRatio = 10
  964. }
  965. gpuToRAMRatio := defaultGPU / defaultRAM
  966. if math.IsNaN(gpuToRAMRatio) {
  967. log.Warnf("gpuToRAMRatio is NaN. Setting to 100.")
  968. gpuToRAMRatio = 100
  969. }
  970. ramGB := ram / 1024 / 1024 / 1024
  971. if math.IsNaN(ramGB) {
  972. log.Warnf("ramGB is NaN. Setting to 0.")
  973. ramGB = 0
  974. }
  975. ramMultiple := gpuc*gpuToRAMRatio + cpu*cpuToRAMRatio + ramGB
  976. if math.IsNaN(ramMultiple) {
  977. log.Warnf("ramMultiple is NaN. Setting to 0.")
  978. ramMultiple = 0
  979. }
  980. var nodePrice float64
  981. if newCnode.Cost != "" {
  982. nodePrice, err = strconv.ParseFloat(newCnode.Cost, 64)
  983. if err != nil {
  984. log.Errorf("Could not parse total node price")
  985. return nil, err
  986. }
  987. } else if newCnode.VCPUCost != "" {
  988. nodePrice, err = strconv.ParseFloat(newCnode.VCPUCost, 64) // all the price was allocated to the CPU
  989. if err != nil {
  990. log.Errorf("Could not parse node vcpu price")
  991. return nil, err
  992. }
  993. } else { // add case to use default pricing model when API data fails.
  994. log.Debugf("No node price or CPUprice found, falling back to default")
  995. nodePrice = defaultCPU*cpu + defaultRAM*ram + gpuc*defaultGPU
  996. }
  997. if math.IsNaN(nodePrice) {
  998. log.Warnf("nodePrice parsed as NaN. Setting to 0.")
  999. nodePrice = 0
  1000. }
  1001. ramPrice := (nodePrice / ramMultiple)
  1002. if math.IsNaN(ramPrice) {
  1003. log.Warnf("ramPrice[nodePrice: %f / ramMultiple: %f] parsed as NaN. Setting to 0.", nodePrice, ramMultiple)
  1004. ramPrice = 0
  1005. }
  1006. cpuPrice := ramPrice * cpuToRAMRatio
  1007. gpuPrice := ramPrice * gpuToRAMRatio
  1008. newCnode.VCPUCost = fmt.Sprintf("%f", cpuPrice)
  1009. newCnode.RAMCost = fmt.Sprintf("%f", ramPrice)
  1010. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  1011. newCnode.GPUCost = fmt.Sprintf("%f", gpuPrice)
  1012. }
  1013. } else if newCnode.RAMCost == "" {
  1014. // We reach this when no RAM cost is defined in the OnDemand
  1015. // pricing. It calculates a cpuToRAMRatio and ramMultiple to
  1016. // distrubte the total node cost among CPU and RAM costs.
  1017. log.Tracef("No RAM cost found for %s, calculating...", cp.GetKey(nodeLabels, n).Features())
  1018. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  1019. if err != nil {
  1020. log.Warnf("Could not parse default cpu price")
  1021. defaultCPU = 0
  1022. }
  1023. if math.IsNaN(defaultCPU) {
  1024. log.Warnf("defaultCPU parsed as NaN. Setting to 0.")
  1025. defaultCPU = 0
  1026. }
  1027. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  1028. if err != nil {
  1029. log.Warnf("Could not parse default ram price")
  1030. defaultRAM = 0
  1031. }
  1032. if math.IsNaN(defaultRAM) {
  1033. log.Warnf("defaultRAM parsed as NaN. Setting to 0.")
  1034. defaultRAM = 0
  1035. }
  1036. cpuToRAMRatio := defaultCPU / defaultRAM
  1037. if math.IsNaN(cpuToRAMRatio) {
  1038. log.Warnf("cpuToRAMRatio[defaultCPU: %f / defaultRAM: %f] is NaN. Setting to 10.", defaultCPU, defaultRAM)
  1039. cpuToRAMRatio = 10
  1040. }
  1041. ramGB := ram / 1024 / 1024 / 1024
  1042. if math.IsNaN(ramGB) {
  1043. log.Warnf("ramGB is NaN. Setting to 0.")
  1044. ramGB = 0
  1045. }
  1046. ramMultiple := cpu*cpuToRAMRatio + ramGB
  1047. if math.IsNaN(ramMultiple) {
  1048. log.Warnf("ramMultiple is NaN. Setting to 0.")
  1049. ramMultiple = 0
  1050. }
  1051. var nodePrice float64
  1052. if newCnode.Cost != "" {
  1053. nodePrice, err = strconv.ParseFloat(newCnode.Cost, 64)
  1054. if err != nil {
  1055. log.Warnf("Could not parse total node price")
  1056. return nil, err
  1057. }
  1058. if newCnode.GPUCost != "" {
  1059. gpuPrice, err := strconv.ParseFloat(newCnode.GPUCost, 64)
  1060. if err != nil {
  1061. log.Warnf("Could not parse node gpu price")
  1062. return nil, err
  1063. }
  1064. nodePrice = nodePrice - gpuPrice // remove the gpuPrice from the total, we're just costing out RAM and CPU.
  1065. }
  1066. } else if newCnode.VCPUCost != "" {
  1067. nodePrice, err = strconv.ParseFloat(newCnode.VCPUCost, 64) // all the price was allocated to the CPU
  1068. if err != nil {
  1069. log.Warnf("Could not parse node vcpu price")
  1070. return nil, err
  1071. }
  1072. } else { // add case to use default pricing model when API data fails.
  1073. log.Debugf("No node price or CPUprice found, falling back to default")
  1074. nodePrice = defaultCPU*cpu + defaultRAM*ramGB
  1075. }
  1076. if math.IsNaN(nodePrice) {
  1077. log.Warnf("nodePrice parsed as NaN. Setting to 0.")
  1078. nodePrice = 0
  1079. }
  1080. ramPrice := (nodePrice / ramMultiple)
  1081. if math.IsNaN(ramPrice) {
  1082. log.Warnf("ramPrice[nodePrice: %f / ramMultiple: %f] parsed as NaN. Setting to 0.", nodePrice, ramMultiple)
  1083. ramPrice = 0
  1084. }
  1085. cpuPrice := ramPrice * cpuToRAMRatio
  1086. if defaultRAM != 0 {
  1087. newCnode.VCPUCost = fmt.Sprintf("%f", cpuPrice)
  1088. newCnode.RAMCost = fmt.Sprintf("%f", ramPrice)
  1089. } else { // just assign the full price to CPU
  1090. if cpu != 0 {
  1091. newCnode.VCPUCost = fmt.Sprintf("%f", nodePrice/cpu)
  1092. } else {
  1093. newCnode.VCPUCost = fmt.Sprintf("%f", nodePrice)
  1094. }
  1095. }
  1096. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  1097. log.Tracef("Computed \"%s\" RAM Cost := %v", name, newCnode.RAMCost)
  1098. }
  1099. nodes[name] = &newCnode
  1100. }
  1101. cm.pricingMetadata = pmd
  1102. cp.ApplyReservedInstancePricing(nodes)
  1103. return nodes, nil
  1104. }
  1105. // TODO: drop some logs
  1106. func (cm *CostModel) GetLBCost(cp costAnalyzerCloud.Provider) (map[serviceKey]*costAnalyzerCloud.LoadBalancer, error) {
  1107. // for fetching prices from cloud provider
  1108. // cfg, err := cp.GetConfig()
  1109. // if err != nil {
  1110. // return nil, err
  1111. // }
  1112. servicesList := cm.Cache.GetAllServices()
  1113. loadBalancerMap := make(map[serviceKey]*costAnalyzerCloud.LoadBalancer)
  1114. for _, service := range servicesList {
  1115. namespace := service.Namespace
  1116. name := service.Name
  1117. key := serviceKey{
  1118. Cluster: env.GetClusterID(),
  1119. Namespace: namespace,
  1120. Service: name,
  1121. }
  1122. if service.Type == "LoadBalancer" {
  1123. loadBalancer, err := cp.LoadBalancerPricing()
  1124. if err != nil {
  1125. return nil, err
  1126. }
  1127. newLoadBalancer := *loadBalancer
  1128. for _, loadBalancerIngress := range service.Status.LoadBalancer.Ingress {
  1129. address := loadBalancerIngress.IP
  1130. // Some cloud providers use hostname rather than IP
  1131. if address == "" {
  1132. address = loadBalancerIngress.Hostname
  1133. }
  1134. newLoadBalancer.IngressIPAddresses = append(newLoadBalancer.IngressIPAddresses, address)
  1135. }
  1136. loadBalancerMap[key] = &newLoadBalancer
  1137. }
  1138. }
  1139. return loadBalancerMap, nil
  1140. }
  1141. func getPodServices(cache clustercache.ClusterCache, podList []*clustercache.Pod, clusterID string) (map[string]map[string][]string, error) {
  1142. servicesList := cache.GetAllServices()
  1143. podServicesMapping := make(map[string]map[string][]string)
  1144. for _, service := range servicesList {
  1145. namespace := service.Namespace
  1146. name := service.Name
  1147. key := namespace + "," + clusterID
  1148. if _, ok := podServicesMapping[key]; !ok {
  1149. podServicesMapping[key] = make(map[string][]string)
  1150. }
  1151. s := labels.Nothing()
  1152. if len(service.SpecSelector) > 0 {
  1153. s = labels.Set(service.SpecSelector).AsSelectorPreValidated()
  1154. }
  1155. for _, pod := range podList {
  1156. labelSet := labels.Set(pod.Labels)
  1157. if s.Matches(labelSet) && pod.Namespace == namespace {
  1158. services, ok := podServicesMapping[key][pod.Name]
  1159. if ok {
  1160. podServicesMapping[key][pod.Name] = append(services, name)
  1161. } else {
  1162. podServicesMapping[key][pod.Name] = []string{name}
  1163. }
  1164. }
  1165. }
  1166. }
  1167. return podServicesMapping, nil
  1168. }
  1169. func getPodStatefulsets(cache clustercache.ClusterCache, podList []*clustercache.Pod, clusterID string) (map[string]map[string][]string, error) {
  1170. ssList := cache.GetAllStatefulSets()
  1171. podSSMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
  1172. for _, ss := range ssList {
  1173. namespace := ss.Namespace
  1174. name := ss.Name
  1175. key := namespace + "," + clusterID
  1176. if _, ok := podSSMapping[key]; !ok {
  1177. podSSMapping[key] = make(map[string][]string)
  1178. }
  1179. s, err := metav1.LabelSelectorAsSelector(ss.SpecSelector)
  1180. if err != nil {
  1181. log.Errorf("Error doing deployment label conversion: " + err.Error())
  1182. }
  1183. for _, pod := range podList {
  1184. labelSet := labels.Set(pod.Labels)
  1185. if s.Matches(labelSet) && pod.Namespace == namespace {
  1186. sss, ok := podSSMapping[key][pod.Name]
  1187. if ok {
  1188. podSSMapping[key][pod.Name] = append(sss, name)
  1189. } else {
  1190. podSSMapping[key][pod.Name] = []string{name}
  1191. }
  1192. }
  1193. }
  1194. }
  1195. return podSSMapping, nil
  1196. }
  1197. func getPodDeployments(cache clustercache.ClusterCache, podList []*clustercache.Pod, clusterID string) (map[string]map[string][]string, error) {
  1198. deploymentsList := cache.GetAllDeployments()
  1199. podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
  1200. for _, deployment := range deploymentsList {
  1201. namespace := deployment.Namespace
  1202. name := deployment.Name
  1203. key := namespace + "," + clusterID
  1204. if _, ok := podDeploymentsMapping[key]; !ok {
  1205. podDeploymentsMapping[key] = make(map[string][]string)
  1206. }
  1207. s, err := metav1.LabelSelectorAsSelector(deployment.SpecSelector)
  1208. if err != nil {
  1209. log.Errorf("Error doing deployment label conversion: %s", err)
  1210. }
  1211. for _, pod := range podList {
  1212. labelSet := labels.Set(pod.Labels)
  1213. if s.Matches(labelSet) && pod.Namespace == namespace {
  1214. deployments, ok := podDeploymentsMapping[key][pod.Name]
  1215. if ok {
  1216. podDeploymentsMapping[key][pod.Name] = append(deployments, name)
  1217. } else {
  1218. podDeploymentsMapping[key][pod.Name] = []string{name}
  1219. }
  1220. }
  1221. }
  1222. }
  1223. return podDeploymentsMapping, nil
  1224. }
  1225. func getNamespaceLabels(cache clustercache.ClusterCache, clusterID string) (map[string]map[string]string, error) {
  1226. nsToLabels := make(map[string]map[string]string)
  1227. nss := cache.GetAllNamespaces()
  1228. for _, ns := range nss {
  1229. labels := make(map[string]string)
  1230. for k, v := range ns.Labels {
  1231. labels[promutil.SanitizeLabelName(k)] = v
  1232. }
  1233. nsToLabels[ns.Name+","+clusterID] = labels
  1234. }
  1235. return nsToLabels, nil
  1236. }
  1237. func getNamespaceAnnotations(cache clustercache.ClusterCache, clusterID string) (map[string]map[string]string, error) {
  1238. nsToAnnotations := make(map[string]map[string]string)
  1239. nss := cache.GetAllNamespaces()
  1240. for _, ns := range nss {
  1241. annotations := make(map[string]string)
  1242. for k, v := range ns.Annotations {
  1243. annotations[promutil.SanitizeLabelName(k)] = v
  1244. }
  1245. nsToAnnotations[ns.Name+","+clusterID] = annotations
  1246. }
  1247. return nsToAnnotations, nil
  1248. }
  1249. func getDaemonsetsOfPod(pod clustercache.Pod) []string {
  1250. for _, ownerReference := range pod.OwnerReferences {
  1251. if ownerReference.Kind == "DaemonSet" {
  1252. return []string{ownerReference.Name}
  1253. }
  1254. }
  1255. return []string{}
  1256. }
  1257. func getJobsOfPod(pod clustercache.Pod) []string {
  1258. for _, ownerReference := range pod.OwnerReferences {
  1259. if ownerReference.Kind == "Job" {
  1260. return []string{ownerReference.Name}
  1261. }
  1262. }
  1263. return []string{}
  1264. }
  1265. func getStatefulSetsOfPod(pod clustercache.Pod) []string {
  1266. for _, ownerReference := range pod.OwnerReferences {
  1267. if ownerReference.Kind == "StatefulSet" {
  1268. return []string{ownerReference.Name}
  1269. }
  1270. }
  1271. return []string{}
  1272. }
  1273. // getGPUCount reads the node's Status and Labels (via the k8s API) to identify
  1274. // the number of GPUs and vGPUs are equipped on the node. If unable to identify
  1275. // a GPU count, it will return -1.
  1276. func getGPUCount(cache clustercache.ClusterCache, n *clustercache.Node) (float64, float64, error) {
  1277. g, hasGpu := n.Status.Capacity["nvidia.com/gpu"]
  1278. _, hasReplicas := n.Labels["nvidia.com/gpu.replicas"]
  1279. // Case 1: Standard NVIDIA GPU
  1280. if hasGpu && g.Value() != 0 && !hasReplicas {
  1281. return float64(g.Value()), float64(g.Value()), nil
  1282. }
  1283. // Case 2: NVIDIA GPU with GPU Feature Discovery (GFD) Pod enabled.
  1284. // Ref: https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/latest/gpu-sharing.html#verifying-the-gpu-time-slicing-configuration
  1285. // Ref: https://github.com/NVIDIA/k8s-device-plugin/blob/d899752a424818428f744a946d32b132ea2c0cf1/internal/lm/resource_test.go#L44-L45
  1286. // Ref: https://github.com/NVIDIA/k8s-device-plugin/blob/d899752a424818428f744a946d32b132ea2c0cf1/internal/lm/resource_test.go#L103-L118
  1287. if hasReplicas {
  1288. resultGPU := 0.0
  1289. resultVGPU := 0.0
  1290. if c, ok := n.Labels["nvidia.com/gpu.count"]; ok {
  1291. var err error
  1292. resultGPU, err = strconv.ParseFloat(c, 64)
  1293. if err != nil {
  1294. return -1, -1, fmt.Errorf("could not parse label \"nvidia.com/gpu.count\": %v", err)
  1295. }
  1296. }
  1297. if s, ok := n.Status.Capacity["nvidia.com/gpu.shared"]; ok { // GFD configured `renameByDefault=true`
  1298. resultVGPU = float64(s.Value())
  1299. } else if g, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // GFD configured `renameByDefault=false`
  1300. resultVGPU = float64(g.Value())
  1301. } else {
  1302. resultVGPU = resultGPU
  1303. }
  1304. return resultGPU, resultVGPU, nil
  1305. }
  1306. // Case 3: AWS vGPU
  1307. if vgpu, ok := n.Status.Capacity["k8s.amazonaws.com/vgpu"]; ok {
  1308. vgpuCount, err := getAllocatableVGPUs(cache)
  1309. if err != nil {
  1310. return -1, -1, err
  1311. }
  1312. vgpuCoeff := 10.0
  1313. if vgpuCount > 0.0 {
  1314. vgpuCoeff = vgpuCount
  1315. }
  1316. if vgpu.Value() != 0 {
  1317. resultGPU := float64(vgpu.Value()) / vgpuCoeff
  1318. resultVGPU := float64(vgpu.Value())
  1319. return resultGPU, resultVGPU, nil
  1320. }
  1321. }
  1322. // No GPU found
  1323. return -1, -1, nil
  1324. }
  1325. func getAllocatableVGPUs(cache clustercache.ClusterCache) (float64, error) {
  1326. daemonsets := cache.GetAllDaemonSets()
  1327. vgpuCount := 0.0
  1328. for _, ds := range daemonsets {
  1329. dsContainerList := &ds.SpecContainers
  1330. for _, ctnr := range *dsContainerList {
  1331. if ctnr.Args != nil {
  1332. for _, arg := range ctnr.Args {
  1333. if strings.Contains(arg, "--vgpu=") {
  1334. vgpus, err := strconv.ParseFloat(arg[strings.IndexByte(arg, '=')+1:], 64)
  1335. if err != nil {
  1336. log.Errorf("failed to parse vgpu allocation string %s: %v", arg, err)
  1337. continue
  1338. }
  1339. vgpuCount = vgpus
  1340. return vgpuCount, nil
  1341. }
  1342. }
  1343. }
  1344. }
  1345. }
  1346. return vgpuCount, nil
  1347. }
// PersistentVolumeClaimData describes a persistent volume claim together with
// the volume backing it. It is serialized to JSON using the field tags below.
type PersistentVolumeClaimData struct {
	// Class is presumably the claim's storage class name — confirm against
	// the code that populates it.
	Class        string                `json:"class"`
	Claim        string                `json:"claim"`
	Namespace    string                `json:"namespace"`
	ClusterID    string                `json:"clusterId"`
	TimesClaimed int                   `json:"timesClaimed"`
	// VolumeName is matched against PV names by addPVData to locate Volume.
	VolumeName   string                `json:"volumeName"`
	// Volume is populated by addPVData: the priced PV whose name equals
	// VolumeName, or a PV carrying only the configured default storage cost
	// when no such PV exists.
	Volume       *costAnalyzerCloud.PV `json:"persistentVolume"`
	Values       []*util.Vector        `json:"values"`
}
  1358. func measureTime(start time.Time, threshold time.Duration, name string) {
  1359. elapsed := time.Since(start)
  1360. if elapsed > threshold {
  1361. log.Infof("[Profiler] %s: %s", elapsed, name)
  1362. }
  1363. }
  1364. func (cm *CostModel) QueryAllocation(window opencost.Window, resolution, step time.Duration, aggregate []string, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer bool, accumulateBy opencost.AccumulateOption, shareIdle bool) (*opencost.AllocationSetRange, error) {
  1365. // Validate window is legal
  1366. if window.IsOpen() || window.IsNegative() {
  1367. return nil, fmt.Errorf("illegal window: %s", window)
  1368. }
  1369. var totalsStore opencost.TotalsStore
  1370. // Idle is required for proportional asset costs
  1371. if includeProportionalAssetResourceCosts {
  1372. if !includeIdle {
  1373. return nil, errors.New("bad request - includeIdle must be set true if includeProportionalAssetResourceCosts is true")
  1374. }
  1375. totalsStore = opencost.NewMemoryTotalsStore()
  1376. }
  1377. // Begin with empty response
  1378. asr := opencost.NewAllocationSetRange()
  1379. // Query for AllocationSets in increments of the given step duration,
  1380. // appending each to the response.
  1381. stepStart := *window.Start()
  1382. stepEnd := stepStart.Add(step)
  1383. var isAKS bool
  1384. for window.End().After(stepStart) {
  1385. allocSet, err := cm.ComputeAllocation(stepStart, stepEnd, resolution)
  1386. if err != nil {
  1387. return nil, fmt.Errorf("error computing allocations for %s: %w", opencost.NewClosedWindow(stepStart, stepEnd), err)
  1388. }
  1389. if includeIdle {
  1390. assetSet, err := cm.ComputeAssets(stepStart, stepEnd)
  1391. if err != nil {
  1392. return nil, fmt.Errorf("error computing assets for %s: %w", opencost.NewClosedWindow(stepStart, stepEnd), err)
  1393. }
  1394. if includeProportionalAssetResourceCosts {
  1395. // AKS is a special case - there can be a maximum of 2
  1396. // load balancers (1 public and 1 private) in an AKS cluster
  1397. // therefore, when calculating PARCs for load balancers,
  1398. // we must know if this is an AKS cluster
  1399. for _, node := range assetSet.Nodes {
  1400. if _, found := node.Labels["label_kubernetes_azure_com_cluster"]; found {
  1401. isAKS = true
  1402. break
  1403. }
  1404. }
  1405. _, err := opencost.UpdateAssetTotalsStore(totalsStore, assetSet)
  1406. if err != nil {
  1407. log.Errorf("ETL: error updating asset resource totals for %s: %s", assetSet.Window, err)
  1408. }
  1409. }
  1410. idleSet, err := computeIdleAllocations(allocSet, assetSet, true)
  1411. if err != nil {
  1412. return nil, fmt.Errorf("error computing idle allocations for %s: %w", opencost.NewClosedWindow(stepStart, stepEnd), err)
  1413. }
  1414. for _, idleAlloc := range idleSet.Allocations {
  1415. allocSet.Insert(idleAlloc)
  1416. }
  1417. }
  1418. asr.Append(allocSet)
  1419. stepStart = stepEnd
  1420. stepEnd = stepStart.Add(step)
  1421. }
  1422. // Set aggregation options and aggregate
  1423. var shareIdleOpt string
  1424. if shareIdle {
  1425. shareIdleOpt = opencost.ShareWeighted
  1426. } else {
  1427. shareIdleOpt = opencost.ShareNone
  1428. }
  1429. opts := &opencost.AllocationAggregationOptions{
  1430. IncludeProportionalAssetResourceCosts: includeProportionalAssetResourceCosts,
  1431. IdleByNode: idleByNode,
  1432. IncludeAggregatedMetadata: includeAggregatedMetadata,
  1433. ShareIdle: shareIdleOpt,
  1434. }
  1435. // Aggregate
  1436. err := asr.AggregateBy(aggregate, opts)
  1437. if err != nil {
  1438. return nil, fmt.Errorf("error aggregating for %s: %w", window, err)
  1439. }
  1440. // Accumulate, if requested
  1441. if accumulateBy != opencost.AccumulateOptionNone {
  1442. asr, err = asr.Accumulate(accumulateBy)
  1443. if err != nil {
  1444. log.Errorf("error accumulating by %v: %s", accumulateBy, err)
  1445. return nil, fmt.Errorf("error accumulating by %v: %s", accumulateBy, err)
  1446. }
  1447. // when accumulating and returning PARCs, we need the totals for the
  1448. // accumulated windows to accurately compute a fraction
  1449. if includeProportionalAssetResourceCosts {
  1450. assetSet, err := cm.ComputeAssets(*asr.Window().Start(), *asr.Window().End())
  1451. if err != nil {
  1452. return nil, fmt.Errorf("error computing assets for %s: %w", opencost.NewClosedWindow(*asr.Window().Start(), *asr.Window().End()), err)
  1453. }
  1454. _, err = opencost.UpdateAssetTotalsStore(totalsStore, assetSet)
  1455. if err != nil {
  1456. log.Errorf("ETL: error updating asset resource totals for %s: %s", opencost.NewClosedWindow(*asr.Window().Start(), *asr.Window().End()), err)
  1457. }
  1458. }
  1459. }
  1460. if includeProportionalAssetResourceCosts {
  1461. for _, as := range asr.Allocations {
  1462. totalStoreByNode, ok := totalsStore.GetAssetTotalsByNode(as.Start(), as.End())
  1463. if !ok {
  1464. log.Errorf("unable to locate allocation totals for node for window %v - %v", as.Start(), as.End())
  1465. return nil, fmt.Errorf("unable to locate allocation totals for node for window %v - %v", as.Start(), as.End())
  1466. }
  1467. totalStoreByCluster, ok := totalsStore.GetAssetTotalsByCluster(as.Start(), as.End())
  1468. if !ok {
  1469. log.Errorf("unable to locate allocation totals for cluster for window %v - %v", as.Start(), as.End())
  1470. return nil, fmt.Errorf("unable to locate allocation totals for cluster for window %v - %v", as.Start(), as.End())
  1471. }
  1472. var totalPublicLbCost, totalPrivateLbCost float64
  1473. if isAKS && sharedLoadBalancer {
  1474. // loop through all assetTotals, adding all load balancer costs by public and private
  1475. for _, tot := range totalStoreByNode {
  1476. if tot.PrivateLoadBalancer {
  1477. totalPrivateLbCost += tot.LoadBalancerCost
  1478. } else {
  1479. totalPublicLbCost += tot.LoadBalancerCost
  1480. }
  1481. }
  1482. }
  1483. // loop through each allocation set, using total cost from totals store
  1484. for _, alloc := range as.Allocations {
  1485. for rawKey, parc := range alloc.ProportionalAssetResourceCosts {
  1486. key := strings.TrimSuffix(strings.ReplaceAll(rawKey, ",", "/"), "/")
  1487. // for each parc , check the totals store for each
  1488. // on a totals hit, set the corresponding total and calculate percentage
  1489. var totals *opencost.AssetTotals
  1490. if totalsLoc, found := totalStoreByCluster[key]; found {
  1491. totals = totalsLoc
  1492. }
  1493. if totalsLoc, found := totalStoreByNode[key]; found {
  1494. totals = totalsLoc
  1495. }
  1496. if totals == nil {
  1497. log.Errorf("unable to locate asset totals for allocation %s, corresponding PARC is being skipped", key)
  1498. continue
  1499. }
  1500. parc.CPUTotalCost = totals.CPUCost
  1501. parc.GPUTotalCost = totals.GPUCost
  1502. parc.RAMTotalCost = totals.RAMCost
  1503. parc.PVTotalCost = totals.PersistentVolumeCost
  1504. if isAKS && sharedLoadBalancer && len(alloc.LoadBalancers) > 0 {
  1505. // Azure is a special case - use computed totals above
  1506. // use the lbAllocations in the object to determine if
  1507. // this PARC is a public or private load balancer
  1508. // then set the total accordingly
  1509. // AKS only has 1 public and 1 private load balancer
  1510. lbAlloc, found := alloc.LoadBalancers[key]
  1511. if found {
  1512. if lbAlloc.Private {
  1513. parc.LoadBalancerTotalCost = totalPrivateLbCost
  1514. } else {
  1515. parc.LoadBalancerTotalCost = totalPublicLbCost
  1516. }
  1517. }
  1518. } else {
  1519. parc.LoadBalancerTotalCost = totals.LoadBalancerCost
  1520. }
  1521. opencost.ComputePercentages(&parc)
  1522. alloc.ProportionalAssetResourceCosts[rawKey] = parc
  1523. }
  1524. }
  1525. }
  1526. }
  1527. return asr, nil
  1528. }
  1529. func computeIdleAllocations(allocSet *opencost.AllocationSet, assetSet *opencost.AssetSet, idleByNode bool) (*opencost.AllocationSet, error) {
  1530. if !allocSet.Window.Equal(assetSet.Window) {
  1531. return nil, fmt.Errorf("cannot compute idle allocations for mismatched sets: %s does not equal %s", allocSet.Window, assetSet.Window)
  1532. }
  1533. var allocTotals map[string]*opencost.AllocationTotals
  1534. var assetTotals map[string]*opencost.AssetTotals
  1535. if idleByNode {
  1536. allocTotals = opencost.ComputeAllocationTotals(allocSet, opencost.AllocationNodeProp)
  1537. assetTotals = opencost.ComputeAssetTotals(assetSet, true)
  1538. } else {
  1539. allocTotals = opencost.ComputeAllocationTotals(allocSet, opencost.AllocationClusterProp)
  1540. assetTotals = opencost.ComputeAssetTotals(assetSet, false)
  1541. }
  1542. start, end := *allocSet.Window.Start(), *allocSet.Window.End()
  1543. idleSet := opencost.NewAllocationSet(start, end)
  1544. for key, assetTotal := range assetTotals {
  1545. allocTotal, ok := allocTotals[key]
  1546. if !ok {
  1547. log.Warnf("ETL: did not find allocations for asset key: %s", key)
  1548. // Use a zero-value set of totals. This indicates either (1) an
  1549. // error computing totals, or (2) that no allocations ran on the
  1550. // given node for the given window.
  1551. allocTotal = &opencost.AllocationTotals{
  1552. Cluster: assetTotal.Cluster,
  1553. Node: assetTotal.Node,
  1554. Start: assetTotal.Start,
  1555. End: assetTotal.End,
  1556. }
  1557. }
  1558. // Insert one idle allocation for each key (whether by node or
  1559. // by cluster), defined as the difference between the total
  1560. // asset cost and the allocated cost per-resource.
  1561. name := fmt.Sprintf("%s/%s", key, opencost.IdleSuffix)
  1562. err := idleSet.Insert(&opencost.Allocation{
  1563. Name: name,
  1564. Window: idleSet.Window.Clone(),
  1565. Properties: &opencost.AllocationProperties{
  1566. Cluster: assetTotal.Cluster,
  1567. Node: assetTotal.Node,
  1568. ProviderID: assetTotal.Node,
  1569. },
  1570. Start: assetTotal.Start,
  1571. End: assetTotal.End,
  1572. CPUCost: assetTotal.TotalCPUCost() - allocTotal.TotalCPUCost(),
  1573. GPUCost: assetTotal.TotalGPUCost() - allocTotal.TotalGPUCost(),
  1574. RAMCost: assetTotal.TotalRAMCost() - allocTotal.TotalRAMCost(),
  1575. })
  1576. if err != nil {
  1577. return nil, fmt.Errorf("failed to insert idle allocation %s: %w", name, err)
  1578. }
  1579. }
  1580. return idleSet, nil
  1581. }