Prechádzať zdrojové kódy

Hook in new reservation fulfillment logic in conductor.

Signed-off-by: Nashwan Azhari <nazhari@cloudbasesolutions.com>
Nashwan Azhari 1 rok pred
rodič
commit
01491b17f4

+ 162 - 34
coriolis/conductor/rpc/server.py

@@ -45,6 +45,13 @@ TASK_DEADLOCK_ERROR_MESSAGE = (
     "A fatal deadlock has occurred. Further debugging is required. "
     "Please review the Conductor logs and contact support for assistance.")
 
+SCENARIO_TYPE_TO_LICENSING_RESERVATION_MAP = {
+    constants.REPLICA_SCENARIO_REPLICA:
+        licensing_client.RESERVATION_TYPE_REPLICA,
+    constants.REPLICA_SCENARIO_LIVE_MIGRATION:
+        licensing_client.RESERVATION_TYPE_MIGRATION
+}
+
 
 def endpoint_synchronized(func):
     @functools.wraps(func)
@@ -290,40 +297,119 @@ class ConductorServerEndpoint(object):
                     "action with ID '%s'. Skipping. Exception\n%s",
                     reservation_id, action_id, utils.get_exception_details())
 
-    def _check_create_reservation_for_transfer(
-            self, transfer_action, transfer_type):
-        action_id = transfer_action.base_id
+    def _create_reservation_for_replica(self, replica):
+        action_id = replica.base_id
+        scenario = replica.scenario
+        reservation_type = SCENARIO_TYPE_TO_LICENSING_RESERVATION_MAP.get(
+            scenario, None)
+        if not reservation_type:
+            raise exception.LicensingException(
+                message="Could not determine reservation type for replica "
+                        f"'{action_id}' with scenario '{replica.scenario}'.")
         if not self._licensing_client:
             LOG.warn(
                 "Licensing client not instantiated. Skipping creation of "
                 "reservation for transfer action '%s'", action_id)
             return
 
-        ninstances = len(transfer_action.instances)
+        ninstances = len(replica.instances)
         LOG.debug(
             "Attempting to create '%s' reservation for %d instances for "
             "transfer action with ID '%s'.",
-            transfer_type, ninstances, action_id)
+            reservation_type, ninstances, action_id)
         reservation = self._licensing_client.add_reservation(
-            transfer_type, ninstances)
-        transfer_action.reservation_id = reservation['id']
+            reservation_type, ninstances)
+
+        LOG.info(
+            f"Sucessfully created licensing reservation for transfer "
+            f"with ID '{action_id}' with properties: {reservation}")
+        replica.reservation_id = reservation['id']
+
+        return reservation
 
-    def _check_reservation_for_transfer(
-            self, transfer_action, reservation_type):
+    def _get_licensing_reservation_for_action(self, transfer_action):
         action_id = transfer_action.base_id
+        if not self._licensing_client:
+            LOG.warn(
+                f"Licensing client not instantiated. Skipping getting "
+                f"reservation for transfer action '{action_id}'")
+            return None
+
+        reservation_id = transfer_action.reservation_id
+        if not reservation_id:
+            LOG.warn(
+                f"No reservation_id set on transfer action '{action_id}'")
+            return None
+
+        return self._licensing_client.get_reservation(reservation_id)
+
+    def _check_mark_reservation_fulfilled(
+            self, transfer_action, must_unfulfilled=False):
+        action_id = transfer_action.id
+        reservation = self._get_licensing_reservation_for_action(
+            transfer_action)
+        if not reservation:
+            LOG.info(
+                f"No licensing reservation found for transfer action "
+                f"'{action_id}'. Skipping marking fulfilled.")
+            return
+
+        reservation_id = reservation['id']
+        fulfilled = reservation.get("fulfilled_at", None)
+        if fulfilled:
+            if must_unfulfilled:
+                raise exception.Conflict(
+                    f"A licensing reservation with ID {reservation_id} "
+                    "already exists and has been marked as fulfilled "
+                    "within the licensing server. Please create a new "
+                    "transfer operation in order to obtain a new "
+                    "reservation.")
+            LOG.debug(
+                f"Reservation with ID '{reservation_id}' for transfer "
+                f"transfer action '{action_id}' was already marked as fulfilled")
+        else:
+            self._licensing_client.mark_reservation_fulfilled(reservation_id)
+            LOG.debug(
+                f"Successfully marked reservation with ID '{reservation_id}' for "
+                f"transfer action '{action_id}' as fulfilled")
+
+    def _check_reservation_for_replica(self, replica):
+        scenario = replica.scenario
+        reservation_type = SCENARIO_TYPE_TO_LICENSING_RESERVATION_MAP.get(
+            scenario, None)
+        if not reservation_type:
+            raise exception.LicensingException(
+                message="Could not determine reservation type for replica "
+                        f"'{replica.id}' with scenario '{replica.scenario}'.")
+
+        action_id = replica.base_id
         if not self._licensing_client:
             LOG.warn(
                 "Licensing client not instantiated. Skipping checking of "
                 "reservation for transfer action '%s'", action_id)
             return
 
-        reservation_id = transfer_action.reservation_id
+        reservation_id = replica.reservation_id
         if reservation_id:
             LOG.debug(
                 "Attempting to check reservation with ID '%s' for transfer "
                 "action '%s'", reservation_id, action_id)
             try:
-                transfer_action.reservation_id = (
+                reservation = self._licensing_client.get_reservation(
+                    reservation_id)
+
+                fulfilled_at = reservation.get("fulfilled_at", None)
+                if scenario == constants.REPLICA_SCENARIO_LIVE_MIGRATION and (
+                        fulfilled_at):
+                    raise exception.LicensingException(
+                        message=f"The Live Migration operation with ID "
+                                f"'{replica.id}' (licensing reservation "
+                                f"'{reservation_id}' has already been "
+                                f"fulfilled on {fulfilled_at}. Please "
+                                f"create a new Live Migration operation "
+                                f"to create a new licensing reservation.")
+
+                replica.reservation_id = (
                     self._licensing_client.check_refresh_reservation(
                         reservation_id)['id'])
             except Exception as ex:
@@ -331,7 +417,7 @@ class ConductorServerEndpoint(object):
                 if exc_code in [404, 409]:
                     if exc_code == 409:
                         LOG.debug(
-                            "Server-side exception occurred while trying to "
+                            "Licensing-side conflict occurred while trying to "
                             "check the existing reservation '%s' for action "
                             "'%s'. Attempting to create a new reservation. "
                             "Trace was: %s",
@@ -344,15 +430,14 @@ class ConductorServerEndpoint(object):
                             "reservation. Trace was: %s",
                             reservation_id, action_id,
                             utils.get_exception_details())
-                    self._check_create_reservation_for_transfer(
-                        transfer_action, reservation_type)
+                    self._create_reservation_for_replica(replica)
                 else:
                     raise ex
         else:
-            LOG.debug(
-                "Transfer action '%s' has no reservation ID set.", action_id)
-            self._check_create_reservation_for_transfer(
-                transfer_action, reservation_type)
+            LOG.info(
+                f"Transfer action '{action_id}' has no reservation ID set, "
+                f"attempting to create a new one for it")
+            self._create_reservation_for_replica(replica)
 
     def create_endpoint(self, ctxt, name, endpoint_type, description,
                         connection_info, mapped_regions=None):
@@ -825,8 +910,7 @@ class ConductorServerEndpoint(object):
     @replica_synchronized
     def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
         replica = self._get_replica(ctxt, replica_id, include_task_info=True)
-        self._check_reservation_for_transfer(
-            replica, licensing_client.RESERVATION_TYPE_REPLICA)
+        self._check_reservation_for_replica(replica)
         self._check_replica_running_executions(ctxt, replica)
         self._check_minion_pools_for_action(ctxt, replica)
 
@@ -1109,12 +1193,6 @@ class ConductorServerEndpoint(object):
     def delete_replica(self, ctxt, replica_id):
         replica = self._get_replica(ctxt, replica_id)
         self._check_replica_running_executions(ctxt, replica)
-        # TODO(aznashwan): update reservation deletion logic if
-        # the Replica was never successfully deployed and its
-        # disks were deleted.
-        # This might not be possible if its executions were deleted,
-        # but might be possible to set the new 'fulfilled' field within
-        # the reservation on the licensing server after a successful execution.
         self._check_delete_reservation_for_transfer(replica)
         db_api.delete_replica(ctxt, replica_id)
 
@@ -1230,10 +1308,7 @@ class ConductorServerEndpoint(object):
 
         self._check_minion_pools_for_action(ctxt, replica)
 
-        # TODO(aznashwan): add scenario-appropriate steps for
-        # defining the Replica reservation:
-        self._check_create_reservation_for_transfer(
-            replica, licensing_client.RESERVATION_TYPE_REPLICA)
+        self._create_reservation_for_replica(replica)
 
         db_api.add_replica(ctxt, replica)
         LOG.info("Replica created: %s", replica.id)
@@ -1327,8 +1402,7 @@ class ConductorServerEndpoint(object):
             instance_osmorphing_minion_pool_mappings=None,
             skip_os_morphing=False, user_scripts=None):
         replica = self._get_replica(ctxt, replica_id, include_task_info=True)
-        self._check_reservation_for_transfer(
-            replica, licensing_client.RESERVATION_TYPE_REPLICA)
+        self._check_reservation_for_replica(replica)
         self._check_replica_running_executions(ctxt, replica)
         self._check_valid_replica_tasks_execution(replica, force)
         user_scripts = user_scripts or replica.user_scripts
@@ -1809,8 +1883,7 @@ class ConductorServerEndpoint(object):
         migration.instance_osmorphing_minion_pool_mappings = (
             instance_osmorphing_minion_pool_mappings)
 
-        self._check_create_reservation_for_transfer(
-            migration, licensing_client.RESERVATION_TYPE_MIGRATION)
+        self._create_reservation_for_replica(migration)
 
         self._check_minion_pools_for_action(ctxt, migration)
 
@@ -2386,6 +2459,57 @@ class ConductorServerEndpoint(object):
                 "No new tasks were started for execution '%s' following "
                 "state advancement after cancellation.", execution.id)
 
+    def _update_reservation_fulfillment_for_execution(self, ctxt, execution):
+        """ Updates the reservation fulfillment status for the parent
+        transfer action of the given execution based on its type.
+
+        Replica transfers are marked as fulfilled as soon as a Replica
+        Execution is successfully completed.
+        Live migration transfers are marked as fulfilled as soon as they
+        are deployed for the first (and only) time.
+        """
+        if execution.type not in (
+                constants.EXECUTION_TYPE_REPLICA_EXECUTION,
+                constants.EXECUTION_TYPE_REPLICA_DEPLOY):
+            LOG.debug(
+                f"Skipping setting reservation fulfillment for execution "
+                f"'{execution.id}' of type '{execution.type}'.")
+            return
+
+        if execution.type not in (
+                constants.EXECUTION_TYPE_REPLICA_EXECUTION,
+                constants.EXECUTION_TYPE_REPLICA_DEPLOY):
+            LOG.debug(
+                f"Skipping setting replica fulfillment for execution "
+                f"'{execution.id}' of type '{execution.type}'.")
+            return
+
+        transfer_action = execution.action
+        transfer_id = transfer_action.base_id
+        if transfer_action.type == constants.TRANSFER_ACTION_TYPE_MIGRATION:
+            deployment = self._get_migration(ctxt, transfer_id)
+            transfer_id = deployment.replica_id
+            transfer_action = self._get_replica(
+                ctxt, transfer_id, include_task_info=False)
+        else:
+            transfer_action = self._get_replica(
+                ctxt, execution.action_id, include_task_info=False)
+
+        if transfer_action.scenario == constants.REPLICA_SCENARIO_REPLICA and (
+                execution.type == constants.EXECUTION_TYPE_REPLICA_EXECUTION):
+            self._check_mark_reservation_fulfilled(
+                transfer_action, must_unfulfilled=False)
+        elif transfer_action.scenario == constants.REPLICA_SCENARIO_LIVE_MIGRATION and (
+                execution.type == constants.EXECUTION_TYPE_REPLICA_DEPLOY):
+            self._check_mark_reservation_fulfilled(
+                transfer_action, must_unfulfilled=False)
+        else:
+            LOG.debug(
+                f"Skipping setting replica fulfillment for execution "
+                f"'{execution.id}' of type '{execution.type}' on parent"
+                f"action {transfer_id} of scenario type "
+                f"{transfer_action.scenatio}.")
+
     def _set_tasks_execution_status(
             self, ctxt, execution, new_execution_status):
         previous_execution_status = execution.status
@@ -2396,6 +2520,10 @@ class ConductorServerEndpoint(object):
              "action": execution.action_id,
              "old_status": previous_execution_status})
 
+        if new_execution_status == constants.EXECUTION_STATUS_COMPLETED:
+            self._update_reservation_fulfillment_for_execution(
+                ctxt, execution)
+
         if new_execution_status in constants.FINALIZED_EXECUTION_STATUSES:
             # NOTE(aznashwan): because the taskflow flows within the minion
             # manager cannot [currently] be cancelled and are destined to

+ 6 - 0
coriolis/exception.py

@@ -137,6 +137,12 @@ class Conflict(CoriolisException):
     safe = True
 
 
+class LicensingException(Conflict):
+    message = _("Licensing exception occurred")
+    code = 409
+    safe = True
+
+
 class AdminRequired(NotAuthorized):
     message = _("User does not have admin privileges")
 

+ 6 - 0
coriolis/licensing/client.py

@@ -214,6 +214,12 @@ class LicensingClient(object):
             "/reservations/%s/refresh" % reservation_id, None,
             response_key="reservation")
 
+    def mark_reservation_fulfilled(self, reservation_id):
+        """ Marks the given reservation as fulfilled. """
+        return self._post(
+            "/reservations/%s/fulfill" % reservation_id, None,
+            response_key="reservation")
+
     def delete_reservation(self, reservation_id, raise_on_404=False):
         """ Deletes a reservation by its ID.
         Unless `raise_on_404` is set, ignores not found reservations.