test_security_service.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. """Test cloudbridge.security modules."""
  2. from test import helpers
  3. from test.helpers import ProviderTestBase
  4. from test.helpers import standard_interface_tests as sit
  5. import cloudbridge.cloud.base.helpers as cb_helpers
  6. from cloudbridge.cloud.interfaces.resources import KeyPair
  7. from cloudbridge.cloud.interfaces.resources import TrafficDirection
  8. from cloudbridge.cloud.interfaces.resources import VMFirewall
  9. from cloudbridge.cloud.interfaces.resources import VMFirewallRule
  10. class CloudSecurityServiceTestCase(ProviderTestBase):
  11. @helpers.skipIfNoService(['security.key_pairs'])
  12. def test_crud_key_pair_service(self):
  13. def create_kp(name):
  14. return self.provider.security.key_pairs.create(name=name)
  15. def cleanup_kp(kp):
  16. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  17. def extra_tests(kp):
  18. # Recreating existing keypair should raise an exception
  19. with self.assertRaises(Exception):
  20. self.provider.security.key_pairs.create(name=kp.name)
  21. sit.check_crud(self, self.provider.security.key_pairs, KeyPair,
  22. "cb-crudkp", create_kp, cleanup_kp,
  23. extra_test_func=extra_tests)
  24. @helpers.skipIfNoService(['security.key_pairs'])
  25. def test_key_pair_properties(self):
  26. name = 'cb-kpprops-{0}'.format(helpers.get_uuid())
  27. kp = self.provider.security.key_pairs.create(name=name)
  28. with helpers.cleanup_action(lambda: kp.delete()):
  29. self.assertIsNotNone(
  30. kp.material,
  31. "KeyPair material is empty but it should not be.")
  32. # get the keypair again - keypair material should now be empty
  33. kp = self.provider.security.key_pairs.get(kp.id)
  34. self.assertIsNone(kp.material,
  35. "Keypair material should now be empty")
  36. @helpers.skipIfNoService(['security.key_pairs'])
  37. def test_import_key_pair(self):
  38. name = 'cb_kpimport-{0}'.format(helpers.get_uuid())
  39. public_key, _ = cb_helpers.generate_key_pair()
  40. kp = self.provider.security.key_pairs.create(
  41. name=name, public_key_material=public_key)
  42. with helpers.cleanup_action(lambda: kp.delete()):
  43. self.assertIsNone(kp.material, "Private KeyPair material should"
  44. " be None when key is imported.")
  45. @helpers.skipIfNoService(['security.vm_firewalls'])
  46. def test_crud_vm_firewall(self):
  47. name = 'cb-crudfw-{0}'.format(helpers.get_uuid())
  48. # Declare these variables and late binding will allow
  49. # the cleanup method access to the most current values
  50. net = None
  51. def create_fw(name):
  52. return self.provider.security.vm_firewalls.create(
  53. name=name, description=name, network_id=net.id)
  54. def cleanup_fw(fw):
  55. fw.delete()
  56. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  57. network=net)):
  58. net, _ = helpers.create_test_network(self.provider, name)
  59. sit.check_crud(self, self.provider.security.vm_firewalls,
  60. VMFirewall, "cb-crudfw", create_fw, cleanup_fw)
  61. @helpers.skipIfNoService(['security.vm_firewalls'])
  62. def test_vm_firewall_properties(self):
  63. name = 'cb-propfw-{0}'.format(helpers.get_uuid())
  64. # Declare these variables and late binding will allow
  65. # the cleanup method access to the most current values
  66. net = None
  67. fw = None
  68. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  69. network=net, vm_firewall=fw)):
  70. net, _ = helpers.create_test_network(self.provider, name)
  71. fw = self.provider.security.vm_firewalls.create(
  72. name=name, description=name, network_id=net.id)
  73. self.assertEqual(name, fw.description)
  74. @helpers.skipIfNoService(['security.vm_firewalls'])
  75. def test_crud_vm_firewall_rules(self):
  76. name = 'cb-crudfw_rules-{0}'.format(helpers.get_uuid())
  77. # Declare these variables and late binding will allow
  78. # the cleanup method access to the most current values
  79. net = None
  80. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  81. network=net)):
  82. net, _ = helpers.create_test_network(self.provider, name)
  83. fw = None
  84. with helpers.cleanup_action(lambda: fw.delete()):
  85. fw = self.provider.security.vm_firewalls.create(
  86. name=name, description=name, network_id=net.id)
  87. def create_fw_rule(name):
  88. return fw.rules.create(
  89. direction=TrafficDirection.INBOUND, protocol='tcp',
  90. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  91. def cleanup_fw_rule(rule):
  92. rule.delete()
  93. sit.check_crud(self, fw.rules, VMFirewallRule, "cb-crudfwrule",
  94. create_fw_rule, cleanup_fw_rule,
  95. skip_name_check=True)
  96. @helpers.skipIfNoService(['security.vm_firewalls'])
  97. def test_vm_firewall_rule_properties(self):
  98. name = 'cb-propfwrule-{0}'.format(helpers.get_uuid())
  99. # Declare these variables and late binding will allow
  100. # the cleanup method access to the most current values
  101. net = None
  102. fw = None
  103. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  104. network=net, vm_firewall=fw)):
  105. net, _ = helpers.create_test_network(self.provider, name)
  106. fw = self.provider.security.vm_firewalls.create(
  107. name=name, description=name, network_id=net.id)
  108. rule = fw.rules.create(
  109. direction=TrafficDirection.INBOUND, protocol='tcp',
  110. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  111. self.assertEqual(rule.direction, TrafficDirection.INBOUND)
  112. self.assertEqual(rule.protocol, 'tcp')
  113. self.assertEqual(rule.from_port, 1111)
  114. self.assertEqual(rule.to_port, 1111)
  115. self.assertEqual(rule.cidr, '0.0.0.0/0')
  116. @helpers.skipIfNoService(['security.vm_firewalls'])
  117. def test_vm_firewall_rule_add_twice(self):
  118. name = 'cb-fwruletwice-{0}'.format(helpers.get_uuid())
  119. # Declare these variables and late binding will allow
  120. # the cleanup method access to the most current values
  121. net = None
  122. fw = None
  123. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  124. network=net, vm_firewall=fw)):
  125. net, _ = helpers.create_test_network(self.provider, name)
  126. fw = self.provider.security.vm_firewalls.create(
  127. name=name, description=name, network_id=net.id)
  128. rule = fw.rules.create(
  129. direction=TrafficDirection.INBOUND, protocol='tcp',
  130. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  131. # attempting to add the same rule twice should succeed
  132. same_rule = fw.rules.create(
  133. direction=TrafficDirection.INBOUND, protocol='tcp',
  134. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  135. self.assertEqual(rule, same_rule)
  136. @helpers.skipIfNoService(['security.vm_firewalls'])
  137. def test_vm_firewall_group_rule(self):
  138. name = 'cb-fwrule-{0}'.format(helpers.get_uuid())
  139. # Declare these variables and late binding will allow
  140. # the cleanup method access to the most current values
  141. net = None
  142. fw = None
  143. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  144. network=net, vm_firewall=fw)):
  145. net, _ = helpers.create_test_network(self.provider, name)
  146. fw = self.provider.security.vm_firewalls.create(
  147. name=name, description=name, network_id=net.id)
  148. rules = list(fw.rules)
  149. self.assertTrue(
  150. # TODO: This should be made consistent across all providers.
  151. # Currently, OpenStack creates two rules, one for IPV6 and
  152. # another for IPV4
  153. len(rules) >= 1, "Expected a single VM firewall rule allowing"
  154. " all outbound traffic. Got {0}.".format(rules))
  155. self.assertEqual(
  156. rules[0].direction, TrafficDirection.OUTBOUND,
  157. "Expected rule to be outbound. Got {0}.".format(rules))
  158. rule = fw.rules.create(
  159. direction=TrafficDirection.INBOUND, src_dest_fw=fw,
  160. protocol='tcp', from_port=1, to_port=65535)
  161. self.assertTrue(
  162. rule.src_dest_fw.name == name,
  163. "Expected VM firewall rule name {0}. Got {1}."
  164. .format(name, rule.src_dest_fw.name))
  165. for r in fw.rules:
  166. r.delete()
  167. fw = self.provider.security.vm_firewalls.get(fw.id) # update
  168. self.assertTrue(
  169. len(list(fw.rules)) == 0,
  170. "Deleting VMFirewallRule should delete it: {0}".format(
  171. fw.rules))
  172. fwl = self.provider.security.vm_firewalls.list()
  173. found_fw = [f for f in fwl if f.name == name]
  174. self.assertTrue(
  175. len(found_fw) == 0,
  176. "VM firewall {0} should have been deleted but still exists."
  177. .format(name))