# helpers.py
  1. from contextlib import contextmanager
  2. import os
  3. import sys
  4. import unittest
  5. import functools
  6. from six import reraise
  7. from cloudbridge.cloud.factory import CloudProviderFactory
  8. from cloudbridge.cloud.interfaces import InstanceState
  9. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  10. def parse_bool(val):
  11. if val:
  12. return str(val).upper() in ['TRUE', 'YES']
  13. else:
  14. return False
  15. @contextmanager
  16. def cleanup_action(cleanup_func):
  17. """
  18. Context manager to carry out a given
  19. cleanup action after carrying out a set
  20. of tasks, or when an exception occurs.
  21. If any errors occur during the cleanup
  22. action, those are ignored, and the original
  23. traceback is preserved.
  24. :params func: This function is called if
  25. an exception occurs or at the end of the
  26. context block. If any exceptions raised
  27. by func are ignored.
  28. Usage:
  29. with cleanup_action(lambda e: print("Oops!")):
  30. do_something()
  31. """
  32. try:
  33. yield
  34. except Exception:
  35. ex_class, ex_val, ex_traceback = sys.exc_info()
  36. try:
  37. cleanup_func()
  38. except Exception as e:
  39. print("Error during exception cleanup: {0}".format(e))
  40. reraise(ex_class, ex_val, ex_traceback)
  41. try:
  42. cleanup_func()
  43. except Exception as e:
  44. print("Error during cleanup: {0}".format(e))
  45. def skipIfNoService(services):
  46. """
  47. A decorator for skipping tests if the provider
  48. does not implement a given service.
  49. """
  50. def wrap(func):
  51. """
  52. The actual wrapper
  53. """
  54. @functools.wraps(func)
  55. def wrapper(self, *args, **kwargs):
  56. provider = getattr(self, 'provider')
  57. if provider:
  58. for service in services:
  59. if not provider.has_service(service):
  60. self.skipTest("Skipping test because '%s' service is"
  61. " not implemented" % (service,))
  62. func(self, *args, **kwargs)
  63. return wrapper
  64. return wrap
  65. TEST_DATA_CONFIG = {
  66. "AWSCloudProvider": {
  67. "image": os.environ.get('CB_IMAGE_AWS', 'ami-5ac2cd4d'),
  68. "instance_type": os.environ.get('CB_INSTANCE_TYPE_AWS', 't2.nano'),
  69. "placement": os.environ.get('CB_PLACEMENT_AWS', 'us-east-1a'),
  70. },
  71. "OpenStackCloudProvider": {
  72. "image": os.environ.get('CB_IMAGE_OS',
  73. '842b949c-ea76-48df-998d-8a41f2626243'),
  74. "instance_type": os.environ.get('CB_INSTANCE_TYPE_OS', 'm1.tiny'),
  75. "placement": os.environ.get('CB_PLACEMENT_OS', 'zone-r6'),
  76. }
  77. }
  78. def get_provider_test_data(provider, key):
  79. if "AWSCloudProvider" in provider.name:
  80. return TEST_DATA_CONFIG.get("AWSCloudProvider").get(key)
  81. elif "OpenStackCloudProvider" in provider.name:
  82. return TEST_DATA_CONFIG.get("OpenStackCloudProvider").get(key)
  83. return None
  84. def create_test_network(provider, name):
  85. """
  86. Create a network with one subnet, returning the network and subnet objects.
  87. """
  88. net = provider.network.create(name=name)
  89. cidr_block = (net.cidr_block).split('/')[0] or '10.0.0.1'
  90. sn = net.create_subnet(cidr_block='{0}/28'.format(cidr_block), name=name,
  91. zone=get_provider_test_data(provider, 'placement'))
  92. return net, sn
  93. def delete_test_network(network):
  94. """
  95. Delete the supplied network, first deleting any contained subnets.
  96. """
  97. with cleanup_action(lambda: network.delete()):
  98. for sn in network.subnets():
  99. sn.delete()
  100. def create_test_instance(
  101. provider, instance_name, subnet, zone=None, launch_config=None,
  102. key_pair=None, security_groups=None):
  103. return provider.compute.instances.create(
  104. instance_name,
  105. get_provider_test_data(provider, 'image'),
  106. get_provider_test_data(provider, 'instance_type'),
  107. subnet=subnet,
  108. zone=zone,
  109. key_pair=key_pair,
  110. security_groups=security_groups,
  111. launch_config=launch_config)
  112. def get_test_instance(provider, name, key_pair=None, security_groups=None,
  113. subnet=None):
  114. launch_config = None
  115. instance = create_test_instance(
  116. provider,
  117. name,
  118. subnet=subnet,
  119. key_pair=key_pair,
  120. security_groups=security_groups,
  121. launch_config=launch_config)
  122. instance.wait_till_ready()
  123. return instance
  124. def cleanup_test_resources(instance=None, network=None, security_group=None,
  125. key_pair=None):
  126. with cleanup_action(lambda: delete_test_network(network)
  127. if network else None):
  128. with cleanup_action(lambda: key_pair.delete() if key_pair else None):
  129. with cleanup_action(lambda: security_group.delete()
  130. if security_group else None):
  131. instance.terminate()
  132. instance.wait_for(
  133. [InstanceState.TERMINATED, InstanceState.UNKNOWN],
  134. terminal_states=[InstanceState.ERROR])
  135. class ProviderTestBase(object):
  136. """
  137. A dummy base class for Test Cases. Does not inherit from unittest.TestCase
  138. to avoid confusing test discovery by unittest and nose2. unittest.TestCase
  139. is injected as a base class by the generator, so calling the unittest
  140. constructor works correctly.
  141. """
  142. def __init__(self, methodName, provider):
  143. unittest.TestCase.__init__(self, methodName=methodName)
  144. self.provider = provider
  145. def setUp(self):
  146. if isinstance(self.provider, TestMockHelperMixin):
  147. self.provider.setUpMock()
  148. def tearDown(self):
  149. if isinstance(self.provider, TestMockHelperMixin):
  150. self.provider.tearDownMock()
  151. class ProviderTestCaseGenerator():
  152. """
  153. Generates test cases for all provider - testcase combinations.
  154. Detailed docs at test/__init__.py
  155. """
  156. def __init__(self, test_classes):
  157. self.all_test_classes = test_classes
  158. def get_provider_wait_interval(self, provider_class):
  159. if issubclass(provider_class, TestMockHelperMixin):
  160. return 0
  161. else:
  162. return 1
  163. def create_provider_instance(self, provider_class):
  164. """
  165. Instantiate a default provider instance. All required connection
  166. settings are expected to be set as environment variables.
  167. """
  168. config = {'default_wait_interval':
  169. self.get_provider_wait_interval(provider_class)}
  170. return provider_class(config)
  171. def generate_new_test_class(self, name, testcase_class):
  172. """
  173. Generates a new type which inherits from the given testcase_class and
  174. unittest.TestCase
  175. """
  176. class_name = "{0}{1}".format(name, testcase_class.__name__)
  177. return type(class_name, (testcase_class, unittest.TestCase), {})
  178. def generate_test_suite_for_provider_testcase(
  179. self, provider_class, testcase_class):
  180. """
  181. Generate and return a suite of tests for a specific provider class and
  182. testcase combination
  183. """
  184. testloader = unittest.TestLoader()
  185. testnames = testloader.getTestCaseNames(testcase_class)
  186. suite = unittest.TestSuite()
  187. for name in testnames:
  188. generated_cls = self.generate_new_test_class(
  189. provider_class.__name__,
  190. testcase_class)
  191. suite.addTest(
  192. generated_cls(
  193. name,
  194. self.create_provider_instance(provider_class)))
  195. return suite
  196. def generate_test_suite_for_provider(self, provider_class):
  197. """
  198. Generate and return a suite of all available tests for a given provider
  199. class
  200. """
  201. suite = unittest.TestSuite()
  202. suites = [
  203. self.generate_test_suite_for_provider_testcase(
  204. provider_class, test_class)
  205. for test_class in self.all_test_classes]
  206. for s in suites:
  207. suite.addTest(s)
  208. return suite
  209. def generate_tests(self):
  210. """
  211. Generate and return a suite of tests for all provider and test class
  212. combinations
  213. """
  214. factory = CloudProviderFactory()
  215. use_mock_drivers = parse_bool(
  216. os.environ.get("CB_USE_MOCK_PROVIDERS", True))
  217. provider_name = os.environ.get("CB_TEST_PROVIDER", None)
  218. if provider_name:
  219. provider_classes = [
  220. factory.get_provider_class(
  221. provider_name,
  222. get_mock=use_mock_drivers)]
  223. if not provider_classes[0]:
  224. raise ValueError(
  225. "Could not find specified test provider %s" %
  226. provider_name)
  227. else:
  228. provider_classes = factory.get_all_provider_classes(
  229. get_mock=use_mock_drivers)
  230. suite = unittest.TestSuite()
  231. suites = [
  232. self.generate_test_suite_for_provider(p) for p in provider_classes]
  233. for s in suites:
  234. suite.addTest(s)
  235. return suite