Feroze Mohideen 2 年之前
父節點
當前提交
a9de7fec4d

+ 48 - 8
api/server/handlers/porter_app/create_and_update_events.go

@@ -16,6 +16,8 @@ import (
 	"github.com/porter-dev/porter/api/server/shared/config"
 	"github.com/porter-dev/porter/api/server/shared/requestutils"
 	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/deployment_target"
+	"github.com/porter-dev/porter/internal/kubernetes"
 	"github.com/porter-dev/porter/internal/models"
 	"github.com/porter-dev/porter/internal/porter_app/notifications"
 	"github.com/porter-dev/porter/internal/telemetry"
@@ -33,6 +35,7 @@ func NewCreateUpdatePorterAppEventHandler(
 ) *CreateUpdatePorterAppEventHandler {
 	return &CreateUpdatePorterAppEventHandler{
 		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
 	}
 }
 
@@ -77,16 +80,18 @@ func (p *CreateUpdatePorterAppEventHandler) ServeHTTP(w http.ResponseWriter, r *
 
 	// This branch will only be hit for v2 app_event type events
 	if request.ID == "" && request.DeploymentTargetID != "" && request.Type == types.PorterAppEventType_AppEvent {
-		inp := notifications.HandleNotificationInput{
-			Context:             ctx,
-			RawAppEventMetadata: request.Metadata,
-			EventRepo:           p.Repo().PorterAppEvent(),
-			DeploymentTargetID:  request.DeploymentTargetID,
-		}
-		err := notifications.HandleNotification(inp)
+		agent, err := p.GetAgent(r, cluster, "")
 		if err != nil {
-			err = telemetry.Error(ctx, span, err, "error handling notification")
+			err := telemetry.Error(ctx, span, err, "error getting agent")
 			p.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+			return
+		}
+
+		err = p.handleNotification(ctx, request, project.ID, cluster.ID, agent)
+		if err != nil {
+			e := telemetry.Error(ctx, span, err, "error handling notification")
+			p.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(e, http.StatusInternalServerError))
+			return
 		}
 		return
 	}
@@ -608,3 +613,38 @@ func (p *CreateUpdatePorterAppEventHandler) updateDeployEventMatchingAppEventDet
 	_ = p.updateDeployEvent(ctx, porterAppName, porterAppId, deploymentTargetID, updateMetadataMap)
 	return nil
 }
+
+func (p *CreateUpdatePorterAppEventHandler) handleNotification(ctx context.Context,
+	request *types.CreateOrUpdatePorterAppEventRequest,
+	projectId, clusterId uint,
+	agent *kubernetes.Agent,
+) error {
+	ctx, span := telemetry.NewSpan(ctx, "handle-notification")
+	defer span.End()
+
+	// get the namespace associated with the deployment target id
+	deploymentTarget, err := deployment_target.DeploymentTargetDetails(ctx, deployment_target.DeploymentTargetDetailsInput{
+		ProjectID:          int64(projectId),
+		ClusterID:          int64(clusterId),
+		DeploymentTargetID: request.DeploymentTargetID,
+		CCPClient:          p.Config().ClusterControlPlaneClient,
+	})
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error getting deployment target details")
+	}
+
+	inp := notifications.HandleNotificationInput{
+		RawAppEventMetadata: request.Metadata,
+		EventRepo:           p.Repo().PorterAppEvent(),
+		DeploymentTargetID:  request.DeploymentTargetID,
+		Namespace:           deploymentTarget.Namespace,
+		K8sAgent:            agent,
+	}
+
+	err = notifications.HandleNotification(ctx, inp)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error handling notification")
+	}
+
+	return nil
+}

+ 2 - 0
api/types/porter_app.go

@@ -118,6 +118,8 @@ const (
 	PorterAppEventType_PreDeploy PorterAppEventType = "PRE_DEPLOY"
 	// PorterAppEventType_AppEvent represents a Porter Stack App Event which occurred whilst the application was running, such as an OutOfMemory (OOM) error
 	PorterAppEventType_AppEvent PorterAppEventType = "APP_EVENT"
+	// PorterAppEventType_Notification represents a translation of the porter agent app event into the new notification format, which details everything that occurs while the app is running
+	PorterAppEventType_Notification PorterAppEventType = "NOTIFICATION"
 )
 
 // PorterAppEventStatus is an alias for a string that represents a Porter Stack Event Status

+ 7 - 0
dashboard/src/main/home/app-dashboard/app-view/tabs/activity-feed/events/cards/EventCard.tsx

@@ -28,6 +28,7 @@ const EventCard: React.FC<Props> = ({ event, deploymentTargetId, isLatestDeployE
 
     return match(event)
       .with({ type: "APP_EVENT" }, () => "")
+      .with({ type: "NOTIFICATION" }, () => "")
       .with({ type: "BUILD" }, (event) =>
         event.metadata.commit_sha
           ? `https://www.github.com/${porterApp.repo_name}/commit/${event.metadata.commit_sha}`
@@ -53,6 +54,7 @@ const EventCard: React.FC<Props> = ({ event, deploymentTargetId, isLatestDeployE
 
     return match(event)
       .with({ type: "APP_EVENT" }, () => "")
+      .with({ type: "NOTIFICATION" }, () => "")
       .with({ type: "BUILD" }, (event) =>
         event.metadata.commit_sha ? event.metadata.commit_sha.slice(0, 7) : ""
       )
@@ -108,6 +110,11 @@ const EventCard: React.FC<Props> = ({ event, deploymentTargetId, isLatestDeployE
         displayCommitSha={displayCommitSha}
       />
     ))
+    .with({ type: "NOTIFICATION" }, (ev) => (
+      <StyledEventCard>
+        <div>{ev.metadata?.human_readable_summary}</div>
+      </StyledEventCard>
+    ))
     .exhaustive();
 };
 

+ 8 - 0
dashboard/src/main/home/app-dashboard/app-view/tabs/activity-feed/events/types.ts

@@ -76,6 +76,14 @@ export const porterAppEventValidator = z.discriminatedUnion("type", [
         porter_app_id: z.number(),
         metadata: porterAppAppEventMetadataValidator
     }),
+    z.object({
+        id: z.string(),
+        created_at: z.string(),
+        updated_at: z.string(),
+        type: z.literal("NOTIFICATION"),
+        porter_app_id: z.number(),
+        metadata: z.any(),
+    }),
 ]);
 
 export const getPorterAppEventsValidator = z.array(porterAppEventValidator).optional().default([]);

+ 14 - 0
internal/kubernetes/agent.go

@@ -994,6 +994,20 @@ func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
 	return res, nil
 }
 
+func (a *Agent) GetDeploymentsBySelector(namespace string, selector string) (*appsv1.DeploymentList, error) {
+	res, err := a.Clientset.AppsV1().Deployments(namespace).List(
+		context.TODO(),
+		metav1.ListOptions{
+			LabelSelector: selector,
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return res, nil
+}
+
 // GetStatefulSet gets the statefulset given the name and namespace
 func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
 	res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(

+ 187 - 12
internal/porter_app/notifications/app_event.go

@@ -4,13 +4,29 @@ import (
 	"context"
 	"encoding/json"
 	"strconv"
+	"time"
 
 	"github.com/google/uuid"
+	"github.com/porter-dev/porter/internal/models"
 	"github.com/porter-dev/porter/internal/repository"
 	"github.com/porter-dev/porter/internal/telemetry"
 )
 
-const appEventType = "APP_EVENT"
+const PorterAppEventType_Notification = "NOTIFICATION"
+
+// PorterAppEventStatus is an alias for a string that represents a Porter App Event Status
+type PorterAppEventStatus string
+
+const (
+	// PorterAppEventStatus_Success represents a Porter App Event that was successful
+	PorterAppEventStatus_Success PorterAppEventStatus = "SUCCESS"
+	// PorterAppEventStatus_Failed represents a Porter App Event that failed
+	PorterAppEventStatus_Failed PorterAppEventStatus = "FAILED"
+	// PorterAppEventStatus_Progressing represents a Porter App Event that is in progress
+	PorterAppEventStatus_Progressing PorterAppEventStatus = "PROGRESSING"
+	// PorterAppEventStatus_Canceled represents a Porter App Event that has been canceled
+	PorterAppEventStatus_Canceled PorterAppEventStatus = "CANCELED"
+)
 
 // AppEventMetadata is the metadata for an app event
 type AppEventMetadata struct {
@@ -36,8 +52,18 @@ type AppEventMetadata struct {
 	Detail string `json:"detail"`
 }
 
-// convertMetadata converts a map of interface{} to AppEventMetadata
-func convertMetadata(metadata map[string]interface{}) (*AppEventMetadata, error) {
+// ServiceDeploymentMetadata contains information about a service when it deploys, stored in the deploy event
+type ServiceDeploymentMetadata struct {
+	// Status is the status of the service deployment
+	Status PorterAppEventStatus `json:"status"`
+	// ExternalURI is the external URI of a service (if it is web)
+	ExternalURI string `json:"external_uri"`
+	// Type is the type of the service - one of web, worker, or job
+	Type string `json:"type"`
+}
+
+// parseAppEventMetadata parses raw app event metadata to a AppEventMetadata struct
+func parseAppEventMetadata(metadata map[string]interface{}) (*AppEventMetadata, error) {
 	appEventMetadata := &AppEventMetadata{}
 
 	bytes, err := json.Marshal(metadata)
@@ -52,10 +78,10 @@ func convertMetadata(metadata map[string]interface{}) (*AppEventMetadata, error)
 	return appEventMetadata, nil
 }
 
-// checkIsAppEventDuplicate checks if an app event is a duplicate by seeing if another app event exists in the db with the same agent event id
-func checkIsAppEventDuplicate(
+// isNotificationDuplicate checks if another app event exists in the db with the same agent event id
+func isNotificationDuplicate(
 	ctx context.Context,
-	appEventMetadata AppEventMetadata,
+	notification Notification,
 	eventRepo repository.PorterAppEventRepository,
 	deploymentTargetID string,
 ) (bool, error) {
@@ -70,7 +96,7 @@ func checkIsAppEventDuplicate(
 		return false, telemetry.Error(ctx, span, nil, "deployment target id cannot be nil")
 	}
 
-	appIdInt, err := strconv.Atoi(appEventMetadata.AppID)
+	appIdInt, err := strconv.Atoi(notification.AppID)
 	if err != nil {
 		return false, telemetry.Error(ctx, span, err, "error converting app id to int")
 	}
@@ -80,15 +106,20 @@ func checkIsAppEventDuplicate(
 	}
 
 	for _, existingEvent := range existingEvents {
-		if existingEvent != nil && existingEvent.Type == appEventType {
-			convertedEventMetadata, err := convertMetadata(existingEvent.Metadata)
-			if err != nil || convertedEventMetadata == nil {
+		if existingEvent != nil && existingEvent.Type == PorterAppEventType_Notification {
+			existingNotification := &Notification{}
+			bytes, err := json.Marshal(existingEvent.Metadata)
+			if err != nil {
+				continue
+			}
+			err = json.Unmarshal(bytes, existingNotification)
+			if err != nil || existingNotification == nil {
 				continue
 			}
-			if convertedEventMetadata.AgentEventID == 0 {
+			if existingNotification.AgentEventID == 0 {
 				continue
 			}
-			if convertedEventMetadata.AgentEventID == appEventMetadata.AgentEventID {
+			if existingNotification.AgentEventID == notification.AgentEventID {
 				return true, nil
 			}
 		}
@@ -96,3 +127,147 @@ func checkIsAppEventDuplicate(
 
 	return false, nil
 }
+
+type updateDeployEventInput struct {
+	Notification
+	EventRepo repository.PorterAppEventRepository
+	Status    PorterAppEventStatus
+}
+
+func updateDeployEvent(ctx context.Context, inp updateDeployEventInput) error {
+	ctx, span := telemetry.NewSpan(ctx, "update-matching-deploy-event")
+	defer span.End()
+
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "matching-k8s-deployment-status", Value: inp.Deployment.Status})
+
+	appID, err := strconv.Atoi(inp.Notification.AppID)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error converting app id to int")
+	}
+
+	matchEvent, err := inp.EventRepo.ReadDeployEventByAppRevisionID(ctx, uint(appID), inp.Notification.AppRevisionID)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error finding matching deploy event")
+	}
+	if matchEvent.ID == uuid.Nil {
+		return telemetry.Error(ctx, span, nil, "no matching deploy event found")
+	}
+	if matchEvent.Status != string(PorterAppEventStatus_Progressing) {
+		return nil // nothing to update here
+	}
+
+	serviceStatus, ok := matchEvent.Metadata["service_deployment_metadata"]
+	if !ok {
+		return telemetry.Error(ctx, span, nil, "service deployment metadata not found in deploy event metadata")
+	}
+	serviceDeploymentGenericMap, ok := serviceStatus.(map[string]interface{})
+	if !ok {
+		return telemetry.Error(ctx, span, nil, "service deployment metadata is not correct type")
+	}
+	serviceDeploymentMap := make(map[string]ServiceDeploymentMetadata)
+	for k, v := range serviceDeploymentGenericMap {
+		by, err := json.Marshal(v)
+		if err != nil {
+			return telemetry.Error(ctx, span, nil, "unable to marshal service deployment metadata")
+		}
+
+		var serviceDeploymentMetadata ServiceDeploymentMetadata
+		err = json.Unmarshal(by, &serviceDeploymentMetadata)
+		if err != nil {
+			return telemetry.Error(ctx, span, nil, "unable to unmarshal service deployment metadata")
+		}
+		serviceDeploymentMap[k] = serviceDeploymentMetadata
+	}
+	serviceDeploymentMetadata, ok := serviceDeploymentMap[inp.Notification.ServiceName]
+	if !ok {
+		return telemetry.Error(ctx, span, nil, "deployment metadata not found for service")
+	}
+
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "existing-status", Value: string(serviceDeploymentMetadata.Status)})
+
+	if serviceDeploymentMetadata.Status != PorterAppEventStatus_Progressing {
+		return nil // nothing to update here
+	}
+	// update the map with the new status
+	serviceDeploymentMetadata.Status = inp.Status
+	serviceDeploymentMap[inp.Notification.ServiceName] = serviceDeploymentMetadata
+
+	// update the deploy event with new map and status if all services are done
+	matchEvent.Metadata["service_deployment_metadata"] = serviceDeploymentMap
+	allServicesDone := true
+	anyServicesFailed := false
+	for _, deploymentMetadata := range serviceDeploymentMap {
+		if deploymentMetadata.Status == PorterAppEventStatus_Progressing {
+			allServicesDone = false
+			break
+		}
+		if deploymentMetadata.Status == PorterAppEventStatus_Failed {
+			anyServicesFailed = true
+		}
+	}
+	if allServicesDone {
+		matchEvent.Metadata["end_time"] = time.Now().UTC()
+		if anyServicesFailed {
+			matchEvent.Status = string(PorterAppEventStatus_Failed)
+		} else {
+			matchEvent.Status = string(PorterAppEventStatus_Success)
+		}
+	}
+
+	err = inp.EventRepo.UpdateEvent(ctx, &matchEvent)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error updating deploy event")
+	}
+
+	return nil
+}
+
+func saveNotification(ctx context.Context, notification Notification, eventRepo repository.PorterAppEventRepository, deploymentTargetID string) error {
+	ctx, span := telemetry.NewSpan(ctx, "save-notification")
+	defer span.End()
+
+	telemetry.WithAttributes(span,
+		telemetry.AttributeKV{Key: "app-id", Value: notification.AppID},
+		telemetry.AttributeKV{Key: "app-name", Value: notification.AppName},
+		telemetry.AttributeKV{Key: "app-revision-id", Value: notification.AppRevisionID},
+		telemetry.AttributeKV{Key: "agent-event-id", Value: notification.AgentEventID},
+		telemetry.AttributeKV{Key: "service-name", Value: notification.ServiceName},
+		telemetry.AttributeKV{Key: "deployment-target-id", Value: deploymentTargetID},
+	)
+
+	appID, err := strconv.Atoi(notification.AppID)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error converting app id to int")
+	}
+
+	deploymentTargetUUID, err := uuid.Parse(deploymentTargetID)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error parsing deployment target id")
+	}
+	if deploymentTargetUUID == uuid.Nil {
+		return telemetry.Error(ctx, span, err, "deployment target id cannot be nil")
+	}
+
+	notificationMap := make(map[string]any)
+	bytes, err := json.Marshal(notification)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error marshaling notification")
+	}
+	err = json.Unmarshal(bytes, &notificationMap)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error unmarshaling notification")
+	}
+
+	err = eventRepo.CreateEvent(ctx, &models.PorterAppEvent{
+		ID:                 uuid.New(),
+		Type:               string(PorterAppEventType_Notification),
+		PorterAppID:        uint(appID),
+		DeploymentTargetID: deploymentTargetUUID,
+		Metadata:           notificationMap,
+	})
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "error creating porter app event")
+	}
+
+	return nil
+}

+ 150 - 0
internal/porter_app/notifications/deployment.go

@@ -0,0 +1,150 @@
+package notifications
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+	"strings"
+
+	"github.com/porter-dev/porter/internal/kubernetes"
+	"github.com/porter-dev/porter/internal/telemetry"
+	v1 "k8s.io/api/apps/v1"
+)
+
+type Deployment struct {
+	Status DeploymentStatus `json:"status"`
+}
+
+type DeploymentStatus string
+
+const (
+	DeploymentStatus_Unknown DeploymentStatus = "UNKNOWN"
+	DeploymentStatus_Pending DeploymentStatus = "PENDING"
+	DeploymentStatus_Success DeploymentStatus = "SUCCESS"
+	DeploymentStatus_Failure DeploymentStatus = "FAILURE"
+)
+
+type hydrateNotificationInput struct {
+	Notification
+	DeploymentTargetId string
+	Namespace          string
+	K8sAgent           *kubernetes.Agent
+}
+
+func hydrateNotification(ctx context.Context, inp hydrateNotificationInput) (Notification, error) {
+	ctx, span := telemetry.NewSpan(ctx, "hydrate-notification")
+	defer span.End()
+
+	hydratedNotification := inp.Notification
+
+	if inp.Notification.Deployment.Status != DeploymentStatus_Unknown {
+		return hydratedNotification, nil
+	}
+
+	if inp.K8sAgent == nil {
+		err := telemetry.Error(ctx, span, nil, "k8s agent is nil")
+		return hydratedNotification, err
+	}
+
+	telemetry.WithAttributes(span,
+		telemetry.AttributeKV{Key: "deployment-target-id", Value: inp.DeploymentTargetId},
+		telemetry.AttributeKV{Key: "namespace", Value: inp.Namespace},
+		telemetry.AttributeKV{Key: "app-name", Value: inp.AppName},
+		telemetry.AttributeKV{Key: "app-revision-id", Value: inp.Notification.AppRevisionID},
+		telemetry.AttributeKV{Key: "service-name", Value: inp.ServiceName},
+	)
+
+	selectors := []string{
+		fmt.Sprintf("porter.run/deployment-target-id=%s", inp.DeploymentTargetId),
+		fmt.Sprintf("porter.run/app-name=%s", inp.AppName),
+		fmt.Sprintf("porter.run/app-revision-id=%s", inp.Notification.AppRevisionID),
+		fmt.Sprintf("porter.run/service-name=%s", inp.ServiceName),
+	}
+	depls, err := inp.K8sAgent.GetDeploymentsBySelector(inp.Namespace, strings.Join(selectors, ","))
+	if err != nil {
+		err := telemetry.Error(ctx, span, err, "failed to get deployments for notification")
+		return hydratedNotification, err
+	}
+	if len(depls.Items) == 0 {
+		err := telemetry.Error(ctx, span, nil, "no deployments found for notification")
+		return hydratedNotification, err
+	}
+	if len(depls.Items) > 1 {
+		err := telemetry.Error(ctx, span, nil, "multiple deployments found for notification")
+		return hydratedNotification, err
+	}
+
+	matchingDeployment := depls.Items[0]
+	telemetry.WithAttributes(span,
+		telemetry.AttributeKV{Key: "deployment-name", Value: matchingDeployment.Name},
+		telemetry.AttributeKV{Key: "deployment-uid", Value: matchingDeployment.ObjectMeta.UID},
+		telemetry.AttributeKV{Key: "deployment-creation-timestamp", Value: matchingDeployment.ObjectMeta.CreationTimestamp},
+	)
+	status := deploymentStatus(matchingDeployment)
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "deployment-status", Value: status})
+	if status == DeploymentStatus_Unknown {
+		err := telemetry.Error(ctx, span, nil, "unable to determine status of deployment")
+		return hydratedNotification, err
+	}
+
+	hydratedNotification.Deployment = Deployment{
+		Status: status,
+	}
+
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "agent-summary", Value: hydratedNotification.AgentSummary})
+	hydratedNotification.HumanReadableSummary = translateAgentSummary(hydratedNotification, status)
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "human-readable-summary", Value: hydratedNotification.HumanReadableSummary})
+
+	return hydratedNotification, nil
+}
+
+func deploymentStatus(depl v1.Deployment) DeploymentStatus {
+	deploymentStatus := DeploymentStatus_Unknown
+
+	for _, condition := range depl.Status.Conditions {
+		if condition.Type == "Progressing" {
+			if condition.Status == "True" && condition.Reason == "NewReplicaSetAvailable" {
+				deploymentStatus = DeploymentStatus_Success
+				break
+			} else if condition.Status == "False" && condition.Reason == "ProgressDeadlineExceeded" {
+				deploymentStatus = DeploymentStatus_Failure
+				break
+			} else {
+				deploymentStatus = DeploymentStatus_Pending
+			}
+		}
+	}
+
+	return deploymentStatus
+}
+
+var fatalDeploymentDetailSubstrings = []string{
+	"stuck in a restart loop",
+}
+
+func detailIndicatesDeploymentFailure(detail string) bool {
+	// if any of the fatal deployment detail substrings are found in the detail, then the deployment will fail
+	for _, fatalSubstring := range fatalDeploymentDetailSubstrings {
+		if strings.Contains(detail, fatalSubstring) {
+			return true
+		}
+	}
+	return false
+}
+
+func translateAgentSummary(notification Notification, status DeploymentStatus) string {
+	humanReadableSummary := notification.AgentSummary
+	pattern := `application \w+ in namespace \w+`
+	regex := regexp.MustCompile(pattern)
+	if regex.MatchString(humanReadableSummary) {
+		fmt.Printf("matched regex\n")
+		humanReadableSummary = regex.ReplaceAllString(humanReadableSummary, fmt.Sprintf("service %s", notification.ServiceName))
+	}
+	humanReadableSummary = strings.ReplaceAll(humanReadableSummary, "application", "service")
+	if status == DeploymentStatus_Pending {
+		humanReadableSummary = strings.ReplaceAll(humanReadableSummary, "has crashed", "is failing to deploy")
+		humanReadableSummary = strings.ReplaceAll(humanReadableSummary, "crashed", "is failing to deploy")
+		humanReadableSummary = strings.ReplaceAll(humanReadableSummary, "is currently experiencing downtime", "is failing to deploy")
+	}
+	return humanReadableSummary
+}

+ 74 - 15
internal/porter_app/notifications/notification.go

@@ -2,25 +2,28 @@ package notifications
 
 import (
 	"context"
+	"strings"
 
+	"github.com/porter-dev/porter/internal/kubernetes"
 	"github.com/porter-dev/porter/internal/repository"
 	"github.com/porter-dev/porter/internal/telemetry"
 )
 
 type HandleNotificationInput struct {
-	Context             context.Context
 	RawAppEventMetadata map[string]any
 	EventRepo           repository.PorterAppEventRepository
 	DeploymentTargetID  string
+	Namespace           string
+	K8sAgent            *kubernetes.Agent
 }
 
 // HandleNotification handles the logic for processing app events (which are currently sent by the porter agent)
-func HandleNotification(inp HandleNotificationInput) error {
-	ctx, span := telemetry.NewSpan(inp.Context, "handle-notification")
+func HandleNotification(ctx context.Context, inp HandleNotificationInput) error {
+	ctx, span := telemetry.NewSpan(ctx, "handle-notification")
 	defer span.End()
 
-	// 1. unmarshal app event
-	appEventMetadata, err := convertMetadata(inp.RawAppEventMetadata)
+	// 1. parse app event
+	appEventMetadata, err := parseAppEventMetadata(inp.RawAppEventMetadata)
 	if err != nil {
 		return telemetry.Error(ctx, span, err, "failed to unmarshal app event metadata")
 	}
@@ -28,8 +31,14 @@ func HandleNotification(inp HandleNotificationInput) error {
 		return telemetry.Error(ctx, span, nil, "app event metadata is nil")
 	}
 
-	// 2. dedupe app event
-	isDuplicate, err := checkIsAppEventDuplicate(ctx, *appEventMetadata, inp.EventRepo, inp.DeploymentTargetID)
+	// 2. convert app event to notification
+	notification, err := appEventToNotification(*appEventMetadata)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "failed to convert app event to notification")
+	}
+
+	// 3. dedupe notification
+	isDuplicate, err := isNotificationDuplicate(ctx, notification, inp.EventRepo, inp.DeploymentTargetID)
 	if err != nil {
 		return telemetry.Error(ctx, span, err, "failed to check if app event is duplicate")
 	}
@@ -38,20 +47,70 @@ func HandleNotification(inp HandleNotificationInput) error {
 		return nil
 	}
 
-	// 3. convert app event to notification
-	_, err = appEventToNotification(*appEventMetadata)
+	// 4. hydrate notification with k8s deployment info
+	hydratedNotification, err := hydrateNotification(ctx, hydrateNotificationInput{
+		Notification:       notification,
+		DeploymentTargetId: inp.DeploymentTargetID,
+		Namespace:          inp.Namespace,
+		K8sAgent:           inp.K8sAgent,
+	})
 	if err != nil {
-		return telemetry.Error(ctx, span, err, "failed to convert app event to notification")
+		return telemetry.Error(ctx, span, err, "failed to hydrate notification")
+	}
+
+	// 5. based on notification + k8s deployment, update the status of the matching deploy event
+	if hydratedNotification.Deployment.Status == DeploymentStatus_Failure ||
+		(hydratedNotification.Deployment.Status == DeploymentStatus_Pending &&
+			detailIndicatesDeploymentFailure(hydratedNotification.AgentDetail)) {
+		err = updateDeployEvent(ctx, updateDeployEventInput{
+			Notification: hydratedNotification,
+			EventRepo:    inp.EventRepo,
+			Status:       PorterAppEventStatus_Failed,
+		})
+		if err != nil {
+			return telemetry.Error(ctx, span, err, "failed to update deploy event matching notification")
+		}
+	}
+
+	// 6. save notification to db
+	// TODO: save the notification in its own table rather than co-opting the porter app events table
+	err = saveNotification(ctx, hydratedNotification, inp.EventRepo, inp.DeploymentTargetID)
+	if err != nil {
+		return telemetry.Error(ctx, span, err, "failed to save notification")
 	}
 
-	// 4. based on notification, change the status of the deploy event
-	// 5. send notification
 	return nil
 }
 
-type Notification struct{}
+type Notification struct {
+	AppID                string     `json:"app_id"`
+	AppName              string     `json:"app_name"`
+	ServiceName          string     `json:"service_name"`
+	AppRevisionID        string     `json:"app_revision_id"`
+	AgentEventID         int        `json:"agent_event_id"`
+	AgentDetail          string     `json:"agent_detail"`
+	AgentShortSummary    string     `json:"agent_short_summary"`
+	AgentSummary         string     `json:"agent_summary"`
+	HumanReadableDetail  string     `json:"human_readable_detail"`
+	HumanReadableSummary string     `json:"human_readable_summary"`
+	Deployment           Deployment `json:"deployment"`
+}
 
 // appEventToNotification converts an app event to a notification
-func appEventToNotification(appEventMetadata AppEventMetadata) (*Notification, error) {
-	return nil, nil
+func appEventToNotification(appEventMetadata AppEventMetadata) (Notification, error) {
+	humanReadableDetail := appEventMetadata.Detail
+	humanReadableDetail = strings.ReplaceAll(humanReadableDetail, "application", "service")
+
+	notification := Notification{
+		AppID:               appEventMetadata.AppID,
+		AppName:             appEventMetadata.AppName,
+		ServiceName:         appEventMetadata.ServiceName,
+		AgentEventID:        appEventMetadata.AgentEventID,
+		AgentDetail:         appEventMetadata.Detail,
+		AgentSummary:        appEventMetadata.Summary,
+		AppRevisionID:       appEventMetadata.AppRevisionID,
+		Deployment:          Deployment{Status: DeploymentStatus_Unknown},
+		HumanReadableDetail: humanReadableDetail,
+	}
+	return notification, nil
 }