Просмотр исходного кода

Updates migrations and replicas to use enpoints only

Alessandro Pilotti 9 лет назад
Родитель
Сommit
2c0a87e728

+ 12 - 18
coriolis/api/v1/migrations.py

@@ -40,23 +40,13 @@ class MigrationController(api_wsgi.Controller):
 
     def _validate_migration_input(self, migration):
         try:
-            origin = migration["origin"]
-            destination = migration["destination"]
+            origin_endpoint_id = migration["origin_endpoint_id"]
+            destination_endpoint_id = migration["destination_endpoint_id"]
+            destination_environment = migration.get("destination_environment")
+            instances = migration["instances"]
 
-            export_provider = factory.get_provider(
-                origin["type"], constants.PROVIDER_TYPE_EXPORT, None)
-            export_provider.validate_connection_info(
-                origin.get("connection_info", {}))
-
-            import_provider = factory.get_provider(
-                destination["type"], constants.PROVIDER_TYPE_IMPORT, None)
-            import_provider.validate_connection_info(
-                destination.get("connection_info", {}))
-
-            import_provider.validate_target_environment(
-                destination.get("target_environment", {}))
-
-            return origin, destination, migration["instances"]
+            return (origin_endpoint_id, destination_endpoint_id,
+                    destination_environment, instances)
         except Exception as ex:
             LOG.exception(ex)
             if hasattr(ex, "message"):
@@ -78,10 +68,14 @@ class MigrationController(api_wsgi.Controller):
             migration = self._migration_api.deploy_replica_instances(
                 context, replica_id, clone_disks, force)
         else:
-            origin, destination, instances = self._validate_migration_input(
+            (origin_endpoint_id,
+             destination_endpoint_id,
+             destination_environment,
+             instances) = self._validate_migration_input(
                 migration_body)
             migration = self._migration_api.migrate_instances(
-                context, origin, destination, instances)
+                context, origin_endpoint_id, destination_endpoint_id,
+                destination_environment, instances)
 
         return migration_view.single(req, migration)
 

+ 11 - 18
coriolis/api/v1/replicas.py

@@ -44,23 +44,13 @@ class ReplicaController(api_wsgi.Controller):
         try:
             replica = body["replica"]
 
-            origin = replica["origin"]
-            destination = replica["destination"]
+            origin_endpoint_id = replica["origin_endpoint_id"]
+            destination_endpoint_id = replica["destination_endpoint_id"]
+            destination_environment = replica.get("destination_environment")
+            instances = replica["instances"]
 
-            export_provider = factory.get_provider(
-                origin["type"], constants.PROVIDER_TYPE_EXPORT, None)
-            export_provider.validate_connection_info(
-                origin.get("connection_info", {}))
-
-            import_provider = factory.get_provider(
-                destination["type"], constants.PROVIDER_TYPE_IMPORT, None)
-            import_provider.validate_connection_info(
-                destination.get("connection_info", {}))
-
-            import_provider.validate_target_environment(
-                destination.get("target_environment", {}))
-
-            return origin, destination, replica["instances"]
+            return (origin_endpoint_id, destination_endpoint_id,
+                    destination_environment, instances)
         except Exception as ex:
             LOG.exception(ex)
             if hasattr(ex, "message"):
@@ -70,9 +60,12 @@ class ReplicaController(api_wsgi.Controller):
             raise exception.InvalidInput(msg)
 
     def create(self, req, body):
-        origin, destination, instances = self._validate_create_body(body)
+        (origin_endpoint_id, destination_endpoint_id,
+         destination_environment, instances) = self._validate_create_body(body)
+
         return replica_view.single(req, self._replica_api.create(
-            req.environ['coriolis.context'], origin, destination, instances))
+            req.environ['coriolis.context'], origin_endpoint_id,
+            destination_endpoint_id, destination_environment, instances))
 
     def delete(self, req, id):
         try:

+ 17 - 6
coriolis/conductor/rpc/client.py

@@ -65,10 +65,16 @@ class ConductorClient(object):
             ctxt, 'cancel_replica_tasks_execution', replica_id=replica_id,
             execution_id=execution_id, force=force)
 
-    def create_instances_replica(self, ctxt, origin, destination, instances):
-        return self._client.call(
-            ctxt, 'create_instances_replica', origin=origin,
-            destination=destination, instances=instances)
+    def create_instances_replica(self, ctxt, origin_endpoint_id,
+                                 destination_endpoint_id,
+                                 destination_environment,
+                                 instances):
+        return self._client.call(
+            ctxt, 'create_instances_replica',
+            origin_endpoint_id=origin_endpoint_id,
+            destination_endpoint_id=destination_endpoint_id,
+            destination_environment=destination_environment,
+            instances=instances)
 
     def get_replicas(self, ctxt, include_tasks_executions=False):
         return self._client.call(
@@ -95,9 +101,14 @@ class ConductorClient(object):
         return self._client.call(
             ctxt, 'get_migration', migration_id=migration_id)
 
-    def migrate_instances(self, ctxt, origin, destination, instances):
+    def migrate_instances(self, ctxt, origin_endpoint_id,
+                          destination_endpoint_id, destination_environment,
+                          instances):
         return self._client.call(
-            ctxt, 'migrate_instances', origin=origin, destination=destination,
+            ctxt, 'migrate_instances',
+            origin_endpoint_id=origin_endpoint_id,
+            destination_endpoint_id=destination_endpoint_id,
+            destination_environment=destination_environment,
             instances=instances)
 
     def deploy_replica_instances(self, ctxt, replica_id, clone_disks=False,

+ 75 - 34
coriolis/conductor/rpc/server.py

@@ -141,9 +141,27 @@ class ConductorServerEndpoint(object):
                         break
         return task
 
+    def _get_task_origin(self, ctxt, action):
+        endpoint = self.get_endpoint(ctxt, action.origin_endpoint_id)
+        return {
+            "connection_info": endpoint.connection_info,
+            "type": endpoint.type
+        }
+
+    def _get_task_destination(self, ctxt, action):
+        endpoint = self.get_endpoint(ctxt, action.destination_endpoint_id)
+        return {
+            "connection_info": endpoint.connection_info,
+            "type": endpoint.type,
+            "target_environment": action.destination_environment
+        }
+
     def _begin_tasks(self, ctxt, execution, task_info={}):
         keystone.create_trust(ctxt)
 
+        origin = self._get_task_origin(ctxt, execution.action)
+        destination = self._get_task_destination(ctxt, execution.action)
+
         for task in execution.tasks:
             if (not task.depends_on and
                     task.status == constants.TASK_STATUS_PENDING):
@@ -151,8 +169,8 @@ class ConductorServerEndpoint(object):
                     ctxt, server=None,
                     task_id=task.id,
                     task_type=task.task_type,
-                    origin=execution.action.origin,
-                    destination=execution.action.destination,
+                    origin=origin,
+                    destination=destination,
                     instance=task.instance,
                     task_info=task_info.get(task.instance, {}))
 
@@ -297,18 +315,24 @@ class ConductorServerEndpoint(object):
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
     @staticmethod
-    def _check_endpoints(ctxt, origin, destination):
+    def _check_endpoints(ctxt, origin_endpoint, destination_endpoint):
         # TODO(alexpilotti): check Barbican secrets content as well
-        if origin.get("connection_info") == destination.get("connection_info"):
+        if (origin_endpoint.connection_info ==
+                destination_endpoint.connection_info):
             raise exception.SameDestination()
 
-    def create_instances_replica(self, ctxt, origin, destination, instances):
-        self._check_endpoints(ctxt, origin, destination)
+    def create_instances_replica(self, ctxt, origin_endpoint_id,
+                                 destination_endpoint_id,
+                                 destination_environment, instances):
+        origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
+        destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
+        self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)
 
         replica = models.Replica()
         replica.id = str(uuid.uuid4())
-        replica.origin = origin
-        replica.destination = destination
+        replica.origin_endpoint = origin_endpoint
+        replica.destination_endpoint = destination_endpoint
+        replica.destination_environment = destination_environment
         replica.instances = instances
         replica.executions = []
         replica.info = {}
@@ -385,8 +409,9 @@ class ConductorServerEndpoint(object):
 
         migration = models.Migration()
         migration.id = str(uuid.uuid4())
-        migration.origin = replica.origin
-        migration.destination = replica.destination
+        migration.origin_endpoint_id = replica.origin_endpoint_id
+        migration.destination_endpoint_id = replica.destination_endpoint_id
+        migration.destination_environment = replica.destination_environment
         migration.instances = instances
         migration.replica = replica
         migration.info = replica.info
@@ -441,13 +466,18 @@ class ConductorServerEndpoint(object):
 
         return self.get_migration(ctxt, migration.id)
 
-    def migrate_instances(self, ctxt, origin, destination, instances):
-        self._check_endpoints(ctxt, origin, destination)
+    def migrate_instances(self, ctxt, origin_endpoint_id,
+                          destination_endpoint_id, destination_environment,
+                          instances):
+        origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
+        destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
+        self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)
 
         migration = models.Migration()
         migration.id = str(uuid.uuid4())
-        migration.origin = origin
-        migration.destination = destination
+        migration.origin_endpoint = origin_endpoint
+        migration.destination_endpoint = destination_endpoint
+        migration.destination_environment = destination_environment
         execution = models.TasksExecution()
         execution.status = constants.EXECUTION_STATUS_RUNNING
         execution.number = 1
@@ -521,23 +551,32 @@ class ConductorServerEndpoint(object):
                     ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
         if not has_running_tasks:
-            for task in execution.tasks:
-                if task.status in [constants.TASK_STATUS_PENDING,
-                                   constants.TASK_STATUS_ON_ERROR_ONLY]:
-                    if task.on_error:
-                        action = db_api.get_action(ctxt, execution.action_id)
-                        task_info = action.info.get(task.instance, {})
-
-                        self._rpc_worker_client.begin_task(
-                            ctxt, server=None,
-                            task_id=task.id,
-                            task_type=task.task_type,
-                            origin=action.origin,
-                            destination=action.destination,
-                            instance=task.instance,
-                            task_info=task_info)
-
-                        has_running_tasks = True
+            try:
+                origin = self._get_task_origin(ctxt, execution.action)
+                destination = self._get_task_destination(
+                    ctxt, execution.action)
+
+                for task in execution.tasks:
+                    if task.status in [constants.TASK_STATUS_PENDING,
+                                       constants.TASK_STATUS_ON_ERROR_ONLY]:
+                        if task.on_error:
+                            action = db_api.get_action(
+                                ctxt, execution.action_id)
+                            task_info = action.info.get(task.instance, {})
+
+                            self._rpc_worker_client.begin_task(
+                                ctxt, server=None,
+                                task_id=task.id,
+                                task_type=task.task_type,
+                                origin=origin,
+                                destination=destination,
+                                instance=task.instance,
+                                task_info=task_info)
+
+                            has_running_tasks = True
+            except exception.NotFound as ex:
+                LOG.error("A required endpoint could not be found")
+                LOG.exception(ex)
 
         if not has_running_tasks:
             self._set_tasks_execution_status(
@@ -557,6 +596,9 @@ class ConductorServerEndpoint(object):
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
 
     def _start_pending_tasks(self, ctxt, execution, parent_task, task_info):
+        origin = self._get_task_origin(ctxt, execution.action)
+        destination = self._get_task_destination(ctxt, execution.action)
+
         for task in execution.tasks:
             if task.status == constants.TASK_STATUS_PENDING:
                 if task.depends_on and parent_task.id in task.depends_on:
@@ -575,13 +617,12 @@ class ConductorServerEndpoint(object):
                                 constants.TASK_TYPE_IMPORT_INSTANCE):
                             server = parent_task.host
 
-                        action = execution.action
                         self._rpc_worker_client.begin_task(
                             ctxt, server=server,
                             task_id=task.id,
                             task_type=task.task_type,
-                            origin=action.origin,
-                            destination=action.destination,
+                            origin=origin,
+                            destination=destination,
                             instance=task.instance,
                             task_info=task_info)
 

+ 4 - 3
coriolis/db/api.py

@@ -203,10 +203,11 @@ def get_migrations(context, include_tasks=False):
 
 def _get_tasks_with_details_options(query):
     return query.options(
-        orm.joinedload("tasks").
-        joinedload("progress_updates")).options(
+        orm.joinedload("action")).options(
             orm.joinedload("tasks").
-            joinedload("events"))
+            joinedload("progress_updates")).options(
+                orm.joinedload("tasks").
+                joinedload("events"))
 
 
 def _get_migration_task_query_options(query):

+ 11 - 2
coriolis/db/sqlalchemy/migrate_repo/versions/002_adds_endpoints.py

@@ -45,12 +45,21 @@ def upgrade(migrate_engine):
     base_transfer_action = sqlalchemy.Table(
         'base_transfer_action', meta, autoload=True)
 
+    # NOTE(alexpilotti) delete all records in base_transfer_action
+    # before performing this migration
     origin_endpoint_id = sqlalchemy.Column(
         "origin_endpoint_id", sqlalchemy.String(36),
-        sqlalchemy.ForeignKey('endpoint.id'), nullable=True)
+        sqlalchemy.ForeignKey('endpoint.id'), nullable=False)
     base_transfer_action.create_column(origin_endpoint_id)
 
     destination_endpoint_id = sqlalchemy.Column(
         "destination_endpoint_id", sqlalchemy.String(36),
-        sqlalchemy.ForeignKey('endpoint.id'), nullable=True)
+        sqlalchemy.ForeignKey('endpoint.id'), nullable=False)
     base_transfer_action.create_column(destination_endpoint_id)
+
+    destination_environment = sqlalchemy.Column(
+        "destination_environment", sqlalchemy.Text, nullable=True)
+    base_transfer_action.create_column(destination_environment)
+
+    base_transfer_action.drop_column("origin")
+    base_transfer_action.drop_column("destination")

+ 5 - 6
coriolis/db/sqlalchemy/models.py

@@ -95,8 +95,7 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
                                 primary_key=True)
     user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
     project_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
-    origin = sqlalchemy.Column(types.Json, nullable=False)
-    destination = sqlalchemy.Column(types.Json, nullable=False)
+    destination_environment = sqlalchemy.Column(types.Json, nullable=True)
     type = sqlalchemy.Column(sqlalchemy.String(50))
     executions = orm.relationship(TasksExecution, cascade="all,delete",
                                   backref=orm.backref('action'),
@@ -107,10 +106,10 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
     info = sqlalchemy.Column(types.Json, nullable=False)
     origin_endpoint_id = sqlalchemy.Column(
         sqlalchemy.String(36),
-        sqlalchemy.ForeignKey('endpoint.id'), nullable=True)
+        sqlalchemy.ForeignKey('endpoint.id'), nullable=False)
     destination_endpoint_id = sqlalchemy.Column(
         sqlalchemy.String(36),
-        sqlalchemy.ForeignKey('endpoint.id'), nullable=True)
+        sqlalchemy.ForeignKey('endpoint.id'), nullable=False)
 
     __mapper_args__ = {
         'polymorphic_identity': 'base_transfer_action',
@@ -163,10 +162,10 @@ class Endpoint(BASE, models.TimestampMixin, models.ModelBase,
     name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
     description = sqlalchemy.Column(sqlalchemy.String(1024), nullable=True)
     origin_actions = orm.relationship(
-        BaseTransferAction, backref=orm.backref('origins'),
+        BaseTransferAction, backref=orm.backref('origin_endpoint'),
         primaryjoin="and_(BaseTransferAction.origin_endpoint_id==Endpoint.id, "
         "BaseTransferAction.deleted=='0')")
     destination_actions = orm.relationship(
-        BaseTransferAction, backref=orm.backref('destinations'),
+        BaseTransferAction, backref=orm.backref('destination_endpoint'),
         primaryjoin="and_(BaseTransferAction.destination_endpoint_id=="
         "Endpoint.id, BaseTransferAction.deleted=='0')")

+ 5 - 2
coriolis/migrations/api.py

@@ -8,9 +8,12 @@ class API(object):
     def __init__(self):
         self._rpc_client = rpc_client.ConductorClient()
 
-    def migrate_instances(self, ctxt, origin, destination, instances):
+    def migrate_instances(self, ctxt, origin_endpoint_id,
+                          destination_endpoint_id, destination_environment,
+                          instances):
         return self._rpc_client.migrate_instances(
-            ctxt, origin, destination, instances)
+            ctxt, origin_endpoint_id, destination_endpoint_id,
+            destination_environment, instances)
 
     def deploy_replica_instances(self, ctxt, replica_id, clone_disks=False,
                                  force=False):

+ 4 - 2
coriolis/replicas/api.py

@@ -8,9 +8,11 @@ class API(object):
     def __init__(self):
         self._rpc_client = rpc_client.ConductorClient()
 
-    def create(self, ctxt, origin, destination, instances):
+    def create(self, ctxt, origin_endpoint_id, destination_endpoint_id,
+               destination_environment, instances):
         return self._rpc_client.create_instances_replica(
-            ctxt, origin, destination, instances)
+            ctxt, origin_endpoint_id, destination_endpoint_id,
+            destination_environment, instances)
 
     def delete(self, ctxt, replica_id):
         self._rpc_client.delete_replica(ctxt, replica_id)