|
|
@@ -160,41 +160,50 @@ func GlobalStreamListener(
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
+ config.Logger.Debug().Msg(fmt.Sprintf("pushing state and log file for %s with status %v", workspaceID, statusVal))
|
|
|
+
|
|
|
switch fmt.Sprintf("%v", statusVal) {
|
|
|
- case "created":
|
|
|
- err := handleOperationCreated(config, client, infra, operation, workspaceID)
|
|
|
+ case "created", "error", "destroyed":
|
|
|
+ err := cleanupOperation(config, client, infra, operation, workspaceID)
|
|
|
|
|
|
if err != nil {
|
|
|
config.Alerter.SendAlert(context.Background(), err, map[string]interface{}{
|
|
|
"workspace_id": workspaceID,
|
|
|
})
|
|
|
}
|
|
|
- case "error":
|
|
|
- case "destroyed":
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func handleOperationCreated(config *config.Config, client *redis.Client, infra *models.Infra, operation *models.Operation, workspaceID string) error {
|
|
|
+func cleanupOperation(config *config.Config, client *redis.Client, infra *models.Infra, operation *models.Operation, workspaceID string) error {
|
|
|
+ l := config.Logger
|
|
|
+ l.Debug().Msg(fmt.Sprintf("pushing state for %s", workspaceID))
|
|
|
+
|
|
|
err := pushNewStateToStorage(config, client, infra, operation, workspaceID)
|
|
|
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ l.Debug().Msg(fmt.Sprintf("cleaning state stream for %s", workspaceID))
|
|
|
+
|
|
|
err = cleanupStateStream(config, client, workspaceID)
|
|
|
|
|
|
if err != nil {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+ l.Debug().Msg(fmt.Sprintf("pushing logs for %s", workspaceID))
|
|
|
+
|
|
|
err = pushLogsToStorage(config, client, infra, workspaceID)
|
|
|
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ l.Debug().Msg(fmt.Sprintf("cleaning logs for %s", workspaceID))
|
|
|
+
|
|
|
err = cleanupLogStream(config, client, infra, workspaceID)
|
|
|
|
|
|
if err != nil {
|