helpers.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. from contextlib import contextmanager
  2. import os
  3. import sys
  4. import unittest
  5. from six import reraise
  6. from cloudbridge.cloud.factory import CloudProviderFactory
  7. from cloudbridge.cloud.interfaces import InstanceState
  8. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  9. def parse_bool(val):
  10. if val:
  11. return str(val).upper() in ['TRUE', 'YES']
  12. else:
  13. return False
  14. @contextmanager
  15. def cleanup_action(cleanup_func):
  16. """
  17. Context manager to carry out a given
  18. cleanup action after carrying out a set
  19. of tasks, or when an exception occurs.
  20. If any errors occur during the cleanup
  21. action, those are ignored, and the original
  22. traceback is preserved.
  23. :params func: This function is called if
  24. an exception occurs or at the end of the
  25. context block. If any exceptions raised
  26. by func are ignored.
  27. Usage:
  28. with cleanup_action(lambda e: print("Oops!")):
  29. do_something()
  30. """
  31. try:
  32. yield
  33. except Exception:
  34. ex_class, ex_val, ex_traceback = sys.exc_info()
  35. try:
  36. cleanup_func()
  37. except Exception as e:
  38. print("Error during exception cleanup: {0}".format(e))
  39. reraise(ex_class, ex_val, ex_traceback)
  40. try:
  41. cleanup_func()
  42. except Exception as e:
  43. print("Error during cleanup: {0}".format(e))
  44. def skipIfNoService(services):
  45. """
  46. A decorator for skipping tests if the provider
  47. does not implement a given service.
  48. """
  49. def wrap(func):
  50. """
  51. The actual wrapper
  52. """
  53. def wrapper(self, *args, **kwargs):
  54. provider = getattr(self, 'provider')
  55. if provider:
  56. for service in services:
  57. if not provider.has_service(service):
  58. self.skipTest("Skipping test because '%s' service is"
  59. " not implemented" % (service,))
  60. else:
  61. func(self, *args, **kwargs)
  62. return wrapper
  63. return wrap
  64. TEST_DATA_CONFIG = {
  65. "AWSCloudProvider": {
  66. "image": os.environ.get('CB_IMAGE_AWS', 'ami-5ac2cd4d'),
  67. "instance_type": os.environ.get('CB_INSTANCE_TYPE_AWS', 't2.nano'),
  68. "placement": os.environ.get('CB_PLACEMENT_AWS', 'us-east-1a'),
  69. },
  70. "OpenStackCloudProvider": {
  71. "image": os.environ.get('CB_IMAGE_OS',
  72. 'a471339a-bd0e-41e2-9406-4f308267ed0f'),
  73. "instance_type": os.environ.get('CB_INSTANCE_TYPE_OS', 'm1.tiny'),
  74. "placement": os.environ.get('CB_PLACEMENT_OS', 'nova'),
  75. }
  76. }
  77. def get_provider_test_data(provider, key):
  78. if "AWSCloudProvider" in provider.name:
  79. return TEST_DATA_CONFIG.get("AWSCloudProvider").get(key)
  80. elif "OpenStackCloudProvider" in provider.name:
  81. return TEST_DATA_CONFIG.get("OpenStackCloudProvider").get(key)
  82. return None
  83. def create_test_network(provider, name):
  84. """
  85. Create a network with one subnet, returning the network and subnet objects.
  86. """
  87. net = provider.network.create(name=name)
  88. cidr_block = (net.cidr_block).split('/')[0] or '10.0.0.1'
  89. sn = net.create_subnet(cidr_block='{0}/28'.format(cidr_block, name=name))
  90. return net, sn
  91. def delete_test_network(network):
  92. """
  93. Delete the supplied network, first deleting any contained subnets.
  94. """
  95. with cleanup_action(lambda: network.delete()):
  96. for sn in network.subnets():
  97. sn.delete()
  98. def create_test_instance(
  99. provider, instance_name, network, zone=None, launch_config=None,
  100. key_pair=None, security_groups=None):
  101. return provider.compute.instances.create(
  102. instance_name,
  103. get_provider_test_data(provider, 'image'),
  104. get_provider_test_data(provider, 'instance_type'),
  105. network=network,
  106. zone=zone,
  107. key_pair=key_pair,
  108. security_groups=security_groups,
  109. launch_config=launch_config)
  110. def get_test_instance(provider, name, key_pair=None, security_groups=None,
  111. network=None):
  112. launch_config = None
  113. instance = create_test_instance(
  114. provider,
  115. name,
  116. network=network,
  117. key_pair=key_pair,
  118. security_groups=security_groups,
  119. launch_config=launch_config)
  120. instance.wait_till_ready()
  121. return instance
  122. def cleanup_test_resources(instance=None, network=None, security_group=None,
  123. key_pair=None):
  124. with cleanup_action(lambda: delete_test_network(network)):
  125. with cleanup_action(lambda: key_pair.delete()):
  126. with cleanup_action(lambda: security_group.delete()):
  127. instance.terminate()
  128. instance.wait_for(
  129. [InstanceState.TERMINATED, InstanceState.UNKNOWN],
  130. terminal_states=[InstanceState.ERROR])
  131. class ProviderTestBase(object):
  132. """
  133. A dummy base class for Test Cases. Does not inherit from unittest.TestCase
  134. to avoid confusing test discovery by unittest and nose2. unittest.TestCase
  135. is injected as a base class by the generator, so calling the unittest
  136. constructor works correctly.
  137. """
  138. def __init__(self, methodName, provider):
  139. unittest.TestCase.__init__(self, methodName=methodName)
  140. self.provider = provider
  141. def setUp(self):
  142. if isinstance(self.provider, TestMockHelperMixin):
  143. self.provider.setUpMock()
  144. def tearDown(self):
  145. if isinstance(self.provider, TestMockHelperMixin):
  146. self.provider.tearDownMock()
  147. class ProviderTestCaseGenerator():
  148. """
  149. Generates test cases for all provider - testcase combinations.
  150. Detailed docs at test/__init__.py
  151. """
  152. def __init__(self, test_classes):
  153. self.all_test_classes = test_classes
  154. def get_provider_wait_interval(self, provider_class):
  155. if issubclass(provider_class, TestMockHelperMixin):
  156. return 0
  157. else:
  158. return 1
  159. def create_provider_instance(self, provider_class):
  160. """
  161. Instantiate a default provider instance. All required connection
  162. settings are expected to be set as environment variables.
  163. """
  164. config = {'default_wait_interval':
  165. self.get_provider_wait_interval(provider_class)}
  166. return provider_class(config)
  167. def generate_new_test_class(self, name, testcase_class):
  168. """
  169. Generates a new type which inherits from the given testcase_class and
  170. unittest.TestCase
  171. """
  172. class_name = "{0}{1}".format(name, testcase_class.__name__)
  173. return type(class_name, (testcase_class, unittest.TestCase), {})
  174. def generate_test_suite_for_provider_testcase(
  175. self, provider_class, testcase_class):
  176. """
  177. Generate and return a suite of tests for a specific provider class and
  178. testcase combination
  179. """
  180. testloader = unittest.TestLoader()
  181. testnames = testloader.getTestCaseNames(testcase_class)
  182. suite = unittest.TestSuite()
  183. for name in testnames:
  184. generated_cls = self.generate_new_test_class(
  185. provider_class.__name__,
  186. testcase_class)
  187. suite.addTest(
  188. generated_cls(
  189. name,
  190. self.create_provider_instance(provider_class)))
  191. return suite
  192. def generate_test_suite_for_provider(self, provider_class):
  193. """
  194. Generate and return a suite of all available tests for a given provider
  195. class
  196. """
  197. suite = unittest.TestSuite()
  198. suites = [
  199. self.generate_test_suite_for_provider_testcase(
  200. provider_class, test_class)
  201. for test_class in self.all_test_classes]
  202. for s in suites:
  203. suite.addTest(s)
  204. return suite
  205. def generate_tests(self):
  206. """
  207. Generate and return a suite of tests for all provider and test class
  208. combinations
  209. """
  210. factory = CloudProviderFactory()
  211. use_mock_drivers = parse_bool(
  212. os.environ.get("CB_USE_MOCK_PROVIDERS", True))
  213. provider_name = os.environ.get("CB_TEST_PROVIDER", None)
  214. if provider_name:
  215. provider_classes = [
  216. factory.get_provider_class(
  217. provider_name,
  218. get_mock=use_mock_drivers)]
  219. if not provider_classes[0]:
  220. raise ValueError(
  221. "Could not find specified test provider %s" %
  222. provider_name)
  223. else:
  224. provider_classes = factory.get_all_provider_classes(
  225. get_mock=use_mock_drivers)
  226. suite = unittest.TestSuite()
  227. suites = [
  228. self.generate_test_suite_for_provider(p) for p in provider_classes]
  229. for s in suites:
  230. suite.addTest(s)
  231. return suite