helpers.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. from contextlib import contextmanager
  2. import os
  3. import sys
  4. import unittest
  5. from six import reraise
  6. from cloudbridge.cloud.factory import CloudProviderFactory
  7. from cloudbridge.cloud.interfaces import InstanceState
  8. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  9. def parse_bool(val):
  10. if val:
  11. return str(val).upper() in ['TRUE', 'YES']
  12. else:
  13. return False
  14. @contextmanager
  15. def cleanup_action(cleanup_func):
  16. """
  17. Context manager to carry out a given
  18. cleanup action after carrying out a set
  19. of tasks, or when an exception occurs.
  20. If any errors occur during the cleanup
  21. action, those are ignored, and the original
  22. traceback is preserved.
  23. :params func: This function is called if
  24. an exception occurs or at the end of the
  25. context block. If any exceptions raised
  26. by func are ignored.
  27. Usage:
  28. with cleanup_action(lambda e: print("Oops!")):
  29. do_something()
  30. """
  31. try:
  32. yield
  33. except Exception:
  34. ex_class, ex_val, ex_traceback = sys.exc_info()
  35. try:
  36. cleanup_func()
  37. except Exception as e:
  38. print("Error during exception cleanup: {0}".format(e))
  39. reraise(ex_class, ex_val, ex_traceback)
  40. cleanup_func()
  41. TEST_DATA_CONFIG = {
  42. "AWSCloudProvider": {
  43. "image": os.environ.get('CB_IMAGE_AWS', 'ami-d85e75b0'),
  44. "instance_type": os.environ.get('CB_INSTANCE_TYPE_AWS',
  45. 't1.micro'),
  46. "placement": os.environ.get('CB_PLACEMENT_AWS', 'us-east-1a'),
  47. },
  48. "OpenStackCloudProvider": {
  49. "image": os.environ.get('CB_IMAGE_OS',
  50. 'a471339a-bd0e-41e2-9406-4f308267ed0f'),
  51. "instance_type": os.environ.get('CB_INSTANCE_TYPE_OS', 'm1.tiny'),
  52. "placement": os.environ.get('CB_PLACEMENT_OS', 'nova'),
  53. }
  54. }
  55. def get_provider_test_data(provider, key):
  56. if "AWSCloudProvider" in provider.name:
  57. return TEST_DATA_CONFIG.get("AWSCloudProvider").get(key)
  58. elif "OpenStackCloudProvider" in provider.name:
  59. return TEST_DATA_CONFIG.get("OpenStackCloudProvider").get(key)
  60. return None
  61. def create_test_network(provider, name):
  62. """
  63. Create a network with one subnet, returning the network and subnet objects.
  64. """
  65. net = provider.network.create(name=name)
  66. cidr_block = (net.cidr_block).split('/')[0] or '10.0.0.1'
  67. sn = net.create_subnet(cidr_block='{0}/28'.format(cidr_block, name=name))
  68. return net, sn
  69. def delete_test_network(network):
  70. """
  71. Delete the supplied network, first deleting any contained subnets.
  72. """
  73. with cleanup_action(lambda: network.delete()):
  74. for sn in network.subnets():
  75. sn.delete()
  76. def create_test_instance(
  77. provider, instance_name, zone=None, launch_config=None,
  78. key_pair=None, security_groups=None):
  79. return provider.compute.instances.create(
  80. instance_name,
  81. get_provider_test_data(provider, 'image'),
  82. get_provider_test_data(provider, 'instance_type'),
  83. zone=zone,
  84. key_pair=key_pair,
  85. security_groups=security_groups,
  86. launch_config=launch_config)
  87. def get_test_instance(provider, name, key_pair=None, security_groups=None,
  88. network=None):
  89. launch_config = None
  90. if network:
  91. launch_config = provider.compute.instances.create_launch_config()
  92. launch_config.add_network_interface(network.id)
  93. instance = create_test_instance(
  94. provider,
  95. name,
  96. key_pair=key_pair,
  97. security_groups=security_groups,
  98. launch_config=launch_config)
  99. instance.wait_till_ready()
  100. return instance
  101. def cleanup_test_resources(instance=None, network=None, security_group=None,
  102. key_pair=None):
  103. with cleanup_action(lambda: delete_test_network(network)):
  104. with cleanup_action(lambda: key_pair.delete()):
  105. with cleanup_action(lambda: security_group.delete()):
  106. instance.terminate()
  107. instance.wait_for(
  108. [InstanceState.TERMINATED, InstanceState.UNKNOWN],
  109. terminal_states=[InstanceState.ERROR])
  110. class ProviderTestBase(object):
  111. """
  112. A dummy base class for Test Cases. Does not inherit from unittest.TestCase
  113. to avoid confusing test discovery by unittest and nose2. unittest.TestCase
  114. is injected as a base class by the generator, so calling the unittest
  115. constructor works correctly.
  116. """
  117. def __init__(self, methodName, provider):
  118. unittest.TestCase.__init__(self, methodName=methodName)
  119. self.provider = provider
  120. def setUp(self):
  121. if isinstance(self.provider, TestMockHelperMixin):
  122. self.provider.setUpMock()
  123. def tearDown(self):
  124. if isinstance(self.provider, TestMockHelperMixin):
  125. self.provider.tearDownMock()
  126. class ProviderTestCaseGenerator():
  127. """
  128. Generates test cases for all provider - testcase combinations.
  129. Detailed docs at test/__init__.py
  130. """
  131. def __init__(self, test_classes):
  132. self.all_test_classes = test_classes
  133. def get_provider_wait_interval(self, provider_class):
  134. if issubclass(provider_class, TestMockHelperMixin):
  135. return 0
  136. else:
  137. return 1
  138. def create_provider_instance(self, provider_class):
  139. """
  140. Instantiate a default provider instance. All required connection
  141. settings are expected to be set as environment variables.
  142. """
  143. config = {'default_wait_interval':
  144. self.get_provider_wait_interval(provider_class)}
  145. return provider_class(config)
  146. def generate_new_test_class(self, name, testcase_class):
  147. """
  148. Generates a new type which inherits from the given testcase_class and
  149. unittest.TestCase
  150. """
  151. class_name = "{0}{1}".format(name, testcase_class.__name__)
  152. return type(class_name, (testcase_class, unittest.TestCase), {})
  153. def generate_test_suite_for_provider_testcase(
  154. self, provider_class, testcase_class):
  155. """
  156. Generate and return a suite of tests for a specific provider class and
  157. testcase combination
  158. """
  159. testloader = unittest.TestLoader()
  160. testnames = testloader.getTestCaseNames(testcase_class)
  161. suite = unittest.TestSuite()
  162. for name in testnames:
  163. generated_cls = self.generate_new_test_class(
  164. provider_class.__name__,
  165. testcase_class)
  166. suite.addTest(
  167. generated_cls(
  168. name,
  169. self.create_provider_instance(provider_class)))
  170. return suite
  171. def generate_test_suite_for_provider(self, provider_class):
  172. """
  173. Generate and return a suite of all available tests for a given provider
  174. class
  175. """
  176. suite = unittest.TestSuite()
  177. suites = [
  178. self.generate_test_suite_for_provider_testcase(
  179. provider_class, test_class)
  180. for test_class in self.all_test_classes]
  181. for s in suites:
  182. suite.addTest(s)
  183. return suite
  184. def generate_tests(self):
  185. """
  186. Generate and return a suite of tests for all provider and test class
  187. combinations
  188. """
  189. factory = CloudProviderFactory()
  190. use_mock_drivers = parse_bool(
  191. os.environ.get("CB_USE_MOCK_PROVIDERS", True))
  192. provider_name = os.environ.get("CB_TEST_PROVIDER", None)
  193. if provider_name:
  194. provider_classes = [
  195. factory.get_provider_class(
  196. provider_name,
  197. get_mock=use_mock_drivers)]
  198. if not provider_classes[0]:
  199. raise ValueError(
  200. "Could not find specified test provider %s" %
  201. provider_name)
  202. else:
  203. provider_classes = factory.get_all_provider_classes(
  204. get_mock=use_mock_drivers)
  205. suite = unittest.TestSuite()
  206. suites = [
  207. self.generate_test_suite_for_provider(p) for p in provider_classes]
  208. for s in suites:
  209. suite.addTest(s)
  210. return suite