test_security_service.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. """Test cloudbridge.security modules."""
  2. from test import helpers
  3. from test.helpers import ProviderTestBase
  4. from test.helpers import standard_interface_tests as sit
  5. from cloudbridge.cloud.interfaces.resources import KeyPair
  6. from cloudbridge.cloud.interfaces.resources import SecurityGroup
  7. class CloudSecurityServiceTestCase(ProviderTestBase):
  8. @helpers.skipIfNoService(['security.key_pairs'])
  9. def test_crud_key_pair_service(self):
  10. def create_kp(name):
  11. return self.provider.security.key_pairs.create(name=name)
  12. def cleanup_kp(kp):
  13. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  14. def extra_tests(kp):
  15. # Recreating existing keypair should raise an exception
  16. with self.assertRaises(Exception):
  17. self.provider.security.key_pairs.create(name=kp.name)
  18. sit.check_crud(self, self.provider.security.key_pairs, KeyPair,
  19. "cb_crudkp", create_kp, cleanup_kp,
  20. extra_test_func=extra_tests)
  21. @helpers.skipIfNoService(['security.key_pairs'])
  22. def test_key_pair_properties(self):
  23. name = 'cb_kpprops-{0}'.format(helpers.get_uuid())
  24. kp = self.provider.security.key_pairs.create(name=name)
  25. with helpers.cleanup_action(lambda: kp.delete()):
  26. self.assertIsNotNone(
  27. kp.material,
  28. "KeyPair material is empty but it should not be.")
  29. # get the keypair again - keypair material should now be empty
  30. kp = self.provider.security.key_pairs.get(kp.id)
  31. self.assertIsNone(kp.material,
  32. "Keypair material should now be empty")
  33. @helpers.skipIfNoService(['security.security_groups'])
  34. def test_crud_security_group(self):
  35. name = 'cb_crudsg-{0}'.format(helpers.get_uuid())
  36. # Declare these variables and late binding will allow
  37. # the cleanup method access to the most current values
  38. net = None
  39. def create_sg(name):
  40. return self.provider.security.security_groups.create(
  41. name=name, description=name, network_id=net.id)
  42. def cleanup_sg(sg):
  43. sg.delete()
  44. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  45. network=net)):
  46. net, _ = helpers.create_test_network(self.provider, name)
  47. sit.check_crud(self, self.provider.security.security_groups,
  48. SecurityGroup, "cb_crudsg", create_sg, cleanup_sg)
  49. @helpers.skipIfNoService(['security.security_groups'])
  50. def test_security_group_properties(self):
  51. name = 'cb_propsg-{0}'.format(helpers.get_uuid())
  52. # Declare these variables and late binding will allow
  53. # the cleanup method access to the most current values
  54. net = None
  55. sg = None
  56. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  57. network=net, security_group=sg)):
  58. net, _ = helpers.create_test_network(self.provider, name)
  59. sg = self.provider.security.security_groups.create(
  60. name=name, description=name, network_id=net.id)
  61. self.assertEqual(name, sg.description)
  62. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  63. cidr_ip='0.0.0.0/0')
  64. found_rule = sg.get_rule(ip_protocol='tcp', from_port=1111,
  65. to_port=1111, cidr_ip='0.0.0.0/0')
  66. self.assertTrue(
  67. rule == found_rule,
  68. "Expected rule {0} not found in security group: {0}".format(
  69. rule, sg.rules))
  70. object_keys = (
  71. sg.rules[0].ip_protocol,
  72. sg.rules[0].from_port,
  73. sg.rules[0].to_port)
  74. self.assertTrue(
  75. all(str(key) in repr(sg.rules[0]) for key in object_keys),
  76. "repr(obj) should contain ip_protocol, form_port, and to_port"
  77. " so that the object can be reconstructed, but does not:"
  78. " {0}; {1}".format(sg.rules[0], object_keys))
  79. self.assertTrue(
  80. sg == sg,
  81. "The same security groups should be equal?")
  82. self.assertFalse(
  83. sg != sg,
  84. "The same security groups should still be equal?")
  85. sit.check_delete(self, self.provider.security.security_groups, sg)
  86. sgl = self.provider.security.security_groups.list()
  87. found_sg = [g for g in sgl if g.name == name]
  88. self.assertTrue(
  89. len(found_sg) == 0,
  90. "Security group {0} should have been deleted but still exists."
  91. .format(name))
  92. @helpers.skipIfNoService(['security.security_groups'])
  93. def test_security_group_rule_add_twice(self):
  94. name = 'cb_sgruletwice-{0}'.format(helpers.get_uuid())
  95. # Declare these variables and late binding will allow
  96. # the cleanup method access to the most current values
  97. net = None
  98. sg = None
  99. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  100. network=net, security_group=sg)):
  101. net, _ = helpers.create_test_network(self.provider, name)
  102. sg = self.provider.security.security_groups.create(
  103. name=name, description=name, network_id=net.id)
  104. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  105. cidr_ip='0.0.0.0/0')
  106. # attempting to add the same rule twice should succeed
  107. same_rule = sg.add_rule(ip_protocol='tcp', from_port=1111,
  108. to_port=1111, cidr_ip='0.0.0.0/0')
  109. self.assertTrue(
  110. rule == same_rule,
  111. "Expected rule {0} not found in security group: {0}".format(
  112. same_rule, sg.rules))
  113. @helpers.skipIfNoService(['security.security_groups'])
  114. def test_security_group_group_rule(self):
  115. name = 'cb_sgrule-{0}'.format(helpers.get_uuid())
  116. # Declare these variables and late binding will allow
  117. # the cleanup method access to the most current values
  118. net = None
  119. sg = None
  120. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  121. network=net, security_group=sg)):
  122. net, _ = helpers.create_test_network(self.provider, name)
  123. sg = self.provider.security.security_groups.create(
  124. name=name, description=name, network_id=net.id)
  125. self.assertTrue(
  126. len(sg.rules) == 0,
  127. "Expected no security group group rule. Got {0}."
  128. .format(sg.rules))
  129. rule = sg.add_rule(src_group=sg, ip_protocol='tcp', from_port=1,
  130. to_port=65535)
  131. self.assertTrue(
  132. rule.group.name == name,
  133. "Expected security group rule name {0}. Got {1}."
  134. .format(name, rule.group.name))
  135. for r in sg.rules:
  136. r.delete()
  137. sg = self.provider.security.security_groups.get(sg.id) # update
  138. self.assertTrue(
  139. len(sg.rules) == 0,
  140. "Deleting SecurityGroupRule should delete it: {0}".format(
  141. sg.rules))
  142. sgl = self.provider.security.security_groups.list()
  143. found_sg = [g for g in sgl if g.name == name]
  144. self.assertTrue(
  145. len(found_sg) == 0,
  146. "Security group {0} should have been deleted but still exists."
  147. .format(name))