瀏覽代碼

Renames Operation in Task

Alessandro Pilotti 10 年之前
父節點
當前提交
e6a48bc27a

+ 22 - 0
coriolis/cmd/db_sync.py

@@ -0,0 +1,22 @@
+import sys
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from coriolis.db import api as db_api
+
+CONF = cfg.CONF
+
+
+def main():
+    logging.register_options(CONF)
+    logging.setup(CONF, 'coriolis')
+
+    CONF(sys.argv[1:], project='coriolis',
+         version="1.0.0")
+
+    db_api.db_sync(db_api.get_engine())
+
+
+if __name__ == "__main__":
+    main()

+ 6 - 6
coriolis/conductor/rpc/client.py

@@ -26,14 +26,14 @@ class ConductorClient(object):
             ctxt, 'migrate_instances', origin=origin, destination=destination,
             instances=instances)
 
-    def set_operation_host(self, ctxt, operation_id, host):
+    def set_task_host(self, ctxt, task_id, host):
         self._client.call(
-            ctxt, 'set_operation_host', operation_id=operation_id, host=host)
+            ctxt, 'set_task_host', task_id=task_id, host=host)
 
-    def export_completed(self, ctxt, operation_id, export_info):
+    def export_completed(self, ctxt, task_id, export_info):
         self._client.call(
-            ctxt, 'export_completed', operation_id=operation_id,
+            ctxt, 'export_completed', task_id=task_id,
             export_info=export_info)
 
-    def import_completed(self, ctxt, operation_id):
-        self._client.call(ctxt, 'import_completed', operation_id=operation_id)
+    def import_completed(self, ctxt, task_id):
+        self._client.call(ctxt, 'import_completed', task_id=task_id)

+ 16 - 16
coriolis/conductor/rpc/server.py

@@ -42,40 +42,40 @@ class ConductorServerEndpoint(object):
         migration.destination = json.dumps(destination)
 
         for instance in instances:
-            op = models.Operation()
+            op = models.Task()
             op.id = str(uuid.uuid4())
             op.migration = migration
             op.instance = instance
-            op.status = constants.OPERATION_STATUS_STARTED
-            op.operation_type = constants.OPERATION_TYPE_EXPORT
+            op.status = constants.TASK_STATUS_STARTED
+            op.task_type = constants.TASK_TYPE_EXPORT
 
         db_api.add(ctxt, migration)
 
-        for op in migration.operations:
+        for op in migration.tasks:
             self._rpc_worker_client.begin_export_instance(
                 ctxt.to_dict(), op.id, origin, instance)
 
-    def set_operation_host(self, ctxt, operation_id, host):
+    def set_task_host(self, ctxt, task_id, host):
         # TODO: fix context
         from coriolis import context
         ctxt = context.CoriolisContext()
-        db_api.set_operation_host(ctxt, operation_id, host)
+        db_api.set_task_host(ctxt, task_id, host)
 
-    def export_completed(self, ctxt, operation_id, export_info):
+    def export_completed(self, ctxt, task_id, export_info):
         # TODO: fix context
         from coriolis import context
         ctxt = context.CoriolisContext()
 
-        db_api.update_operation_status(
-            ctxt, operation_id, constants.OPERATION_STATUS_COMPLETE)
-        op_export = db_api.get_operation(ctxt, operation_id)
+        db_api.update_task_status(
+            ctxt, task_id, constants.TASK_STATUS_COMPLETE)
+        op_export = db_api.get_task(ctxt, task_id)
 
-        op_import = models.Operation()
+        op_import = models.Task()
         op_import.id = str(uuid.uuid4())
         op_import.migration = op_export.migration
         op_import.instance = op_export.instance
-        op_import.status = constants.OPERATION_STATUS_STARTED
-        op_import.operation_type = constants.OPERATION_TYPE_IMPORT
+        op_import.status = constants.TASK_STATUS_STARTED
+        op_import.task_type = constants.TASK_TYPE_IMPORT
 
         db_api.add(ctxt, op_import)
 
@@ -85,10 +85,10 @@ class ConductorServerEndpoint(object):
             op_import.instance,
             export_info)
 
-    def import_completed(self, ctxt, operation_id):
+    def import_completed(self, ctxt, task_id):
         # TODO: fix context
         from coriolis import context
         ctxt = context.CoriolisContext()
 
-        db_api.update_operation_status(
-            ctxt, operation_id, constants.OPERATION_STATUS_COMPLETE)
+        db_api.update_task_status(
+            ctxt, task_id, constants.TASK_STATUS_COMPLETE)

+ 5 - 5
coriolis/constants.py

@@ -2,12 +2,12 @@ MIGRATION_STATUS_STARTED = "STARTED"
 MIGRATION_STATUS_COMPLETE = "COMPLETE"
 MIGRATION_STATUS_ERROR = "ERROR"
 
-OPERATION_STATUS_STARTED = "STARTED"
-OPERATION_STATUS_COMPLETE = "COMPLETE"
-OPERATION_STATUS_ERROR = "ERROR"
+TASK_STATUS_STARTED = "STARTED"
+TASK_STATUS_COMPLETE = "COMPLETE"
+TASK_STATUS_ERROR = "ERROR"
 
-OPERATION_TYPE_EXPORT = "EXPORT"
-OPERATION_TYPE_IMPORT = "IMPORT"
+TASK_TYPE_EXPORT = "EXPORT"
+TASK_TYPE_IMPORT = "IMPORT"
 
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2

+ 11 - 14
coriolis/db/api.py

@@ -35,14 +35,14 @@ def db_version(engine):
 @enginefacade.reader
 def get_migrations(context):
     return context.session.query(models.Migration).options(
-        orm.joinedload("operations")).filter_by(
+        orm.joinedload("tasks")).filter_by(
         user_id=context.user_id).all()
 
 
 @enginefacade.reader
 def get_migration(context, migration_id):
     return context.session.query(models.Migration).options(
-        orm.joinedload("operations")).filter_by(
+        orm.joinedload("tasks")).filter_by(
         user_id=context.user_id, id=migration_id).first()
 
 
@@ -52,24 +52,21 @@ def add(context, migration):
 
 
 @enginefacade.writer
-def update_operation_status(context, operation_id, status):
-    op = context.session.query(models.Operation).filter_by(
-        id=operation_id).first()
+def update_task_status(context, task_id, status):
+    op = context.session.query(models.Task).filter_by(
+        id=task_id).first()
     op.status = status
 
 
 @enginefacade.writer
-def set_operation_host(context, operation_id, host):
-    op = context.session.query(models.Operation).filter_by(
-        id=operation_id).first()
+def set_task_host(context, task_id, host):
+    op = context.session.query(models.Task).filter_by(
+        id=task_id).first()
     op.host = host
 
 
 @enginefacade.reader
-def get_operation(context, operation_id):
-    return context.session.query(models.Operation).options(
+def get_task(context, task_id):
+    return context.session.query(models.Task).options(
         orm.joinedload("migration")).filter_by(
-        id=operation_id).first()
-
-# TODO: move from here
-db_sync(get_engine())
+        id=task_id).first()

+ 4 - 4
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -22,8 +22,8 @@ def upgrade(migrate_engine):
         mysql_charset='utf8'
     )
 
-    operation = sqlalchemy.Table(
-        'operation', meta,
+    task = sqlalchemy.Table(
+        'task', meta,
         sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
                           default=lambda: str(uuid.uuid4())),
         sqlalchemy.Column('created_at', sqlalchemy.DateTime),
@@ -34,7 +34,7 @@ def upgrade(migrate_engine):
         sqlalchemy.Column("instance", sqlalchemy.String(1024), nullable=False),
         sqlalchemy.Column("host", sqlalchemy.String(1024), nullable=True),
         sqlalchemy.Column("status", sqlalchemy.String(100), nullable=False),
-        sqlalchemy.Column("operation_type", sqlalchemy.String(100)
+        sqlalchemy.Column("task_type", sqlalchemy.String(100),
                           nullable=False),
         mysql_engine='InnoDB',
         mysql_charset='utf8'
@@ -42,7 +42,7 @@ def upgrade(migrate_engine):
 
     tables = (
         migration,
-        operation,
+        task,
     )
 
     for index, table in enumerate(tables):

+ 6 - 6
coriolis/db/sqlalchemy/models.py

@@ -8,8 +8,8 @@ from sqlalchemy import ForeignKey, DateTime, Boolean, Text, Float
 BASE = declarative.declarative_base()
 
 
-class Operation(BASE, models.TimestampMixin, models.ModelBase):
-    __tablename__ = 'operation'
+class Task(BASE, models.TimestampMixin, models.ModelBase):
+    __tablename__ = 'task'
 
     id = Column(String(36), default=lambda: str(uuid.uuid4()),
                 primary_key=True)
@@ -17,11 +17,11 @@ class Operation(BASE, models.TimestampMixin, models.ModelBase):
                           ForeignKey('migration.id'),
                           nullable=False)
     # migration = relationship("Migration",
-    # backref=backref("operations"), lazy='joined')
+    # backref=backref("tasks"), lazy='joined')
     instance = Column(String(1024), nullable=False)
     host = Column(String(1024), nullable=True)
     status = Column(String(100), nullable=False)
-    operation_type = Column(String(100), nullable=False)
+    task_type = Column(String(100), nullable=False)
 
 
 class Migration(BASE, models.TimestampMixin, models.ModelBase):
@@ -32,5 +32,5 @@ class Migration(BASE, models.TimestampMixin, models.ModelBase):
     origin = Column(String(1024), nullable=False)
     destination = Column(String(1024), nullable=False)
     status = Column(String(100), nullable=False)
-    operations = relationship(Operation, cascade="all,delete",
-                              backref=backref('migration'))
+    tasks = relationship(Task, cascade="all,delete",
+                         backref=backref('migration'))

+ 5 - 5
coriolis/worker/rpc/client.py

@@ -14,19 +14,19 @@ class WorkerClient(object):
             cfg.CONF, CONF.messaging_transport_url)
         self._client = messaging.RPCClient(transport, target)
 
-    def begin_export_instance(self, ctxt, operation_id, origin, instance):
+    def begin_export_instance(self, ctxt, task_id, origin, instance):
         self._client.cast(
-            ctxt, 'export_instance', operation_id=operation_id, origin=origin,
+            ctxt, 'export_instance', task_id=task_id, origin=origin,
             instance=instance)
 
-    def begin_import_instance(self, ctxt, server, operation_id, destination,
+    def begin_import_instance(self, ctxt, server, task_id, destination,
                               instance, export_info):
         # Needs to be executed on the same server
         cctxt = self._client.prepare(server=server)
         cctxt.cast(
-            ctxt, 'import_instance', operation_id=operation_id,
+            ctxt, 'import_instance', task_id=task_id,
             destination=destination, instance=instance,
             export_info=export_info)
 
-    def update_migration_status(self, ctx, operation_id, status):
+    def update_migration_status(self, ctx, task_id, status):
         self._client.call(ctxt, "update_migration_status", status=status)

+ 20 - 15
coriolis/worker/rpc/server.py

@@ -29,36 +29,41 @@ class WorkerServerEndpoint(object):
         self._server = utils.get_hostname()
         self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
 
-    def _get_operation_export_path(self, operation_id):
-        path = os.path.join(CONF.worker.export_base_path, operation_id)
+    def _get_task_export_path(self, task_id):
+        path = os.path.join(CONF.worker.export_base_path, task_id)
         if not os.path.exists(path):
             os.makedirs(path)
         return path
 
-    def export_instance(self, ctxt, operation_id, origin, instance):
-        self._rpc_conductor_client.set_operation_host(
-            ctxt, operation_id, self._server)
+    def _cleanup_task_export_path(self, export_path):
+        if os.path.exists(export_path):
+            shtutil.rmtree(export_path)
+
+    def export_instance(self, ctxt, task_id, origin, instance):
+        self._rpc_conductor_client.set_task_host(
+            ctxt, task_id, self._server)
 
         try:
             export_provider = factory.get_provider(
                 origin["type"], constants.PROVIDER_TYPE_EXPORT)
-            export_path = self._get_operation_export_path(operation_id)
+            export_path = self._get_task_export_path(task_id)
             vm_info = export_provider.export_instance(
                 origin["connection_info"], instance, export_path)
             LOG.info("Exported VM: %s" % vm_info)
 
             self._rpc_conductor_client.export_completed(
-                ctxt, operation_id, vm_info)
+                ctxt, task_id, vm_info)
         except Exception as ex:
             LOG.exception(ex)
+            self._cleanup_task_export_path(export_path)
             # TODO: set error state
-            # self._rpc_conductor_client.set_operation_error(ctxt,
-            # operation_id, ex)
+            # self._rpc_conductor_client.set_task_error(ctxt,
+            # task_id, ex)
 
-    def import_instance(self, ctxt, operation_id, destination, instance,
+    def import_instance(self, ctxt, task_id, destination, instance,
                         export_info):
-        self._rpc_conductor_client.set_operation_host(
-            ctxt, operation_id, self._server)
+        self._rpc_conductor_client.set_task_host(
+            ctxt, task_id, self._server)
 
         try:
             import_provider = factory.get_provider(
@@ -68,9 +73,9 @@ class WorkerServerEndpoint(object):
                 destination["target_environment"],
                 instance, export_info)
 
-            self._rpc_conductor_client.import_completed(ctxt, operation_id)
+            self._rpc_conductor_client.import_completed(ctxt, task_id)
         except Exception as ex:
             LOG.exception(ex)
             # TODO: set error state
-            # self._rpc_conductor_client.set_operation_error(
-            # ctxt, operation_id, ex)
+            # self._rpc_conductor_client.set_task_error(
+            # ctxt, task_id, ex)