create_stacks_env_group.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587
  1. package namespace
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "sigs.k8s.io/yaml"
  10. "github.com/google/uuid"
  11. "github.com/stefanmcshane/helm/pkg/release"
  12. "github.com/porter-dev/porter/api/server/authz"
  13. "github.com/porter-dev/porter/api/server/handlers"
  14. "github.com/porter-dev/porter/api/server/shared"
  15. "github.com/porter-dev/porter/api/server/shared/apierrors"
  16. "github.com/porter-dev/porter/api/server/shared/config"
  17. "github.com/porter-dev/porter/api/types"
  18. "github.com/porter-dev/porter/internal/helm"
  19. "github.com/porter-dev/porter/internal/helm/loader"
  20. "github.com/porter-dev/porter/internal/kubernetes"
  21. "github.com/porter-dev/porter/internal/kubernetes/envgroup"
  22. "github.com/porter-dev/porter/internal/models"
  23. "github.com/porter-dev/porter/internal/repository"
  24. "github.com/porter-dev/porter/internal/telemetry"
  25. "github.com/stefanmcshane/helm/pkg/chart"
  26. )
  27. type CreateStacksEnvGroupHandler struct {
  28. handlers.PorterHandlerReadWriter
  29. authz.KubernetesAgentGetter
  30. }
  31. func NewCreateStacksEnvGroupHandler(
  32. config *config.Config,
  33. decoderValidator shared.RequestDecoderValidator,
  34. writer shared.ResultWriter,
  35. ) *CreateStacksEnvGroupHandler {
  36. return &CreateStacksEnvGroupHandler{
  37. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  38. KubernetesAgentGetter: authz.NewOutOfClusterAgentGetter(config),
  39. }
  40. }
  41. func (c *CreateStacksEnvGroupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  42. request := &types.CreateStacksEnvGroupRequest{}
  43. ctx := r.Context()
  44. cluster, _ := ctx.Value(types.ClusterScope).(*models.Cluster)
  45. ctx, span := telemetry.NewSpan(ctx, "serve-create-env-group-stacks")
  46. defer span.End()
  47. if ok := c.DecodeAndValidate(w, r, request); !ok {
  48. return
  49. }
  50. namespace := ctx.Value(types.NamespaceScope).(string)
  51. agent, err := c.GetAgent(r, cluster, namespace)
  52. if err != nil {
  53. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting agent"))
  54. return
  55. }
  56. // if the environment group exists and has MetaVersion=1, throw an error
  57. aggregateReleases := []*release.Release{}
  58. for i := range request.Apps {
  59. namespaceStack := "porter-stack-" + request.Apps[i]
  60. helmAgent, err := c.GetHelmAgent(ctx, r, cluster, namespaceStack)
  61. if err != nil {
  62. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting agent"))
  63. return
  64. }
  65. releases, err := envgroup.GetStackSyncedReleases(helmAgent, namespaceStack)
  66. if err != nil {
  67. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting releases"))
  68. return
  69. }
  70. aggregateReleases = append(aggregateReleases, releases...)
  71. }
  72. errors := rolloutStacksApplications(c, c.Config(), cluster, request.Name, namespace, agent, aggregateReleases, r, ctx, w)
  73. if len(errors) > 0 {
  74. errStrArr := make([]string, 0)
  75. for _, err := range errors {
  76. errStrArr = append(errStrArr, err.Error())
  77. }
  78. c.HandleAPIErrorNoWrite(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting adding env group"))
  79. return
  80. }
  81. c.WriteResult(w, r, nil)
  82. }
  83. func rolloutStacksApplications(
  84. c *CreateStacksEnvGroupHandler,
  85. config *config.Config,
  86. cluster *models.Cluster,
  87. envGroupName string,
  88. namespace string,
  89. agent *kubernetes.Agent,
  90. releases []*release.Release,
  91. r *http.Request,
  92. ctx context.Context,
  93. w http.ResponseWriter,
  94. ) []error {
  95. registries, err := config.Repo.Registry().ListRegistriesByProjectID(cluster.ProjectID)
  96. if err != nil {
  97. return []error{err}
  98. }
  99. ctx, span := telemetry.NewSpan(ctx, "rollout-env-group-stacks")
  100. // asynchronously update releases with that image repo uri
  101. var wg sync.WaitGroup
  102. mu := &sync.Mutex{}
  103. errors := make([]error, 0)
  104. for i, rel := range releases {
  105. index := i
  106. release := rel
  107. wg.Add(1)
  108. suffix := "-r"
  109. releaseName := release.Name
  110. if strings.HasSuffix(release.Name, suffix) {
  111. releaseName = strings.TrimSuffix(releaseName, suffix)
  112. }
  113. cm, _, err := agent.GetLatestVersionedConfigMap(envGroupName, "porter-stack-"+releaseName)
  114. if err != nil {
  115. return []error{err}
  116. }
  117. versionStr, ok := cm.ObjectMeta.Labels["version"]
  118. if !ok {
  119. return []error{err}
  120. }
  121. versionInt, err := strconv.Atoi(versionStr)
  122. if err != nil {
  123. return []error{err}
  124. }
  125. version := uint(versionInt)
  126. newSection := &SyncedEnvSection{
  127. Name: envGroupName,
  128. Version: version,
  129. }
  130. newSectionKeys := make([]SyncedEnvSectionKey, 0)
  131. for key, val := range cm.Data {
  132. newSectionKeys = append(newSectionKeys, SyncedEnvSectionKey{
  133. Name: key,
  134. Secret: strings.Contains(val, "PORTERSECRET"),
  135. })
  136. }
  137. newSection.Keys = newSectionKeys
  138. go func() {
  139. defer wg.Done()
  140. // read release via agent
  141. newConfig, err := getNewStacksConfig(release.Config, newSection, release)
  142. if err != nil {
  143. mu.Lock()
  144. errors = append(errors, err)
  145. mu.Unlock()
  146. return
  147. }
  148. // if this is a job chart, update the config and set correct paused param to true
  149. if release.Chart.Name() == "job" {
  150. newConfig["paused"] = true
  151. }
  152. if !strings.HasSuffix(release.Name, suffix) {
  153. if req := releases[index].Chart.Metadata.Dependencies; req != nil {
  154. for _, dep := range req {
  155. dep.Name = getType(dep.Name)
  156. }
  157. }
  158. metadata := &chart.Metadata{
  159. Name: "umbrella",
  160. Description: "Web application that is exposed to external traffic.",
  161. Version: "0.96.0",
  162. APIVersion: "v2",
  163. Home: "https://getporter.dev/",
  164. Icon: "https://user-images.githubusercontent.com/65516095/111255214-07d3da80-85ed-11eb-99e2-fddcbdb99bdb.png",
  165. Keywords: []string{
  166. "porter",
  167. "application",
  168. "service",
  169. "umbrella",
  170. },
  171. Type: "application",
  172. Dependencies: releases[index].Chart.Metadata.Dependencies,
  173. }
  174. charter := &chart.Chart{
  175. Metadata: metadata,
  176. }
  177. conf := &helm.InstallChartConfig{
  178. Chart: charter,
  179. Name: releases[index].Name,
  180. Namespace: "porter-stack-" + releases[index].Name,
  181. Values: newConfig,
  182. Cluster: cluster,
  183. Repo: config.Repo,
  184. Registries: registries,
  185. }
  186. helmAgent, err := c.GetHelmAgent(ctx, r, cluster, "porter-stack-"+releases[index].Name)
  187. if err != nil {
  188. fmt.Println("Could Not Get Helm Agent ")
  189. return
  190. }
  191. _, err = helmAgent.UpgradeInstallChart(ctx, conf, config.DOConf, config.ServerConf.DisablePullSecretsInjection)
  192. if err != nil {
  193. mu.Lock()
  194. errors = append(errors, err)
  195. mu.Unlock()
  196. return
  197. }
  198. } else {
  199. helmAgent, err := c.GetHelmAgent(ctx, r, cluster, "porter-stack-"+releaseName)
  200. if err != nil {
  201. fmt.Println("Could Not Get Helm Agent ")
  202. return
  203. }
  204. helmRelease, err := helmAgent.GetRelease(ctx, rel.Name, 0, false)
  205. if err != nil {
  206. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "creating-pre-deploy-job", Value: true})
  207. conf, err := createReleaseJobChart(
  208. ctx,
  209. releaseName,
  210. newConfig,
  211. c.Config().ServerConf.DefaultApplicationHelmRepoURL,
  212. registries,
  213. cluster,
  214. c.Repo(),
  215. )
  216. if err != nil {
  217. err = telemetry.Error(ctx, span, err, "error making config for pre-deploy job chart")
  218. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  219. return
  220. }
  221. _, err = helmAgent.InstallChart(ctx, conf, c.Config().DOConf, c.Config().ServerConf.DisablePullSecretsInjection)
  222. if err != nil {
  223. err = telemetry.Error(ctx, span, err, "error installing pre-deploy job chart")
  224. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  225. _, uninstallChartErr := helmAgent.UninstallChart(ctx, fmt.Sprintf("%s-r", releaseName))
  226. if uninstallChartErr != nil {
  227. uninstallChartErr = telemetry.Error(ctx, span, err, "error uninstalling pre-deploy job chart after failed install")
  228. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(uninstallChartErr, http.StatusInternalServerError))
  229. }
  230. return
  231. }
  232. } else {
  233. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "updating-pre-deploy-job", Value: true})
  234. chart, err := loader.LoadChartPublic(ctx, c.Config().Metadata.DefaultAppHelmRepoURL, "job", "")
  235. if err != nil {
  236. err = telemetry.Error(ctx, span, err, "error loading latest job chart")
  237. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  238. return
  239. }
  240. conf := &helm.UpgradeReleaseConfig{
  241. Name: helmRelease.Name,
  242. Cluster: cluster,
  243. Repo: c.Repo(),
  244. Registries: registries,
  245. Values: newConfig,
  246. Chart: chart,
  247. }
  248. _, err = helmAgent.UpgradeReleaseByValues(ctx, conf, c.Config().DOConf, c.Config().ServerConf.DisablePullSecretsInjection, false)
  249. if err != nil {
  250. err = telemetry.Error(ctx, span, err, "error upgrading pre-deploy job chart")
  251. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  252. return
  253. }
  254. }
  255. }
  256. }()
  257. app, err := c.Repo().PorterApp().ReadPorterAppByName(cluster.ID, releases[index].Name)
  258. ctx, span := telemetry.NewSpan(ctx, "serve-update-porter-app")
  259. updatedPorterApp, err := c.Repo().PorterApp().UpdatePorterApp(app)
  260. if err != nil {
  261. err = telemetry.Error(ctx, span, err, "error writing updated app to DB")
  262. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  263. return []error{err}
  264. }
  265. imageInfo := attemptToGetImageInfoFromRelease(releases[i].Config)
  266. _, err = createPorterAppEvent(ctx, "SUCCESS", updatedPorterApp.ID, releases[i].Version+1, imageInfo.Tag, c.Repo().PorterAppEvent())
  267. if err != nil {
  268. err = telemetry.Error(ctx, span, err, "error creating porter app event")
  269. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  270. return []error{err}
  271. }
  272. }
  273. wg.Wait()
  274. return errors
  275. }
  276. func getNewStacksConfig(curr map[string]interface{}, syncedEnvSection *SyncedEnvSection, release *release.Release) (map[string]interface{}, error) {
  277. // look for container.env.synced
  278. aggEnvConf := make(map[string]interface{})
  279. for _, dep := range release.Chart.Metadata.Dependencies {
  280. envConf, err := getStacksNestedMap(curr, dep.Name, "container", "env")
  281. normalKeys, ok := envConf["normal"].(map[string]interface{})
  282. if !ok {
  283. fmt.Println("Normal Keys", normalKeys)
  284. }
  285. if err != nil {
  286. return nil, err
  287. }
  288. for k, v := range envConf {
  289. aggEnvConf[k] = v
  290. }
  291. syncedEnvInter, syncedEnvExists := envConf["synced"]
  292. if !syncedEnvExists {
  293. return curr, nil
  294. } else {
  295. syncedArr := make([]*SyncedEnvSection, 0)
  296. syncedArrInter, ok := syncedEnvInter.([]interface{})
  297. if !ok {
  298. return nil, fmt.Errorf("could not convert to synced env section: not an array")
  299. }
  300. for _, syncedArrInterObj := range syncedArrInter {
  301. syncedArrObj := &SyncedEnvSection{}
  302. syncedArrInterObjMap, ok := syncedArrInterObj.(map[string]interface{})
  303. if !ok {
  304. continue
  305. }
  306. if nameField, nameFieldExists := syncedArrInterObjMap["name"]; nameFieldExists {
  307. syncedArrObj.Name, ok = nameField.(string)
  308. if !ok {
  309. continue
  310. }
  311. }
  312. if versionField, versionFieldExists := syncedArrInterObjMap["version"]; versionFieldExists {
  313. versionFloat, ok := versionField.(float64)
  314. if !ok {
  315. continue
  316. }
  317. syncedArrObj.Version = uint(versionFloat)
  318. }
  319. if keyField, keyFieldExists := syncedArrInterObjMap["keys"]; keyFieldExists {
  320. keyFieldInterArr, ok := keyField.([]interface{})
  321. if !ok {
  322. continue
  323. }
  324. keyFieldMapArr := make([]map[string]interface{}, 0)
  325. for _, keyFieldInter := range keyFieldInterArr {
  326. mapConv, ok := keyFieldInter.(map[string]interface{})
  327. if !ok {
  328. continue
  329. }
  330. // check if mapConv["name"] is in normalKeys
  331. keyName, ok := mapConv["name"].(string) // check if "name" key exists and is a string
  332. if !ok {
  333. continue
  334. }
  335. if _, exists := normalKeys[keyName]; !exists {
  336. keyFieldMapArr = append(keyFieldMapArr, mapConv)
  337. }
  338. }
  339. keyFieldRes := make([]SyncedEnvSectionKey, 0)
  340. for _, keyFieldMap := range keyFieldMapArr {
  341. toAdd := SyncedEnvSectionKey{}
  342. if nameField, nameFieldExists := keyFieldMap["name"]; nameFieldExists {
  343. toAdd.Name, ok = nameField.(string)
  344. if !ok {
  345. continue
  346. }
  347. // only append if not in aggEnvConf
  348. if _, exists := aggEnvConf[toAdd.Name]; !exists {
  349. if secretField, secretFieldExists := keyFieldMap["secret"]; secretFieldExists {
  350. toAdd.Secret, ok = secretField.(bool)
  351. if !ok {
  352. continue
  353. }
  354. }
  355. keyFieldRes = append(keyFieldRes, toAdd)
  356. }
  357. }
  358. }
  359. syncedArrObj.Keys = keyFieldRes
  360. }
  361. syncedArr = append(syncedArr, syncedArrObj)
  362. }
  363. resArr := make([]SyncedEnvSection, 0)
  364. foundMatch := false
  365. for _, candidate := range syncedArr {
  366. if candidate.Name == syncedEnvSection.Name {
  367. resArr = append(resArr, *filterEnvConf(syncedEnvSection, normalKeys))
  368. foundMatch = true
  369. } else {
  370. resArr = append(resArr, *candidate)
  371. }
  372. }
  373. if !foundMatch {
  374. return curr, nil
  375. }
  376. envConf["synced"] = resArr
  377. }
  378. }
  379. // to remove all types that Helm may not be able to work with, we marshal to and from
  380. // yaml for good measure. Otherwise we get silly error messages like:
  381. // Upgrade failed: template: web/templates/deployment.yaml:138:40: executing \"web/templates/deployment.yaml\"
  382. // at <$syncedEnv.keys>: can't evaluate field keys in type namespace.SyncedEnvSection
  383. currYAML, err := yaml.Marshal(curr)
  384. if err != nil {
  385. return nil, err
  386. }
  387. res := make(map[string]interface{})
  388. err = yaml.Unmarshal([]byte(currYAML), &res)
  389. if err != nil {
  390. return nil, err
  391. }
  392. return res, nil
  393. }
  394. func getStacksNestedMap(obj map[string]interface{}, fields ...string) (map[string]interface{}, error) {
  395. var res map[string]interface{}
  396. curr := obj
  397. for _, field := range fields {
  398. objField, ok := curr[field]
  399. if !ok {
  400. return nil, fmt.Errorf("%s not found", field)
  401. }
  402. res, ok = objField.(map[string]interface{})
  403. if !ok {
  404. return nil, fmt.Errorf("%s is not a nested object", field)
  405. }
  406. curr = res
  407. }
  408. return res, nil
  409. }
  410. func filterEnvConf(syncedEnv *SyncedEnvSection, normalEnv map[string]interface{}) *SyncedEnvSection {
  411. // filter out keys that are already in normalEnv
  412. keys := make([]SyncedEnvSectionKey, 0)
  413. for _, key := range syncedEnv.Keys {
  414. if _, exists := normalEnv[key.Name]; !exists {
  415. keys = append(keys, key)
  416. }
  417. }
  418. syncedEnv.Keys = keys
  419. return syncedEnv
  420. }
// postStacksUpgrade is currently disabled; its implementation is kept below,
// commented out, for reference.
//
// postStacksUpgrade runs any necessary scripting after the release has been upgraded.
// func postStacksUpgrade(config *config.Config, projectID, clusterID uint, envGroup *types.EnvGroup) error {
// 	// update the relevant env group version number if tied to a stack resource
// 	return stacks.UpdateEnvGroupVersion(config, projectID, clusterID, envGroup)
// }

  426. func getType(name string) string {
  427. if strings.HasSuffix(name, "-web") {
  428. return "web"
  429. } else if strings.HasSuffix(name, "-wkr") {
  430. return "worker"
  431. } else if strings.HasSuffix(name, "-job") {
  432. return "job"
  433. }
  434. return ""
  435. }
  436. func createPorterAppEvent(ctx context.Context, status string, appID uint, revision int, tag string, repo repository.PorterAppEventRepository) (*models.PorterAppEvent, error) {
  437. event := models.PorterAppEvent{
  438. ID: uuid.New(),
  439. Status: status,
  440. Type: "DEPLOY",
  441. TypeExternalSource: "KUBERNETES",
  442. PorterAppID: appID,
  443. Metadata: map[string]any{
  444. "revision": revision,
  445. "image_tag": tag,
  446. },
  447. }
  448. err := repo.CreateEvent(ctx, &event)
  449. if err != nil {
  450. return nil, err
  451. }
  452. if event.ID == uuid.Nil {
  453. return nil, err
  454. }
  455. return &event, nil
  456. }
  457. func attemptToGetImageInfoFromRelease(values map[string]interface{}) types.ImageInfo {
  458. imageInfo := types.ImageInfo{}
  459. if values == nil {
  460. return imageInfo
  461. }
  462. globalImage, err := getNestedMap(values, "global", "image")
  463. if err != nil {
  464. return imageInfo
  465. }
  466. repoVal, okRepo := globalImage["repository"]
  467. tagVal, okTag := globalImage["tag"]
  468. if okRepo && okTag {
  469. imageInfo.Repository = repoVal.(string)
  470. imageInfo.Tag = tagVal.(string)
  471. }
  472. return imageInfo
  473. }
  474. func createReleaseJobChart(
  475. ctx context.Context,
  476. stackName string,
  477. values map[string]interface{},
  478. repoUrl string,
  479. registries []*models.Registry,
  480. cluster *models.Cluster,
  481. repo repository.Repository,
  482. ) (*helm.InstallChartConfig, error) {
  483. chart, err := loader.LoadChartPublic(ctx, repoUrl, "job", "")
  484. if err != nil {
  485. return nil, err
  486. }
  487. releaseName := fmt.Sprintf("%s-r", stackName)
  488. namespace := fmt.Sprintf("porter-stack-%s", stackName)
  489. return &helm.InstallChartConfig{
  490. Chart: chart,
  491. Name: releaseName,
  492. Namespace: namespace,
  493. Values: values,
  494. Cluster: cluster,
  495. Repo: repo,
  496. Registries: registries,
  497. }, nil
  498. }