Преглед изворни кода

Merge branch 'belanger/agent-v3-integration' into dev

Alexander Belanger пре 3 година
родитељ
комит
2fa9e4febb

+ 23 - 9
api/server/handlers/cluster/get_incident_event_logs.go

@@ -1,15 +1,18 @@
 package cluster
 
 import (
+	"fmt"
 	"net/http"
+	"strings"
+	"time"
 
 	"github.com/porter-dev/porter/api/server/authz"
 	"github.com/porter-dev/porter/api/server/handlers"
 	"github.com/porter-dev/porter/api/server/shared"
 	"github.com/porter-dev/porter/api/server/shared/apierrors"
 	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/websocket"
 	"github.com/porter-dev/porter/api/types"
-	porter_agent "github.com/porter-dev/porter/internal/kubernetes/porter_agent/v2"
 	"github.com/porter-dev/porter/internal/models"
 )
 
@@ -30,6 +33,7 @@ func NewGetIncidentEventLogsHandler(
 }
 
 func (c *GetIncidentEventLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	safeRW := r.Context().Value(types.RequestCtxWebsocketKey).(*websocket.WebsocketSafeReadWriter)
 	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
 
 	request := &types.GetIncidentEventLogsRequest{}
@@ -38,27 +42,37 @@ func (c *GetIncidentEventLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.R
 		return
 	}
 
-	agent, err := c.GetAgent(r, cluster, "")
+	k8sAgent, err := c.GetAgent(r, cluster, "porter-agent-system")
 
 	if err != nil {
 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
 		return
 	}
 
-	// get agent service
-	agentSvc, err := porter_agent.GetAgentService(agent.Clientset)
+	if len(request.Labels) == 0 {
+		return
+	}
 
-	if err != nil {
-		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+	// validate that the labels are valid
+	for _, label := range request.Labels {
+		if key, val, found := strings.Cut(label, "="); !found || key == "" || val == "" {
+			c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(fmt.Errorf("invalid label: %s", label),
+				http.StatusBadRequest))
+			return
+		}
+	}
+
+	// validate start time
+	if _, err := time.Parse(time.RFC3339, request.StartTime); err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(fmt.Errorf("invalid start time: %s", request.StartTime),
+			http.StatusBadRequest))
 		return
 	}
 
-	logs, err := porter_agent.GetLogs(agent.Clientset, agentSvc, request.LogID)
+	err = k8sAgent.StreamPorterAgentLokiLog(request.Labels, request.StartTime, request.Limit, safeRW)
 
 	if err != nil {
 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
 		return
 	}
-
-	c.WriteResult(w, r, logs)
 }

+ 23 - 23
api/server/handlers/cluster/get_incidents.go

@@ -38,9 +38,9 @@ func (c *GetIncidentsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 		return
 	}
 
-	incidentID := request.IncidentID
-	releaseName := request.ReleaseName
-	namespace := request.Namespace
+	// incidentID := request.IncidentID
+	// releaseName := request.ReleaseName
+	// namespace := request.Namespace
 
 	agent, err := c.GetAgent(r, cluster, "")
 
@@ -57,31 +57,31 @@ func (c *GetIncidentsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 		return
 	}
 
-	if incidentID != "" {
-		events, err := porter_agent.GetIncidentEventsByID(agent.Clientset, agentSvc, incidentID)
+	// if incidentID != "" {
+	// 	events, err := porter_agent.GetIncidentEventsByID(agent.Clientset, agentSvc, incidentID)
 
-		if err != nil {
-			c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
-			return
-		}
+	// 	if err != nil {
+	// 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+	// 		return
+	// 	}
 
-		c.WriteResult(w, r, events)
-		return
-	} else if releaseName != "" {
-		if namespace == "" {
-			namespace = "default"
-		}
+	// 	c.WriteResult(w, r, events)
+	// 	return
+	// } else if releaseName != "" {
+	// 	if namespace == "" {
+	// 		namespace = "default"
+	// 	}
 
-		incidents, err := porter_agent.GetIncidentsByReleaseNamespace(agent.Clientset, agentSvc, releaseName, namespace)
+	// 	incidents, err := porter_agent.GetIncidentsByReleaseNamespace(agent.Clientset, agentSvc, releaseName, namespace)
 
-		if err != nil {
-			c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
-			return
-		}
+	// 	if err != nil {
+	// 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+	// 		return
+	// 	}
 
-		c.WriteResult(w, r, incidents)
-		return
-	}
+	// 	c.WriteResult(w, r, incidents)
+	// 	return
+	// }
 
 	incidents, err := porter_agent.GetAllIncidents(agent.Clientset, agentSvc)
 

+ 89 - 2
api/server/handlers/cluster/install_agent.go

@@ -1,6 +1,7 @@
 package cluster
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 
@@ -13,7 +14,14 @@ import (
 	"github.com/porter-dev/porter/internal/auth/token"
 	"github.com/porter-dev/porter/internal/helm"
 	"github.com/porter-dev/porter/internal/helm/loader"
+	"github.com/porter-dev/porter/internal/kubernetes"
 	"github.com/porter-dev/porter/internal/models"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+const (
+	// monitoringNodeLabel selects nodes dedicated to monitoring workloads; when
+	// at least one such node exists, the agent chart is installed with the
+	// "monitoringWorkload" value enabled.
+	monitoringNodeLabel = "porter.run/workload-kind=monitoring"
+	// olderAgentLabel matches pods of the pre-v3 porter-agent (the old
+	// controller-manager deployment), which is removed before installing the
+	// new agent.
+	olderAgentLabel     = "control-plane=controller-manager"
+)
 
 type InstallAgentHandler struct {
@@ -36,6 +44,21 @@ func (c *InstallAgentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 	proj, _ := r.Context().Value(types.ProjectScope).(*models.Project)
 	user, _ := r.Context().Value(types.UserScope).(*models.User)
 	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
+
+	k8sAgent, err := c.GetAgent(r, cluster, "porter-agent-system")
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	err = checkAndDeleteOlderAgent(k8sAgent)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
 	helmAgent, err := c.GetHelmAgent(r, cluster, "porter-agent-system")
 
 	if err != nil {
@@ -87,6 +110,13 @@ func (c *InstallAgentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 		},
 	}
 
+	if exists, err := checkIfMonitoringNodeExists(k8sAgent); err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	} else if exists {
+		porterAgentValues["monitoringWorkload"] = true
+	}
+
 	conf := &helm.InstallChartConfig{
 		Chart:     chart,
 		Name:      "porter-agent",
@@ -100,8 +130,7 @@ func (c *InstallAgentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 
 	if err != nil {
 		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(
-			fmt.Errorf("error installing a new chart: %s", err.Error()),
-			http.StatusBadRequest,
+			fmt.Errorf("error installing porter-agent: %w", err), http.StatusBadRequest,
 		))
 
 		return
@@ -109,3 +138,61 @@ func (c *InstallAgentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 
 	w.WriteHeader(http.StatusOK)
 }
+
+// checkAndDeleteOlderAgent removes a pre-v3 porter-agent installation, if one
+// exists, by deleting the entire porter-agent-system namespace. It is a no-op
+// when the namespace is absent or contains no pods matching olderAgentLabel.
+func checkAndDeleteOlderAgent(k8sAgent *kubernetes.Agent) error {
+	namespaceList, err := k8sAgent.Clientset.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{})
+
+	if err != nil {
+		return fmt.Errorf("error listing namespaces: %w", err)
+	}
+
+	nsExists := false
+
+	for _, namespace := range namespaceList.Items {
+		if namespace.Name == "porter-agent-system" {
+			nsExists = true
+			break
+		}
+	}
+
+	if !nsExists {
+		return nil
+	}
+
+	podList, err := k8sAgent.Clientset.CoreV1().Pods("porter-agent-system").List(context.Background(), v1.ListOptions{
+		LabelSelector: olderAgentLabel,
+	})
+
+	if err != nil {
+		return fmt.Errorf("error listing pods for older porter-agent: %w", err)
+	}
+
+	if len(podList.Items) > 0 {
+		// older porter-agent exists, delete the entire namespace
+		// NOTE(review): namespace deletion is asynchronous — the namespace may
+		// still be terminating when the caller proceeds to reinstall into it.
+		// Confirm the subsequent helm install tolerates a terminating namespace.
+		err := k8sAgent.Clientset.CoreV1().Namespaces().Delete(
+			context.Background(), "porter-agent-system", v1.DeleteOptions{},
+		)
+
+		if err != nil {
+			return fmt.Errorf("error deleting older porter-agent's namespace: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// checkIfMonitoringNodeExists reports whether the cluster has at least one
+// node carrying monitoringNodeLabel; the caller uses this to decide whether
+// the agent chart is installed with the monitoring workload enabled.
+func checkIfMonitoringNodeExists(k8sAgent *kubernetes.Agent) (bool, error) {
+	nodeList, err := k8sAgent.Clientset.CoreV1().Nodes().List(context.Background(), v1.ListOptions{
+		LabelSelector: monitoringNodeLabel,
+	})
+
+	if err != nil {
+		return false, fmt.Errorf("error listing nodes: %w", err)
+	}
+
+	if len(nodeList.Items) > 0 {
+		return true, nil
+	}
+
+	return false, nil
+}

+ 1 - 0
api/server/router/cluster.go

@@ -1198,6 +1198,7 @@ func getClusterRoutes(
 				types.ProjectScope,
 				types.ClusterScope,
 			},
+			IsWebsocket: true,
 		},
 	)
 

+ 3 - 1
api/types/cluster.go

@@ -280,7 +280,9 @@ type GetIncidentsRequest struct {
 }
 
 type GetIncidentEventLogsRequest struct {
-	LogID string `schema:"log_id"`
+	// Labels are log label selectors, each in "key=value" form (validated by
+	// the handler); at least one is required.
+	Labels    []string `schema:"labels" form:"required"`
+	// StartTime is an RFC3339 timestamp marking the start of the log window.
+	StartTime string   `schema:"start_time" form:"required"`
+	// Limit optionally caps the number of returned log lines (0 means the
+	// limit flag is omitted).
+	Limit     uint32   `schema:"limit"`
 }
 
 type IncidentNotifyRequest struct {

+ 138 - 0
internal/kubernetes/agent.go

@@ -46,6 +46,7 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/remotecommand"
+	"k8s.io/kubectl/pkg/scheme"
 
 	rspb "helm.sh/helm/v3/pkg/release"
 )
@@ -1754,6 +1755,143 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 	return a.RunWebsocketTask(run)
 }
 
+// StreamPorterAgentLokiLog streams Loki log output from the porter-agent over
+// the given websocket. It execs the agent CLI inside the first pod matching
+// the porter-agent instance label in the porter-agent-system namespace and
+// pipes the exec stdout/stderr directly into the websocket writer. The call
+// returns once the client closes the socket or the exec stream fails.
+func (a *Agent) StreamPorterAgentLokiLog(
+	labels []string,
+	startTime string,
+	limit uint32,
+	rw *websocket.WebsocketSafeReadWriter,
+) error {
+	run := func() error {
+		errorchan := make(chan error)
+
+		var wg sync.WaitGroup
+		var once sync.Once
+		var err error
+
+		wg.Add(2)
+
+		// close errorchan only after both worker goroutines have finished so
+		// the drain loop below can terminate
+		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
+			defer wg.Done()
+
+			// listens for websocket closing handshake
+			for {
+				if _, _, err := rw.ReadMessage(); err != nil {
+					errorchan <- nil
+					return
+				}
+			}
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
+			defer wg.Done()
+
+			// pick a porter-agent pod to exec into (first match wins)
+			podList, err := a.Clientset.CoreV1().Pods("porter-agent-system").List(context.Background(), metav1.ListOptions{
+				LabelSelector: "app.kubernetes.io/instance=porter-agent",
+			})
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+
+			if len(podList.Items) == 0 {
+				errorchan <- fmt.Errorf("no porter agent pods found")
+				return
+			}
+
+			pod := podList.Items[0]
+
+			restConf, err := a.RESTClientGetter.ToRESTConfig()
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+
+			req := a.Clientset.CoreV1().RESTClient().
+				Post().
+				Resource("pods").
+				Name(pod.Name).
+				Namespace(pod.Namespace).
+				SubResource("exec")
+
+			// NOTE(review): with "sh -c", only the third element is parsed as
+			// the shell script; the elements appended after it become the
+			// shell's positional parameters ($0, $1, ...), so --start, --label
+			// and --limit are likely never seen by agent-cli. Confirm the
+			// intended invocation — either drop "sh"/"-c" and exec the binary
+			// directly, or fold all flags into the single -c string.
+			cmd := []string{
+				"sh",
+				"-c",
+				"/porter/agent-cli",
+				"--start",
+				startTime,
+			}
+
+			for _, label := range labels {
+				cmd = append(cmd, "--label", label)
+			}
+
+			if limit > 0 {
+				cmd = append(cmd, "--limit", fmt.Sprintf("%d", limit))
+			}
+
+			opts := &v1.PodExecOptions{
+				Command: cmd,
+				Stdout:  true,
+				Stderr:  true,
+			}
+
+			req.VersionedParams(
+				opts,
+				scheme.ParameterCodec,
+			)
+
+			exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+
+			// stream exec output straight into the websocket until EOF/error
+			err = exec.Stream(remotecommand.StreamOptions{
+				Stdin:  nil,
+				Stdout: rw,
+				Stderr: rw,
+			})
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+		}()
+
+		// close the websocket once on the first result; the LAST value drained
+		// is what gets returned.
+		// NOTE(review): a real error from the exec goroutine can be overwritten
+		// by the reader goroutine's nil send once the socket closes — consider
+		// keeping the first non-nil error instead.
+		for err = range errorchan {
+			once.Do(func() {
+				rw.Close()
+			})
+		}
+
+		return err
+	}
+
+	return a.RunWebsocketTask(run)
+}
+
 // CreateImagePullSecrets will create the required image pull secrets and
 // return a map from the registry name to the name of the secret.
 func (a *Agent) CreateImagePullSecrets(

+ 85 - 85
internal/kubernetes/porter_agent/v2/agent_server.go

@@ -23,7 +23,7 @@ func GetAgentService(clientset kubernetes.Interface) (*v1.Service, error) {
 func GetAllIncidents(
 	clientset kubernetes.Interface,
 	service *v1.Service,
-) (*IncidentsResponse, error) {
+) (*ListIncidentsResponse, error) {
 	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
 		"http",
 		service.Name,
@@ -37,7 +37,7 @@ func GetAllIncidents(
 		return nil, err
 	}
 
-	incidentsResp := &IncidentsResponse{}
+	incidentsResp := &ListIncidentsResponse{}
 
 	err = json.Unmarshal(rawQuery, incidentsResp)
 	if err != nil {
@@ -47,86 +47,86 @@ func GetAllIncidents(
 	return incidentsResp, nil
 }
 
-func GetIncidentEventsByID(
-	clientset kubernetes.Interface,
-	service *v1.Service,
-	incidentID string,
-) (*EventsResponse, error) {
-	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
-		"http",
-		service.Name,
-		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
-		fmt.Sprintf("/incidents/%s", incidentID),
-		nil,
-	)
-
-	rawQuery, err := resp.DoRaw(context.Background())
-	if err != nil {
-		return nil, err
-	}
-
-	eventsResp := &EventsResponse{}
-
-	err = json.Unmarshal(rawQuery, eventsResp)
-	if err != nil {
-		return nil, err
-	}
-
-	return eventsResp, nil
-}
-
-func GetIncidentsByReleaseNamespace(
-	clientset kubernetes.Interface,
-	service *v1.Service,
-	releaseName, namespace string,
-) (*IncidentsResponse, error) {
-	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
-		"http",
-		service.Name,
-		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
-		fmt.Sprintf("/incidents/namespaces/%s/releases/%s", namespace, releaseName),
-		nil,
-	)
-
-	rawQuery, err := resp.DoRaw(context.Background())
-	if err != nil {
-		return nil, err
-	}
-
-	incidentsResp := &IncidentsResponse{}
-
-	err = json.Unmarshal(rawQuery, incidentsResp)
-	if err != nil {
-		return nil, err
-	}
-
-	return incidentsResp, nil
-}
-
-func GetLogs(
-	clientset kubernetes.Interface,
-	service *v1.Service,
-	logID string,
-) (*LogsResponse, error) {
-	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
-		"http",
-		service.Name,
-		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
-		fmt.Sprintf("/incidents/logs/%s", logID),
-		nil,
-	)
-
-	rawQuery, err := resp.DoRaw(context.Background())
-	if err != nil {
-		return nil, err
-	}
-
-	logsResp := &LogsResponse{}
-
-	err = json.Unmarshal(rawQuery, logsResp)
-	if err != nil {
-		return nil, err
-	}
-
-	return logsResp, nil
-}
+// func GetIncidentEventsByID(
+// 	clientset kubernetes.Interface,
+// 	service *v1.Service,
+// 	incidentID string,
+// ) (*EventsResponse, error) {
+// 	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
+// 		"http",
+// 		service.Name,
+// 		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
+// 		fmt.Sprintf("/incidents/%s", incidentID),
+// 		nil,
+// 	)
+
+// 	rawQuery, err := resp.DoRaw(context.Background())
+// 	if err != nil {
+// 		return nil, err
+// 	}
+
+// 	eventsResp := &EventsResponse{}
+
+// 	err = json.Unmarshal(rawQuery, eventsResp)
+// 	if err != nil {
+// 		return nil, err
+// 	}
+
+// 	return eventsResp, nil
+// }
+
+// func GetIncidentsByReleaseNamespace(
+// 	clientset kubernetes.Interface,
+// 	service *v1.Service,
+// 	releaseName, namespace string,
+// ) (*IncidentsResponse, error) {
+// 	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
+// 		"http",
+// 		service.Name,
+// 		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
+// 		fmt.Sprintf("/incidents/namespaces/%s/releases/%s", namespace, releaseName),
+// 		nil,
+// 	)
+
+// 	rawQuery, err := resp.DoRaw(context.Background())
+// 	if err != nil {
+// 		return nil, err
+// 	}
+
+// 	incidentsResp := &IncidentsResponse{}
+
+// 	err = json.Unmarshal(rawQuery, incidentsResp)
+// 	if err != nil {
+// 		return nil, err
+// 	}
+
+// 	return incidentsResp, nil
+// }
+
+// func GetLogs(
+// 	clientset kubernetes.Interface,
+// 	service *v1.Service,
+// 	logID string,
+// ) (*LogsResponse, error) {
+// 	resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
+// 		"http",
+// 		service.Name,
+// 		fmt.Sprintf("%d", service.Spec.Ports[0].Port),
+// 		fmt.Sprintf("/incidents/logs/%s", logID),
+// 		nil,
+// 	)
+
+// 	rawQuery, err := resp.DoRaw(context.Background())
+// 	if err != nil {
+// 		return nil, err
+// 	}
+
+// 	logsResp := &LogsResponse{}
+
+// 	err = json.Unmarshal(rawQuery, logsResp)
+// 	if err != nil {
+// 		return nil, err
+// 	}
+
+// 	return logsResp, nil
+// }

+ 60 - 43
internal/kubernetes/porter_agent/v2/models.go

@@ -2,49 +2,6 @@ package v2
 
 import "time"
 
-type ContainerEvent struct {
-	Name     string `json:"container_name"`
-	Reason   string `json:"reason"`
-	Message  string `json:"message"`
-	LogID    string `json:"log_id"`
-	ExitCode int32  `json:"exit_code"`
-}
-
-type PodEvent struct {
-	EventID         string                     `json:"event_id"`
-	PodName         string                     `json:"pod_name"`
-	Namespace       string                     `json:"namespace"`
-	Cluster         string                     `json:"cluster"`
-	OwnerName       string                     `json:"release_name"`
-	OwnerType       string                     `json:"release_type"`
-	Timestamp       int64                      `json:"timestamp"`
-	Phase           string                     `json:"pod_phase"`
-	Status          string                     `json:"pod_status"`
-	Reason          string                     `json:"reason"`
-	Message         string                     `json:"message"`
-	ContainerEvents map[string]*ContainerEvent `json:"container_events"`
-}
-
-type IncidentsResponse struct {
-	Incidents []*Incident `json:"incidents" form:"required"`
-}
-
-type EventsResponse struct {
-	IncidentID    string      `json:"incident_id" form:"required"`
-	ChartName     string      `json:"chart_name"`
-	ReleaseName   string      `json:"release_name"`
-	CreatedAt     int64       `json:"created_at"`
-	UpdatedAt     int64       `json:"updated_at"`
-	LatestState   string      `json:"latest_state"`
-	LatestReason  string      `json:"latest_reason"`
-	LatestMessage string      `json:"latest_message"`
-	Events        []*PodEvent `json:"events" form:"required"`
-}
-
-type LogsResponse struct {
-	Contents string `json:"contents" form:"required"`
-}
-
 type SeverityType string
 
 const (
@@ -83,8 +40,68 @@ type IncidentMeta struct {
 	InvolvedObjectNamespace string             `json:"involved_object_namespace" form:"required"`
 }
 
+// PaginationRequest carries the requested page number for list endpoints.
+type PaginationRequest struct {
+	Page int64 `schema:"page"`
+}
+
+// PaginationResponse describes the pagination state of a list response.
+type PaginationResponse struct {
+	NumPages    int64 `json:"num_pages" form:"required"`
+	CurrentPage int64 `json:"current_page" form:"required"`
+	NextPage    int64 `json:"next_page" form:"required"`
+}
+
+// ListIncidentsRequest is the paginated, optionally filtered query for
+// listing incidents.
+type ListIncidentsRequest struct {
+	*PaginationRequest
+	Status           *IncidentStatus `schema:"status"`
+	ReleaseName      *string         `schema:"release_name"`
+	ReleaseNamespace *string         `schema:"release_namespace"`
+}
+
+// ListIncidentsResponse is one page of incident metadata.
+type ListIncidentsResponse struct {
+	Incidents  []*IncidentMeta     `json:"incidents" form:"required"`
+	Pagination *PaginationResponse `json:"pagination"`
+}
+
+// Incident extends IncidentMeta with the involved pods and a detail string.
 type Incident struct {
 	*IncidentMeta
 	Pods   []string `json:"pods" form:"required"`
 	Detail string   `json:"detail" form:"required"`
 }
+
+// IncidentEvent is a single event attached to an incident.
+type IncidentEvent struct {
+	ID           string     `json:"id" form:"required"`
+	LastSeen     *time.Time `json:"last_seen" form:"required"`
+	PodName      string     `json:"pod_name" form:"required"`
+	PodNamespace string     `json:"pod_namespace" form:"required"`
+	Summary      string     `json:"summary" form:"required"`
+	Detail       string     `json:"detail" form:"required"`
+}
+
+// ListIncidentEventsRequest is the paginated, optionally filtered query for
+// listing the events of an incident.
+type ListIncidentEventsRequest struct {
+	*PaginationRequest
+	PodName      *string `schema:"pod_name"`
+	PodNamespace *string `schema:"pod_namespace"`
+	Summary      *string `schema:"summary"`
+}
+
+// ListIncidentEventsResponse is one page of incident events.
+type ListIncidentEventsResponse struct {
+	Events     []*IncidentEvent    `json:"events" form:"required"`
+	Pagination *PaginationResponse `json:"pagination"`
+}
+
+// GetLogRequest selects a window of log lines for a set of pods.
+type GetLogRequest struct {
+	Limit      uint       `json:"limit"`
+	StartRange *time.Time `json:"start_range"`
+	EndRange   *time.Time `json:"end_range"`
+	Pods       []string   `json:"pods"`
+}
+
+// LogLine is a single timestamped log entry.
+type LogLine struct {
+	Timestamp *time.Time `json:"timestamp"`
+	Line      string     `json:"line"`
+}
+
+// GetLogResponse is a batch of log lines. ContinueTime presumably marks where
+// a follow-up request should resume — confirm against the agent API.
+type GetLogResponse struct {
+	ContinueTime *time.Time `json:"continue_time"`
+	Logs         []LogLine  `json:"logs"`
+}