test_security_service.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """Test cloudbridge.security modules."""
  2. from test import helpers
  3. from test.helpers import ProviderTestBase
  4. from test.helpers import standard_interface_tests as sit
  5. from cloudbridge.cloud.interfaces.resources import KeyPair
  6. from cloudbridge.cloud.interfaces.resources import TrafficDirection
  7. from cloudbridge.cloud.interfaces.resources import VMFirewall
  8. from cloudbridge.cloud.interfaces.resources import VMFirewallRule
  9. class CloudSecurityServiceTestCase(ProviderTestBase):
  10. @helpers.skipIfNoService(['security.key_pairs'])
  11. def test_crud_key_pair_service(self):
  12. def create_kp(name):
  13. return self.provider.security.key_pairs.create(name=name)
  14. def cleanup_kp(kp):
  15. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  16. def extra_tests(kp):
  17. # Recreating existing keypair should raise an exception
  18. with self.assertRaises(Exception):
  19. self.provider.security.key_pairs.create(name=kp.name)
  20. sit.check_crud(self, self.provider.security.key_pairs, KeyPair,
  21. "cb_crudkp", create_kp, cleanup_kp,
  22. extra_test_func=extra_tests)
  23. @helpers.skipIfNoService(['security.key_pairs'])
  24. def test_key_pair_properties(self):
  25. name = 'cb_kpprops-{0}'.format(helpers.get_uuid())
  26. kp = self.provider.security.key_pairs.create(name=name)
  27. with helpers.cleanup_action(lambda: kp.delete()):
  28. self.assertIsNotNone(
  29. kp.material,
  30. "KeyPair material is empty but it should not be.")
  31. # get the keypair again - keypair material should now be empty
  32. kp = self.provider.security.key_pairs.get(kp.id)
  33. self.assertIsNone(kp.material,
  34. "Keypair material should now be empty")
  35. @helpers.skipIfNoService(['security.vm_firewalls'])
  36. def test_crud_vm_firewall(self):
  37. name = 'cb_crudfw-{0}'.format(helpers.get_uuid())
  38. # Declare these variables and late binding will allow
  39. # the cleanup method access to the most current values
  40. net = None
  41. def create_fw(name):
  42. return self.provider.security.vm_firewalls.create(
  43. name=name, description=name, network_id=net.id)
  44. def cleanup_fw(fw):
  45. fw.delete()
  46. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  47. network=net)):
  48. net, _ = helpers.create_test_network(self.provider, name)
  49. sit.check_crud(self, self.provider.security.vm_firewalls,
  50. VMFirewall, "cb_crudfw", create_fw, cleanup_fw)
  51. @helpers.skipIfNoService(['security.vm_firewalls'])
  52. def test_vm_firewall_properties(self):
  53. name = 'cb_propfw-{0}'.format(helpers.get_uuid())
  54. # Declare these variables and late binding will allow
  55. # the cleanup method access to the most current values
  56. net = None
  57. fw = None
  58. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  59. network=net, vm_firewall=fw)):
  60. net, _ = helpers.create_test_network(self.provider, name)
  61. fw = self.provider.security.vm_firewalls.create(
  62. name=name, description=name, network_id=net.id)
  63. self.assertEqual(name, fw.description)
  64. @helpers.skipIfNoService(['security.vm_firewalls'])
  65. def test_crud_vm_firewall_rules(self):
  66. name = 'cb_crudfw_rules-{0}'.format(helpers.get_uuid())
  67. # Declare these variables and late binding will allow
  68. # the cleanup method access to the most current values
  69. net = None
  70. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  71. network=net)):
  72. net, _ = helpers.create_test_network(self.provider, name)
  73. fw = None
  74. with helpers.cleanup_action(lambda: fw.delete()):
  75. fw = self.provider.security.vm_firewalls.create(
  76. name=name, description=name, network_id=net.id)
  77. def create_fw_rule(name):
  78. return fw.rules.create(
  79. direction=TrafficDirection.INBOUND, protocol='tcp',
  80. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  81. def cleanup_fw_rule(rule):
  82. rule.delete()
  83. sit.check_crud(self, fw.rules, VMFirewallRule, "cb_crudfwrule",
  84. create_fw_rule, cleanup_fw_rule,
  85. skip_name_check=True)
  86. @helpers.skipIfNoService(['security.vm_firewalls'])
  87. def test_vm_firewall_rule_properties(self):
  88. name = 'cb_propfwrule-{0}'.format(helpers.get_uuid())
  89. # Declare these variables and late binding will allow
  90. # the cleanup method access to the most current values
  91. net = None
  92. fw = None
  93. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  94. network=net, vm_firewall=fw)):
  95. net, _ = helpers.create_test_network(self.provider, name)
  96. fw = self.provider.security.vm_firewalls.create(
  97. name=name, description=name, network_id=net.id)
  98. rule = fw.rules.create(
  99. direction=TrafficDirection.INBOUND, protocol='tcp',
  100. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  101. self.assertEqual(rule.direction, TrafficDirection.INBOUND)
  102. self.assertEqual(rule.protocol, 'tcp')
  103. self.assertEqual(rule.from_port, 1111)
  104. self.assertEqual(rule.to_port, 1111)
  105. self.assertEqual(rule.cidr, '0.0.0.0/0')
  106. @helpers.skipIfNoService(['security.vm_firewalls'])
  107. def test_vm_firewall_rule_add_twice(self):
  108. name = 'cb_fwruletwice-{0}'.format(helpers.get_uuid())
  109. # Declare these variables and late binding will allow
  110. # the cleanup method access to the most current values
  111. net = None
  112. fw = None
  113. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  114. network=net, vm_firewall=fw)):
  115. net, _ = helpers.create_test_network(self.provider, name)
  116. fw = self.provider.security.vm_firewalls.create(
  117. name=name, description=name, network_id=net.id)
  118. rule = fw.rules.create(
  119. direction=TrafficDirection.INBOUND, protocol='tcp',
  120. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  121. # attempting to add the same rule twice should succeed
  122. same_rule = fw.rules.create(
  123. direction=TrafficDirection.INBOUND, protocol='tcp',
  124. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  125. self.assertEqual(rule, same_rule)
  126. @helpers.skipIfNoService(['security.vm_firewalls'])
  127. def test_vm_firewall_group_rule(self):
  128. name = 'cb_fwrule-{0}'.format(helpers.get_uuid())
  129. # Declare these variables and late binding will allow
  130. # the cleanup method access to the most current values
  131. net = None
  132. fw = None
  133. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  134. network=net, vm_firewall=fw)):
  135. net, _ = helpers.create_test_network(self.provider, name)
  136. fw = self.provider.security.vm_firewalls.create(
  137. name=name, description=name, network_id=net.id)
  138. rules = list(fw.rules)
  139. self.assertTrue(
  140. # TODO: This should be made consistent across all providers.
  141. # Currently, OpenStack creates two rules, one for IPV6 and
  142. # another for IPV4
  143. len(rules) >= 1, "Expected a single VM firewall rule allowing"
  144. " all outbound traffic. Got {0}.".format(rules))
  145. self.assertEqual(
  146. rules[0].direction, TrafficDirection.OUTBOUND,
  147. "Expected rule to be outbound. Got {0}.".format(rules))
  148. rule = fw.rules.create(
  149. direction=TrafficDirection.INBOUND, src_dest_fw=fw,
  150. protocol='tcp', from_port=1, to_port=65535)
  151. self.assertTrue(
  152. rule.src_dest_fw.name == name,
  153. "Expected VM firewall rule name {0}. Got {1}."
  154. .format(name, rule.src_dest_fw.name))
  155. for r in fw.rules:
  156. r.delete()
  157. fw = self.provider.security.vm_firewalls.get(fw.id) # update
  158. self.assertTrue(
  159. len(list(fw.rules)) == 0,
  160. "Deleting VMFirewallRule should delete it: {0}".format(
  161. fw.rules))
  162. fwl = self.provider.security.vm_firewalls.list()
  163. found_fw = [f for f in fwl if f.name == name]
  164. self.assertTrue(
  165. len(found_fw) == 0,
  166. "VM firewall {0} should have been deleted but still exists."
  167. .format(name))