ソースを参照

api: add pagination for transfer executions

At the moment, listing endpoint instances is the only Coriolis
API that supports pagination. For this reason, Coriolis clients
often retrieve much more database records than needed, leading to
poor performance.

We're now adding pagination to other Coriolis APIs, starting with
the transfer executions. A transfer can have a large amount of
executions, especially in case of cron jobs.

New optional parameters:
* limit - the maximum number of entries to retrieve
* marker - the last seen id, will not be retrieved again

The pagination will be performed on the db API side, leveraging the
"utils.paginate_query" helper from oslo_db.
Lucian Petrut 2 週間 前
コミット
28658596f9

+ 5 - 1
coriolis/api/v1/transfer_tasks_executions.py

@@ -2,6 +2,7 @@
 # All Rights Reserved.
 
 from coriolis.api.v1.views import transfer_tasks_execution_view
+from coriolis.api import common
 from coriolis.api import wsgi as api_wsgi
 from coriolis import exception
 from coriolis.policies import transfer_tasks_executions as executions_policies
@@ -31,9 +32,12 @@ class TransferTasksExecutionController(api_wsgi.Controller):
         context.can(
             executions_policies.get_transfer_executions_policy_label("list"))
 
+        marker, limit = common.get_paging_params(req)
+
         return transfer_tasks_execution_view.collection(
             self._transfer_tasks_execution_api.get_executions(
-                context, transfer_id, include_tasks=False))
+                context, transfer_id, include_tasks=False,
+                marker=marker, limit=limit))
 
     def detail(self, req, transfer_id):
         context = req.environ["coriolis.context"]

+ 6 - 2
coriolis/conductor/rpc/client.py

@@ -143,11 +143,15 @@ class ConductorClient(rpc.BaseRPCClient):
             shutdown_instances=shutdown_instances, auto_deploy=auto_deploy)
 
     def get_transfer_tasks_executions(self, ctxt, transfer_id,
-                                      include_tasks=False):
+                                      include_tasks=False,
+                                      marker=None,
+                                      limit=None):
         return self._call(
             ctxt, 'get_transfer_tasks_executions',
             transfer_id=transfer_id,
-            include_tasks=include_tasks)
+            include_tasks=include_tasks,
+            marker=marker,
+            limit=limit)
 
     def get_transfer_tasks_execution(self, ctxt, transfer_id, execution_id,
                                      include_task_info=False):

+ 7 - 2
coriolis/conductor/rpc/server.py

@@ -1152,10 +1152,15 @@ class ConductorServerEndpoint(object):
     @transfer_synchronized
     def get_transfer_tasks_executions(self, ctxt, transfer_id,
                                       include_tasks=False,
-                                      include_task_info=False):
+                                      include_task_info=False,
+                                      marker=None,
+                                      limit=None):
         return db_api.get_transfer_tasks_executions(
             ctxt, transfer_id, include_tasks,
-            include_task_info=include_task_info, to_dict=True)
+            include_task_info=include_task_info,
+            marker=marker,
+            limit=limit,
+            to_dict=True)
 
     @tasks_execution_synchronized
     def get_transfer_tasks_execution(self, ctxt, transfer_id, execution_id,

+ 20 - 3
coriolis/db/api.py

@@ -7,6 +7,7 @@ from oslo_config import cfg
 from oslo_db import api as db_api
 from oslo_db import options as db_options
 from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import utils as sqlalchemyutils
 from oslo_log import log as logging
 from oslo_utils import timeutils
 from sqlalchemy import func
@@ -275,7 +276,10 @@ def delete_endpoint(context, endpoint_id):
 
 @enginefacade.reader
 def get_transfer_tasks_executions(context, transfer_id, include_tasks=False,
-                                  include_task_info=False, to_dict=False):
+                                  include_task_info=False,
+                                  marker=None,
+                                  limit=None,
+                                  to_dict=False):
     q = _soft_delete_aware_query(context, models.TasksExecution)
     q = q.join(models.Transfer)
     if include_task_info:
@@ -285,8 +289,21 @@ def get_transfer_tasks_executions(context, transfer_id, include_tasks=False,
     if is_user_context(context):
         q = q.filter(models.Transfer.project_id == context.project_id)
 
-    db_result = q.filter(
-        models.Transfer.id == transfer_id).all()
+    q = q.filter(models.Transfer.id == transfer_id)
+
+    if marker:
+        try:
+            marker = get_transfer_tasks_execution(
+                context, transfer_id, marker)
+        except exception.NotFound:
+            raise exception.MarkerNotFound(marker=marker)
+
+    if marker or limit:
+        q = sqlalchemy.paginate_query(
+            q, models.TasksExecution, limit,
+            sort_keys=['id'], marker=marker)
+
+    db_result = q.all()
     if to_dict:
         return [e.to_dict() for e in db_result]
     return db_result

+ 5 - 0
coriolis/exception.py

@@ -289,6 +289,11 @@ class NotFound(CoriolisException):
     safe = True
 
 
+class MarkerNotFound(NotFound):
+    message = _(
+        "Could not find database record "
+        "identified by marker: %(marker)s")
+
 class RegionNotFound(NotFound):
     message = _("The specified Coriolis region(s) could not be found.")
 

+ 3 - 2
coriolis/transfer_tasks_executions/api.py

@@ -20,9 +20,10 @@ class API(object):
         self._rpc_client.cancel_transfer_tasks_execution(
             ctxt, transfer_id, execution_id, force)
 
-    def get_executions(self, ctxt, transfer_id, include_tasks=False):
+    def get_executions(self, ctxt, transfer_id, include_tasks=False,
+                       marker=None, limit=None):
         return self._rpc_client.get_transfer_tasks_executions(
-            ctxt, transfer_id, include_tasks)
+            ctxt, transfer_id, include_tasks, marker, limit)
 
     def get_execution(self, ctxt, transfer_id, execution_id):
         return self._rpc_client.get_transfer_tasks_execution(