Просмотр исходного кода

Improves migration status updates

Alessandro Pilotti 10 лет назад
Родитель
Сommit
53ad8f72ab

+ 69 - 29
coriolis/conductor/rpc/server.py

@@ -30,19 +30,28 @@ class ConductorServerEndpoint(object):
         migration.destination = destination
 
         for instance in instances:
-            task = models.Task()
-            task.id = str(uuid.uuid4())
-            task.migration = migration
-            task.instance = instance
-            task.status = constants.TASK_STATUS_STARTED
-            task.task_type = constants.TASK_TYPE_EXPORT
+            task_export = models.Task()
+            task_export.id = str(uuid.uuid4())
+            task_export.migration = migration
+            task_export.instance = instance
+            task_export.status = constants.TASK_STATUS_WAITING
+            task_export.task_type = constants.TASK_TYPE_EXPORT
+
+            task_import = models.Task()
+            task_import.id = str(uuid.uuid4())
+            task_import.migration = migration
+            task_import.instance = instance
+            task_import.status = constants.TASK_STATUS_WAITING
+            task_import.task_type = constants.TASK_TYPE_IMPORT
+            task_import.depends_on = [task_export.id]
 
         db_api.add_migration(ctxt, migration)
         LOG.info("Migration created: %s", migration.id)
 
         for task in migration.tasks:
-            self._rpc_worker_client.begin_export_instance(
-                ctxt, task.id, origin, instance)
+            if not task.depends_on:
+                self._rpc_worker_client.begin_export_instance(
+                    ctxt, task.id, origin, instance)
 
         return self.get_migration(ctxt, migration.id)
 
@@ -55,33 +64,64 @@ class ConductorServerEndpoint(object):
 
     def set_task_host(self, ctxt, task_id, host, process_id):
         db_api.set_task_host(ctxt, task_id, host, process_id)
+        db_api.set_task_status(
+            ctxt, task_id, constants.TASK_STATUS_STARTED)
+
+    def _start_pending_tasks(self, ctxt, migration, parent_task, task_info):
+        has_pending_tasks = False
+        for task in migration.tasks:
+            if (task.depends_on and parent_task.id in task.depends_on and
+                    task.status == constants.TASK_STATUS_WAITING):
+                has_pending_tasks = True
+                if task.task_type == constants.TASK_TYPE_IMPORT:
+                    # Needs to be executed on the same host
+                    self._rpc_worker_client.begin_import_instance(
+                        ctxt, parent_task.host, task.id,
+                        migration.destination,
+                        task.instance,
+                        task_info)
+        return has_pending_tasks
 
     def export_completed(self, ctxt, task_id, export_info):
-        db_api.update_task_status(
-            ctxt, task_id, constants.TASK_STATUS_COMPLETE)
-        op_export = db_api.get_task(ctxt, task_id)
+        self._task_completed(ctxt, task_id, export_info)
+
+    def _task_completed(self, ctxt, task_id, task_info):
+        LOG.info("Task completed: %s", task_id)
 
-        op_import = models.Task()
-        op_import.id = str(uuid.uuid4())
-        op_import.migration = op_export.migration
-        op_import.instance = op_export.instance
-        op_import.status = constants.TASK_STATUS_STARTED
-        op_import.task_type = constants.TASK_TYPE_IMPORT
+        db_api.set_task_status(
+            ctxt, task_id, constants.TASK_STATUS_COMPLETED)
 
-        db_api.add(ctxt, op_import)
+        task = db_api.get_task(
+            ctxt, task_id, include_migration_tasks=True)
 
-        self._rpc_worker_client.begin_import_instance(
-            ctxt, op_export.host, op_import.id,
-            op_import.migration.destination,
-            op_import.instance,
-            export_info)
+        migration = task.migration
+        has_pending_tasks = self._start_pending_tasks(ctxt, migration, task,
+                                                      task_info)
+
+        if not has_pending_tasks:
+            LOG.info("Migration completed: %s", migration.id)
+            db_api.set_migration_status(
+                ctxt, migration.id, constants.MIGRATION_STATUS_COMPLETED)
 
     def import_completed(self, ctxt, task_id):
-        db_api.update_task_status(
-            ctxt, task_id, constants.TASK_STATUS_COMPLETE)
+        self._task_completed(ctxt, task_id, None)
 
     def set_task_error(self, ctxt, task_id, exception_details):
-        db_api.update_task_status(
-            ctxt, task_id, constants.TASK_STATUS_ERROR,
-            exception_details)
-        # TODO: set migration in error state and canel other tasks
+        LOG.error("Task error: %(task_id)s - %(ex)s",
+                  {"task_id": task_id, "ex": exception_details})
+
+        db_api.set_task_status(
+            ctxt, task_id, constants.TASK_STATUS_ERROR, exception_details)
+
+        task = db_api.get_task(
+            ctxt, task_id, include_migration_tasks=True)
+        migration = task.migration
+
+        for task in migration.tasks:
+            if task.status == constants.TASK_STATUS_WAITING:
+                db_api.set_task_status(
+                    ctxt, task_id, constants.TASK_STATUS_CANCELED)
+
+        LOG.error("Migration failed: %s", migration.id)
+        db_api.set_migration_status(
+            ctxt, migration.id, constants.MIGRATION_STATUS_ERROR)

+ 4 - 2
coriolis/constants.py

@@ -1,10 +1,12 @@
 MIGRATION_STATUS_STARTED = "STARTED"
-MIGRATION_STATUS_COMPLETE = "COMPLETE"
+MIGRATION_STATUS_COMPLETED = "COMPLETED"
 MIGRATION_STATUS_ERROR = "ERROR"
 
+TASK_STATUS_WAITING = "WAITING"
 TASK_STATUS_STARTED = "STARTED"
-TASK_STATUS_COMPLETE = "COMPLETE"
+TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
+TASK_STATUS_CANCELED = "CANCELED"
 
 TASK_TYPE_EXPORT = "EXPORT"
 TASK_TYPE_IMPORT = "IMPORT"

+ 14 - 5
coriolis/db/api.py

@@ -50,12 +50,18 @@ def get_migration(context, migration_id):
 def add_migration(context, migration):
     migration.user_id = context.user
     migration.project_id = context.tenant
-
     context.session.add(migration)
 
 
 @enginefacade.writer
-def update_task_status(context, task_id, status, exception_details=None):
+def set_migration_status(context, migration_id, status):
+    migration = context.session.query(models.Migration).filter_by(
+        project_id=context.tenant, id=migration_id).first()
+    migration.status = status
+
+
+@enginefacade.writer
+def set_task_status(context, task_id, status, exception_details=None):
     task = context.session.query(models.Task).filter_by(
         id=task_id).first()
     task.status = status
@@ -71,7 +77,10 @@ def set_task_host(context, task_id, host, process_id):
 
 
 @enginefacade.reader
-def get_task(context, task_id):
-    return context.session.query(models.Task).options(
-        orm.joinedload("migration")).filter_by(
+def get_task(context, task_id, include_migration_tasks=False):
+    join_options = orm.joinedload("migration")
+    if include_migration_tasks:
+        join_options = join_options.joinedload("tasks")
+
+    return context.session.query(models.Task).options(join_options).filter_by(
         id=task_id).first()

+ 1 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -40,6 +40,7 @@ def upgrade(migrate_engine):
         sqlalchemy.Column("task_type", sqlalchemy.String(100),
                           nullable=False),
         sqlalchemy.Column("exception_details", sqlalchemy.Text, nullable=True),
+        sqlalchemy.Column("depends_on", sqlalchemy.Text, nullable=True),
         mysql_engine='InnoDB',
         mysql_charset='utf8'
     )

+ 1 - 0
coriolis/db/sqlalchemy/models.py

@@ -25,6 +25,7 @@ class Task(BASE, models.TimestampMixin, models.ModelBase):
     status = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     task_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     exception_details = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
+    depends_on = sqlalchemy.Column(types.List, nullable=True)
 
 
 class Migration(BASE, models.TimestampMixin, models.ModelBase):

+ 18 - 0
coriolis/db/sqlalchemy/types.py

@@ -36,3 +36,21 @@ class Json(LongText):
         if value is None:
             return None
         return jsonutils.loads(value)
+
+
+class List(types.TypeDecorator):
+    impl = types.Text
+
+    def load_dialect_impl(self, dialect):
+        if dialect.name == 'mysql':
+            return dialect.type_descriptor(mysql.LONGTEXT())
+        else:
+            return self.impl
+
+    def process_bind_param(self, value, dialect):
+        return jsonutils.dumps(value)
+
+    def process_result_value(self, value, dialect):
+        if value is None:
+            return None
+        return jsonutils.loads(value)