test_security_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. """Test cloudbridge.security modules."""
  2. import json
  3. import unittest
  4. import uuid
  5. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  6. from test.helpers import ProviderTestBase
  7. import test.helpers as helpers
  8. class CloudSecurityServiceTestCase(ProviderTestBase):
  9. @helpers.skipIfNoService(['security.key_pairs'])
  10. def test_crud_key_pair_service(self):
  11. name = 'cbtestkeypairA-{0}'.format(uuid.uuid4())
  12. kp = self.provider.security.key_pairs.create(name=name)
  13. with helpers.cleanup_action(
  14. lambda:
  15. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  16. ):
  17. # test list method
  18. kpl = self.provider.security.key_pairs.list()
  19. list_kpl = [i for i in kpl if i.name == name]
  20. self.assertTrue(
  21. len(list_kpl) == 1,
  22. "List key pairs does not return the expected key pair %s" %
  23. name)
  24. # check iteration
  25. iter_kpl = [i for i in self.provider.security.key_pairs
  26. if i.name == name]
  27. self.assertTrue(
  28. len(iter_kpl) == 1,
  29. "Iter key pairs does not return the expected key pair %s" %
  30. name)
  31. # check find
  32. find_kp = self.provider.security.key_pairs.find(name=name)[0]
  33. self.assertTrue(
  34. find_kp == kp,
  35. "Find key pair did not return the expected key {0}."
  36. .format(name))
  37. # check get
  38. get_kp = self.provider.security.key_pairs.get(name)
  39. self.assertTrue(
  40. get_kp == kp,
  41. "Get key pair did not return the expected key {0}."
  42. .format(name))
  43. # Recreating existing keypair should raise an exception
  44. with self.assertRaises(Exception):
  45. self.provider.security.key_pairs.create(name=name)
  46. kpl = self.provider.security.key_pairs.list()
  47. found_kp = [k for k in kpl if k.name == name]
  48. self.assertTrue(
  49. len(found_kp) == 0,
  50. "Key pair {0} should have been deleted but still exists."
  51. .format(name))
  52. no_kp = self.provider.security.key_pairs.find(name='bogus_kp')
  53. self.assertFalse(
  54. no_kp,
  55. "Found a key pair {0} that should not exist?".format(no_kp))
  56. @helpers.skipIfNoService(['security.key_pairs'])
  57. def test_key_pair(self):
  58. name = 'cbtestkeypairB-{0}'.format(uuid.uuid4())
  59. kp = self.provider.security.key_pairs.create(name=name)
  60. with helpers.cleanup_action(lambda: kp.delete()):
  61. kpl = self.provider.security.key_pairs.list()
  62. found_kp = [k for k in kpl if k.name == name]
  63. self.assertTrue(
  64. len(found_kp) == 1,
  65. "List key pairs did not return the expected key {0}."
  66. .format(name))
  67. self.assertTrue(
  68. kp.id in repr(kp),
  69. "repr(obj) should contain the object id so that the object"
  70. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  71. self.assertIsNotNone(
  72. kp.material,
  73. "KeyPair material is empty but it should not be.")
  74. self.assertTrue(
  75. kp == kp,
  76. "The same key pair should be equal to self.")
  77. json_repr = json.dumps(
  78. {"material": kp.material, "id": name, "name": name},
  79. sort_keys=True)
  80. self.assertEqual(
  81. kp.to_json(), json_repr,
  82. "JSON key pair representation {0} does not match expected {1}"
  83. .format(kp.to_json(), json_repr))
  84. kpl = self.provider.security.key_pairs.list()
  85. found_kp = [k for k in kpl if k.name == name]
  86. self.assertTrue(
  87. len(found_kp) == 0,
  88. "Key pair {0} should have been deleted but still exists."
  89. .format(name))
  90. def cleanup_sg(self, sg, net):
  91. with helpers.cleanup_action(
  92. lambda: self.provider.network.delete(network_id=net.id)):
  93. self.provider.security.security_groups.delete(group_id=sg.id)
  94. @helpers.skipIfNoService(['security.security_groups'])
  95. def test_crud_security_group_service(self):
  96. name = 'CBTestSecurityGroupA-{0}'.format(uuid.uuid4())
  97. net = None
  98. sg = None
  99. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  100. network=net, security_group=sg)):
  101. net, _ = helpers.create_test_network(self.provider, name)
  102. sg = self.provider.security.security_groups.create(
  103. name=name, description=name, network_id=net.id)
  104. self.assertEqual(name, sg.description)
  105. # test list method
  106. sgl = self.provider.security.security_groups.list()
  107. found_sgl = [i for i in sgl if i.name == name]
  108. self.assertTrue(
  109. len(found_sgl) == 1,
  110. "List security groups does not return the expected group %s" %
  111. name)
  112. # check iteration
  113. found_sgl = [i for i in self.provider.security.security_groups
  114. if i.name == name]
  115. self.assertTrue(
  116. len(found_sgl) == 1,
  117. "Iter security groups does not return the expected group %s" %
  118. name)
  119. # check find
  120. find_sg = self.provider.security.security_groups.find(name=sg.name)
  121. self.assertTrue(
  122. len(find_sg) == 1,
  123. "List security groups returned {0} when expected was: {1}."
  124. .format(find_sg, sg.name))
  125. # check get
  126. get_sg = self.provider.security.security_groups.get(sg.id)
  127. self.assertTrue(
  128. get_sg == sg,
  129. "Get SecurityGroup did not return the expected key {0}."
  130. .format(name))
  131. self.assertTrue(
  132. sg.id in repr(sg),
  133. "repr(obj) should contain the object id so that the object"
  134. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  135. sgl = self.provider.security.security_groups.list()
  136. found_sg = [g for g in sgl if g.name == name]
  137. self.assertTrue(
  138. len(found_sg) == 0,
  139. "Security group {0} should have been deleted but still exists."
  140. .format(name))
  141. no_sg = self.provider.security.security_groups.find(name='bogus_sg')
  142. self.assertTrue(
  143. len(no_sg) == 0,
  144. "Found a bogus security group?!?".format(no_sg))
  145. @helpers.skipIfNoService(['security.security_groups'])
  146. def test_security_group(self):
  147. """Test for proper creation of a security group."""
  148. name = 'CBTestSecurityGroupB-{0}'.format(uuid.uuid4())
  149. net = None
  150. sg = None
  151. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  152. network=net, security_group=sg)):
  153. net, _ = helpers.create_test_network(self.provider, name)
  154. sg = self.provider.security.security_groups.create(
  155. name=name, description=name, network_id=net.id)
  156. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  157. cidr_ip='0.0.0.0/0')
  158. found_rule = sg.get_rule(ip_protocol='tcp', from_port=1111,
  159. to_port=1111, cidr_ip='0.0.0.0/0')
  160. self.assertTrue(
  161. rule == found_rule,
  162. "Expected rule {0} not found in security group: {0}".format(
  163. rule, sg.rules))
  164. object_keys = (
  165. sg.rules[0].ip_protocol,
  166. sg.rules[0].from_port,
  167. sg.rules[0].to_port)
  168. self.assertTrue(
  169. all(str(key) in repr(sg.rules[0]) for key in object_keys),
  170. "repr(obj) should contain ip_protocol, form_port, and to_port"
  171. " so that the object can be reconstructed, but does not:"
  172. " {0}; {1}".format(sg.rules[0], object_keys))
  173. self.assertTrue(
  174. sg == sg,
  175. "The same security groups should be equal?")
  176. self.assertFalse(
  177. sg != sg,
  178. "The same security groups should still be equal?")
  179. # json_repr = json.dumps(
  180. # {"description": name, "name": name, "id": sg.id,
  181. # "rules":
  182. # [{"from_port": 1111, "group": "", "cidr_ip": "0.0.0.0/0",
  183. # "parent": sg.id, "to_port": 1111, "ip_protocol": "tcp",
  184. # "id": sg.rules[0].id}]},
  185. # sort_keys=True)
  186. # self.assertTrue(
  187. # sg.to_json() == json_repr,
  188. # "JSON SG representation {0} does not match expected {1}"
  189. # .format(sg.to_json(), json_repr))
  190. sgl = self.provider.security.security_groups.list()
  191. found_sg = [g for g in sgl if g.name == name]
  192. self.assertTrue(
  193. len(found_sg) == 0,
  194. "Security group {0} should have been deleted but still exists."
  195. .format(name))
  196. @helpers.skipIfNoService(['security.security_groups'])
  197. def test_security_group_rule_add_twice(self):
  198. """Test whether adding the same rule twice succeeds."""
  199. if isinstance(self.provider, TestMockHelperMixin):
  200. raise unittest.SkipTest(
  201. "Mock provider returns InvalidParameterValue: "
  202. "Value security_group is invalid for parameter.")
  203. name = 'CBTestSecurityGroupC-{0}'.format(uuid.uuid4())
  204. net = None
  205. sg = None
  206. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  207. network=net, security_group=sg)):
  208. net, _ = helpers.create_test_network(self.provider, name)
  209. sg = self.provider.security.security_groups.create(
  210. name=name, description=name, network_id=net.id)
  211. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  212. cidr_ip='0.0.0.0/0')
  213. # attempting to add the same rule twice should succeed
  214. same_rule = sg.add_rule(ip_protocol='tcp', from_port=1111,
  215. to_port=1111, cidr_ip='0.0.0.0/0')
  216. self.assertTrue(
  217. rule == same_rule,
  218. "Expected rule {0} not found in security group: {0}".format(
  219. same_rule, sg.rules))
  220. @helpers.skipIfNoService(['security.security_groups'])
  221. def test_security_group_group_rule(self):
  222. """Test for proper creation of a security group rule."""
  223. name = 'CBTestSecurityGroupD-{0}'.format(uuid.uuid4())
  224. net = None
  225. sg = None
  226. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  227. network=net, security_group=sg)):
  228. net, _ = helpers.create_test_network(self.provider, name)
  229. sg = self.provider.security.security_groups.create(
  230. name=name, description=name, network_id=net.id)
  231. self.assertTrue(
  232. len(sg.rules) == 0,
  233. "Expected no security group group rule. Got {0}."
  234. .format(sg.rules))
  235. rule = sg.add_rule(src_group=sg, ip_protocol='tcp', from_port=1,
  236. to_port=65535)
  237. self.assertTrue(
  238. rule.group.name == name,
  239. "Expected security group rule name {0}. Got {1}."
  240. .format(name, rule.group.name))
  241. for r in sg.rules:
  242. r.delete()
  243. sg = self.provider.security.security_groups.get(sg.id) # update
  244. self.assertTrue(
  245. len(sg.rules) == 0,
  246. "Deleting SecurityGroupRule should delete it: {0}".format(
  247. sg.rules))
  248. sgl = self.provider.security.security_groups.list()
  249. found_sg = [g for g in sgl if g.name == name]
  250. self.assertTrue(
  251. len(found_sg) == 0,
  252. "Security group {0} should have been deleted but still exists."
  253. .format(name))