run_app_job.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package v2
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "os"
  7. "os/signal"
  8. "syscall"
  9. "time"
  10. "github.com/fatih/color"
  11. "github.com/porter-dev/api-contracts/generated/go/helpers"
  12. porterv1 "github.com/porter-dev/api-contracts/generated/go/porter/v1"
  13. api "github.com/porter-dev/porter/api/client"
  14. "github.com/porter-dev/porter/cli/cmd/config"
  15. porter_app_internal "github.com/porter-dev/porter/internal/porter_app"
  16. )
  17. // WaitIntervalInSeconds is the amount of time to wait when polling for job status
  18. const WaitIntervalInSeconds = 5 * time.Second
  19. // RunAppJobInput is the input for the RunAppJob function
  20. type RunAppJobInput struct {
  21. // CLIConfig is the CLI configuration
  22. CLIConfig config.CLIConfig
  23. // Client is the Porter API client
  24. Client api.Client
  25. // DeploymentTargetName is the name of deployment target to run the job on
  26. DeploymentTargetName string
  27. AppName string
  28. JobName string
  29. // WaitForExit
  30. WaitForExit bool
  31. }
  32. // RunAppJob triggers a job run for an app and returns without waiting for the job to complete
  33. func RunAppJob(ctx context.Context, inp RunAppJobInput) error {
  34. ctx, cancel := context.WithCancel(ctx)
  35. defer cancel()
  36. termChan := make(chan os.Signal, 1)
  37. signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
  38. var runFinished bool
  39. currentAppRevisionResp, err := inp.Client.CurrentAppRevision(ctx, api.CurrentAppRevisionInput{
  40. ProjectID: inp.CLIConfig.Project,
  41. ClusterID: inp.CLIConfig.Cluster,
  42. AppName: inp.AppName,
  43. DeploymentTargetName: inp.DeploymentTargetName,
  44. DeploymentTargetID: "",
  45. })
  46. if err != nil {
  47. return fmt.Errorf("error getting current app revision: %w", err)
  48. }
  49. resp, err := inp.Client.RunAppJob(ctx, inp.CLIConfig.Project, inp.CLIConfig.Cluster, inp.AppName, inp.JobName, inp.DeploymentTargetName) // nolint:staticcheck
  50. if err != nil {
  51. return fmt.Errorf("unable to run job: %w", err)
  52. }
  53. triggeredBackgroundColor := color.FgGreen
  54. if inp.WaitForExit {
  55. triggeredBackgroundColor = color.FgBlue
  56. }
  57. color.New(triggeredBackgroundColor).Println("Triggered job with id:", resp.JobRunID) // nolint:errcheck,gosec
  58. if !inp.WaitForExit {
  59. return nil
  60. }
  61. decoded, err := base64.StdEncoding.DecodeString(currentAppRevisionResp.AppRevision.B64AppProto)
  62. if err != nil {
  63. return fmt.Errorf("unable to decode base64 app for revision: %w", err)
  64. }
  65. app := &porterv1.PorterApp{}
  66. err = helpers.UnmarshalContractObject(decoded, app)
  67. if err != nil {
  68. return fmt.Errorf("unable to unmarshal app for revision: %w", err)
  69. }
  70. timeoutSeconds := 1800 * time.Second
  71. for _, service := range app.ServiceList {
  72. if inp.JobName != service.Name {
  73. continue
  74. }
  75. if service.GetJobConfig() == nil {
  76. return fmt.Errorf("error getting job timeout")
  77. }
  78. timeoutSeconds = time.Duration(service.GetJobConfig().TimeoutSeconds) * time.Second
  79. }
  80. deadline := time.Now().Add(timeoutSeconds)
  81. color.New(color.FgBlue).Printf("Waiting %.f seconds for job to complete\n", timeoutSeconds.Seconds()) // nolint:errcheck,gosec
  82. time.Sleep(2 * time.Second)
  83. input := api.RunAppJobStatusInput{
  84. AppName: inp.AppName,
  85. ClusterID: inp.CLIConfig.Cluster,
  86. DeploymentTargetName: inp.DeploymentTargetName,
  87. ServiceName: inp.JobName,
  88. JobRunID: resp.JobRunID,
  89. ProjectID: inp.CLIConfig.Project,
  90. }
  91. go func() {
  92. select {
  93. case <-termChan:
  94. color.New(color.FgYellow).Println("Shutdown signal received, canceling processes") // nolint:errcheck,gosec
  95. if !runFinished {
  96. color.New(color.FgBlue).Println("\nCanceling job...") // nolint:errcheck,gosec
  97. _, err := inp.Client.CancelAppJobRun(ctx, api.CancelAppJobInput{
  98. ProjectID: inp.CLIConfig.Project,
  99. ClusterID: inp.CLIConfig.Cluster,
  100. AppName: inp.AppName,
  101. DeploymentTargetName: inp.DeploymentTargetName,
  102. JobName: resp.JobRunName,
  103. })
  104. if err != nil {
  105. fmt.Println("Error canceling job:", err)
  106. return
  107. }
  108. color.New(color.FgYellow).Println("\nJob run canceled") // nolint:errcheck,gosec
  109. }
  110. cancel()
  111. return
  112. case <-ctx.Done():
  113. }
  114. }()
  115. for time.Now().Before(deadline) {
  116. select {
  117. case <-ctx.Done():
  118. return ctx.Err()
  119. default:
  120. statusResp, err := inp.Client.RunAppJobStatus(ctx, input)
  121. if err != nil {
  122. return fmt.Errorf("unable to get job status: %w", err)
  123. }
  124. switch statusResp.Status {
  125. case porter_app_internal.InstanceStatusDescriptor_Pending:
  126. print(".")
  127. time.Sleep(WaitIntervalInSeconds)
  128. case porter_app_internal.InstanceStatusDescriptor_Running:
  129. print(".")
  130. time.Sleep(WaitIntervalInSeconds)
  131. case porter_app_internal.InstanceStatusDescriptor_Succeeded:
  132. runFinished = true
  133. print("\n")
  134. color.New(color.FgGreen).Println("Job completed successfully") // nolint:errcheck,gosec
  135. return nil
  136. case porter_app_internal.InstanceStatusDescriptor_Failed:
  137. runFinished = true
  138. return fmt.Errorf("job exited with non-zero exit code: %w", err)
  139. case porter_app_internal.InstanceStatusDescriptor_Unknown:
  140. runFinished = true
  141. return fmt.Errorf("unknown job status: %w", err)
  142. }
  143. }
  144. }
  145. return fmt.Errorf("timeout exceeded")
  146. }