Răsfoiți Sursa

Merge branch 'master' into bolt/reserved-instances

Matt Bolt 6 ani în urmă
părinte
comite
d57462adbe
3 a modificat fișierele cu 56 adăugiri și 59 ștergeri
  1. 17 15
      cloud/awsprovider.go
  2. 14 44
      costmodel/router.go
  3. 25 0
      util/semaphore.go

+ 17 - 15
cloud/awsprovider.go

@@ -707,7 +707,6 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			} else {
 			} else {
 				klog.V(2).Infof("Spot data for node %s is missing", k.ID())
 				klog.V(2).Infof("Spot data for node %s is missing", k.ID())
 			}
 			}
-			klog.V(1).Infof("SPOT COST FOR %s: %s", k.Features, spotcost)
 			return &Node{
 			return &Node{
 				Cost:         spotcost,
 				Cost:         spotcost,
 				VCPU:         terms.VCpu,
 				VCPU:         terms.VCpu,
@@ -1100,20 +1099,23 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregator string) (
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
+		if len(op.ResultSet.Rows) > 1 {
+			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
 
 
-		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
-
-			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
-			if err != nil {
-				return nil, err
-			}
-			ooc := &OutOfClusterAllocation{
-				Aggregator:  aggregator,
-				Environment: *r.Data[1].VarCharValue,
-				Service:     *r.Data[2].VarCharValue,
-				Cost:        cost,
+				cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+				if err != nil {
+					return nil, err
+				}
+				ooc := &OutOfClusterAllocation{
+					Aggregator:  aggregator,
+					Environment: *r.Data[1].VarCharValue,
+					Service:     *r.Data[2].VarCharValue,
+					Cost:        cost,
+				}
+				oocAllocs = append(oocAllocs, ooc)
 			}
 			}
-			oocAllocs = append(oocAllocs, ooc)
+		} else {
+			klog.V(1).Infof("No results available for %s at database %s between %s and %s", aggregator_column_name, customPricing.AthenaTable, start, end)
 		}
 		}
 	}
 	}
 
 
@@ -1367,7 +1369,7 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 				}
 				}
 				if len(foundVersion) == 0 {
 				if len(foundVersion) == 0 {
 					spotFeedVersion := rec[0]
 					spotFeedVersion := rec[0]
-					klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
+					klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
 					matches := versionRx.FindStringSubmatch(spotFeedVersion)
 					matches := versionRx.FindStringSubmatch(spotFeedVersion)
 					if matches != nil {
 					if matches != nil {
 						foundVersion = matches[1]
 						foundVersion = matches[1]
@@ -1388,7 +1390,7 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 				continue
 				continue
 			}
 			}
 
 
-			klog.V(3).Infof("Found spot info %+v", spot)
+			klog.V(4).Infof("Found spot info %+v", spot)
 			spots[spot.InstanceID] = &spot
 			spots[spot.InstanceID] = &spot
 		}
 		}
 		gr.Close()
 		gr.Close()

+ 14 - 44
costmodel/router.go

@@ -63,9 +63,7 @@ type Accesses struct {
 	ServiceSelectorRecorder       *prometheus.GaugeVec
 	ServiceSelectorRecorder       *prometheus.GaugeVec
 	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	Model                         *CostModel
 	Model                         *CostModel
-	CostDataCache                 *cache.Cache
 	OutOfClusterCache             *cache.Cache
 	OutOfClusterCache             *cache.Cache
-	SettingsCache                 *cache.Cache
 }
 }
 
 
 type DataEnvelope struct {
 type DataEnvelope struct {
@@ -75,14 +73,24 @@ type DataEnvelope struct {
 	Message string      `json:"message,omitempty"`
 	Message string      `json:"message,omitempty"`
 }
 }
 
 
-// FilterCostData allows through only CostData that matches the given filters for namespace and clusterId
-func FilterCostData(data map[string]*CostData, namespace, clusterId string) map[string]*CostData {
+// FilterFunc is a filter that returns true iff the given CostData should be filtered out
+type FilterFunc func(*CostData) bool
+
+// FilterCostData allows through only CostData that matches all the given filter functions
+func FilterCostData(data map[string]*CostData, filters ...FilterFunc) map[string]*CostData {
 	result := make(map[string]*CostData)
 	result := make(map[string]*CostData)
+
+DataLoop:
 	for key, datum := range data {
 	for key, datum := range data {
-		if costDataPassesFilters(datum, namespace, clusterId) {
-			result[key] = datum
+		for _, ff := range filters {
+			if !ff(datum) {
+				// if any filter function check fails, move on to the next datum
+				continue DataLoop
+			}
 		}
 		}
+		result[key] = datum
 	}
 	}
+
 	return result
 	return result
 }
 }
 
 
@@ -312,41 +320,6 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 	w.Write(WrapData(data, err))
 	w.Write(WrapData(data, err))
 }
 }
 
 
-func (a *Accesses) CustomPricingHasChanged() bool {
-	customPricing, err := a.Cloud.GetConfig()
-	if err != nil || customPricing == nil {
-		klog.Errorf("error accessing cloud provider configuration: %s", err)
-		return false
-	}
-
-	// describe parameters by which we determine whether or not custom
-	// pricing settings have changed
-	encodeCustomPricing := func(cp *costAnalyzerCloud.CustomPricing) string {
-		return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%s", cp.CustomPricesEnabled, cp.CPU, cp.SpotCPU,
-			cp.RAM, cp.SpotRAM, cp.GPU, cp.Storage, cp.CurrencyCode)
-	}
-
-	// compare cached custom pricing parameters with current values
-	cpStr := encodeCustomPricing(customPricing)
-	cpStrCached := ""
-	val, found := a.SettingsCache.Get("customPricing")
-	if found {
-		ok := false
-		cpStrCached, ok = val.(string)
-		if !ok {
-			klog.Errorf("caching error: failed to cast custom pricing to string")
-		}
-	}
-	if cpStr == cpStrCached {
-		return false
-	}
-
-	// cache new custom pricing settings
-	a.SettingsCache.Set("customPricing", cpStr, cache.DefaultExpiration)
-
-	return true
-}
-
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -911,9 +884,7 @@ func Initialize() {
 	})
 	})
 
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
-	costDataCache := cache.New(time.Minute*5, time.Minute*10)
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
-	settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
 
 
 	A = Accesses{
 	A = Accesses{
 		PrometheusClient:              promCli,
 		PrometheusClient:              promCli,
@@ -935,7 +906,6 @@ func Initialize() {
 		Model:                         NewCostModel(k8sCache),
 		Model:                         NewCostModel(k8sCache),
 		CostDataCache:                 costDataCache,
 		CostDataCache:                 costDataCache,
 		OutOfClusterCache:             outOfClusterCache,
 		OutOfClusterCache:             outOfClusterCache,
-		SettingsCache:                 settingsCache,
 	}
 	}
 
 
 	remoteEnabled := os.Getenv(remoteEnabled)
 	remoteEnabled := os.Getenv(remoteEnabled)

+ 25 - 0
util/semaphore.go

@@ -0,0 +1,25 @@
+package util
+
+// Semaphore implements a non-weighted semaphore for restricting
+// concurrent access to a limited number of processes.
+type Semaphore struct {
+	s chan bool
+}
+
+// Acquire blocks until access can be granted to the caller
+func (s *Semaphore) Acquire() {
+	s.s <- true
+}
+
+// Return releases access from the caller, opening it for acquisition
+func (s *Semaphore) Return() {
+	<-s.s
+}
+
+// NewSemaphore creates a new Semaphore that allows max number of
+// concurrent access
+func NewSemaphore(max int) *Semaphore {
+	return &Semaphore{
+		s: make(chan bool, max),
+	}
+}