| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599 |
- # Copyright 2023 Cloudbase Solutions Srl
- # All Rights Reserved.
- import datetime
- import hashlib
- import json
- import logging
- import os
- import socket
- from unittest import mock
- import uuid
- import ddt
- from webob import exc
- from coriolis import constants
- from coriolis import exception
- from coriolis.tests import test_base
- from coriolis.tests import testutils
- from coriolis import utils
class CoriolisTestException(Exception):
    """Exception type raised by mocks in the retry_on_error tests."""
- @ddt.ddt
- class UtilsTestCase(test_base.CoriolisBaseTestCase):
- """Test suite for the Coriolis utils module."""
def setUp(self):
    """Create a fresh set of mocks for every test case."""
    super(UtilsTestCase, self).setUp()
    mock_attributes = (
        'mock_func', 'mock_ssh', 'mock_sftp', 'mock_file',
        'mock_conn', 'mock_stdout', 'mock_process',
    )
    for attribute in mock_attributes:
        setattr(self, attribute, mock.Mock())
@mock.patch('oslo_log.log.setup')
def test_setup_logging(self, mock_setup):
    """setup_logging() calls oslo_log's setup with CONF and 'coriolis'."""
    utils.setup_logging()

    mock_setup.assert_called_once_with(utils.CONF, 'coriolis')
@mock.patch.object(utils, 'get_exception_details')
def test_ignore_exceptions(self, mock_get_details):
    """An exception in the wrapped callable is logged, not raised."""
    self.mock_func.side_effect = Exception
    mock_get_details.return_value = 'Test exception details'
    wrapped = utils.ignore_exceptions(self.mock_func)

    with self.assertLogs('coriolis.utils', level=logging.WARN):
        wrapped()

    mock_get_details.assert_called_once_with()
def test_get_single_result_empty_list(self):
    """An empty list makes get_single_result raise KeyError."""
    with self.assertRaises(KeyError):
        utils.get_single_result([])
def test_get_single_result_multiple_elements(self):
    """More than one element makes get_single_result raise KeyError."""
    with self.assertRaises(KeyError):
        utils.get_single_result([1, 2])
def test_get_single_result_single_element(self):
    """A one-element list yields that element."""
    self.assertEqual(utils.get_single_result([1]), 1)
def test_retry_on_error_no_exception(self):
    """A successful call runs once and its result is passed through."""
    decorate = utils.retry_on_error(
        max_attempts=5, sleep_seconds=0, terminal_exceptions=[])
    wrapped = decorate(self.mock_func)

    result = wrapped(mock.sentinel.arg1, kwarg1=mock.sentinel.kwarg1)

    self.assertEqual(result, self.mock_func.return_value)
    self.assertEqual(self.mock_func.call_count, 1)
    self.mock_func.assert_called_with(
        mock.sentinel.arg1, kwarg1=mock.sentinel.kwarg1)
def test_retry_on_error_exception_keyboard_interrupt(self):
    """KeyboardInterrupt is re-raised after a single attempt."""
    self.mock_func.side_effect = KeyboardInterrupt
    wrapped = utils.retry_on_error(
        max_attempts=5, sleep_seconds=0,
        terminal_exceptions=[])(self.mock_func)

    with self.assertRaises(KeyboardInterrupt):
        wrapped()

    self.assertEqual(self.mock_func.call_count, 1)
def test_retry_on_error_not_in_list_of_retried(self):
    """An exception outside retried_exceptions is not retried."""
    self.mock_func.side_effect = ValueError
    wrapped = utils.retry_on_error(
        max_attempts=5, sleep_seconds=0, terminal_exceptions=[],
        retried_exceptions=(CoriolisTestException, ))(self.mock_func)

    with self.assertRaises(ValueError):
        wrapped()

    self.assertEqual(self.mock_func.call_count, 1)
def test_retry_on_error_in_list_of_retried(self):
    """An exception listed in retried_exceptions uses all attempts."""
    self.mock_func.side_effect = CoriolisTestException
    wrapped = utils.retry_on_error(
        max_attempts=5, sleep_seconds=0, terminal_exceptions=[],
        retried_exceptions=(CoriolisTestException, ))(self.mock_func)

    with self.assertRaises(CoriolisTestException):
        wrapped()

    self.assertEqual(self.mock_func.call_count, 5)
def test_retry_on_error_terminal_exception(self):
    """A terminal exception is raised after the first attempt."""
    self.mock_func.side_effect = CoriolisTestException
    wrapped = utils.retry_on_error(
        max_attempts=5, sleep_seconds=0,
        terminal_exceptions=[CoriolisTestException])(self.mock_func)

    with self.assertRaises(CoriolisTestException):
        wrapped()

    self.assertEqual(self.mock_func.call_count, 1)
def test_retry_on_error_exception_retry_max_attempts(self):
    """A persistently failing call is attempted max_attempts times."""
    self.mock_func.side_effect = CoriolisTestException
    wrapped = utils.retry_on_error(
        max_attempts=5, sleep_seconds=0,
        terminal_exceptions=[])(self.mock_func)

    with self.assertRaises(CoriolisTestException):
        wrapped()

    self.assertEqual(self.mock_func.call_count, 5)
def test_get_udev_net_rules(self):
    """One rule line per interface, with the MAC address lower-cased."""
    net_ifaces_info = {
        "eth0": "AA:BB:CC:DD:EE:FF",
        "eth1": "FF:EE:DD:CC:BB:AA",
    }
    rule_template = (
        'SUBSYSTEM=="net", ACTION=="add", DRIVERS=="?*", '
        'ATTR{address}=="%s", '
        'NAME="%s"\n')
    expected_result = "".join([
        rule_template % ("aa:bb:cc:dd:ee:ff", "eth0"),
        rule_template % ("ff:ee:dd:cc:bb:aa", "eth1"),
    ])

    result = utils.get_udev_net_rules(net_ifaces_info)

    self.assertEqual(result, expected_result)
@mock.patch.object(utils, 'exec_ssh_cmd')
def test_parse_os_release(self, mock_ssh_cmd):
    """ID and VERSION_ID are parsed out of the os-release output."""
    expected_cmd = (
        "[ -f '/etc/os-release' ] && cat /etc/os-release || true")
    mock_ssh_cmd.return_value = 'ID=ubuntu\nVERSION_ID="20.04"\n'

    os_info = utils.parse_os_release(self.mock_ssh)

    mock_ssh_cmd.assert_called_once_with(self.mock_ssh, expected_cmd)
    self.assertEqual(os_info, ('ubuntu', '20.04'))
@mock.patch.object(utils, 'exec_ssh_cmd')
def test_parse_os_release_no_equal(self, mock_ssh_cmd):
    """A line without '=' does not affect the parsed result."""
    expected_cmd = (
        "[ -f '/etc/os-release' ] && cat /etc/os-release || true")
    mock_ssh_cmd.return_value = (
        'ID=ubuntu\nVERSION_ID="20.04"\nNOEQUAL\n')

    os_info = utils.parse_os_release(self.mock_ssh)

    mock_ssh_cmd.assert_called_once_with(self.mock_ssh, expected_cmd)
    self.assertEqual(os_info, ("ubuntu", "20.04"))
@mock.patch.object(utils, 'exec_ssh_cmd')
def test_parse_os_release_missing_fields(self, mock_ssh_cmd):
    """None is returned when neither ID nor VERSION_ID is present."""
    expected_cmd = (
        "[ -f '/etc/os-release' ] && cat /etc/os-release || true")
    mock_ssh_cmd.return_value = 'NO_ID\nNO_VERSION_ID\n'

    os_info = utils.parse_os_release(self.mock_ssh)

    mock_ssh_cmd.assert_called_once_with(self.mock_ssh, expected_cmd)
    self.assertIsNone(os_info)
@mock.patch.object(utils, 'exec_ssh_cmd')
def test_parse_lsb_release(self, mock_ssh_cmd):
    """Distributor ID and Release are parsed from lsb_release output."""
    mock_ssh_cmd.return_value = (
        'Distributor ID: Ubuntu\nRelease: 20.04\n')

    os_info = utils.parse_lsb_release(self.mock_ssh)

    mock_ssh_cmd.assert_called_once_with(
        self.mock_ssh, "lsb_release -a || true")
    self.assertEqual(os_info, ("Ubuntu", "20.04"))
@mock.patch.object(utils, 'exec_ssh_cmd')
def test_parse_lsb_release_missing_fields(self, mock_ssh_cmd):
    """None is returned when the expected fields are absent."""
    mock_ssh_cmd.return_value = 'No Distributor ID\nNo Release\n'

    os_info = utils.parse_lsb_release(self.mock_ssh)

    mock_ssh_cmd.assert_called_once_with(
        self.mock_ssh, "lsb_release -a || true")
    self.assertIsNone(os_info)
@mock.patch.object(utils, 'parse_os_release')
def test_get_linux_os_info(self, mock_parse_os_release):
    """The value produced by parse_os_release is returned."""
    os_info = utils.get_linux_os_info(self.mock_ssh)

    mock_parse_os_release.assert_called_once_with(self.mock_ssh)
    self.assertEqual(os_info, mock_parse_os_release.return_value)
@mock.patch.object(utils, 'parse_os_release')
@mock.patch.object(utils, 'parse_lsb_release')
def test_get_linux_os_info_lsb_release(self, mock_parse_lsb_release,
                                       mock_parse_os_release):
    """parse_lsb_release is consulted when parse_os_release gives None."""
    lsb_info = ("ubuntu", "20.04")
    mock_parse_os_release.return_value = None
    mock_parse_lsb_release.return_value = lsb_info

    os_info = utils.get_linux_os_info(self.mock_ssh)

    mock_parse_os_release.assert_called_once_with(self.mock_ssh)
    mock_parse_lsb_release.assert_called_once_with(self.mock_ssh)
    self.assertEqual(os_info, ("ubuntu", "20.04"))
def test_test_ssh_path_exists(self):
    """test_ssh_path returns True when the SFTP stat call succeeds."""
    remote_path = "/test/file"
    self.mock_ssh.open_sftp.return_value = self.mock_sftp
    self.mock_sftp.stat.return_value = None

    path_exists = utils.test_ssh_path(self.mock_ssh, remote_path)

    self.mock_ssh.open_sftp.assert_called_once_with()
    self.mock_sftp.stat.assert_called_once_with(remote_path)
    self.assertEqual(path_exists, True)
def test_test_ssh_path_not_exists(self):
    """test_ssh_path returns False when stat raises IOError errno 2."""
    remote_path = "/nonexistent/path"
    self.mock_ssh.open_sftp.return_value = self.mock_sftp
    self.mock_sftp.stat.side_effect = IOError(
        2, "No such file or directory")

    path_exists = utils.test_ssh_path(self.mock_ssh, remote_path)

    self.mock_ssh.open_sftp.assert_called_once_with()
    self.mock_sftp.stat.assert_called_once_with(remote_path)
    self.assertEqual(path_exists, False)
def test_test_ssh_path_raises(self):
    """An IOError with errno 1 propagates from the unwrapped function."""
    self.mock_ssh.open_sftp.return_value = self.mock_sftp
    self.mock_sftp.stat.side_effect = IOError(1, "Some other error")
    unwrapped = testutils.get_wrapped_function(utils.test_ssh_path)

    with self.assertRaises(IOError):
        unwrapped(self.mock_ssh, "/nonexistent/path")
def test_read_ssh_file(self):
    """read_ssh_file opens the path in 'rb' mode and returns its data."""
    remote_path = "/test/file"
    self.mock_ssh.open_sftp.return_value = self.mock_sftp
    self.mock_sftp.open.return_value = self.mock_file

    content = utils.read_ssh_file(self.mock_ssh, remote_path)

    self.mock_ssh.open_sftp.assert_called_once_with()
    self.mock_sftp.open.assert_called_once_with(remote_path, "rb")
    self.assertEqual(content, self.mock_file.read.return_value)
def test_write_ssh_file(self):
    """write_ssh_file opens the path in 'wb' mode and writes the data."""
    remote_path = "/test/file"
    payload = b"file content"
    self.mock_ssh.open_sftp.return_value = self.mock_sftp
    self.mock_sftp.open.return_value = self.mock_file

    utils.write_ssh_file(self.mock_ssh, remote_path, payload)

    self.mock_ssh.open_sftp.assert_called_once_with()
    self.mock_sftp.open.assert_called_once_with(remote_path, "wb")
    self.mock_file.write.assert_called_once_with(payload)
@mock.patch('base64.b64encode')
def test_write_winrm_file(self, mock_b64encode):
    """Overwriting an existing file makes two exec_ps_command calls."""
    remote_path = "/test/file"
    self.mock_conn.exec_ps_command.return_value = None
    self.mock_conn.test_path.return_value = True

    utils.write_winrm_file(
        self.mock_conn, remote_path, "file content", overwrite=True)

    self.mock_conn.test_path.assert_called_once_with(remote_path)
    self.assertEqual(self.mock_conn.exec_ps_command.call_count, 2)
    mock_b64encode.assert_called_once_with(b"file content")
@mock.patch('base64.b64encode')
def test_write_winrm_file_file_does_not_exist(self, mock_b64encode):
    """Content is encoded and written when the path does not exist."""
    remote_path = "/nonexistent/path"
    self.mock_conn.exec_ps_command.return_value = None
    self.mock_conn.test_path.return_value = False

    utils.write_winrm_file(
        self.mock_conn, remote_path, "nonexistent-file", overwrite=True)

    self.mock_conn.test_path.assert_called_once_with(remote_path)
    mock_b64encode.assert_called_once_with(b"nonexistent-file")
def test_write_winrm_file_file_exists_overwrite_false(self):
    """An existing path with overwrite=False raises CoriolisException."""
    remote_path = "/test/file"
    self.mock_conn.exec_ps_command.return_value = None
    self.mock_conn.test_path.return_value = True

    with self.assertRaises(exception.CoriolisException):
        utils.write_winrm_file(
            self.mock_conn, remote_path, "file content", overwrite=False)

    self.mock_conn.test_path.assert_called_once_with(remote_path)
@mock.patch('base64.b64encode')
def test_write_winrm_file_content_is_not_string(self, mock_b64encode):
    """Bytes content is handed to base64.b64encode as given.

    The payload is ``bytes`` on purpose: passing a ``str`` here would
    only repeat the string case already covered by the other
    write_winrm_file tests and never exercise non-string content.
    """
    remote_path = "/test/file"
    content = b"file content"
    self.mock_conn.test_path.return_value = False
    self.mock_conn.exec_ps_command.return_value = None

    utils.write_winrm_file(
        self.mock_conn, remote_path, content, overwrite=True)

    self.mock_conn.test_path.assert_called_once_with(remote_path)
    mock_b64encode.assert_called_once_with(content)
@mock.patch('base64.b64encode')
def test_write_winrm_file_long_content(self, mock_b64encode):
    """3000 characters of content are encoded as two slices."""
    remote_path = "/test/file"
    split_at = 2048
    long_content = "a" * 3000
    mock_b64encode.return_value = b'encoded_content'
    self.mock_conn.exec_ps_command.return_value = None
    self.mock_conn.test_path.return_value = False

    utils.write_winrm_file(
        self.mock_conn, remote_path, long_content, overwrite=True)

    self.mock_conn.test_path.assert_called_once_with(remote_path)
    self.assertEqual(self.mock_conn.exec_ps_command.call_count, 2)
    mock_b64encode.assert_has_calls([
        mock.call(long_content[:split_at].encode()),
        mock.call(long_content[split_at:].encode()),
    ])
def test_list_ssh_dir(self):
    """list_ssh_dir returns what the SFTP client's listdir returns."""
    remote_path = "/test/file"
    self.mock_ssh.open_sftp.return_value = self.mock_sftp

    entries = utils.list_ssh_dir(self.mock_ssh, remote_path)

    self.assertEqual(entries, self.mock_sftp.listdir.return_value)
    self.mock_ssh.open_sftp.assert_called_once_with()
    self.mock_sftp.listdir.assert_called_once_with(remote_path)
def test_exec_ssh_cmd(self):
    """CRLF in the command output comes back as a plain newline."""
    self.mock_stdout.channel.recv_exit_status.return_value = 0
    self.mock_stdout.read.return_value = b'output\r\n'
    # The same mock stands in for both the stdout and stderr streams.
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    output = utils.exec_ssh_cmd(self.mock_ssh, "command")

    self.mock_ssh.exec_command.assert_called_once_with(
        "command", environment=None, get_pty=False, timeout=None)
    self.assertEqual(output, 'output\n')
def test_exec_ssh_cmd_timeout_with_timeout(self):
    """A supplied timeout is forwarded to exec_command."""
    raw_output = b'output\n'
    self.mock_stdout.channel.recv_exit_status.return_value = 0
    self.mock_stdout.read.return_value = raw_output
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    output = utils.exec_ssh_cmd(self.mock_ssh, "command", timeout=10)

    self.mock_ssh.exec_command.assert_called_once_with(
        "command", environment=None, get_pty=False, timeout=10.0)
    self.assertEqual(
        output, raw_output.decode('utf-8', errors='replace'))
def test_exec_ssh_cmd_getpeername_value_error(self):
    """A ValueError from getpeername is logged; the command still runs."""
    raw_output = b'output\n'
    self.mock_stdout.channel.recv_exit_status.return_value = 0
    self.mock_stdout.read.return_value = raw_output
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams
    transport_sock = self.mock_ssh.get_transport.return_value.sock
    transport_sock.getpeername.side_effect = ValueError

    with self.assertLogs('coriolis.utils', level=logging.WARN):
        output = utils.exec_ssh_cmd(self.mock_ssh, "command")

    self.mock_ssh.exec_command.assert_called_once_with(
        "command", environment=None, get_pty=False, timeout=None)
    self.assertEqual(
        output, raw_output.decode('utf-8', errors='replace'))
def test_exec_ssh_cmd_timeout(self):
    """socket.timeout on read becomes MinionMachineCommandTimeout."""
    self.mock_stdout.read.side_effect = socket.timeout
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    with self.assertRaises(exception.MinionMachineCommandTimeout):
        utils.exec_ssh_cmd(self.mock_ssh, "command")

    self.mock_ssh.exec_command.assert_called_once_with(
        "command", environment=None, get_pty=False, timeout=None)
def test_exec_ssh_cmd_exception(self):
    """A non-zero exit status raises SSHCommandFailed."""
    self.mock_stdout.channel.recv_exit_status.return_value = 1
    self.mock_stdout.read.return_value = b'some error output'
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    with self.assertRaises(exception.SSHCommandFailed):
        utils.exec_ssh_cmd(self.mock_ssh, "command")

    self.mock_ssh.exec_command.assert_called_once_with(
        "command", environment=None, get_pty=False, timeout=None)
def test_exec_ssh_cmd_command_not_found_in_stdout(self):
    """'command not found' output raises SSHCommandNotFoundException."""
    self.mock_stdout.channel.recv_exit_status.return_value = 1
    self.mock_stdout.read.return_value = b'sudo: foo: command not found'
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    with self.assertRaises(exception.SSHCommandNotFoundException):
        utils.exec_ssh_cmd(self.mock_ssh, "command")
def test_exec_ssh_cmd_exit_code_127(self):
    """Exit status 127 raises SSHCommandNotFoundException."""
    self.mock_stdout.channel.recv_exit_status.return_value = 127
    self.mock_stdout.read.return_value = b''
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    with self.assertRaises(exception.SSHCommandNotFoundException):
        utils.exec_ssh_cmd(self.mock_ssh, "command")
def test_exec_ssh_cmd_chroot(self):
    """The command is prefixed with 'sudo -E chroot <chroot_dir>'."""
    raw_output = b'output\n'
    self.mock_stdout.channel.recv_exit_status.return_value = 0
    self.mock_stdout.read.return_value = raw_output
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    output = utils.exec_ssh_cmd_chroot(
        self.mock_ssh, "/chroot /bin/bash -c", "command")

    self.mock_ssh.exec_command.assert_called_once_with(
        "sudo -E chroot /chroot /bin/bash -c command",
        environment=None, get_pty=False, timeout=None)
    self.assertEqual(
        output, raw_output.decode('utf-8', errors='replace'))
def test_check_fs(self):
    """check_fs runs 'sudo fsck -p -t <fs> <dev>' with get_pty=True."""
    mocked_output = self.mock_stdout.read.return_value
    # Have .replace() on the mocked output hand back the same mock.
    mocked_output.replace.return_value = mocked_output
    self.mock_stdout.channel.recv_exit_status.return_value = 0
    streams = (None, self.mock_stdout, self.mock_stdout)
    self.mock_ssh.exec_command.return_value = streams

    utils.check_fs(self.mock_ssh, "ext4", "/dev/sda1")

    self.mock_ssh.exec_command.assert_called_once_with(
        "sudo fsck -p -t ext4 /dev/sda1",
        environment=None, get_pty=True, timeout=None)
@mock.patch.object(utils, 'exec_ssh_cmd')
def test_check_fs_exception(self, mock_exec_ssh_cmd):
    """A CoriolisException from exec_ssh_cmd surfaces from check_fs."""
    mock_exec_ssh_cmd.side_effect = exception.CoriolisException()

    with self.assertRaises(exception.CoriolisException):
        utils.check_fs(self.mock_ssh, "ext4", "/dev/sda1")

    mock_exec_ssh_cmd.assert_called_once_with(
        self.mock_ssh, "sudo fsck -p -t ext4 /dev/sda1", get_pty=True)
@mock.patch.object(utils, 'exec_ssh_cmd')
def test_run_xfs_repair(self, mock_exec_ssh_cmd):
    """The device is mounted, unmounted, then xfs_repair is run on it."""
    ssh = self.mock_ssh
    mock_exec_ssh_cmd.return_value = "/tmp/tmp_dir\n"

    utils.run_xfs_repair(ssh, "/dev/sda1")

    mock_exec_ssh_cmd.assert_has_calls([
        mock.call(ssh, "mktemp -d"),
        mock.call(ssh, "sudo mount /dev/sda1 /tmp/tmp_dir", get_pty=True),
        mock.call(ssh, "sudo umount /tmp/tmp_dir", get_pty=True),
        mock.call(ssh, "sudo xfs_repair /dev/sda1", get_pty=True),
    ])
@mock.patch("time.sleep")
def test_run_xfs_repair_exception(self, mock_sleep):
    """A failing SSH command results in a logged warning."""
    # time.sleep is patched out; mock_sleep itself is never inspected.
    self.mock_ssh.exec_command.side_effect = Exception()

    with self.assertLogs('coriolis.utils', level=logging.WARN):
        utils.run_xfs_repair(self.mock_ssh, "/dev/sda1")
@mock.patch('socket.socket')
def test_check_port_open_success(self, mock_socket):
    """A successful connect gives True and the socket is closed."""
    fake_socket = mock_socket.return_value
    fake_socket.connect.return_value = None

    is_open = utils._check_port_open('localhost', 8080)

    self.assertEqual(is_open, True)
    fake_socket.close.assert_called_once()
@mock.patch('socket.socket')
def test_check_port_open_exception(self, mock_socket):
    """A socket.error on connect gives False; the socket is closed."""
    fake_socket = mock_socket.return_value
    fake_socket.connect.side_effect = socket.error

    is_open = utils._check_port_open('localhost', 8080)

    self.assertEqual(is_open, False)
    fake_socket.close.assert_called_once()
@mock.patch.object(utils, '_check_port_open')
@mock.patch('time.sleep')
def test_wait_for_port_connectivity(self, mock_sleep,
                                    mock_check_port_open):
    """The port check is invoked with the given host and port."""
    mock_sleep.return_value = None
    mock_check_port_open.return_value = True

    utils.wait_for_port_connectivity('localhost', 8080)

    mock_check_port_open.assert_called_with('localhost', 8080)
@mock.patch.object(utils, '_check_port_open')
@mock.patch('time.sleep')
def test_wait_for_port_connectivity_exception(self, mock_sleep,
                                              mock_check_port_open):
    """CoriolisException is raised when the port never opens."""
    mock_sleep.return_value = None
    mock_check_port_open.return_value = False

    with self.assertRaises(exception.CoriolisException):
        utils.wait_for_port_connectivity('localhost', 8080, max_wait=1)

    mock_check_port_open.assert_called_with('localhost', 8080)
@mock.patch('subprocess.Popen')
def test_exec_process(self, mock_popen):
    """exec_process returns the stdout of a process that exits with 0."""
    # Imported locally so the expected Popen arguments can be spelled
    # as subprocess.PIPE instead of the bare magic value -1.
    import subprocess

    self.mock_process.returncode = 0
    self.mock_process.communicate.return_value = (b'stdout', b'stderr')
    mock_popen.return_value = self.mock_process

    result = utils.exec_process("command")

    mock_popen.assert_called_once_with(
        "command", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    self.assertEqual(result, self.mock_process.communicate.return_value[0])
@mock.patch('subprocess.Popen')
def test_exec_process_exception(self, mock_popen):
    """A non-zero return code makes exec_process raise."""
    # Imported locally so the expected Popen arguments can be spelled
    # as subprocess.PIPE instead of the bare magic value -1.
    import subprocess

    self.mock_process.returncode = 1
    self.mock_process.communicate.return_value = (b'stdout', b'stderr')
    mock_popen.return_value = self.mock_process

    with self.assertRaises(exception.CoriolisException):
        utils.exec_process("command")

    mock_popen.assert_called_once_with(
        "command", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@mock.patch.object(utils, 'exec_process')
def test_get_disk_info(self, mock_exec_process):
    """A 'vpc' format reported by qemu-img is returned as 'vhd'."""
    mock_exec_process.return_value = b'{"format": "vpc"}'

    disk_info = utils.get_disk_info('disk_path')

    self.assertEqual(disk_info, {'format': 'vhd'})
    expected_args = [
        utils.CONF.qemu_img_path, 'info', '--output=json', 'disk_path']
    mock_exec_process.assert_called_with(expected_args)
@mock.patch.object(utils, 'exec_process')
def test_convert_disk_format(self, mock_exec_process):
    """Preallocated VHD conversion uses 'vpc' with subformat=fixed."""
    mock_exec_process.return_value = None

    utils.convert_disk_format(
        'disk_path', 'target_disk_path', constants.DISK_FORMAT_VHD,
        preallocated=True)

    expected_args = [
        utils.CONF.qemu_img_path, 'convert', '-O', 'vpc',
        '-o', 'subformat=fixed', 'disk_path', 'target_disk_path']
    mock_exec_process.assert_called_with(expected_args)
def test_convert_disk_format_not_vhd(self):
    """A preallocated non-VHD target raises NotImplementedError."""
    with self.assertRaises(NotImplementedError):
        utils.convert_disk_format(
            'disk_path', 'target_disk_path', 'not_vhd',
            preallocated=True)
@mock.patch.object(utils, 'exec_process')
@mock.patch.object(utils, 'ignore_exceptions')
def test_convert_disk_format_exception(self, mock_ignore_exceptions,
                                       mock_exec_process):
    """On conversion failure the target file is removed via os.remove."""
    mock_remove = mock.MagicMock()
    mock_ignore_exceptions.return_value = mock_remove
    mock_exec_process.side_effect = exception.CoriolisException

    with self.assertRaises(exception.CoriolisException):
        utils.convert_disk_format(
            'disk_path', 'target_disk_path', constants.DISK_FORMAT_VHD,
            preallocated=True)

    expected_args = [
        utils.CONF.qemu_img_path, 'convert', '-O', 'vpc',
        '-o', 'subformat=fixed', 'disk_path', 'target_disk_path']
    mock_exec_process.assert_called_with(expected_args)
    mock_ignore_exceptions.assert_called_with(os.remove)
    mock_remove.assert_called_with('target_disk_path')
def test_walk_class_hierarchy(self):
    """Nested subclasses are yielded before their parent class."""
    class Root:
        pass

    class Child(Root):
        pass

    class GrandChild(Child):
        pass

    class Sibling(Root):
        pass

    walked = list(utils.walk_class_hierarchy(Root))

    self.assertEqual(walked, [GrandChild, Child, Sibling])
def test_walk_class_hierarchy_no_subclasses(self):
    """A class without subclasses yields nothing."""
    class Leaf:
        pass

    walked = list(utils.walk_class_hierarchy(Leaf, encountered=None))

    self.assertEqual(walked, [])
@mock.patch('socket.socket')
@mock.patch('ssl.SSLContext')
@mock.patch('OpenSSL.crypto')
def test_get_ssl_cert_thumbprint(self, mock_crypto, mock_ssl_context,
                                 mock_socket):
    """The thumbprint is the decoded digest of the loaded certificate."""
    mock_socket.return_value = mock.MagicMock()
    mock_ssl_context.return_value = mock.MagicMock()
    ssl_context = mock_ssl_context.return_value

    thumbprint = utils.get_ssl_cert_thumbprint(
        ssl_context, 'localhost', digest_algorithm='sha1')

    loaded_cert = mock_crypto.load_certificate.return_value
    self.assertEqual(
        thumbprint, loaded_cert.digest.return_value.decode.return_value)
    loaded_cert.digest.assert_called_once_with('sha1')
@mock.patch('os.path')
def test_get_resources_dir(self, mock_path):
    """get_resources_dir returns the result of os.path.join."""
    mock_path.abspath.return_value = 'dirname/abs_path'
    mock_path.dirname.return_value = 'dirname'

    resources_dir = utils.get_resources_dir()

    self.assertEqual(resources_dir, mock_path.join.return_value)
@mock.patch('os.path')
def test_get_resources_bin_dir(self, mock_path):
    """get_resources_bin_dir returns the result of os.path.join."""
    mock_path.abspath.return_value = 'dirname/abs_path '
    mock_path.dirname.return_value = 'dirname'

    bin_dir = utils.get_resources_bin_dir()

    self.assertEqual(bin_dir, mock_path.join.return_value)
def test_serialize_key(self):
    """The password is the second positional arg to write_private_key."""
    mock_key = mock.MagicMock()
    mock_key.write_private_key.return_value = None

    utils.serialize_key(mock_key, password='password')

    positional_args = mock_key.write_private_key.call_args[0]
    self.assertEqual(positional_args[1], 'password')
@mock.patch('io.StringIO')
@mock.patch('paramiko.RSAKey')
def test_deserialize_key(self, mock_rsa_key, mock_string_io):
    """The key is loaded via RSAKey.from_private_key with the password."""
    mock_string_io.return_value = mock.MagicMock()
    key_stream = mock_string_io.return_value

    key = utils.deserialize_key('key', password='password')

    mock_rsa_key.from_private_key.assert_called_with(
        key_stream, 'password')
    self.assertEqual(key, mock_rsa_key.from_private_key.return_value)
@mock.patch('coriolis.utils.jsonutils.dumps')
@mock.patch('coriolis.utils.jsonutils.loads')
def test_to_dict(self, mock_loads, mock_dumps):
    """to_dict dumps the object and returns the re-loaded result."""
    source_obj = mock.MagicMock()
    mock_dumps.return_value = 'json'

    converted = utils.to_dict(source_obj)

    mock_dumps.assert_called_once_with(source_obj, default=mock.ANY)
    mock_loads.assert_called_once_with(mock_dumps.return_value)
    self.assertEqual(converted, mock_loads.return_value)
def test_to_dict_with_primitive(self):
    """A datetime is converted to its ISO-format string.

    A fixed timestamp with a non-zero microsecond part keeps the test
    deterministic: ``isoformat()`` leaves out the fractional seconds
    when ``microsecond`` is 0, so a value taken from
    ``datetime.now()`` could yield a differently shaped expectation.
    """
    obj = datetime.datetime(2023, 1, 2, 3, 4, 5, 678901)

    result = utils.to_dict(obj)

    self.assertEqual(result, obj.isoformat())
def test_load_class(self):
    """A dotted path is resolved to the class object it names."""
    loaded = utils.load_class('json.JSONDecoder')

    self.assertEqual(loaded, json.JSONDecoder)
def test_check_md5_with_matching_hashes(self):
    """check_md5 completes without raising when the digest matches."""
    payload = b'test data'
    expected_digest = hashlib.md5(payload).hexdigest()

    utils.check_md5(payload, expected_digest)
def test_check_md5_with_exception(self):
    """A digest of different data raises CoriolisException."""
    payload = b'test data'
    wrong_digest = hashlib.md5(b'other data').hexdigest()

    with self.assertRaises(exception.CoriolisException):
        utils.check_md5(payload, wrong_digest)
@mock.patch.object(utils, 'secrets')
def test_get_secret_connection_info_with_secret_ref(self, mock_secrets):
    """With a secret_ref, the value from secrets.get_secret is used."""
    connection_info = {'secret_ref': 'ref'}

    with self.assertLogs('coriolis.utils', level=logging.INFO):
        resolved = utils.get_secret_connection_info(
            'context', connection_info)

    self.assertEqual(resolved, mock_secrets.get_secret.return_value)
def test_get_secret_connection_info_with_no_secret_ref(self):
    """An empty connection info dict comes back as an empty dict."""
    self.assertEqual(utils.get_secret_connection_info('context', {}), {})
@ddt.data(123, '123')
def test_parse_int_value_with_valid_input(self, value):
    """Both the int 123 and the string '123' parse to 123."""
    self.assertEqual(utils.parse_int_value(value), 123)
@ddt.data('invalid', '123.45', None)
def test_parse_int_value_with_invalid_input(self, value):
    """Values that are not integers raise InvalidInput."""
    with self.assertRaises(exception.InvalidInput):
        utils.parse_int_value(value)
@ddt.data(
    ('SGVsbG8gd29ybGQ=', False, 'Hello world'),
    ('eyJ0ZXN0IjogInZhbHVlIn0=', True, {'test': 'value'}),
)
@ddt.unpack
def test_decode_base64_param(self, value, is_json, expected):
    """Base64 input decodes to text, or to parsed JSON when is_json."""
    decoded = utils.decode_base64_param(value, is_json=is_json)

    self.assertEqual(decoded, expected)
@ddt.data(
    ('invalid', False),
    ('invalid', True),
    ('SGVsbG8gd29ybGQ=', True),
)
@ddt.unpack
def test_decode_base64_param_with_invalid_input(self, value, is_json):
    """Undecodable input, or non-JSON with is_json, raises InvalidInput."""
    with self.assertRaises(exception.InvalidInput):
        utils.decode_base64_param(value, is_json=is_json)
def test_quote_url(self):
    """A space in the input is percent-encoded as %20."""
    self.assertEqual(utils.quote_url('Hello world'), 'Hello%20world')
@ddt.data(
    ('00-11-22-33-44-55', '00:11:22:33:44:55'),
    ('00:11:22:33:44:55', '00:11:22:33:44:55'),
    ('001122334455', '00:11:22:33:44:55'),
    ('00-11-22-AA-BB-CC', '00:11:22:aa:bb:cc'),
)
@ddt.unpack
def test_normalize_mac_address(self, input, expected):
    """Accepted MAC spellings become lower-case, colon-separated."""
    normalized = utils.normalize_mac_address(input)

    self.assertEqual(normalized, expected)
@ddt.data(
    'invalid',
    '00112233445566',
    '00-11-22-33-44-GG',
    123456789012,
)
def test_normalize_mac_address_with_invalid_input(self, input):
    """Malformed or non-string MAC values raise ValueError."""
    with self.assertRaises(ValueError):
        utils.normalize_mac_address(input)
def test_get_url_with_credentials(self):
    """Credentials are inserted into a URL that has none."""
    url_with_creds = utils.get_url_with_credentials(
        'http://example.com', 'user', 'pass')

    self.assertEqual(url_with_creds, 'http://user:pass@example.com')
def test_get_url_with_credentials_existing_credentials(self):
    """Credentials already present in the URL are replaced."""
    url_with_creds = utils.get_url_with_credentials(
        'http://olduser:oldpass@example.com', 'newuser', 'newpass')

    self.assertEqual(
        url_with_creds, 'http://newuser:newpass@example.com')
@ddt.data(
    (
        [
            {'id': '1', 'name': 'Resource1'},
            {'id': '2', 'name': 'Resource2'},
            {'id': '3', 'name': 'Resource1'}
        ],
        ['Resource2', '1', '3']
    ),
    (
        [
            {'id': '1', 'name': 'Resource1'},
            {'id': '2', 'name': 'Resource2'},
            {'id': '3', 'name': 'Resource3'}
        ],
        ['Resource1', 'Resource2', 'Resource3']
    ),
    (
        [
            {'id': '1', 'name': 'Resource1'},
            {'id': '2', 'name': 'Resource2'},
            {'id': '3'}
        ],
        KeyError
    )
)
@ddt.unpack
def test_get_unique_option_ids(self, resources, expected):
    """Check the returned identifiers, or the exception type raised.

    ``expected`` is either the list of identifiers (compared without
    regard to order) or the exception class the call must raise.
    """
    if not isinstance(expected, list):
        with self.assertRaises(expected):
            utils.get_unique_option_ids(resources)
        return

    option_ids = utils.get_unique_option_ids(resources)
    self.assertEqual(sorted(option_ids), sorted(expected))
def test_get_unique_option_ids_with_custom_keys(self):
    """Same check as the default-key case, with custom key names."""
    resources = [
        {'custom_id': '1', 'custom_name': 'Resource1'},
        {'custom_id': '2', 'custom_name': 'Resource2'},
        {'custom_id': '3', 'custom_name': 'Resource1'}
    ]

    option_ids = utils.get_unique_option_ids(
        resources, id_key='custom_id', name_key='custom_name')

    # Order is not part of the contract being checked here.
    self.assertEqual(sorted(option_ids), sorted(['Resource2', '1', '3']))
def test_bad_request_on_error(self):
    """A True validity flag lets the wrapped result through unchanged."""
    @utils.bad_request_on_error("An error occurred: %s")
    def mock_func():
        return (True, "Everything is fine")

    outcome = mock_func()
    is_valid, message = outcome

    self.assertTrue(is_valid)
    self.assertEqual(message, "Everything is fine")
def test_bad_request_on_error_httpBadRequest(self):
    """A False validity flag must surface as HTTPBadRequest."""
    @utils.bad_request_on_error("An error occurred: %s")
    def mock_func():
        return (False, "Something is wrong")

    expected_error = exc.HTTPBadRequest
    self.assertRaises(expected_error, mock_func)
@ddt.data(
    (
        {
            "key1": "value1",
            "origin": {"connection_info": "sensitive_info"},
            "destination": {"connection_info": "sensitive_info"},
            "volumes_info": [
                {
                    "key2": "value2",
                    "replica_state": {
                        "key3": "value3",
                        "chunks": "sensitive_info"
                    }
                }
            ]
        },
        {
            "key1": "value1",
            "origin": {"connection_info": {"got": "redacted"}},
            "destination": {"connection_info": {"got": "redacted"}},
            "volumes_info": [
                {
                    "key2": "value2",
                    "replica_state": {
                        "key3": "value3",
                        "chunks": ["<redacted>"]
                    }
                }
            ]
        },
    ),
    (
        {
            "key1": "value1",
            "key2": "value2",
        },
        {
            "key1": "value1",
            "key2": "value2",
        },
    ),
)
@ddt.unpack
def test_sanitize_task_info(self, task_info, expected):
    """Sanitized output must equal the expected mapping and be a dict."""
    sanitized = utils.sanitize_task_info(task_info)

    self.assertEqual(sanitized, expected)
    self.assertIsInstance(sanitized, dict)
def test_parse_ini_config(self):
    """Three 'key = value' lines are expected back as a three-item dict."""
    parsed = utils.parse_ini_config(
        'key1 = value1\nkey2 = value2\nkey3 = value3')

    self.assertEqual(
        parsed, {"key1": "value1", "key2": "value2", "key3": "value3"})
    self.assertIsInstance(parsed, dict)
@mock.patch('coriolis.utils.test_ssh_path')
@mock.patch('coriolis.utils.read_ssh_file')
@mock.patch('coriolis.utils.parse_ini_config')
def test_read_ssh_ini_config_file_check_false(self, mock_parse_ini_config,
                                              mock_read_ssh_file,
                                              mock_test_ssh_path):
    """With check_exists=False the file is read and handed to the parser."""
    remote_path = '/test/file'
    mock_test_ssh_path.return_value = True

    result = utils.read_ssh_ini_config_file(
        self.mock_ssh, remote_path, check_exists=False)

    self.assertEqual(result, mock_parse_ini_config.return_value)
    mock_read_ssh_file.assert_called_once_with(self.mock_ssh, remote_path)
    # The parser must receive the decoded contents of the file read.
    decoded_contents = mock_read_ssh_file.return_value.decode()
    mock_parse_ini_config.assert_called_once_with(decoded_contents)
@mock.patch('coriolis.utils.test_ssh_path')
def test_read_ssh_ini_config_file_check_true_path_not_exists(
        self, mock_test_ssh_path):
    """A missing path with check_exists=True yields an empty dict."""
    remote_path = '/test/to/file'
    mock_test_ssh_path.return_value = False

    result = utils.read_ssh_ini_config_file(
        self.mock_ssh, remote_path, check_exists=True)

    self.assertEqual(result, {})
    mock_test_ssh_path.assert_called_once_with(self.mock_ssh, remote_path)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.write_ssh_file')
@mock.patch('coriolis.utils.test_ssh_path')
@mock.patch.object(uuid, 'uuid4')
def test_write_systemd(self, mock_uuid, mock_test_ssh,
                       mock_write_ssh_file, mock_exec_ssh_cmd):
    """Unit file is written to /tmp, moved under /lib and then started.

    test_ssh_path answers True for the systemd directory and False for
    the service file itself.
    """
    ssh = self.mock_ssh
    mock_uuid.return_value = 'uuid'
    mock_test_ssh.side_effect = [True, False]
    mock_write_ssh_file.return_value = None

    utils._write_systemd(ssh, 'cmdline', 'svc_name')

    expected_path_checks = [
        mock.call(ssh, '/lib/systemd/system'),
        mock.call(ssh, '/lib/systemd/system/svc_name.service'),
    ]
    expected_commands = [
        mock.call(ssh, 'sudo mv /tmp/uuid.service '
                  '/lib/systemd/system/svc_name.service', get_pty=True),
        mock.call(ssh, 'sudo restorecon -v '
                  '/lib/systemd/system/svc_name.service', get_pty=True),
        mock.call(ssh, 'sudo systemctl daemon-reload', get_pty=True),
        mock.call(ssh, 'sudo systemctl start svc_name', get_pty=True),
    ]

    mock_uuid.assert_called_once_with()
    mock_test_ssh.assert_has_calls(expected_path_checks)
    mock_write_ssh_file.assert_called_once_with(
        ssh, '/tmp/uuid.service', mock.ANY)
    mock_exec_ssh_cmd.assert_has_calls(expected_commands)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.write_ssh_file')
@mock.patch('coriolis.utils.test_ssh_path')
@mock.patch.object(uuid, 'uuid4')
def test_write_systemd_usr_lib(self, mock_uuid, mock_test_ssh,
                               mock_write_ssh_file, mock_exec_ssh_cmd):
    """When /lib/systemd/system is absent the /usr/lib path is used."""
    ssh = self.mock_ssh
    mock_uuid.return_value = 'uuid'
    mock_test_ssh.side_effect = [False, False]
    mock_write_ssh_file.return_value = None

    utils._write_systemd(ssh, 'cmdline', 'svc_name')

    expected_path_checks = [
        mock.call(ssh, '/lib/systemd/system'),
        mock.call(ssh, '/usr/lib/systemd/system/svc_name.service'),
    ]
    expected_move = mock.call(
        ssh,
        'sudo mv /tmp/uuid.service '
        '/usr/lib/systemd/system/svc_name.service',
        get_pty=True)

    mock_test_ssh.assert_has_calls(expected_path_checks)
    mock_exec_ssh_cmd.assert_has_calls([expected_move])
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_write_systemd_service_exists(self, mock_test_ssh,
                                      mock_exec_ssh_cmd):
    """An existing unit file means only the start command is executed."""
    ssh = self.mock_ssh
    mock_test_ssh.return_value = True

    utils._write_systemd(ssh, 'cmdline', 'svc_name', start=True)

    expected_path_checks = [
        mock.call(ssh, '/lib/systemd/system'),
        mock.call(ssh, '/lib/systemd/system/svc_name.service'),
    ]
    mock_test_ssh.assert_has_calls(expected_path_checks)
    mock_exec_ssh_cmd.assert_called_once_with(
        ssh, 'sudo systemctl start svc_name', get_pty=True)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.write_ssh_file')
@mock.patch('coriolis.utils.test_ssh_path')
@mock.patch.object(uuid, 'uuid4')
def test_write_systemd_service_selinux_exception(self, mock_uuid,
                                                 mock_test_ssh,
                                                 mock_write_ssh_file,
                                                 mock_exec_ssh_cmd):
    """A failure of the second SSH command is logged, not propagated.

    The second exec_ssh_cmd invocation raises CoriolisException; the
    undecorated function must still return normally while emitting a log
    record of at least WARN level on the 'coriolis.utils' logger.
    """
    ssh = self.mock_ssh
    mock_uuid.return_value = 'uuid'
    mock_test_ssh.side_effect = [True, False]
    mock_write_ssh_file.return_value = None
    mock_exec_ssh_cmd.side_effect = [
        None, exception.CoriolisException(), None, None]

    # Call the wrapped function directly, bypassing its decorator.
    _write_systemd_undecorated = testutils.get_wrapped_function(
        utils._write_systemd)

    with self.assertLogs('coriolis.utils', level=logging.WARN):
        _write_systemd_undecorated(
            ssh, '/test/file', 'svc_name', start=True)

    expected_path_checks = [
        mock.call(ssh, '/lib/systemd/system'),
        mock.call(ssh, '/lib/systemd/system/svc_name.service'),
    ]
    mock_uuid.assert_called_once_with()
    mock_test_ssh.assert_has_calls(expected_path_checks)
    mock_write_ssh_file.assert_called_once_with(
        ssh, '/tmp/uuid.service', mock.ANY)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.write_ssh_file')
@mock.patch('coriolis.utils.test_ssh_path')
@mock.patch.object(uuid, 'uuid4')
def test_test_write_systemd_with_run_as(self, mock_uuid, mock_test_ssh,
                                        mock_write_ssh_file,
                                        mock_exec_ssh_cmd):
    """run_as is rendered into the unit file as the template username."""
    ssh = self.mock_ssh
    mock_uuid.return_value = 'uuid'
    mock_test_ssh.side_effect = [True, False]

    utils._write_systemd(ssh, 'cmdline', 'svc_name', run_as='test_user')

    expected_unit = utils.SYSTEMD_TEMPLATE % {
        "cmdline": 'cmdline',
        "username": 'test_user',
        "svc_name": 'svc_name'}
    expected_path_checks = [
        mock.call(ssh, '/lib/systemd/system'),
        mock.call(ssh, '/lib/systemd/system/svc_name.service'),
    ]
    expected_commands = [
        mock.call(ssh, 'sudo mv /tmp/uuid.service '
                  '/lib/systemd/system/svc_name.service', get_pty=True),
        mock.call(ssh, 'sudo restorecon -v '
                  '/lib/systemd/system/svc_name.service', get_pty=True),
        mock.call(ssh, 'sudo systemctl daemon-reload', get_pty=True),
        mock.call(ssh, 'sudo systemctl start svc_name', get_pty=True),
    ]

    mock_uuid.assert_called_once_with()
    mock_test_ssh.assert_has_calls(expected_path_checks)
    mock_write_ssh_file.assert_called_once_with(
        ssh, '/tmp/uuid.service', expected_unit)
    mock_exec_ssh_cmd.assert_has_calls(expected_commands)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.write_ssh_file')
@mock.patch('coriolis.utils.test_ssh_path')
@mock.patch.object(uuid, 'uuid4')
def test_write_upstart(self, mock_uuid, mock_test_ssh,
                       mock_write_ssh_file, mock_exec_ssh_cmd):
    """Config is written to /tmp, moved to /etc/init and then started."""
    ssh = self.mock_ssh
    mock_uuid.return_value = 'uuid'
    mock_test_ssh.return_value = False
    mock_write_ssh_file.return_value = None

    utils._write_upstart(ssh, 'cmdline', 'svc_name')

    expected_commands = [
        mock.call(ssh, 'sudo mv /tmp/uuid.conf '
                  '/etc/init/svc_name.conf', get_pty=True),
        mock.call(ssh, 'start svc_name'),
    ]
    mock_uuid.assert_called_once_with()
    mock_test_ssh.assert_called_once_with(ssh, '/etc/init/svc_name.conf')
    mock_write_ssh_file.assert_called_once_with(
        ssh, '/tmp/uuid.conf', mock.ANY)
    mock_exec_ssh_cmd.assert_has_calls(expected_commands)
@mock.patch('coriolis.utils.test_ssh_path')
def test_write_upstart_service_exists(self, mock_test_ssh):
    """Only the existence of the upstart config file is probed."""
    ssh = self.mock_ssh
    mock_test_ssh.return_value = True

    utils._write_upstart(ssh, 'cmdline', 'svc_name')

    mock_test_ssh.assert_called_once_with(ssh, '/etc/init/svc_name.conf')
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.write_ssh_file')
@mock.patch('coriolis.utils.test_ssh_path')
@mock.patch.object(uuid, 'uuid4')
def test_write_upstart_with_run_as(self, mock_uuid, mock_test_ssh,
                                   mock_write_ssh_file,
                                   mock_exec_ssh_cmd):
    """run_as is expected as a 'sudo -u' prefix on the command line."""
    ssh = self.mock_ssh
    mock_uuid.return_value = 'uuid'
    mock_test_ssh.return_value = False

    utils._write_upstart(ssh, 'cmdline', 'svc_name', run_as='test-user')

    expected_conf = utils.UPSTART_TEMPLATE % {
        "cmdline": 'sudo -u test-user -- cmdline',
        "svc_name": 'svc_name'}
    expected_commands = [
        mock.call(ssh, 'sudo mv /tmp/uuid.conf '
                  '/etc/init/svc_name.conf', get_pty=True),
        mock.call(ssh, 'start svc_name'),
    ]

    mock_uuid.assert_called_once_with()
    mock_test_ssh.assert_called_once_with(ssh, '/etc/init/svc_name.conf')
    mock_write_ssh_file.assert_called_once_with(
        ssh, '/tmp/uuid.conf', expected_conf)
    mock_exec_ssh_cmd.assert_has_calls(expected_commands)
@mock.patch('coriolis.utils._write_systemd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_create_service_systemd(self, mock_test_ssh, mock_write_systemd):
    """When the path probe succeeds the call goes to _write_systemd."""
    ssh = self.mock_ssh
    mock_test_ssh.return_value = True

    utils.create_service(
        ssh, 'cmdline', 'svc_name', run_as='user', start=True)

    mock_write_systemd.assert_called_once_with(
        ssh, 'cmdline', 'svc_name', run_as='user', start=True)
@mock.patch('coriolis.utils._write_upstart')
@mock.patch('coriolis.utils.test_ssh_path')
def test_create_service_upstart(self, mock_test_ssh, mock_write_upstart):
    """Two failed probes then a success route to _write_upstart."""
    ssh = self.mock_ssh
    mock_test_ssh.side_effect = [False, False, True]

    utils.create_service(
        ssh, 'cmdline', 'svc_name', run_as='user', start=True)

    mock_write_upstart.assert_called_once_with(
        ssh, 'cmdline', 'svc_name', run_as='user', start=True)
@mock.patch('coriolis.utils._write_systemd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_create_service_exception(self, mock_test_ssh, mock_write_systemd):
    """If every path probe fails, CoriolisException is raised."""
    mock_test_ssh.return_value = False
    # Exercise the wrapped function directly, bypassing its decorator.
    create_svc_undecorated = testutils.get_wrapped_function(
        utils.create_service)

    self.assertRaises(
        exception.CoriolisException,
        create_svc_undecorated,
        self.mock_ssh, 'cmdline', 'svc_name', run_as='user', start=True)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_restart_service_with_systemd(self, mock_test_ssh,
                                      mock_exec_ssh_cmd):
    """A present /lib/systemd/system leads to 'systemctl restart'."""
    ssh = self.mock_ssh
    mock_test_ssh.return_value = True

    utils.restart_service(ssh, 'svc_name')

    mock_test_ssh.assert_called_once_with(ssh, '/lib/systemd/system')
    mock_exec_ssh_cmd.assert_called_once_with(
        ssh, 'sudo systemctl restart svc_name', get_pty=True)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_restart_service_with_upstart(self, mock_test_ssh,
                                      mock_exec_ssh_cmd):
    """Two failed probes then a success lead to plain 'restart'."""
    ssh = self.mock_ssh
    mock_test_ssh.side_effect = [False, False, True]

    utils.restart_service(ssh, 'svc_name')

    mock_exec_ssh_cmd.assert_called_once_with(ssh, 'restart svc_name')
@mock.patch('coriolis.utils.test_ssh_path')
def test_restart_service_exception(self, mock_test_ssh):
    """If every probe fails, UnrecognizedWorkerInitSystem is raised."""
    mock_test_ssh.return_value = False
    # Exercise the wrapped function directly, bypassing its decorator.
    restart_svc_undecorated = testutils.get_wrapped_function(
        utils.restart_service)

    self.assertRaises(
        exception.UnrecognizedWorkerInitSystem,
        restart_svc_undecorated,
        self.mock_ssh, 'svc_name')
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_start_service_with_systemd(self, mock_test_ssh,
                                    mock_exec_ssh_cmd):
    """A present /lib/systemd/system leads to 'systemctl start'."""
    ssh = self.mock_ssh
    mock_test_ssh.return_value = True

    utils.start_service(ssh, 'svc_name')

    mock_test_ssh.assert_called_once_with(ssh, '/lib/systemd/system')
    mock_exec_ssh_cmd.assert_called_once_with(
        ssh, 'sudo systemctl start svc_name', get_pty=True)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_start_service_with_upstart(self, mock_test_ssh,
                                    mock_exec_ssh_cmd):
    """Two failed probes then a success lead to plain 'start'."""
    ssh = self.mock_ssh
    mock_test_ssh.side_effect = [False, False, True]

    utils.start_service(ssh, 'svc_name')

    mock_exec_ssh_cmd.assert_called_once_with(ssh, 'start svc_name')
@mock.patch('coriolis.utils.test_ssh_path')
def test_start_service_exception(self, mock_test_ssh):
    """If every probe fails, UnrecognizedWorkerInitSystem is raised."""
    mock_test_ssh.return_value = False
    # Exercise the wrapped function directly, bypassing its decorator.
    start_svc_undecorated = testutils.get_wrapped_function(
        utils.start_service)

    self.assertRaises(
        exception.UnrecognizedWorkerInitSystem,
        start_svc_undecorated,
        self.mock_ssh, 'svc_name')
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_stop_service_with_systemd(self, mock_test_ssh,
                                   mock_exec_ssh_cmd):
    """A present /lib/systemd/system leads to 'systemctl stop'."""
    ssh = self.mock_ssh
    mock_test_ssh.return_value = True

    utils.stop_service(ssh, 'svc_name')

    mock_test_ssh.assert_called_once_with(ssh, '/lib/systemd/system')
    mock_exec_ssh_cmd.assert_called_once_with(
        ssh, 'sudo systemctl stop svc_name', get_pty=True)
@mock.patch('coriolis.utils.exec_ssh_cmd')
@mock.patch('coriolis.utils.test_ssh_path')
def test_stop_service_with_upstart(self, mock_test_ssh,
                                   mock_exec_ssh_cmd):
    """Two failed probes then a success lead to plain 'stop'."""
    ssh = self.mock_ssh
    mock_test_ssh.side_effect = [False, False, True]

    utils.stop_service(ssh, 'svc_name')

    mock_exec_ssh_cmd.assert_called_once_with(ssh, 'stop svc_name')
@mock.patch('coriolis.utils.test_ssh_path')
def test_stop_service_exception(self, mock_test_ssh):
    """If every probe fails, UnrecognizedWorkerInitSystem is raised."""
    mock_test_ssh.return_value = False
    # Exercise the wrapped function directly, bypassing its decorator.
    stop_svc_undecorated = testutils.get_wrapped_function(
        utils.stop_service)

    self.assertRaises(
        exception.UnrecognizedWorkerInitSystem,
        stop_svc_undecorated,
        self.mock_ssh, 'svc_name')
- def _get_mock_conn_info(self):
- return {
- "ip": "1.2.3.4",
- "port": 2222,
- "username": "someUser",
- "password": "pwned",
- "pkey": mock.Mock(),
- }
@mock.patch.object(utils, "connect_ssh")
@mock.patch.object(utils, "exec_ssh_cmd")
@mock.patch("time.sleep")
def test_poll_instance_ssh(
    self,
    mock_sleep,
    mock_exec_ssh,
    mock_connect,
):
    """Polling over SSH retries after connect and command failures.

    The first connect attempt raises, the first command execution raises,
    and the second command execution succeeds.
    """
    poll_interval = 5
    connection_info = self._get_mock_conn_info()
    mock_ssh_conn = mock.Mock()
    mock_connect.side_effect = [Exception, mock_ssh_conn, mock_ssh_conn]
    mock_exec_ssh.side_effect = [Exception, mock.sentinel.stdout]

    utils.poll_instance_until_reachable(
        connection_info=connection_info,
        protocol=constants.PROTOCOL_SSH,
        timeout=30,
        poll_interval=poll_interval,
    )

    expected_connect = mock.call(
        hostname=connection_info["ip"],
        port=connection_info["port"],
        username=connection_info["username"],
        password=connection_info["password"],
        pkey=connection_info["pkey"],
    )
    expected_exec = mock.call(mock_ssh_conn, "exit 0")

    mock_connect.assert_has_calls([expected_connect] * 2)
    mock_exec_ssh.assert_has_calls([expected_exec] * 2)
    mock_ssh_conn.close.assert_has_calls([mock.call()] * 2)
    mock_sleep.assert_has_calls([mock.call(poll_interval)] * 2)
@mock.patch.object(utils, "connect_ssh")
@mock.patch.object(utils, "exec_ssh_cmd")
@mock.patch("time.sleep")
@mock.patch("time.time")
def test_poll_instance_ssh_timeout(
    self,
    mock_time,
    mock_sleep,
    mock_exec_ssh,
    mock_connect,
):
    """Persistent connect failures end in CoriolisException.

    time.time is mocked to advance by ten per call, so the 30 timeout
    elapses without any real waiting.
    """
    poll_interval = 5
    connection_info = self._get_mock_conn_info()
    mock_time.side_effect = list(range(0, 200, 10))
    mock_connect.side_effect = IOError

    self.assertRaises(
        exception.CoriolisException,
        utils.poll_instance_until_reachable,
        connection_info=connection_info,
        protocol=constants.PROTOCOL_SSH,
        timeout=30,
        poll_interval=poll_interval,
    )
@mock.patch("coriolis.wsman.WSManConnection", new_callable=mock.Mock)
@mock.patch("time.sleep")
def test_poll_instance_winrm(
    self,
    mock_sleep,
    mock_wsman,
):
    """Polling over WinRM retries once after a failed command."""
    poll_interval = 5
    connection_info = self._get_mock_conn_info()
    mock_conn = mock.Mock()
    # First command attempt fails, second one succeeds.
    mock_conn.exec_ps_command.side_effect = [
        Exception, mock.sentinel.stdout]
    mock_wsman.from_connection_info.return_value = mock_conn

    utils.poll_instance_until_reachable(
        connection_info=connection_info,
        protocol=constants.PROTOCOL_WINRM,
        timeout=30,
        poll_interval=poll_interval,
    )

    expected_factory_calls = [mock.call(connection_info)] * 2
    expected_commands = [mock.call("whoami")] * 2

    mock_wsman.from_connection_info.assert_has_calls(
        expected_factory_calls, any_order=True)
    mock_conn.exec_ps_command.assert_has_calls(expected_commands)
    mock_sleep.assert_called_once_with(poll_interval)
@mock.patch("coriolis.wsman.WSManConnection", new_callable=mock.Mock)
@mock.patch("time.sleep")
@mock.patch("time.time")
def test_poll_instance_winrm_timeout(
    self,
    mock_time,
    mock_sleep,
    mock_wsman,
):
    """Persistent command failures end in CoriolisException.

    time.time is mocked to advance by ten per call, so the 30 timeout
    elapses without any real waiting.
    """
    poll_interval = 5
    connection_info = self._get_mock_conn_info()
    mock_time.side_effect = list(range(0, 200, 10))

    mock_conn = mock.Mock()
    mock_conn.exec_ps_command.side_effect = IOError
    mock_wsman.from_connection_info.return_value = mock_conn

    self.assertRaises(
        exception.CoriolisException,
        utils.poll_instance_until_reachable,
        connection_info=connection_info,
        protocol=constants.PROTOCOL_WINRM,
        timeout=30,
        poll_interval=poll_interval,
    )
@ddt.ddt
class Grub2ConfigEditorTestCase(test_base.CoriolisBaseTestCase):
    """Test suite for the Coriolis Grub2ConfigEditor class."""

    def setUp(self):
        """Create an editor over a one-line placeholder configuration."""
        super().setUp()
        self.cfg = "test configuration"
        self.parser = utils.Grub2ConfigEditor(self.cfg)

    def test__init__(self):
        """Construction stores the parsed config as a single raw entry."""
        editor = utils.Grub2ConfigEditor(self.cfg)
        expected = [{'type': 'raw', 'payload': self.cfg}]
        self.assertEqual(editor._parsed, expected)

    def test_parse_cfg_comment_line(self):
        """A comment line is kept as a raw entry."""
        self.cfg = '# This is a comment'
        expected = [{'type': 'raw', 'payload': self.cfg}]
        parsed = self.parser._parse_cfg(self.cfg)
        self.assertEqual(parsed, expected)

    def test_parse_cfg_option_line_with_quoted_value(self):
        """A double-quoted value is flagged quoted and stored unquoted."""
        self.cfg = 'option="value"'
        expected_entry = {
            'type': 'option',
            'payload': self.cfg,
            'quoted': True,
            'option_name': 'option',
            'option_value': [{'opt_type': 'single', 'opt_val': 'value'}],
        }
        parsed = self.parser._parse_cfg(self.cfg)
        self.assertEqual(parsed, [expected_entry])

    def test_parse_cfg_option_line_without_value(self):
        """An option with nothing after '=' gets one empty single value."""
        self.cfg = 'option='
        expected_entry = {
            'type': 'option',
            'payload': self.cfg,
            'quoted': False,
            'option_name': 'option',
            'option_value': [{'opt_type': 'single', 'opt_val': ''}],
        }
        parsed = self.parser._parse_cfg(self.cfg)
        self.assertEqual(parsed, [expected_entry])

    def test_parse_cfg_option_line_with_value(self):
        """An unquoted value becomes one single-type value."""
        self.cfg = 'option=value'
        expected_entry = {
            'type': 'option',
            'payload': self.cfg,
            'quoted': False,
            'option_name': 'option',
            'option_value': [{'opt_type': 'single', 'opt_val': 'value'}],
        }
        parsed = self.parser._parse_cfg(self.cfg)
        self.assertEqual(parsed, [expected_entry])

    def test_parse_cfg_option_line_with_multiple_values(self):
        """Space-separated values become separate single-type values."""
        self.cfg = 'option=value1 value2'
        expected_entry = {
            'type': 'option',
            'payload': self.cfg,
            'quoted': False,
            'option_name': 'option',
            'option_value': [
                {'opt_type': 'single', 'opt_val': 'value1'},
                {'opt_type': 'single', 'opt_val': 'value2'},
            ],
        }
        parsed = self.parser._parse_cfg(self.cfg)
        self.assertEqual(parsed, [expected_entry])

    def test_parse_cfg_option_line_with_key_value(self):
        """A value of the form key=value becomes a key_val-type value."""
        self.cfg = 'option=key=value'
        expected_entry = {
            'type': 'option',
            'payload': self.cfg,
            'quoted': False,
            'option_name': 'option',
            'option_value': [
                {'opt_type': 'key_val', 'opt_val': 'value',
                 'opt_key': 'key'},
            ],
        }
        parsed = self.parser._parse_cfg(self.cfg)
        self.assertEqual(parsed, [expected_entry])

    @ddt.data(
        ("not a dict", ValueError),
        ({"opt_type": "invalid"}, ValueError),
        ({"opt_type": "key_val"}, ValueError),
        ({"opt_type": "single"}, ValueError),
        ({"unknown_opt_type"}, ValueError),
        ({"opt_type": "key_val", "opt_key": "key", "opt_val": "val"}, None),
        ({"opt_type": "single", "opt_val": "val"}, None),
    )
    @ddt.unpack
    def test_validate_value(self, value, expected):
        """Invalid values raise the expected error; valid ones pass."""
        if not expected:
            # Valid input: the call must simply not raise.
            self.parser._validate_value(value)
            return
        self.assertRaises(expected, self.parser._validate_value, value)

    def test_set_option_updates_existing_option(self):
        """Setting a known option replaces its value list."""
        new_value = {
            "opt_type": "key_val", "opt_key": "key", "opt_val": "new_value"}
        self.parser._parsed = [
            {"option_name": "existing_option",
             "option_value": ["old_value"]},
        ]

        self.parser.set_option("existing_option", new_value)

        expected = [
            {"option_name": "existing_option",
             "option_value": [new_value]},
        ]
        self.assertEqual(self.parser._parsed, expected)

    def test_set_option_adds_new_option(self):
        """Setting an unknown option appends a quoted option entry."""
        new_value = {
            "opt_type": "key_val", "opt_key": "key",
            "opt_val": "new_value", "quoted": True, "type": "option"}
        self.parser._parsed = [
            {"option_name": "existing_option",
             "option_value": ["old_value"]},
        ]

        self.parser.set_option("new_option", new_value)

        expected = [
            {"option_name": "existing_option",
             "option_value": ["old_value"]},
            {"option_name": "new_option",
             "option_value": [new_value],
             "quoted": True, "type": "option"},
        ]
        self.assertEqual(self.parser._parsed, expected)

    def test_append_to_option_updates_existing_key_val_option(self):
        """Appending a key_val with a known key replaces the old value."""
        new_value = {
            "opt_type": "key_val", "opt_key": "key", "opt_val": "new_value"}
        self.parser._parsed = [
            {"option_name": "existing_option",
             "option_value": [{"opt_type": "key_val",
                               "opt_key": "key",
                               "opt_val": "old_value"}]},
        ]

        self.parser.append_to_option("existing_option", new_value)

        expected = [
            {"option_name": "existing_option",
             "option_value": [new_value]},
        ]
        self.assertEqual(self.parser._parsed, expected)

    def test_append_to_option_ignores_existing_single_option(self):
        """Appending an already-present single value adds no duplicate."""
        new_value = {"opt_type": "single", "opt_val": "old_value"}
        self.parser._parsed = [
            {"option_name": "existing_option",
             "option_value": [{"opt_type": "single",
                               "opt_val": "old_value"}]},
        ]

        self.parser.append_to_option("existing_option", new_value)

        expected = [
            {"option_name": "existing_option",
             "option_value": [new_value]},
        ]
        self.assertEqual(self.parser._parsed, expected)

    def test_append_to_option_adds_new_single_option(self):
        """Appending an unseen single value extends the value list."""
        new_value = {"opt_type": "single", "opt_val": "new_value"}
        self.parser._parsed = [
            {"option_name": "existing_option",
             "option_value": [{"opt_type": "single",
                               "opt_val": "old_value"}]},
        ]

        self.parser.append_to_option("existing_option", new_value)

        expected = [
            {"option_name": "existing_option",
             "option_value": [
                 {"opt_type": "single", "opt_val": "old_value"},
                 new_value]},
        ]
        self.assertEqual(self.parser._parsed, expected)

    def test_append_to_option_adds_new_option(self):
        """Appending to an unknown option creates a quoted option entry."""
        new_value = {
            "opt_type": "key_val", "opt_key": "key", "opt_val": "new_value"}
        self.parser._parsed = [
            {"option_name": "existing_option",
             "option_value": [{"opt_type": "single",
                               "opt_val": "old_value"}]},
        ]

        self.parser.append_to_option("new_option", new_value)

        expected = [
            {"option_name": "existing_option",
             "option_value": [
                 {"opt_type": "single", "opt_val": "old_value"}]},
            {"option_name": "new_option",
             "option_value": [new_value],
             "quoted": True, "type": "option"},
        ]
        self.assertEqual(self.parser._parsed, expected)

    @ddt.data(
        (
            [{"type": "raw", "payload": "raw_data"}],
            "raw_data\n"
        ),
        (
            [{"type": "option", "option_name": "option1",
              "option_value": [
                  {"opt_type": "single", "opt_val": "value1"}],
              "quoted": False}],
            "option1=value1\n"
        ),
        (
            [{"type": "option", "option_name": "option2",
              "option_value": [
                  {"opt_type": "key_val", "opt_key": "key2",
                   "opt_val": "value2"}],
              "quoted": True}],
            "option2=\"key2=value2\"\n"
        ),
        (
            [{"type": "option", "option_name": "option3",
              "option_value": [],
              "quoted": False}],
            "option3=\n"
        ),
        (
            [{"type": "option", "option_name": "option4",
              "option_value": [
                  {"opt_type": "single", "opt_val": "value4_1"},
                  {"opt_type": "single", "opt_val": "value4_2"}],
              "quoted": False}],
            "option4=\"value4_1 value4_2\"\n"
        ),
    )
    @ddt.unpack
    def test_dump(self, parsed, expected_output):
        """Dumping a parsed structure yields the expected config text."""
        self.parser._parsed = parsed
        dumped = self.parser.dump()
        self.assertEqual(dumped, expected_output)
|