Nashwan Azhari 5 سال پیش
والد
کامیت
6c242652f6

+ 50 - 118
coriolis/conductor/rpc/server.py

@@ -54,8 +54,7 @@ RPC_TOPIC_TO_CLIENT_CLASS_MAP = {
     constants.SCHEDULER_MAIN_MESSAGING_TOPIC: (
         rpc_scheduler_client.SchedulerClient),
     constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC: (
-        rpc_cron_client.ReplicaCronClient)
-}
+        rpc_cron_client.ReplicaCronClient)}
 
 
 def endpoint_synchronized(func):
@@ -220,49 +219,42 @@ class ConductorServerEndpoint(object):
 
         return diagnostics
 
-    def _get_any_worker_service(self, ctxt, random_choice=False, raw_dict=False):
-        services = self._scheduler_client.get_workers_for_specs(ctxt)
-        if not services:
-            raise exception.NoWorkerServiceError()
-        service = services[0]
-        if random_choice:
-            service = random.choice(services)
-        if raw_dict:
-            return service
-        return db_api.get_service(ctxt, service['id'])
-
-    def _get_worker_rpc_for_host(self, worker_host, **client_kwargs):
-        return rpc_worker_client.WorkerClient(host=worker_host, **client_kwargs)
+    def _get_rpc_client_for_service(self, service, *client_args, **client_kwargs):
+        rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP.get(service['topic'])
+        if not rpc_client_class:
+            raise exception.NotFound(
+                "No RPC client class for service with topic '%s'." % (
+                    service.topic))
+
+        topic = service['topic']
+        if service['topic'] == constants.WORKER_MAIN_MESSAGING_TOPIC:
+            # NOTE: coriolis.service.MessagingService-type services (such
+            # as the worker), always have a dedicated per-host queue
+            # which can be used to target the service:
+            topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
+                "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
+                "host": service['host']})
+
+        return rpc_client_class(*client_args, topic=topic, **client_kwargs)
+
+    def _get_worker_rpc_for_host(self, host, *client_args, **client_kwargs):
+        rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP[
+            constants.WORKER_MAIN_MESSAGING_TOPIC]
+        topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
+            "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
+            "host": host})
+        return rpc_client_class(*client_args, topic=topic, **client_kwargs)
 
     def _get_worker_service_rpc_for_specs(
             self, ctxt, provider_requirements=None, region_sets=None,
             enabled=True, random_choice=False, raise_on_no_matches=True):
-        requirements_str = (
-            "enabled=%s; region_sets=%s; provider_requirements=%s" % (
-                enabled, region_sets, provider_requirements))
-        LOG.info(
-            "Requesting Worker Service from scheduler with the following "
-            "specifications: %s", requirements_str)
-        services = self._scheduler_client.get_workers_for_specs(
+        selected_service = self._scheduler_client.get_worker_service_for_specs(
             ctxt, provider_requirements=provider_requirements,
-            region_sets=region_sets, enabled=enabled)
-        if not services:
-            if raise_on_no_matches:
-                raise exception.NoSuitableWorkerServiceError()
-            return None
-        LOG.debug(
-            "Was offered Worker Services with the following IDs for "
-            "requirements '%s': %s",
-            requirements_str, [s["id"] for s in services])
-
-        selected_service = services[0]
-        if random_choice:
-            selected_service = random.choice(services)
-
-        LOG.info(
-            "Was offered Worker Service with ID '%s' for requirements: %s",
-            selected_service['id'], requirements_str)
-        return self._get_worker_rpc_for_host(selected_service['host'])
+            region_sets=region_sets, enabled=enabled,
+            random_choice=random_choice,
+            raise_on_no_matches=raise_on_no_matches)
+        service = db_api.get_service(ctxt, selected_service["id"])
+        return self._get_rpc_client_for_service(service)
 
     def _check_delete_reservation_for_transfer(self, transfer_action):
         action_id = transfer_action.base_id
@@ -573,16 +565,14 @@ class ConductorServerEndpoint(object):
             ctxt, endpoint.type, pool_environment)
 
     def get_available_providers(self, ctxt):
-        # TODO(aznashwan): merge list of all providers from all
-        # worker services:
-        worker_rpc = self._get_worker_rpc_for_host(
-            self._get_any_worker_service(ctxt)['host'])
+        worker_rpc = self._get_rpc_client_for_service(
+            self._scheduler_client.get_any_worker_service(ctxt))
         return worker_rpc.get_available_providers(ctxt)
 
     def get_provider_schemas(self, ctxt, platform_name, provider_type):
         # TODO(aznashwan): merge or version/namespace schemas for each worker?
-        worker_rpc = self._get_worker_rpc_for_host(
-            self._get_any_worker_service(ctxt)['host'])
+        worker_rpc = self._get_rpc_client_for_service(
+            self._scheduler_client.get_any_worker_service(ctxt))
         return worker_rpc.get_provider_schemas(
             ctxt, platform_name, provider_type)
 
@@ -647,78 +637,20 @@ class ConductorServerEndpoint(object):
     def _get_worker_service_rpc_for_task(
             self, ctxt, task, origin_endpoint, destination_endpoint,
             retry_count=5, retry_period=2):
-        LOG.debug(
-            "Compiling required Worker Service specs for task with "
-            "ID '%s' (type '%s') from endpoints '%s' to '%s'",
-            task.id, task.task_type, origin_endpoint.id,
-            destination_endpoint.id)
-        task_cls = tasks_factory.get_task_runner_class(task.task_type)
-
-        # determine required Coriolis regions based on the endpoints:
-        required_region_sets = []
-        origin_endpoint_region_ids = [
-            r.id for r in origin_endpoint.mapped_regions]
-        destination_endpoint_region_ids = [
-            r.id for r in destination_endpoint.mapped_regions]
-
-        required_platform = task_cls.get_required_platform()
-        if required_platform in (
-                constants.TASK_PLATFORM_SOURCE,
-                constants.TASK_PLATFORM_BILATERAL):
-            required_region_sets.append(origin_endpoint_region_ids)
-        if required_platform in (
-                constants.TASK_PLATFORM_DESTINATION,
-                constants.TASK_PLATFORM_BILATERAL):
-            required_region_sets.append(destination_endpoint_region_ids)
-
-        # determine provider requirements:
-        provider_requirements = {}
-        required_provider_types = task_cls.get_required_provider_types()
-        if constants.PROVIDER_PLATFORM_SOURCE in required_provider_types:
-            provider_requirements[origin_endpoint.type] = (
-                required_provider_types[
-                    constants.PROVIDER_PLATFORM_SOURCE])
-        if constants.PROVIDER_PLATFORM_DESTINATION in required_provider_types:
-            provider_requirements[destination_endpoint.type] = (
-                required_provider_types[
-                    constants.PROVIDER_PLATFORM_DESTINATION])
-
-        worker_rpc = None
-        for i in range(retry_count):
-            try:
-                LOG.debug(
-                    "Requesting Worker Service for task with ID '%s' (type "
-                    "'%s') from endpoints '%s' to '%s'", task.id,
-                    task.task_type, origin_endpoint.id,
-                    destination_endpoint.id)
-                worker_rpc = self._get_worker_service_rpc_for_specs(
-                    ctxt, provider_requirements=provider_requirements,
-                    region_sets=required_region_sets, enabled=True)
-                LOG.debug(
-                    "Scheduler has granted Worker Service for task with ID "
-                    "'%s' (type '%s') from endpoints '%s' to '%s'",
-                    task.id, task.task_type, origin_endpoint.id,
-                    destination_endpoint.id)
-                return worker_rpc
-            except Exception as ex:
-                LOG.warn(
-                    "Failed to schedule task with ID '%s' (attempt %d/%d). "
-                    "Waiting %d seconds and then retrying. Error was: %s",
-                    task.id, i+1, retry_count, retry_period,
-                    utils.get_exception_details())
-                time.sleep(retry_period)
-
-        message = (
-            "Failed to schedule task %s after %d tries. This may indicate that"
-            " there are no Coriolis Worker services able to perform the task "
-            "on the platforms and in the Coriolis Regions required by the "
-            "selected source/destination Coriolis Endpoints. Please review"
-            " the Conductor and Scheduler logs for more exact details." % (
-                task.id, retry_count))
-        db_api.set_task_status(
-            ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
-            exception_details=message)
-        raise exception.NoSuitableWorkerServiceError(message)
+        try:
+            worker_service = self._scheduler_client.get_worker_service_for_task(
+                ctxt, {"id": task.id, "task_type": task.task_type},
+                origin_endpoint.to_dict(), destination_endpoint.to_dict(),
+                retry_count=retry_count, retry_period=retry_period)
+        except Exception as ex:
+            LOG.debug(
+                "Failed to get worker service for task '%s'. Updating status "
+                "to unscheduleable.")
+            db_api.set_task_status(
+                ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
+                exception_details=str(ex))
+
+        return self._get_rpc_client_for_service(worker_service)
 
     def _begin_tasks(
             self, ctxt, action, execution, task_info_override=None,

+ 1 - 0
coriolis/constants.py

@@ -272,6 +272,7 @@ MINION_POOL_EXECUTION_TYPES = [
     EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS]
 
 TASK_LOCK_NAME_FORMAT = "task-%s"
+TASKFLOW_LOCK_NAME_FORMAT = "taskflow-%s"
 EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
 ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s"
 MIGRATION_LOCK_NAME_FORMAT = "migration-%s"

+ 14 - 2
coriolis/minion_manager/rpc/server.py

@@ -16,6 +16,8 @@ from coriolis import utils
 from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
+from coriolis.scheduler.rpc import client as rpc_scheduler_client
+from coriolis.worker.rpc import client as rpc_worker_client
 
 
 VERSION = "1.0"
@@ -42,8 +44,18 @@ def minion_pool_synchronized(func):
 
 
 class MinionManagerServerEndpoint(object):
-    def __init__(self):
-        self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
+
+    @property
+    def _rpc_worker_client(self):
+        return rpc_worker_client.WorkerClient()
+
+    @property
+    def _scheduler_client(self):
+        return rpc_scheduler_client.SchedulerClient()
+
+    @property
+    def _conductor_client(self):
+        return rpc_conductor_client.ConductorClient()
 
     def get_diagnostics(self, ctxt):
         return utils.get_diagnostics_info()

+ 54 - 0
coriolis/minion_manager/rpc/tasks.py

@@ -0,0 +1,54 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from taskflow import task as taskflow_tasks
+
+from coriolis import constants
+from coriolis import exception
+
+
+class BaseMinionManangerTask(taskflow_tasks.Task):
+
+    """
+    Base taskflow.Task implementation for Minion Mananger tasks.
+    """
+
+    def __init__(
+            self, ctxt, db_api, minion_pool_id, **kwargs):
+        self._minion_pool_id = minion_pool_id
+        self._db_api = db_api
+
+        super(BaseRunWorkerTask, self).__init__(**kwargs)
+
+    # @lock_on_pool
+    def pre_execute(self):
+        # TODO:
+        # check minion pool is ready for the task
+        # ask scheduler for worker service
+        # update minion pool status accordingly
+        pass
+
+    # @lock_on_pool
+    def execute(self):
+        # TODO:
+        # left to child classes
+        pass
+
+    def post_execute(self):
+        # TODO:
+        # update minion pool status accordingly
+        # record results (if any)
+        pass
+
+    def pre_revert(self):
+        # ask scheduler for worker service for reversion
+        pass
+
+    def revert(self):
+        # TODO:
+        # run reverting task
+        pass
+
+    def post_revert(self):
+        # TODO:
+        pass

+ 147 - 1
coriolis/scheduler/rpc/client.py

@@ -1,12 +1,22 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
-from oslo_config import cfg
+import random
+import time
+
 import oslo_messaging as messaging
+from oslo_config import cfg
+from oslo_log import log as logging
 
+from coriolis import constants
+from coriolis import exception
 from coriolis import rpc
+from coriolis import utils
+from coriolis.tasks import factory as tasks_factory
+
 
 VERSION = "1.0"
+LOG = logging.getLogger(__name__)
 
 scheduler_opts = [
     cfg.IntOpt("scheduler_rpc_timeout",
@@ -35,3 +45,139 @@ class SchedulerClient(rpc.BaseRPCClient):
         return self._call(
             ctxt, 'get_workers_for_specs', region_sets=region_sets,
             enabled=enabled, provider_requirements=provider_requirements)
+
+    def get_any_worker_service(
+            self, ctxt, random_choice=False, raise_if_none=True):
+        services = self.get_workers_for_specs(ctxt)
+        if not services:
+            if raise_if_none:
+                raise exception.NoWorkerServiceError()
+            return None
+        service = services[0]
+        if random_choice:
+            service = random.choice(services)
+        LOG.debug(
+            "Selected service with ID '%s' for any-worker request.",
+            service['id'])
+        return service
+
+    def get_worker_service_for_specs(
+            self, ctxt, provider_requirements=None, region_sets=None,
+            enabled=True, random_choice=False, raise_on_no_matches=True):
+        """Utility method which ensures at least one service matching
+        the provided requirements exists and is usable.
+        """
+        requirements_str = (
+            "enabled=%s; region_sets=%s; provider_requirements=%s" % (
+                enabled, region_sets, provider_requirements))
+        LOG.info(
+            "Requesting Worker Service from scheduler with the following "
+            "specifications: %s", requirements_str)
+        services = self.get_workers_for_specs(
+            ctxt, provider_requirements=provider_requirements,
+            region_sets=region_sets, enabled=enabled)
+        if not services:
+            if raise_on_no_matches:
+                raise exception.NoSuitableWorkerServiceError()
+            return None
+        LOG.debug(
+            "Was offered Worker Services with the following IDs for "
+            "requirements '%s': %s",
+            requirements_str, [s["id"] for s in services])
+
+        selected_service = services[0]
+        if random_choice:
+            selected_service = random.choice(services)
+
+        LOG.info(
+            "Was offered Worker Service with ID '%s' for requirements: %s",
+            selected_service['id'], requirements_str)
+        return selected_service
+
+    def get_worker_service_for_task(
+            self, ctxt, task, origin_endpoint, destination_endpoint,
+            retry_count=5, retry_period=2):
+        """ Gets a worker service for the task with the given properties
+        and source/target endpoints.
+
+        :param task: Dict of the form: {
+            "id": "<task_id>",
+            "task_type": "<constants.TASK_TYPE_*>"}
+        :param origin_endpoint: Dict of the form {
+            "id": "<ID>",
+            "mapped_regions": ["List of mapped endpoint regions"]}
+        :param destination_endpoint: Same as origin_endpoint
+        """
+        LOG.debug(
+            "Compiling required Worker Service specs for task with "
+            "ID '%s' (type '%s') from endpoints '%s' to '%s'",
+            task['id'], task['task_type'], origin_endpoint['id'],
+            destination_endpoint['id'])
+        task_cls = tasks_factory.get_task_runner_class(
+            task['task_type'])
+
+        # determine required Coriolis regions based on the endpoints:
+        required_region_sets = []
+        origin_endpoint_region_ids = [
+            r.id for r in origin_endpoint['mapped_regions']]
+        destination_endpoint_region_ids = [
+            r.id for r in destination_endpoint['mapped_regions']]
+
+        required_platform = task_cls.get_required_platform()
+        if required_platform in (
+                constants.TASK_PLATFORM_SOURCE,
+                constants.TASK_PLATFORM_BILATERAL):
+            required_region_sets.append(origin_endpoint_region_ids)
+        if required_platform in (
+                constants.TASK_PLATFORM_DESTINATION,
+                constants.TASK_PLATFORM_BILATERAL):
+            required_region_sets.append(destination_endpoint_region_ids)
+
+        # determine provider requirements:
+        provider_requirements = {}
+        required_provider_types = task_cls.get_required_provider_types()
+        if constants.PROVIDER_PLATFORM_SOURCE in required_provider_types:
+            provider_requirements[origin_endpoint['type']] = (
+                required_provider_types[
+                    constants.PROVIDER_PLATFORM_SOURCE])
+        if constants.PROVIDER_PLATFORM_DESTINATION in required_provider_types:
+            provider_requirements[destination_endpoint['type']] = (
+                required_provider_types[
+                    constants.PROVIDER_PLATFORM_DESTINATION])
+
+        worker_service = None
+        for i in range(retry_count):
+            try:
+                LOG.debug(
+                    "Requesting Worker Service for task with ID '%s' (type "
+                    "'%s') from endpoints '%s' to '%s'", task['id'],
+                    task['task_type'], origin_endpoint['id'],
+                    destination_endpoint['id'])
+                worker_service = self.get_workers_for_specs(
+                    ctxt, provider_requirements=provider_requirements,
+                    region_sets=required_region_sets, enabled=True)
+                LOG.debug(
+                    "Scheduler has granted Worker Service '%s' for task with "
+                    "ID '%s' (type '%s') from endpoints '%s' to '%s'",
+                    worker_service['id'], task['id'], task['task_type'],
+                    origin_endpoint['id'], destination_endpoint['id'])
+                return worker_service
+            except Exception as ex:
+                LOG.warn(
+                    "Failed to schedule task with ID '%s' (attempt %d/%d). "
+                    "Waiting %d seconds and then retrying. Error was: %s",
+                    task['id'], i+1, retry_count, retry_period,
+                    utils.get_exception_details())
+                time.sleep(retry_period)
+
+        message = (
+            "Failed to schedule task %s after %d tries. This may indicate that"
+            " there are no Coriolis Worker services able to perform the task "
+            "on the platforms and in the Coriolis Regions required by the "
+            "selected source/destination Coriolis Endpoints. Please review"
+            " the Conductor and Scheduler logs for more exact details." % (
+                task['id'], retry_count))
+        # db_api.set_task_status(
+        #     ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
+        #     exception_details=message)
+        raise exception.NoSuitableWorkerServiceError(message)

+ 67 - 0
coriolis/scheduler/scheduler_utils.py

@@ -0,0 +1,67 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import random
+
+from oslo_log import log as logging
+
+from coriolis import constants
+from coriolis.db import api as db_api
+from coriolis import exception
+from coriolis.replica_cron.rpc import client as rpc_cron_client
+from coriolis.scheduler.rpc import client as rpc_scheduler_client
+from coriolis import utils
+from coriolis.worker.rpc import client as rpc_worker_client
+
+
+VERSION = "1.0"
+
+LOG = logging.getLogger(__name__)
+
+RPC_TOPIC_TO_CLIENT_CLASS_MAP = {
+    constants.WORKER_MAIN_MESSAGING_TOPIC: rpc_worker_client.WorkerClient,
+    constants.SCHEDULER_MAIN_MESSAGING_TOPIC: (
+        rpc_scheduler_client.SchedulerClient),
+    constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC: (
+        rpc_cron_client.ReplicaCronClient)
+}
+
+
+def get_rpc_client_for_service(service, *client_args, **client_kwargs):
+    rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP.get(service.topic)
+    if not rpc_client_class:
+        raise exception.NotFound(
+            "No RPC client class for service with topic '%s'." % (
+                service.topic))
+
+    topic = service.topic
+    if service.topic == constants.WORKER_MAIN_MESSAGING_TOPIC:
+        # NOTE: coriolis.service.MessagingService-type services (such
+        # as the worker), always have a dedicated per-host queue
+        # which can be used to target the service:
+        topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
+            "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
+            "host": service.host})
+
+    return rpc_client_class(*client_args, topic=topic, **client_kwargs)
+
+
+def get_any_worker_service(
+        scheduler_client, ctxt, random_choice=False, raw_dict=False):
+    services = scheduler_client.get_workers_for_specs(ctxt)
+    if not services:
+        raise exception.NoWorkerServiceError()
+    service = services[0]
+    if random_choice:
+        service = random.choice(services)
+    if raw_dict:
+        return service
+    return db_api.get_service(ctxt, service['id'])
+
+def get_worker_rpc_for_host(host, *client_args, **client_kwargs):
+    rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP[
+        constants.WORKER_MAIN_MESSAGING_TOPIC]
+    topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
+        "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
+        "host": host})
+    return rpc_client_class(*client_args, topic=topic, **client_kwargs)

+ 0 - 0
coriolis/taskflow/__init__.py


+ 75 - 0
coriolis/taskflow/runner.py

@@ -0,0 +1,75 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+# NOTE: we neeed to make sure eventlet is imported:
+import eventlet  #noqa
+
+from oslo_log import log as logging
+from taskflow import engines
+from taskflow import task as taskflow_tasks
+from taskflow.patterns import unordered_flow
+from taskflow.types import notifier
+
+
+LOG = logging.getLogger(__name__)
+
+TASKFLOW_EXECUTION_ORDER_PARALLEL = 'parallel'
+TASKFLOW_EXECUTION_ORDER_SERIAL = 'serial'
+
+TASKFLOW_EXECUTOR_THREADS = "threaded"
+TASKFLOW_EXECUTOR_PROCESSES = "processes"
+TASKFLOW_EXECUTOR_GREENTHREADED = "greenthreaded"
+
+
+class TaskFlowRunner(object):
+
+    def __init__(
+            self, service_name,
+            execution_order=TASKFLOW_EXECUTION_ORDER_PARALLEL,
+            executor=TASKFLOW_EXECUTOR_GREENTHREADED,
+            max_workers=1):
+
+        self._service_name = service_name
+        self._execution_order = execution_order
+        self._executor = executor
+        self._max_workers = max_workers
+
+    def _log_flow_transition(self, state, details):
+        LOG.debug(
+            "[TaskFlowRunner(%s)] Flow '%s' (internal UUID '%s') transitioned"
+            " from '%s' state to '%s'",
+            self._service_name, details['flow_name'], details['flow_uuid'],
+            details['old_state'], state)
+
+    def _log_task_transition(self, state, details):
+        LOG.debug(
+            "[TaskFlowRunner(%s)] Task '%s' (internal UUID '%s') transitioned"
+            "from '%s' state to '%s'",
+            self._service_name, details['task_name'], details['task_uuid'],
+            details['old_state'], state)
+
+    def _setup_engine_for_flow(self, flow, store=None):
+        engine = engines.load(
+            flow, store, executor=self._executor,
+            engine=self._execution_order, max_workers=self._max_workers)
+        engine.notifier.register(
+            notifier.Notifier.ANY, self._log_flow_transition)
+        engine.atom_notifier.register(
+            notifier.Notifier.ANY, self._log_task_transition)
+        return engine
+
+    def run_flow(self, flow, store=None):
+        LOG.debug("Ramping up to run flow with name '%s'", flow.name)
+        engine = self._setup_engine_for_flow(flow, store=store)
+
+        LOG.debug("Attempting to compile flow with name '%s'", flow.name)
+        engine.compile()
+
+        LOG.debug("Preparing to run flow with name '%s'", flow.name)
+        engine.prepare()
+
+        LOG.debug("Running flow with name '%s'", flow.name)
+        engine.run()
+        LOG.debug(
+            "Successfully ran flow with name '%s'. Statistics were: %s",
+            flow.name, engine.statistics)

+ 139 - 0
coriolis/taskflow/taskflow.py

@@ -0,0 +1,139 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from taskflow import task as taskflow_tasks
+
+from coriolis import constants
+from coriolis import exception
+from coriolis.tass import factory as tasks_factory
+
+
+TASK_RETURN_VALUE_FORMAT = "%s-result" % (
+        constants.TASK_LOCK_NAME_FORMAT)
+
+
+class _BaseRunWorkerTask(taskflow_tasks.Task):
+
+    """
+    Base taskflow.Task implementation for tasks which can be run
+    on the worker service.
+    This class can be seen as an "adapter" between the current
+    coriolis.tasks.TaskRunner classes and taskflow ones.
+
+    :param task_id: ID of the task. This value is declared as a returned value
+        from the task and can be set as a requirement for other tasks, thus
+        achieving a dependency system.
+    :param main_task_runner_class: constants.TASK_TYPE_* referencing the
+        main coriolis.tasks.TaskRunner class to be run on a worker service.
+    :param cleanup_task_runner_task: constants.TASK_TYPE_* referencing the
+        cleanup task to be run on reversion. No cleanup will be performed
+        during the task's reversion (apart from Worker Service deallocation)
+        otherwise.
+    """
+
+    def __init__(
+            self, worker_service_host, task_id, main_task_runner_type,
+            cleanup_task_runner_type, depends_on=None, **kwargs):
+        self._main_task_runner_type = main_task_runner_type
+        self._cleanup_task_runner_type = cleanup_task_runner_type
+        self._worker_service_host = worker_service_host
+
+        super(_BaseRunWorkerTask, self).__init__(name=task_id, **kwargs)
+
+    def _set_provides_for_dependencies(self, kwargs):
+        dep = self.TASK_RETURN_VALUE_FORMAT % self.task_id
+        if kwargs.get('provides') is not None:
+            kwargs['provides'].append(dep)
+        else:
+            kwargs['provides'] = [dep]
+
+    def _set_requires_for_dependencies(self, kwargs, depends_on):
+        dep_requirements = [
+            self.TASK_RETURN_VALUE_FORMAT % dep_id
+            for dep_id in depends_on]
+        if kwargs.get('requires') is not None:
+            kwargs['requires'].extend(dep_requirements)
+        elif dep_requirements:
+            kwargs['requires'] = dep_requirements
+        return kwargs
+
+    def _set_requires_for_task_info_fields(self, kwargs):
+        new_requires = kwargs.get('requires', [])
+        main_task_runner = tasks_factory.get_task_runner_class(
+            self._main_task_runner_type)
+        main_task_deps = main_task_runner.get_required_task_info_properties()
+        new_requires.extend(main_task_deps)
+        if self._cleanup_task_runner_type:
+            cleanup_task_runner = tasks_factory.get_task_runner_class(
+                self._cleanup_task_runner_type)
+            cleanup_task_deps = list(
+                set(
+                    cleanup_task_runner.get_required_task_info_properties(
+                        )).difference(
+                            main_task_runner.get_returned_task_info_properties()))
+            new_requires.extend(cleanup_task_deps)
+
+        kwargs['requires'] = new_requires
+        return kwargs
+
+    def _set_provides_for_task_info_fields(self, kwargs):
+        new_provides = kwargs.get('provides', [])
+        main_task_runner = tasks_factory.get_task_runner_class(
+            self._main_task_runner_type)
+        main_task_res = main_task_runner.get_returned_task_info_properties()
+        new_provides.extend(main_task_res)
+        if self._cleanup_task_runner_type:
+            cleanup_task_runner = tasks_factory.get_task_runner_class(
+                self._cleanup_task_runner_type)
+            cleanup_task_res = list(
+                set(
+                    cleanup_task_runner.get_returned_task_info_properties(
+                        )).difference(
+                            main_task_runner.get_returned_task_info_properties()))
+            new_provides.extend(cleanup_task_res)
+
+        kwargs['provides'] = new_provides
+        return kwargs
+
+    def _get_worker_rpc_for_main_task(self):
+        pass
+
+    def _get_worker_rpc_for_cleanup_task(self):
+        if self._cleanup_task_runner_type:
+            # TODO
+            pass
+
+    def pre_execute(self):
+        # TODO:
+        # add task to Coriolis' DB? (should we do it here or in the conductor?)
+        # load up TaskRunner class
+        # get worker host (shared among all tasks being run)
+        pass
+
+    # @lock_on_task? (Coriolis' notion of a task)
+    def execute(self):
+        # TODO:
+        # actively run the thing on the worker
+        # return results_dict_from_worker
+        pass
+
+    def post_execute(self):
+        # TODO:
+        # deallocate worker service
+        # save task result to Coriolis' db
+        pass
+
+    def pre_revert(self):
+        # deallocate original worker service
+        # TODO: find worker to run reverting
+        pass
+
+    def revert(self):
+        # TODO:
+        # run reverting task
+        pass
+
+    def post_revert(self):
+        # TODO:
+        # deallocate worker service
+        pass