package kube_events

import (
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/porter-dev/porter/api/server/authz"
	"github.com/porter-dev/porter/api/server/handlers"
	"github.com/porter-dev/porter/api/server/shared"
	"github.com/porter-dev/porter/api/server/shared/apierrors"
	"github.com/porter-dev/porter/api/server/shared/config"
	"github.com/porter-dev/porter/api/types"
	"github.com/porter-dev/porter/internal/helm/grapher"
	"github.com/porter-dev/porter/internal/integrations/slack"
	"github.com/porter-dev/porter/internal/kubernetes"
	"github.com/porter-dev/porter/internal/models"
	"gorm.io/gorm"
)
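
// CreateKubeEventHandler handles requests to record a Kubernetes event for a
// project and cluster. Incoming events are grouped with recent matching events,
// and critical pod events may additionally trigger a notification.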
type CreateKubeEventHandler struct {
	handlers.PorterHandlerReadWriter
	authz.KubernetesAgentGetter
}
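
// NewCreateKubeEventHandler returns a CreateKubeEventHandler that uses the default
// Porter handler and an out-of-cluster Kubernetes agent getter.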
func NewCreateKubeEventHandler(
	config *config.Config,
	decoderValidator shared.RequestDecoderValidator,
	writer shared.ResultWriter,
) *CreateKubeEventHandler {
	return &CreateKubeEventHandler{
		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
	}
}
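
// ServeHTTP decodes the incoming event, groups it with a recent matching event or
// creates a new one, appends a subevent, and notifies on critical pod failures.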
func (c *CreateKubeEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	proj, _ := r.Context().Value(types.ProjectScope).(*models.Project)
	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)

	request := &types.CreateKubeEventRequest{}

	if ok := c.DecodeAndValidate(w, r, request); !ok {
		return
	}

	// Look for an existing event that matches the name, namespace, and resource type
	// and was last updated within the grouping threshold. If one is found, we append a
	// subevent to it instead of creating a new event.
	kubeEvent, err := c.Repo().KubeEvent().ReadEventByGroup(proj.ID, cluster.ID, &types.GroupOptions{
		Name:          request.Name,
		Namespace:     request.Namespace,
		ResourceType:  request.ResourceType,
		ThresholdTime: time.Now().Add(-15 * time.Minute),
	})

	foundMatchedEvent := kubeEvent != nil

	if !foundMatchedEvent {
		kubeEvent, err = c.Repo().KubeEvent().CreateEvent(&models.KubeEvent{
			ProjectID:    proj.ID,
			ClusterID:    cluster.ID,
			ResourceType: request.ResourceType,
			Name:         request.Name,
			OwnerType:    request.OwnerType,
			OwnerName:    request.OwnerName,
			Namespace:    request.Namespace,
		})

		if err != nil {
			c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
			return
		}
	}

	// append the subevent to the event
	err = c.Repo().KubeEvent().AppendSubEvent(kubeEvent, &models.KubeSubEvent{
		EventType: request.EventType,
		Message:   request.Message,
		Reason:    request.Reason,
		Timestamp: request.Timestamp,
	})

	if err != nil {
		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
		return
	}

	w.WriteHeader(http.StatusCreated)

	// only notify for critical pod events; the generic catch-all message is skipped
	// because it does not identify a root cause
	if strings.ToLower(string(request.EventType)) == "critical" &&
		strings.ToLower(request.ResourceType) == "pod" &&
		request.Message != "Unable to determine the root cause of the error" {
		agent, err := c.GetAgent(r, cluster, request.Namespace)

		if err != nil {
			c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
			return
		}

		err = notifyPodCrashing(c.Config(), agent, proj, cluster, request)

		if err != nil {
			c.HandleAPIErrorNoWrite(w, r, apierrors.NewErrInternal(err))
		}
	}
}
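
// mapKubeEventToMessage translates well-known event reasons (RunContainerError,
// ImagePullBackOff) into friendlier messages; other events keep their original message.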
func mapKubeEventToMessage(event *types.CreateKubeEventRequest) string {
	if strings.HasSuffix(event.Reason, "RunContainerError") {
		// the guard matches the "exec: " separator used below so that indexing into the
		// split result cannot panic
		if strings.Contains(event.Message, "exec: ") {
			return fmt.Sprintf("Application launch error: %s\n",
				strings.Split(strings.SplitAfter(event.Message, "exec: ")[1], ": unknown")[0])
		}
	} else if strings.HasSuffix(event.Reason, "ImagePullBackOff") {
		return "Deployment error: The application image could not be pulled from the registry"
	}

	return event.Message
}
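
// notifyPodCrashing sends a Slack notification for a crashing pod, respecting the
// cluster's and the release's (or job's) notification configuration. Jobs and
// standard Porter releases are handled separately so that the notification links
// to the correct dashboard page.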
func notifyPodCrashing(
	config *config.Config,
	agent *kubernetes.Agent,
	project *models.Project,
	cluster *models.Cluster,
	event *types.CreateKubeEventRequest,
) error {
	// if cluster has notifications turned off, don't alert
	if cluster.NotificationsDisabled {
		return nil
	}

	// attempt to get a matching Porter release to get the notification configuration
	var conf *models.NotificationConfig
	var notifConfig *types.NotificationConfig
	var notifyOpts *slack.NotifyOpts
	var matchedRel *models.Release
	var err error

	if isJob := strings.ToLower(event.OwnerType) == "job"; isJob {
		// check that the job alert is valid and get proper message
		jobOwner, jobMsg, jobName, shouldAlert, err := getJobAlert(agent, event.Name, event.Namespace)

		if err != nil {
			return err
		} else if !shouldAlert {
			return nil
		}

		// look for a matching job notification config
		jobNC, err := config.Repo.JobNotificationConfig().ReadNotificationConfig(project.ID, cluster.ID, jobName, event.Namespace)

		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}

		if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
			// if the job notification config does not exist, create it
			jobNC = &models.JobNotificationConfig{
				Name:             jobName,
				Namespace:        event.Namespace,
				ProjectID:        project.ID,
				ClusterID:        cluster.ID,
				LastNotifiedTime: time.Now(),
			}

			jobNC, err = config.Repo.JobNotificationConfig().CreateNotificationConfig(jobNC)

			if err != nil {
				return err
			}
		} else if err != nil {
			return err
		} else if err == nil && jobNC != nil {
			// If the job notification config does exist, check if it states that a
			// notification should happen. If so, notify.
			if !jobNC.ShouldNotify() {
				return nil
			}
		}

		notifyOpts = &slack.NotifyOpts{
			ProjectID:   cluster.ProjectID,
			ClusterID:   cluster.ID,
			ClusterName: cluster.Name,
			Name:        jobOwner,
			Namespace:   event.Namespace,
			Info:        jobMsg,
			Timestamp:   &event.Timestamp,
			URL: fmt.Sprintf(
				"%s/jobs/%s/%s/%s?project_id=%d&job=%s",
				config.ServerConf.ServerURL,
				cluster.Name,
				event.Namespace,
				jobOwner,
				cluster.ProjectID,
				jobName,
			),
		}
	} else {
		// assign to the outer matchedRel so the notification config can be updated after notifying
		matchedRel = getMatchedPorterRelease(config, cluster.ID, event.OwnerName, event.Namespace)

		// for now, we only notify for Porter releases that have been deployed through Porter
		if matchedRel == nil {
			return nil
		}

		conf, err = config.Repo.NotificationConfig().ReadNotificationConfig(matchedRel.NotificationConfig)

		if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
			conf = &models.NotificationConfig{
				Enabled: true,
				Success: true,
				Failure: true,
			}

			conf, err = config.Repo.NotificationConfig().CreateNotificationConfig(conf)

			if err != nil {
				return err
			}

			matchedRel.NotificationConfig = conf.ID
			matchedRel, err = config.Repo.Release().UpdateRelease(matchedRel)

			if err != nil {
				return err
			}

			notifConfig = conf.ToNotificationConfigType()
		} else if err != nil {
			return err
		} else if err == nil && conf != nil {
			if !conf.ShouldNotify() {
				return nil
			}

			notifConfig = conf.ToNotificationConfigType()
		}

		notifyOpts = &slack.NotifyOpts{
			ProjectID:   cluster.ProjectID,
			ClusterID:   cluster.ID,
			ClusterName: cluster.Name,
			Name:        event.OwnerName,
			Namespace:   event.Namespace,
			Info:        mapKubeEventToMessage(event),
			URL: fmt.Sprintf(
				"%s/applications/%s/%s/%s?project_id=%d",
				config.ServerConf.ServerURL,
				url.PathEscape(cluster.Name),
				matchedRel.Namespace,
				matchedRel.Name,
				cluster.ProjectID,
			),
		}
	}

	slackInts, _ := config.Repo.SlackIntegration().ListSlackIntegrationsByProjectID(project.ID)

	notifier := slack.NewSlackNotifier(notifConfig, slackInts...)

	notifyOpts.Status = slack.StatusPodCrashed

	err = notifier.Notify(notifyOpts)

	if err != nil {
		return err
	}

	// update the last notified time on the release's notification config
	if matchedRel != nil && conf != nil {
		conf.LastNotifiedTime = time.Now()
		conf, err = config.Repo.NotificationConfig().UpdateNotificationConfig(conf)
	}

	return err
}

// getMatchedPorterRelease attempts to find a matching Porter release from the name of a controller.
// For example, if the controller has a suffix "-web", it is likely a Porter web application, and
// so we query for a Porter release with a matching name. Returns nil if no match is found.
func getMatchedPorterRelease(config *config.Config, clusterID uint, ownerName, namespace string) *models.Release {
	matchingName := ""

	if strings.Contains(ownerName, "-web") {
		matchingName = strings.Split(ownerName, "-web")[0]
	} else if strings.Contains(ownerName, "-worker") {
		matchingName = strings.Split(ownerName, "-worker")[0]
	} else if strings.Contains(ownerName, "-job") {
		matchingName = strings.Split(ownerName, "-job")[0]
	}

	rel, err := config.Repo.Release().ReadRelease(clusterID, matchingName, namespace)

	if err != nil {
		return nil
	}

	return rel
}
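
// getJobAlert determines whether a failed pod belonging to a job should trigger an
// alert. It returns the owning release name, an alert message, the job name, and a
// flag indicating whether to alert. Failures caused by node scale-down or pod
// rescheduling are ignored, as are pods that started more than 24 hours ago.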
func getJobAlert(agent *kubernetes.Agent, name, namespace string) (
	ownerName string,
	msg string,
	jobName string,
	shouldAlert bool,
	err error,
) {
	pod, err := agent.GetPodByName(name, namespace)

	// if the pod is not found, we should not alert for this pod
	if err != nil && errors.Is(err, kubernetes.IsNotFoundError) {
		return "", "", "", false, nil
	} else if err != nil {
		return "", "", "", false, err
	}

	ownerJobName := ""

	// get the owning job for the pod by looking at its owner references
	if ownerRefArr := pod.ObjectMeta.OwnerReferences; len(ownerRefArr) > 0 {
		for _, ownerRef := range ownerRefArr {
			if strings.ToLower(ownerRef.Kind) == "job" {
				ownerJobName = ownerRef.Name
			}
		}
	}

	if ownerJobName == "" {
		return "", "", "", false, nil
	}

	// look up the job in the cluster
	job, err := agent.GetJob(grapher.Object{
		Kind:      "Job",
		Name:      ownerJobName,
		Namespace: namespace,
	})

	if err != nil {
		return "", "", "", false, nil
	}

	if jobReleaseLabel, exists := job.ObjectMeta.Labels["meta.helm.sh/release-name"]; exists {
		ownerName = jobReleaseLabel
	}

	// if we don't have an owner name, don't alert -- the link will be broken
	if ownerName == "" {
		return "", "", "", false, nil
	}

	// only alert for pods that started within the last 24 hours
	if podTime := pod.Status.StartTime; podTime != nil && podTime.After(time.Now().Add(-24*time.Hour)) {
		// find container statuses relating to the actual job container; we don't alert on sidecar containers
		for _, containerStatus := range pod.Status.ContainerStatuses {
			if containerStatus.Name != "sidecar" && containerStatus.Name != "cloud-sql-proxy" {
				state := containerStatus.State

				if state.Terminated != nil && state.Terminated.ExitCode != 0 {
					// before alerting, check the pod events to make sure the pod was not moved due to
					// normal behavior such as a scale-down
					events, err := agent.ListEvents(name, namespace)

					if err == nil && len(events.Items) > 0 {
						for _, event := range events.Items {
							// if the event is a ScaleDown, don't alert
							if event.Reason == "ScaleDown" && strings.Contains(event.Message, "deleting pod for node scale down") {
								return ownerName, "", ownerJobName, false, nil
							}
						}
					}

					// next, if the exit code is 255, check that the job doesn't have a different associated pod.
					// exit code 255 can mean this pod was moved to a different node due to node eviction, scale-down,
					// an unhealthy node, etc.
					if state.Terminated.ExitCode == 255 {
						jobPods, err := agent.GetJobPods(namespace, ownerJobName)

						if err == nil && len(jobPods) > 0 {
							for _, jobPod := range jobPods {
								if jobPod.ObjectMeta.Name != name {
									return ownerName, "", ownerJobName, false, nil
								}
							}
						}
					}

					msg := fmt.Sprintf("Job terminated with non-zero exit code: exit code %d.", state.Terminated.ExitCode)

					if state.Terminated.Message != "" {
						msg += fmt.Sprintf(" Error: %s", state.Terminated.Message)
					}

					return ownerName, msg, ownerJobName, true, nil
				}
			}
		}
	}

	return "", "", "", false, nil
}