helpers.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. from contextlib import contextmanager
  2. import os
  3. import sys
  4. import unittest
  5. from six import reraise
  6. from cloudbridge.cloud.factory import CloudProviderFactory
  def parse_bool(val):
      """
      Interpret a loosely-typed value as a boolean.

      Falsy values (``None``, ``''``, ``0``, ``False``, ...) yield ``False``.
      Any other value is converted with ``str()`` and counts as true only
      when the result reads ``TRUE`` or ``YES``, ignoring case.

      :param val: The value to interpret, e.g. the content of an
                  environment variable.
      :return: ``True`` or ``False``.
      """
      return bool(val) and str(val).upper() in ('TRUE', 'YES')
  @contextmanager
  def cleanup_action(cleanup_func):
      """
      Context manager to carry out a given cleanup action after carrying out
      a set of tasks, or when an exception occurs.

      Errors raised by the cleanup action itself are printed and otherwise
      ignored, so they never mask the outcome of the context block: if the
      block raised, the original exception is re-raised with its original
      traceback; if the block succeeded, the ``with`` statement completes
      normally.

      :param cleanup_func: A callable taking no arguments. It is called
                           once, either when an exception occurs or at the
                           end of the context block. Any exception it
                           raises is reported via ``print`` and ignored.

      Usage:
          with cleanup_action(lambda: print("Oops!")):
              do_something()
      """
      try:
          yield
      except Exception:
          # Capture the in-flight exception before attempting the cleanup so
          # it can be re-raised with its original traceback afterwards.
          ex_class, ex_val, ex_traceback = sys.exc_info()
          try:
              cleanup_func()
          except Exception as e:
              print("Error during exception cleanup: {0}".format(e))
          reraise(ex_class, ex_val, ex_traceback)
      else:
          # Success path: a failing cleanup must not turn a successful
          # block into a failure, as promised by the docstring.
          try:
              cleanup_func()
          except Exception as e:
              print("Error during cleanup: {0}".format(e))
  # Per-provider test fixtures, keyed by provider class name. Every value can
  # be overridden through the environment variable named next to it; the
  # second argument is the fallback used when that variable is not set.
  TEST_DATA_CONFIG = {
      "AWSCloudProvider": dict(
          image=os.environ.get("CB_IMAGE_AWS", "ami-d85e75b0"),
          instance_type=os.environ.get("CB_INSTANCE_TYPE_AWS", "t1.micro"),
          placement=os.environ.get("CB_PLACEMENT_AWS", "us-east-1a"),
      ),
      "OpenStackCloudProvider": dict(
          image=os.environ.get(
              "CB_IMAGE_OS", "eaf2fa40-25a3-44cd-b2f1-40de42a62154"),
          instance_type=os.environ.get("CB_INSTANCE_TYPE_OS", "m1.tiny"),
          placement=os.environ.get("CB_PLACEMENT_OS", "nova"),
      ),
  }
  def get_provider_test_data(provider, key):
      """
      Look up a test fixture value for the given provider.

      The provider is matched by checking whether a known provider class
      name occurs in ``provider.name``; AWS is checked before OpenStack.

      :param provider: Provider object exposing a ``name`` attribute.
      :param key: Name of the fixture value, e.g. ``'image'``.
      :return: The configured value, or ``None`` when the provider is not
               recognised or has no entry for ``key``.
      """
      for config_name in ("AWSCloudProvider", "OpenStackCloudProvider"):
          if config_name in provider.name:
              return TEST_DATA_CONFIG.get(config_name).get(key)
      return None
  def create_test_instance(
          provider, instance_name, zone=None, launch_config=None,
          keypair=None, security_groups=None):
      """
      Launch an instance using the test image and instance type configured
      for the given provider.

      The call returns whatever ``provider.compute.instances.create``
      returns; it does not wait for the instance to become ready.

      :param provider: Provider used to create the instance.
      :param instance_name: Name to give the new instance.
      :param zone: Passed through to ``create`` as ``zone``.
      :param launch_config: Passed through as ``launch_config``.
      :param keypair: Passed through as ``keypair``.
      :param security_groups: Passed through as ``security_groups``.
      :return: The result of ``provider.compute.instances.create``.
      """
      launch = provider.compute.instances.create
      image = get_provider_test_data(provider, 'image')
      instance_type = get_provider_test_data(provider, 'instance_type')
      return launch(
          instance_name, image, instance_type,
          zone=zone,
          keypair=keypair,
          security_groups=security_groups,
          launch_config=launch_config)
  def get_provider_wait_interval(provider):
      """
      Return the polling interval to use when waiting on the given provider.

      :param provider: The provider under test.
      :return: ``0`` when the provider is a ``TestMockHelperMixin`` (a mock
               driver), ``1`` otherwise.
      """
      return 0 if isinstance(provider, TestMockHelperMixin) else 1
  def get_test_instance(provider, name, keypair=None, security_groups=None):
      """
      Create a test instance and block until it reports ready.

      :param provider: Provider used to create the instance.
      :param name: Name to give the new instance.
      :param keypair: Passed through to ``create_test_instance``.
      :param security_groups: Passed through to ``create_test_instance``.
      :return: The instance, after ``wait_till_ready`` has returned.
      """
      launched = create_test_instance(
          provider, name, keypair=keypair, security_groups=security_groups)
      # Poll using the interval chosen for this provider.
      poll_interval = get_provider_wait_interval(provider)
      launched.wait_till_ready(interval=poll_interval)
      return launched
  class TestMockHelperMixin(object):
      """
      Mixin for mock provider drivers that want to be told when a test is
      set up or torn down. This is useful when activating libraries like
      HTTPretty, which take over socket communications.

      Both hooks raise ``NotImplementedError`` until they are overridden.
      """

      def setUpMock(self):
          """
          Hook invoked before a test is started.
          """
          message = 'TestMockHelperMixin.setUpMock not implemented'
          raise NotImplementedError(message)

      def tearDownMock(self):
          """
          Hook invoked when a test is torn down.
          """
          message = (
              'TestMockHelperMixin.tearDownMock not implemented by this'
              ' provider')
          raise NotImplementedError(message)
  class ProviderTestBase(object):
      """
      Base class for provider test cases.

      It deliberately does not derive from unittest.TestCase, so that test
      discovery by unittest and nose2 does not pick it up directly. The
      generator injects unittest.TestCase as an additional base class, which
      is why calling the unittest constructor from here works correctly.
      """

      def __init__(self, methodName, provider):
          """
          :param methodName: Name of the test method this instance runs.
          :param provider: Provider instance the test runs against.
          """
          unittest.TestCase.__init__(self, methodName=methodName)
          self.provider = provider

      def setUp(self):
          """
          Notify mock providers that a test is about to start.
          """
          provider = self.provider
          if isinstance(provider, TestMockHelperMixin):
              provider.setUpMock()

      def tearDown(self):
          """
          Notify mock providers that the test is being torn down.
          """
          provider = self.provider
          if isinstance(provider, TestMockHelperMixin):
              provider.tearDownMock()

      def get_test_wait_interval(self):
          """
          Return the polling interval suited to this test's provider.
          """
          return get_provider_wait_interval(self.provider)
  class ProviderTestCaseGenerator(object):
      """
      Generates test cases for all provider - testcase combinations.
      Detailed docs at test/__init__.py
      """

      def __init__(self, test_classes):
          """
          :param test_classes: Iterable of test case classes from which
                               suites are generated for every provider.
          """
          self.all_test_classes = test_classes

      def create_provider_instance(self, provider_class):
          """
          Instantiate a default provider instance. All required connection
          settings are expected to be set as environment variables.

          :param provider_class: Provider class to instantiate; it is called
                                 with an empty configuration dict.
          :return: The new provider instance.
          """
          return provider_class({})

      def generate_new_test_class(self, name, testcase_class):
          """
          Generates a new type which inherits from the given testcase_class
          and unittest.TestCase.

          :param name: Prefix for the generated class name.
          :param testcase_class: Test case class to derive from.
          :return: A new class named ``name + testcase_class.__name__``.
          """
          class_name = "{0}{1}".format(name, testcase_class.__name__)
          return type(class_name, (testcase_class, unittest.TestCase), {})

      def generate_test_suite_for_provider_testcase(
              self, provider_class, testcase_class):
          """
          Generate and return a suite of tests for a specific provider class
          and testcase combination.

          One test class is generated for the combination and shared by all
          of its tests; every test receives its own provider instance.

          :param provider_class: Provider class the tests run against.
          :param testcase_class: Class whose test methods are collected.
          :return: A ``unittest.TestSuite`` with one test per test method.
          """
          testnames = unittest.TestLoader().getTestCaseNames(testcase_class)
          # The generated class depends only on the provider/testcase pair,
          # so build it once instead of once per test method.
          generated_cls = self.generate_new_test_class(
              provider_class.__name__, testcase_class)
          return unittest.TestSuite(
              generated_cls(
                  name, self.create_provider_instance(provider_class))
              for name in testnames)

      def generate_test_suite_for_provider(self, provider_class):
          """
          Generate and return a suite of all available tests for a given
          provider class.

          :param provider_class: Provider class the tests run against.
          :return: A ``unittest.TestSuite`` holding one sub-suite per class
                   in ``all_test_classes``.
          """
          return unittest.TestSuite(
              self.generate_test_suite_for_provider_testcase(
                  provider_class, test_class)
              for test_class in self.all_test_classes)

      def generate_tests(self):
          """
          Generate and return a suite of tests for all provider and test
          class combinations.

          Two environment variables steer the selection:

          * ``CB_USE_MOCK_DRIVERS`` is interpreted with ``parse_bool`` and
            passed to the factory as ``get_mock``; when it is not set, the
            default ``True`` is used.
          * ``CB_TEST_PROVIDER`` restricts the run to a single provider;
            when it is unset or empty, all provider classes known to the
            factory are used.

          :return: A ``unittest.TestSuite`` with one sub-suite per provider.
          :raises ValueError: If ``CB_TEST_PROVIDER`` names a provider the
                              factory cannot find.
          """
          factory = CloudProviderFactory()
          use_mock_drivers = parse_bool(
              os.environ.get("CB_USE_MOCK_DRIVERS", True))
          provider_name = os.environ.get("CB_TEST_PROVIDER", None)
          if provider_name:
              provider_class = factory.get_provider_class(
                  provider_name, get_mock=use_mock_drivers)
              if not provider_class:
                  raise ValueError(
                      "Could not find specified test provider %s" %
                      provider_name)
              provider_classes = [provider_class]
          else:
              provider_classes = factory.get_all_provider_classes(
                  get_mock=use_mock_drivers)
          return unittest.TestSuite(
              self.generate_test_suite_for_provider(p)
              for p in provider_classes)