@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/opencost/opencost/pkg/cloud/provider"
@@ -42,7 +43,16 @@ const (
 	queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
 )
 
-const maxLocalDiskSize = 200 // AWS limits root disks to 100 Gi, and occasional metric errors in filesystem size should not contribute to large costs.
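+// MAX_LOCAL_STORAGE_SIZE (1 TiB) bounds local disk size; filesystems reporting
+// more than this are skipped, since occasional metric errors in filesystem size
+// should not contribute to large costs.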
+const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024
+
+// When ASSET_INCLUDE_LOCAL_DISK_COST is set to false, local storage
+// provisioned by sig-storage-local-static-provisioner is excluded
+// by checking whether the volume name has the "local-pv-" prefix.
+//
+// This is based on the sig-storage-local-static-provisioner implementation,
+// which creates all PVs with the "local-pv-" prefix. For reference, see:
+// https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
+const SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
 
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
 // are broken down by cores, memory, and storage.
@@ -142,7 +152,7 @@ type DiskIdentifier struct {
 	Name string
 }
 
-func ClusterDisks(client prometheus.Client, provider models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
+func ClusterDisks(client prometheus.Client, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
 	// Start from the time "end", querying backwards
 	t := end
 
@@ -209,11 +219,15 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
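+	// costPerGBHr assumes a flat local storage rate of $0.04 per GB-month, spread over 730 hours per month.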
 	costPerGBHr := 0.04 / 730.0
 
-	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
+	// container_fs metrics include devices that are not the node's local storage. While not a perfect
+	// heuristic for identifying the device backing local storage, we first filter for devices mounted
+	// at paths `/dev/nvme.*` or `/dev/sda.*`. Multiple devices may still match this regex, so later we
+	// select the device with the highest `container_fs_limit_bytes` per instance to create a local disk asset.
+	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
-	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
+	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node, provider_id)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
 
 	resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
@@ -273,9 +287,18 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 		diskMap[key].ClaimNamespace = claimNamespace
 	}
 
-	pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, provider, opencost.NewClosedWindow(start, end))
+	pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
 
-	for _, result := range resLocalStorageCost {
+	type localStorage struct {
+		device string
+		disk   *Disk
+	}
+
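+	// localStorageDisks tracks, per (cluster, instance) key, the single device
+	// chosen to represent that node's local storage.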
+	localStorageDisks := map[DiskIdentifier]localStorage{}
+
+	// Start with local storage bytes so that, for each instance, the largest
+	// device that passed the query filters can be selected.
+	for _, result := range resLocalStorageBytes {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -287,23 +310,37 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 			continue
 		}
 
-		cost := result.Values[0].Value
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
+		}
+
+		bytes := result.Values[0].Value
+		// Ignore disks that are larger than the max size
+		if bytes > MAX_LOCAL_STORAGE_SIZE {
+			continue
+		}
+
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
+
+		// Only keep the device with the most bytes per instance.
+		if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
+			localStorageDisks[key] = localStorage{
+				device: device,
+				disk: &Disk{
+					Cluster:      cluster,
+					Name:         name,
+					Breakdown:    &ClusterCostsBreakdown{},
+					Local:        true,
+					StorageClass: opencost.LocalStorageClass,
+					Bytes:        bytes,
+				},
 			}
 		}
-		diskMap[key].Cost += cost
-
-		//Assigning explicitly the storage class of local storage to local
-		diskMap[key].StorageClass = opencost.LocalStorageClass
 	}
 
-	for _, result := range resLocalStorageUsedCost {
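+	// Apply the cumulative storage cost only to the device selected above for each instance.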
+	for _, result := range resLocalStorageCost {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -311,24 +348,27 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 
 		name, err := result.GetString("instance")
 		if err != nil {
-			log.Warnf("ClusterDisks: local storage usage data missing instance")
+			log.Warnf("ClusterDisks: local storage data missing instance")
+			continue
+		}
+
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
 			continue
 		}
 
 		cost := result.Values[0].Value
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
-		diskMap[key].Breakdown.System = cost / diskMap[key].Cost
+		ls.disk.Cost = cost
 	}
 
-	for _, result := range resLocalStorageUsedAvg {
+	for _, result := range resLocalStorageUsedCost {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -336,24 +376,26 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 
 		name, err := result.GetString("instance")
 		if err != nil {
-			log.Warnf("ClusterDisks: local storage data missing instance")
+			log.Warnf("ClusterDisks: local storage usage data missing instance")
 			continue
 		}
 
-		bytesAvg := result.Values[0].Value
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
+		}
+
+		cost := result.Values[0].Value
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
-		diskMap[key].BytesUsedAvgPtr = &bytesAvg
+		ls.disk.Breakdown.System = cost / ls.disk.Cost
 	}
 
-	for _, result := range resLocalStorageUsedMax {
+	for _, result := range resLocalStorageUsedAvg {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -365,20 +407,22 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 			continue
 		}
 
-		bytesMax := result.Values[0].Value
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
+		}
+
+		bytesAvg := result.Values[0].Value
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
-		diskMap[key].BytesUsedMaxPtr = &bytesMax
+		ls.disk.BytesUsedAvgPtr = &bytesAvg
 	}
 
-	for _, result := range resLocalStorageBytes {
+	for _, result := range resLocalStorageUsedMax {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -390,21 +434,19 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 			continue
 		}
 
-		bytes := result.Values[0].Value
-		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
 		}
-		diskMap[key].Bytes = bytes
-		if bytes/1024/1024/1024 > maxLocalDiskSize {
-			log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
-			delete(diskMap, key)
+
+		bytesMax := result.Values[0].Value
+		key := DiskIdentifier{cluster, name}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
+		ls.disk.BytesUsedMaxPtr = &bytesMax
 	}
 
 	for _, result := range resLocalActiveMins {
@@ -419,12 +461,20 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 			continue
 		}
 
+		providerID, err := result.GetString("provider_id")
+		if err != nil {
+			log.DedupedWarningf(5, "ClusterDisks: local active mins data missing provider_id")
+			continue
+		}
+
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
+		ls, ok := localStorageDisks[key]
+		if !ok {
 			continue
 		}
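+		// Derive the disk's ProviderID from the node's provider_id label.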
+		ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
+
 		if len(result.Values) == 0 {
 			continue
 		}
@@ -435,9 +485,14 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 
-		diskMap[key].End = e
-		diskMap[key].Start = s
-		diskMap[key].Minutes = mins
+		ls.disk.End = e
+		ls.disk.Start = s
+		ls.disk.Minutes = mins
+	}
+
+	// Move the selected local storage disks into the main disk map.
+	for key, ls := range localStorageDisks {
+		diskMap[key] = ls.disk
 	}
 
 	var unTracedDiskLogData []DiskIdentifier
@@ -487,6 +542,10 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 		}
 	}
 
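+	// When ASSET_INCLUDE_LOCAL_DISK_COST is disabled, drop PVs created by
+	// sig-storage-local-static-provisioner before returning (see filterOutLocalPVs).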
+	if !env.GetAssetIncludeLocalDiskCost() {
+		return filterOutLocalPVs(diskMap), nil
+	}
+
 	return diskMap, nil
 }
@@ -1503,11 +1562,13 @@ func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActi
 				log.Debugf("ClusterDisks: pv claim data missing volumename")
 				continue
 			}
+
 			thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
 			if err != nil {
 				log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
 				continue
 			}
+
 			thatClaimNamespace, err := thatRes.GetString("namespace")
 			if err != nil {
 				log.Debugf("ClusterDisks: pv claim data missing namespace")
@@ -1544,6 +1605,7 @@ func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActi
 			log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
 			continue
 		}
+
 		claimNamespace, err := result.GetString("namespace")
 		if err != nil {
 			log.Debugf("ClusterDisks: pv usage data missing namespace")
@@ -1564,11 +1626,13 @@ func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActi
 				log.Debugf("ClusterDisks: pv claim data missing volumename")
 				continue
 			}
+
 			thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
 			if err != nil {
 				log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
 				continue
 			}
+
 			thatClaimNamespace, err := thatRes.GetString("namespace")
 			if err != nil {
 				log.Debugf("ClusterDisks: pv claim data missing namespace")
@@ -1594,3 +1658,22 @@ func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActi
 		diskMap[key].BytesUsedMaxPtr = &usage
 	}
 }
+
+// filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
+// Local PVs are identified by the prefix "local-pv-" in their names, which is the
+// convention used by sig-storage-local-static-provisioner.
+//
+// Parameters:
+// - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
+//
+// Returns:
+// - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
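+//
+// For example, a volume named "local-pv-6ee28f9c" (a hypothetical
+// provisioner-created name) would be filtered out, while a dynamically
+// provisioned volume such as "pvc-0a1b2c3d" would be kept.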
+func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
+	nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
+	for key, val := range diskMap {
+		if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
+			nonLocalPVDiskMap[key] = val
+		}
+	}
+	return nonLocalPVDiskMap
+}