Nashwan Azhari hai 5 anos
pai
achega
e39478e303

+ 8 - 5
coriolis/api/v1/views/minion_pool_view.py

@@ -14,11 +14,13 @@ def _format_minion_pool(req, minion_pool, keys=None):
         transform(k, v) for k, v in minion_pool.items()))
 
     def _hide_minion_creds(minion_conn):
-        if 'pkey' in minion_conn:
+        if not minion_conn:
+            return minion_conn
+        if minion_conn.get('pkey'):
             minion_conn['pkey'] = '***'
-        if 'password' in minion_conn:
+        if minion_conn.get('password'):
             minion_conn['password'] = '***'
-        if 'certificates' in minion_conn:
+        if minion_conn.get('certificates'):
             for key in minion_conn['certificates']:
                 minion_conn['certificates'][key] = '***'
     if 'minion_machines' in minion_pool_dict:
@@ -26,8 +28,9 @@ def _format_minion_pool(req, minion_pool, keys=None):
             if 'connection_info' in machine:
                 _hide_minion_creds(machine['connection_info'])
             if 'backup_writer_connection_info' in machine:
-                if 'connection_details' in machine[
-                        'backup_writer_connection_info']:
+                if machine.get('backup_writer_connection_info') and (
+                        'connection_details' in machine[
+                            'backup_writer_connection_info']):
                     _hide_minion_creds(
                         machine['backup_writer_connection_info'][
                             'connection_details'])

+ 3 - 56
coriolis/conductor/rpc/server.py

@@ -449,32 +449,6 @@ class ConductorServerEndpoint(object):
         return worker_rpc.get_endpoint_destination_options(
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
 
-    def get_endpoint_source_minion_pool_options(
-            self, ctxt, endpoint_id, env, option_names):
-        endpoint = self.get_endpoint(ctxt, endpoint_id)
-
-        worker_rpc = self._get_worker_service_rpc_for_specs(
-            ctxt, enabled=True,
-            region_sets=[[reg.id for reg in endpoint.mapped_regions]],
-            provider_requirements={
-                endpoint.type: [
-                    constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
-        return worker_rpc.get_endpoint_source_minion_pool_options(
-            ctxt, endpoint.type, endpoint.connection_info, env, option_names)
-
-    def get_endpoint_destination_minion_pool_options(
-            self, ctxt, endpoint_id, env, option_names):
-        endpoint = self.get_endpoint(ctxt, endpoint_id)
-
-        worker_rpc = self._get_worker_service_rpc_for_specs(
-            ctxt, enabled=True,
-            region_sets=[[reg.id for reg in endpoint.mapped_regions]],
-            provider_requirements={
-                endpoint.type: [
-                    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
-        return worker_rpc.get_endpoint_destination_minion_pool_options(
-            ctxt, endpoint.type, endpoint.connection_info, env, option_names)
-
     def get_endpoint_networks(self, ctxt, endpoint_id, env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
@@ -536,34 +510,6 @@ class ConductorServerEndpoint(object):
         return worker_rpc.validate_endpoint_source_environment(
             ctxt, endpoint.type, source_env)
 
-    def validate_endpoint_source_minion_pool_options(
-            self, ctxt, endpoint_id, pool_environment):
-        endpoint = self.get_endpoint(ctxt, endpoint_id)
-
-        worker_rpc = self._get_worker_service_rpc_for_specs(
-            ctxt, enabled=True,
-            region_sets=[[reg.id for reg in endpoint.mapped_regions]],
-            provider_requirements={
-                endpoint.type: [
-                    constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
-
-        return worker_rpc.validate_endpoint_source_minion_pool_options(
-            ctxt, endpoint.type, pool_environment)
-
-    def validate_endpoint_destination_minion_pool_options(
-            self, ctxt, endpoint_id, pool_environment):
-        endpoint = self.get_endpoint(ctxt, endpoint_id)
-
-        worker_rpc = self._get_worker_service_rpc_for_specs(
-            ctxt, enabled=True,
-            region_sets=[[reg.id for reg in endpoint.mapped_regions]],
-            provider_requirements={
-                endpoint.type: [
-                    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
-
-        return worker_rpc.validate_endpoint_destination_minion_pool_options(
-            ctxt, endpoint.type, pool_environment)
-
     def get_available_providers(self, ctxt):
         worker_rpc = self._get_rpc_client_for_service(
             self._scheduler_client.get_any_worker_service(ctxt))
@@ -636,12 +582,13 @@ class ConductorServerEndpoint(object):
 
     def _get_worker_service_rpc_for_task(
             self, ctxt, task, origin_endpoint, destination_endpoint,
-            retry_count=5, retry_period=2):
+            retry_count=5, retry_period=2, random_choice=True):
         try:
             worker_service = self._scheduler_client.get_worker_service_for_task(
                 ctxt, {"id": task.id, "task_type": task.task_type},
                 origin_endpoint.to_dict(), destination_endpoint.to_dict(),
-                retry_count=retry_count, retry_period=retry_period)
+                retry_count=retry_count, retry_period=retry_period,
+                random_choice=random_choice)
         except Exception as ex:
             LOG.debug(
                 "Failed to get worker service for task '%s'. Updating status "

+ 27 - 10
coriolis/constants.py

@@ -98,7 +98,6 @@ TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES = "DEPLOY_OS_MORPHING_RESOURCES"
 TASK_TYPE_OS_MORPHING = "OS_MORPHING"
 TASK_TYPE_DELETE_OS_MORPHING_RESOURCES = "DELETE_OS_MORPHING_RESOURCES"
 
-
 TASK_TYPE_GET_INSTANCE_INFO = "GET_INSTANCE_INFO"
 TASK_TYPE_DEPLOY_REPLICA_DISKS = "DEPLOY_REPLICA_DISKS"
 TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS = (
@@ -173,6 +172,16 @@ TASK_TYPE_RELEASE_DESTINATION_MINION = "RELEASE_DESTINATION_MINION"
 TASK_TYPE_RELEASE_OSMORPHING_MINION = "RELEASE_OSMORPHING_MINION"
 TASK_TYPE_COLLECT_OSMORPHING_INFO = "COLLECT_OS_MORPHING_INFO"
 
+MINION_POOL_OPERATIONS_TASKS = [
+    TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS,
+    TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS,
+    TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES,
+    TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES,
+    TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
+    TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE,
+    TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
+    TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES]
+
 TASK_PLATFORM_SOURCE = "source"
 TASK_PLATFORM_DESTINATION = "destination"
 TASK_PLATFORM_BILATERAL = "bilateral"
@@ -225,6 +234,10 @@ TASK_EVENT_INFO = "INFO"
 TASK_EVENT_WARNING = "WARNING"
 TASK_EVENT_ERROR = "ERROR"
 
+MINION_POOL_EVENT_INFO = "INFO"
+MINION_POOL_EVENT_WARNING = "WARNING"
+MINION_POOL_EVENT_ERROR = "ERROR"
+
 OS_TYPE_BSD = "bsd"
 OS_TYPE_LINUX = "linux"
 OS_TYPE_OS_X = "osx"
@@ -315,22 +328,26 @@ MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF = "poweroff"
 MINION_POOL_STATUS_UNKNOWN = "UNKNOWN"
 MINION_POOL_STATUS_ERROR = "ERROR"
 MINION_POOL_STATUS_UNINITIALIZED = "UNINITIALIZED"
-MINION_POOL_STATUS_UNINITIALIZING = "UNINITIALIZING"
-MINION_POOL_STATUS_INITIALIZING = "INITIALIZING"
-MINION_POOL_STATUS_DEALLOCATING = "DEALLOCATING"
 MINION_POOL_STATUS_DEALLOCATED = "DEALLOCATED"
-MINION_POOL_STATUS_ALLOCATING = "ALLOCATING"
+MINION_POOL_STATUS_VALIDATING_INPUTS = "VALIDATING_INPUTS"
+MINION_POOL_STATUS_ALLOCATING_SHARED_RESOURCES = "ALLOCATING_SHARED_RESOURCES"
+MINION_POOL_STATUS_ALLOCATING_MACHINES = "ALLOCATING_MACHINES"
+MINION_POOL_STATUS_DEALLOCATING_MACHINES = "DEALLOCATING_MACHINES"
+MINION_POOL_STATUS_DEALLOCATING_SHARED_RESOURCES = (
+    "DEALLOCATING_SHARED_RESOURCES")
 MINION_POOL_STATUS_ALLOCATED = "ALLOCATED"
-MINION_POOL_STATUS_RECONFIGURING = "RECONFIGURING"
 
 ACTIVE_MINION_POOL_STATUSES = [
-    MINION_POOL_STATUS_INITIALIZING,
-    MINION_POOL_STATUS_ALLOCATING,
-    MINION_POOL_STATUS_DEALLOCATING,
-    MINION_POOL_STATUS_UNINITIALIZING]
+    MINION_POOL_STATUS_VALIDATING_INPUTS,
+    MINION_POOL_STATUS_ALLOCATING_SHARED_RESOURCES,
+    MINION_POOL_STATUS_ALLOCATING_MACHINES,
+    MINION_POOL_STATUS_DEALLOCATING_MACHINES,
+    MINION_POOL_STATUS_DEALLOCATING_SHARED_RESOURCES]
 
 MINION_MACHINE_IDENTIFIER_FORMAT = "coriolis-pool-%(pool_id)s-minion-%(minion_id)s"
 MINION_MACHINE_STATUS_UNKNOWN = "UNKNOWN"
+MINION_MACHINE_STATUS_DEPLOYING = "DEPLOYING"
+MINION_MACHINE_STATUS_ERROR = "ERROR"
 MINION_MACHINE_STATUS_UNINITIALIZED = "UNINITIALIZED"
 MINION_MACHINE_STATUS_RECONFIGURING = "RECONFIGURING"
 MINION_MACHINE_STATUS_AVAILABLE = "AVAILABLE"

+ 84 - 61
coriolis/db/api.py

@@ -715,6 +715,62 @@ def add_task_event(context, task_id, level, message):
     _session(context).add(task_event)
 
 
+@enginefacade.writer
+def add_minion_pool_event(context, pool_id, level, message):
+    pool_event = models.MinionPoolEvent()
+    pool_event.pool_id = pool_id
+    pool_event.level = level
+    pool_event.message = message
+    _session(context).add(pool_event)
+
+
+def _get_minion_pool_progress_update(context, pool_id, current_step):
+    q = _soft_delete_aware_query(context, models.MinionPoolProgressUpdate)
+    return q.filter(
+        models.MinionPoolProgressUpdate.pool_id == pool_id,
+        models.MinionPoolProgressUpdate.current_step == current_step).first()
+
+
+@enginefacade.reader
+def get_minion_pool_progress_step(context, pool_id):
+    curr_step = 0
+    q = _soft_delete_aware_query(context, models.MinionPoolProgressUpdate)
+    last_step = q.filter(
+        models.MinionPoolProgressUpdate.pool_id == pool_id).order_by(
+            models.MinionPoolProgressUpdate.current_step.desc()).first()
+
+    if last_step:
+        curr_step = last_step.current_step
+
+    return curr_step
+
+
+@enginefacade.writer
+def add_minion_pool_progress_update(context, pool_id, total_steps, message):
+    current_step = get_minion_pool_progress_step(context, pool_id) + 1
+    pool_progress_update = models.MinionPoolProgressUpdate(
+        pool_id=pool_id, current_step=current_step, total_steps=total_steps,
+        message=message)
+    _session(context).add(pool_progress_update)
+
+
+@enginefacade.writer
+def update_minion_pool_progress_update(
+        context, pool_id, step, total_steps, message):
+    pool_progress_update = _get_minion_pool_progress_update(
+        context, pool_id, step)
+    if not pool_progress_update:
+        pool_progress_update = models.MinionPoolProgressUpdate(
+            pool_id=pool_id, current_step=step, total_steps=total_steps,
+            message=message)
+        _session(context).add(pool_progress_update)
+
+    pool_progress_update.pool_id = pool_id
+    pool_progress_update.current_step = step
+    pool_progress_update.total_steps = total_steps
+    pool_progress_update.message = message
+
+
 def _get_progress_update(context, task_id, current_step):
     q = _soft_delete_aware_query(context, models.TaskProgressUpdate)
     return q.filter(
@@ -1203,16 +1259,26 @@ def add_minion_pool(context, minion_pool):
 
 @enginefacade.writer
 def delete_minion_pool(context, minion_pool_id):
-    _delete_transfer_action(
-        context, models.MinionPool, minion_pool_id)
+    args = {"id": minion_pool_id}
+    if is_user_context(context):
+        args["project_id"] = context.tenant
+    count = _soft_delete_aware_query(context, models.MinionPool).filter_by(
+        **args).soft_delete()
+    if count == 0:
+        raise exception.NotFound("0 entries were soft deleted")
 
 
 @enginefacade.reader
 def get_minion_pool(
-        context, minion_pool_id, include_machines=True):
+        context, minion_pool_id, include_machines=True, include_events=True,
+        include_progress_updates=True):
     q = _soft_delete_aware_query(context, models.MinionPool)
     if include_machines:
         q = q.options(orm.joinedload('minion_machines'))
+    if include_events:
+        q = q.options(orm.joinedload('events'))
+    if include_progress_updates:
+        q = q.options(orm.joinedload('progress_updates'))
     if is_user_context(context):
         q = q.filter(
             models.MinionPool.project_id == context.tenant)
@@ -1222,14 +1288,19 @@ def get_minion_pool(
 
 @enginefacade.reader
 def get_minion_pools(
-        context, include_machines=False, to_dict=True):
+        context, include_machines=False, include_events=False,
+        include_progress_updates=False, to_dict=True):
     q = _soft_delete_aware_query(context, models.MinionPool)
     q = q.filter()
     if is_user_context(context):
         q = q.filter(
-            models.Replica.project_id == context.tenant)
+            models.MinionPool.project_id == context.tenant)
     if include_machines:
         q = q.options(orm.joinedload('minion_machines'))
+    if include_events:
+        q = q.options(orm.joinedload('events'))
+    if include_progress_updates:
+        q = q.options(orm.joinedload('progress_updates'))
     db_result = q.all()
     if to_dict:
         return [i.to_dict(
@@ -1258,8 +1329,8 @@ def set_minion_pool_status(context, minion_pool_id, status):
         context, minion_pool_id, include_machines=False)
     LOG.debug(
         "Transitioning minion pool '%s' from status '%s' to '%s'in DB",
-        minion_pool_id, pool.pool_status, status)
-    pool.pool_status = status
+        minion_pool_id, pool.status, status)
+    pool.status = status
     setattr(pool, 'updated_at', timeutils.utcnow())
 
 
@@ -1274,69 +1345,21 @@ def update_minion_pool(context, minion_pool_id, updated_values):
     updateable_fields = [
         "minimum_minions", "maximum_minions", "minion_max_idle_time",
         "minion_retention_strategy", "environment_options",
-        "pool_shared_resources", "notes", "pool_name", "pool_os_type"]
-    # TODO(aznashwan): this should no longer be required when the
-    # transfer action class hirearchy is to be overhauled:
-    redundancies = {
-        "environment_options": [
-            "source_environment", "destination_environment"]}
+        "shared_resources", "notes", "name", "os_type"]
     for field in updateable_fields:
         if field in updated_values:
-            if field in redundancies:
-                for old_key in redundancies[field]:
-                    LOG.debug(
-                        "Updating the '%s' field of Minion Pool '%s' to: '%s'",
-                        old_key, minion_pool_id, updated_values[field])
-                    setattr(lifecycle, old_key, updated_values[field])
-            else:
-                LOG.debug(
-                    "Updating the '%s' field of Minion Pool '%s' to: '%s'",
-                    field, minion_pool_id, updated_values[field])
-                setattr(lifecycle, field, updated_values[field])
+            LOG.debug(
+                "Updating the '%s' field of Minion Pool '%s' to: '%s'",
+                field, minion_pool_id, updated_values[field])
+            setattr(lifecycle, field, updated_values[field])
 
     non_updateable_fields = set(
         updated_values.keys()).difference(updateable_fields)
     if non_updateable_fields:
         LOG.warn(
-            "The following Replica fields can NOT be updated: %s",
+            "The following Minion Pool fields can NOT be updated: %s",
             non_updateable_fields)
 
     # the oslo_db library uses this method for both the `created_at` and
     # `updated_at` fields
     setattr(lifecycle, 'updated_at', timeutils.utcnow())
-
-# @enginefacade.reader
-# def get_minion_pool_executions(
-#         context, lifecycle_id, include_tasks=True):
-#     q = _soft_delete_aware_query(context, models.TasksExecution)
-#     q = q.join(models.MinionPool)
-#     if include_tasks:
-#         q = _get_tasks_with_details_options(q)
-#     if is_user_context(context):
-#         q = q.filter(models.MinionPool.project_id == context.tenant)
-#     return q.filter(
-#         models.MinionPool.id == lifecycle_id).all()
-
-# @enginefacade.reader
-# def get_minion_pool_execution(context, lifecycle_id, execution_id):
-#     q = _soft_delete_aware_query(context, models.TasksExecution).join(
-#         models.MinionPool)
-#     q = _get_tasks_with_details_options(q)
-#     if is_user_context(context):
-#         q = q.filter(models.MinionPool.project_id == context.tenant)
-#     return q.filter(
-#         models.MinionPool.id == lifecycle_id,
-#         models.TasksExecution.id == execution_id).first()
-
-# @enginefacade.writer
-# def delete_minion_pool_execution(context, execution_id):
-#     q = _soft_delete_aware_query(context, models.TasksExecution).filter(
-#         models.TasksExecution.id == execution_id)
-#     if is_user_context(context):
-#         if not q.join(models.MinionPool).filter(
-#                 models.MinionPool.project_id == (
-#                     context.tenant)).first():
-#             raise exception.NotAuthorized()
-#     count = q.soft_delete()
-#     if count == 0:
-#         raise exception.NotFound("0 entries were soft deleted")

+ 42 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -30,6 +30,15 @@ def upgrade(migrate_engine):
             sqlalchemy.Column(
                 "id", sqlalchemy.String(36),
                 default=lambda: str(uuid.uuid4()), primary_key=True),
+            sqlalchemy.Column("notes", sqlalchemy.Text, nullable=True),
+            sqlalchemy.Column(
+                "user_id", sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                "project_id", sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted', sqlalchemy.String(36)),
             sqlalchemy.Column(
                 "name", sqlalchemy.String(255), nullable=False),
             sqlalchemy.Column(
@@ -88,6 +97,39 @@ def upgrade(migrate_engine):
                 'provider_properties', sqlalchemy.Text,
                 nullable=True)))
 
+    tables.append(sqlalchemy.Table(
+        'minion_pool_event', meta,
+        sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                          default=lambda: str(uuid.uuid4())),
+        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted', sqlalchemy.String(36)),
+        sqlalchemy.Column("pool_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey('minion_pool.id'),
+                          nullable=False),
+        sqlalchemy.Column("level", sqlalchemy.String(50), nullable=False),
+        sqlalchemy.Column("message", sqlalchemy.String(1024), nullable=False),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'))
+
+    tables.append(sqlalchemy.Table(
+        'minion_pool_progress_update', meta,
+        sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                          default=lambda: str(uuid.uuid4())),
+        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted', sqlalchemy.String(36)),
+        sqlalchemy.Column("pool_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey('minion_pool.id'),
+                          nullable=False),
+        sqlalchemy.Column("current_step", sqlalchemy.Integer, nullable=False),
+        sqlalchemy.Column("total_steps", sqlalchemy.Integer, nullable=True),
+        sqlalchemy.Column("message", sqlalchemy.String(1024), nullable=True),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'))
+
     # add the pool option properties for the transfer:
     origin_minion_pool_id = sqlalchemy.Column(
         "origin_minion_pool_id", sqlalchemy.String(36),

+ 84 - 1
coriolis/db/sqlalchemy/models.py

@@ -42,6 +42,34 @@ class TaskEvent(BASE, models.TimestampMixin, models.SoftDeleteMixin,
         return result
 
 
+class MinionPoolEvent(BASE, models.TimestampMixin, models.SoftDeleteMixin,
+                models.ModelBase):
+    __tablename__ = 'minion_pool_event'
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    pool_id = sqlalchemy.Column(sqlalchemy.String(36),
+                                sqlalchemy.ForeignKey('minion_pool.id'),
+                                nullable=False)
+    level = sqlalchemy.Column(sqlalchemy.String(20), nullable=False)
+    message = sqlalchemy.Column(sqlalchemy.String(1024), nullable=False)
+
+    def to_dict(self):
+        result = {
+            "id": self.id,
+            "pool_id": self.pool_id,
+            "level": self.level,
+            "message": self.message,
+            "created_at": self.created_at,
+            "updated_at": self.updated_at,
+            "deleted_at": self.deleted_at,
+            "deleted": self.deleted,
+        }
+        return result
+
+
+
 class TaskProgressUpdate(BASE, models.TimestampMixin, models.SoftDeleteMixin,
                          models.ModelBase):
     __tablename__ = 'task_progress_update'
@@ -73,6 +101,37 @@ class TaskProgressUpdate(BASE, models.TimestampMixin, models.SoftDeleteMixin,
         return result
 
 
+class MinionPoolProgressUpdate(
+        BASE, models.TimestampMixin, models.SoftDeleteMixin, models.ModelBase):
+    __tablename__ = 'minion_pool_progress_update'
+    __table_args__ = (
+        schema.UniqueConstraint("pool_id", "current_step", "deleted"),)
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    pool_id = sqlalchemy.Column(sqlalchemy.String(36),
+                                sqlalchemy.ForeignKey('minion_pool.id'),
+                                nullable=False)
+    current_step = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
+    total_steps = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
+    message = sqlalchemy.Column(sqlalchemy.String(1024), nullable=True)
+
+    def to_dict(self):
+        result = {
+            "id": self.id,
+            "pool_id": self.pool_id,
+            "current_step": self.current_step,
+            "total_steps": self.total_steps,
+            "message": self.message,
+            "created_at": self.created_at,
+            "updated_at": self.updated_at,
+            "deleted_at": self.deleted_at,
+            "deleted": self.deleted,
+        }
+        return result
+
+
 class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
            models.ModelBase):
     __tablename__ = 'task'
@@ -468,10 +527,13 @@ class MinionPool(
     id = sqlalchemy.Column(
         sqlalchemy.String(36),
         primary_key=True)
+    user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+    project_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
 
     name = sqlalchemy.Column(
         sqlalchemy.String(255),
         nullable=False)
+    notes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     endpoint_id = sqlalchemy.Column(
         sqlalchemy.String(36),
         sqlalchemy.ForeignKey('endpoint.id'), nullable=False)
@@ -493,19 +555,34 @@ class MinionPool(
         sqlalchemy.Integer, nullable=False)
     minion_retention_strategy = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False)
+
     minion_machines = orm.relationship(
         MinionMachine, backref=orm.backref('minion_pool'),
         primaryjoin="and_(MinionMachine.pool_id==MinionPool.id, "
                     "MinionMachine.deleted=='0')")
+    events = orm.relationship(MinionPoolEvent, cascade="all,delete",
+                              backref=orm.backref('minion_pool'))
+    progress_updates = orm.relationship(MinionPoolProgressUpdate,
+                                        cascade="all,delete",
+                                        backref=orm.backref('minion_pool'),
+                                        order_by=(
+                                            MinionPoolProgressUpdate.current_step))
 
-    def to_dict(self, include_machines=True):
+    def to_dict(
+            self, include_machines=True, include_events=True,
+            include_progress_updates=True):
         base = {
             "id": self.id,
             "name": self.name,
+            "notes": self.notes,
             "endpoint_id": self.endpoint_id,
             "environment_options": self.environment_options,
             "os_type": self.os_type,
             "platform": self.platform,
+            "created_at": self.created_at,
+            "updated_at": self.updated_at,
+            "deleted_at": self.deleted_at,
+            "deleted": self.deleted,
             "shared_resources": self.shared_resources,
             "status": self.status,
             "minimum_minions": self.minimum_minions,
@@ -516,6 +593,12 @@ class MinionPool(
         if include_machines:
             base["minion_machines"] = [
                 machine.to_dict() for machine in self.minion_machines]
+        if include_events:
+            base["events"] = [
+                ev.to_dict() for ev in self.events]
+        if include_progress_updates:
+            base["progress_updates"] = [
+                pu.to_dict() for pu in self.progress_updates]
         return base
 
 

+ 2 - 2
coriolis/endpoint_options/api.py

@@ -1,12 +1,12 @@
 # Copyright 2020 Cloudbase Solutions Srl
 # All Rights Reserved.
 
-from coriolis.conductor.rpc import client as rpc_client
+from coriolis.minion_manager.rpc import client as rpc_client
 
 
 class API(object):
     def __init__(self):
-        self._rpc_client = rpc_client.ConductorClient()
+        self._rpc_client = rpc_client.MinionManagerClient()
 
     def get_endpoint_destination_options(
             self, ctxt, endpoint_id, env=None, option_names=None):

+ 15 - 12
coriolis/endpoints/api.py

@@ -2,55 +2,58 @@
 # All Rights Reserved.
 
 from coriolis import utils
-from coriolis.conductor.rpc import client as rpc_client
+from coriolis.conductor.rpc import client as rpc_conductor_client
+from coriolis.minion_manager.rpc import client as rpc_minion_manager_client
 
 
 class API(object):
     def __init__(self):
-        self._rpc_client = rpc_client.ConductorClient()
+        self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
+        self._rpc_minion_manager_client = (
+            rpc_minion_manager_client.MinionManagerClient())
 
     def create(self, ctxt, name, endpoint_type, description,
                connection_info, mapped_regions):
-        return self._rpc_client.create_endpoint(
+        return self._rpc_conductor_client.create_endpoint(
             ctxt, name, endpoint_type, description, connection_info,
             mapped_regions)
 
     def update(self, ctxt, endpoint_id, properties):
-        return self._rpc_client.update_endpoint(
+        return self._rpc_conductor_client.update_endpoint(
             ctxt, endpoint_id, properties)
 
     def delete(self, ctxt, endpoint_id):
-        self._rpc_client.delete_endpoint(ctxt, endpoint_id)
+        self._rpc_conductor_client.delete_endpoint(ctxt, endpoint_id)
 
     def get_endpoints(self, ctxt):
-        return self._rpc_client.get_endpoints(ctxt)
+        return self._rpc_conductor_client.get_endpoints(ctxt)
 
     def get_endpoint(self, ctxt, endpoint_id):
-        return self._rpc_client.get_endpoint(ctxt, endpoint_id)
+        return self._rpc_conductor_client.get_endpoint(ctxt, endpoint_id)
 
     def validate_connection(self, ctxt, endpoint_id):
-        return self._rpc_client.validate_endpoint_connection(
+        return self._rpc_conductor_client.validate_endpoint_connection(
             ctxt, endpoint_id)
 
     @utils.bad_request_on_error("Invalid destination environment: %s")
     def validate_target_environment(self, ctxt, endpoint_id, target_env):
-        return self._rpc_client.validate_endpoint_target_environment(
+        return self._rpc_conductor_client.validate_endpoint_target_environment(
             ctxt, endpoint_id, target_env)
 
     @utils.bad_request_on_error("Invalid source environment: %s")
     def validate_source_environment(self, ctxt, endpoint_id, source_env):
-        return self._rpc_client.validate_endpoint_source_environment(
+        return self._rpc_conductor_client.validate_endpoint_source_environment(
             ctxt, endpoint_id, source_env)
 
     @utils.bad_request_on_error("Invalid source minion pool environment: %s")
     def validate_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
-        return self._rpc_client.validate_endpoint_source_minion_pool_options(
+        return self._rpc_minion_manager_client.validate_endpoint_source_minion_pool_options(
             ctxt, endpoint_id, pool_environment)
 
     @utils.bad_request_on_error(
         "Invalid destination minion pool environment: %s")
     def validate_endpoint_destination_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
-        return self._rpc_client.validate_endpoint_destination_minion_pool_options(
+        return self._rpc_minion_manager_client.validate_endpoint_destination_minion_pool_options(
             ctxt, endpoint_id, pool_environment)

+ 4 - 0
coriolis/events.py

@@ -84,6 +84,10 @@ class BaseEventHandler(object, with_metaclass(abc.ABCMeta)):
     def update_task_progress_update(self, step, total_steps, message):
         pass
 
+    @abc.abstractmethod
+    def get_task_progress_step(self):
+        pass
+
     @abc.abstractmethod
     def info(self, message):
         pass

+ 28 - 4
coriolis/minion_manager/rpc/client.py

@@ -25,6 +25,30 @@ class MinionManagerClient(object):
             timeout = CONF.minion_manager.minion_mananger_rpc_timeout
         self._client = rpc.get_client(target, timeout=timeout)
 
+    def add_minion_pool_progress_update(
+            self, ctxt, minion_pool_id, total_steps, message):
+        return self._client.call(
+            ctxt, 'add_minion_pool_progress_update',
+            minion_pool_id=minion_pool_id,
+            total_steps=total_steps, message=message)
+
+    def update_minion_pool_progress_update(
+            self, ctxt, minion_pool_id, step, total_steps, message):
+        return self._client.call(
+            ctxt, 'update_minion_pool_progress_update',
+            minion_pool_id=minion_pool_id,
+            step=step, total_steps=total_steps, message=message)
+
+    def get_minion_pool_progress_step(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, 'get_minion_pool_progress_step',
+            minion_pool_id=minion_pool_id)
+
+    def add_minion_pool_event(self, ctxt, minion_pool_id, level, message):
+        return self._client.call(
+            ctxt, 'add_minion_pool_event', minion_pool_id=minion_pool_id,
+            level=level, message=message)
+
     def get_diagnostics(self, ctxt):
         return self._client.call(ctxt, 'get_diagnostics')
 
@@ -70,15 +94,15 @@ class MinionManagerClient(object):
             ctxt, "tear_down_shared_minion_pool_resources",
             minion_pool_id=minion_pool_id, force=force)
 
-    def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
+    def allocate_minion_pool(self, ctxt, minion_pool_id):
         return self._client.call(
-            ctxt, "allocate_minion_pool_machines",
+            ctxt, "allocate_minion_pool",
             minion_pool_id=minion_pool_id)
 
-    def deallocate_minion_pool_machines(
+    def deallocate_minion_pool(
             self, ctxt, minion_pool_id, force=False):
         return self._client.call(
-            ctxt, "deallocate_minion_pool_machines",
+            ctxt, "deallocate_minion_pool",
             minion_pool_id=minion_pool_id,
             force=force)
 

+ 365 - 78
coriolis/minion_manager/rpc/server.py

@@ -9,6 +9,8 @@ import uuid
 from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log as logging
+from taskflow.patterns import linear_flow
+from taskflow.patterns import unordered_flow
 
 from coriolis import constants
 from coriolis import exception
@@ -16,7 +18,10 @@ from coriolis import utils
 from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
+from coriolis.minion_manager.rpc import tasks as minion_manager_tasks
 from coriolis.scheduler.rpc import client as rpc_scheduler_client
+from coriolis.taskflow import base as coriolis_taskflow_base
+from coriolis.taskflow import runner as taskflow_runner
 from coriolis.worker.rpc import client as rpc_worker_client
 
 
@@ -24,7 +29,6 @@ VERSION = "1.0"
 
 LOG = logging.getLogger(__name__)
 
-
 MINION_MANAGER_OPTS = []
 
 CONF = cfg.CONF
@@ -45,6 +49,12 @@ def minion_pool_synchronized(func):
 
 class MinionManagerServerEndpoint(object):
 
+    @property
+    def _taskflow_runner(self):
+        return taskflow_runner.TaskFlowRunner(
+            constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC,
+            max_workers=25)
+
     @property
     def _rpc_worker_client(self):
         return rpc_worker_client.WorkerClient()
@@ -60,56 +70,97 @@ class MinionManagerServerEndpoint(object):
     def get_diagnostics(self, ctxt):
         return utils.get_diagnostics_info()
 
-    @staticmethod
-    def _update_minion_pool_status_for_finished_execution(
-            ctxt, execution, new_execution_status):
-        # status map if execution is active:
-        stat_map = {
-            constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
-                constants.MINION_POOL_STATUS_ALLOCATING,
-            constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
-                constants.MINION_POOL_STATUS_DEALLOCATING,
-            constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
-                constants.MINION_POOL_STATUS_INITIALIZING,
-            constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
-                constants.MINION_POOL_STATUS_UNINITIALIZING}
-        if new_execution_status == constants.EXECUTION_STATUS_COMPLETED:
-            stat_map = {
-                constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_ALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_DEALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_DEALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_UNINITIALIZED}
-        elif new_execution_status in constants.FINALIZED_TASK_STATUSES:
-            stat_map = {
-                constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_DEALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_ALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_UNINITIALIZED,
-                constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_UNINITIALIZED}
-        final_pool_status = stat_map.get(execution.type)
-        if not final_pool_status:
-            LOG.error(
-                "Could not determine pool status following transition of "
-                "execution '%s' (type '%s') to status '%s'. Presuming error "
-                "has occured. Marking piil as error'd.",
-                execution.id, execution.type, new_execution_status)
-            final_pool_status = constants.MINION_POOL_STATUS_ERROR
+    def get_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+
+        worker_service = self._scheduler_client.get_worker_service_for_specs(
+            ctxt, enabled=True,
+            region_sets=[[reg.id for reg in endpoint['mapped_regions']]],
+            provider_requirements={
+                endpoint['type']: [
+                    constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
+        worker_rpc = rpc_worker_client.WorkerClient.from_service_definition(
+            worker_service)
+
+        return worker_rpc.get_endpoint_source_minion_pool_options(
+            ctxt, endpoint['type'], endpoint['connection_info'], env,
+            option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+
+        worker_service = self._scheduler_client.get_worker_service_for_specs(
+            ctxt, enabled=True,
+            region_sets=[[reg.id for reg in endpoint['mapped_regions']]],
+            provider_requirements={
+                endpoint['type']: [
+                    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
+        worker_rpc = rpc_worker_client.WorkerClient.from_service_definition(
+            worker_service)
+
+        return worker_rpc.get_endpoint_destination_minion_pool_options(
+            ctxt, endpoint['type'], endpoint['connection_info'], env,
+            option_names)
+
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+
+        worker_service = self._scheduler_client.get_worker_service_for_specs(
+            ctxt, enabled=True,
+            region_sets=[[reg.id for reg in endpoint['mapped_regions']]],
+            provider_requirements={
+                endpoint['type']: [
+                    constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
+        worker_rpc = rpc_worker_client.WorkerClient.from_service_definition(
+            worker_service)
+
+        return worker_rpc.validate_endpoint_source_minion_pool_options(
+            ctxt, endpoint['type'], pool_environment)
+
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+
+        worker_service = self._scheduler_client.get_worker_service_for_specs(
+            ctxt, enabled=True,
+            region_sets=[[reg.id for reg in endpoint['mapped_regions']]],
+            provider_requirements={
+                endpoint['type']: [
+                    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
+        worker_rpc = rpc_worker_client.WorkerClient.from_service_definition(
+            worker_service)
+
+        return worker_rpc.validate_endpoint_destination_minion_pool_options(
+            ctxt, endpoint['type'], pool_environment)
 
+    @minion_pool_synchronized
+    def add_minion_pool_event(self, ctxt, minion_pool_id, level, message):
         LOG.info(
-            "Marking minion pool '%s' status as '%s' in the DB following the "
-            "transition of execution '%s' (type '%s') to status '%s'.",
-            execution.action_id, final_pool_status, execution.id,
-            execution.type, new_execution_status)
-        db_api.set_minion_pool_status(
-            ctxt, execution.action_id, final_pool_status)
+            "Minion pool event for pool %s: %s", minion_pool_id, message)
+        pool = db_api.get_minion_pool(ctxt, minion_pool_id)
+        db_api.add_minion_pool_event(ctxt, pool.id, level, message)
 
+    @minion_pool_synchronized
+    def add_minion_pool_progress_update(
+            self, ctxt, minion_pool_id, total_steps, message):
+        LOG.info(
+            "Adding pool progress update for %s: %s", minion_pool_id, message)
+        db_api.add_minion_pool_progress_update(
+            ctxt, minion_pool_id, total_steps, message)
+
+    @minion_pool_synchronized
+    def update_minion_pool_progress_update(
+            self, ctxt, minion_pool_id, step, total_steps, message):
+        LOG.info("Updating minion pool progress update: %s", minion_pool_id)
+        db_api.update_minion_pool_progress_update(
+            ctxt, minion_pool_id, step, total_steps, message)
+
+    @minion_pool_synchronized
+    def get_minion_pool_progress_step(self, ctxt, minion_pool_id):
+        return db_api.get_minion_pool_progress_step(ctxt, minion_pool_id)
+
     def validate_minion_pool_selections_for_action(self, ctxt, action_id):
         action = db_api.get_action(ctxt, action_id)
         minion_pools = {
@@ -313,9 +364,9 @@ class MinionManagerServerEndpoint(object):
                         missing_pools))
 
             unallocated_pools = {
-                pool_id: pool.pool_status
+                pool_id: pool.status
                 for (pool_id, pool) in minion_pool_id_mappings.items()
-                if pool.pool_status != constants.MINION_POOL_STATUS_ALLOCATED}
+                if pool.status != constants.MINION_POOL_STATUS_ALLOCATED}
             if unallocated_pools:
                 raise exception.InvalidMinionPoolSelection(
                     "The following minion pools have not had their machines "
@@ -471,36 +522,219 @@ class MinionManagerServerEndpoint(object):
                         "No minion machines were found to be associated "
                         "with action with base_id '%s'.", action.base_id)
 
+    def _get_minion_pool_allocation_flow(self, minion_pool):
+        """ Returns a taskflow.Flow object pertaining to all the tasks
+        required for allocating a minion pool (validation, shared resource
+        setup, and actual minion creation)
+        """
+        # create task flow:
+        deployment_flow = linear_flow.Flow(
+            minion_manager_tasks.MINION_POOL_DEPLOYMENT_FLOW_NAME_FORMAT % (
+                minion_pool.id))
+
+        # transition pool to VALIDATING:
+        deployment_flow.add(minion_manager_tasks.UpdateMinionPoolStatusTask(
+            minion_pool.id, constants.MINION_POOL_STATUS_VALIDATING_INPUTS))
+
+        # add pool options validation task:
+        deployment_flow.add(minion_manager_tasks.ValidateMinionPoolOptionsTask(
+            # NOTE: we pass in the ID of the minion pool itself as both
+            # the task ID and the instance ID for tasks which are strictly
+            # pool-related.
+            minion_pool.id,
+            minion_pool.id,
+            minion_pool.platform))
+
+        # transition pool to 'DEPLOYING_SHARED_RESOURCES':
+        deployment_flow.add(minion_manager_tasks.UpdateMinionPoolStatusTask(
+            minion_pool.id,
+            constants.MINION_POOL_STATUS_ALLOCATING_SHARED_RESOURCES))
+
+        # add pool shared resources deployment task:
+        deployment_flow.add(minion_manager_tasks.DeploySharedPoolResourcesTask(
+            minion_pool.id, minion_pool.id, minion_pool.platform,
+            # NOTE: the shared resource deployment task will always get run
+            # by itself so it is safe to have it override the task_info:
+            provides='task_info'))
+
+        # add subflow for deploying all of the minion machines:
+        fmt = (
+            minion_manager_tasks.MINION_POOL_CREATE_MINIONS_SUBFLOW_NAME_FORMAT)
+        machines_flow = unordered_flow.Flow(fmt % minion_pool.id)
+        pool_machine_ids = []
+        for _ in range(minion_pool.minimum_minions):
+            machine_id = str(uuid.uuid4())
+            pool_machine_ids.append(machine_id)
+            machines_flow.add(
+                minion_manager_tasks.DeployMinionMachineTask(
+                    minion_pool.id, machine_id, minion_pool.platform))
+        # NOTE: bool(flow) == False if the flow has no child flows/tasks:
+        if machines_flow:
+            deployment_flow.add(minion_manager_tasks.UpdateMinionPoolStatusTask(
+                minion_pool.id,
+                constants.MINION_POOL_STATUS_ALLOCATING_MACHINES))
+            LOG.debug(
+                "The following minion machine IDs will be created for "
+                "pool with ID '%s': %s" % (minion_pool.id, pool_machine_ids))
+            deployment_flow.add(machines_flow)
+        else:
+            LOG.debug(
+                "No upfront minion machine deployments required for minion "
+                "pool with ID '%s'", minion_pool.id)
+
+        # transition pool to ALLOCATED:
+        deployment_flow.add(minion_manager_tasks.UpdateMinionPoolStatusTask(
+            minion_pool.id, constants.MINION_POOL_STATUS_ALLOCATED))
+
+        return deployment_flow
+
+
     def create_minion_pool(
             self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
             environment_options, minimum_minions, maximum_minions,
             minion_max_idle_time, minion_retention_strategy, notes=None):
-        endpoint = db_api.get_endpoint(ctxt, endpoint_id)
 
+        endpoint_dict = self._conductor_client.get_endpoint(
+            ctxt, endpoint_id)
         minion_pool = models.MinionPool()
         minion_pool.id = str(uuid.uuid4())
-        minion_pool.pool_name = name
+        minion_pool.name = name
         minion_pool.notes = notes
-        minion_pool.pool_platform = pool_platform
-        minion_pool.pool_os_type = pool_os_type
+        minion_pool.platform = pool_platform
+        minion_pool.os_type = pool_os_type
         minion_pool.endpoint_id = endpoint_id
         minion_pool.environment_options = environment_options
-        minion_pool.pool_status = constants.MINION_POOL_STATUS_UNINITIALIZED
+        minion_pool.status = constants.MINION_POOL_STATUS_UNINITIALIZED
         minion_pool.minimum_minions = minimum_minions
         minion_pool.maximum_minions = maximum_minions
         minion_pool.minion_max_idle_time = minion_max_idle_time
         minion_pool.minion_retention_strategy = minion_retention_strategy
 
         db_api.add_minion_pool(ctxt, minion_pool)
+        allocation_flow = self._get_minion_pool_allocation_flow(minion_pool)
+
+        # start the deployment flow:
+        initial_store = self._get_pool_allocation_initial_store(
+            ctxt, minion_pool, endpoint_dict)
+        self._taskflow_runner.run_flow_in_background(
+            allocation_flow, store=initial_store)
+
         return self.get_minion_pool(ctxt, minion_pool.id)
 
+    def _get_pool_allocation_initial_store(
+            self, ctxt, minion_pool, endpoint_dict):
+        # NOTE: considering pools are associated to strictly one endpoint,
+        # we can duplicate the 'origin/destination':
+        origin_dest_info = {
+            "id": endpoint_dict['id'],
+            "connection_info": endpoint_dict['connection_info'],
+            "mapped_regions": endpoint_dict['mapped_regions'],
+            "type": endpoint_dict['type']}
+        initial_store = {
+            "context": ctxt,
+            "origin": origin_dest_info,
+            "destination": origin_dest_info,
+            "task_info": {
+                "pool_identifier": minion_pool.id,
+                "pool_os_type": minion_pool.os_type,
+                "pool_environment_options": minion_pool.environment_options}}
+        return initial_store
+
+    def _check_pool_machines_in_use(
+            self, ctxt, minion_pool, raise_if_in_use=False, requery=False):
+        """ Checks whether the given pool has any machines currently in-use.
+        Returns a list of the used machines if so, or an empty list of not.
+        """
+        if requery:
+            minion_pool = self._get_minion_pool(
+                ctxt, minion_pool.id, include_machines=True,
+                include_events=False, include_progress_updates=False)
+        unused_machine_states = [
+            constants.MINION_MACHINE_STATUS_AVAILABLE,
+            constants.MINION_MACHINE_STATUS_ERROR]
+        used_machines = {
+            mch for mch in minion_pool.minion_machines
+            if mch.status not in unused_machine_states}
+        if used_machines and raise_if_in_use:
+            raise exception.InvalidMinionPoolState(
+                "Minion pool '%s' has one or more machines which are in an"
+                " active state: %s" % (
+                    minion_pool.id, {
+                        mch.id: mch.status for mch in used_machines}))
+        return used_machines
+
+    @minion_pool_synchronized
+    def allocate_minion_pool(self, ctxt, minion_pool_id):
+        LOG.info("Attempting to allocate Minion Pool '%s'.", minion_pool_id)
+        minion_pool = self._get_minion_pool(
+            ctxt, minion_pool_id, include_events=False, include_machines=False,
+            include_progress_updates=False)
+        endpoint_dict = self._conductor_client.get_endpoint(
+            ctxt, minion_pool.endpoint_id)
+        acceptable_allocation_statuses = [
+            constants.MINION_POOL_STATUS_UNINITIALIZED,
+            constants.MINION_POOL_STATUS_DEALLOCATED]
+        if minion_pool.status not in acceptable_allocation_statuses:
+            raise exception.InvalidMinionPoolState(
+                "Minion machines for pool '%s' cannot be allocated as the pool"
+                " is in '%s' state instead of the expected %s." % (
+                    minion_pool_id, minion_pool.status,
+                    acceptable_allocation_statuses))
+
+        allocation_flow = self._get_minion_pool_allocation_flow(minion_pool)
+        initial_store = self._get_pool_allocation_initial_store(
+            ctxt, minion_pool, endpoint_dict)
+        self._taskflow_runner.run_flow_in_background(
+            allocation_flow, store=initial_store)
+
+        return self._get_minion_pool(ctxt, minion_pool.id)
+
+    def _get_minion_pool_deallocation_flow(self, minion_pool):
+        # TODO
+        pass
+
+    def _get_pool_deallocation_initial_store(
+            self, ctxt, minion_pool, endpoint_dict):
+        # TODO
+        pass
+
+    @minion_pool_synchronized
+    def deallocate_minion_pool(self, ctxt, minion_pool_id):
+        LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
+        minion_pool = self._get_minion_pool(
+            ctxt, minion_pool_id, include_events=False, include_machines=True,
+            include_progress_updates=False)
+        if minion_pool.status != constants.MINION_POOL_STATUS_ALLOCATED:
+            raise exception.InvalidMinionPoolState(
+                "Minion pool '%s' cannot be deallocated as the pool"
+                " is in '%s' state instead of the expected %s." % (
+                    minion_pool_id, minion_pool.status,
+                    constants.MINION_POOL_STATUS_ALLOCATED))
+        self._check_pool_machines_in_use(
+            ctxt, minion_pool, raise_if_in_use=True)
+        endpoint_dict = self._conductor_client.get_endpoint(
+            ctxt, minion_pool.endpoint_id)
+
+        deallocation_flow = self._get_minion_pool_deallocation_flow(
+            minion_pool)
+        initial_store = self._get_pool_deallocation_initial_store(
+            ctxt, minion_pool, endpoint_dict)
+        self._taskflow_runner.run_flow_in_background(
+            deallocation_flow, store=initial_store)
+
+        return self._get_minion_pool(ctxt, minion_pool.id)
+
     def get_minion_pools(self, ctxt, include_machines=True):
-        return db_api.get_minion_pools(ctxt, include_machines=include_machines)
+        return db_api.get_minion_pools(
+            ctxt, include_machines=include_machines, include_events=False,
+            include_progress_updates=False)
 
     def _get_minion_pool(
-            self, ctxt, minion_pool_id, include_machines=True):
+            self, ctxt, minion_pool_id, include_machines=True,
+            include_events=True, include_progress_updates=True):
         minion_pool = db_api.get_minion_pool(
-            ctxt, minion_pool_id, include_machines=include_machines)
+            ctxt, minion_pool_id, include_machines=include_machines,
+            include_events=include_events,
+            include_progress_updates=include_progress_updates)
         if not minion_pool:
             raise exception.NotFound(
                 "Minion pool with ID '%s' not found." % minion_pool_id)
@@ -514,11 +748,11 @@ class MinionManagerServerEndpoint(object):
     #     minion_pool = db_api.get_minion_pool_lifecycle(
     #         ctxt, minion_pool_id, include_tasks_executions=False,
     #         include_machines=False)
-    #     if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
+    #     if minion_pool.status != constants.MINION_POOL_STATUS_UNINITIALIZED:
     #         raise exception.InvalidMinionPoolState(
     #             "Minion Pool '%s' cannot have shared resources set up as it "
     #             "is in '%s' state instead of the expected %s."% (
-    #                 minion_pool_id, minion_pool.pool_status,
+    #                 minion_pool_id, minion_pool.status,
     #                 constants.MINION_POOL_STATUS_UNINITIALIZED))
 
     #     execution = models.TasksExecution()
@@ -529,7 +763,7 @@ class MinionManagerServerEndpoint(object):
     #         constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES)
 
     #     minion_pool.info[minion_pool_id] = {
-    #         "pool_os_type": minion_pool.pool_os_type,
+    #         "pool_os_type": minion_pool.os_type,
     #         "pool_identifier": minion_pool.id,
     #         # TODO(aznashwan): remove redundancy once transfer
     #         # action DB models have been overhauled:
@@ -539,7 +773,7 @@ class MinionManagerServerEndpoint(object):
     #         constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS)
     #     set_up_task_type = (
     #         constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES)
-    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #     if minion_pool.platform == constants.PROVIDER_PLATFORM_SOURCE:
     #         validate_task_type = (
     #             constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS)
     #         set_up_task_type = (
@@ -580,14 +814,14 @@ class MinionManagerServerEndpoint(object):
     #     minion_pool = db_api.get_minion_pool_lifecycle(
     #         ctxt, minion_pool_id, include_tasks_executions=False,
     #         include_machines=False)
-    #     if minion_pool.pool_status != (
+    #     if minion_pool.status != (
     #             constants.MINION_POOL_STATUS_DEALLOCATED) and not force:
     #         raise exception.InvalidMinionPoolState(
     #             "Minion Pool '%s' cannot have shared resources torn down as it"
     #             " is in '%s' state instead of the expected %s. "
     #             "Please use the force flag if you are certain you want "
     #             "to tear down the shared resources for this pool." % (
-    #                 minion_pool_id, minion_pool.pool_status,
+    #                 minion_pool_id, minion_pool.status,
     #                 constants.MINION_POOL_STATUS_DEALLOCATED))
 
     #     LOG.info(
@@ -603,7 +837,7 @@ class MinionManagerServerEndpoint(object):
 
     #     tear_down_task_type = (
     #         constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES)
-    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #     if minion_pool.platform == constants.PROVIDER_PLATFORM_SOURCE:
     #         tear_down_task_type = (
     #             constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES)
 
@@ -636,11 +870,11 @@ class MinionManagerServerEndpoint(object):
     #     minion_pool = self._get_minion_pool(
     #         ctxt, minion_pool_id, include_tasks_executions=False,
     #         include_machines=True)
-    #     if minion_pool.pool_status != constants.MINION_POOL_STATUS_DEALLOCATED:
+    #     if minion_pool.status != constants.MINION_POOL_STATUS_DEALLOCATED:
     #         raise exception.InvalidMinionPoolState(
     #             "Minion machines for pool '%s' cannot be allocated as the pool"
     #             " is in '%s' state instead of the expected %s."% (
-    #                 minion_pool_id, minion_pool.pool_status,
+    #                 minion_pool_id, minion_pool.status,
     #                 constants.MINION_POOL_STATUS_DEALLOCATED))
 
     #     execution = models.TasksExecution()
@@ -656,7 +890,7 @@ class MinionManagerServerEndpoint(object):
     #         constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
     #     delete_minion_task_type = (
     #         constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
-    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #     if minion_pool.platform == constants.PROVIDER_PLATFORM_SOURCE:
     #         create_minion_task_type = (
     #             constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
     #         delete_minion_task_type = (
@@ -665,8 +899,8 @@ class MinionManagerServerEndpoint(object):
     #     for minion_machine_id in new_minion_machine_ids:
     #         minion_pool.info[minion_machine_id] = {
     #             "pool_identifier": minion_pool_id,
-    #             "pool_os_type": minion_pool.pool_os_type,
-    #             "pool_shared_resources": minion_pool.pool_shared_resources,
+    #             "pool_os_type": minion_pool.os_type,
+    #             "pool_shared_resources": minion_pool.shared_resources,
     #             "pool_environment_options": minion_pool.source_environment,
     #             # NOTE: we default this to an empty dict here to avoid possible
     #             # task info conflicts on the cleanup task below for minions
@@ -726,14 +960,14 @@ class MinionManagerServerEndpoint(object):
     #     minion_pool = db_api.get_minion_pool_lifecycle(
     #         ctxt, minion_pool_id, include_tasks_executions=False,
     #         include_machines=True)
-    #     if minion_pool.pool_status not in (
+    #     if minion_pool.status not in (
     #             constants.MINION_POOL_STATUS_ALLOCATED) and not force:
     #         raise exception.InvalidMinionPoolState(
     #             "Minion Pool '%s' cannot be deallocated as it is in '%s' "
     #             "state instead of the expected '%s'. Please use the "
     #             "force flag if you are certain you want to deallocate "
     #             "the minion pool's machines." % (
-    #                 minion_pool_id, minion_pool.pool_status,
+    #                 minion_pool_id, minion_pool.status,
     #                 constants.MINION_POOL_STATUS_ALLOCATED))
 
     #     if not force:
@@ -748,7 +982,7 @@ class MinionManagerServerEndpoint(object):
 
     #     delete_minion_task_type = (
     #         constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
-    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #     if minion_pool.platform == constants.PROVIDER_PLATFORM_SOURCE:
     #         delete_minion_task_type = (
     #             constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
 
@@ -787,19 +1021,20 @@ class MinionManagerServerEndpoint(object):
     @minion_pool_synchronized
     def get_minion_pool(self, ctxt, minion_pool_id):
         return self._get_minion_pool(
-            ctxt, minion_pool_id, include_machines=True)
+            ctxt, minion_pool_id, include_machines=True, include_events=True,
+            include_progress_updates=True)
 
     @minion_pool_synchronized
     def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
         minion_pool = self._get_minion_pool(
             ctxt, minion_pool_id, include_machines=False)
-        if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
+        if minion_pool.status != constants.MINION_POOL_STATUS_UNINITIALIZED:
             raise exception.InvalidMinionPoolState(
                 "Minion Pool '%s' cannot be updated as it is in '%s' status "
                 "instead of the expected '%s'. Please ensure the pool machines"
                 "have been deallocated and the pool's supporting resources "
                 "have been torn down before updating the pool." % (
-                    minion_pool_id, minion_pool.pool_status,
+                    minion_pool_id, minion_pool.status,
                     constants.MINION_POOL_STATUS_UNINITIALIZED))
         LOG.info(
             "Attempting to update minion_pool '%s' with payload: %s",
@@ -815,13 +1050,13 @@ class MinionManagerServerEndpoint(object):
         acceptable_deletion_statuses = [
             constants.MINION_POOL_STATUS_UNINITIALIZED,
             constants.MINION_POOL_STATUS_ERROR]
-        if minion_pool.pool_status not in acceptable_deletion_statuses:
+        if minion_pool.status not in acceptable_deletion_statuses:
             raise exception.InvalidMinionPoolState(
                 "Minion Pool '%s' cannot be deleted as it is in '%s' status "
                 "instead of one of the expected '%s'. Please ensure the pool "
                 "machines have been deallocated and the pool's supporting "
                 "resources have been torn down before deleting the pool." % (
-                    minion_pool_id, minion_pool.pool_status,
+                    minion_pool_id, minion_pool.status,
                     acceptable_deletion_statuses))
 
         LOG.info("Deleting minion pool with ID '%s'" % minion_pool_id)
@@ -877,3 +1112,55 @@ class MinionManagerServerEndpoint(object):
     #             "Please use the force option if you'd like to force-cancel "
     #             "it." % (minion_pool_id))
     #     self._cancel_tasks_execution(ctxt, execution, force=force)
+
+    # @staticmethod
+    # def _update_minion_pool_status_for_finished_execution(
+    #         ctxt, execution, new_execution_status):
+    #     # status map if execution is active:
+    #     stat_map = {
+    #         constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
+    #             constants.MINION_POOL_STATUS_ALLOCATING,
+    #         constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
+    #             constants.MINION_POOL_STATUS_DEALLOCATING,
+    #         constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
+    #             constants.MINION_POOL_STATUS_INITIALIZING,
+    #         constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
+    #             constants.MINION_POOL_STATUS_UNINITIALIZING}
+    #     if new_execution_status == constants.EXECUTION_STATUS_COMPLETED:
+    #         stat_map = {
+    #             constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
+    #                 constants.MINION_POOL_STATUS_ALLOCATED,
+    #             constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
+    #                 constants.MINION_POOL_STATUS_DEALLOCATED,
+    #             constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
+    #                 constants.MINION_POOL_STATUS_DEALLOCATED,
+    #             constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
+    #                 constants.MINION_POOL_STATUS_UNINITIALIZED}
+    #     elif new_execution_status in constants.FINALIZED_TASK_STATUSES:
+    #         stat_map = {
+    #             constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
+    #                 constants.MINION_POOL_STATUS_DEALLOCATED,
+    #             constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
+    #                 constants.MINION_POOL_STATUS_ALLOCATED,
+    #             constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
+    #                 constants.MINION_POOL_STATUS_UNINITIALIZED,
+    #             constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
+    #                 constants.MINION_POOL_STATUS_UNINITIALIZED}
+    #     final_pool_status = stat_map.get(execution.type)
+    #     if not final_pool_status:
+    #         LOG.error(
+    #             "Could not determine pool status following transition of "
+    #             "execution '%s' (type '%s') to status '%s'. Presuming error "
+    #             "has occured. Marking piil as error'd.",
+    #             execution.id, execution.type, new_execution_status)
+    #         final_pool_status = constants.MINION_POOL_STATUS_ERROR
+
+    #     LOG.info(
+    #         "Marking minion pool '%s' status as '%s' in the DB following the "
+    #         "transition of execution '%s' (type '%s') to status '%s'.",
+    #         execution.action_id, final_pool_status, execution.id,
+    #         execution.type, new_execution_status)
+    #     db_api.set_minion_pool_status(
+    #         ctxt, execution.action_id, final_pool_status)
+
+

+ 335 - 41
coriolis/minion_manager/rpc/tasks.py

@@ -1,54 +1,348 @@
 # Copyright 2020 Cloudbase Solutions Srl
 # All Rights Reserved.
 
-from taskflow import task as taskflow_tasks
+import abc
+import copy
+
+from oslo_log import log as logging
 
 from coriolis import constants
-from coriolis import exception
+from coriolis.db import api as db_api
+from coriolis.db.sqlalchemy import models
+from coriolis.taskflow import base as coriolis_taskflow_base
+
+
+LOG = logging.getLogger(__name__)
+
+MINION_POOL_DEPLOYMENT_FLOW_NAME_FORMAT = "pool-%s-deployment"
+MINION_POOL_VALIDATION_TASK_NAME_FORMAT = "pool-%s-validation"
+MINION_POOL_UPDATE_STATUS_TASK_NAME_FORMAT = "pool-%s-update-status-%s"
+MINION_POOL_SET_UP_SHARED_RESOURCES_TASK_NAME_FORMAT = (
+    "pool-%s-set-up-shared-resources")
+MINION_POOL_TEAR_DOWN_SHARED_RESOURCES_TASK_NAME_FORMAT = (
+    "pool-%s-tear-down-shared-resources")
+MINION_POOL_CREATE_MINIONS_SUBFLOW_NAME_FORMAT = (
+    "pool-%s-machines-deployment")
+MINION_POOL_CREATE_MINION_TASK_NAME_FORMAT = (
+    "pool-%s-machine-%s-deployment")
+MINION_POOL_DELETE_MINION_TASK_NAME_FORMAT = (
+    "pool-%s-machine-%s-deletion")
 
 
-class BaseMinionManangerTask(taskflow_tasks.Task):
 
+class UpdateMinionPoolStatusTask(coriolis_taskflow_base.BaseCoriolisTaskflowTask):
+    """Task which updates the status of the given pool.
+    Is capable of recording and reverting the state.
     """
-    Base taskflow.Task implementation for Minion Mananger tasks.
+
+    def __init__(
+            self, minion_pool_id, target_status,
+            status_to_revert_to=None, **kwargs):
+
+        self._target_status = target_status
+        self._minion_pool_id = minion_pool_id
+        self._task_name = (MINION_POOL_UPDATE_STATUS_TASK_NAME_FORMAT % (
+            self._minion_pool_id, self._target_status)).lower()
+        self._previous_status = status_to_revert_to
+
+        super(UpdateMinionPoolStatusTask, self).__init__(
+            name=self._task_name, **kwargs)
+
+    def _add_minion_pool_event(
+            self, ctxt, message, level=constants.TASK_EVENT_INFO):
+        db_api.add_minion_pool_event(
+            ctxt, self._minion_pool_id, level, message)
+
+    def execute(self, context, *args, **kwargs):
+        super(UpdateMinionPoolStatusTask, self).execute(*args, **kwargs)
+
+        if not self._previous_status:
+            minion_pool = db_api.get_minion_pool(
+                context, self._minion_pool_id, include_machines=False,
+                include_events=False, include_progress_updates=False)
+            self._previous_status = minion_pool.status
+
+        if self._previous_status == self._target_status:
+            LOG.debug(
+                "[Task '%s'] Minion pool '%s' already in status '%s'. "
+                "Nothing to do." % (
+                    self._task_name, self._minion_pool_id,
+                    self._target_status))
+        else:
+            LOG.debug(
+                "[Task '%s'] Transitioning minion pool '%s' from status '%s' "
+                "to '%s'." % (
+                    self._task_name, self._minion_pool_id,
+                    self._previous_status, self._target_status))
+            db_api.set_minion_pool_status(
+                context, self._minion_pool_id, self._target_status)
+            self._add_minion_pool_event(
+                context,
+                "Pool status transitioned from '%s' to '%s'" % (
+                    self._previous_status, self._target_status))
+
+    def revert(self, context, *args, **kwargs):
+        super(UpdateMinionPoolStatusTask, self).revert(*args, **kwargs)
+
+        minion_pool = db_api.get_minion_pool(
+            context, self._minion_pool_id, include_machines=False,
+            include_events=False, include_progress_updates=False)
+        if not minion_pool:
+            LOG.debug(
+                "[Task '%s'] Could not find pool with ID '%s' for status "
+                "reversion." % (self._task_name, self._minion_pool_id))
+            return
+
+        if minion_pool.status == self._previous_status:
+            LOG.debug(
+                "[Task '%s'] Minion pool '%s' is/was already reverted to "
+                "'%s'." % (
+                    self._task_name, self._minion_pool_id,
+                    self._previous_status))
+        else:
+            if minion_pool.status != self._target_status:
+                LOG.warn(
+                    "[Task %s] Minion pool '%s' is in status '%s', which is "
+                    "neither the previous status ('%s'), nor the newly-set "
+                    "status ('%s'). Reverting to '%s' anyway.",
+                    self._task_name, self._minion_pool_id, minion_pool.status,
+                    self._previous_status, self._target_status,
+                    self._previous_status)
+            LOG.debug(
+                "[Task '%s'] Reverting pool '%s' status from '%s' to "
+                "'%s'" % (
+                    self._task_name, self._minion_pool_id, minion_pool.status,
+                    self._previous_status))
+            db_api.set_minion_pool_status(
+                context, self._minion_pool_id, self._previous_status)
+            self._add_minion_pool_event(
+                context,
+                "Pool status reverted from '%s' to '%s'" % (
+                    minion_pool.status, self._previous_status))
+
+
+class BaseMinionManangerTask(coriolis_taskflow_base.BaseRunWorkerTask):
+
+    """Base taskflow.Task implementation for Minion Mananger tasks.
+
+    Acts as a simple adapter between minion-pool-specific params and the
+    BaseRunWorkerTask.
     """
 
+    default_provides = 'task_info'
+
     def __init__(
-            self, ctxt, db_api, minion_pool_id, **kwargs):
+            self, minion_pool_id, minion_machine_id,
+            main_task_runner_type, **kwargs):
         self._minion_pool_id = minion_pool_id
-        self._db_api = db_api
-
-        super(BaseRunWorkerTask, self).__init__(**kwargs)
-
-    # @lock_on_pool
-    def pre_execute(self):
-        # TODO:
-        # check minion pool is ready for the task
-        # ask scheduler for worker service
-        # update minion pool status accordingly
-        pass
-
-    # @lock_on_pool
-    def execute(self):
-        # TODO:
-        # left to child classes
-        pass
-
-    def post_execute(self):
-        # TODO:
-        # update minion pool status accordingly
-        # record results (if any)
-        pass
-
-    def pre_revert(self):
-        # ask scheduler for worker service for reversion
-        pass
-
-    def revert(self):
-        # TODO:
-        # run reverting task
-        pass
-
-    def post_revert(self):
-        # TODO:
-        pass
+        self._minion_machine_id = minion_machine_id
+
+        super(BaseMinionManangerTask, self).__init__(
+            self._get_task_name(minion_pool_id, minion_machine_id),
+            # TODO(aznashwan): passing the minion pool ID as the task ID is
+            # required to allow for the Minion pool event manager in the worker
+            # service to know what pool to emit events for.
+            minion_pool_id, minion_machine_id, main_task_runner_type, **kwargs)
+
+    @abc.abstractmethod
+    def _get_task_name(self, minion_pool_id, minion_machine_id):
+        raise NotImplementedError("No task name providable")
+
+    def _add_minion_pool_event(
+            self, ctxt, message, level=constants.TASK_EVENT_INFO):
+        db_api.add_minion_pool_event(
+            ctxt, self._minion_pool_id, level, message)
+
+    def execute(self, context, origin, destination, task_info, **kwargs):
+        LOG.info(
+            "Starting minion pool task '%s' (runner type '%s')",
+            self._task_name, self._main_task_runner_type)
+        res = super(BaseMinionManangerTask, self).execute(
+            context, origin, destination, task_info, **kwargs)
+        LOG.info(
+            "Completed minion pool task '%s' (runner type '%s')",
+            self._task_name, self._main_task_runner_type)
+        return res
+
+    def revert(self, context, origin, destination, task_info, **kwargs):
+        flow_failures = kwargs.get('flow_failures', {})
+        self._add_minion_pool_event(
+            context,
+            "Failure occurred for one or more operations on minion pool '%s'. "
+            "Please check the logs for additional details. Error messages "
+            "were: %s" % (
+                self._minion_pool_id,
+                self._get_error_str_for_flow_failures(
+                    flow_failures, full_tracebacks=False)),
+            level=constants.TASK_EVENT_ERROR)
+        return super(BaseMinionManangerTask, self).revert(
+            context, origin, destination, task_info, **kwargs)
+
+
+class ValidateMinionPoolOptionsTask(BaseMinionManangerTask):
+
+    def __init__(
+            self, minion_pool_id, minion_machine_id, minion_pool_type,
+            **kwargs):
+        task_type = constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS
+        if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
+            task_type = (
+                constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS)
+        super(ValidateMinionPoolOptionsTask, self).__init__(
+            minion_pool_id, minion_machine_id, task_type, **kwargs)
+
+    def _get_task_name(self, minion_pool_id, minion_machine_id):
+        return MINION_POOL_VALIDATION_TASK_NAME_FORMAT % minion_pool_id
+
+    def execute(self, context, origin, destination, task_info):
+        self._add_minion_pool_event(
+            context, "Validating minion pool options")
+        res = super(ValidateMinionPoolOptionsTask, self).execute(
+            context, origin, destination, task_info)
+        self._add_minion_pool_event(
+            context, "Successfully validated minion pool options")
+
+
+class DeploySharedPoolResourcesTask(BaseMinionManangerTask):
+
+    def __init__(
+            self, minion_pool_id, minion_machine_id, minion_pool_type,
+            **kwargs):
+
+        resource_deployment_task_type = (
+            constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES)
+        resource_cleanup_task_type = (
+            constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES)
+        if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
+            resource_deployment_task_type = (
+                constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES)
+            resource_cleanup_task_type = (
+                constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES)
+        super(DeploySharedPoolResourcesTask, self).__init__(
+            minion_pool_id, minion_machine_id, resource_deployment_task_type,
+            cleanup_task_runner_type=resource_cleanup_task_type)
+
+    def _get_task_name(self, minion_pool_id, minion_machine_id):
+        return MINION_POOL_SET_UP_SHARED_RESOURCES_TASK_NAME_FORMAT % (
+            minion_pool_id)
+
+    def execute(self, context, origin, destination, task_info):
+        self._add_minion_pool_event(
+            context, "Deploying shared pool resources")
+        res = super(DeploySharedPoolResourcesTask, self).execute(
+            context, origin, destination, task_info)
+        pool_shared_resources = res['pool_shared_resources']
+        self._add_minion_pool_event(
+            context, "Successfully deployed shared pool resources: %s" % (
+                pool_shared_resources))
+
+        updated_values = {
+            "shared_resources": pool_shared_resources}
+        db_api.add_minion_pool_event(
+            context, self._minion_pool_id, constants.TASK_EVENT_INFO,
+            "Successfully deployed shared pool resources: %s" % (
+                pool_shared_resources))
+        db_api.update_minion_pool(
+            context, self._minion_pool_id, updated_values)
+
+        task_info['pool_shared_resources'] = res['pool_shared_resources']
+        return task_info
+
+    def revert(self, context, origin, destination, task_info, **kwargs):
+        if 'pool_shared_resources' not in task_info:
+            task_info['pool_shared_resources'] = {}
+
+        res = super(DeploySharedPoolResourcesTask, self).revert(
+            context, origin, destination, task_info)
+
+        if res and res.get('pool_shared_resources'):
+            LOG.warn(
+                "Pool shared resources cleanup task has returned non-void "
+                "resources dict: %s", res.get['pool_shared_resources'])
+
+        updated_values = {
+            "pool_shared_resources": None}
+        db_api.update_minion_pool(
+            context, self._minion_pool_id, updated_values)
+
+        task_info['pool_shared_resources'] = None
+        return task_info
+
+
+class DeployMinionMachineTask(BaseMinionManangerTask):
+
+    def __init__(
+            self, minion_pool_id, minion_machine_id, minion_pool_type,
+            **kwargs):
+        resource_deployment_task_type = (
+            constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
+        resource_cleanup_task_type = (
+            constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
+        if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
+            resource_deployment_task_type = (
+                constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
+            resource_cleanup_task_type = (
+                constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+        super(DeployMinionMachineTask, self).__init__(
+            minion_pool_id, minion_machine_id, resource_deployment_task_type,
+            cleanup_task_runner_type=resource_cleanup_task_type)
+
+    def _get_task_name(self, minion_pool_id, minion_machine_id):
+        return MINION_POOL_CREATE_MINION_TASK_NAME_FORMAT % (
+            minion_pool_id, minion_machine_id)
+
+    def execute(self, context, origin, destination, task_info):
+        minion_machine = models.MinionMachine()
+        minion_machine.id = self._minion_machine_id
+        minion_machine.pool_id = self._minion_pool_id
+        minion_machine.status = (
+            constants.MINION_MACHINE_STATUS_DEPLOYING)
+        db_api.add_minion_machine(context, minion_machine)
+
+        execution_info = {
+            "pool_environment_options": task_info["pool_environment_options"],
+            "pool_identifier": task_info["pool_identifier"],
+            "pool_shared_resources": task_info["pool_shared_resources"],
+            "pool_os_type": task_info["pool_os_type"]}
+
+        res = super(DeployMinionMachineTask, self).execute(
+            context, origin, destination, execution_info)
+
+        updated_values = {
+            "status": constants.MINION_MACHINE_STATUS_AVAILABLE,
+            "connection_info": res['minion_connection_info'],
+            "provider_properties": res['minion_provider_properties'],
+            "backup_writer_connection_info": res[
+                "minion_backup_writer_connection_info"]}
+        db_api.update_minion_machine(
+            context, self._minion_machine_id, updated_values)
+
+        return task_info
+
+    def revert(self, context, origin, destination, task_info, **kwargs):
+        original_result = kwargs.get('result', {})
+        if original_result and (
+                isinstance(original_result, dict) and (
+                    'minion_provider_properties' not in original_result)):
+            LOG.debug(
+                "Reversion for Minion Machine '%s' (pool '%s') did not "
+                "receive any result from the original run. Presuming "
+                "that the task had not initially run successfully. "
+                "Result was: %s",
+                self._minion_machine_id, self._minion_pool_id,
+                original_result)
+            return task_info
+
+        cleanup_info = copy.deepcopy(task_info)
+        cleanup_info['minion_provider_properties'] = original_result[
+            'minion_provider_properties']
+        _ = super(DeployMinionMachineTask, self).revert(
+            context, origin, destination, cleanup_info)
+
+        if db_api.get_minion_machine(context, self._minion_machine_id):
+            LOG.debug(
+                "Removing minion machine entry with ID '%s' for minion pool "
+                "'%s' from DB.", self._minion_machine_id, self._minion_pool_id)
+            db_api.delete_minion_machine(context, self._minion_machine_id)
+
+        return task_info

+ 4 - 3
coriolis/scheduler/rpc/client.py

@@ -96,7 +96,7 @@ class SchedulerClient(rpc.BaseRPCClient):
 
     def get_worker_service_for_task(
             self, ctxt, task, origin_endpoint, destination_endpoint,
-            retry_count=5, retry_period=2):
+            retry_count=5, retry_period=2, random_choice=True):
         """ Gets a worker service for the task with the given properties
         and source/target endpoints.
 
@@ -153,9 +153,10 @@ class SchedulerClient(rpc.BaseRPCClient):
                     "'%s') from endpoints '%s' to '%s'", task['id'],
                     task['task_type'], origin_endpoint['id'],
                     destination_endpoint['id'])
-                worker_service = self.get_workers_for_specs(
+                worker_service = self.get_worker_service_for_specs(
                     ctxt, provider_requirements=provider_requirements,
-                    region_sets=required_region_sets, enabled=True)
+                    region_sets=required_region_sets, enabled=True,
+                    random_choice=random_choice)
                 LOG.debug(
                     "Scheduler has granted Worker Service '%s' for task with "
                     "ID '%s' (type '%s') from endpoints '%s' to '%s'",

+ 227 - 0
coriolis/taskflow/base.py

@@ -0,0 +1,227 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from taskflow import task as taskflow_tasks
+from taskflow.types import failure
+
+from coriolis import constants
+from coriolis import exception
+from coriolis import utils
+from coriolis.tasks import factory as tasks_factory
+from coriolis.scheduler.rpc import client as rpc_scheduler_client
+from coriolis.worker.rpc import client as rpc_worker_client
+
+
+TASK_RETURN_VALUE_FORMAT = "%s-result" % (
+        constants.TASK_LOCK_NAME_FORMAT)
+LOG = logging.getLogger()
+
+taskflow_opts = [
+    cfg.IntOpt("worker_task_execution_timeout",
+               default=3600,
+               help="Number of seconds until Coriolis tasks which are executed"
+                    "remotely on a Worker Service through taskflow timeout.")
+]
+
+CONF = cfg.CONF
+CONF.register_opts(taskflow_opts, 'taskflow')
+
+
+class BaseCoriolisTaskflowTask(taskflow_tasks.Task):
+    """ Base class for all TaskFlow tasks within Coriolis. """
+
+    def _get_error_str_for_flow_failures(
+            self, flow_failures, full_tracebacks=True):
+        if not flow_failures:
+            return "No flow failures provided."
+
+        res = "No flow failures present."
+        for (task_id, task_failure) in flow_failures.items():
+            label = "Error message"
+            failure_str = task_failure.exception_str
+            if full_tracebacks:
+                label = "Traceback"
+                failure_str = task_failure.traceback_str
+            res = (
+                "%s %s for task '%s': %s\n" % (
+                    res, label, task_id, failure_str))
+        if res:
+            # remove extra newline:
+            res = res[:-1]
+
+        return res
+
+    def revert(self, *args, **kwargs):
+        result = kwargs.get('result')
+        if isinstance(result, failure.Failure):
+            # it means that this is the task which error'd out:
+            LOG.error(
+                "Taskflow task '%s' is reverting after errorring out with the "
+                "following trace: %s", self.name, result.traceback_str)
+        else:
+            # else the failures were from other tasks:
+            flow_failures = kwargs.get('flow_failures', {})
+            LOG.error(
+                "Taskflow task '%s' is reverting after the failure of one "
+                "or more other tasks (%s) Tracebacks were:\n%s" % (
+                    self.name, list(flow_failures.keys()),
+                    self._get_error_str_for_flow_failures(
+                        flow_failures, full_tracebacks=True)))
+
+
+class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
+    """ Base taskflow.Task implementation for tasks which can be run
+    on the worker service.
+    This class can be seen as an "adapter" between the current
+    coriolis.tasks.TaskRunner classes and taskflow ones.
+
+    :param task_id: ID of the task. This value is declared as a returned value
+        from the task and can be set as a requirement for other tasks, thus
+        achieving a dependency system.
+    :param main_task_runner_class: constants.TASK_TYPE_* referencing the
+        main coriolis.tasks.TaskRunner class to be run on a worker service.
+    :param cleanup_task_runner_task: constants.TASK_TYPE_* referencing the
+        cleanup task to be run on reversion. No cleanup will be performed
+        during the task's reversion (apart from Worker Service deallocation)
+        otherwise.
+    """
+
+    def __init__(
+            self, task_name, task_id, task_instance, main_task_runner_type,
+            cleanup_task_runner_type=None, depends_on=None, **kwargs):
+        self._task_id = task_id
+        self._task_name = task_name
+        self._task_instance = task_instance
+        self._main_task_runner_type = main_task_runner_type
+        self._cleanup_task_runner_type = cleanup_task_runner_type
+
+        super(BaseRunWorkerTask, self).__init__(name=task_name, **kwargs)
+
+    @property
+    def _scheduler_client(self):
+        return rpc_scheduler_client.SchedulerClient()
+
+    def _set_provides_for_dependencies(self, kwargs):
+        dep = TASK_RETURN_VALUE_FORMAT % self._task_name
+        if kwargs.get('provides') is not None:
+            kwargs['provides'].append(dep)
+        else:
+            kwargs['provides'] = [dep]
+
+    def _set_requires_for_dependencies(self, kwargs, depends_on):
+        dep_requirements = [
+            TASK_RETURN_VALUE_FORMAT % dep_id
+            for dep_id in depends_on]
+        if kwargs.get('requires') is not None:
+            kwargs['requires'].extend(dep_requirements)
+        elif dep_requirements:
+            kwargs['requires'] = dep_requirements
+        return kwargs
+
+    def _set_requires_for_task_info_fields(self, kwargs):
+        new_requires = kwargs.get('requires', [])
+        main_task_runner = tasks_factory.get_task_runner_class(
+            self._main_task_runner_type)
+        main_task_deps = main_task_runner.get_required_task_info_properties()
+        new_requires.extend(main_task_deps)
+        if self._cleanup_task_runner_type:
+            cleanup_task_runner = tasks_factory.get_task_runner_class(
+                self._cleanup_task_runner_type)
+            cleanup_task_deps = list(
+                set(
+                    cleanup_task_runner.get_required_task_info_properties(
+                        )).difference(
+                            main_task_runner.get_returned_task_info_properties()))
+            new_requires.extend(cleanup_task_deps)
+
+        kwargs['requires'] = new_requires
+        return kwargs
+
+    def _set_provides_for_task_info_fields(self, kwargs):
+        new_provides = kwargs.get('provides', [])
+        main_task_runner = tasks_factory.get_task_runner_class(
+            self._main_task_runner_type)
+        main_task_res = main_task_runner.get_returned_task_info_properties()
+        new_provides.extend(main_task_res)
+        if self._cleanup_task_runner_type:
+            cleanup_task_runner = tasks_factory.get_task_runner_class(
+                self._cleanup_task_runner_type)
+            cleanup_task_res = list(
+                set(
+                    cleanup_task_runner.get_returned_task_info_properties(
+                        )).difference(
+                            main_task_runner.get_returned_task_info_properties()))
+            new_provides.extend(cleanup_task_res)
+
+        kwargs['provides'] = new_provides
+        return kwargs
+
+    def _get_worker_service_rpc_for_task(
+            self, ctxt, task_id, task_type, origin, destination,
+            retry_count=5, retry_period=2,
+            rpc_timeout=CONF.taskflow.worker_task_execution_timeout):
+        task_info = {
+            "id": task_id,
+            "task_type": task_type}
+        worker_service = self._scheduler_client.get_worker_service_for_task(
+            ctxt, task_info, origin, destination, retry_count=retry_count,
+            retry_period=retry_period, random_choice=True)
+        LOG.debug(
+            "Was offered the following worker service for executing TaskFlow "
+            "task '%s': %s", task_id, worker_service)
+
+        return rpc_worker_client.WorkerClient.from_service_definition(
+            worker_service, timeout=rpc_timeout)
+
+    def _execute_task(
+            self, ctxt, task_id, task_type, origin, destination, task_info):
+        worker_rpc = self._get_worker_service_rpc_for_task(
+            ctxt, self._task_id, task_type, origin, destination)
+
+        try:
+            LOG.debug(
+                "Starting to run task '%s' (type '%s') on worker service." % (
+                    task_id, task_type))
+            res = worker_rpc.run_task(
+                ctxt, None, self._task_id, task_type, origin, destination,
+                self._task_instance, task_info)
+            LOG.debug(
+                "Taskflow task '%s' (type %s) has successfully run and returned "
+                "the following info: %s" % (
+                    task_id, task_type, res))
+            return res
+        except Exception as ex:
+            LOG.debug(
+                "Exception occurred while executing task '%s' (type '%s') on "
+                "the worker service: %s", task_id, task_type,
+                utils.get_exception_details())
+            raise
+
+    def execute(self, context, origin, destination, task_info):
+        res = self._execute_task(
+            context, self._task_id, self._main_task_runner_type, origin,
+            destination, task_info)
+        return res
+
+    def revert(self, context, origin, destination, task_info, **kwargs):
+        super(BaseRunWorkerTask, self).revert(
+            context, origin, destination, task_info, **kwargs)
+        original_result = kwargs.get('result')
+        if not self._cleanup_task_runner_type:
+            LOG.debug(
+                "Task '%s' (main type '%s') had no cleanup task runner "
+                "associated with it. Skipping any reversion logic",
+                self._task_name, self._main_task_runner_type)
+            return original_result
+
+        res = self._execute_task(
+            context, self._task_id, self._cleanup_task_runner_type, origin,
+            destination, task_info)
+        LOG.debug(
+            "Reversion of taskflow task '%s' (ID '%s') was successfully "
+            "executed using task runner '%s' with the following result: %s" % (
+                self._task_name, self._task_id, self._cleanup_task_runner_type,
+                res))
+        return res

+ 67 - 7
coriolis/taskflow/runner.py

@@ -2,21 +2,26 @@
 # All Rights Reserved.
 
 # NOTE: we neeed to make sure eventlet is imported:
-import eventlet  #noqa
+import multiprocessing
+import sys
+from logging import handlers
 
+import eventlet  #noqa
+from oslo_config import cfg
 from oslo_log import log as logging
+from six.moves import queue
 from taskflow import engines
-from taskflow import task as taskflow_tasks
-from taskflow.patterns import unordered_flow
 from taskflow.types import notifier
 
+from coriolis import utils
+
 
 LOG = logging.getLogger(__name__)
 
 TASKFLOW_EXECUTION_ORDER_PARALLEL = 'parallel'
 TASKFLOW_EXECUTION_ORDER_SERIAL = 'serial'
 
-TASKFLOW_EXECUTOR_THREADS = "threaded"
+TASKFLOW_EXECUTOR_THREADED = "threaded"
 TASKFLOW_EXECUTOR_PROCESSES = "processes"
 TASKFLOW_EXECUTOR_GREENTHREADED = "greenthreaded"
 
@@ -26,7 +31,7 @@ class TaskFlowRunner(object):
     def __init__(
             self, service_name,
             execution_order=TASKFLOW_EXECUTION_ORDER_PARALLEL,
-            executor=TASKFLOW_EXECUTOR_GREENTHREADED,
+            executor=TASKFLOW_EXECUTOR_THREADED,
             max_workers=1):
 
         self._service_name = service_name
@@ -58,7 +63,7 @@ class TaskFlowRunner(object):
             notifier.Notifier.ANY, self._log_task_transition)
         return engine
 
-    def run_flow(self, flow, store=None):
+    def _run_flow(self, flow, store=None):
         LOG.debug("Ramping up to run flow with name '%s'", flow.name)
         engine = self._setup_engine_for_flow(flow, store=store)
 
@@ -70,6 +75,61 @@ class TaskFlowRunner(object):
 
         LOG.debug("Running flow with name '%s'", flow.name)
         engine.run()
-        LOG.debug(
+        LOG.info(
             "Successfully ran flow with name '%s'. Statistics were: %s",
             flow.name, engine.statistics)
+
+    def run_flow(self, flow, store=None):
+        self._run_flow(flow, store=store)
+
+    def _setup_task_process_logging(self, mp_log_q):
+        # Setting up logging and cfg, needed since this is a new process
+        cfg.CONF(sys.argv[1:], project='coriolis', version="1.0.0")
+        utils.setup_logging()
+
+        # Log events need to be handled in the parent process
+        log_root = logging.getLogger(None).logger
+        for handler in log_root.handlers:
+            log_root.removeHandler(handler)
+        log_root.addHandler(handlers.QueueHandler(mp_log_q))
+
+    def _run_flow_in_process(self, flow, mp_log_queue, store=None):
+        self._setup_task_process_logging(mp_log_queue)
+        self._run_flow(flow, store=store)
+
+    def _handle_mp_log_events(self, p, mp_log_q):
+        while True:
+            try:
+                record = mp_log_q.get(timeout=1)
+                if record is None:
+                    break
+                logger = logging.getLogger(record.name).logger
+                logger.handle(record)
+            except queue.Empty:
+                if not p.is_alive():
+                    break
+
+    def _spawn_process_flow(self, flow, store=None):
+        mp_ctx = multiprocessing.get_context('spawn')
+        mp_log_q = mp_ctx.Queue()
+        process = mp_ctx.Process(
+            target=self._run_flow_in_process,
+            args=(flow, mp_log_q, store))
+        LOG.debug("Starting new background process for flow '%s'", flow.name)
+        process.start()
+        LOG.debug(
+            "Sucessfully started background process for flow '%s' with "
+            "PID: '%d'", flow.name, process.pid)
+        eventlet.spawn(self._handle_mp_log_events, process, mp_log_q)
+
+    def run_flow_in_background(self, flow, store=None):
+        """ Starts the given flow in the background in a separate process.
+        Does NOT return/store any result.
+
+        All tasks must be "self-sufficient" and record their own results in
+        some fashion or another.
+        Care should be taken that any fields/attributes within the tasks
+        are thread/fork-safe.
+        The 'store' inputs should also only contain thread-safe datatypes.
+        """
+        self._spawn_process_flow(flow, store=store)

+ 0 - 139
coriolis/taskflow/taskflow.py

@@ -1,139 +0,0 @@
-# Copyright 2020 Cloudbase Solutions Srl
-# All Rights Reserved.
-
-from taskflow import task as taskflow_tasks
-
-from coriolis import constants
-from coriolis import exception
-from coriolis.tass import factory as tasks_factory
-
-
-TASK_RETURN_VALUE_FORMAT = "%s-result" % (
-        constants.TASK_LOCK_NAME_FORMAT)
-
-
-class _BaseRunWorkerTask(taskflow_tasks.Task):
-
-    """
-    Base taskflow.Task implementation for tasks which can be run
-    on the worker service.
-    This class can be seen as an "adapter" between the current
-    coriolis.tasks.TaskRunner classes and taskflow ones.
-
-    :param task_id: ID of the task. This value is declared as a returned value
-        from the task and can be set as a requirement for other tasks, thus
-        achieving a dependency system.
-    :param main_task_runner_class: constants.TASK_TYPE_* referencing the
-        main coriolis.tasks.TaskRunner class to be run on a worker service.
-    :param cleanup_task_runner_task: constants.TASK_TYPE_* referencing the
-        cleanup task to be run on reversion. No cleanup will be performed
-        during the task's reversion (apart from Worker Service deallocation)
-        otherwise.
-    """
-
-    def __init__(
-            self, worker_service_host, task_id, main_task_runner_type,
-            cleanup_task_runner_type, depends_on=None, **kwargs):
-        self._main_task_runner_type = main_task_runner_type
-        self._cleanup_task_runner_type = cleanup_task_runner_type
-        self._worker_service_host = worker_service_host
-
-        super(_BaseRunWorkerTask, self).__init__(name=task_id, **kwargs)
-
-    def _set_provides_for_dependencies(self, kwargs):
-        dep = self.TASK_RETURN_VALUE_FORMAT % self.task_id
-        if kwargs.get('provides') is not None:
-            kwargs['provides'].append(dep)
-        else:
-            kwargs['provides'] = [dep]
-
-    def _set_requires_for_dependencies(self, kwargs, depends_on):
-        dep_requirements = [
-            self.TASK_RETURN_VALUE_FORMAT % dep_id
-            for dep_id in depends_on]
-        if kwargs.get('requires') is not None:
-            kwargs['requires'].extend(dep_requirements)
-        elif dep_requirements:
-            kwargs['requires'] = dep_requirements
-        return kwargs
-
-    def _set_requires_for_task_info_fields(self, kwargs):
-        new_requires = kwargs.get('requires', [])
-        main_task_runner = tasks_factory.get_task_runner_class(
-            self._main_task_runner_type)
-        main_task_deps = main_task_runner.get_required_task_info_properties()
-        new_requires.extend(main_task_deps)
-        if self._cleanup_task_runner_type:
-            cleanup_task_runner = tasks_factory.get_task_runner_class(
-                self._cleanup_task_runner_type)
-            cleanup_task_deps = list(
-                set(
-                    cleanup_task_runner.get_required_task_info_properties(
-                        )).difference(
-                            main_task_runner.get_returned_task_info_properties()))
-            new_requires.extend(cleanup_task_deps)
-
-        kwargs['requires'] = new_requires
-        return kwargs
-
-    def _set_provides_for_task_info_fields(self, kwargs):
-        new_provides = kwargs.get('provides', [])
-        main_task_runner = tasks_factory.get_task_runner_class(
-            self._main_task_runner_type)
-        main_task_res = main_task_runner.get_returned_task_info_properties()
-        new_provides.extend(main_task_res)
-        if self._cleanup_task_runner_type:
-            cleanup_task_runner = tasks_factory.get_task_runner_class(
-                self._cleanup_task_runner_type)
-            cleanup_task_res = list(
-                set(
-                    cleanup_task_runner.get_returned_task_info_properties(
-                        )).difference(
-                            main_task_runner.get_returned_task_info_properties()))
-            new_provides.extend(cleanup_task_res)
-
-        kwargs['provides'] = new_provides
-        return kwargs
-
-    def _get_worker_rpc_for_main_task(self):
-        pass
-
-    def _get_worker_rpc_for_cleanup_task(self):
-        if self._cleanup_task_runner_type:
-            # TODO
-            pass
-
-    def pre_execute(self):
-        # TODO:
-        # add task to Coriolis' DB? (should we do it here or in the conductor?)
-        # load up TaskRunner class
-        # get worker host (shared among all tasks being run)
-        pass
-
-    # @lock_on_task? (Coriolis' notion of a task)
-    def execute(self):
-        # TODO:
-        # actively run the thing on the worker
-        # return results_dict_from_worker
-        pass
-
-    def post_execute(self):
-        # TODO:
-        # deallocate worker service
-        # save task result to Coriolis' db
-        pass
-
-    def pre_revert(self):
-        # deallocate original worker service
-        # TODO: find worker to run reverting
-        pass
-
-    def revert(self):
-        # TODO:
-        # run reverting task
-        pass
-
-    def post_revert(self):
-        # TODO:
-        # deallocate worker service
-        pass

+ 20 - 5
coriolis/worker/rpc/client.py

@@ -34,20 +34,35 @@ class WorkerClient(rpc.BaseRPCClient):
         super(WorkerClient, self).__init__(
             target, timeout=timeout)
 
+    @classmethod
+    def from_service_definition(
+            cls, service, timeout=None, topic_override=None):
+        if service.get('topic') != constants.WORKER_MAIN_MESSAGING_TOPIC:
+            raise ValueError(
+                "Unknown topic '%s' for worker service client. Only "
+                "acceptable value is '%s': %s" % (
+                    service.get('topic'),
+                    constants.WORKER_MAIN_MESSAGING_TOPIC,
+                    service))
+        topic = constants.WORKER_MAIN_MESSAGING_TOPIC
+        if topic_override:
+            topic = topic_override
+        return cls(
+            timeout=timeout, base_worker_topic=topic, host=service.get('host'))
+
     def begin_task(self, ctxt, task_id, task_type, origin, destination,
                    instance, task_info):
         self._cast(
             ctxt, 'exec_task', task_id=task_id, task_type=task_type,
             origin=origin, destination=destination, instance=instance,
-            task_info=task_info, asynchronous=True)
+            task_info=task_info, report_to_conductor=True)
 
-    def run_task(self, ctxt, server, task_id, task_type, origin, destination,
+    def run_task(self, ctxt, task_id, task_type, origin, destination,
                  instance, task_info):
-        cctxt = self._client.prepare(server=server)
-        cctxt.cast(
+        return self._call(
             ctxt, 'exec_task', task_id=task_id, task_type=task_type,
             origin=origin, destination=destination, instance=instance,
-            task_info=task_info, asynchronous=False)
+            task_info=task_info, report_to_conductor=False)
 
     def cancel_task(self, ctxt, task_id, process_id, force):
         return self._call(

+ 59 - 8
coriolis/worker/rpc/server.py

@@ -21,6 +21,7 @@ from coriolis import constants
 from coriolis import context
 from coriolis import events
 from coriolis import exception
+from coriolis.minion_manager.rpc import client as rpc_minion_manager_client
 from coriolis.providers import factory as providers_factory
 from coriolis import schemas
 from coriolis import service
@@ -72,6 +73,54 @@ class _ConductorProviderEventHandler(events.BaseEventHandler):
             self._ctxt, self._task_id, constants.TASK_EVENT_ERROR, message)
 
 
+class _MinionPoolManagerProviderEventHandler(events.BaseEventHandler):
+    def __init__(self, ctxt, pool_id):
+        self._ctxt = ctxt
+        self._pool_id = pool_id
+        self._rpc_minion_manager_client = (
+            rpc_minion_manager_client.MinionManagerClient())
+
+    def add_task_progress_update(self, total_steps, message):
+        LOG.info(
+            "Minion pool '%s' progress update: %s", self._pool_id, message)
+        self._rpc_minion_manager_client.add_minion_pool_progress_update(
+            self._ctxt, self._pool_id, total_steps, message)
+
+    def update_task_progress_update(self, step, total_steps, message):
+        LOG.info(
+            "Minion pool '%s' progress update: %s", self._pool_id, message)
+        self._rpc_minion_manager_client.update_minion_pool_progress_update(
+            self._ctxt, self._pool_id, step, total_steps, message)
+
+    def get_task_progress_step(self):
+        return self._rpc_minion_manager_client.get_minion_pool_progress_step(
+            self._ctxt, self._pool_id)
+
+    def info(self, message):
+        LOG.info(message)
+        self._rpc_minion_manager_client.add_minion_pool_event(
+            self._ctxt, self._pool_id, constants.MINION_POOL_EVENT_INFO, message)
+
+    def warn(self, message):
+        LOG.warn(message)
+        self._rpc_minion_manager_client.add_minion_pool_event(
+            self._ctxt, self._pool_id, constants.MINION_POOL_EVENT_WARNING, message)
+
+    def error(self, message):
+        LOG.error(message)
+        self._rpc_minion_manager_client.add_minion_pool_event(
+            self._ctxt, self._pool_id, constants.MINION_POOL_EVENT_ERROR, message)
+
+
+# TODO(aznashwan): parametrize the event handler provided during task execution
+# to decouple what gets notified from the task running logic itself:
+def _get_event_handler_for_task_type(task_type, ctxt, task_object_id):
+    if task_type in constants.MINION_POOL_OPERATIONS_TASKS:
+        return _MinionPoolManagerProviderEventHandler(
+            ctxt, task_object_id)
+    return _ConductorProviderEventHandler(ctxt, task_object_id)
+
+
 class WorkerServerEndpoint(object):
     def __init__(self):
         self._server = utils.get_hostname()
@@ -198,7 +247,8 @@ class WorkerServerEndpoint(object):
         """ Returns a list of strings with paths on the worker with shared
         libraries needed by the source/destination providers.
         """
-        event_handler = _ConductorProviderEventHandler(ctxt, task_id)
+        event_handler = _get_event_handler_for_task_type(
+            task_type, ctxt, task_id)
         task_runner = task_runners_factory.get_task_runner_class(
             task_type)()
 
@@ -292,23 +342,23 @@ class WorkerServerEndpoint(object):
         return result
 
     def exec_task(self, ctxt, task_id, task_type, origin, destination,
-                  instance, task_info, asynchronous=True):
+                  instance, task_info, report_to_conductor=True):
         try:
             task_result = self._exec_task_process(
                 ctxt, task_id, task_type, origin, destination,
-                instance, task_info, report_to_conductor=asynchronous)
+                instance, task_info, report_to_conductor=report_to_conductor)
 
             LOG.info(
                 "Output of completed %s task with ID %s: %s",
                 task_type, task_id,
                 utils.sanitize_task_info(task_result))
 
-            if not asynchronous:
+            if not report_to_conductor:
                 return task_result
             self._rpc_conductor_client.task_completed(
                 ctxt, task_id, task_result)
         except exception.TaskProcessCanceledException as ex:
-            if asynchronous:
+            if report_to_conductor:
                 LOG.debug(
                     "Task with ID '%s' appears to have been cancelled. "
                     "Confirming cancellation to Conductor now. Error was: %s",
@@ -319,7 +369,7 @@ class WorkerServerEndpoint(object):
             else:
                 raise
         except exception.NoSuitableWorkerServiceError as ex:
-            if asynchronous:
+            if report_to_conductor:
                 LOG.warn(
                     "A conductor-side scheduling error has occurred following "
                     "the completion of task '%s'. Ignoring. Error was: %s",
@@ -327,7 +377,7 @@ class WorkerServerEndpoint(object):
             else:
                 raise
         except Exception as ex:
-            if asynchronous:
+            if report_to_conductor:
                 LOG.debug(
                     "Task with ID '%s' has error'd out. Reporting error to "
                     "Conductor now. Error was: %s",
@@ -657,7 +707,8 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
 
         task_runner = task_runners_factory.get_task_runner_class(
             task_type)()
-        event_handler = _ConductorProviderEventHandler(ctxt, task_id)
+        event_handler = _get_event_handler_for_task_type(
+            task_type, ctxt, task_id)
 
         LOG.debug("Executing task: %(task_id)s, type: %(task_type)s, "
                   "origin: %(origin)s, destination: %(destination)s, "

+ 2 - 1
requirements.txt

@@ -34,6 +34,7 @@ mysqlclient
 schedule
 strict-rfc3339
 sqlalchemy
+taskflow
 webob
 sshtunnel
-requests-unixsocket
+requests-unixsocket