cloudcostaggregate.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. package kubecost
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/opencost/opencost/pkg/filter"
  8. "github.com/opencost/opencost/pkg/log"
  9. )
  10. const (
  11. CloudCostBillingIDProp string = "billingID"
  12. CloudCostWorkGroupIDProp string = "workGroupID"
  13. CloudCostProviderProp string = "provider"
  14. CloudCostServiceProp string = "service"
  15. CloudCostLabelProp string = "label"
  16. )
  17. // CloudCostAggregateProperties unique property set for CloudCostAggregate within a window
  18. type CloudCostAggregateProperties struct {
  19. Provider string `json:"provider"`
  20. WorkGroupID string `json:"workGroupID"`
  21. BillingID string `json:"billingID"`
  22. Service string `json:"service"`
  23. LabelValue string `json:"label"`
  24. }
  25. func (ccap CloudCostAggregateProperties) Equal(that CloudCostAggregateProperties) bool {
  26. return ccap.Provider == that.Provider &&
  27. ccap.WorkGroupID == that.WorkGroupID &&
  28. ccap.BillingID == that.BillingID &&
  29. ccap.Service == that.Service &&
  30. ccap.LabelValue == that.LabelValue
  31. }
  32. // Intersection ensure the values of two CloudCostAggregateProperties are maintain only if they are equal
  33. func (ccap CloudCostAggregateProperties) Intersection(that CloudCostAggregateProperties) CloudCostAggregateProperties {
  34. if ccap.Equal(that) {
  35. return ccap
  36. }
  37. intersectionCCAP := CloudCostAggregateProperties{}
  38. if ccap == intersectionCCAP || that == intersectionCCAP {
  39. return intersectionCCAP
  40. }
  41. if ccap.Provider == that.Provider {
  42. intersectionCCAP.Provider = ccap.Provider
  43. }
  44. if ccap.WorkGroupID == that.WorkGroupID {
  45. intersectionCCAP.WorkGroupID = ccap.WorkGroupID
  46. }
  47. if ccap.BillingID == that.BillingID {
  48. intersectionCCAP.BillingID = ccap.BillingID
  49. }
  50. if ccap.Service == that.Service {
  51. intersectionCCAP.Service = ccap.Service
  52. }
  53. if ccap.LabelValue == that.LabelValue {
  54. intersectionCCAP.LabelValue = ccap.LabelValue
  55. }
  56. return intersectionCCAP
  57. }
  58. func (ccap CloudCostAggregateProperties) Key(props []string) string {
  59. if len(props) == 0 {
  60. return fmt.Sprintf("%s/%s/%s/%s/%s", ccap.Provider, ccap.BillingID, ccap.WorkGroupID, ccap.Service, ccap.LabelValue)
  61. }
  62. keys := make([]string, len(props))
  63. for i, prop := range props {
  64. key := UnallocatedSuffix
  65. switch prop {
  66. case CloudCostProviderProp:
  67. if ccap.Provider != "" {
  68. key = ccap.Provider
  69. }
  70. case CloudCostBillingIDProp:
  71. if ccap.BillingID != "" {
  72. key = ccap.BillingID
  73. }
  74. case CloudCostWorkGroupIDProp:
  75. if ccap.WorkGroupID != "" {
  76. key = ccap.WorkGroupID
  77. }
  78. case CloudCostServiceProp:
  79. if ccap.Service != "" {
  80. key = ccap.Service
  81. }
  82. case CloudCostLabelProp:
  83. if ccap.LabelValue != "" {
  84. key = ccap.LabelValue
  85. }
  86. }
  87. keys[i] = key
  88. }
  89. return strings.Join(keys, "/")
  90. }
  91. // CloudCostAggregate represents an aggregation of Billing Integration data on the properties listed
  92. // - KubernetesPercent is the percent of the CloudCostAggregates cost which was from an item which could be identified
  93. // as coming from a kubernetes resources.
  94. // - Cost is the sum of the cost of each item in the CloudCostAggregate
  95. // - Credit is the sum of credits applied to each item in the CloudCostAggregate
  96. type CloudCostAggregate struct {
  97. Properties CloudCostAggregateProperties `json:"properties"`
  98. KubernetesPercent float64 `json:"kubernetesPercent"`
  99. Cost float64 `json:"cost"`
  100. NetCost float64 `json:"netCost"`
  101. }
  102. func NewCloudCostAggregate(properties CloudCostAggregateProperties, kubernetesPercent, cost, netCost float64) *CloudCostAggregate {
  103. return &CloudCostAggregate{
  104. Properties: properties,
  105. KubernetesPercent: kubernetesPercent,
  106. Cost: cost,
  107. NetCost: netCost,
  108. }
  109. }
  110. func (cca *CloudCostAggregate) Clone() *CloudCostAggregate {
  111. return &CloudCostAggregate{
  112. Properties: cca.Properties,
  113. KubernetesPercent: cca.KubernetesPercent,
  114. Cost: cca.Cost,
  115. NetCost: cca.NetCost,
  116. }
  117. }
  118. func (cca *CloudCostAggregate) Equal(that *CloudCostAggregate) bool {
  119. if that == nil {
  120. return false
  121. }
  122. return cca.Cost == that.Cost &&
  123. cca.NetCost == that.NetCost &&
  124. cca.Properties.Equal(that.Properties)
  125. }
  126. func (cca *CloudCostAggregate) Key(props []string) string {
  127. return cca.Properties.Key(props)
  128. }
  129. func (cca *CloudCostAggregate) StringProperty(prop string) (string, error) {
  130. if cca == nil {
  131. return "", nil
  132. }
  133. switch prop {
  134. case CloudCostBillingIDProp:
  135. return cca.Properties.BillingID, nil
  136. case CloudCostWorkGroupIDProp:
  137. return cca.Properties.WorkGroupID, nil
  138. case CloudCostProviderProp:
  139. return cca.Properties.Provider, nil
  140. case CloudCostServiceProp:
  141. return cca.Properties.Service, nil
  142. case CloudCostLabelProp:
  143. return cca.Properties.LabelValue, nil
  144. default:
  145. return "", fmt.Errorf("invalid property name: %s", prop)
  146. }
  147. }
  148. func (cca *CloudCostAggregate) add(that *CloudCostAggregate) {
  149. if cca == nil {
  150. log.Warnf("cannot add to nil CloudCostAggregate")
  151. return
  152. }
  153. // Preserve string properties of cloud cost aggregates that are matching between the two CloudCostAggregate
  154. cca.Properties = cca.Properties.Intersection(that.Properties)
  155. // Compute KubernetesPercent for sum
  156. k8sPct := 0.0
  157. sumCost := cca.Cost + that.Cost
  158. if sumCost > 0.0 {
  159. thisK8sCost := cca.Cost * cca.KubernetesPercent
  160. thatK8sCost := that.Cost * that.KubernetesPercent
  161. k8sPct = (thisK8sCost + thatK8sCost) / sumCost
  162. }
  163. cca.Cost = sumCost
  164. cca.NetCost += that.NetCost
  165. cca.KubernetesPercent = k8sPct
  166. }
  167. type CloudCostAggregateSet struct {
  168. CloudCostAggregates map[string]*CloudCostAggregate `json:"aggregates"`
  169. AggregationProperties []string `json:"-"`
  170. Integration string `json:"-"`
  171. LabelName string `json:"labelName,omitempty"`
  172. Window Window `json:"window"`
  173. }
  174. func NewCloudCostAggregateSet(start, end time.Time, cloudCostAggregates ...*CloudCostAggregate) *CloudCostAggregateSet {
  175. ccas := &CloudCostAggregateSet{
  176. CloudCostAggregates: map[string]*CloudCostAggregate{},
  177. Window: NewWindow(&start, &end),
  178. }
  179. for _, cca := range cloudCostAggregates {
  180. ccas.insertByProperty(cca, nil)
  181. }
  182. return ccas
  183. }
  184. func (ccas *CloudCostAggregateSet) Aggregate(props []string) (*CloudCostAggregateSet, error) {
  185. if ccas == nil {
  186. return nil, errors.New("cannot aggregate a nil CloudCostAggregateSet")
  187. }
  188. if ccas.Window.IsOpen() {
  189. return nil, fmt.Errorf("cannot aggregate a CloudCostAggregateSet with an open window: %s", ccas.Window)
  190. }
  191. // Create a new result set, with the given aggregation property
  192. result := NewCloudCostAggregateSet(*ccas.Window.Start(), *ccas.Window.End())
  193. result.AggregationProperties = props
  194. result.LabelName = ccas.LabelName
  195. result.Integration = ccas.Integration
  196. // Insert clones of each item in the set, keyed by the given property.
  197. // The underlying insert logic will add binned items together.
  198. for name, cca := range ccas.CloudCostAggregates {
  199. ccaClone := cca.Clone()
  200. err := result.insertByProperty(ccaClone, props)
  201. if err != nil {
  202. return nil, fmt.Errorf("error aggregating %s by %v: %s", name, props, err)
  203. }
  204. }
  205. return result, nil
  206. }
  207. func (ccas *CloudCostAggregateSet) Filter(filters filter.Filter[*CloudCostAggregate]) *CloudCostAggregateSet {
  208. if ccas == nil {
  209. return nil
  210. }
  211. result := ccas.Clone()
  212. result.filter(filters)
  213. return result
  214. }
  215. func (ccas *CloudCostAggregateSet) filter(filters filter.Filter[*CloudCostAggregate]) {
  216. if ccas == nil {
  217. return
  218. }
  219. if filters == nil {
  220. return
  221. }
  222. for name, cca := range ccas.CloudCostAggregates {
  223. if !filters.Matches(cca) {
  224. delete(ccas.CloudCostAggregates, name)
  225. }
  226. }
  227. }
  228. func (ccas *CloudCostAggregateSet) Insert(that *CloudCostAggregate) error {
  229. // Publicly, only allow Inserting as a basic operation (i.e. without causing
  230. // an aggregation on a property).
  231. return ccas.insertByProperty(that, nil)
  232. }
  233. func (ccas *CloudCostAggregateSet) insertByProperty(that *CloudCostAggregate, props []string) error {
  234. if ccas == nil {
  235. return fmt.Errorf("cannot insert into nil CloudCostAggregateSet")
  236. }
  237. if ccas.CloudCostAggregates == nil {
  238. ccas.CloudCostAggregates = map[string]*CloudCostAggregate{}
  239. }
  240. // Add the given CloudCostAggregate to the existing entry, if there is one;
  241. // otherwise just set directly into allocations
  242. if _, ok := ccas.CloudCostAggregates[that.Key(props)]; !ok {
  243. ccas.CloudCostAggregates[that.Key(props)] = that
  244. } else {
  245. ccas.CloudCostAggregates[that.Key(props)].add(that)
  246. }
  247. return nil
  248. }
  249. func (ccas *CloudCostAggregateSet) Clone() *CloudCostAggregateSet {
  250. aggs := make(map[string]*CloudCostAggregate, len(ccas.CloudCostAggregates))
  251. for k, v := range ccas.CloudCostAggregates {
  252. aggs[k] = v.Clone()
  253. }
  254. return &CloudCostAggregateSet{
  255. CloudCostAggregates: aggs,
  256. Integration: ccas.Integration,
  257. LabelName: ccas.LabelName,
  258. Window: ccas.Window.Clone(),
  259. }
  260. }
  261. func (ccas *CloudCostAggregateSet) Equal(that *CloudCostAggregateSet) bool {
  262. if ccas.Integration != that.Integration {
  263. return false
  264. }
  265. if ccas.LabelName != that.LabelName {
  266. return false
  267. }
  268. if !ccas.Window.Equal(that.Window) {
  269. return false
  270. }
  271. if len(ccas.CloudCostAggregates) != len(that.CloudCostAggregates) {
  272. return false
  273. }
  274. for k, cca := range ccas.CloudCostAggregates {
  275. tcca, ok := that.CloudCostAggregates[k]
  276. if !ok {
  277. return false
  278. }
  279. if !cca.Equal(tcca) {
  280. return false
  281. }
  282. }
  283. return true
  284. }
  285. func (ccas *CloudCostAggregateSet) IsEmpty() bool {
  286. if ccas == nil {
  287. return true
  288. }
  289. if len(ccas.CloudCostAggregates) == 0 {
  290. return true
  291. }
  292. return false
  293. }
  294. func (ccas *CloudCostAggregateSet) Length() int {
  295. if ccas == nil {
  296. return 0
  297. }
  298. return len(ccas.CloudCostAggregates)
  299. }
  300. func (ccas *CloudCostAggregateSet) GetWindow() Window {
  301. return ccas.Window
  302. }
  303. func (ccas *CloudCostAggregateSet) Merge(that *CloudCostAggregateSet) (*CloudCostAggregateSet, error) {
  304. if ccas == nil || that == nil {
  305. return nil, fmt.Errorf("cannot merge nil CloudCostAggregateSets")
  306. }
  307. if that.IsEmpty() {
  308. return ccas.Clone(), nil
  309. }
  310. if !ccas.Window.Equal(that.Window) {
  311. return nil, fmt.Errorf("cannot merge CloudCostAggregateSets with different windows")
  312. }
  313. if ccas.LabelName != that.LabelName {
  314. return nil, fmt.Errorf("cannot merge CloudCostAggregateSets with different label names: '%s' != '%s'", ccas.LabelName, that.LabelName)
  315. }
  316. start, end := *ccas.Window.Start(), *ccas.Window.End()
  317. result := NewCloudCostAggregateSet(start, end)
  318. result.LabelName = ccas.LabelName
  319. for _, cca := range ccas.CloudCostAggregates {
  320. result.insertByProperty(cca, nil)
  321. }
  322. for _, cca := range that.CloudCostAggregates {
  323. result.insertByProperty(cca, nil)
  324. }
  325. return result, nil
  326. }
  327. type CloudCostAggregateSetRange struct {
  328. CloudCostAggregateSets []*CloudCostAggregateSet `json:"sets"`
  329. Window Window `json:"window"`
  330. }
  331. // NewCloudCostAggregateSetRange create a CloudCostAggregateSetRange containing CloudCostItemSets with windows of equal duration
  332. // the duration between start and end must be divisible by the window duration argument
  333. func NewCloudCostAggregateSetRange(start, end time.Time, window time.Duration, integration string, labelName string) (*CloudCostAggregateSetRange, error) {
  334. windows, err := GetWindows(start, end, window)
  335. if err != nil {
  336. return nil, err
  337. }
  338. // Build slice of CloudCostAggregateSet to cover the range
  339. cloudCostAggregateSets := make([]*CloudCostAggregateSet, len(windows))
  340. for i, w := range windows {
  341. ccas := NewCloudCostAggregateSet(*w.Start(), *w.End())
  342. ccas.Integration = integration
  343. ccas.LabelName = labelName
  344. cloudCostAggregateSets[i] = ccas
  345. }
  346. return &CloudCostAggregateSetRange{
  347. Window: NewWindow(&start, &end),
  348. CloudCostAggregateSets: cloudCostAggregateSets,
  349. }, nil
  350. }
  351. // LoadCloudCostAggregate loads CloudCostAggregates into existing CloudCostAggregateSets of the CloudCostAggregateSetRange.
  352. // This function service to aggregate and distribute costs over predefined windows
  353. // If all or a portion of the window of the CloudCostAggregate is outside of the windows of the existing CloudCostAggregateSets,
  354. // that portion of the CloudCostAggregate's cost will not be inserted
  355. func (ccasr *CloudCostAggregateSetRange) LoadCloudCostAggregate(window Window, cloudCostAggregate *CloudCostAggregate) {
  356. if window.IsOpen() {
  357. log.Errorf("CloudCostItemSetRange: LoadCloudCostItem: invalid window %s", window.String())
  358. return
  359. }
  360. totalPct := 0.0
  361. // Distribute cost of the current item across one or more CloudCostAggregates in
  362. // across each relevant CloudCostAggregateSet. Stop when the end of the current
  363. // block reaches the item's end time or the end of the range.
  364. for _, ccas := range ccasr.CloudCostAggregateSets {
  365. pct := ccas.GetWindow().GetPercentInWindow(window)
  366. if pct == 0 {
  367. continue
  368. }
  369. cca := cloudCostAggregate
  370. // If the current set Window only contains a portion of the CloudCostItem Window, insert costs relative to that portion
  371. if pct < 1.0 {
  372. cca = &CloudCostAggregate{
  373. Properties: cloudCostAggregate.Properties,
  374. KubernetesPercent: cloudCostAggregate.KubernetesPercent * pct,
  375. Cost: cloudCostAggregate.Cost * pct,
  376. NetCost: cloudCostAggregate.NetCost * pct,
  377. }
  378. }
  379. err := ccas.insertByProperty(cca, nil)
  380. if err != nil {
  381. log.Errorf("LoadCloudCostAggregateSets: failed to load CloudCostAggregate with key %s and window %s", cca.Key(nil), ccas.GetWindow().String())
  382. }
  383. // If all cost has been inserted then finish
  384. totalPct += pct
  385. if totalPct >= 1.0 {
  386. return
  387. }
  388. }
  389. }
  390. func (ccasr *CloudCostAggregateSetRange) Clone() *CloudCostAggregateSetRange {
  391. ccasSlice := make([]*CloudCostAggregateSet, len(ccasr.CloudCostAggregateSets))
  392. for i, ccas := range ccasr.CloudCostAggregateSets {
  393. ccasSlice[i] = ccas.Clone()
  394. }
  395. return &CloudCostAggregateSetRange{
  396. Window: ccasr.Window.Clone(),
  397. CloudCostAggregateSets: ccasSlice,
  398. }
  399. }
  400. func (ccasr *CloudCostAggregateSetRange) IsEmpty() bool {
  401. for _, ccas := range ccasr.CloudCostAggregateSets {
  402. if !ccas.IsEmpty() {
  403. return false
  404. }
  405. }
  406. return true
  407. }
  408. func (ccasr *CloudCostAggregateSetRange) Accumulate() (*CloudCostAggregateSet, error) {
  409. if ccasr == nil {
  410. return nil, errors.New("cannot accumulate a nil CloudCostAggregateSetRange")
  411. }
  412. if ccasr.Window.IsOpen() {
  413. return nil, fmt.Errorf("cannot accumulate a CloudCostAggregateSetRange with an open window: %s", ccasr.Window)
  414. }
  415. result := NewCloudCostAggregateSet(*ccasr.Window.Start(), *ccasr.Window.End())
  416. for _, ccas := range ccasr.CloudCostAggregateSets {
  417. for name, cca := range ccas.CloudCostAggregates {
  418. err := result.insertByProperty(cca.Clone(), ccas.AggregationProperties)
  419. if err != nil {
  420. return nil, fmt.Errorf("error accumulating CloudCostAggregateSetRange[%s][%s]: %s", ccas.Window.String(), name, err)
  421. }
  422. }
  423. }
  424. return result, nil
  425. }