2
0

test_security_service.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. """Test cloudbridge.security modules."""
  2. from test import helpers
  3. from test.helpers import ProviderTestBase
  4. from test.helpers import standard_interface_tests as sit
  5. import cloudbridge.cloud.base.helpers as cb_helpers
  6. from cloudbridge.cloud.interfaces.exceptions import DuplicateResourceException
  7. from cloudbridge.cloud.interfaces.resources import KeyPair
  8. from cloudbridge.cloud.interfaces.resources import TrafficDirection
  9. from cloudbridge.cloud.interfaces.resources import VMFirewall
  10. from cloudbridge.cloud.interfaces.resources import VMFirewallRule
  11. class CloudSecurityServiceTestCase(ProviderTestBase):
  12. _multiprocess_can_split_ = True
  13. @helpers.skipIfNoService(['security.key_pairs'])
  14. def test_crud_key_pair_service(self):
  15. def create_kp(name):
  16. return self.provider.security.key_pairs.create(name=name)
  17. def cleanup_kp(kp):
  18. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  19. def extra_tests(kp):
  20. # Recreating existing keypair should raise an exception
  21. with self.assertRaises(DuplicateResourceException):
  22. self.provider.security.key_pairs.create(name=kp.name)
  23. sit.check_crud(self, self.provider.security.key_pairs, KeyPair,
  24. "cb-crudkp", create_kp, cleanup_kp,
  25. extra_test_func=extra_tests)
  26. @helpers.skipIfNoService(['security.key_pairs'])
  27. def test_key_pair_properties(self):
  28. name = 'cb-kpprops-{0}'.format(helpers.get_uuid())
  29. kp = self.provider.security.key_pairs.create(name=name)
  30. with helpers.cleanup_action(lambda: kp.delete()):
  31. self.assertIsNotNone(
  32. kp.material,
  33. "KeyPair material is empty but it should not be.")
  34. # get the keypair again - keypair material should now be empty
  35. kp = self.provider.security.key_pairs.get(kp.id)
  36. self.assertIsNone(kp.material,
  37. "Keypair material should now be empty")
  38. @helpers.skipIfNoService(['security.key_pairs'])
  39. def test_import_key_pair(self):
  40. name = 'cb-kpimport-{0}'.format(helpers.get_uuid())
  41. public_key, _ = cb_helpers.generate_key_pair()
  42. kp = self.provider.security.key_pairs.create(
  43. name=name, public_key_material=public_key)
  44. with helpers.cleanup_action(lambda: kp.delete()):
  45. self.assertIsNone(kp.material, "Private KeyPair material should"
  46. " be None when key is imported.")
  47. @helpers.skipIfNoService(['security.vm_firewalls'])
  48. def test_crud_vm_firewall(self):
  49. name = 'cb-crudfw-{0}'.format(helpers.get_uuid())
  50. # Declare these variables and late binding will allow
  51. # the cleanup method access to the most current values
  52. net = None
  53. def create_fw(name):
  54. return self.provider.security.vm_firewalls.create(
  55. name=name, description=name, network_id=net.id)
  56. def cleanup_fw(fw):
  57. fw.delete()
  58. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  59. network=net)):
  60. net, _ = helpers.create_test_network(self.provider, name)
  61. sit.check_crud(self, self.provider.security.vm_firewalls,
  62. VMFirewall, "cb-crudfw", create_fw, cleanup_fw)
  63. @helpers.skipIfNoService(['security.vm_firewalls'])
  64. def test_vm_firewall_properties(self):
  65. name = 'cb-propfw-{0}'.format(helpers.get_uuid())
  66. # Declare these variables and late binding will allow
  67. # the cleanup method access to the most current values
  68. net = None
  69. fw = None
  70. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  71. network=net, vm_firewall=fw)):
  72. net, _ = helpers.create_test_network(self.provider, name)
  73. fw = self.provider.security.vm_firewalls.create(
  74. name=name, description=name, network_id=net.id)
  75. self.assertEqual(name, fw.description)
  76. @helpers.skipIfNoService(['security.vm_firewalls'])
  77. def test_crud_vm_firewall_rules(self):
  78. name = 'cb-crudfw-rules-{0}'.format(helpers.get_uuid())
  79. # Declare these variables and late binding will allow
  80. # the cleanup method access to the most current values
  81. net = None
  82. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  83. network=net)):
  84. net, _ = helpers.create_test_network(self.provider, name)
  85. fw = None
  86. with helpers.cleanup_action(lambda: fw.delete()):
  87. fw = self.provider.security.vm_firewalls.create(
  88. name=name, description=name, network_id=net.id)
  89. def create_fw_rule(name):
  90. return fw.rules.create(
  91. direction=TrafficDirection.INBOUND, protocol='tcp',
  92. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  93. def cleanup_fw_rule(rule):
  94. rule.delete()
  95. sit.check_crud(self, fw.rules, VMFirewallRule, "cb-crudfwrule",
  96. create_fw_rule, cleanup_fw_rule,
  97. skip_name_check=True)
  98. @helpers.skipIfNoService(['security.vm_firewalls'])
  99. def test_vm_firewall_rule_properties(self):
  100. name = 'cb-propfwrule-{0}'.format(helpers.get_uuid())
  101. # Declare these variables and late binding will allow
  102. # the cleanup method access to the most current values
  103. net = None
  104. fw = None
  105. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  106. network=net, vm_firewall=fw)):
  107. net, _ = helpers.create_test_network(self.provider, name)
  108. fw = self.provider.security.vm_firewalls.create(
  109. name=name, description=name, network_id=net.id)
  110. rule = fw.rules.create(
  111. direction=TrafficDirection.INBOUND, protocol='tcp',
  112. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  113. self.assertEqual(rule.direction, TrafficDirection.INBOUND)
  114. self.assertEqual(rule.protocol, 'tcp')
  115. self.assertEqual(rule.from_port, 1111)
  116. self.assertEqual(rule.to_port, 1111)
  117. self.assertEqual(rule.cidr, '0.0.0.0/0')
  118. @helpers.skipIfNoService(['security.vm_firewalls'])
  119. def test_vm_firewall_rule_add_twice(self):
  120. name = 'cb-fwruletwice-{0}'.format(helpers.get_uuid())
  121. # Declare these variables and late binding will allow
  122. # the cleanup method access to the most current values
  123. net = None
  124. fw = None
  125. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  126. network=net, vm_firewall=fw)):
  127. net, _ = helpers.create_test_network(self.provider, name)
  128. fw = self.provider.security.vm_firewalls.create(
  129. name=name, description=name, network_id=net.id)
  130. rule = fw.rules.create(
  131. direction=TrafficDirection.INBOUND, protocol='tcp',
  132. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  133. # attempting to add the same rule twice should succeed
  134. same_rule = fw.rules.create(
  135. direction=TrafficDirection.INBOUND, protocol='tcp',
  136. from_port=1111, to_port=1111, cidr='0.0.0.0/0')
  137. self.assertEqual(rule, same_rule)
  138. @helpers.skipIfNoService(['security.vm_firewalls'])
  139. def test_vm_firewall_group_rule(self):
  140. name = 'cb-fwrule-{0}'.format(helpers.get_uuid())
  141. # Declare these variables and late binding will allow
  142. # the cleanup method access to the most current values
  143. net = None
  144. fw = None
  145. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  146. network=net, vm_firewall=fw)):
  147. net, _ = helpers.create_test_network(self.provider, name)
  148. fw = self.provider.security.vm_firewalls.create(
  149. name=name, description=name, network_id=net.id)
  150. rule = fw.rules.create(
  151. direction=TrafficDirection.INBOUND, src_dest_fw=fw,
  152. protocol='tcp', from_port=1, to_port=65535)
  153. self.assertTrue(
  154. rule.src_dest_fw.name == name,
  155. "Expected VM firewall rule name {0}. Got {1}."
  156. .format(name, rule.src_dest_fw.name))
  157. for r in fw.rules:
  158. r.delete()
  159. fw = self.provider.security.vm_firewalls.get(fw.id) # update
  160. self.assertTrue(
  161. len(list(fw.rules)) == 0,
  162. "Deleting VMFirewallRule should delete it: {0}".format(
  163. fw.rules))
  164. fwl = self.provider.security.vm_firewalls.list()
  165. found_fw = [f for f in fwl if f.name == name]
  166. self.assertTrue(
  167. len(found_fw) == 0,
  168. "VM firewall {0} should have been deleted but still exists."
  169. .format(name))