main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "net/http"
  6. "os"
  7. "os/signal"
  8. "strconv"
  9. "syscall"
  10. "time"
  11. "github.com/go-chi/chi"
  12. "github.com/go-chi/chi/middleware"
  13. "github.com/porter-dev/porter/internal/worker"
  14. "github.com/porter-dev/porter/workers/jobs"
  15. )
  16. var (
  17. MaxWorkers = os.Getenv("MAX_WORKERS")
  18. MaxQueue = os.Getenv("MAX_QUEUE")
  19. jobQueue chan worker.Job
  20. )
  21. func main() {
  22. workerCount, err := strconv.Atoi(MaxWorkers)
  23. if err != nil {
  24. log.Default().Fatalln("invalid MAX_WORKERS value")
  25. }
  26. log.Default().Printf("setting max worker count to: %d\n", workerCount)
  27. queueCount, err := strconv.Atoi(MaxQueue)
  28. if err != nil {
  29. log.Default().Fatalln("invalid MAX_QUEUE value")
  30. }
  31. log.Default().Printf("setting max job queue count to: %d\n", queueCount)
  32. jobQueue = make(chan worker.Job, queueCount)
  33. d := worker.NewDispatcher(workerCount)
  34. log.Default().Println("starting worker dispatcher")
  35. err = d.Run(jobQueue)
  36. if err != nil {
  37. log.Default().Fatalln(err)
  38. }
  39. server := &http.Server{Addr: ":3000", Handler: httpService()}
  40. serverCtx, serverStopCtx := context.WithCancel(context.Background())
  41. sig := make(chan os.Signal, 1)
  42. signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  43. go func() {
  44. <-sig
  45. log.Default().Println("shutting down server")
  46. shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
  47. defer shutdownCtxCancel()
  48. go func() {
  49. <-shutdownCtx.Done()
  50. if shutdownCtx.Err() == context.DeadlineExceeded {
  51. log.Fatal("graceful shutdown timed out.. forcing exit.")
  52. }
  53. }()
  54. err = server.Shutdown(shutdownCtx)
  55. if err != nil {
  56. log.Fatalln(err)
  57. }
  58. log.Default().Println("server shutdown completed")
  59. serverStopCtx()
  60. }()
  61. log.Default().Println("starting HTTP server at :3000")
  62. err = server.ListenAndServe()
  63. if err != nil && err != http.ErrServerClosed {
  64. log.Default().Fatalf("error starting HTTP server: %v", err)
  65. }
  66. // Wait for server context to be stopped
  67. <-serverCtx.Done()
  68. d.Exit()
  69. }
  70. func httpService() http.Handler {
  71. log.Default().Println("setting up HTTP router and adding middleware")
  72. r := chi.NewRouter()
  73. r.Use(middleware.Logger)
  74. r.Use(middleware.Recoverer)
  75. r.Use(middleware.Heartbeat("/ping"))
  76. r.Use(middleware.AllowContentType("application/json"))
  77. log.Default().Println("setting up HTTP POST endpoint to enqueue jobs")
  78. r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
  79. job := getJob(chi.URLParam(r, "id"))
  80. if job == nil {
  81. w.WriteHeader(http.StatusNotFound)
  82. return
  83. }
  84. jobQueue <- job
  85. w.WriteHeader(http.StatusCreated)
  86. })
  87. return r
  88. }
  89. func getJob(id string) worker.Job {
  90. if id == "helm-revisions-count-tracker" {
  91. return jobs.NewHelmRevisionsCountTracker(time.Now().UTC())
  92. }
  93. return nil
  94. }