cloudcostaggregate.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. package kubecost
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/opencost/opencost/pkg/filter"
  8. "github.com/opencost/opencost/pkg/log"
  9. )
  10. const (
  11. CloudCostAccountProp string = "account"
  12. CloudCostProjectProp string = "project"
  13. CloudCostProviderProp string = "provider"
  14. CloudCostServiceProp string = "service"
  15. CloudCostLabelProp string = "label"
  16. )
  17. // CloudCostAggregateProperties unique property set for CloudCostAggregate within a window
  18. type CloudCostAggregateProperties struct {
  19. Provider string `json:"provider"`
  20. Account string `json:"account"`
  21. Project string `json:"project"`
  22. Service string `json:"service"`
  23. LabelValue string `json:"label"`
  24. }
  25. func (ccap CloudCostAggregateProperties) Equal(that CloudCostAggregateProperties) bool {
  26. return ccap.Provider == that.Provider &&
  27. ccap.Account == that.Account &&
  28. ccap.Project == that.Project &&
  29. ccap.Service == that.Service &&
  30. ccap.LabelValue == that.LabelValue
  31. }
  32. func (ccap CloudCostAggregateProperties) Key(props []string) string {
  33. if len(props) == 0 {
  34. return fmt.Sprintf("%s/%s/%s/%s/%s", ccap.Provider, ccap.Account, ccap.Project, ccap.Service, ccap.LabelValue)
  35. }
  36. keys := make([]string, len(props))
  37. for i, prop := range props {
  38. key := UnallocatedSuffix
  39. switch prop {
  40. case CloudCostProviderProp:
  41. if ccap.Provider != "" {
  42. key = ccap.Provider
  43. }
  44. case CloudCostAccountProp:
  45. if ccap.Account != "" {
  46. key = ccap.Account
  47. }
  48. case CloudCostProjectProp:
  49. if ccap.Project != "" {
  50. key = ccap.Project
  51. }
  52. case CloudCostServiceProp:
  53. if ccap.Service != "" {
  54. key = ccap.Service
  55. }
  56. case CloudCostLabelProp:
  57. if ccap.LabelValue != "" {
  58. key = ccap.LabelValue
  59. }
  60. }
  61. keys[i] = key
  62. }
  63. return strings.Join(keys, "/")
  64. }
  65. // CloudCostAggregate represents an aggregation of Billing Integration data on the properties listed
  66. // - KubernetesPercent is the percent of the CloudCostAggregates cost which was from an item which could be identified
  67. // as coming from a kubernetes resources.
  68. // - Cost is the sum of the cost of each item in the CloudCostAggregate
  69. // - Credit is the sum of credits applied to each item in the CloudCostAggregate
  70. type CloudCostAggregate struct {
  71. Properties CloudCostAggregateProperties `json:"properties"`
  72. KubernetesPercent float64 `json:"kubernetesPercent"`
  73. Cost float64 `json:"cost"`
  74. Credit float64 `json:"credit"`
  75. }
  76. func (cca *CloudCostAggregate) Clone() *CloudCostAggregate {
  77. return &CloudCostAggregate{
  78. Properties: cca.Properties,
  79. KubernetesPercent: cca.KubernetesPercent,
  80. Cost: cca.Cost,
  81. Credit: cca.Credit,
  82. }
  83. }
  84. func (cca *CloudCostAggregate) Equal(that *CloudCostAggregate) bool {
  85. if that == nil {
  86. return false
  87. }
  88. return cca.Cost == that.Cost &&
  89. cca.Credit == that.Credit &&
  90. cca.Properties.Equal(that.Properties)
  91. }
  92. func (cca *CloudCostAggregate) Key(props []string) string {
  93. return cca.Properties.Key(props)
  94. }
  95. func (cca *CloudCostAggregate) StringProperty(prop string) (string, error) {
  96. if cca == nil {
  97. return "", nil
  98. }
  99. switch prop {
  100. case CloudCostAccountProp:
  101. return cca.Properties.Account, nil
  102. case CloudCostProjectProp:
  103. return cca.Properties.Project, nil
  104. case CloudCostProviderProp:
  105. return cca.Properties.Provider, nil
  106. case CloudCostServiceProp:
  107. return cca.Properties.Service, nil
  108. case CloudCostLabelProp:
  109. return cca.Properties.LabelValue, nil
  110. default:
  111. return "", fmt.Errorf("invalid property name: %s", prop)
  112. }
  113. }
  114. func (cca *CloudCostAggregate) add(that *CloudCostAggregate) {
  115. if cca == nil {
  116. log.Warnf("cannot add to nil CloudCostAggregate")
  117. return
  118. }
  119. // Compute KubernetesPercent for sum
  120. k8sPct := 0.0
  121. sumCost := cca.Cost + that.Cost
  122. if sumCost > 0.0 {
  123. thisK8sCost := cca.Cost * cca.KubernetesPercent
  124. thatK8sCost := that.Cost * that.KubernetesPercent
  125. k8sPct = (thisK8sCost + thatK8sCost) / sumCost
  126. }
  127. cca.Cost = sumCost
  128. cca.Credit += that.Credit
  129. cca.KubernetesPercent = k8sPct
  130. }
  131. type CloudCostAggregateSet struct {
  132. CloudCostAggregates map[string]*CloudCostAggregate `json:"items"`
  133. AggregationProperties []string `json:"-"`
  134. Integration string `json:"-"`
  135. LabelName string `json:"labelName,omitempty"`
  136. Window Window `json:"window"`
  137. }
  138. func NewCloudCostAggregateSet(start, end time.Time, cloudCostAggregates ...*CloudCostAggregate) *CloudCostAggregateSet {
  139. ccas := &CloudCostAggregateSet{
  140. CloudCostAggregates: map[string]*CloudCostAggregate{},
  141. Window: NewWindow(&start, &end),
  142. }
  143. for _, cca := range cloudCostAggregates {
  144. ccas.insertByProperty(cca, nil)
  145. }
  146. return ccas
  147. }
  148. func (ccas *CloudCostAggregateSet) Aggregate(props []string) (*CloudCostAggregateSet, error) {
  149. if ccas == nil {
  150. return nil, errors.New("cannot aggregate a nil CloudCostAggregateSet")
  151. }
  152. if ccas.Window.IsOpen() {
  153. return nil, fmt.Errorf("cannot aggregate a CloudCostAggregateSet with an open window: %s", ccas.Window)
  154. }
  155. // Create a new result set, with the given aggregation property
  156. result := NewCloudCostAggregateSet(*ccas.Window.Start(), *ccas.Window.End())
  157. result.AggregationProperties = props
  158. result.LabelName = ccas.LabelName
  159. result.Integration = ccas.Integration
  160. // Insert clones of each item in the set, keyed by the given property.
  161. // The underlying insert logic will add binned items together.
  162. for name, cca := range ccas.CloudCostAggregates {
  163. ccaClone := cca.Clone()
  164. err := result.insertByProperty(ccaClone, props)
  165. if err != nil {
  166. return nil, fmt.Errorf("error aggregating %s by %v: %s", name, props, err)
  167. }
  168. }
  169. return result, nil
  170. }
  171. func (ccas *CloudCostAggregateSet) Filter(filters filter.Filter[*CloudCostAggregate]) *CloudCostAggregateSet {
  172. if ccas == nil {
  173. return nil
  174. }
  175. result := ccas.Clone()
  176. result.filter(filters)
  177. return result
  178. }
  179. func (ccas *CloudCostAggregateSet) filter(filters filter.Filter[*CloudCostAggregate]) {
  180. if ccas == nil {
  181. return
  182. }
  183. if filters == nil {
  184. return
  185. }
  186. for name, cca := range ccas.CloudCostAggregates {
  187. if !filters.Matches(cca) {
  188. delete(ccas.CloudCostAggregates, name)
  189. }
  190. }
  191. }
  192. func (ccas *CloudCostAggregateSet) Insert(that *CloudCostAggregate) error {
  193. // Publicly, only allow Inserting as a basic operation (i.e. without causing
  194. // an aggregation on a property).
  195. return ccas.insertByProperty(that, nil)
  196. }
  197. func (ccas *CloudCostAggregateSet) insertByProperty(that *CloudCostAggregate, props []string) error {
  198. if ccas == nil {
  199. return fmt.Errorf("cannot insert into nil CloudCostAggregateSet")
  200. }
  201. if ccas.CloudCostAggregates == nil {
  202. ccas.CloudCostAggregates = map[string]*CloudCostAggregate{}
  203. }
  204. // Add the given CloudCostAggregate to the existing entry, if there is one;
  205. // otherwise just set directly into allocations
  206. if _, ok := ccas.CloudCostAggregates[that.Key(props)]; !ok {
  207. ccas.CloudCostAggregates[that.Key(props)] = that
  208. } else {
  209. ccas.CloudCostAggregates[that.Key(props)].add(that)
  210. }
  211. return nil
  212. }
  213. func (ccas *CloudCostAggregateSet) Clone() *CloudCostAggregateSet {
  214. aggs := make(map[string]*CloudCostAggregate, len(ccas.CloudCostAggregates))
  215. for k, v := range ccas.CloudCostAggregates {
  216. aggs[k] = v.Clone()
  217. }
  218. return &CloudCostAggregateSet{
  219. CloudCostAggregates: aggs,
  220. Integration: ccas.Integration,
  221. LabelName: ccas.LabelName,
  222. Window: ccas.Window.Clone(),
  223. }
  224. }
  225. func (ccas *CloudCostAggregateSet) Equal(that *CloudCostAggregateSet) bool {
  226. if ccas.Integration != that.Integration {
  227. return false
  228. }
  229. if ccas.LabelName != that.LabelName {
  230. return false
  231. }
  232. if !ccas.Window.Equal(that.Window) {
  233. return false
  234. }
  235. if len(ccas.CloudCostAggregates) != len(that.CloudCostAggregates) {
  236. return false
  237. }
  238. for k, cca := range ccas.CloudCostAggregates {
  239. tcca, ok := that.CloudCostAggregates[k]
  240. if !ok {
  241. return false
  242. }
  243. if !cca.Equal(tcca) {
  244. return false
  245. }
  246. }
  247. return true
  248. }
  249. func (ccas *CloudCostAggregateSet) IsEmpty() bool {
  250. if ccas == nil {
  251. return true
  252. }
  253. if len(ccas.CloudCostAggregates) == 0 {
  254. return true
  255. }
  256. return false
  257. }
  258. func (ccas *CloudCostAggregateSet) Length() int {
  259. if ccas == nil {
  260. return 0
  261. }
  262. return len(ccas.CloudCostAggregates)
  263. }
  264. func (ccas *CloudCostAggregateSet) GetWindow() Window {
  265. return ccas.Window
  266. }
  267. func (ccas *CloudCostAggregateSet) Merge(that *CloudCostAggregateSet) (*CloudCostAggregateSet, error) {
  268. if ccas == nil || that == nil {
  269. return nil, fmt.Errorf("cannot merge nil CloudCostAggregateSets")
  270. }
  271. if that.IsEmpty() {
  272. return ccas.Clone(), nil
  273. }
  274. if !ccas.Window.Equal(that.Window) {
  275. return nil, fmt.Errorf("cannot merge CloudCostAggregateSets with different windows")
  276. }
  277. if ccas.LabelName != that.LabelName {
  278. return nil, fmt.Errorf("cannot merge CloudCostAggregateSets with different label names: '%s' != '%s'", ccas.LabelName, that.LabelName)
  279. }
  280. start, end := *ccas.Window.Start(), *ccas.Window.End()
  281. result := NewCloudCostAggregateSet(start, end)
  282. result.LabelName = ccas.LabelName
  283. for _, cca := range ccas.CloudCostAggregates {
  284. result.insertByProperty(cca, nil)
  285. }
  286. for _, cca := range that.CloudCostAggregates {
  287. result.insertByProperty(cca, nil)
  288. }
  289. return result, nil
  290. }
  291. func GetCloudCostAggregateSets(start, end time.Time, windowDuration time.Duration, integration string, labelName string) ([]*CloudCostAggregateSet, error) {
  292. windows, err := GetWindows(start, end, windowDuration)
  293. if err != nil {
  294. return nil, err
  295. }
  296. // Build slice of CloudCostAggregateSet to cover the range
  297. CloudCostAggregateSets := []*CloudCostAggregateSet{}
  298. for _, w := range windows {
  299. ccas := NewCloudCostAggregateSet(*w.Start(), *w.End())
  300. ccas.Integration = integration
  301. ccas.LabelName = labelName
  302. CloudCostAggregateSets = append(CloudCostAggregateSets, ccas)
  303. }
  304. return CloudCostAggregateSets, nil
  305. }
  306. // LoadCloudCostAggregateSets creates and loads CloudCostAggregates into provided CloudCostAggregateSets. This method makes it so
  307. // that the input windows do not have to match the one day frame of the Athena queries. CloudCostAggregates being generated from a
  308. // CUR which may be the identical except for the pricing model used (default, RI or savings plan)
  309. // are accumulated here so that the resulting CloudCostAggregate with the 1d window has the correct price for the entire day.
  310. func LoadCloudCostAggregateSets(itemStart time.Time, itemEnd time.Time, properties CloudCostAggregateProperties, K8sPercent, cost, credit float64, CloudCostAggregateSets []*CloudCostAggregateSet) {
  311. // Disperse cost of the current item across one or more CloudCostAggregates in
  312. // across each relevant CloudCostAggregateSet. Stop when the end of the current
  313. // block reaches the item's end time or the end of the range.
  314. for _, ccas := range CloudCostAggregateSets {
  315. pct := ccas.GetWindow().GetPercentInWindow(itemStart, itemEnd)
  316. // Insert an CloudCostAggregate with that cost into the CloudCostAggregateSet at the given index
  317. cca := &CloudCostAggregate{
  318. Properties: properties,
  319. KubernetesPercent: K8sPercent * pct,
  320. Cost: cost * pct,
  321. Credit: credit * pct,
  322. }
  323. err := ccas.insertByProperty(cca, nil)
  324. if err != nil {
  325. log.Errorf("LoadCloudCostAggregateSets: failed to load CloudCostAggregate with key %s and window %s", cca.Key(nil), ccas.GetWindow().String())
  326. }
  327. }
  328. }
  329. type CloudCostAggregateSetRange struct {
  330. CloudCostAggregateSets []*CloudCostAggregateSet `json:"sets"`
  331. Window Window `json:"window"`
  332. }
  333. func (ccasr *CloudCostAggregateSetRange) Accumulate() (*CloudCostAggregateSet, error) {
  334. if ccasr == nil {
  335. return nil, errors.New("cannot accumulate a nil CloudCostAggregateSetRange")
  336. }
  337. if ccasr.Window.IsOpen() {
  338. return nil, fmt.Errorf("cannot accumulate a CloudCostAggregateSetRange with an open window: %s", ccasr.Window)
  339. }
  340. result := NewCloudCostAggregateSet(*ccasr.Window.Start(), *ccasr.Window.End())
  341. for _, ccas := range ccasr.CloudCostAggregateSets {
  342. for name, cca := range ccas.CloudCostAggregates {
  343. err := result.insertByProperty(cca.Clone(), ccas.AggregationProperties)
  344. if err != nil {
  345. return nil, fmt.Errorf("error accumulating CloudCostAggregateSetRange[%s][%s]: %s", ccas.Window.String(), name, err)
  346. }
  347. }
  348. }
  349. return result, nil
  350. }