|
|
@@ -57,15 +57,18 @@ type Asset interface {
|
|
|
}
|
|
|
|
|
|
// key is used to determine uniqueness of an Asset, for instance during Insert
|
|
|
-// to determine if two Assets should be combined. Passing nil props indicates
|
|
|
-// that all available props should be used. Passing empty props indicates that
|
|
|
-// no props should be used (e.g. to aggregate all assets). Passing one or more
|
|
|
-// props will key by only those props.
|
|
|
-func key(a Asset, aggStrings []string) string {
|
|
|
+// to determine if two Assets should be combined. Passing `nil` `aggregateBy` indicates
|
|
|
+// that all available `AssetProperty` keys should be used. Passing empty `aggregateBy` indicates that
|
|
|
+// no key should be used (e.g. to aggregate all assets). Passing one or more `aggregateBy`
|
|
|
+// values will key by only those values.
|
|
|
+// Valid values of `aggregateBy` elements are strings which are an `AssetProperty`, and strings prefixed
|
|
|
+// with `"label:"`.
|
|
|
+func key(a Asset, aggregateBy []string) (string, []error) {
|
|
|
+ var errors []error
|
|
|
keys := []string{}
|
|
|
|
|
|
- if aggStrings == nil {
|
|
|
- aggStrings = []string{
|
|
|
+ if aggregateBy == nil {
|
|
|
+ aggregateBy = []string{
|
|
|
string(AssetProviderProp),
|
|
|
string(AssetAccountProp),
|
|
|
string(AssetProjectProp),
|
|
|
@@ -78,7 +81,7 @@ func key(a Asset, aggStrings []string) string {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for _, s := range aggStrings {
|
|
|
+ for _, s := range aggregateBy {
|
|
|
switch true {
|
|
|
case s == string(AssetProviderProp) && a.Properties().Provider != "":
|
|
|
keys = append(keys, a.Properties().Provider)
|
|
|
@@ -104,10 +107,19 @@ func key(a Asset, aggStrings []string) string {
|
|
|
} else {
|
|
|
keys = append(keys, "__unallocated__")
|
|
|
}
|
|
|
+ default:
|
|
|
+ _, err := ParseAssetProperty(s)
|
|
|
+ if err != nil {
|
|
|
+ if errors == nil {
|
|
|
+ errors = []error{}
|
|
|
+ }
|
|
|
+ errors = append(errors, fmt.Errorf("Invalid asset aggregation key: %s", s))
|
|
|
+ } else {
|
|
|
+ keys = append(keys, "__unallocated__")
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- return strings.Join(keys, "/")
|
|
|
+ return strings.Join(keys, "/"), errors
|
|
|
}
|
|
|
|
|
|
func toString(a Asset) string {
|
|
|
@@ -2310,12 +2322,11 @@ func (sa *SharedAsset) String() string {
|
|
|
// a window. An AssetSet is mutable, so treat it like a threadsafe map.
|
|
|
type AssetSet struct {
|
|
|
sync.RWMutex
|
|
|
- aggStrings []string
|
|
|
- assets map[string]Asset
|
|
|
- props []AssetProperty
|
|
|
- Window Window
|
|
|
- Warnings []string
|
|
|
- Errors []string
|
|
|
+ aggregateBy []string
|
|
|
+ assets map[string]Asset
|
|
|
+ Window Window
|
|
|
+ Warnings []string
|
|
|
+ Errors []string
|
|
|
}
|
|
|
|
|
|
// NewAssetSet instantiates a new AssetSet and, optionally, inserts
|
|
|
@@ -2336,7 +2347,7 @@ func NewAssetSet(start, end time.Time, assets ...Asset) *AssetSet {
|
|
|
// AggregateBy aggregates the Assets in the AssetSet by the given list of
|
|
|
// AssetProperties, such that each asset is binned by a key determined by its
|
|
|
// relevant property values.
|
|
|
-func (as *AssetSet) AggregateBy(aggStrings []string, opts *AssetAggregationOptions) error {
|
|
|
+func (as *AssetSet) AggregateBy(aggregateBy []string, opts *AssetAggregationOptions) error {
|
|
|
if opts == nil {
|
|
|
opts = &AssetAggregationOptions{}
|
|
|
}
|
|
|
@@ -2348,21 +2359,8 @@ func (as *AssetSet) AggregateBy(aggStrings []string, opts *AssetAggregationOptio
|
|
|
as.Lock()
|
|
|
defer as.Unlock()
|
|
|
|
|
|
- // Parse enumerated asset properties from given aggregation strings
|
|
|
- props := []AssetProperty{}
|
|
|
- if aggStrings == nil {
|
|
|
- props = nil
|
|
|
- } else {
|
|
|
- for _, s := range aggStrings {
|
|
|
- if prop, err := ParseAssetProperty(s); err == nil {
|
|
|
- props = append(props, prop)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
aggSet := NewAssetSet(as.Start(), as.End())
|
|
|
- aggSet.aggStrings = aggStrings
|
|
|
- aggSet.props = props
|
|
|
+ aggSet.aggregateBy = aggregateBy
|
|
|
|
|
|
// Compute hours of the given AssetSet, and if it ends in the future,
|
|
|
// adjust the hours accordingly
|
|
|
@@ -2377,7 +2375,10 @@ func (as *AssetSet) AggregateBy(aggStrings []string, opts *AssetAggregationOptio
|
|
|
sa := NewSharedAsset(name, as.Window.Clone())
|
|
|
sa.Cost = hourlyCost * hours
|
|
|
|
|
|
- aggSet.Insert(sa)
|
|
|
+ err := aggSet.Insert(sa)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Delete the Assets that don't pass each filter
|
|
|
@@ -2389,16 +2390,18 @@ func (as *AssetSet) AggregateBy(aggStrings []string, opts *AssetAggregationOptio
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Insert each asset into the new set, which will be keyed by the props
|
|
|
+ // Insert each asset into the new set, which will be keyed by the `aggregateBy`
|
|
|
// on aggSet, resulting in aggregation.
|
|
|
for _, asset := range as.assets {
|
|
|
- aggSet.Insert(asset)
|
|
|
+ err := aggSet.Insert(asset)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Assign the aggregated values back to the original set
|
|
|
as.assets = aggSet.assets
|
|
|
- as.aggStrings = aggStrings
|
|
|
- as.props = props
|
|
|
+ as.aggregateBy = aggregateBy
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
@@ -2413,26 +2416,26 @@ func (as *AssetSet) Clone() *AssetSet {
|
|
|
as.RLock()
|
|
|
defer as.RUnlock()
|
|
|
|
|
|
+ var aggregateBy []string
|
|
|
+ if as.aggregateBy != nil {
|
|
|
+ aggregateBy := []string{}
|
|
|
+ for _, s := range as.aggregateBy {
|
|
|
+ aggregateBy = append(aggregateBy, s)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
assets := map[string]Asset{}
|
|
|
for k, v := range as.assets {
|
|
|
assets[k] = v.Clone()
|
|
|
}
|
|
|
|
|
|
- var props []AssetProperty
|
|
|
- if as.props != nil {
|
|
|
- props = []AssetProperty{}
|
|
|
- for _, p := range as.props {
|
|
|
- props = append(props, p)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
s := as.Start()
|
|
|
e := as.End()
|
|
|
|
|
|
return &AssetSet{
|
|
|
- Window: NewWindow(&s, &e),
|
|
|
- assets: assets,
|
|
|
- props: props,
|
|
|
+ Window: NewWindow(&s, &e),
|
|
|
+ aggregateBy: aggregateBy,
|
|
|
+ assets: assets,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -2455,23 +2458,26 @@ func (as *AssetSet) End() time.Time {
|
|
|
// FindMatch attempts to find a match in the AssetSet for the given Asset on
|
|
|
// the provided properties and labels. If a match is not found, FindMatch
|
|
|
// returns nil and a Not Found error.
|
|
|
-func (as *AssetSet) FindMatch(query Asset, aggStrings []string) (Asset, error) {
|
|
|
+func (as *AssetSet) FindMatch(query Asset, aggregateBy []string) (Asset, error) {
|
|
|
as.RLock()
|
|
|
defer as.RUnlock()
|
|
|
|
|
|
- matchKey := key(query, aggStrings)
|
|
|
+ matchKey, errors := key(query, aggregateBy)
|
|
|
+ if errors != nil {
|
|
|
+ return nil, fmt.Errorf("Bad key(s) given to FindMatch: %v", errors)
|
|
|
+ }
|
|
|
for _, asset := range as.assets {
|
|
|
- if key(asset, aggStrings) == matchKey {
|
|
|
+ if k, _ := key(asset, aggregateBy); k == matchKey {
|
|
|
return asset, nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return nil, fmt.Errorf("Asset not found to match %s on %v", query, aggStrings)
|
|
|
+ return nil, fmt.Errorf("Asset not found to match %s on %v", query, aggregateBy)
|
|
|
}
|
|
|
|
|
|
// ReconciliationMatch attempts to find an exact match in the AssetSet on
|
|
|
// (Category, ProviderID). If a match is found, it returns the Asset with the
|
|
|
-// intent to adjuts it. If no match exists, it attempts to find one on only
|
|
|
+// intent to adjut it. If no match exists, it attempts to find one on only
|
|
|
// (ProviderID). If that match is found, it returns the Asset with the intent
|
|
|
// to insert the associated Cloud cost.
|
|
|
func (as *AssetSet) ReconciliationMatch(query Asset) (Asset, bool, error) {
|
|
|
@@ -2480,19 +2486,25 @@ func (as *AssetSet) ReconciliationMatch(query Asset) (Asset, bool, error) {
|
|
|
|
|
|
// Full match means matching on (Category, ProviderID)
|
|
|
fullMatchProps := []string{string(AssetCategoryProp), string(AssetProviderIDProp)}
|
|
|
- fullMatchKey := key(query, fullMatchProps)
|
|
|
+ fullMatchKey, fullMatchErrors := key(query, fullMatchProps)
|
|
|
+
|
|
|
+ // This should never happen because we are using enumerated properties,
|
|
|
+ // but the check is here in case that changes
|
|
|
+ if fullMatchErrors != nil {
|
|
|
+ return nil, false, fmt.Errorf("ReconciliationMatch: Invalid key(s) passed: %v", fullMatchErrors)
|
|
|
+ }
|
|
|
|
|
|
// Partial match means matching only on (ProviderID)
|
|
|
providerIDMatchProps := []string{string(AssetProviderIDProp)}
|
|
|
- providerIDMatchKey := key(query, providerIDMatchProps)
|
|
|
+ providerIDMatchKey, _ := key(query, providerIDMatchProps)
|
|
|
|
|
|
var providerIDMatch Asset
|
|
|
for _, asset := range as.assets {
|
|
|
- if key(asset, fullMatchProps) == fullMatchKey {
|
|
|
+ if k, _ := key(asset, fullMatchProps); k == fullMatchKey {
|
|
|
log.DedupedInfof(10, "Asset ETL: Reconciliation[rcnw]: ReconcileRange Match: %s", fullMatchKey)
|
|
|
return asset, true, nil
|
|
|
}
|
|
|
- if key(asset, providerIDMatchProps) == providerIDMatchKey {
|
|
|
+ if k, _ := key(asset, providerIDMatchProps); k == providerIDMatchKey {
|
|
|
// Found a partial match. Save it until after all other options
|
|
|
// have been checked for full matches.
|
|
|
providerIDMatch = asset
|
|
|
@@ -2533,7 +2545,10 @@ func (as *AssetSet) Insert(asset Asset) error {
|
|
|
defer as.Unlock()
|
|
|
|
|
|
// Determine key into which to Insert the Asset.
|
|
|
- k := key(asset, as.aggStrings)
|
|
|
+ k, keyErrors := key(asset, as.aggregateBy)
|
|
|
+ if keyErrors != nil {
|
|
|
+ return fmt.Errorf("Attempted Insert on an `AssetSet` with invalid `aggregateBy` values: %v", keyErrors)
|
|
|
+ }
|
|
|
|
|
|
// Add the given Asset to the existing entry, if there is one;
|
|
|
// otherwise just set directly into assets
|
|
|
@@ -2588,7 +2603,7 @@ func (as *AssetSet) MarshalJSON() ([]byte, error) {
|
|
|
return json.Marshal(as.assets)
|
|
|
}
|
|
|
|
|
|
-func (as *AssetSet) Set(asset Asset, props []AssetProperty) {
|
|
|
+func (as *AssetSet) Set(asset Asset, aggregateBy []string) error {
|
|
|
if as.IsEmpty() {
|
|
|
as.Lock()
|
|
|
as.assets = map[string]Asset{}
|
|
|
@@ -2598,19 +2613,14 @@ func (as *AssetSet) Set(asset Asset, props []AssetProperty) {
|
|
|
as.Lock()
|
|
|
defer as.Unlock()
|
|
|
|
|
|
- // Compute raw-string version of props for use with key()
|
|
|
- aggStrings := []string{}
|
|
|
- if props == nil {
|
|
|
- aggStrings = nil
|
|
|
- } else {
|
|
|
- for _, prop := range props {
|
|
|
- aggStrings = append(aggStrings, string(prop))
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// Expand the window to match the AssetSet, then set it
|
|
|
asset.ExpandWindow(as.Window)
|
|
|
- as.assets[key(asset, aggStrings)] = asset
|
|
|
+ k, keyErrors := key(asset, aggregateBy)
|
|
|
+ if keyErrors != nil {
|
|
|
+ return fmt.Errorf("Attempted to Set() Asset into AssetSet with invalid aggregateBy: %v", keyErrors)
|
|
|
+ }
|
|
|
+ as.assets[k] = asset
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
func (as *AssetSet) Start() time.Time {
|
|
|
@@ -2645,17 +2655,10 @@ func (as *AssetSet) accumulate(that *AssetSet) (*AssetSet, error) {
|
|
|
}
|
|
|
|
|
|
// In the case of an AssetSetRange with empty entries, we may end up with
|
|
|
- // an incoming as without props, even though we are trying to aggregate
|
|
|
- // by props. This handles that case, assigning the correct props.
|
|
|
- if !propsEqual(as.props, that.props) {
|
|
|
- if len(as.props) == 0 {
|
|
|
- as.props = that.props
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Same as above, but for aggStrings
|
|
|
- if len(as.aggStrings) == 0 {
|
|
|
- as.aggStrings = that.aggStrings
|
|
|
+ // an incoming `as` without an `aggregateBy`, even though we are tring to
|
|
|
+ // aggregate here. This handles that case by assigning the correct `aggregateBy`.
|
|
|
+ if len(as.aggregateBy) == 0 {
|
|
|
+ as.aggregateBy = that.aggregateBy
|
|
|
}
|
|
|
|
|
|
// Set start, end to min(start), max(end)
|
|
|
@@ -2673,8 +2676,7 @@ func (as *AssetSet) accumulate(that *AssetSet) (*AssetSet, error) {
|
|
|
}
|
|
|
|
|
|
acc := NewAssetSet(start, end)
|
|
|
- acc.aggStrings = as.aggStrings
|
|
|
- acc.props = as.props
|
|
|
+ acc.aggregateBy = as.aggregateBy
|
|
|
|
|
|
as.RLock()
|
|
|
defer as.RUnlock()
|
|
|
@@ -2734,14 +2736,14 @@ type AssetAggregationOptions struct {
|
|
|
FilterFuncs []AssetMatchFunc
|
|
|
}
|
|
|
|
|
|
-func (asr *AssetSetRange) AggregateBy(aggStrings []string, opts *AssetAggregationOptions) error {
|
|
|
+func (asr *AssetSetRange) AggregateBy(aggregateBy []string, opts *AssetAggregationOptions) error {
|
|
|
aggRange := &AssetSetRange{assets: []*AssetSet{}}
|
|
|
|
|
|
asr.Lock()
|
|
|
defer asr.Unlock()
|
|
|
|
|
|
for _, as := range asr.assets {
|
|
|
- err := as.AggregateBy(aggStrings, opts)
|
|
|
+ err := as.AggregateBy(aggregateBy, opts)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|