Просмотр исходного кода

tests: Adds LUKS migration-key workflow

Simulate real pre-migration flow: add migration key via volume master
key, pass only the migration passphrase to Coriolis.
Claudiu Belu 1 неделя назад
Родитель
Сommit
35a286d07d

+ 100 - 15
coriolis/tests/integration/deployments/test_luks_osmorphing.py

@@ -26,13 +26,29 @@ from coriolis.tests.integration import harness as integration_harness
 from coriolis.tests.integration import osmorphing_utils
 
 _LUKS_PASSPHRASE = "it-luks-encrypted"
+_MIGRATION_PASSPHRASE = "it-luks-migratable"
+_TPM_SLOT_SECRET = "fake-tpm-sealed-secret"
 
 
 class _LUKSOSMorphingMixin:
+    """Mixin for LUKS OS morphing integration tests.
+
+    Simulates the pre-migration workflow during setUp: the source disk is
+    encrypted with an original key (which may be TPM-sealed on a real VM),
+    then a separate migration key is added via the volume master key without
+    needing the original passphrase. Only the migration passphrase is given
+    to Coriolis, matching what would happen in an actual migration.
+
+    After OS morphing:
+    - The original keyslot is intact (Coriolis never touches it).
+    - The migration passphrase is written to /etc/luks/coriolis_<dev>.key, so
+      the firstboot script can remove the keyslot autonomously.
+    """
 
     # Extra space for initramfs-tools and cryptsetup-initramfs packages that
     # the LUKS morphing tools install on top of the base OS image.
     _SCSI_DEBUG_SIZE_MB = 512
+    _CONTAINER_IMAGE = "ubuntu:24.04"
 
     @classmethod
     def setUpClass(cls):
@@ -49,16 +65,59 @@ class _LUKSOSMorphingMixin:
             fh.write(_LUKS_PASSPHRASE)
         self.addCleanup(os.unlink, self._key_file)
 
+        with tempfile.NamedTemporaryFile(
+                mode="w", suffix=".key", delete=False) as fh:
+            self._migration_key_file = fh.name
+            fh.write(_MIGRATION_PASSPHRASE)
+        self.addCleanup(os.unlink, self._migration_key_file)
+
+        with tempfile.NamedTemporaryFile(
+                mode="w", suffix=".key", delete=False) as fh:
+            self._tpm_key_file = fh.name
+            fh.write(_TPM_SLOT_SECRET)
+        self.addCleanup(os.unlink, self._tpm_key_file)
+
         super().setUp()
         self._prepare_src_device()
 
     def _prepare_src_device(self):
         osmorphing_utils.make_luks_device(
-            self._src_device, self._key_file, "ubuntu:24.04")
+            self._src_device, self._key_file, self._CONTAINER_IMAGE)
+
+        # Simulate the real pre-migration step: the disk is already mounted
+        # (the VM is running), so the migration key can be added via the
+        # volume master key without needing the original passphrase (which
+        # may be TPM-sealed, Tang-bound, etc.).
+        with osmorphing_utils.luks_open(
+                self._src_device, self._key_file,
+                disable_keyring=True) as mapper:
+            osmorphing_utils.luks_add_key_from_mapper(
+                mapper, self._src_device, self._migration_key_file)
+
+        # Add a keyslot representing the TPM-sealed key. LUKS fills slots in
+        # ascending order: original (0), migration (1), and this is slot 2.
+        osmorphing_utils.luks_add_key(
+            self._src_device, self._key_file, self._tpm_key_file)
+
+        # Inject a fake systemd-tpm2 token pointing at the TPM keyslot.
+        osmorphing_utils.luks_add_tpm2_token(self._src_device, keyslot_id=2)
+
+        # Overwrite crypttab with the real LUKS UUID but adding
+        # tpm2-device=auto, matching what systemd-cryptenroll sets up on a
+        # TPM-enrolled system. The real UUID is required so
+        # _update_crypttab_keyfile can match it.
+        luks_uuid = osmorphing_utils.get_luks_uuid(self._src_device)
+        osmorphing_utils.write_file_on_luks_device(
+            self._src_device, self._key_file, "etc/crypttab",
+            "luks-%s\tUUID=%s\tnone\tluks,tpm2-device=auto,tpm2-pcrs=7\n"
+            % (luks_uuid, luks_uuid),
+        )
 
+        # Pass only the migration passphrase to Coriolis; the original key
+        # is never shared.
         dest_env = {
             "devices": [self._dst_device],
-            constants.ENCRYPTED_DISKS_PASS: _LUKS_PASSPHRASE,
+            constants.ENCRYPTED_DISKS_PASS: _MIGRATION_PASSPHRASE,
         }
         self._client.transfers.update(
             self._transfer.id,
@@ -98,6 +157,44 @@ class _LUKSOSMorphingMixin:
         )
         self._assert_firstboot_setup()
 
+        # The migration keyslot must still be present after morphing; it is
+        # supposed to be removed on the first VM boot by the firstboot script.
+        self.assertTrue(
+            osmorphing_utils.luks_can_open(
+                self._dst_device, self._migration_key_file),
+            "Migration LUKS keyslot should still be present after OS morphing",
+        )
+
+        # Coriolis writes the migration passphrase to the keyfile on the OS,
+        # so the filesystem can be unlocked and mounted on the first boot, and
+        # the firstboot script can run.
+        dst_basename = os.path.basename(self._dst_device)
+        keyfile_content = osmorphing_utils.read_file_from_luks_device(
+            self._dst_device, self._key_file,
+            "etc/luks/coriolis_%s.key" % dst_basename)
+        self.assertEqual(
+            keyfile_content.strip(), _MIGRATION_PASSPHRASE,
+            "Migration keyfile content does not match migration passphrase",
+        )
+
+        self.assertFalse(
+            osmorphing_utils.luks_has_tpm2_token(self._dst_device),
+            "systemd-tpm2 token not removed from destination after morphing",
+        )
+        self.assertFalse(
+            osmorphing_utils.luks_can_open(
+                self._dst_device, self._tpm_key_file),
+            "TPM2 keyslot was not killed on destination after OS morphing",
+        )
+        crypttab = osmorphing_utils.read_file_from_luks_device(
+            self._dst_device, self._key_file, "etc/crypttab")
+        self.assertIsNotNone(
+            crypttab, "/etc/crypttab not found on destination")
+        self.assertNotIn(
+            "tpm2-", crypttab,
+            "TPM2 options remain in /etc/crypttab after OS morphing",
+        )
+
 
 class LUKSOSMorphingDeploymentTest(
         _LUKSOSMorphingMixin, integration_base.ReplicaIntegrationTestBase):
@@ -119,19 +216,7 @@ class LUKSRockyLinuxOSMorphingDeploymentTest(
     # kernel-core (~150 MB installed) needs extra room on top of the base
     # container image and the other morphing packages.
     _SCSI_DEBUG_SIZE_MB = 777
-
-    def _prepare_src_device(self):
-        osmorphing_utils.make_luks_device(
-            self._src_device, self._key_file, "rockylinux:9")
-
-        dest_env = {
-            "devices": [self._dst_device],
-            constants.ENCRYPTED_DISKS_PASS: _LUKS_PASSPHRASE,
-        }
-        self._client.transfers.update(
-            self._transfer.id,
-            {"destination_environment": dest_env},
-        )
+    _CONTAINER_IMAGE = "rockylinux:9"
 
     def _assert_firstboot_setup(self):
         self._assert_luks_common_firstboot_files()

+ 127 - 9
coriolis/tests/integration/osmorphing_utils.py

@@ -8,6 +8,7 @@ LUKS / OS morphing helpers (loopback, LUKS, bootable VM disk setup)
 """
 
 import contextlib
+import json
 import os
 import subprocess
 import tempfile
@@ -105,25 +106,32 @@ def make_luks_device(device_path, key_file, container_image):
     when configuring initramfs auto-unlock during OS morphing.
     """
     _run([
-        "cryptsetup", "luksFormat", "--batch-mode", "--key-file", key_file,
-        device_path,
+        "cryptsetup", "luksFormat", "--batch-mode", "--type", "luks2",
+        "--key-file", key_file, device_path,
     ])
 
-    luks_uuid = _run(
-        ["cryptsetup", "luksUUID", device_path]).stdout.decode().strip()
+    luks_uuid = get_luks_uuid(device_path)
 
     with luks_open(device_path, key_file) as mapper_path:
         write_os_image_to_disk(mapper_path, container_image)
         _fixup_luks_inner_os(mapper_path, luks_uuid)
 
 
+def get_luks_uuid(device_path):
+    """Return the LUKS UUID of `device_path`."""
+    result = _run(["cryptsetup", "luksUUID", device_path])
+    return result.stdout.decode().strip()
+
+
 @contextlib.contextmanager
-def luks_open(device_path, key_file):
+def luks_open(device_path, key_file, disable_keyring=False):
     mapper_name = "coriolis_luks_setup_%s" % os.path.basename(device_path)
-    _run([
-        "cryptsetup", "luksOpen", "--key-file", key_file, device_path,
-        mapper_name,
-    ])
+    cmd = ["cryptsetup", "luksOpen", "--key-file", key_file]
+    if disable_keyring:
+        cmd.append("--disable-keyring")
+    cmd += [device_path, mapper_name]
+
+    _run(cmd)
 
     try:
         yield "/dev/mapper/%s" % mapper_name
@@ -131,6 +139,116 @@ def luks_open(device_path, key_file):
         _run(["cryptsetup", "luksClose", mapper_name])
 
 
+def luks_can_open(device_path, key_file):
+    """Return True if `key_file` can unlock `device_path`, False otherwise."""
+    result = _run([
+        "cryptsetup", "luksOpen", "--test-passphrase",
+        "--key-file", key_file, device_path,
+    ], check=False)
+
+    return result.returncode == 0
+
+
+def luks_add_tpm2_token(device_path, keyslot_id):
+    """Inject a systemd-tpm2 token into device_path pointing at keyslot_id.
+
+    The token contains no real TPM material, it exists only so the LUKS2
+    header reports a systemd-tpm2 token, letting tests verify that Coriolis
+    removes it during OS morphing.
+    """
+    token = json.dumps(
+        {"type": "systemd-tpm2", "keyslots": [str(keyslot_id)]})
+    # --disable-external-tokens bypasses the libcryptsetup-token-systemd-tpm2
+    # plugin that validates real systemd-tpm2 token fields (tpm2-pcrs, blob,
+    # etc.). Our token is intentionally minimal, just enough for the test to
+    # verify Coriolis removes it during OS morphing.
+    subprocess.run(
+        [
+            "cryptsetup", "token", "import",
+            "--disable-external-tokens", device_path,
+        ],
+        input=token.encode(),
+        stdout=subprocess.DEVNULL,
+        check=True,
+    )
+
+
+def luks_has_tpm2_token(device_path):
+    """Return True if `device_path` has any systemd-tpm2 LUKS2 tokens."""
+    result = _run(["cryptsetup", "luksDump", device_path], check=False)
+    return result.returncode == 0 and b"systemd-tpm2" in result.stdout
+
+
+def write_file_on_luks_device(device_path, key_file, rel_path, content):
+    """Write content into the given rel_path on the given LUKS device.
+
+    Open `device_path` with `key_file`, mount it, and write `content` to
+    `rel_path` inside the filesystem, creating parent directories as needed.
+    """
+    with luks_open(device_path, key_file) as mapper_path:
+        with mounted(mapper_path) as mount_point:
+            full_path = os.path.join(mount_point, rel_path)
+            os.makedirs(os.path.dirname(full_path), exist_ok=True)
+            with open(full_path, "w") as fh:
+                fh.write(content)
+
+
+def read_file_from_luks_device(device_path, key_file, rel_path):
+    """Read the rel_path file from the given LUKS device.
+
+    Open `device_path` with `key_file`, mount read-only, and return the
+    contents of `rel_path`, or None if the path does not exist.
+    """
+    with luks_open(device_path, key_file) as mapper_path:
+        with mounted(mapper_path, read_only=True) as mount_point:
+            full_path = os.path.join(mount_point, rel_path)
+            if not os.path.exists(full_path):
+                return None
+            with open(full_path) as fh:
+                return fh.read()
+
+
+def luks_add_key(device_path, existing_key_file, new_key_file):
+    """Add a new keyslot to `device_path` using `existing_key_file` to auth."""
+    _run([
+        "cryptsetup", "luksAddKey",
+        "--pbkdf-memory", "65536",
+        "--key-file", existing_key_file,
+        device_path, new_key_file,
+    ])
+
+
+def luks_add_key_from_mapper(mapper_path, device_path, new_key_file):
+    """Add a new keyslot to `device_path` to an open dm-crypt device.
+
+    Does not require the original passphrase, the master key is extracted
+    directly from the live, already mounted device via dmsetup.
+    """
+    mapper_name = os.path.basename(mapper_path)
+    result = _run(["dmsetup", "table", "--showkeys", mapper_name])
+    fields = result.stdout.decode().split()
+
+    # dmsetup may prefix the line with "<name>: "; detect by trailing colon.
+    offset = 1 if fields[0].endswith(':') else 0
+
+    # dm-crypt table: <start> <end> crypt <cipher> <key> ...
+    master_key_hex = fields[offset + 4]
+
+    with tempfile.NamedTemporaryFile(delete=False, suffix=".key") as fh:
+        master_key_path = fh.name
+        fh.write(bytes.fromhex(master_key_hex))
+
+    try:
+        _run([
+            "cryptsetup", "luksAddKey",
+            "--pbkdf-memory", "65536",
+            "--master-key-file", master_key_path,
+            device_path, new_key_file,
+        ])
+    finally:
+        os.unlink(master_key_path)
+
+
 def path_exists_on_device(device_path, rel_path):
     """Checks if *rel_path* exists on the filesystem of *device_path*.