create.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package kube_events
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "strings"
  8. "time"
  9. "github.com/porter-dev/porter/api/server/authz"
  10. "github.com/porter-dev/porter/api/server/handlers"
  11. "github.com/porter-dev/porter/api/server/shared"
  12. "github.com/porter-dev/porter/api/server/shared/apierrors"
  13. "github.com/porter-dev/porter/api/server/shared/config"
  14. "github.com/porter-dev/porter/api/types"
  15. "github.com/porter-dev/porter/internal/integrations/slack"
  16. "github.com/porter-dev/porter/internal/models"
  17. "gorm.io/gorm"
  18. )
  19. type CreateKubeEventHandler struct {
  20. handlers.PorterHandlerReadWriter
  21. authz.KubernetesAgentGetter
  22. }
  23. func NewCreateKubeEventHandler(
  24. config *config.Config,
  25. decoderValidator shared.RequestDecoderValidator,
  26. writer shared.ResultWriter,
  27. ) *CreateKubeEventHandler {
  28. return &CreateKubeEventHandler{
  29. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  30. KubernetesAgentGetter: authz.NewOutOfClusterAgentGetter(config),
  31. }
  32. }
  33. func (c *CreateKubeEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  34. proj, _ := r.Context().Value(types.ProjectScope).(*models.Project)
  35. cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
  36. request := &types.CreateKubeEventRequest{}
  37. if ok := c.DecodeAndValidate(w, r, request); !ok {
  38. return
  39. }
  40. // Look for an event matching by the name, namespace, and was last updated within the
  41. // grouping threshold time. If so, we append a subevent to the existing event.
  42. kubeEvent, err := c.Repo().KubeEvent().ReadEventByGroup(proj.ID, cluster.ID, &types.GroupOptions{
  43. Name: request.Name,
  44. Namespace: request.Namespace,
  45. ResourceType: request.ResourceType,
  46. ThresholdTime: time.Now().Add(-15 * time.Minute),
  47. })
  48. foundMatchedEvent := kubeEvent != nil
  49. if !foundMatchedEvent {
  50. kubeEvent, err = c.Repo().KubeEvent().CreateEvent(&models.KubeEvent{
  51. ProjectID: proj.ID,
  52. ClusterID: cluster.ID,
  53. ResourceType: request.ResourceType,
  54. Name: request.Name,
  55. OwnerType: request.OwnerType,
  56. OwnerName: request.OwnerName,
  57. Namespace: request.Namespace,
  58. })
  59. if err != nil {
  60. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  61. return
  62. }
  63. }
  64. // append the subevent to the event
  65. err = c.Repo().KubeEvent().AppendSubEvent(kubeEvent, &models.KubeSubEvent{
  66. EventType: request.EventType,
  67. Message: request.Message,
  68. Reason: request.Reason,
  69. Timestamp: request.Timestamp,
  70. })
  71. if err != nil {
  72. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  73. return
  74. }
  75. w.WriteHeader(http.StatusCreated)
  76. if strings.ToLower(string(request.EventType)) == "critical" && strings.ToLower(request.ResourceType) == "pod" {
  77. err := notifyPodCrashing(c.Config(), proj, cluster, request)
  78. if err != nil {
  79. c.HandleAPIErrorNoWrite(w, r, apierrors.NewErrInternal(err))
  80. }
  81. }
  82. }
  83. func notifyPodCrashing(
  84. config *config.Config,
  85. project *models.Project,
  86. cluster *models.Cluster,
  87. event *types.CreateKubeEventRequest,
  88. ) error {
  89. // attempt to get a matching Porter release to get the notification configuration
  90. var conf *models.NotificationConfig
  91. var notifConfig *types.NotificationConfig
  92. var err error
  93. matchedRel := getMatchedPorterRelease(config, cluster.ID, event.OwnerName, event.Namespace)
  94. // for now, we only notify for Porter releases that have been deployed through Porter
  95. if matchedRel == nil {
  96. return nil
  97. }
  98. conf, err = config.Repo.NotificationConfig().ReadNotificationConfig(matchedRel.NotificationConfig)
  99. if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
  100. conf = &models.NotificationConfig{
  101. Enabled: true,
  102. Success: true,
  103. Failure: true,
  104. }
  105. conf, err = config.Repo.NotificationConfig().CreateNotificationConfig(conf)
  106. if err != nil {
  107. return err
  108. }
  109. if err != nil {
  110. return err
  111. }
  112. matchedRel.NotificationConfig = conf.ID
  113. matchedRel, err = config.Repo.Release().UpdateRelease(matchedRel)
  114. if err != nil {
  115. return err
  116. }
  117. notifConfig = conf.ToNotificationConfigType()
  118. } else if err != nil {
  119. return err
  120. } else if err == nil && conf != nil {
  121. if !conf.ShouldNotify() {
  122. return nil
  123. }
  124. notifConfig = conf.ToNotificationConfigType()
  125. }
  126. slackInts, _ := config.Repo.SlackIntegration().ListSlackIntegrationsByProjectID(project.ID)
  127. notifier := slack.NewSlackNotifier(notifConfig, slackInts...)
  128. notifyOpts := &slack.NotifyOpts{
  129. ProjectID: cluster.ProjectID,
  130. ClusterID: cluster.ID,
  131. ClusterName: cluster.Name,
  132. Name: event.OwnerName,
  133. Namespace: event.Namespace,
  134. Info: fmt.Sprintf("%s:%s", event.Reason, event.Message),
  135. URL: fmt.Sprintf(
  136. "%s/applications/%s/%s/%s?project_id=%d",
  137. config.ServerConf.ServerURL,
  138. url.PathEscape(cluster.Name),
  139. matchedRel.Namespace,
  140. matchedRel.Name,
  141. cluster.ProjectID,
  142. ),
  143. }
  144. notifyOpts.Status = slack.StatusPodCrashed
  145. err = notifier.Notify(notifyOpts)
  146. if err != nil {
  147. return err
  148. }
  149. // update the last updated time
  150. if matchedRel != nil && conf != nil {
  151. conf.LastNotifiedTime = time.Now()
  152. conf, err = config.Repo.NotificationConfig().UpdateNotificationConfig(conf)
  153. }
  154. return err
  155. }
  156. // getMatchedPorterRelease attempts to find a matching Porter release from the name of a controller.
  157. // For example, if the controller has a suffix "-web", it is likely a Porter web application, and
  158. // so we query for a Porter release with a matching name. Returns nil if no match is found
  159. func getMatchedPorterRelease(config *config.Config, clusterID uint, ownerName, namespace string) *models.Release {
  160. matchingName := ""
  161. if strings.Contains(ownerName, "-web") {
  162. matchingName = strings.Split(ownerName, "-web")[0]
  163. } else if strings.Contains(ownerName, "-worker") {
  164. matchingName = strings.Split(ownerName, "-worker")[0]
  165. } else if strings.Contains(ownerName, "-job") {
  166. matchingName = strings.Split(ownerName, "-job")[0]
  167. }
  168. rel, err := config.Repo.Release().ReadRelease(clusterID, matchingName, namespace)
  169. if err != nil {
  170. return nil
  171. }
  172. return rel
  173. }