|
|
@@ -14,6 +14,7 @@ from coriolis.db import api as db_api
|
|
|
from coriolis.db.sqlalchemy import models
|
|
|
from coriolis import exception
|
|
|
from coriolis import keystone
|
|
|
+from coriolis.licensing import client as licensing_client
|
|
|
from coriolis.replica_cron.rpc import client as rpc_cron_client
|
|
|
from coriolis import schemas
|
|
|
from coriolis import utils
|
|
|
@@ -86,9 +87,61 @@ def tasks_execution_synchronized(func):
|
|
|
|
|
|
class ConductorServerEndpoint(object):
|
|
|
def __init__(self):
|
|
|
+ self._licensing_client = licensing_client.LicensingClient.from_env()
|
|
|
self._rpc_worker_client = rpc_worker_client.WorkerClient()
|
|
|
self._replica_cron_client = rpc_cron_client.ReplicaCronClient()
|
|
|
|
|
|
+ def _check_delete_reservation_for_transfer(self, transfer_action):
|
|
|
+ action_id = transfer_action.base_id
|
|
|
+ if not self._licensing_client:
|
|
|
+ LOG.warn(
|
|
|
+ "Licensing client not instantiated. Skipping deletion of "
|
|
|
+ "reservation for transfer action '%s'", action_id)
|
|
|
+ return
|
|
|
+
|
|
|
+ reservation_id = transfer_action.reservation_id
|
|
|
+ if reservation_id:
|
|
|
+ try:
|
|
|
+ self._licensing_client.delete_reservation(reservation_id)
|
|
|
+ except (Exception, KeyboardInterrupt):
|
|
|
+ LOG.warn(
|
|
|
+ "Failed to delete reservation with ID '%s' for transfer "
|
|
|
+ "action with ID '%s'. Skipping. Exception\n%s",
|
|
|
+ reservation_id, action_id, utils.get_exception_details())
|
|
|
+
|
|
|
+ def _check_create_reservation_for_transfer(
|
|
|
+ self, transfer_action, transfer_type):
|
|
|
+ action_id = transfer_action.base_id
|
|
|
+ if not self._licensing_client:
|
|
|
+ LOG.warn(
|
|
|
+ "Licensing client not instantiated. Skipping creation of "
|
|
|
+ "reservation for transfer action '%s'", action_id)
|
|
|
+ return
|
|
|
+
|
|
|
+ ninstances = len(transfer_action.instances)
|
|
|
+ LOG.debug(
|
|
|
+ "Attempting to create '%s' reservation for %d instances for "
|
|
|
+ "transfer action with ID '%s'.",
|
|
|
+ transfer_type, ninstances, action_id)
|
|
|
+ reservation = self._licensing_client.add_reservation(
|
|
|
+ transfer_type, ninstances)
|
|
|
+ transfer_action.reservation_id = reservation['id']
|
|
|
+
|
|
|
+ def _check_reservation_for_transfer(self, transfer_action):
|
|
|
+ action_id = transfer_action.base_id
|
|
|
+ if not self._licensing_client:
|
|
|
+ LOG.warn(
|
|
|
+ "Licensing client not instantiated. Skipping checking of "
|
|
|
+ "reservation for transfer action '%s'", action_id)
|
|
|
+ return
|
|
|
+
|
|
|
+ reservation_id = transfer_action.reservation_id
|
|
|
+ if reservation_id:
|
|
|
+ LOG.debug(
|
|
|
+ "Attempting to check reservation with ID '%s' for transfer "
|
|
|
+ "action '%s'", reservation_id, action_id)
|
|
|
+ self._licensing_client.check_reservation(reservation_id)
|
|
|
+
|
|
|
def create_endpoint(self, ctxt, name, endpoint_type, description,
|
|
|
connection_info):
|
|
|
endpoint = models.Endpoint()
|
|
|
@@ -244,6 +297,7 @@ class ConductorServerEndpoint(object):
|
|
|
@replica_synchronized
|
|
|
def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
|
|
|
replica = self._get_replica(ctxt, replica_id)
|
|
|
+ self._check_reservation_for_transfer(replica)
|
|
|
self._check_replica_running_executions(ctxt, replica)
|
|
|
execution = models.TasksExecution()
|
|
|
execution.id = str(uuid.uuid4())
|
|
|
@@ -352,6 +406,7 @@ class ConductorServerEndpoint(object):
|
|
|
def delete_replica(self, ctxt, replica_id):
|
|
|
replica = self._get_replica(ctxt, replica_id)
|
|
|
self._check_replica_running_executions(ctxt, replica)
|
|
|
+ self._check_delete_reservation_for_transfer(replica)
|
|
|
db_api.delete_replica(ctxt, replica_id)
|
|
|
|
|
|
@replica_synchronized
|
|
|
@@ -413,6 +468,9 @@ class ConductorServerEndpoint(object):
|
|
|
replica.network_map = network_map
|
|
|
replica.storage_mappings = storage_mappings
|
|
|
|
|
|
+ self._check_create_reservation_for_transfer(
|
|
|
+ replica, licensing_client.RESERVATION_TYPE_REPLICA)
|
|
|
+
|
|
|
db_api.add_replica(ctxt, replica)
|
|
|
LOG.info("Replica created: %s", replica.id)
|
|
|
return self.get_replica(ctxt, replica.id)
|
|
|
@@ -481,6 +539,7 @@ class ConductorServerEndpoint(object):
|
|
|
def deploy_replica_instances(self, ctxt, replica_id, clone_disks, force,
|
|
|
skip_os_morphing=False):
|
|
|
replica = self._get_replica(ctxt, replica_id)
|
|
|
+ self._check_reservation_for_transfer(replica)
|
|
|
self._check_replica_running_executions(ctxt, replica)
|
|
|
self._check_valid_replica_tasks_execution(replica, force)
|
|
|
|
|
|
@@ -621,6 +680,9 @@ class ConductorServerEndpoint(object):
|
|
|
migration.info = {}
|
|
|
migration.notes = notes
|
|
|
|
|
|
+ self._check_create_reservation_for_transfer(
|
|
|
+ migration, licensing_client.RESERVATION_TYPE_MIGRATION)
|
|
|
+
|
|
|
for instance in instances:
|
|
|
task_validate = self._create_task(
|
|
|
instance, constants.TASK_TYPE_VALIDATE_MIGRATION_INPUTS,
|
|
|
@@ -714,6 +776,7 @@ class ConductorServerEndpoint(object):
|
|
|
"The migration is not running")
|
|
|
execution = migration.executions[0]
|
|
|
self._cancel_tasks_execution(ctxt, execution, force)
|
|
|
+ self._check_delete_reservation_for_transfer(migration)
|
|
|
|
|
|
def _cancel_tasks_execution(self, ctxt, execution, force=False):
|
|
|
has_running_tasks = False
|
|
|
@@ -936,9 +999,16 @@ class ConductorServerEndpoint(object):
|
|
|
task = db_api.get_task(ctxt, task_id)
|
|
|
execution = db_api.get_tasks_execution(ctxt, task.execution_id)
|
|
|
|
|
|
- with lockutils.lock(execution.action_id):
|
|
|
+ action_id = execution.action_id
|
|
|
+ with lockutils.lock(action_id):
|
|
|
self._cancel_tasks_execution(ctxt, execution)
|
|
|
|
|
|
+ # NOTE: if this is a migration, make sure to delete
|
|
|
+ # its associated reservation.
|
|
|
+ action = db_api.get_action(ctxt, action_id)
|
|
|
+ if action.type == "migration":
|
|
|
+ self._check_delete_reservation_for_transfer(action)
|
|
|
+
|
|
|
@task_synchronized
|
|
|
def task_event(self, ctxt, task_id, level, message):
|
|
|
LOG.info("Task event: %s", task_id)
|