main.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. //go:build ee
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "syscall"
  12. "time"
  13. "github.com/go-chi/chi"
  14. "github.com/go-chi/chi/middleware"
  15. "github.com/joeshaw/envdecode"
  16. "github.com/porter-dev/porter/api/server/shared/config/env"
  17. "github.com/porter-dev/porter/internal/adapter"
  18. "github.com/porter-dev/porter/internal/worker"
  19. "github.com/porter-dev/porter/workers/jobs"
  20. "gorm.io/gorm"
  21. )
  22. var (
  23. jobQueue chan worker.Job
  24. envDecoder = EnvConf{}
  25. dbConn *gorm.DB
  26. )
  27. // EnvConf holds the environment variables for this binary
  28. type EnvConf struct {
  29. ServerURL string `env:"SERVER_URL,default=http://localhost:8080"`
  30. DOClientID string `env:"DO_CLIENT_ID"`
  31. DOClientSecret string `env:"DO_CLIENT_SECRET"`
  32. DBConf env.DBConf
  33. MaxWorkers uint `env:"MAX_WORKERS,default=10"`
  34. MaxQueue uint `env:"MAX_QUEUE,default=100"`
  35. AWSAccessKeyID string `env:"AWS_ACCESS_KEY_ID"`
  36. AWSSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
  37. AWSRegion string `env:"AWS_REGION"`
  38. S3BucketName string `env:"S3_BUCKET_NAME"`
  39. EncryptionKey string `env:"S3_ENCRYPTION_KEY"`
  40. Port uint `env:"PORT,default=3000"`
  41. }
  42. func main() {
  43. if err := envdecode.StrictDecode(&envDecoder); err != nil {
  44. log.Fatalf("Failed to decode server conf: %v", err)
  45. }
  46. log.Printf("setting max worker count to: %d\n", envDecoder.MaxWorkers)
  47. log.Printf("setting max job queue count to: %d\n", envDecoder.MaxQueue)
  48. db, err := adapter.New(&envDecoder.DBConf)
  49. if err != nil {
  50. log.Fatalln(err)
  51. }
  52. dbConn = db
  53. jobQueue = make(chan worker.Job, envDecoder.MaxQueue)
  54. d := worker.NewDispatcher(int(envDecoder.MaxWorkers))
  55. log.Println("starting worker dispatcher")
  56. err = d.Run(jobQueue)
  57. if err != nil {
  58. log.Fatalln(err)
  59. }
  60. server := &http.Server{Addr: fmt.Sprintf(":%d", envDecoder.Port), Handler: httpService()}
  61. serverCtx, serverStopCtx := context.WithCancel(context.Background())
  62. sig := make(chan os.Signal, 1)
  63. signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  64. go func() {
  65. <-sig
  66. log.Println("shutting down server")
  67. shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
  68. defer shutdownCtxCancel()
  69. go func() {
  70. <-shutdownCtx.Done()
  71. if shutdownCtx.Err() == context.DeadlineExceeded {
  72. log.Fatal("graceful shutdown timed out.. forcing exit.")
  73. }
  74. }()
  75. err = server.Shutdown(shutdownCtx)
  76. if err != nil {
  77. log.Fatalln(err)
  78. }
  79. log.Println("server shutdown completed")
  80. serverStopCtx()
  81. }()
  82. log.Println("starting HTTP server at :3000")
  83. err = server.ListenAndServe()
  84. if err != nil && err != http.ErrServerClosed {
  85. log.Fatalf("error starting HTTP server: %v", err)
  86. }
  87. // Wait for server context to be stopped
  88. <-serverCtx.Done()
  89. d.Exit()
  90. }
  91. func httpService() http.Handler {
  92. log.Println("setting up HTTP router and adding middleware")
  93. r := chi.NewRouter()
  94. r.Use(middleware.Logger)
  95. r.Use(middleware.Recoverer)
  96. r.Use(middleware.Heartbeat("/ping"))
  97. // r.Use(middleware.AllowContentType("application/json"))
  98. r.Mount("/debug", middleware.Profiler())
  99. log.Println("setting up HTTP POST endpoint to enqueue jobs")
  100. r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
  101. req := make(map[string]interface{})
  102. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  103. log.Printf("error converting body to json: %v", err)
  104. return
  105. }
  106. job := getJob(chi.URLParam(r, "id"), req)
  107. if job == nil {
  108. w.WriteHeader(http.StatusNotFound)
  109. return
  110. }
  111. jobQueue <- job
  112. w.WriteHeader(http.StatusCreated)
  113. })
  114. return r
  115. }
  116. func getJob(id string, input map[string]interface{}) worker.Job {
  117. if id == "helm-revisions-count-tracker" {
  118. newJob, err := jobs.NewHelmRevisionsCountTracker(dbConn, time.Now().UTC(), &jobs.HelmRevisionsCountTrackerOpts{
  119. DBConf: &envDecoder.DBConf,
  120. DOClientID: envDecoder.DOClientID,
  121. DOClientSecret: envDecoder.DOClientSecret,
  122. DOScopes: []string{"read", "write"},
  123. ServerURL: envDecoder.ServerURL,
  124. AWSAccessKeyID: envDecoder.AWSAccessKeyID,
  125. AWSSecretAccessKey: envDecoder.AWSSecretAccessKey,
  126. AWSRegion: envDecoder.AWSRegion,
  127. S3BucketName: envDecoder.S3BucketName,
  128. EncryptionKey: envDecoder.EncryptionKey,
  129. })
  130. if err != nil {
  131. log.Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
  132. return nil
  133. }
  134. return newJob
  135. } else if id == "nginx-recommender" {
  136. newJob, err := jobs.NewNGINXRecommender(dbConn, time.Now().UTC(), &jobs.NGINXRecommenderOpts{
  137. DBConf: &envDecoder.DBConf,
  138. DOClientID: envDecoder.DOClientID,
  139. DOClientSecret: envDecoder.DOClientSecret,
  140. DOScopes: []string{"read", "write"},
  141. ServerURL: envDecoder.ServerURL,
  142. Input: input,
  143. })
  144. if err != nil {
  145. log.Printf("error creating job with ID: nginx-recommender. Error: %v", err)
  146. return nil
  147. }
  148. return newJob
  149. }
  150. return nil
  151. }