main.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. //go:build ee
  2. package main
  3. import (
  4. "context"
  5. "log"
  6. "net/http"
  7. "os"
  8. "os/signal"
  9. "syscall"
  10. "time"
  11. "github.com/go-chi/chi"
  12. "github.com/go-chi/chi/middleware"
  13. "github.com/joeshaw/envdecode"
  14. "github.com/porter-dev/porter/api/server/shared/config/env"
  15. "github.com/porter-dev/porter/internal/worker"
  16. "github.com/porter-dev/porter/workers/jobs"
  17. )
  18. var (
  19. jobQueue chan worker.Job
  20. envDecoder = EnvConf{}
  21. )
  22. type EnvConf struct {
  23. ServerURL string `env:"SERVER_URL,default=http://localhost:8080"`
  24. DOClientID string `env:"DO_CLIENT_ID"`
  25. DOClientSecret string `env:"DO_CLIENT_SECRET"`
  26. DBConf env.DBConf
  27. MaxWorkers uint `env:"MAX_WORKERS,default=10"`
  28. MaxQueue uint `env:"MAX_QUEUE,default=100"`
  29. AWSAccessKeyID string `env:"AWS_ACCESS_KEY_ID"`
  30. AWSSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
  31. AWSRegion string `env:"AWS_REGION"`
  32. S3BucketName string `env:"S3_BUCKET_NAME"`
  33. EncryptionKey *[32]byte `env:"ENCRYPTION_KEY"`
  34. }
  35. func main() {
  36. if err := envdecode.StrictDecode(&envDecoder); err != nil {
  37. log.Fatalf("Failed to decode server conf: %v", err)
  38. }
  39. log.Printf("setting max worker count to: %d\n", envDecoder.MaxWorkers)
  40. log.Printf("setting max job queue count to: %d\n", envDecoder.MaxQueue)
  41. jobQueue = make(chan worker.Job, envDecoder.MaxQueue)
  42. d := worker.NewDispatcher(int(envDecoder.MaxWorkers))
  43. log.Println("starting worker dispatcher")
  44. err := d.Run(jobQueue)
  45. if err != nil {
  46. log.Fatalln(err)
  47. }
  48. server := &http.Server{Addr: ":3000", Handler: httpService()}
  49. serverCtx, serverStopCtx := context.WithCancel(context.Background())
  50. sig := make(chan os.Signal, 1)
  51. signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  52. go func() {
  53. <-sig
  54. log.Println("shutting down server")
  55. shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
  56. defer shutdownCtxCancel()
  57. go func() {
  58. <-shutdownCtx.Done()
  59. if shutdownCtx.Err() == context.DeadlineExceeded {
  60. log.Fatal("graceful shutdown timed out.. forcing exit.")
  61. }
  62. }()
  63. err = server.Shutdown(shutdownCtx)
  64. if err != nil {
  65. log.Fatalln(err)
  66. }
  67. log.Println("server shutdown completed")
  68. serverStopCtx()
  69. }()
  70. log.Println("starting HTTP server at :3000")
  71. err = server.ListenAndServe()
  72. if err != nil && err != http.ErrServerClosed {
  73. log.Fatalf("error starting HTTP server: %v", err)
  74. }
  75. // Wait for server context to be stopped
  76. <-serverCtx.Done()
  77. d.Exit()
  78. }
  79. func httpService() http.Handler {
  80. log.Println("setting up HTTP router and adding middleware")
  81. r := chi.NewRouter()
  82. r.Use(middleware.Logger)
  83. r.Use(middleware.Recoverer)
  84. r.Use(middleware.Heartbeat("/ping"))
  85. r.Use(middleware.AllowContentType("application/json"))
  86. log.Println("setting up HTTP POST endpoint to enqueue jobs")
  87. r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
  88. job := getJob(chi.URLParam(r, "id"))
  89. if job == nil {
  90. w.WriteHeader(http.StatusNotFound)
  91. return
  92. }
  93. jobQueue <- job
  94. w.WriteHeader(http.StatusCreated)
  95. })
  96. return r
  97. }
  98. func getJob(id string) worker.Job {
  99. if id == "helm-revisions-count-tracker" {
  100. newJob, err := jobs.NewHelmRevisionsCountTracker(time.Now().UTC(), &jobs.HelmRevisionsCountTrackerOpts{
  101. DBConf: &envDecoder.DBConf,
  102. DOClientID: envDecoder.DOClientID,
  103. DOClientSecret: envDecoder.DOClientSecret,
  104. DOScopes: []string{"read", "write"},
  105. ServerURL: envDecoder.ServerURL,
  106. AWSAccessKeyID: envDecoder.AWSAccessKeyID,
  107. AWSSecretAccessKey: envDecoder.AWSSecretAccessKey,
  108. AWSRegion: envDecoder.AWSRegion,
  109. S3BucketName: envDecoder.S3BucketName,
  110. EncryptionKey: envDecoder.EncryptionKey,
  111. })
  112. if err != nil {
  113. log.Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
  114. }
  115. return newJob
  116. }
  117. return nil
  118. }