test_security_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. """Test cloudbridge.security modules."""
  2. import json
  3. import unittest
  4. import uuid
  5. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  6. from test.helpers import ProviderTestBase
  7. import test.helpers as helpers
  8. class CloudSecurityServiceTestCase(ProviderTestBase):
  9. def __init__(self, methodName, provider):
  10. super(CloudSecurityServiceTestCase, self).__init__(
  11. methodName=methodName, provider=provider)
  12. @helpers.skipIfNoService(['security.key_pairs'])
  13. def test_crud_key_pair_service(self):
  14. name = 'cbtestkeypairA-{0}'.format(uuid.uuid4())
  15. kp = self.provider.security.key_pairs.create(name=name)
  16. with helpers.cleanup_action(
  17. lambda:
  18. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  19. ):
  20. # test list method
  21. kpl = self.provider.security.key_pairs.list()
  22. list_kpl = [i for i in kpl if i.name == name]
  23. self.assertTrue(
  24. len(list_kpl) == 1,
  25. "List key pairs does not return the expected key pair %s" %
  26. name)
  27. # check iteration
  28. iter_kpl = [i for i in self.provider.security.key_pairs
  29. if i.name == name]
  30. self.assertTrue(
  31. len(iter_kpl) == 1,
  32. "Iter key pairs does not return the expected key pair %s" %
  33. name)
  34. # check find
  35. find_kp = self.provider.security.key_pairs.find(name=name)[0]
  36. self.assertTrue(
  37. find_kp == kp,
  38. "Find key pair did not return the expected key {0}."
  39. .format(name))
  40. # check get
  41. get_kp = self.provider.security.key_pairs.get(name)
  42. self.assertTrue(
  43. get_kp == kp,
  44. "Get key pair did not return the expected key {0}."
  45. .format(name))
  46. # Recreating existing keypair should raise an exception
  47. with self.assertRaises(Exception):
  48. self.provider.security.key_pairs.create(name=name)
  49. kpl = self.provider.security.key_pairs.list()
  50. found_kp = [k for k in kpl if k.name == name]
  51. self.assertTrue(
  52. len(found_kp) == 0,
  53. "Key pair {0} should have been deleted but still exists."
  54. .format(name))
  55. no_kp = self.provider.security.key_pairs.find(name='bogus_kp')
  56. self.assertFalse(
  57. no_kp,
  58. "Found a key pair {0} that should not exist?".format(no_kp))
  59. @helpers.skipIfNoService(['security.key_pairs'])
  60. def test_key_pair(self):
  61. name = 'cbtestkeypairB-{0}'.format(uuid.uuid4())
  62. kp = self.provider.security.key_pairs.create(name=name)
  63. with helpers.cleanup_action(lambda: kp.delete()):
  64. kpl = self.provider.security.key_pairs.list()
  65. found_kp = [k for k in kpl if k.name == name]
  66. self.assertTrue(
  67. len(found_kp) == 1,
  68. "List key pairs did not return the expected key {0}."
  69. .format(name))
  70. self.assertTrue(
  71. kp.id in repr(kp),
  72. "repr(obj) should contain the object id so that the object"
  73. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  74. self.assertIsNotNone(
  75. kp.material,
  76. "KeyPair material is empty but it should not be.")
  77. self.assertTrue(
  78. kp == kp,
  79. "The same key pair should be equal to self.")
  80. json_repr = json.dumps(
  81. {"material": kp.material, "id": name, "name": name},
  82. sort_keys=True)
  83. self.assertEqual(
  84. kp.to_json(), json_repr,
  85. "JSON key pair representation {0} does not match expected {1}"
  86. .format(kp.to_json(), json_repr))
  87. kpl = self.provider.security.key_pairs.list()
  88. found_kp = [k for k in kpl if k.name == name]
  89. self.assertTrue(
  90. len(found_kp) == 0,
  91. "Key pair {0} should have been deleted but still exists."
  92. .format(name))
  93. def cleanup_sg(self, sg, net):
  94. with helpers.cleanup_action(
  95. lambda: self.provider.network.delete(network_id=net.id)):
  96. self.provider.security.security_groups.delete(group_id=sg.id)
  97. @helpers.skipIfNoService(['security.security_groups'])
  98. def test_crud_security_group_service(self):
  99. name = 'cbtestsecuritygroupA-{0}'.format(uuid.uuid4())
  100. net = self.provider.network.create(name=name)
  101. sg = self.provider.security.security_groups.create(
  102. name=name, description=name, network_id=net.id)
  103. with helpers.cleanup_action(lambda: self.cleanup_sg(sg, net)):
  104. self.assertEqual(name, sg.description)
  105. # test list method
  106. sgl = self.provider.security.security_groups.list()
  107. found_sgl = [i for i in sgl if i.name == name]
  108. self.assertTrue(
  109. len(found_sgl) == 1,
  110. "List security groups does not return the expected group %s" %
  111. name)
  112. # check iteration
  113. found_sgl = [i for i in self.provider.security.security_groups
  114. if i.name == name]
  115. self.assertTrue(
  116. len(found_sgl) == 1,
  117. "Iter security groups does not return the expected group %s" %
  118. name)
  119. # check find
  120. find_sg = self.provider.security.security_groups.find(name=sg.name)
  121. self.assertTrue(
  122. len(find_sg) == 1,
  123. "List security groups returned {0} when expected was: {1}."
  124. .format(find_sg, sg.name))
  125. # check get
  126. get_sg = self.provider.security.security_groups.get(sg.id)
  127. self.assertTrue(
  128. get_sg == sg,
  129. "Get SecurityGroup did not return the expected key {0}."
  130. .format(name))
  131. self.assertTrue(
  132. sg.id in repr(sg),
  133. "repr(obj) should contain the object id so that the object"
  134. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  135. sgl = self.provider.security.security_groups.list()
  136. found_sg = [g for g in sgl if g.name == name]
  137. self.assertTrue(
  138. len(found_sg) == 0,
  139. "Security group {0} should have been deleted but still exists."
  140. .format(name))
  141. no_sg = self.provider.security.security_groups.find(name='bogus_sg')
  142. self.assertTrue(
  143. len(no_sg) == 0,
  144. "Found a bogus security group?!?".format(no_sg))
  145. @helpers.skipIfNoService(['security.security_groups'])
  146. def test_security_group(self):
  147. """Test for proper creation of a security group."""
  148. name = 'cbtestsecuritygroupB-{0}'.format(uuid.uuid4())
  149. net = self.provider.network.create(name=name)
  150. sg = self.provider.security.security_groups.create(
  151. name=name, description=name, network_id=net.id)
  152. with helpers.cleanup_action(lambda: self.cleanup_sg(sg, net)):
  153. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  154. cidr_ip='0.0.0.0/0')
  155. found_rule = sg.get_rule(ip_protocol='tcp', from_port=1111,
  156. to_port=1111, cidr_ip='0.0.0.0/0')
  157. self.assertTrue(
  158. rule == found_rule,
  159. "Expected rule {0} not found in security group: {0}".format(
  160. rule, sg.rules))
  161. object_keys = (
  162. sg.rules[0].ip_protocol,
  163. sg.rules[0].from_port,
  164. sg.rules[0].to_port)
  165. self.assertTrue(
  166. all(str(key) in repr(sg.rules[0]) for key in object_keys),
  167. "repr(obj) should contain ip_protocol, form_port, and to_port"
  168. " so that the object can be reconstructed, but does not:"
  169. " {0}; {1}".format(sg.rules[0], object_keys))
  170. self.assertTrue(
  171. sg == sg,
  172. "The same security groups should be equal?")
  173. self.assertFalse(
  174. sg != sg,
  175. "The same security groups should still be equal?")
  176. # json_repr = json.dumps(
  177. # {"description": name, "name": name, "id": sg.id,
  178. # "rules":
  179. # [{"from_port": 1111, "group": "", "cidr_ip": "0.0.0.0/0",
  180. # "parent": sg.id, "to_port": 1111, "ip_protocol": "tcp",
  181. # "id": sg.rules[0].id}]},
  182. # sort_keys=True)
  183. # self.assertTrue(
  184. # sg.to_json() == json_repr,
  185. # "JSON SG representation {0} does not match expected {1}"
  186. # .format(sg.to_json(), json_repr))
  187. sgl = self.provider.security.security_groups.list()
  188. found_sg = [g for g in sgl if g.name == name]
  189. self.assertTrue(
  190. len(found_sg) == 0,
  191. "Security group {0} should have been deleted but still exists."
  192. .format(name))
  193. @helpers.skipIfNoService(['security.security_groups'])
  194. def test_security_group_rule_add_twice(self):
  195. """Test whether adding the same rule twice succeeds."""
  196. if isinstance(self.provider, TestMockHelperMixin):
  197. raise unittest.SkipTest(
  198. "Mock provider returns InvalidParameterValue: "
  199. "Value security_group is invalid for parameter.")
  200. name = 'cbtestsecuritygroupB-{0}'.format(uuid.uuid4())
  201. net = self.provider.network.create(name=name)
  202. sg = self.provider.security.security_groups.create(
  203. name=name, description=name, network_id=net.id)
  204. with helpers.cleanup_action(lambda: self.cleanup_sg(sg, net)):
  205. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  206. cidr_ip='0.0.0.0/0')
  207. # attempting to add the same rule twice should succeed
  208. same_rule = sg.add_rule(ip_protocol='tcp', from_port=1111,
  209. to_port=1111, cidr_ip='0.0.0.0/0')
  210. self.assertTrue(
  211. rule == same_rule,
  212. "Expected rule {0} not found in security group: {0}".format(
  213. same_rule, sg.rules))
  214. @helpers.skipIfNoService(['security.security_groups'])
  215. def test_security_group_group_rule(self):
  216. """Test for proper creation of a security group rule."""
  217. name = 'cbtestsecuritygroupC-{0}'.format(uuid.uuid4())
  218. net = self.provider.network.create(name=name)
  219. sg = self.provider.security.security_groups.create(
  220. name=name, description=name, network_id=net.id)
  221. with helpers.cleanup_action(lambda: self.cleanup_sg(sg, net)):
  222. self.assertTrue(
  223. len(sg.rules) == 0,
  224. "Expected no security group group rule. Got {0}."
  225. .format(sg.rules))
  226. rule = sg.add_rule(src_group=sg, ip_protocol='tcp', from_port=1,
  227. to_port=65535)
  228. self.assertTrue(
  229. rule.group.name == name,
  230. "Expected security group rule name {0}. Got {1}."
  231. .format(name, rule.group.name))
  232. for r in sg.rules:
  233. r.delete()
  234. sg = self.provider.security.security_groups.get(sg.id) # update
  235. self.assertTrue(
  236. len(sg.rules) == 0,
  237. "Deleting SecurityGroupRule should delete it: {0}".format(
  238. sg.rules))
  239. sgl = self.provider.security.security_groups.list()
  240. found_sg = [g for g in sgl if g.name == name]
  241. self.assertTrue(
  242. len(found_sg) == 0,
  243. "Security group {0} should have been deleted but still exists."
  244. .format(name))