Quellcode durchsuchen

use retry helm agent inside worker job

Mohammed Nafees vor 3 Jahren
Ursprung
Commit
6ed0ddbf89
2 geänderte Dateien mit 126 neuen und 4 gelöschten Zeilen
  1. 8 4
      workers/jobs/helm_revisions_count_tracker.go
  2. 118 0
      workers/utils/retry_helm_agent.go

+ 8 - 4
workers/jobs/helm_revisions_count_tracker.go

@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"strings"
 	"sync"
 	"time"
 
@@ -31,6 +32,7 @@ import (
 	"github.com/porter-dev/porter/api/types"
 	"github.com/porter-dev/porter/pkg/logger"
 	"github.com/porter-dev/porter/provisioner/integrations/storage/s3"
+	"github.com/porter-dev/porter/workers/utils"
 
 	"github.com/porter-dev/porter/ee/integrations/vault"
 	"github.com/porter-dev/porter/internal/helm"
@@ -191,13 +193,13 @@ func (t *helmRevisionsCountTracker) Run() error {
 				log.Printf("fetched %d namespaces for cluster ID %d", len(namespaces.Items), cluster.ID)
 
 				for _, ns := range namespaces.Items {
-					agent, err := helm.GetAgentOutOfClusterConfig(&helm.Form{
+					agent, err := utils.NewRetryHelmAgent(&helm.Form{
 						Cluster:                   cluster,
 						Namespace:                 ns.Name,
 						Repo:                      t.repo,
 						DigitalOceanOAuth:         t.doConf,
 						AllowInClusterConnections: false,
-					}, logger.New(true, os.Stdout))
+					}, logger.New(true, os.Stdout), 3, time.Second)
 
 					if err != nil {
 						log.Printf("error fetching helm client for namespace %s in cluster ID %d: %v. "+
@@ -219,7 +221,7 @@ func (t *helmRevisionsCountTracker) Run() error {
 
 					if err != nil {
 						log.Printf("error fetching releases for namespace %s in cluster ID %d: %v. skipping namespace ...",
-							len(releases), ns.Name, cluster.ID, err)
+							ns.Name, cluster.ID, err)
 						continue
 					}
 
@@ -273,7 +275,9 @@ func (t *helmRevisionsCountTracker) Run() error {
 
 							err = agent.DeleteReleaseRevision(rev.Name, rev.Version)
 
-							if err != nil {
+							if err != nil && strings.Contains(err.Error(), "Unauthorized") {
+
+							} else if err != nil {
 								log.Printf("error deleting revision %d of release %s in namespace %s of cluster ID %d: %v",
 									rev.Version, rel.Name, ns.Name, cluster.ID, err)
 								continue

+ 118 - 0
workers/utils/retry_helm_agent.go

@@ -0,0 +1,118 @@
+//go:build ee
+
+package utils
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/helm"
+	"github.com/porter-dev/porter/pkg/logger"
+	"helm.sh/helm/v3/pkg/release"
+)
+
+// RetryHelmAgent bundles a helm agent with the inputs that were used to
+// create it, so that the agent can be recreated by the retrying methods
+// defined on this type.
+type RetryHelmAgent struct {
+	// form and l are the arguments handed to helm.GetAgentOutOfClusterConfig
+	// whenever the agent is (re)created.
+	form          *helm.Form
+	l             *logger.Logger
+	// agent is the helm agent the methods currently delegate to.
+	agent         *helm.Agent
+	// retryCount bounds the number of attempts each retrying method makes.
+	retryCount    uint
+	// retryInterval is the pause passed to time.Sleep between attempts.
+	retryInterval time.Duration
+}
+
+// NewRetryHelmAgent creates the underlying helm agent for form via
+// helm.GetAgentOutOfClusterConfig and wraps it in a RetryHelmAgent.
+//
+// If l is nil, a logger built with logger.New(true, os.Stdout) is used
+// instead. retryCount and retryInterval are stored as-is for use by the
+// retrying methods. Any error from creating the helm agent is returned
+// unchanged together with a nil *RetryHelmAgent.
+func NewRetryHelmAgent(
+	form *helm.Form,
+	l *logger.Logger,
+	retryCount uint,
+	retryInterval time.Duration,
+) (*RetryHelmAgent, error) {
+	// Substitute a default logger when the caller did not provide one.
+	if l == nil {
+		l = logger.New(true, os.Stdout)
+	}
+
+	agent, err := helm.GetAgentOutOfClusterConfig(form, l)
+	if err != nil {
+		return nil, err
+	}
+
+	return &RetryHelmAgent{
+		form:          form,
+		l:             l,
+		agent:         agent,
+		retryCount:    retryCount,
+		retryInterval: retryInterval,
+	}, nil
+}
+
+// ListReleases calls ListReleases on the wrapped helm agent, making at most
+// retryCount attempts.
+//
+// An attempt that fails with an error whose message contains "Unauthorized"
+// causes the helm agent to be recreated from the stored form and logger; the
+// call is then tried again after retryInterval. Any other error is returned
+// immediately and unchanged. An error is also returned when the agent cannot
+// be recreated or when every attempt has failed. With a retryCount of 0 no
+// attempt is made and the "retries reached" error is returned directly.
+func (a *RetryHelmAgent) ListReleases(
+	namespace string,
+	filter *types.ReleaseListFilter,
+) ([]*release.Release, error) {
+	for i := uint(0); i < a.retryCount; i++ {
+		releases, err := a.agent.ListReleases(namespace, filter)
+		if err == nil {
+			return releases, nil
+		}
+
+		if !strings.Contains(err.Error(), "Unauthorized") {
+			return nil, err
+		}
+
+		// Recreate the agent even after the last attempt so that subsequent
+		// calls on this RetryHelmAgent start with a fresh one. Only swap it in
+		// on success: keeping the previous agent avoids leaving a.agent set to
+		// whatever value accompanies a non-nil error.
+		agent, err := helm.GetAgentOutOfClusterConfig(a.form, a.l)
+		if err != nil {
+			return nil, fmt.Errorf("error recreating helm agent for retrying ListReleases: %w", err)
+		}
+
+		a.agent = agent
+
+		// Waiting is pointless once no further attempt will follow.
+		if i+1 < a.retryCount {
+			time.Sleep(a.retryInterval)
+		}
+	}
+
+	return nil, fmt.Errorf("maximum number of retries (%d) reached for ListReleases", a.retryCount)
+}
+
+// GetReleaseHistory calls GetReleaseHistory on the wrapped helm agent for the
+// release called name, making at most retryCount attempts.
+//
+// An attempt that fails with an error whose message contains "Unauthorized"
+// causes the helm agent to be recreated from the stored form and logger; the
+// call is then tried again after retryInterval. Any other error is returned
+// immediately and unchanged. An error is also returned when the agent cannot
+// be recreated or when every attempt has failed. With a retryCount of 0 no
+// attempt is made and the "retries reached" error is returned directly.
+func (a *RetryHelmAgent) GetReleaseHistory(
+	name string,
+) ([]*release.Release, error) {
+	for i := uint(0); i < a.retryCount; i++ {
+		releases, err := a.agent.GetReleaseHistory(name)
+		if err == nil {
+			return releases, nil
+		}
+
+		if !strings.Contains(err.Error(), "Unauthorized") {
+			return nil, err
+		}
+
+		// Recreate the agent even after the last attempt so that subsequent
+		// calls on this RetryHelmAgent start with a fresh one. Only swap it in
+		// on success: keeping the previous agent avoids leaving a.agent set to
+		// whatever value accompanies a non-nil error.
+		agent, err := helm.GetAgentOutOfClusterConfig(a.form, a.l)
+		if err != nil {
+			return nil, fmt.Errorf("error recreating helm agent for retrying GetReleaseHistory: %w", err)
+		}
+
+		a.agent = agent
+
+		// Waiting is pointless once no further attempt will follow.
+		if i+1 < a.retryCount {
+			time.Sleep(a.retryInterval)
+		}
+	}
+
+	return nil, fmt.Errorf("maximum number of retries (%d) reached for GetReleaseHistory", a.retryCount)
+}
+
+// DeleteReleaseRevision calls DeleteReleaseRevision on the wrapped helm agent
+// for the given release name and version, making at most retryCount attempts.
+//
+// An attempt that fails with an error whose message contains "Unauthorized"
+// causes the helm agent to be recreated from the stored form and logger; the
+// call is then tried again after retryInterval. Any other error is returned
+// immediately and unchanged. An error is also returned when the agent cannot
+// be recreated or when every attempt has failed. With a retryCount of 0 no
+// attempt is made and the "retries reached" error is returned directly.
+func (a *RetryHelmAgent) DeleteReleaseRevision(
+	name string,
+	version int,
+) error {
+	for i := uint(0); i < a.retryCount; i++ {
+		err := a.agent.DeleteReleaseRevision(name, version)
+		if err == nil {
+			return nil
+		}
+
+		if !strings.Contains(err.Error(), "Unauthorized") {
+			return err
+		}
+
+		// Recreate the agent even after the last attempt so that subsequent
+		// calls on this RetryHelmAgent start with a fresh one. Only swap it in
+		// on success: keeping the previous agent avoids leaving a.agent set to
+		// whatever value accompanies a non-nil error.
+		agent, err := helm.GetAgentOutOfClusterConfig(a.form, a.l)
+		if err != nil {
+			return fmt.Errorf("error recreating helm agent for retrying DeleteReleaseRevision: %w", err)
+		}
+
+		a.agent = agent
+
+		// Waiting is pointless once no further attempt will follow.
+		if i+1 < a.retryCount {
+			time.Sleep(a.retryInterval)
+		}
+	}
+
+	return fmt.Errorf("maximum number of retries (%d) reached for DeleteReleaseRevision", a.retryCount)
+}