2
0

update_image_batch.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package release
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strings"
  6. "sync"
  7. "github.com/porter-dev/porter/internal/telemetry"
  8. "github.com/porter-dev/porter/api/server/authz"
  9. "github.com/porter-dev/porter/api/server/handlers"
  10. "github.com/porter-dev/porter/api/server/shared"
  11. "github.com/porter-dev/porter/api/server/shared/apierrors"
  12. "github.com/porter-dev/porter/api/server/shared/config"
  13. "github.com/porter-dev/porter/api/types"
  14. "github.com/porter-dev/porter/internal/helm"
  15. "github.com/porter-dev/porter/internal/models"
  16. )
  17. type UpdateImageBatchHandler struct {
  18. handlers.PorterHandlerReadWriter
  19. authz.KubernetesAgentGetter
  20. }
  21. func NewUpdateImageBatchHandler(
  22. config *config.Config,
  23. decoderValidator shared.RequestDecoderValidator,
  24. writer shared.ResultWriter,
  25. ) *UpdateImageBatchHandler {
  26. return &UpdateImageBatchHandler{
  27. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  28. KubernetesAgentGetter: authz.NewOutOfClusterAgentGetter(config),
  29. }
  30. }
  31. func (c *UpdateImageBatchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  32. ctx, span := telemetry.NewSpan(r.Context(), "serve-update-image-batch")
  33. defer span.End()
  34. r = r.Clone(ctx)
  35. cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
  36. // helmAgent has namespace set from the request
  37. helmAgent, err := c.GetHelmAgent(ctx, r, cluster, "")
  38. if err != nil {
  39. err = telemetry.Error(ctx, span, err, "error getting helm agent")
  40. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  41. return
  42. }
  43. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "namespace", Value: helmAgent.Namespace()})
  44. request := &types.UpdateImageBatchRequest{}
  45. if ok := c.DecodeAndValidate(w, r, request); !ok {
  46. _ = telemetry.Error(ctx, span, nil, "error decoding and validating request")
  47. return
  48. }
  49. releases, err := c.Repo().Release().ListReleasesByImageRepoURI(cluster.ID, request.ImageRepoURI)
  50. if err != nil {
  51. _ = telemetry.Error(ctx, span, err, "error listing releases by image repo uri")
  52. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(
  53. fmt.Errorf("releases not found with given image repo uri"),
  54. http.StatusBadRequest,
  55. ))
  56. return
  57. }
  58. printReleases := func(releases []*models.Release) string {
  59. var names []string
  60. for _, release := range releases {
  61. names = append(names, fmt.Sprintf("%s-%s", release.Name, release.Namespace))
  62. }
  63. return strings.Join(names, ",")
  64. }
  65. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "releases", Value: printReleases(releases)})
  66. var namespaceScopedReleases []*models.Release
  67. for _, release := range releases {
  68. if release.Namespace == helmAgent.Namespace() {
  69. namespaceScopedReleases = append(namespaceScopedReleases, release)
  70. }
  71. }
  72. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "releases-in-namespace", Value: printReleases(namespaceScopedReleases)})
  73. registries, err := c.Repo().Registry().ListRegistriesByProjectID(cluster.ProjectID)
  74. if err != nil {
  75. err = telemetry.Error(ctx, span, err, "error listing registries by project id")
  76. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  77. return
  78. }
  79. // asynchronously update releases with that image repo uri
  80. var wg sync.WaitGroup
  81. mu := &sync.Mutex{}
  82. errs := make([]string, 0)
  83. for i, rel := range namespaceScopedReleases {
  84. wg.Add(1)
  85. go func(release *models.Release, i int) {
  86. defer wg.Done()
  87. ctx, span := telemetry.NewSpan(ctx, "update-image-batch")
  88. defer span.End()
  89. telemetry.WithAttributes(span,
  90. telemetry.AttributeKV{Key: "release-name", Value: release.Name},
  91. telemetry.AttributeKV{Key: "release-index", Value: i},
  92. )
  93. // read release via agent
  94. rel, err := helmAgent.GetRelease(ctx, release.Name, 0, false)
  95. if err != nil {
  96. err = telemetry.Error(ctx, span, err, "error getting release")
  97. // if this is a release not found error, just return - the release has likely been deleted from the underlying
  98. // cluster but has not been deleted from the Porter database yet
  99. if strings.Contains(err.Error(), "release: not found") {
  100. return
  101. }
  102. mu.Lock()
  103. errs = append(errs, fmt.Sprintf("Error for %s, index %d: %s", release.Name, i, err.Error()))
  104. mu.Unlock()
  105. return
  106. }
  107. if rel.Chart.Name() == "job" {
  108. image := map[string]interface{}{}
  109. image["repository"] = release.ImageRepoURI
  110. image["tag"] = request.Tag
  111. rel.Config["image"] = image
  112. rel.Config["paused"] = true
  113. conf := &helm.UpgradeReleaseConfig{
  114. Name: release.Name,
  115. Cluster: cluster,
  116. Repo: c.Repo(),
  117. Registries: registries,
  118. Values: rel.Config,
  119. }
  120. _, err = helmAgent.UpgradeReleaseByValues(ctx, conf, c.Config().DOConf, c.Config().ServerConf.DisablePullSecretsInjection, false)
  121. if err != nil {
  122. err = telemetry.Error(ctx, span, err, "error upgrading release by values")
  123. // if this is a release not found error, just return - the release has likely been deleted from the underlying
  124. // cluster in the time since we've read the release, but has not been deleted from the Porter database yet
  125. if strings.Contains(err.Error(), "release: not found") {
  126. return
  127. }
  128. mu.Lock()
  129. errs = append(errs, fmt.Sprintf("Error for %s, index %d: %s", release.Name, i, err.Error()))
  130. mu.Unlock()
  131. }
  132. }
  133. }(rel, i)
  134. }
  135. wg.Wait()
  136. if len(errs) > 0 {
  137. err = fmt.Errorf("errors while deploying: %s", strings.Join(errs, ","))
  138. err = telemetry.Error(ctx, span, err, "errors while deploying")
  139. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  140. return
  141. }
  142. }