main.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. //go:build ee
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "syscall"
  12. "time"
  13. "github.com/go-chi/chi/middleware"
  14. "github.com/go-chi/chi/v5"
  15. "github.com/joeshaw/envdecode"
  16. "github.com/porter-dev/porter/api/server/shared/config/env"
  17. "github.com/porter-dev/porter/internal/adapter"
  18. "github.com/porter-dev/porter/internal/opa"
  19. "github.com/porter-dev/porter/internal/repository"
  20. "github.com/porter-dev/porter/internal/worker"
  21. "github.com/porter-dev/porter/workers/jobs"
  22. "gorm.io/gorm"
  23. "github.com/porter-dev/porter/ee/integrations/vault"
  24. rcreds "github.com/porter-dev/porter/internal/repository/credentials"
  25. pgorm "github.com/porter-dev/porter/internal/repository/gorm"
  26. )
  27. var (
  28. jobQueue chan worker.Job
  29. envDecoder = EnvConf{}
  30. dbConn *gorm.DB
  31. repo repository.Repository
  32. opaPolicies *opa.KubernetesPolicies
  33. )
  34. // EnvConf holds the environment variables for this binary
  35. type EnvConf struct {
  36. // ServerURL is the URL of the Porter server
  37. ServerURL string `env:"SERVER_URL,default=http://localhost:8080"`
  38. // Porter instance's database configuration
  39. DBConf env.DBConf
  40. // DigitalOcean OAuth2 credentials
  41. DOClientID string `env:"DO_CLIENT_ID"`
  42. DOClientSecret string `env:"DO_CLIENT_SECRET"`
  43. // Worker pool configuration
  44. MaxWorkers uint `env:"MAX_WORKERS,default=10"`
  45. MaxQueue uint `env:"MAX_QUEUE,default=100"`
  46. Port uint `env:"PORT,default=3000"`
  47. /**
  48. * Job-specific configuration
  49. */
  50. // "helm-revisions-count-tracker"
  51. AWSAccessKeyID string `env:"AWS_ACCESS_KEY_ID"`
  52. AWSSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
  53. AWSRegion string `env:"AWS_REGION"`
  54. S3BucketName string `env:"S3_BUCKET_NAME"`
  55. EncryptionKey string `env:"S3_ENCRYPTION_KEY"`
  56. RevisionsCount int `env:"REVISIONS_COUNT,default=20"`
  57. // "recommender"
  58. OPAConfigFileDir string `env:"OPA_CONFIG_FILE_DIR,default=./internal/opa"`
  59. LegacyProjectIDs []uint `env:"LEGACY_PROJECT_IDS"`
  60. // "preview-deployments-ttl-deleter"
  61. PreviewDeploymentsTTL string `env:"PREVIEW_DEPLOYMENTS_TTL"`
  62. }
  63. func main() {
  64. ctx := context.Background()
  65. if err := envdecode.StrictDecode(&envDecoder); err != nil {
  66. log.Fatalf("Failed to decode server conf: %v", err)
  67. }
  68. log.Printf("setting max worker count to: %d\n", envDecoder.MaxWorkers)
  69. log.Printf("setting max job queue count to: %d\n", envDecoder.MaxQueue)
  70. log.Printf("legacy project ids are: %v", envDecoder.LegacyProjectIDs)
  71. db, err := adapter.New(&envDecoder.DBConf)
  72. if err != nil {
  73. log.Fatalln(err)
  74. }
  75. dbConn = db
  76. var credBackend rcreds.CredentialStorage
  77. if envDecoder.DBConf.VaultAPIKey != "" && envDecoder.DBConf.VaultServerURL != "" && envDecoder.DBConf.VaultPrefix != "" {
  78. credBackend = vault.NewClient(
  79. envDecoder.DBConf.VaultServerURL,
  80. envDecoder.DBConf.VaultAPIKey,
  81. envDecoder.DBConf.VaultPrefix,
  82. )
  83. }
  84. var key [32]byte
  85. for i, b := range []byte(envDecoder.DBConf.EncryptionKey) {
  86. key[i] = b
  87. }
  88. repo = pgorm.NewRepository(db, &key, credBackend)
  89. opaPolicies, err = opa.LoadPolicies(envDecoder.OPAConfigFileDir)
  90. if err != nil {
  91. log.Fatalln(err)
  92. }
  93. jobQueue = make(chan worker.Job, envDecoder.MaxQueue)
  94. d := worker.NewDispatcher(int(envDecoder.MaxWorkers))
  95. log.Println("starting worker dispatcher")
  96. err = d.Run(ctx, jobQueue)
  97. if err != nil {
  98. log.Fatalln(err)
  99. }
  100. server := &http.Server{Addr: fmt.Sprintf(":%d", envDecoder.Port), Handler: httpService(ctx)}
  101. serverCtx, serverStopCtx := context.WithCancel(context.Background())
  102. sig := make(chan os.Signal, 1)
  103. signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  104. go func() {
  105. <-sig
  106. log.Println("shutting down server")
  107. shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
  108. defer shutdownCtxCancel()
  109. go func() {
  110. <-shutdownCtx.Done()
  111. if shutdownCtx.Err() == context.DeadlineExceeded {
  112. log.Fatal("graceful shutdown timed out.. forcing exit.")
  113. }
  114. }()
  115. err = server.Shutdown(shutdownCtx)
  116. if err != nil {
  117. log.Fatalln(err)
  118. }
  119. log.Println("server shutdown completed")
  120. serverStopCtx()
  121. }()
  122. log.Println("starting HTTP server at :3000")
  123. err = server.ListenAndServe()
  124. if err != nil && err != http.ErrServerClosed {
  125. log.Fatalf("error starting HTTP server: %v", err)
  126. }
  127. // Wait for server context to be stopped
  128. <-serverCtx.Done()
  129. d.Exit()
  130. }
  131. func httpService(ctx context.Context) http.Handler {
  132. log.Println("setting up HTTP router and adding middleware")
  133. r := chi.NewRouter()
  134. r.Use(middleware.Logger)
  135. r.Use(middleware.Recoverer)
  136. r.Use(middleware.Heartbeat("/ping"))
  137. // r.Use(middleware.AllowContentType("application/json"))
  138. r.Mount("/debug", middleware.Profiler())
  139. log.Println("setting up HTTP POST endpoint to enqueue jobs")
  140. r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
  141. req := make(map[string]interface{})
  142. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  143. log.Printf("error converting body to json: %v", err)
  144. return
  145. }
  146. job := getJob(ctx, chi.URLParam(r, "id"), req)
  147. if job == nil {
  148. w.WriteHeader(http.StatusNotFound)
  149. return
  150. }
  151. jobQueue <- job
  152. w.WriteHeader(http.StatusCreated)
  153. })
  154. return r
  155. }
  156. func getJob(ctx context.Context, id string, input map[string]interface{}) worker.Job {
  157. if id == "helm-revisions-count-tracker" {
  158. newJob, err := jobs.NewHelmRevisionsCountTracker(ctx, dbConn, time.Now().UTC(), &jobs.HelmRevisionsCountTrackerOpts{
  159. DBConf: &envDecoder.DBConf,
  160. DOClientID: envDecoder.DOClientID,
  161. DOClientSecret: envDecoder.DOClientSecret,
  162. DOScopes: []string{"read", "write"},
  163. ServerURL: envDecoder.ServerURL,
  164. AWSAccessKeyID: envDecoder.AWSAccessKeyID,
  165. AWSSecretAccessKey: envDecoder.AWSSecretAccessKey,
  166. AWSRegion: envDecoder.AWSRegion,
  167. S3BucketName: envDecoder.S3BucketName,
  168. EncryptionKey: envDecoder.EncryptionKey,
  169. RevisionsCount: envDecoder.RevisionsCount,
  170. })
  171. if err != nil {
  172. log.Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
  173. return nil
  174. }
  175. return newJob
  176. } else if id == "recommender" {
  177. newJob, err := jobs.NewRecommender(dbConn, time.Now().UTC(), &jobs.RecommenderOpts{
  178. DBConf: &envDecoder.DBConf,
  179. DOClientID: envDecoder.DOClientID,
  180. DOClientSecret: envDecoder.DOClientSecret,
  181. DOScopes: []string{"read", "write"},
  182. ServerURL: envDecoder.ServerURL,
  183. Input: input,
  184. LegacyProjectIDs: envDecoder.LegacyProjectIDs,
  185. }, opaPolicies)
  186. if err != nil {
  187. log.Printf("error creating job with ID: recommender. Error: %v", err)
  188. return nil
  189. }
  190. return newJob
  191. } else if id == "preview-deployments-ttl-deleter" {
  192. newJob, err := jobs.NewPreviewDeploymentsTTLDeleter(dbConn, time.Now().UTC(), &jobs.PreviewDeploymentsTTLDeleterOpts{
  193. DBConf: &envDecoder.DBConf,
  194. ServerURL: envDecoder.ServerURL,
  195. DOClientID: envDecoder.DOClientID,
  196. DOClientSecret: envDecoder.DOClientSecret,
  197. DOScopes: []string{"read", "write"},
  198. PreviewDeploymentsTTL: envDecoder.PreviewDeploymentsTTL,
  199. })
  200. if err != nil {
  201. log.Printf("error creating job with ID: preview-deployments-ttl-deleter. Error: %v", err)
  202. return nil
  203. }
  204. return newJob
  205. }
  206. return nil
  207. }