Selaa lähdekoodia

namespaces matter

Mohammed Nafees 3 vuotta sitten
vanhempi
sitoutus
677ab28943
2 muutettua tiedostoa jossa 60 lisäystä ja 22 poistoa
  1. 53 15
      workers/jobs/helm_revisions_count_tracker.go
  2. 7 7
      workers/main.go

+ 53 - 15
workers/jobs/helm_revisions_count_tracker.go

@@ -18,6 +18,7 @@ import (
 	"github.com/porter-dev/porter/ee/integrations/vault"
 	"github.com/porter-dev/porter/internal/adapter"
 	"github.com/porter-dev/porter/internal/helm"
+	"github.com/porter-dev/porter/internal/kubernetes"
 	"github.com/porter-dev/porter/internal/models"
 	"github.com/porter-dev/porter/internal/oauth"
 	"github.com/porter-dev/porter/internal/repository"
@@ -54,7 +55,7 @@ type HelmRevisionsCountTrackerOpts struct {
 	AWSSecretAccessKey string
 	AWSRegion          string
 	S3BucketName       string
-	EncryptionKey      *[32]byte
+	EncryptionKey      string
 }
 
 func NewHelmRevisionsCountTracker(
@@ -92,10 +93,16 @@ func NewHelmRevisionsCountTracker(
 		BaseURL:      opts.ServerURL,
 	})
 
+	var s3Key [32]byte
+
+	for i, b := range []byte(opts.EncryptionKey) {
+		s3Key[i] = b
+	}
+
 	return &helmRevisionsCountTracker{
 		enqueueTime, db, repo, doConf, opts.DBConf, credBackend,
 		opts.AWSAccessKeyID, opts.AWSSecretAccessKey, opts.AWSRegion,
-		opts.S3BucketName, opts.EncryptionKey,
+		opts.S3BucketName, &s3Key,
 	}, nil
 }
 
@@ -128,20 +135,15 @@ func (t *helmRevisionsCountTracker) Run() error {
 		for _, cluster := range clusters {
 			wg.Add(1)
 
-			go func(cluster *models.Cluster) {
+			go func(projID, clusterID uint) {
 				defer wg.Done()
 
 				log.Printf("starting release revision monitoring for cluster with ID %d", cluster.ID)
 
-				agent, err := helm.GetAgentOutOfClusterConfig(&helm.Form{
-					Cluster:                   cluster,
-					Repo:                      t.repo,
-					DigitalOceanOAuth:         t.doConf,
-					AllowInClusterConnections: false,
-				}, logger.New(true, os.Stdout))
+				cluster, err := t.repo.Cluster().ReadCluster(projID, clusterID)
 
 				if err != nil {
-					log.Printf("error fetching helm client for cluster ID %d: %v. skipping cluster ...", cluster.ID, err)
+					log.Printf("error reading cluster ID %d: %v. skipping cluster ...", clusterID, err)
 					return
 				}
 
@@ -155,7 +157,19 @@ func (t *helmRevisionsCountTracker) Run() error {
 					return
 				}
 
-				namespaces, err := agent.K8sAgent.ListNamespaces()
+				k8sAgent, err := kubernetes.GetAgentOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
+					Cluster:                   cluster,
+					Repo:                      t.repo,
+					DigitalOceanOAuth:         t.doConf,
+					AllowInClusterConnections: false,
+				})
+
+				if err != nil {
+					log.Printf("error getting k8s agent for cluster ID %d: %v. skipping cluster ...", cluster.ID, err)
+					return
+				}
+
+				namespaces, err := k8sAgent.ListNamespaces()
 
 				if err != nil {
 					log.Printf("error fetching namespaces for cluster ID %d: %v. skipping cluster ...", cluster.ID, err)
@@ -165,7 +179,31 @@ func (t *helmRevisionsCountTracker) Run() error {
 				log.Printf("fetched %d namespaces for cluster ID %d", len(namespaces.Items), cluster.ID)
 
 				for _, ns := range namespaces.Items {
-					releases, err := agent.ListReleases(ns.GetName(), &types.ReleaseListFilter{ByDate: true})
+					agent, err := helm.GetAgentOutOfClusterConfig(&helm.Form{
+						Cluster:                   cluster,
+						Namespace:                 ns.Name,
+						Repo:                      t.repo,
+						DigitalOceanOAuth:         t.doConf,
+						AllowInClusterConnections: false,
+					}, logger.New(true, os.Stdout))
+
+					if err != nil {
+						log.Printf("error fetching helm client for namespace %s in cluster ID %d: %v. "+
+							"skipping namespace ...", ns.Name, cluster.ID, err)
+						continue
+					}
+
+					releases, err := agent.ListReleases(ns.GetName(), &types.ReleaseListFilter{
+						ByDate: true,
+						StatusFilter: []string{
+							"deployed",
+							"pending",
+							"pending-install",
+							"pending-upgrade",
+							"pending-rollback",
+							"failed",
+						},
+					})
 
 					if err != nil {
 						log.Printf("error fetching releases for namespace %s in cluster ID %d: %v. skipping namespace ...",
@@ -213,7 +251,7 @@ func (t *helmRevisionsCountTracker) Run() error {
 								cluster.ID, rel.Namespace, rel.Name, rev.Version))
 
 							if err != nil {
-								log.Printf("error backing up revision for release %s, number %s: %v. skipping revision ...",
+								log.Printf("error backing up revision for release %s, number %d: %v. skipping revision ...",
 									rev.Name, rev.Version, err)
 								continue
 							}
@@ -225,7 +263,7 @@ func (t *helmRevisionsCountTracker) Run() error {
 
 							if err != nil {
 								log.Printf("error deleting revision %d of release %s in namespace %s of cluster ID %d: %v",
-									rev.Version, rel.Name, ns.Name, cluster.ID)
+									rev.Version, rel.Name, ns.Name, cluster.ID, err)
 								continue
 							}
 
@@ -234,7 +272,7 @@ func (t *helmRevisionsCountTracker) Run() error {
 						}
 					}
 				}
-			}(cluster)
+			}(cluster.ProjectID, cluster.ID)
 		}
 
 		wg.Wait()

+ 7 - 7
workers/main.go

@@ -29,13 +29,13 @@ type EnvConf struct {
 	DOClientID         string `env:"DO_CLIENT_ID"`
 	DOClientSecret     string `env:"DO_CLIENT_SECRET"`
 	DBConf             env.DBConf
-	MaxWorkers         uint      `env:"MAX_WORKERS,default=10"`
-	MaxQueue           uint      `env:"MAX_QUEUE,default=100"`
-	AWSAccessKeyID     string    `env:"AWS_ACCESS_KEY_ID"`
-	AWSSecretAccessKey string    `env:"AWS_SECRET_ACCESS_KEY"`
-	AWSRegion          string    `env:"AWS_REGION"`
-	S3BucketName       string    `env:"S3_BUCKET_NAME"`
-	EncryptionKey      *[32]byte `env:"ENCRYPTION_KEY"`
+	MaxWorkers         uint   `env:"MAX_WORKERS,default=10"`
+	MaxQueue           uint   `env:"MAX_QUEUE,default=100"`
+	AWSAccessKeyID     string `env:"AWS_ACCESS_KEY_ID"`
+	AWSSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
+	AWSRegion          string `env:"AWS_REGION"`
+	S3BucketName       string `env:"S3_BUCKET_NAME"`
+	EncryptionKey      string `env:"S3_ENCRYPTION_KEY"`
 }
 
 func main() {