Преглед на файлове

update recommender signature to support multi-cluster and categories

Alexander Belanger преди 3 години
родител
ревизия
12c9f51669
променени са 5 файла, в които са добавени 202 реда и са изтрити 91 реда
  1. 3 0
      internal/opa/config.yaml
  2. 8 6
      internal/opa/loader.go
  3. 68 18
      internal/opa/opa.go
  4. 114 61
      workers/jobs/recommender.go
  5. 9 6
      workers/main.go

+ 3 - 0
internal/opa/config.yaml

@@ -10,6 +10,7 @@ nginx:
   match:
     name: nginx-ingress
     namespace: ingress-nginx
+  mustExist: true
   policies:
   - path: "./policies/nginx/nginx_version.rego"
     name: "nginx.version"
@@ -24,6 +25,7 @@ cert-manager:
   match:
     name: cert-manager
     namespace: cert-manager
+  mustExist: true
   policies:
   - path: "./policies/cert-manager/cert_manager_version.rego"
     name: "cert_manager.version"
@@ -38,6 +40,7 @@ prometheus:
   match:
     name: prometheus
     namespace: monitoring
+  mustExist: true
   policies:
   - path: "./policies/prometheus/server_memory_limits.rego"
     name: "prometheus.server_memory_limits"

+ 8 - 6
internal/opa/loader.go

@@ -13,9 +13,10 @@ import (
 type ConfigFile map[string]ConfigFilePolicyCollection
 
 type ConfigFilePolicyCollection struct {
-	Kind     string             `yaml:"kind"`
-	Match    MatchParameters    `yaml:"match"`
-	Policies []ConfigFilePolicy `yaml:"policies"`
+	Kind      string             `yaml:"kind"`
+	Match     MatchParameters    `yaml:"match"`
+	MustExist bool               `yaml:"mustExist"`
+	Policies  []ConfigFilePolicy `yaml:"policies"`
 }
 
 type ConfigFilePolicy struct {
@@ -66,9 +67,10 @@ func LoadPolicies(configFilePathDir string) (*KubernetesPolicies, error) {
 		}
 
 		policies[name] = KubernetesOPAQueryCollection{
-			Kind:    KubernetesBuiltInKind(cfPolicyCollection.Kind),
-			Queries: queries,
-			Match:   cfPolicyCollection.Match,
+			Kind:      KubernetesBuiltInKind(cfPolicyCollection.Kind),
+			Queries:   queries,
+			Match:     cfPolicyCollection.Match,
+			MustExist: cfPolicyCollection.MustExist,
 		}
 	}
 

+ 68 - 18
internal/opa/opa.go

@@ -39,9 +39,10 @@ const (
 )
 
 type KubernetesOPAQueryCollection struct {
-	Kind    KubernetesBuiltInKind
-	Match   MatchParameters
-	Queries []rego.PreparedEvalQuery
+	Kind      KubernetesBuiltInKind
+	Match     MatchParameters
+	MustExist bool
+	Queries   []rego.PreparedEvalQuery
 }
 
 type MatchParameters struct {
@@ -85,24 +86,49 @@ func NewRunner(policies *KubernetesPolicies, k8sAgent *kubernetes.Agent, dynamic
 	return &KubernetesOPARunner{policies, k8sAgent, dynamicClient}
 }
 
-func (runner *KubernetesOPARunner) GetRecommendationsByName(name string) ([]*OPARecommenderQueryResult, error) {
-	// look up to determine if the name is registered
-	queryCollection, exists := runner.Policies[name]
+func (runner *KubernetesOPARunner) GetRecommendations(categories []string) ([]*OPARecommenderQueryResult, error) {
+	collectionNames := categories
 
-	if !exists {
-		return nil, fmt.Errorf("No policies for %s found", name)
+	if len(categories) == 0 {
+		for catName, _ := range runner.Policies {
+			collectionNames = append(collectionNames, catName)
+		}
 	}
 
-	switch queryCollection.Kind {
-	case HelmRelease:
-		return runner.runHelmReleaseQueries(name, queryCollection)
-	case Pod:
-		return runner.runPodQueries(name, queryCollection)
-	case CRDList:
-		return runner.runCRDListQueries(name, queryCollection)
-	default:
-		return nil, fmt.Errorf("Not a supported query kind")
+	res := make([]*OPARecommenderQueryResult, 0)
+
+	for _, name := range collectionNames {
+		// look up to determine if the name is registered
+		queryCollection, exists := runner.Policies[name]
+
+		if !exists {
+			return nil, fmt.Errorf("No policies for %s found", name)
+		}
+
+		var currResults []*OPARecommenderQueryResult
+		var err error
+
+		switch queryCollection.Kind {
+		case HelmRelease:
+			currResults, err = runner.runHelmReleaseQueries(name, queryCollection)
+		case Pod:
+			currResults, err = runner.runPodQueries(name, queryCollection)
+		case CRDList:
+			currResults, err = runner.runCRDListQueries(name, queryCollection)
+		default:
+			fmt.Printf("%s is not a supported query kind", queryCollection.Kind)
+			continue
+		}
+
+		if err != nil {
+			fmt.Printf("%s", err.Error())
+			continue
+		}
+
+		res = append(res, currResults...)
 	}
+
+	return res, nil
 }
 
 func (runner *KubernetesOPARunner) SetK8sAgent(k8sAgent *kubernetes.Agent) {
@@ -125,7 +151,31 @@ func (runner *KubernetesOPARunner) runHelmReleaseQueries(name string, collection
 		helmRelease, err := helmAgent.GetRelease(collection.Match.Name, 0, false)
 
 		if err != nil {
-			return nil, err
+			if collection.MustExist && strings.Contains(err.Error(), "not found") {
+				return []*OPARecommenderQueryResult{
+					{
+						Allow:          false,
+						ObjectID:       fmt.Sprintf("helm_release/%s/%s/%s", collection.Match.Namespace, collection.Match.Name, "exists"),
+						CategoryName:   name,
+						PolicyVersion:  "v0.0.1",
+						PolicySeverity: "high",
+						PolicyTitle:    fmt.Sprintf("The helm release %s must exist", collection.Match.Name),
+						PolicyMessage:  "The helm release was not found on the cluster",
+					},
+				}, nil
+			} else {
+				return nil, err
+			}
+		} else if collection.MustExist {
+			res = append(res, &OPARecommenderQueryResult{
+				Allow:          true,
+				ObjectID:       fmt.Sprintf("helm_release/%s/%s/%s", collection.Match.Namespace, collection.Match.Name, "exists"),
+				CategoryName:   name,
+				PolicyVersion:  "v0.0.1",
+				PolicySeverity: "high",
+				PolicyTitle:    fmt.Sprintf("The helm release %s must exist", collection.Match.Name),
+				PolicyMessage:  "The helm release was found",
+			})
 		}
 
 		helmReleases = append(helmReleases, helmRelease)

+ 114 - 61
workers/jobs/recommender.go

@@ -40,12 +40,12 @@ type recommender struct {
 	db                   *gorm.DB
 	repo                 repository.Repository
 	doConf               *oauth2.Config
-	projectID, clusterID uint
-	collectionName       string
+	clusterAndProjectIDs []clusterAndProjectID
+	categories           []string
 	policies             *opa.KubernetesPolicies
 }
 
-// HelmRevisionsCountTrackerOpts holds the options required to run this job
+// RecommenderOpts holds the options required to run this job
 type RecommenderOpts struct {
 	DBConf         *env.DBConf
 	DOClientID     string
@@ -53,23 +53,23 @@ type RecommenderOpts struct {
 	DOScopes       []string
 	ServerURL      string
 
+	LegacyProjectIDs []uint
+
 	Input map[string]interface{}
 }
 
 type recommenderInput struct {
-	ProjectID uint `form:"required" mapstructure:"project_id"`
-	ClusterID uint `form:"required" mapstructure:"cluster_id"`
+	Projects  []uint `mapstructure:"projects"`
+	ClusterID uint   `mapstructure:"cluster_id"`
+
+	Priority string `mapstructure:"priority"`
 
-	CollectionName string `form:"required" mapstructure:"name"`
+	Categories []string `mapstructure:"categories"`
 }
 
-type Recommendation struct {
-	// ID         RecommendationID
-	Message   string
-	Automatic bool
-	// Severity   RecommendationSeverity
-	Warning    string
-	LastTested time.Time
+type clusterAndProjectID struct {
+	clusterID uint
+	projectID uint
 }
 
 func NewRecommender(
@@ -118,11 +118,62 @@ func NewRecommender(
 		return nil, fmt.Errorf(requestErr.Error())
 	}
 
+	clusterIDs, err := getClustersToParse(db, repo.Cluster(), parsedInput, opts.LegacyProjectIDs)
+
+	if err != nil {
+		return nil, err
+	}
+
 	return &recommender{
-		enqueueTime, db, repo, doConf, parsedInput.ProjectID, parsedInput.ClusterID, parsedInput.CollectionName, opaPolicies,
+		enqueueTime, db, repo, doConf, clusterIDs, parsedInput.Categories, opaPolicies,
 	}, nil
 }
 
+func getClustersToParse(db *gorm.DB, clusterRepo repository.ClusterRepository, input *recommenderInput, legacyProjects []uint) ([]clusterAndProjectID, error) {
+	// if the project and cluster ID is set, make sure that the project id matches the cluster's
+	// project id
+	if input.ClusterID != 0 {
+		if len(input.Projects) != 1 {
+			return nil, fmt.Errorf("if cluster ID is passed, you must pass the matching project ID")
+		}
+
+		_, err := clusterRepo.ReadCluster(input.Projects[0], input.ClusterID)
+
+		if err != nil {
+			return nil, err
+		}
+
+		return []clusterAndProjectID{{
+			clusterID: input.ClusterID,
+			projectID: input.Projects[0],
+		}}, nil
+	}
+
+	// if there are no projects set, query for all clusters within the relevant projects
+	clusters := make([]*models.Cluster, 0)
+
+	query := db.Where(`clusters.project_id IN (?) OR clusters.project_id IN (
+		SELECT p2.id FROM projects AS p2
+		INNER JOIN project_usages ON p2.id=project_usages.project_id
+		WHERE project_usages.resource_cpu != 10 AND project_usages.resource_memory != 20000 AND project_usages.clusters != 1 AND project_usages.users != 1
+	)`, legacyProjects)
+
+	if err := query.Find(&clusters).Error; err != nil {
+		return nil, err
+	}
+
+	res := make([]clusterAndProjectID, 0)
+
+	for _, cluster := range clusters {
+		res = append(res, clusterAndProjectID{
+			clusterID: cluster.ID,
+			projectID: cluster.ProjectID,
+		})
+	}
+
+	return res, nil
+}
+
 func (n *recommender) ID() string {
 	return "recommender"
 }
@@ -132,72 +183,74 @@ func (n *recommender) EnqueueTime() time.Time {
 }
 
 func (n *recommender) Run() error {
-	fmt.Println(n.projectID, n.clusterID)
+	for _, ids := range n.clusterAndProjectIDs {
+		fmt.Println(ids.projectID, ids.clusterID)
 
-	cluster, err := n.repo.Cluster().ReadCluster(n.projectID, n.clusterID)
+		cluster, err := n.repo.Cluster().ReadCluster(ids.projectID, ids.clusterID)
 
-	if err != nil {
-		log.Printf("error reading cluster ID %d: %v. skipping cluster ...", n.clusterID, err)
-		return err
-	}
+		if err != nil {
+			log.Printf("error reading cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
+			continue
+		}
 
-	k8sAgent, err := kubernetes.GetAgentOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
-		Cluster:                   cluster,
-		Repo:                      n.repo,
-		DigitalOceanOAuth:         n.doConf,
-		AllowInClusterConnections: false,
-	})
+		k8sAgent, err := kubernetes.GetAgentOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
+			Cluster:                   cluster,
+			Repo:                      n.repo,
+			DigitalOceanOAuth:         n.doConf,
+			AllowInClusterConnections: false,
+		})
 
-	if err != nil {
-		log.Printf("error getting k8s agent for cluster ID %d: %v. skipping cluster ...", n.clusterID, err)
-		return err
-	}
+		if err != nil {
+			log.Printf("error getting k8s agent for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
+			continue
+		}
 
-	dynamicClient, err := kubernetes.GetDynamicClientOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
-		Cluster:                   cluster,
-		Repo:                      n.repo,
-		DigitalOceanOAuth:         n.doConf,
-		AllowInClusterConnections: false,
-	})
+		dynamicClient, err := kubernetes.GetDynamicClientOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
+			Cluster:                   cluster,
+			Repo:                      n.repo,
+			DigitalOceanOAuth:         n.doConf,
+			AllowInClusterConnections: false,
+		})
 
-	if err != nil {
-		log.Printf("error getting dynamic client for cluster ID %d: %v. skipping cluster ...", n.clusterID, err)
-		return err
-	}
+		if err != nil {
+			log.Printf("error getting dynamic client for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
+			continue
+		}
 
-	runner := opa.NewRunner(n.policies, k8sAgent, dynamicClient)
+		runner := opa.NewRunner(n.policies, k8sAgent, dynamicClient)
 
-	queryResults, err := runner.GetRecommendationsByName(n.collectionName)
+		queryResults, err := runner.GetRecommendations(n.categories)
 
-	if err != nil {
-		log.Printf("error querying opa policies for cluster ID %d: %v. skipping cluster ...", n.clusterID, err)
-		return err
-	}
+		if err != nil {
+			log.Printf("error querying opa policies for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
+			continue
+		}
 
-	for _, queryRes := range queryResults {
-		fmt.Println(queryRes.ObjectID, queryRes.Allow, queryRes.PolicyTitle, queryRes.PolicyMessage)
+		for _, queryRes := range queryResults {
+			fmt.Println(queryRes.ObjectID, queryRes.Allow, queryRes.PolicyTitle, queryRes.PolicyMessage)
 
-		monitor, err := n.repo.MonitorTestResult().ReadMonitorTestResult(n.projectID, n.clusterID, queryRes.ObjectID)
+			monitor, err := n.repo.MonitorTestResult().ReadMonitorTestResult(ids.projectID, ids.clusterID, queryRes.ObjectID)
 
-		if err != nil {
-			if errors.Is(err, gorm.ErrRecordNotFound) {
-				monitor, err = n.repo.MonitorTestResult().CreateMonitorTestResult(n.getMonitorTestResultFromQueryResult(queryRes))
+			if err != nil {
+				if errors.Is(err, gorm.ErrRecordNotFound) {
+					monitor, err = n.repo.MonitorTestResult().CreateMonitorTestResult(n.getMonitorTestResultFromQueryResult(cluster, queryRes))
+				} else {
+					continue
+				}
 			} else {
-				return err
+				monitor, err = n.repo.MonitorTestResult().UpdateMonitorTestResult(mergeMonitorTestResultFromQueryResult(monitor, queryRes))
 			}
-		} else {
-			monitor, err = n.repo.MonitorTestResult().UpdateMonitorTestResult(mergeMonitorTestResultFromQueryResult(monitor, queryRes))
-		}
 
-		if err != nil {
-			return err
+			if err != nil {
+				continue
+			}
 		}
 	}
 
 	return nil
 }
 
-func (n *recommender) getMonitorTestResultFromQueryResult(queryRes *opa.OPARecommenderQueryResult) *models.MonitorTestResult {
+func (n *recommender) getMonitorTestResultFromQueryResult(cluster *models.Cluster, queryRes *opa.OPARecommenderQueryResult) *models.MonitorTestResult {
 	runResult := types.MonitorTestStatusSuccess
 
 	if !queryRes.Allow {
@@ -207,8 +260,8 @@ func (n *recommender) getMonitorTestResultFromQueryResult(queryRes *opa.OPARecom
 	currTime := time.Now()
 
 	return &models.MonitorTestResult{
-		ProjectID:         n.projectID,
-		ClusterID:         n.clusterID,
+		ProjectID:         cluster.ProjectID,
+		ClusterID:         cluster.ID,
 		Category:          queryRes.CategoryName,
 		ObjectID:          queryRes.ObjectID,
 		LastStatusChange:  &currTime,

+ 9 - 6
workers/main.go

@@ -53,6 +53,8 @@ type EnvConf struct {
 
 	OPAConfigFileDir string `env:"OPA_CONFIG_FILE_DIR,default=./internal/opa"`
 
+	LegacyProjectIDs []uint `env:"LEGACY_PROJECT_IDS"`
+
 	Port uint `env:"PORT,default=3000"`
 }
 
@@ -210,12 +212,13 @@ func getJob(id string, input map[string]interface{}) worker.Job {
 		return newJob
 	} else if id == "recommender" {
 		newJob, err := jobs.NewRecommender(dbConn, time.Now().UTC(), &jobs.RecommenderOpts{
-			DBConf:         &envDecoder.DBConf,
-			DOClientID:     envDecoder.DOClientID,
-			DOClientSecret: envDecoder.DOClientSecret,
-			DOScopes:       []string{"read", "write"},
-			ServerURL:      envDecoder.ServerURL,
-			Input:          input,
+			DBConf:           &envDecoder.DBConf,
+			DOClientID:       envDecoder.DOClientID,
+			DOClientSecret:   envDecoder.DOClientSecret,
+			DOScopes:         []string{"read", "write"},
+			ServerURL:        envDecoder.ServerURL,
+			Input:            input,
+			LegacyProjectIDs: envDecoder.LegacyProjectIDs,
 		}, opaPolicies)
 
 		if err != nil {