|
|
@@ -350,7 +350,7 @@ func (m LagoClient) IngestEvents(ctx context.Context, subscriptionID string, eve
|
|
|
}
|
|
|
|
|
|
batch := events[i:end]
|
|
|
- batchInput := make([]lago.EventInput, len(batch))
|
|
|
+ var batchInput []lago.EventInput
|
|
|
for i := range batch {
|
|
|
externalSubscriptionID := subscriptionID
|
|
|
if enableSandbox {
|
|
|
@@ -367,7 +367,6 @@ func (m LagoClient) IngestEvents(ctx context.Context, subscriptionID string, eve
|
|
|
TransactionID: batch[i].TransactionID,
|
|
|
ExternalSubscriptionID: externalSubscriptionID,
|
|
|
Code: batch[i].EventType,
|
|
|
- Timestamp: batch[i].Timestamp,
|
|
|
Properties: batch[i].Properties,
|
|
|
}
|
|
|
batchInput = append(batchInput, event)
|
|
|
@@ -375,12 +374,13 @@ func (m LagoClient) IngestEvents(ctx context.Context, subscriptionID string, eve
|
|
|
|
|
|
// Retry each batch to make sure all events are ingested
|
|
|
var currentAttempts int
|
|
|
- for currentAttempts < defaultMaxRetries {
|
|
|
+ for currentAttempts := 0; currentAttempts < defaultMaxRetries; currentAttempts++ {
|
|
|
_, lagoErr := m.client.Event().Batch(ctx, &batchInput)
|
|
|
- if lagoErr == nil {
|
|
|
- return telemetry.Error(ctx, span, fmt.Errorf(lagoErr.ErrorCode), "error sending ingest events to Lago")
|
|
|
+ if lagoErr != nil {
|
|
|
+ telemetry.Error(ctx, span, err, "failed to send ingest events")
|
|
|
+ continue
|
|
|
}
|
|
|
- currentAttempts++
|
|
|
+ break
|
|
|
}
|
|
|
|
|
|
if currentAttempts == defaultMaxRetries {
|