Răsfoiți Sursa

Adds replica API

Alessandro Pilotti 9 ani în urmă
părinte
comite
cac2106a08
37 a modificat fișierele cu 1613 adăugiri și 205 ștergeri
  1. 1 0
      .gitignore
  2. 19 6
      coriolis/api/v1/migrations.py
  3. 30 0
      coriolis/api/v1/replica_actions.py
  4. 29 0
      coriolis/api/v1/replica_tasks_execution_actions.py
  5. 58 0
      coriolis/api/v1/replica_tasks_executions.py
  6. 79 0
      coriolis/api/v1/replicas.py
  7. 38 0
      coriolis/api/v1/router.py
  8. 10 1
      coriolis/api/v1/views/migration_view.py
  9. 24 0
      coriolis/api/v1/views/replica_tasks_execution_view.py
  10. 24 0
      coriolis/api/v1/views/replica_view.py
  11. 56 1
      coriolis/conductor/rpc/client.py
  12. 270 48
      coriolis/conductor/rpc/server.py
  13. 17 3
      coriolis/constants.py
  14. 149 19
      coriolis/db/api.py
  15. 50 6
      coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py
  16. 64 9
      coriolis/db/sqlalchemy/models.py
  17. 8 0
      coriolis/exception.py
  18. 0 0
      coriolis/migrations/__init__.py
  19. 6 2
      coriolis/migrations/api.py
  20. 16 6
      coriolis/providers/base.py
  21. 178 34
      coriolis/providers/openstack/__init__.py
  22. 6 0
      coriolis/providers/openstack/schemas/connection_info_schema.json
  23. 1 4
      coriolis/providers/openstack/schemas/target_environment_schema.json
  24. 46 6
      coriolis/providers/vmware_vsphere/__init__.py
  25. 0 0
      coriolis/replica_tasks_executions/__init__.py
  26. 29 0
      coriolis/replica_tasks_executions/api.py
  27. 0 0
      coriolis/replicas/__init__.py
  28. 25 0
      coriolis/replicas/api.py
  29. 1 1
      coriolis/schemas.py
  30. 0 0
      coriolis/tasks/__init__.py
  31. 26 0
      coriolis/tasks/base.py
  32. 42 0
      coriolis/tasks/factory.py
  33. 47 0
      coriolis/tasks/migration_tasks.py
  34. 201 0
      coriolis/tasks/replica_tasks.py
  35. 29 0
      coriolis/utils.py
  36. 31 59
      coriolis/worker/rpc/server.py
  37. 3 0
      resources/makefile

+ 1 - 0
.gitignore

@@ -37,3 +37,4 @@ nosetests.xml
 nova/tests/cover/*
 nova/vcsversion.py
 tools/conf/nova.conf*
+resources/write_data

+ 19 - 6
coriolis/api/v1/migrations.py

@@ -34,9 +34,7 @@ class MigrationController(api_wsgi.Controller):
             req, self._migration_api.get_migrations(
                 req.environ['coriolis.context'], include_tasks=True))
 
-    def _validate_create_body(self, body):
-        migration = body["migration"]
-
+    def _validate_migration_input(self, migration):
         origin = migration["origin"]
         destination = migration["destination"]
 
@@ -61,9 +59,24 @@ class MigrationController(api_wsgi.Controller):
         return origin, destination, migration["instances"]
 
     def create(self, req, body):
-        origin, destination, instances = self._validate_create_body(body)
-        return migration_view.single(req, self._migration_api.start(
-            req.environ['coriolis.context'], origin, destination, instances))
+        # TODO: validate body
+
+        migration_body = body["migration"]
+        context = req.environ['coriolis.context']
+
+        replica_id = migration_body.get("replica_id")
+        if replica_id:
+            forced = migration_body.get("forced", False)
+
+            migration = self._migration_api.deploy_replica_instances(
+                context, replica_id, forced)
+        else:
+            origin, destination, instances = self._validate_migration_input(
+                migration_body)
+            migration = self._migration_api.migrate_instances(
+                context, origin, destination, instances)
+
+        return migration_view.single(req, migration)
 
     def delete(self, req, id):
         try:

+ 30 - 0
coriolis/api/v1/replica_actions.py

@@ -0,0 +1,30 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from webob import exc
+
+from coriolis.api.v1.views import replica_tasks_execution_view
+from coriolis.api import wsgi as api_wsgi
+from coriolis import exception
+from coriolis.replicas import api
+
+
+class ReplicaActionsController(api_wsgi.Controller):
+    def __init__(self):
+        self._replica_api = api.API()
+        super(ReplicaActionsController, self).__init__()
+
+    @api_wsgi.action('delete-disks')
+    def _delete_disks(self, req, id, body):
+        try:
+            return replica_tasks_execution_view.single(
+                req, self._replica_api.delete_disks(
+                    req.environ['coriolis.context'], id))
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(ReplicaActionsController())

+ 29 - 0
coriolis/api/v1/replica_tasks_execution_actions.py

@@ -0,0 +1,29 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from webob import exc
+
+from coriolis.api import wsgi as api_wsgi
+from coriolis import exception
+from coriolis.replica_tasks_executions import api
+
+
+class ReplicaTasksExecutionActionsController(api_wsgi.Controller):
+    def __init__(self):
+        self._replica_tasks_execution_api = api.API()
+        super(ReplicaTasksExecutionActionsController, self).__init__()
+
+    @api_wsgi.action('cancel')
+    def _cancel(self, req, replica_id, id, body):
+        try:
+            self._replica_tasks_execution_api.cancel(
+                req.environ['coriolis.context'], id)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(ReplicaTasksExecutionActionsController())

+ 58 - 0
coriolis/api/v1/replica_tasks_executions.py

@@ -0,0 +1,58 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from webob import exc
+
+from coriolis.api import wsgi as api_wsgi
+from coriolis.api.v1.views import replica_tasks_execution_view
+from coriolis import exception
+from coriolis.replica_tasks_executions import api
+
+
+class ReplicaTasksExecutionController(api_wsgi.Controller):
+    def __init__(self):
+        self._replica_tasks_execution_api = api.API()
+        super(ReplicaTasksExecutionController, self).__init__()
+
+    def show(self, req, replica_id, id):
+        execution = self._replica_tasks_execution_api.get_execution(
+            req.environ["coriolis.context"], id)
+        if not execution:
+            raise exc.HTTPNotFound()
+
+        return replica_tasks_execution_view.single(req, execution)
+
+    def index(self, req, replica_id):
+        return replica_tasks_execution_view.collection(
+            req, self._replica_tasks_execution_api.get_executions(
+                req.environ['coriolis.context'], replica_id,
+                include_tasks=False))
+
+    def detail(self, req, replica_id):
+        return replica_tasks_execution_view.collection(
+            req, self._replica_tasks_execution_api.get_executions(
+                req.environ['coriolis.context'], replica_id,
+                include_tasks=True))
+
+    def create(self, req, replica_id, body):
+        # TODO: validate body
+
+        execution_body = body.get("execution", {})
+        shutdown_instances = execution_body.get("shutdown_instances", False)
+
+        return replica_tasks_execution_view.single(
+            req, self._replica_tasks_execution_api.create(
+                req.environ['coriolis.context'], replica_id,
+                shutdown_instances))
+
+    def delete(self, req, replica_id, id):
+        try:
+            self._replica_tasks_execution_api.delete(
+                req.environ['coriolis.context'], id)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(ReplicaTasksExecutionController())

+ 79 - 0
coriolis/api/v1/replicas.py

@@ -0,0 +1,79 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from webob import exc
+
+from coriolis.api import wsgi as api_wsgi
+from coriolis.api.v1.views import replica_view
+from coriolis import constants
+from coriolis import exception
+from coriolis.replicas import api
+from coriolis.providers import factory
+
+
+class ReplicaController(api_wsgi.Controller):
+    def __init__(self):
+        self._replica_api = api.API()
+        super(ReplicaController, self).__init__()
+
+    def show(self, req, id):
+        replica = self._replica_api.get_replica(
+            req.environ["coriolis.context"], id)
+        if not replica:
+            raise exc.HTTPNotFound()
+
+        return replica_view.single(req, replica)
+
+    def index(self, req):
+        return replica_view.collection(
+            req, self._replica_api.get_replicas(
+                req.environ['coriolis.context'],
+                include_tasks_executions=False))
+
+    def detail(self, req):
+        return replica_view.collection(
+            req, self._replica_api.get_replicas(
+                req.environ['coriolis.context'],
+                include_tasks_executions=True))
+
+    def _validate_create_body(self, body):
+        replica = body["replica"]
+
+        origin = replica["origin"]
+        destination = replica["destination"]
+
+        export_provider = factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, None)
+        if not export_provider.validate_connection_info(
+                origin.get("connection_info", {})):
+            # TODO: use a decent exception
+            raise exception.CoriolisException("Invalid connection info")
+
+        import_provider = factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, None)
+        if not import_provider.validate_connection_info(
+                destination.get("connection_info", {})):
+            # TODO: use a decent exception
+            raise exception.CoriolisException("Invalid connection info")
+
+        if not import_provider.validate_target_environment(
+                destination.get("target_environment", {})):
+            raise exception.CoriolisException("Invalid target environment")
+
+        return origin, destination, replica["instances"]
+
+    def create(self, req, body):
+        origin, destination, instances = self._validate_create_body(body)
+        return replica_view.single(req, self._replica_api.create(
+            req.environ['coriolis.context'], origin, destination, instances))
+
+    def delete(self, req, id):
+        try:
+            self._replica_api.delete(req.environ['coriolis.context'], id)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(ReplicaController())

+ 38 - 0
coriolis/api/v1/router.py

@@ -6,6 +6,10 @@ from oslo_log import log as logging
 from coriolis import api
 from coriolis.api.v1 import migrations
 from coriolis.api.v1 import migration_actions
+from coriolis.api.v1 import replica_actions
+from coriolis.api.v1 import replica_tasks_executions
+from coriolis.api.v1 import replica_tasks_execution_actions
+from coriolis.api.v1 import replicas
 
 LOG = logging.getLogger(__name__)
 
@@ -38,3 +42,37 @@ class APIRouter(api.APIRouter):
                        controller=self.resources['migration_actions'],
                        action='action',
                        conditions={'method': 'POST'})
+
+        self.resources['replicas'] = replicas.create_resource()
+        mapper.resource('replica', 'replicas',
+                        controller=self.resources['replicas'],
+                        collection={'detail': 'GET'},
+                        member={'action': 'POST'})
+
+        replica_actions_resource = replica_actions.create_resource()
+        self.resources['replica_actions'] = replica_actions_resource
+        migration_path = '/{project_id}/replicas/{id}'
+        mapper.connect('replica_actions',
+                       migration_path + '/actions',
+                       controller=self.resources['replica_actions'],
+                       action='action',
+                       conditions={'method': 'POST'})
+
+        self.resources['replica_tasks_executions'] = \
+            replica_tasks_executions.create_resource()
+        mapper.resource('execution', 'replicas/{replica_id}/executions',
+                        controller=self.resources['replica_tasks_executions'],
+                        collection={'detail': 'GET'},
+                        member={'action': 'POST'})
+
+        replica_tasks_execution_actions_resource = \
+            replica_tasks_execution_actions.create_resource()
+        self.resources['replica_tasks_execution_actions'] = \
+            replica_tasks_execution_actions_resource
+        migration_path = '/{project_id}/replicas/{replica_id}/executions/{id}'
+        mapper.connect('replica_tasks_execution_actions',
+                       migration_path + '/actions',
+                       controller=self.resources[
+                           'replica_tasks_execution_actions'],
+                       action='action',
+                       conditions={'method': 'POST'})

+ 10 - 1
coriolis/api/v1/views/migration_view.py

@@ -10,9 +10,18 @@ def _format_migration(req, migration, keys=None):
             return
         yield (key, value)
 
-    return dict(itertools.chain.from_iterable(
+    migration_dict = dict(itertools.chain.from_iterable(
         transform(k, v) for k, v in migration.items()))
 
+    # Migrations have a single tasks execution
+    execution = migration_dict["executions"][0]
+    migration_dict["status"] = execution["status"]
+    tasks = execution.get("tasks")
+    if tasks:
+        migration_dict["tasks"] = tasks
+    del migration_dict["executions"]
+    return migration_dict
+
 
 def single(req, migration):
     return {"migration": _format_migration(req, migration)}

+ 24 - 0
coriolis/api/v1/views/replica_tasks_execution_view.py

@@ -0,0 +1,24 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_replica_tasks_execution(req, execution, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    return dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in execution.items()))
+
+
+def single(req, execution):
+    return {"execution": _format_replica_tasks_execution(req, execution)}
+
+
+def collection(req, executions):
+    formatted_executions = [_format_replica_tasks_execution(req, m)
+                            for m in executions]
+    return {'executions': formatted_executions}

+ 24 - 0
coriolis/api/v1/views/replica_view.py

@@ -0,0 +1,24 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_replica(req, replica, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    return dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in replica.items()))
+
+
+def single(req, replica):
+    return {"replica": _format_replica(req, replica)}
+
+
+def collection(req, replicas):
+    formatted_replicas = [_format_replica(req, m)
+                          for m in replicas]
+    return {'replicas': formatted_replicas}

+ 56 - 1
coriolis/conductor/rpc/client.py

@@ -13,6 +13,56 @@ class ConductorClient(object):
         target = messaging.Target(topic='coriolis_conductor', version=VERSION)
         self._client = rpc.get_client(target)
 
+    def execute_replica_tasks(self, ctxt, replica_id,
+                              shutdown_instances=False):
+        return self._client.call(
+            ctxt, 'execute_replica_tasks', replica_id=replica_id,
+            shutdown_instances=shutdown_instances)
+
+    def get_replica_tasks_executions(self, ctxt, replica_id,
+                                     include_tasks=False):
+        return self._client.call(
+            ctxt, 'get_replica_tasks_executions',
+            replica_id=replica_id,
+            include_tasks=include_tasks)
+
+    def get_replica_tasks_execution(self, ctxt, execution_id):
+        return self._client.call(
+            ctxt, 'get_replica_tasks_execution',
+            execution_id=execution_id)
+
+    def delete_replica_tasks_execution(self, ctxt, execution_id):
+        return self._client.call(
+            ctxt, 'delete_replica_tasks_execution',
+            execution_id=execution_id)
+
+    def cancel_replica_tasks_execution(self, ctxt, execution_id):
+        return self._client.call(
+            ctxt, 'cancel_replica_tasks_execution',
+            execution_id=execution_id)
+
+    def create_instances_replica(self, ctxt, origin, destination, instances):
+        return self._client.call(
+            ctxt, 'create_instances_replica', origin=origin,
+            destination=destination, instances=instances)
+
+    def get_replicas(self, ctxt, include_tasks_executions=False):
+        return self._client.call(
+            ctxt, 'get_replicas',
+            include_tasks_executions=include_tasks_executions)
+
+    def get_replica(self, ctxt, replica_id):
+        return self._client.call(
+            ctxt, 'get_replica', replica_id=replica_id)
+
+    def delete_replica(self, ctxt, replica_id):
+        self._client.call(
+            ctxt, 'delete_replica', replica_id=replica_id)
+
+    def delete_replica_disks(self, ctxt, replica_id):
+        return self._client.call(
+            ctxt, 'delete_replica_disks', replica_id=replica_id)
+
     def get_migrations(self, ctxt, include_tasks=False):
         return self._client.call(ctxt, 'get_migrations',
                                  include_tasks=include_tasks)
@@ -21,11 +71,16 @@ class ConductorClient(object):
         return self._client.call(
             ctxt, 'get_migration', migration_id=migration_id)
 
-    def begin_migrate_instances(self, ctxt, origin, destination, instances):
+    def migrate_instances(self, ctxt, origin, destination, instances):
         return self._client.call(
             ctxt, 'migrate_instances', origin=origin, destination=destination,
             instances=instances)
 
+    def deploy_replica_instances(self, ctxt, replica_id, forced=False):
+        return self._client.call(
+            ctxt, 'deploy_replica_instances', replica_id=replica_id,
+            forced=forced)
+
     def delete_migration(self, ctxt, migration_id):
         self._client.call(
             ctxt, 'delete_migration', migration_id=migration_id)

+ 270 - 48
coriolis/conductor/rpc/server.py

@@ -9,6 +9,7 @@ from coriolis import constants
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
 from coriolis import exception
+from coriolis import utils
 from coriolis.worker.rpc import client as rpc_worker_client
 
 VERSION = "1.0"
@@ -20,48 +21,258 @@ class ConductorServerEndpoint(object):
     def __init__(self):
         self._rpc_worker_client = rpc_worker_client.WorkerClient()
 
+    @staticmethod
+    def _create_task(instance, task_type, execution, depends_on=None):
+        task = models.Task()
+        task.id = str(uuid.uuid4())
+        task.instance = instance
+        task.execution = execution
+        task.status = constants.TASK_STATUS_PENDING
+        task.task_type = task_type
+        task.depends_on = depends_on
+        return task
+
+    def _begin_tasks(self, ctxt, execution, task_info={}):
+        for task in execution.tasks:
+            if not task.depends_on:
+                self._rpc_worker_client.begin_task(
+                    ctxt, server=None,
+                    task_id=task.id,
+                    task_type=task.task_type,
+                    origin=execution.action.origin,
+                    destination=execution.action.destination,
+                    instance=task.instance,
+                    task_info=task_info.get(task.instance, {}))
+
+    def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
+        replica = self._get_replica(ctxt, replica_id)
+        self._check_running_executions(replica)
+        execution = models.TasksExecution()
+        execution.id = str(uuid.uuid4())
+        execution.status = constants.EXECUTION_STATUS_RUNNING
+        execution.action = replica
+
+        for instance in execution.action.instances:
+                depends_on = []
+                if shutdown_instances:
+                    shutdown_instance_task = self._create_task(
+                        instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
+                        execution)
+                    depends_on = [shutdown_instance_task.id]
+
+                get_instance_info_task = self._create_task(
+                    instance, constants.TASK_TYPE_GET_INSTANCE_INFO,
+                    execution, depends_on=depends_on)
+
+                deploy_replica_disks_task = self._create_task(
+                    instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
+                    execution, depends_on=[get_instance_info_task.id])
+
+                deploy_replica_resources_task = self._create_task(
+                    instance, constants.TASK_TYPE_DEPLOY_REPLICA_RESOURCES,
+                    execution, depends_on=[deploy_replica_disks_task.id])
+
+                replicate_disks_task = self._create_task(
+                    instance, constants.TASK_TYPE_REPLICATE_DISKS,
+                    execution, depends_on=[deploy_replica_resources_task.id])
+
+                self._create_task(
+                    instance, constants.TASK_TYPE_DELETE_REPLICA_RESOURCES,
+                    execution, depends_on=[replicate_disks_task.id])
+
+        db_api.add_replica_tasks_execution(ctxt, execution)
+        LOG.info("Replica tasks execution created: %s", execution.id)
+
+        self._begin_tasks(ctxt, execution, replica.info)
+        return self.get_replica_tasks_execution(ctxt, execution.id)
+
+    def get_replica_tasks_executions(self, ctxt, replica_id,
+                                     include_tasks=False):
+        return db_api.get_replica_tasks_executions(
+            ctxt, replica_id, include_tasks)
+
+    def get_replica_tasks_execution(self, ctxt, execution_id):
+        return self._get_replica_tasks_execution(
+            ctxt, execution_id)
+
+    def delete_replica_tasks_execution(self, ctxt, execution_id):
+        execution = self._get_replica_tasks_execution(
+            ctxt, execution_id)
+        if execution.status == constants.EXECUTION_STATUS_RUNNING:
+            raise exception.InvalidMigrationState(
+                "Cannot delete a running replica tasks execution")
+        db_api.delete_replica_tasks_execution(ctxt, execution_id)
+
+    def cancel_replica_tasks_execution(self, ctxt, execution_id):
+        execution = self._get_replica_tasks_execution(
+            ctxt, execution_id)
+        if execution.status != constants.EXECUTION_STATUS_RUNNING:
+            raise exception.InvalidReplicaState(
+                "The replica tasks execution is not running")
+        self._cancel_tasks_execution(ctxt, execution)
+
+    def _get_replica_tasks_execution(self, ctxt, execution_id):
+        execution = db_api.get_replica_tasks_execution(
+            ctxt, execution_id)
+        if not execution:
+            raise exception.NotFound("Tasks execution not found")
+        return execution
+
+    def get_replicas(self, ctxt, include_tasks_executions=False):
+        return db_api.get_replicas(ctxt, include_tasks_executions)
+
+    def get_replica(self, ctxt, replica_id):
+        return self._get_replica(ctxt, replica_id)
+
+    def delete_replica(self, ctxt, replica_id):
+        replica = self._get_replica(ctxt, replica_id)
+        self._check_running_executions(replica)
+        db_api.delete_replica(ctxt, replica_id)
+
+    def delete_replica_disks(self, ctxt, replica_id):
+        replica = self._get_replica(ctxt, replica_id)
+        self._check_running_executions(replica)
+
+        execution = models.TasksExecution()
+        execution.id = str(uuid.uuid4())
+        execution.status = constants.EXECUTION_STATUS_RUNNING
+        execution.action = replica
+
+        has_tasks = False
+        for instance in replica.instances:
+            if (instance in replica.instances and
+                    "volumes_info" in replica.info[instance]):
+                self._create_task(
+                    instance, constants.TASK_TYPE_DELETE_REPLICA_DISKS,
+                    execution)
+                has_tasks = True
+
+        if not has_tasks:
+            raise exception.InvalidReplicaState(
+                "This replica does not have volumes information for any "
+                "instance. Ensure that the replica has been executed "
+                "successfully priorly")
+
+        db_api.add_replica_tasks_execution(ctxt, execution)
+        LOG.info("Replica tasks execution created: %s", execution.id)
+
+        self._begin_tasks(ctxt, execution, replica.info)
+        return self.get_replica_tasks_execution(ctxt, execution.id)
+
+    def create_instances_replica(self, ctxt, origin, destination, instances):
+        replica = models.Replica()
+        replica.id = str(uuid.uuid4())
+        replica.origin = origin
+        replica.destination = destination
+        replica.instances = instances
+        replica.executions = []
+        replica.info = {}
+
+        db_api.add_replica(ctxt, replica)
+        LOG.info("Replica created: %s", replica.id)
+        return self.get_replica(ctxt, replica.id)
+
+    def _get_replica(self, ctxt, replica_id):
+        replica = db_api.get_replica(ctxt, replica_id)
+        if not replica:
+            raise exception.NotFound("Replica not found")
+        return replica
+
     def get_migrations(self, ctxt, include_tasks):
         return db_api.get_migrations(ctxt, include_tasks)
 
     def get_migration(self, ctxt, migration_id):
-        return self._get_migration(ctxt, migration_id)
+        # the default serialization mechanism enforces a max_depth of 3
+        return utils.to_dict(self._get_migration(ctxt, migration_id))
+
+    def _check_running_executions(self, action):
+        if [e for e in action.executions
+                if e.status == constants.EXECUTION_STATUS_RUNNING]:
+            raise exception.InvalidActionTasksExecutionState(
+                "Another tasks execution is in progress")
+
+    @staticmethod
+    def _check_valid_replica_tasks_execution(replica, forced=False):
+        sorted_executions = sorted(
+            replica.executions, key=lambda e: e.number, reverse=True)
+
+        if (forced and sorted_executions[0].status !=
+                constants.EXECUTION_STATUS_COMPLETED):
+            raise exception.InvalidReplicaState(
+                "The last replica tasks execution was not successful. "
+                "Perform a forced migration if you wish to perform a "
+                "migration without a successful last replica execution")
+        elif not [e for e in sorted_executions
+                  if e.status == constants.EXECUTION_STATUS_COMPLETED]:
+            raise exception.InvalidReplicaState(
+                "A replica must have been executed succesfully in order "
+                "to be migrated")
+
+    def deploy_replica_instances(self, ctxt, replica_id, forced):
+        replica = self._get_replica(ctxt, replica_id)
+        self._check_running_executions(replica)
+        self._check_valid_replica_tasks_execution(replica, forced)
+
+        instances = replica.instances
+
+        migration = models.Migration()
+        migration.id = str(uuid.uuid4())
+        migration.origin = replica.origin
+        migration.destination = replica.destination
+        migration.instances = instances
+        migration.replica = replica
+        migration.info = replica.info
+
+        execution = models.TasksExecution()
+        migration.executions = [execution]
+        execution.status = constants.EXECUTION_STATUS_RUNNING
+        execution.number = 1
+
+        for instance in instances:
+            create_snapshot_task = self._create_task(
+                instance, constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS,
+                execution)
+
+            deploy_replica_task = self._create_task(
+                instance, constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE,
+                execution, [create_snapshot_task.id])
+
+            self._create_task(
+                instance, constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS,
+                execution, [deploy_replica_task.id])
+
+        db_api.add_migration(ctxt, migration)
+        LOG.info("Migration created: %s", migration.id)
+
+        self._begin_tasks(ctxt, execution, migration.info)
+
+        return self.get_migration(ctxt, migration.id)
 
     def migrate_instances(self, ctxt, origin, destination, instances):
         migration = models.Migration()
         migration.id = str(uuid.uuid4())
-        migration.status = constants.MIGRATION_STATUS_RUNNING
         migration.origin = origin
         migration.destination = destination
+        execution = models.TasksExecution()
+        execution.status = constants.EXECUTION_STATUS_RUNNING
+        execution.number = 1
+        migration.executions = [execution]
+        migration.instances = instances
+        migration.info = {}
 
         for instance in instances:
-            task_export = models.Task()
-            task_export.id = str(uuid.uuid4())
-            task_export.migration = migration
-            task_export.instance = instance
-            task_export.status = constants.TASK_STATUS_PENDING
-            task_export.task_type = constants.TASK_TYPE_EXPORT_INSTANCE
-
-            task_import = models.Task()
-            task_import.id = str(uuid.uuid4())
-            task_import.migration = migration
-            task_import.instance = instance
-            task_import.status = constants.TASK_STATUS_PENDING
-            task_import.task_type = constants.TASK_TYPE_IMPORT_INSTANCE
-            task_import.depends_on = [task_export.id]
+
+            task_export = self._create_task(
+                instance, constants.TASK_TYPE_EXPORT_INSTANCE, execution)
+
+            self._create_task(
+                instance, constants.TASK_TYPE_IMPORT_INSTANCE,
+                execution, depends_on=[task_export.id])
 
         db_api.add_migration(ctxt, migration)
         LOG.info("Migration created: %s", migration.id)
 
-        for task in migration.tasks:
-            if not task.depends_on:
-                self._rpc_worker_client.begin_task(
-                    ctxt, server=None,
-                    task_id=task.id,
-                    task_type=task.task_type,
-                    origin=migration.origin,
-                    destination=migration.destination,
-                    instance=task.instance,
-                    task_info=None)
+        self._begin_tasks(ctxt, execution)
 
         return self.get_migration(ctxt, migration.id)
 
@@ -73,18 +284,23 @@ class ConductorServerEndpoint(object):
 
     def delete_migration(self, ctxt, migration_id):
         migration = self._get_migration(ctxt, migration_id)
-        if migration.status == constants.MIGRATION_STATUS_RUNNING:
+        execution = migration.executions[0]
+        if execution.status == constants.EXECUTION_STATUS_RUNNING:
             raise exception.InvalidMigrationState(
                 "Cannot delete a running migration")
         db_api.delete_migration(ctxt, migration_id)
 
     def cancel_migration(self, ctxt, migration_id):
         migration = self._get_migration(ctxt, migration_id)
-        if migration.status != constants.MIGRATION_STATUS_RUNNING:
+        execution = migration.executions[0]
+        if execution.status != constants.EXECUTION_STATUS_RUNNING:
             raise exception.InvalidMigrationState(
                 "The migration is not running")
+        self._cancel_tasks_execution(ctxt, execution)
 
-        for task in migration.tasks:
+    def _cancel_tasks_execution(self, ctxt, execution):
+        for task in execution.tasks:
             if task.status in [constants.TASK_STATUS_PENDING,
                                constants.TASK_STATUS_RUNNING]:
                 if task.status == constants.TASK_STATUS_RUNNING:
@@ -93,31 +309,32 @@ class ConductorServerEndpoint(object):
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
-        db_api.set_migration_status(
-            ctxt, migration_id, constants.MIGRATION_STATUS_ERROR)
+        db_api.set_execution_status(
+            ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
 
     def set_task_host(self, ctxt, task_id, host, process_id):
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
 
-    def _start_pending_tasks(self, ctxt, migration, parent_task, task_info):
+    def _start_pending_tasks(self, ctxt, execution, parent_task, task_info):
         has_pending_tasks = False
-        for task in migration.tasks:
+        for task in execution.tasks:
             if (task.depends_on and parent_task.id in task.depends_on and
                     task.status == constants.TASK_STATUS_PENDING):
                 has_pending_tasks = True
-                # instance imports needs to be executed on the same host
+                # instance imports need to be executed on the same host
                 server = None
                 if task.task_type == constants.TASK_TYPE_IMPORT_INSTANCE:
                     server = parent_task.host
 
+                action = execution.action
                 self._rpc_worker_client.begin_task(
                     ctxt, server=server,
                     task_id=task.id,
                     task_type=task.task_type,
-                    origin=migration.origin,
-                    destination=migration.destination,
+                    origin=action.origin,
+                    destination=action.destination,
                     instance=task.instance,
                     task_info=task_info)
         return has_pending_tasks
@@ -129,16 +346,21 @@ class ConductorServerEndpoint(object):
             ctxt, task_id, constants.TASK_STATUS_COMPLETED)
 
         task = db_api.get_task(
-            ctxt, task_id, include_migration_tasks=True)
+            ctxt, task_id, include_execution_tasks=True)
 
-        migration = task.migration
-        has_pending_tasks = self._start_pending_tasks(ctxt, migration, task,
+        execution = task.execution
+        has_pending_tasks = self._start_pending_tasks(ctxt, execution, task,
                                                       task_info)
 
+        LOG.info("Setting instance %(instance)s action info: %(task_info)s",
+                 {"instance": task.instance, "task_info": task_info})
+        db_api.set_transfer_action_info(
+            ctxt, execution.action_id, task.instance, task_info)
+
         if not has_pending_tasks:
-            LOG.info("Migration completed: %s", migration.id)
-            db_api.set_migration_status(
-                ctxt, migration.id, constants.MIGRATION_STATUS_COMPLETED)
+            LOG.info("Tasks execution completed: %s", execution.id)
+            db_api.set_execution_status(
+                ctxt, execution.id, constants.EXECUTION_STATUS_COMPLETED)
 
     def set_task_error(self, ctxt, task_id, exception_details):
         LOG.error("Task error: %(task_id)s - %(ex)s",
@@ -148,17 +370,17 @@ class ConductorServerEndpoint(object):
             ctxt, task_id, constants.TASK_STATUS_ERROR, exception_details)
 
         task = db_api.get_task(
-            ctxt, task_id, include_migration_tasks=True)
-        migration = task.migration
+            ctxt, task_id, include_execution_tasks=True)
+        execution = task.execution
 
-        for task in migration.tasks:
+        for task in execution.tasks:
             if task.status == constants.TASK_STATUS_PENDING:
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
-        LOG.error("Migration failed: %s", migration.id)
-        db_api.set_migration_status(
-            ctxt, migration.id, constants.MIGRATION_STATUS_ERROR)
+        LOG.error("Tasks execution failed: %s", execution.id)
+        db_api.set_execution_status(
+            ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
 
     def task_event(self, ctxt, task_id, level, message):
         LOG.info("Task event: %s", task_id)

+ 17 - 3
coriolis/constants.py

@@ -1,9 +1,9 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
-MIGRATION_STATUS_RUNNING = "RUNNING"
-MIGRATION_STATUS_COMPLETED = "COMPLETED"
-MIGRATION_STATUS_ERROR = "ERROR"
+EXECUTION_STATUS_RUNNING = "RUNNING"
+EXECUTION_STATUS_COMPLETED = "COMPLETED"
+EXECUTION_STATUS_ERROR = "ERROR"
 
 TASK_STATUS_PENDING = "PENDING"
 TASK_STATUS_RUNNING = "RUNNING"
@@ -14,6 +14,18 @@ TASK_STATUS_CANCELED = "CANCELED"
 TASK_TYPE_EXPORT_INSTANCE = "EXPORT_INSTANCE"
 TASK_TYPE_IMPORT_INSTANCE = "IMPORT_INSTANCE"
 
+TASK_TYPE_GET_INSTANCE_INFO = "GET_INSTANCE_INFO"
+TASK_TYPE_DEPLOY_REPLICA_DISKS = "DEPLOY_REPLICA_DISKS"
+TASK_TYPE_DELETE_REPLICA_DISKS = "DELETE_REPLICA_DISKS"
+TASK_TYPE_REPLICATE_DISKS = "REPLICATE_DISKS"
+TASK_TYPE_DEPLOY_REPLICA_RESOURCES = "DEPLOY_REPLICA_RESOURCES"
+TASK_TYPE_DELETE_REPLICA_RESOURCES = "DELETE_REPLICA_RESOURCES"
+TASK_TYPE_SHUTDOWN_INSTANCE = "SHUTDOWN_INSTANCE"
+TASK_TYPE_DEPLOY_REPLICA_INSTANCE = "DEPLOY_REPLICA_INSTANCE"
+TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS = "CREATE_REPLICA_DISK_SNAPSHOTS"
+TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS = "DELETE_REPLICA_DISK_SNAPSHOTS"
+
+
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2
 
@@ -46,3 +58,5 @@ OS_TYPE_LINUX = "linux"
 OS_TYPE_OS_X = "osx"
 OS_TYPE_SOLARIS = "solaris"
 OS_TYPE_WINDOWS = "windows"
+
+TMP_DIRS_KEY = "__tmp_dirs"

+ 149 - 19
coriolis/db/api.py

@@ -5,6 +5,7 @@ from oslo_config import cfg
 from oslo_db import api as db_api
 from oslo_db import options as db_options
 from oslo_db.sqlalchemy import enginefacade
+from sqlalchemy import func
 from sqlalchemy import orm
 
 from coriolis.db.sqlalchemy import models
@@ -57,18 +58,124 @@ def _soft_delete_aware_query(context, *args, **kwargs):
     return query
 
 
+@enginefacade.reader
+def get_replica_tasks_executions(context, replica_id, include_tasks=False):
+    q = _soft_delete_aware_query(context, models.TasksExecution)
+    q = q.join(models.Replica)
+    if include_tasks:
+        q = _get_tasks_with_details_options(q)
+    return q.filter(
+        models.Replica.project_id == context.tenant,
+        models.Replica.id == replica_id).all()
+
+
+@enginefacade.reader
+def get_replica_tasks_execution(context, execution_id):
+    q = _soft_delete_aware_query(context, models.TasksExecution).join(
+        models.Replica)
+    q = _get_tasks_with_details_options(q)
+    return q.filter(
+        models.Replica.project_id == context.tenant,
+        models.TasksExecution.id == execution_id).first()
+
+
+@enginefacade.writer
+def add_replica_tasks_execution(context, execution):
+    if execution.action.project_id != context.tenant:
+        raise exception.NotAuthorized()
+
+    # include deleted records
+    max_number = _model_query(
+        context, func.max(models.TasksExecution.number)).filter_by(
+            action_id=execution.action.id).first()[0] or 0
+    execution.number = max_number + 1
+
+    context.session.add(execution)
+
+
+@enginefacade.writer
+def delete_replica_tasks_execution(context, execution_id):
+    q = _soft_delete_aware_query(context, models.TasksExecution).filter(
+        models.TasksExecution.id == execution_id)
+    if not q.join(models.Replica).filter(
+            models.Replica.project_id == context.tenant).first():
+        raise exception.NotAuthorized()
+    count = q.soft_delete()
+    if count == 0:
+        raise exception.NotFound("0 entries were soft deleted")
+
+
+def _get_replica_with_tasks_executions_options(q):
+    return q.options(orm.joinedload(models.Replica.executions))
+
+
+@enginefacade.reader
+def get_replicas(context, include_tasks_executions=False):
+    q = _soft_delete_aware_query(context, models.Replica)
+    if include_tasks_executions:
+        q = _get_replica_with_tasks_executions_options(q)
+    return q.filter(
+        models.Replica.project_id == context.tenant).all()
+
+
+@enginefacade.reader
+def get_replica(context, replica_id):
+    q = _soft_delete_aware_query(context, models.Replica)
+    q = _get_replica_with_tasks_executions_options(q)
+    return q.filter(
+        models.Replica.project_id == context.tenant,
+        models.Replica.id == replica_id).first()
+
+
+@enginefacade.writer
+def add_replica(context, replica):
+    replica.user_id = context.user
+    replica.project_id = context.tenant
+    context.session.add(replica)
+
+
+@enginefacade.writer
+def _delete_transfer_action(context, cls, id):
+    count = _soft_delete_aware_query(context, cls).filter_by(
+        project_id=context.tenant, base_id=id).soft_delete()
+    if count == 0:
+        raise exception.NotFound("0 entries were soft deleted")
+
+    _soft_delete_aware_query(context, models.TasksExecution).filter_by(
+        action_id=id).soft_delete()
+
+
+@enginefacade.writer
+def delete_replica(context, replica_id):
+    _delete_transfer_action(context, models.Replica, replica_id)
+
+
 @enginefacade.reader
 def get_migrations(context, include_tasks=False):
     q = _soft_delete_aware_query(context, models.Migration)
     if include_tasks:
         q = _get_migration_task_query_options(q)
+    else:
+        q = q.options(orm.joinedload("executions"))
     return q.filter_by(project_id=context.tenant).all()
 
 
+def _get_tasks_with_details_options(query):
+    return query.options(
+        orm.joinedload("tasks").
+        joinedload("progress_updates")).options(
+            orm.joinedload("tasks").
+            joinedload("events"))
+
+
 def _get_migration_task_query_options(query):
     return query.options(
-        orm.joinedload("tasks").joinedload("progress_updates")).options(
-            orm.joinedload("tasks").joinedload("events"))
+        orm.joinedload("executions").
+        joinedload("tasks").
+        joinedload("progress_updates")).options(
+            orm.joinedload("executions").
+            joinedload("tasks").
+            joinedload("events"))
 
 
 @enginefacade.reader
@@ -87,20 +194,43 @@ def add_migration(context, migration):
 
 @enginefacade.writer
 def delete_migration(context, migration_id):
-    count = _soft_delete_aware_query(context, models.Migration).filter_by(
-        project_id=context.tenant, id=migration_id).soft_delete()
-    if count == 0:
-        raise exception.NotFound("0 entries were soft deleted")
+    _delete_transfer_action(context, models.Migration, migration_id)
+
+
+@enginefacade.writer
+def set_execution_status(context, execution_id, status):
+    execution = _soft_delete_aware_query(
+        context, models.TasksExecution).join(
+            models.TasksExecution.action).filter(
+                models.BaseTransferAction.project_id == context.tenant,
+                models.TasksExecution.id == execution_id).first()
+    if not execution:
+        raise exception.NotFound(
+            "Tasks execution not found: %s" % execution_id)
+
+    execution.status = status
 
 
 @enginefacade.writer
-def set_migration_status(context, migration_id, status):
-    migration = _soft_delete_aware_query(context, models.Migration).filter_by(
-        project_id=context.tenant, id=migration_id).first()
-    if not migration:
-        raise exception.NotFound("Migration not found: %s" % migration_id)
+def set_transfer_action_info(context, action_id, instance, instance_info):
+    action = _soft_delete_aware_query(
+        context, models.BaseTransferAction).filter(
+            models.BaseTransferAction.project_id == context.tenant,
+            models.BaseTransferAction.base_id == action_id).first()
+    if not action:
+        raise exception.NotFound(
+            "Transfer action not found: %s" % action_id)
+
+    # Copy is needed, otherwise sqlalchemy won't save the changes
+    action_info = action.info.copy()
+    action_info[instance] = instance_info
+    action.info = action_info
 
-    migration.status = status
+
+@enginefacade.reader
+def get_tasks_execution(context, execution_id):
+    q = _soft_delete_aware_query(context, models.TasksExecution).join(
+        models.TasksExecution.action)
+    return q.filter(
+        models.BaseTransferAction.project_id == context.tenant,
+        models.TasksExecution.id == execution_id).first()
 
 
 def _get_task(context, task_id):
@@ -126,13 +256,13 @@ def set_task_host(context, task_id, host, process_id):
 
 
 @enginefacade.reader
-def get_task(context, task_id, include_migration_tasks=False):
-    join_options = orm.joinedload("migration")
-    if include_migration_tasks:
-        join_options = join_options.joinedload("tasks")
-
-    return _soft_delete_aware_query(context, models.Task).options(
-        join_options).filter_by(id=task_id).first()
+def get_task(context, task_id, include_execution_tasks=False):
+    q = _soft_delete_aware_query(context, models.Task)
+    if include_execution_tasks:
+        q = q.options(
+            orm.joinedload("execution").joinedload("action")).options(
+                orm.joinedload("execution").joinedload("tasks"))
+    return q.filter_by(id=task_id).first()
 
 
 @enginefacade.writer

+ 50 - 6
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -10,9 +10,9 @@ def upgrade(migrate_engine):
     meta = sqlalchemy.MetaData()
     meta.bind = migrate_engine
 
-    migration = sqlalchemy.Table(
-        'migration', meta,
-        sqlalchemy.Column("id", sqlalchemy.String(36), primary_key=True,
+    base_transfer_action = sqlalchemy.Table(
+        'base_transfer_action', meta,
+        sqlalchemy.Column("base_id", sqlalchemy.String(36), primary_key=True,
                           default=lambda: str(uuid.uuid4())),
         sqlalchemy.Column('created_at', sqlalchemy.DateTime),
         sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
@@ -24,7 +24,19 @@ def upgrade(migrate_engine):
         sqlalchemy.Column("origin", sqlalchemy.Text, nullable=False),
         sqlalchemy.Column("destination", sqlalchemy.Text,
                           nullable=False),
-        sqlalchemy.Column("status", sqlalchemy.String(100), nullable=False),
+        sqlalchemy.Column("instances", sqlalchemy.Text, nullable=False),
+        sqlalchemy.Column("type", sqlalchemy.String(50), nullable=False),
+        sqlalchemy.Column("info", sqlalchemy.Text, nullable=False),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'
+    )
+
+    migration = sqlalchemy.Table(
+        'migration', meta,
+        sqlalchemy.Column("id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey(
+                              'base_transfer_action.base_id'),
+                          primary_key=True),
         mysql_engine='InnoDB',
         mysql_charset='utf8'
     )
@@ -37,8 +49,9 @@ def upgrade(migrate_engine):
         sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
         sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
         sqlalchemy.Column('deleted', sqlalchemy.String(36)),
-        sqlalchemy.Column("migration_id", sqlalchemy.String(36),
-                          sqlalchemy.ForeignKey('migration.id'),
+        sqlalchemy.Column("execution_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey(
+                              'tasks_execution.id'),
                           nullable=False),
         sqlalchemy.Column("instance", sqlalchemy.String(1024), nullable=False),
         sqlalchemy.Column("host", sqlalchemy.String(1024), nullable=True),
@@ -52,6 +65,24 @@ def upgrade(migrate_engine):
         mysql_charset='utf8'
     )
 
+    tasks_execution = sqlalchemy.Table(
+        'tasks_execution', meta,
+        sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                          default=lambda: str(uuid.uuid4())),
+        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted', sqlalchemy.String(36)),
+        sqlalchemy.Column("action_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey(
+                              'base_transfer_action.base_id'),
+                          nullable=False),
+        sqlalchemy.Column("status", sqlalchemy.String(100), nullable=False),
+        sqlalchemy.Column("number", sqlalchemy.Integer, nullable=False),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'
+    )
+
     task_progress_update = sqlalchemy.Table(
         'task_progress_update', meta,
         sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
@@ -87,8 +118,21 @@ def upgrade(migrate_engine):
         mysql_charset='utf8'
     )
 
+    replica = sqlalchemy.Table(
+        'replica', meta,
+        sqlalchemy.Column("id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey(
+                              'base_transfer_action.base_id'),
+                          primary_key=True),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'
+    )
+
     tables = (
+        base_transfer_action,
         migration,
+        replica,
+        tasks_execution,
         task,
         task_progress_update,
         task_events,

+ 64 - 9
coriolis/db/sqlalchemy/models.py

@@ -49,9 +49,9 @@ class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     id = sqlalchemy.Column(sqlalchemy.String(36),
                            default=lambda: str(uuid.uuid4()),
                            primary_key=True)
-    migration_id = sqlalchemy.Column(sqlalchemy.String(36),
-                                     sqlalchemy.ForeignKey('migration.id'),
-                                     nullable=False)
+    execution_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('tasks_execution.id'), nullable=False)
     instance = sqlalchemy.Column(sqlalchemy.String(1024), nullable=False)
     host = sqlalchemy.Column(sqlalchemy.String(1024), nullable=True)
     process_id = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
@@ -59,24 +59,79 @@ class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     task_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     exception_details = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     depends_on = sqlalchemy.Column(types.List, nullable=True)
+    # TODO: Add soft delete filter
     events = orm.relationship(TaskEvent, cascade="all,delete",
                               backref=orm.backref('task'))
+    # TODO: Add soft delete filter
     progress_updates = orm.relationship(TaskProgressUpdate,
                                         cascade="all,delete",
                                         backref=orm.backref('task'))
 
 
-class Migration(BASE, models.TimestampMixin, models.ModelBase,
-                models.SoftDeleteMixin):
-    __tablename__ = 'migration'
+class TasksExecution(BASE, models.TimestampMixin, models.ModelBase,
+                     models.SoftDeleteMixin):
+    __tablename__ = 'tasks_execution'
 
     id = sqlalchemy.Column(sqlalchemy.String(36),
                            default=lambda: str(uuid.uuid4()),
                            primary_key=True)
+    action_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('base_transfer_action.base_id'), nullable=False)
+    # TODO: Add soft delete filter
+    tasks = orm.relationship(Task, cascade="all,delete",
+                             backref=orm.backref('execution'))
+    status = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
+    number = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
+
+
+class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
+                         models.SoftDeleteMixin):
+    __tablename__ = 'base_transfer_action'
+
+    base_id = sqlalchemy.Column(sqlalchemy.String(36),
+                                default=lambda: str(uuid.uuid4()),
+                                primary_key=True)
     user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
     project_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
     origin = sqlalchemy.Column(types.Json, nullable=False)
     destination = sqlalchemy.Column(types.Json, nullable=False)
-    status = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
-    tasks = orm.relationship(Task, cascade="all,delete",
-                             backref=orm.backref('migration'))
+    type = sqlalchemy.Column(sqlalchemy.String(50))
+    executions = orm.relationship(TasksExecution, cascade="all,delete",
+                                  backref=orm.backref('action'),
+                                  primaryjoin="and_(BaseTransferAction."
+                                  "base_id==TasksExecution.action_id, "
+                                  "TasksExecution.deleted=='0')")
+    instances = sqlalchemy.Column(types.List, nullable=False)
+    info = sqlalchemy.Column(types.Json, nullable=False)
+
+    __mapper_args__ = {
+        'polymorphic_identity': 'base_transfer_action',
+        'polymorphic_on': type,
+    }
+
+
+class Migration(BaseTransferAction):
+    __tablename__ = 'migration'
+
+    id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey(
+            'base_transfer_action.base_id'), primary_key=True)
+
+    __mapper_args__ = {
+        'polymorphic_identity': 'migration',
+    }
+
+
+class Replica(BaseTransferAction):
+    __tablename__ = 'replica'
+
+    id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey(
+            'base_transfer_action.base_id'), primary_key=True)
+
+    __mapper_args__ = {
+        'polymorphic_identity': 'replica',
+    }

+ 8 - 0
coriolis/exception.py

@@ -166,10 +166,18 @@ class InvalidConfigurationValue(Invalid):
                 'configuration option "%(option)s"')
 
 
+class InvalidActionTasksExecutionState(Invalid):
+    message = _("Invalid tasks execution state: %(reason)s")
+
+
 class InvalidMigrationState(Invalid):
     message = _("Invalid migration state: %(reason)s")
 
 
+class InvalidReplicaState(Invalid):
+    message = _("Invalid replica state: %(reason)s")
+
+
 class ServiceUnavailable(Invalid):
     message = _("Service is unavailable at this time.")
 

+ 0 - 0
coriolis/migrations/__init__.py


+ 6 - 2
coriolis/migrations/api.py

@@ -8,10 +8,14 @@ class API(object):
     def __init__(self):
         self._rpc_client = rpc_client.ConductorClient()
 
-    def start(self, ctxt, origin, destination, instances):
-        return self._rpc_client.begin_migrate_instances(
+    def migrate_instances(self, ctxt, origin, destination, instances):
+        return self._rpc_client.migrate_instances(
             ctxt, origin, destination, instances)
 
+    def deploy_replica_instances(self, ctxt, replica_id, forced=False):
+        return self._rpc_client.deploy_replica_instances(
+            ctxt, replica_id, forced)
+
     def delete(self, ctxt, migration_id):
         self._rpc_client.delete_migration(ctxt, migration_id)
 

+ 16 - 6
coriolis/providers/base.py

@@ -72,11 +72,6 @@ class BaseReplicaImportProvider(BaseImportProvider):
 
     @abc.abstractmethod
     def deploy_replica_disks(self, ctxt, connection_info, target_environment,
-                             instance_name, export_info):
-        pass
-
-    @abc.abstractmethod
-    def update_replica_disks(self, ctxt, connection_info, target_environment,
                              instance_name, export_info, volumes_info):
         pass
 
@@ -87,13 +82,28 @@ class BaseReplicaImportProvider(BaseImportProvider):
 
     @abc.abstractmethod
     def delete_replica_resources(self, ctxt, connection_info,
-                                 target_replica_info):
+                                 migr_resources_dict):
         pass
 
     @abc.abstractmethod
     def delete_replica_disks(self, ctxt, connection_info, volumes_info):
         pass
 
+    @abc.abstractmethod
+    def create_replica_disk_snapshots(self, ctxt, connection_info,
+                                      volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def delete_replica_disk_snapshots(self, ctxt, connection_info,
+                                      volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def restore_replica_disk_snapshots(self, ctxt, connection_info,
+                                       volumes_info):
+        pass
+
 
 class BaseExportProvider(BaseProvider):
     __metaclass__ = abc.ABCMeta

+ 178 - 34
coriolis/providers/openstack/__init__.py

@@ -123,6 +123,19 @@ def _wait_for_instance(nova, instance, expected_status='ACTIVE'):
             "VM is in status: %s" % instance.status)
 
 
+@utils.retry_on_error()
+def _find_volume(cinder, volume_id):
+    volumes = cinder.volumes.findall(id=volume_id)
+    if volumes:
+        return volumes[0]
+
+
+@utils.retry_on_error()
+def _extend_volume(cinder, volume_id, new_size):
+    volume_size_gb = math.ceil(new_size / units.Gi)
+    cinder.volumes.extend(volume_id, volume_size_gb)
+
+
 @utils.retry_on_error()
 def _create_volume(cinder, size, name, image_ref=None):
     volume_size_gb = math.ceil(size / units.Gi)
@@ -133,8 +146,12 @@ def _create_volume(cinder, size, name, image_ref=None):
 
 
 @utils.retry_on_error()
-def _wait_for_volume(cinder, volume, expected_status='in-use'):
-    volume = cinder.volumes.findall(id=volume.id)[0]
+def _wait_for_volume(cinder, volume_id, expected_status='available'):
+    volumes = cinder.volumes.findall(id=volume_id)
+    if not volumes:
+        raise exception.CoriolisException("Volume not found")
+    volume = volumes[0]
+
     while volume.status not in [expected_status, 'error']:
         time.sleep(2)
         volume = cinder.volumes.get(volume.id)
@@ -150,6 +167,35 @@ def _delete_volume(cinder, volume_id):
         volume.delete()
 
 
+@utils.retry_on_error()
+def _create_volume_snapshot(cinder, volume_id, name):
+    return cinder.volume_snapshots.create(volume_id, name=name)
+
+
+@utils.retry_on_error()
+def _wait_for_volume_snapshot(cinder, snapshot_id,
+                              expected_status='available'):
+    snapshots = cinder.volume_snapshots.findall(id=snapshot_id)
+
+    if not snapshots:
+        raise exception.CoriolisException("Volume snapshot not found")
+    snapshot = snapshots[0]
+
+    while snapshot.status not in [expected_status, 'error']:
+        time.sleep(2)
+        snapshot = cinder.volume_snapshots.get(snapshot.id)
+    if snapshot.status != expected_status:
+        raise exception.CoriolisException(
+            "Volume snapshot is in status: %s" % snapshot.status)
+
+
+@utils.retry_on_error()
+def _delete_volume_snapshot(cinder, snapshot_id):
+    snapshots = cinder.volume_snapshots.findall(id=snapshot_id)
+    for snapshot in snapshots:
+        return cinder.volume_snapshots.delete(snapshot.id)
+
+
 class _MigrationResources(object):
     def __init__(self, nova, neutron, keypair, instance, port,
                  floating_ip, guest_port, sec_group, username, password, k):
@@ -452,7 +498,7 @@ class ImportProvider(base.BaseReplicaImportProvider):
         # volume can be either a Volume object or an id
         volume = nova.volumes.create_server_volume(
             instance.id, volume_id, volume_dev)
-        _wait_for_volume(cinder, volume, 'in-use')
+        _wait_for_volume(cinder, volume.id, 'in-use')
         return volume
 
     def _get_import_config(self, target_environment, os_type):
@@ -520,7 +566,8 @@ class ImportProvider(base.BaseReplicaImportProvider):
 
         return config
 
-    def _create_images_and_volumes(self, glance, nova, config, disks_info):
+    def _create_images_and_volumes(self, glance, nova, cinder, config,
+                                   disks_info):
         if not config.glance_upload:
             raise exception.CoriolisException(
                 "Glance upload is currently required for migrations")
@@ -637,7 +684,7 @@ class ImportProvider(base.BaseReplicaImportProvider):
                 # Migration
                 disks_info = export_info["devices"]["disks"]
                 images, volumes = self._create_images_and_volumes(
-                    glance, nova, config, disks_info)
+                    glance, nova, cinder, config, disks_info)
             else:
                 # Replica
                 volumes = self._get_replica_volumes(cinder, volumes_info)
@@ -651,7 +698,7 @@ class ImportProvider(base.BaseReplicaImportProvider):
 
             try:
                 for i, volume in enumerate(volumes):
-                    _wait_for_volume(cinder, volume, 'available')
+                    _wait_for_volume(cinder, volume.id, 'available')
 
                     self._event_manager.progress_update(
                         "Attaching volume to worker instance")
@@ -769,44 +816,109 @@ class ImportProvider(base.BaseReplicaImportProvider):
         self._deploy_instance(ctxt, connection_info, target_environment,
                               instance_name, export_info, volumes_info)
 
-    def deploy_replica_disks(self, ctxt, connection_info, target_environment,
-                             instance_name, export_info):
-        session = keystone.create_keystone_session(ctxt, connection_info)
+    def _update_existing_disk_volumes(self, cinder, disks_info, volumes_info):
+        for disk_info in disks_info:
+            disk_id = disk_info["id"]
 
-        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+            vi = [v for v in volumes_info
+                  if v["disk_id"] == disk_id and v.get("volume_id")]
+            if vi:
+                volume_info = vi[0]
+                volume_id = volume_info["volume_id"]
+
+                volume = _find_volume(cinder, volume_id)
+                if volume:
+                    virtual_disk_size_gb = math.ceil(
+                        disk_info["size_bytes"] / units.Gi)
+
+                    if virtual_disk_size_gb > volume.size:
+                        LOG.info(
+                            "Extending volume %(volume_id)s. "
+                            "Current size: %(curr_size)s GB, "
+                            "Requested size: %(requested_size)s GB",
+                            {"volume_id": volume_id,
+                             "curr_size": volume.size,
+                             "requested_size": virtual_disk_size_gb})
+                        self._event_manager.progress_update("Extending volume")
+                        _extend_volume(
+                            cinder, volume_id, virtual_disk_size_gb * units.Gi)
+                    elif virtual_disk_size_gb < volume.size:
+                        LOG.warning(
+                            "Cannot shrink volume %(volume_id)s. "
+                            "Current size: %(curr_size)s GB, "
+                            "Requested size: %(requested_size)s GB",
+                            {"volume_id": volume_id,
+                             "curr_size": volume.size,
+                             "requested_size": virtual_disk_size_gb})
+                else:
+                    volumes_info.remove(volume_info)
+
+        return volumes_info
 
+    def _delete_removed_disk_volumes(self, cinder, disks_info, volumes_info):
+        for volume_info in list(volumes_info):
+            if volume_info["disk_id"] not in [
+                    d["id"] for d in disks_info if d["id"]]:
+
+                volume_id = volume_info["volume_id"]
+                volume = _find_volume(cinder, volume_id)
+                if volume:
+                    self._event_manager.progress_update("Deleting volume")
+                    _delete_volume(cinder, volume_id)
+                volumes_info.remove(volume_info)
+        return volumes_info
+
+    def _create_new_disk_volumes(self, cinder, disks_info, volumes_info,
+                                 instance_name):
         try:
-            disks_info = export_info["devices"]["disks"]
-            volumes_info = []
-            volumes = []
+            new_volumes = []
             for i, disk_info in enumerate(disks_info):
-                self._event_manager.progress_update(
-                    "Creating volume")
-
                 disk_id = disk_info["id"]
                 virtual_disk_size = disk_info["size_bytes"]
-                volume_name = REPLICA_VOLUME_NAME_FORMAT % {
-                    "instance_name": instance_name, "num": i + 1}
-                volume = _create_volume(
-                    cinder, virtual_disk_size, volume_name)
-                volumes.append(volume)
-                volumes_info.append({
-                    "volume_id": volume.id,
-                    "disk_id": disk_id})
-
-            for volume in volumes:
-                _wait_for_volume(cinder, volume, 'available')
+
+                if not [v for v in volumes_info if v["disk_id"] == disk_id]:
+                    self._event_manager.progress_update(
+                        "Creating volume")
+
+                    volume_name = REPLICA_VOLUME_NAME_FORMAT % {
+                        "instance_name": instance_name, "num": i + 1}
+                    volume = _create_volume(
+                        cinder, virtual_disk_size, volume_name)
+
+                    new_volumes.append(volume)
+                    volumes_info.append({
+                        "volume_id": volume.id,
+                        "disk_id": disk_id})
+                else:
+                    self._event_manager.progress_update(
+                        "Using previously deployed volume")
+
+            for volume in new_volumes:
+                _wait_for_volume(cinder, volume.id, 'available')
 
             return volumes_info
         except:
-            for volume in volumes:
+            for volume in new_volumes:
-                _delete_volume(cinder, volume)
+                _delete_volume(cinder, volume.id)
             raise
 
-    def update_replica_disks(self, ctxt, connection_info, target_environment,
+    def deploy_replica_disks(self, ctxt, connection_info, target_environment,
                              instance_name, export_info, volumes_info):
-        # TODO: check if source disk size / number changed and update
-        # accordingly on destination
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+
+        disks_info = export_info["devices"]["disks"]
+
+        volumes_info = self._update_existing_disk_volumes(
+            cinder, disks_info, volumes_info)
+
+        volumes_info = self._delete_removed_disk_volumes(
+            cinder, disks_info, volumes_info)
+
+        volumes_info = self._create_new_disk_volumes(
+            cinder, disks_info, volumes_info, instance_name)
+
         return volumes_info
 
     def deploy_replica_resources(self, ctxt, connection_info,
@@ -842,7 +954,7 @@ class ImportProvider(base.BaseReplicaImportProvider):
 
             return {
                 "migr_resources": migr_resources.get_resources_dict(),
-                "volumes": volumes_info,
+                "volumes_info": volumes_info,
                 "connection_info": migr_resources.get_guest_connection_info(),
             }
         except:
@@ -852,13 +964,12 @@ class ImportProvider(base.BaseReplicaImportProvider):
             raise
 
     def delete_replica_resources(self, ctxt, connection_info,
-                                 target_replica_info):
+                                 migr_resources_dict):
         session = keystone.create_keystone_session(ctxt, connection_info)
 
         nova = nova_client.Client(NOVA_API_VERSION, session=session)
         neutron = neutron_client.Client(NEUTRON_API_VERSION, session=session)
 
-        migr_resources_dict = target_replica_info["migr_resources"]
         migr_resources = _MigrationResources.from_resources_dict(
             nova, neutron, migr_resources_dict)
         self._event_manager.progress_update(
@@ -875,6 +986,39 @@ class ImportProvider(base.BaseReplicaImportProvider):
         for volume_info in volumes_info:
             _delete_volume(cinder, volume_info["volume_id"])
 
+    def create_replica_disk_snapshots(self, ctxt, connection_info,
+                                      volumes_info):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+
+        snapshots = []
+        self._event_manager.progress_update(
+            "Creating replica disk snapshots")
+        for volume_info in volumes_info:
+            snapshot = _create_volume_snapshot(
+                cinder, volume_info["volume_id"], _get_unique_name())
+            snapshots.append(snapshot)
+            volume_info["volume_snapshot_id"] = snapshot.id
+
+        for snapshot in snapshots:
+            _wait_for_volume_snapshot(cinder, snapshot.id, 'available')
+
+        return volumes_info
+
+    def delete_replica_disk_snapshots(self, ctxt, connection_info,
+                                      volumes_info):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+
+        self._event_manager.progress_update(
+            "Removing replica disk snapshots")
+        for volume_info in volumes_info:
+            snapshot_id = volume_info.get("volume_snapshot_id")
+            if snapshot_id:
+                _delete_volume_snapshot(cinder, snapshot_id)
+
 
 class ExportProvider(base.BaseExportProvider):
     _OS_DISTRO_MAP = {

+ 6 - 0
coriolis/providers/openstack/schemas/connection_info_schema.json

@@ -45,6 +45,12 @@
         "project_domain_name",
         "auth_url"
       ]
+    },
+    {
+      "type": "object",
+      "additionalProperties": false,
+      "properties": {
+      }
     }
   ]
 }

+ 1 - 4
coriolis/providers/openstack/schemas/target_environment_schema.json

@@ -40,10 +40,7 @@
     {
       "required": [
         "network_map",
-        "flavor_name",
-        "fip_pool_name",
-        "migr_fip_pool_name",
-        "keypair_name"
+        "flavor_name"
       ]
     }
   ]

+ 46 - 6
coriolis/providers/vmware_vsphere/__init__.py

@@ -7,6 +7,7 @@ import os
 import re
 import struct
 import sys
+import threading
 import time
 from urllib import request
 import uuid
@@ -173,7 +174,8 @@ class _SSHBackupWriter(_BaseBackupWriter):
         ret_val = self._stdout.channel.recv_exit_status()
         if ret_val:
             raise exception.CoriolisException(
-                "An exception occurred while writing data on target")
+                "An exception occurred while writing data on target. "
+                "Error code: %s" % ret_val)
         self._ssh.close()
 
 
@@ -195,6 +197,17 @@ class ExportProvider(base.BaseReplicaExportProvider):
         if task.info.state == vim.TaskInfo.State.error:
             raise exception.CoriolisException(task.info.error.msg)
 
+    @staticmethod
+    def _keep_alive_vmware_conn(si, exit_event):
+        try:
+            while True:
+                LOG.debug("VMware connection keep alive")
+                si.CurrentTime()
+                if exit_event.wait(60):
+                    return
+        finally:
+            LOG.debug("Exiting VMware connection keep alive thread")
+
     @utils.retry_on_error()
     @contextlib.contextmanager
     def _connect(self, connection_info):
@@ -219,10 +232,21 @@ class ExportProvider(base.BaseReplicaExportProvider):
             pwd=password,
             port=port,
             sslContext=context)
+
+        thread = None
         try:
+            thread_exit_event = threading.Event()
+            thread = threading.Thread(
+                target=self._keep_alive_vmware_conn,
+                args=(si, thread_exit_event))
+            thread.start()
+
             yield context, si
         finally:
             connect.Disconnect(si)
+            if thread:
+                thread_exit_event.set()
+                thread.join()
 
     def _wait_for_vm_status(self, vm, status, max_wait=120):
         i = 0
@@ -245,7 +269,7 @@ class ExportProvider(base.BaseReplicaExportProvider):
             else:
                 container = container.childEntity[0].vmFolder
 
-        LOG.debug("VM path items:", path_items)
+        LOG.debug("VM path items: %s", path_items)
         for i, path_item in enumerate(path_items):
             l = [o for o in container.childEntity if o.name == path_item]
             if not l:
@@ -338,7 +362,9 @@ class ExportProvider(base.BaseReplicaExportProvider):
             disks.append({'size_bytes': device.capacityInBytes,
                           'unit_number': device.unitNumber,
                           'id': device.key,
-                          'controller_id': device.controllerKey})
+                          'controller_id': device.controllerKey,
+                          'path': device.backing.fileName,
+                          'format': constants.DISK_FORMAT_VMDK})
 
         cdroms = []
         devices = [d for d in vm.config.hardware.device if
@@ -351,7 +377,8 @@ class ExportProvider(base.BaseReplicaExportProvider):
         devices = [d for d in vm.config.hardware.device if
                    isinstance(d, vim.vm.device.VirtualFloppy)]
         for device in devices:
-            floppies.append({'unit_number': device.unitNumber, 'id': device.key,
+            floppies.append({'unit_number': device.unitNumber,
+                             'id': device.key,
                              'controller_id': device.controllerKey})
 
         nics = []
@@ -488,12 +515,12 @@ class ExportProvider(base.BaseReplicaExportProvider):
     @contextlib.contextmanager
     def _take_temp_vm_snapshot(self, vm, snapshot_name, memory=False,
                                quiesce=True):
-        self._event_manager.progress_update("Creating backup snapshot")
+        self._event_manager.progress_update("Creating snapshot")
         snapshot = self._take_vm_snapshot(vm, snapshot_name, memory, quiesce)
         try:
             yield snapshot
         finally:
-            self._event_manager.progress_update("Removing backup snapshot")
+            self._event_manager.progress_update("Removing snapshot")
             self._remove_vm_snapshot(snapshot)
 
     @utils.retry_on_error()
@@ -533,6 +560,15 @@ class ExportProvider(base.BaseReplicaExportProvider):
 
                 backup_disk_path = disk.backing.fileName
 
+                if change_id == '*':
+                    self._event_manager.progress_update(
+                        "Performing full CBT replica for disk: %s" %
+                        backup_disk_path)
+                else:
+                    self._event_manager.progress_update(
+                        "Performing incremental CBT replica for disk: %s" %
+                        backup_disk_path)
+
                 with vixdisklib.open(
                         conn, backup_disk_path) as disk_handle:
 
@@ -554,6 +590,10 @@ class ExportProvider(base.BaseReplicaExportProvider):
                                 buf = vixdisklib.get_buffer(
                                     curr_num_sectors * sector_size)
 
+                                LOG.debug(
+                                    "Read start sector: %s, num sectors: %s" %
+                                    (start_sector + i, curr_num_sectors))
+
                                 vixdisklib.read(
                                     disk_handle, start_sector + i,
                                     curr_num_sectors, buf)

+ 0 - 0
coriolis/replica_tasks_executions/__init__.py


+ 29 - 0
coriolis/replica_tasks_executions/api.py

@@ -0,0 +1,29 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def create(self, ctxt, replica_id, shutdown_instances):
+        return self._rpc_client.execute_replica_tasks(
+            ctxt, replica_id, shutdown_instances)
+
+    def delete(self, ctxt, execution_id):
+        self._rpc_client.delete_replica_tasks_execution(
+            ctxt, execution_id)
+
+    def cancel(self, ctxt, execution_id):
+        self._rpc_client.cancel_replica_tasks_execution(
+            ctxt, execution_id)
+
+    def get_executions(self, ctxt, replica_id, include_tasks=False):
+        return self._rpc_client.get_replica_tasks_executions(
+            ctxt, replica_id, include_tasks)
+
+    def get_execution(self, ctxt, execution_id):
+        return self._rpc_client.get_replica_tasks_execution(
+            ctxt, execution_id)

+ 0 - 0
coriolis/replicas/__init__.py


+ 25 - 0
coriolis/replicas/api.py

@@ -0,0 +1,25 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def create(self, ctxt, origin, destination, instances):
+        return self._rpc_client.create_instances_replica(
+            ctxt, origin, destination, instances)
+
+    def delete(self, ctxt, replica_id):
+        self._rpc_client.delete_replica(ctxt, replica_id)
+
+    def get_replicas(self, ctxt, include_tasks_executions=False):
+        return self._rpc_client.get_replicas(ctxt, include_tasks_executions)
+
+    def get_replica(self, ctxt, replica_id):
+        return self._rpc_client.get_replica(ctxt, replica_id)
+
+    def delete_disks(self, ctxt, replica_id):
+        return self._rpc_client.delete_replica_disks(ctxt, replica_id)

+ 1 - 1
coriolis/schemas.py

@@ -32,7 +32,7 @@ def get_schema(package_name, schema_name,
     schema = json.loads(template_env.get_template(schema_name).render())
 
     LOG.debug("Succesfully loaded and parsed schema '%s' from '%s'.",
-             schema_name, package_name)
+              schema_name, package_name)
     return schema
 
 

+ 0 - 0
coriolis/tasks/__init__.py


+ 26 - 0
coriolis/tasks/base.py

@@ -0,0 +1,26 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import abc
+
+from coriolis import secrets
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class TaskRunner(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        pass
+
+
+def get_connection_info(ctxt, data):
+    connection_info = data.get("connection_info") or {}
+    secret_ref = connection_info.get("secret_ref")
+    if secret_ref:
+        LOG.info("Retrieving connection info from secret: %s", secret_ref)
+        connection_info = secrets.get_secret(ctxt, secret_ref)
+    return connection_info

+ 42 - 0
coriolis/tasks/factory.py

@@ -0,0 +1,42 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis import constants
+from coriolis import exception
+from coriolis.tasks import migration_tasks
+from coriolis.tasks import replica_tasks
+
+_TASKS_MAP = {
+    constants.TASK_TYPE_EXPORT_INSTANCE:
+        migration_tasks.ExportInstanceTask,
+    constants.TASK_TYPE_IMPORT_INSTANCE:
+        migration_tasks.ImportInstanceTask,
+    constants.TASK_TYPE_GET_INSTANCE_INFO:
+        replica_tasks.GetInstanceInfoTask,
+    constants.TASK_TYPE_REPLICATE_DISKS:
+        replica_tasks.ReplicateDisksTask,
+    constants.TASK_TYPE_SHUTDOWN_INSTANCE:
+        replica_tasks.ShutdownInstanceTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_DISKS:
+        replica_tasks.DeployReplicaDisksTask,
+    constants.TASK_TYPE_DELETE_REPLICA_DISKS:
+        replica_tasks.DeleteReplicaDisksTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_RESOURCES:
+        replica_tasks.DeployReplicaResourcesTask,
+    constants.TASK_TYPE_DELETE_REPLICA_RESOURCES:
+        replica_tasks.DeleteReplicaResourcesTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE:
+        replica_tasks.DeployReplicaInstanceTask,
+    constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS:
+        replica_tasks.CreateReplicaDiskSnapshotsTask,
+    constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS:
+        replica_tasks.DeleteReplicaDiskSnapshotsTask,
+}
+
+
+def get_task_runner(task_type):
+    cls = _TASKS_MAP.get(task_type)
+    if not cls:
+        raise exception.NotFound(
+            "TaskRunner not found for task type: %s" % task_type)
+    return cls()

+ 47 - 0
coriolis/tasks/migration_tasks.py

@@ -0,0 +1,47 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis import constants
+from coriolis.providers import factory as providers_factory
+from coriolis import schemas
+from coriolis.tasks import base
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class ExportInstanceTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, origin)
+        export_path = task_info["export_path"]
+
+        export_info = provider.export_instance(
+            ctxt, connection_info, instance, export_path)
+
+        # Validate the output
+        schemas.validate_value(
+            export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
+        task_info["export_info"] = export_info
+        task_info["retain_export_path"] = True
+
+        return task_info
+
+
+class ImportInstanceTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        target_environment = destination.get("target_environment") or {}
+        export_info = task_info["export_info"]
+
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        provider.import_instance(
+            ctxt, connection_info, target_environment, instance, export_info)
+
+        return task_info

+ 201 - 0
coriolis/tasks/replica_tasks.py

@@ -0,0 +1,201 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis import constants
+from coriolis.providers import factory as providers_factory
+from coriolis import schemas
+from coriolis.tasks import base
+from coriolis import utils
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class GetInstanceInfoTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, origin)
+
+        export_info = provider.get_replica_instance_info(
+            ctxt, connection_info, instance)
+
+        # Validate the output
+        schemas.validate_value(
+            export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
+        task_info["export_info"] = export_info
+
+        return task_info
+
+
+class ShutdownInstanceTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, origin)
+
+        provider.shutdown_instance(ctxt, connection_info, instance)
+
+        return task_info
+
+
+class ReplicateDisksTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, origin)
+
+        volumes_info = task_info["volumes_info"]
+
+        migr_conn_info = task_info["migr_connection_info"]
+        pkey_str = migr_conn_info["pkey"]
+        migr_conn_info["pkey"] = utils.deserialize_key(pkey_str)
+
+        incremental = task_info.get("incremental", True)
+
+        volumes_info = provider.replicate_disks(
+            ctxt, connection_info, instance, migr_conn_info,
+            volumes_info, incremental)
+
+        migr_conn_info["pkey"] = pkey_str
+        task_info["volumes_info"] = volumes_info
+
+        return task_info
+
+
+class DeployReplicaDisksTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        target_environment = destination.get("target_environment") or {}
+        export_info = task_info["export_info"]
+
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        volumes_info = task_info.get("volumes_info", [])
+
+        volumes_info = provider.deploy_replica_disks(
+            ctxt, connection_info, target_environment, instance, export_info,
+            volumes_info)
+
+        task_info["volumes_info"] = volumes_info
+
+        return task_info
+
+
+class DeleteReplicaDisksTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        volumes_info = task_info["volumes_info"]
+
+        provider.delete_replica_disks(
+            ctxt, connection_info, volumes_info)
+
+        del task_info["volumes_info"]
+
+        return task_info
+
+
+class DeployReplicaResourcesTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        target_environment = destination.get("target_environment") or {}
+
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        volumes_info = task_info["volumes_info"]
+
+        replica_resources_info = provider.deploy_replica_resources(
+            ctxt, connection_info, target_environment, volumes_info)
+
+        task_info["volumes_info"] = replica_resources_info["volumes_info"]
+        task_info["migr_resources"] = replica_resources_info["migr_resources"]
+
+        migr_connection_info = replica_resources_info["connection_info"]
+        migr_connection_info["pkey"] = utils.serialize_key(
+            migr_connection_info["pkey"])
+        task_info["migr_connection_info"] = migr_connection_info
+
+        return task_info
+
+
+class DeleteReplicaResourcesTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        migr_resources = task_info["migr_resources"]
+
+        provider.delete_replica_resources(
+            ctxt, connection_info, migr_resources)
+
+        del task_info["migr_resources"]
+        del task_info["migr_connection_info"]
+
+        return task_info
+
+
+class DeployReplicaInstanceTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        target_environment = destination.get("target_environment") or {}
+        export_info = task_info["export_info"]
+
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        volumes_info = task_info["volumes_info"]
+
+        provider.deploy_replica_instance(
+            ctxt, connection_info, target_environment, instance,
+            export_info, volumes_info)
+
+        return task_info
+
+
+class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        volumes_info = task_info["volumes_info"]
+
+        volumes_info = provider.create_replica_disk_snapshots(
+            ctxt, connection_info, volumes_info)
+
+        task_info["volumes_info"] = volumes_info
+
+        return task_info
+
+
+class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        volumes_info = task_info["volumes_info"]
+
+        volumes_info = provider.delete_replica_disk_snapshots(
+            ctxt, connection_info, volumes_info)
+
+        task_info["volumes_info"] = volumes_info
+
+        return task_info

+ 29 - 0
coriolis/utils.py

@@ -2,8 +2,10 @@
 # All Rights Reserved.
 
 import functools
+import io
 import json
 import os
+import pickle
 import re
 import socket
 import subprocess
@@ -13,6 +15,8 @@ import traceback
 import OpenSSL
 from oslo_config import cfg
 from oslo_log import log as logging
+from oslo_serialization import jsonutils
+import paramiko
 
 from coriolis import constants
 from coriolis import exception
@@ -245,3 +249,28 @@ def _get_base_dir():
 
 def get_resources_dir():
     return os.path.join(_get_base_dir(), "resources")
+
+
+def serialize_key(key):
+    key_io = io.StringIO()
+    key.write_private_key(key_io)
+    return key_io.getvalue()
+
+
+def deserialize_key(key_bytes):
+    key_io = io.StringIO(key_bytes)
+    return paramiko.RSAKey.from_private_key(key_io)
+
+
+def is_serializable(obj):
+    # Raises (e.g. pickle.PicklingError) if obj cannot be pickled;
+    # returns True otherwise so callers can use it in truth tests.
+    pickle.dumps(obj)
+    return True
+
+
+def to_dict(obj, max_depth=10):
+    # jsonutils.dumps() has a max_depth of 3 by default
+    def _to_primitive(value, convert_instances=False,
+                      convert_datetime=True, level=0,
+                      max_depth=max_depth):
+        return jsonutils.to_primitive(
+            value, convert_instances, convert_datetime, level, max_depth)
+    return jsonutils.loads(jsonutils.dumps(obj, default=_to_primitive))

+ 31 - 59
coriolis/worker/rpc/server.py

@@ -16,11 +16,10 @@ from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis import constants
 from coriolis import events
 from coriolis import exception
-from coriolis.providers import factory
-from coriolis import schemas
-from coriolis import secrets
+from coriolis.tasks import factory as task_runners_factory
 from coriolis import utils
 
+
 worker_opts = [
     cfg.StrOpt('export_base_path',
                default='/tmp',
@@ -32,8 +31,6 @@ CONF.register_opts(worker_opts, 'worker')
 
 LOG = logging.getLogger(__name__)
 
-TMP_DIRS_KEY = "__tmp_dirs"
-
 VERSION = "1.0"
 
 
@@ -69,28 +66,14 @@ class WorkerServerEndpoint(object):
         self._server = utils.get_hostname()
         self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
 
-    def _cleanup_task_resources(self, task_id, task_info=None):
+    def _check_remove_dir(self, path):
         try:
-            export_path = _get_task_export_path(task_id)
-            if (not task_info or export_path not in
-                    task_info.get(TMP_DIRS_KEY, [])):
-                # Don't remove folder if it's needed by the dependent tasks
-                if os.path.exists(export_path):
-                    shutil.rmtree(export_path)
+            if os.path.exists(path):
+                shutil.rmtree(path)
         except Exception as ex:
             # Ignore the exception
             LOG.exception(ex)
 
-    def _remove_tmp_dirs(self, task_info):
-        if task_info:
-            for tmp_dir in task_info.get(TMP_DIRS_KEY, []):
-                if os.path.exists(tmp_dir):
-                    try:
-                        shutil.rmtree(tmp_dir)
-                    except Exception as ex:
-                        # Ignore exception
-                        LOG.exception(ex)
-
     def cancel_task(self, ctxt, process_id):
         try:
             p = psutil.Process(process_id)
@@ -138,6 +121,13 @@ class WorkerServerEndpoint(object):
 
     def exec_task(self, ctxt, task_id, task_type, origin, destination,
                   instance, task_info):
+        export_path = task_info.get("export_path")
+        if not export_path:
+            export_path = _get_task_export_path(task_id, create=True)
+            task_info["export_path"] = export_path
+        retain_export_path = False
+        task_info["retain_export_path"] = retain_export_path
+
         try:
             new_task_info = self._exec_task_process(
                 ctxt, task_id, task_type, origin, destination,
@@ -146,18 +136,20 @@ class WorkerServerEndpoint(object):
             if new_task_info:
                 LOG.info("Task info: %s", new_task_info)
 
+            # TODO: replace the temp storage with a host independent option
+            retain_export_path = new_task_info.get("retain_export_path", False)
+            if not retain_export_path:
+                del new_task_info["export_path"]
+
             LOG.info("Task completed: %s", task_id)
             self._rpc_conductor_client.task_completed(ctxt, task_id,
                                                       new_task_info)
-
-            self._cleanup_task_resources(task_id, new_task_info)
         except Exception as ex:
             LOG.exception(ex)
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))
-
-            self._cleanup_task_resources(task_id)
         finally:
-            self._remove_tmp_dirs(task_info)
+            if not retain_export_path:
+                self._check_remove_dir(export_path)
 
 
 def _get_task_export_path(task_id, create=False):
@@ -184,43 +176,23 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
     try:
         _setup_task_process(mp_log_q)
 
-        if task_type == constants.TASK_TYPE_EXPORT_INSTANCE:
-            provider_type = constants.PROVIDER_TYPE_EXPORT
-            data = origin
-        elif task_type == constants.TASK_TYPE_IMPORT_INSTANCE:
-            provider_type = constants.PROVIDER_TYPE_IMPORT
-            data = destination
-        else:
-            raise exception.NotFound(
-                "Unknown task type: %s" % task_type)
-
+        task_runner = task_runners_factory.get_task_runner(task_type)
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
-        provider = factory.get_provider(data["type"], provider_type,
-                                        event_handler)
 
-        connection_info = data.get("connection_info") or {}
-        target_environment = data.get("target_environment") or {}
+        LOG.debug("Executing task: %(task_id)s, type: %(task_type)s, "
+                  "origin: %(origin)s, destination: %(destination)s, "
+                  "instance: %(instance)s, task_info: %(task_info)s",
+                  {"task_id": task_id, "task_type": task_type,
+                   "origin": origin, "destination": destination,
+                   "instance": instance, "task_info": task_info})
 
-        secret_ref = connection_info.get("secret_ref")
-        if secret_ref:
-            LOG.info("Retrieving connection info from secret: %s", secret_ref)
-            connection_info = secrets.get_secret(ctxt, secret_ref)
+        new_task_info = task_runner.run(
+            ctxt, instance, origin, destination, task_info, event_handler)
 
-        if provider_type == constants.PROVIDER_TYPE_EXPORT:
-            export_path = _get_task_export_path(task_id, create=True)
+        # mp_q.put() doesn't raise if new_task_info is not serializable
+        utils.is_serializable(new_task_info)
 
-            result = provider.export_instance(ctxt, connection_info, instance,
-                                              export_path)
-            result[TMP_DIRS_KEY] = [export_path]
-
-            # validate the outputted VM info:
-            schemas.validate_value(
-                result, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
-        else:
-            result = provider.import_instance(ctxt, connection_info,
-                                              target_environment, instance,
-                                              task_info)
-        mp_q.put(result)
+        mp_q.put(new_task_info)
     except Exception as ex:
         mp_q.put(str(ex))
         LOG.exception(ex)

+ 3 - 0
resources/makefile

@@ -0,0 +1,3 @@
+write_data: write_data.c
+	gcc -o write_data write_data.c
+