|
|
@@ -167,6 +167,18 @@ def service_synchronized(func):
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
+def minion_pool_synchronized(func):
|
|
|
+ @functools.wraps(func)
|
|
|
+ def wrapper(self, ctxt, pool_id, *args, **kwargs):
|
|
|
+ @lockutils.synchronized(
|
|
|
+ constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
|
|
|
+ external=True)
|
|
|
+ def inner():
|
|
|
+ return func(self, ctxt, pool_id, *args, **kwargs)
|
|
|
+ return inner()
|
|
|
+ return wrapper
|
|
|
+
|
|
|
+
|
|
|
class ConductorServerEndpoint(object):
|
|
|
def __init__(self):
|
|
|
self._licensing_client = licensing_client.LicensingClient.from_env()
|
|
|
@@ -2897,3 +2909,88 @@ class ConductorServerEndpoint(object):
|
|
|
@service_synchronized
|
|
|
def delete_service(self, ctxt, service_id):
|
|
|
db_api.delete_service(ctxt, service_id)
|
|
|
+
|
|
|
+ def create_minion_pool(
|
|
|
+ self, ctxt, name, endpoint_id, environment_options,
|
|
|
+ minimum_minions, maximum_minions, minion_max_idle_time,
|
|
|
+ minion_retention_strategy):
|
|
|
+ endpoint = db_api.get_endpoint(ctxt, endpoint_id)
|
|
|
+
|
|
|
+ minion_pool = models.MinionPool()
|
|
|
+ minion_pool.id = str(uuid.uuid4())
|
|
|
+ minion_pool.name = name
|
|
|
+ minion_pool.endpoint_id = endpoint_id
|
|
|
+ minion_pool.environment_options = environment_options
|
|
|
+ minion_pool.minimum_minions = minimum_minions
|
|
|
+ minion_pool.maximum_minions = maximum_minions
|
|
|
+ minion_pool.minion_max_idle_time = minion_max_idle_time
|
|
|
+ minion_pool.minion_retention_strategy = minion_retention_strategy
|
|
|
+
|
|
|
+ db_api.add_minion_pool(ctxt, minion_pool)
|
|
|
+ return self.get_minion_pool(ctxt, minion_pool.id)
|
|
|
+
|
|
|
+ def get_minion_pools(self, ctxt):
|
|
|
+ return db_api.get_minion_pools(ctxt)
|
|
|
+
|
|
|
+ @minion_pool_synchronized
|
|
|
+ def get_minion_pool(self, ctxt, minion_pool_id):
|
|
|
+ minion_pool = db_api.get_minion_pool(ctxt, minion_pool_id)
|
|
|
+ if not minion_pool:
|
|
|
+ raise exception.NotFound(
|
|
|
+ "minion_pool with ID '%s' not found." % minion_pool_id)
|
|
|
+ return minion_pool
|
|
|
+
|
|
|
+ @minion_pool_synchronized
|
|
|
+ def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
|
|
|
+ LOG.info(
|
|
|
+ "Attempting to update minion_pool '%s' with payload: %s",
|
|
|
+ minion_pool_id, updated_values)
|
|
|
+ db_api.update_minion_pool(ctxt, minion_pool_id, updated_values)
|
|
|
+ LOG.info("Minion Pool '%s' successfully updated", minion_pool_id)
|
|
|
+ return db_api.get_minion_pool(ctxt, minion_pool_id)
|
|
|
+
|
|
|
+ @minion_pool_synchronized
|
|
|
+ def delete_minion_pool(self, ctxt, minion_pool_id):
|
|
|
+ # TODO(aznashwan): add checks for endpoints/services
|
|
|
+ # associated to the minion_pool before deletion:
|
|
|
+ db_api.delete_minion_pool(ctxt, minion_pool_id)
|
|
|
+
|
|
|
+ @replica_synchronized
|
|
|
+ def get_minion_pool_lifecycle_executions(
|
|
|
+ self, ctxt, minion_pool_id, include_tasks=False):
|
|
|
+ return db_api.get_replica_tasks_executions(
|
|
|
+ ctxt, replica_id, include_tasks)
|
|
|
+
|
|
|
+ @tasks_execution_synchronized
|
|
|
+ def get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
|
|
|
+ return self._get_replica_tasks_execution(
|
|
|
+ ctxt, replica_id, execution_id)
|
|
|
+
|
|
|
+ @tasks_execution_synchronized
|
|
|
+ def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
|
|
|
+ execution = self._get_replica_tasks_execution(
|
|
|
+ ctxt, replica_id, execution_id)
|
|
|
+ if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
|
|
|
+ raise exception.InvalidMigrationState(
|
|
|
+ "Cannot delete execution '%s' for Replica '%s' as it is "
|
|
|
+ "currently in '%s' state." % (
|
|
|
+ execution_id, replica_id, execution.status))
|
|
|
+ db_api.delete_replica_tasks_execution(ctxt, execution_id)
|
|
|
+
|
|
|
+ @tasks_execution_synchronized
|
|
|
+ def cancel_replica_tasks_execution(self, ctxt, replica_id, execution_id,
|
|
|
+ force):
|
|
|
+ execution = self._get_replica_tasks_execution(
|
|
|
+ ctxt, replica_id, execution_id)
|
|
|
+ if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
|
|
|
+ raise exception.InvalidReplicaState(
|
|
|
+ "Replica '%s' has no running execution to cancel." % (
|
|
|
+ replica_id))
|
|
|
+ if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
|
|
|
+ not force):
|
|
|
+ raise exception.InvalidReplicaState(
|
|
|
+ "Replica '%s' is already being cancelled. Please use the "
|
|
|
+ "force option if you'd like to force-cancel it." % (
|
|
|
+ replica_id))
|
|
|
+ self._cancel_tasks_execution(ctxt, execution, force=force)
|
|
|
+
|