|
|
@@ -143,6 +143,18 @@ def tasks_execution_synchronized(func):
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
+def minion_pool_tasks_execution_synchronized(func):
|
|
|
+ @functools.wraps(func)
|
|
|
+ def wrapper(self, ctxt, minion_pool_id, execution_id, *args, **kwargs):
|
|
|
+ @lockutils.synchronized(
|
|
|
+ constants.EXECUTION_LOCK_NAME_FORMAT % execution_id,
|
|
|
+ external=True)
|
|
|
+ def inner():
|
|
|
+ return func(self, ctxt, minion_pool_id, execution_id, *args, **kwargs)
|
|
|
+ return inner()
|
|
|
+ return wrapper
|
|
|
+
|
|
|
+
|
|
|
def region_synchronized(func):
|
|
|
@functools.wraps(func)
|
|
|
def wrapper(self, ctxt, region_id, *args, **kwargs):
|
|
|
@@ -169,12 +181,12 @@ def service_synchronized(func):
|
|
|
|
|
|
def minion_pool_synchronized(func):
|
|
|
@functools.wraps(func)
|
|
|
- def wrapper(self, ctxt, pool_id, *args, **kwargs):
|
|
|
+ def wrapper(self, ctxt, minion_pool_id, *args, **kwargs):
|
|
|
@lockutils.synchronized(
|
|
|
- constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
|
|
|
+ constants.MINION_POOL_LOCK_NAME_FORMAT % minion_pool_id,
|
|
|
external=True)
|
|
|
def inner():
|
|
|
- return func(self, ctxt, pool_id, *args, **kwargs)
|
|
|
+ return func(self, ctxt, minion_pool_id, *args, **kwargs)
|
|
|
return inner()
|
|
|
return wrapper
|
|
|
|
|
|
@@ -764,7 +776,7 @@ class ConductorServerEndpoint(object):
|
|
|
if missing_deps:
|
|
|
raise exception.CoriolisException(
|
|
|
"Task '%s' (type '%s') for instance '%s' "
|
|
|
- "has non-exitent tasks referenced as "
|
|
|
+ "has non-existent tasks referenced as "
|
|
|
"dependencies: %s" % (
|
|
|
task.id, task.task_type,
|
|
|
instance, missing_deps))
|
|
|
@@ -2198,6 +2210,16 @@ class ConductorServerEndpoint(object):
|
|
|
def _handle_post_task_actions(self, ctxt, task, execution, task_info):
|
|
|
task_type = task.task_type
|
|
|
|
|
|
+ def _check_other_tasks_running(execution, current_task):
|
|
|
+ still_running = False
|
|
|
+ for other_task in execution.tasks:
|
|
|
+ if other_task.id == current_task.id:
|
|
|
+ continue
|
|
|
+ if other_task.status in constants.ACTIVE_TASK_STATUSES:
|
|
|
+ still_running = True
|
|
|
+ break
|
|
|
+ return still_running
|
|
|
+
|
|
|
if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
|
|
|
|
|
|
# When restoring a snapshot in some import providers (OpenStack),
|
|
|
@@ -2278,13 +2300,7 @@ class ConductorServerEndpoint(object):
|
|
|
|
|
|
if task_type == constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
|
|
|
# check if this was the last task in the update execution:
|
|
|
- still_running = False
|
|
|
- for other_task in execution.tasks:
|
|
|
- if other_task.id == task.id:
|
|
|
- continue
|
|
|
- if other_task.status in constants.ACTIVE_TASK_STATUSES:
|
|
|
- still_running = True
|
|
|
- break
|
|
|
+ still_running = _check_other_tasks_running(execution, task)
|
|
|
if not still_running:
|
|
|
# it means this was the last update task in the Execution
|
|
|
# and we may safely update the params of the Replica
|
|
|
@@ -2299,6 +2315,70 @@ class ConductorServerEndpoint(object):
|
|
|
# update task finishes last:
|
|
|
db_api.update_replica(
|
|
|
ctxt, execution.action_id, task_info)
|
|
|
+
|
|
|
+ elif task_type == constants.TASK_TYPE_SET_UP_SHARED_POOL_RESOURCES:
|
|
|
+ still_running = _check_other_tasks_running(execution, task)
|
|
|
+ if not still_running:
|
|
|
+ LOG.info(
|
|
|
+ "Updating 'pool_supporting_resources' for pool %s after "
|
|
|
+ "completion of task '%s' (type '%s').",
|
|
|
+ execution.action_id, task.id, task_type)
|
|
|
+ db_api.update_minion_pool_lifecycle(
|
|
|
+ ctxt, execution.action_id, {
|
|
|
+ "pool_supporting_resources": task_info.get(
|
|
|
+ "pool_supporting_resources", {})})
|
|
|
+ db_api.set_minion_pool_lifecycle_status(
|
|
|
+ ctxt, execution.action_id,
|
|
|
+ constants.MINION_POOL_STATUS_DEALLOCATED)
|
|
|
+
|
|
|
+ elif task_type == constants.TASK_TYPE_TEAR_DOWN_SHARED_POOL_RESOURCES:
|
|
|
+ still_running = _check_other_tasks_running(execution, task)
|
|
|
+ if not still_running:
|
|
|
+ LOG.info(
|
|
|
+ "Clearing 'pool_supporting_resources' for pool %s following "
|
|
|
+ "completion of task '%s' (type %s)",
|
|
|
+ execution.action_id, task.id, task_type)
|
|
|
+ db_api.update_minion_pool_lifecycle(
|
|
|
+ ctxt, execution.action_id, {
|
|
|
+ "pool_supporting_resources": {}})
|
|
|
+ db_api.set_minion_pool_lifecycle_status(
|
|
|
+ ctxt, execution.action_id,
|
|
|
+ constants.MINION_POOL_STATUS_UNINITIALIZED)
|
|
|
+
|
|
|
+ elif task_type == constants.TASK_TYPE_CREATE_MINION:
|
|
|
+ LOG.info(
|
|
|
+ "Updating properties for Minion Machine '%s' of pool %s "
|
|
|
+ "following completion of task '%s' (type %s).",
|
|
|
+ task.instance, execution.action_id, task.id, task_type)
|
|
|
+ db_api.update_minion_machine(
|
|
|
+ # NOTE: we used the Minion Machine ID as the tasks' instance.
|
|
|
+ ctxt, task.instance, {
|
|
|
+ "status": constants.MINION_MACHINE_STATUS_AVAILABLE,
|
|
|
+ "provider_properties": task_info[
|
|
|
+ "minion_provider_properties"],
|
|
|
+ "connection_info": task_info[
|
|
|
+ "minion_connection_info"]})
|
|
|
+
|
|
|
+ still_running = _check_other_tasks_running(execution, task)
|
|
|
+ if not still_running:
|
|
|
+ db_api.set_minion_pool_lifecycle_status(
|
|
|
+ ctxt, execution.action_id,
|
|
|
+ constants.MINION_POOL_STATUS_AVAILABLE)
|
|
|
+
|
|
|
+ elif task_type == constants.TASK_TYPE_DELETE_MINION:
|
|
|
+ LOG.info(
|
|
|
+ "Clearing properties for Minion Machine '%s' of pool %s "
|
|
|
+ "following completion of task '%s' (type %s).",
|
|
|
+ task.instance, execution.action_id, task.id, task_type)
|
|
|
+ db_api.update_minion_machine(
|
|
|
+ # NOTE: we used the Minion Machine ID as the tasks' instance.
|
|
|
+ ctxt, task.instance, {
|
|
|
+ "status": constants.MINION_MACHINE_STATUS_UNINITIALIZED,
|
|
|
+ "provider_properties": task_info[
|
|
|
+ "minion_provider_properties"],
|
|
|
+ "connection_info": task_info[
|
|
|
+ "minion_connection_info"]})
|
|
|
+
|
|
|
else:
|
|
|
LOG.debug(
|
|
|
"No post-task actions required for task '%s' of type '%s'",
|
|
|
@@ -2913,12 +2993,13 @@ class ConductorServerEndpoint(object):
|
|
|
def create_minion_pool(
|
|
|
self, ctxt, name, endpoint_id, environment_options,
|
|
|
minimum_minions, maximum_minions, minion_max_idle_time,
|
|
|
- minion_retention_strategy):
|
|
|
+ minion_retention_strategy, notes=None):
|
|
|
endpoint = db_api.get_endpoint(ctxt, endpoint_id)
|
|
|
|
|
|
minion_pool = models.MinionPoolLifecycle()
|
|
|
minion_pool.id = str(uuid.uuid4())
|
|
|
- minion_pool.name = name
|
|
|
+ minion_pool.pool_name = name
|
|
|
+ minion_pool.notes = notes
|
|
|
minion_pool.pool_status = constants.MINION_POOL_STATUS_UNINITIALIZED
|
|
|
minion_pool.minimum_minions = minimum_minions
|
|
|
minion_pool.maximum_minions = maximum_minions
|
|
|
@@ -2930,7 +3011,7 @@ class ConductorServerEndpoint(object):
|
|
|
minion_pool.origin_endpoint_id = endpoint_id
|
|
|
minion_pool.destination_endpoint_id = endpoint_id
|
|
|
minion_pool.source_environment = environment_options
|
|
|
- minion_pool.destination_environment = minimum_minions
|
|
|
+ minion_pool.destination_environment = environment_options
|
|
|
minion_pool.instances = []
|
|
|
minion_pool.info = {}
|
|
|
|
|
|
@@ -2945,19 +3026,68 @@ class ConductorServerEndpoint(object):
|
|
|
minion_pool = db_api.get_minion_pool_lifecycle(ctxt, minion_pool_id)
|
|
|
if not minion_pool:
|
|
|
raise exception.NotFound(
|
|
|
- "minion_pool with ID '%s' not found." % minion_pool_id)
|
|
|
+ "Minion pool with ID '%s' not found." % minion_pool_id)
|
|
|
return minion_pool
|
|
|
|
|
|
+ @minion_pool_synchronized
|
|
|
+ def initialize_minion_pool(self, ctxt, minion_pool_id):
|
|
|
+ LOG.info("Attempting to initialize Minion Pool '%s'.", minion_pool_id)
|
|
|
+ minion_pool = db_api.get_minion_pool_lifecycle(ctxt, minion_pool_id)
|
|
|
+ if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
|
|
|
+ raise exception.InvalidMinionPoolState(
|
|
|
+ "Minion Pool '%s' cannot be initialized as it is in '%s' state "
|
|
|
+ "instead of the expected %s."% (
|
|
|
+ minion_pool_id, minion_pool.pool_status,
|
|
|
+ constants.MINION_POOL_STATUS_UNINITIALIZED))
|
|
|
+
|
|
|
+ execution = models.TasksExecution()
|
|
|
+ execution.id = str(uuid.uuid4())
|
|
|
+ execution.action = minion_pool
|
|
|
+ execution.status = constants.EXECUTION_STATUS_UNEXECUTED
|
|
|
+ execution.type = constants.EXECUTION_TYPE_MINION_POOL_INITIALIZATION
|
|
|
+
|
|
|
+ minion_pool.info[minion_pool_id] = {
|
|
|
+ "pool_identifier": minion_pool.id,
|
|
|
+ # TODO(aznashwan): remove redundancy once transfer
|
|
|
+ # action DB models have been overhauled:
|
|
|
+ "pool_environment_options": minion_pool.source_environment}
|
|
|
+
|
|
|
+ validate_pool_options_task = self._create_task(
|
|
|
+ minion_pool.id,
|
|
|
+ constants.TASK_TYPE_VALIDATE_MINION_POOL_OPTIONS,
|
|
|
+ execution)
|
|
|
+
|
|
|
+ setup_pool_resources_task = self._create_task(
|
|
|
+ minion_pool.id,
|
|
|
+ constants.TASK_TYPE_SET_UP_SHARED_POOL_RESOURCES,
|
|
|
+ execution,
|
|
|
+ depends_on=[validate_pool_options_task.id])
|
|
|
+
|
|
|
+ self._check_execution_tasks_sanity(execution, minion_pool.info)
|
|
|
+
|
|
|
+ # update the action info for the pool's instance:
|
|
|
+ db_api.update_transfer_action_info_for_instance(
|
|
|
+ ctxt, minion_pool.id, minion_pool.id,
|
|
|
+ minion_pool.info[minion_pool.id])
|
|
|
+
|
|
|
+ # add new execution to DB:
|
|
|
+ db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
|
|
|
+ LOG.info("Minion pool initialization execution created: %s", execution.id)
|
|
|
+
|
|
|
+ self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
|
|
|
+ return self._get_minion_pool_lifecycle_execution(
|
|
|
+ ctxt, minion_pool_id, execution.id).to_dict()
|
|
|
+
|
|
|
@minion_pool_synchronized
|
|
|
def allocate_minion_pool(self, ctxt, minion_pool_id):
|
|
|
LOG.info("Attempting to allocate Minion Pool '%s'.", minion_pool_id)
|
|
|
- minion_pool = db_api.get_minion_pool_lifecycle(ctxt, minion_pool_id)
|
|
|
- if minion_pool.status not in (
|
|
|
- constants.MINION_POOL_STATUS_UNINITIALIZED,
|
|
|
- constants.MINION_POOL_STATUS_DEALLOCATED):
|
|
|
+ minion_pool = self._get_minion_pool(ctxt, minion_pool_id)
|
|
|
+ if minion_pool.pool_status != constants.MINION_POOL_STATUS_DEALLOCATED:
|
|
|
raise exception.InvalidMinionPoolState(
|
|
|
- "Minion Pool '%s' cannot be started as it is in '%s' state" % (
|
|
|
- minion_pool_id, minion_pool.status))
|
|
|
+ "Minion Pool '%s' cannot be started as it is in '%s' state "
|
|
|
+ "instead of the expected %s."% (
|
|
|
+ minion_pool_id, minion_pool.pool_status,
|
|
|
+ constants.MINION_POOL_STATUS_DEALLOCATED))
|
|
|
|
|
|
execution = models.TasksExecution()
|
|
|
execution.id = str(uuid.uuid4())
|
|
|
@@ -3022,18 +3152,18 @@ class ConductorServerEndpoint(object):
|
|
|
LOG.info("Minion pool allocation execution created: %s", execution.id)
|
|
|
|
|
|
self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
|
|
|
- return self.get_minion_pool_lifecycle_execution(
|
|
|
- ctxt, minion_pool_id, execution.id)
|
|
|
+ return self._get_minion_pool_lifecycle_execution(
|
|
|
+ ctxt, minion_pool_id, execution.id).to_dict()
|
|
|
|
|
|
@minion_pool_synchronized
|
|
|
def deallocate_minion_pool(self, ctxt, minion_pool_id):
|
|
|
LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
|
|
|
minion_pool = db_api.get_minion_pool_lifecycle(ctxt, minion_pool_id)
|
|
|
- if minion_pool.status not in (
|
|
|
+ if minion_pool.pool_status not in (
|
|
|
constants.MINION_POOL_STATUS_AVAILABLE):
|
|
|
raise exception.InvalidMinionPoolState(
|
|
|
"Minion Pool '%s' cannot be started as it is in '%s' state" % (
|
|
|
- minion_pool_id, minion_pool.status))
|
|
|
+ minion_pool_id, minion_pool.pool_status))
|
|
|
|
|
|
# TODO(aznashwan): check minion pool running
|
|
|
# executions/allocated machines
|
|
|
@@ -3044,8 +3174,8 @@ class ConductorServerEndpoint(object):
|
|
|
execution.status = constants.EXECUTION_STATUS_UNEXECUTED
|
|
|
execution.type = constants.EXECUTION_TYPE_MINION_POOL_MAINTENANCE
|
|
|
|
|
|
- # TODO(aznashwan): add shared pool resources setup tasks:
|
|
|
- for minion_machine_id in minion_pool.instances:
|
|
|
+ for minion_machine in minion_pool.minion_machines:
|
|
|
+ minion_machine_id = minion_machine.id
|
|
|
minion_pool.info[minion_machine_id] = {
|
|
|
"pool_environment_options": minion_pool.source_environment}
|
|
|
self._create_task(
|
|
|
@@ -3065,8 +3195,8 @@ class ConductorServerEndpoint(object):
|
|
|
LOG.info("Minion pool allocation execution created: %s", execution.id)
|
|
|
|
|
|
self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
|
|
|
- return self.get_minion_pool_lifecycle_execution(
|
|
|
- ctxt, minion_pool_id, execution.id)
|
|
|
+ return self._get_minion_pool_lifecycle_execution(
|
|
|
+ ctxt, minion_pool_id, execution.id).to_dict()
|
|
|
|
|
|
@minion_pool_synchronized
|
|
|
def get_minion_pool(self, ctxt, minion_pool_id):
|
|
|
@@ -3103,13 +3233,13 @@ class ConductorServerEndpoint(object):
|
|
|
execution_id, minion_pool_id))
|
|
|
return execution
|
|
|
|
|
|
- @tasks_execution_synchronized
|
|
|
+ @minion_pool_tasks_execution_synchronized
|
|
|
def get_minion_pool_lifecycle_execution(
|
|
|
self, ctxt, minion_pool_id, execution_id):
|
|
|
return self._get_minion_pool_lifecycle_execution(
|
|
|
- ctxt, minion_pool_id, execution_id)
|
|
|
+ ctxt, minion_pool_id, execution_id).to_dict()
|
|
|
|
|
|
- @tasks_execution_synchronized
|
|
|
+ @minion_pool_tasks_execution_synchronized
|
|
|
def delete_minion_pool_lifecycle_execution(
|
|
|
self, ctxt, minion_pool_id, execution_id):
|
|
|
execution = self._get_minion_pool_lifecycle_execution(
|
|
|
@@ -3121,7 +3251,7 @@ class ConductorServerEndpoint(object):
|
|
|
execution_id, minion_pool_id, execution.status))
|
|
|
db_api.delete_minion_pool_lifecycle_execution(ctxt, execution_id)
|
|
|
|
|
|
- @tasks_execution_synchronized
|
|
|
+ @minion_pool_tasks_execution_synchronized
|
|
|
def cancel_minion_pool_lifecycle_execution(
|
|
|
self, ctxt, minion_pool_id, execution_id, force):
|
|
|
execution = self._get_minion_pool_lifecycle_execution(
|