Просмотр исходного кода

Add LUKS disk encryption support for Debian / Ubuntu OS morphing

The bulk of the work lives in the new `LinuxLUKSMixin` class
(`osmount/luks_mixin.py`), which is then included in `BaseLinuxOSMountTools`:

- `mount_os()`: check `osmorphing_info["encrypted_disks_passphrase"]` for each
  block device. confirmed LUKS containers are opened via `cryptsetup luksOpen`
  and the resulting `/dev/mapper/<name>` path is used in place of the raw device.
  `dismount_os()` closes them again after all filesystems have been unmounted.

- `remove_encryption_artifacts`: after OS morphing, stale TPM2 LUKS tokens and
  their keyslots are killed and the corresponding `tpm2-*` options are stripped
  from `/etc/crypttab`. The source TPM does not exist on the destination, so
  leaving these in place would cause the initramfs to hang or fail on first boot.

- `install_encryption_firstboot_setup`: a temporary migration keyfile is injected
  into the guest, `/etc/crypttab` is updated to reference it, the initramfs is
  rebuilt so the migrated VM can boot, GRUB is patched to use the crypttab mapper
  names instead of the osmount-time names, and a systemd one-shot service is
  installed to re-enroll TPM2 and remove the migration keyfile on the first boot
  of the destination VM.

The firstboot shell script itself lives in
`coriolis/osmorphing/osmount/resources/luks_firstboot_initramfs_tools.sh` and
targets `update-initramfs`-based systems (Debian / Ubuntu).
Claudiu Belu 1 месяц назад
Родитель
Сommit
afa6762389

+ 2 - 0
coriolis/constants.py

@@ -398,3 +398,5 @@ USER_SCRIPT_PHASES = [
     PHASE_OSMORPHING_POST_OS_MOUNT,
     PHASE_REPLICA_FIRST_BOOT,
 ]
+
+ENCRYPTED_DISKS_PASS = "encrypted_disks_passphrase"

+ 5 - 1
coriolis/osmorphing/manager.py

@@ -137,7 +137,8 @@ def morph_image(origin_provider, destination_provider, connection_info,
     # instantiate and run OSMount tools:
     os_mount_tools = osmount_factory.get_os_mount_tools(
         os_type, connection_info, event_manager, ignore_devices,
-        CONF.default_osmorphing_operation_timeout)
+        CONF.default_osmorphing_operation_timeout,
+        osmorphing_info=osmorphing_info)
 
     proxy_settings = _get_proxy_settings()
     os_mount_tools.set_proxy(proxy_settings)
@@ -317,3 +318,6 @@ def _morph_image(origin_provider, destination_provider, connection_info,
             user_script, script_idx, user_provided=True)
     if not first_boot_user_scripts:
         event_manager.progress_update('No first-boot user script specified')
+
+    os_mount_tools.remove_encryption_artifacts(os_root_dir)
+    os_mount_tools.install_encryption_firstboot_setup(os_root_dir)

+ 14 - 2
coriolis/osmorphing/osmount/base.py

@@ -14,6 +14,7 @@ from oslo_log import log as logging
 from six import with_metaclass
 
 from coriolis import exception
+from coriolis.osmorphing.osmount import luks_mixin
 from coriolis import utils
 
 LOG = logging.getLogger(__name__)
@@ -24,12 +25,13 @@ MAJOR_COLUMN_INDEX = 4
 class BaseOSMountTools(object, with_metaclass(abc.ABCMeta)):
 
     def __init__(self, connection_info, event_manager, ignore_devices,
-                 operation_timeout):
+                 operation_timeout, osmorphing_info=None):
         self._event_manager = event_manager
         self._ignore_devices = ignore_devices
         self._environment = {}
         self._connection_info = connection_info
         self._osmount_operation_timeout = operation_timeout
+        self._osmorphing_info = osmorphing_info or {}
         self._connect()
 
     @abc.abstractmethod
@@ -58,6 +60,12 @@ class BaseOSMountTools(object, with_metaclass(abc.ABCMeta)):
     def set_proxy(self, proxy_settings):
         pass
 
+    def remove_encryption_artifacts(self, os_root_dir):
+        pass
+
+    def install_encryption_firstboot_setup(self, os_root_dir):
+        pass
+
     def get_environment(self):
         return self._environment
 
@@ -122,7 +130,7 @@ class BaseSSHOSMountTools(BaseOSMountTools):
         return self._ssh
 
 
-class BaseLinuxOSMountTools(BaseSSHOSMountTools):
+class BaseLinuxOSMountTools(luks_mixin.LinuxLUKSMixin, BaseSSHOSMountTools):
     def _get_pvs(self):
         out = self._exec_cmd("sudo pvdisplay -c").splitlines()
         LOG.debug("Output of 'pvdisplay -c' command: %s", out)
@@ -560,6 +568,8 @@ class BaseLinuxOSMountTools(BaseSSHOSMountTools):
                 "sudo ls -1 %s*" % volume_dev).splitlines()
         LOG.debug("All simple devices to scan: %s", dev_paths)
 
+        self._unlock_luks_devices(dev_paths)
+
         lvm_dev_paths = []
         self._check_vgs()
         vgs = self._get_vgs()
@@ -643,6 +653,8 @@ class BaseLinuxOSMountTools(BaseSSHOSMountTools):
         self._exec_cmd(
             'mountpoint -q %s && sudo umount -R %s' % (root_dir, root_dir))
 
+        self._close_luks_devices()
+
     def set_proxy(self, proxy_settings):
         url = proxy_settings.get('url')
         if not url:

+ 3 - 2
coriolis/osmorphing/osmount/factory.py

@@ -16,7 +16,8 @@ LOG = logging.getLogger(__name__)
 
 
 def get_os_mount_tools(os_type, connection_info, event_manager,
-                       ignore_devices, operation_timeout):
+                       ignore_devices, operation_timeout,
+                       osmorphing_info=None):
     os_mount_tools = {constants.OS_TYPE_LINUX: [ubuntu.UbuntuOSMountTools,
                                                 redhat.RedHatOSMountTools,
                                                 suse.SUSEOSMountTools],
@@ -28,7 +29,7 @@ def get_os_mount_tools(os_type, connection_info, event_manager,
     for cls in os_mount_tools.get(os_type,
                                   itertools.chain(*os_mount_tools.values())):
         tools = cls(connection_info, event_manager, ignore_devices,
-                    operation_timeout)
+                    operation_timeout, osmorphing_info=osmorphing_info)
         LOG.debug("Testing OS mount tools: %s", cls.__name__)
         if tools.check_os():
             return tools

+ 587 - 0
coriolis/osmorphing/osmount/luks_mixin.py

@@ -0,0 +1,587 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import contextlib
+import json
+import os
+import re
+
+from oslo_log import log as logging
+
+from coriolis import constants
+from coriolis import exception
+from coriolis import utils
+
+LOG = logging.getLogger(__name__)
+
+_LUKS_KEYFILE_DIR = "/etc/luks"
+
+_FIRSTBOOT_SCRIPT_PATH = "/usr/local/sbin/coriolis-luks-firstboot.sh"
+_SYSTEMD_UNIT_PATH = "/etc/systemd/system/coriolis-luks-firstboot.service"
+
+_RESOURCES_DIR = os.path.join(os.path.dirname(__file__), "resources")
+
+
+def _load_script(filename):
+    with open(os.path.join(_RESOURCES_DIR, filename)) as fh:
+        return fh.read()
+
+
+_LUKS_FIRSTBOOT_SCRIPTS = {
+    "update-initramfs": _load_script("luks_firstboot_initramfs_tools.sh"),
+}
+
+_SYSTEMD_UNIT = """\
+[Unit]
+Description=Coriolis LUKS migration firstboot cleanup
+After=local-fs.target
+ConditionPathExists=/etc/luks
+
+[Service]
+Type=oneshot
+ExecStart=/usr/local/sbin/coriolis-luks-firstboot.sh
+RemainAfterExit=yes
+StandardOutput=journal+console
+StandardError=journal+console
+
+[Install]
+WantedBy=multi-user.target
+"""
+
+
+class LinuxLUKSMixin:
+    """Mixin providing LUKS-related methods for BaseLinuxOSMountTools.
+
+    Expects the consuming class to provide ``self._ssh``, ``self._exec_cmd()``,
+    and ``self._osmorphing_info``.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._luks_opened = []
+
+    def _unlock_luks_devices(self, dev_paths):
+        """Open any LUKS-encrypted devices listed in dev_paths in-place.
+
+        Reads osmorphing_info["encrypted_disks_passphrase"], a single
+        passphrase used for all LUKS-encrypted devices. Non-LUKS devices are
+        silently skipped.
+
+        `dev_paths` entries are replaced in-place with the resulting
+        `/dev/mapper/<name>` path for any device that is unlocked.
+
+        :returns: list of (mapper_name, original_dev_path) pairs opened,
+          to be used for cleanup and further processing.
+        """
+        passphrase = self._osmorphing_info.get(constants.ENCRYPTED_DISKS_PASS)
+
+        try:
+            for i, dev_path in enumerate(dev_paths):
+                mapper_path = self._unlock_luks_device(dev_path, passphrase)
+                if mapper_path is None:
+                    continue
+
+                dev_paths[i] = mapper_path
+                self._luks_opened.append(
+                    (os.path.basename(mapper_path), dev_path))
+        except Exception:
+            self._close_luks_devices()
+            raise
+
+    def _unlock_luks_device(self, dev_path, passphrase):
+        """Open a single LUKS-encrypted device.
+
+        :raises CoriolisException: if the device is LUKS-encrypted but no
+          passphrase was provided.
+        :returns: the device mapper name, or None if the device is not LUKS
+          and no passphrase was given.
+        """
+        is_luks = self._is_luks(dev_path)
+
+        if is_luks and not passphrase:
+            raise exception.CoriolisException(
+                f"{dev_path} is LUKS-encrypted, but no passphrase is "
+                "provided.")
+        if not is_luks:
+            LOG.debug(
+                "Device '%s' is not a LUKS container; skipping.", dev_path)
+            return None
+
+        mapper_name = "coriolis_%s" % os.path.basename(dev_path)
+        key_path = "/tmp/%s.key" % mapper_name
+        with self._auth_luks(passphrase, key_path):
+            self._exec_cmd(
+                "sudo cryptsetup luksOpen --disable-keyring "
+                "--key-file %s %s %s" % (key_path, dev_path, mapper_name))
+
+        mapper_path = "/dev/mapper/%s" % mapper_name
+        LOG.info("Unlocked LUKS device '%s' as '%s'", dev_path, mapper_path)
+
+        return mapper_path
+
+    def _is_luks(self, dev_path):
+        try:
+            self._exec_cmd("sudo cryptsetup isLuks %s" % dev_path)
+            return True
+        except exception.SSHCommandNotFoundException:
+            LOG.warn("cryptsetup missing from OS morpher; cannot check if "
+                     "device is LUKS-encrypted.")
+        except Exception:
+            # if it's not LUKS, we'll get exit code 1.
+            # The exception is already logged in self._exec_cmd.
+            pass
+
+        return False
+
+    def _close_luks_devices(self):
+        """Close any LUKS mapper devices opened by _unlock_luks_devices."""
+        for mapper_name, _ in self._luks_opened:
+            self._exec_cmd(
+                "sudo cryptsetup luksClose %s || true" % mapper_name)
+
+        self._luks_opened = []
+
+    def _write_remote_file(self, dest_path, content, mode=None):
+        """Write content to dest_path on the remote host via a temp file."""
+        tmp = self._exec_cmd("mktemp").strip()
+        utils.write_ssh_file(self._ssh, tmp, content.encode("utf-8"))
+        if mode is not None:
+            self._exec_cmd(
+                "sudo mv %s %s && sudo chmod %s %s" % (
+                    tmp, dest_path, mode, dest_path))
+        else:
+            self._exec_cmd("sudo mv %s %s" % (tmp, dest_path))
+
+    @contextlib.contextmanager
+    def _auth_luks(self, passphrase, key_path):
+        """Write passphrase to key_path on the remote host; yield; delete."""
+        self._write_remote_file(key_path, passphrase)
+        try:
+            yield key_path
+        finally:
+            self._exec_cmd("sudo rm -f %s" % key_path)
+
+    def _get_tpm2_token_info(self, dev_path):
+        """Return list of (token_id, keyslot_id) pairs for systemd-tpm2 tokens.
+
+        Returns an empty list for LUKS1 devices, which do not support tokens.
+        """
+        try:
+            raw = self._exec_cmd(
+                "sudo cryptsetup luksDump --dump-json-metadata %s" % dev_path)
+            header = json.loads(raw)
+        except Exception:
+            LOG.warning(
+                "Could not dump LUKS header for '%s': %s",
+                dev_path, utils.get_exception_details())
+            return []
+
+        results = []
+        for token_id, token in header.get("tokens", {}).items():
+            if token.get("type") != "systemd-tpm2":
+                continue
+
+            for keyslot_id in token.get("keyslots", []):
+                results.append((token_id, keyslot_id))
+
+        return results
+
+    def _remove_tpm2_tokens(self, dev_path, passphrase):
+        """Remove systemd-tpm2 tokens and kill their keyslots from dev_path.
+
+        Token removal modifies only the LUKS2 header metadata. Keyslot
+        removal is authenticated via the migration passphrase.
+        """
+        token_info = self._get_tpm2_token_info(dev_path)
+        if not token_info:
+            return
+
+        for token_id, keyslot_id in token_info:
+            try:
+                self._exec_cmd(
+                    "sudo cryptsetup token remove --token-id %s %s" % (
+                        token_id, dev_path))
+                LOG.info(
+                    "Removed systemd-tpm2 token %s from '%s'",
+                    token_id, dev_path)
+            except Exception:
+                LOG.warning(
+                    "Failed to remove TPM2 token %s from '%s': %s",
+                    token_id, dev_path, utils.get_exception_details())
+                continue
+
+            key_path = "/tmp/coriolis_%s.key" % os.path.basename(dev_path)
+            try:
+                with self._auth_luks(passphrase, key_path):
+                    self._exec_cmd(
+                        "sudo cryptsetup luksKillSlot --key-file %s %s %s" % (
+                            key_path, dev_path, keyslot_id))
+                LOG.info(
+                    "Killed TPM2 keyslot %s from '%s'", keyslot_id, dev_path)
+            except Exception:
+                LOG.warning(
+                    "Failed to kill TPM2 keyslot %s from '%s': %s",
+                    keyslot_id, dev_path, utils.get_exception_details())
+
+    def _transform_crypttab(self, os_root_dir, transform):
+        """Apply transform to each non-comment entry in /etc/crypttab.
+
+        transform(parts) receives the split fields [name, device, keyfile,
+        options] and returns a modified parts list to replace the line, or
+        None to leave it unchanged.
+
+        Returns True if the file was written back, False if nothing changed or
+        the file does not exist.
+        """
+        crypttab_path = os.path.join(os_root_dir, "etc/crypttab")
+        if not utils.test_ssh_path(self._ssh, crypttab_path):
+            return False
+
+        content = utils.read_ssh_file(self._ssh, crypttab_path).decode("utf-8")
+        new_lines = []
+        changed = False
+
+        for line in content.splitlines(keepends=True):
+            stripped = line.strip()
+            if not stripped or stripped.startswith("#"):
+                new_lines.append(line)
+                continue
+
+            parts = stripped.split(None, 3)
+            new_parts = transform(parts)
+            if new_parts is None:
+                new_lines.append(line)
+            else:
+                new_lines.append("\t".join(new_parts) + "\n")
+                changed = True
+
+        if not changed:
+            return False
+
+        self._write_remote_file(crypttab_path, "".join(new_lines))
+        return True
+
+    def _remove_tpm2_crypttab_options(self, os_root_dir):
+        """Strip tpm2-* options from /etc/crypttab in the mounted OS.
+
+        Prevents the initramfs from attempting TPM2 unsealing on the target,
+        which would fail because the source TPM is not present there.
+        """
+        def _strip_tpm2(parts):
+            if len(parts) < 4:
+                return None
+
+            opts = [
+                o for o in parts[3].split(",") if not o.startswith("tpm2-")
+            ]
+            new_opts = ",".join(opts)
+            if new_opts == parts[3]:
+                return None
+
+            return [parts[0], parts[1], parts[2], new_opts]
+
+        if self._transform_crypttab(os_root_dir, _strip_tpm2):
+            LOG.info(
+                "Removed TPM2 options from /etc/crypttab in '%s'", os_root_dir)
+
+    def remove_encryption_artifacts(self, os_root_dir):
+        """Remove stale TPM2 tokens, kill their keyslots, and strip tpm2-*
+        options from /etc/crypttab.
+
+        Called after OS morphing, before closing LUKS devices. The source
+        TPM does not exist on the destination, leaving its token and crypttab
+        options in place would cause the initramfs to hang or fail on first
+        boot when it tries and fails to unseal the key.
+        """
+        if not self._luks_opened:
+            return
+
+        self._event_manager.progress_update(
+            "Removing stale TPM2 LUKS artifacts")
+
+        passphrase = self._osmorphing_info.get(constants.ENCRYPTED_DISKS_PASS)
+        for _, dev_path in self._luks_opened:
+            self._remove_tpm2_tokens(dev_path, passphrase)
+
+        self._remove_tpm2_crypttab_options(os_root_dir)
+
+    def _get_migration_keyfile_path(self, dev_path):
+        return "%s/coriolis_%s.key" % (
+            _LUKS_KEYFILE_DIR, os.path.basename(dev_path))
+
+    def _get_luks_uuid(self, dev_path):
+        return self._exec_cmd(
+            "sudo cryptsetup luksUUID %s" % dev_path).strip()
+
+    def _update_crypttab_keyfile(self, os_root_dir, uuid_to_keyfile):
+        """Update the keyfile column in crypttab for matching LUKS UUIDs."""
+        def _set_keyfile(parts):
+            if len(parts) < 2:
+                return None
+
+            m = (re.match(r"UUID=([0-9a-f-]+)", parts[1], re.IGNORECASE) or
+                 re.match(r".*/by-uuid/([0-9a-f-]+)", parts[1], re.IGNORECASE))
+            if not m:
+                return None
+
+            keyfile = uuid_to_keyfile.get(m.group(1))
+            if keyfile is None:
+                return None
+
+            while len(parts) < 4:
+                parts.append("")
+
+            parts[2] = keyfile
+            # cryptsetup-initramfs (Ubuntu 22.04+) only embeds crypttab
+            # entries in the initramfs when the device is verifiable at build
+            # time OR when the 'initramfs' option is present. Inside a chroot
+            # (no udev, no /dev/disk/by-uuid/), verification always fails, so
+            # we force-add 'initramfs' here.
+            opts_list = [o for o in parts[3].split(",") if o]
+            if "initramfs" not in opts_list:
+                opts_list.append("initramfs")
+
+            parts[3] = ",".join(opts_list)
+
+            return parts
+
+        if not self._transform_crypttab(os_root_dir, _set_keyfile):
+            raise exception.CoriolisException(
+                "No /etc/crypttab entries matched LUKS UUIDs in '%s'; "
+                "cannot configure initramfs auto-unlock." % os_root_dir)
+
+        LOG.info("Updated crypttab keyfile entries in '%s'", os_root_dir)
+
+    def _write_migration_keyfiles(self, os_root_dir):
+        """Write migration keyfiles into the OS and update crypttab."""
+        passphrase = self._osmorphing_info.get(constants.ENCRYPTED_DISKS_PASS)
+        if not passphrase or not self._luks_opened:
+            return
+
+        keyfile_dir = os.path.join(os_root_dir, _LUKS_KEYFILE_DIR.lstrip("/"))
+        self._exec_cmd(
+            "sudo mkdir -p %s && sudo chmod 700 %s" % (
+                keyfile_dir, keyfile_dir))
+
+        uuid_to_keyfile = {}
+        for _, dev_path in self._luks_opened:
+            luks_uuid = self._get_luks_uuid(dev_path)
+
+            keyfile_path = self._get_migration_keyfile_path(dev_path)
+            abs_path = os.path.join(os_root_dir, keyfile_path.lstrip("/"))
+
+            self._write_remote_file(abs_path, passphrase, mode="400")
+            uuid_to_keyfile[luks_uuid] = keyfile_path
+            LOG.info(
+                "Written migration keyfile for LUKS device '%s' (UUID %s)",
+                dev_path, luks_uuid)
+
+        self._update_crypttab_keyfile(os_root_dir, uuid_to_keyfile)
+
+        initramfs_tool = self._detect_initramfs_tool(os_root_dir)
+        if initramfs_tool == "update-initramfs":
+            self._configure_initramfs_tools_keyfiles(os_root_dir)
+        else:
+            raise exception.CoriolisException(
+                "No initramfs tool found in OS at '%s'; cannot configure "
+                "keyfile-based LUKS auto-unlock." % os_root_dir)
+
+    def _configure_initramfs_tools_keyfiles(self, os_root_dir):
+        """Set KEYFILE_PATTERN in cryptsetup-initramfs conf-hook.
+
+        cryptsetup-initramfs (Debian / Ubuntu) only embeds keyfiles whose paths
+        match KEYFILE_PATTERN. Set it so migration keyfiles are included in the
+        rebuilt initramfs.
+        """
+        hook_dir = os.path.join(os_root_dir, "etc/cryptsetup-initramfs")
+        self._exec_cmd("sudo mkdir -p %s" % hook_dir)
+        hook_abs = os.path.join(hook_dir, "conf-hook")
+        pattern = "%s/coriolis_*.key" % _LUKS_KEYFILE_DIR
+
+        existing = ""
+        if utils.test_ssh_path(self._ssh, hook_abs):
+            existing = utils.read_ssh_file(self._ssh, hook_abs).decode("utf-8")
+
+        # Always append: the default conf-hook has #KEYFILE_PATTERN=
+        # (commented out). Appending sets the active value. The last assignment
+        # wins, overriding any earlier line.
+        new_content = existing + '\nKEYFILE_PATTERN="%s"\n' % pattern
+        self._write_remote_file(hook_abs, new_content)
+        self._exec_cmd(
+            "sudo chown root:root %s && sudo chmod 644 %s" % (
+                hook_abs, hook_abs))
+        LOG.info(
+            "Set KEYFILE_PATTERN in cryptsetup-initramfs conf-hook at '%s'",
+            hook_abs)
+
+    def _detect_initramfs_tool(self, os_root_dir):
+        initramfs_bins = ["usr/sbin/update-initramfs", "sbin/update-initramfs"]
+        for initramfs_bin in initramfs_bins:
+            path = os.path.join(os_root_dir, initramfs_bin)
+            if utils.test_ssh_path(self._ssh, path):
+                return "update-initramfs"
+
+        return None
+
+    def _rebuild_initramfs(self, os_root_dir):
+        """Rebuild the initramfs inside the mounted OS chroot.
+
+        /dev, /proc, /sys, /run are already bind-mounted by mount_os().
+        """
+        tool = self._detect_initramfs_tool(os_root_dir)
+        if tool == "update-initramfs":
+            self._exec_cmd(
+                "sudo chroot %s update-initramfs -u -k all" % os_root_dir)
+        else:
+            raise exception.CoriolisException(
+                "No initramfs tool found in OS at '%s'; cannot rebuild "
+                "initramfs for LUKS auto-unlock." % os_root_dir)
+
+    def _detect_init_system(self, os_root_dir):
+        path = os.path.join(os_root_dir, "lib/systemd/systemd")
+        if utils.test_ssh_path(self._ssh, path):
+            return "systemd"
+
+        path = os.path.join(os_root_dir, "sbin/openrc")
+        if utils.test_ssh_path(self._ssh, path):
+            return "openrc"
+
+        path = os.path.join(os_root_dir, "sbin/initctl")
+        if utils.test_ssh_path(self._ssh, path):
+            return "upstart"
+
+        return "sysvinit"
+
+    def _register_firstboot_script_systemd(self, os_root_dir):
+        unit_abs = os.path.join(os_root_dir, _SYSTEMD_UNIT_PATH.lstrip("/"))
+        self._write_remote_file(unit_abs, _SYSTEMD_UNIT)
+        self._exec_cmd(
+            "sudo chown root:root %s && sudo chmod 644 %s" % (
+                unit_abs, unit_abs))
+
+        wants_dir = os.path.join(
+            os_root_dir, "etc/systemd/system/multi-user.target.wants")
+        self._exec_cmd("sudo mkdir -p %s" % wants_dir)
+        self._exec_cmd(
+            "sudo ln -sf %s %s/coriolis-luks-firstboot.service" % (
+                _SYSTEMD_UNIT_PATH, wants_dir))
+
+    def _install_luks_firstboot_script(self, os_root_dir):
+        """Write firstboot cleanup script and register with the init system."""
+        initramfs_tool = self._detect_initramfs_tool(os_root_dir)
+        script_content = _LUKS_FIRSTBOOT_SCRIPTS.get(initramfs_tool)
+        if script_content is None:
+            raise exception.CoriolisException(
+                "No initramfs tool found in OS at '%s'; cannot install "
+                "LUKS firstboot cleanup script." % os_root_dir)
+
+        script_abs = os.path.join(
+            os_root_dir, _FIRSTBOOT_SCRIPT_PATH.lstrip("/"))
+        self._exec_cmd("sudo mkdir -p %s" % os.path.dirname(script_abs))
+        self._write_remote_file(script_abs, script_content)
+        self._exec_cmd(
+            "sudo chown root:root %s && sudo chmod 500 %s" % (
+                script_abs, script_abs))
+
+        init_system = self._detect_init_system(os_root_dir)
+        LOG.info(
+            "Detected init system '%s'; installing LUKS firstboot script",
+            init_system)
+
+        if init_system == "systemd":
+            self._register_firstboot_script_systemd(os_root_dir)
+        else:
+            raise exception.CoriolisException(
+                "For VMs with LUKS-encrypted devices, only systemd-based VMs "
+                "are supported.")
+
+    def install_encryption_firstboot_setup(self, os_root_dir):
+        """Install a firstboot script to re-enroll TPM2."""
+        if not self._luks_opened:
+            return
+
+        self._event_manager.progress_update(
+            "Injecting migration keyfile and installing firstboot LUKS "
+            "cleanup")
+
+        self._write_migration_keyfiles(os_root_dir)
+        self._fix_grub_luks_root(os_root_dir)
+        self._rebuild_initramfs(os_root_dir)
+        self._install_luks_firstboot_script(os_root_dir)
+
+    def _fix_grub_luks_root(self, os_root_dir):
+        """Patch grub.cfg to use crypttab mapper names for LUKS root devices.
+
+        update-grub names the root device after the mapper opened by osmount
+        (e.g.: ``/dev/mapper/coriolis_foo``). The initramfs opens LUKS device
+        using the name from crypttab (e.g.: ``/dev/mapper/luks-root``). They
+        must match, so replace the osmount names in every grub.cfg we find.
+        """
+        if not self._luks_opened:
+            return
+
+        crypttab_path = os.path.join(os_root_dir, "etc/crypttab")
+        if not utils.test_ssh_path(self._ssh, crypttab_path):
+            return
+
+        crypttab = utils.read_ssh_file(
+            self._ssh, crypttab_path).decode("utf-8")
+
+        # Build UUID -> crypttab-mapper-name mapping.
+        uuid_to_crypttab_name = {}
+        for line in crypttab.splitlines():
+            stripped = line.strip()
+            if not stripped or stripped.startswith('#'):
+                continue
+
+            parts = stripped.split(None, 3)
+            if len(parts) < 2:
+                continue
+
+            mapper_name = parts[0]
+            m = (re.match(r'UUID=([0-9a-f-]+)', parts[1], re.IGNORECASE) or
+                 re.match(r'.*/by-uuid/([0-9a-f-]+)', parts[1], re.IGNORECASE))
+            if m:
+                uuid_to_crypttab_name[m.group(1).lower()] = mapper_name
+
+        if not uuid_to_crypttab_name:
+            return
+
+        # Map osmount mapper name -> crypttab mapper name for every opened
+        # LUKS device that appears in crypttab.
+        replacements = {}
+        for _, dev_path in self._luks_opened:
+            luks_uuid = self._get_luks_uuid(dev_path)
+            if not luks_uuid:
+                continue
+
+            crypttab_name = uuid_to_crypttab_name.get(luks_uuid.lower())
+            if not crypttab_name:
+                continue
+
+            osmount_name = "coriolis_%s" % os.path.basename(dev_path)
+            if osmount_name != crypttab_name:
+                replacements["/dev/mapper/%s" % osmount_name] = (
+                    "/dev/mapper/%s" % crypttab_name
+                )
+
+        if not replacements:
+            return
+
+        for rel_cfg in ["boot/grub/grub.cfg", "boot/grub2/grub.cfg"]:
+            cfg_path = os.path.join(os_root_dir, rel_cfg)
+            if not utils.test_ssh_path(self._ssh, cfg_path):
+                continue
+
+            content = self._exec_cmd("sudo cat %s" % cfg_path)
+            modified = False
+            for old, new in replacements.items():
+                if old not in content:
+                    continue
+
+                content = content.replace(old, new)
+                modified = True
+                LOG.info("grub.cfg: replaced '%s' -> '%s'", old, new)
+
+            if modified:
+                self._write_remote_file(cfg_path, content)

+ 147 - 0
coriolis/osmorphing/osmount/resources/luks_firstboot_initramfs_tools.sh

@@ -0,0 +1,147 @@
+#!/bin/bash
+
+# Coriolis LUKS firstboot cleanup for initramfs-tools (Debian / Ubuntu).
+# Runs once on first boot to re-enroll TPM2, remove migration keyslots, and
+# rebuild the initramfs so the embedded keyfile is gone.
+
+set -e
+set -x
+KEYFILE_DIR=/etc/luks
+
+# helpers
+
+load_keyfile_map() {
+    [ -f /etc/crypttab ] || return 0
+
+    # Format: <target name> <source device> <key file> <options>
+    while read -r _name dev_spec keyfile _; do
+	# handle only coriolis migration keys.
+        [[ "$keyfile" == "$KEYFILE_DIR/coriolis_"*.key ]] || continue
+
+        local dev
+        if [[ "$dev_spec" == UUID=* ]]; then
+            dev=$(blkid -l -t "$dev_spec" -o device 2>/dev/null)
+        else
+            dev="$dev_spec"
+        fi
+
+	# Add dev: keyfile mapping, if we have a dev.
+        [ -n "$dev" ] && dev_to_keyfile["$dev"]="$keyfile"
+    done < <(grep -Ev '^\s*(#|$)' /etc/crypttab)
+}
+
+wait_for_tpm2() {
+    for dev in /dev/tpmrm0 /dev/tpm0; do
+        local deadline=$(( $(date +%s) + 10 ))
+        until [ -e "$dev" ] || [ "$(date +%s)" -ge "$deadline" ]; do
+	    sleep 1
+        done
+
+        if [ -e "$dev" ]; then
+	    echo "$dev"
+	    return
+	fi
+    done
+}
+
+enroll_clevis() {
+    local tpm2_dev
+
+    tpm2_dev=$(wait_for_tpm2)
+    if [ -z "$tpm2_dev" ]; then
+        echo "ERROR: no TPM2 device found; aborting to avoid lockout." >&2
+        return 1
+    fi
+
+    echo "TPM2 device detected ($tpm2_dev); enrolling via clevis."
+
+    for dev in "${!dev_to_keyfile[@]}"; do
+        local keyfile="${dev_to_keyfile[$dev]}"
+
+	if ! clevis luks bind -k "$keyfile" -d "$dev" tpm2 '{"pcr_ids":""}'; then
+            echo "ERROR: clevis luks bind failed for $dev; aborting to avoid lockout." >&2
+            return 1
+        fi
+
+	if ! cryptsetup luksDump "$dev" 2>/dev/null | grep -q 'clevis'; then
+            echo "ERROR: clevis token not found in LUKS header for $dev; aborting to avoid lockout." >&2
+            return 1
+        fi
+
+	echo "clevis TPM2 enrollment verified for $dev."
+    done
+}
+
+remove_migration_keyslots() {
+    for dev in "${!dev_to_keyfile[@]}"; do
+        local keyfile="${dev_to_keyfile[$dev]}"
+
+	echo "Removing migration keyslot from $dev using $keyfile."
+        if ! cryptsetup luksRemoveKey "$dev" "$keyfile"; then
+            echo "ERROR: failed to remove migration keyslot from $dev." >&2
+            return 1
+        fi
+
+	echo "Migration keyslot removed from $dev."
+
+	sed -i "s|$keyfile|none|g" /etc/crypttab
+    done
+}
+
+deregister_service() {
+    # Only disable, do NOT delete the unit file, or daemon-reload while the
+    # service is still running. systemd would detect "Current command vanished"
+    # and kill this process immediately.
+    systemctl disable coriolis-luks-firstboot.service 2>/dev/null || true
+}
+
+# main
+
+shopt -s nullglob
+keyfiles=("$KEYFILE_DIR"/coriolis_*.key)
+shopt -u nullglob
+
+if [ "${#keyfiles[@]}" -eq 0 ]; then
+    echo "ERROR: no migration keyfiles found in $KEYFILE_DIR; setup is broken." >&2
+    exit 1
+fi
+
+echo "Found ${#keyfiles[@]} migration keyfile(s): ${keyfiles[*]}"
+
+declare -A dev_to_keyfile
+load_keyfile_map
+
+if [ "${#dev_to_keyfile[@]}" -eq 0 ]; then
+    echo "ERROR: no coriolis migration entries found in /etc/crypttab; setup is broken." >&2
+    exit 1
+fi
+
+echo "Found ${#dev_to_keyfile[@]} crypttab entry / entries to process."
+
+for _cmd in clevis clevis-encrypt-tpm2 clevis-luks-bind; do
+    if ! command -v "$_cmd" >/dev/null 2>&1; then
+        echo "ERROR: $_cmd not found; TPM2 enrollment requires clevis, clevis-tpm2, and clevis-luks packages." >&2
+        exit 1
+    fi
+done
+
+enroll_clevis
+
+remove_migration_keyslots
+
+echo "Deleting migration keyfiles."
+rm -f "${keyfiles[@]}"
+rmdir "$KEYFILE_DIR" 2>/dev/null || true
+
+echo "Rebuilding initramfs."
+# Suppress needrestart: it reboots the VM after initramfs rebuild, which doesn't
+# allow us to continue with the rest of the script.
+NEEDRESTART_SUSPEND=1 DEBIAN_FRONTEND=noninteractive update-initramfs -u -k all
+
+deregister_service
+
+echo "Firstboot LUKS cleanup complete."
+rm -f "$0"
+
+echo "Rebooting to finish LUKS first boot cleanup."
+reboot

+ 6 - 2
coriolis/tests/osmorphing/osmount/test_base.py

@@ -846,7 +846,9 @@ class BaseLinuxOSMountToolsTestCase(test_base.CoriolisBaseTestCase):
     @mock.patch.object(
         base.BaseLinuxOSMountTools, '_check_mount_fstab_partitions'
     )
-    def test_mount_os(self, mock_check_mount_fstab_partitions,
+    @mock.patch.object(base.luks_mixin.LinuxLUKSMixin, '_unlock_luks_devices')
+    def test_mount_os(self, mock_unlock_luks_devices,
+                      mock_check_mount_fstab_partitions,
                       mock_get_volume_block_devices,
                       mock_find_dev_with_contents, mock_find_and_mount_root,
                       mock_get_mounted_devices, mock_get_vgs, mock_exec_cmd,
@@ -923,7 +925,9 @@ class BaseLinuxOSMountToolsTestCase(test_base.CoriolisBaseTestCase):
     @mock.patch.object(
         base.BaseLinuxOSMountTools, '_check_mount_fstab_partitions'
     )
-    def test_mount_os_run_xfs(self, mock_check_mount_fstab_partitions,
+    @mock.patch.object(base.luks_mixin.LinuxLUKSMixin, '_unlock_luks_devices')
+    def test_mount_os_run_xfs(self, mock_unlock_luks_devices,
+                              mock_check_mount_fstab_partitions,
                               mock_get_volume_block_devices,
                               mock_find_dev_with_contents,
                               mock_find_and_mount_root,

+ 4 - 3
coriolis/tests/osmorphing/test_manager.py

@@ -253,8 +253,8 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
         self.os_mount_tools.dismount_os.assert_called_once()
 
         mock_get_os_mount_tools.assert_called_once_with(
-            'linux', mock.sentinel.connection_info, self.event_manager, [], 60)
-        mock_EventManager.assert_called_with(self.event_handler)
+            'linux', mock.sentinel.connection_info, self.event_manager, [], 60,
+            osmorphing_info=self.osmorphing_info)
 
         self.os_mount_tools.dismount_os.assert_called_once()
 
@@ -278,7 +278,8 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
             self.event_handler)
 
         mock_get_os_mount_tools.assert_called_once_with(
-            'linux', mock.sentinel.connection_info, self.event_manager, [], 60)
+            'linux', mock.sentinel.connection_info, self.event_manager, [], 60,
+            osmorphing_info=self.osmorphing_info)
         mock_EventManager.assert_called_once_with(self.event_handler)
 
         mock_get_osmorphing_tools_class.assert_not_called()