Przeglądaj źródła

refactor events to group via subevents; implement logs endpoints

Alexander Belanger 4 lat temu
rodzic
commit
31764eaf6f

+ 35 - 169
api/server/handlers/kube_events/create.go

@@ -2,7 +2,7 @@ package kube_events
 
 import (
 	"net/http"
-	"strings"
+	"time"
 
 	"github.com/porter-dev/porter/api/server/authz"
 	"github.com/porter-dev/porter/api/server/handlers"
@@ -39,20 +39,40 @@ func (c *CreateKubeEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
 		return
 	}
 
-	// handle write to the database
-	_, err := c.Repo().KubeEvent().CreateEvent(&models.KubeEvent{
-		ProjectID:    proj.ID,
-		ClusterID:    cluster.ID,
-		ResourceType: request.ResourceType,
-		Name:         request.Name,
-		OwnerType:    request.OwnerType,
-		OwnerName:    request.OwnerName,
-		EventType:    request.EventType,
-		Namespace:    request.Namespace,
-		Message:      request.Message,
-		Reason:       request.Reason,
-		Timestamp:    request.Timestamp,
-		Data:         []byte(strings.Join(request.Data, "\n")),
+	// Look for an event matching by the name, namespace, and was last updated within the
+	// grouping threshold time. If so, we append a subevent to the existing event.
+	kubeEvent, err := c.Repo().KubeEvent().ReadEventByGroup(proj.ID, cluster.ID, &types.GroupOptions{
+		Name:          request.Name,
+		Namespace:     request.Namespace,
+		ResourceType:  request.ResourceType,
+		ThresholdTime: time.Now().Add(-1 * time.Minute),
+	})
+
+	foundMatchedEvent := kubeEvent != nil
+
+	if !foundMatchedEvent {
+		kubeEvent, err = c.Repo().KubeEvent().CreateEvent(&models.KubeEvent{
+			ProjectID:    proj.ID,
+			ClusterID:    cluster.ID,
+			ResourceType: request.ResourceType,
+			Name:         request.Name,
+			OwnerType:    request.OwnerType,
+			OwnerName:    request.OwnerName,
+			Namespace:    request.Namespace,
+		})
+
+		if err != nil {
+			c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+			return
+		}
+	}
+
+	// append the subevent to the event
+	err = c.Repo().KubeEvent().AppendSubEvent(kubeEvent, &models.KubeSubEvent{
+		EventType: request.EventType,
+		Message:   request.Message,
+		Reason:    request.Reason,
+		Timestamp: request.Timestamp,
 	})
 
 	if err != nil {
@@ -62,157 +82,3 @@ func (c *CreateKubeEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
 
 	w.WriteHeader(http.StatusCreated)
 }
-
-// func (app *App) HandleCreateEvent(w http.ResponseWriter, r *http.Request) {
-// 	projID, err := strconv.ParseUint(chi.URLParam(r, "project_id"), 0, 64)
-
-// 	if err != nil || projID == 0 {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-
-// 	clusterID, err := strconv.ParseUint(chi.URLParam(r, "cluster_id"), 0, 64)
-
-// 	if err != nil {
-// 		app.sendExternalError(err, http.StatusInternalServerError, HTTPError{
-// 			Code:   ErrReleaseReadData,
-// 			Errors: []string{"cluster not found"},
-// 		}, w)
-// 	}
-
-// 	form := &forms.CreateEventForm{}
-
-// 	// decode from JSON to form value
-// 	if err := json.NewDecoder(r.Body).Decode(form); err != nil {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-
-// 	// validate the form
-// 	if err := app.validator.Struct(form); err != nil {
-// 		app.handleErrorFormValidation(err, ErrProjectValidateFields, w)
-// 		return
-// 	}
-
-// 	// convert the form to an invite
-// 	event := form.ToEvent(uint(projID), uint(clusterID))
-
-// 	if err != nil {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-
-// 	// handle write to the database
-// 	event, err = app.Repo.Event.CreateEvent(event)
-
-// 	if err != nil {
-// 		app.handleErrorDataWrite(err, w)
-// 		return
-// 	}
-
-// 	w.WriteHeader(http.StatusCreated)
-// }
-
-// // HandleListEvents lists the events that match certain conditions in a project
-// func (app *App) HandleListEvents(w http.ResponseWriter, r *http.Request) {
-// 	projID, err := strconv.ParseUint(chi.URLParam(r, "project_id"), 0, 64)
-
-// 	if err != nil || projID == 0 {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-
-// 	clusterID, err := strconv.ParseUint(chi.URLParam(r, "cluster_id"), 0, 64)
-
-// 	if err != nil {
-// 		app.sendExternalError(err, http.StatusInternalServerError, HTTPError{
-// 			Code:   ErrReleaseReadData,
-// 			Errors: []string{"cluster not found"},
-// 		}, w)
-// 	}
-
-// 	vals, err := url.ParseQuery(r.URL.RawQuery)
-
-// 	opts := &repository.ListEventOpts{
-// 		ClusterID: uint(clusterID),
-// 	}
-
-// 	decoder := schema.NewDecoder()
-
-// 	decoder.IgnoreUnknownKeys(true)
-
-// 	if err := decoder.Decode(opts, vals); err != nil {
-// 		app.sendExternalError(err, http.StatusInternalServerError, HTTPError{
-// 			Code:   ErrReleaseReadData,
-// 			Errors: []string{"bad request"},
-// 		}, w)
-// 	}
-
-// 	// handle write to the database
-// 	events, err := app.Repo.Event.ListEventsByProjectID(uint(projID), opts)
-
-// 	if err != nil {
-// 		app.handleErrorDataWrite(err, w)
-// 		return
-// 	}
-
-// 	eventExts := make([]*models.EventExternalSimple, 0)
-
-// 	for _, event := range events {
-// 		eventExts = append(eventExts, event.ExternalizeSimple())
-// 	}
-
-// 	w.WriteHeader(http.StatusOK)
-
-// 	if err := json.NewEncoder(w).Encode(eventExts); err != nil {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-// }
-
-// // HandleListEvents lists the events that match certain conditions in a project
-// func (app *App) HandleGetEvent(w http.ResponseWriter, r *http.Request) {
-// 	projID, err := strconv.ParseUint(chi.URLParam(r, "project_id"), 0, 64)
-
-// 	if err != nil || projID == 0 {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-
-// 	clusterID, err := strconv.ParseUint(chi.URLParam(r, "cluster_id"), 0, 64)
-
-// 	if err != nil {
-// 		app.sendExternalError(err, http.StatusInternalServerError, HTTPError{
-// 			Code:   ErrReleaseReadData,
-// 			Errors: []string{"cluster not found"},
-// 		}, w)
-// 	}
-
-// 	eventID, err := strconv.ParseUint(chi.URLParam(r, "event_id"), 0, 64)
-
-// 	if err != nil || projID == 0 {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-
-// 	event, err := app.Repo.Event.ReadEvent(uint(eventID), uint(projID), uint(clusterID))
-
-// 	if err != nil {
-// 		if err == gorm.ErrRecordNotFound {
-// 			http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
-// 			return
-// 		}
-
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-
-// 	eventExt := event.Externalize()
-
-// 	w.WriteHeader(http.StatusOK)
-
-// 	if err := json.NewEncoder(w).Encode(eventExt); err != nil {
-// 		app.handleErrorFormDecoding(err, ErrProjectDecode, w)
-// 		return
-// 	}
-// }

+ 86 - 0
api/server/handlers/kube_events/get_logs.go

@@ -0,0 +1,86 @@
+package kube_events
+
+import (
+	"fmt"
+	"net/http"
+
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/requestutils"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/kubernetes/porter_agent"
+	"github.com/porter-dev/porter/internal/models"
+)
+
+type GetKubeEventLogsHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+func NewGetKubeEventLogsHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *GetKubeEventLogsHandler {
+	return &GetKubeEventLogsHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+func (c *GetKubeEventLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	proj, _ := r.Context().Value(types.ProjectScope).(*models.Project)
+	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
+	kubeEventID, _ := requestutils.GetURLParamUint(r, types.URLParamKubeEventID)
+
+	kubeEvent, err := c.Repo().KubeEvent().ReadEvent(kubeEventID, proj.ID, cluster.ID)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	// if the kube event is not a pod type, throw a bad request error to the user
+	if kubeEvent.ResourceType != "pod" {
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(
+			fmt.Errorf("event resource type must be pod to get logs"),
+			http.StatusBadRequest,
+		))
+
+		return
+	}
+
+	req := &types.GetKubeEventLogsRequest{}
+
+	if ok := c.DecodeAndValidate(w, r, req); !ok {
+		return
+	}
+
+	agent, err := c.GetAgent(r, cluster, "")
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	// get agent service
+	agentSvc, err := porter_agent.GetAgentService(agent.Clientset)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	resp, err := porter_agent.QueryPorterAgent(agent.Clientset, agentSvc, &porter_agent.PathOpts{
+		Timestamp: req.Timestamp,
+		Pod:       kubeEvent.Name,
+		Namespace: kubeEvent.Namespace,
+	})
+
+	c.WriteResult(w, r, &types.GetKubeEventLogsResponse{
+		Logs: resp.Logs,
+	})
+}

+ 3 - 4
api/server/handlers/kube_events/list.go

@@ -38,18 +38,17 @@ func (c *ListKubeEventsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
 		return
 	}
 
-	// handle write to the database
-	kubeEvents, err := c.Repo().KubeEvent().ListEventsByProjectID(proj.ID, cluster.ID, request, false)
+	kubeEvents, err := c.Repo().KubeEvent().ListEventsByProjectID(proj.ID, cluster.ID, request)
 
 	if err != nil {
 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
 		return
 	}
 
-	resp := make([]*types.KubeEventBasic, 0)
+	resp := make([]*types.KubeEvent, 0)
 
 	for _, kubeEvent := range kubeEvents {
-		resp = append(resp, kubeEvent.ToKubeEventBasicType())
+		resp = append(resp, kubeEvent.ToKubeEventType())
 	}
 
 	c.WriteResult(w, r, resp)

+ 29 - 0
api/server/router/cluster.go

@@ -564,6 +564,35 @@ func getClusterRoutes(
 		Router:   r,
 	})
 
+	// GET /api/projects/{project_id}/clusters/{cluster_id}/kube_events/{kube_event_id}/logs -> kube_events.NewGetKubeEventLogsHandler
+	getKubeEventLogsEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbGet,
+			Method: types.HTTPVerbGet,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: fmt.Sprintf("%s/kube_events/%s/logs", relPath, types.URLParamKubeEventID),
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+			},
+		},
+	)
+
+	getKubeEventLogsHandler := kube_events.NewGetKubeEventLogsHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &Route{
+		Endpoint: getKubeEventLogsEndpoint,
+		Handler:  getKubeEventLogsHandler,
+		Router:   r,
+	})
+
 	// POST /api/projects/{project_id}/clusters/{cluster_id}/kube_events -> kube_events.NewCreateKubeEventHandler
 	createKubeEventsEndpoint := factory.NewAPIEndpoint(
 		&types.APIRequestMetadata{

+ 48 - 27
api/types/kube_events.go

@@ -6,46 +6,59 @@ const (
 	URLParamKubeEventID = "kube_event_id"
 )
 
+type KubeEventType string
+
+const (
+	KubeEventTypeCritical KubeEventType = "critical"
+	KubeEventTypeNormal   KubeEventType = "normal"
+)
+
+type GroupOptions struct {
+	ResourceType  string
+	Name          string
+	Namespace     string
+	ThresholdTime time.Time
+}
+
 // CreateKubeEventRequest is the type for creating a new kube event
 type CreateKubeEventRequest struct {
-	ResourceType string    `json:"resource_type" form:"required"`
-	Name         string    `json:"name" form:"required"`
-	OwnerType    string    `json:"owner_type"`
-	OwnerName    string    `json:"owner_name"`
-	EventType    string    `json:"event_type" form:"required"`
-	Namespace    string    `json:"namespace"`
-	Message      string    `json:"message" form:"required"`
-	Reason       string    `json:"reason"`
-	Timestamp    time.Time `json:"timestamp" form:"required"`
-	Data         []string  `json:"data"`
+	ResourceType string        `json:"resource_type" form:"required"`
+	Name         string        `json:"name" form:"required"`
+	OwnerType    string        `json:"owner_type"`
+	OwnerName    string        `json:"owner_name"`
+	EventType    KubeEventType `json:"event_type" form:"required"`
+	Namespace    string        `json:"namespace"`
+	Message      string        `json:"message" form:"required"`
+	Reason       string        `json:"reason"`
+	Timestamp    time.Time     `json:"timestamp" form:"required"`
 }
 
-type KubeEventBasic struct {
+type KubeEvent struct {
+	UpdatedAt time.Time `json:"updated_at"`
+
 	ID        uint `json:"id"`
 	ProjectID uint `json:"project_id"`
 	ClusterID uint `json:"cluster_id"`
 
-	ResourceType string    `json:"resource_type"`
-	Name         string    `json:"name"`
-	OwnerType    string    `json:"owner_type"`
-	OwnerName    string    `json:"owner_name"`
-	EventType    string    `json:"event_type"`
-	Namespace    string    `json:"namespace"`
-	Message      string    `json:"message"`
-	Reason       string    `json:"reason"`
-	Timestamp    time.Time `json:"timestamp"`
-}
+	ResourceType string `json:"resource_type"`
+	Name         string `json:"name"`
+	OwnerType    string `json:"owner_type"`
+	OwnerName    string `json:"owner_name"`
+	Namespace    string `json:"namespace"`
 
-type KubeEvent struct {
-	*KubeEventBasic
+	SubEvents []*KubeSubEvent `json:"sub_events"`
+}
 
-	Data []byte `json:"data"`
+type KubeSubEvent struct {
+	EventType KubeEventType `json:"event_type"`
+	Message   string        `json:"message"`
+	Reason    string        `json:"reason"`
+	Timestamp time.Time     `json:"timestamp"`
 }
 
 type ListKubeEventRequest struct {
-	Limit int    `schema:"limit"`
-	Skip  int    `schema:"skip"`
-	Type  string `schema:"type"`
+	Limit int `schema:"limit"`
+	Skip  int `schema:"skip"`
 
 	// can only be "timestamp" for now
 	SortBy string `schema:"sort_by"`
@@ -55,3 +68,11 @@ type ListKubeEventRequest struct {
 
 	ResourceType string `schema:"resource_type"`
 }
+
+type GetKubeEventLogsRequest struct {
+	Timestamp int `schema:"timestamp"`
+}
+
+type GetKubeEventLogsResponse struct {
+	Logs []string `json:"logs"`
+}

+ 75 - 0
internal/kubernetes/porter_agent/logs.go

@@ -0,0 +1,75 @@
+package porter_agent
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/kubernetes"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// returns the agent service
+func GetAgentService(clientset kubernetes.Interface) (*v1.Service, error) {
+	return clientset.CoreV1().Services("porter-agent-system").Get(
+		context.TODO(),
+		"porter-agent-controller-manager",
+		metav1.GetOptions{},
+	)
+}
+
+type SimpleIngress struct {
+	Name      string `json:"name"`
+	Namespace string `json:"namespace"`
+}
+
+type PathOpts struct {
+	Timestamp int
+	Pod       string
+	Namespace string
+}
+
+func QueryPorterAgent(
+	clientset kubernetes.Interface,
+	service *v1.Service,
+	opts *PathOpts,
+) (*AgentResp, error) {
+	if len(service.Spec.Ports) == 0 {
+		return nil, fmt.Errorf("agent service has no exposed ports to query")
+	}
+
+	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
+		"http",
+		service.Name,
+		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
+		fmt.Sprintf("/pod/%s/ns/%s/logbucket/%d", opts.Pod, opts.Namespace, opts.Timestamp),
+		nil,
+	)
+
+	rawQuery, err := resp.DoRaw(context.TODO())
+
+	if err != nil {
+		return nil, err
+	}
+
+	return parseQuery(rawQuery)
+}
+
+type AgentResp struct {
+	Logs          []string `json:"logs"`
+	MatchedBucket int      `json:"matchedBucket"`
+}
+
+func parseQuery(rawQuery []byte) (*AgentResp, error) {
+	resp := &AgentResp{}
+
+	err := json.Unmarshal(rawQuery, resp)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return resp, nil
+}

+ 44 - 20
internal/models/kube_events.go

@@ -14,28 +14,55 @@ type KubeEvent struct {
 	ProjectID uint
 	ClusterID uint
 
+	// The name of the referenced kube object
+	Name string
+
+	// The kube resource type, such as "pod", "hpa", or "node"
 	ResourceType string
-	Name         string
-	OwnerType    string
-	OwnerName    string
-	EventType    string
-	Namespace    string
-	Message      string
-	Reason       string
-	Timestamp    time.Time
-	Data         []byte
+
+	// (optional) The owner reference type and name, which can be used to filter events by
+	// controller
+	OwnerType string
+	OwnerName string
+
+	// (optional) the namespace of the event, if namespaceable
+	Namespace string
+
+	// The "subevents" attached to the event. These are a grouped collection of events that belong
+	// to the same object.
+	SubEvents []KubeSubEvent
 }
 
-// ToKubeEventType generates an external KubeEvent to be shared over REST
-func (k *KubeEvent) ToKubeEventType() *types.KubeEvent {
-	return &types.KubeEvent{
-		KubeEventBasic: k.ToKubeEventBasicType(),
-		Data:           k.Data,
+type KubeSubEvent struct {
+	gorm.Model
+
+	KubeEventID uint
+	Message     string
+	Reason      string
+	Timestamp   time.Time
+
+	// The event type, such as "critical" or "normal"
+	EventType types.KubeEventType
+}
+
+func (k *KubeSubEvent) ToKubeSubEventType() *types.KubeSubEvent {
+	return &types.KubeSubEvent{
+		Message:   k.Message,
+		Reason:    k.Reason,
+		Timestamp: k.Timestamp,
+		EventType: k.EventType,
 	}
 }
 
-func (k *KubeEvent) ToKubeEventBasicType() *types.KubeEventBasic {
-	return &types.KubeEventBasic{
+func (k *KubeEvent) ToKubeEventType() *types.KubeEvent {
+	subEvents := make([]*types.KubeSubEvent, 0)
+
+	for _, subEvent := range k.SubEvents {
+		subEvents = append(subEvents, subEvent.ToKubeSubEventType())
+	}
+
+	return &types.KubeEvent{
+		UpdatedAt:    k.UpdatedAt,
 		ID:           k.ID,
 		ProjectID:    k.ProjectID,
 		ClusterID:    k.ClusterID,
@@ -44,9 +71,6 @@ func (k *KubeEvent) ToKubeEventBasicType() *types.KubeEventBasic {
 		Namespace:    k.Namespace,
 		OwnerType:    k.OwnerType,
 		OwnerName:    k.OwnerName,
-		EventType:    k.EventType,
-		Message:      k.Message,
-		Reason:       k.Reason,
-		Timestamp:    k.Timestamp,
+		SubEvents:    subEvents,
 	}
 }

+ 2 - 1
internal/repository/event.go

@@ -16,12 +16,13 @@ type BuildEventRepository interface {
 
 type KubeEventRepository interface {
 	CreateEvent(event *models.KubeEvent) (*models.KubeEvent, error)
+	AppendSubEvent(event *models.KubeEvent, subEvent *models.KubeSubEvent) error
 	ReadEvent(id uint, projID uint, clusterID uint) (*models.KubeEvent, error)
+	ReadEventByGroup(projID uint, clusterID uint, opts *types.GroupOptions) (*models.KubeEvent, error)
 	ListEventsByProjectID(
 		projectID uint,
 		clusterID uint,
 		opts *types.ListKubeEventRequest,
-		shouldDecrypt bool,
 	) ([]*models.KubeEvent, error)
 	DeleteEvent(id uint) error
 }

+ 52 - 63
internal/repository/gorm/event.go

@@ -2,6 +2,7 @@ package gorm
 
 import (
 	"strings"
+	"time"
 
 	"github.com/porter-dev/porter/api/types"
 	"github.com/porter-dev/porter/internal/models"
@@ -82,12 +83,6 @@ func NewKubeEventRepository(db *gorm.DB, key *[32]byte) repository.KubeEventRepo
 func (repo *KubeEventRepository) CreateEvent(
 	event *models.KubeEvent,
 ) (*models.KubeEvent, error) {
-	err := repo.EncryptKubeEventData(event, repo.key)
-
-	if err != nil {
-		return nil, err
-	}
-
 	if err := repo.db.Create(event).Error; err != nil {
 		return nil, err
 	}
@@ -101,8 +96,7 @@ func (repo *KubeEventRepository) ReadEvent(
 ) (*models.KubeEvent, error) {
 	event := &models.KubeEvent{}
 
-	// preload Clusters association
-	if err := repo.db.Where(
+	if err := repo.db.Preload("SubEvents").Where(
 		"id = ? AND project_id = ? AND cluster_id = ?",
 		id,
 		projID,
@@ -111,9 +105,41 @@ func (repo *KubeEventRepository) ReadEvent(
 		return nil, err
 	}
 
-	err := repo.DecryptKubeEventData(event, repo.key)
+	// subEvents := make([]models.KubeSubEvent, 0)
+
+	// if err := repo.db.Where("kube_event_id = ?", event.ID).Find(&subEvents).Error; err != nil {
+	// 	return nil, err
+	// }
+
+	// event.SubEvents = subEvents
 
-	if err != nil {
+	return event, nil
+}
+
+// ReadEventByGroup finds an event by a set of options which group events together
+func (repo *KubeEventRepository) ReadEventByGroup(
+	projID uint,
+	clusterID uint,
+	opts *types.GroupOptions,
+) (*models.KubeEvent, error) {
+	event := &models.KubeEvent{}
+
+	query := repo.db.Debug().Preload("SubEvents").
+		Where("project_id = ? AND cluster_id = ? AND name = ? AND resource_type = ?", projID, clusterID, opts.Name, opts.ResourceType)
+
+	// construct query for timestamp
+	query = query.Where(
+		"updated_at >= ?", opts.ThresholdTime,
+	)
+
+	if opts.Namespace != "" {
+		query = query.Where(
+			"namespace = ?",
+			strings.ToLower(opts.Namespace),
+		)
+	}
+
+	if err := query.First(&event).Error; err != nil {
 		return nil, err
 	}
 
@@ -126,7 +152,6 @@ func (repo *KubeEventRepository) ListEventsByProjectID(
 	projectID uint,
 	clusterID uint,
 	opts *types.ListKubeEventRequest,
-	shouldDecrypt bool,
 ) ([]*models.KubeEvent, error) {
 	listOpts := opts
 
@@ -136,14 +161,8 @@ func (repo *KubeEventRepository) ListEventsByProjectID(
 
 	events := []*models.KubeEvent{}
 
-	query := repo.db.Where("project_id = ? AND cluster_id = ?", projectID, clusterID)
-
-	if listOpts.Type != "" {
-		query = query.Where(
-			"event_type = ?",
-			strings.ToLower(listOpts.Type),
-		)
-	}
+	// preload the subevents
+	query := repo.db.Preload("SubEvents").Where("project_id = ? AND cluster_id = ?", projectID, clusterID)
 
 	if listOpts.OwnerName != "" && listOpts.OwnerType != "" {
 		query = query.Where(
@@ -163,66 +182,36 @@ func (repo *KubeEventRepository) ListEventsByProjectID(
 	query = query.Limit(listOpts.Limit).Offset(listOpts.Skip)
 
 	if listOpts.SortBy == "timestamp" {
-		query = query.Order("timestamp desc").Order("id desc")
+		// sort by the updated_at field
+		query = query.Order("updated_at desc").Order("id desc")
 	}
 
 	if err := query.Find(&events).Error; err != nil {
 		return nil, err
 	}
 
-	if shouldDecrypt {
-		for _, event := range events {
-			repo.DecryptKubeEventData(event, repo.key)
-		}
-	}
-
 	return events, nil
 }
 
-// DeleteEvent deletes an event by ID
-func (repo *KubeEventRepository) DeleteEvent(
-	id uint,
-) error {
-	if err := repo.db.Where("id = ?", id).Delete(&models.KubeEvent{}).Error; err != nil {
+// AppendSubEvent will add a subevent to an existing event
+func (repo *KubeEventRepository) AppendSubEvent(event *models.KubeEvent, subEvent *models.KubeSubEvent) error {
+	subEvent.KubeEventID = event.ID
+
+	if err := repo.db.Create(subEvent).Error; err != nil {
 		return err
 	}
 
-	return nil
-}
-
-// EncryptEventData will encrypt the event data before
-// writing to the DB
-func (repo *KubeEventRepository) EncryptKubeEventData(
-	event *models.KubeEvent,
-	key *[32]byte,
-) error {
-	if len(event.Data) > 0 {
-		cipherData, err := repository.Encrypt(event.Data, key)
+	event.UpdatedAt = time.Now()
 
-		if err != nil {
-			return err
-		}
-
-		event.Data = cipherData
-	}
-
-	return nil
+	return repo.db.Save(event).Error
 }
 
-// DecryptEventData will decrypt the event data before
-// returning it from the DB
-func (repo *KubeEventRepository) DecryptKubeEventData(
-	event *models.KubeEvent,
-	key *[32]byte,
+// DeleteEvent deletes an event by ID
+func (repo *KubeEventRepository) DeleteEvent(
+	id uint,
 ) error {
-	if len(event.Data) > 0 {
-		plaintext, err := repository.Decrypt(event.Data, key)
-
-		if err != nil {
-			return err
-		}
-
-		event.Data = plaintext
+	if err := repo.db.Preload("SubEvents").Where("id = ?", id).Delete(&models.KubeEvent{}).Error; err != nil {
+		return err
 	}
 
 	return nil

+ 69 - 16
internal/repository/gorm/event_test.go

@@ -1,8 +1,10 @@
 package gorm_test
 
 import (
+	"encoding/json"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/go-test/deep"
 	"github.com/porter-dev/porter/api/types"
@@ -24,12 +26,8 @@ func TestCreateKubeEvent(t *testing.T) {
 	event := &models.KubeEvent{
 		ProjectID: tester.initProjects[0].Model.ID,
 		ClusterID: tester.initClusters[0].Model.ID,
-		EventType: "pod",
 		Name:      "pod-example-1",
 		Namespace: "default",
-		Message:   "Pod killed",
-		Reason:    "OOM: memory limit exceeded",
-		Data:      []byte("log from pod\nlog2 from pod"),
 	}
 
 	copyKubeEvent := *event
@@ -40,6 +38,29 @@ func TestCreateKubeEvent(t *testing.T) {
 		t.Fatalf("%v\n", err)
 	}
 
+	// append a sub event as well
+	subEvent := &models.KubeSubEvent{
+		EventType: "pod",
+		Message:   "Pod killed",
+		Reason:    "OOM: memory limit exceeded",
+		Timestamp: time.Now(),
+	}
+
+	bytes, err := json.Marshal(subEvent)
+
+	t.Errorf("BYTES ARE %s", string(bytes))
+
+	copySubEvent := *subEvent
+	copySubEvent.KubeEventID = 1
+
+	err = tester.repo.KubeEvent().AppendSubEvent(event, subEvent)
+
+	if err != nil {
+		t.Fatalf("%v\n", err)
+	}
+
+	copyKubeEvent.SubEvents = []models.KubeSubEvent{copySubEvent}
+
 	event, err = tester.repo.KubeEvent().ReadEvent(event.Model.ID, 1, 1)
 
 	if err != nil {
@@ -52,9 +73,46 @@ func TestCreateKubeEvent(t *testing.T) {
 	}
 
 	event.Model = gorm.Model{}
+	event.SubEvents[0].Model = gorm.Model{}
 
 	if diff := deep.Equal(event, &copyKubeEvent); diff != nil {
-		t.Errorf("tokens not equal:")
+		t.Errorf("events not equal:")
+		t.Error(diff)
+	}
+}
+
+func TestReadKubeEventsByGroup(t *testing.T) {
+	suffix, _ := repository.GenerateRandomBytes(4)
+
+	tester := &tester{
+		dbFileName: fmt.Sprintf("./porter_read_event_%s.db", suffix),
+	}
+
+	setupTestEnv(tester, t)
+	initProject(tester, t)
+	initCluster(tester, t)
+	initKubeEvents(tester, t)
+	defer cleanup(tester, t)
+
+	event, err := tester.repo.KubeEvent().ReadEventByGroup(
+		tester.initProjects[0].Model.ID,
+		tester.initClusters[0].Model.ID,
+		&types.GroupOptions{
+			Name:          "pod-example-1",
+			Namespace:     "default",
+			ResourceType:  "pod",
+			ThresholdTime: time.Now().Add(-15 * time.Minute),
+		},
+	)
+
+	if err != nil {
+		t.Fatalf("%v\n", err)
+	}
+
+	expKubeEvent := tester.initKubeEvents[1]
+
+	if diff := deep.Equal(expKubeEvent, event); diff != nil {
+		t.Errorf("incorrect events")
 		t.Error(diff)
 	}
 }
@@ -73,8 +131,8 @@ func TestListKubeEventsByProjectIDWithLimit(t *testing.T) {
 	defer cleanup(tester, t)
 
 	testListKubeEventsByProjectID(tester, t, 1, true, &types.ListKubeEventRequest{
-		Limit: 10,
-		Type:  "node",
+		Limit:        10,
+		ResourceType: "node",
 	}, tester.initKubeEvents[50:60])
 }
 
@@ -111,10 +169,10 @@ func TestListKubeEventsByProjectIDWithSortBy(t *testing.T) {
 	defer cleanup(tester, t)
 
 	testListKubeEventsByProjectID(tester, t, 1, true, &types.ListKubeEventRequest{
-		Limit:  1,
-		Skip:   0,
-		Type:   "node",
-		SortBy: "timestamp",
+		Limit:        1,
+		Skip:         0,
+		ResourceType: "node",
+		SortBy:       "timestamp",
 	}, tester.initKubeEvents[99:])
 }
 
@@ -125,7 +183,6 @@ func testListKubeEventsByProjectID(tester *tester, t *testing.T, clusterID uint,
 		tester.initProjects[0].Model.ID,
 		clusterID,
 		opts,
-		decrypt,
 	)
 
 	if err != nil {
@@ -137,10 +194,6 @@ func testListKubeEventsByProjectID(tester *tester, t *testing.T, clusterID uint,
 		t.Fatalf("length of events incorrect: expected %d, got %d\n", len(expKubeEvents), len(events))
 	}
 
-	for _, expKubeEvent := range expKubeEvents {
-		expKubeEvent.Data = []byte("log from pod\nlog2 from pod")
-	}
-
 	if diff := deep.Equal(expKubeEvents, events); diff != nil {
 		t.Errorf("incorrect events")
 		t.Error(diff)

+ 20 - 7
internal/repository/gorm/helpers_test.go

@@ -67,6 +67,7 @@ func setupTestEnv(tester *tester, t *testing.T) {
 		&models.GitActionConfig{},
 		&models.Invite{},
 		&models.KubeEvent{},
+		&models.KubeSubEvent{},
 		&models.Onboarding{},
 		&ints.KubeIntegration{},
 		&ints.BasicIntegration{},
@@ -570,22 +571,34 @@ func initKubeEvents(tester *tester, t *testing.T) {
 		}
 
 		event := &models.KubeEvent{
-			ProjectID: tester.initProjects[0].Model.ID,
-			ClusterID: tester.initClusters[0].Model.ID,
-			EventType: refType,
-			Name:      fmt.Sprintf("%s-example-%d", refType, i),
-			Namespace: "default",
+			ProjectID:    tester.initProjects[0].Model.ID,
+			ClusterID:    tester.initClusters[0].Model.ID,
+			Name:         fmt.Sprintf("%s-example-%d", refType, i),
+			Namespace:    "default",
+			ResourceType: refType,
+		}
+
+		event, err := tester.repo.KubeEvent().CreateEvent(event)
+
+		if err != nil {
+			t.Fatalf("%v\n", err)
+		}
+
+		// append a sub event as well
+		subEvent := &models.KubeSubEvent{
+			EventType: "pod",
 			Message:   "Pod killed",
 			Reason:    "OOM: memory limit exceeded",
-			Data:      []byte("log from pod\nlog2 from pod"),
 		}
 
-		event, err := tester.repo.KubeEvent().CreateEvent(event)
+		err = tester.repo.KubeEvent().AppendSubEvent(event, subEvent)
 
 		if err != nil {
 			t.Fatalf("%v\n", err)
 		}
 
+		event.SubEvents = []models.KubeSubEvent{*subEvent}
+
 		initEvents = append(initEvents, event)
 	}
 

+ 1 - 0
internal/repository/gorm/migrate.go

@@ -30,6 +30,7 @@ func AutoMigrate(db *gorm.DB) error {
 		&models.EventContainer{},
 		&models.SubEvent{},
 		&models.KubeEvent{},
+		&models.KubeSubEvent{},
 		&models.ProjectUsage{},
 		&models.ProjectUsageCache{},
 		&models.Onboarding{},

+ 12 - 1
internal/repository/test/event.go

@@ -50,15 +50,26 @@ func (n *KubeEventRepository) ReadEvent(id uint, projID uint, clusterID uint) (*
 	panic("not implemented") // TODO: Implement
 }
 
+func (n *KubeEventRepository) ReadEventByGroup(
+	projID uint,
+	clusterID uint,
+	opts *types.GroupOptions,
+) (*models.KubeEvent, error) {
+	panic("not implemented") // TODO: Implement
+}
+
 func (n *KubeEventRepository) ListEventsByProjectID(
 	projectID uint,
 	clusterID uint,
 	opts *types.ListKubeEventRequest,
-	shouldDecrypt bool,
 ) ([]*models.KubeEvent, error) {
 	panic("not implemented") // TODO: Implement
 }
 
+func (n *KubeEventRepository) AppendSubEvent(event *models.KubeEvent, subEvent *models.KubeSubEvent) error {
+	panic("not implemented") // TODO: Implement
+}
+
 func (n *KubeEventRepository) DeleteEvent(id uint) error {
 	panic("not implemented") // TODO: Implement
 }