ingest.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // NewGetUsageDashboardHandler returns a new GetUsageDashboardHandler
  2. package billing
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "net/http"
  9. "github.com/porter-dev/porter/api/server/handlers"
  10. "github.com/porter-dev/porter/api/server/shared"
  11. "github.com/porter-dev/porter/api/server/shared/apierrors"
  12. "github.com/porter-dev/porter/api/server/shared/config"
  13. "github.com/porter-dev/porter/api/types"
  14. "github.com/porter-dev/porter/internal/models"
  15. "github.com/porter-dev/porter/internal/telemetry"
  16. )
  17. // IngestEventsHandler is a handler for ingesting billing events
  18. type IngestEventsHandler struct {
  19. handlers.PorterHandlerReadWriter
  20. }
  21. // NewIngestEventsHandler returns a new IngestEventsHandler
  22. func NewIngestEventsHandler(
  23. config *config.Config,
  24. decoderValidator shared.RequestDecoderValidator,
  25. writer shared.ResultWriter,
  26. ) *IngestEventsHandler {
  27. return &IngestEventsHandler{
  28. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  29. }
  30. }
  31. func (c *IngestEventsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  32. ctx, span := telemetry.NewSpan(r.Context(), "serve-ingest-events")
  33. defer span.End()
  34. proj, _ := ctx.Value(types.ProjectScope).(*models.Project)
  35. if !c.Config().BillingManager.MetronomeConfigLoaded || !proj.GetFeatureFlag(models.MetronomeEnabled, c.Config().LaunchDarklyClient) {
  36. c.WriteResult(w, r, "")
  37. telemetry.WithAttributes(span,
  38. telemetry.AttributeKV{Key: "metronome-config-exists", Value: c.Config().BillingManager.MetronomeConfigLoaded},
  39. telemetry.AttributeKV{Key: "metronome-enabled", Value: proj.GetFeatureFlag(models.MetronomeEnabled, c.Config().LaunchDarklyClient)},
  40. telemetry.AttributeKV{Key: "porter-cloud-enabled", Value: proj.EnableSandbox},
  41. )
  42. return
  43. }
  44. telemetry.WithAttributes(span,
  45. telemetry.AttributeKV{Key: "metronome-enabled", Value: true},
  46. telemetry.AttributeKV{Key: "usage-id", Value: proj.UsageID},
  47. )
  48. ingestEventsRequest := struct {
  49. Events []types.BillingEvent `json:"billing_events"`
  50. }{}
  51. if ok := c.DecodeAndValidate(w, r, &ingestEventsRequest); !ok {
  52. err := telemetry.Error(ctx, span, nil, "error decoding ingest events request")
  53. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
  54. return
  55. }
  56. telemetry.WithAttributes(span,
  57. telemetry.AttributeKV{Key: "usage-events-count", Value: len(ingestEventsRequest.Events)},
  58. )
  59. // For Porter Cloud events, we apend a prefix to avoid collisions before sending to Metronome
  60. if proj.EnableSandbox {
  61. for i := range ingestEventsRequest.Events {
  62. ingestEventsRequest.Events[i].CustomerID = fmt.Sprintf("porter-cloud-%s", ingestEventsRequest.Events[i].CustomerID)
  63. }
  64. }
  65. err := c.Config().BillingManager.MetronomeClient.IngestEvents(ctx, ingestEventsRequest.Events)
  66. if err != nil {
  67. err := telemetry.Error(ctx, span, err, "error ingesting events")
  68. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  69. return
  70. }
  71. // Call the ingest health endpoint
  72. err = c.postIngestHealthEndpoint(ctx, proj.ID)
  73. if err != nil {
  74. err := telemetry.Error(ctx, span, err, "error calling ingest health endpoint")
  75. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  76. return
  77. }
  78. c.WriteResult(w, r, "")
  79. }
  80. func (c *IngestEventsHandler) postIngestHealthEndpoint(ctx context.Context, projectID uint) (err error) {
  81. ctx, span := telemetry.NewSpan(ctx, "post-ingest-health-endpoint")
  82. defer span.End()
  83. // Call the ingest check webhook
  84. webhookUrl := c.Config().ServerConf.IngestStatusWebhookUrl
  85. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "ingest-status-webhook-url", Value: webhookUrl})
  86. if webhookUrl == "" {
  87. return nil
  88. }
  89. req := struct {
  90. ProjectID uint `json:"project_id"`
  91. }{
  92. ProjectID: projectID,
  93. }
  94. reqBody, err := json.Marshal(req)
  95. if err != nil {
  96. return telemetry.Error(ctx, span, err, "error marshalling ingest status webhook request")
  97. }
  98. client := &http.Client{}
  99. resp, err := client.Post(webhookUrl, "application/json", bytes.NewBuffer(reqBody))
  100. if err != nil || resp.StatusCode != http.StatusOK {
  101. return telemetry.Error(ctx, span, err, "error sending ingest status webhook request")
  102. }
  103. return nil
  104. }