create_stacks_env_group.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. package namespace
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "sigs.k8s.io/yaml"
  10. "github.com/google/uuid"
  11. "github.com/stefanmcshane/helm/pkg/release"
  12. "github.com/porter-dev/porter/api/server/authz"
  13. "github.com/porter-dev/porter/api/server/handlers"
  14. "github.com/porter-dev/porter/api/server/shared"
  15. "github.com/porter-dev/porter/api/server/shared/apierrors"
  16. "github.com/porter-dev/porter/api/server/shared/config"
  17. "github.com/porter-dev/porter/api/types"
  18. "github.com/porter-dev/porter/internal/helm"
  19. "github.com/porter-dev/porter/internal/kubernetes"
  20. "github.com/porter-dev/porter/internal/kubernetes/envgroup"
  21. "github.com/porter-dev/porter/internal/models"
  22. "github.com/porter-dev/porter/internal/repository"
  23. "github.com/porter-dev/porter/internal/telemetry"
  24. "github.com/stefanmcshane/helm/pkg/chart"
  25. )
  26. type CreateStacksEnvGroupHandler struct {
  27. handlers.PorterHandlerReadWriter
  28. authz.KubernetesAgentGetter
  29. }
  30. func NewCreateStacksEnvGroupHandler(
  31. config *config.Config,
  32. decoderValidator shared.RequestDecoderValidator,
  33. writer shared.ResultWriter,
  34. ) *CreateStacksEnvGroupHandler {
  35. return &CreateStacksEnvGroupHandler{
  36. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  37. KubernetesAgentGetter: authz.NewOutOfClusterAgentGetter(config),
  38. }
  39. }
  40. func (c *CreateStacksEnvGroupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  41. request := &types.CreateStacksEnvGroupRequest{}
  42. if ok := c.DecodeAndValidate(w, r, request); !ok {
  43. return
  44. }
  45. namespace := r.Context().Value(types.NamespaceScope).(string)
  46. cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
  47. agent, err := c.GetAgent(r, cluster, namespace)
  48. if err != nil {
  49. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting agent"))
  50. return
  51. }
  52. // if the environment group exists and has MetaVersion=1, throw an error
  53. aggregateReleases := []*release.Release{}
  54. for i := range request.Apps {
  55. namespaceStack := "porter-stack-" + request.Apps[i]
  56. helmAgent, err := c.GetHelmAgent(r.Context(), r, cluster, namespaceStack)
  57. if err != nil {
  58. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting agent"))
  59. return
  60. }
  61. releases, err := envgroup.GetStackSyncedReleases(helmAgent, namespaceStack)
  62. if err != nil {
  63. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting releases"))
  64. return
  65. }
  66. aggregateReleases = append(aggregateReleases, releases...)
  67. }
  68. errors := rolloutStacksApplications(c, c.Config(), cluster, request.Name, namespace, agent, aggregateReleases, r, w)
  69. if len(errors) > 0 {
  70. errStrArr := make([]string, 0)
  71. for _, err := range errors {
  72. errStrArr = append(errStrArr, err.Error())
  73. }
  74. c.HandleAPIErrorNoWrite(w, r, apierrors.NewErrPassThroughToClient(err, 504, "error getting adding env group"))
  75. return
  76. }
  77. c.WriteResult(w, r, nil)
  78. }
  79. func rolloutStacksApplications(
  80. c *CreateStacksEnvGroupHandler,
  81. config *config.Config,
  82. cluster *models.Cluster,
  83. envGroupName string,
  84. namespace string,
  85. agent *kubernetes.Agent,
  86. releases []*release.Release,
  87. r *http.Request,
  88. w http.ResponseWriter,
  89. ) []error {
  90. registries, err := config.Repo.Registry().ListRegistriesByProjectID(cluster.ProjectID)
  91. if err != nil {
  92. return []error{err}
  93. }
  94. // asynchronously update releases with that image repo uri
  95. var wg sync.WaitGroup
  96. mu := &sync.Mutex{}
  97. errors := make([]error, 0)
  98. for i, rel := range releases {
  99. index := i
  100. release := rel
  101. wg.Add(1)
  102. cm, _, err := agent.GetLatestVersionedConfigMap(envGroupName, "porter-stack-"+releases[index].Name)
  103. if err != nil {
  104. return []error{err}
  105. }
  106. versionStr, ok := cm.ObjectMeta.Labels["version"]
  107. if !ok {
  108. return []error{err}
  109. }
  110. versionInt, err := strconv.Atoi(versionStr)
  111. if err != nil {
  112. return []error{err}
  113. }
  114. version := uint(versionInt)
  115. newSection := &SyncedEnvSection{
  116. Name: envGroupName,
  117. Version: version,
  118. }
  119. newSectionKeys := make([]SyncedEnvSectionKey, 0)
  120. for key, val := range cm.Data {
  121. newSectionKeys = append(newSectionKeys, SyncedEnvSectionKey{
  122. Name: key,
  123. Secret: strings.Contains(val, "PORTERSECRET"),
  124. })
  125. }
  126. newSection.Keys = newSectionKeys
  127. go func() {
  128. defer wg.Done()
  129. // read release via agent
  130. newConfig, err := getNewStacksConfig(release.Config, newSection, release)
  131. if err != nil {
  132. mu.Lock()
  133. errors = append(errors, err)
  134. mu.Unlock()
  135. return
  136. }
  137. // if this is a job chart, update the config and set correct paused param to true
  138. if release.Chart.Name() == "job" {
  139. newConfig["paused"] = true
  140. }
  141. if req := releases[index].Chart.Metadata.Dependencies; req != nil {
  142. for _, dep := range req {
  143. dep.Name = getType(dep.Name)
  144. }
  145. }
  146. metadata := &chart.Metadata{
  147. Name: "umbrella",
  148. Description: "Web application that is exposed to external traffic.",
  149. Version: "0.96.0",
  150. APIVersion: "v2",
  151. Home: "https://getporter.dev/",
  152. Icon: "https://user-images.githubusercontent.com/65516095/111255214-07d3da80-85ed-11eb-99e2-fddcbdb99bdb.png",
  153. Keywords: []string{
  154. "porter",
  155. "application",
  156. "service",
  157. "umbrella",
  158. },
  159. Type: "application",
  160. Dependencies: releases[index].Chart.Metadata.Dependencies,
  161. }
  162. charter := &chart.Chart{
  163. Metadata: metadata,
  164. }
  165. conf := &helm.InstallChartConfig{
  166. Chart: charter,
  167. Name: releases[index].Name,
  168. Namespace: "porter-stack-" + releases[index].Name,
  169. Values: newConfig,
  170. Cluster: cluster,
  171. Repo: config.Repo,
  172. Registries: registries,
  173. }
  174. helmAgent, err := c.GetHelmAgent(r.Context(), r, cluster, "porter-stack-"+releases[index].Name)
  175. if err != nil {
  176. fmt.Println("Could Not Get Helm Agent ")
  177. return
  178. }
  179. _, err = helmAgent.UpgradeInstallChart(r.Context(), conf, config.DOConf, config.ServerConf.DisablePullSecretsInjection)
  180. if err != nil {
  181. mu.Lock()
  182. errors = append(errors, err)
  183. mu.Unlock()
  184. return
  185. }
  186. }()
  187. app, err := c.Repo().PorterApp().ReadPorterAppByName(cluster.ID, releases[index].Name)
  188. ctx, span := telemetry.NewSpan(r.Context(), "serve-create-porter-app")
  189. updatedPorterApp, err := c.Repo().PorterApp().UpdatePorterApp(app)
  190. if err != nil {
  191. err = telemetry.Error(ctx, span, err, "error writing updated app to DB")
  192. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  193. return []error{err}
  194. }
  195. imageInfo := attemptToGetImageInfoFromRelease(releases[i].Config)
  196. _, err = createPorterAppEvent(ctx, "SUCCESS", updatedPorterApp.ID, releases[i].Version+1, imageInfo.Tag, c.Repo().PorterAppEvent())
  197. if err != nil {
  198. err = telemetry.Error(ctx, span, err, "error creating porter app event")
  199. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
  200. return []error{err}
  201. }
  202. }
  203. wg.Wait()
  204. return errors
  205. }
  206. func getNewStacksConfig(curr map[string]interface{}, syncedEnvSection *SyncedEnvSection, release *release.Release) (map[string]interface{}, error) {
  207. // look for container.env.synced
  208. aggEnvConf := make(map[string]interface{})
  209. for _, dep := range release.Chart.Metadata.Dependencies {
  210. envConf, err := getStacksNestedMap(curr, dep.Name, "container", "env")
  211. normalKeys, ok := envConf["normal"].(map[string]interface{})
  212. if !ok {
  213. fmt.Println("Normal Keys", normalKeys)
  214. }
  215. if err != nil {
  216. return nil, err
  217. }
  218. for k, v := range envConf {
  219. aggEnvConf[k] = v
  220. }
  221. syncedEnvInter, syncedEnvExists := envConf["synced"]
  222. if !syncedEnvExists {
  223. return curr, nil
  224. } else {
  225. syncedArr := make([]*SyncedEnvSection, 0)
  226. syncedArrInter, ok := syncedEnvInter.([]interface{})
  227. if !ok {
  228. return nil, fmt.Errorf("could not convert to synced env section: not an array")
  229. }
  230. for _, syncedArrInterObj := range syncedArrInter {
  231. syncedArrObj := &SyncedEnvSection{}
  232. syncedArrInterObjMap, ok := syncedArrInterObj.(map[string]interface{})
  233. if !ok {
  234. continue
  235. }
  236. if nameField, nameFieldExists := syncedArrInterObjMap["name"]; nameFieldExists {
  237. syncedArrObj.Name, ok = nameField.(string)
  238. if !ok {
  239. continue
  240. }
  241. }
  242. if versionField, versionFieldExists := syncedArrInterObjMap["version"]; versionFieldExists {
  243. versionFloat, ok := versionField.(float64)
  244. if !ok {
  245. continue
  246. }
  247. syncedArrObj.Version = uint(versionFloat)
  248. }
  249. if keyField, keyFieldExists := syncedArrInterObjMap["keys"]; keyFieldExists {
  250. keyFieldInterArr, ok := keyField.([]interface{})
  251. if !ok {
  252. continue
  253. }
  254. keyFieldMapArr := make([]map[string]interface{}, 0)
  255. for _, keyFieldInter := range keyFieldInterArr {
  256. mapConv, ok := keyFieldInter.(map[string]interface{})
  257. if !ok {
  258. continue
  259. }
  260. // check if mapConv["name"] is in normalKeys
  261. keyName, ok := mapConv["name"].(string) // check if "name" key exists and is a string
  262. if !ok {
  263. continue
  264. }
  265. if _, exists := normalKeys[keyName]; !exists {
  266. keyFieldMapArr = append(keyFieldMapArr, mapConv)
  267. }
  268. }
  269. keyFieldRes := make([]SyncedEnvSectionKey, 0)
  270. for _, keyFieldMap := range keyFieldMapArr {
  271. toAdd := SyncedEnvSectionKey{}
  272. if nameField, nameFieldExists := keyFieldMap["name"]; nameFieldExists {
  273. toAdd.Name, ok = nameField.(string)
  274. if !ok {
  275. continue
  276. }
  277. // only append if not in aggEnvConf
  278. if _, exists := aggEnvConf[toAdd.Name]; !exists {
  279. if secretField, secretFieldExists := keyFieldMap["secret"]; secretFieldExists {
  280. toAdd.Secret, ok = secretField.(bool)
  281. if !ok {
  282. continue
  283. }
  284. }
  285. keyFieldRes = append(keyFieldRes, toAdd)
  286. }
  287. }
  288. }
  289. syncedArrObj.Keys = keyFieldRes
  290. }
  291. syncedArr = append(syncedArr, syncedArrObj)
  292. }
  293. resArr := make([]SyncedEnvSection, 0)
  294. foundMatch := false
  295. for _, candidate := range syncedArr {
  296. if candidate.Name == syncedEnvSection.Name {
  297. resArr = append(resArr, *filterEnvConf(syncedEnvSection, normalKeys))
  298. foundMatch = true
  299. } else {
  300. resArr = append(resArr, *candidate)
  301. }
  302. }
  303. if !foundMatch {
  304. return curr, nil
  305. }
  306. envConf["synced"] = resArr
  307. }
  308. }
  309. // to remove all types that Helm may not be able to work with, we marshal to and from
  310. // yaml for good measure. Otherwise we get silly error messages like:
  311. // Upgrade failed: template: web/templates/deployment.yaml:138:40: executing \"web/templates/deployment.yaml\"
  312. // at <$syncedEnv.keys>: can't evaluate field keys in type namespace.SyncedEnvSection
  313. currYAML, err := yaml.Marshal(curr)
  314. if err != nil {
  315. return nil, err
  316. }
  317. res := make(map[string]interface{})
  318. err = yaml.Unmarshal([]byte(currYAML), &res)
  319. if err != nil {
  320. return nil, err
  321. }
  322. return res, nil
  323. }
  324. func getStacksNestedMap(obj map[string]interface{}, fields ...string) (map[string]interface{}, error) {
  325. var res map[string]interface{}
  326. curr := obj
  327. for _, field := range fields {
  328. objField, ok := curr[field]
  329. if !ok {
  330. return nil, fmt.Errorf("%s not found", field)
  331. }
  332. res, ok = objField.(map[string]interface{})
  333. if !ok {
  334. return nil, fmt.Errorf("%s is not a nested object", field)
  335. }
  336. curr = res
  337. }
  338. return res, nil
  339. }
  340. func filterEnvConf(syncedEnv *SyncedEnvSection, normalEnv map[string]interface{}) *SyncedEnvSection {
  341. // filter out keys that are already in normalEnv
  342. keys := make([]SyncedEnvSectionKey, 0)
  343. for _, key := range syncedEnv.Keys {
  344. if _, exists := normalEnv[key.Name]; !exists {
  345. keys = append(keys, key)
  346. }
  347. }
  348. syncedEnv.Keys = keys
  349. return syncedEnv
  350. }
  351. // postUpgrade runs any necessary scripting after the release has been upgraded.
  352. // func postStacksUpgrade(config *config.Config, projectID, clusterID uint, envGroup *types.EnvGroup) error {
  353. // // update the relevant env group version number if tied to a stack resource
  354. // return stacks.UpdateEnvGroupVersion(config, projectID, clusterID, envGroup)
  355. // }
  356. func getType(name string) string {
  357. if strings.HasSuffix(name, "-web") {
  358. return "web"
  359. } else if strings.HasSuffix(name, "-wkr") {
  360. return "worker"
  361. } else if strings.HasSuffix(name, "-job") {
  362. return "job"
  363. }
  364. return ""
  365. }
  366. func createPorterAppEvent(ctx context.Context, status string, appID uint, revision int, tag string, repo repository.PorterAppEventRepository) (*models.PorterAppEvent, error) {
  367. event := models.PorterAppEvent{
  368. ID: uuid.New(),
  369. Status: status,
  370. Type: "DEPLOY",
  371. TypeExternalSource: "KUBERNETES",
  372. PorterAppID: appID,
  373. Metadata: map[string]any{
  374. "revision": revision,
  375. "image_tag": tag,
  376. },
  377. }
  378. err := repo.CreateEvent(ctx, &event)
  379. if err != nil {
  380. return nil, err
  381. }
  382. if event.ID == uuid.Nil {
  383. return nil, err
  384. }
  385. return &event, nil
  386. }
  387. func attemptToGetImageInfoFromRelease(values map[string]interface{}) types.ImageInfo {
  388. imageInfo := types.ImageInfo{}
  389. if values == nil {
  390. return imageInfo
  391. }
  392. globalImage, err := getNestedMap(values, "global", "image")
  393. if err != nil {
  394. return imageInfo
  395. }
  396. repoVal, okRepo := globalImage["repository"]
  397. tagVal, okTag := globalImage["tag"]
  398. if okRepo && okTag {
  399. imageInfo.Repository = repoVal.(string)
  400. imageInfo.Tag = tagVal.(string)
  401. }
  402. return imageInfo
  403. }