瀏覽代碼

Update conductor layer for Live Migrations and Deployments.

Update the conductor layer for separate handling of Live Migrations and
deployments.

Signed-off-by: Nashwan Azhari <nazhari@cloudbasesolutions.com>
Nashwan Azhari 2 年之前
父節點
當前提交
1c6d73fa58
共有 3 個文件被更改,包括 88 次插入6 次删除
  1. 24 2
      coriolis/conductor/rpc/client.py
  2. 62 4
      coriolis/conductor/rpc/server.py
  3. 2 0
      coriolis/constants.py

+ 24 - 2
coriolis/conductor/rpc/client.py

@@ -159,7 +159,9 @@ class ConductorClient(rpc.BaseRPCClient):
             ctxt, 'cancel_replica_tasks_execution', replica_id=replica_id,
             execution_id=execution_id, force=force)
 
-    def create_instances_replica(self, ctxt, origin_endpoint_id,
+    def create_instances_replica(self, ctxt,
+                                 replica_scenario,
+                                 origin_endpoint_id,
                                  destination_endpoint_id,
                                  origin_minion_pool_id,
                                  destination_minion_pool_id,
@@ -169,6 +171,7 @@ class ConductorClient(rpc.BaseRPCClient):
                                  notes=None, user_scripts=None):
         return self._call(
             ctxt, 'create_instances_replica',
+            replica_scenario=replica_scenario,
             origin_endpoint_id=origin_endpoint_id,
             destination_endpoint_id=destination_endpoint_id,
             origin_minion_pool_id=origin_minion_pool_id,
@@ -214,6 +217,17 @@ class ConductorClient(rpc.BaseRPCClient):
             ctxt, 'get_migration', migration_id=migration_id,
             include_task_info=include_task_info)
 
+    def get_deployments(self, ctxt, include_tasks=False,
+                        include_task_info=False):
+        return self._call(
+            ctxt, 'get_deployments', include_tasks=include_tasks,
+            include_task_info=include_task_info)
+
+    def get_deployment(self, ctxt, deployment_id, include_task_info=False):
+        return self._call(
+            ctxt, 'get_deployment', deployment_id=deployment_id,
+            include_task_info=include_task_info)
+
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, origin_minion_pool_id,
                           destination_minion_pool_id,
@@ -262,6 +276,14 @@ class ConductorClient(rpc.BaseRPCClient):
         self._call(
             ctxt, 'cancel_migration', migration_id=migration_id, force=force)
 
+    def delete_deployment(self, ctxt, deployment_id):
+        self._call(
+            ctxt, 'delete_deployment', deployment_id=deployment_id)
+
+    def cancel_deployment(self, ctxt, deployment_id, force):
+        self._call(
+            ctxt, 'cancel_deployment', deployment_id=deployment_id, force=force)
+
     def set_task_host(self, ctxt, task_id, host):
         self._call(
             ctxt, 'set_task_host', task_id=task_id, host=host)
@@ -460,7 +482,7 @@ class ConductorTaskRpcEventHandler(events.BaseEventHandler):
         return self._rpc_conductor_client_instance
 
     @classmethod
-    def get_progress_update_identifier(self, progress_update):
+    def get_progress_update_identifier(cls, progress_update):
         return progress_update['index']
 
     def add_progress_update(

+ 62 - 4
coriolis/conductor/rpc/server.py

@@ -123,6 +123,18 @@ def migration_synchronized(func):
     return wrapper
 
 
+def deployment_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, deployment_id, *args, **kwargs):
+        @lockutils.synchronized(
+            constants.DEPLOYMENT_LOCK_NAME_FORMAT % deployment_id,
+            external=True)
+        def inner():
+            return func(self, ctxt, deployment_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
 def tasks_execution_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
@@ -1097,6 +1109,12 @@ class ConductorServerEndpoint(object):
     def delete_replica(self, ctxt, replica_id):
         replica = self._get_replica(ctxt, replica_id)
         self._check_replica_running_executions(ctxt, replica)
+        # TODO(aznashwan): update reservation deletion logic if
+        # the Replica was never successfully deployed and its
+        # disks were deleted.
+        # This might not be possible if its executions were deleted,
+        # but might be possible to set the new 'fulfilled' field within
+        # the reservation on the licensing server after a successful execution.
         self._check_delete_reservation_for_transfer(replica)
         db_api.delete_replica(ctxt, replica_id)
 
@@ -1165,7 +1183,8 @@ class ConductorServerEndpoint(object):
                 destination_endpoint.connection_info)):
             raise exception.SameDestination()
 
-    def create_instances_replica(self, ctxt, origin_endpoint_id,
+    def create_instances_replica(self, ctxt, replica_scenario,
+                                 origin_endpoint_id,
                                  destination_endpoint_id,
                                  origin_minion_pool_id,
                                  destination_minion_pool_id,
@@ -1174,6 +1193,14 @@ class ConductorServerEndpoint(object):
                                  destination_environment, instances,
                                  network_map, storage_mappings, notes=None,
                                  user_scripts=None):
+        supported_scenarios = [
+            constants.REPLICA_SCENARIO_REPLICA,
+            constants.REPLICA_SCENARIO_LIVE_MIGRATION]
+        if replica_scenario not in supported_scenarios:
+            raise exception.InvalidInput(
+                message=f"Unsupported Replica scenario '{replica_scenario}'. "
+                        f"Must be one of: {supported_scenarios}")
+
         origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
         destination_endpoint = self.get_endpoint(
             ctxt, destination_endpoint_id)
@@ -1182,6 +1209,7 @@ class ConductorServerEndpoint(object):
         replica = models.Replica()
         replica.id = str(uuid.uuid4())
         replica.base_id = replica.id
+        replica.scenario = replica_scenario
         replica.origin_endpoint_id = origin_endpoint_id
         replica.origin_minion_pool_id = origin_minion_pool_id
         replica.destination_endpoint_id = destination_endpoint_id
@@ -1202,6 +1230,8 @@ class ConductorServerEndpoint(object):
 
         self._check_minion_pools_for_action(ctxt, replica)
 
+        # TODO(aznashwan): add scenario-appropriate steps for
+        # defining the Replica reservation:
         self._check_create_reservation_for_transfer(
             replica, licensing_client.RESERVATION_TYPE_REPLICA)
 
@@ -1232,6 +1262,20 @@ class ConductorServerEndpoint(object):
             ctxt, migration_id, include_task_info=include_task_info,
             to_dict=True)
 
+    def get_deployments(self, ctxt, include_tasks,
+                        include_task_info=False):
+        return db_api.get_migrations(
+            ctxt, include_tasks,
+            include_task_info=include_task_info,
+            replica_migrations_only=True,
+            to_dict=True)
+
+    @deployment_synchronized
+    def get_deployment(self, ctxt, deployment_id, include_task_info=False):
+        return self._get_migration(
+            ctxt, deployment_id, include_task_info=include_task_info,
+            to_dict=True)
+
     @staticmethod
     def _check_running_replica_migrations(ctxt, replica_id):
         migrations = db_api.get_replica_migrations(ctxt, replica_id)
@@ -2125,8 +2169,7 @@ class ConductorServerEndpoint(object):
                 "Migration with ID '%s' not found." % migration_id)
         return migration
 
-    @migration_synchronized
-    def delete_migration(self, ctxt, migration_id):
+    def _delete_migration(self, ctxt, migration_id):
         migration = self._get_migration(ctxt, migration_id)
         execution = migration.executions[0]
         if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
@@ -2136,7 +2179,14 @@ class ConductorServerEndpoint(object):
         db_api.delete_migration(ctxt, migration_id)
 
     @migration_synchronized
-    def cancel_migration(self, ctxt, migration_id, force):
+    def delete_migration(self, ctxt, migration_id):
+        self._delete_migration(ctxt, migration_id)
+
+    @deployment_synchronized
+    def delete_deployment(self, ctxt, deployment_id):
+        self._delete_migration(ctxt, deployment_id)
+
+    def _cancel_migration(self, ctxt, migration_id, force):
         migration = self._get_migration(ctxt, migration_id)
         if len(migration.executions) != 1:
             raise exception.InvalidMigrationState(
@@ -2157,6 +2207,14 @@ class ConductorServerEndpoint(object):
                 external=True):
             self._cancel_tasks_execution(ctxt, execution, force=force)
 
+    @migration_synchronized
+    def cancel_migration(self, ctxt, migration_id, force):
+        self._cancel_migration(ctxt, migration_id, force)
+
+    @deployment_synchronized
+    def cancel_deployment(self, ctxt, deployment_id, force):
+        self._cancel_migration(ctxt, deployment_id, force)
+
     def _cancel_tasks_execution(
             self, ctxt, execution, requery=True, force=False):
         """ Cancels a running Execution by:

+ 2 - 0
coriolis/constants.py

@@ -307,6 +307,8 @@ TASKFLOW_LOCK_NAME_FORMAT = "taskflow-%s"
 EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
 ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s"
 MIGRATION_LOCK_NAME_FORMAT = "migration-%s"
+# NOTE(aznashwan): intentionately left identical to Migration locks.
+DEPLOYMENT_LOCK_NAME_FORMAT = "migration-%s"
 REPLICA_LOCK_NAME_FORMAT = "replica-%s"
 SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
 REGION_LOCK_NAME_FORMAT = "region-%s"