test_security_service.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. """Test cloudbridge.security modules."""
  2. from test import helpers
  3. from test.helpers import ProviderTestBase
  4. from test.helpers import standard_interface_tests as sit
  5. import cloudbridge.cloud.base.helpers as cb_helpers
  6. from cloudbridge.cloud.interfaces.exceptions import DuplicateResourceException
  7. from cloudbridge.cloud.interfaces.resources import KeyPair
  8. from cloudbridge.cloud.interfaces.resources import TrafficDirection
  9. from cloudbridge.cloud.interfaces.resources import VMFirewall
  10. from cloudbridge.cloud.interfaces.resources import VMFirewallRule
  11. class CloudSecurityServiceTestCase(ProviderTestBase):
  12. _multiprocess_can_split_ = True
  13. @helpers.skipIfNoService(['security.key_pairs'])
  14. def test_crud_key_pair_service(self):
  15. def create_kp(name):
  16. return self.provider.security.key_pairs.create(name=name)
  17. def cleanup_kp(kp):
  18. if kp:
  19. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  20. def extra_tests(kp):
  21. # Recreating existing keypair should raise an exception
  22. with self.assertRaises(DuplicateResourceException):
  23. self.provider.security.key_pairs.create(name=kp.name)
  24. sit.check_crud(self, self.provider.security.key_pairs, KeyPair,
  25. "cb_crudkp", create_kp, cleanup_kp,
  26. extra_test_func=extra_tests)
  27. @helpers.skipIfNoService(['security.key_pairs'])
  28. def test_key_pair_properties(self):
  29. name = 'cb_kpprops-{0}'.format(helpers.get_uuid())
  30. kp = self.provider.security.key_pairs.create(name=name)
  31. with helpers.cleanup_action(lambda: kp.delete()):
  32. self.assertIsNotNone(
  33. kp.material,
  34. "KeyPair material is empty but it should not be.")
  35. # get the keypair again - keypair material should now be empty
  36. kp = self.provider.security.key_pairs.get(kp.id)
  37. self.assertIsNone(kp.material,
  38. "Keypair material should now be empty")
  39. @helpers.skipIfNoService(['security.key_pairs'])
  40. def test_import_key_pair(self):
  41. name = 'cb_kpimport-{0}'.format(helpers.get_uuid())
  42. public_key, _ = cb_helpers.generate_key_pair()
  43. kp = self.provider.security.key_pairs.create(
  44. name=name, public_key_material=public_key)
  45. with helpers.cleanup_action(lambda: kp.delete()):
  46. self.assertIsNone(kp.material, "Private KeyPair material should"
  47. " be None when key is imported.")
  48. @helpers.skipIfNoService(['security.vm_firewalls'])
  49. def test_crud_vm_firewall(self):
  50. name = 'cb_crudfw-{0}'.format(helpers.get_uuid())
  51. # Declare these variables and late binding will allow
  52. # the cleanup method access to the most current values
  53. net = None
  54. def create_fw(name):
  55. return self.provider.security.vm_firewalls.create(
  56. name=name, description=name, network_id=net.id)
  57. def cleanup_fw(fw):
  58. if fw:
  59. fw.delete()
  60. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  61. network=net)):
  62. net, _ = helpers.create_test_network(self.provider, name)
  63. sit.check_crud(self, self.provider.security.vm_firewalls,
  64. VMFirewall, "cb_crudfw", create_fw, cleanup_fw)
  65. @helpers.skipIfNoService(['security.vm_firewalls'])
  66. def test_vm_firewall_properties(self):
  67. name = 'cb_propfw-{0}'.format(helpers.get_uuid())
  68. # Declare these variables and late binding will allow
  69. # the cleanup method access to the most current values
  70. net = None
  71. fw = None
  72. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  73. network=net, vm_firewall=fw)):
  74. net, _ = helpers.create_test_network(self.provider, name)
  75. fw = self.provider.security.vm_firewalls.create(
  76. name=name, description=name, network_id=net.id)
  77. self.assertEqual(name, fw.description)
  78. @helpers.skipIfNoService(['security.vm_firewalls'])
  79. def test_crud_vm_firewall_rules(self):
  80. name = 'cb_crudfw_rules-{0}'.format(helpers.get_uuid())
  81. # Declare these variables and late binding will allow
  82. # the cleanup method access to the most current values
  83. net = None
  84. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  85. network=net)):
  86. net, _ = helpers.create_test_network(self.provider, name)
  87. fw = None
  88. with helpers.cleanup_action(lambda: fw.delete()):
  89. fw = self.provider.security.vm_firewalls.create(
  90. name=name, description=name, network_id=net.id)
  91. def create_fw_rule(name):
  92. return fw.rules.create(
  93. direction=TrafficDirection.INBOUND, protocol='tcp',
  94. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  95. def cleanup_fw_rule(rule):
  96. if rule:
  97. rule.delete()
  98. sit.check_crud(self, fw.rules, VMFirewallRule, "cb_crudfwrule",
  99. create_fw_rule, cleanup_fw_rule,
  100. skip_name_check=True)
  101. @helpers.skipIfNoService(['security.vm_firewalls'])
  102. def test_vm_firewall_rule_properties(self):
  103. name = 'cb_propfwrule-{0}'.format(helpers.get_uuid())
  104. # Declare these variables and late binding will allow
  105. # the cleanup method access to the most current values
  106. net = None
  107. fw = None
  108. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  109. network=net, vm_firewall=fw)):
  110. net, _ = helpers.create_test_network(self.provider, name)
  111. fw = self.provider.security.vm_firewalls.create(
  112. name=name, description=name, network_id=net.id)
  113. rule = fw.rules.create(
  114. direction=TrafficDirection.INBOUND, protocol='tcp',
  115. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  116. self.assertEqual(rule.direction, TrafficDirection.INBOUND)
  117. self.assertEqual(rule.protocol, 'tcp')
  118. self.assertEqual(rule.from_port, 1111)
  119. self.assertEqual(rule.to_port, 1111)
  120. self.assertEqual(rule.cidr, '0.0.0.0/0')
  121. @helpers.skipIfNoService(['security.vm_firewalls'])
  122. def test_vm_firewall_rule_add_twice(self):
  123. name = 'cb_fwruletwice-{0}'.format(helpers.get_uuid())
  124. # Declare these variables and late binding will allow
  125. # the cleanup method access to the most current values
  126. net = None
  127. fw = None
  128. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  129. network=net, vm_firewall=fw)):
  130. net, _ = helpers.create_test_network(self.provider, name)
  131. fw = self.provider.security.vm_firewalls.create(
  132. name=name, description=name, network_id=net.id)
  133. rule = fw.rules.create(
  134. direction=TrafficDirection.INBOUND, protocol='tcp',
  135. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  136. # attempting to add the same rule twice should succeed
  137. same_rule = fw.rules.create(
  138. direction=TrafficDirection.INBOUND, protocol='tcp',
  139. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  140. self.assertEqual(rule, same_rule)
  141. @helpers.skipIfNoService(['security.vm_firewalls'])
  142. def test_vm_firewall_group_rule(self):
  143. name = 'cb_fwrule-{0}'.format(helpers.get_uuid())
  144. # Declare these variables and late binding will allow
  145. # the cleanup method access to the most current values
  146. net = None
  147. fw = None
  148. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  149. network=net, vm_firewall=fw)):
  150. net, _ = helpers.create_test_network(self.provider, name)
  151. fw = self.provider.security.vm_firewalls.create(
  152. name=name, description=name, network_id=net.id)
  153. rules = list(fw.rules)
  154. self.assertTrue(
  155. # TODO: This should be made consistent across all providers.
  156. # Currently, OpenStack creates two rules, one for IPV6 and
  157. # another for IPV4
  158. len(rules) >= 1, "Expected a single VM firewall rule allowing"
  159. " all outbound traffic. Got {0}.".format(rules))
  160. self.assertEqual(
  161. rules[0].direction, TrafficDirection.OUTBOUND,
  162. "Expected rule to be outbound. Got {0}.".format(rules))
  163. rule = fw.rules.create(
  164. direction=TrafficDirection.INBOUND, src_dest_fw=fw,
  165. protocol='tcp', from_port=1, to_port=65535)
  166. self.assertTrue(
  167. rule.src_dest_fw.name == name,
  168. "Expected VM firewall rule name {0}. Got {1}."
  169. .format(name, rule.src_dest_fw.name))
  170. for r in fw.rules:
  171. r.delete()
  172. fw = self.provider.security.vm_firewalls.get(fw.id) # update
  173. self.assertTrue(
  174. len(list(fw.rules)) == 0,
  175. "Deleting VMFirewallRule should delete it: {0}".format(
  176. fw.rules))
  177. fwl = self.provider.security.vm_firewalls.list()
  178. found_fw = [f for f in fwl if f.name == name]
  179. self.assertTrue(
  180. len(found_fw) == 0,
  181. "VM firewall {0} should have been deleted but still exists."
  182. .format(name))