test_security_service.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. """Test cloudbridge.security modules."""
  2. import cloudbridge.cloud.base.helpers as cb_helpers
  3. from cloudbridge.cloud.interfaces.exceptions import DuplicateResourceException
  4. from cloudbridge.cloud.interfaces.resources import KeyPair
  5. from cloudbridge.cloud.interfaces.resources import TrafficDirection
  6. from cloudbridge.cloud.interfaces.resources import VMFirewall
  7. from cloudbridge.cloud.interfaces.resources import VMFirewallRule
  8. from test import helpers
  9. from test.helpers import ProviderTestBase
  10. from test.helpers import standard_interface_tests as sit
  11. class CloudSecurityServiceTestCase(ProviderTestBase):
  12. _multiprocess_can_split_ = True
  13. @helpers.skipIfNoService(['security.key_pairs'])
  14. def test_crud_key_pair_service(self):
  15. def create_kp(name):
  16. return self.provider.security.key_pairs.create(name=name)
  17. def cleanup_kp(kp):
  18. if kp:
  19. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  20. def extra_tests(kp):
  21. # Recreating existing keypair should raise an exception
  22. with self.assertRaises(DuplicateResourceException):
  23. self.provider.security.key_pairs.create(name=kp.name)
  24. sit.check_crud(self, self.provider.security.key_pairs, KeyPair,
  25. "cb-crudkp", create_kp, cleanup_kp,
  26. extra_test_func=extra_tests)
  27. @helpers.skipIfNoService(['security.key_pairs'])
  28. def test_key_pair_properties(self):
  29. name = 'cb_kpprops-{0}'.format(helpers.get_uuid())
  30. kp = self.provider.security.key_pairs.create(name=name)
  31. with helpers.cleanup_action(lambda: kp.delete()):
  32. self.assertIsNotNone(
  33. kp.material,
  34. "KeyPair material is empty but it should not be.")
  35. # get the keypair again - keypair material should now be empty
  36. kp = self.provider.security.key_pairs.get(kp.id)
  37. self.assertIsNone(kp.material,
  38. "Keypair material should now be empty")
  39. @helpers.skipIfNoService(['security.key_pairs'])
  40. def test_import_key_pair(self):
  41. name = 'cb_kpimport-{0}'.format(helpers.get_uuid())
  42. public_key, _ = cb_helpers.generate_key_pair()
  43. kp = self.provider.security.key_pairs.create(
  44. name=name, public_key_material=public_key)
  45. with helpers.cleanup_action(lambda: kp.delete()):
  46. self.assertIsNone(kp.material, "Private KeyPair material should"
  47. " be None when key is imported.")
  48. @helpers.skipIfNoService(['security.vm_firewalls'])
  49. def test_crud_vm_firewall(self):
  50. label = 'cb_crudfw-{0}'.format(helpers.get_uuid())
  51. # Declare these variables and late binding will allow
  52. # the cleanup method access to the most current values
  53. net = None
  54. def create_fw(label):
  55. return self.provider.security.vm_firewalls.create(
  56. label=label, description=label, network_id=net.id)
  57. def cleanup_fw(fw):
  58. if fw:
  59. fw.delete()
  60. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  61. network=net)):
  62. net, _ = helpers.create_test_network(self.provider, label)
  63. sit.check_crud(self, self.provider.security.vm_firewalls,
  64. VMFirewall, "cb-crudfw", create_fw, cleanup_fw)
  65. @helpers.skipIfNoService(['security.vm_firewalls'])
  66. def test_vm_firewall_properties(self):
  67. label = 'cb_propfw-{0}'.format(helpers.get_uuid())
  68. # Declare these variables and late binding will allow
  69. # the cleanup method access to the most current values
  70. net = None
  71. fw = None
  72. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  73. network=net, vm_firewall=fw)):
  74. net, _ = helpers.create_test_network(self.provider, label)
  75. fw = self.provider.security.vm_firewalls.create(
  76. label=label, description=label, network_id=net.id)
  77. self.assertEqual(label, fw.description)
  78. @helpers.skipIfNoService(['security.vm_firewalls'])
  79. def test_crud_vm_firewall_rules(self):
  80. label = 'cb_crudfw_rules-{0}'.format(helpers.get_uuid())
  81. # Declare these variables and late binding will allow
  82. # the cleanup method access to the most current values
  83. net = None
  84. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  85. network=net)):
  86. net, _ = helpers.create_test_network(self.provider, label)
  87. fw = None
  88. with helpers.cleanup_action(lambda: fw.delete()):
  89. fw = self.provider.security.vm_firewalls.create(
  90. label=label, description=label, network_id=net.id)
  91. def create_fw_rule(label):
  92. return fw.rules.create(
  93. direction=TrafficDirection.INBOUND, protocol='tcp',
  94. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  95. def cleanup_fw_rule(rule):
  96. if rule:
  97. rule.delete()
  98. sit.check_crud(self, fw.rules, VMFirewallRule, "cb-crudfwrule",
  99. create_fw_rule, cleanup_fw_rule,
  100. skip_label_check=True)
  101. @helpers.skipIfNoService(['security.vm_firewalls'])
  102. def test_vm_firewall_rule_properties(self):
  103. label = 'cb_propfwrule-{0}'.format(helpers.get_uuid())
  104. # Declare these variables and late binding will allow
  105. # the cleanup method access to the most current values
  106. net = None
  107. fw = None
  108. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  109. network=net, vm_firewall=fw)):
  110. net, _ = helpers.create_test_network(self.provider, label)
  111. fw = self.provider.security.vm_firewalls.create(
  112. label=label, description=label, network_id=net.id)
  113. rule = fw.rules.create(
  114. direction=TrafficDirection.INBOUND, protocol='tcp',
  115. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  116. self.assertEqual(rule.direction, TrafficDirection.INBOUND)
  117. self.assertEqual(rule.protocol, 'tcp')
  118. self.assertEqual(rule.from_port, 1111)
  119. self.assertEqual(rule.to_port, 1111)
  120. self.assertEqual(rule.cidr, '0.0.0.0/0')
  121. @helpers.skipIfNoService(['security.vm_firewalls'])
  122. def test_vm_firewall_rule_add_twice(self):
  123. label = 'cb_fwruletwice-{0}'.format(helpers.get_uuid())
  124. # Declare these variables and late binding will allow
  125. # the cleanup method access to the most current values
  126. net = None
  127. fw = None
  128. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  129. network=net, vm_firewall=fw)):
  130. net, _ = helpers.create_test_network(self.provider, label)
  131. fw = self.provider.security.vm_firewalls.create(
  132. label=label, description=label, network_id=net.id)
  133. rule = fw.rules.create(
  134. direction=TrafficDirection.INBOUND, protocol='tcp',
  135. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  136. # attempting to add the same rule twice should succeed
  137. same_rule = fw.rules.create(
  138. direction=TrafficDirection.INBOUND, protocol='tcp',
  139. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  140. self.assertEqual(rule, same_rule)
  141. @helpers.skipIfNoService(['security.vm_firewalls'])
  142. def test_vm_firewall_group_rule(self):
  143. label = 'cb_fwrule-{0}'.format(helpers.get_uuid())
  144. # Declare these variables and late binding will allow
  145. # the cleanup method access to the most current values
  146. net = None
  147. fw = None
  148. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  149. network=net, vm_firewall=fw)):
  150. net, _ = helpers.create_test_network(self.provider, label)
  151. fw = self.provider.security.vm_firewalls.create(
  152. label=label, description=label, network_id=net.id)
  153. rules = list(fw.rules)
  154. self.assertTrue(
  155. # TODO: This should be made consistent across all providers.
  156. # Currently, OpenStack creates two rules, one for IPV6 and
  157. # another for IPV4
  158. len(rules) >= 1, "Expected a single VM firewall rule allowing"
  159. " all outbound traffic. Got {0}.".format(rules))
  160. self.assertEqual(
  161. rules[0].direction, TrafficDirection.OUTBOUND,
  162. "Expected rule to be outbound. Got {0}.".format(rules))
  163. rule = fw.rules.create(
  164. direction=TrafficDirection.INBOUND, src_dest_fw=fw,
  165. protocol='tcp', from_port=1, to_port=65535)
  166. self.assertTrue(
  167. rule.src_dest_fw.label == label,
  168. "Expected VM firewall rule label {0}. Got {1}."
  169. .format(label, rule.src_dest_fw.label))
  170. for r in fw.rules:
  171. r.delete()
  172. fw = self.provider.security.vm_firewalls.get(fw.id) # update
  173. self.assertTrue(
  174. len(list(fw.rules)) == 0,
  175. "Deleting VMFirewallRule should delete it: {0}".format(
  176. fw.rules))
  177. fwl = self.provider.security.vm_firewalls.list()
  178. found_fw = [f for f in fwl if f.label == label]
  179. self.assertTrue(
  180. len(found_fw) == 0,
  181. "VM firewall {0} should have been deleted but still exists."
  182. .format(label))