test_data_transfer.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # Copyright 2023 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import stat
  4. import struct
  5. from unittest import mock
  6. from urllib import parse
  7. import requests
  8. import requests_unixsocket
  9. from coriolis import data_transfer
  10. from coriolis import exception
  11. from coriolis.tests import test_base
  12. class DataTranferTestCase(test_base.CoriolisBaseTestCase):
  13. """Collection of tests for the Coriolis data transfer module."""
  14. def setUp(self):
  15. super(DataTranferTestCase, self).setUp()
  16. self.data_content = 'test-content'.encode()
  17. self.fmt = 'gzip'
  18. self.mock_session = mock.Mock()
  19. self.mock_url = 'http://localhost:8000/'
  20. self.mock_response = mock.Mock()
  21. self.mock_response.content = self.data_content
  22. self.mock_session.post.return_value = self.mock_response
  23. self.msg_id = 1
  24. self.path = 'test-path'
  25. self.offset = 0
  26. @mock.patch.object(data_transfer, 'CONF')
  27. def test_get_session_and_address(self, mock_conf):
  28. mock_conf.compressor_address = 'localhost:8000'
  29. result = data_transfer._get_session_and_address()
  30. self.assertIsInstance(result[0], requests.Session)
  31. self.assertEqual(result[1], "http://%s/" %
  32. mock_conf.compressor_address)
  33. def test_get_session_and_address_no_compressor_address(self):
  34. result = data_transfer._get_session_and_address()
  35. self.assertEqual(result, (None, None))
  36. @mock.patch.object(data_transfer.os, 'stat')
  37. @mock.patch.object(data_transfer, 'CONF')
  38. def test_get_session_and_address_unix_socket(self, mock_conf, mock_stat):
  39. mock_conf.compressor_address = '/var/run/compressor.sock'
  40. mock_stat.return_value.st_mode = stat.S_IFSOCK
  41. result = data_transfer._get_session_and_address()
  42. self.assertIsInstance(result[0], requests_unixsocket.Session)
  43. self.assertEqual(result[1], "http+unix://%s/" % parse.quote_plus(
  44. mock_conf.compressor_address))
  45. @mock.patch.object(data_transfer, 'CONF')
  46. def test_get_session_and_address_nonexistent_path(self, mock_conf):
  47. mock_conf.compressor_address = '/var/run/compressor.sock'
  48. self.assertRaises(exception.CoriolisException,
  49. data_transfer._get_session_and_address)
  50. @mock.patch.object(data_transfer.os, 'stat')
  51. @mock.patch.object(data_transfer, 'CONF')
  52. def test_get_session_and_address_invalid_unix_socket(self, mock_conf,
  53. mock_stat):
  54. mock_conf.compressor_address = '/var/run/compressor.sock'
  55. mock_stat.return_value.st_mode = stat.S_IFREG
  56. self.assertRaises(exception.CoriolisException,
  57. data_transfer._get_session_and_address)
  58. @mock.patch.object(data_transfer.constants, 'COMPRESSION_FORMAT_GZIP')
  59. def test_compression_proxy_invalid_format(self, mock_compression_format):
  60. self.assertRaises(exception.CoriolisException,
  61. data_transfer.compression_proxy,
  62. mock.sentinel.content,
  63. mock_compression_format.return_value)
  64. @mock.patch.object(data_transfer, '_COMPRESS_FUNC')
  65. def test_compression_proxy(self, mock_compress_func):
  66. result = data_transfer.compression_proxy(self.data_content, self.fmt)
  67. mock_compress_func[self.fmt].assert_called_once_with(self.data_content)
  68. self.assertEqual(
  69. result, (mock_compress_func[self.fmt].return_value, True))
  70. @mock.patch.object(data_transfer, '_get_session_and_address')
  71. @mock.patch.object(data_transfer, 'CONF')
  72. def test_compression_proxy_with_compression(self, mock_conf,
  73. mock_get_session_and_address):
  74. mock_get_session_and_address.return_value = (
  75. self.mock_session, self.mock_url)
  76. result = data_transfer.compression_proxy(self.data_content, self.fmt)
  77. self.mock_session.post.assert_called_once_with(
  78. self.mock_url, data=self.data_content,
  79. headers={'X-Compression-Format': self.fmt},
  80. timeout=mock_conf.default_requests_timeout)
  81. self.assertEqual(
  82. result, (self.mock_session.post.return_value.content, False))
  83. self.mock_session.close.assert_called_once()
  84. @mock.patch.object(data_transfer, '_get_session_and_address')
  85. @mock.patch.object(data_transfer, '_COMPRESS_FUNC')
  86. def test_compression_proxy_with_exception(self, mock_compress_func,
  87. mock_get_session_and_address):
  88. mock_get_session_and_address.return_value = (self.mock_session,
  89. self.mock_url)
  90. self.mock_session.post.side_effect = Exception("mock_message")
  91. result = data_transfer.compression_proxy(self.data_content, self.fmt)
  92. self.assertEqual(
  93. result, (mock_compress_func[self.fmt].return_value, True))
  94. self.mock_session.close.assert_called_once()
  95. @mock.patch.object(data_transfer, 'compression_proxy')
  96. @mock.patch.object(data_transfer, 'struct')
  97. def test_encode_data(self, mock_struct, mock_compression_proxy):
  98. mock_compression_proxy.return_value = (self.data_content, True)
  99. mock_struct.pack.side_effect = (lambda fmt,
  100. *args: struct.pack(fmt, *args))
  101. result = data_transfer.encode_data(self.msg_id, self.path, self.offset,
  102. self.data_content, True)
  103. expected_result = struct.pack('<III', self.msg_id,
  104. len(self.data_content),
  105. len(self.path) + 1 + 8 +
  106. len(self.data_content))
  107. expected_content = (self.path.encode() + b'\0' +
  108. struct.pack("<Q", self.offset))
  109. mock_compression_proxy.assert_called_once_with(
  110. expected_content + self.data_content,
  111. data_transfer.constants.COMPRESSION_FORMAT_ZLIB)
  112. self.assertEqual(result, expected_result + self.data_content)
  113. @mock.patch.object(data_transfer, 'struct')
  114. def test_encode_data_uncompressed(self, mock_struct):
  115. mock_struct.pack.side_effect = (lambda fmt,
  116. *args: struct.pack(fmt, *args))
  117. result = data_transfer.encode_data(self.msg_id, self.path, self.offset,
  118. self.data_content, False)
  119. inflated_content = (self.path.encode() + b'\0' +
  120. struct.pack("<Q", self.offset) +
  121. self.data_content)
  122. expected_result = (struct.pack("<I", self.msg_id) +
  123. struct.pack("<I", len(inflated_content)) +
  124. struct.pack("<I", 0) +
  125. inflated_content)
  126. self.assertEqual(result, expected_result)
  127. @mock.patch.object(data_transfer, 'struct')
  128. def test_encode_eod(self, mock_struct):
  129. self.msg_id = 1
  130. mock_struct.pack.side_effect = (lambda fmt,
  131. *args: struct.pack(fmt, *args))
  132. result = data_transfer.encode_eod(self.msg_id)
  133. expected_result = struct.pack("<II", self.msg_id, 0)
  134. self.assertEqual(result, expected_result)