helpers.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. from contextlib import contextmanager
  2. import os
  3. import sys
  4. import unittest
  5. from six import reraise
  6. from cloudbridge.cloud.factory import CloudProviderFactory
  7. from cloudbridge.cloud.interfaces import InstanceState
  8. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  9. def parse_bool(val):
  10. if val:
  11. return str(val).upper() in ['TRUE', 'YES']
  12. else:
  13. return False
  14. @contextmanager
  15. def cleanup_action(cleanup_func):
  16. """
  17. Context manager to carry out a given
  18. cleanup action after carrying out a set
  19. of tasks, or when an exception occurs.
  20. If any errors occur during the cleanup
  21. action, those are ignored, and the original
  22. traceback is preserved.
  23. :params func: This function is called if
  24. an exception occurs or at the end of the
  25. context block. If any exceptions raised
  26. by func are ignored.
  27. Usage:
  28. with cleanup_action(lambda e: print("Oops!")):
  29. do_something()
  30. """
  31. try:
  32. yield
  33. except Exception:
  34. ex_class, ex_val, ex_traceback = sys.exc_info()
  35. try:
  36. cleanup_func()
  37. except Exception as e:
  38. print("Error during exception cleanup: {0}".format(e))
  39. reraise(ex_class, ex_val, ex_traceback)
  40. cleanup_func()
  41. TEST_DATA_CONFIG = {
  42. "AWSCloudProvider": {
  43. "image": os.environ.get('CB_IMAGE_AWS', 'ami-d85e75b0'),
  44. "instance_type": os.environ.get('CB_INSTANCE_TYPE_AWS',
  45. 't1.micro'),
  46. "placement": os.environ.get('CB_PLACEMENT_AWS', 'us-east-1a'),
  47. },
  48. "OpenStackCloudProvider": {
  49. "image": os.environ.get('CB_IMAGE_OS',
  50. 'a471339a-bd0e-41e2-9406-4f308267ed0f'),
  51. "instance_type": os.environ.get('CB_INSTANCE_TYPE_OS', 'm1.tiny'),
  52. "placement": os.environ.get('CB_PLACEMENT_OS', 'nova'),
  53. }
  54. }
  55. def get_provider_test_data(provider, key):
  56. if "AWSCloudProvider" in provider.name:
  57. return TEST_DATA_CONFIG.get("AWSCloudProvider").get(key)
  58. elif "OpenStackCloudProvider" in provider.name:
  59. return TEST_DATA_CONFIG.get("OpenStackCloudProvider").get(key)
  60. return None
  61. def create_test_network(provider, name):
  62. """
  63. Create a network with one subnet, returning the network and subnet objects.
  64. """
  65. net = provider.network.create(name=name)
  66. cidr_block = (net.cidr_block).split('/')[0] or '10.0.0.1'
  67. sn = net.create_subnet(cidr_block='{0}/28'.format(cidr_block, name=name))
  68. return net, sn
  69. def delete_test_network(network):
  70. """
  71. Delete the supplied network, first deleting any contained subnets.
  72. """
  73. for sn in network.subnets():
  74. sn.delete()
  75. network.delete()
  76. def create_test_instance(
  77. provider, instance_name, zone=None, launch_config=None,
  78. key_pair=None, security_groups=None):
  79. return provider.compute.instances.create(
  80. instance_name,
  81. get_provider_test_data(provider, 'image'),
  82. get_provider_test_data(provider, 'instance_type'),
  83. zone=zone,
  84. key_pair=key_pair,
  85. security_groups=security_groups,
  86. launch_config=launch_config)
  87. def get_test_instance(provider, name, key_pair=None, security_groups=None,
  88. network=None):
  89. launch_config = None
  90. if network:
  91. launch_config = provider.compute.instances.create_launch_config()
  92. launch_config.add_network_interface(network.id)
  93. instance = create_test_instance(
  94. provider,
  95. name,
  96. key_pair=key_pair,
  97. security_groups=security_groups,
  98. launch_config=launch_config)
  99. instance.wait_till_ready()
  100. return instance
  101. def cleanup_test_resources(instance=None, network=None, security_group=None,
  102. key_pair=None):
  103. if instance:
  104. instance.terminate()
  105. instance.wait_for(
  106. [InstanceState.TERMINATED, InstanceState.UNKNOWN],
  107. terminal_states=[InstanceState.ERROR])
  108. if security_group:
  109. security_group.delete()
  110. if key_pair:
  111. key_pair.delete()
  112. if network:
  113. delete_test_network(network)
  114. class ProviderTestBase(object):
  115. """
  116. A dummy base class for Test Cases. Does not inherit from unittest.TestCase
  117. to avoid confusing test discovery by unittest and nose2. unittest.TestCase
  118. is injected as a base class by the generator, so calling the unittest
  119. constructor works correctly.
  120. """
  121. def __init__(self, methodName, provider):
  122. unittest.TestCase.__init__(self, methodName=methodName)
  123. self.provider = provider
  124. def setUp(self):
  125. if isinstance(self.provider, TestMockHelperMixin):
  126. self.provider.setUpMock()
  127. def tearDown(self):
  128. if isinstance(self.provider, TestMockHelperMixin):
  129. self.provider.tearDownMock()
  130. class ProviderTestCaseGenerator():
  131. """
  132. Generates test cases for all provider - testcase combinations.
  133. Detailed docs at test/__init__.py
  134. """
  135. def __init__(self, test_classes):
  136. self.all_test_classes = test_classes
  137. def get_provider_wait_interval(self, provider_class):
  138. if issubclass(provider_class, TestMockHelperMixin):
  139. return 0
  140. else:
  141. return 1
  142. def create_provider_instance(self, provider_class):
  143. """
  144. Instantiate a default provider instance. All required connection
  145. settings are expected to be set as environment variables.
  146. """
  147. config = {'default_wait_interval':
  148. self.get_provider_wait_interval(provider_class)}
  149. return provider_class(config)
  150. def generate_new_test_class(self, name, testcase_class):
  151. """
  152. Generates a new type which inherits from the given testcase_class and
  153. unittest.TestCase
  154. """
  155. class_name = "{0}{1}".format(name, testcase_class.__name__)
  156. return type(class_name, (testcase_class, unittest.TestCase), {})
  157. def generate_test_suite_for_provider_testcase(
  158. self, provider_class, testcase_class):
  159. """
  160. Generate and return a suite of tests for a specific provider class and
  161. testcase combination
  162. """
  163. testloader = unittest.TestLoader()
  164. testnames = testloader.getTestCaseNames(testcase_class)
  165. suite = unittest.TestSuite()
  166. for name in testnames:
  167. generated_cls = self.generate_new_test_class(
  168. provider_class.__name__,
  169. testcase_class)
  170. suite.addTest(
  171. generated_cls(
  172. name,
  173. self.create_provider_instance(provider_class)))
  174. return suite
  175. def generate_test_suite_for_provider(self, provider_class):
  176. """
  177. Generate and return a suite of all available tests for a given provider
  178. class
  179. """
  180. suite = unittest.TestSuite()
  181. suites = [
  182. self.generate_test_suite_for_provider_testcase(
  183. provider_class, test_class)
  184. for test_class in self.all_test_classes]
  185. for s in suites:
  186. suite.addTest(s)
  187. return suite
  188. def generate_tests(self):
  189. """
  190. Generate and return a suite of tests for all provider and test class
  191. combinations
  192. """
  193. factory = CloudProviderFactory()
  194. use_mock_drivers = parse_bool(
  195. os.environ.get("CB_USE_MOCK_PROVIDERS", True))
  196. provider_name = os.environ.get("CB_TEST_PROVIDER", None)
  197. if provider_name:
  198. provider_classes = [
  199. factory.get_provider_class(
  200. provider_name,
  201. get_mock=use_mock_drivers)]
  202. if not provider_classes[0]:
  203. raise ValueError(
  204. "Could not find specified test provider %s" %
  205. provider_name)
  206. else:
  207. provider_classes = factory.get_all_provider_classes(
  208. get_mock=use_mock_drivers)
  209. suite = unittest.TestSuite()
  210. suites = [
  211. self.generate_test_suite_for_provider(p) for p in provider_classes]
  212. for s in suites:
  213. suite.addTest(s)
  214. return suite