create.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package kube_events
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "strings"
  8. "time"
  9. "github.com/porter-dev/porter/api/server/authz"
  10. "github.com/porter-dev/porter/api/server/handlers"
  11. "github.com/porter-dev/porter/api/server/shared"
  12. "github.com/porter-dev/porter/api/server/shared/apierrors"
  13. "github.com/porter-dev/porter/api/server/shared/config"
  14. "github.com/porter-dev/porter/api/types"
  15. "github.com/porter-dev/porter/internal/integrations/slack"
  16. "github.com/porter-dev/porter/internal/models"
  17. "gorm.io/gorm"
  18. )
  19. type CreateKubeEventHandler struct {
  20. handlers.PorterHandlerReadWriter
  21. authz.KubernetesAgentGetter
  22. }
  23. func NewCreateKubeEventHandler(
  24. config *config.Config,
  25. decoderValidator shared.RequestDecoderValidator,
  26. writer shared.ResultWriter,
  27. ) *CreateKubeEventHandler {
  28. return &CreateKubeEventHandler{
  29. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  30. KubernetesAgentGetter: authz.NewOutOfClusterAgentGetter(config),
  31. }
  32. }
  33. func (c *CreateKubeEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  34. proj, _ := r.Context().Value(types.ProjectScope).(*models.Project)
  35. cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
  36. request := &types.CreateKubeEventRequest{}
  37. if ok := c.DecodeAndValidate(w, r, request); !ok {
  38. return
  39. }
  40. // Look for an event matching by the name, namespace, and was last updated within the
  41. // grouping threshold time. If so, we append a subevent to the existing event.
  42. kubeEvent, err := c.Repo().KubeEvent().ReadEventByGroup(proj.ID, cluster.ID, &types.GroupOptions{
  43. Name: request.Name,
  44. Namespace: request.Namespace,
  45. ResourceType: request.ResourceType,
  46. ThresholdTime: time.Now().Add(-15 * time.Minute),
  47. })
  48. foundMatchedEvent := kubeEvent != nil
  49. if !foundMatchedEvent {
  50. kubeEvent, err = c.Repo().KubeEvent().CreateEvent(&models.KubeEvent{
  51. ProjectID: proj.ID,
  52. ClusterID: cluster.ID,
  53. ResourceType: request.ResourceType,
  54. Name: request.Name,
  55. OwnerType: request.OwnerType,
  56. OwnerName: request.OwnerName,
  57. Namespace: request.Namespace,
  58. })
  59. if err != nil {
  60. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  61. return
  62. }
  63. }
  64. // append the subevent to the event
  65. err = c.Repo().KubeEvent().AppendSubEvent(kubeEvent, &models.KubeSubEvent{
  66. EventType: request.EventType,
  67. Message: request.Message,
  68. Reason: request.Reason,
  69. Timestamp: request.Timestamp,
  70. })
  71. if err != nil {
  72. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  73. return
  74. }
  75. w.WriteHeader(http.StatusCreated)
  76. if strings.ToLower(string(request.EventType)) == "critical" && strings.ToLower(request.ResourceType) == "pod" {
  77. err := notifyPodCrashing(c.Config(), proj, cluster, request)
  78. if err != nil {
  79. c.HandleAPIErrorNoWrite(w, r, apierrors.NewErrInternal(err))
  80. }
  81. }
  82. }
  83. func notifyPodCrashing(
  84. config *config.Config,
  85. project *models.Project,
  86. cluster *models.Cluster,
  87. event *types.CreateKubeEventRequest,
  88. ) error {
  89. // attempt to get a matching Porter release to get the notification configuration
  90. var conf *models.NotificationConfig
  91. var notifConfig *types.NotificationConfig
  92. var err error
  93. matchedRel := getMatchedPorterRelease(config, cluster.ID, event.OwnerName, event.Namespace)
  94. // for now, we only notify for Porter releases that have been deployed through Porter
  95. if matchedRel == nil {
  96. return nil
  97. }
  98. conf, err = config.Repo.NotificationConfig().ReadNotificationConfig(matchedRel.NotificationConfig)
  99. if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
  100. conf = &models.NotificationConfig{
  101. Enabled: true,
  102. Success: true,
  103. Failure: true,
  104. }
  105. conf, err = config.Repo.NotificationConfig().CreateNotificationConfig(conf)
  106. if err == nil {
  107. notifConfig = conf.ToNotificationConfigType()
  108. }
  109. } else if err != nil {
  110. return err
  111. } else if err == nil && conf != nil {
  112. if !conf.ShouldNotify() {
  113. return nil
  114. }
  115. notifConfig = conf.ToNotificationConfigType()
  116. }
  117. slackInts, _ := config.Repo.SlackIntegration().ListSlackIntegrationsByProjectID(project.ID)
  118. notifier := slack.NewSlackNotifier(notifConfig, slackInts...)
  119. notifyOpts := &slack.NotifyOpts{
  120. ProjectID: cluster.ProjectID,
  121. ClusterID: cluster.ID,
  122. ClusterName: cluster.Name,
  123. Name: event.OwnerName,
  124. Namespace: event.Namespace,
  125. Info: fmt.Sprintf("%s:%s", event.Reason, event.Message),
  126. URL: fmt.Sprintf(
  127. "%s/applications/%s/%s/%s?project_id=%d",
  128. config.ServerConf.ServerURL,
  129. url.PathEscape(cluster.Name),
  130. matchedRel.Namespace,
  131. matchedRel.Name,
  132. cluster.ProjectID,
  133. ),
  134. }
  135. notifyOpts.Status = slack.StatusPodCrashed
  136. err = notifier.Notify(notifyOpts)
  137. if err != nil {
  138. return err
  139. }
  140. // update the last updated time
  141. if matchedRel != nil && conf != nil {
  142. conf.LastNotifiedTime = time.Now()
  143. conf, err = config.Repo.NotificationConfig().UpdateNotificationConfig(conf)
  144. }
  145. return err
  146. }
  147. // getMatchedPorterRelease attempts to find a matching Porter release from the name of a controller.
  148. // For example, if the controller has a suffix "-web", it is likely a Porter web application, and
  149. // so we query for a Porter release with a matching name. Returns nil if no match is found
  150. func getMatchedPorterRelease(config *config.Config, clusterID uint, ownerName, namespace string) *models.Release {
  151. matchingName := ""
  152. if strings.Contains(ownerName, "-web") {
  153. matchingName = strings.Split(ownerName, "-web")[0]
  154. } else if strings.Contains(ownerName, "-worker") {
  155. matchingName = strings.Split(ownerName, "-worker")[0]
  156. } else if strings.Contains(ownerName, "-job") {
  157. matchingName = strings.Split(ownerName, "-job")[0]
  158. }
  159. rel, err := config.Repo.Release().ReadRelease(clusterID, matchingName, namespace)
  160. if err != nil {
  161. return nil
  162. }
  163. return rel
  164. }