2
0

test_block_store_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. import time
  2. import uuid
  3. from test import helpers
  4. from test.helpers import ProviderTestBase
  5. from cloudbridge.cloud.interfaces import SnapshotState
  6. from cloudbridge.cloud.interfaces import VolumeState
  7. from cloudbridge.cloud.interfaces.resources import AttachmentInfo
  8. import six
  9. class CloudBlockStoreServiceTestCase(ProviderTestBase):
  10. @helpers.skipIfNoService(['block_store.volumes'])
  11. def test_crud_volume(self):
  12. """
  13. Create a new volume, check whether the expected values are set,
  14. and delete it
  15. """
  16. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  17. test_vol = self.provider.block_store.volumes.create(
  18. name,
  19. 1,
  20. helpers.get_provider_test_data(self.provider, "placement"))
  21. def cleanup_vol(vol):
  22. vol.delete()
  23. vol.wait_for([VolumeState.DELETED, VolumeState.UNKNOWN],
  24. terminal_states=[VolumeState.ERROR])
  25. with helpers.cleanup_action(lambda: cleanup_vol(test_vol)):
  26. test_vol.wait_till_ready()
  27. self.assertTrue(
  28. test_vol.id in repr(test_vol),
  29. "repr(obj) should contain the object id so that the object"
  30. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  31. volumes = self.provider.block_store.volumes.list()
  32. list_volumes = [vol for vol in volumes if vol.name == name]
  33. self.assertTrue(
  34. len(list_volumes) == 1,
  35. "List volumes does not return the expected volume %s" %
  36. name)
  37. # check iteration
  38. iter_volumes = [vol for vol in self.provider.block_store.volumes
  39. if vol.name == name]
  40. self.assertTrue(
  41. len(iter_volumes) == 1,
  42. "Iter volumes does not return the expected volume %s" %
  43. name)
  44. # check find
  45. find_vols = self.provider.block_store.volumes.find(name=name)
  46. self.assertTrue(
  47. len(find_vols) == 1,
  48. "Find volumes does not return the expected volume %s" %
  49. name)
  50. # check non-existent find
  51. # TODO: Moto has a bug with filters causing the following test
  52. # to fail. Need to add tag based filtering support for volumes
  53. # find_vols = self.provider.block_store.volumes.find(
  54. # name="non_existent_vol")
  55. # self.assertTrue(
  56. # len(find_vols) == 0,
  57. # "Find() for a non-existent volume returned %s" % find_vols)
  58. get_vol = self.provider.block_store.volumes.get(
  59. test_vol.id)
  60. self.assertTrue(
  61. list_volumes[0] ==
  62. get_vol == test_vol,
  63. "Ids returned by list: {0} and get: {1} are not as "
  64. " expected: {2}" .format(list_volumes[0].id,
  65. get_vol.id,
  66. test_vol.id))
  67. self.assertTrue(
  68. list_volumes[0].name ==
  69. get_vol.name == test_vol.name,
  70. "Names returned by list: {0} and get: {1} are not as "
  71. " expected: {2}" .format(list_volumes[0].name,
  72. get_vol.name,
  73. test_vol.name))
  74. volumes = self.provider.block_store.volumes.list()
  75. found_volumes = [vol for vol in volumes if vol.name == name]
  76. self.assertTrue(
  77. len(found_volumes) == 0,
  78. "Volume %s should have been deleted but still exists." %
  79. name)
  80. @helpers.skipIfNoService(['block_store.volumes'])
  81. def test_attach_detach_volume(self):
  82. """
  83. Create a new volume, and attempt to attach it to an instance
  84. """
  85. instance_name = "CBVolOps-{0}-{1}".format(
  86. self.provider.name,
  87. uuid.uuid4())
  88. # Declare these variables and late binding will allow
  89. # the cleanup method access to the most current values
  90. net = None
  91. test_instance = None
  92. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  93. test_instance, net)):
  94. net, subnet = helpers.create_test_network(
  95. self.provider, instance_name)
  96. test_instance = helpers.get_test_instance(
  97. self.provider, instance_name, subnet=subnet)
  98. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  99. test_vol = self.provider.block_store.volumes.create(
  100. name, 1, test_instance.zone_id)
  101. with helpers.cleanup_action(lambda: test_vol.delete()):
  102. test_vol.wait_till_ready()
  103. test_vol.attach(test_instance, '/dev/sda2')
  104. test_vol.wait_for(
  105. [VolumeState.IN_USE],
  106. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  107. test_vol.detach()
  108. test_vol.wait_for(
  109. [VolumeState.AVAILABLE],
  110. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  111. @helpers.skipIfNoService(['block_store.volumes'])
  112. def test_volume_properties(self):
  113. """
  114. Test volume properties
  115. """
  116. instance_name = "CBVolProps-{0}-{1}".format(
  117. self.provider.name,
  118. uuid.uuid4())
  119. vol_desc = 'newvoldesc1'
  120. # Declare these variables and late binding will allow
  121. # the cleanup method access to the most current values
  122. test_instance = None
  123. net = None
  124. with helpers.cleanup_action(lambda: helpers.cleanup_test_resources(
  125. test_instance, net)):
  126. net, subnet = helpers.create_test_network(
  127. self.provider, instance_name)
  128. test_instance = helpers.get_test_instance(
  129. self.provider, instance_name, subnet=subnet)
  130. name = "CBUnitTestVolProps-{0}".format(uuid.uuid4())
  131. test_vol = self.provider.block_store.volumes.create(
  132. name, 1, test_instance.zone_id, description=vol_desc)
  133. with helpers.cleanup_action(lambda: test_vol.delete()):
  134. test_vol.wait_till_ready()
  135. self.assertTrue(
  136. isinstance(test_vol.size, six.integer_types) and
  137. test_vol.size >= 0,
  138. "Volume.size must be a positive number, but got %s"
  139. % test_vol.size)
  140. self.assertTrue(
  141. test_vol.description is None or
  142. isinstance(test_vol.description, six.string_types),
  143. "Volume.description must be None or a string. Got: %s"
  144. % test_vol.description)
  145. self.assertIsNone(test_vol.source)
  146. self.assertIsNone(test_vol.source)
  147. self.assertIsNotNone(test_vol.create_time)
  148. self.assertIsNotNone(test_vol.zone_id)
  149. self.assertIsNone(test_vol.attachments)
  150. test_vol.attach(test_instance, '/dev/sda2')
  151. test_vol.wait_for(
  152. [VolumeState.IN_USE],
  153. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  154. self.assertIsNotNone(test_vol.attachments)
  155. self.assertIsInstance(test_vol.attachments, AttachmentInfo)
  156. self.assertEqual(test_vol.attachments.volume, test_vol)
  157. self.assertEqual(test_vol.attachments.instance_id,
  158. test_instance.id)
  159. self.assertEqual(test_vol.attachments.device,
  160. "/dev/sda2")
  161. test_vol.detach()
  162. test_vol.name = 'newvolname1'
  163. test_vol.wait_for(
  164. [VolumeState.AVAILABLE],
  165. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  166. self.assertEqual(test_vol.name, 'newvolname1')
  167. self.assertEqual(test_vol.description, vol_desc)
  168. self.assertIsNone(test_vol.attachments)
  169. test_vol.wait_for(
  170. [VolumeState.AVAILABLE],
  171. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  172. @helpers.skipIfNoService(['block_store.snapshots'])
  173. def test_crud_snapshot(self):
  174. """
  175. Create a new volume, create a snapshot of the volume, and check
  176. whether list_snapshots properly detects the new snapshot.
  177. Delete everything afterwards.
  178. """
  179. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  180. test_vol = self.provider.block_store.volumes.create(
  181. name,
  182. 1,
  183. helpers.get_provider_test_data(self.provider, "placement"))
  184. with helpers.cleanup_action(lambda: test_vol.delete()):
  185. test_vol.wait_till_ready()
  186. snap_name = "CBSnapshot-{0}".format(name)
  187. test_snap = test_vol.create_snapshot(name=snap_name,
  188. description=snap_name)
  189. def cleanup_snap(snap):
  190. snap.delete()
  191. snap.wait_for(
  192. [SnapshotState.UNKNOWN],
  193. terminal_states=[SnapshotState.ERROR])
  194. with helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
  195. test_snap.wait_till_ready()
  196. self.assertTrue(
  197. test_snap.id in repr(test_snap),
  198. "repr(obj) should contain the object id so that the object"
  199. " can be reconstructed, but does not.")
  200. snaps = self.provider.block_store.snapshots.list()
  201. list_snaps = [snap for snap in snaps
  202. if snap.name == snap_name]
  203. self.assertTrue(
  204. len(list_snaps) == 1,
  205. "List snapshots does not return the expected volume %s" %
  206. name)
  207. # check iteration
  208. iter_snaps = [
  209. snap for snap in self.provider.block_store.snapshots
  210. if snap.name == snap_name]
  211. self.assertTrue(
  212. len(iter_snaps) == 1,
  213. "Iter snapshots does not return the expected volume %s" %
  214. name)
  215. # check find
  216. find_snap = self.provider.block_store.snapshots.find(
  217. name=snap_name)
  218. self.assertTrue(
  219. len(find_snap) == 1,
  220. "Find snaps does not return the expected snapshot %s" %
  221. name)
  222. # check non-existent find
  223. # TODO: Moto has a bug with filters causing the following test
  224. # to fail. Need to add tag based filtering support for snaps
  225. # find_snap = self.provider.block_store.snapshots.find(
  226. # name="non_existent_snap")
  227. # self.assertTrue(
  228. # len(find_snap) == 0,
  229. # "Find() for a non-existent snap returned %s" %
  230. # find_snap)
  231. get_snap = self.provider.block_store.snapshots.get(
  232. test_snap.id)
  233. self.assertTrue(
  234. list_snaps[0] ==
  235. get_snap == test_snap,
  236. "Ids returned by list: {0} and get: {1} are not as "
  237. " expected: {2}" .format(list_snaps[0].id,
  238. get_snap.id,
  239. test_snap.id))
  240. self.assertTrue(
  241. list_snaps[0].name ==
  242. get_snap.name == test_snap.name,
  243. "Names returned by list: {0} and get: {1} are not as "
  244. " expected: {2}" .format(list_snaps[0].name,
  245. get_snap.name,
  246. test_snap.name))
  247. # Test volume creation from a snapshot (via VolumeService)
  248. sv_name = "CBUnitTestSnapVol-{0}".format(name)
  249. snap_vol = self.provider.block_store.volumes.create(
  250. sv_name,
  251. 1,
  252. helpers.get_provider_test_data(self.provider, "placement"),
  253. snapshot=test_snap)
  254. with helpers.cleanup_action(lambda: snap_vol.delete()):
  255. snap_vol.wait_till_ready()
  256. # Test volume creation from a snapshot (via Snapshot)
  257. snap_vol2 = test_snap.create_volume(
  258. helpers.get_provider_test_data(self.provider, "placement"))
  259. with helpers.cleanup_action(lambda: snap_vol2.delete()):
  260. snap_vol2.wait_till_ready()
  261. snaps = self.provider.block_store.snapshots.list()
  262. found_snaps = [snap for snap in snaps
  263. if snap.name == snap_name]
  264. self.assertTrue(
  265. len(found_snaps) == 0,
  266. "Snapshot %s should have been deleted but still exists." %
  267. snap_name)
  268. # Test creation of a snap via SnapshotService
  269. snap_too_name = "CBSnapToo-{0}".format(name)
  270. time.sleep(15) # Or get SnapshotCreationPerVolumeRateExceeded
  271. test_snap_too = self.provider.block_store.snapshots.create(
  272. name=snap_too_name, volume=test_vol, description=snap_too_name)
  273. with helpers.cleanup_action(lambda: cleanup_snap(test_snap_too)):
  274. test_snap_too.wait_till_ready()
  275. self.assertTrue(
  276. test_snap_too.id in repr(test_snap_too),
  277. "repr(obj) should contain the object id so that the object"
  278. " can be reconstructed, but does not.")
  279. @helpers.skipIfNoService(['block_store.snapshots'])
  280. def test_snapshot_properties(self):
  281. """
  282. Test snapshot properties
  283. """
  284. name = "CBTestSnapProp-{0}".format(uuid.uuid4())
  285. test_vol = self.provider.block_store.volumes.create(
  286. name,
  287. 1,
  288. helpers.get_provider_test_data(self.provider, "placement"))
  289. with helpers.cleanup_action(lambda: test_vol.delete()):
  290. test_vol.wait_till_ready()
  291. snap_name = "CBSnapProp-{0}".format(name)
  292. test_snap = test_vol.create_snapshot(name=snap_name,
  293. description=snap_name)
  294. def cleanup_snap(snap):
  295. snap.delete()
  296. snap.wait_for(
  297. [SnapshotState.UNKNOWN],
  298. terminal_states=[SnapshotState.ERROR])
  299. with helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
  300. test_snap.wait_till_ready()
  301. self.assertTrue(isinstance(test_vol.size, six.integer_types))
  302. self.assertEqual(
  303. test_snap.size, test_vol.size,
  304. "Snapshot.size must match original volume's size: %s"
  305. " but is: %s" % (test_vol.size, test_snap.size))
  306. self.assertTrue(
  307. test_vol.description is None or
  308. isinstance(test_vol.description, six.string_types),
  309. "Snapshot.description must be None or a string. Got: %s"
  310. % test_vol.description)
  311. self.assertEqual(test_vol.id, test_snap.volume_id)
  312. self.assertIsNotNone(test_vol.create_time)
  313. test_snap.name = 'snapnewname1'
  314. test_snap.description = 'snapnewdescription1'
  315. test_snap.refresh()
  316. self.assertEqual(test_snap.name, 'snapnewname1')
  317. self.assertEqual(test_snap.description, 'snapnewdescription1')