Explore the Code

Add post-transfer disk checksum verification

After each disk is written, compare the source (replicator) and
destination (writer) checksums to catch any data corruption during
transfer. The comparison happens while the writer device is still
acquired so the checksum job can run.

- Client.get_disk_checksum: calls GET /api/v1/dev/{disk}/checksum

- HTTPBackupWriterImpl:
  - _create_checksum_job: calls POST /api/v2/device/{disk}/checksumJob
  - _delete_checksum_job: calls DELETE /api/v2/device/{disk}/checksumJob/{id}
  - _get_checksum_job_status: calls GET /api/v2/device/{disk}/checksumJob/{id}
  - get_disk_checksum: creates checksum job, waits for it to finish, and
    returns the checksum value and algorithm.

- Replicator._verify_disk_checksum: compares both sides, raises on
  algorithm or value mismatch.
Claudiu Belu, 1 month ago
parent commit
1a9f913ee3

+ 118 - 0
coriolis/providers/backup_writers.py

@@ -31,6 +31,11 @@ opts = [
     cfg.BoolOpt('compress_transfers',
                 default=True,
                 help='Use compression if possible during disk transfers'),
+    cfg.IntOpt('disk_checksum_timeout',
+               default=3600,
+               help='Maximum number of seconds to wait for a disk checksum '
+                    'job to complete on the backup writer. Larger disks may '
+                    'require a higher value.'),
 ]
 CONF.register_opts(opts)
 _CORIOLIS_HTTP_WRITER_CMD = "coriolis-writer"
@@ -66,6 +71,10 @@ _WRITER_ERR_MAP = {
     15: "ERR_OUT_OF_BOUDS",
 }
 
+_CHECKSUM_JOB_POLL_INTERVAL = 15  # seconds between writer checksum job polls
+_CHECKSUM_JOB_FINISHED = "finished"
+_CHECKSUM_JOB_FAILED = "failed"
+
 
 def _disable_lvm2_lvmetad(ssh):
     """Disables lvm2-lvmetad service. This service is responsible
@@ -193,6 +202,18 @@ class BaseBackupWriterImpl(with_metaclass(abc.ABCMeta)):
     def close(self):
         pass
 
+    def get_disk_checksum(self, algorithm, start_offset=0, end_offset=0):
+        """Returns the destination disk checksum, or None if unsupported.
+
+        Base implementation: checksumming is not supported, so this always
+        returns None. Writer implementations which do support it (e.g. the
+        HTTP writer) override this method.
+
+        :param algorithm: Checksumming algorithm to use.
+        :param start_offset: Checksumming starts from this offset.
+        :param end_offset: Checksumming stops at this offset. If it is 0,
+          the end of the disk is considered instead.
+        :returns: dict with 'checksum' and 'algorithm' keys, or None if
+          unsupported.
+        """
+        return None
+
 
 class BaseBackupWriter(with_metaclass(abc.ABCMeta)):
     @abc.abstractmethod
@@ -735,6 +756,103 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
             LOG.info("Waiting for unfinished transfers to complete")
             time.sleep(0.5)
 
+    def _create_checksum_job(self, algorithm, start_offset=0, end_offset=0):
+        """Creates a full-disk checksum job on the writer.
+
+        The device must already be acquired.
+
+        The job runs asynchronously on the writer: poll it with
+        _get_checksum_job_status() and clean it up with
+        _delete_checksum_job().
+
+        :param algorithm: Checksumming algorithm to use.
+        :param start_offset: Checksumming starts from this offset.
+        :param end_offset: Checksumming stops at this offset. If it is 0,
+          the end of the disk is considered instead.
+        :returns: job ID string.
+        :raises requests.HTTPError: if the writer rejects the request.
+        """
+        self._ensure_session()
+        uri = "%s/checksumJob" % self._uri
+        # The client token identifies this writer session to the device.
+        headers = {"X-Client-Token": self._id}
+        body = {
+            "start_offset": start_offset,
+            "end_offset": end_offset,
+            "checksum_algorithm": algorithm,
+        }
+
+        resp = self._session.post(
+            uri, headers=headers, json=body,
+            timeout=CONF.default_requests_timeout)
+        resp.raise_for_status()
+
+        return resp.json()["job_id"]
+
+    def _get_checksum_job_status(self, job_id):
+        """Returns the current status of a writer checksum job.
+
+        :param job_id: ID of a job created by _create_checksum_job().
+        :returns: decoded JSON status dict; see get_disk_checksum() for
+          the 'execution_status' values it inspects.
+        :raises requests.HTTPError: if the writer rejects the request.
+        """
+        self._ensure_session()
+        uri = "%s/checksumJob/%s" % (self._uri, job_id)
+
+        resp = self._session.get(
+            uri, timeout=CONF.default_requests_timeout)
+        resp.raise_for_status()
+
+        return resp.json()
+
+    def _delete_checksum_job(self, job_id):
+        """Deletes a writer checksum job.
+
+        :param job_id: ID of a job created by _create_checksum_job().
+        :raises requests.HTTPError: if the writer rejects the deletion.
+        """
+        self._ensure_session()
+        uri = "%s/checksumJob/%s" % (self._uri, job_id)
+
+        resp = self._session.delete(
+            uri, timeout=CONF.default_requests_timeout)
+        resp.raise_for_status()
+
+    def get_disk_checksum(self, algorithm, start_offset=0, end_offset=0):
+        """Computes and returns the checksum of the entire destination disk.
+
+        Must be called while the device is acquired (inside open() context).
+        Flushes all pending writes before starting the checksum job.
+
+        :param algorithm: Checksumming algorithm to use.
+        :param start_offset: Checksumming starts from this offset.
+        :param end_offset: Checksumming stops at this offset. If it is 0,
+          the end of the disk is considered instead.
+        :returns: dict with 'checksum' and 'algorithm' keys.
+        :raises CoriolisException: on a pending write error, a failed
+          checksum job, or timeout.
+        """
+        # Drain the write queues first so the checksum covers all data.
+        self._wait_for_queues()
+        if self._exception:
+            raise exception.CoriolisException(
+                "Cannot checksum disk '%s', pending write error: %s" % (
+                    self._path, self._exception))
+
+        # NOTE: the deadline is armed before the job is created, so the
+        # creation request itself counts against disk_checksum_timeout.
+        timeout = CONF.disk_checksum_timeout
+        deadline = time.monotonic() + timeout
+        job_id = self._create_checksum_job(algorithm, start_offset, end_offset)
+        try:
+            while True:
+                status = self._get_checksum_job_status(job_id)
+                execution_status = status.get("execution_status")
+                if execution_status == _CHECKSUM_JOB_FINISHED:
+                    return {
+                        "checksum": status["checksum_value"],
+                        "algorithm": status["checksum_algorithm"],
+                    }
+
+                if execution_status == _CHECKSUM_JOB_FAILED:
+                    raise exception.CoriolisException(
+                        "Checksum job failed for disk '%s': %s" % (
+                            self._path, status.get("error_message", "")))
+
+                # Check the deadline before sleeping so we fail promptly
+                # rather than oversleeping past the timeout.
+                if time.monotonic() >= deadline:
+                    raise exception.CoriolisException(
+                        "Timed out waiting for checksum job for disk '%s' "
+                        "after %d seconds." % (self._path, timeout))
+
+                time.sleep(_CHECKSUM_JOB_POLL_INTERVAL)
+        finally:
+            # Best-effort cleanup: the job is deleted on success, failure
+            # and timeout alike; deletion errors are only logged, never
+            # raised, so they cannot mask the original outcome.
+            try:
+                self._delete_checksum_job(job_id)
+            except Exception:
+                LOG.warning(
+                    "Failed to delete checksum job %s for disk %s",
+                    job_id, self._path)
+
     def close(self):
         self._closing = True
         self._wait_for_queues()

+ 58 - 1
coriolis/providers/replicator.py

@@ -204,6 +204,22 @@ class Client(object):
         info.raise_for_status()
         return int(info.headers["Content-Length"])
 
+    @utils.retry_on_error()
+    def get_disk_checksum(self, device):
+        """Returns the total checksum of the given disk.
+
+        NOTE(review): retry_on_error presumably also retries the documented
+        409 response, effectively waiting for checksumming to finish --
+        confirm this is the intended behavior.
+
+        :param device: source device name (e.g. 'sdb').
+        :raises HTTPError: with HTTP 409 status if checksumming has not
+          completed yet.
+        :returns: dict with 'checksum' and 'algorithm' keys.
+        """
+        uri = "%s/api/v1/dev/%s/checksum" % (self._base_uri, device)
+
+        resp = self._cli.get(
+            uri, timeout=CONF.replicator.default_requests_timeout)
+        resp.raise_for_status()
+
+        return resp.json()
+
     @utils.retry_on_error()
     def download_chunk(self, disk, chunk):
         diskUri = self.raw_disk_uri(disk)
@@ -768,7 +784,44 @@ class Replicator(object):
                 return vol
         return None
 
-    def replicate_disks(self, source_volumes_info, backup_writer):
+    def _verify_disk_checksum(self, dev_name, destination):
+        """Compares source and destination checksums for a transferred disk.
+
+        Must be called while the device is still acquired on the writer side.
+
+        :param dev_name: source device name (e.g. 'sdb').
+        :param destination: backup writer device implementation whose
+          get_disk_checksum() is queried for the destination side.
+        :raises CoriolisException: if the checksum algorithms do not match, or
+          if the checksums do not match.
+        """
+        self._event_manager.progress_update(
+            "Verifying disk integrity for /dev/%s" % dev_name)
+        # The source side dictates the algorithm used on both ends.
+        source = self._cli.get_disk_checksum(dev_name)
+        # Bound the destination checksum to the source disk size
+        # (presumably the destination disk can be larger -- confirm).
+        end_offset = self._cli.get_disk_size(dev_name)
+        writer = destination.get_disk_checksum(
+            source["algorithm"], end_offset=end_offset)
+        if writer is None:
+            # Writers which don't implement checksumming return None;
+            # skip verification instead of failing the transfer.
+            self._event_manager.progress_update(
+                "Disk integrity check skipped for /dev/%s "
+                "(writer does not support checksums)" % dev_name)
+            return
+
+        if source["algorithm"] != writer["algorithm"]:
+            raise exception.CoriolisException(
+                "Checksum algorithm mismatch for disk '%s': "
+                "source=%s, destination=%s" % (
+                    dev_name, source["algorithm"], writer["algorithm"]))
+
+        if source["checksum"] != writer["checksum"]:
+            raise exception.CoriolisException(
+                "Checksum mismatch for disk '%s': "
+                "source=%s, destination=%s" % (
+                    dev_name, source["checksum"], writer["checksum"]))
+
+        self._event_manager.progress_update(
+            "Disk integrity verified for /dev/%s (checksum: %s)" % (
+                dev_name, source["checksum"]))
+
+    def replicate_disks(
+            self, source_volumes_info, backup_writer, verify_checksum=False):
         """
         Fetch the block diff and send it to the backup_writer.
         If the target_is_zeroed parameter is set to True, on initial
@@ -845,6 +898,10 @@ class Replicator(object):
                     total += 1
                     self._event_manager.set_percentage_step(
                         perc_step, total)
+
+                if verify_checksum:
+                    self._verify_disk_checksum(devName, destination)
+
             dst_vol["replica_state"] = state_for_vol
 
         self._repl_state = curr_state

BIN
coriolis/resources/bin/replicator


+ 155 - 0
coriolis/tests/providers/test_backup_writers.py

@@ -1061,6 +1061,161 @@ class HTTPBackupWriterImplTestCase(test_base.CoriolisBaseTestCase):
                              level=logging.ERROR):
             self.assertRaises(exception.CoriolisException, self.writer.close)
 
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
+                       new_callable=mock.PropertyMock)
+    @mock.patch.object(backup_writers, 'CONF')
+    def test__create_checksum_job(self, mock_conf, mock_uri,
+                                  mock_ensure_session):
+        """Checks the checksum job POST request and the returned job ID."""
+        self.writer._set_info(self.info)
+        mock_uri.return_value = "https://host:port/api/v2/device/b64path"
+        self.writer._session = mock.MagicMock()
+        mock_resp = mock.MagicMock()
+        mock_resp.json.return_value = {"job_id": "test-job-id"}
+        self.writer._session.post.return_value = mock_resp
+
+        result = self.writer._create_checksum_job("sha256")
+
+        self.assertEqual("test-job-id", result)
+        self.writer._session.post.assert_called_once_with(
+            "https://host:port/api/v2/device/b64path/checksumJob",
+            headers={"X-Client-Token": self.info["id"]},
+            json={"start_offset": 0, "end_offset": 0,
+                  "checksum_algorithm": "sha256"},
+            timeout=mock_conf.default_requests_timeout)
+        mock_resp.raise_for_status.assert_called_once()
+
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
+                       new_callable=mock.PropertyMock)
+    @mock.patch.object(backup_writers, 'CONF')
+    def test__get_checksum_job_status(self, mock_conf, mock_uri,
+                                      mock_ensure_session):
+        """Checks the job status GET request and JSON passthrough."""
+        self.writer._set_info(self.info)
+        mock_uri.return_value = "https://host:port/api/v2/device/b64path"
+        self.writer._session = mock.MagicMock()
+        mock_resp = mock.MagicMock()
+        self.writer._session.get.return_value = mock_resp
+
+        result = self.writer._get_checksum_job_status("test-job-id")
+
+        self.assertEqual(result, mock_resp.json.return_value)
+        self.writer._session.get.assert_called_once_with(
+            "https://host:port/api/v2/device/b64path/checksumJob/test-job-id",
+            timeout=mock_conf.default_requests_timeout)
+        mock_resp.raise_for_status.assert_called_once()
+
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
+                       new_callable=mock.PropertyMock)
+    @mock.patch.object(backup_writers, 'CONF')
+    def test__delete_checksum_job(self, mock_conf, mock_uri,
+                                  mock_ensure_session):
+        """Checks the job DELETE request and its error propagation."""
+        self.writer._set_info(self.info)
+        mock_uri.return_value = "https://host:port/api/v2/device/b64path"
+        self.writer._session = mock.MagicMock()
+        mock_resp = mock.MagicMock()
+        self.writer._session.delete.return_value = mock_resp
+
+        self.writer._delete_checksum_job("test-job-id")
+
+        self.writer._session.delete.assert_called_once_with(
+            "https://host:port/api/v2/device/b64path/checksumJob/test-job-id",
+            timeout=mock_conf.default_requests_timeout)
+        mock_resp.raise_for_status.assert_called_once()
+
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_delete_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_get_checksum_job_status')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_create_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
+    def test_get_disk_checksum(self, mock_wait, mock_create, mock_status,
+                               mock_delete):
+        """Happy path: finished job yields checksum and job is deleted."""
+        self.writer._set_info(self.info)
+        mock_create.return_value = "test-job-id"
+        mock_status.return_value = {
+            "execution_status": "finished",
+            "checksum_value": "abc123",
+            "checksum_algorithm": "sha256",
+        }
+
+        result = self.writer.get_disk_checksum(
+            "sha256",
+            mock.sentinel.start_offset,
+            mock.sentinel.end_offset,
+        )
+
+        self.assertEqual({"checksum": "abc123", "algorithm": "sha256"}, result)
+        mock_wait.assert_called_once()
+        mock_create.assert_called_once_with(
+            "sha256",
+            mock.sentinel.start_offset,
+            mock.sentinel.end_offset,
+        )
+        mock_delete.assert_called_once_with("test-job-id")
+
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_delete_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_get_checksum_job_status')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_create_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
+    def test_get_disk_checksum_job_failed(self, mock_wait, mock_create,
+                                          mock_status, mock_delete):
+        """A 'failed' job status raises and the job is still deleted."""
+        self.writer._set_info(self.info)
+        mock_create.return_value = "test-job-id"
+        mock_status.return_value = {
+            "execution_status": "failed",
+            "error_message": "disk error",
+        }
+
+        self.assertRaises(
+            exception.CoriolisException,
+            self.writer.get_disk_checksum, "sha256")
+        mock_delete.assert_called_once_with("test-job-id")
+
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_delete_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_create_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
+    def test_get_disk_checksum_write_error(self, mock_wait, mock_create,
+                                           mock_delete):
+        """A pending write error raises before any job is created."""
+        self.writer._set_info(self.info)
+        self.writer._exception = exception.CoriolisException("write failed")
+
+        self.assertRaises(
+            exception.CoriolisException,
+            self.writer.get_disk_checksum, "sha256")
+        mock_create.assert_not_called()
+        mock_delete.assert_not_called()
+
+    @mock.patch('coriolis.providers.backup_writers.time.sleep')
+    @mock.patch('coriolis.providers.backup_writers.time.monotonic')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_delete_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_get_checksum_job_status')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl,
+                       '_create_checksum_job')
+    @mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
+    def test_get_disk_checksum_timeout(self, mock_wait, mock_create,
+                                       mock_status, mock_delete,
+                                       mock_monotonic, mock_sleep):
+        """A job stuck 'running' past the deadline raises; job is deleted."""
+        self.writer._set_info(self.info)
+        mock_create.return_value = "test-job-id"
+        mock_status.return_value = {"execution_status": "running"}
+        # First call sets the deadline; second call (after the poll) exceeds it
+        mock_monotonic.side_effect = [0, 3601]
+
+        self.assertRaises(
+            exception.CoriolisException,
+            self.writer.get_disk_checksum, "sha256")
+        mock_delete.assert_called_once_with("test-job-id")
+
 
 class HTTPBackupWriterBootstrapperTestcase(test_base.CoriolisBaseTestCase):
     """Test suite for the Coriolis HTTPBackupWriterBootstrapper class."""

+ 93 - 2
coriolis/tests/providers/test_replicator.py

@@ -312,6 +312,22 @@ class ClientTestCase(test_base.CoriolisBaseTestCase):
         )
         self.mock_response.raise_for_status.assert_called_once()
 
+    def test_get_disk_checksum(self):
+        """Checks the replicator checksum GET URI and JSON passthrough."""
+        self.client._cli.get.return_value = self.mock_response
+
+        original_get_disk_checksum = testutils.get_wrapped_function(
+            self.client.get_disk_checksum)
+
+        result = original_get_disk_checksum(self.client, self.device)
+
+        self.assertEqual(result, self.mock_response.json.return_value)
+        self.client._cli.get.assert_called_once_with(
+            "https://%s:%s/api/v1/dev/%s/checksum" % (
+                self.ip, self.port, self.device),
+            timeout=replicator_module.CONF.replicator.default_requests_timeout,
+        )
+        self.mock_response.raise_for_status.assert_called_once()
+
 
 class ReplicatorTestCase(test_base.CoriolisBaseTestCase):
     """Test suite for the Coriolis Replicator class."""
@@ -1090,8 +1106,82 @@ class ReplicatorTestCase(test_base.CoriolisBaseTestCase):
 
         self.assertIsNone(result)
 
+    def test__verify_disk_checksum(self):
+        """Matching source/destination checksums verify without raising."""
+        checksum = {"checksum": "abc123", "algorithm": "sha256"}
+        self.replicator._cli.get_disk_checksum.return_value = checksum
+        mock_destination = mock.MagicMock()
+        mock_destination.get_disk_checksum.return_value = checksum
+
+        self.replicator._verify_disk_checksum("sdb", mock_destination)
+
+        self.replicator._cli.get_disk_checksum.assert_called_once_with("sdb")
+        mock_get_disk_size = self.replicator._cli.get_disk_size
+        mock_get_disk_size.assert_called_once_with("sdb")
+        mock_destination.get_disk_checksum.assert_called_once_with(
+            "sha256", end_offset=mock_get_disk_size.return_value)
+
+    def test__verify_disk_checksum_value_mismatch(self):
+        """Differing checksum values raise a CoriolisException."""
+        self.replicator._cli.get_disk_checksum.return_value = {
+            "checksum": "abc123", "algorithm": "sha256"}
+        mock_destination = mock.MagicMock()
+        mock_destination.get_disk_checksum.return_value = {
+            "checksum": "different", "algorithm": "sha256"}
+
+        self.assertRaises(
+            exception.CoriolisException,
+            self.replicator._verify_disk_checksum,
+            "sdb", mock_destination)
+
+    def test__verify_disk_checksum_algorithm_mismatch(self):
+        """Differing checksum algorithms raise a CoriolisException."""
+        self.replicator._cli.get_disk_checksum.return_value = {
+            "checksum": "abc123", "algorithm": "sha256"}
+        mock_destination = mock.MagicMock()
+        mock_destination.get_disk_checksum.return_value = {
+            "checksum": "abc123", "algorithm": "xxhash"}
+
+        self.assertRaises(
+            exception.CoriolisException,
+            self.replicator._verify_disk_checksum,
+            "sdb", mock_destination)
+
+    def test__verify_disk_checksum_not_supported(self):
+        """A None destination checksum skips verification without raising."""
+        self.replicator._cli.get_disk_checksum.return_value = {
+            "checksum": "abc123", "algorithm": "sha256"}
+        mock_destination = mock.MagicMock()
+        mock_destination.get_disk_checksum.return_value = None
+
+        self.replicator._verify_disk_checksum("sdb", mock_destination)
+
+        self.replicator._cli.get_disk_checksum.assert_called_once_with("sdb")
+        mock_get_disk_size = self.replicator._cli.get_disk_size
+        mock_get_disk_size.assert_called_once_with("sdb")
+        mock_destination.get_disk_checksum.assert_called_once_with(
+            "sha256", end_offset=mock_get_disk_size.return_value)
+
+    @mock.patch.object(replicator_module.Replicator, '_verify_disk_checksum')
+    @mock.patch.object(replicator_module, 'Client')
+    def test_replicate_disks_calls_verify_checksum(
+            self, mock_Client, mock_verify):
+        """replicate_disks(verify_checksum=True) verifies each disk."""
+        self.replicator._cli = mock_Client.return_value
+        self.replicator._cli.get_changes.return_value = [
+            {'length': 100, 'offset': 0}]
+        self.replicator._volumes_info = [
+            {"disk_id": "test_disk", "disk_path": "/dev/sdb"}]
+        source_volumes_info = [
+            {"disk_id": "test_disk", "disk_path": "/dev/sdb"}]
+        self.replicator._repl_state = ['non-empty']
+        mock_destination = mock.MagicMock(spec=['seek', 'write'])
+        self.backup_writer.open.return_value.__enter__.return_value = (
+            mock_destination)
+
+        self.replicator.replicate_disks(
+            source_volumes_info, self.backup_writer, True)
+
+        mock_verify.assert_called_once_with("sdb", mock_destination)
+
+    @mock.patch.object(replicator_module.Replicator, '_verify_disk_checksum')
     @mock.patch.object(replicator_module, 'Client')
-    def test_replicate_disks(self, mock_Client):
+    def test_replicate_disks(self, mock_Client, mock_verify):
         self.replicator._cli = mock_Client.return_value
         self.replicator._cli.get_changes.return_value = [
             {'length': 100, 'offset': 0}, {'length': 200, 'offset': 100}]
@@ -1127,10 +1217,11 @@ class ReplicatorTestCase(test_base.CoriolisBaseTestCase):
                           self.replicator.replicate_disks,
                           source_volumes_info, self.backup_writer)
 
+    @mock.patch.object(replicator_module.Replicator, '_verify_disk_checksum')
     @mock.patch.object(replicator_module.Replicator, '_find_vol_state')
     @mock.patch.object(replicator_module, 'Client')
     def test_replicate_disks_initial_sync(self, mock_Client,
-                                          mock_find_vol_state):
+                                          mock_find_vol_state, mock_verify):
         self.replicator._cli = mock_Client.return_value
 
         self.replicator._cli.get_changes.return_value = [