main.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "net/http"
  6. "os"
  7. "os/signal"
  8. "strconv"
  9. "syscall"
  10. "time"
  11. "github.com/go-chi/chi"
  12. "github.com/go-chi/chi/middleware"
  13. "github.com/joeshaw/envdecode"
  14. "github.com/porter-dev/porter/api/server/shared/config/env"
  15. "github.com/porter-dev/porter/internal/worker"
  16. "github.com/porter-dev/porter/workers/jobs"
  17. )
  18. var (
  19. MaxWorkers = os.Getenv("MAX_WORKERS")
  20. MaxQueue = os.Getenv("MAX_QUEUE")
  21. jobQueue chan worker.Job
  22. envDecoder = EnvConf{}
  23. )
  24. type EnvConf struct {
  25. ServerURL string `env:"SERVER_URL,default=http://localhost:8080"`
  26. DOClientID string `env:"DO_CLIENT_ID"`
  27. DOClientSecret string `env:"DO_CLIENT_SECRET"`
  28. DBConf env.DBConf
  29. }
  30. func main() {
  31. if err := envdecode.StrictDecode(&envDecoder); err != nil {
  32. log.Default().Fatalf("Failed to decode server conf: %v", err)
  33. }
  34. workerCount, err := strconv.Atoi(MaxWorkers)
  35. if err != nil {
  36. log.Default().Fatalln("invalid MAX_WORKERS value")
  37. }
  38. log.Default().Printf("setting max worker count to: %d\n", workerCount)
  39. queueCount, err := strconv.Atoi(MaxQueue)
  40. if err != nil {
  41. log.Default().Fatalln("invalid MAX_QUEUE value")
  42. }
  43. log.Default().Printf("setting max job queue count to: %d\n", queueCount)
  44. jobQueue = make(chan worker.Job, queueCount)
  45. d := worker.NewDispatcher(workerCount)
  46. log.Default().Println("starting worker dispatcher")
  47. err = d.Run(jobQueue)
  48. if err != nil {
  49. log.Default().Fatalln(err)
  50. }
  51. server := &http.Server{Addr: ":3000", Handler: httpService()}
  52. serverCtx, serverStopCtx := context.WithCancel(context.Background())
  53. sig := make(chan os.Signal, 1)
  54. signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  55. go func() {
  56. <-sig
  57. log.Default().Println("shutting down server")
  58. shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
  59. defer shutdownCtxCancel()
  60. go func() {
  61. <-shutdownCtx.Done()
  62. if shutdownCtx.Err() == context.DeadlineExceeded {
  63. log.Fatal("graceful shutdown timed out.. forcing exit.")
  64. }
  65. }()
  66. err = server.Shutdown(shutdownCtx)
  67. if err != nil {
  68. log.Fatalln(err)
  69. }
  70. log.Default().Println("server shutdown completed")
  71. serverStopCtx()
  72. }()
  73. log.Default().Println("starting HTTP server at :3000")
  74. err = server.ListenAndServe()
  75. if err != nil && err != http.ErrServerClosed {
  76. log.Default().Fatalf("error starting HTTP server: %v", err)
  77. }
  78. // Wait for server context to be stopped
  79. <-serverCtx.Done()
  80. d.Exit()
  81. }
  82. func httpService() http.Handler {
  83. log.Default().Println("setting up HTTP router and adding middleware")
  84. r := chi.NewRouter()
  85. r.Use(middleware.Logger)
  86. r.Use(middleware.Recoverer)
  87. r.Use(middleware.Heartbeat("/ping"))
  88. r.Use(middleware.AllowContentType("application/json"))
  89. log.Default().Println("setting up HTTP POST endpoint to enqueue jobs")
  90. r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
  91. job := getJob(chi.URLParam(r, "id"))
  92. if job == nil {
  93. w.WriteHeader(http.StatusNotFound)
  94. return
  95. }
  96. jobQueue <- job
  97. w.WriteHeader(http.StatusCreated)
  98. })
  99. return r
  100. }
  101. func getJob(id string) worker.Job {
  102. if id == "helm-revisions-count-tracker" {
  103. newJob, err := jobs.NewHelmRevisionsCountTracker(time.Now().UTC(), &jobs.HelmRevisionsCountTrackerOpts{
  104. DBConf: &envDecoder.DBConf,
  105. DOClientID: envDecoder.DOClientID,
  106. DOClientSecret: envDecoder.DOClientSecret,
  107. DOScopes: []string{"read", "write"},
  108. ServerURL: envDecoder.ServerURL,
  109. })
  110. if err != nil {
  111. log.Default().Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
  112. }
  113. return newJob
  114. }
  115. return nil
  116. }