test_utils.py 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599
  1. # Copyright 2023 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import datetime
  4. import hashlib
  5. import json
  6. import logging
  7. import os
  8. import socket
  9. from unittest import mock
  10. import uuid
  11. import ddt
  12. from webob import exc
  13. from coriolis import constants
  14. from coriolis import exception
  15. from coriolis.tests import test_base
  16. from coriolis.tests import testutils
  17. from coriolis import utils
  class CoriolisTestException(Exception):
      """Dummy exception type raised by mocks in the retry tests."""
  20. @ddt.ddt
  21. class UtilsTestCase(test_base.CoriolisBaseTestCase):
  22. """Test suite for the Coriolis utils module."""
  def setUp(self):
      """Attach a fresh ``mock.Mock`` for every double used by the tests."""
      super(UtilsTestCase, self).setUp()
      double_names = (
          'mock_func', 'mock_ssh', 'mock_sftp', 'mock_file',
          'mock_conn', 'mock_stdout', 'mock_process',
      )
      for attr_name in double_names:
          setattr(self, attr_name, mock.Mock())
  @mock.patch('oslo_log.log.setup')
  def test_setup_logging(self, mock_setup):
      """setup_logging() calls oslo_log's setup once with CONF/'coriolis'."""
      utils.setup_logging()

      mock_setup.assert_called_once_with(utils.CONF, 'coriolis')
  @mock.patch.object(utils, 'get_exception_details')
  def test_ignore_exceptions(self, mock_get_details):
      """A raising callable does not propagate and a log record is emitted.

      The wrapped call must produce at least one 'coriolis.utils' record at
      WARN level or above and query the exception details exactly once.
      """
      self.mock_func.side_effect = Exception
      mock_get_details.return_value = 'Test exception details'

      with self.assertLogs('coriolis.utils', level=logging.WARN):
          guarded = utils.ignore_exceptions(self.mock_func)
          guarded()

      mock_get_details.assert_called_once_with()
  def test_get_single_result_empty_list(self):
      """An empty list makes get_single_result raise KeyError."""
      with self.assertRaises(KeyError):
          utils.get_single_result([])
  def test_get_single_result_multiple_elements(self):
      """More than one element makes get_single_result raise KeyError."""
      with self.assertRaises(KeyError):
          utils.get_single_result([1, 2])
  def test_get_single_result_single_element(self):
      """A one-element list yields that element."""
      only_item = utils.get_single_result([1])

      self.assertEqual(only_item, 1)
  def test_retry_on_error_no_exception(self):
      """A successful call runs once and its result is passed through."""
      retrying = utils.retry_on_error(
          max_attempts=5, sleep_seconds=0, terminal_exceptions=[])
      wrapped = retrying(self.mock_func)

      result = wrapped(mock.sentinel.arg1, kwarg1=mock.sentinel.kwarg1)

      self.assertEqual(result, self.mock_func.return_value)
      self.assertEqual(self.mock_func.call_count, 1)
      self.mock_func.assert_called_with(
          mock.sentinel.arg1, kwarg1=mock.sentinel.kwarg1)
  def test_retry_on_error_exception_keyboard_interrupt(self):
      """KeyboardInterrupt propagates after a single attempt."""
      self.mock_func.side_effect = KeyboardInterrupt
      wrapped = utils.retry_on_error(
          max_attempts=5, sleep_seconds=0,
          terminal_exceptions=[])(self.mock_func)

      with self.assertRaises(KeyboardInterrupt):
          wrapped()

      self.assertEqual(self.mock_func.call_count, 1)
  def test_retry_on_error_not_in_list_of_retried(self):
      """An exception outside ``retried_exceptions`` is raised at once."""
      self.mock_func.side_effect = ValueError
      wrapped = utils.retry_on_error(
          max_attempts=5, sleep_seconds=0,
          terminal_exceptions=[],
          retried_exceptions=(CoriolisTestException, ))(self.mock_func)

      with self.assertRaises(ValueError):
          wrapped()

      self.assertEqual(self.mock_func.call_count, 1)
  def test_retry_on_error_in_list_of_retried(self):
      """An exception in ``retried_exceptions`` is retried max_attempts times."""
      self.mock_func.side_effect = CoriolisTestException
      wrapped = utils.retry_on_error(
          max_attempts=5, sleep_seconds=0,
          terminal_exceptions=[],
          retried_exceptions=(CoriolisTestException, ))(self.mock_func)

      with self.assertRaises(CoriolisTestException):
          wrapped()

      self.assertEqual(self.mock_func.call_count, 5)
  def test_retry_on_error_terminal_exception(self):
      """An exception listed as terminal is raised after one attempt."""
      self.mock_func.side_effect = CoriolisTestException
      wrapped = utils.retry_on_error(
          max_attempts=5, sleep_seconds=0,
          terminal_exceptions=[CoriolisTestException])(self.mock_func)

      with self.assertRaises(CoriolisTestException):
          wrapped()

      self.assertEqual(self.mock_func.call_count, 1)
  def test_retry_on_error_exception_retry_max_attempts(self):
      """With default retried exceptions, the call is attempted five times."""
      self.mock_func.side_effect = CoriolisTestException
      wrapped = utils.retry_on_error(
          max_attempts=5, sleep_seconds=0,
          terminal_exceptions=[])(self.mock_func)

      with self.assertRaises(CoriolisTestException):
          wrapped()

      self.assertEqual(self.mock_func.call_count, 5)
  def test_get_udev_net_rules(self):
      """One udev rule line per interface, with the MAC lower-cased."""
      net_ifaces_info = {
          "eth0": "AA:BB:CC:DD:EE:FF",
          "eth1": "FF:EE:DD:CC:BB:AA",
      }
      eth0_rule = (
          'SUBSYSTEM=="net", ACTION=="add", DRIVERS=="?*", '
          'ATTR{address}=="aa:bb:cc:dd:ee:ff", '
          'NAME="eth0"\n'
      )
      eth1_rule = (
          'SUBSYSTEM=="net", ACTION=="add", DRIVERS=="?*", '
          'ATTR{address}=="ff:ee:dd:cc:bb:aa", '
          'NAME="eth1"\n'
      )
      expected_result = eth0_rule + eth1_rule

      result = utils.get_udev_net_rules(net_ifaces_info)

      self.assertEqual(result, expected_result)
  @mock.patch.object(utils, 'exec_ssh_cmd')
  def test_parse_os_release(self, mock_ssh_cmd):
      """ID and VERSION_ID are extracted; quotes around the version go."""
      expected_cmd = (
          "[ -f '/etc/os-release' ] && cat /etc/os-release || true")
      mock_ssh_cmd.return_value = 'ID=ubuntu\nVERSION_ID="20.04"\n'

      result = utils.parse_os_release(self.mock_ssh)

      mock_ssh_cmd.assert_called_once_with(self.mock_ssh, expected_cmd)
      self.assertEqual(result, ('ubuntu', '20.04'))
  @mock.patch.object(utils, 'exec_ssh_cmd')
  def test_parse_os_release_no_equal(self, mock_ssh_cmd):
      """A line without '=' does not affect the parsed ID/version pair."""
      expected_cmd = (
          "[ -f '/etc/os-release' ] && cat /etc/os-release || true")
      mock_ssh_cmd.return_value = 'ID=ubuntu\nVERSION_ID="20.04"\nNOEQUAL\n'

      result = utils.parse_os_release(self.mock_ssh)

      mock_ssh_cmd.assert_called_once_with(self.mock_ssh, expected_cmd)
      self.assertEqual(result, ("ubuntu", "20.04"))
  @mock.patch.object(utils, 'exec_ssh_cmd')
  def test_parse_os_release_missing_fields(self, mock_ssh_cmd):
      """Output lacking ID and VERSION_ID makes the parser return None."""
      expected_cmd = (
          "[ -f '/etc/os-release' ] && cat /etc/os-release || true")
      mock_ssh_cmd.return_value = 'NO_ID\nNO_VERSION_ID\n'

      result = utils.parse_os_release(self.mock_ssh)

      mock_ssh_cmd.assert_called_once_with(self.mock_ssh, expected_cmd)
      self.assertIsNone(result)
  @mock.patch.object(utils, 'exec_ssh_cmd')
  def test_parse_lsb_release(self, mock_ssh_cmd):
      """'Distributor ID' and 'Release' are returned as a 2-tuple."""
      lsb_output = 'Distributor ID: Ubuntu\nRelease: 20.04\n'
      mock_ssh_cmd.return_value = lsb_output

      result = utils.parse_lsb_release(self.mock_ssh)

      mock_ssh_cmd.assert_called_once_with(
          self.mock_ssh, "lsb_release -a || true")
      self.assertEqual(result, ("Ubuntu", "20.04"))
  @mock.patch.object(utils, 'exec_ssh_cmd')
  def test_parse_lsb_release_missing_fields(self, mock_ssh_cmd):
      """Output without the expected fields makes the parser return None."""
      lsb_output = 'No Distributor ID\nNo Release\n'
      mock_ssh_cmd.return_value = lsb_output

      result = utils.parse_lsb_release(self.mock_ssh)

      mock_ssh_cmd.assert_called_once_with(
          self.mock_ssh, "lsb_release -a || true")
      self.assertIsNone(result)
  @mock.patch.object(utils, 'parse_os_release')
  def test_get_linux_os_info(self, mock_parse_os_release):
      """The os-release result is returned when the parser provides one."""
      os_info = utils.get_linux_os_info(self.mock_ssh)

      mock_parse_os_release.assert_called_once_with(self.mock_ssh)
      self.assertEqual(os_info, mock_parse_os_release.return_value)
  @mock.patch.object(utils, 'parse_os_release')
  @mock.patch.object(utils, 'parse_lsb_release')
  def test_get_linux_os_info_lsb_release(self, mock_parse_lsb_release,
                                         mock_parse_os_release):
      """When os-release parsing yields None, lsb_release is consulted."""
      lsb_info = ("ubuntu", "20.04")
      mock_parse_os_release.return_value = None
      mock_parse_lsb_release.return_value = lsb_info

      result = utils.get_linux_os_info(self.mock_ssh)

      mock_parse_os_release.assert_called_once_with(self.mock_ssh)
      mock_parse_lsb_release.assert_called_once_with(self.mock_ssh)
      self.assertEqual(result, ("ubuntu", "20.04"))
  def test_test_ssh_path_exists(self):
      """A successful SFTP stat() makes test_ssh_path return True."""
      remote_path = "/test/file"
      self.mock_ssh.open_sftp.return_value = self.mock_sftp
      self.mock_sftp.stat.return_value = None

      result = utils.test_ssh_path(self.mock_ssh, remote_path)

      self.mock_ssh.open_sftp.assert_called_once_with()
      self.mock_sftp.stat.assert_called_once_with("/test/file")
      self.assertEqual(result, True)
  def test_test_ssh_path_not_exists(self):
      """An IOError with errno 2 from stat() is reported as False."""
      remote_path = "/nonexistent/path"
      self.mock_ssh.open_sftp.return_value = self.mock_sftp
      self.mock_sftp.stat.side_effect = IOError(
          2, "No such file or directory")

      result = utils.test_ssh_path(self.mock_ssh, remote_path)

      self.mock_ssh.open_sftp.assert_called_once_with()
      self.mock_sftp.stat.assert_called_once_with("/nonexistent/path")
      self.assertEqual(result, False)
  def test_test_ssh_path_raises(self):
      """An IOError with a different errno propagates to the caller.

      The function is unwrapped via testutils.get_wrapped_function and the
      undecorated callable is exercised directly.
      """
      self.mock_ssh.open_sftp.return_value = self.mock_sftp
      self.mock_sftp.stat.side_effect = IOError(1, "Some other error")
      undecorated = testutils.get_wrapped_function(utils.test_ssh_path)

      with self.assertRaises(IOError):
          undecorated(self.mock_ssh, "/nonexistent/path")
  def test_read_ssh_file(self):
      """The file is opened over SFTP in 'rb' mode and its read() returned."""
      self.mock_ssh.open_sftp.return_value = self.mock_sftp
      self.mock_sftp.open.return_value = self.mock_file

      content = utils.read_ssh_file(self.mock_ssh, "/test/file")

      self.mock_ssh.open_sftp.assert_called_once_with()
      self.mock_sftp.open.assert_called_once_with("/test/file", "rb")
      self.assertEqual(content, self.mock_file.read.return_value)
  def test_write_ssh_file(self):
      """The file is opened over SFTP in 'wb' mode and the payload written."""
      payload = b"file content"
      self.mock_ssh.open_sftp.return_value = self.mock_sftp
      self.mock_sftp.open.return_value = self.mock_file

      utils.write_ssh_file(self.mock_ssh, "/test/file", payload)

      self.mock_ssh.open_sftp.assert_called_once_with()
      self.mock_sftp.open.assert_called_once_with("/test/file", "wb")
      self.mock_file.write.assert_called_once_with(b"file content")
  @mock.patch('base64.b64encode')
  def test_write_winrm_file(self, mock_b64encode):
      """Overwriting an existing file issues two PowerShell commands."""
      conn = self.mock_conn
      conn.test_path.return_value = True
      conn.exec_ps_command.return_value = None

      utils.write_winrm_file(
          conn, "/test/file", "file content", overwrite=True)

      conn.test_path.assert_called_once_with("/test/file")
      self.assertEqual(conn.exec_ps_command.call_count, 2)
      mock_b64encode.assert_called_once_with(b"file content")
  @mock.patch('base64.b64encode')
  def test_write_winrm_file_file_does_not_exist(self, mock_b64encode):
      """Writing to a path that test_path reports as absent still encodes."""
      conn = self.mock_conn
      conn.test_path.return_value = False
      conn.exec_ps_command.return_value = None

      utils.write_winrm_file(
          conn, "/nonexistent/path", "nonexistent-file", overwrite=True)

      conn.test_path.assert_called_once_with("/nonexistent/path")
      mock_b64encode.assert_called_once_with(b"nonexistent-file")
  def test_write_winrm_file_file_exists_overwrite_false(self):
      """An existing file with overwrite=False raises CoriolisException."""
      conn = self.mock_conn
      conn.test_path.return_value = True
      conn.exec_ps_command.return_value = None

      with self.assertRaises(exception.CoriolisException):
          utils.write_winrm_file(
              conn, "/test/file", "file content", overwrite=False)

      conn.test_path.assert_called_once_with("/test/file")
  @mock.patch('base64.b64encode')
  def test_write_winrm_file_content_is_not_string(self, mock_b64encode):
      """Bytes content is handed to b64encode unchanged.

      The content passed here is ``bytes`` so that the non-``str`` path
      named by this test is actually exercised; passing a ``str`` would
      only repeat the string-content tests.
      """
      # NOTE(review): assumes write_winrm_file only encodes ``str`` chunks
      # and forwards ``bytes`` as-is -- confirm against coriolis.utils.
      payload = b"file content"
      self.mock_conn.test_path.return_value = False
      self.mock_conn.exec_ps_command.return_value = None

      utils.write_winrm_file(self.mock_conn, "/test/file", payload,
                             overwrite=True)

      self.mock_conn.test_path.assert_called_once_with("/test/file")
      mock_b64encode.assert_called_once_with(b"file content")
  @mock.patch('base64.b64encode')
  def test_write_winrm_file_long_content(self, mock_b64encode):
      """3000 characters are encoded as a 2048-char chunk plus the rest."""
      conn = self.mock_conn
      conn.test_path.return_value = False
      conn.exec_ps_command.return_value = None
      mock_b64encode.return_value = b'encoded_content'
      long_content = "a" * 3000
      first_chunk = long_content[:2048].encode()
      second_chunk = long_content[2048:].encode()

      utils.write_winrm_file(
          conn, "/test/file", long_content, overwrite=True)

      conn.test_path.assert_called_once_with("/test/file")
      self.assertEqual(conn.exec_ps_command.call_count, 2)
      mock_b64encode.assert_has_calls(
          [mock.call(first_chunk), mock.call(second_chunk)])
  def test_list_ssh_dir(self):
      """The SFTP listdir() result for the given path is returned."""
      self.mock_ssh.open_sftp.return_value = self.mock_sftp

      entries = utils.list_ssh_dir(self.mock_ssh, "/test/file")

      self.assertEqual(entries, self.mock_sftp.listdir.return_value)
      self.mock_ssh.open_sftp.assert_called_once_with()
      self.mock_sftp.listdir.assert_called_once_with("/test/file")
  def test_exec_ssh_cmd(self):
      """Output is decoded to str and '\\r\\n' comes back as '\\n'."""
      stream = self.mock_stdout
      stream.read.return_value = b'output\r\n'
      stream.channel.recv_exit_status.return_value = 0
      # The same mock serves as both stdout and stderr.
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      result = utils.exec_ssh_cmd(self.mock_ssh, "command")

      self.mock_ssh.exec_command.assert_called_once_with(
          "command", environment=None, get_pty=False, timeout=None)
      self.assertEqual(result, 'output\n')
  def test_exec_ssh_cmd_timeout_with_timeout(self):
      """An explicit timeout is forwarded to exec_command."""
      stream = self.mock_stdout
      stream.read.return_value = b'output\n'
      stream.channel.recv_exit_status.return_value = 0
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      result = utils.exec_ssh_cmd(self.mock_ssh, "command", timeout=10)

      self.mock_ssh.exec_command.assert_called_once_with(
          "command", environment=None, get_pty=False, timeout=10.0)
      raw_output = stream.read.return_value
      expected = raw_output.decode('utf-8', errors='replace')
      self.assertEqual(result, expected)
  def test_exec_ssh_cmd_getpeername_value_error(self):
      """A ValueError from getpeername() is logged, not raised."""
      stream = self.mock_stdout
      stream.read.return_value = b'output\n'
      stream.channel.recv_exit_status.return_value = 0
      self.mock_ssh.exec_command.return_value = (None, stream, stream)
      transport_sock = self.mock_ssh.get_transport.return_value.sock
      transport_sock.getpeername.side_effect = ValueError

      with self.assertLogs('coriolis.utils', level=logging.WARN):
          output = utils.exec_ssh_cmd(self.mock_ssh, "command")

      self.mock_ssh.exec_command.assert_called_once_with(
          "command", environment=None, get_pty=False, timeout=None)
      raw_output = stream.read.return_value
      expected = raw_output.decode('utf-8', errors='replace')
      self.assertEqual(output, expected)
  def test_exec_ssh_cmd_timeout(self):
      """socket.timeout while reading becomes MinionMachineCommandTimeout."""
      stream = self.mock_stdout
      stream.read.side_effect = socket.timeout
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      with self.assertRaises(exception.MinionMachineCommandTimeout):
          utils.exec_ssh_cmd(self.mock_ssh, "command")

      self.mock_ssh.exec_command.assert_called_once_with(
          "command", environment=None, get_pty=False, timeout=None)
  def test_exec_ssh_cmd_exception(self):
      """A non-zero exit status raises SSHCommandFailed."""
      stream = self.mock_stdout
      stream.read.return_value = b'some error output'
      stream.channel.recv_exit_status.return_value = 1
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      with self.assertRaises(exception.SSHCommandFailed):
          utils.exec_ssh_cmd(self.mock_ssh, "command")

      self.mock_ssh.exec_command.assert_called_once_with(
          "command", environment=None, get_pty=False, timeout=None)
  def test_exec_ssh_cmd_command_not_found_in_stdout(self):
      """'command not found' output raises SSHCommandNotFoundException."""
      stream = self.mock_stdout
      stream.read.return_value = b'sudo: foo: command not found'
      stream.channel.recv_exit_status.return_value = 1
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      with self.assertRaises(exception.SSHCommandNotFoundException):
          utils.exec_ssh_cmd(self.mock_ssh, "command")
  def test_exec_ssh_cmd_exit_code_127(self):
      """Exit status 127 with empty output raises the not-found error."""
      stream = self.mock_stdout
      stream.read.return_value = b''
      stream.channel.recv_exit_status.return_value = 127
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      with self.assertRaises(exception.SSHCommandNotFoundException):
          utils.exec_ssh_cmd(self.mock_ssh, "command")
  def test_exec_ssh_cmd_chroot(self):
      """The command is prefixed with 'sudo -E chroot' and the chroot dir."""
      stream = self.mock_stdout
      stream.read.return_value = b'output\n'
      stream.channel.recv_exit_status.return_value = 0
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      result = utils.exec_ssh_cmd_chroot(
          self.mock_ssh, "/chroot /bin/bash -c", "command")

      self.mock_ssh.exec_command.assert_called_once_with(
          "sudo -E chroot /chroot /bin/bash -c command",
          environment=None, get_pty=False, timeout=None)
      raw_output = stream.read.return_value
      expected = raw_output.decode('utf-8', errors='replace')
      self.assertEqual(result, expected)
  def test_check_fs(self):
      """check_fs runs 'sudo fsck -p -t <fs> <dev>' with a PTY."""
      stream = self.mock_stdout
      raw_output = stream.read.return_value
      # Make replace() on the mocked output hand back the same mock.
      raw_output.replace.return_value = raw_output
      stream.channel.recv_exit_status.return_value = 0
      self.mock_ssh.exec_command.return_value = (None, stream, stream)

      utils.check_fs(self.mock_ssh, "ext4", "/dev/sda1")

      self.mock_ssh.exec_command.assert_called_once_with(
          "sudo fsck -p -t ext4 /dev/sda1", environment=None, get_pty=True,
          timeout=None)
  @mock.patch.object(utils, 'exec_ssh_cmd')
  def test_check_fs_exception(self, mock_exec_ssh_cmd):
      """A CoriolisException from the SSH command propagates from check_fs."""
      mock_exec_ssh_cmd.side_effect = exception.CoriolisException()

      with self.assertRaises(exception.CoriolisException):
          utils.check_fs(self.mock_ssh, "ext4", "/dev/sda1")

      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, "sudo fsck -p -t ext4 /dev/sda1", get_pty=True)
  @mock.patch.object(utils, 'exec_ssh_cmd')
  def test_run_xfs_repair(self, mock_exec_ssh_cmd):
      """mktemp, mount, umount and xfs_repair are issued in that order."""
      ssh = self.mock_ssh
      mock_exec_ssh_cmd.return_value = "/tmp/tmp_dir\n"

      utils.run_xfs_repair(ssh, "/dev/sda1")

      mktemp_call = mock.call(ssh, "mktemp -d")
      mount_call = mock.call(
          ssh, "sudo mount /dev/sda1 /tmp/tmp_dir", get_pty=True)
      umount_call = mock.call(
          ssh, "sudo umount /tmp/tmp_dir", get_pty=True)
      repair_call = mock.call(
          ssh, "sudo xfs_repair /dev/sda1", get_pty=True)
      mock_exec_ssh_cmd.assert_has_calls(
          [mktemp_call, mount_call, umount_call, repair_call])
  @mock.patch("time.sleep")
  def test_run_xfs_repair_exception(self, mock_sleep):
      """A failing exec_command is logged and does not escape.

      time.sleep is patched out so the test does not actually wait.
      """
      self.mock_ssh.exec_command.side_effect = Exception()

      with self.assertLogs('coriolis.utils', level=logging.WARN):
          utils.run_xfs_repair(self.mock_ssh, "/dev/sda1")
  @mock.patch('socket.socket')
  def test_check_port_open_success(self, mock_socket):
      """A successful connect() gives True and the socket is closed."""
      fake_sock = mock_socket.return_value
      fake_sock.connect.return_value = None

      result = utils._check_port_open('localhost', 8080)

      self.assertEqual(result, True)
      fake_sock.close.assert_called_once()
  @mock.patch('socket.socket')
  def test_check_port_open_exception(self, mock_socket):
      """A socket.error on connect() gives False; the socket is closed."""
      fake_sock = mock_socket.return_value
      fake_sock.connect.side_effect = socket.error

      result = utils._check_port_open('localhost', 8080)

      self.assertEqual(result, False)
      fake_sock.close.assert_called_once()
  @mock.patch.object(utils, '_check_port_open')
  @mock.patch('time.sleep')
  def test_wait_for_port_connectivity(self, mock_sleep,
                                      mock_check_port_open):
      """An open port lets wait_for_port_connectivity return normally."""
      mock_sleep.return_value = None
      mock_check_port_open.return_value = True

      utils.wait_for_port_connectivity('localhost', 8080)

      mock_check_port_open.assert_called_with('localhost', 8080)
  @mock.patch.object(utils, '_check_port_open')
  @mock.patch('time.sleep')
  def test_wait_for_port_connectivity_exception(self, mock_sleep,
                                                mock_check_port_open):
      """A port that stays closed raises CoriolisException (max_wait=1)."""
      mock_sleep.return_value = None
      mock_check_port_open.return_value = False

      with self.assertRaises(exception.CoriolisException):
          utils.wait_for_port_connectivity('localhost', 8080, max_wait=1)

      mock_check_port_open.assert_called_with('localhost', 8080)
  @mock.patch('subprocess.Popen')
  def test_exec_process(self, mock_popen):
      """With return code 0, exec_process returns the captured stdout."""
      proc = self.mock_process
      proc.returncode = 0
      proc.communicate.return_value = (b'stdout', b'stderr')
      mock_popen.return_value = proc

      result = utils.exec_process("command")

      # -1 is the numeric value of subprocess.PIPE.
      mock_popen.assert_called_once_with("command", stdout=-1, stderr=-1)
      captured_stdout = proc.communicate.return_value[0]
      self.assertEqual(result, captured_stdout)
  @mock.patch('subprocess.Popen')
  def test_exec_process_exception(self, mock_popen):
      """A non-zero return code makes exec_process raise CoriolisException."""
      proc = self.mock_process
      proc.returncode = 1
      proc.communicate.return_value = (b'stdout', b'stderr')
      mock_popen.return_value = proc

      with self.assertRaises(exception.CoriolisException):
          utils.exec_process("command")

      # -1 is the numeric value of subprocess.PIPE.
      mock_popen.assert_called_once_with("command", stdout=-1, stderr=-1)
  @mock.patch.object(utils, 'exec_process')
  def test_get_disk_info(self, mock_exec_process):
      """qemu-img JSON is parsed and a 'vpc' format is reported as 'vhd'."""
      mock_exec_process.return_value = b'{"format": "vpc"}'

      disk_info = utils.get_disk_info('disk_path')

      self.assertEqual(disk_info, {'format': 'vhd'})
      expected_argv = [
          utils.CONF.qemu_img_path, 'info', '--output=json', 'disk_path']
      mock_exec_process.assert_called_with(expected_argv)
  @mock.patch.object(utils, 'exec_process')
  def test_convert_disk_format(self, mock_exec_process):
      """A preallocated VHD conversion uses 'vpc' with subformat=fixed."""
      mock_exec_process.return_value = None

      utils.convert_disk_format(
          'disk_path', 'target_disk_path', constants.DISK_FORMAT_VHD,
          preallocated=True)

      expected_argv = [
          utils.CONF.qemu_img_path, 'convert', '-O', 'vpc', '-o',
          'subformat=fixed', 'disk_path', 'target_disk_path']
      mock_exec_process.assert_called_with(expected_argv)
  def test_convert_disk_format_not_vhd(self):
      """Preallocation with a non-VHD target raises NotImplementedError."""
      with self.assertRaises(NotImplementedError):
          utils.convert_disk_format(
              'disk_path', 'target_disk_path', 'not_vhd',
              preallocated=True)
  @mock.patch.object(utils, 'exec_process')
  @mock.patch.object(utils, 'ignore_exceptions')
  def test_convert_disk_format_exception(self, mock_ignore_exceptions,
                                         mock_exec_process):
      """On failure the target is removed via ignore_exceptions(os.remove)."""
      safe_remove = mock.MagicMock()
      mock_ignore_exceptions.return_value = safe_remove
      mock_exec_process.side_effect = exception.CoriolisException

      with self.assertRaises(exception.CoriolisException):
          utils.convert_disk_format(
              'disk_path', 'target_disk_path', constants.DISK_FORMAT_VHD,
              preallocated=True)

      expected_argv = [
          utils.CONF.qemu_img_path, 'convert', '-O', 'vpc', '-o',
          'subformat=fixed', 'disk_path', 'target_disk_path']
      mock_exec_process.assert_called_with(expected_argv)
      mock_ignore_exceptions.assert_called_with(os.remove)
      safe_remove.assert_called_with('target_disk_path')
  def test_walk_class_hierarchy(self):
      """Subclasses are yielded deepest-first: C before B, then D."""
      class A:
          pass

      class B(A):
          pass

      class C(B):
          pass

      class D(A):
          pass

      walked = list(utils.walk_class_hierarchy(A))

      self.assertEqual(walked, [C, B, D])
  def test_walk_class_hierarchy_no_subclasses(self):
      """A class with no subclasses yields nothing."""
      class A:
          pass

      walked = list(utils.walk_class_hierarchy(A, encountered=None))

      self.assertEqual(walked, [])
  @mock.patch('socket.socket')
  @mock.patch('ssl.SSLContext')
  @mock.patch('OpenSSL.crypto')
  def test_get_ssl_cert_thumbprint(self, mock_crypto, mock_ssl_context,
                                   mock_socket):
      """The decoded digest of the loaded certificate is returned."""
      mock_socket.return_value = mock.MagicMock()
      mock_ssl_context.return_value = mock.MagicMock()
      ssl_ctx = mock_ssl_context.return_value

      result = utils.get_ssl_cert_thumbprint(
          ssl_ctx, 'localhost', digest_algorithm='sha1')

      digest_mock = mock_crypto.load_certificate.return_value.digest
      self.assertEqual(
          result, digest_mock.return_value.decode.return_value)
      digest_mock.assert_called_once_with('sha1')
  @mock.patch('os.path')
  def test_get_resources_dir(self, mock_path):
      """The value produced by os.path.join is returned."""
      mock_path.abspath.return_value = 'dirname/abs_path'
      mock_path.dirname.return_value = 'dirname'

      resources_dir = utils.get_resources_dir()

      self.assertEqual(resources_dir, mock_path.join.return_value)
  @mock.patch('os.path')
  def test_get_resources_bin_dir(self, mock_path):
      """The value produced by os.path.join is returned."""
      mock_path.abspath.return_value = 'dirname/abs_path '
      mock_path.dirname.return_value = 'dirname'

      bin_dir = utils.get_resources_bin_dir()

      self.assertEqual(bin_dir, mock_path.join.return_value)
  def test_serialize_key(self):
      """The password is the second positional arg to write_private_key."""
      fake_key = mock.MagicMock()
      fake_key.write_private_key.return_value = None

      utils.serialize_key(fake_key, password='password')

      positional_args = fake_key.write_private_key.call_args[0]
      self.assertEqual(positional_args[1], 'password')
  @mock.patch('io.StringIO')
  @mock.patch('paramiko.RSAKey')
  def test_deserialize_key(self, mock_rsa_key, mock_string_io):
      """The key is loaded via RSAKey.from_private_key with the password."""
      mock_string_io.return_value = mock.MagicMock()
      key_stream = mock_string_io.return_value

      loaded_key = utils.deserialize_key('key', password='password')

      mock_rsa_key.from_private_key.assert_called_with(
          key_stream, 'password')
      self.assertEqual(
          loaded_key, mock_rsa_key.from_private_key.return_value)
  @mock.patch('coriolis.utils.jsonutils.dumps')
  @mock.patch('coriolis.utils.jsonutils.loads')
  def test_to_dict(self, mock_loads, mock_dumps):
      """to_dict round-trips the object through jsonutils dumps/loads."""
      source_obj = mock.MagicMock()
      mock_dumps.return_value = 'json'

      converted = utils.to_dict(source_obj)

      mock_dumps.assert_called_once_with(source_obj, default=mock.ANY)
      mock_loads.assert_called_once_with(mock_dumps.return_value)
      self.assertEqual(converted, mock_loads.return_value)
  def test_to_dict_with_primitive(self):
      """A datetime is converted to its ISO-format string."""
      timestamp = datetime.datetime.now()

      converted = utils.to_dict(timestamp)

      self.assertEqual(converted, timestamp.isoformat())
  def test_load_class(self):
      """A dotted path resolves to the class object it names."""
      loaded = utils.load_class('json.JSONDecoder')

      self.assertEqual(loaded, json.JSONDecoder)
  def test_check_md5_with_matching_hashes(self):
      """check_md5 does not raise when the digest matches the data."""
      payload = b'test data'
      matching_digest = hashlib.md5(payload).hexdigest()

      utils.check_md5(payload, matching_digest)
  def test_check_md5_with_exception(self):
      """A digest computed from other data raises CoriolisException."""
      payload = b'test data'
      wrong_digest = hashlib.md5(b'other data').hexdigest()

      with self.assertRaises(exception.CoriolisException):
          utils.check_md5(payload, wrong_digest)
  @mock.patch.object(utils, 'secrets')
  def test_get_secret_connection_info_with_secret_ref(self, mock_secrets):
      """With a 'secret_ref', the secret is fetched and a record logged."""
      connection_info = {'secret_ref': 'ref'}

      with self.assertLogs('coriolis.utils', level=logging.INFO):
          result = utils.get_secret_connection_info(
              'context', connection_info)

      self.assertEqual(result, mock_secrets.get_secret.return_value)
  def test_get_secret_connection_info_with_no_secret_ref(self):
      """An empty connection info dict yields an empty dict."""
      resolved = utils.get_secret_connection_info('context', {})
      self.assertEqual(resolved, {})
  @ddt.data(123, '123')
  def test_parse_int_value_with_valid_input(self, value):
      """Both the int 123 and the string '123' parse to the int 123."""
      self.assertEqual(utils.parse_int_value(value), 123)
  @ddt.data('invalid', '123.45', None)
  def test_parse_int_value_with_invalid_input(self, value):
      """Non-integer strings and None raise InvalidInput."""
      self.assertRaises(
          exception.InvalidInput, utils.parse_int_value, value)
  @ddt.data(
      ('SGVsbG8gd29ybGQ=', False, 'Hello world'),
      ('eyJ0ZXN0IjogInZhbHVlIn0=', True, {'test': 'value'}),
  )
  @ddt.unpack
  def test_decode_base64_param(self, value, is_json, expected):
      """Base64 input decodes to text, or to a parsed object when
      is_json is set.
      """
      decoded = utils.decode_base64_param(value, is_json=is_json)
      self.assertEqual(decoded, expected)
  @ddt.data(
      ('invalid', False),
      ('invalid', True),
      ('SGVsbG8gd29ybGQ=', True),
  )
  @ddt.unpack
  def test_decode_base64_param_with_invalid_input(self, value, is_json):
      """Each listed value/is_json combination raises InvalidInput."""
      self.assertRaises(
          exception.InvalidInput, utils.decode_base64_param,
          value, is_json=is_json)
  def test_quote_url(self):
      """A space in the input is percent-encoded as %20."""
      quoted = utils.quote_url('Hello world')
      self.assertEqual(quoted, 'Hello%20world')
  @ddt.data(
      ('00-11-22-33-44-55', '00:11:22:33:44:55'),
      ('00:11:22:33:44:55', '00:11:22:33:44:55'),
      ('001122334455', '00:11:22:33:44:55'),
      ('00-11-22-AA-BB-CC', '00:11:22:aa:bb:cc'),
  )
  @ddt.unpack
  def test_normalize_mac_address(self, input, expected):
      """Dash-separated, colon-separated and bare MACs all normalize to
      the lowercase colon-separated form.
      """
      self.assertEqual(utils.normalize_mac_address(input), expected)
  @ddt.data(
      'invalid',
      '00112233445566',
      '00-11-22-33-44-GG',
      123456789012,
  )
  def test_normalize_mac_address_with_invalid_input(self, input):
      """Each listed malformed or non-string value raises ValueError."""
      self.assertRaises(
          ValueError, utils.normalize_mac_address, input)
  def test_get_url_with_credentials(self):
      """Username and password are inserted into a URL that has none."""
      url_with_creds = utils.get_url_with_credentials(
          'http://example.com', 'user', 'pass')
      self.assertEqual(url_with_creds, 'http://user:pass@example.com')
  def test_get_url_with_credentials_existing_credentials(self):
      """Credentials already present in the URL are replaced."""
      url_with_creds = utils.get_url_with_credentials(
          'http://olduser:oldpass@example.com', 'newuser', 'newpass')
      self.assertEqual(
          url_with_creds, 'http://newuser:newpass@example.com')
  @ddt.data(
      # Duplicate names: the duplicated entries are expected by ID.
      (
          [
              {'id': '1', 'name': 'Resource1'},
              {'id': '2', 'name': 'Resource2'},
              {'id': '3', 'name': 'Resource1'},
          ],
          ['Resource2', '1', '3'],
      ),
      # All names unique: every entry is expected by name.
      (
          [
              {'id': '1', 'name': 'Resource1'},
              {'id': '2', 'name': 'Resource2'},
              {'id': '3', 'name': 'Resource3'},
          ],
          ['Resource1', 'Resource2', 'Resource3'],
      ),
      # An entry without a 'name' key is expected to raise.
      (
          [
              {'id': '1', 'name': 'Resource1'},
              {'id': '2', 'name': 'Resource2'},
              {'id': '3'},
          ],
          KeyError,
      ),
  )
  @ddt.unpack
  def test_get_unique_option_ids(self, resources, expected):
      """Check get_unique_option_ids against each data case.

      `expected` is either the list of identifiers to compare with
      (order-insensitive) or the exception class that must be raised.
      """
      if not isinstance(expected, list):
          self.assertRaises(expected, utils.get_unique_option_ids, resources)
          return

      option_ids = utils.get_unique_option_ids(resources)
      self.assertEqual(sorted(option_ids), sorted(expected))
  def test_get_unique_option_ids_with_custom_keys(self):
      """id_key/name_key select which fields act as ID and name."""
      resources = [
          {'custom_id': '1', 'custom_name': 'Resource1'},
          {'custom_id': '2', 'custom_name': 'Resource2'},
          {'custom_id': '3', 'custom_name': 'Resource1'},
      ]

      option_ids = utils.get_unique_option_ids(
          resources, id_key='custom_id', name_key='custom_name')

      self.assertEqual(sorted(option_ids), sorted(['Resource2', '1', '3']))
  def test_bad_request_on_error(self):
      """A (True, message) result passes through the decorator as-is."""
      @utils.bad_request_on_error("An error occurred: %s")
      def validate():
          return (True, "Everything is fine")

      is_valid, message = validate()

      self.assertTrue(is_valid)
      self.assertEqual(message, "Everything is fine")
  def test_bad_request_on_error_httpBadRequest(self):
      """A (False, message) result makes the wrapper raise
      HTTPBadRequest.
      """
      @utils.bad_request_on_error("An error occurred: %s")
      def validate():
          return (False, "Something is wrong")

      self.assertRaises(exc.HTTPBadRequest, validate)
  @ddt.data(
      # Connection infos and replica-state chunks are expected redacted.
      (
          {
              "key1": "value1",
              "origin": {"connection_info": "sensitive_info"},
              "destination": {"connection_info": "sensitive_info"},
              "volumes_info": [
                  {
                      "key2": "value2",
                      "replica_state": {
                          "key3": "value3",
                          "chunks": "sensitive_info",
                      },
                  },
              ],
          },
          {
              "key1": "value1",
              "origin": {"connection_info": {"got": "redacted"}},
              "destination": {"connection_info": {"got": "redacted"}},
              "volumes_info": [
                  {
                      "key2": "value2",
                      "replica_state": {
                          "key3": "value3",
                          "chunks": ["<redacted>"],
                      },
                  },
              ],
          },
      ),
      # Nothing sensitive: expected output equals the input.
      (
          {"key1": "value1", "key2": "value2"},
          {"key1": "value1", "key2": "value2"},
      ),
  )
  @ddt.unpack
  def test_sanitize_task_info(self, task_info, expected):
      """sanitize_task_info returns a dict equal to `expected`."""
      sanitized = utils.sanitize_task_info(task_info)
      self.assertEqual(sanitized, expected)
      self.assertIsInstance(sanitized, dict)
  def test_parse_ini_config(self):
      """'key = value' lines are parsed into a plain dict of strings."""
      parsed = utils.parse_ini_config(
          'key1 = value1\nkey2 = value2\nkey3 = value3')

      self.assertEqual(
          parsed,
          {"key1": "value1", "key2": "value2", "key3": "value3"})
      self.assertIsInstance(parsed, dict)
  @mock.patch('coriolis.utils.test_ssh_path')
  @mock.patch('coriolis.utils.read_ssh_file')
  @mock.patch('coriolis.utils.parse_ini_config')
  def test_read_ssh_ini_config_file_check_false(
          self, mock_parse_ini_config, mock_read_ssh_file,
          mock_test_ssh_path):
      """With check_exists=False the file is read over SSH, decoded and
      parsed, and the parser's result is returned.
      """
      mock_test_ssh_path.return_value = True

      parsed = utils.read_ssh_ini_config_file(
          self.mock_ssh, '/test/file', check_exists=False)

      self.assertEqual(parsed, mock_parse_ini_config.return_value)
      mock_read_ssh_file.assert_called_once_with(
          self.mock_ssh, '/test/file')
      decoded_contents = mock_read_ssh_file.return_value.decode()
      mock_parse_ini_config.assert_called_once_with(decoded_contents)
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_read_ssh_ini_config_file_check_true_path_not_exists(
          self, mock_test_ssh_path):
      """With check_exists=True and a missing path, the result is {}."""
      mock_test_ssh_path.return_value = False

      parsed = utils.read_ssh_ini_config_file(
          self.mock_ssh, '/test/to/file', check_exists=True)

      self.assertEqual(parsed, {})
      mock_test_ssh_path.assert_called_once_with(
          self.mock_ssh, '/test/to/file')
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.write_ssh_file')
  @mock.patch('coriolis.utils.test_ssh_path')
  @mock.patch.object(uuid, 'uuid4')
  def test_write_systemd(
          self, mock_uuid, mock_test_ssh, mock_write_ssh_file,
          mock_exec_ssh_cmd):
      """_write_systemd stages the unit under /tmp, moves it into
      /lib/systemd/system and then issues the restorecon, daemon-reload
      and start commands.
      """
      unit_path = '/lib/systemd/system/svc_name.service'
      mock_uuid.return_value = 'uuid'
      # First path probe succeeds, second one fails (see the expected
      # probe order asserted below).
      mock_test_ssh.side_effect = [True, False]
      mock_write_ssh_file.return_value = None

      utils._write_systemd(self.mock_ssh, 'cmdline', 'svc_name')

      mock_uuid.assert_called_once_with()
      mock_test_ssh.assert_has_calls([
          mock.call(self.mock_ssh, '/lib/systemd/system'),
          mock.call(self.mock_ssh, unit_path),
      ])
      mock_write_ssh_file.assert_called_once_with(
          self.mock_ssh, '/tmp/uuid.service', mock.ANY)
      expected_commands = [
          'sudo mv /tmp/uuid.service ' + unit_path,
          'sudo restorecon -v ' + unit_path,
          'sudo systemctl daemon-reload',
          'sudo systemctl start svc_name',
      ]
      mock_exec_ssh_cmd.assert_has_calls([
          mock.call(self.mock_ssh, command, get_pty=True)
          for command in expected_commands
      ])
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.write_ssh_file')
  @mock.patch('coriolis.utils.test_ssh_path')
  @mock.patch.object(uuid, 'uuid4')
  def test_write_systemd_usr_lib(
          self, mock_uuid, mock_test_ssh, mock_write_ssh_file,
          mock_exec_ssh_cmd):
      """When the /lib/systemd/system probe fails, the unit file goes
      under /usr/lib/systemd/system instead.
      """
      unit_path = '/usr/lib/systemd/system/svc_name.service'
      mock_uuid.return_value = 'uuid'
      mock_test_ssh.side_effect = [False, False]
      mock_write_ssh_file.return_value = None

      utils._write_systemd(self.mock_ssh, 'cmdline', 'svc_name')

      mock_test_ssh.assert_has_calls([
          mock.call(self.mock_ssh, '/lib/systemd/system'),
          mock.call(self.mock_ssh, unit_path),
      ])
      mock_exec_ssh_cmd.assert_has_calls([
          mock.call(
              self.mock_ssh, 'sudo mv /tmp/uuid.service ' + unit_path,
              get_pty=True),
      ])
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_write_systemd_service_exists(
          self, mock_test_ssh, mock_exec_ssh_cmd):
      """When the unit file already exists, the only command executed is
      the systemctl start.
      """
      mock_test_ssh.return_value = True

      utils._write_systemd(
          self.mock_ssh, 'cmdline', 'svc_name', start=True)

      mock_test_ssh.assert_has_calls([
          mock.call(self.mock_ssh, '/lib/systemd/system'),
          mock.call(self.mock_ssh, '/lib/systemd/system/svc_name.service'),
      ])
      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, 'sudo systemctl start svc_name', get_pty=True)
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.write_ssh_file')
  @mock.patch('coriolis.utils.test_ssh_path')
  @mock.patch.object(uuid, 'uuid4')
  def test_write_systemd_service_selinux_exception(
          self, mock_uuid, mock_test_ssh, mock_write_ssh_file,
          mock_exec_ssh_cmd):
      """A CoriolisException from the second SSH command is logged at
      WARN or above instead of propagating.
      """
      mock_uuid.return_value = 'uuid'
      mock_test_ssh.side_effect = [True, False]
      mock_write_ssh_file.return_value = None
      # Only the second exec_ssh_cmd call raises; the others succeed.
      mock_exec_ssh_cmd.side_effect = [
          None, exception.CoriolisException(), None, None]
      # Call the function underneath its decorator(s).
      write_systemd = testutils.get_wrapped_function(utils._write_systemd)

      with self.assertLogs('coriolis.utils', level=logging.WARN):
          write_systemd(
              self.mock_ssh, '/test/file', 'svc_name', start=True)

      mock_uuid.assert_called_once_with()
      mock_test_ssh.assert_has_calls([
          mock.call(self.mock_ssh, '/lib/systemd/system'),
          mock.call(self.mock_ssh, '/lib/systemd/system/svc_name.service'),
      ])
      mock_write_ssh_file.assert_called_once_with(
          self.mock_ssh, '/tmp/uuid.service', mock.ANY)
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.write_ssh_file')
  @mock.patch('coriolis.utils.test_ssh_path')
  @mock.patch.object(uuid, 'uuid4')
  def test_test_write_systemd_with_run_as(
          self, mock_uuid, mock_test_ssh, mock_write_ssh_file,
          mock_exec_ssh_cmd):
      """With run_as set, the written unit is SYSTEMD_TEMPLATE rendered
      with that username.
      """
      unit_path = '/lib/systemd/system/svc_name.service'
      mock_uuid.return_value = 'uuid'
      mock_test_ssh.side_effect = [True, False]

      utils._write_systemd(
          self.mock_ssh, 'cmdline', 'svc_name', run_as='test_user')

      mock_uuid.assert_called_once_with()
      mock_test_ssh.assert_has_calls([
          mock.call(self.mock_ssh, '/lib/systemd/system'),
          mock.call(self.mock_ssh, unit_path),
      ])
      rendered_unit = utils.SYSTEMD_TEMPLATE % {
          "cmdline": 'cmdline',
          "username": 'test_user',
          "svc_name": 'svc_name',
      }
      mock_write_ssh_file.assert_called_once_with(
          self.mock_ssh, '/tmp/uuid.service', rendered_unit)
      expected_commands = [
          'sudo mv /tmp/uuid.service ' + unit_path,
          'sudo restorecon -v ' + unit_path,
          'sudo systemctl daemon-reload',
          'sudo systemctl start svc_name',
      ]
      mock_exec_ssh_cmd.assert_has_calls([
          mock.call(self.mock_ssh, command, get_pty=True)
          for command in expected_commands
      ])
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.write_ssh_file')
  @mock.patch('coriolis.utils.test_ssh_path')
  @mock.patch.object(uuid, 'uuid4')
  def test_write_upstart(
          self, mock_uuid, mock_test_ssh, mock_write_ssh_file,
          mock_exec_ssh_cmd):
      """_write_upstart stages the job under /tmp, moves it into
      /etc/init and starts the service.
      """
      conf_path = '/etc/init/svc_name.conf'
      mock_uuid.return_value = 'uuid'
      mock_test_ssh.return_value = False
      mock_write_ssh_file.return_value = None

      utils._write_upstart(self.mock_ssh, 'cmdline', 'svc_name')

      mock_uuid.assert_called_once_with()
      mock_test_ssh.assert_called_once_with(self.mock_ssh, conf_path)
      mock_write_ssh_file.assert_called_once_with(
          self.mock_ssh, '/tmp/uuid.conf', mock.ANY)
      mock_exec_ssh_cmd.assert_has_calls([
          mock.call(
              self.mock_ssh, 'sudo mv /tmp/uuid.conf ' + conf_path,
              get_pty=True),
          mock.call(self.mock_ssh, 'start svc_name'),
      ])
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_write_upstart_service_exists(self, mock_test_ssh):
      """The job file path is probed exactly once when it already exists."""
      mock_test_ssh.return_value = True

      utils._write_upstart(self.mock_ssh, 'cmdline', 'svc_name')

      mock_test_ssh.assert_called_once_with(
          self.mock_ssh, '/etc/init/svc_name.conf')
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.write_ssh_file')
  @mock.patch('coriolis.utils.test_ssh_path')
  @mock.patch.object(uuid, 'uuid4')
  def test_write_upstart_with_run_as(
          self, mock_uuid, mock_test_ssh, mock_write_ssh_file,
          mock_exec_ssh_cmd):
      """With run_as set, the command line in the rendered job is
      prefixed with 'sudo -u <user> --'.
      """
      conf_path = '/etc/init/svc_name.conf'
      mock_uuid.return_value = 'uuid'
      mock_test_ssh.return_value = False

      utils._write_upstart(
          self.mock_ssh, 'cmdline', 'svc_name', run_as='test-user')

      mock_uuid.assert_called_once_with()
      mock_test_ssh.assert_called_once_with(self.mock_ssh, conf_path)
      rendered_job = utils.UPSTART_TEMPLATE % {
          "cmdline": 'sudo -u test-user -- cmdline',
          "svc_name": 'svc_name',
      }
      mock_write_ssh_file.assert_called_once_with(
          self.mock_ssh, '/tmp/uuid.conf', rendered_job)
      mock_exec_ssh_cmd.assert_has_calls([
          mock.call(
              self.mock_ssh, 'sudo mv /tmp/uuid.conf ' + conf_path,
              get_pty=True),
          mock.call(self.mock_ssh, 'start svc_name'),
      ])
  @mock.patch('coriolis.utils._write_systemd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_create_service_systemd(self, mock_test_ssh, mock_write_systemd):
      """When the path probe succeeds, create_service delegates to
      _write_systemd with the same arguments.
      """
      mock_test_ssh.return_value = True

      utils.create_service(
          self.mock_ssh, 'cmdline', 'svc_name', run_as='user', start=True)

      mock_write_systemd.assert_called_once_with(
          self.mock_ssh, 'cmdline', 'svc_name', run_as='user', start=True)
  @mock.patch('coriolis.utils._write_upstart')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_create_service_upstart(self, mock_test_ssh, mock_write_upstart):
      """When only the third path probe succeeds, create_service
      delegates to _write_upstart with the same arguments.
      """
      mock_test_ssh.side_effect = [False, False, True]

      utils.create_service(
          self.mock_ssh, 'cmdline', 'svc_name', run_as='user', start=True)

      mock_write_upstart.assert_called_once_with(
          self.mock_ssh, 'cmdline', 'svc_name', run_as='user', start=True)
  @mock.patch('coriolis.utils._write_systemd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_create_service_exception(self, mock_test_ssh, mock_write_systemd):
      """When every path probe fails, the undecorated create_service
      raises CoriolisException.
      """
      mock_test_ssh.return_value = False
      # Call the function underneath its decorator(s).
      create_service = testutils.get_wrapped_function(utils.create_service)

      self.assertRaises(
          exception.CoriolisException, create_service,
          self.mock_ssh, 'cmdline', 'svc_name', run_as='user', start=True)
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_restart_service_with_systemd(
          self, mock_test_ssh, mock_exec_ssh_cmd):
      """If /lib/systemd/system exists, the restart uses systemctl."""
      mock_test_ssh.return_value = True

      utils.restart_service(self.mock_ssh, 'svc_name')

      mock_test_ssh.assert_called_once_with(
          self.mock_ssh, '/lib/systemd/system')
      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, 'sudo systemctl restart svc_name', get_pty=True)
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_restart_service_with_upstart(
          self, mock_test_ssh, mock_exec_ssh_cmd):
      """If only the third path probe succeeds, the plain 'restart'
      command is used.
      """
      mock_test_ssh.side_effect = [False, False, True]

      utils.restart_service(self.mock_ssh, 'svc_name')

      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, 'restart svc_name')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_restart_service_exception(self, mock_test_ssh):
      """When every path probe fails, the undecorated restart_service
      raises UnrecognizedWorkerInitSystem.
      """
      mock_test_ssh.return_value = False
      # Call the function underneath its decorator(s).
      restart_service = testutils.get_wrapped_function(
          utils.restart_service)

      self.assertRaises(
          exception.UnrecognizedWorkerInitSystem,
          restart_service, self.mock_ssh, 'svc_name')
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_start_service_with_systemd(
          self, mock_test_ssh, mock_exec_ssh_cmd):
      """If /lib/systemd/system exists, the start uses systemctl."""
      mock_test_ssh.return_value = True

      utils.start_service(self.mock_ssh, 'svc_name')

      mock_test_ssh.assert_called_once_with(
          self.mock_ssh, '/lib/systemd/system')
      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, 'sudo systemctl start svc_name', get_pty=True)
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_start_service_with_upstart(
          self, mock_test_ssh, mock_exec_ssh_cmd):
      """If only the third path probe succeeds, the plain 'start'
      command is used.
      """
      mock_test_ssh.side_effect = [False, False, True]

      utils.start_service(self.mock_ssh, 'svc_name')

      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, 'start svc_name')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_start_service_exception(self, mock_test_ssh):
      """When every path probe fails, the undecorated start_service
      raises UnrecognizedWorkerInitSystem.
      """
      mock_test_ssh.return_value = False
      # Call the function underneath its decorator(s).
      start_service = testutils.get_wrapped_function(utils.start_service)

      self.assertRaises(
          exception.UnrecognizedWorkerInitSystem,
          start_service, self.mock_ssh, 'svc_name')
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_stop_service_with_systemd(
          self, mock_test_ssh, mock_exec_ssh_cmd):
      """If /lib/systemd/system exists, the stop uses systemctl."""
      mock_test_ssh.return_value = True

      utils.stop_service(self.mock_ssh, 'svc_name')

      mock_test_ssh.assert_called_once_with(
          self.mock_ssh, '/lib/systemd/system')
      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, 'sudo systemctl stop svc_name', get_pty=True)
  @mock.patch('coriolis.utils.exec_ssh_cmd')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_stop_service_with_upstart(
          self, mock_test_ssh, mock_exec_ssh_cmd):
      """If only the third path probe succeeds, the plain 'stop'
      command is used.
      """
      mock_test_ssh.side_effect = [False, False, True]

      utils.stop_service(self.mock_ssh, 'svc_name')

      mock_exec_ssh_cmd.assert_called_once_with(
          self.mock_ssh, 'stop svc_name')
  @mock.patch('coriolis.utils.test_ssh_path')
  def test_stop_service_exception(self, mock_test_ssh):
      """When every path probe fails, the undecorated stop_service
      raises UnrecognizedWorkerInitSystem.
      """
      mock_test_ssh.return_value = False
      # Call the function underneath its decorator(s).
      stop_service = testutils.get_wrapped_function(utils.stop_service)

      self.assertRaises(
          exception.UnrecognizedWorkerInitSystem,
          stop_service, self.mock_ssh, 'svc_name')
  988. def _get_mock_conn_info(self):
  989. return {
  990. "ip": "1.2.3.4",
  991. "port": 2222,
  992. "username": "someUser",
  993. "password": "pwned",
  994. "pkey": mock.Mock(),
  995. }
  @mock.patch.object(utils, "connect_ssh")
  @mock.patch.object(utils, "exec_ssh_cmd")
  @mock.patch("time.sleep")
  def test_poll_instance_ssh(self, mock_sleep, mock_exec_ssh, mock_connect):
      """SSH polling retries after a failed connect and after a failed
      probe command, sleeping poll_interval between attempts.
      """
      poll_interval = 5
      ssh_client = mock.Mock()
      # First connect attempt raises; later attempts return the client.
      mock_connect.side_effect = [Exception, ssh_client, ssh_client]
      # First probe command raises; the next one succeeds.
      mock_exec_ssh.side_effect = [Exception, mock.sentinel.stdout]
      connection_info = self._get_mock_conn_info()

      utils.poll_instance_until_reachable(
          connection_info=connection_info,
          protocol=constants.PROTOCOL_SSH,
          timeout=30,
          poll_interval=poll_interval,
      )

      connect_call = mock.call(
          hostname=connection_info["ip"],
          port=connection_info["port"],
          username=connection_info["username"],
          password=connection_info["password"],
          pkey=connection_info["pkey"],
      )
      mock_connect.assert_has_calls([connect_call, connect_call])
      probe_call = mock.call(ssh_client, "exit 0")
      mock_exec_ssh.assert_has_calls([probe_call, probe_call])
      ssh_client.close.assert_has_calls([mock.call(), mock.call()])
      sleep_call = mock.call(poll_interval)
      mock_sleep.assert_has_calls([sleep_call, sleep_call])
  @mock.patch.object(utils, "connect_ssh")
  @mock.patch.object(utils, "exec_ssh_cmd")
  @mock.patch("time.sleep")
  @mock.patch("time.time")
  def test_poll_instance_ssh_timeout(
          self, mock_time, mock_sleep, mock_exec_ssh, mock_connect):
      """SSH polling raises CoriolisException when every connect attempt
      fails and the mocked clock runs past the timeout.
      """
      # Each time.time() call returns a value 10 higher than the last.
      mock_time.side_effect = list(range(0, 200, 10))
      mock_connect.side_effect = IOError
      connection_info = self._get_mock_conn_info()

      self.assertRaises(
          exception.CoriolisException,
          utils.poll_instance_until_reachable,
          connection_info=connection_info,
          protocol=constants.PROTOCOL_SSH,
          timeout=30,
          poll_interval=5,
      )
  @mock.patch("coriolis.wsman.WSManConnection", new_callable=mock.Mock)
  @mock.patch("time.sleep")
  def test_poll_instance_winrm(self, mock_sleep, mock_wsman):
      """WinRM polling retries once after a failed 'whoami' probe and
      sleeps poll_interval in between.
      """
      poll_interval = 5
      winrm_conn = mock.Mock()
      # First probe raises; the second one succeeds.
      winrm_conn.exec_ps_command.side_effect = [
          Exception, mock.sentinel.stdout]
      mock_wsman.from_connection_info.return_value = winrm_conn
      connection_info = self._get_mock_conn_info()

      utils.poll_instance_until_reachable(
          connection_info=connection_info,
          protocol=constants.PROTOCOL_WINRM,
          timeout=30,
          poll_interval=poll_interval,
      )

      connect_call = mock.call(connection_info)
      mock_wsman.from_connection_info.assert_has_calls(
          [connect_call, connect_call], any_order=True)
      probe_call = mock.call("whoami")
      winrm_conn.exec_ps_command.assert_has_calls(
          [probe_call, probe_call])
      mock_sleep.assert_called_once_with(poll_interval)
  @mock.patch("coriolis.wsman.WSManConnection", new_callable=mock.Mock)
  @mock.patch("time.sleep")
  @mock.patch("time.time")
  def test_poll_instance_winrm_timeout(
          self, mock_time, mock_sleep, mock_wsman):
      """WinRM polling raises CoriolisException when every probe fails
      and the mocked clock runs past the timeout.
      """
      # Each time.time() call returns a value 10 higher than the last.
      mock_time.side_effect = list(range(0, 200, 10))
      winrm_conn = mock.Mock()
      winrm_conn.exec_ps_command.side_effect = IOError
      mock_wsman.from_connection_info.return_value = winrm_conn
      connection_info = self._get_mock_conn_info()

      self.assertRaises(
          exception.CoriolisException,
          utils.poll_instance_until_reachable,
          connection_info=connection_info,
          protocol=constants.PROTOCOL_WINRM,
          timeout=30,
          poll_interval=5,
      )
  1107. @ddt.ddt
  1108. class Grub2ConfigEditorTestCase(test_base.CoriolisBaseTestCase):
  1109. """Test suite for the Coriolis Grub2ConfigEditor class."""
  def setUp(self):
      """Create a Grub2ConfigEditor over a one-line raw configuration."""
      super().setUp()
      self.cfg = "test configuration"
      self.parser = utils.Grub2ConfigEditor(self.cfg)
  def test__init__(self):
      """A line that is not an option is stored as one 'raw' entry."""
      editor = utils.Grub2ConfigEditor(self.cfg)
      expected_parsed = [{'type': 'raw', 'payload': self.cfg}]
      self.assertEqual(editor._parsed, expected_parsed)
  def test_parse_cfg_comment_line(self):
      """A '#' comment line is kept verbatim as a 'raw' entry."""
      self.cfg = '# This is a comment'
      parsed = self.parser._parse_cfg(self.cfg)
      self.assertEqual(parsed, [{'type': 'raw', 'payload': self.cfg}])
  def test_parse_cfg_option_line_with_quoted_value(self):
      """A double-quoted value is parsed unquoted with 'quoted' True."""
      self.cfg = 'option="value"'
      expected_entry = {
          'type': 'option',
          'payload': self.cfg,
          'quoted': True,
          'option_name': 'option',
          'option_value': [{'opt_type': 'single', 'opt_val': 'value'}],
      }

      parsed = self.parser._parse_cfg(self.cfg)

      self.assertEqual(parsed, [expected_entry])
  def test_parse_cfg_option_line_without_value(self):
      """'option=' with nothing after it yields one empty single value."""
      self.cfg = 'option='
      expected_entry = {
          'type': 'option',
          'payload': self.cfg,
          'quoted': False,
          'option_name': 'option',
          'option_value': [{'opt_type': 'single', 'opt_val': ''}],
      }

      parsed = self.parser._parse_cfg(self.cfg)

      self.assertEqual(parsed, [expected_entry])
  def test_parse_cfg_option_line_with_value(self):
      """An unquoted 'option=value' yields one single value."""
      self.cfg = 'option=value'
      expected_entry = {
          'type': 'option',
          'payload': self.cfg,
          'quoted': False,
          'option_name': 'option',
          'option_value': [{'opt_type': 'single', 'opt_val': 'value'}],
      }

      parsed = self.parser._parse_cfg(self.cfg)

      self.assertEqual(parsed, [expected_entry])
  def test_parse_cfg_option_line_with_multiple_values(self):
      """Space-separated values become separate single entries."""
      self.cfg = 'option=value1 value2'
      expected_entry = {
          'type': 'option',
          'payload': self.cfg,
          'quoted': False,
          'option_name': 'option',
          'option_value': [
              {'opt_type': 'single', 'opt_val': 'value1'},
              {'opt_type': 'single', 'opt_val': 'value2'},
          ],
      }

      parsed = self.parser._parse_cfg(self.cfg)

      self.assertEqual(parsed, [expected_entry])
  def test_parse_cfg_option_line_with_key_value(self):
      """A 'key=value' option value is parsed as a 'key_val' entry."""
      self.cfg = 'option=key=value'
      expected_entry = {
          'type': 'option',
          'payload': self.cfg,
          'quoted': False,
          'option_name': 'option',
          'option_value': [
              {'opt_type': 'key_val', 'opt_val': 'value', 'opt_key': 'key'},
          ],
      }

      parsed = self.parser._parse_cfg(self.cfg)

      self.assertEqual(parsed, [expected_entry])
  @ddt.data(
      ("not a dict", ValueError),
      ({"opt_type": "invalid"}, ValueError),
      ({"opt_type": "key_val"}, ValueError),
      ({"opt_type": "single"}, ValueError),
      ({"unknown_opt_type"}, ValueError),
      ({"opt_type": "key_val", "opt_key": "key", "opt_val": "val"}, None),
      ({"opt_type": "single", "opt_val": "val"}, None),
  )
  @ddt.unpack
  def test_validate_value(self, value, expected):
      """_validate_value raises `expected`, or returns normally when
      `expected` is None.
      """
      if expected is None:
          self.parser._validate_value(value)
          return

      self.assertRaises(expected, self.parser._validate_value, value)
  def test_set_option_updates_existing_option(self):
      """set_option replaces the value list of an existing option."""
      self.parser._parsed = [
          {"option_name": "existing_option", "option_value": ["old_value"]},
      ]
      new_value = {
          "opt_type": "key_val",
          "opt_key": "key",
          "opt_val": "new_value",
      }
      expected_parsed = [
          {"option_name": "existing_option", "option_value": [new_value]},
      ]

      self.parser.set_option("existing_option", new_value)

      self.assertEqual(self.parser._parsed, expected_parsed)
  def test_set_option_adds_new_option(self):
      """set_option appends a quoted 'option' entry for an unknown name
      and leaves existing entries untouched.
      """
      existing_entry = {
          "option_name": "existing_option",
          "option_value": ["old_value"],
      }
      self.parser._parsed = [existing_entry]
      new_value = {
          "opt_type": "key_val",
          "opt_key": "key",
          "opt_val": "new_value",
          "quoted": True,
          "type": "option",
      }
      expected_parsed = [
          {"option_name": "existing_option", "option_value": ["old_value"]},
          {
              "option_name": "new_option",
              "option_value": [new_value],
              "quoted": True,
              "type": "option",
          },
      ]

      self.parser.set_option("new_option", new_value)

      self.assertEqual(self.parser._parsed, expected_parsed)
  def test_append_to_option_updates_existing_key_val_option(self):
      """Appending a key_val whose key already exists replaces the old
      value instead of adding a second entry.
      """
      self.parser._parsed = [{
          "option_name": "existing_option",
          "option_value": [
              {"opt_type": "key_val", "opt_key": "key",
               "opt_val": "old_value"},
          ],
      }]
      new_value = {
          "opt_type": "key_val",
          "opt_key": "key",
          "opt_val": "new_value",
      }
      expected_parsed = [
          {"option_name": "existing_option", "option_value": [new_value]},
      ]

      self.parser.append_to_option("existing_option", new_value)

      self.assertEqual(self.parser._parsed, expected_parsed)
  def test_append_to_option_ignores_existing_single_option(self):
      """Appending a single value that is already present does not
      duplicate it.
      """
      self.parser._parsed = [{
          "option_name": "existing_option",
          "option_value": [{"opt_type": "single", "opt_val": "old_value"}],
      }]
      new_value = {"opt_type": "single", "opt_val": "old_value"}
      expected_parsed = [
          {"option_name": "existing_option", "option_value": [new_value]},
      ]

      self.parser.append_to_option("existing_option", new_value)

      self.assertEqual(self.parser._parsed, expected_parsed)
  def test_append_to_option_adds_new_single_option(self):
      """Appending a new single value adds it after the existing one."""
      self.parser._parsed = [{
          "option_name": "existing_option",
          "option_value": [{"opt_type": "single", "opt_val": "old_value"}],
      }]
      new_value = {"opt_type": "single", "opt_val": "new_value"}
      expected_parsed = [{
          "option_name": "existing_option",
          "option_value": [
              {"opt_type": "single", "opt_val": "old_value"},
              new_value,
          ],
      }]

      self.parser.append_to_option("existing_option", new_value)

      self.assertEqual(self.parser._parsed, expected_parsed)
  def test_append_to_option_adds_new_option(self):
      """Appending to an unknown option name adds a quoted 'option'
      entry and leaves existing entries untouched.
      """
      self.parser._parsed = [{
          "option_name": "existing_option",
          "option_value": [{"opt_type": "single", "opt_val": "old_value"}],
      }]
      new_value = {
          "opt_type": "key_val",
          "opt_key": "key",
          "opt_val": "new_value",
      }
      expected_parsed = [
          {
              "option_name": "existing_option",
              "option_value": [
                  {"opt_type": "single", "opt_val": "old_value"},
              ],
          },
          {
              "option_name": "new_option",
              "option_value": [new_value],
              "quoted": True,
              "type": "option",
          },
      ]

      self.parser.append_to_option("new_option", new_value)

      self.assertEqual(self.parser._parsed, expected_parsed)
  @ddt.data(
      # Raw entries are written out as-is.
      (
          [{"type": "raw", "payload": "raw_data"}],
          "raw_data\n",
      ),
      # A lone unquoted single value.
      (
          [{
              "type": "option",
              "option_name": "option1",
              "option_value": [{"opt_type": "single", "opt_val": "value1"}],
              "quoted": False,
          }],
          "option1=value1\n",
      ),
      # A quoted key=value pair.
      (
          [{
              "type": "option",
              "option_name": "option2",
              "option_value": [
                  {"opt_type": "key_val", "opt_key": "key2",
                   "opt_val": "value2"},
              ],
              "quoted": True,
          }],
          "option2=\"key2=value2\"\n",
      ),
      # An option with no values at all.
      (
          [{
              "type": "option",
              "option_name": "option3",
              "option_value": [],
              "quoted": False,
          }],
          "option3=\n",
      ),
      # Several values are expected quoted even though 'quoted' is False.
      (
          [{
              "type": "option",
              "option_name": "option4",
              "option_value": [
                  {"opt_type": "single", "opt_val": "value4_1"},
                  {"opt_type": "single", "opt_val": "value4_2"},
              ],
              "quoted": False,
          }],
          "option4=\"value4_1 value4_2\"\n",
      ),
  )
  @ddt.unpack
  def test_dump(self, parsed, expected_output):
      """dump() renders the given parsed entries to `expected_output`."""
      self.parser._parsed = parsed
      rendered = self.parser.dump()
      self.assertEqual(rendered, expected_output)
  1268. ))
  1269. @ddt.unpack
  1270. def test_dump(self, parsed, expected_output):
  1271. self.parser._parsed = parsed
  1272. self.assertEqual(self.parser.dump(), expected_output)