cloudcostaggregate.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. package kubecost
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/opencost/opencost/pkg/filter"
  8. "github.com/opencost/opencost/pkg/log"
  9. )
  10. const (
  11. CloudCostBillingIDProp string = "billingID"
  12. CloudCostWorkGroupIDProp string = "workGroupID"
  13. CloudCostProviderProp string = "provider"
  14. CloudCostServiceProp string = "service"
  15. CloudCostLabelProp string = "label"
  16. )
  17. // CloudCostAggregateProperties unique property set for CloudCostAggregate within a window
  18. type CloudCostAggregateProperties struct {
  19. Provider string `json:"provider"`
  20. WorkGroupID string `json:"workGroupID"`
  21. BillingID string `json:"billingID"`
  22. Service string `json:"service"`
  23. LabelValue string `json:"label"`
  24. }
  25. func (ccap CloudCostAggregateProperties) Equal(that CloudCostAggregateProperties) bool {
  26. return ccap.Provider == that.Provider &&
  27. ccap.WorkGroupID == that.WorkGroupID &&
  28. ccap.BillingID == that.BillingID &&
  29. ccap.Service == that.Service &&
  30. ccap.LabelValue == that.LabelValue
  31. }
  32. func (ccap CloudCostAggregateProperties) Key(props []string) string {
  33. if len(props) == 0 {
  34. return fmt.Sprintf("%s/%s/%s/%s/%s", ccap.Provider, ccap.BillingID, ccap.WorkGroupID, ccap.Service, ccap.LabelValue)
  35. }
  36. keys := make([]string, len(props))
  37. for i, prop := range props {
  38. key := UnallocatedSuffix
  39. switch prop {
  40. case CloudCostProviderProp:
  41. if ccap.Provider != "" {
  42. key = ccap.Provider
  43. }
  44. case CloudCostBillingIDProp:
  45. if ccap.BillingID != "" {
  46. key = ccap.BillingID
  47. }
  48. case CloudCostWorkGroupIDProp:
  49. if ccap.WorkGroupID != "" {
  50. key = ccap.WorkGroupID
  51. }
  52. case CloudCostServiceProp:
  53. if ccap.Service != "" {
  54. key = ccap.Service
  55. }
  56. case CloudCostLabelProp:
  57. if ccap.LabelValue != "" {
  58. key = ccap.LabelValue
  59. }
  60. }
  61. keys[i] = key
  62. }
  63. return strings.Join(keys, "/")
  64. }
  65. // CloudCostAggregate represents an aggregation of Billing Integration data on the properties listed
  66. // - KubernetesPercent is the percent of the CloudCostAggregates cost which was from an item which could be identified
  67. // as coming from a kubernetes resources.
  68. // - Cost is the sum of the cost of each item in the CloudCostAggregate
  69. // - Credit is the sum of credits applied to each item in the CloudCostAggregate
  70. type CloudCostAggregate struct {
  71. Properties CloudCostAggregateProperties `json:"properties"`
  72. KubernetesPercent float64 `json:"kubernetesPercent"`
  73. Cost float64 `json:"cost"`
  74. NetCost float64 `json:"netCost"`
  75. }
  76. func NewCloudCostAggregate(properties CloudCostAggregateProperties, kubernetesPercent, cost, netCost float64) *CloudCostAggregate {
  77. return &CloudCostAggregate{
  78. Properties: properties,
  79. KubernetesPercent: kubernetesPercent,
  80. Cost: cost,
  81. NetCost: netCost,
  82. }
  83. }
  84. func (cca *CloudCostAggregate) Clone() *CloudCostAggregate {
  85. return &CloudCostAggregate{
  86. Properties: cca.Properties,
  87. KubernetesPercent: cca.KubernetesPercent,
  88. Cost: cca.Cost,
  89. NetCost: cca.NetCost,
  90. }
  91. }
  92. func (cca *CloudCostAggregate) Equal(that *CloudCostAggregate) bool {
  93. if that == nil {
  94. return false
  95. }
  96. return cca.Cost == that.Cost &&
  97. cca.NetCost == that.NetCost &&
  98. cca.Properties.Equal(that.Properties)
  99. }
  100. func (cca *CloudCostAggregate) Key(props []string) string {
  101. return cca.Properties.Key(props)
  102. }
  103. func (cca *CloudCostAggregate) StringProperty(prop string) (string, error) {
  104. if cca == nil {
  105. return "", nil
  106. }
  107. switch prop {
  108. case CloudCostBillingIDProp:
  109. return cca.Properties.BillingID, nil
  110. case CloudCostWorkGroupIDProp:
  111. return cca.Properties.WorkGroupID, nil
  112. case CloudCostProviderProp:
  113. return cca.Properties.Provider, nil
  114. case CloudCostServiceProp:
  115. return cca.Properties.Service, nil
  116. case CloudCostLabelProp:
  117. return cca.Properties.LabelValue, nil
  118. default:
  119. return "", fmt.Errorf("invalid property name: %s", prop)
  120. }
  121. }
  122. func (cca *CloudCostAggregate) add(that *CloudCostAggregate) {
  123. if cca == nil {
  124. log.Warnf("cannot add to nil CloudCostAggregate")
  125. return
  126. }
  127. // Compute KubernetesPercent for sum
  128. k8sPct := 0.0
  129. sumCost := cca.Cost + that.Cost
  130. if sumCost > 0.0 {
  131. thisK8sCost := cca.Cost * cca.KubernetesPercent
  132. thatK8sCost := that.Cost * that.KubernetesPercent
  133. k8sPct = (thisK8sCost + thatK8sCost) / sumCost
  134. }
  135. cca.Cost = sumCost
  136. cca.NetCost += that.NetCost
  137. cca.KubernetesPercent = k8sPct
  138. }
  139. type CloudCostAggregateSet struct {
  140. CloudCostAggregates map[string]*CloudCostAggregate `json:"aggregates"`
  141. AggregationProperties []string `json:"-"`
  142. Integration string `json:"-"`
  143. LabelName string `json:"labelName,omitempty"`
  144. Window Window `json:"window"`
  145. }
  146. func NewCloudCostAggregateSet(start, end time.Time, cloudCostAggregates ...*CloudCostAggregate) *CloudCostAggregateSet {
  147. ccas := &CloudCostAggregateSet{
  148. CloudCostAggregates: map[string]*CloudCostAggregate{},
  149. Window: NewWindow(&start, &end),
  150. }
  151. for _, cca := range cloudCostAggregates {
  152. ccas.insertByProperty(cca, nil)
  153. }
  154. return ccas
  155. }
  156. func (ccas *CloudCostAggregateSet) Aggregate(props []string) (*CloudCostAggregateSet, error) {
  157. if ccas == nil {
  158. return nil, errors.New("cannot aggregate a nil CloudCostAggregateSet")
  159. }
  160. if ccas.Window.IsOpen() {
  161. return nil, fmt.Errorf("cannot aggregate a CloudCostAggregateSet with an open window: %s", ccas.Window)
  162. }
  163. // Create a new result set, with the given aggregation property
  164. result := NewCloudCostAggregateSet(*ccas.Window.Start(), *ccas.Window.End())
  165. result.AggregationProperties = props
  166. result.LabelName = ccas.LabelName
  167. result.Integration = ccas.Integration
  168. // Insert clones of each item in the set, keyed by the given property.
  169. // The underlying insert logic will add binned items together.
  170. for name, cca := range ccas.CloudCostAggregates {
  171. ccaClone := cca.Clone()
  172. err := result.insertByProperty(ccaClone, props)
  173. if err != nil {
  174. return nil, fmt.Errorf("error aggregating %s by %v: %s", name, props, err)
  175. }
  176. }
  177. return result, nil
  178. }
  179. func (ccas *CloudCostAggregateSet) Filter(filters filter.Filter[*CloudCostAggregate]) *CloudCostAggregateSet {
  180. if ccas == nil {
  181. return nil
  182. }
  183. result := ccas.Clone()
  184. result.filter(filters)
  185. return result
  186. }
  187. func (ccas *CloudCostAggregateSet) filter(filters filter.Filter[*CloudCostAggregate]) {
  188. if ccas == nil {
  189. return
  190. }
  191. if filters == nil {
  192. return
  193. }
  194. for name, cca := range ccas.CloudCostAggregates {
  195. if !filters.Matches(cca) {
  196. delete(ccas.CloudCostAggregates, name)
  197. }
  198. }
  199. }
  200. func (ccas *CloudCostAggregateSet) Insert(that *CloudCostAggregate) error {
  201. // Publicly, only allow Inserting as a basic operation (i.e. without causing
  202. // an aggregation on a property).
  203. return ccas.insertByProperty(that, nil)
  204. }
  205. func (ccas *CloudCostAggregateSet) insertByProperty(that *CloudCostAggregate, props []string) error {
  206. if ccas == nil {
  207. return fmt.Errorf("cannot insert into nil CloudCostAggregateSet")
  208. }
  209. if ccas.CloudCostAggregates == nil {
  210. ccas.CloudCostAggregates = map[string]*CloudCostAggregate{}
  211. }
  212. // Add the given CloudCostAggregate to the existing entry, if there is one;
  213. // otherwise just set directly into allocations
  214. if _, ok := ccas.CloudCostAggregates[that.Key(props)]; !ok {
  215. ccas.CloudCostAggregates[that.Key(props)] = that
  216. } else {
  217. ccas.CloudCostAggregates[that.Key(props)].add(that)
  218. }
  219. return nil
  220. }
  221. func (ccas *CloudCostAggregateSet) Clone() *CloudCostAggregateSet {
  222. aggs := make(map[string]*CloudCostAggregate, len(ccas.CloudCostAggregates))
  223. for k, v := range ccas.CloudCostAggregates {
  224. aggs[k] = v.Clone()
  225. }
  226. return &CloudCostAggregateSet{
  227. CloudCostAggregates: aggs,
  228. Integration: ccas.Integration,
  229. LabelName: ccas.LabelName,
  230. Window: ccas.Window.Clone(),
  231. }
  232. }
  233. func (ccas *CloudCostAggregateSet) Equal(that *CloudCostAggregateSet) bool {
  234. if ccas.Integration != that.Integration {
  235. return false
  236. }
  237. if ccas.LabelName != that.LabelName {
  238. return false
  239. }
  240. if !ccas.Window.Equal(that.Window) {
  241. return false
  242. }
  243. if len(ccas.CloudCostAggregates) != len(that.CloudCostAggregates) {
  244. return false
  245. }
  246. for k, cca := range ccas.CloudCostAggregates {
  247. tcca, ok := that.CloudCostAggregates[k]
  248. if !ok {
  249. return false
  250. }
  251. if !cca.Equal(tcca) {
  252. return false
  253. }
  254. }
  255. return true
  256. }
  257. func (ccas *CloudCostAggregateSet) IsEmpty() bool {
  258. if ccas == nil {
  259. return true
  260. }
  261. if len(ccas.CloudCostAggregates) == 0 {
  262. return true
  263. }
  264. return false
  265. }
  266. func (ccas *CloudCostAggregateSet) Length() int {
  267. if ccas == nil {
  268. return 0
  269. }
  270. return len(ccas.CloudCostAggregates)
  271. }
  272. func (ccas *CloudCostAggregateSet) GetWindow() Window {
  273. return ccas.Window
  274. }
  275. func (ccas *CloudCostAggregateSet) Merge(that *CloudCostAggregateSet) (*CloudCostAggregateSet, error) {
  276. if ccas == nil || that == nil {
  277. return nil, fmt.Errorf("cannot merge nil CloudCostAggregateSets")
  278. }
  279. if that.IsEmpty() {
  280. return ccas.Clone(), nil
  281. }
  282. if !ccas.Window.Equal(that.Window) {
  283. return nil, fmt.Errorf("cannot merge CloudCostAggregateSets with different windows")
  284. }
  285. if ccas.LabelName != that.LabelName {
  286. return nil, fmt.Errorf("cannot merge CloudCostAggregateSets with different label names: '%s' != '%s'", ccas.LabelName, that.LabelName)
  287. }
  288. start, end := *ccas.Window.Start(), *ccas.Window.End()
  289. result := NewCloudCostAggregateSet(start, end)
  290. result.LabelName = ccas.LabelName
  291. for _, cca := range ccas.CloudCostAggregates {
  292. result.insertByProperty(cca, nil)
  293. }
  294. for _, cca := range that.CloudCostAggregates {
  295. result.insertByProperty(cca, nil)
  296. }
  297. return result, nil
  298. }
  299. type CloudCostAggregateSetRange struct {
  300. CloudCostAggregateSets []*CloudCostAggregateSet `json:"sets"`
  301. Window Window `json:"window"`
  302. }
  303. // NewCloudCostAggregateSetRange create a CloudCostAggregateSetRange containing CloudCostItemSets with windows of equal duration
  304. // the duration between start and end must be divisible by the window duration argument
  305. func NewCloudCostAggregateSetRange(start, end time.Time, window time.Duration, integration string, labelName string) (*CloudCostAggregateSetRange, error) {
  306. windows, err := GetWindows(start, end, window)
  307. if err != nil {
  308. return nil, err
  309. }
  310. // Build slice of CloudCostAggregateSet to cover the range
  311. cloudCostAggregateSets := make([]*CloudCostAggregateSet, len(windows))
  312. for i, w := range windows {
  313. ccas := NewCloudCostAggregateSet(*w.Start(), *w.End())
  314. ccas.Integration = integration
  315. ccas.LabelName = labelName
  316. cloudCostAggregateSets[i] = ccas
  317. }
  318. return &CloudCostAggregateSetRange{
  319. Window: NewWindow(&start, &end),
  320. CloudCostAggregateSets: cloudCostAggregateSets,
  321. }, nil
  322. }
  323. // LoadCloudCostAggregate loads CloudCostAggregates into existing CloudCostAggregateSets of the CloudCostAggregateSetRange.
  324. // This function service to aggregate and distribute costs over predefined windows
  325. // If all or a portion of the window of the CloudCostAggregate is outside of the windows of the existing CloudCostAggregateSets,
  326. // that portion of the CloudCostAggregate's cost will not be inserted
  327. func (ccasr *CloudCostAggregateSetRange) LoadCloudCostAggregate(window Window, cloudCostAggregate *CloudCostAggregate) {
  328. if window.IsOpen() {
  329. log.Errorf("CloudCostItemSetRange: LoadCloudCostItem: invalid window %s", window.String())
  330. return
  331. }
  332. totalPct := 0.0
  333. // Distribute cost of the current item across one or more CloudCostAggregates in
  334. // across each relevant CloudCostAggregateSet. Stop when the end of the current
  335. // block reaches the item's end time or the end of the range.
  336. for _, ccas := range ccasr.CloudCostAggregateSets {
  337. pct := ccas.GetWindow().GetPercentInWindow(window)
  338. if pct == 0 {
  339. continue
  340. }
  341. cca := cloudCostAggregate
  342. // If the current set Window only contains a portion of the CloudCostItem Window, insert costs relative to that portion
  343. if pct < 1.0 {
  344. cca = &CloudCostAggregate{
  345. Properties: cloudCostAggregate.Properties,
  346. KubernetesPercent: cloudCostAggregate.KubernetesPercent * pct,
  347. Cost: cloudCostAggregate.Cost * pct,
  348. NetCost: cloudCostAggregate.NetCost * pct,
  349. }
  350. }
  351. err := ccas.insertByProperty(cca, nil)
  352. if err != nil {
  353. log.Errorf("LoadCloudCostAggregateSets: failed to load CloudCostAggregate with key %s and window %s", cca.Key(nil), ccas.GetWindow().String())
  354. }
  355. // If all cost has been inserted then finish
  356. totalPct += pct
  357. if totalPct >= 1.0 {
  358. return
  359. }
  360. }
  361. }
  362. func (ccasr *CloudCostAggregateSetRange) Clone() *CloudCostAggregateSetRange {
  363. ccasSlice := make([]*CloudCostAggregateSet, len(ccasr.CloudCostAggregateSets))
  364. for i, ccas := range ccasr.CloudCostAggregateSets {
  365. ccasSlice[i] = ccas.Clone()
  366. }
  367. return &CloudCostAggregateSetRange{
  368. Window: ccasr.Window.Clone(),
  369. CloudCostAggregateSets: ccasSlice,
  370. }
  371. }
  372. func (ccasr *CloudCostAggregateSetRange) IsEmpty() bool {
  373. for _, ccas := range ccasr.CloudCostAggregateSets {
  374. if !ccas.IsEmpty() {
  375. return false
  376. }
  377. }
  378. return true
  379. }
  380. func (ccasr *CloudCostAggregateSetRange) Accumulate() (*CloudCostAggregateSet, error) {
  381. if ccasr == nil {
  382. return nil, errors.New("cannot accumulate a nil CloudCostAggregateSetRange")
  383. }
  384. if ccasr.Window.IsOpen() {
  385. return nil, fmt.Errorf("cannot accumulate a CloudCostAggregateSetRange with an open window: %s", ccasr.Window)
  386. }
  387. result := NewCloudCostAggregateSet(*ccasr.Window.Start(), *ccasr.Window.End())
  388. for _, ccas := range ccasr.CloudCostAggregateSets {
  389. for name, cca := range ccas.CloudCostAggregates {
  390. err := result.insertByProperty(cca.Clone(), ccas.AggregationProperties)
  391. if err != nil {
  392. return nil, fmt.Errorf("error accumulating CloudCostAggregateSetRange[%s][%s]: %s", ccas.Window.String(), name, err)
  393. }
  394. }
  395. }
  396. return result, nil
  397. }