Jelajahi Sumber

Add `include_task_info` to action DB fetching operations

Adds `include_task_info` option to all DB API action fetching methods that
will control the loading of `info` column for a `BaseTransferAction`.
Also adds `to_dict` option that turns the model objects into dictionaries
for GET requests. This is necessary because serialization of model objects
with deferred columns is not possible without an active DB session.
Daniel Vincze 4 tahun lalu
induk
melakukan
f80ce70bf3
3 mengubah file dengan 108 tambahan dan 55 penghapusan
  1. 51 29
      coriolis/conductor/rpc/server.py
  2. 49 19
      coriolis/db/api.py
  3. 8 7
      coriolis/db/sqlalchemy/models.py

+ 51 - 29
coriolis/conductor/rpc/server.py

@@ -813,7 +813,7 @@ class ConductorServerEndpoint(object):
 
     @replica_synchronized
     def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
-        replica = self._get_replica(ctxt, replica_id)
+        replica = self._get_replica(ctxt, replica_id, include_task_info=True)
         self._check_reservation_for_transfer(
             replica, licensing_client.RESERVATION_TYPE_REPLICA)
         self._check_replica_running_executions(ctxt, replica)
@@ -1029,14 +1029,18 @@ class ConductorServerEndpoint(object):
 
     @replica_synchronized
     def get_replica_tasks_executions(self, ctxt, replica_id,
-                                     include_tasks=False):
+                                     include_tasks=False,
+                                     include_task_info=False):
         return db_api.get_replica_tasks_executions(
-            ctxt, replica_id, include_tasks)
+            ctxt, replica_id, include_tasks,
+            include_task_info=include_task_info, to_dict=True)
 
     @tasks_execution_synchronized
-    def get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
+    def get_replica_tasks_execution(self, ctxt, replica_id, execution_id,
+                                    include_task_info=False):
         return self._get_replica_tasks_execution(
-            ctxt, replica_id, execution_id)
+            ctxt, replica_id, execution_id,
+            include_task_info=include_task_info, to_dict=True)
 
     @tasks_execution_synchronized
     def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
@@ -1066,21 +1070,28 @@ class ConductorServerEndpoint(object):
                     replica_id))
         self._cancel_tasks_execution(ctxt, execution, force=force)
 
-    def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
+    def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id,
+                                     include_task_info=False, to_dict=False):
         execution = db_api.get_replica_tasks_execution(
-            ctxt, replica_id, execution_id)
+            ctxt, replica_id, execution_id,
+            include_task_info=include_task_info, to_dict=to_dict)
         if not execution:
             raise exception.NotFound(
                 "Execution with ID '%s' for Replica '%s' not found." % (
                     execution_id, replica_id))
         return execution
 
-    def get_replicas(self, ctxt, include_tasks_executions=False):
-        return db_api.get_replicas(ctxt, include_tasks_executions)
+    def get_replicas(self, ctxt, include_tasks_executions=False,
+                     include_task_info=False):
+        return db_api.get_replicas(
+            ctxt, include_tasks_executions,
+            include_task_info=include_task_info, to_dict=True)
 
     @replica_synchronized
-    def get_replica(self, ctxt, replica_id):
-        return self._get_replica(ctxt, replica_id)
+    def get_replica(self, ctxt, replica_id, include_task_info=False):
+        return self._get_replica(
+            ctxt, replica_id,
+            include_task_info=include_task_info, to_dict=True)
 
     @replica_synchronized
     def delete_replica(self, ctxt, replica_id):
@@ -1091,7 +1102,7 @@ class ConductorServerEndpoint(object):
 
     @replica_synchronized
     def delete_replica_disks(self, ctxt, replica_id):
-        replica = self._get_replica(ctxt, replica_id)
+        replica = self._get_replica(ctxt, replica_id, include_task_info=True)
         self._check_replica_running_executions(ctxt, replica)
 
         execution = models.TasksExecution()
@@ -1196,23 +1207,28 @@ class ConductorServerEndpoint(object):
         LOG.info("Replica created: %s", replica.id)
         return self.get_replica(ctxt, replica.id)
 
-    def _get_replica(self, ctxt, replica_id):
-        replica = db_api.get_replica(ctxt, replica_id)
+    def _get_replica(self, ctxt, replica_id, include_task_info=False,
+                     to_dict=False):
+        replica = db_api.get_replica(
+            ctxt, replica_id, include_task_info=include_task_info,
+            to_dict=to_dict)
         if not replica:
             raise exception.NotFound(
                 "Replica with ID '%s' not found." % replica_id)
         return replica
 
     def get_migrations(self, ctxt, include_tasks,
-                       include_info=False):
+                       include_task_info=False):
         return db_api.get_migrations(
             ctxt, include_tasks,
-            include_info=include_info)
+            include_task_info=include_task_info,
+            to_dict=True)
 
     @migration_synchronized
-    def get_migration(self, ctxt, migration_id):
-        # the default serialization mechanism enforces a max_depth of 3
-        return utils.to_dict(self._get_migration(ctxt, migration_id))
+    def get_migration(self, ctxt, migration_id, include_task_info=False):
+        return self._get_migration(
+            ctxt, migration_id, include_task_info=include_task_info,
+            to_dict=True)
 
     @staticmethod
     def _check_running_replica_migrations(ctxt, replica_id):
@@ -1265,7 +1281,7 @@ class ConductorServerEndpoint(object):
                                  instance_osmorphing_minion_pool_mappings=None,
                                  skip_os_morphing=False,
                                  user_scripts=None):
-        replica = self._get_replica(ctxt, replica_id)
+        replica = self._get_replica(ctxt, replica_id, include_task_info=True)
         self._check_reservation_for_transfer(
             replica, licensing_client.RESERVATION_TYPE_REPLICA)
         self._check_replica_running_executions(ctxt, replica)
@@ -1610,7 +1626,7 @@ class ConductorServerEndpoint(object):
     @replica_synchronized
     def confirm_replica_minions_allocation(
             self, ctxt, replica_id, minion_machine_allocations):
-        replica = self._get_replica(ctxt, replica_id)
+        replica = self._get_replica(ctxt, replica_id, include_task_info=True)
 
         awaiting_minions_status = (
             constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
@@ -1658,7 +1674,8 @@ class ConductorServerEndpoint(object):
     @migration_synchronized
     def confirm_migration_minions_allocation(
             self, ctxt, migration_id, minion_machine_allocations):
-        migration = self._get_migration(ctxt, migration_id)
+        migration = self._get_migration(
+            ctxt, migration_id, include_task_info=True)
 
         awaiting_minions_status = (
             constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
@@ -2086,8 +2103,11 @@ class ConductorServerEndpoint(object):
 
         return self.get_migration(ctxt, migration.id)
 
-    def _get_migration(self, ctxt, migration_id):
-        migration = db_api.get_migration(ctxt, migration_id)
+    def _get_migration(self, ctxt, migration_id, include_task_info=False,
+                       to_dict=False):
+        migration = db_api.get_migration(
+            ctxt, migration_id, include_task_info=include_task_info,
+            to_dict=to_dict)
         if not migration:
             raise exception.NotFound(
                 "Migration with ID '%s' not found." % migration_id)
@@ -2581,7 +2601,8 @@ class ConductorServerEndpoint(object):
 
         origin = self._get_task_origin(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
-        action = db_api.get_action(ctxt, execution.action_id)
+        action = db_api.get_action(
+            ctxt, execution.action_id, include_task_info=True)
         origin_endpoint = db_api.get_endpoint(
             ctxt, execution.action.origin_endpoint_id)
         destination_endpoint = db_api.get_endpoint(
@@ -3121,7 +3142,7 @@ class ConductorServerEndpoint(object):
                     execution.type] % execution.action_id,
                 external=True):
             action_id = execution.action_id
-            action = db_api.get_action(ctxt, action_id)
+            action = db_api.get_action(ctxt, action_id, include_task_info=True)
 
             updated_task_info = None
             if task_result:
@@ -3139,7 +3160,8 @@ class ConductorServerEndpoint(object):
                     db_api.update_transfer_action_info_for_instance(
                         ctxt, action_id, task.instance, task_result))
             else:
-                action = db_api.get_action(ctxt, action_id)
+                action = db_api.get_action(
+                    ctxt, action_id, include_task_info=True)
                 updated_task_info = action.info[task.instance]
                 LOG.info(
                     "Task '%s' for instance '%s' of transfer action '%s' "
@@ -3339,7 +3361,7 @@ class ConductorServerEndpoint(object):
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
 
         action_id = execution.action_id
-        action = db_api.get_action(ctxt, action_id)
+        action = db_api.get_action(ctxt, action_id, include_task_info=True)
         with lockutils.lock(
                 constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
                     execution.type] % action_id,
@@ -3507,7 +3529,7 @@ class ConductorServerEndpoint(object):
     @replica_synchronized
     def update_replica(
             self, ctxt, replica_id, updated_properties):
-        replica = self._get_replica(ctxt, replica_id)
+        replica = self._get_replica(ctxt, replica_id, include_task_info=True)
 
         minion_pool_fields = [
             "origin_minion_pool_id", "destination_minion_pool_id",

+ 49 - 19
coriolis/db/api.py

@@ -273,27 +273,41 @@ def delete_endpoint(context, endpoint_id):
 
 
 @enginefacade.reader
-def get_replica_tasks_executions(context, replica_id, include_tasks=False):
+def get_replica_tasks_executions(context, replica_id, include_tasks=False,
+                                 include_task_info=False, to_dict=False):
     q = _soft_delete_aware_query(context, models.TasksExecution)
     q = q.join(models.Replica)
+    if include_task_info:
+        q = q.options(orm.joinedload('action').undefer('info'))
     if include_tasks:
         q = _get_tasks_with_details_options(q)
     if is_user_context(context):
         q = q.filter(models.Replica.project_id == context.project_id)
-    return q.filter(
+
+    db_result = q.filter(
         models.Replica.id == replica_id).all()
+    if to_dict:
+        return [e.to_dict() for e in db_result]
+    return db_result
 
 
 @enginefacade.reader
-def get_replica_tasks_execution(context, replica_id, execution_id):
+def get_replica_tasks_execution(context, replica_id, execution_id,
+                                include_task_info=False, to_dict=False):
     q = _soft_delete_aware_query(context, models.TasksExecution).join(
         models.Replica)
+    if include_task_info:
+        q = q.options(orm.joinedload('action').undefer('info'))
     q = _get_tasks_with_details_options(q)
     if is_user_context(context):
         q = q.filter(models.Replica.project_id == context.project_id)
-    return q.filter(
+
+    db_result = q.filter(
         models.Replica.id == replica_id,
         models.TasksExecution.id == execution_id).first()
+    if to_dict:
+        return db_result.to_dict()
+    return db_result
 
 
 @enginefacade.writer
@@ -410,13 +424,13 @@ def _get_replica_with_tasks_executions_options(q):
 @enginefacade.reader
 def get_replicas(context,
                  include_tasks_executions=False,
-                 include_info=False,
-                 to_dict=True):
+                 include_task_info=False,
+                 to_dict=False):
     q = _soft_delete_aware_query(context, models.Replica)
     if include_tasks_executions:
         q = _get_replica_with_tasks_executions_options(q)
-    if include_info is False:
-        q = q.options(orm.defer('info'))
+    if include_task_info:
+        q = q.options(orm.undefer('info'))
     q = q.filter()
     if is_user_context(context):
         q = q.filter(
@@ -425,21 +439,28 @@ def get_replicas(context,
     if to_dict:
         return [
             i.to_dict(
-                include_info=include_info,
+                include_task_info=include_task_info,
                 include_executions=include_tasks_executions)
             for i in db_result]
     return db_result
 
 
 @enginefacade.reader
-def get_replica(context, replica_id):
+def get_replica(context, replica_id, include_task_info=False, to_dict=False):
     q = _soft_delete_aware_query(context, models.Replica)
     q = _get_replica_with_tasks_executions_options(q)
+    if include_task_info:
+        q = q.options(orm.undefer('info'))
     if is_user_context(context):
         q = q.filter(
             models.Replica.project_id == context.project_id)
-    return q.filter(
+
+    replica = q.filter(
         models.Replica.id == replica_id).first()
+    if to_dict:
+        return replica.to_dict(include_task_info=include_task_info)
+
+    return replica
 
 
 @enginefacade.reader
@@ -495,14 +516,14 @@ def get_replica_migrations(context, replica_id):
 
 @enginefacade.reader
 def get_migrations(context, include_tasks=False,
-                   include_info=False, to_dict=True):
+                   include_task_info=False, to_dict=False):
     q = _soft_delete_aware_query(context, models.Migration)
     if include_tasks:
         q = _get_migration_task_query_options(q)
     else:
         q = q.options(orm.joinedload("executions"))
-    if include_info is False:
-        q = q.options(orm.defer('info'))
+    if include_task_info:
+        q = q.options(orm.undefer('info'))
 
     args = {}
     if is_user_context(context):
@@ -510,7 +531,7 @@ def get_migrations(context, include_tasks=False,
     result = q.filter_by(**args).all()
     if to_dict:
         return [i.to_dict(
-            include_info=include_info,
+            include_task_info=include_task_info,
             include_tasks=include_tasks) for i in result]
     return result
 
@@ -537,13 +558,20 @@ def _get_migration_task_query_options(query):
 
 
 @enginefacade.reader
-def get_migration(context, migration_id):
+def get_migration(context, migration_id, include_task_info=False,
+                  to_dict=False):
     q = _soft_delete_aware_query(context, models.Migration)
     q = _get_migration_task_query_options(q)
+    if include_task_info:
+        q = q.options(orm.undefer('info'))
     args = {"id": migration_id}
     if is_user_context(context):
         args["project_id"] = context.project_id
-    return q.filter_by(**args).first()
+    db_result = q.filter_by(**args).first()
+
+    if to_dict:
+        return db_result.to_dict(include_task_info=include_task_info)
+    return db_result
 
 
 @enginefacade.writer
@@ -581,9 +609,11 @@ def set_execution_status(
 
 
 @enginefacade.reader
-def get_action(context, action_id):
+def get_action(context, action_id, include_task_info=False):
     action = _soft_delete_aware_query(
         context, models.BaseTransferAction)
+    if include_task_info:
+        action = action.options(orm.undefer('info'))
     if is_user_context(context):
         action = action.filter(
             models.BaseTransferAction.project_id == context.project_id)
@@ -609,7 +639,7 @@ def update_transfer_action_info_for_instance(
     Returns the updated value.
     Sub-fields of the dict already in the info will get overwritten entirely!
     """
-    action = get_action(context, action_id)
+    action = get_action(context, action_id, include_task_info=True)
     if not new_instance_info:
         LOG.debug(
             "No new info provided for action '%s' and instance '%s'. "

+ 8 - 7
coriolis/db/sqlalchemy/models.py

@@ -256,7 +256,7 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
         sqlalchemy.String(255), nullable=False,
         default=lambda: constants.EXECUTION_STATUS_UNEXECUTED)
     reservation_id = sqlalchemy.Column(sqlalchemy.String(36), nullable=True)
-    info = sqlalchemy.Column(types.Bson, nullable=False)
+    info = orm.deferred(sqlalchemy.Column(types.Bson, nullable=False))
     notes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     origin_endpoint_id = sqlalchemy.Column(
         sqlalchemy.String(36),
@@ -283,7 +283,7 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
         'polymorphic_on': type,
     }
 
-    def to_dict(self, include_info=True, include_executions=True):
+    def to_dict(self, include_task_info=True, include_executions=True):
         result = {
             "base_id": self.base_id,
             "user_id": self.user_id,
@@ -314,7 +314,7 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
         if include_executions:
             for ex in self.executions:
                 result["executions"].append(ex.to_dict())
-        if include_info:
+        if include_task_info:
             result["info"] = self.info
         return result
 
@@ -331,9 +331,9 @@ class Replica(BaseTransferAction):
         'polymorphic_identity': 'replica',
     }
 
-    def to_dict(self, include_info=True, include_executions=True):
+    def to_dict(self, include_task_info=True, include_executions=True):
         base = super(Replica, self).to_dict(
-            include_info=include_info,
+            include_task_info=include_task_info,
             include_executions=include_executions)
         base.update({"id": self.id})
         return base
@@ -360,9 +360,10 @@ class Migration(BaseTransferAction):
         'polymorphic_identity': 'migration',
     }
 
-    def to_dict(self, include_info=True, include_tasks=True):
+    def to_dict(self, include_task_info=True, include_tasks=True):
         base = super(Migration, self).to_dict(
-            include_info=include_info, include_executions=include_tasks)
+            include_task_info=include_task_info,
+            include_executions=include_tasks)
         base.update({
             "id": self.id,
             "replica_id": self.replica_id,