ingest.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // NewGetUsageDashboardHandler returns a new GetUsageDashboardHandler
  2. package billing
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "net/http"
  9. "github.com/porter-dev/porter/api/server/handlers"
  10. "github.com/porter-dev/porter/api/server/shared"
  11. "github.com/porter-dev/porter/api/server/shared/apierrors"
  12. "github.com/porter-dev/porter/api/server/shared/config"
  13. "github.com/porter-dev/porter/api/types"
  14. "github.com/porter-dev/porter/internal/models"
  15. "github.com/porter-dev/porter/internal/telemetry"
  16. )
  17. // IngestEventsHandler is a handler for ingesting billing events
  18. type IngestEventsHandler struct {
  19. handlers.PorterHandlerReadWriter
  20. }
  21. // NewIngestEventsHandler returns a new IngestEventsHandler
  22. func NewIngestEventsHandler(
  23. config *config.Config,
  24. decoderValidator shared.RequestDecoderValidator,
  25. writer shared.ResultWriter,
  26. ) *IngestEventsHandler {
  27. return &IngestEventsHandler{
  28. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  29. }
  30. }
  31. func (c *IngestEventsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  32. ctx, span := telemetry.NewSpan(r.Context(), "serve-ingest-events")
  33. defer span.End()
  34. proj, _ := ctx.Value(types.ProjectScope).(*models.Project)
  35. telemetry.WithAttributes(span,
  36. telemetry.AttributeKV{Key: "lago-config-exists", Value: c.Config().BillingManager.LagoConfigLoaded},
  37. telemetry.AttributeKV{Key: "lago-enabled", Value: proj.GetFeatureFlag(models.LagoEnabled, c.Config().LaunchDarklyClient)},
  38. telemetry.AttributeKV{Key: "porter-cloud-enabled", Value: proj.EnableSandbox},
  39. )
  40. if !c.Config().BillingManager.LagoConfigLoaded || !proj.GetFeatureFlag(models.LagoEnabled, c.Config().LaunchDarklyClient) {
  41. c.WriteResult(w, r, "")
  42. return
  43. }
  44. ingestEventsRequest := struct {
  45. Events []types.BillingEvent `json:"billing_events"`
  46. }{}
  47. if ok := c.DecodeAndValidate(w, r, &ingestEventsRequest); !ok {
  48. err := telemetry.Error(ctx, span, nil, "error decoding ingest events request")
  49. c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
  50. return
  51. }
  52. telemetry.WithAttributes(span,
  53. telemetry.AttributeKV{Key: "usage-events-count", Value: len(ingestEventsRequest.Events)},
  54. )
  55. // For Porter Cloud events, we apend a prefix to avoid collisions before sending to Lago
  56. if proj.EnableSandbox {
  57. for i := range ingestEventsRequest.Events {
  58. ingestEventsRequest.Events[i].CustomerID = fmt.Sprintf("porter-cloud-%s", ingestEventsRequest.Events[i].CustomerID)
  59. }
  60. }
  61. plan, err := c.Config().BillingManager.LagoClient.GetCustomeActivePlan(ctx, proj.ID, proj.EnableSandbox)
  62. if err != nil {
  63. err := telemetry.Error(ctx, span, err, "error getting active subscription")
  64. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  65. return
  66. }
  67. telemetry.WithAttributes(span,
  68. telemetry.AttributeKV{Key: "subscription_id", Value: plan.ID},
  69. )
  70. err = c.Config().BillingManager.LagoClient.IngestEvents(ctx, plan.ID, ingestEventsRequest.Events, proj.EnableSandbox)
  71. if err != nil {
  72. err := telemetry.Error(ctx, span, err, "error ingesting events")
  73. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  74. return
  75. }
  76. // Call the ingest health endpoint
  77. err = c.postIngestHealthEndpoint(ctx, proj.ID)
  78. if err != nil {
  79. err := telemetry.Error(ctx, span, err, "error calling ingest health endpoint")
  80. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  81. return
  82. }
  83. c.WriteResult(w, r, "")
  84. }
  85. func (c *IngestEventsHandler) postIngestHealthEndpoint(ctx context.Context, projectID uint) (err error) {
  86. ctx, span := telemetry.NewSpan(ctx, "post-ingest-health-endpoint")
  87. defer span.End()
  88. // Call the ingest check webhook
  89. webhookUrl := c.Config().ServerConf.IngestStatusWebhookUrl
  90. telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "ingest-status-webhook-url", Value: webhookUrl})
  91. if webhookUrl == "" {
  92. return nil
  93. }
  94. req := struct {
  95. ProjectID uint `json:"project_id"`
  96. }{
  97. ProjectID: projectID,
  98. }
  99. reqBody, err := json.Marshal(req)
  100. if err != nil {
  101. return telemetry.Error(ctx, span, err, "error marshalling ingest status webhook request")
  102. }
  103. client := &http.Client{}
  104. resp, err := client.Post(webhookUrl, "application/json", bytes.NewBuffer(reqBody))
  105. if err != nil || resp.StatusCode != http.StatusOK {
  106. return telemetry.Error(ctx, span, err, "error sending ingest status webhook request")
  107. }
  108. return nil
  109. }