Pārlūkot izejas kodu

Merge pull request #1561 from porter-dev/belanger/fix-job-alerting-agent

[POR-295] Filter job alerts to ensure only valid alerts are sent
abelanger5 4 gadi atpakaļ
vecāks
revīzija
36bca9b3ee

+ 133 - 5
api/server/handlers/kube_events/create.go

@@ -14,7 +14,9 @@ import (
 	"github.com/porter-dev/porter/api/server/shared/apierrors"
 	"github.com/porter-dev/porter/api/server/shared/config"
 	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/helm/grapher"
 	"github.com/porter-dev/porter/internal/integrations/slack"
+	"github.com/porter-dev/porter/internal/kubernetes"
 	"github.com/porter-dev/porter/internal/models"
 	"gorm.io/gorm"
 )
@@ -89,7 +91,14 @@ func (c *CreateKubeEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
 	w.WriteHeader(http.StatusCreated)
 
 	if strings.ToLower(string(request.EventType)) == "critical" && strings.ToLower(request.ResourceType) == "pod" {
-		err := notifyPodCrashing(c.Config(), proj, cluster, request)
+		agent, err := c.GetAgent(r, cluster, request.Namespace)
+
+		if err != nil {
+			c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+			return
+		}
+
+		err = notifyPodCrashing(c.Config(), agent, proj, cluster, request)
 
 		if err != nil {
 			c.HandleAPIErrorNoWrite(w, r, apierrors.NewErrInternal(err))
@@ -99,6 +108,7 @@ func (c *CreateKubeEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
 
 func notifyPodCrashing(
 	config *config.Config,
+	agent *kubernetes.Agent,
 	project *models.Project,
 	cluster *models.Cluster,
 	event *types.CreateKubeEventRequest,
@@ -111,17 +121,60 @@ func notifyPodCrashing(
 	var err error
 
 	if isJob := strings.ToLower(event.OwnerType) == "job"; isJob {
+		// check that the job alert is valid and get proper message
+		jobOwner, jobMsg, jobName, shouldAlert, err := getJobAlert(agent, event.Name, event.Namespace)
+
+		if err != nil {
+			return err
+		} else if !shouldAlert {
+			return nil
+		}
+
+		// look for a matching job notification config
+		jobNC, err := config.Repo.JobNotificationConfig().ReadNotificationConfig(project.ID, cluster.ID, jobName, event.Namespace)
+
+		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
+			return err
+		}
+
+		if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
+			// if the job notification config does not exist, create it
+			jobNC = &models.JobNotificationConfig{
+				Name:             jobName,
+				Namespace:        event.Namespace,
+				ProjectID:        project.ID,
+				ClusterID:        cluster.ID,
+				LastNotifiedTime: time.Now(),
+			}
+
+			jobNC, err = config.Repo.JobNotificationConfig().CreateNotificationConfig(jobNC)
+
+			if err != nil {
+				return err
+			}
+		} else if err != nil {
+			return err
+		} else if err == nil && jobNC != nil {
+			// If the job notification config does exist, check if the job notification config states that
+			// a notification should happen. If so, notify.
+			if !jobNC.ShouldNotify() {
+				return nil
+			}
+		}
+
 		notifyOpts = &slack.NotifyOpts{
 			ProjectID:   cluster.ProjectID,
 			ClusterID:   cluster.ID,
 			ClusterName: cluster.Name,
-			Name:        event.OwnerName,
+			Name:        jobOwner,
 			Namespace:   event.Namespace,
-			Info:        fmt.Sprintf("%s:%s", event.Reason, event.Message),
+			Info:        fmt.Sprintf("%s", jobMsg),
 			URL: fmt.Sprintf(
-				"%s/jobs?cluster=%s&project_id=%d",
+				"%s/jobs/%s/%s/%s?project_id=%d",
 				config.ServerConf.ServerURL,
-				url.PathEscape(cluster.Name),
+				cluster.Name,
+				event.Namespace,
+				jobOwner,
 				cluster.ProjectID,
 			),
 		}
@@ -230,3 +283,78 @@ func getMatchedPorterRelease(config *config.Config, clusterID uint, ownerName, n
 
 	return rel
 }
+
+func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
+	ownerName string,
+	msg string,
+	jobName string,
+	shouldAlert bool,
+	err error,
+) {
+	ownerName = ""
+
+	pod, err := agent.GetPodByName(name, namespace)
+
+	// if the pod is not found, we should not alert for this pod
+	if err != nil && errors.Is(err, kubernetes.IsNotFoundError) {
+		return "", "", "", false, nil
+	} else if err != nil {
+		return "", "", "", false, err
+	}
+
+	ownerJobName := ""
+
+	// get the owner name for the pod by looking at the owner reference
+	if ownerRefArr := pod.ObjectMeta.OwnerReferences; len(ownerRefArr) > 0 {
+		for _, ownerRef := range ownerRefArr {
+			if strings.ToLower(ownerRef.Kind) == "job" {
+				ownerJobName = ownerRef.Name
+			}
+		}
+	}
+
+	if ownerJobName == "" {
+		return "", "", "", false, nil
+	}
+
+	// lookup the job in the cluster
+	job, err := agent.GetJob(grapher.Object{
+		Kind:      "Job",
+		Name:      ownerJobName,
+		Namespace: namespace,
+	})
+
+	if err != nil {
+		return "", "", "", false, nil
+	}
+
+	if jobReleaseLabel, exists := job.ObjectMeta.Labels["meta.helm.sh/release-name"]; exists {
+		ownerName = jobReleaseLabel
+	}
+
+	// if we don't have an owner name, don't alert -- the link will be broken
+	if ownerName == "" {
+		return "", "", "", false, nil
+	}
+
+	// only alert for jobs that are newer than 24 hours
+	if podTime := pod.Status.StartTime; podTime != nil && podTime.After(time.Now().Add(-24*time.Hour)) {
+		// find container statuses relating to the actual job container. We don't alert on sidecar containers
+		for _, containerStatus := range pod.Status.ContainerStatuses {
+			if containerStatus.Name != "sidecar" && containerStatus.Name != "cloud-sql-proxy" {
+				state := containerStatus.State
+				if state.Terminated != nil && state.Terminated.ExitCode != 0 {
+					msg := fmt.Sprintf("Job terminated with non-zero exit code: exit code %d.", state.Terminated.ExitCode)
+
+					if state.Terminated.Message != "" {
+						msg += fmt.Sprintf(" Error: %s", state.Terminated.Message)
+					}
+
+					return ownerName, msg, ownerJobName, true, nil
+				}
+			}
+		}
+	}
+
+	return "", "", "", false, nil
+}

+ 17 - 0
internal/models/notification.go

@@ -37,3 +37,20 @@ func notifLimitToTime(notifTime string) time.Time {
 	// TODO: compute a time that's not just 5 min
 	return time.Now().Add(-10 * time.Minute)
 }
+
+type JobNotificationConfig struct {
+	gorm.Model
+
+	Name      string
+	Namespace string
+
+	ProjectID uint
+	ClusterID uint
+
+	LastNotifiedTime time.Time
+}
+
+func (conf *JobNotificationConfig) ShouldNotify() bool {
+	// check the last notified time against the notification limit
+	return conf.LastNotifiedTime.Before(time.Now().Add(-24 * time.Hour))
+}

+ 1 - 0
internal/repository/gorm/migrate.go

@@ -27,6 +27,7 @@ func AutoMigrate(db *gorm.DB) error {
 		&models.DNSRecord{},
 		&models.PWResetToken{},
 		&models.NotificationConfig{},
+		&models.JobNotificationConfig{},
 		&models.EventContainer{},
 		&models.SubEvent{},
 		&models.KubeEvent{},

+ 62 - 0
internal/repository/gorm/notification.go

@@ -42,3 +42,65 @@ func (repo NotificationConfigRepository) UpdateNotificationConfig(am *models.Not
 
 	return am, nil
 }
+
+type JobNotificationConfigRepository struct {
+	db *gorm.DB
+}
+
+// NewJobNotificationConfigRepository creates a new JobNotificationConfigRepository
+func NewJobNotificationConfigRepository(db *gorm.DB) repository.JobNotificationConfigRepository {
+	return JobNotificationConfigRepository{db: db}
+}
+
+// CreateNotificationConfig creates a new JobNotificationConfig
+func (repo JobNotificationConfigRepository) CreateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error) {
+	var count int64
+
+	query := repo.db.Where("project_id = ? AND cluster_id = ?", am.ProjectID, am.ClusterID)
+
+	if err := query.Model([]*models.JobNotificationConfig{}).Count(&count).Error; err != nil {
+		return nil, err
+	}
+
+	// if the count is greater than 1000, remove the lowest-order events to implement a
+	// basic fixed-length buffer
+	if count >= 1000 {
+		err := repo.db.Debug().Exec(`
+			  DELETE FROM job_notification_configs 
+			  WHERE project_id = ? AND cluster_id = ? AND 
+			  id NOT IN (
+				SELECT id FROM job_notification_configs j2 WHERE j2.project_id = ? AND j2.cluster_id = ? ORDER BY j2.updated_at desc, j2.id desc LIMIT 999
+			  )
+			`, am.ProjectID, am.ClusterID, am.ProjectID, am.ClusterID).Error
+
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if err := repo.db.Debug().Create(am).Error; err != nil {
+		return nil, err
+	}
+
+	return am, nil
+}
+
+// ReadNotificationConfig reads a JobNotificationConfig by ID
+func (repo JobNotificationConfigRepository) ReadNotificationConfig(projID, clusterID uint, name, namespace string) (*models.JobNotificationConfig, error) {
+	ret := &models.JobNotificationConfig{}
+
+	if err := repo.db.Where("project_id = ? AND cluster_id = ? AND name = ? AND namespace = ?", projID, clusterID, name, namespace).First(&ret).Error; err != nil {
+		return nil, err
+	}
+
+	return ret, nil
+}
+
+// UpdateNotificationConfig updates a given JobNotificationConfig
+func (repo JobNotificationConfigRepository) UpdateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error) {
+	if err := repo.db.Save(am).Error; err != nil {
+		return nil, err
+	}
+
+	return am, nil
+}

+ 6 - 0
internal/repository/gorm/repository.go

@@ -31,6 +31,7 @@ type GormRepository struct {
 	githubAppOAuthIntegration repository.GithubAppOAuthIntegrationRepository
 	slackIntegration          repository.SlackIntegrationRepository
 	notificationConfig        repository.NotificationConfigRepository
+	jobNotificationConfig     repository.JobNotificationConfigRepository
 	buildEvent                repository.BuildEventRepository
 	kubeEvent                 repository.KubeEventRepository
 	projectUsage              repository.ProjectUsageRepository
@@ -136,6 +137,10 @@ func (t *GormRepository) NotificationConfig() repository.NotificationConfigRepos
 	return t.notificationConfig
 }
 
+func (t *GormRepository) JobNotificationConfig() repository.JobNotificationConfigRepository {
+	return t.jobNotificationConfig
+}
+
 func (t *GormRepository) BuildEvent() repository.BuildEventRepository {
 	return t.buildEvent
 }
@@ -192,6 +197,7 @@ func NewRepository(db *gorm.DB, key *[32]byte, storageBackend credentials.Creden
 		githubAppOAuthIntegration: NewGithubAppOAuthIntegrationRepository(db),
 		slackIntegration:          NewSlackIntegrationRepository(db, key),
 		notificationConfig:        NewNotificationConfigRepository(db),
+		jobNotificationConfig:     NewJobNotificationConfigRepository(db),
 		buildEvent:                NewBuildEventRepository(db),
 		kubeEvent:                 NewKubeEventRepository(db, key),
 		projectUsage:              NewProjectUsageRepository(db),

+ 6 - 0
internal/repository/notification.go

@@ -9,3 +9,9 @@ type NotificationConfigRepository interface {
 	ReadNotificationConfig(id uint) (*models.NotificationConfig, error)
 	UpdateNotificationConfig(am *models.NotificationConfig) (*models.NotificationConfig, error)
 }
+
+type JobNotificationConfigRepository interface {
+	CreateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error)
+	ReadNotificationConfig(projID, clusterID uint, name, namespace string) (*models.JobNotificationConfig, error)
+	UpdateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error)
+}

+ 1 - 0
internal/repository/repository.go

@@ -25,6 +25,7 @@ type Repository interface {
 	GithubAppOAuthIntegration() GithubAppOAuthIntegrationRepository
 	SlackIntegration() SlackIntegrationRepository
 	NotificationConfig() NotificationConfigRepository
+	JobNotificationConfig() JobNotificationConfigRepository
 	BuildEvent() BuildEventRepository
 	KubeEvent() KubeEventRepository
 	ProjectUsage() ProjectUsageRepository

+ 18 - 0
internal/repository/test/notification.go

@@ -22,3 +22,21 @@ func (n *NotificationConfigRepository) ReadNotificationConfig(id uint) (*models.
 func (n *NotificationConfigRepository) UpdateNotificationConfig(am *models.NotificationConfig) (*models.NotificationConfig, error) {
 	panic("not implemented") // TODO: Implement
 }
+
+type JobNotificationConfigRepository struct{}
+
+func NewJobNotificationConfigRepository(canQuery bool) repository.JobNotificationConfigRepository {
+	return &JobNotificationConfigRepository{}
+}
+
+func (n *JobNotificationConfigRepository) CreateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error) {
+	panic("not implemented") // TODO: Implement
+}
+
+func (n *JobNotificationConfigRepository) ReadNotificationConfig(projID, clusterID uint, name, namespace string) (*models.JobNotificationConfig, error) {
+	panic("not implemented") // TODO: Implement
+}
+
+func (n *JobNotificationConfigRepository) UpdateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error) {
+	panic("not implemented") // TODO: Implement
+}

+ 6 - 0
internal/repository/test/repository.go

@@ -29,6 +29,7 @@ type TestRepository struct {
 	githubAppOAuthIntegration repository.GithubAppOAuthIntegrationRepository
 	slackIntegration          repository.SlackIntegrationRepository
 	notificationConfig        repository.NotificationConfigRepository
+	jobNotificationConfig     repository.JobNotificationConfigRepository
 	buildEvent                repository.BuildEventRepository
 	kubeEvent                 repository.KubeEventRepository
 	projectUsage              repository.ProjectUsageRepository
@@ -134,6 +135,10 @@ func (t *TestRepository) NotificationConfig() repository.NotificationConfigRepos
 	return t.notificationConfig
 }
 
+func (t *TestRepository) JobNotificationConfig() repository.JobNotificationConfigRepository {
+	return t.jobNotificationConfig
+}
+
 func (t *TestRepository) BuildEvent() repository.BuildEventRepository {
 	return t.buildEvent
 }
@@ -190,6 +195,7 @@ func NewRepository(canQuery bool, failingMethods ...string) repository.Repositor
 		githubAppOAuthIntegration: NewGithubAppOAuthIntegrationRepository(canQuery),
 		slackIntegration:          NewSlackIntegrationRepository(canQuery),
 		notificationConfig:        NewNotificationConfigRepository(canQuery),
+		jobNotificationConfig:     NewJobNotificationConfigRepository(canQuery),
 		buildEvent:                NewBuildEventRepository(canQuery),
 		kubeEvent:                 NewKubeEventRepository(canQuery),
 		projectUsage:              NewProjectUsageRepository(canQuery),