test_block_store_service.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import uuid
  2. from cloudbridge.cloud.interfaces import SnapshotState
  3. from cloudbridge.cloud.interfaces import VolumeState
  4. from test.helpers import ProviderTestBase
  5. import test.helpers as helpers
  6. class CloudBlockStoreServiceTestCase(ProviderTestBase):
  7. def __init__(self, methodName, provider):
  8. super(CloudBlockStoreServiceTestCase, self).__init__(
  9. methodName=methodName, provider=provider)
  10. def test_crud_volume(self):
  11. """
  12. Create a new volume, check whether the expected values are set,
  13. and delete it
  14. """
  15. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  16. test_vol = self.provider.block_store.volumes.create(
  17. name,
  18. 1,
  19. helpers.get_provider_test_data(self.provider, "placement"))
  20. def cleanup_vol(vol):
  21. vol.delete()
  22. vol.wait_for([VolumeState.DELETED, VolumeState.UNKNOWN],
  23. terminal_states=[VolumeState.ERROR])
  24. with helpers.cleanup_action(lambda: cleanup_vol(test_vol)):
  25. test_vol.wait_till_ready()
  26. self.assertTrue(
  27. test_vol.id in repr(test_vol),
  28. "repr(obj) should contain the object id so that the object"
  29. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  30. volumes = self.provider.block_store.volumes.list()
  31. list_volumes = [vol for vol in volumes if vol.name == name]
  32. self.assertTrue(
  33. len(list_volumes) == 1,
  34. "List volumes does not return the expected volume %s" %
  35. name)
  36. # check iteration
  37. iter_volumes = [vol for vol in self.provider.block_store.volumes
  38. if vol.name == name]
  39. self.assertTrue(
  40. len(iter_volumes) == 1,
  41. "Iter volumes does not return the expected volume %s" %
  42. name)
  43. # check find
  44. find_vols = self.provider.block_store.volumes.find(name=name)
  45. self.assertTrue(
  46. len(find_vols) == 1,
  47. "Find volumes does not return the expected volume %s" %
  48. name)
  49. # check non-existent find
  50. # TODO: Moto has a bug with filters causing the following test
  51. # to fail. Need to add tag based filtering support for volumes
  52. # find_vols = self.provider.block_store.volumes.find(
  53. # name="non_existent_vol")
  54. # self.assertTrue(
  55. # len(find_vols) == 0,
  56. # "Find() for a non-existent volume returned %s" % find_vols)
  57. get_vol = self.provider.block_store.volumes.get(
  58. test_vol.id)
  59. self.assertTrue(
  60. list_volumes[0] ==
  61. get_vol == test_vol,
  62. "Ids returned by list: {0} and get: {1} are not as "
  63. " expected: {2}" .format(list_volumes[0].id,
  64. get_vol.id,
  65. test_vol.id))
  66. self.assertTrue(
  67. list_volumes[0].name ==
  68. get_vol.name == test_vol.name,
  69. "Names returned by list: {0} and get: {1} are not as "
  70. " expected: {2}" .format(list_volumes[0].name,
  71. get_vol.name,
  72. test_vol.name))
  73. volumes = self.provider.block_store.volumes.list()
  74. found_volumes = [vol for vol in volumes if vol.name == name]
  75. self.assertTrue(
  76. len(found_volumes) == 0,
  77. "Volume %s should have been deleted but still exists." %
  78. name)
  79. def test_attach_detach_volume(self):
  80. """
  81. Create a new volume, and attempt to attach it to an instance
  82. """
  83. instance_name = "CBVolOps-{0}-{1}".format(
  84. self.provider.name,
  85. uuid.uuid4())
  86. test_instance = helpers.get_test_instance(self.provider, instance_name)
  87. with helpers.cleanup_action(lambda: test_instance.terminate()):
  88. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  89. test_vol = self.provider.block_store.volumes.create(
  90. name, 1, test_instance.placement_zone)
  91. with helpers.cleanup_action(lambda: test_vol.delete()):
  92. test_vol.wait_till_ready()
  93. test_vol.attach(test_instance, '/dev/sda2')
  94. test_vol.wait_for(
  95. [VolumeState.IN_USE],
  96. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  97. test_vol.detach()
  98. test_vol.wait_for(
  99. [VolumeState.AVAILABLE],
  100. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  101. def test_crud_snapshot(self):
  102. """
  103. Create a new volume, create a snapshot of the volume, and check
  104. whether list_snapshots properly detects the new snapshot.
  105. Delete everything afterwards.
  106. """
  107. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  108. test_vol = self.provider.block_store.volumes.create(
  109. name,
  110. 1,
  111. helpers.get_provider_test_data(self.provider, "placement"))
  112. with helpers.cleanup_action(lambda: test_vol.delete()):
  113. test_vol.wait_till_ready()
  114. snap_name = "CBSnapshot-{0}".format(name)
  115. test_snap = test_vol.create_snapshot(name=snap_name,
  116. description=snap_name)
  117. def cleanup_snap(snap):
  118. snap.delete()
  119. snap.wait_for(
  120. [SnapshotState.UNKNOWN],
  121. terminal_states=[SnapshotState.ERROR])
  122. with helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
  123. test_snap.wait_till_ready()
  124. self.assertTrue(
  125. test_snap.id in repr(test_snap),
  126. "repr(obj) should contain the object id so that the object"
  127. " can be reconstructed, but does not.")
  128. snaps = self.provider.block_store.snapshots.list()
  129. list_snaps = [snap for snap in snaps
  130. if snap.name == snap_name]
  131. self.assertTrue(
  132. len(list_snaps) == 1,
  133. "List snapshots does not return the expected volume %s" %
  134. name)
  135. # check iteration
  136. iter_snaps = [
  137. snap for snap in self.provider.block_store.snapshots
  138. if snap.name == snap_name]
  139. self.assertTrue(
  140. len(iter_snaps) == 1,
  141. "Iter snapshots does not return the expected volume %s" %
  142. name)
  143. # check find
  144. find_snap = self.provider.block_store.snapshots.find(
  145. name=snap_name)
  146. self.assertTrue(
  147. len(find_snap) == 1,
  148. "Find snaps does not return the expected snapshot %s" %
  149. name)
  150. # check non-existent find
  151. # TODO: Moto has a bug with filters causing the following test
  152. # to fail. Need to add tag based filtering support for snaps
  153. # find_snap = self.provider.block_store.snapshots.find(
  154. # name="non_existent_snap")
  155. # self.assertTrue(
  156. # len(find_snap) == 0,
  157. # "Find() for a non-existent snap returned %s" %
  158. # find_snap)
  159. get_snap = self.provider.block_store.snapshots.get(
  160. test_snap.id)
  161. self.assertTrue(
  162. list_snaps[0] ==
  163. get_snap == test_snap,
  164. "Ids returned by list: {0} and get: {1} are not as "
  165. " expected: {2}" .format(list_snaps[0].id,
  166. get_snap.id,
  167. test_snap.id))
  168. self.assertTrue(
  169. list_snaps[0].name ==
  170. get_snap.name == test_snap.name,
  171. "Names returned by list: {0} and get: {1} are not as "
  172. " expected: {2}" .format(list_snaps[0].name,
  173. get_snap.name,
  174. test_snap.name))
  175. # Create volume creation based on this snapshot
  176. sv_name = "CBUnitTestSnapVol-{0}".format(uuid.uuid4())
  177. snap_vol = self.provider.block_store.volumes.create(
  178. sv_name,
  179. 1,
  180. helpers.get_provider_test_data(self.provider, "placement"),
  181. snapshot=test_snap)
  182. with helpers.cleanup_action(lambda: snap_vol.delete()):
  183. snap_vol.wait_till_ready()
  184. snaps = self.provider.block_store.snapshots.list()
  185. found_snaps = [snap for snap in snaps
  186. if snap.name == snap_name]
  187. self.assertTrue(
  188. len(found_snaps) == 0,
  189. "Snapshot %s should have been deleted but still exists." %
  190. snap_name)