Browse source

Adds replica snapshot disks restore

Alessandro Pilotti 9 years ago
parent
commit
fcba9ca131

+ 34 - 2
coriolis/conductor/rpc/server.py

@@ -311,6 +311,11 @@ class ConductorServerEndpoint(object):
                 instance, constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS,
                 execution, [deploy_replica_task.id])
 
+            self._create_task(
+                instance,
+                constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS,
+                execution, on_error=True)
+
         db_api.add_migration(ctxt, migration)
         LOG.info("Migration created: %s", migration.id)
 
@@ -440,6 +445,26 @@ class ConductorServerEndpoint(object):
                             instance=task.instance,
                             task_info=task_info)
 
+    def _update_replica_volumes_info(self, ctxt, migration_id, instance,
+                                     task_info):
+        # When restoring a snapshot in some import providers (e.g. OpenStack),
+        # a new volume_id is generated. This needs to be updated in the
+        # Replica instance as well.
+        volumes_info = task_info.get("volumes_info")
+        if volumes_info:
+            updated_task_info = {"volumes_info": volumes_info}
+
+            migration = db_api.get_migration(ctxt, migration_id)
+            replica_id = migration.replica_id
+
+            with lockutils.lock(replica_id):
+                LOG.debug(
+                    "Updating volume_info in replica due to snapshot "
+                    "restore during migration. replica id: %s", replica_id)
+                db_api.set_transfer_action_info(
+                    ctxt, replica_id, instance,
+                    updated_task_info)
+
     @task_synchronized
     def task_completed(self, ctxt, task_id, task_info):
         LOG.info("Task completed: %s", task_id)
@@ -450,12 +475,19 @@ class ConductorServerEndpoint(object):
         task = db_api.get_task(ctxt, task_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
 
-        with lockutils.lock(execution.action_id):
+        action_id = execution.action_id
+        with lockutils.lock(action_id):
             LOG.info("Setting instance %(instance)s "
                      "action info: %(task_info)s",
                      {"instance": task.instance, "task_info": task_info})
             updated_task_info = db_api.set_transfer_action_info(
-                ctxt, execution.action_id, task.instance, task_info)
+                ctxt, action_id, task.instance, task_info)
+
+            if (task.task_type ==
+                    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS):
+                self._update_replica_volumes_info(
+                    ctxt, execution.action_id, task.instance,
+                    updated_task_info)
 
             if execution.status == constants.EXECUTION_STATUS_RUNNING:
                 self._start_pending_tasks(

+ 1 - 0
coriolis/constants.py

@@ -26,6 +26,7 @@ TASK_TYPE_SHUTDOWN_INSTANCE = "SHUTDOWN_INSTANCE"
 TASK_TYPE_DEPLOY_REPLICA_INSTANCE = "DEPLOY_REPLICA_INSTANCE"
 TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS = "CREATE_REPLICA_DISK_SNAPSHOTS"
 TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS = "DELETE_REPLICA_DISK_SNAPSHOTS"
+TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS = "RESTORE_REPLICA_DISK_SNAPSHOTS"
 
 
 PROVIDER_TYPE_IMPORT = 1

+ 4 - 1
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -37,6 +37,9 @@ def upgrade(migrate_engine):
                           sqlalchemy.ForeignKey(
                               'base_transfer_action.base_id'),
                           primary_key=True),
+        sqlalchemy.Column("replica_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey(
+                              'replica.id'), nullable=True),
         mysql_engine='InnoDB',
         mysql_charset='utf8'
     )
@@ -131,8 +134,8 @@ def upgrade(migrate_engine):
 
     tables = (
         base_transfer_action,
-        migration,
         replica,
+        migration,
         tasks_execution,
         task,
         task_progress_update,

+ 11 - 6
coriolis/db/sqlalchemy/models.py

@@ -112,8 +112,8 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
     }
 
 
-class Migration(BaseTransferAction):
-    __tablename__ = 'migration'
+class Replica(BaseTransferAction):
+    __tablename__ = 'replica'
 
     id = sqlalchemy.Column(
         sqlalchemy.String(36),
@@ -121,18 +121,23 @@ class Migration(BaseTransferAction):
             'base_transfer_action.base_id'), primary_key=True)
 
     __mapper_args__ = {
-        'polymorphic_identity': 'migration',
+        'polymorphic_identity': 'replica',
     }
 
 
-class Replica(BaseTransferAction):
-    __tablename__ = 'replica'
+class Migration(BaseTransferAction):
+    __tablename__ = 'migration'
 
     id = sqlalchemy.Column(
         sqlalchemy.String(36),
         sqlalchemy.ForeignKey(
             'base_transfer_action.base_id'), primary_key=True)
+    replica_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('replica.id'), nullable=True)
+    replica = orm.relationship(
+        Replica, backref=orm.backref("migrations"), foreign_keys=[replica_id])
 
     __mapper_args__ = {
-        'polymorphic_identity': 'replica',
+        'polymorphic_identity': 'migration',
     }

+ 66 - 8
coriolis/providers/openstack/__init__.py

@@ -137,12 +137,22 @@ def _extend_volume(cinder, volume_id, new_size):
 
 
 @utils.retry_on_error()
-def _create_volume(cinder, size, name, image_ref=None):
-    volume_size_gb = math.ceil(size / units.Gi)
+def _get_volume_from_snapshot(cinder, snapshot_id):
+    snapshot = cinder.volume_snapshots.get(snapshot_id)
+    return cinder.volumes.get(snapshot.volume_id)
+
+
+@utils.retry_on_error()
+def _create_volume(cinder, size, name, image_ref=None, snapshot_id=None):
+    if snapshot_id:
+        volume_size_gb = None
+    else:
+        volume_size_gb = math.ceil(size / units.Gi)
     return cinder.volumes.create(
         size=volume_size_gb,
         name=name,
-        imageRef=image_ref)
+        imageRef=image_ref,
+        snapshot_id=snapshot_id)
 
 
 @utils.retry_on_error()
@@ -178,12 +188,20 @@ def _wait_for_volume_snapshot(cinder, snapshot_id,
     snapshots = cinder.volume_snapshots.findall(id=snapshot_id)
 
     if not snapshots:
+        if expected_status == 'deleted':
+            return
         raise exception.CoriolisException("Volume snapshot not found")
     snapshot = snapshots[0]
 
     while snapshot.status not in [expected_status, 'error']:
         time.sleep(2)
-        snapshot = cinder.volume_snapshots.get(snapshot.id)
+        if expected_status == 'deleted':
+            snapshots = cinder.volume_snapshots.findall(id=snapshot_id)
+            if not snapshots:
+                return
+            snapshot = snapshots[0]
+        else:
+            snapshot = cinder.volume_snapshots.get(snapshot.id)
     if snapshot.status != expected_status:
         raise exception.CoriolisException(
             "Volume snapshot is in status: %s" % snapshot.status)
@@ -698,7 +716,7 @@ class ImportProvider(base.BaseReplicaImportProvider):
 
             try:
                 for i, volume in enumerate(volumes):
-                    _wait_for_volume(cinder, volume.id, 'available')
+                    _wait_for_volume(cinder, volume.id)
 
                     self._event_manager.progress_update(
                         "Attaching volume to worker instance")
@@ -894,12 +912,12 @@ class ImportProvider(base.BaseReplicaImportProvider):
                         "Using previously deployed volume")
 
             for volume in new_volumes:
-                _wait_for_volume(cinder, volume.id, 'available')
+                _wait_for_volume(cinder, volume.id)
 
             return volumes_info
         except:
             for volume in new_volumes:
-                _delete_volume(cinder, volume)
+                _delete_volume(cinder, volume.id)
             raise
 
     def deploy_replica_disks(self, ctxt, connection_info, target_environment,
@@ -1002,7 +1020,7 @@ class ImportProvider(base.BaseReplicaImportProvider):
             volume_info["volume_snapshot_id"] = snapshot.id
 
         for snapshot in snapshots:
-            _wait_for_volume_snapshot(cinder, snapshot.id, 'available')
+            _wait_for_volume_snapshot(cinder, snapshot.id)
 
         return volumes_info
 
@@ -1018,6 +1036,46 @@ class ImportProvider(base.BaseReplicaImportProvider):
             snapshot_id = volume_info.get("volume_snapshot_id")
             if snapshot_id:
                 _delete_volume_snapshot(cinder, snapshot_id)
+                _wait_for_volume_snapshot(cinder, snapshot_id, 'deleted')
+                volume_info["volume_snapshot_id"] = None
+
+    def restore_replica_disk_snapshots(self, ctxt, connection_info,
+                                       volumes_info):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+
+        self._event_manager.progress_update(
+            "Restoring replica disk snapshots")
+
+        new_volumes = []
+        try:
+            for volume_info in volumes_info:
+                snapshot_id = volume_info.get("volume_snapshot_id")
+                if snapshot_id:
+                    original_volume = _get_volume_from_snapshot(
+                        cinder, snapshot_id)
+
+                    volume_name = original_volume.name
+                    volume = _create_volume(
+                        cinder, None, volume_name, snapshot_id=snapshot_id)
+                    new_volumes.append((volume_info, snapshot_id, volume))
+
+            for volume_info, snapshot_id, volume in new_volumes:
+                old_volume_id = volume_info["volume_id"]
+                _wait_for_volume(cinder, volume.id)
+                _delete_volume_snapshot(cinder, snapshot_id)
+                _wait_for_volume_snapshot(cinder, snapshot_id, 'deleted')
+                _delete_volume(cinder, old_volume_id)
+
+                volume_info["volume_id"] = volume.id
+                volume_info["volume_snapshot_id"] = None
+        except:
+            for _, _, volume in new_volumes:
+                _delete_volume(cinder, volume.id)
+            raise
+
+        return volumes_info
 
 
 class ExportProvider(base.BaseExportProvider):

+ 2 - 0
coriolis/tasks/factory.py

@@ -35,6 +35,8 @@ _TASKS_MAP = {
         replica_tasks.CreateReplicaDiskSnapshotsTask,
     constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS:
         replica_tasks.DeleteReplicaDiskSnapshotsTask,
+    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
+        replica_tasks.RestoreReplicaDiskSnapshotsTask,
 }
 
 

+ 17 - 0
coriolis/tasks/replica_tasks.py

@@ -267,3 +267,20 @@ class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
         task_info["volumes_info"] = volumes_info
 
         return task_info
+
+
+class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+
+        volumes_info = task_info["volumes_info"]
+
+        volumes_info = provider.restore_replica_disk_snapshots(
+            ctxt, connection_info, volumes_info)
+
+        task_info["volumes_info"] = volumes_info
+
+        return task_info