Просмотр исходного кода

tests: Extract osmorphing_utils from integration test utils

Move OS morphing helpers (write_os_image_to_disk, make_luks_device,
path_exists_on_device, etc.) into a dedicated osmorphing_utils module.

Refactored utils to use a context manager, simplifying the code.
Claudiu Belu 1 неделя назад
Родитель
Сommit
4eb11fbf9f

+ 5 - 5
coriolis/tests/integration/deployments/test_luks_osmorphing.py

@@ -23,7 +23,7 @@ import unittest
 from coriolis import constants
 from coriolis import constants
 from coriolis.tests.integration import base as integration_base
 from coriolis.tests.integration import base as integration_base
 from coriolis.tests.integration import harness as integration_harness
 from coriolis.tests.integration import harness as integration_harness
-from coriolis.tests.integration import utils as test_utils
+from coriolis.tests.integration import osmorphing_utils
 
 
 _LUKS_PASSPHRASE = "it-luks-encrypted"
 _LUKS_PASSPHRASE = "it-luks-encrypted"
 
 
@@ -53,7 +53,7 @@ class _LUKSOSMorphingMixin:
         self._prepare_src_device()
         self._prepare_src_device()
 
 
     def _prepare_src_device(self):
     def _prepare_src_device(self):
-        test_utils.make_luks_device(
+        osmorphing_utils.make_luks_device(
             self._src_device, self._key_file, "ubuntu:24.04")
             self._src_device, self._key_file, "ubuntu:24.04")
 
 
         dest_env = {
         dest_env = {
@@ -66,8 +66,8 @@ class _LUKSOSMorphingMixin:
         )
         )
 
 
     def _check_path_exists(self, device, path):
     def _check_path_exists(self, device, path):
-        with test_utils.luks_open(device, self._key_file) as mapper_path:
-            return test_utils.path_exists_on_device(mapper_path, path)
+        with osmorphing_utils.luks_open(device, self._key_file) as mapper_path:
+            return osmorphing_utils.path_exists_on_device(mapper_path, path)
 
 
     def _assert_luks_common_firstboot_files(self):
     def _assert_luks_common_firstboot_files(self):
         dst_basename = os.path.basename(self._dst_device)
         dst_basename = os.path.basename(self._dst_device)
@@ -121,7 +121,7 @@ class LUKSRockyLinuxOSMorphingDeploymentTest(
     _SCSI_DEBUG_SIZE_MB = 777
     _SCSI_DEBUG_SIZE_MB = 777
 
 
     def _prepare_src_device(self):
     def _prepare_src_device(self):
-        test_utils.make_luks_device(
+        osmorphing_utils.make_luks_device(
             self._src_device, self._key_file, "rockylinux:9")
             self._src_device, self._key_file, "rockylinux:9")
 
 
         dest_env = {
         dest_env = {

+ 13 - 10
coriolis/tests/integration/deployments/test_osmorphing.py

@@ -14,7 +14,7 @@ import uuid
 
 
 from coriolis.tests.integration import base as integration_base
 from coriolis.tests.integration import base as integration_base
 from coriolis.tests.integration import harness as integration_harness
 from coriolis.tests.integration import harness as integration_harness
-from coriolis.tests.integration import utils as test_utils
+from coriolis.tests.integration import osmorphing_utils
 
 
 
 
 class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
 class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
@@ -33,18 +33,21 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
 
 
     def setUp(self):
     def setUp(self):
         super().setUp()
         super().setUp()
-        test_utils.write_os_image_to_disk(self._src_device, "ubuntu:24.04")
+        osmorphing_utils.write_os_image_to_disk(
+            self._src_device, "ubuntu:24.04")
 
 
     def test_deployment_with_os_morphing(self):
     def test_deployment_with_os_morphing(self):
         self.assertFalse(
         self.assertFalse(
-            test_utils.path_exists_on_device(self._src_device, "usr/bin/jq"),
+            osmorphing_utils.path_exists_on_device(
+                self._src_device, "usr/bin/jq"),
             "jq was found on the source device before OS morphing",
             "jq was found on the source device before OS morphing",
         )
         )
 
 
         self._execute_transfer_and_deployment()
         self._execute_transfer_and_deployment()
 
 
         self.assertTrue(
         self.assertTrue(
-            test_utils.path_exists_on_device(self._dst_device, "usr/bin/jq"),
+            osmorphing_utils.path_exists_on_device(
+                self._dst_device, "usr/bin/jq"),
             "jq was not found on the destination device after OS morphing",
             "jq was not found on the destination device after OS morphing",
         )
         )
 
 
@@ -64,7 +67,7 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
         }
         }
         self._execute_transfer_and_deployment(deployment_kwargs)
         self._execute_transfer_and_deployment(deployment_kwargs)
 
 
-        file_contents = test_utils.read_file_from_device(
+        file_contents = osmorphing_utils.read_file_from_device(
             self._dst_device,
             self._dst_device,
             "cookie")
             "cookie")
         self.assertEqual(expected_string, file_contents)
         self.assertEqual(expected_string, file_contents)
@@ -82,7 +85,7 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
         }
         }
         self._execute_transfer_and_deployment(deployment_kwargs)
         self._execute_transfer_and_deployment(deployment_kwargs)
 
 
-        file_contents = test_utils.read_file_from_device(
+        file_contents = osmorphing_utils.read_file_from_device(
             self._dst_device,
             self._dst_device,
             "cookie")
             "cookie")
         self.assertEqual(expected_string, file_contents)
         self.assertEqual(expected_string, file_contents)
@@ -124,10 +127,10 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
         }
         }
         self._execute_transfer_and_deployment(deployment_kwargs)
         self._execute_transfer_and_deployment(deployment_kwargs)
 
 
-        pre_mounts = test_utils.read_file_from_device(
+        pre_mounts = osmorphing_utils.read_file_from_device(
             self._dst_device,
             self._dst_device,
             "pre_mounts")
             "pre_mounts")
-        post_mounts = test_utils.read_file_from_device(
+        post_mounts = osmorphing_utils.read_file_from_device(
             self._dst_device,
             self._dst_device,
             "post_mounts")
             "post_mounts")
 
 
@@ -167,7 +170,7 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
         # actually get executed. We'll merely verify that those files
         # actually get executed. We'll merely verify that those files
         # have been injected at the expected location.
         # have been injected at the expected location.
         first_boot_script_dir = "usr/lib/coriolis/firstboot/user"
         first_boot_script_dir = "usr/lib/coriolis/firstboot/user"
-        first_boot_scripts = test_utils.list_files_from_device(
+        first_boot_scripts = osmorphing_utils.list_files_from_device(
             self._dst_device, first_boot_script_dir)
             self._dst_device, first_boot_script_dir)
         if not first_boot_scripts:
         if not first_boot_scripts:
             raise AssertionError("Couldn't find first boot script dir.")
             raise AssertionError("Couldn't find first boot script dir.")
@@ -177,7 +180,7 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
             if re.match(r"\d+-\w+\.sh", file_name):
             if re.match(r"\d+-\w+\.sh", file_name):
                 first_boot_script_path = os.path.join(
                 first_boot_script_path = os.path.join(
                     first_boot_script_dir, file_name)
                     first_boot_script_dir, file_name)
-                first_boot_script = test_utils.read_file_from_device(
+                first_boot_script = osmorphing_utils.read_file_from_device(
                     self._dst_device,
                     self._dst_device,
                     first_boot_script_path)
                     first_boot_script_path)
                 if payload == first_boot_script:
                 if payload == first_boot_script:

+ 162 - 0
coriolis/tests/integration/osmorphing_utils.py

@@ -0,0 +1,162 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+OS morphing integration test utilities.
+
+LUKS / OS morphing helpers (loopback, LUKS, bootable VM disk setup)
+"""
+
+import contextlib
+import os
+import subprocess
+import tempfile
+from typing import Iterator
+
+from oslo_log import log as logging
+
+from coriolis.tests.integration.utils import _run
+
+LOG = logging.getLogger(__name__)
+
+
+# LUKS / OS Morphing utils
+
+
+@contextlib.contextmanager
+def mounted(device_path, read_only=False) -> Iterator[str]:
+    """Mount *device_path* in a temporary directory, yield the mount point."""
+    opts = ["-o", "ro"] if read_only else []
+
+    with tempfile.TemporaryDirectory() as mount_point:
+        _run(["mount"] + opts + [device_path, mount_point])
+        try:
+            yield mount_point
+        finally:
+            _run(["umount", mount_point])
+
+
+def write_os_image_to_disk(device_path, container_image):
+    """Write a real Linux rootfs to *device_path*.
+
+    Exports the filesystem of a container image via ``docker export`` and
+    extracts it onto an ext4-formatted device, giving a chroot-able root with
+    that container OS' standard filesystem and binaries present.
+    """
+    _run(["mkfs.ext4", "-F", device_path])
+
+    result = _run(["docker", "create", container_image])
+    container_id = result.stdout.decode().strip()
+
+    try:
+        with mounted(device_path) as mount_point:
+            export = subprocess.Popen(
+                ["docker", "export", container_id],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.DEVNULL,
+            )
+            subprocess.run(
+                ["tar", "-x", "-C", mount_point],
+                stdin=export.stdout,
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+                check=True,
+            )
+            export.stdout.close()
+            export.wait()
+
+    finally:
+        _run(["docker", "rm", "-f", container_id], check=False)
+
+
+def _fixup_luks_inner_os(mapper_path, luks_uuid):
+    """Patch the OS image inside a LUKS mapper to work with OS morphing.
+
+    Docker container images are not full OS installs, so a few things need
+    fixing before Coriolis can morph them:
+
+    1. /etc/crypttab is missing: the LUKS mixin needs a UUID= entry there to
+       configure initramfs auto-unlock.
+    2. /boot may be absent (e.g. Rocky Linux 9 Docker image): the osmount
+       root-finder requires etc, bin, sbin, and boot to all be present.
+    """
+    mapper_name = "luks-%s" % luks_uuid
+    crypttab_entry = "%s\tUUID=%s\tnone\tluks\n" % (mapper_name, luks_uuid)
+
+    with mounted(mapper_path) as mount_point:
+        etc_dir = os.path.join(mount_point, "etc")
+        os.makedirs(etc_dir, exist_ok=True)
+        crypttab_path = os.path.join(etc_dir, "crypttab")
+
+        with open(crypttab_path, "w") as fh:
+            fh.write(crypttab_entry)
+
+        os.makedirs(os.path.join(mount_point, "boot"), exist_ok=True)
+
+
+def make_luks_device(device_path, key_file, container_image):
+    """Format *device_path* with LUKS and write a minimal Linux OS inside.
+
+    The mapper device is opened only for the duration of the call. It is closed
+    before returning, leaving the raw device encrypted.
+
+    Exports the filesystem the container image onto the given device, then
+    writes a /etc/crypttab entry so that the LUKS mixin can find the UUID
+    when configuring initramfs auto-unlock during OS morphing.
+    """
+    _run([
+        "cryptsetup", "luksFormat", "--batch-mode", "--key-file", key_file,
+        device_path,
+    ])
+
+    luks_uuid = _run(
+        ["cryptsetup", "luksUUID", device_path]).stdout.decode().strip()
+
+    with luks_open(device_path, key_file) as mapper_path:
+        write_os_image_to_disk(mapper_path, container_image)
+        _fixup_luks_inner_os(mapper_path, luks_uuid)
+
+
+@contextlib.contextmanager
+def luks_open(device_path, key_file):
+    mapper_name = "coriolis_luks_setup_%s" % os.path.basename(device_path)
+    _run([
+        "cryptsetup", "luksOpen", "--key-file", key_file, device_path,
+        mapper_name,
+    ])
+
+    try:
+        yield "/dev/mapper/%s" % mapper_name
+    finally:
+        _run(["cryptsetup", "luksClose", mapper_name])
+
+
+def path_exists_on_device(device_path, rel_path):
+    """Checks if *rel_path* exists on the filesystem of *device_path*.
+
+    Uses lexists so dangling symlinks (e.g. absolute targets only valid inside
+    the OS, not on the host) are still reported as present.
+    """
+    with mounted(device_path, read_only=True) as mount_point:
+        return os.path.lexists(os.path.join(mount_point, rel_path))
+
+
+def read_file_from_device(device_path, rel_path):
+    """Retrieves the specified file from the filesystem of *device_path*.
+
+    Mounts the device read-only into a temporary directory, reads the file,
+    then unmounts.
+    """
+    with mounted(device_path, read_only=True) as mount_point:
+        with open(os.path.join(mount_point, rel_path)) as f:
+            return f.read()
+
+
+def list_files_from_device(device_path, rel_path):
+    """Enumerates files from the filesystem of *device_path*.
+
+    Mounts the device read-only into a temporary directory, enumerates files,
+    then unmounts.
+    """
+    with mounted(device_path, read_only=True) as mount_point:
+        return os.listdir(os.path.join(mount_point, rel_path))

+ 0 - 155
coriolis/tests/integration/utils.py

@@ -5,7 +5,6 @@
 Integration test utils.
 Integration test utils.
 """
 """
 
 
-import contextlib
 import json
 import json
 import os
 import os
 import socket
 import socket
@@ -352,157 +351,3 @@ def unplug_device_from_container(container_id, device_path):
         "nsenter", "--target", str(pid), "--mount", "--",
         "nsenter", "--target", str(pid), "--mount", "--",
         "rm", "-f", device_path,
         "rm", "-f", device_path,
     ], check=False)
     ], check=False)
-
-
-# OS Morphing utils
-
-
-def write_os_image_to_disk(device_path, container_image):
-    """Write a real Linux rootfs to *device_path*.
-
-    Exports the filesystem of a container image via ``docker export`` and
-    extracts it onto an ext4-formatted device, giving a chroot-able root with
-    that container OS' standard filesystem and binaries present.
-    """
-    _run(["mkfs.ext4", "-F", device_path])
-
-    result = _run(["docker", "create", container_image])
-    container_id = result.stdout.decode().strip()
-
-    try:
-        with tempfile.TemporaryDirectory() as mount_point:
-            _run(["mount", device_path, mount_point])
-
-            try:
-                export = subprocess.Popen(
-                    ["docker", "export", container_id],
-                    stdout=subprocess.PIPE,
-                    stderr=subprocess.DEVNULL,
-                )
-                subprocess.run(
-                    ["tar", "-x", "-C", mount_point],
-                    stdin=export.stdout,
-                    stdout=subprocess.DEVNULL,
-                    stderr=subprocess.DEVNULL,
-                    check=True,
-                )
-                export.stdout.close()
-                export.wait()
-            finally:
-                _run(["umount", mount_point])
-
-    finally:
-        _run(["docker", "rm", "-f", container_id], check=False)
-
-
-def _fixup_luks_inner_os(mapper_path, luks_uuid):
-    """Patch the OS image inside a LUKS mapper to work with OS morphing.
-
-    Docker container images are not full OS installs, so a few things need
-    fixing before Coriolis can morph them:
-
-    1. /etc/crypttab is missing: the LUKS mixin needs a UUID= entry there to
-       configure initramfs auto-unlock.
-    2. /boot may be absent (e.g. Rocky Linux 9 Docker image): the osmount
-       root-finder requires etc, bin, sbin, and boot to all be present.
-    """
-    mapper_name = "luks-%s" % luks_uuid
-    crypttab_entry = "%s\tUUID=%s\tnone\tluks\n" % (mapper_name, luks_uuid)
-
-    with tempfile.TemporaryDirectory() as mount_point:
-        _run(["mount", mapper_path, mount_point])
-
-        try:
-            etc_dir = os.path.join(mount_point, "etc")
-            os.makedirs(etc_dir, exist_ok=True)
-            crypttab_path = os.path.join(etc_dir, "crypttab")
-
-            with open(crypttab_path, "w") as fh:
-                fh.write(crypttab_entry)
-
-            os.makedirs(os.path.join(mount_point, "boot"), exist_ok=True)
-        finally:
-            _run(["umount", mount_point])
-
-
-def make_luks_device(device_path, key_file, container_image):
-    """Format *device_path* with LUKS and write a minimal Linux OS inside.
-
-    The mapper device is opened only for the duration of the call. It is closed
-    before returning, leaving the raw device encrypted.
-
-    Exports the filesystem the container image onto the given device, then
-    writes a /etc/crypttab entry so that the LUKS mixin can find the UUID
-    when configuring initramfs auto-unlock during OS morphing.
-    """
-    _run([
-        "cryptsetup", "luksFormat", "--batch-mode", "--key-file", key_file,
-        device_path,
-    ])
-
-    luks_uuid = _run(
-        ["cryptsetup", "luksUUID", device_path]).stdout.decode().strip()
-
-    with luks_open(device_path, key_file) as mapper_path:
-        write_os_image_to_disk(mapper_path, container_image)
-        _fixup_luks_inner_os(mapper_path, luks_uuid)
-
-
-@contextlib.contextmanager
-def luks_open(device_path, key_file):
-    mapper_name = "coriolis_luks_setup_%s" % os.path.basename(device_path)
-    _run([
-        "cryptsetup", "luksOpen", "--key-file", key_file, device_path,
-        mapper_name,
-    ])
-
-    try:
-        yield "/dev/mapper/%s" % mapper_name
-    finally:
-        _run(["cryptsetup", "luksClose", mapper_name])
-
-
-def path_exists_on_device(device_path, rel_path):
-    """Checks if *rel_path* exists on the filesystem of *device_path*.
-
-    Uses lexists so dangling symlinks (e.g. absolute targets only valid inside
-    the OS, not on the host) are still reported as present.
-    """
-    with tempfile.TemporaryDirectory() as mount_point:
-        _run(["mount", "-o", "ro", device_path, mount_point])
-
-        try:
-            return os.path.lexists(os.path.join(mount_point, rel_path))
-        finally:
-            _run(["umount", mount_point])
-
-
-def read_file_from_device(device_path, rel_path):
-    """Retrieves the specified file from the filesystem of *device_path*.
-
-    Mounts the device read-only into a temporary directory, reads the file,
-    then unmounts.
-    """
-    with tempfile.TemporaryDirectory() as mount_point:
-        _run(["mount", "-o", "ro", device_path, mount_point])
-
-        try:
-            with open(os.path.join(mount_point, rel_path)) as f:
-                return f.read()
-        finally:
-            _run(["umount", mount_point])
-
-
-def list_files_from_device(device_path, rel_path):
-    """Enumerates files from the filesystem of *device_path*.
-
-    Mounts the device read-only into a temporary directory, enumerates files,
-    then unmounts.
-    """
-    with tempfile.TemporaryDirectory() as mount_point:
-        _run(["mount", "-o", "ro", device_path, mount_point])
-
-        try:
-            return os.listdir(os.path.join(mount_point, rel_path))
-        finally:
-            _run(["umount", mount_point])