Ver código fonte

handle prometheus alerts (#4510)

Yosef Mihretie 2 anos atrás
pai
commit
7e9f411d67

+ 110 - 0
api/server/handlers/webhook/prometheus_incoming.go

@@ -0,0 +1,110 @@
+package webhook
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+
+	"connectrpc.com/connect"
+
+	porterv1 "github.com/porter-dev/api-contracts/generated/go/porter/v1"
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/requestutils"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/telemetry"
+)
+
+// PrometheusAlertWebhookHandler handles incoming prometheus alerts
+type PrometheusAlertWebhookHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+// NewPrometheusAlertWebhookHandler returns an instance of PrometheusAlertWebhookHandler
+func NewPrometheusAlertWebhookHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *PrometheusAlertWebhookHandler {
+	return &PrometheusAlertWebhookHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+func (p *PrometheusAlertWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ctx, span := telemetry.NewSpan(r.Context(), "serve-post-prometheus-alert")
+	defer span.End()
+
+	// get the webhook id from the request
+	projectID, err := requestutils.GetURLParamUint(r, types.URLParamProjectID)
+	if err != nil {
+		e := telemetry.Error(ctx, span, err, "error getting project ID")
+		p.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(e, http.StatusBadRequest))
+		return
+	}
+	clusterID, err := requestutils.GetURLParamUint(r, types.URLParamClusterID)
+	if err != nil {
+		e := telemetry.Error(ctx, span, nil, "error getting cluster ID")
+		p.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(e, http.StatusBadRequest))
+		return
+	}
+
+	prometheusAlert := &types.PrometheusAlert{}
+	if ok := p.DecodeAndValidate(w, r, prometheusAlert); !ok {
+		e := telemetry.Error(ctx, span, nil, "error decoding request")
+		p.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(e, http.StatusBadRequest))
+		return
+	}
+	if err := p.handlePrometheusAlert(ctx, int64(projectID), int64(clusterID), prometheusAlert); err != nil {
+		e := telemetry.Error(ctx, span, err, "error handling prometheus alert")
+		p.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(e, http.StatusInternalServerError))
+		return
+	}
+}
+
+func (p *PrometheusAlertWebhookHandler) handlePrometheusAlert(ctx context.Context, projectId, clusterId int64, prometheusAlert *types.PrometheusAlert) error {
+	ctx, span := telemetry.NewSpan(ctx, "porter-process-prom-alert")
+	defer span.End()
+	recordPrometheusAlertRequest := connect.NewRequest(&porterv1.RecordPrometheusAlertRequest{
+		ProjectId: projectId,
+		ClusterId: clusterId,
+	})
+	labelKeyValues := ""
+	for _, alert := range prometheusAlert.Alerts {
+		for k, v := range alert.Labels {
+			labelKeyValues += fmt.Sprintf("%s %s", k, v)
+		}
+		if alert.Labels["alertname"] == "NoopAlert" {
+			continue
+		}
+		recordPrometheusAlertRequest.Msg.Alerts = append(recordPrometheusAlertRequest.Msg.Alerts, &porterv1.Alert{
+			Name:      alert.Labels["name"],
+			Namespace: alert.Labels["namespace"],
+			Type:      p.getType(alert),
+		})
+	}
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "porter-app-alert-labels", Value: labelKeyValues})
+	_, err := p.Config().ClusterControlPlaneClient.RecordPrometheusAlert(ctx, recordPrometheusAlertRequest)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error recording prometheus alert")
+	}
+	return nil
+}
+
+func (p *PrometheusAlertWebhookHandler) getType(alert types.Alert) porterv1.InvolvedObjectType {
+	switch alert.Labels["involvedObjectType"] {
+	case "Deployment":
+		return porterv1.InvolvedObjectType_INVOLVED_OBJECT_TYPE_DEPLOYMENT
+	case "StatefulSet":
+		return porterv1.InvolvedObjectType_INVOLVED_OBJECT_TYPE_STATEFULSET
+	case "DaemonSet":
+		return porterv1.InvolvedObjectType_INVOLVED_OBJECT_TYPE_DAEMONSET
+	default:
+		return porterv1.InvolvedObjectType_INVOLVED_OBJECT_TYPE_UNSPECIFIED
+	}
+}

+ 25 - 0
api/server/router/base.go

@@ -567,5 +567,30 @@ func GetBaseRoutes(
 		})
 	}
 
+	// POST /api/webhooks/prometheusalerts/{project_id}/{cluster_id} -> webhook.NewPrometheusAlertsHandler
+	prometheusAlertWebhookEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbCreate,
+			Method: types.HTTPVerbPost,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: fmt.Sprintf("/webhooks/prometheusalerts/{%s}/{%s}", types.URLParamProjectID, types.URLParamClusterID),
+			},
+			Scopes: []types.PermissionScope{},
+		},
+	)
+
+	prometheusAlertWebhookHandler := webhook.NewPrometheusAlertWebhookHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &router.Route{
+		Endpoint: prometheusAlertWebhookEndpoint,
+		Handler:  prometheusAlertWebhookHandler,
+		Router:   r,
+	})
+
 	return routes
 }

+ 26 - 0
api/types/prometheus_alerts.go

@@ -0,0 +1,26 @@
+package types
+
+// PrometheusAlert represents a Prometheus alert payload
+type PrometheusAlert struct {
+	Receiver          string            `json:"receiver"`
+	Status            string            `json:"status"`
+	Alerts            []Alert           `json:"alerts"`
+	GroupLabels       map[string]string `json:"groupLabels"`
+	CommonLabels      map[string]string `json:"commonLabels"`
+	CommonAnnotations map[string]string `json:"commonAnnotations"`
+	ExternalURL       string            `json:"externalURL"`
+	Version           string            `json:"version"`
+	GroupKey          string            `json:"groupKey"`
+	TruncatedAlerts   int               `json:"truncatedAlerts"`
+}
+
+// Alert represents a single alert
+type Alert struct {
+	Status       string            `json:"status"`
+	Labels       map[string]string `json:"labels"`
+	Annotations  map[string]string `json:"annotations"`
+	StartsAt     string            `json:"startsAt"`
+	EndsAt       string            `json:"endsAt"`
+	GeneratorURL string            `json:"generatorURL"`
+	Fingerprint  string            `json:"fingerprint"`
+}

+ 1 - 1
go.mod

@@ -85,7 +85,7 @@ require (
 	github.com/matryer/is v1.4.0
 	github.com/nats-io/nats.go v1.24.0
 	github.com/open-policy-agent/opa v0.44.0
-	github.com/porter-dev/api-contracts v0.2.142
+	github.com/porter-dev/api-contracts v0.2.143
 	github.com/riandyrn/otelchi v0.5.1
 	github.com/santhosh-tekuri/jsonschema/v5 v5.0.1
 	github.com/stefanmcshane/helm v0.0.0-20221213002717-88a4a2c6e77d

+ 2 - 2
go.sum

@@ -1552,8 +1552,8 @@ github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/polyfloyd/go-errorlint v0.0.0-20210722154253-910bb7978349/go.mod h1:wi9BfjxjF/bwiZ701TzmfKu6UKC357IOAtNr0Td0Lvw=
-github.com/porter-dev/api-contracts v0.2.142 h1:vpbrdAuDpC09boJWUCUkm0t455H9xOd1KfP3/M61zbg=
-github.com/porter-dev/api-contracts v0.2.142/go.mod h1:VV5BzXd02ZdbWIPLVP+PX3GKawJSGQnxorVT2sUZALU=
+github.com/porter-dev/api-contracts v0.2.143 h1:sfAGEACY4N3xw9HmxXMUHJjbHlvclZUGnKonMUQ+VLE=
+github.com/porter-dev/api-contracts v0.2.143/go.mod h1:VV5BzXd02ZdbWIPLVP+PX3GKawJSGQnxorVT2sUZALU=
 github.com/porter-dev/switchboard v0.0.3 h1:dBuYkiVLa5Ce7059d6qTe9a1C2XEORFEanhbtV92R+M=
 github.com/porter-dev/switchboard v0.0.3/go.mod h1:xSPzqSFMQ6OSbp42fhCi4AbGbQbsm6nRvOkrblFeXU4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=

+ 41 - 0
internal/models/system_service_status.go

@@ -0,0 +1,41 @@
+package models
+
+import (
+	"time"
+
+	"github.com/google/uuid"
+	"gorm.io/gorm"
+)
+
+// SystemServiceStatus represents a status entry in a database for a single service in a specific cluster
+type SystemServiceStatus struct {
+	gorm.Model
+
+	// ID is a unique identifier for a given event
+	ID uuid.UUID `gorm:"type:uuid;primaryKey" json:"id"`
+
+	// CreatedAt is the time (UTC) that a given status was created. This should not change.
+	CreatedAt time.Time `json:"created_at"`
+	// UpdatedAt is the time (UTC) that the status was last updated.
+	UpdatedAt time.Time `json:"updated_at"`
+
+	ProjectID uint `json:"project_id"`
+
+	ClusterID uint `json:"cluster_id"`
+
+	// InvolvedObjectType is the type of k8s object that the service runs
+	// this is currently expected to be "Deployment", "StatefulSet" or "DaemonSet"
+	InvolvedObjectType string `json:"involved_object_type"`
+
+	Name string `json:"name"`
+
+	Namespace string `json:"namespace"`
+
+	// Any other relevant metadata. This field allows us to be flexible in the future.
+	Metadata JSONB `json:"metadata" sql:"type:jsonb" gorm:"type:jsonb"`
+}
+
+// TableName overrides the table name
+func (SystemServiceStatus) TableName() string {
+	return "system_service_status"
+}

+ 1 - 0
internal/repository/gorm/migrate.go

@@ -62,6 +62,7 @@ func AutoMigrate(db *gorm.DB, debug bool) error {
 		&models.AWSAssumeRoleChain{},
 		&models.PorterApp{},
 		&models.PorterAppEvent{},
+		&models.SystemServiceStatus{},
 		&models.AppRevision{},
 		&models.AppInstance{},
 		&models.DeploymentTarget{},

+ 7 - 0
internal/repository/gorm/repository.go

@@ -53,6 +53,7 @@ type GormRepository struct {
 	awsAssumeRoleChainer      repository.AWSAssumeRoleChainer
 	porterApp                 repository.PorterAppRepository
 	porterAppEvent            repository.PorterAppEventRepository
+	systemServiceStatus       repository.SystemServiceStatusRepository
 	deploymentTarget          repository.DeploymentTargetRepository
 	appRevision               repository.AppRevisionRepository
 	appTemplate               repository.AppTemplateRepository
@@ -246,6 +247,11 @@ func (t *GormRepository) PorterAppEvent() repository.PorterAppEventRepository {
 	return t.porterAppEvent
 }
 
+// SystemServiceStatus returns a SystemServiceStatusRepository
+func (t *GormRepository) SystemServiceStatus() repository.SystemServiceStatusRepository {
+	return t.systemServiceStatus
+}
+
 // DeploymentTarget returns the DeploymentTargetRepository interface implemented by gorm
 func (t *GormRepository) DeploymentTarget() repository.DeploymentTargetRepository {
 	return t.deploymentTarget
@@ -331,6 +337,7 @@ func NewRepository(db *gorm.DB, key *[32]byte, storageBackend credentials.Creden
 		awsAssumeRoleChainer:      NewAWSAssumeRoleChainer(db),
 		porterApp:                 NewPorterAppRepository(db),
 		porterAppEvent:            NewPorterAppEventRepository(db),
+		systemServiceStatus:       NewSystemServiceStatusRepository(db),
 		deploymentTarget:          NewDeploymentTargetRepository(db),
 		appRevision:               NewAppRevisionRepository(db),
 		appTemplate:               NewAppTemplateRepository(db),

+ 35 - 0
internal/repository/gorm/system_service_status.go

@@ -0,0 +1,35 @@
+package gorm
+
+import (
+	"context"
+	"errors"
+
+	"github.com/google/uuid"
+	"github.com/porter-dev/porter/internal/models"
+	"github.com/porter-dev/porter/internal/repository"
+	"gorm.io/gorm"
+)
+
+// SystemServiceStatusRepository uses gorm.DB for querying the database
+type SystemServiceStatusRepository struct {
+	db *gorm.DB
+}
+
+// NewSystemServiceStatusRepository returns a SystemServiceStatusRepository which uses
+// gorm.DB for querying the database
+func NewSystemServiceStatusRepository(db *gorm.DB) repository.SystemServiceStatusRepository {
+	return &SystemServiceStatusRepository{db}
+}
+
+// ReadSystemServiceStatus is currently a noop method
+func (repo *SystemServiceStatusRepository) ReadSystemServiceStatus(ctx context.Context, id uuid.UUID) (models.SystemServiceStatus, error) {
+	status := models.SystemServiceStatus{}
+	if id == uuid.Nil {
+		return status, errors.New("invalid porter app event id supplied")
+	}
+	strID := id.String()
+	if err := repo.db.Where("id = ?", strID).First(&status).Error; err != nil {
+		return status, err
+	}
+	return status, nil
+}

+ 1 - 0
internal/repository/repository.go

@@ -47,6 +47,7 @@ type Repository interface {
 	AWSAssumeRoleChainer() AWSAssumeRoleChainer
 	PorterApp() PorterAppRepository
 	PorterAppEvent() PorterAppEventRepository
+	SystemServiceStatus() SystemServiceStatusRepository
 	DeploymentTarget() DeploymentTargetRepository
 	AppRevision() AppRevisionRepository
 	AppTemplate() AppTemplateRepository

+ 13 - 0
internal/repository/system_service_status.go

@@ -0,0 +1,13 @@
+package repository
+
+import (
+	"context"
+
+	"github.com/google/uuid"
+	"github.com/porter-dev/porter/internal/models"
+)
+
+// SystemServiceStatusRepository represents the set of queries on the SystemServiceStatus model
+type SystemServiceStatusRepository interface {
+	ReadSystemServiceStatus(ctx context.Context, id uuid.UUID) (models.SystemServiceStatus, error)
+}

+ 6 - 0
internal/repository/test/repository.go

@@ -51,6 +51,7 @@ type TestRepository struct {
 	awsAssumeRoleChainer      repository.AWSAssumeRoleChainer
 	porterApp                 repository.PorterAppRepository
 	porterAppEvent            repository.PorterAppEventRepository
+	systemServiceStatus       repository.SystemServiceStatusRepository
 	deploymentTarget          repository.DeploymentTargetRepository
 	appRevision               repository.AppRevisionRepository
 	appTemplate               repository.AppTemplateRepository
@@ -243,6 +244,10 @@ func (t *TestRepository) PorterAppEvent() repository.PorterAppEventRepository {
 	return t.porterAppEvent
 }
 
+func (t *TestRepository) SystemServiceStatus() repository.SystemServiceStatusRepository {
+	return t.systemServiceStatus
+}
+
 // DeploymentTarget returns a test DeploymentTargetRepository
 func (t *TestRepository) DeploymentTarget() repository.DeploymentTargetRepository {
 	return t.deploymentTarget
@@ -323,6 +328,7 @@ func NewRepository(canQuery bool, failingMethods ...string) repository.Repositor
 		awsAssumeRoleChainer:      NewAWSAssumeRoleChainer(),
 		porterApp:                 NewPorterAppRepository(canQuery, failingMethods...),
 		porterAppEvent:            NewPorterAppEventRepository(canQuery),
+		systemServiceStatus:       NewSystemServiceStatusRepository(canQuery),
 		deploymentTarget:          NewDeploymentTargetRepository(),
 		appRevision:               NewAppRevisionRepository(),
 		appTemplate:               NewAppTemplateRepository(),

+ 22 - 0
internal/repository/test/system_service_status.go

@@ -0,0 +1,22 @@
+package test
+
+import (
+	"context"
+	"errors"
+
+	"github.com/google/uuid"
+	"github.com/porter-dev/porter/internal/models"
+	"github.com/porter-dev/porter/internal/repository"
+)
+
+type SystemServiceStatusRepository struct {
+	canQuery bool
+}
+
+func NewSystemServiceStatusRepository(canQuery bool, failingMethods ...string) repository.SystemServiceStatusRepository {
+	return &SystemServiceStatusRepository{canQuery: false}
+}
+
+func (repo *SystemServiceStatusRepository) ReadSystemServiceStatus(ctx context.Context, id uuid.UUID) (models.SystemServiceStatus, error) {
+	return models.SystemServiceStatus{}, errors.New("cannot read database")
+}