Просмотр исходного кода

Optimize transfer execution lookups

Avoid loading full transfer execution histories and enable efficient
DB-side filtering for execution lookups.

Signed-off-by: Mihaela Balutoiu <mbalutoiu@cloudbasesolutions.com>
Mihaela Balutoiu 1 неделя назад
Родитель
Сommit
2b4c293f27

+ 42 - 37
coriolis/conductor/rpc/server.py

@@ -1416,34 +1416,43 @@ class ConductorServerEndpoint(object):
                     "Transfer '%s' is currently being deployed" % transfer_id)
 
     @staticmethod
-    def _check_running_executions(action):
-        running_executions = [
-            e.id for e in action.executions
-            if e.status in constants.ACTIVE_EXECUTION_STATUSES]
+    def _check_running_executions(ctxt, transfer):
+        running_executions = db_api.get_transfer_tasks_executions(
+            ctxt, transfer.id,
+            filters={"status": constants.ACTIVE_EXECUTION_STATUSES})
         if running_executions:
             raise exception.InvalidActionTasksExecutionState(
                 "Another tasks execution is in progress: %s" % (
-                    running_executions))
+                    [e.id for e in running_executions]))
 
     def _check_transfer_running_executions(self, ctxt, transfer):
-        self._check_running_executions(transfer)
+        self._check_running_executions(ctxt, transfer)
         self._check_running_transfer_deployments(ctxt, transfer.id)
 
     @staticmethod
-    def _check_valid_transfer_tasks_execution(transfer, force=False):
-        sorted_executions = sorted(
-            transfer.executions, key=lambda e: e.number, reverse=True)
-        if not sorted_executions and not force:
-            raise exception.InvalidTransferState(
-                "The Transfer has never been executed.")
+    def _get_last_transfer_execution(ctxt, transfer_id, status=None,
+                                     execution_type=None):
+        filters = {}
+        if status is not None:
+            filters["status"] = status
+        if execution_type is not None:
+            filters["type"] = execution_type
+        executions = db_api.get_transfer_tasks_executions(
+            ctxt, transfer_id, filters=filters,
+            sort_keys=["number"], sort_dirs=["desc"], limit=1)
+        return executions[0] if executions else None
+
+    def _check_transfer_deploy_ability(self, ctxt, transfer, force=False):
+        if force:
+            return
 
-        if not [e for e in sorted_executions
-                if e.type == constants.EXECUTION_TYPE_TRANSFER_EXECUTION and (
-                    e.status == constants.EXECUTION_STATUS_COMPLETED)]:
-            if not force:
-                raise exception.InvalidTransferState(
-                    "A transfer must have been executed successfully at least "
-                    "once in order to be deployed")
+        completed_execution = self._get_last_transfer_execution(
+            ctxt, transfer.id, status=constants.EXECUTION_STATUS_COMPLETED,
+            execution_type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION)
+        if completed_execution is None:
+            raise exception.InvalidTransferState(
+                "A transfer must have been executed successfully at least "
+                "once in order to be deployed")
 
     def _get_provider_types(self, ctxt, endpoint):
         provider_types = self.get_available_providers(ctxt).get(endpoint.type)
@@ -1454,7 +1463,7 @@ class ConductorServerEndpoint(object):
 
     def _validate_deployment_inputs(self, ctxt, deployment, transfer, force):
         self._check_transfer_running_executions(ctxt, transfer)
-        self._check_valid_transfer_tasks_execution(transfer, force)
+        self._check_transfer_deploy_ability(ctxt, transfer, force)
         for instance, info in transfer.info.items():
             if not info.get("volumes_info"):
                 raise exception.InvalidTransferState(
@@ -1874,18 +1883,6 @@ class ConductorServerEndpoint(object):
             db_api.update_transfer_action_info_for_instance(
                 ctxt, action.id, instance, action.info[instance])
 
-    def _get_last_execution_for_transfer(self, ctxt, transfer, requery=False):
-        if requery:
-            transfer = self._get_transfer(ctxt, transfer.id)
-        last_transfer_execution = None
-        if not transfer.executions:
-            raise exception.InvalidTransferState(
-                "Transfer with ID '%s' has no existing Trasnfer "
-                "executions." % transfer.id)
-        last_transfer_execution = sorted(
-            transfer.executions, key=lambda e: e.number)[-1]
-        return last_transfer_execution
-
     def _get_execution_for_deployment(self, ctxt, deployment, requery=False,
                                       raise_on_executions_empty=True):
         if requery:
@@ -1917,8 +1914,12 @@ class ConductorServerEndpoint(object):
                 "have minion machines allocated for it." % (
                     transfer.last_execution_status, awaiting_minions_status))
 
-        last_transfer_execution = self._get_last_execution_for_transfer(
-            ctxt, transfer, requery=False)
+        last_transfer_execution = self._get_last_transfer_execution(
+            ctxt, transfer.id)
+        if last_transfer_execution is None:
+            raise exception.InvalidTransferState(
+                "Transfer with ID '%s' has no existing Transfer "
+                "executions." % transfer.id)
         self._update_task_info_for_minion_allocations(
             ctxt, transfer, minion_machine_allocations)
 
@@ -1939,8 +1940,12 @@ class ConductorServerEndpoint(object):
                 "have minion machines allocations fail for it." % (
                     transfer.last_execution_status, awaiting_minions_status))
 
-        last_transfer_execution = self._get_last_execution_for_transfer(
-            ctxt, transfer, requery=False)
+        last_transfer_execution = self._get_last_transfer_execution(
+            ctxt, transfer.id)
+        if last_transfer_execution is None:
+            raise exception.InvalidTransferState(
+                "Transfer with ID '%s' has no existing Transfer "
+                "executions." % transfer.id)
         LOG.warn(
             "Error occurred while allocating minion machines for Transfer "
             "'%s'. Cancelling the current Transfer Execution ('%s'). "
@@ -3520,7 +3525,7 @@ class ConductorServerEndpoint(object):
             self._check_minion_pools_for_action(ctxt, dummy)
 
         self._check_transfer_running_executions(ctxt, transfer)
-        self._check_valid_transfer_tasks_execution(transfer, force=True)
+        self._check_transfer_deploy_ability(ctxt, transfer, force=True)
         if updated_properties.get('user_scripts'):
             transfer.user_scripts = updated_properties['user_scripts']
         execution = models.TasksExecution()

+ 13 - 3
coriolis/db/api.py

@@ -298,7 +298,16 @@ def get_transfer_tasks_executions(context, transfer_id, include_tasks=False,
     filters = copy.deepcopy(filters or {})
     if "status" in filters:
         status = filters.pop("status")
-        q = q.filter(models.TasksExecution.status == status)
+        if isinstance(status, (list, tuple, set)):
+            q = q.filter(models.TasksExecution.status.in_(status))
+        else:
+            q = q.filter(models.TasksExecution.status == status)
+    if "type" in filters:
+        execution_type = filters.pop("type")
+        if isinstance(execution_type, (list, tuple, set)):
+            q = q.filter(models.TasksExecution.type.in_(execution_type))
+        else:
+            q = q.filter(models.TasksExecution.type == execution_type)
     if filters:
         raise ValueError("Unsupported filters: %s" % filters)
 
@@ -520,7 +529,6 @@ def get_transfer(context, transfer_id,
                  include_task_info=False,
                  to_dict=False):
     q = _soft_delete_aware_query(context, models.Transfer)
-    q = _get_transfer_with_tasks_executions_options(q)
     if include_task_info:
         q = q.options(orm.undefer('info'))
     if transfer_scenario:
@@ -533,7 +541,9 @@ def get_transfer(context, transfer_id,
     transfer = q.filter(
         models.Transfer.id == transfer_id).first()
     if to_dict and transfer is not None:
-        return transfer.to_dict(include_task_info=include_task_info)
+        return transfer.to_dict(
+            include_task_info=include_task_info,
+            include_executions=False)
 
     return transfer
 

+ 116 - 114
coriolis/tests/conductor/rpc/test_server.py

@@ -2229,26 +2229,25 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             mock.sentinel.transfer_id,
         )
 
-    def test_check_running_executions(self):
-        execution_1 = mock.Mock()
-        execution_2 = mock.Mock()
-        action = mock.Mock()
-        execution_1.status = constants.EXECUTION_STATUS_COMPLETED
-        execution_2.status = constants.EXECUTION_STATUS_COMPLETED
-        action.executions = [execution_1, execution_2]
-        self.server._check_running_executions(action)
-
-    def test_check_running_executions_invalid_state(self):
-        execution_1 = mock.Mock()
-        execution_2 = mock.Mock()
-        action = mock.Mock()
-        execution_1.status = constants.EXECUTION_STATUS_COMPLETED
-        execution_2.status = constants.EXECUTION_STATUS_RUNNING
-        action.executions = [execution_1, execution_2]
+    @mock.patch.object(db_api, 'get_transfer_tasks_executions')
+    def test_check_running_executions(self, mock_get_executions):
+        transfer = mock.Mock()
+        mock_get_executions.return_value = []
+        self.server._check_running_executions(
+            mock.sentinel.context, transfer)
+        mock_get_executions.assert_called_once_with(
+            mock.sentinel.context, transfer.id,
+            filters={"status": constants.ACTIVE_EXECUTION_STATUSES})
+
+    @mock.patch.object(db_api, 'get_transfer_tasks_executions')
+    def test_check_running_executions_invalid_state(self, mock_get_executions):
+        transfer = mock.Mock()
+        mock_get_executions.return_value = [mock.Mock()]
         self.assertRaises(
             exception.InvalidActionTasksExecutionState,
             self.server._check_running_executions,
-            action
+            mock.sentinel.context,
+            transfer
         )
 
     @mock.patch.object(server.ConductorServerEndpoint,
@@ -2266,62 +2265,99 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             transfer
         )
 
-        mock_check_running_executions.assert_called_once_with(transfer)
+        mock_check_running_executions.assert_called_once_with(
+            mock.sentinel.context, transfer)
         mock_check_running_transfer_deployments.assert_called_once_with(
             mock.sentinel.context,
             transfer.id
         )
 
-    def test_check_valid_transfer_tasks_execution(self):
-        execution1 = mock.Mock(
-            number=1,
-            type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION,
-            status=constants.EXECUTION_STATUS_COMPLETED,
-        )
-        execution2 = mock.Mock(
-            number=2,
-            type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION,
-            status=constants.EXECUTION_STATUS_COMPLETED,
-        )
-        mock_transfer = mock.Mock(
-            executions=[execution1, execution2]
-        )
-        self.server._check_valid_transfer_tasks_execution(
-            mock_transfer
-        )
+    @mock.patch.object(server.ConductorServerEndpoint,
+                       '_get_last_transfer_execution')
+    def test_check_transfer_deploy_ability(self, mock_get_last):
+        mock_transfer = mock.Mock()
 
-        # raises exception if all executions are incomplete
-        execution1.status = constants.EXECUTION_STATUS_UNEXECUTED
-        execution2.status = constants.EXECUTION_STATUS_UNEXECUTED
+        # doesn't raise if a completed transfer execution exists
+        mock_get_last.return_value = mock.Mock()
+        self.server._check_transfer_deploy_ability(
+            mock.sentinel.context, mock_transfer)
+        mock_get_last.assert_called_once_with(
+            mock.sentinel.context, mock_transfer.id,
+            status=constants.EXECUTION_STATUS_COMPLETED,
+            execution_type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION)
 
+        # raises if no completed execution exists
+        mock_get_last.return_value = None
         self.assertRaises(
             exception.InvalidTransferState,
-            self.server._check_valid_transfer_tasks_execution,
+            self.server._check_transfer_deploy_ability,
+            mock.sentinel.context,
             mock_transfer
         )
 
-        # doesn't raise exception if all executions are incomplete
-        # and is forced
-        self.server._check_valid_transfer_tasks_execution(
-            mock_transfer,
-            True
-        )
-
-        # doesn't raise exception if only one execution is completed
-        execution1.status = constants.EXECUTION_STATUS_COMPLETED
-        execution2.status = constants.EXECUTION_STATUS_UNEXECUTED
+        # doesn't raise when forced, without querying executions
+        mock_get_last.reset_mock()
+        self.server._check_transfer_deploy_ability(
+            mock.sentinel.context, mock_transfer, True)
+        mock_get_last.assert_not_called()
 
-        self.server._check_valid_transfer_tasks_execution(
-            mock_transfer
-        )
-
-        mock_transfer.executions = []
+    @mock.patch.object(db_api, 'get_transfer_tasks_executions')
+    def test_get_last_transfer_execution(self, mock_get_executions):
+        execution = mock.Mock()
+        mock_get_executions.return_value = [execution]
 
-        self.assertRaises(
-            exception.InvalidTransferState,
-            self.server._check_valid_transfer_tasks_execution,
-            mock_transfer
-        )
+        # with a status and type filter (e.g. last completed/deployable
+        # transfer execution)
+        result = self.server._get_last_transfer_execution(
+            mock.sentinel.context, mock.sentinel.transfer_id,
+            status=constants.EXECUTION_STATUS_COMPLETED,
+            execution_type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION)
+        self.assertEqual(execution, result)
+        mock_get_executions.assert_called_once_with(
+            mock.sentinel.context, mock.sentinel.transfer_id,
+            filters={
+                "status": constants.EXECUTION_STATUS_COMPLETED,
+                "type": constants.EXECUTION_TYPE_TRANSFER_EXECUTION,
+            },
+            sort_keys=["number"], sort_dirs=["desc"], limit=1)
+
+        # with only a status filter (type not constrained)
+        mock_get_executions.reset_mock()
+        result = self.server._get_last_transfer_execution(
+            mock.sentinel.context, mock.sentinel.transfer_id,
+            status=constants.EXECUTION_STATUS_COMPLETED)
+        self.assertEqual(execution, result)
+        mock_get_executions.assert_called_once_with(
+            mock.sentinel.context, mock.sentinel.transfer_id,
+            filters={"status": constants.EXECUTION_STATUS_COMPLETED},
+            sort_keys=["number"], sort_dirs=["desc"], limit=1)
+
+        # with only a type filter (status not constrained)
+        mock_get_executions.reset_mock()
+        result = self.server._get_last_transfer_execution(
+            mock.sentinel.context, mock.sentinel.transfer_id,
+            execution_type=constants.EXECUTION_TYPE_TRANSFER_EXECUTION)
+        self.assertEqual(execution, result)
+        mock_get_executions.assert_called_once_with(
+            mock.sentinel.context, mock.sentinel.transfer_id,
+            filters={"type": constants.EXECUTION_TYPE_TRANSFER_EXECUTION},
+            sort_keys=["number"], sort_dirs=["desc"], limit=1)
+
+        # without any filter (latest execution of any status/type)
+        mock_get_executions.reset_mock()
+        result = self.server._get_last_transfer_execution(
+            mock.sentinel.context, mock.sentinel.transfer_id)
+        self.assertEqual(execution, result)
+        mock_get_executions.assert_called_once_with(
+            mock.sentinel.context, mock.sentinel.transfer_id,
+            filters={},
+            sort_keys=["number"], sort_dirs=["desc"], limit=1)
+
+        # returns None when no execution matches
+        mock_get_executions.return_value = []
+        result = self.server._get_last_transfer_execution(
+            mock.sentinel.context, mock.sentinel.transfer_id)
+        self.assertIsNone(result)
 
     @mock.patch.object(server.ConductorServerEndpoint,
                        "get_available_providers")
@@ -2714,39 +2750,6 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             expected_action_info[mock.sentinel.instance2]
         )
 
-    @mock.patch.object(server.ConductorServerEndpoint, '_get_transfer')
-    def test_get_last_execution_for_transfer(
-        self,
-        mock_get_transfer
-    ):
-        transfer = mock.Mock()
-        transfer.id = mock.sentinel.id
-        execution1 = mock.Mock(id=mock.sentinel.execution_id1, number=1)
-        execution2 = mock.Mock(id=mock.sentinel.execution_id2, number=3)
-        execution3 = mock.Mock(id=mock.sentinel.execution_id3, number=2)
-        transfer.executions = [execution1, execution2, execution3]
-        mock_get_transfer.return_value = transfer
-        result = self.server._get_last_execution_for_transfer(
-            mock.sentinel.context,
-            transfer,
-            requery=False
-        )
-        self.assertEqual(
-            execution2,
-            result
-        )
-        mock_get_transfer.assert_not_called()
-        transfer.executions = None
-        self.assertRaises(
-            exception.InvalidTransferState,
-            self.server._get_last_execution_for_transfer,
-            mock.sentinel.context,
-            transfer,
-            requery=True
-        )
-        mock_get_transfer.assert_called_once_with(
-            mock.sentinel.context, mock.sentinel.id)
-
     @mock.patch.object(server.ConductorServerEndpoint, '_get_deployment')
     def test_get_execution_for_deployment(
         self,
@@ -2792,12 +2795,12 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
     @mock.patch.object(server.ConductorServerEndpoint,
                        '_update_task_info_for_minion_allocations')
     @mock.patch.object(server.ConductorServerEndpoint,
-                       '_get_last_execution_for_transfer')
+                       '_get_last_transfer_execution')
     @mock.patch.object(server.ConductorServerEndpoint, '_get_transfer')
     def test_confirm_transfer_minions_allocation(
         self,
         mock_get_transfer,
-        mock_get_last_execution_for_transfer,
+        mock_get_last_transfer_execution,
         mock_update_task_info_for_minion_allocations,
         mock_get_transfer_tasks_execution,
         mock_begin_tasks
@@ -2818,10 +2821,9 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             mock.sentinel.transfer_id,
             include_task_info=True
         )
-        mock_get_last_execution_for_transfer.assert_called_once_with(
+        mock_get_last_transfer_execution.assert_called_once_with(
             mock.sentinel.context,
-            mock_get_transfer.return_value,
-            requery=False
+            mock_get_transfer.return_value.id
         )
         mock_update_task_info_for_minion_allocations.assert_called_once_with(
             mock.sentinel.context,
@@ -2831,7 +2833,7 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
         mock_get_transfer_tasks_execution.assert_called_once_with(
             mock.sentinel.context,
             mock_get_transfer.return_value.id,
-            mock_get_last_execution_for_transfer.return_value.id
+            mock_get_last_transfer_execution.return_value.id
         )
         mock_begin_tasks.assert_called_once_with(
             mock.sentinel.context,
@@ -2844,12 +2846,12 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
     @mock.patch.object(server.ConductorServerEndpoint,
                        '_update_task_info_for_minion_allocations')
     @mock.patch.object(server.ConductorServerEndpoint,
-                       '_get_last_execution_for_transfer')
+                       '_get_last_transfer_execution')
     @mock.patch.object(server.ConductorServerEndpoint, '_get_transfer')
     def test_confirm_transfer_minions_allocation_unexpected_status(
         self,
         mock_get_transfer,
-        mock_get_last_execution_for_transfer,
+        mock_get_last_transfer_execution,
         mock_update_task_info_for_minion_allocations,
         mock_get_transfer_tasks_execution,
         mock_begin_tasks
@@ -2872,7 +2874,7 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             mock.sentinel.transfer_id,
             include_task_info=True
         )
-        mock_get_last_execution_for_transfer.assert_not_called()
+        mock_get_last_transfer_execution.assert_not_called()
         mock_update_task_info_for_minion_allocations.assert_not_called()
         mock_get_transfer_tasks_execution.assert_not_called()
         mock_begin_tasks.assert_not_called()
@@ -2882,12 +2884,12 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
     @mock.patch.object(server.ConductorServerEndpoint,
                        '_cancel_tasks_execution')
     @mock.patch.object(server.ConductorServerEndpoint,
-                       '_get_last_execution_for_transfer')
+                       '_get_last_transfer_execution')
     @mock.patch.object(server.ConductorServerEndpoint, '_get_transfer')
     def test_report_transfer_minions_allocation_error(
         self,
         mock_get_transfer,
-        mock_get_last_execution_for_transfer,
+        mock_get_last_transfer_execution,
         mock_cancel_tasks_execution,
         mock_set_tasks_execution_status
     ):
@@ -2906,19 +2908,18 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             mock.sentinel.context,
             mock.sentinel.transfer_id
         )
-        mock_get_last_execution_for_transfer.assert_called_once_with(
+        mock_get_last_transfer_execution.assert_called_once_with(
             mock.sentinel.context,
-            mock_get_transfer.return_value,
-            requery=False
+            mock_get_transfer.return_value.id
         )
         mock_cancel_tasks_execution.assert_called_once_with(
             mock.sentinel.context,
-            mock_get_last_execution_for_transfer.return_value,
+            mock_get_last_transfer_execution.return_value,
             requery=True
         )
         mock_set_tasks_execution_status.assert_called_once_with(
             mock.sentinel.context,
-            mock_get_last_execution_for_transfer.return_value,
+            mock_get_last_transfer_execution.return_value,
             constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS
         )
 
@@ -2927,12 +2928,12 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
     @mock.patch.object(server.ConductorServerEndpoint,
                        '_cancel_tasks_execution')
     @mock.patch.object(server.ConductorServerEndpoint,
-                       '_get_last_execution_for_transfer')
+                       '_get_last_transfer_execution')
     @mock.patch.object(server.ConductorServerEndpoint, '_get_transfer')
     def test_report_transfer_minions_allocation_error_unexpected_status(
         self,
         mock_get_transfer,
-        mock_get_last_execution_for_transfer,
+        mock_get_last_transfer_execution,
         mock_cancel_tasks_execution,
         mock_set_tasks_execution_status
     ):
@@ -2953,7 +2954,7 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             mock.sentinel.context,
             mock.sentinel.transfer_id
         )
-        mock_get_last_execution_for_transfer.assert_not_called()
+        mock_get_last_transfer_execution.assert_not_called()
         mock_cancel_tasks_execution.assert_not_called()
         mock_set_tasks_execution_status.assert_not_called()
 
@@ -4985,7 +4986,7 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
     @mock.patch.object(utils, "sanitize_task_info")
     @mock.patch.object(models, "TasksExecution")
     @mock.patch.object(server.ConductorServerEndpoint,
-                       "_check_valid_transfer_tasks_execution")
+                       "_check_transfer_deploy_ability")
     @mock.patch.object(server.ConductorServerEndpoint,
                        "_check_transfer_running_executions")
     @mock.patch.object(server.ConductorServerEndpoint,
@@ -5000,7 +5001,7 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
         mock_transfer,
         mock_check_minion_pools_for_action,
         mock_check_transfer_running_executions,
-        mock_check_valid_transfer_tasks_execution,
+        mock_check_transfer_deploy_ability,
         mock_TasksExecution,
         mock_sanitize_task_info,
         mock_create_task,
@@ -5043,7 +5044,8 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
             mock.sentinel.context,
             mock_get_transfer.return_value,
         )
-        mock_check_valid_transfer_tasks_execution.assert_called_once_with(
+        mock_check_transfer_deploy_ability.assert_called_once_with(
+            mock.sentinel.context,
             mock_get_transfer.return_value,
             force=True,
         )

+ 42 - 0
coriolis/tests/db/test_api.py

@@ -613,6 +613,48 @@ class TransferTasksExecutionDBAPITestCase(BaseDBAPITestCase):
             self.context, self.valid_transfer.id)
         self.assertIn(self.valid_tasks_execution, result)
 
+    def test_get_transfer_tasks_executions_single_status_filter(self):
+        result = api.get_transfer_tasks_executions(
+            self.context, self.valid_transfer.id,
+            filters={"status": DEFAULT_EXECUTION_STATUS})
+        self.assertIn(self.valid_tasks_execution, result)
+
+        result = api.get_transfer_tasks_executions(
+            self.context, self.valid_transfer.id,
+            filters={"status": constants.EXECUTION_STATUS_COMPLETED})
+        self.assertNotIn(self.valid_tasks_execution, result)
+
+    def test_get_transfer_tasks_executions_multi_status_filter(self):
+        result = api.get_transfer_tasks_executions(
+            self.context, self.valid_transfer.id,
+            filters={"status": [
+                DEFAULT_EXECUTION_STATUS,
+                constants.EXECUTION_STATUS_COMPLETED]})
+        self.assertIn(self.valid_tasks_execution, result)
+
+        result = api.get_transfer_tasks_executions(
+            self.context, self.valid_transfer.id,
+            filters={"status": [
+                constants.EXECUTION_STATUS_COMPLETED,
+                constants.EXECUTION_STATUS_ERROR]})
+        self.assertNotIn(self.valid_tasks_execution, result)
+
+    def test_get_transfer_tasks_executions_type_filter(self):
+        result = api.get_transfer_tasks_executions(
+            self.context, self.valid_transfer.id,
+            filters={"type": constants.EXECUTION_TYPE_TRANSFER_EXECUTION})
+        self.assertIn(self.valid_tasks_execution, result)
+
+        result = api.get_transfer_tasks_executions(
+            self.context, self.valid_transfer.id,
+            filters={"type": constants.EXECUTION_TYPE_DEPLOYMENT})
+        self.assertNotIn(self.valid_tasks_execution, result)
+
+    def test_get_transfer_tasks_executions_limit(self):
+        result = api.get_transfer_tasks_executions(
+            self.context, self.valid_transfer.id, limit=1)
+        self.assertEqual(1, len(result))
+
     def test_get_transfer_tasks_executions_admin(self):
         self.context.is_admin = True
         result = api.get_transfer_tasks_executions(