소스 검색

create job notification config object, use it to reduce amount of alerts

Alexander Belanger 4 년 전
부모
커밋
368edee5fa

+ 41 - 8
api/server/handlers/kube_events/create.go

@@ -122,7 +122,7 @@ func notifyPodCrashing(
 
 	if isJob := strings.ToLower(event.OwnerType) == "job"; isJob {
 		// check that the job alert is valid and get proper message
-		jobOwner, jobMsg, shouldAlert, err := getJobAlert(agent, event.Name, event.Namespace)
+		jobOwner, jobMsg, jobName, shouldAlert, err := getJobAlert(agent, event.Name, event.Namespace)
 
 		if err != nil {
 			return err
@@ -130,6 +130,38 @@ func notifyPodCrashing(
 			return nil
 		}
 
+		// look for a matching job notification config
+		jobNC, err := config.Repo.JobNotificationConfig().ReadNotificationConfig(project.ID, cluster.ID, jobName, event.Namespace)
+
+		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
+			return err
+		}
+
+		if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
+			// if the job notification config does not exist, create it
+			jobNC = &models.JobNotificationConfig{
+				Name:             jobName,
+				Namespace:        event.Namespace,
+				ProjectID:        project.ID,
+				ClusterID:        cluster.ID,
+				LastNotifiedTime: time.Now(),
+			}
+
+			jobNC, err = config.Repo.JobNotificationConfig().CreateNotificationConfig(jobNC)
+
+			if err != nil {
+				return err
+			}
+		} else if err != nil {
+			return err
+		} else if err == nil && jobNC != nil {
+			// If the job notification config does exist, check if the job notification config states that
+			// a notification should happen. If so, notify.
+			if !jobNC.ShouldNotify() {
+				return nil
+			}
+		}
+
 		notifyOpts = &slack.NotifyOpts{
 			ProjectID:   cluster.ProjectID,
 			ClusterID:   cluster.ID,
@@ -255,6 +287,7 @@ func getMatchedPorterRelease(config *config.Config, clusterID uint, ownerName, n
 func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
 	ownerName string,
 	msg string,
+	jobName string,
 	shouldAlert bool,
 	err error,
 ) {
@@ -264,9 +297,9 @@ func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
 
 	// if the pod is not found, we should not alert for this pod
 	if err != nil && errors.Is(err, kubernetes.IsNotFoundError) {
-		return "", "", false, nil
+		return "", "", "", false, nil
 	} else if err != nil {
-		return "", "", false, err
+		return "", "", "", false, err
 	}
 
 	ownerJobName := ""
@@ -281,7 +314,7 @@ func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
 	}
 
 	if ownerJobName == "" {
-		return "", "", false, nil
+		return "", "", "", false, nil
 	}
 
 	// lookup the job in the cluster
@@ -292,7 +325,7 @@ func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
 	})
 
 	if err != nil {
-		return "", "", false, nil
+		return "", "", "", false, nil
 	}
 
 	if jobReleaseLabel, exists := job.ObjectMeta.Labels["meta.helm.sh/release-name"]; exists {
@@ -301,7 +334,7 @@ func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
 
 	// if we don't have an owner name, don't alert -- the link will be broken
 	if ownerName == "" {
-		return "", "", false, nil
+		return "", "", "", false, nil
 	}
 
 	// only alert for jobs that are newer than 24 hours
@@ -317,11 +350,11 @@ func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
 						msg += fmt.Sprintf(" Error: %s", state.Terminated.Message)
 					}
 
-					return ownerName, msg, true, nil
+					return ownerName, msg, ownerJobName, true, nil
 				}
 			}
 		}
 	}
 
-	return "", "", false, nil
+	return "", "", "", false, nil
 }

+ 17 - 0
internal/models/notification.go

@@ -37,3 +37,20 @@ func notifLimitToTime(notifTime string) time.Time {
 	// TODO: compute a time that's not just 5 min
 	return time.Now().Add(-10 * time.Minute)
 }
+
+type JobNotificationConfig struct { // JobNotificationConfig stores per-job alerting state, identified by name, namespace, project and cluster.
+	gorm.Model
+
+	Name      string // job name
+	Namespace string // namespace of the job
+
+	ProjectID uint
+	ClusterID uint
+
+	LastNotifiedTime time.Time // compared against a 24-hour window by ShouldNotify
+}
+
+func (conf *JobNotificationConfig) ShouldNotify() bool {
+	// true only when LastNotifiedTime is more than 24 hours before now. NOTE(review): confirm LastNotifiedTime is refreshed after an alert is sent.
+	return conf.LastNotifiedTime.Before(time.Now().Add(-24 * time.Hour))
+}

+ 62 - 0
internal/repository/gorm/notification.go

@@ -42,3 +42,65 @@ func (repo NotificationConfigRepository) UpdateNotificationConfig(am *models.Not
 
 	return am, nil
 }
+
+type JobNotificationConfigRepository struct { // JobNotificationConfigRepository reads and writes models.JobNotificationConfig through gorm.
+	db *gorm.DB
+}
+
+// NewJobNotificationConfigRepository returns a repository.JobNotificationConfigRepository that runs its queries on db.
+func NewJobNotificationConfigRepository(db *gorm.DB) repository.JobNotificationConfigRepository {
+	return JobNotificationConfigRepository{db: db}
+}
+
+// CreateNotificationConfig inserts am after trimming the project/cluster's existing rows when 500 or more exist; it returns am, or nil and the error.
+func (repo JobNotificationConfigRepository) CreateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error) {
+	var count int64
+
+	query := repo.db.Where("project_id = ? AND cluster_id = ?", am.ProjectID, am.ClusterID)
+
+	if err := query.Model([]*models.JobNotificationConfig{}).Count(&count).Error; err != nil {
+		return nil, err
+	}
+
+	// once this project/cluster already holds 500 or more configs, delete all but the 499 most
+	// recently updated ones (ties broken by highest id) to implement a basic fixed-length buffer
+	if count >= 500 {
+		err := repo.db.Debug().Exec(`
+			  DELETE FROM job_notification_configs 
+			  WHERE project_id = ? AND cluster_id = ? AND 
+			  id NOT IN (
+				SELECT id FROM job_notification_configs j2 WHERE j2.project_id = ? AND j2.cluster_id = ? ORDER BY j2.updated_at desc, j2.id desc LIMIT 499
+			  )
+			`, am.ProjectID, am.ClusterID, am.ProjectID, am.ClusterID).Error
+
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if err := repo.db.Debug().Create(am).Error; err != nil { // NOTE(review): Debug() is enabled here and on the delete above; confirm that is intended
+		return nil, err
+	}
+
+	return am, nil
+}
+
+// ReadNotificationConfig returns the first JobNotificationConfig matching the project ID, cluster ID, name and namespace, or nil and the query error.
+func (repo JobNotificationConfigRepository) ReadNotificationConfig(projID, clusterID uint, name, namespace string) (*models.JobNotificationConfig, error) {
+	ret := &models.JobNotificationConfig{}
+
+	if err := repo.db.Where("project_id = ? AND cluster_id = ? AND name = ? AND namespace = ?", projID, clusterID, name, namespace).First(&ret).Error; err != nil {
+		return nil, err
+	}
+
+	return ret, nil
+}
+
+// UpdateNotificationConfig writes am with gorm's Save and returns it, or nil and the error if the save fails.
+func (repo JobNotificationConfigRepository) UpdateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error) {
+	if err := repo.db.Save(am).Error; err != nil {
+		return nil, err
+	}
+
+	return am, nil
+}

+ 6 - 0
internal/repository/gorm/repository.go

@@ -31,6 +31,7 @@ type GormRepository struct {
 	githubAppOAuthIntegration repository.GithubAppOAuthIntegrationRepository
 	slackIntegration          repository.SlackIntegrationRepository
 	notificationConfig        repository.NotificationConfigRepository
+	jobNotificationConfig     repository.JobNotificationConfigRepository
 	buildEvent                repository.BuildEventRepository
 	kubeEvent                 repository.KubeEventRepository
 	projectUsage              repository.ProjectUsageRepository
@@ -136,6 +137,10 @@ func (t *GormRepository) NotificationConfig() repository.NotificationConfigRepos
 	return t.notificationConfig
 }
 
+func (t *GormRepository) JobNotificationConfig() repository.JobNotificationConfigRepository { // JobNotificationConfig returns the job notification config repository held by t.
+	return t.jobNotificationConfig
+}
+
 func (t *GormRepository) BuildEvent() repository.BuildEventRepository {
 	return t.buildEvent
 }
@@ -192,6 +197,7 @@ func NewRepository(db *gorm.DB, key *[32]byte, storageBackend credentials.Creden
 		githubAppOAuthIntegration: NewGithubAppOAuthIntegrationRepository(db),
 		slackIntegration:          NewSlackIntegrationRepository(db, key),
 		notificationConfig:        NewNotificationConfigRepository(db),
+		jobNotificationConfig:     NewJobNotificationConfigRepository(db),
 		buildEvent:                NewBuildEventRepository(db),
 		kubeEvent:                 NewKubeEventRepository(db, key),
 		projectUsage:              NewProjectUsageRepository(db),

+ 6 - 0
internal/repository/notification.go

@@ -9,3 +9,9 @@ type NotificationConfigRepository interface {
 	ReadNotificationConfig(id uint) (*models.NotificationConfig, error)
 	UpdateNotificationConfig(am *models.NotificationConfig) (*models.NotificationConfig, error)
 }
+
+type JobNotificationConfigRepository interface { // JobNotificationConfigRepository is the storage contract for models.JobNotificationConfig.
+	CreateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error)
+	ReadNotificationConfig(projID, clusterID uint, name, namespace string) (*models.JobNotificationConfig, error) // looked up by project, cluster, name and namespace
+	UpdateNotificationConfig(am *models.JobNotificationConfig) (*models.JobNotificationConfig, error)
+}

+ 1 - 0
internal/repository/repository.go

@@ -25,6 +25,7 @@ type Repository interface {
 	GithubAppOAuthIntegration() GithubAppOAuthIntegrationRepository
 	SlackIntegration() SlackIntegrationRepository
 	NotificationConfig() NotificationConfigRepository
+	JobNotificationConfig() JobNotificationConfigRepository
 	BuildEvent() BuildEventRepository
 	KubeEvent() KubeEventRepository
 	ProjectUsage() ProjectUsageRepository