test_security_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. """Test cloudbridge.security modules."""
  2. import json
  3. import unittest
  4. import uuid
  5. from test import helpers
  6. from test.helpers import ProviderTestBase
  7. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  8. class CloudSecurityServiceTestCase(ProviderTestBase):
  9. @helpers.skipIfNoService(['security.key_pairs'])
  10. def test_crud_key_pair_service(self):
  11. name = 'cbtestkeypairA-{0}'.format(uuid.uuid4())
  12. kp = self.provider.security.key_pairs.create(name=name)
  13. with helpers.cleanup_action(
  14. lambda:
  15. self.provider.security.key_pairs.delete(key_pair_id=kp.id)
  16. ):
  17. # test list method
  18. kpl = self.provider.security.key_pairs.list()
  19. list_kpl = [i for i in kpl if i.name == name]
  20. self.assertTrue(
  21. len(list_kpl) == 1,
  22. "List key pairs does not return the expected key pair %s" %
  23. name)
  24. # check iteration
  25. iter_kpl = [i for i in self.provider.security.key_pairs
  26. if i.name == name]
  27. self.assertTrue(
  28. len(iter_kpl) == 1,
  29. "Iter key pairs does not return the expected key pair %s" %
  30. name)
  31. # check find
  32. find_kp = self.provider.security.key_pairs.find(name=name)[0]
  33. self.assertTrue(
  34. find_kp == kp,
  35. "Find key pair did not return the expected key {0}."
  36. .format(name))
  37. # check get
  38. get_kp = self.provider.security.key_pairs.get(name)
  39. self.assertTrue(
  40. get_kp == kp,
  41. "Get key pair did not return the expected key {0}."
  42. .format(name))
  43. # Recreating existing keypair should raise an exception
  44. with self.assertRaises(Exception):
  45. self.provider.security.key_pairs.create(name=name)
  46. kpl = self.provider.security.key_pairs.list()
  47. found_kp = [k for k in kpl if k.name == name]
  48. self.assertTrue(
  49. len(found_kp) == 0,
  50. "Key pair {0} should have been deleted but still exists."
  51. .format(name))
  52. no_kp = self.provider.security.key_pairs.find(name='bogus_kp')
  53. self.assertFalse(
  54. no_kp,
  55. "Found a key pair {0} that should not exist?".format(no_kp))
  56. @helpers.skipIfNoService(['security.key_pairs'])
  57. def test_key_pair(self):
  58. name = 'cbtestkeypairB-{0}'.format(uuid.uuid4())
  59. kp = self.provider.security.key_pairs.create(name=name)
  60. with helpers.cleanup_action(lambda: kp.delete()):
  61. kpl = self.provider.security.key_pairs.list()
  62. found_kp = [k for k in kpl if k.name == name]
  63. self.assertTrue(
  64. len(found_kp) == 1,
  65. "List key pairs did not return the expected key {0}."
  66. .format(name))
  67. self.assertTrue(
  68. kp.id in repr(kp),
  69. "repr(obj) should contain the object id so that the object"
  70. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  71. self.assertIsNotNone(
  72. kp.material,
  73. "KeyPair material is empty but it should not be.")
  74. self.assertTrue(
  75. kp == kp,
  76. "The same key pair should be equal to self.")
  77. json_repr = json.dumps(
  78. {"material": kp.material, "id": name, "name": name},
  79. sort_keys=True)
  80. self.assertEqual(
  81. kp.to_json(), json_repr,
  82. "JSON key pair representation {0} does not match expected {1}"
  83. .format(kp.to_json(), json_repr))
  84. kpl = self.provider.security.key_pairs.list()
  85. found_kp = [k for k in kpl if k.name == name]
  86. self.assertTrue(
  87. len(found_kp) == 0,
  88. "Key pair {0} should have been deleted but still exists."
  89. .format(name))
  90. def cleanup_sg(self, sg, net):
  91. with helpers.cleanup_action(
  92. lambda: self.provider.network.delete(network_id=net.id)):
  93. self.provider.security.security_groups.delete(group_id=sg.id)
  94. @helpers.skipIfNoService(['security.security_groups'])
  95. def test_crud_security_group_service(self):
  96. name = 'CBTestSecurityGroupA-{0}'.format(uuid.uuid4())
  97. # Declare these variables and late binding will allow
  98. # the cleanup method access to the most current values
  99. net = None
  100. sg = None
  101. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  102. network=net, security_group=sg)):
  103. net, _ = helpers.create_test_network(self.provider, name)
  104. sg = self.provider.security.security_groups.create(
  105. name=name, description=name, network_id=net.id)
  106. self.assertEqual(name, sg.description)
  107. # test list method
  108. sgl = self.provider.security.security_groups.list()
  109. found_sgl = [i for i in sgl if i.name == name]
  110. self.assertTrue(
  111. len(found_sgl) == 1,
  112. "List security groups does not return the expected group %s" %
  113. name)
  114. # check iteration
  115. found_sgl = [i for i in self.provider.security.security_groups
  116. if i.name == name]
  117. self.assertTrue(
  118. len(found_sgl) == 1,
  119. "Iter security groups does not return the expected group %s" %
  120. name)
  121. # check find
  122. find_sg = self.provider.security.security_groups.find(name=sg.name)
  123. self.assertTrue(
  124. len(find_sg) == 1,
  125. "List security groups returned {0} when expected was: {1}."
  126. .format(find_sg, sg.name))
  127. # check get
  128. get_sg = self.provider.security.security_groups.get(sg.id)
  129. self.assertTrue(
  130. get_sg == sg,
  131. "Get SecurityGroup did not return the expected key {0}."
  132. .format(name))
  133. self.assertTrue(
  134. sg.id in repr(sg),
  135. "repr(obj) should contain the object id so that the object"
  136. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  137. sgl = self.provider.security.security_groups.list()
  138. found_sg = [g for g in sgl if g.name == name]
  139. self.assertTrue(
  140. len(found_sg) == 0,
  141. "Security group {0} should have been deleted but still exists."
  142. .format(name))
  143. no_sg = self.provider.security.security_groups.find(name='bogus_sg')
  144. self.assertTrue(
  145. len(no_sg) == 0,
  146. "Found a bogus security group?!?".format(no_sg))
  147. @helpers.skipIfNoService(['security.security_groups'])
  148. def test_security_group(self):
  149. """Test for proper creation of a security group."""
  150. name = 'CBTestSecurityGroupB-{0}'.format(uuid.uuid4())
  151. # Declare these variables and late binding will allow
  152. # the cleanup method access to the most current values
  153. net = None
  154. sg = None
  155. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  156. network=net, security_group=sg)):
  157. net, _ = helpers.create_test_network(self.provider, name)
  158. sg = self.provider.security.security_groups.create(
  159. name=name, description=name, network_id=net.id)
  160. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  161. cidr_ip='0.0.0.0/0')
  162. found_rule = sg.get_rule(ip_protocol='tcp', from_port=1111,
  163. to_port=1111, cidr_ip='0.0.0.0/0')
  164. self.assertTrue(
  165. rule == found_rule,
  166. "Expected rule {0} not found in security group: {0}".format(
  167. rule, sg.rules))
  168. object_keys = (
  169. sg.rules[0].ip_protocol,
  170. sg.rules[0].from_port,
  171. sg.rules[0].to_port)
  172. self.assertTrue(
  173. all(str(key) in repr(sg.rules[0]) for key in object_keys),
  174. "repr(obj) should contain ip_protocol, form_port, and to_port"
  175. " so that the object can be reconstructed, but does not:"
  176. " {0}; {1}".format(sg.rules[0], object_keys))
  177. self.assertTrue(
  178. sg == sg,
  179. "The same security groups should be equal?")
  180. self.assertFalse(
  181. sg != sg,
  182. "The same security groups should still be equal?")
  183. # json_repr = json.dumps(
  184. # {"description": name, "name": name, "id": sg.id,
  185. # "rules":
  186. # [{"from_port": 1111, "group": "", "cidr_ip": "0.0.0.0/0",
  187. # "parent": sg.id, "to_port": 1111, "ip_protocol": "tcp",
  188. # "id": sg.rules[0].id}]},
  189. # sort_keys=True)
  190. # self.assertTrue(
  191. # sg.to_json() == json_repr,
  192. # "JSON SG representation {0} does not match expected {1}"
  193. # .format(sg.to_json(), json_repr))
  194. sgl = self.provider.security.security_groups.list()
  195. found_sg = [g for g in sgl if g.name == name]
  196. self.assertTrue(
  197. len(found_sg) == 0,
  198. "Security group {0} should have been deleted but still exists."
  199. .format(name))
  200. @helpers.skipIfNoService(['security.security_groups'])
  201. def test_security_group_rule_add_twice(self):
  202. """Test whether adding the same rule twice succeeds."""
  203. if isinstance(self.provider, TestMockHelperMixin):
  204. raise unittest.SkipTest(
  205. "Mock provider returns InvalidParameterValue: "
  206. "Value security_group is invalid for parameter.")
  207. name = 'CBTestSecurityGroupC-{0}'.format(uuid.uuid4())
  208. # Declare these variables and late binding will allow
  209. # the cleanup method access to the most current values
  210. net = None
  211. sg = None
  212. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  213. network=net, security_group=sg)):
  214. net, _ = helpers.create_test_network(self.provider, name)
  215. sg = self.provider.security.security_groups.create(
  216. name=name, description=name, network_id=net.id)
  217. rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
  218. cidr_ip='0.0.0.0/0')
  219. # attempting to add the same rule twice should succeed
  220. same_rule = sg.add_rule(ip_protocol='tcp', from_port=1111,
  221. to_port=1111, cidr_ip='0.0.0.0/0')
  222. self.assertTrue(
  223. rule == same_rule,
  224. "Expected rule {0} not found in security group: {0}".format(
  225. same_rule, sg.rules))
  226. @helpers.skipIfNoService(['security.security_groups'])
  227. def test_security_group_group_rule(self):
  228. """Test for proper creation of a security group rule."""
  229. name = 'CBTestSecurityGroupD-{0}'.format(uuid.uuid4())
  230. # Declare these variables and late binding will allow
  231. # the cleanup method access to the most current values
  232. net = None
  233. sg = None
  234. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  235. network=net, security_group=sg)):
  236. net, _ = helpers.create_test_network(self.provider, name)
  237. sg = self.provider.security.security_groups.create(
  238. name=name, description=name, network_id=net.id)
  239. self.assertTrue(
  240. len(sg.rules) == 0,
  241. "Expected no security group group rule. Got {0}."
  242. .format(sg.rules))
  243. rule = sg.add_rule(src_group=sg, ip_protocol='tcp', from_port=1,
  244. to_port=65535)
  245. self.assertTrue(
  246. rule.group.name == name,
  247. "Expected security group rule name {0}. Got {1}."
  248. .format(name, rule.group.name))
  249. for r in sg.rules:
  250. r.delete()
  251. sg = self.provider.security.security_groups.get(sg.id) # update
  252. self.assertTrue(
  253. len(sg.rules) == 0,
  254. "Deleting SecurityGroupRule should delete it: {0}".format(
  255. sg.rules))
  256. sgl = self.provider.security.security_groups.list()
  257. found_sg = [g for g in sgl if g.name == name]
  258. self.assertTrue(
  259. len(found_sg) == 0,
  260. "Security group {0} should have been deleted but still exists."
  261. .format(name))