Ver Fonte

Unlock Bitlocker encrypted volumes

Just like in case of LUKS (https://github.com/cloudbase/coriolis/pull/436),
we'll let Coriolis users specify a BitLocker recovery password.

At the very least it should unlock the OS volume, however it may
be used for other encrypted volumes as well. If no encrypted
volume could be unlocked using the specified key, Coriolis will
error out.

In addition to that, we'll temporarily suspend BitLocker on the
specified volumes. It won't decrypt the volumes, it merely adds a
publicly accessible protector that allows the replica instance to
boot.

Once the replica instance boots, BitLocker will be resumed automatically
and the TPM protector will be reconfigured.
Lucian Petrut há 2 semanas atrás
pai
commit
45c54c4586

+ 65 - 0
coriolis/osmorphing/osmount/windows.py

@@ -6,6 +6,7 @@ import uuid
 
 from oslo_log import log as logging
 
+from coriolis import constants
 from coriolis import exception
 from coriolis.osmorphing.osmount import base
 from coriolis import utils
@@ -223,9 +224,73 @@ class WindowsMountTools(base.BaseOSMountTools):
                         f"Error was: {utils.get_exception_details()}")
             self._rebring_disks_online(disk_nums=disk_nums)
 
+    def _get_encrypted_volume_ids(self):
+        out = self._conn.exec_ps_command(
+            'gwmi -ns "Root\\CIMV2\\Security\\MicrosoftVolumeEncryption" '
+            '-class Win32_EncryptableVolume | % {$_.DeviceID}')
+        return [x for x in out.replace("\r\n", "\n").split("\n") if x]
+
+    def _unlock_encrypted_volume(self, volume_id: str, recovery_password: str):
+        self._conn.exec_ps_command(
+            f'manage-bde -unlock "{volume_id}" '
+            f'-RecoveryPassword "{recovery_password}"')
+
+    def _suspend_bitlocker(self, volume_id: str):
+        """Suspend BitLocker until the next reboot for a given volume.
+
+        It doesn't decrypt the device, it just adds a publicly accessible
+        BitLocker protector that automatically unlocks the volume.
+
+        When the replica instance boots, BitLocker will be automatically
+        enabled again and the TPM protector reconfigured.
+        """
+        self._conn.exec_ps_command(f'Suspend-BitLocker "{volume_id}"')
+
+    def _unlock_encrypted_volumes(self):
+        recovery_password = self._osmorphing_info.get(
+            constants.ENCRYPTED_DISKS_PASS)
+        if not recovery_password:
+            LOG.info("No encrypted disk password specified, "
+                     "skipping BitLocker unlock.")
+            return
+
+        encrypted_volume_ids = self._get_encrypted_volume_ids()
+        if not encrypted_volume_ids:
+            LOG.warning("Received encrypted disk password but no "
+                        "BitLocker encrypted volumes found.")
+            return
+
+        unlocked = False
+        for encrypted_volume_id in encrypted_volume_ids:
+            try:
+                self._unlock_encrypted_volume(
+                    encrypted_volume_id, recovery_password)
+                LOG.info(
+                    "Successfully unlocked BitLocker encrypted volume: %s",
+                    encrypted_volume_id)
+                unlocked = True
+            except Exception:
+                LOG.info(
+                    "Could not unlock volume %s using the specified "
+                    "recovery password.",
+                    encrypted_volume_id)
+                continue
+
+            # Suspend BitLocker until the replica boots.
+            #
+            # We'll intentionally propagate the failure if we managed to
+            # unlock the volume but failed to suspend BitLocker.
+            self._suspend_bitlocker(encrypted_volume_id)
+
+        if not unlocked:
+            raise exception.CoriolisException(
+                "Could not unlock any volume using the specified "
+                "BitLocker recovery password.")
+
     def mount_os(self):
         self._set_basic_disks_rw_mode()
         self._bring_disks_online()
+        self._unlock_encrypted_volumes()
         self._set_volumes_drive_letter()
         fs_roots = utils.retry_on_error(sleep_seconds=5)(self._get_fs_roots)(
             fail_if_empty=True)

+ 145 - 0
coriolis/tests/osmorphing/osmount/test_windows.py

@@ -4,6 +4,7 @@
 import logging
 from unittest import mock
 
+from coriolis import constants
 from coriolis import exception
 from coriolis.osmorphing.osmount import windows
 from coriolis.tests import test_base
@@ -336,3 +337,147 @@ class WindowsMountToolsTestCase(test_base.CoriolisBaseTestCase):
 
         self.tools._conn.exec_ps_command.assert_called_once_with(
             '(Get-Disk | Where-Object { $_.IsBoot -eq $False }).Number')
+
+    def test_get_encrypted_volume_ids(self):
+        # Powershell wouldn't mix line endings, we're just ensuring that
+        # we can properly handle both line ending types.
+        self.tools._conn.exec_ps_command.return_value = (
+            "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\\r\n"
+            "\\\\?\\Volume{7723f315-c13c-450c-8be6-f58e06f4ad45}\\\r\n"
+            "\\\\?\\Volume{cb7399af-8f6a-4a7b-a55c-e885ec3ff5fd}\\\n"
+        )
+
+        exp_ret = [
+            "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\",
+            "\\\\?\\Volume{7723f315-c13c-450c-8be6-f58e06f4ad45}\\",
+            "\\\\?\\Volume{cb7399af-8f6a-4a7b-a55c-e885ec3ff5fd}\\",
+        ]
+        ret = self.tools._get_encrypted_volume_ids()
+
+        self.assertEqual(exp_ret, ret)
+        self.tools._conn.exec_ps_command.assert_called_once_with(
+            'gwmi -ns "Root\\CIMV2\\Security\\MicrosoftVolumeEncryption" '
+            '-class Win32_EncryptableVolume | % {$_.DeviceID}')
+
+    def test_unlock_encrypted_volume(self):
+        vol = "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\"
+        password = "6010ba47-28e4-4105-8b0a-69eed0a54283"
+
+        self.tools._unlock_encrypted_volume(vol, password)
+
+        exp_cmd = 'manage-bde -unlock "%s" -RecoveryPassword "%s"' % (
+            vol, password)
+        self.tools._conn.exec_ps_command.assert_called_once_with(
+            exp_cmd)
+
+    def test_suspend_bitlocker(self):
+        vol = "\\\\?\\Volume{2750d574-b333-4e7b-a0a2-d739279d39e9}\\"
+
+        self.tools._suspend_bitlocker(vol)
+
+        exp_cmd = 'Suspend-BitLocker "%s"' % (vol)
+        self.tools._conn.exec_ps_command.assert_called_once_with(
+            exp_cmd)
+
+    @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
+    def test_unlock_encrypted_volumes_no_password(
+        self,
+        mock_get_encrypted_volume_ids,
+    ):
+        self.tools._unlock_encrypted_volumes()
+        mock_get_encrypted_volume_ids.assert_not_called()
+
+    @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
+    @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
+    def test_unlock_encrypted_volumes_not_encrypted(
+        self,
+        mock_unlock_encrypted_volume,
+        mock_get_encrypted_volume_ids,
+    ):
+        fake_pass = "fake-recovery-password"
+        self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass
+
+        mock_get_encrypted_volume_ids.return_value = []
+
+        self.tools._unlock_encrypted_volumes()
+        mock_unlock_encrypted_volume.assert_not_called()
+
+    @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
+    @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
+    @mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker")
+    def test_unlock_encrypted_volumes_all_failed(
+        self,
+        mock_suspend_bitlocker,
+        mock_unlock_encrypted_volume,
+        mock_get_encrypted_volume_ids,
+    ):
+        fake_pass = "fake-recovery-password"
+        self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass
+
+        mock_get_encrypted_volume_ids.return_value = [
+            mock.sentinel.volume0,
+            mock.sentinel.volume1,
+        ]
+        mock_unlock_encrypted_volume.side_effect = ValueError
+
+        self.assertRaises(
+            exception.CoriolisException,
+            self.tools._unlock_encrypted_volumes,
+        )
+
+    @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
+    @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
+    @mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker")
+    def test_unlock_encrypted_volumes_one_failed(
+        self,
+        mock_suspend_bitlocker,
+        mock_unlock_encrypted_volume,
+        mock_get_encrypted_volume_ids,
+    ):
+        fake_pass = "fake-recovery-password"
+        self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass
+
+        encrypted_volume_ids = [
+            mock.sentinel.volume0,
+            mock.sentinel.volume1,
+        ]
+        mock_get_encrypted_volume_ids.return_value = encrypted_volume_ids
+        mock_unlock_encrypted_volume.side_effect = [ValueError, None]
+
+        self.tools._unlock_encrypted_volumes()
+
+        mock_unlock_encrypted_volume.assert_has_calls(
+            [mock.call(vol_id, fake_pass) for vol_id in encrypted_volume_ids])
+        mock_suspend_bitlocker.assert_called_once_with(
+            mock.sentinel.volume1)
+
+    @mock.patch.object(windows.WindowsMountTools, "_get_encrypted_volume_ids")
+    @mock.patch.object(windows.WindowsMountTools, "_unlock_encrypted_volume")
+    @mock.patch.object(windows.WindowsMountTools, "_suspend_bitlocker")
+    def test_unlock_encrypted_volumes_suspend_failed(
+        self,
+        mock_suspend_bitlocker,
+        mock_unlock_encrypted_volume,
+        mock_get_encrypted_volume_ids,
+    ):
+        fake_pass = "fake-recovery-password"
+        self.tools._osmorphing_info[constants.ENCRYPTED_DISKS_PASS] = fake_pass
+
+        encrypted_volume_ids = [
+            mock.sentinel.volume0,
+            mock.sentinel.volume1,
+            mock.sentinel.volume2
+        ]
+        mock_get_encrypted_volume_ids.return_value = encrypted_volume_ids
+        mock_unlock_encrypted_volume.side_effect = [ValueError, None, None]
+        mock_suspend_bitlocker.side_effect = [IOError, None]
+
+        # It should error out immediately when failing to suspend BitLocker.
+        self.assertRaises(
+            IOError,
+            self.tools._unlock_encrypted_volumes,
+        )
+        mock_unlock_encrypted_volume.assert_has_calls(
+            [mock.call(mock.sentinel.volume0, fake_pass),
+             mock.call(mock.sentinel.volume1, fake_pass)]
+        )

+ 1 - 0
coriolis/tests/osmorphing/test_manager.py

@@ -255,6 +255,7 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
         mock_get_os_mount_tools.assert_called_once_with(
             'linux', mock.sentinel.connection_info, self.event_manager, [], 60,
             osmorphing_info=self.osmorphing_info)
+        mock_EventManager.assert_called_with(self.event_handler)
 
         self.os_mount_tools.dismount_os.assert_called_once()