|
|
@@ -3,28 +3,33 @@ package billing
|
|
|
import (
|
|
|
"context"
|
|
|
"fmt"
|
|
|
- "net/http"
|
|
|
+ "strconv"
|
|
|
"time"
|
|
|
|
|
|
"github.com/getlago/lago-go-client"
|
|
|
- "github.com/google/uuid"
|
|
|
"github.com/porter-dev/porter/api/types"
|
|
|
"github.com/porter-dev/porter/internal/telemetry"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- defaultMaxRetries = 10
|
|
|
- porterStandardTrialDays = 15
|
|
|
defaultRewardAmountCents = 1000
|
|
|
- defaultPaidAmountCents = 0
|
|
|
maxReferralRewards = 10
|
|
|
+ defaultMaxRetries = 10
|
|
|
maxIngestEventLimit = 100
|
|
|
+
|
|
|
+ // These prefixes are used to build the customer and subscription IDs
|
|
|
+ // in Lago. This way we can reuse the project IDs instead of storing
|
|
|
+ // the Lago IDs in the database.
|
|
|
+
|
|
|
+ // SubscriptionIDPrefix is the prefix for the subscription ID
|
|
|
+ SubscriptionIDPrefix = "sub"
|
|
|
+ // CustomerIDPrefix is the prefix for the customer ID
|
|
|
+ CustomerIDPrefix = "cus"
|
|
|
)
|
|
|
|
|
|
// LagoClient is the client used to call the Lago API
|
|
|
type LagoClient struct {
|
|
|
client lago.Client
|
|
|
- billableMetrics []types.BillableMetric
|
|
|
PorterCloudPlanID string
|
|
|
PorterStandardPlanID string
|
|
|
|
|
|
@@ -64,7 +69,7 @@ func (m LagoClient) CreateCustomerWithPlan(ctx context.Context, userEmail string
|
|
|
return telemetry.Error(ctx, span, err, fmt.Sprintf("error while creating customer with plan %s", planID))
|
|
|
}
|
|
|
|
|
|
- subscriptionID := m.generateSubscriptionID(projectID, sandboxEnabled)
|
|
|
+ subscriptionID := m.GenerateLagoID(SubscriptionIDPrefix, projectID, sandboxEnabled)
|
|
|
|
|
|
err = m.addCustomerPlan(ctx, customerID, planID, subscriptionID)
|
|
|
|
|
|
@@ -76,7 +81,7 @@ func (m LagoClient) createCustomer(ctx context.Context, userEmail string, projec
|
|
|
ctx, span := telemetry.NewSpan(ctx, "create-metronome-customer")
|
|
|
defer span.End()
|
|
|
|
|
|
- customerID = m.generateCustomerID(projectID, sandboxEnabled)
|
|
|
+ customerID = m.GenerateLagoID(CustomerIDPrefix, projectID, sandboxEnabled)
|
|
|
|
|
|
customerInput := &lago.CustomerInput{
|
|
|
ExternalID: customerID,
|
|
|
@@ -122,14 +127,15 @@ func (m LagoClient) addCustomerPlan(ctx context.Context, projectID string, planI
|
|
|
}
|
|
|
|
|
|
// ListCustomerPlan will return the current active plan to which the user is subscribed
|
|
|
-func (m LagoClient) ListCustomerPlan(ctx context.Context, subscriptionID string) (plan types.Plan, err error) {
|
|
|
+func (m LagoClient) ListCustomerPlan(ctx context.Context, projectID uint, sandboxEnabled bool) (plan types.Plan, err error) {
|
|
|
ctx, span := telemetry.NewSpan(ctx, "list-customer-plans")
|
|
|
defer span.End()
|
|
|
|
|
|
- if subscriptionID == "" {
|
|
|
- return plan, telemetry.Error(ctx, span, err, "subscription id empty")
|
|
|
+ if projectID == 0 {
|
|
|
+ return plan, telemetry.Error(ctx, span, err, "project id empty")
|
|
|
}
|
|
|
|
|
|
+ subscriptionID := m.GenerateLagoID(SubscriptionIDPrefix, projectID, sandboxEnabled)
|
|
|
subscription, lagoErr := m.client.Subscription().Get(ctx, subscriptionID)
|
|
|
if err != nil {
|
|
|
return plan, telemetry.Error(ctx, span, lagoErr.Err, "failed to create subscription")
|
|
|
@@ -143,14 +149,15 @@ func (m LagoClient) ListCustomerPlan(ctx context.Context, subscriptionID string)
|
|
|
}
|
|
|
|
|
|
// EndCustomerPlan will immediately end the plan for the given customer
|
|
|
-func (m LagoClient) EndCustomerPlan(ctx context.Context, subscriptionID string) (err error) {
|
|
|
+func (m LagoClient) EndCustomerPlan(ctx context.Context, projectID uint) (err error) {
|
|
|
ctx, span := telemetry.NewSpan(ctx, "end-metronome-customer-plan")
|
|
|
defer span.End()
|
|
|
|
|
|
- if subscriptionID == "" {
|
|
|
+ if projectID == 0 {
|
|
|
return telemetry.Error(ctx, span, err, "subscription id empty")
|
|
|
}
|
|
|
|
|
|
+ subscriptionID := m.GenerateLagoID(SubscriptionIDPrefix, projectID, false)
|
|
|
subscriptionTerminateInput := lago.SubscriptionTerminateInput{
|
|
|
ExternalID: subscriptionID,
|
|
|
}
|
|
|
@@ -164,41 +171,42 @@ func (m LagoClient) EndCustomerPlan(ctx context.Context, subscriptionID string)
|
|
|
}
|
|
|
|
|
|
// ListCustomerCredits will return the total number of credits for the customer
|
|
|
-func (m LagoClient) ListCustomerCredits(ctx context.Context, customerID string) (credits types.ListCreditGrantsResponse, err error) {
|
|
|
- ctx, span := telemetry.NewSpan(ctx, "list-customer-credits")
|
|
|
- defer span.End()
|
|
|
+// func (m LagoClient) ListCustomerCredits(ctx context.Context, customerID string) (credits types.ListCreditGrantsResponse, err error) {
|
|
|
+// ctx, span := telemetry.NewSpan(ctx, "list-customer-credits")
|
|
|
+// defer span.End()
|
|
|
|
|
|
- if customerID == "" {
|
|
|
- return credits, telemetry.Error(ctx, span, err, "customer id empty")
|
|
|
- }
|
|
|
+// if customerID == "" {
|
|
|
+// return credits, telemetry.Error(ctx, span, err, "customer id empty")
|
|
|
+// }
|
|
|
|
|
|
- walletListInput := &lago.WalletListInput{
|
|
|
- ExternalCustomerID: customerID,
|
|
|
- }
|
|
|
+// walletListInput := &lago.WalletListInput{
|
|
|
+// ExternalCustomerID: customerID,
|
|
|
+// }
|
|
|
|
|
|
- walletList, lagoErr := m.client.Wallet().GetList(ctx, walletListInput)
|
|
|
- if lagoErr.Err != nil {
|
|
|
- return credits, telemetry.Error(ctx, span, lagoErr.Err, "failed to get wallet")
|
|
|
- }
|
|
|
+// walletList, lagoErr := m.client.Wallet().GetList(ctx, walletListInput)
|
|
|
+// if lagoErr.Err != nil {
|
|
|
+// return credits, telemetry.Error(ctx, span, lagoErr.Err, "failed to get wallet")
|
|
|
+// }
|
|
|
|
|
|
- var response types.ListCreditGrantsResponse
|
|
|
- for _, wallet := range walletList.Wallets {
|
|
|
- response.GrantedBalanceCents += wallet.BalanceCents
|
|
|
- response.RemainingBalanceCents += wallet.OngoingUsageBalanceCents
|
|
|
- }
|
|
|
+// var response types.ListCreditGrantsResponse
|
|
|
+// for _, wallet := range walletList.Wallets {
|
|
|
+// response.GrantedBalanceCents += wallet.BalanceCents
|
|
|
+// response.RemainingBalanceCents += wallet.OngoingUsageBalanceCents
|
|
|
+// }
|
|
|
|
|
|
- return response, nil
|
|
|
-}
|
|
|
+// return response, nil
|
|
|
+// }
|
|
|
|
|
|
// CreateCreditsGrant will create a new credit grant for the customer with the specified amount
|
|
|
-func (m LagoClient) CreateCreditsGrant(ctx context.Context, customerID string, reason string, grantAmount float64, expiresAt string) (err error) {
|
|
|
+func (m LagoClient) CreateCreditsGrant(ctx context.Context, projectID uint, name string, grantAmount int64, expiresAt string, sandboxEnabled bool) (err error) {
|
|
|
ctx, span := telemetry.NewSpan(ctx, "create-credits-grant")
|
|
|
defer span.End()
|
|
|
|
|
|
- if customerID == "" {
|
|
|
- return telemetry.Error(ctx, span, err, "customer id empty")
|
|
|
+ if projectID == 0 {
|
|
|
+ return telemetry.Error(ctx, span, err, "project id empty")
|
|
|
}
|
|
|
|
|
|
+ customerID := m.GenerateLagoID(CustomerIDPrefix, projectID, sandboxEnabled)
|
|
|
expiresAtTime, err := time.Parse(time.RFC3339, expiresAt)
|
|
|
if err != nil {
|
|
|
return telemetry.Error(ctx, span, err, "failed to parse credit expiration timestamp")
|
|
|
@@ -206,8 +214,10 @@ func (m LagoClient) CreateCreditsGrant(ctx context.Context, customerID string, r
|
|
|
|
|
|
walletInput := &lago.WalletInput{
|
|
|
ExternalCustomerID: customerID,
|
|
|
+ Name: name,
|
|
|
Currency: lago.USD,
|
|
|
- RateAmount: fmt.Sprintf("%.2f", grantAmount),
|
|
|
+ GrantedCredits: strconv.FormatInt(grantAmount, 10),
|
|
|
+ RateAmount: "1",
|
|
|
ExpirationAt: &expiresAtTime,
|
|
|
}
|
|
|
|
|
|
@@ -220,110 +230,30 @@ func (m LagoClient) CreateCreditsGrant(ctx context.Context, customerID string, r
|
|
|
}
|
|
|
|
|
|
// ListCustomerUsage will return the aggregated usage for a customer
|
|
|
-func (m LagoClient) ListCustomerUsage(ctx context.Context, customerID uuid.UUID, startingOn string, endingBefore string, windowsSize string, currentPeriod bool) (usage []types.Usage, err error) {
|
|
|
+func (m LagoClient) ListCustomerUsage(ctx context.Context, projectID uint, currentPeriod bool, sandboxEnabled bool) (usage []types.Usage, err error) {
|
|
|
ctx, span := telemetry.NewSpan(ctx, "list-customer-usage")
|
|
|
defer span.End()
|
|
|
|
|
|
- if customerID == uuid.Nil {
|
|
|
- return usage, telemetry.Error(ctx, span, err, "customer id empty")
|
|
|
- }
|
|
|
-
|
|
|
- if len(m.billableMetrics) == 0 {
|
|
|
- billableMetrics, err := m.listBillableMetricIDs(ctx, customerID)
|
|
|
- if err != nil {
|
|
|
- return nil, telemetry.Error(ctx, span, err, "failed to list billable metrics")
|
|
|
- }
|
|
|
-
|
|
|
- telemetry.WithAttributes(span,
|
|
|
- telemetry.AttributeKV{Key: "billable-metric-count", Value: len(billableMetrics)},
|
|
|
- )
|
|
|
-
|
|
|
- // Cache billable metric ids for future calls
|
|
|
- m.billableMetrics = append(m.billableMetrics, billableMetrics...)
|
|
|
- }
|
|
|
-
|
|
|
- path := "usage/groups"
|
|
|
-
|
|
|
- startingOnTimestamp, endingBeforeTimestamp, err := parseAndCheckTimestamps(startingOn, endingBefore)
|
|
|
- if err != nil {
|
|
|
- return nil, telemetry.Error(ctx, span, err, err.Error())
|
|
|
+ if projectID == 0 {
|
|
|
+ return usage, telemetry.Error(ctx, span, err, "project id empty")
|
|
|
}
|
|
|
|
|
|
- baseReq := types.ListCustomerUsageRequest{
|
|
|
- CustomerID: customerID,
|
|
|
- WindowSize: windowsSize,
|
|
|
- StartingOn: startingOnTimestamp,
|
|
|
- EndingBefore: endingBeforeTimestamp,
|
|
|
- CurrentPeriod: currentPeriod,
|
|
|
+ subscriptionID := m.GenerateLagoID(SubscriptionIDPrefix, projectID, sandboxEnabled)
|
|
|
+ customerUsageInput := &lago.CustomerUsageInput{
|
|
|
+ ExternalSubscriptionID: subscriptionID,
|
|
|
}
|
|
|
|
|
|
- for _, billableMetric := range m.billableMetrics {
|
|
|
- telemetry.WithAttributes(span,
|
|
|
- telemetry.AttributeKV{Key: "billable-metric-id", Value: billableMetric.ID},
|
|
|
- )
|
|
|
-
|
|
|
- var result struct {
|
|
|
- Data []types.CustomerUsageMetric `json:"data"`
|
|
|
- }
|
|
|
-
|
|
|
- baseReq.BillableMetricID = billableMetric.ID
|
|
|
- _, err = m.do(http.MethodPost, path, "", baseReq, &result)
|
|
|
- if err != nil {
|
|
|
- return usage, telemetry.Error(ctx, span, err, "failed to get customer usage")
|
|
|
- }
|
|
|
-
|
|
|
- usage = append(usage, types.Usage{
|
|
|
- MetricName: billableMetric.Name,
|
|
|
- UsageMetrics: result.Data,
|
|
|
- })
|
|
|
+ customerID := m.GenerateLagoID(CustomerIDPrefix, projectID, sandboxEnabled)
|
|
|
+ _, lagoErr := m.client.Customer().CurrentUsage(ctx, customerID, customerUsageInput)
|
|
|
+ if lagoErr.Err != nil {
|
|
|
+ return usage, telemetry.Error(ctx, span, lagoErr.Err, "failed to get customer usage")
|
|
|
}
|
|
|
|
|
|
return usage, nil
|
|
|
}
|
|
|
|
|
|
-// ListCustomerCosts will return the costs for a customer over a time period
|
|
|
-func (m LagoClient) ListCustomerCosts(ctx context.Context, customerID uuid.UUID, startingOn string, endingBefore string, limit int) (costs []types.FormattedCost, err error) {
|
|
|
- ctx, span := telemetry.NewSpan(ctx, "list-customer-costs")
|
|
|
- defer span.End()
|
|
|
-
|
|
|
- if customerID == uuid.Nil {
|
|
|
- return costs, telemetry.Error(ctx, span, err, "customer id empty")
|
|
|
- }
|
|
|
-
|
|
|
- path := fmt.Sprintf("customers/%s/costs", customerID)
|
|
|
-
|
|
|
- var result struct {
|
|
|
- Data []types.Cost `json:"data"`
|
|
|
- }
|
|
|
-
|
|
|
- startingOnTimestamp, endingBeforeTimestamp, err := parseAndCheckTimestamps(startingOn, endingBefore)
|
|
|
- if err != nil {
|
|
|
- return nil, telemetry.Error(ctx, span, err, err.Error())
|
|
|
- }
|
|
|
-
|
|
|
- queryParams := fmt.Sprintf("starting_on=%s&ending_before=%s&limit=%d", startingOnTimestamp, endingBeforeTimestamp, limit)
|
|
|
-
|
|
|
- _, err = m.do(http.MethodGet, path, queryParams, nil, &result)
|
|
|
- if err != nil {
|
|
|
- return costs, telemetry.Error(ctx, span, err, "failed to create credits grant")
|
|
|
- }
|
|
|
-
|
|
|
- for _, customerCost := range result.Data {
|
|
|
- formattedCost := types.FormattedCost{
|
|
|
- StartTimestamp: customerCost.StartTimestamp,
|
|
|
- EndTimestamp: customerCost.EndTimestamp,
|
|
|
- }
|
|
|
- for _, creditType := range customerCost.CreditTypes {
|
|
|
- formattedCost.Cost += creditType.Cost
|
|
|
- }
|
|
|
- costs = append(costs, formattedCost)
|
|
|
- }
|
|
|
-
|
|
|
- return costs, nil
|
|
|
-}
|
|
|
-
|
|
|
// IngestEvents sends a list of billing events to Metronome's ingest endpoint
|
|
|
-func (m LagoClient) IngestEvents(ctx context.Context, events []types.BillingEvent) (err error) {
|
|
|
+func (m LagoClient) IngestEvents(ctx context.Context, events []types.BillingEvent, enableSandbox bool) (err error) {
|
|
|
ctx, span := telemetry.NewSpan(ctx, "ingets-billing-events")
|
|
|
defer span.End()
|
|
|
|
|
|
@@ -331,8 +261,6 @@ func (m LagoClient) IngestEvents(ctx context.Context, events []types.BillingEven
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- path := "ingest"
|
|
|
-
|
|
|
for i := 0; i < len(events); i += maxIngestEventLimit {
|
|
|
end := i + maxIngestEventLimit
|
|
|
if end > len(events) {
|
|
|
@@ -340,29 +268,29 @@ func (m LagoClient) IngestEvents(ctx context.Context, events []types.BillingEven
|
|
|
}
|
|
|
|
|
|
batch := events[i:end]
|
|
|
+ batchInput := make([]lago.EventInput, len(batch))
|
|
|
|
|
|
- // Retry each batch to make sure all events are ingested
|
|
|
- var currentAttempts int
|
|
|
- for currentAttempts < defaultMaxRetries {
|
|
|
- statusCode, err := m.do(http.MethodPost, path, "", batch, nil)
|
|
|
- // Check errors that are not from error http codes
|
|
|
- if statusCode == 0 && err != nil {
|
|
|
- return telemetry.Error(ctx, span, err, "failed to ingest billing events")
|
|
|
+ for i := range batch {
|
|
|
+ customerID, err := strconv.ParseUint(batch[i].CustomerID, 10, 64)
|
|
|
+ if err != nil {
|
|
|
+ return telemetry.Error(ctx, span, err, "failed to parse customer ID")
|
|
|
}
|
|
|
-
|
|
|
- if statusCode == http.StatusForbidden || statusCode == http.StatusUnauthorized {
|
|
|
- return telemetry.Error(ctx, span, err, "unauthorized")
|
|
|
- }
|
|
|
-
|
|
|
- // 400 responses should not be retried
|
|
|
- if statusCode == http.StatusBadRequest {
|
|
|
- return telemetry.Error(ctx, span, err, "malformed billing events")
|
|
|
+ externalSubscriptionID := m.GenerateLagoID(SubscriptionIDPrefix, uint(customerID), enableSandbox)
|
|
|
+
|
|
|
+ event := lago.EventInput{
|
|
|
+ TransactionID: batch[i].TransactionID,
|
|
|
+ ExternalSubscriptionID: externalSubscriptionID,
|
|
|
+ Code: batch[i].EventType,
|
|
|
+ Timestamp: batch[i].Timestamp,
|
|
|
+ Properties: batch[i].Properties,
|
|
|
}
|
|
|
+ batchInput = append(batchInput, event)
|
|
|
+ }
|
|
|
|
|
|
- // Any other status code can be safely retried
|
|
|
- if statusCode == http.StatusOK {
|
|
|
- break
|
|
|
- }
|
|
|
+ // Retry each batch to make sure all events are ingested
|
|
|
+ var currentAttempts int
|
|
|
+ for currentAttempts < defaultMaxRetries {
|
|
|
+ m.client.Event().Batch(ctx, &batchInput)
|
|
|
currentAttempts++
|
|
|
}
|
|
|
|
|
|
@@ -374,40 +302,10 @@ func (m LagoClient) IngestEvents(ctx context.Context, events []types.BillingEven
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (m LagoClient) listBillableMetricIDs(ctx context.Context, customerID uuid.UUID) (billableMetrics []types.BillableMetric, err error) {
|
|
|
- ctx, span := telemetry.NewSpan(ctx, "list-billable-metrics")
|
|
|
- defer span.End()
|
|
|
-
|
|
|
- if customerID == uuid.Nil {
|
|
|
- return billableMetrics, telemetry.Error(ctx, span, err, "customer id empty")
|
|
|
- }
|
|
|
-
|
|
|
- path := fmt.Sprintf("/customers/%s/billable-metrics", customerID)
|
|
|
-
|
|
|
- var result struct {
|
|
|
- Data []types.BillableMetric `json:"data"`
|
|
|
- }
|
|
|
-
|
|
|
- _, err = m.do(http.MethodGet, path, "", nil, &result)
|
|
|
- if err != nil {
|
|
|
- return billableMetrics, telemetry.Error(ctx, span, err, "failed to retrieve billable metrics from metronome")
|
|
|
- }
|
|
|
-
|
|
|
- return result.Data, nil
|
|
|
-}
|
|
|
-
|
|
|
-func (m LagoClient) generateCustomerID(projectID uint, sandboxEnabled bool) string {
|
|
|
- if sandboxEnabled {
|
|
|
- return fmt.Sprintf("cloud_cus_%d", projectID)
|
|
|
- }
|
|
|
-
|
|
|
- return fmt.Sprintf("cus_%d", projectID)
|
|
|
-}
|
|
|
-
|
|
|
-func (m LagoClient) generateSubscriptionID(projectID uint, sandboxEnabled bool) string {
|
|
|
+func (m LagoClient) GenerateLagoID(prefix string, projectID uint, sandboxEnabled bool) string {
|
|
|
if sandboxEnabled {
|
|
|
- return fmt.Sprintf("cloud_sub_%d", projectID)
|
|
|
+ return fmt.Sprintf("cloud_%s_%d", prefix, projectID)
|
|
|
}
|
|
|
|
|
|
- return fmt.Sprintf("sub_%d", projectID)
|
|
|
+ return fmt.Sprintf("%s_%d", prefix, projectID)
|
|
|
}
|