test_block_store_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. import uuid
  2. import six
  3. from cloudbridge.cloud.interfaces import SnapshotState
  4. from cloudbridge.cloud.interfaces import VolumeState
  5. from test.helpers import ProviderTestBase
  6. import test.helpers as helpers
  7. class CloudBlockStoreServiceTestCase(ProviderTestBase):
  def __init__(self, methodName, provider):
      """
      Set up the test case by delegating to the parent initialiser.

      :param methodName: forwarded unchanged as ``methodName``.
      :param provider: forwarded unchanged as ``provider``; the tests in
          this class read it back through ``self.provider``
          (presumably stored by the parent class - confirm there).
      """
      parent = super(CloudBlockStoreServiceTestCase, self)
      parent.__init__(methodName=methodName, provider=provider)
  def test_crud_volume(self):
      """
      Create a volume, verify that it is visible through list(),
      iteration, find() and get(), then verify that it is no longer
      listed once the cleanup action has run.
      """
      vol_name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
      placement = helpers.get_provider_test_data(self.provider, "placement")
      test_vol = self.provider.block_store.volumes.create(
          vol_name, 1, placement)

      def delete_and_wait(volume):
          # Delete, then wait for DELETED or UNKNOWN, with ERROR passed
          # as the terminal state.
          volume.delete()
          volume.wait_for(
              [VolumeState.DELETED, VolumeState.UNKNOWN],
              terminal_states=[VolumeState.ERROR])

      with helpers.cleanup_action(lambda: delete_and_wait(test_vol)):
          test_vol.wait_till_ready()

          id_in_repr = test_vol.id in repr(test_vol)
          self.assertTrue(
              id_in_repr,
              "repr(obj) should contain the object id so that the object"
              " can be reconstructed, but does not. eval(repr(obj)) == obj")

          # Lookup through list()
          listed = [
              vol for vol in self.provider.block_store.volumes.list()
              if vol.name == vol_name]
          self.assertTrue(
              len(listed) == 1,
              "List volumes does not return the expected volume %s" %
              vol_name)

          # Lookup by iterating over the service object itself
          iterated = [
              vol for vol in self.provider.block_store.volumes
              if vol.name == vol_name]
          self.assertTrue(
              len(iterated) == 1,
              "Iter volumes does not return the expected volume %s" %
              vol_name)

          # Lookup through find()
          found = self.provider.block_store.volumes.find(name=vol_name)
          self.assertTrue(
              len(found) == 1,
              "Find volumes does not return the expected volume %s" %
              vol_name)

          # check non-existent find
          # TODO: Moto has a bug with filters causing the following test
          # to fail. Need to add tag based filtering support for volumes
          # find_vols = self.provider.block_store.volumes.find(
          #     name="non_existent_vol")
          # self.assertTrue(
          #     len(find_vols) == 0,
          #     "Find() for a non-existent volume returned %s" % find_vols)

          # Lookup through get(); all three handles must agree.
          fetched = self.provider.block_store.volumes.get(test_vol.id)

          same_object = listed[0] == fetched == test_vol
          id_message = (
              "Ids returned by list: {0} and get: {1} are not as "
              " expected: {2}".format(
                  listed[0].id, fetched.id, test_vol.id))
          self.assertTrue(same_object, id_message)

          same_name = listed[0].name == fetched.name == test_vol.name
          name_message = (
              "Names returned by list: {0} and get: {1} are not as "
              " expected: {2}".format(
                  listed[0].name, fetched.name, test_vol.name))
          self.assertTrue(same_name, name_message)

      # The cleanup action has run by now, so the volume must be gone.
      leftovers = [
          vol for vol in self.provider.block_store.volumes.list()
          if vol.name == vol_name]
      self.assertTrue(
          len(leftovers) == 0,
          "Volume %s should have been deleted but still exists." %
          vol_name)
  def test_attach_detach_volume(self):
      """
      Attach a newly created volume to a test instance and detach it
      again, waiting for the matching volume state after each step.
      """
      instance_name = "CBVolOps-{0}-{1}".format(
          self.provider.name, uuid.uuid4())
      test_instance = helpers.get_test_instance(self.provider, instance_name)

      with helpers.cleanup_action(lambda: test_instance.terminate()):
          vol_name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
          # The volume is created in the instance's placement zone.
          volume = self.provider.block_store.volumes.create(
              vol_name, 1, test_instance.placement_zone)

          def wait_until(state):
              # Wait for the given state; ERROR and DELETED are passed
              # as terminal states.
              volume.wait_for(
                  [state],
                  terminal_states=[VolumeState.ERROR, VolumeState.DELETED])

          with helpers.cleanup_action(lambda: volume.delete()):
              volume.wait_till_ready()

              volume.attach(test_instance, '/dev/sda2')
              wait_until(VolumeState.IN_USE)

              volume.detach()
              wait_until(VolumeState.AVAILABLE)
  def test_volume_properties(self):
      """
      Verify the properties exposed by a volume.

      Creates a test instance and a volume in that instance's placement
      zone, checks size, description, source, create_time, zone_id and
      attachments on the unattached volume, attaches it to the instance
      at /dev/sda2 and checks the attachment details, then detaches it
      and checks that the attachment information has been cleared.
      """
      instance_name = "CBVolProps-{0}-{1}".format(
          self.provider.name,
          uuid.uuid4())
      test_instance = helpers.get_test_instance(self.provider, instance_name)
      with helpers.cleanup_action(lambda: test_instance.terminate()):
          name = "CBUnitTestVolProps-{0}".format(uuid.uuid4())
          test_vol = self.provider.block_store.volumes.create(
              name, 1, test_instance.placement_zone)
          with helpers.cleanup_action(lambda: test_vol.delete()):
              test_vol.wait_till_ready()

              # Properties of a fresh, unattached volume.
              self.assertTrue(
                  isinstance(test_vol.size, six.integer_types) and
                  test_vol.size >= 0,
                  "Volume.size must be a non-negative integer, but got %s"
                  % test_vol.size)
              self.assertTrue(
                  test_vol.description is None or
                  isinstance(test_vol.description, six.string_types),
                  "Volume.description must be None or a string. Got: %s"
                  % test_vol.description)
              # The volume was not created from a snapshot.
              self.assertIsNone(test_vol.source)
              self.assertIsNotNone(test_vol.create_time)
              self.assertIsNotNone(test_vol.zone_id)
              self.assertIsNone(test_vol.attachments)

              # Properties while attached.
              test_vol.attach(test_instance, '/dev/sda2')
              test_vol.wait_for(
                  [VolumeState.IN_USE],
                  terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
              self.assertIsNotNone(test_vol.attachments)
              self.assertEqual(test_vol.attachments.volume, test_vol)
              self.assertEqual(test_vol.attachments.instance_id,
                               test_instance.id)
              self.assertEqual(test_vol.attachments.device,
                               "/dev/sda2")

              # Wait for the detach to complete before inspecting the
              # attachment state; checking immediately after detach()
              # races with providers that detach asynchronously.
              test_vol.detach()
              test_vol.wait_for(
                  [VolumeState.AVAILABLE],
                  terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
              # Force a refresh before checking attachment status
              test_vol.refresh()
              self.assertIsNone(test_vol.attachments)
  def test_crud_snapshot(self):
      """
      Create a new volume, create a snapshot of the volume, and check
      whether the snapshot is visible through list(), iteration, find()
      and get(). Also create volumes from the snapshot (through the
      volume service and through the snapshot itself), verify that the
      snapshot is gone after cleanup, and finally create a second
      snapshot through the snapshot service.
      Delete everything afterwards.
      """
      name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
      test_vol = self.provider.block_store.volumes.create(
          name,
          1,
          helpers.get_provider_test_data(self.provider, "placement"))
      with helpers.cleanup_action(lambda: test_vol.delete()):
          test_vol.wait_till_ready()
          snap_name = "CBSnapshot-{0}".format(name)
          test_snap = test_vol.create_snapshot(name=snap_name,
                                               description=snap_name)

          def cleanup_snap(snap):
              # Delete the snapshot and wait for it to report UNKNOWN,
              # with ERROR passed as the terminal state.
              snap.delete()
              snap.wait_for(
                  [SnapshotState.UNKNOWN],
                  terminal_states=[SnapshotState.ERROR])

          with helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
              test_snap.wait_till_ready()
              self.assertTrue(
                  test_snap.id in repr(test_snap),
                  "repr(obj) should contain the object id so that the object"
                  " can be reconstructed, but does not.")

              # check list
              snaps = self.provider.block_store.snapshots.list()
              list_snaps = [snap for snap in snaps
                            if snap.name == snap_name]
              self.assertTrue(
                  len(list_snaps) == 1,
                  "List snapshots does not return the expected snapshot %s" %
                  snap_name)

              # check iteration
              iter_snaps = [
                  snap for snap in self.provider.block_store.snapshots
                  if snap.name == snap_name]
              self.assertTrue(
                  len(iter_snaps) == 1,
                  "Iter snapshots does not return the expected snapshot %s" %
                  snap_name)

              # check find
              find_snap = self.provider.block_store.snapshots.find(
                  name=snap_name)
              self.assertTrue(
                  len(find_snap) == 1,
                  "Find snaps does not return the expected snapshot %s" %
                  snap_name)

              # check non-existent find
              # TODO: Moto has a bug with filters causing the following test
              # to fail. Need to add tag based filtering support for snaps
              # find_snap = self.provider.block_store.snapshots.find(
              #     name="non_existent_snap")
              # self.assertTrue(
              #     len(find_snap) == 0,
              #     "Find() for a non-existent snap returned %s" %
              #     find_snap)

              # check get: all three handles must agree
              get_snap = self.provider.block_store.snapshots.get(
                  test_snap.id)
              self.assertTrue(
                  list_snaps[0] ==
                  get_snap == test_snap,
                  "Ids returned by list: {0} and get: {1} are not as "
                  " expected: {2}" .format(list_snaps[0].id,
                                           get_snap.id,
                                           test_snap.id))
              self.assertTrue(
                  list_snaps[0].name ==
                  get_snap.name == test_snap.name,
                  "Names returned by list: {0} and get: {1} are not as "
                  " expected: {2}" .format(list_snaps[0].name,
                                           get_snap.name,
                                           test_snap.name))

              # Test volume creation from a snapshot (via VolumeService)
              sv_name = "CBUnitTestSnapVol-{0}".format(name)
              snap_vol = self.provider.block_store.volumes.create(
                  sv_name,
                  1,
                  helpers.get_provider_test_data(self.provider, "placement"),
                  snapshot=test_snap)
              with helpers.cleanup_action(lambda: snap_vol.delete()):
                  snap_vol.wait_till_ready()

              # Test volume creation from a snapshot (via Snapshot)
              snap_vol2 = test_snap.create_volume(
                  helpers.get_provider_test_data(self.provider, "placement"))
              with helpers.cleanup_action(lambda: snap_vol2.delete()):
                  snap_vol2.wait_till_ready()

          # The snapshot cleanup has run by now, so it must be gone.
          snaps = self.provider.block_store.snapshots.list()
          found_snaps = [snap for snap in snaps
                         if snap.name == snap_name]
          self.assertTrue(
              len(found_snaps) == 0,
              "Snapshot %s should have been deleted but still exists." %
              snap_name)

          # Test creation of a snap via SnapshotService
          snap_too_name = "CBSnapToo-{0}".format(name)
          test_snap_too = self.provider.block_store.snapshots.create(
              name=snap_too_name, volume=test_vol, description=snap_too_name)
          with helpers.cleanup_action(lambda: cleanup_snap(test_snap_too)):
              test_snap_too.wait_till_ready()
              self.assertTrue(
                  test_snap_too.id in repr(test_snap_too),
                  "repr(obj) should contain the object id so that the object"
                  " can be reconstructed, but does not.")