Selaa lähdekoodia

add endpoint for log buckets

Alexander Belanger 4 vuotta sitten
vanhempi
sitoutus
7ea4111bc7

+ 96 - 0
api/server/handlers/kube_events/get_log_buckets.go

@@ -0,0 +1,96 @@
+package kube_events
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/requestutils"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/kubernetes/porter_agent"
+	"github.com/porter-dev/porter/internal/models"
+)
+
+type GetKubeEventLogBucketsHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+func NewGetKubeEventLogBucketsHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *GetKubeEventLogBucketsHandler {
+	return &GetKubeEventLogBucketsHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+func (c *GetKubeEventLogBucketsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	proj, _ := r.Context().Value(types.ProjectScope).(*models.Project)
+	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
+	kubeEventID, _ := requestutils.GetURLParamUint(r, types.URLParamKubeEventID)
+
+	kubeEvent, err := c.Repo().KubeEvent().ReadEvent(kubeEventID, proj.ID, cluster.ID)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	// if the kube event is not a pod type, throw a bad request error to the user
+	if strings.ToLower(kubeEvent.ResourceType) != "pod" {
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(
+			fmt.Errorf("event resource type must be pod to get logs"),
+			http.StatusBadRequest,
+		))
+
+		return
+	}
+
+	req := &types.GetKubeEventLogsRequest{}
+
+	if ok := c.DecodeAndValidate(w, r, req); !ok {
+		return
+	}
+
+	agent, err := c.GetAgent(r, cluster, "")
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	// get agent service
+	agentSvc, err := porter_agent.GetAgentService(agent.Clientset)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	resp, err := porter_agent.GetLogBucketsFromPorterAgent(agent.Clientset, agentSvc, &porter_agent.LogBucketPathOpts{
+		Pod:       kubeEvent.Name,
+		Namespace: kubeEvent.Namespace,
+	})
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	if resp.Error != "" {
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(fmt.Errorf(resp.Error), http.StatusBadRequest))
+		return
+	}
+
+	c.WriteResult(w, r, &types.GetKubeEventLogBucketsResponse{
+		LogBuckets: resp.AvailableBuckets,
+	})
+}

+ 11 - 1
api/server/handlers/kube_events/get_logs.go

@@ -75,12 +75,22 @@ func (c *GetKubeEventLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Reque
 		return
 		return
 	}
 	}
 
 
-	resp, err := porter_agent.QueryPorterAgent(agent.Clientset, agentSvc, &porter_agent.PathOpts{
+	resp, err := porter_agent.GetLogsFromPorterAgent(agent.Clientset, agentSvc, &porter_agent.LogPathOpts{
 		Timestamp: req.Timestamp,
 		Timestamp: req.Timestamp,
 		Pod:       kubeEvent.Name,
 		Pod:       kubeEvent.Name,
 		Namespace: kubeEvent.Namespace,
 		Namespace: kubeEvent.Namespace,
 	})
 	})
 
 
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	if resp.Error != "" {
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(fmt.Errorf(resp.Error), http.StatusBadRequest))
+		return
+	}
+
 	c.WriteResult(w, r, &types.GetKubeEventLogsResponse{
 	c.WriteResult(w, r, &types.GetKubeEventLogsResponse{
 		Logs: resp.Logs,
 		Logs: resp.Logs,
 	})
 	})

+ 29 - 0
api/server/router/cluster.go

@@ -593,6 +593,35 @@ func getClusterRoutes(
 		Router:   r,
 		Router:   r,
 	})
 	})
 
 
+	// GET /api/projects/{project_id}/clusters/{cluster_id}/kube_events/{kube_event_id}/logs -> kube_events.NewGetKubeEventLogsHandler
+	getKubeEventLogBucketsEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbGet,
+			Method: types.HTTPVerbGet,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: fmt.Sprintf("%s/kube_events/{%s}/log_buckets", relPath, types.URLParamKubeEventID),
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+			},
+		},
+	)
+
+	getKubeEventLogBucketsHandler := kube_events.NewGetKubeEventLogBucketsHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &Route{
+		Endpoint: getKubeEventLogBucketsEndpoint,
+		Handler:  getKubeEventLogBucketsHandler,
+		Router:   r,
+	})
+
 	// POST /api/projects/{project_id}/clusters/{cluster_id}/kube_events -> kube_events.NewCreateKubeEventHandler
 	// POST /api/projects/{project_id}/clusters/{cluster_id}/kube_events -> kube_events.NewCreateKubeEventHandler
 	createKubeEventsEndpoint := factory.NewAPIEndpoint(
 	createKubeEventsEndpoint := factory.NewAPIEndpoint(
 		&types.APIRequestMetadata{
 		&types.APIRequestMetadata{

+ 4 - 0
api/types/kube_events.go

@@ -84,3 +84,7 @@ type GetKubeEventLogsRequest struct {
 type GetKubeEventLogsResponse struct {
 type GetKubeEventLogsResponse struct {
 	Logs []string `json:"logs"`
 	Logs []string `json:"logs"`
 }
 }
+
+type GetKubeEventLogBucketsResponse struct {
+	LogBuckets []string `json:"log_buckets"`
+}

+ 57 - 8
internal/kubernetes/porter_agent/logs.go

@@ -25,17 +25,17 @@ type SimpleIngress struct {
 	Namespace string `json:"namespace"`
 	Namespace string `json:"namespace"`
 }
 }
 
 
-type PathOpts struct {
+type LogPathOpts struct {
 	Timestamp int
 	Timestamp int
 	Pod       string
 	Pod       string
 	Namespace string
 	Namespace string
 }
 }
 
 
-func QueryPorterAgent(
+func GetLogsFromPorterAgent(
 	clientset kubernetes.Interface,
 	clientset kubernetes.Interface,
 	service *v1.Service,
 	service *v1.Service,
-	opts *PathOpts,
-) (*AgentResp, error) {
+	opts *LogPathOpts,
+) (*AgentLogsResp, error) {
 	if len(service.Spec.Ports) == 0 {
 	if len(service.Spec.Ports) == 0 {
 		return nil, fmt.Errorf("agent service has no exposed ports to query")
 		return nil, fmt.Errorf("agent service has no exposed ports to query")
 	}
 	}
@@ -54,16 +54,65 @@ func QueryPorterAgent(
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	return parseQuery(rawQuery)
+	return parseLogQuery(rawQuery)
 }
 }
 
 
-type AgentResp struct {
+type LogBucketPathOpts struct {
+	Pod       string
+	Namespace string
+}
+
+func GetLogBucketsFromPorterAgent(
+	clientset kubernetes.Interface,
+	service *v1.Service,
+	opts *LogBucketPathOpts,
+) (*AgentLogBucketsResp, error) {
+	if len(service.Spec.Ports) == 0 {
+		return nil, fmt.Errorf("agent service has no exposed ports to query")
+	}
+
+	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
+		"http",
+		service.Name,
+		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
+		fmt.Sprintf("/pod/%s/ns/%s/logbucket", opts.Pod, opts.Namespace),
+		nil,
+	)
+
+	rawQuery, err := resp.DoRaw(context.TODO())
+
+	if err != nil {
+		return nil, err
+	}
+
+	return parseLogBucketsQuery(rawQuery)
+}
+
+type AgentLogsResp struct {
 	Logs          []string `json:"logs"`
 	Logs          []string `json:"logs"`
 	MatchedBucket int      `json:"matchedBucket"`
 	MatchedBucket int      `json:"matchedBucket"`
+	Error         string   `json:"error"`
+}
+
+func parseLogQuery(rawQuery []byte) (*AgentLogsResp, error) {
+	resp := &AgentLogsResp{}
+
+	err := json.Unmarshal(rawQuery, resp)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return resp, nil
+}
+
+type AgentLogBucketsResp struct {
+	AvailableBuckets []string `json:"availableLogBuckets"`
+	Error            string   `json:"error"`
 }
 }
 
 
-func parseQuery(rawQuery []byte) (*AgentResp, error) {
-	resp := &AgentResp{}
+func parseLogBucketsQuery(rawQuery []byte) (*AgentLogBucketsResp, error) {
+	resp := &AgentLogBucketsResp{}
 
 
 	err := json.Unmarshal(rawQuery, resp)
 	err := json.Unmarshal(rawQuery, resp)
 
 

+ 47 - 0
internal/repository/gorm/event.go

@@ -83,6 +83,30 @@ func NewKubeEventRepository(db *gorm.DB, key *[32]byte) repository.KubeEventRepo
 func (repo *KubeEventRepository) CreateEvent(
 func (repo *KubeEventRepository) CreateEvent(
 	event *models.KubeEvent,
 	event *models.KubeEvent,
 ) (*models.KubeEvent, error) {
 ) (*models.KubeEvent, error) {
+	// read the count of the events in the DB
+	query := repo.db.Where("project_id = ? AND cluster_id = ?", event.ProjectID, event.ClusterID)
+
+	var count int64
+
+	if err := query.Model([]*models.KubeEvent{}).Count(&count).Error; err != nil {
+		return nil, err
+	}
+
+	// if the count is greater than 500, remove the lowest-order event to implement a
+	// basic fixed-length buffer
+	if count >= 500 {
+		// find the id of the first match
+		matchedEvent := &models.KubeEvent{}
+
+		if err := query.Order("updated_at asc").Order("id asc").First(matchedEvent).Error; err != nil {
+			return nil, err
+		}
+
+		if err := query.Delete(matchedEvent).Error; err != nil {
+			return nil, err
+		}
+	}
+
 	if err := repo.db.Create(event).Error; err != nil {
 	if err := repo.db.Create(event).Error; err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -196,6 +220,29 @@ func (repo *KubeEventRepository) ListEventsByProjectID(
 func (repo *KubeEventRepository) AppendSubEvent(event *models.KubeEvent, subEvent *models.KubeSubEvent) error {
 func (repo *KubeEventRepository) AppendSubEvent(event *models.KubeEvent, subEvent *models.KubeSubEvent) error {
 	subEvent.KubeEventID = event.ID
 	subEvent.KubeEventID = event.ID
 
 
+	var count int64
+
+	query := repo.db.Where("kube_event_id = ?", event.ID)
+
+	if err := query.Model([]*models.KubeSubEvent{}).Count(&count).Error; err != nil {
+		return err
+	}
+
+	// if the count is greater than 20, remove the lowest-order event to implement a
+	// basic fixed-length buffer
+	if count >= 20 {
+		// find the id of the first match
+		matchedEvent := &models.KubeSubEvent{}
+
+		if err := query.Order("updated_at asc").Order("id asc").First(matchedEvent).Error; err != nil {
+			return err
+		}
+
+		if err := query.Delete(matchedEvent).Error; err != nil {
+			return err
+		}
+	}
+
 	if err := repo.db.Create(subEvent).Error; err != nil {
 	if err := repo.db.Create(subEvent).Error; err != nil {
 		return err
 		return err
 	}
 	}