2
0

test_security_service.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. """Test cloudbridge.security modules."""
  2. import cloudbridge.base.helpers as cb_helpers
  3. from cloudbridge.interfaces.exceptions import DuplicateResourceException
  4. from cloudbridge.interfaces.resources import KeyPair
  5. from cloudbridge.interfaces.resources import TrafficDirection
  6. from cloudbridge.interfaces.resources import VMFirewall
  7. from cloudbridge.interfaces.resources import VMFirewallRule
  8. from tests import helpers
  9. from tests.helpers import ProviderTestBase
  10. from tests.helpers import standard_interface_tests as sit
  11. class CloudSecurityServiceTestCase(ProviderTestBase):
  12. _multiprocess_can_split_ = True
  13. @helpers.skipIfNoService(['security.vm_firewalls'])
  14. def test_storage_services_event_pattern(self):
  15. self.assertEqual(
  16. self.provider.security.key_pairs.
  17. _service_event_pattern,
  18. "provider.security.key_pairs",
  19. "Event pattern for {} service should be '{}', "
  20. "but found '{}'.".format("key_pairs",
  21. "provider.security.key_pairs",
  22. self.provider.security.
  23. key_pairs.
  24. _service_event_pattern))
  25. self.assertEqual(
  26. self.provider.security.vm_firewalls._service_event_pattern,
  27. "provider.security.vm_firewalls",
  28. "Event pattern for {} service should be '{}', "
  29. "but found '{}'.".format("vm_firewalls",
  30. "provider.security.vm_firewalls",
  31. self.provider.security.vm_firewalls.
  32. _service_event_pattern))
  33. @helpers.skipIfNoService(['security.key_pairs'])
  34. def test_crud_key_pair_service(self):
  35. def create_kp(name):
  36. return self.provider.security.key_pairs.create(name=name)
  37. def cleanup_kp(kp):
  38. if kp:
  39. self.provider.security.key_pairs.delete(kp.id)
  40. def extra_tests(kp):
  41. # Recreating existing keypair should raise an exception
  42. with self.assertRaises(DuplicateResourceException):
  43. self.provider.security.key_pairs.create(name=kp.name)
  44. sit.check_crud(self, self.provider.security.key_pairs, KeyPair,
  45. "cb-crudkp", create_kp, cleanup_kp,
  46. extra_test_func=extra_tests)
  47. @helpers.skipIfNoService(['security.key_pairs'])
  48. def test_key_pair_properties(self):
  49. name = 'cb-kpprops-{0}'.format(helpers.get_uuid())
  50. kp = self.provider.security.key_pairs.create(name=name)
  51. with cb_helpers.cleanup_action(lambda: kp.delete()):
  52. self.assertIsNotNone(
  53. kp.material,
  54. "KeyPair material is empty but it should not be.")
  55. # get the keypair again - keypair material should now be empty
  56. kp = self.provider.security.key_pairs.get(kp.id)
  57. self.assertIsNone(kp.material,
  58. "Keypair material should now be empty")
  59. @helpers.skipIfNoService(['security.key_pairs'])
  60. def test_import_key_pair(self):
  61. name = 'cb-kpimport-{0}'.format(helpers.get_uuid())
  62. public_key, _ = cb_helpers.generate_key_pair()
  63. kp = self.provider.security.key_pairs.create(
  64. name=name, public_key_material=public_key)
  65. with cb_helpers.cleanup_action(lambda: kp.delete()):
  66. self.assertIsNone(kp.material, "Private KeyPair material should"
  67. " be None when key is imported.")
  68. @helpers.skipIfNoService(['security.vm_firewalls'])
  69. def test_crud_vm_firewall(self):
  70. subnet = helpers.get_or_create_default_subnet(self.provider)
  71. net = subnet.network
  72. def create_fw(label):
  73. return self.provider.security.vm_firewalls.create(
  74. label=label, description=label, network=net.id)
  75. def cleanup_fw(fw):
  76. if fw:
  77. fw.delete()
  78. def network_id_test(fw):
  79. # Checking that the network ID is returned correctly
  80. self.assertEqual(fw.network_id, net.id)
  81. sit.check_crud(self, self.provider.security.vm_firewalls,
  82. VMFirewall, "cb-crudfw", create_fw, cleanup_fw,
  83. extra_test_func=network_id_test)
  84. @helpers.skipIfNoService(['security.vm_firewalls'])
  85. def test_vm_firewall_properties(self):
  86. label = 'cb-propfw-{0}'.format(helpers.get_uuid())
  87. # Declare these variables and late binding will allow
  88. # the cleanup method access to the most current values
  89. fw = None
  90. with cb_helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  91. vm_firewall=fw)):
  92. subnet = helpers.get_or_create_default_subnet(self.provider)
  93. net = subnet.network
  94. fw = self.provider.security.vm_firewalls.create(
  95. label=label, description=label, network=net.id)
  96. self.assertEqual(label, fw.description)
  97. @helpers.skipIfNoService(['security.vm_firewalls'])
  98. def test_crud_vm_firewall_rules(self):
  99. label = 'cb-crudfw-rules-{0}'.format(helpers.get_uuid())
  100. subnet = helpers.get_or_create_default_subnet(self.provider)
  101. net = subnet.network
  102. fw = None
  103. with cb_helpers.cleanup_action(lambda: fw.delete()):
  104. fw = self.provider.security.vm_firewalls.create(
  105. label=label, description=label, network=net.id)
  106. def create_fw_rule(label):
  107. return fw.rules.create(
  108. direction=TrafficDirection.INBOUND, protocol='tcp',
  109. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  110. def cleanup_fw_rule(rule):
  111. if rule:
  112. rule.delete()
  113. sit.check_crud(self, fw.rules, VMFirewallRule, "cb-crudfwrule",
  114. create_fw_rule, cleanup_fw_rule,
  115. skip_name_check=True)
  116. @helpers.skipIfNoService(['security.vm_firewalls'])
  117. def test_vm_firewall_rule_properties(self):
  118. label = 'cb-propfwrule-{0}'.format(helpers.get_uuid())
  119. # Declare these variables and late binding will allow
  120. # the cleanup method access to the most current values
  121. fw = None
  122. with cb_helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  123. vm_firewall=fw)):
  124. subnet = helpers.get_or_create_default_subnet(self.provider)
  125. net = subnet.network
  126. fw = self.provider.security.vm_firewalls.create(
  127. label=label, description=label, network=net.id)
  128. rule = fw.rules.create(
  129. direction=TrafficDirection.INBOUND, protocol='tcp',
  130. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  131. self.assertEqual(rule.direction, TrafficDirection.INBOUND)
  132. self.assertEqual(rule.protocol, 'tcp')
  133. self.assertEqual(rule.from_port, 1111)
  134. self.assertEqual(rule.to_port, 1111)
  135. self.assertEqual(rule.cidr, '0.0.0.0/0')
  136. @helpers.skipIfNoService(['security.vm_firewalls'])
  137. def test_vm_firewall_rule_add_twice(self):
  138. label = 'cb-fwruletwice-{0}'.format(helpers.get_uuid())
  139. # Declare these variables and late binding will allow
  140. # the cleanup method access to the most current values
  141. fw = None
  142. with cb_helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  143. vm_firewall=fw)):
  144. subnet = helpers.get_or_create_default_subnet(self.provider)
  145. net = subnet.network
  146. fw = self.provider.security.vm_firewalls.create(
  147. label=label, description=label, network=net.id)
  148. rule = fw.rules.create(
  149. direction=TrafficDirection.INBOUND, protocol='tcp',
  150. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  151. # attempting to add the same rule twice should succeed
  152. same_rule = fw.rules.create(
  153. direction=TrafficDirection.INBOUND, protocol='tcp',
  154. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  155. self.assertEqual(rule, same_rule)
  156. @helpers.skipIfNoService(['security.vm_firewalls'])
  157. def test_vm_firewall_group_rule(self):
  158. label = 'cb-fwrule-{0}'.format(helpers.get_uuid())
  159. # Declare these variables and late binding will allow
  160. # the cleanup method access to the most current values
  161. fw = None
  162. with cb_helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  163. vm_firewall=fw)):
  164. subnet = helpers.get_or_create_default_subnet(self.provider)
  165. net = subnet.network
  166. fw = self.provider.security.vm_firewalls.create(
  167. label=label, description=label, network=net.id)
  168. rule = fw.rules.create(
  169. direction=TrafficDirection.INBOUND, src_dest_fw=fw,
  170. protocol='tcp', from_port=1, to_port=65535)
  171. self.assertTrue(
  172. rule.src_dest_fw.label == fw.label,
  173. "Expected VM firewall rule label {0}. Got {1}."
  174. .format(fw.label, rule.src_dest_fw.label))
  175. for r in fw.rules:
  176. r.delete()
  177. fw = self.provider.security.vm_firewalls.get(fw.id) # update
  178. self.assertTrue(
  179. len(list(fw.rules)) == 0,
  180. "Deleting VMFirewallRule should delete it: {0}".format(
  181. fw.rules))
  182. fwl = self.provider.security.vm_firewalls.list()
  183. found_fw = [f for f in fwl if f.label == label]
  184. self.assertTrue(
  185. len(found_fw) == 0,
  186. "VM firewall {0} should have been deleted but still exists."
  187. .format(label))