# test_security_service.py (12 KB)
  1. """Test cloudbridge.security modules."""
  2. import json
  3. import unittest
  4. import uuid
  5. from test import helpers
  6. from test.helpers import ProviderTestBase
  7. from cloudbridge.cloud.interfaces import TestMockHelperMixin
  8. class CloudSecurityServiceTestCase(ProviderTestBase):
  @helpers.skipIfNoService(['security.key_pairs'])
  def test_crud_key_pair_service(self):
      """Run the key pair service through create, list, find, get, delete.

      A uniquely named key pair is created and must then be reachable via
      ``list()``, iteration over the service, ``find(name=...)`` and
      ``get(id)``. Creating a second key pair with the same name must
      raise. After the ``helpers.cleanup_action`` block, whose callback
      deletes the key pair by id, the key pair must no longer be listed,
      and ``find`` for a bogus name must return a falsy result.
      """
      name = 'cbtestkeypairA-{0}'.format(uuid.uuid4())
      kp = self.provider.security.key_pairs.create(name=name)
      with helpers.cleanup_action(
          lambda:
              self.provider.security.key_pairs.delete(key_pair_id=kp.id)
      ):
          # test list method
          kpl = self.provider.security.key_pairs.list()
          list_kpl = [i for i in kpl if i.id == kp.id]
          self.assertEqual(
              len(list_kpl), 1,
              "List key pairs does not return the expected key pair %s" %
              name)

          # check iteration
          iter_kpl = [i for i in self.provider.security.key_pairs
                      if i.id == kp.id]
          self.assertEqual(
              len(iter_kpl), 1,
              "Iter key pairs does not return the expected key pair %s" %
              name)

          # check find; assert on the result before indexing it so that an
          # empty result is reported as a test failure with a message
          # rather than as an IndexError
          find_kpl = self.provider.security.key_pairs.find(name=kp.name)
          self.assertTrue(
              find_kpl,
              "Find key pair returned no results for key {0}."
              .format(name))
          find_kp = find_kpl[0]
          self.assertEqual(
              find_kp, kp,
              "Find key pair did not return the expected key {0}."
              .format(name))

          # check get
          get_kp = self.provider.security.key_pairs.get(kp.id)
          self.assertEqual(
              get_kp, kp,
              "Get key pair did not return the expected key {0}."
              .format(name))

          # Recreating existing keypair should raise an exception
          # (the concrete exception type is provider specific, hence the
          # broad Exception here)
          with self.assertRaises(Exception):
              self.provider.security.key_pairs.create(name=name)

      kpl = self.provider.security.key_pairs.list()
      found_kp = [k for k in kpl if k.id == kp.id]
      self.assertEqual(
          len(found_kp), 0,
          "Key pair {0} should have been deleted but still exists."
          .format(name))

      no_kp = self.provider.security.key_pairs.find(name='bogus_kp')
      self.assertFalse(
          no_kp,
          "Found a key pair {0} that should not exist?".format(no_kp))
  @helpers.skipIfNoService(['security.key_pairs'])
  def test_key_pair(self):
      """Check the properties of a single, freshly created key pair.

      Inside the cleanup block the key pair must appear exactly once in
      the service listing, have its id inside ``repr(kp)``, carry
      non-None ``material``, compare equal to itself, and produce a
      ``to_json()`` string that ``json.loads`` turns into a truthy value.
      After the block it must no longer be listed.
      """
      name = 'cbtestkeypairB-{0}'.format(uuid.uuid4())
      kp = self.provider.security.key_pairs.create(name=name)

      def listed_with_same_id():
          """Return every listed key pair whose id equals ``kp.id``."""
          return [candidate
                  for candidate in self.provider.security.key_pairs.list()
                  if candidate.id == kp.id]

      def remove_key_pair():
          """Cleanup callback: delete the key pair created above."""
          return kp.delete()

      with helpers.cleanup_action(remove_key_pair):
          present = listed_with_same_id()
          not_listed_msg = (
              "List key pairs did not return the expected key {0}."
              .format(name))
          self.assertTrue(len(present) == 1, not_listed_msg)

          repr_msg = (
              "repr(obj) should contain the object id so that the object"
              " can be reconstructed, but does not. eval(repr(obj)) == obj")
          self.assertTrue(kp.id in repr(kp), repr_msg)

          self.assertIsNotNone(
              kp.material,
              "KeyPair material is empty but it should not be.")

          self.assertTrue(
              kp == kp,
              "The same key pair should be equal to self.")

          # check json deserialization
          parsed = json.loads(kp.to_json())
          json_msg = (
              "to_json must yield a valid json string: {0}"
              .format(kp.to_json()))
          self.assertTrue(parsed, json_msg)

      leftovers = listed_with_same_id()
      still_there_msg = (
          "Key pair {0} should have been deleted but still exists."
          .format(name))
      self.assertTrue(len(leftovers) == 0, still_there_msg)
  def cleanup_sg(self, sg, net):
      """Delete security group ``sg``, with ``net`` deletion as cleanup.

      The security group is deleted by id inside a
      ``helpers.cleanup_action`` block whose callback deletes the network
      by id.
      """
      def remove_network():
          """Cleanup callback: delete the network that held the group."""
          return self.provider.network.delete(network_id=net.id)

      with helpers.cleanup_action(remove_network):
          self.provider.security.security_groups.delete(group_id=sg.id)
  @helpers.skipIfNoService(['security.security_groups'])
  def test_crud_security_group_service(self):
      """Run the security group service through create, list, find, get.

      A test network and a uniquely named security group are created
      inside a ``helpers.cleanup_action`` block. The group must then be
      reachable via ``list()``, iteration over the service,
      ``find(name=...)`` and ``get(id)``, and its id must appear in its
      ``repr``. After the block no group with that name may be listed,
      and ``find`` for a bogus name must return an empty result.
      """
      name = 'CBTestSecurityGroupA-{0}'.format(uuid.uuid4())

      # Declare these variables and late binding will allow
      # the cleanup method access to the most current values
      net = None
      sg = None
      with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
              network=net, security_group=sg)):
          net, _ = helpers.create_test_network(self.provider, name)
          sg = self.provider.security.security_groups.create(
              name=name, description=name, network_id=net.id)

          self.assertEqual(name, sg.description)

          # test list method
          sgl = self.provider.security.security_groups.list()
          found_sgl = [i for i in sgl if i.name == name]
          self.assertEqual(
              len(found_sgl), 1,
              "List security groups does not return the expected group %s" %
              name)

          # check iteration
          found_sgl = [i for i in self.provider.security.security_groups
                       if i.name == name]
          self.assertEqual(
              len(found_sgl), 1,
              "Iter security groups does not return the expected group %s" %
              name)

          # check find
          find_sg = self.provider.security.security_groups.find(name=sg.name)
          self.assertEqual(
              len(find_sg), 1,
              "Find security groups returned {0} when expected was: {1}."
              .format(find_sg, sg.name))

          # check get
          get_sg = self.provider.security.security_groups.get(sg.id)
          self.assertEqual(
              get_sg, sg,
              "Get SecurityGroup did not return the expected group {0}."
              .format(name))

          self.assertTrue(
              sg.id in repr(sg),
              "repr(obj) should contain the object id so that the object"
              " can be reconstructed, but does not. eval(repr(obj)) == obj")

      sgl = self.provider.security.security_groups.list()
      found_sg = [g for g in sgl if g.name == name]
      self.assertEqual(
          len(found_sg), 0,
          "Security group {0} should have been deleted but still exists."
          .format(name))

      no_sg = self.provider.security.security_groups.find(name='bogus_sg')
      # The message needs a placeholder, otherwise the unexpected groups
      # passed to format() never show up in the failure output
      self.assertEqual(
          len(no_sg), 0,
          "Found a bogus security group?!?: {0}".format(no_sg))
  @helpers.skipIfNoService(['security.security_groups'])
  def test_security_group(self):
      """Test for proper creation of a security group.

      Adds one TCP rule (port 1111, CIDR 0.0.0.0/0) to a new group and
      checks that ``get_rule`` returns an equal rule, that the ``repr`` of
      the group's first rule mentions its protocol and both ports, and
      that the group compares equal to itself. After the cleanup block no
      group with the test name may be listed.
      """
      name = 'CBTestSecurityGroupB-{0}'.format(uuid.uuid4())

      # Declare these variables and late binding will allow
      # the cleanup method access to the most current values
      net = None
      sg = None
      with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
              network=net, security_group=sg)):
          net, _ = helpers.create_test_network(self.provider, name)
          sg = self.provider.security.security_groups.create(
              name=name, description=name, network_id=net.id)

          rule = sg.add_rule(ip_protocol='tcp', from_port=1111, to_port=1111,
                             cidr_ip='0.0.0.0/0')
          found_rule = sg.get_rule(ip_protocol='tcp', from_port=1111,
                                   to_port=1111, cidr_ip='0.0.0.0/0')
          self.assertTrue(
              rule == found_rule,
              "Expected rule {0} not found in security group: {1}".format(
                  rule, sg.rules))

          # Read the first rule once and reuse it for every check below
          first_rule = sg.rules[0]
          object_keys = (
              first_rule.ip_protocol,
              first_rule.from_port,
              first_rule.to_port)
          rule_repr = repr(first_rule)
          self.assertTrue(
              all(str(key) in rule_repr for key in object_keys),
              "repr(obj) should contain ip_protocol, from_port, and to_port"
              " so that the object can be reconstructed, but does not:"
              " {0}; {1}".format(first_rule, object_keys))

          self.assertTrue(
              sg == sg,
              "The same security groups should be equal?")
          self.assertFalse(
              sg != sg,
              "The same security groups should still be equal?")

          # NOTE: the exact to_json() comparison below is disabled (the
          # reason is not recorded here); kept for reference.
          # json_repr = json.dumps(
          #     {"description": name, "name": name, "id": sg.id,
          #      "rules":
          #         [{"from_port": 1111, "group": "", "cidr_ip": "0.0.0.0/0",
          #           "parent": sg.id, "to_port": 1111, "ip_protocol": "tcp",
          #           "id": sg.rules[0].id}]},
          #     sort_keys=True)
          # self.assertTrue(
          #     sg.to_json() == json_repr,
          #     "JSON SG representation {0} does not match expected {1}"
          #     .format(sg.to_json(), json_repr))

      sgl = self.provider.security.security_groups.list()
      found_sg = [g for g in sgl if g.name == name]
      self.assertTrue(
          len(found_sg) == 0,
          "Security group {0} should have been deleted but still exists."
          .format(name))
  @helpers.skipIfNoService(['security.security_groups'])
  def test_security_group_rule_add_twice(self):
      """Test whether adding the same rule twice succeeds.

      The identical TCP rule is added to a fresh security group two times
      and both ``add_rule`` results must compare equal. The test is
      skipped when the provider is a ``TestMockHelperMixin`` instance.
      """
      if isinstance(self.provider, TestMockHelperMixin):
          skip_reason = (
              "Mock provider returns InvalidParameterValue: "
              "Value security_group is invalid for parameter.")
          raise unittest.SkipTest(skip_reason)

      name = 'CBTestSecurityGroupC-{0}'.format(uuid.uuid4())

      # One definition of the rule, used for both add_rule calls
      rule_spec = dict(ip_protocol='tcp', from_port=1111, to_port=1111,
                       cidr_ip='0.0.0.0/0')

      # Declared up front; the cleanup callback reads them only when it
      # is called, so it sees whatever has been assigned by then
      net = None
      sg = None

      def release_resources():
          """Cleanup callback: hand the current net/sg to the helper."""
          return helpers.cleanup_test_resources(
              network=net, security_group=sg)

      with helpers.cleanup_action(release_resources):
          net, _ = helpers.create_test_network(self.provider, name)
          sg = self.provider.security.security_groups.create(
              name=name, description=name, network_id=net.id)

          rule = sg.add_rule(**rule_spec)
          # attempting to add the same rule twice should succeed
          same_rule = sg.add_rule(**rule_spec)

          mismatch_msg = (
              "Expected rule {0} not found in security group: {1}".format(
                  same_rule, sg.rules))
          self.assertTrue(rule == same_rule, mismatch_msg)
  @helpers.skipIfNoService(['security.security_groups'])
  def test_security_group_group_rule(self):
      """Test for proper creation of a security group rule.

      A new security group must start with no rules. A TCP rule for ports
      1-65535 is then added with the group itself as ``src_group``, and
      the rule's ``group.name`` must equal the test name. Every rule is
      deleted, the group is fetched again, and it must then be None or
      have no rules. After the cleanup block no group with the test name
      may be listed.
      """
      name = 'CBTestSecurityGroupD-{0}'.format(uuid.uuid4())

      # Declared up front; the cleanup callback reads them only when it
      # is called, so it sees whatever has been assigned by then
      net = None
      sg = None

      def release_resources():
          """Cleanup callback: hand the current net/sg to the helper."""
          return helpers.cleanup_test_resources(
              network=net, security_group=sg)

      with helpers.cleanup_action(release_resources):
          net, _ = helpers.create_test_network(self.provider, name)
          sg = self.provider.security.security_groups.create(
              name=name, description=name, network_id=net.id)

          starts_empty = len(sg.rules) == 0
          not_empty_msg = (
              "Expected no security group group rule. Got {0}."
              .format(sg.rules))
          self.assertTrue(starts_empty, not_empty_msg)

          rule = sg.add_rule(src_group=sg, ip_protocol='tcp', from_port=1,
                             to_port=65535)
          name_matches = rule.group.name == name
          wrong_name_msg = (
              "Expected security group rule name {0}. Got {1}."
              .format(name, rule.group.name))
          self.assertTrue(name_matches, wrong_name_msg)

          for existing_rule in sg.rules:
              existing_rule.delete()

          # Fetch the group again to observe the rule deletions
          sg = self.provider.security.security_groups.get(sg.id)
          rules_gone = sg is None or len(sg.rules) == 0
          leftover_rules = [] if sg is None else sg.rules
          self.assertTrue(
              rules_gone,
              "Deleting SecurityGroupRule should delete it: {0}".format(
                  leftover_rules))

      listed_groups = self.provider.security.security_groups.list()
      same_name = [group for group in listed_groups if group.name == name]
      still_there_msg = (
          "Security group {0} should have been deleted but still exists."
          .format(name))
      self.assertTrue(len(same_name) == 0, still_there_msg)