Просмотр исходного кода

Make conductor call licensing server upon transfer creation/deletion

The conductor will now check if it has a licensing client instantiated
and attempt to create and cleanup reservations.

It will create a reservation whenever:
    - a Replica is created
    - a standalone Migration (i.e. not from a Replica) is created

The ID of the reservation will be saved in the DB.
The conductor will attempt to delete the reservation if:
    - a Replica is deleted
    - a Migration is cancelled
    - a task for a Migration enters 'error' state

An API to check to ensure the licence period is still in effect will
be made before:
    - a Replica execution is created
    - a Migration-from-Replica creation is attempted
Nashwan Azhari 7 лет назад
Родитель
Сommit
4050b69cde
1 измененных файлов с 71 добавлено и 1 удалено
  1. 71 1
      coriolis/conductor/rpc/server.py

+ 71 - 1
coriolis/conductor/rpc/server.py

@@ -14,6 +14,7 @@ from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
 from coriolis import exception
 from coriolis import keystone
+from coriolis.licensing import client as licensing_client
 from coriolis.replica_cron.rpc import client as rpc_cron_client
 from coriolis import schemas
 from coriolis import utils
@@ -86,9 +87,61 @@ def tasks_execution_synchronized(func):
 
 class ConductorServerEndpoint(object):
     def __init__(self):
+        self._licensing_client = licensing_client.LicensingClient.from_env()
         self._rpc_worker_client = rpc_worker_client.WorkerClient()
         self._replica_cron_client = rpc_cron_client.ReplicaCronClient()
 
+    def _check_delete_reservation_for_transfer(self, transfer_action):
+        action_id = transfer_action.base_id
+        if not self._licensing_client:
+            LOG.warn(
+                "Licensing client not instantiated. Skipping deletion of "
+                "reservation for transfer action '%s'", action_id)
+            return
+
+        reservation_id = transfer_action.reservation_id
+        if reservation_id:
+            try:
+                self._licensing_client.delete_reservation(reservation_id)
+            except (Exception, KeyboardInterrupt):
+                LOG.warn(
+                    "Failed to delete reservation with ID '%s' for transfer "
+                    "action with ID '%s'. Skipping. Exception\n%s",
+                    reservation_id, action_id, utils.get_exception_details())
+
+    def _check_create_reservation_for_transfer(
+            self, transfer_action, transfer_type):
+        action_id = transfer_action.base_id
+        if not self._licensing_client:
+            LOG.warn(
+                "Licensing client not instantiated. Skipping creation of "
+                "reservation for transfer action '%s'", action_id)
+            return
+
+        ninstances = len(transfer_action.instances)
+        LOG.debug(
+            "Attempting to create '%s' reservation for %d instances for "
+            "transfer action with ID '%s'.",
+            transfer_type, ninstances, action_id)
+        reservation = self._licensing_client.add_reservation(
+            transfer_type, ninstances)
+        transfer_action.reservation_id = reservation['id']
+
+    def _check_reservation_for_transfer(self, transfer_action):
+        action_id = transfer_action.base_id
+        if not self._licensing_client:
+            LOG.warn(
+                "Licensing client not instantiated. Skipping checking of "
+                "reservation for transfer action '%s'", action_id)
+            return
+
+        reservation_id = transfer_action.reservation_id
+        if reservation_id:
+            LOG.debug(
+                "Attempting to check reservation with ID '%s' for transfer "
+                "action '%s'", reservation_id, action_id)
+        self._licensing_client.check_reservation(reservation_id)
+
     def create_endpoint(self, ctxt, name, endpoint_type, description,
                         connection_info):
         endpoint = models.Endpoint()
@@ -244,6 +297,7 @@ class ConductorServerEndpoint(object):
     @replica_synchronized
     def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
         replica = self._get_replica(ctxt, replica_id)
+        self._check_reservation_for_transfer(replica)
         self._check_replica_running_executions(ctxt, replica)
         execution = models.TasksExecution()
         execution.id = str(uuid.uuid4())
@@ -352,6 +406,7 @@ class ConductorServerEndpoint(object):
     def delete_replica(self, ctxt, replica_id):
         replica = self._get_replica(ctxt, replica_id)
         self._check_replica_running_executions(ctxt, replica)
+        self._check_delete_reservation_for_transfer(replica)
         db_api.delete_replica(ctxt, replica_id)
 
     @replica_synchronized
@@ -413,6 +468,9 @@ class ConductorServerEndpoint(object):
         replica.network_map = network_map
         replica.storage_mappings = storage_mappings
 
+        self._check_create_reservation_for_transfer(
+            replica, licensing_client.RESERVATION_TYPE_REPLICA)
+
         db_api.add_replica(ctxt, replica)
         LOG.info("Replica created: %s", replica.id)
         return self.get_replica(ctxt, replica.id)
@@ -481,6 +539,7 @@ class ConductorServerEndpoint(object):
     def deploy_replica_instances(self, ctxt, replica_id, clone_disks, force,
                                  skip_os_morphing=False):
         replica = self._get_replica(ctxt, replica_id)
+        self._check_reservation_for_transfer(replica)
         self._check_replica_running_executions(ctxt, replica)
         self._check_valid_replica_tasks_execution(replica, force)
 
@@ -621,6 +680,9 @@ class ConductorServerEndpoint(object):
         migration.info = {}
         migration.notes = notes
 
+        self._check_create_reservation_for_transfer(
+            migration, licensing_client.RESERVATION_TYPE_MIGRATION)
+
         for instance in instances:
             task_validate = self._create_task(
                 instance, constants.TASK_TYPE_VALIDATE_MIGRATION_INPUTS,
@@ -714,6 +776,7 @@ class ConductorServerEndpoint(object):
                 "The migration is not running")
         execution = migration.executions[0]
         self._cancel_tasks_execution(ctxt, execution, force)
+        self._check_delete_reservation_for_transfer(migration)
 
     def _cancel_tasks_execution(self, ctxt, execution, force=False):
         has_running_tasks = False
@@ -936,9 +999,16 @@ class ConductorServerEndpoint(object):
         task = db_api.get_task(ctxt, task_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
 
-        with lockutils.lock(execution.action_id):
+        action_id = execution.action_id
+        with lockutils.lock(action_id):
             self._cancel_tasks_execution(ctxt, execution)
 
+        # NOTE: if this is a migration, make sure to delete
+        # its associated reservation.
+        action = db_api.get_action(ctxt, action_id)
+        if action.type == "migration":
+            self._check_delete_reservation_for_transfer(action)
+
     @task_synchronized
     def task_event(self, ctxt, task_id, level, message):
         LOG.info("Task event: %s", task_id)