فهرست منبع

Merge pull request #135 from aznashwan/delete-source-disks

Delete source disks
Nashwan Azhari 5 سال پیش
والد
کامیت
6c3af2e109

+ 35 - 16
coriolis/conductor/rpc/server.py

@@ -720,9 +720,13 @@ class ConductorServerEndpoint(object):
         for instance in replica.instances:
             if (instance in replica.info and
                     replica.info[instance].get('volumes_info')):
+                source_del_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS,
+                    execution)
                 self._create_task(
                     instance, constants.TASK_TYPE_DELETE_REPLICA_DISKS,
-                    execution)
+                    execution, depends_on=[source_del_task.id])
                 has_tasks = True
 
         if not has_tasks:
@@ -931,8 +935,10 @@ class ConductorServerEndpoint(object):
                     validate_replica_deployment_inputs_task.id])
 
             deploy_replica_task = self._create_task(
-                instance, constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE,
-                execution, [create_snapshot_task.id])
+                instance,
+                constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES,
+                execution,
+                depends_on=[create_snapshot_task.id])
 
             depends_on = [deploy_replica_task.id]
             if not skip_os_morphing:
@@ -970,7 +976,8 @@ class ConductorServerEndpoint(object):
                 depends_on=depends_on)
 
             self._create_task(
-                instance, constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS,
+                instance,
+                constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS,
                 execution, depends_on=[
                     create_snapshot_task.id,
                     finalize_deployment_task.id],
@@ -1101,7 +1108,8 @@ class ConductorServerEndpoint(object):
             # if all the source disk snapshotting and worker setup steps are
             # performed by the source plugin in REPLICATE_DISKS.
             # This should no longer be a problem when worker pooling lands.
-            last_migration_task = None
+            last_sync_task = None
+            first_sync_task = None
             migration_resources_tasks = [
                 deploy_migration_source_resources_task.id,
                 deploy_migration_target_resources_task.id]
@@ -1110,26 +1118,35 @@ class ConductorServerEndpoint(object):
                 if i == (migration.replication_count - 1) and (
                         migration.shutdown_instances):
                     shutdown_deps = migration_resources_tasks
-                    if last_migration_task:
-                        shutdown_deps = [last_migration_task.id]
-                    last_migration_task = self._create_task(
+                    if last_sync_task:
+                        shutdown_deps = [last_sync_task.id]
+                    last_sync_task = self._create_task(
                         instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
                         execution, depends_on=shutdown_deps)
 
                 replication_deps = migration_resources_tasks
-                if last_migration_task:
-                    replication_deps = [last_migration_task.id]
+                if last_sync_task:
+                    replication_deps = [last_sync_task.id]
 
-                last_migration_task = self._create_task(
+                last_sync_task = self._create_task(
                     instance, constants.TASK_TYPE_REPLICATE_DISKS,
                     execution, depends_on=replication_deps)
+                if not first_sync_task:
+                    first_sync_task = last_sync_task
 
             delete_source_resources_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
                 execution, depends_on=[
                     deploy_migration_source_resources_task.id,
-                    last_migration_task.id],
+                    last_sync_task.id],
+                on_error=True)
+
+            cleanup_source_storage_task = self._create_task(
+                instance, constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE,
+                execution, depends_on=[
+                    first_sync_task.id,
+                    delete_source_resources_task.id],
                 on_error=True)
 
             delete_destination_resources_task = self._create_task(
@@ -1137,13 +1154,13 @@ class ConductorServerEndpoint(object):
                 constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
                 execution, depends_on=[
                     deploy_migration_target_resources_task.id,
-                    last_migration_task.id],
+                    last_sync_task.id],
                 on_error=True)
 
             deploy_instance_task = self._create_task(
                 instance, constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES,
                 execution, depends_on=[
-                    last_migration_task.id,
+                    last_sync_task.id,
                     delete_destination_resources_task.id])
 
             depends_on = [deploy_instance_task.id]
@@ -1191,12 +1208,13 @@ class ConductorServerEndpoint(object):
 
             cleanup_deps = [
                 create_instance_disks_task.id,
+                cleanup_source_storage_task.id,
                 delete_destination_resources_task.id,
                 cleanup_failed_deployment_task.id]
             if task_delete_os_morphing_resources:
                 cleanup_deps.append(task_delete_os_morphing_resources.id)
             self._create_task(
-                instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
+                instance, constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE,
                 execution, depends_on=cleanup_deps,
                 on_error_only=True)
 
@@ -1807,7 +1825,8 @@ class ConductorServerEndpoint(object):
                     ctxt, execution.action_id, task.instance,
                     {"volumes_info": volumes_info})
 
-        elif task_type == constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS:
+        elif task_type == (
+                constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS):
 
             if not task_info.get("clone_disks"):
                 # The migration completed. If the replica is executed again,

+ 10 - 3
coriolis/constants.py

@@ -76,7 +76,10 @@ TASK_TYPE_DEPLOY_INSTANCE_RESOURCES = "DEPLOY_INSTANCE_RESOURCES"
 TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT = "FINALIZE_INSTANCE_DEPLOYMENT"
 TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT = (
     "CLEANUP_FAILED_INSTANCE_DEPLOYMENT")
-TASK_TYPE_CLEANUP_INSTANCE_STORAGE = "CLEANUP_INSTANCE_STORAGE"
+TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE = (
+    "CLEANUP_INSTANCE_SOURCE_STORAGE")
+TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE = (
+    "CLEANUP_INSTANCE_TARGET_STORAGE")
 
 TASK_TYPE_CREATE_INSTANCE_DISKS = "CREATE_INSTANCE_DISKS"
 
@@ -87,6 +90,8 @@ TASK_TYPE_DELETE_OS_MORPHING_RESOURCES = "DELETE_OS_MORPHING_RESOURCES"
 
 TASK_TYPE_GET_INSTANCE_INFO = "GET_INSTANCE_INFO"
 TASK_TYPE_DEPLOY_REPLICA_DISKS = "DEPLOY_REPLICA_DISKS"
+TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS = (
+    "DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS")
 TASK_TYPE_DELETE_REPLICA_DISKS = "DELETE_REPLICA_DISKS"
 TASK_TYPE_REPLICATE_DISKS = "REPLICATE_DISKS"
 TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES = "DEPLOY_REPLICA_SOURCE_RESOURCES"
@@ -94,13 +99,15 @@ TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES = "DELETE_REPLICA_SOURCE_RESOURCES"
 TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES = "DEPLOY_REPLICA_TARGET_RESOURCES"
 TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES = "DELETE_REPLICA_TARGET_RESOURCES"
 TASK_TYPE_SHUTDOWN_INSTANCE = "SHUTDOWN_INSTANCE"
-TASK_TYPE_DEPLOY_REPLICA_INSTANCE = "DEPLOY_REPLICA_INSTANCE"
+TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES = (
+    "DEPLOY_REPLICA_INSTANCE_RESOURCES")
 TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT = (
     "FINALIZE_REPLICA_INSTANCE_DEPLOYMENT")
 TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT = (
     "CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT")
 TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS = "CREATE_REPLICA_DISK_SNAPSHOTS"
-TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS = "DELETE_REPLICA_DISK_SNAPSHOTS"
+TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS = (
+    "DELETE_REPLICA_TARGET_DISK_SNAPSHOTS")
 TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS = "RESTORE_REPLICA_DISK_SNAPSHOTS"
 TASK_TYPE_GET_OPTIMAL_FLAVOR = "GET_OPTIMAL_FLAVOR"
 TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS = (

+ 47 - 32
coriolis/providers/base.py

@@ -227,13 +227,15 @@ class BaseImportInstanceProvider(BaseInstanceProvider):
         return dest_instance_name
 
     @abc.abstractmethod
-    def deploy_os_morphing_resources(self, ctxt, connection_info,
-                                     instance_deployment_info):
+    def deploy_os_morphing_resources(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
         pass
 
     @abc.abstractmethod
-    def delete_os_morphing_resources(self, ctxt, connection_info,
-                                     os_morphing_resources):
+    def delete_os_morphing_resources(
+            self, ctxt, connection_info, target_environment,
+            os_morphing_resources):
         pass
 
 
@@ -306,18 +308,20 @@ class BaseImportProvider(BaseImportInstanceProvider):
         pass
 
     @abc.abstractmethod
-    def deploy_disk_copy_resources(self, ctxt, connection_info,
-                                   target_environment, volumes_info):
+    def deploy_disk_copy_resources(
+            self, ctxt, connection_info, target_environment, volumes_info):
         pass
 
     @abc.abstractmethod
-    def delete_disk_copy_resources(self, ctxt, connection_info,
-                                   target_resources_dict):
+    def delete_disk_copy_resources(
+            self, ctxt, connection_info, target_environment,
+            target_resources_dict):
         pass
 
     @abc.abstractmethod
-    def finalize_import_instance(self, ctxt, connection_info,
-                                 instance_deployment_info):
+    def finalize_import_instance(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
         """ Should return a dict with the info of the migrated VM on the
         destination platform in the same format as offered by
         'BaseExportProvider.export_instance()'.
@@ -325,22 +329,24 @@ class BaseImportProvider(BaseImportInstanceProvider):
         return {}
 
     @abc.abstractmethod
-    def cleanup_failed_import_instance(self, ctxt, connection_info,
-                                       instance_deployment_info):
+    def cleanup_failed_import_instance(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
         pass
 
 
 class BaseReplicaImportProvider(BaseImportInstanceProvider):
 
     @abc.abstractmethod
-    def deploy_replica_instance(self, ctxt, connection_info,
-                                target_environment, instance_name, export_info,
-                                volumes_info, clone_disks):
+    def deploy_replica_instance(
+            self, ctxt, connection_info, target_environment,
+            instance_name, export_info, volumes_info, clone_disks):
         pass
 
     @abc.abstractmethod
-    def finalize_replica_instance_deployment(self, ctxt, connection_info,
-                                             instance_deployment_info):
+    def finalize_replica_instance_deployment(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
         """ Should return a dict with the info of the migrated VM on the
         destination platform in the same format as offered by
         'BaseExportProvider.export_instance()'.
@@ -348,42 +354,46 @@ class BaseReplicaImportProvider(BaseImportInstanceProvider):
         return {}
 
     @abc.abstractmethod
-    def cleanup_failed_replica_instance_deployment(self, ctxt, connection_info,
-                                                   instance_deployment_info):
+    def cleanup_failed_replica_instance_deployment(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
         pass
 
     @abc.abstractmethod
-    def deploy_replica_disks(self, ctxt, connection_info, target_environment,
-                             instance_name, export_info, volumes_info):
+    def deploy_replica_disks(
+            self, ctxt, connection_info, target_environment, instance_name,
+            export_info, volumes_info):
         pass
 
     @abc.abstractmethod
-    def deploy_replica_target_resources(self, ctxt, connection_info,
-                                        target_environment, volumes_info):
+    def deploy_replica_target_resources(
+            self, ctxt, connection_info, target_environment, volumes_info):
         pass
 
     @abc.abstractmethod
-    def delete_replica_target_resources(self, ctxt, connection_info,
-                                        migr_resources_dict):
+    def delete_replica_target_resources(
+            self, ctxt, connection_info, target_environment,
+            migr_resources_dict):
         pass
 
     @abc.abstractmethod
-    def delete_replica_disks(self, ctxt, connection_info, volumes_info):
+    def delete_replica_disks(
+            self, ctxt, connection_info, target_environment, volumes_info):
         pass
 
     @abc.abstractmethod
-    def create_replica_disk_snapshots(self, ctxt, connection_info,
-                                      volumes_info):
+    def create_replica_disk_snapshots(
+            self, ctxt, connection_info, target_environment, volumes_info):
         pass
 
     @abc.abstractmethod
-    def delete_replica_disk_snapshots(self, ctxt, connection_info,
-                                      volumes_info):
+    def delete_replica_target_disk_snapshots(
+            self, ctxt, connection_info, target_environment, volumes_info):
         pass
 
     @abc.abstractmethod
-    def restore_replica_disk_snapshots(self, ctxt, connection_info,
-                                       volumes_info):
+    def restore_replica_disk_snapshots(
+            self, ctxt, connection_info, target_environment, volumes_info):
         pass
 
 
@@ -431,6 +441,11 @@ class BaseReplicaExportProvider(BaseExportInstanceProvider):
                         target_conn_info, volumes_info, incremental):
         pass
 
+    @abc.abstractmethod
+    def delete_replica_source_snapshots(
+            self, ctxt, connection_info, source_environment, volumes_info):
+        pass
+
     @abc.abstractmethod
     def shutdown_instance(self, ctxt, connection_info, source_environment,
                           instance_name):

+ 10 - 6
coriolis/tasks/factory.py

@@ -24,8 +24,10 @@ _TASKS_MAP = {
         migration_tasks.CreateInstanceDisksTask,
     constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT:
         migration_tasks.CleanupFailedInstanceDeploymentTask,
-    constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE:
-        migration_tasks.CleanupInstanceStorageTask,
+    constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE:
+        migration_tasks.CleanupInstanceTargetStorageTask,
+    constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE:
+        migration_tasks.CleanupInstanceSourceStorageTask,
     constants.TASK_TYPE_GET_OPTIMAL_FLAVOR:
         migration_tasks.GetOptimalFlavorTask,
     constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS:
@@ -46,6 +48,8 @@ _TASKS_MAP = {
         replica_tasks.ShutdownInstanceTask,
     constants.TASK_TYPE_DEPLOY_REPLICA_DISKS:
         replica_tasks.DeployReplicaDisksTask,
+    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS:
+        replica_tasks.DeleteReplicaSourceDiskSnapshotsTask,
     constants.TASK_TYPE_DELETE_REPLICA_DISKS:
         replica_tasks.DeleteReplicaDisksTask,
     constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES:
@@ -56,16 +60,16 @@ _TASKS_MAP = {
         replica_tasks.DeployReplicaSourceResourcesTask,
     constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES:
         replica_tasks.DeleteReplicaSourceResourcesTask,
-    constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE:
-        replica_tasks.DeployReplicaInstanceTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES:
+        replica_tasks.DeployReplicaInstanceResourcesTask,
     constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT:
         replica_tasks.FinalizeReplicaInstanceDeploymentTask,
     constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT:
         replica_tasks.CleanupFailedReplicaInstanceDeploymentTask,
     constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS:
         replica_tasks.CreateReplicaDiskSnapshotsTask,
-    constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS:
-        replica_tasks.DeleteReplicaDiskSnapshotsTask,
+    constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS:
+        replica_tasks.DeleteReplicaTargetDiskSnapshotsTask,
     constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
         replica_tasks.RestoreReplicaDiskSnapshotsTask,
     constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS:

+ 8 - 2
coriolis/tasks/migration_tasks.py

@@ -62,7 +62,12 @@ class CreateInstanceDisksTask(
     pass
 
 
-class CleanupInstanceStorageTask(replica_tasks.DeleteReplicaDisksTask):
+class CleanupInstanceTargetStorageTask(replica_tasks.DeleteReplicaDisksTask):
+    pass
+
+
+class CleanupInstanceSourceStorageTask(
+        replica_tasks.DeleteReplicaSourceDiskSnapshotsTask):
     pass
 
 
@@ -101,5 +106,6 @@ class DeleteMigrationTargetResourcesTask(
     pass
 
 
-class DeployInstanceResourcesTask(replica_tasks.DeployReplicaInstanceTask):
+class DeployInstanceResourcesTask(
+        replica_tasks.DeployReplicaInstanceResourcesTask):
     pass

+ 6 - 4
coriolis/tasks/osmorphing_tasks.py

@@ -66,7 +66,7 @@ class DeployOSMorphingResourcesTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
-        return ["instance_deployment_info"]
+        return ["target_environment", "instance_deployment_info"]
 
     @property
     def returned_task_info_properties(self):
@@ -80,10 +80,11 @@ class DeployOSMorphingResourcesTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_OS_MORPHING,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
+        target_environment = task_info["target_environment"]
         instance_deployment_info = task_info["instance_deployment_info"]
 
         import_info = provider.deploy_os_morphing_resources(
-            ctxt, connection_info, instance_deployment_info)
+            ctxt, connection_info, target_environment, instance_deployment_info)
 
         schemas.validate_value(
             import_info, schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA,
@@ -125,7 +126,7 @@ class DeleteOSMorphingResourcesTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
-        return ["os_morphing_resources"]
+        return ["target_environment", "os_morphing_resources"]
 
     @property
     def returned_task_info_properties(self):
@@ -138,9 +139,10 @@ class DeleteOSMorphingResourcesTask(base.TaskRunner):
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
         os_morphing_resources = task_info.get("os_morphing_resources")
+        target_environment = task_info["target_environment"]
 
         provider.delete_os_morphing_resources(
-            ctxt, connection_info, os_morphing_resources)
+            ctxt, connection_info, target_environment, os_morphing_resources)
 
         return {
             "os_morphing_resources": None,

+ 66 - 20
coriolis/tasks/replica_tasks.py

@@ -214,6 +214,43 @@ class DeployReplicaDisksTask(base.TaskRunner):
             'volumes_info': volumes_info}
 
 
+class DeleteReplicaSourceDiskSnapshotsTask(base.TaskRunner):
+
+    @property
+    def required_task_info_properties(self):
+        return [
+            "volumes_info", "source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        event_manager = events.EventManager(event_handler)
+        if not task_info.get("volumes_info"):
+            LOG.debug(
+                "No volumes_info present. Skipping source snapshot deletion.")
+            event_manager.progress_update(
+                "No previous volumes information present, nothing to delete")
+            return {'volumes_info': []}
+
+
+        provider = providers_factory.get_provider(
+            origin['type'], constants.PROVIDER_TYPE_REPLICA_EXPORT,
+            event_handler)
+        connection_info = base.get_connection_info(ctxt, origin)
+        source_environment = task_info['source_environment']
+        volumes_info = _get_volumes_info(task_info)
+
+        volumes_info = provider.delete_replica_source_snapshots(
+            ctxt, connection_info, source_environment, volumes_info)
+
+        return {
+            'volumes_info': volumes_info}
+
+
 class DeleteReplicaDisksTask(base.TaskRunner):
 
     @property
@@ -227,9 +264,12 @@ class DeleteReplicaDisksTask(base.TaskRunner):
 
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
+        event_manager = events.EventManager(event_handler)
         if not task_info.get("volumes_info"):
             LOG.debug(
                 "No volumes_info present. Skipping disk deletion.")
+            event_manager.progress_update(
+                "No previous volumes information present, nothing to delete")
             return {'volumes_info': []}
 
         provider = providers_factory.get_provider(
@@ -238,10 +278,10 @@ class DeleteReplicaDisksTask(base.TaskRunner):
         connection_info = base.get_connection_info(ctxt, destination)
 
         volumes_info = _get_volumes_info(task_info)
+        target_environment = task_info['target_environment']
 
-        # TODO (aznashwan): add target_env options to `delete_replica_disks`:
         volumes_info = provider.delete_replica_disks(
-            ctxt, connection_info, volumes_info)
+            ctxt, connection_info, target_environment, volumes_info)
         if volumes_info:
             LOG.warn(
                 "'volumes_info' should have been void after disk "
@@ -270,7 +310,7 @@ class DeployReplicaSourceResourcesTask(base.TaskRunner):
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
 
-        source_environment = task_info['source_environment'] or {}
+        source_environment = task_info.get('source_environment', {})
         export_info = task_info['export_info']
         replica_resources_info = provider.deploy_replica_source_resources(
             ctxt, connection_info, export_info, source_environment)
@@ -447,18 +487,18 @@ class DeleteReplicaTargetResourcesTask(base.TaskRunner):
         connection_info = base.get_connection_info(ctxt, destination)
 
         migr_resources = task_info.get("target_resources")
+        target_environment = task_info["target_environment"]
 
         if migr_resources:
-            # TODO (aznashwan): add 'target_env' param to call:
             provider.delete_replica_target_resources(
-                ctxt, connection_info, migr_resources)
+                ctxt, connection_info, target_environment, migr_resources)
 
         return {
             "target_resources": None,
             "target_resources_connection_info": None}
 
 
-class DeployReplicaInstanceTask(base.TaskRunner):
+class DeployReplicaInstanceResourcesTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
@@ -495,7 +535,7 @@ class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
-        return ["instance_deployment_info"]
+        return ["target_environment", "instance_deployment_info"]
 
     @property
     def returned_task_info_properties(self):
@@ -507,10 +547,12 @@ class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
+        target_environment = task_info["target_environment"]
         instance_deployment_info = task_info["instance_deployment_info"]
 
         result = provider.finalize_replica_instance_deployment(
-            ctxt, connection_info, instance_deployment_info)
+            ctxt, connection_info, target_environment,
+            instance_deployment_info)
         if result is None:
             LOG.warn(
                 "'None' was returned as result for Finalize Replica Instance "
@@ -524,7 +566,7 @@ class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
-        return ["instance_deployment_info"]
+        return ["target_environment", "instance_deployment_info"]
 
     @property
     def returned_task_info_properties(self):
@@ -536,11 +578,12 @@ class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
-        instance_deployment_info = task_info.get(
-            "instance_deployment_info", {})
+        target_environment = task_info["target_environment"]
+        instance_deployment_info = task_info["instance_deployment_info"]
 
         provider.cleanup_failed_replica_instance_deployment(
-            ctxt, connection_info, instance_deployment_info)
+            ctxt, connection_info, target_environment,
+            instance_deployment_info)
 
         return {
             "instance_deployment_info": None}
@@ -550,7 +593,7 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
-        return ["export_info", "volumes_info"]
+        return ["target_environment", "export_info", "volumes_info"]
 
     @property
     def returned_task_info_properties(self):
@@ -563,11 +606,12 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
         export_info = task_info['export_info']
+        target_environment = task_info["target_environment"]
 
         volumes_info = _get_volumes_info(task_info)
 
         volumes_info = provider.create_replica_disk_snapshots(
-            ctxt, connection_info, volumes_info)
+            ctxt, connection_info, target_environment, volumes_info)
         schemas.validate_value(
             volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
 
@@ -578,11 +622,11 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
             "volumes_info": volumes_info}
 
 
-class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
+class DeleteReplicaTargetDiskSnapshotsTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
-        return ["export_info", "volumes_info"]
+        return ["target_environment", "export_info", "volumes_info"]
 
     @property
     def returned_task_info_properties(self):
@@ -597,9 +641,10 @@ class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
         connection_info = base.get_connection_info(ctxt, destination)
 
         volumes_info = _get_volumes_info(task_info)
+        target_environment = task_info["target_environment"]
 
-        volumes_info = provider.delete_replica_disk_snapshots(
-            ctxt, connection_info, volumes_info)
+        volumes_info = provider.delete_replica_target_disk_snapshots(
+            ctxt, connection_info, target_environment, volumes_info)
         schemas.validate_value(
             volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
 
@@ -614,7 +659,7 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
 
     @property
     def required_task_info_properties(self):
-        return ["export_info", "volumes_info"]
+        return ["target_environment", "export_info", "volumes_info"]
 
     @property
     def returned_task_info_properties(self):
@@ -627,11 +672,12 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
         export_info = task_info['export_info']
+        target_environment = task_info["target_environment"]
 
         volumes_info = _get_volumes_info(task_info)
 
         volumes_info = provider.restore_replica_disk_snapshots(
-            ctxt, connection_info, volumes_info)
+            ctxt, connection_info, target_environment, volumes_info)
         schemas.validate_value(
             volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)