helpers.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. from contextlib import contextmanager
  2. import os
  3. import sys
  4. import unittest
  5. from six import reraise
  6. from cloudbridge.cloud.factory import CloudProviderFactory
  7. from cloudbridge.cloud.interfaces import InstanceState
  8. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  9. def parse_bool(val):
  10. if val:
  11. return str(val).upper() in ['TRUE', 'YES']
  12. else:
  13. return False
  14. @contextmanager
  15. def cleanup_action(cleanup_func):
  16. """
  17. Context manager to carry out a given
  18. cleanup action after carrying out a set
  19. of tasks, or when an exception occurs.
  20. If any errors occur during the cleanup
  21. action, those are ignored, and the original
  22. traceback is preserved.
  23. :params func: This function is called if
  24. an exception occurs or at the end of the
  25. context block. If any exceptions raised
  26. by func are ignored.
  27. Usage:
  28. with cleanup_action(lambda e: print("Oops!")):
  29. do_something()
  30. """
  31. try:
  32. yield
  33. except Exception:
  34. ex_class, ex_val, ex_traceback = sys.exc_info()
  35. try:
  36. cleanup_func()
  37. except Exception as e:
  38. print("Error during exception cleanup: {0}".format(e))
  39. reraise(ex_class, ex_val, ex_traceback)
  40. try:
  41. cleanup_func()
  42. except Exception as e:
  43. print("Error during cleanup: {0}".format(e))
  44. TEST_DATA_CONFIG = {
  45. "AWSCloudProvider": {
  46. "image": os.environ.get('CB_IMAGE_AWS', 'ami-5ac2cd4d'),
  47. "instance_type": os.environ.get('CB_INSTANCE_TYPE_AWS', 't2.nano'),
  48. "placement": os.environ.get('CB_PLACEMENT_AWS', 'us-east-1a'),
  49. },
  50. "OpenStackCloudProvider": {
  51. "image": os.environ.get('CB_IMAGE_OS',
  52. 'a471339a-bd0e-41e2-9406-4f308267ed0f'),
  53. "instance_type": os.environ.get('CB_INSTANCE_TYPE_OS', 'm1.tiny'),
  54. "placement": os.environ.get('CB_PLACEMENT_OS', 'nova'),
  55. }
  56. }
  57. def get_provider_test_data(provider, key):
  58. if "AWSCloudProvider" in provider.name:
  59. return TEST_DATA_CONFIG.get("AWSCloudProvider").get(key)
  60. elif "OpenStackCloudProvider" in provider.name:
  61. return TEST_DATA_CONFIG.get("OpenStackCloudProvider").get(key)
  62. return None
  63. def create_test_network(provider, name):
  64. """
  65. Create a network with one subnet, returning the network and subnet objects.
  66. """
  67. net = provider.network.create(name=name)
  68. cidr_block = (net.cidr_block).split('/')[0] or '10.0.0.1'
  69. sn = net.create_subnet(cidr_block='{0}/28'.format(cidr_block, name=name))
  70. return net, sn
  71. def delete_test_network(network):
  72. """
  73. Delete the supplied network, first deleting any contained subnets.
  74. """
  75. with cleanup_action(lambda: network.delete()):
  76. for sn in network.subnets():
  77. sn.delete()
  78. def create_test_instance(
  79. provider, instance_name, network, zone=None, launch_config=None,
  80. key_pair=None, security_groups=None):
  81. return provider.compute.instances.create(
  82. instance_name,
  83. get_provider_test_data(provider, 'image'),
  84. get_provider_test_data(provider, 'instance_type'),
  85. network=network,
  86. zone=zone,
  87. key_pair=key_pair,
  88. security_groups=security_groups,
  89. launch_config=launch_config)
  90. def get_test_instance(provider, name, key_pair=None, security_groups=None,
  91. network=None):
  92. launch_config = None
  93. instance = create_test_instance(
  94. provider,
  95. name,
  96. network=network,
  97. key_pair=key_pair,
  98. security_groups=security_groups,
  99. launch_config=launch_config)
  100. instance.wait_till_ready()
  101. return instance
  102. def cleanup_test_resources(instance=None, network=None, security_group=None,
  103. key_pair=None):
  104. with cleanup_action(lambda: delete_test_network(network)):
  105. with cleanup_action(lambda: key_pair.delete()):
  106. with cleanup_action(lambda: security_group.delete()):
  107. instance.terminate()
  108. instance.wait_for(
  109. [InstanceState.TERMINATED, InstanceState.UNKNOWN],
  110. terminal_states=[InstanceState.ERROR])
  111. class ProviderTestBase(object):
  112. """
  113. A dummy base class for Test Cases. Does not inherit from unittest.TestCase
  114. to avoid confusing test discovery by unittest and nose2. unittest.TestCase
  115. is injected as a base class by the generator, so calling the unittest
  116. constructor works correctly.
  117. """
  118. def __init__(self, methodName, provider):
  119. unittest.TestCase.__init__(self, methodName=methodName)
  120. self.provider = provider
  121. def setUp(self):
  122. if isinstance(self.provider, TestMockHelperMixin):
  123. self.provider.setUpMock()
  124. def tearDown(self):
  125. if isinstance(self.provider, TestMockHelperMixin):
  126. self.provider.tearDownMock()
  127. class ProviderTestCaseGenerator():
  128. """
  129. Generates test cases for all provider - testcase combinations.
  130. Detailed docs at test/__init__.py
  131. """
  132. def __init__(self, test_classes):
  133. self.all_test_classes = test_classes
  134. def get_provider_wait_interval(self, provider_class):
  135. if issubclass(provider_class, TestMockHelperMixin):
  136. return 0
  137. else:
  138. return 1
  139. def create_provider_instance(self, provider_class):
  140. """
  141. Instantiate a default provider instance. All required connection
  142. settings are expected to be set as environment variables.
  143. """
  144. config = {'default_wait_interval':
  145. self.get_provider_wait_interval(provider_class)}
  146. return provider_class(config)
  147. def generate_new_test_class(self, name, testcase_class):
  148. """
  149. Generates a new type which inherits from the given testcase_class and
  150. unittest.TestCase
  151. """
  152. class_name = "{0}{1}".format(name, testcase_class.__name__)
  153. return type(class_name, (testcase_class, unittest.TestCase), {})
  154. def generate_test_suite_for_provider_testcase(
  155. self, provider_class, testcase_class):
  156. """
  157. Generate and return a suite of tests for a specific provider class and
  158. testcase combination
  159. """
  160. testloader = unittest.TestLoader()
  161. testnames = testloader.getTestCaseNames(testcase_class)
  162. suite = unittest.TestSuite()
  163. for name in testnames:
  164. generated_cls = self.generate_new_test_class(
  165. provider_class.__name__,
  166. testcase_class)
  167. suite.addTest(
  168. generated_cls(
  169. name,
  170. self.create_provider_instance(provider_class)))
  171. return suite
  172. def generate_test_suite_for_provider(self, provider_class):
  173. """
  174. Generate and return a suite of all available tests for a given provider
  175. class
  176. """
  177. suite = unittest.TestSuite()
  178. suites = [
  179. self.generate_test_suite_for_provider_testcase(
  180. provider_class, test_class)
  181. for test_class in self.all_test_classes]
  182. for s in suites:
  183. suite.addTest(s)
  184. return suite
  185. def generate_tests(self):
  186. """
  187. Generate and return a suite of tests for all provider and test class
  188. combinations
  189. """
  190. factory = CloudProviderFactory()
  191. use_mock_drivers = parse_bool(
  192. os.environ.get("CB_USE_MOCK_PROVIDERS", True))
  193. provider_name = os.environ.get("CB_TEST_PROVIDER", None)
  194. if provider_name:
  195. provider_classes = [
  196. factory.get_provider_class(
  197. provider_name,
  198. get_mock=use_mock_drivers)]
  199. if not provider_classes[0]:
  200. raise ValueError(
  201. "Could not find specified test provider %s" %
  202. provider_name)
  203. else:
  204. provider_classes = factory.get_all_provider_classes(
  205. get_mock=use_mock_drivers)
  206. suite = unittest.TestSuite()
  207. suites = [
  208. self.generate_test_suite_for_provider(p) for p in provider_classes]
  209. for s in suites:
  210. suite.addTest(s)
  211. return suite