main.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. //go:build ee
  2. package main
  3. import (
  4. "context"
  5. "log"
  6. "net/http"
  7. "os"
  8. "os/signal"
  9. "strconv"
  10. "syscall"
  11. "time"
  12. "github.com/go-chi/chi"
  13. "github.com/go-chi/chi/middleware"
  14. "github.com/joeshaw/envdecode"
  15. "github.com/porter-dev/porter/api/server/shared/config/env"
  16. "github.com/porter-dev/porter/internal/worker"
  17. "github.com/porter-dev/porter/workers/jobs"
  18. )
  19. var (
  20. MaxWorkers = os.Getenv("MAX_WORKERS")
  21. MaxQueue = os.Getenv("MAX_QUEUE")
  22. jobQueue chan worker.Job
  23. envDecoder = EnvConf{}
  24. )
  25. type EnvConf struct {
  26. ServerURL string `env:"SERVER_URL,default=http://localhost:8080"`
  27. DOClientID string `env:"DO_CLIENT_ID"`
  28. DOClientSecret string `env:"DO_CLIENT_SECRET"`
  29. DBConf env.DBConf
  30. }
  31. func main() {
  32. if err := envdecode.StrictDecode(&envDecoder); err != nil {
  33. log.Default().Fatalf("Failed to decode server conf: %v", err)
  34. }
  35. workerCount, err := strconv.Atoi(MaxWorkers)
  36. if err != nil {
  37. log.Default().Fatalln("invalid MAX_WORKERS value")
  38. }
  39. log.Default().Printf("setting max worker count to: %d\n", workerCount)
  40. queueCount, err := strconv.Atoi(MaxQueue)
  41. if err != nil {
  42. log.Default().Fatalln("invalid MAX_QUEUE value")
  43. }
  44. log.Default().Printf("setting max job queue count to: %d\n", queueCount)
  45. jobQueue = make(chan worker.Job, queueCount)
  46. d := worker.NewDispatcher(workerCount)
  47. log.Default().Println("starting worker dispatcher")
  48. err = d.Run(jobQueue)
  49. if err != nil {
  50. log.Default().Fatalln(err)
  51. }
  52. server := &http.Server{Addr: ":3000", Handler: httpService()}
  53. serverCtx, serverStopCtx := context.WithCancel(context.Background())
  54. sig := make(chan os.Signal, 1)
  55. signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  56. go func() {
  57. <-sig
  58. log.Default().Println("shutting down server")
  59. shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
  60. defer shutdownCtxCancel()
  61. go func() {
  62. <-shutdownCtx.Done()
  63. if shutdownCtx.Err() == context.DeadlineExceeded {
  64. log.Fatal("graceful shutdown timed out.. forcing exit.")
  65. }
  66. }()
  67. err = server.Shutdown(shutdownCtx)
  68. if err != nil {
  69. log.Fatalln(err)
  70. }
  71. log.Default().Println("server shutdown completed")
  72. serverStopCtx()
  73. }()
  74. log.Default().Println("starting HTTP server at :3000")
  75. err = server.ListenAndServe()
  76. if err != nil && err != http.ErrServerClosed {
  77. log.Default().Fatalf("error starting HTTP server: %v", err)
  78. }
  79. // Wait for server context to be stopped
  80. <-serverCtx.Done()
  81. d.Exit()
  82. }
  83. func httpService() http.Handler {
  84. log.Default().Println("setting up HTTP router and adding middleware")
  85. r := chi.NewRouter()
  86. r.Use(middleware.Logger)
  87. r.Use(middleware.Recoverer)
  88. r.Use(middleware.Heartbeat("/ping"))
  89. r.Use(middleware.AllowContentType("application/json"))
  90. log.Default().Println("setting up HTTP POST endpoint to enqueue jobs")
  91. r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
  92. job := getJob(chi.URLParam(r, "id"))
  93. if job == nil {
  94. w.WriteHeader(http.StatusNotFound)
  95. return
  96. }
  97. jobQueue <- job
  98. w.WriteHeader(http.StatusCreated)
  99. })
  100. return r
  101. }
  102. func getJob(id string) worker.Job {
  103. if id == "helm-revisions-count-tracker" {
  104. newJob, err := jobs.NewHelmRevisionsCountTracker(time.Now().UTC(), &jobs.HelmRevisionsCountTrackerOpts{
  105. DBConf: &envDecoder.DBConf,
  106. DOClientID: envDecoder.DOClientID,
  107. DOClientSecret: envDecoder.DOClientSecret,
  108. DOScopes: []string{"read", "write"},
  109. ServerURL: envDecoder.ServerURL,
  110. })
  111. if err != nil {
  112. log.Default().Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
  113. }
  114. return newJob
  115. }
  116. return nil
  117. }