|
|
@@ -5,7 +5,6 @@ import (
|
|
|
"slices"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
- "sync"
|
|
|
|
|
|
"github.com/kubecost/events"
|
|
|
"github.com/opencost/opencost/core/pkg/clustercache"
|
|
|
@@ -23,34 +22,6 @@ import (
|
|
|
"k8s.io/apimachinery/pkg/util/validation"
|
|
|
)
|
|
|
|
|
|
-// SyncMap provides thread-safe concurrent access to a generic map
|
|
|
-type SyncMap[U comparable, T any] struct {
|
|
|
- mu sync.RWMutex
|
|
|
- data map[U]T
|
|
|
-}
|
|
|
-
|
|
|
-// newSyncMap creates a new thread-safe map with the specified initial capacity
|
|
|
-func newSyncMap[U comparable, T any](size int) *SyncMap[U, T] {
|
|
|
- return &SyncMap[U, T]{
|
|
|
- data: make(map[U]T, size),
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// Set adds or updates a key-value mapping
|
|
|
-func (sm *SyncMap[U, T]) Set(key U, value T) {
|
|
|
- sm.mu.Lock()
|
|
|
- defer sm.mu.Unlock()
|
|
|
- sm.data[key] = value
|
|
|
-}
|
|
|
-
|
|
|
-// Get retrieves a value by key. Returns the value and a boolean indicating if it was found.
|
|
|
-func (sm *SyncMap[U, T]) Get(key U) (T, bool) {
|
|
|
- sm.mu.RLock()
|
|
|
- defer sm.mu.RUnlock()
|
|
|
- value, ok := sm.data[key]
|
|
|
- return value, ok
|
|
|
-}
|
|
|
-
|
|
|
const unmountedPVsContainer = "unmounted-pvs"
|
|
|
|
|
|
type ClusterCacheScraper struct {
|
|
|
@@ -86,24 +57,25 @@ func (ccs *ClusterCacheScraper) Scrape() []metric.Update {
|
|
|
|
|
|
// create scrape indexes. While the pairs being mapped here don't have a 1 to 1 relationship in the general case,
|
|
|
// we are assuming that in the context of a single snapshot of the cluster they are 1 to 1.
|
|
|
- nodeNameToUID := newSyncMap[string, types.UID](len(nodes))
|
|
|
+ nodeNameToUID := make(map[string]types.UID, len(nodes))
|
|
|
for _, node := range nodes {
|
|
|
- nodeNameToUID.Set(node.Name, node.UID)
|
|
|
+ nodeNameToUID[node.Name] = node.UID
|
|
|
}
|
|
|
- namespaceNameToUID := newSyncMap[string, types.UID](len(namespaces))
|
|
|
+ namespaceNameToUID := make(map[string]types.UID, len(namespaces))
|
|
|
for _, ns := range namespaces {
|
|
|
- namespaceNameToUID.Set(ns.Name, ns.UID)
|
|
|
+ namespaceNameToUID[ns.Name] = ns.UID
|
|
|
}
|
|
|
- pvcNameToUID := newSyncMap[pvcKey, types.UID](len(pvcs))
|
|
|
+ pvcNameToUID := make(map[pvcKey]types.UID, len(pvcs))
|
|
|
for _, pvc := range pvcs {
|
|
|
- pvcNameToUID.Set(pvcKey{
|
|
|
+ key := pvcKey{
|
|
|
name: pvc.Name,
|
|
|
namespace: pvc.Namespace,
|
|
|
- }, pvc.UID)
|
|
|
+ }
|
|
|
+ pvcNameToUID[key] = pvc.UID
|
|
|
}
|
|
|
- pvNameToUID := newSyncMap[string, types.UID](len(pvs))
|
|
|
+ pvNameToUID := make(map[string]types.UID, len(pvs))
|
|
|
for _, pv := range pvs {
|
|
|
- pvNameToUID.Set(pv.Name, pv.UID)
|
|
|
+ pvNameToUID[pv.Name] = pv.UID
|
|
|
}
|
|
|
|
|
|
scrapeFuncs := []ScrapeFunc{
|
|
|
@@ -229,16 +201,16 @@ func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node) []metric
|
|
|
return scrapeResults
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) GetScrapeDeployments(deployments []*clustercache.Deployment, namespaceIndex *SyncMap[string, types.UID]) ScrapeFunc {
|
|
|
+func (ccs *ClusterCacheScraper) GetScrapeDeployments(deployments []*clustercache.Deployment, namespaceIndex map[string]types.UID) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapeDeployments(deployments, namespaceIndex)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment, namespaceIndex *SyncMap[string, types.UID]) []metric.Update {
|
|
|
+func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment, namespaceIndex map[string]types.UID) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, deployment := range deployments {
|
|
|
- nsUID, ok := namespaceIndex.Get(deployment.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[deployment.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("deployment namespaceUID missing from index for namespace name '%s'", deployment.Namespace)
|
|
|
}
|
|
|
@@ -355,9 +327,9 @@ func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Name
|
|
|
func (ccs *ClusterCacheScraper) GetScrapePods(
|
|
|
pods []*clustercache.Pod,
|
|
|
pvcs []*clustercache.PersistentVolumeClaim,
|
|
|
- nodeIndex *SyncMap[string, types.UID],
|
|
|
- namespaceIndex *SyncMap[string, types.UID],
|
|
|
- pvcIndex *SyncMap[pvcKey, types.UID],
|
|
|
+ nodeIndex map[string]types.UID,
|
|
|
+ namespaceIndex map[string]types.UID,
|
|
|
+ pvcIndex map[pvcKey]types.UID,
|
|
|
) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapePods(pods, pvcs, nodeIndex, namespaceIndex, pvcIndex)
|
|
|
@@ -367,9 +339,9 @@ func (ccs *ClusterCacheScraper) GetScrapePods(
|
|
|
func (ccs *ClusterCacheScraper) scrapePods(
|
|
|
pods []*clustercache.Pod,
|
|
|
pvcs []*clustercache.PersistentVolumeClaim,
|
|
|
- nodeIndex *SyncMap[string, types.UID],
|
|
|
- namespaceIndex *SyncMap[string, types.UID],
|
|
|
- pvcIndex *SyncMap[pvcKey, types.UID],
|
|
|
+ nodeIndex map[string]types.UID,
|
|
|
+ namespaceIndex map[string]types.UID,
|
|
|
+ pvcIndex map[pvcKey]types.UID,
|
|
|
) []metric.Update {
|
|
|
// this is only populated if we find gpu resources being requested
|
|
|
var nodesGpuInfo map[string]*NodeGpuInfo
|
|
|
@@ -382,11 +354,11 @@ func (ccs *ClusterCacheScraper) scrapePods(
|
|
|
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, pod := range pods {
|
|
|
- nodeUID, ok := nodeIndex.Get(pod.Spec.NodeName)
|
|
|
+ nodeUID, ok := nodeIndex[pod.Spec.NodeName]
|
|
|
if !ok {
|
|
|
log.Debugf("pod nodeUID missing from index for node name '%s'", pod.Spec.NodeName)
|
|
|
}
|
|
|
- nsUID, ok := namespaceIndex.Get(pod.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[pod.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("pod namespaceUID missing from index for namespace name '%s'", pod.Namespace)
|
|
|
}
|
|
|
@@ -480,10 +452,13 @@ func (ccs *ClusterCacheScraper) scrapePods(
|
|
|
|
|
|
for _, volume := range pod.Spec.Volumes {
|
|
|
if volume.PersistentVolumeClaim != nil {
|
|
|
- pvcUID, _ := pvcIndex.Get(pvcKey{
|
|
|
+ pvcUID, ok := pvcIndex[pvcKey{
|
|
|
name: volume.PersistentVolumeClaim.ClaimName,
|
|
|
namespace: pod.Namespace,
|
|
|
- })
|
|
|
+ }]
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
podPVCVolumeInfo := map[string]string{
|
|
|
source.UIDLabel: string(pod.UID),
|
|
|
source.PVCUIDLabel: string(pvcUID),
|
|
|
@@ -645,8 +620,8 @@ func scrapeResourceList(metricName string, resourceList v1.ResourceList, baseLab
|
|
|
|
|
|
func (ccs *ClusterCacheScraper) GetScrapePVCs(
|
|
|
pvcs []*clustercache.PersistentVolumeClaim,
|
|
|
- namespaceIndex *SyncMap[string, types.UID],
|
|
|
- pvIndex *SyncMap[string, types.UID],
|
|
|
+ namespaceIndex map[string]types.UID,
|
|
|
+ pvIndex map[string]types.UID,
|
|
|
) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapePVCs(pvcs, namespaceIndex, pvIndex)
|
|
|
@@ -655,16 +630,16 @@ func (ccs *ClusterCacheScraper) GetScrapePVCs(
|
|
|
|
|
|
func (ccs *ClusterCacheScraper) scrapePVCs(
|
|
|
pvcs []*clustercache.PersistentVolumeClaim,
|
|
|
- namespaceIndex *SyncMap[string, types.UID],
|
|
|
- pvIndex *SyncMap[string, types.UID],
|
|
|
+ namespaceIndex map[string]types.UID,
|
|
|
+ pvIndex map[string]types.UID,
|
|
|
) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, pvc := range pvcs {
|
|
|
- nsUID, ok := namespaceIndex.Get(pvc.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[pvc.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("pvc namespaceUID missing from index for namespace name '%s'", pvc.Namespace)
|
|
|
}
|
|
|
- pvUID, ok := pvIndex.Get(pvc.Spec.VolumeName)
|
|
|
+ pvUID, ok := pvIndex[pvc.Spec.VolumeName]
|
|
|
if !ok && pvc.Spec.VolumeName != "" {
|
|
|
log.Debugf("pvc volume name missing from index for pv name '%s'", pvc.Spec.VolumeName)
|
|
|
}
|
|
|
@@ -799,16 +774,16 @@ func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service)
|
|
|
return scrapeResults
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) GetScrapeStatefulSets(statefulSets []*clustercache.StatefulSet, namespaceIndex *SyncMap[string, types.UID]) ScrapeFunc {
|
|
|
+func (ccs *ClusterCacheScraper) GetScrapeStatefulSets(statefulSets []*clustercache.StatefulSet, namespaceIndex map[string]types.UID) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapeStatefulSets(statefulSets, namespaceIndex)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet, namespaceIndex *SyncMap[string, types.UID]) []metric.Update {
|
|
|
+func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet, namespaceIndex map[string]types.UID) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, statefulSet := range statefulSets {
|
|
|
- nsUID, ok := namespaceIndex.Get(statefulSet.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[statefulSet.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("statefulSet namespaceUID missing from index for namespace name '%s'", statefulSet.Namespace)
|
|
|
}
|
|
|
@@ -871,16 +846,16 @@ func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.
|
|
|
return scrapeResults
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) GetScrapeDaemonSets(daemonSets []*clustercache.DaemonSet, namespaceIndex *SyncMap[string, types.UID]) ScrapeFunc {
|
|
|
+func (ccs *ClusterCacheScraper) GetScrapeDaemonSets(daemonSets []*clustercache.DaemonSet, namespaceIndex map[string]types.UID) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapeDaemonSets(daemonSets, namespaceIndex)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) scrapeDaemonSets(daemonSets []*clustercache.DaemonSet, namespaceIndex *SyncMap[string, types.UID]) []metric.Update {
|
|
|
+func (ccs *ClusterCacheScraper) scrapeDaemonSets(daemonSets []*clustercache.DaemonSet, namespaceIndex map[string]types.UID) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, daemonSet := range daemonSets {
|
|
|
- nsUID, ok := namespaceIndex.Get(daemonSet.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[daemonSet.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("daemonSet namespaceUID missing from index for namespace name '%s'", daemonSet.Namespace)
|
|
|
}
|
|
|
@@ -931,16 +906,16 @@ func (ccs *ClusterCacheScraper) scrapeDaemonSets(daemonSets []*clustercache.Daem
|
|
|
return scrapeResults
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) GetScrapeJobs(jobs []*clustercache.Job, namespaceIndex *SyncMap[string, types.UID]) ScrapeFunc {
|
|
|
+func (ccs *ClusterCacheScraper) GetScrapeJobs(jobs []*clustercache.Job, namespaceIndex map[string]types.UID) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapeJobs(jobs, namespaceIndex)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) scrapeJobs(jobs []*clustercache.Job, namespaceIndex *SyncMap[string, types.UID]) []metric.Update {
|
|
|
+func (ccs *ClusterCacheScraper) scrapeJobs(jobs []*clustercache.Job, namespaceIndex map[string]types.UID) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, job := range jobs {
|
|
|
- nsUID, ok := namespaceIndex.Get(job.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[job.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("job namespaceUID missing from index for namespace name '%s'", job.Namespace)
|
|
|
}
|
|
|
@@ -991,16 +966,16 @@ func (ccs *ClusterCacheScraper) scrapeJobs(jobs []*clustercache.Job, namespaceIn
|
|
|
return scrapeResults
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) GetScrapeCronJobs(cronJobs []*clustercache.CronJob, namespaceIndex *SyncMap[string, types.UID]) ScrapeFunc {
|
|
|
+func (ccs *ClusterCacheScraper) GetScrapeCronJobs(cronJobs []*clustercache.CronJob, namespaceIndex map[string]types.UID) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapeCronJobs(cronJobs, namespaceIndex)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) scrapeCronJobs(cronJobs []*clustercache.CronJob, namespaceIndex *SyncMap[string, types.UID]) []metric.Update {
|
|
|
+func (ccs *ClusterCacheScraper) scrapeCronJobs(cronJobs []*clustercache.CronJob, namespaceIndex map[string]types.UID) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, cronJob := range cronJobs {
|
|
|
- nsUID, ok := namespaceIndex.Get(cronJob.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[cronJob.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("cronjob namespaceUID missing from index for namespace name '%s'", cronJob.Namespace)
|
|
|
}
|
|
|
@@ -1051,16 +1026,16 @@ func (ccs *ClusterCacheScraper) scrapeCronJobs(cronJobs []*clustercache.CronJob,
|
|
|
return scrapeResults
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) GetScrapeReplicaSets(replicaSets []*clustercache.ReplicaSet, namespaceIndex *SyncMap[string, types.UID]) ScrapeFunc {
|
|
|
+func (ccs *ClusterCacheScraper) GetScrapeReplicaSets(replicaSets []*clustercache.ReplicaSet, namespaceIndex map[string]types.UID) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapeReplicaSets(replicaSets, namespaceIndex)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet, namespaceIndex *SyncMap[string, types.UID]) []metric.Update {
|
|
|
+func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet, namespaceIndex map[string]types.UID) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
for _, replicaSet := range replicaSets {
|
|
|
- nsUID, ok := namespaceIndex.Get(replicaSet.Namespace)
|
|
|
+ nsUID, ok := namespaceIndex[replicaSet.Namespace]
|
|
|
if !ok {
|
|
|
log.Debugf("replicaset namespaceUID missing from index for namespace name '%s'", replicaSet.Namespace)
|
|
|
}
|
|
|
@@ -1148,13 +1123,13 @@ func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.Re
|
|
|
return scrapeResults
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) GetScrapeResourceQuotas(resourceQuotas []*clustercache.ResourceQuota, namespaceIndex *SyncMap[string, types.UID]) ScrapeFunc {
|
|
|
+func (ccs *ClusterCacheScraper) GetScrapeResourceQuotas(resourceQuotas []*clustercache.ResourceQuota, namespaceIndex map[string]types.UID) ScrapeFunc {
|
|
|
return func() []metric.Update {
|
|
|
return ccs.scrapeResourceQuotas(resourceQuotas, namespaceIndex)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (ccs *ClusterCacheScraper) scrapeResourceQuotas(resourceQuotas []*clustercache.ResourceQuota, namespaceIndex *SyncMap[string, types.UID]) []metric.Update {
|
|
|
+func (ccs *ClusterCacheScraper) scrapeResourceQuotas(resourceQuotas []*clustercache.ResourceQuota, namespaceIndex map[string]types.UID) []metric.Update {
|
|
|
var scrapeResults []metric.Update
|
|
|
|
|
|
processResource := func(baseLabels map[string]string, name v1.ResourceName, quantity resource.Quantity, metricName string) metric.Update {
|
|
|
@@ -1172,7 +1147,7 @@ func (ccs *ClusterCacheScraper) scrapeResourceQuotas(resourceQuotas []*clusterca
|
|
|
}
|
|
|
|
|
|
for _, resourceQuota := range resourceQuotas {
|
|
|
- nsUID, _ := namespaceIndex.Get(resourceQuota.Namespace)
|
|
|
+ nsUID, _ := namespaceIndex[resourceQuota.Namespace]
|
|
|
resourceQuotaInfo := map[string]string{
|
|
|
source.UIDLabel: string(resourceQuota.UID),
|
|
|
source.NamespaceUIDLabel: string(nsUID),
|