Просмотр исходного кода

Fix remote machine allocations on pool manager.

Nashwan Azhari 5 лет назад
Родитель
Commit
2e10db884f

+ 28 - 25
coriolis/conductor/rpc/server.py

@@ -583,19 +583,22 @@ class ConductorServerEndpoint(object):
     def _get_worker_service_rpc_for_task(
             self, ctxt, task, origin_endpoint, destination_endpoint,
             retry_count=5, retry_period=2, random_choice=True):
+        worker_service = None
         try:
             worker_service = self._scheduler_client.get_worker_service_for_task(
                 ctxt, {"id": task.id, "task_type": task.task_type},
-                origin_endpoint.to_dict(), destination_endpoint.to_dict(),
+                origin_endpoint, destination_endpoint,
                 retry_count=retry_count, retry_period=retry_period,
                 random_choice=random_choice)
         except Exception as ex:
             LOG.debug(
                 "Failed to get worker service for task '%s'. Updating status "
-                "to unscheduleable.")
+                "to unscheduleable. Error trace was: %s",
+                task.id, utils.get_exception_details())
             db_api.set_task_status(
                 ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
                 exception_details=str(ex))
+            raise
 
         return self._get_rpc_client_for_service(worker_service)
 
@@ -868,11 +871,11 @@ class ConductorServerEndpoint(object):
             validate_source_minion_task = None
             if instance_source_minion:
                 replica.info[instance].update({
-                    "source_minion_machine_id": instance_source_minion.id,
+                    "source_minion_machine_id": instance_source_minion['id'],
                     "source_minion_provider_properties": (
-                        instance_source_minion.provider_properties),
+                        instance_source_minion['provider_properties']),
                     "source_minion_connection_info": (
-                        instance_source_minion.connection_info)})
+                        instance_source_minion['connection_info'])})
                 validate_source_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
@@ -889,13 +892,13 @@ class ConductorServerEndpoint(object):
             validate_target_minion_task = None
             if instance_target_minion:
                 replica.info[instance].update({
-                    "target_minion_machine_id": instance_target_minion.id,
+                    "target_minion_machine_id": instance_target_minion['id'],
                     "target_minion_provider_properties": (
-                        instance_target_minion.provider_properties),
+                        instance_target_minion['provider_properties']),
                     "target_minion_connection_info": (
-                        instance_target_minion.connection_info),
+                        instance_target_minion['connection_info']),
                     "target_minion_backup_writer_connection_info": (
-                        instance_target_minion.backup_writer_connection_info)})
+                        instance_target_minion['backup_writer_connection_info'])})
                 validate_target_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY,
@@ -1354,11 +1357,11 @@ class ConductorServerEndpoint(object):
             last_validation_task = validate_replica_deployment_inputs_task
             if not skip_os_morphing and instance_osmorphing_minion:
                 migration.info[instance].update({
-                    "osmorphing_minion_machine_id": instance_osmorphing_minion.id,
+                    "osmorphing_minion_machine_id": instance_osmorphing_minion['id'],
                     "osmorphing_minion_provider_properties": (
-                        instance_osmorphing_minion.provider_properties),
+                        instance_osmorphing_minion['provider_properties']),
                     "osmorphing_minion_connection_info": (
-                        instance_osmorphing_minion.connection_info)})
+                        instance_osmorphing_minion['connection_info'])})
                 validate_osmorphing_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,
@@ -1514,16 +1517,16 @@ class ConductorServerEndpoint(object):
             self, ctxt, action, include_transfer_minions=True,
             include_osmorphing_minions=True):
         return self._minion_manager_client.allocate_minion_machines_for_action(
-            ctxt, action.id, include_transfer_minions=include_transfer_minions,
+            ctxt, action, include_transfer_minions=include_transfer_minions,
             include_osmorphing_minions=include_osmorphing_minions)
 
     def _deallocate_minion_machines_for_action(self, ctxt, action):
         return self._minion_manager_client.deallocate_minion_machines_for_action(
-            ctxt, action.id)
+            ctxt, action)
 
     def _check_minion_pools_for_action(self, ctxt, action):
         return self._minion_manager_client.validate_minion_pool_selections_for_action(
-            ctxt, action.id)
+            ctxt, action)
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, origin_minion_pool_id,
@@ -1624,11 +1627,11 @@ class ConductorServerEndpoint(object):
                 validate_migration_source_inputs_task.id]
             if instance_source_minion:
                 migration.info[instance].update({
-                    "source_minion_machine_id": instance_source_minion.id,
+                    "source_minion_machine_id": instance_source_minion['id'],
                     "source_minion_provider_properties": (
-                        instance_source_minion.provider_properties),
+                        instance_source_minion['provider_properties']),
                     "source_minion_connection_info": (
-                        instance_source_minion.connection_info)})
+                        instance_source_minion['connection_info'])})
                 validate_source_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
@@ -1655,13 +1658,13 @@ class ConductorServerEndpoint(object):
             deploy_migration_target_resources_task = None
             if instance_target_minion:
                 migration.info[instance].update({
-                    "target_minion_machine_id": instance_target_minion.id,
+                    "target_minion_machine_id": instance_target_minion['id'],
                     "target_minion_provider_properties": (
-                        instance_target_minion.provider_properties),
+                        instance_target_minion['provider_properties']),
                     "target_minion_connection_info": (
-                        instance_target_minion.connection_info),
+                        instance_target_minion['connection_info']),
                     "target_minion_backup_writer_connection_info": (
-                        instance_target_minion.backup_writer_connection_info)})
+                        instance_target_minion['backup_writer_connection_info'])})
                 ttyp = (
                     constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY)
                 validate_target_minion_task = self._create_task(
@@ -1687,11 +1690,11 @@ class ConductorServerEndpoint(object):
             validate_osmorphing_minion_task = None
             if not skip_os_morphing and instance_osmorphing_minion:
                 migration.info[instance].update({
-                    "osmorphing_minion_machine_id": instance_osmorphing_minion.id,
+                    "osmorphing_minion_machine_id": instance_osmorphing_minion['id'],
                     "osmorphing_minion_provider_properties": (
-                        instance_osmorphing_minion.provider_properties),
+                        instance_osmorphing_minion['provider_properties']),
                     "osmorphing_minion_connection_info": (
-                        instance_osmorphing_minion.connection_info)})
+                        instance_osmorphing_minion['connection_info'])})
                 validate_osmorphing_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,

+ 3 - 0
coriolis/constants.py

@@ -294,6 +294,7 @@ SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
 REGION_LOCK_NAME_FORMAT = "region-%s"
 SERVICE_LOCK_NAME_FORMAT = "service-%s"
 MINION_POOL_LOCK_NAME_FORMAT = "minion-pool-%s"
+MINION_MACHINE_LOCK_NAME_FORMAT = "minion-pool-%s-machine-%s"
 
 EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = {
     EXECUTION_TYPE_MIGRATION: MIGRATION_LOCK_NAME_FORMAT,
@@ -336,6 +337,7 @@ MINION_POOL_STATUS_DEALLOCATING_MACHINES = "DEALLOCATING_MACHINES"
 MINION_POOL_STATUS_DEALLOCATING_SHARED_RESOURCES = (
     "DEALLOCATING_SHARED_RESOURCES")
 MINION_POOL_STATUS_ALLOCATED = "ALLOCATED"
+MINION_POOL_STATUS_POOL_MAINTENANCE = "IN_MAINTENANCE"
 
 ACTIVE_MINION_POOL_STATUSES = [
     MINION_POOL_STATUS_VALIDATING_INPUTS,
@@ -348,6 +350,7 @@ MINION_MACHINE_IDENTIFIER_FORMAT = "coriolis-pool-%(pool_id)s-minion-%(minion_id
 MINION_MACHINE_STATUS_UNKNOWN = "UNKNOWN"
 MINION_MACHINE_STATUS_DEPLOYING = "DEPLOYING"
 MINION_MACHINE_STATUS_ERROR = "ERROR"
+MINION_MACHINE_STATUS_ERROR_DEPLOYING = "ERROR_DEPLOYING"
 MINION_MACHINE_STATUS_UNINITIALIZED = "UNINITIALIZED"
 MINION_MACHINE_STATUS_RECONFIGURING = "RECONFIGURING"
 MINION_MACHINE_STATUS_AVAILABLE = "AVAILABLE"

+ 6 - 6
coriolis/minion_manager/rpc/client.py

@@ -52,22 +52,22 @@ class MinionManagerClient(object):
     def get_diagnostics(self, ctxt):
         return self._client.call(ctxt, 'get_diagnostics')
 
-    def validate_minion_pool_selections_for_action(self, ctxt, action_id):
+    def validate_minion_pool_selections_for_action(self, ctxt, action):
         return self._client.call(
             ctxt, 'validate_minion_pool_selections_for_action',
-            action_id=action_id)
+            action=action)
 
     def allocate_minion_machines_for_action(
-            self, ctxt, action_id, include_transfer_minions=True,
+            self, ctxt, action, include_transfer_minions=True,
             include_osmorphing_minions=True):
         return self._client.call(
-            ctxt, 'allocate_minion_machines_for_action', action_id=action_id,
+            ctxt, 'allocate_minion_machines_for_action', action=action,
             include_transfer_minions=include_transfer_minions,
             include_osmorphing_minions=include_osmorphing_minions)
 
-    def deallocate_minion_machines_for_action(self, ctxt, action_id):
+    def deallocate_minion_machines_for_action(self, ctxt, action):
         return self._client.call(
-            ctxt, 'deallocate_minion_machines_for_action', action_id=action_id)
+            ctxt, 'deallocate_minion_machines_for_action', action=action)
 
     def create_minion_pool(
             self, ctxt, name, endpoint_id, pool_platform, pool_os_type,

+ 175 - 112
coriolis/minion_manager/rpc/server.py

@@ -2,7 +2,6 @@
 # All Rights Reserved.
 
 import contextlib
-import functools
 import itertools
 import uuid
 
@@ -19,8 +18,8 @@ from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
 from coriolis.minion_manager.rpc import tasks as minion_manager_tasks
+from coriolis.minion_manager.rpc import utils as minion_manager_utils
 from coriolis.scheduler.rpc import client as rpc_scheduler_client
-from coriolis.taskflow import base as coriolis_taskflow_base
 from coriolis.taskflow import runner as taskflow_runner
 from coriolis.worker.rpc import client as rpc_worker_client
 
@@ -35,18 +34,6 @@ CONF = cfg.CONF
 CONF.register_opts(MINION_MANAGER_OPTS, 'minion_manager')
 
 
-def minion_pool_synchronized(func):
-    @functools.wraps(func)
-    def wrapper(self, ctxt, minion_pool_id, *args, **kwargs):
-        @lockutils.synchronized(
-            constants.MINION_POOL_LOCK_NAME_FORMAT % minion_pool_id,
-            external=True)
-        def inner():
-            return func(self, ctxt, minion_pool_id, *args, **kwargs)
-        return inner()
-    return wrapper
-
-
 class MinionManagerServerEndpoint(object):
 
     @property
@@ -136,14 +123,14 @@ class MinionManagerServerEndpoint(object):
         return worker_rpc.validate_endpoint_destination_minion_pool_options(
             ctxt, endpoint['type'], pool_environment)
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def add_minion_pool_event(self, ctxt, minion_pool_id, level, message):
         LOG.info(
             "Minion pool event for pool %s: %s", minion_pool_id, message)
         pool = db_api.get_minion_pool(ctxt, minion_pool_id)
         db_api.add_minion_pool_event(ctxt, pool.id, level, message)
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def add_minion_pool_progress_update(
             self, ctxt, minion_pool_id, total_steps, message):
         LOG.info(
@@ -151,19 +138,35 @@ class MinionManagerServerEndpoint(object):
         db_api.add_minion_pool_progress_update(
             ctxt, minion_pool_id, total_steps, message)
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def update_minion_pool_progress_update(
             self, ctxt, minion_pool_id, step, total_steps, message):
         LOG.info("Updating minion pool progress update: %s", minion_pool_id)
         db_api.update_minion_pool_progress_update(
             ctxt, minion_pool_id, step, total_steps, message)
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def get_minion_pool_progress_step(self, ctxt, minion_pool_id):
         return db_api.get_minion_pool_progress_step(ctxt, minion_pool_id)
 
-    def validate_minion_pool_selections_for_action(self, ctxt, action_id):
-        action = db_api.get_action(ctxt, action_id)
+    def validate_minion_pool_selections_for_action(self, ctxt, action):
+        if not isinstance(action, dict):
+            raise exception.InvalidInput(
+                "Action must be a dict, got '%s': %s" % (
+                    type(action), action))
+        required_action_properties = [
+            'id', 'origin_endpoint_id', 'destination_endpoint_id',
+            'origin_minion_pool_id', 'destination_minion_pool_id',
+            'instance_osmorphing_minion_pool_mappings']
+        missing = [
+            prop for prop in required_action_properties
+            if prop not in action]
+        if missing:
+            raise exception.InvalidInput(
+                "Missing the following required action properties for "
+                "minion pool selection validation: %s. Got %s" % (
+                    missing, action))
+
         minion_pools = {
             pool.id: pool
             for pool in db_api.get_minion_pools(
@@ -175,102 +178,103 @@ class MinionManagerServerEndpoint(object):
                     "Could not find minion pool with ID '%s'." % pool_id)
             return pool
 
-        if action.origin_minion_pool_id:
-            origin_pool = _get_pool(action.origin_minion_pool_id)
-            if origin_pool.endpoint_id != action.origin_endpoint_id:
+        if action['origin_minion_pool_id']:
+            origin_pool = _get_pool(action['origin_minion_pool_id'])
+            if origin_pool.endpoint_id != action['origin_endpoint_id']:
                 raise exception.InvalidMinionPoolSelection(
                     "The selected origin minion pool ('%s') belongs to a "
                     "different Coriolis endpoint ('%s') than the requested "
                     "origin endpoint ('%s')" % (
-                        action.origin_minion_pool_id,
+                        action['origin_minion_pool_id'],
                         origin_pool.endpoint_id,
-                        action.origin_endpoint_id))
-            if origin_pool.pool_platform != constants.PROVIDER_PLATFORM_SOURCE:
+                        action['origin_endpoint_id']))
+            if origin_pool.platform != constants.PROVIDER_PLATFORM_SOURCE:
                 raise exception.InvalidMinionPoolSelection(
                     "The selected origin minion pool ('%s') is configured as a"
                     " '%s' pool. The pool must be of type %s to be used for "
                     "data exports." % (
-                        action.origin_minion_pool_id,
-                        origin_pool.pool_platform,
+                        action['origin_minion_pool_id'],
+                        origin_pool.platform,
                         constants.PROVIDER_PLATFORM_SOURCE))
-            if origin_pool.pool_os_type != constants.OS_TYPE_LINUX:
+            if origin_pool.os_type != constants.OS_TYPE_LINUX:
                 raise exception.InvalidMinionPoolSelection(
                     "The selected origin minion pool ('%s') is of OS type '%s'"
                     " instead of the Linux OS type required for a source "
                     "transfer minion pool." % (
-                        action.origin_minion_pool_id,
-                        origin_pool.pool_os_type))
+                        action['origin_minion_pool_id'],
+                        origin_pool.os_type))
             LOG.debug(
                 "Successfully validated compatibility of origin minion pool "
                 "'%s' for use with action '%s'." % (
-                    action.origin_minion_pool_id, action.id))
+                    action['origin_minion_pool_id'], action['id']))
 
-        if action.destination_minion_pool_id:
-            destination_pool = _get_pool(action.destination_minion_pool_id)
+        if action['destination_minion_pool_id']:
+            destination_pool = _get_pool(action['destination_minion_pool_id'])
             if destination_pool.endpoint_id != (
-                    action.destination_endpoint_id):
+                    action['destination_endpoint_id']):
                 raise exception.InvalidMinionPoolSelection(
                     "The selected destination minion pool ('%s') belongs to a "
                     "different Coriolis endpoint ('%s') than the requested "
                     "destination endpoint ('%s')" % (
-                        action.destination_minion_pool_id,
+                        action['destination_minion_pool_id'],
                         destination_pool.endpoint_id,
-                        action.destination_endpoint_id))
-            if destination_pool.pool_platform != (
+                        action['destination_endpoint_id']))
+            if destination_pool.platform != (
                     constants.PROVIDER_PLATFORM_DESTINATION):
                 raise exception.InvalidMinionPoolSelection(
                     "The selected destination minion pool ('%s') is configured"
                     " as a '%s'. The pool must be of type %s to be used for "
                     "data imports." % (
-                        action.destination_minion_pool_id,
-                        destination_pool.pool_platform,
+                        action['destination_minion_pool_id'],
+                        destination_pool.platform,
                         constants.PROVIDER_PLATFORM_DESTINATION))
-            if destination_pool.pool_os_type != constants.OS_TYPE_LINUX:
+            if destination_pool.os_type != constants.OS_TYPE_LINUX:
                 raise exception.InvalidMinionPoolSelection(
                     "The selected destination minion pool ('%s') is of OS type"
                     " '%s' instead of the Linux OS type required for a source "
                     "transfer minion pool." % (
-                        action.destination_minion_pool_id,
-                        destination_pool.pool_os_type))
+                        action['destination_minion_pool_id'],
+                        destination_pool.os_type))
             LOG.debug(
                 "Successfully validated compatibility of destination minion "
                 "pool '%s' for use with action '%s'." % (
-                    action.origin_minion_pool_id, action.id))
+                    action['origin_minion_pool_id'], action['id']))
 
-        if action.instance_osmorphing_minion_pool_mappings:
+        if action.get('instance_osmorphing_minion_pool_mappings'):
             osmorphing_pool_mappings = {
                 instance_id: pool_id
                 for (instance_id, pool_id) in (
-                    action.instance_osmorphing_minion_pool_mappings.items())
+                    action.get(
+                        'instance_osmorphing_minion_pool_mappings').items())
                 if pool_id}
             for (instance, pool_id) in osmorphing_pool_mappings.items():
                 osmorphing_pool = _get_pool(pool_id)
                 if osmorphing_pool.endpoint_id != (
-                        action.destination_endpoint_id):
+                        action['destination_endpoint_id']):
                     raise exception.InvalidMinionPoolSelection(
                         "The selected OSMorphing minion pool for instance '%s'"
                         " ('%s') belongs to a different Coriolis endpoint "
                         "('%s') than the destination endpoint ('%s')" % (
                             instance, pool_id,
                             osmorphing_pool.endpoint_id,
-                            action.destination_endpoint_id))
-                if osmorphing_pool.pool_platform != (
+                            action['destination_endpoint_id']))
+                if osmorphing_pool.platform != (
                         constants.PROVIDER_PLATFORM_DESTINATION):
                     raise exception.InvalidMinionPoolSelection(
                         "The selected OSMorphing minion pool for instance '%s'"
                         "  ('%s') is configured as a '%s' pool. The pool must "
                         "be of type %s to be used for OSMorphing." % (
                             instance, pool_id,
-                            osmorphing_pool.pool_platform,
+                            osmorphing_pool.platform,
                             constants.PROVIDER_PLATFORM_DESTINATION))
                 LOG.debug(
                     "Successfully validated compatibility of destination "
                     "minion pool '%s' for use as OSMorphing minion for "
                     "instance '%s' during action '%s'." % (
-                        action.origin_minion_pool_id, instance, action.id))
+                        pool_id, instance, action['id']))
 
     def allocate_minion_machines_for_action(
-            self, ctxt, action_id, include_transfer_minions=True,
+            self, ctxt, action, include_transfer_minions=True,
             include_osmorphing_minions=True):
         """ Returns a dict of the form:
         {
@@ -281,18 +285,34 @@ class MinionManagerServerEndpoint(object):
             }
         }
         """
-        action = db_api.get_action(ctxt, action_id)
+        if not isinstance(action, dict):
+            raise exception.InvalidInput(
+                "Action must be a dict, got '%s': %s" % (
+                    type(action), action))
+        required_action_properties = [
+            'id', 'instances', 'origin_minion_pool_id',
+            'destination_minion_pool_id',
+            'instance_osmorphing_minion_pool_mappings']
+        missing = [
+            prop for prop in required_action_properties
+            if prop not in action]
+        if missing:
+            raise exception.InvalidInput(
+                "Missing the following required action properties for "
+                "minion pool machine allocation: %s. Got %s" % (
+                    missing, action))
+
         instance_machine_allocations = {
-            instance: {} for instance in action.instances}
+            instance: {} for instance in action['instances']}
 
         minion_pool_ids = set()
-        if action.origin_minion_pool_id:
-            minion_pool_ids.add(action.origin_minion_pool_id)
-        if action.destination_minion_pool_id:
-            minion_pool_ids.add(action.destination_minion_pool_id)
-        if action.instance_osmorphing_minion_pool_mappings:
+        if action['origin_minion_pool_id']:
+            minion_pool_ids.add(action['origin_minion_pool_id'])
+        if action['destination_minion_pool_id']:
+            minion_pool_ids.add(action['destination_minion_pool_id'])
+        if action['instance_osmorphing_minion_pool_mappings']:
             minion_pool_ids = minion_pool_ids.union(set(
-                action.instance_osmorphing_minion_pool_mappings.values()))
+                action['instance_osmorphing_minion_pool_mappings'].values()))
         if None in minion_pool_ids:
             minion_pool_ids.remove(None)
 
@@ -300,12 +320,12 @@ class MinionManagerServerEndpoint(object):
             LOG.debug(
                 "No minion pool settings found for action '%s'. "
                 "Skipping minion machine allocations." % (
-                    action.id))
+                    action['id']))
             return instance_machine_allocations
 
         LOG.debug(
             "All minion pool selections for action '%s': %s",
-            action.id, minion_pool_ids)
+            action['id'], minion_pool_ids)
 
         def _select_machine(minion_pool, exclude=None):
             if not minion_pool.minion_machines:
@@ -341,7 +361,7 @@ class MinionManagerServerEndpoint(object):
             return selected_machine
 
         osmorphing_pool_map = (
-            action.instance_osmorphing_minion_pool_mappings)
+            action['instance_osmorphing_minion_pool_mappings'])
         with contextlib.ExitStack() as stack:
             _ = [
                 stack.enter_context(
@@ -377,12 +397,12 @@ class MinionManagerServerEndpoint(object):
             allocated_source_machine_ids = set()
             allocated_target_machine_ids = set()
             allocated_osmorphing_machine_ids = set()
-            for instance in action.instances:
+            for instance in action['instances']:
 
                 if include_transfer_minions:
-                    if action.origin_minion_pool_id:
+                    if action['origin_minion_pool_id']:
                         origin_pool = minion_pool_id_mappings[
-                            action.origin_minion_pool_id]
+                            action['origin_minion_pool_id']]
                         machine = _select_machine(
                             origin_pool, exclude=allocated_source_machine_ids)
                         allocated_source_machine_ids.add(machine.id)
@@ -391,11 +411,11 @@ class MinionManagerServerEndpoint(object):
                         LOG.debug(
                             "Selected minion machine '%s' for source-side "
                             "syncing of instance '%s' as part of transfer "
-                            "action '%s'.", machine.id, instance, action.id)
+                            "action '%s'.", machine.id, instance, action['id'])
 
-                    if action.destination_minion_pool_id:
+                    if action['destination_minion_pool_id']:
                         dest_pool = minion_pool_id_mappings[
-                            action.destination_minion_pool_id]
+                            action['destination_minion_pool_id']]
                         machine = _select_machine(
                             dest_pool, exclude=allocated_target_machine_ids)
                         allocated_target_machine_ids.add(machine.id)
@@ -404,26 +424,26 @@ class MinionManagerServerEndpoint(object):
                         LOG.debug(
                             "Selected minion machine '%s' for target-side "
                             "syncing of instance '%s' as part of transfer "
-                            "action '%s'.", machine.id, instance, action.id)
+                            "action '%s'.", machine.id, instance, action['id'])
 
                 if include_osmorphing_minions:
                     if instance not in osmorphing_pool_map:
                         LOG.debug(
                             "Instance '%s' is not listed in the OSMorphing "
                             "minion pool mappings for action '%s'." % (
-                                instance, action.id))
+                                instance, action['id']))
                     elif osmorphing_pool_map[instance] is None:
                         LOG.debug(
                             "OSMorphing pool ID for instance '%s' is "
                             "None in action '%s'. Ignoring." % (
-                                instance, action.id))
+                                instance, action['id']))
                     else:
                         osmorphing_pool_id = osmorphing_pool_map[instance]
                         # if the selected target and OSMorphing pools
                         # are the same, reuse the same worker:
                         ima = instance_machine_allocations[instance]
                         if osmorphing_pool_id == (
-                                action.destination_minion_pool_id) and (
+                                action['destination_minion_pool_id']) and (
                                     'target_minion' in ima):
                             allocated_target_machine = ima[
                                 'target_minion']
@@ -432,7 +452,7 @@ class MinionManagerServerEndpoint(object):
                                 "OSMorphing of instance '%s' as port of "
                                 "transfer action '%s'",
                                 allocated_target_machine.id, instance,
-                                action.id)
+                                action['id'])
                             instance_machine_allocations[
                                 instance]['osmorphing_minion'] = (
                                     allocated_target_machine)
@@ -450,7 +470,7 @@ class MinionManagerServerEndpoint(object):
                                 "Selected minion machine '%s' for OSMorphing "
                                 " of instance '%s' as part of transfer "
                                 "action '%s'.",
-                                machine.id, instance, action.id)
+                                machine.id, instance, action['id'])
 
             # mark the selected machines as allocated:
             all_machine_ids = set(itertools.chain(
@@ -458,7 +478,7 @@ class MinionManagerServerEndpoint(object):
                 allocated_target_machine_ids,
                 allocated_osmorphing_machine_ids))
             db_api.set_minion_machines_allocation_statuses(
-                ctxt, all_machine_ids, action.id,
+                ctxt, all_machine_ids, action['id'],
                 constants.MINION_MACHINE_STATUS_ALLOCATED)
 
         # filter out redundancies:
@@ -469,23 +489,39 @@ class MinionManagerServerEndpoint(object):
 
         LOG.debug(
             "Allocated the following minion machines for action '%s': %s",
-            action.id, {
+            action['id'], {
                 instance: {
                     typ: machine.id
                     for (typ, machine) in allocation.items()}
                 for (instance, allocation) in instance_machine_allocations.items()})
         return instance_machine_allocations
 
-    def deallocate_minion_machines_for_action(self, ctxt, action_id):
-        action = db_api.get_action(ctxt, action_id)
+    def deallocate_minion_machines_for_action(self, ctxt, action):
+        if not isinstance(action, dict):
+            raise exception.InvalidInput(
+                "Action must be a dict, got '%s': %s" % (
+                    type(action), action))
+        required_action_properties = [
+            'id', 'instances', 'origin_minion_pool_id',
+            'destination_minion_pool_id',
+            'instance_osmorphing_minion_pool_mappings']
+        missing = [
+            prop for prop in required_action_properties
+            if prop not in action]
+        if missing:
+            raise exception.InvalidInput(
+                "Missing the following required action properties for "
+                "minion pool machine allocation: %s. Got %s" % (
+                    missing, action))
+
         minion_pool_ids = set()
-        if action.origin_minion_pool_id:
-            minion_pool_ids.add(action.origin_minion_pool_id)
-        if action.destination_minion_pool_id:
-            minion_pool_ids.add(action.destination_minion_pool_id)
-        if action.instance_osmorphing_minion_pool_mappings:
+        if action['origin_minion_pool_id']:
+            minion_pool_ids.add(action['origin_minion_pool_id'])
+        if action['destination_minion_pool_id']:
+            minion_pool_ids.add(action['destination_minion_pool_id'])
+        if action['instance_osmorphing_minion_pool_mappings']:
             minion_pool_ids = minion_pool_ids.union(set(
-                action.instance_osmorphing_minion_pool_mappings.values()))
+                action['instance_osmorphing_minion_pool_mappings'].values()))
         if None in minion_pool_ids:
             minion_pool_ids.remove(None)
 
@@ -493,12 +529,12 @@ class MinionManagerServerEndpoint(object):
             LOG.debug(
                 "No minion pools seem to have been used for action with "
                 "base_id '%s'. Skipping minion machine deallocation.",
-                action.base_id)
+                action['id'])
         else:
             LOG.debug(
                 "Attempting to deallocate all minion pool machine selections "
                 "for action '%s'. Afferent pools are: %s",
-                action.base_id, minion_pool_ids)
+                action['id'], minion_pool_ids)
 
             with contextlib.ExitStack() as stack:
                 _ = [
@@ -509,19 +545,19 @@ class MinionManagerServerEndpoint(object):
                     for pool_id in minion_pool_ids]
 
                 minion_machines = db_api.get_minion_machines(
-                    ctxt, allocated_action_id=action.base_id)
+                    ctxt, allocated_action_id=action['id'])
                 machine_ids = [m.id for m in minion_machines]
                 if machine_ids:
                     LOG.info(
                         "Releasing the following minion machines for "
-                        "action '%s': %s", action.base_id, machine_ids)
+                        "action '%s': %s", action['base_id'], machine_ids)
                     db_api.set_minion_machines_allocation_statuses(
                         ctxt, machine_ids, None,
                         constants.MINION_MACHINE_STATUS_AVAILABLE)
                 else:
                     LOG.debug(
                         "No minion machines were found to be associated "
-                        "with action with base_id '%s'.", action.base_id)
+                        "with action with base_id '%s'.", action['base_id'])
 
     def _get_minion_pool_allocation_flow(self, minion_pool):
         """ Returns a taskflow.Flow object pertaining to all the tasks
@@ -535,7 +571,8 @@ class MinionManagerServerEndpoint(object):
 
         # transition pool to VALIDATING:
         allocation_flow.add(minion_manager_tasks.UpdateMinionPoolStatusTask(
-            minion_pool.id, constants.MINION_POOL_STATUS_VALIDATING_INPUTS))
+            minion_pool.id, constants.MINION_POOL_STATUS_VALIDATING_INPUTS,
+            status_to_revert_to=constants.MINION_POOL_STATUS_ERROR))
 
         # add pool options validation task:
         allocation_flow.add(minion_manager_tasks.ValidateMinionPoolOptionsTask(
@@ -668,7 +705,7 @@ class MinionManagerServerEndpoint(object):
                         mch.id: mch.status for mch in used_machines}))
         return used_machines
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def allocate_minion_pool(self, ctxt, minion_pool_id):
         LOG.info("Attempting to allocate Minion Pool '%s'.", minion_pool_id)
         minion_pool = self._get_minion_pool(
@@ -679,7 +716,8 @@ class MinionManagerServerEndpoint(object):
         acceptable_allocation_statuses = [
             constants.MINION_POOL_STATUS_UNINITIALIZED,
             constants.MINION_POOL_STATUS_DEALLOCATED]
-        if minion_pool.status not in acceptable_allocation_statuses:
+        current_status = minion_pool.status
+        if current_status not in acceptable_allocation_statuses:
             raise exception.InvalidMinionPoolState(
                 "Minion machines for pool '%s' cannot be allocated as the pool"
                 " is in '%s' state instead of the expected %s. Please "
@@ -690,8 +728,17 @@ class MinionManagerServerEndpoint(object):
         allocation_flow = self._get_minion_pool_allocation_flow(minion_pool)
         initial_store = self._get_pool_allocation_initial_store(
             ctxt, minion_pool, endpoint_dict)
-        self._taskflow_runner.run_flow_in_background(
-            allocation_flow, store=initial_store)
+
+        try:
+            db_api.set_minion_pool_status(
+                ctxt, minion_pool_id,
+                constants.MINION_POOL_STATUS_POOL_MAINTENANCE)
+            self._taskflow_runner.run_flow_in_background(
+                allocation_flow, store=initial_store)
+        except:
+            db_api.set_minion_pool_status(
+                ctxt, minion_pool_id, current_status)
+            raise
 
         return self._get_minion_pool(ctxt, minion_pool.id)
 
@@ -717,7 +764,8 @@ class MinionManagerServerEndpoint(object):
             # transition pool to DEALLOCATING_MACHINES:
             deallocation_flow.add(minion_manager_tasks.UpdateMinionPoolStatusTask(
                 minion_pool.id,
-                constants.MINION_POOL_STATUS_DEALLOCATING_MACHINES))
+                constants.MINION_POOL_STATUS_DEALLOCATING_MACHINES,
+                status_to_revert_to=constants.MINION_POOL_STATUS_ERROR))
             deallocation_flow.add(machines_flow)
         else:
             LOG.debug(
@@ -726,7 +774,8 @@ class MinionManagerServerEndpoint(object):
         # transition pool to DEALLOCATING_SHARED_RESOURCES:
         deallocation_flow.add(minion_manager_tasks.UpdateMinionPoolStatusTask(
             minion_pool.id,
-            constants.MINION_POOL_STATUS_DEALLOCATING_SHARED_RESOURCES))
+            constants.MINION_POOL_STATUS_DEALLOCATING_SHARED_RESOURCES,
+            status_to_revert_to=constants.MINION_POOL_STATUS_ERROR))
 
         # add pool shared resources deletion task:
         deallocation_flow.add(
@@ -759,25 +808,30 @@ class MinionManagerServerEndpoint(object):
                 "pool_shared_resources": minion_pool.shared_resources}}
         return initial_store
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def deallocate_minion_pool(self, ctxt, minion_pool_id, force=False):
         LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
         minion_pool = self._get_minion_pool(
             ctxt, minion_pool_id, include_events=False, include_machines=True,
             include_progress_updates=False)
-        if minion_pool.status != constants.MINION_POOL_STATUS_ALLOCATED:
+        current_status = minion_pool.status
+        acceptable_deallocation_statuses = [
+            constants.MINION_POOL_STATUS_ALLOCATED,
+            constants.MINION_POOL_STATUS_UNINITIALIZED,
+            constants.MINION_POOL_STATUS_ERROR]
+        if current_status not in acceptable_deallocation_statuses:
             if not force:
                 raise exception.InvalidMinionPoolState(
                     "Minion pool '%s' cannot be deallocated as the pool"
-                    " is in '%s' state instead of the expected %s."% (
+                    " is in '%s' state instead of one of the expected %s"% (
                         minion_pool_id, minion_pool.status,
-                        constants.MINION_POOL_STATUS_ALLOCATED))
+                        acceptable_deallocation_statuses))
             else:
                 LOG.warn(
                     "Forcibly deallocating minion pool '%s' at user request.",
                     minion_pool_id)
         self._check_pool_machines_in_use(
-            ctxt, minion_pool, raise_if_in_use=True)
+            ctxt, minion_pool, raise_if_in_use=not force)
         endpoint_dict = self._conductor_client.get_endpoint(
             ctxt, minion_pool.endpoint_id)
 
@@ -785,8 +839,17 @@ class MinionManagerServerEndpoint(object):
             minion_pool)
         initial_store = self._get_pool_deallocation_initial_store(
             ctxt, minion_pool, endpoint_dict)
-        self._taskflow_runner.run_flow_in_background(
-            deallocation_flow, store=initial_store)
+
+        try:
+            db_api.set_minion_pool_status(
+                ctxt, minion_pool_id,
+                constants.MINION_POOL_STATUS_POOL_MAINTENANCE)
+            self._taskflow_runner.run_flow_in_background(
+                deallocation_flow, store=initial_store)
+        except:
+            db_api.set_minion_pool_status(
+                ctxt, minion_pool_id, current_status)
+            raise
 
         return self._get_minion_pool(ctxt, minion_pool.id)
 
@@ -807,7 +870,7 @@ class MinionManagerServerEndpoint(object):
                 "Minion pool with ID '%s' not found." % minion_pool_id)
         return minion_pool
 
-    # @minion_pool_synchronized
+    # @minion_manager_utils.minion_pool_synchronized_op
     # def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
     #     LOG.info(
     #         "Attempting to set up shared resources for Minion Pool '%s'.",
@@ -875,7 +938,7 @@ class MinionManagerServerEndpoint(object):
     #     return self._get_minion_pool_lifecycle_execution(
     #         ctxt, minion_pool_id, execution.id).to_dict()
 
-    # @minion_pool_synchronized
+    # @minion_manager_utils.minion_pool_synchronized_op
     # def tear_down_shared_minion_pool_resources(
     #         self, ctxt, minion_pool_id, force=False):
     #     minion_pool = db_api.get_minion_pool_lifecycle(
@@ -931,7 +994,7 @@ class MinionManagerServerEndpoint(object):
     #     return self._get_minion_pool_lifecycle_execution(
     #         ctxt, minion_pool_id, execution.id).to_dict()
 
-    # @minion_pool_synchronized
+    # @minion_manager_utils.minion_pool_synchronized_op
     # def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
     #     LOG.info("Attempting to allocate Minion Pool '%s'.", minion_pool_id)
     #     minion_pool = self._get_minion_pool(
@@ -1021,7 +1084,7 @@ class MinionManagerServerEndpoint(object):
     #                 minion_pool.id,
     #                 allocated_machine_statuses))
 
-    # @minion_pool_synchronized
+    # @minion_manager_utils.minion_pool_synchronized_op
     # def deallocate_minion_pool_machines(self, ctxt, minion_pool_id, force=False):
     #     LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
     #     minion_pool = db_api.get_minion_pool_lifecycle(
@@ -1085,13 +1148,13 @@ class MinionManagerServerEndpoint(object):
     #     return self._get_minion_pool_lifecycle_execution(
     #         ctxt, minion_pool_id, execution.id).to_dict()
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def get_minion_pool(self, ctxt, minion_pool_id):
+        """Return the minion pool with the given ID.
+
+        Delegates to `self._get_minion_pool`, asking it to include the
+        pool's machines, events and progress updates.
+
+        :param ctxt: request context, passed through unchanged.
+        :param minion_pool_id: ID of the minion pool to fetch.
+        :returns: whatever `self._get_minion_pool` returns for the pool.
+        """
         return self._get_minion_pool(
             ctxt, minion_pool_id, include_machines=True, include_events=True,
             include_progress_updates=True)
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
         minion_pool = self._get_minion_pool(
             ctxt, minion_pool_id, include_machines=False)
@@ -1110,7 +1173,7 @@ class MinionManagerServerEndpoint(object):
         LOG.info("Minion Pool '%s' successfully updated", minion_pool_id)
         return db_api.get_minion_pool(ctxt, minion_pool_id)
 
-    @minion_pool_synchronized
+    @minion_manager_utils.minion_pool_synchronized_op
     def delete_minion_pool(self, ctxt, minion_pool_id):
         minion_pool = self._get_minion_pool(
             ctxt, minion_pool_id, include_machines=True)
@@ -1130,7 +1193,7 @@ class MinionManagerServerEndpoint(object):
         LOG.info("Deleting minion pool with ID '%s'" % minion_pool_id)
         db_api.delete_minion_pool(ctxt, minion_pool_id)
 
-    # @minion_pool_synchronized
+    # @minion_manager_utils.minion_pool_synchronized_op
     # def get_minion_pool_lifecycle_executions(
     #         self, ctxt, minion_pool_id, include_tasks=False):
     #     return db_api.get_minion_pool_lifecycle_executions(

+ 44 - 20
coriolis/minion_manager/rpc/tasks.py

@@ -10,6 +10,7 @@ from coriolis import constants
 from coriolis import exception
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
+from coriolis.minion_manager.rpc import utils as minion_manager_utils
 from coriolis.taskflow import base as coriolis_taskflow_base
 
 
@@ -33,7 +34,8 @@ MINION_POOL_DEALLOCATE_MACHINE_TASK_NAME_FORMAT = (
     "pool-%s-machine-%s-deallocation")
 
 
-class UpdateMinionPoolStatusTask(coriolis_taskflow_base.BaseCoriolisTaskflowTask):
+class UpdateMinionPoolStatusTask(
+        coriolis_taskflow_base.BaseCoriolisTaskflowTask):
     """Task which updates the status of the given pool.
     Is capable of recording and reverting the state.
     """
@@ -350,7 +352,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
         if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
             resource_deployment_task_type = (
-                constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
+                constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
             resource_cleanup_task_type = (
                 constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
         super(AllocateMinionMachineTask, self).__init__(
@@ -380,8 +382,14 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             "Allocating minion machine with internal pool ID '%s'" % (
                 self._minion_machine_id))
 
-        res = super(AllocateMinionMachineTask, self).execute(
-            context, origin, destination, execution_info)
+        try:
+            res = super(AllocateMinionMachineTask, self).execute(
+                context, origin, destination, execution_info)
+        except:
+            db_api.update_minion_machine(
+                context, self._minion_machine_id, {
+                    "status": constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING})
+            raise
 
         self._add_minion_pool_event(
             context,
@@ -401,17 +409,25 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
 
     def revert(self, context, origin, destination, task_info, **kwargs):
         original_result = kwargs.get('result', {})
-        if original_result and (
-                isinstance(original_result, dict) and (
-                    'minion_provider_properties' not in original_result)):
-            LOG.debug(
-                "Reversion for Minion Machine '%s' (pool '%s') did not "
-                "receive any result from the original run. Presuming "
-                "that the task had not initially run successfully. "
-                "Result was: %s",
-                self._minion_machine_id, self._minion_pool_id,
-                original_result)
-            return task_info
+        if original_result:
+            if not isinstance(original_result, dict):
+                LOG.debug(
+                    "Reversion for Minion Machine '%s' (pool '%s') did not "
+                    "receive any dict result from the original run. Presuming "
+                    "that the task had not initially run successfully. "
+                    "Result was: %s",
+                    self._minion_machine_id, self._minion_pool_id,
+                    original_result)
+                return task_info
+            elif 'minion_provider_properties' not in original_result:
+                LOG.debug(
+                    "Reversion for Minion Machine '%s' (pool '%s') did not "
+                    "receive any result from the original run. Presuming "
+                    "that the task had not initially run successfully. "
+                    "Result was: %s",
+                    self._minion_machine_id, self._minion_pool_id,
+                    original_result)
+                return task_info
 
         cleanup_info = copy.deepcopy(task_info)
         cleanup_info['minion_provider_properties'] = original_result[
@@ -422,7 +438,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
         if db_api.get_minion_machine(context, self._minion_machine_id):
             LOG.debug(
                 "Removing minion machine entry with ID '%s' for minion pool "
-                "'%s' from DB.", self._minion_machine_id, self._minion_pool_id)
+                "'%s' from the DB.", self._minion_machine_id, self._minion_pool_id)
             db_api.delete_minion_machine(context, self._minion_machine_id)
 
         return task_info
@@ -460,10 +476,18 @@ class DeallocateMinionMachineTask(BaseMinionManangerTask):
             "Deallocating minion machine with internal pool ID '%s'" % (
                 self._minion_machine_id))
 
-        execution_info = {
-            "minion_provider_properties": machine.provider_properties}
-        res = super(DeallocateMinionMachineTask, self).execute(
-            context, origin, destination, execution_info)
+        if machine.provider_properties:
+            execution_info = {
+                "minion_provider_properties": machine.provider_properties}
+            _ = super(DeallocateMinionMachineTask, self).execute(
+                context, origin, destination, execution_info)
+        else:
+            self._add_minion_pool_event(
+                context,
+                "Minion machine with ID '%s' had no provider properties set. "
+                "Presuming it failed to deploy in the first place and simply "
+                "removing the machine's entry from the DB" % (
+                    self._minion_machine_id))
 
         LOG.debug(
             "[Task '%s'] Deleting minion machine with ID '%s' from the DB",

+ 41 - 0
coriolis/minion_manager/rpc/utils.py

@@ -0,0 +1,41 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import functools
+
+from oslo_concurrency import lockutils
+
+from coriolis import constants
+
+
+def minion_pool_synchronized(minion_pool_id, func):
+    """Wrap `func` so that every call runs under the given pool's lock.
+
+    The lock name is `constants.MINION_POOL_LOCK_NAME_FORMAT` interpolated
+    with `minion_pool_id`, and the lock is requested with `external=True`
+    (which oslo.concurrency documents as making it work across processes).
+
+    :param minion_pool_id: ID interpolated into the lock name.
+    :param func: callable to execute while the lock is held.
+    :returns: a wrapper which forwards its arguments to `func` and returns
+        its result.
+    """
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        # NOTE: the lock-decorated closure is re-created on each call of
+        # the wrapper and captures that call's arguments.
+        @lockutils.synchronized(
+            constants.MINION_POOL_LOCK_NAME_FORMAT % minion_pool_id,
+            external=True)
+        def inner():
+            return func(*args, **kwargs)
+        return inner()
+    return wrapper
+
+
+def minion_pool_synchronized_op(func):
+    """Decorator for methods taking `(self, ctxt, minion_pool_id, ...)`.
+
+    On each call, the `minion_pool_id` argument is used to wrap `func` with
+    `minion_pool_synchronized`, and the wrapped callable is then invoked
+    with the original arguments.
+
+    :param func: method whose third positional parameter is the pool ID.
+    :returns: a wrapper with the same metadata as `func` (via
+        `functools.wraps`) which returns the result of `func`.
+    """
+    @functools.wraps(func)
+    def wrapper(self, ctxt, minion_pool_id, *args, **kwargs):
+        # The lock can only be selected at call time, as the pool ID is a
+        # call argument and not known when the method is decorated.
+        return minion_pool_synchronized(minion_pool_id, func)(
+            self, ctxt, minion_pool_id, *args, **kwargs)
+    return wrapper
+
+
+def minion_machine_synchronized(minion_pool_id, minion_machine_id, func):
+    """Wrap `func` so that every call runs under the given machine's lock.
+
+    The lock name is `constants.MINION_MACHINE_LOCK_NAME_FORMAT`
+    interpolated with the `(minion_pool_id, minion_machine_id)` pair, and
+    the lock is requested with `external=True` (which oslo.concurrency
+    documents as making it work across processes).
+
+    :param minion_pool_id: ID of the pool, first value in the lock name.
+    :param minion_machine_id: ID of the machine, second value in the lock
+        name.
+    :param func: callable to execute while the lock is held.
+    :returns: a wrapper which forwards its arguments to `func` and returns
+        its result.
+    """
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        # NOTE: the lock-decorated closure is re-created on each call of
+        # the wrapper and captures that call's arguments.
+        @lockutils.synchronized(
+            constants.MINION_MACHINE_LOCK_NAME_FORMAT % (
+                minion_pool_id, minion_machine_id),
+            external=True)
+        def inner():
+            return func(*args, **kwargs)
+        return inner()
+    return wrapper