Просмотр исходного кода

Sanitize commands before logging them

Coriolis currently logs sensitive information, the executed PowerShell
commands among other things:
https://github.com/cloudbase/coriolis/pull/450#discussion_r3371641045

We'll reuse the sanitization helpers from oslo.utils.
Lucian Petrut 2 недель назад
Родитель
Сommit
0ad6fd7fb7

+ 2 - 0
coriolis/tests/osmorphing/osmount/test_windows.py

@@ -4,6 +4,8 @@
 import logging
 from unittest import mock
 
+from oslo_utils import strutils
+
 from coriolis import constants
 from coriolis import exception
 from coriolis.osmorphing.osmount import windows

+ 7 - 7
coriolis/tests/osmorphing/test_manager.py

@@ -76,16 +76,16 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
         result = manager.run_os_detect(
             self.provider, self.destination_provider, self.worker_connection,
             mock.sentinel.os_type, mock.sentinel.os_root_dir,
-            mock.sentinel.osmorphing_info, tools_environment={})
+            self.osmorphing_info, tools_environment={})
 
         self.assertEqual(result, mock_detect_os.return_value)
 
         self.provider.get_custom_os_detect_tools.\
             assert_called_once_with(mock.sentinel.os_type,
-                                    mock.sentinel.osmorphing_info)
+                                    self.osmorphing_info)
         self.destination_provider.get_custom_os_detect_tools.\
             assert_called_once_with(mock.sentinel.os_type,
-                                    mock.sentinel.osmorphing_info)
+                                    self.osmorphing_info)
         mock_detect_os.assert_called_once_with(
             self.worker_connection, mock.sentinel.os_type,
             mock.sentinel.os_root_dir,
@@ -112,7 +112,7 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
 
         result = manager.get_osmorphing_tools_class_for_provider(
             self.provider, mock.sentinel.detected_os_info,
-            mock.sentinel.os_type, mock.sentinel.osmorphing_info)
+            mock.sentinel.os_type, self.osmorphing_info)
 
         self.assertEqual(result, MockToolsClass)
 
@@ -126,7 +126,7 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
         self.assertRaises(exception.InvalidOSMorphingTools,
                           manager.get_osmorphing_tools_class_for_provider,
                           self.provider, mock.sentinel.detected_os_info,
-                          mock.sentinel.os_type, mock.sentinel.osmorphing_info)
+                          mock.sentinel.os_type, self.osmorphing_info)
 
     def test_get_osmorphing_tools_class_for_provider_invalid_os_params(self):
         class MockToolsClass(base_osmorphing.BaseOSMorphingTools):
@@ -144,7 +144,7 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
             'coriolis.osmorphing.manager', level=logging.WARN):
             result = manager.get_osmorphing_tools_class_for_provider(
                 self.provider, mock.sentinel.detected_os_info,
-                mock.sentinel.os_type, mock.sentinel.osmorphing_info)
+                mock.sentinel.os_type, self.osmorphing_info)
 
         self.assertIsNone(result)
 
@@ -164,7 +164,7 @@ class ManagerTestCase(test_base.CoriolisBaseTestCase):
             'coriolis.osmorphing.manager', level=logging.DEBUG):
             result = manager.get_osmorphing_tools_class_for_provider(
                 self.provider, self.detected_os_info,
-                mock.sentinel.os_type, mock.sentinel.osmorphing_info)
+                mock.sentinel.os_type, self.osmorphing_info)
 
         self.assertIsNone(result)
 

+ 12 - 4
coriolis/tests/test_wsman.py

@@ -1,6 +1,7 @@
 # Copyright 2023 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import logging
 from unittest import mock
 
 import requests
@@ -20,7 +21,8 @@ class WSManConnectionTestCase(test_base.CoriolisBaseTestCase):
         self.conn._protocol = mock.Mock()
         self.conn._conn_timeout = 10
         self.cmd = "test_cmd"
-        self.args = ["arg1", "arg2"]
+        self.args = ["-RecoveryPassword", "'ShouldNotBeLogged'"]
+        self.sanitized_cmd = "test_cmd -RecoveryPassword '***'"
         self.url = "http://example.com/file"
         self.remote_path = "/remote/path"
 
@@ -134,8 +136,13 @@ class WSManConnectionTestCase(test_base.CoriolisBaseTestCase):
     def test_exec_command(self):
         self.conn._protocol.get_command_output.return_value = (
             "std_out", "std_err", 0)
-        std_out = self.conn.exec_command(self.cmd, self.args)
-        self.assertEqual(std_out, "std_out")
+        exp_sanitized_log = (
+            "DEBUG:coriolis.wsman:Executing WSMAN command: %s" %
+            self.sanitized_cmd)
+        with self.assertLogs("coriolis.wsman", level=logging.DEBUG) as log_cm:
+            std_out = self.conn.exec_command(self.cmd, self.args)
+            self.assertEqual(std_out, "std_out")
+        self.assertIn(exp_sanitized_log, log_cm.output)
 
     def test_exec_command_exception(self):
         self.conn._protocol.get_command_output.return_value = (
@@ -155,7 +162,8 @@ class WSManConnectionTestCase(test_base.CoriolisBaseTestCase):
                 '-NonInteractive',
                 '-ExecutionPolicy', 'RemoteSigned',
             ],
-            timeout=None)
+            timeout=None,
+            sanitizable=False)
         self.assertEqual(result, "std_out")
 
     def test_test_path(self):

+ 4 - 2
coriolis/utils.py

@@ -27,6 +27,7 @@ from io import StringIO
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils
+from oslo_utils import strutils
 
 import netifaces
 import paramiko
@@ -316,6 +317,7 @@ def list_ssh_dir(ssh, remote_path):
 
 
 def _exec_ssh_cmd(ssh, cmd, environment=None, get_pty=False, timeout=None):
+    sanitized_cmd = strutils.mask_password(cmd)
     remote_str = "<undeterminable>"
     if timeout is not None:
         timeout = float(timeout)
@@ -327,7 +329,7 @@ def _exec_ssh_cmd(ssh, cmd, environment=None, get_pty=False, timeout=None):
             get_exception_details())
     LOG.debug(
         "Executing the following SSH command on '%s' with "
-        "environment %s: '%s'", remote_str, environment, cmd)
+        "environment %s: '%s'", remote_str, environment, sanitized_cmd)
 
     _, stdout, stderr = ssh.exec_command(
         cmd, environment=environment, get_pty=get_pty, timeout=timeout)
@@ -343,7 +345,7 @@ def _exec_ssh_cmd(ssh, cmd, environment=None, get_pty=False, timeout=None):
         msg = (
             "Command \"%s\" failed on host '%s' with exit code: %s\n"
             "stdout: %s\nstd_err: %s" %
-            (cmd, remote_str, exit_code, stdout_str, stderr_str))
+            (sanitized_cmd, remote_str, exit_code, stdout_str, stderr_str))
         if (exit_code == 127 or
                 "command not found" in stdout_str or
                 "command not found" in stderr_str):

+ 24 - 8
coriolis/wsman.py

@@ -5,6 +5,7 @@ import base64
 import requests
 
 from oslo_log import log as logging
+from oslo_utils import strutils
 
 from winrm import exceptions as winrm_exceptions
 from winrm import protocol
@@ -97,7 +98,13 @@ class WSManConnection(object):
     @utils.retry_on_error(
         terminal_exceptions=[winrm_exceptions.InvalidCredentialsError,
                              exception.OSMorphingWinRMOperationTimeout])
-    def _exec_command(self, cmd, args=[], timeout=None):
+    def _exec_command(self, cmd, args=[], timeout=None, sanitizable=True):
+        if sanitizable:
+            sanitized_cmd = strutils.mask_password(
+                "%s %s" % (cmd, " ".join(args)))
+        else:
+            sanitized_cmd = "***"
+
         timeout = int(timeout or self._conn_timeout)
         self.set_timeout(timeout)
         shell_id = None
@@ -111,7 +118,7 @@ class WSManConnection(object):
                     shell_id, command_id)
             except requests.exceptions.ReadTimeout:
                 raise exception.OSMorphingWinRMOperationTimeout(
-                    cmd=("%s %s" % (cmd, " ".join(args))), timeout=timeout)
+                    cmd=sanitized_cmd, timeout=timeout)
             finally:
                 self._protocol.cleanup_command(shell_id, command_id)
 
@@ -132,21 +139,29 @@ class WSManConnection(object):
             if shell_id:
                 self._protocol.close_shell(shell_id)
 
-    def exec_command(self, cmd, args=[], timeout=None):
-        LOG.debug("Executing WSMAN command: %s", str([cmd] + args))
+    def exec_command(self, cmd, args=[], timeout=None, sanitizable=True):
+        # Our sanitization helpers do not work for base64 encoded commands,
+        # in which case we'll avoid logging it so that we won't leak
+        # sensitive information.
+        if sanitizable:
+            sanitized_cmd = strutils.mask_password(
+                "%s %s" % (cmd, " ".join(args)))
+        else:
+            sanitized_cmd = "***"
+        LOG.debug("Executing WSMAN command: %s", sanitized_cmd)
         std_out, std_err, exit_code = self._exec_command(
-            cmd, args, timeout=timeout)
+            cmd, args, timeout=timeout, sanitizable=sanitizable)
 
         if exit_code:
             raise exception.CoriolisException(
                 "Command \"%s\" failed with exit code: %s\n"
                 "stdout: %s\nstd_err: %s" %
-                (str([cmd] + args), exit_code, std_out, std_err))
+                (sanitized_cmd, exit_code, std_out, std_err))
 
         return std_out
 
     def exec_ps_command(self, cmd, ignore_stdout=False, timeout=None):
-        LOG.debug("Executing PS command: %s", cmd)
+        LOG.debug("Executing PS command: %s", strutils.mask_password(cmd))
         base64_cmd = base64.b64encode(cmd.encode('utf-16le')).decode()
         return self.exec_command(
             "powershell.exe",
@@ -157,7 +172,8 @@ class WSManConnection(object):
                 "-ExecutionPolicy",
                 "RemoteSigned",
             ],
-            timeout=timeout)[:-2]
+            timeout=timeout,
+            sanitizable=False)[:-2]
 
     def test_path(self, remote_path):
         ret_val = self.exec_ps_command("Test-Path -Path \"%s\"" % remote_path)