Просмотр исходного кода

Merge pull request #215 from smiclea/test-execution-tasks

Add unit tests for '_check_execution_tasks_sanity'
Nashwan Azhari 4 лет назад
Родитель
Сommit
7d03aeaf16

+ 26 - 25
coriolis/conductor/rpc/server.py

@@ -102,6 +102,7 @@ def parent_tasks_execution_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, task_id, *args, **kwargs):
         task = db_api.get_task(ctxt, task_id)
+
         @lockutils.synchronized(
             constants.EXECUTION_LOCK_NAME_FORMAT % task.execution_id,
             external=True)
@@ -696,7 +697,7 @@ class ConductorServerEndpoint(object):
                 p for p in task_cls.get_required_task_info_properties()
                 if p not in instance_task_info_keys]
             if missing_params:
-                raise exception.CoriolisException(
+                raise exception.TaskParametersException(
                     "The following task parameters for instance '%s' "
                     "are missing from the task_info for task '%s' of "
                     "type '%s': %s" % (
@@ -727,7 +728,7 @@ class ConductorServerEndpoint(object):
                                 if dep_id not in tasks_to_process and (
                                     dep_id not in processed_tasks)]
                             if missing_deps:
-                                raise exception.CoriolisException(
+                                raise exception.TaskDependencyException(
                                     "Task '%s' (type '%s') for instance '%s' "
                                     "has non-existent tasks referenced as "
                                     "dependencies: %s" % (
@@ -738,9 +739,9 @@ class ConductorServerEndpoint(object):
                                      for dep_id in task.depends_on]):
                                 queued_tasks.append(task)
                     else:
-                        raise exception.CoriolisException(
+                        raise exception.InvalidTaskState(
                             "Invalid initial state '%s' for task '%s' "
-                            "of type '%s'."% (
+                            "of type '%s'." % (
                                 task.status, task.id, task.task_type))
 
                 # check if nothing was left queued:
@@ -751,7 +752,7 @@ class ConductorServerEndpoint(object):
                     processed_tasks_type_map = {
                         tid: t.task_type
                         for tid, t in processed_tasks.items()}
-                    raise exception.CoriolisException(
+                    raise exception.ExecutionDeadlockException(
                         "Execution '%s' (type '%s') is bound to be deadlocked:"
                         " there are leftover tasks for instance '%s' which "
                         "will never get queued. Already processed tasks are: "
@@ -779,7 +780,7 @@ class ConductorServerEndpoint(object):
                         modified_fields_by_queued_tasks.items())
                     if len(tasks) > 1}
                 if conflicting_fields:
-                    raise exception.CoriolisException(
+                    raise exception.TaskFieldsConflict(
                         "There are fields which will encounter a state "
                         "conflict following the parallelized execution of "
                         "tasks for execution '%s' (type '%s') for instance "
@@ -839,17 +840,17 @@ class ConductorServerEndpoint(object):
             if instance not in replica.info:
                 replica.info[instance] = {'volumes_info': []}
             elif replica.info[instance].get('volumes_info') is None:
-                 replica.info[instance]['volumes_info'] = []
+                replica.info[instance]['volumes_info'] = []
             # NOTE: we update all of the param values before triggering an
             # execution to ensure that the latest parameters are used:
             replica.info[instance].update({
                 "source_environment": replica.source_environment,
                 "target_environment": dest_env})
-                # TODO(aznashwan): have these passed separately to the relevant
-                # provider methods (they're currently passed directly inside
-                # dest-env by the API service when accepting the call)
-                # "network_map": network_map,
-                # "storage_mappings": storage_mappings,
+            # TODO(aznashwan): have these passed separately to the relevant
+            # provider methods (they're currently passed directly inside
+            # dest-env by the API service when accepting the call)
+            # "network_map": network_map,
+            # "storage_mappings": storage_mappings,
 
             validate_replica_source_inputs_task = self._create_task(
                 instance,
@@ -1339,11 +1340,11 @@ class ConductorServerEndpoint(object):
             migration.info[instance].update({
                 "source_environment": migration.source_environment,
                 "target_environment": dest_env})
-                # TODO(aznashwan): have these passed separately to the relevant
-                # provider methods (they're currently passed directly inside
-                # dest-env by the API service when accepting the call)
-                # "network_map": network_map,
-                # "storage_mappings": storage_mappings,
+            # TODO(aznashwan): have these passed separately to the relevant
+            # provider methods (they're currently passed directly inside
+            # dest-env by the API service when accepting the call)
+            # "network_map": network_map,
+            # "storage_mappings": storage_mappings,
 
             validate_replica_deployment_inputs_task = self._create_task(
                 instance,
@@ -1755,11 +1756,11 @@ class ConductorServerEndpoint(object):
                 # NOTE: we must explicitly set this in each VM's info
                 # to prevent the Replica disks from being cloned:
                 "clone_disks": False}
-                # TODO(aznashwan): have these passed separately to the relevant
-                # provider methods (they're currently passed directly inside
-                # dest-env by the API service when accepting the call)
-                # "network_map": network_map,
-                # "storage_mappings": storage_mappings,
+            # TODO(aznashwan): have these passed separately to the relevant
+            # provider methods (they're currently passed directly inside
+            # dest-env by the API service when accepting the call)
+            # "network_map": network_map,
+            # "storage_mappings": storage_mappings,
 
             get_instance_info_task = self._create_task(
                 instance,
@@ -2694,7 +2695,7 @@ class ConductorServerEndpoint(object):
                         if all([
                                 dep_stat == constants.TASK_STATUS_COMPLETED
                                 for dep_stat in (
-                                        parent_task_statuses.values())]):
+                                    parent_task_statuses.values())]):
                             LOG.info(
                                 "Starting task '%s' as all dependencies have "
                                 "completed successfully: %s",
@@ -2735,7 +2736,7 @@ class ConductorServerEndpoint(object):
                                 "no non-error parents to directly depend on, "
                                 "but one or more on-error tasks have completed"
                                 " successfully: %s",
-                                    task.id, parent_task_statuses)
+                                task.id, parent_task_statuses)
                             task_statuses[task.id] = _start_task(task)
                         # start on-error tasks only if at least one non-error
                         # parent task has completed successfully:
@@ -3433,7 +3434,7 @@ class ConductorServerEndpoint(object):
             ctxt, replica_id, schedule_id, expired=expired)
         if not schedule:
             raise exception.NotFound(
-                "Schedule with ID '%s' for Replica '%s' not found." %  (
+                "Schedule with ID '%s' for Replica '%s' not found." % (
                     schedule_id, replica_id))
         return schedule
 

+ 18 - 0
coriolis/exception.py

@@ -237,6 +237,24 @@ class InvalidReplicaState(Invalid):
     message = _("Invalid replica state: %(reason)s")
 
 
+class ExecutionDeadlockException(CoriolisException):
+    message = _("Execution is bound to be deadlocked.")
+
+
+class TaskParametersException(CoriolisException):
+    message = _("Execution task parameters are missing.")
+
+
+class TaskFieldsConflict(CoriolisException):
+    message = _("There are fields which will encounter a state conflict.")
+
+
+class TaskDependencyException(CoriolisException):
+    message = _(
+        "Execution task has non-existent tasks referenced as dependencies."
+    )
+
+
 class ServiceUnavailable(Invalid):
     message = _("Service is unavailable at this time.")
 

+ 118 - 0
coriolis/tests/conductor/rpc/data/execution_tasks_config.yml

@@ -0,0 +1,118 @@
+- 
+  # One task with default parameters should execute successfully
+  tasks_config: [{}]
+  init_task_info: ~
+  expected_result: ~
+-
+  # Invalid task status
+  tasks_config:
+    - status: STARTING
+  init_task_info: ~
+  expected_result: 
+    type: 'INVALID_STATE'
+    message: "Invalid initial state 'STARTING'"
+-
+  # Empty initial task info
+  tasks_config: [{}]
+  init_task_info: {}
+  expected_result: 
+    type: "MISSING_PARAMS"
+-
+  # Task that depends on a missing dependency
+  tasks_config:
+    - depends_on: ["invalid_id"]
+  init_task_info: ~
+  expected_result: 
+    type: "MISSING_DEPENDENCIES"
+    message: |- 
+      non-existent .* dependencies: \[\'invalid_id\'\]
+-
+  # Multiple tasks with the same default parameters and instance should conflict
+  tasks_config: [{}, {}]
+  init_task_info: ~
+  expected_result: 
+    type: "FIELDS_CONFLICT"
+-
+  # Setting a different instance for one task should resolve the conflict, but should raise a missing parameters for the second instance
+  tasks_config: 
+    - {}
+    - instance: "instance_2"
+  init_task_info: ~
+  expected_result:
+    type: "MISSING_PARAMS" 
+    message: "task parameters for instance 'instance_2' are missing"
+-
+  # Setting task parameters for second instance should not raise any errors
+  tasks_config: 
+    - instance: "instance_1"
+    - instance: "instance_2"
+  init_task_info:
+    instance_1: 
+      source_environment: {}
+      export_info: {}
+    instance_2: 
+      source_environment: {}
+      export_info: {}
+  expected_result: ~
+-
+  # First task depends on second task
+  tasks_config: 
+    - depends_on: ["task_2"]
+    - id: "task_2"
+  init_task_info: ~
+  expected_result: ~
+-
+  # 3-way task dependency
+  tasks_config: 
+    - 
+      id: "task_1"
+      depends_on: ["task_2"]
+    - id: "task_2"
+    - 
+      id: "task_3"
+      depends_on: ["task_1", "task_2"]
+  init_task_info: ~
+  expected_result: ~
+-
+  # Task depends on another task further down the list should cause deadlock
+  tasks_config: 
+    - 
+      id: "task_1"
+      depends_on: ["task_3"]
+    - id: "task_2"
+    - 
+      id: "task_3"
+      depends_on: ["task_1", "task_2"]
+  init_task_info: ~
+  expected_result: 
+    type: "DEADLOCK"
+-
+  # Circular dependency causes deadlock
+  tasks_config: 
+    - 
+      id: "task_1"
+      depends_on: ["task_2"]
+    - 
+      id: "task_2"
+      depends_on: ["task_1"]
+  init_task_info: ~
+  expected_result:
+    type: "DEADLOCK"
+-
+  # Different task type should require different task parameters
+  tasks_config:
+    - task_type: "DEPLOY_OS_MORPHING_RESOURCES"
+  init_task_info: ~
+  expected_result:
+    type: "MISSING_PARAMS"
+-
+  # A different task type with its task parameters set should execute successfully
+  tasks_config:
+    - 
+      task_type: "DEPLOY_OS_MORPHING_RESOURCES"
+      instance: "instance_1"
+  init_task_info:
+    instance_1: 
+      target_environment: {}
+      instance_deployment_info: {}
+  expected_result: ~

+ 47 - 1
coriolis/tests/conductor/rpc/test_server.py

@@ -1,11 +1,13 @@
 # Copyright 2017 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import re
+import uuid
 import ddt
 from unittest import mock
 
 from coriolis.conductor.rpc import server
-from coriolis import exception
+from coriolis import constants, exception
 from coriolis.tests import test_base, testutils
 
 
@@ -39,3 +41,47 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
         mock_check_replica_running.assert_called_once_with(
             mock.sentinel.context, mock_replica)
         mock_create_task.assert_not_called()
+
+    @ddt.file_data('data/execution_tasks_config.yml')
+    @ddt.unpack
+    def test_check_execution_tasks_sanity(self, tasks_config, init_task_info, expected_result):
+        def convertToTask(task_config):
+            instance_task = mock.Mock()
+            instance_task.instance = task_config.get(
+                'instance', mock.sentinel.instance)
+            instance_task.id = task_config.get('id', str(uuid.uuid4()))
+            instance_task.status = task_config.get(
+                'status', constants.TASK_STATUS_SCHEDULED)
+            instance_task.depends_on = task_config.get('depends_on', None)
+            instance_task.task_type = task_config.get(
+                'task_type', constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES)
+            return instance_task
+
+        execution = mock.sentinel.execution
+        execution.id = str(uuid.uuid4())
+        execution.type = mock.sentinel.execution_type
+
+        execution.tasks = [convertToTask(t) for t in tasks_config]
+
+        if init_task_info != None:
+            initial_task_info = init_task_info
+        else:
+            initial_task_info = {mock.sentinel.instance: {
+                'source_environment': mock.sentinel.source_environment,
+                'export_info': mock.sentinel.export_info
+            }}
+
+        if not expected_result:
+            self.server._check_execution_tasks_sanity(
+                execution, initial_task_info)
+        else:
+            exception_mappings = {
+                'INVALID_STATE': exception.InvalidTaskState,
+                'MISSING_PARAMS': exception.TaskParametersException,
+                'MISSING_DEPENDENCIES': exception.TaskDependencyException,
+                'FIELDS_CONFLICT': exception.TaskFieldsConflict,
+                'DEADLOCK': exception.ExecutionDeadlockException,
+            }
+            with self.assertRaisesRegex(exception_mappings[expected_result['type']], expected_result.get('message', "")):
+                self.server._check_execution_tasks_sanity(
+                    execution, initial_task_info)