__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. import functools
  2. import operator
  3. import os
  4. import sys
  5. import unittest
  6. import uuid
  7. from cloudbridge.base import helpers as cb_helpers
  8. from cloudbridge.factory import CloudProviderFactory
  9. from cloudbridge.interfaces import CloudProvider
  10. from cloudbridge.interfaces import InstanceState
  11. from cloudbridge.interfaces import TestMockHelperMixin
  12. from cloudbridge.interfaces.resources import FloatingIpState
  13. from cloudbridge.interfaces.resources import NetworkState
  14. from cloudbridge.interfaces.resources import SubnetState
  15. def parse_bool(val):
  16. if val:
  17. return str(val).upper() in ['TRUE', 'YES']
  18. else:
  19. return False
  20. def skipIfNoService(services):
  21. """
  22. A decorator for skipping tests if the provider
  23. does not implement a given service.
  24. """
  25. def wrap(func):
  26. """
  27. The actual wrapper
  28. """
  29. @functools.wraps(func)
  30. def wrapper(self, *args, **kwargs):
  31. provider = getattr(self, 'provider')
  32. if provider:
  33. for service in services:
  34. if not provider.has_service(service):
  35. self.skipTest("Skipping test because '%s' service is"
  36. " not implemented" % (service,))
  37. func(self, *args, **kwargs)
  38. return wrapper
  39. return wrap
  40. def skipIfMockMotoVersion(spec, reason):
  41. """
  42. A decorator for skipping tests when running against the mock
  43. provider and the installed moto version matches `spec` (a
  44. PEP 440 specifier string such as ">=5.0,<5.3"). Other providers
  45. (aws / azure / gcp / openstack) always run the test, and the mock
  46. provider runs it normally when moto sits outside the broken range.
  47. """
  48. def wrap(func):
  49. @functools.wraps(func)
  50. def wrapper(self, *args, **kwargs):
  51. provider = getattr(self, 'provider', None)
  52. if provider and getattr(provider, 'PROVIDER_ID', None) == 'mock':
  53. from importlib.metadata import PackageNotFoundError
  54. from importlib.metadata import version as get_version
  55. from packaging.specifiers import SpecifierSet
  56. from packaging.version import Version
  57. try:
  58. moto_version = Version(get_version('moto'))
  59. except PackageNotFoundError:
  60. moto_version = None
  61. if moto_version and moto_version in SpecifierSet(spec):
  62. self.skipTest(
  63. "Skipping for mock/moto %s matching %s: %s"
  64. % (moto_version, spec, reason))
  65. func(self, *args, **kwargs)
  66. return wrapper
  67. return wrap
  68. def skipIfPython(op, major, minor):
  69. """
  70. A decorator for skipping tests if the python
  71. version doesn't match
  72. """
  73. def stringToOperator(op):
  74. op_map = {
  75. "=": operator.eq,
  76. "==": operator.eq,
  77. "<": operator.lt,
  78. "<=": operator.le,
  79. ">": operator.gt,
  80. ">=": operator.ge,
  81. }
  82. return op_map.get(op)
  83. def wrap(func):
  84. """
  85. The actual wrapper
  86. """
  87. @functools.wraps(func)
  88. def wrapper(self, *args, **kwargs):
  89. op_func = stringToOperator(op)
  90. if op_func(sys.version_info, (major, minor)):
  91. self.skipTest(
  92. "Skipping test because python version {0} is {1} expected"
  93. " version {2}".format(sys.version_info[:2],
  94. op, (major, minor)))
  95. func(self, *args, **kwargs)
  96. return wrapper
  97. return wrap
  98. TEST_DATA_CONFIG = {
  99. "AWSCloudProvider": {
  100. # Match the ami value with entry in custom_amis.json for use with moto
  101. "image": cb_helpers.get_env('CB_IMAGE_AWS', 'ami-aa2ea6d0'),
  102. "vm_type": cb_helpers.get_env('CB_VM_TYPE_AWS', 't2.nano'),
  103. "placement": cb_helpers.get_env('CB_PLACEMENT_AWS', 'us-east-1a'),
  104. "placement_cfg_key": "aws_zone_name"
  105. },
  106. 'OpenStackCloudProvider': {
  107. 'image': cb_helpers.get_env('CB_IMAGE_OS',
  108. 'c66bdfa1-62b1-43be-8964-e9ce208ac6a5'),
  109. "vm_type": cb_helpers.get_env('CB_VM_TYPE_OS', 'm1.tiny'),
  110. "placement": cb_helpers.get_env('CB_PLACEMENT_OS', 'nova'),
  111. "placement_cfg_key": "os_zone_name"
  112. },
  113. 'GCPCloudProvider': {
  114. 'image': cb_helpers.get_env(
  115. 'CB_IMAGE_GCP',
  116. 'https://www.googleapis.com/compute/v1/projects/ubuntu-os-cloud/'
  117. 'global/images/ubuntu-1804-bionic-v20200908'),
  118. 'vm_type': cb_helpers.get_env('CB_VM_TYPE_GCP', 'f1-micro'),
  119. 'placement': cb_helpers.get_env('GCP_ZONE_NAME', 'us-central1-a'),
  120. "placement_cfg_key": "gcp_zone_name"
  121. },
  122. "AzureCloudProvider": {
  123. "image":
  124. cb_helpers.get_env('CB_IMAGE_AZURE',
  125. 'Canonical:0001-com-ubuntu-minimal-jammy:minimal-22_04-lts-gen2:latest'),
  126. "vm_type": cb_helpers.get_env('CB_VM_TYPE_AZURE', 'Standard_DC1ds_v3'),
  127. "placement": cb_helpers.get_env('CB_PLACEMENT_AZURE', 'eastus'),
  128. "placement_cfg_key": "azure_zone_name"
  129. }
  130. }
  131. def get_provider_test_data(provider, key):
  132. provider_id = (provider.PROVIDER_ID if isinstance(provider, CloudProvider)
  133. else provider)
  134. if "aws" == provider_id:
  135. return TEST_DATA_CONFIG.get("AWSCloudProvider").get(key)
  136. if "mock" == provider_id:
  137. return TEST_DATA_CONFIG.get("AWSCloudProvider").get(key)
  138. elif "openstack" == provider_id:
  139. return TEST_DATA_CONFIG.get("OpenStackCloudProvider").get(key)
  140. elif "gcp" == provider_id:
  141. return TEST_DATA_CONFIG.get("GCPCloudProvider").get(key)
  142. elif "azure" == provider_id:
  143. return TEST_DATA_CONFIG.get("AzureCloudProvider").get(key)
  144. return None
  145. def get_or_create_default_subnet(provider):
  146. """
  147. Return the default subnet to be used for tests
  148. """
  149. return provider.networking.subnets.get_or_create_default()
  150. def cleanup_subnet(subnet):
  151. if subnet:
  152. subnet.delete()
  153. subnet.wait_for([SubnetState.UNKNOWN],
  154. terminal_states=[SubnetState.ERROR])
  155. def cleanup_network(network):
  156. """
  157. Delete the supplied network, first deleting any contained subnets.
  158. """
  159. if network:
  160. try:
  161. for sn in network.subnets:
  162. with cb_helpers.cleanup_action(lambda: cleanup_subnet(sn)):
  163. pass
  164. finally:
  165. network.delete()
  166. network.wait_for([NetworkState.UNKNOWN],
  167. terminal_states=[NetworkState.ERROR])
  168. def cleanup_fip(fip):
  169. if fip:
  170. fip.delete()
  171. fip.wait_for([FloatingIpState.UNKNOWN],
  172. terminal_states=[FloatingIpState.ERROR])
  173. def get_test_gateway(provider):
  174. """
  175. Get an internet gateway for testing.
  176. This includes creating a network for the gateway, which is also returned.
  177. """
  178. sn = get_or_create_default_subnet(provider)
  179. net = sn.network
  180. return net.gateways.get_or_create()
  181. def cleanup_gateway(gateway):
  182. """
  183. Delete the supplied network and gateway.
  184. """
  185. with cb_helpers.cleanup_action(lambda: gateway.delete()):
  186. pass
  187. def create_test_instance(
  188. provider, instance_label, subnet, launch_config=None,
  189. key_pair=None, vm_firewalls=None, user_data=None):
  190. instance = provider.compute.instances.create(
  191. instance_label, get_provider_test_data(provider, 'image'),
  192. get_provider_test_data(provider, 'vm_type'),
  193. subnet=subnet,
  194. key_pair=key_pair,
  195. vm_firewalls=vm_firewalls,
  196. launch_config=launch_config,
  197. user_data=user_data)
  198. return instance
  199. def get_test_instance(provider, label, key_pair=None, vm_firewalls=None,
  200. subnet=None, user_data=None):
  201. launch_config = None
  202. instance = create_test_instance(
  203. provider,
  204. label,
  205. subnet=subnet,
  206. key_pair=key_pair,
  207. vm_firewalls=vm_firewalls,
  208. launch_config=launch_config,
  209. user_data=user_data)
  210. instance.wait_till_ready()
  211. return instance
  212. def get_test_fixtures_folder():
  213. return os.path.join(os.path.dirname(__file__), '../fixtures/')
  214. def delete_instance(instance):
  215. if instance:
  216. instance.delete()
  217. instance.wait_for([InstanceState.DELETED, InstanceState.UNKNOWN],
  218. terminal_states=[InstanceState.ERROR])
  219. def cleanup_test_resources(instance=None, vm_firewall=None,
  220. key_pair=None, network=None):
  221. """Clean up any combination of supplied resources."""
  222. with cb_helpers.cleanup_action(
  223. lambda: cleanup_network(network) if network else None):
  224. with cb_helpers.cleanup_action(
  225. lambda: key_pair.delete() if key_pair else None):
  226. with cb_helpers.cleanup_action(
  227. lambda: vm_firewall.delete() if vm_firewall else None):
  228. delete_instance(instance)
  229. def get_uuid():
  230. return str(uuid.uuid4())[:6]
  231. class ProviderTestBase(unittest.TestCase):
  232. _provider = None
  233. def setUp(self):
  234. if isinstance(self.provider, TestMockHelperMixin):
  235. self.provider.setUpMock()
  236. def tearDown(self):
  237. if isinstance(self.provider, TestMockHelperMixin):
  238. self.provider.tearDownMock()
  239. self._provider = None
  240. def get_provider_wait_interval(self, provider_class):
  241. if issubclass(provider_class, TestMockHelperMixin):
  242. return 0
  243. else:
  244. return 1
  245. def create_provider_instance(self):
  246. provider_name = cb_helpers.get_env("CB_TEST_PROVIDER", "aws")
  247. zone_cfg_key = get_provider_test_data(provider_name,
  248. 'placement_cfg_key')
  249. factory = CloudProviderFactory()
  250. provider_class = factory.get_provider_class(provider_name)
  251. config = {
  252. 'default_wait_interval': self.get_provider_wait_interval(
  253. provider_class),
  254. 'default_result_limit': 5,
  255. zone_cfg_key: get_provider_test_data(provider_name, 'placement')
  256. }
  257. return provider_class(config)
  258. @property
  259. def provider(self):
  260. if not self._provider:
  261. self._provider = self.create_provider_instance()
  262. return self._provider