# test_block_store_service.py (7.7 KB)
  1. import uuid
  2. from cloudbridge.cloud.interfaces import SnapshotState
  3. from cloudbridge.cloud.interfaces import VolumeState
  4. from test.helpers import ProviderTestBase
  5. import test.helpers as helpers
  6. class CloudBlockStoreServiceTestCase(ProviderTestBase):
  7. def __init__(self, methodName, provider):
  8. super(CloudBlockStoreServiceTestCase, self).__init__(
  9. methodName=methodName, provider=provider)
  10. def test_crud_volume(self):
  11. """
  12. Create a new volume, check whether the expected values are set,
  13. and delete it
  14. """
  15. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  16. test_vol = self.provider.block_store.volumes.create(
  17. name,
  18. 1,
  19. helpers.get_provider_test_data(self.provider, "placement"))
  20. def cleanup_vol(vol):
  21. vol.delete()
  22. vol.wait_for([VolumeState.DELETED, VolumeState.UNKNOWN],
  23. terminal_states=[VolumeState.ERROR])
  24. with helpers.cleanup_action(lambda: cleanup_vol(test_vol)):
  25. test_vol.wait_till_ready()
  26. self.assertTrue(
  27. test_vol.id in repr(test_vol),
  28. "repr(obj) should contain the object id so that the object"
  29. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  30. volumes = self.provider.block_store.volumes.list()
  31. list_volumes = [vol for vol in volumes if vol.name == name]
  32. self.assertTrue(
  33. len(list_volumes) == 1,
  34. "List volumes does not return the expected volume %s" %
  35. name)
  36. # check iteration
  37. iter_volumes = [vol for vol in self.provider.block_store.volumes
  38. if vol.name == name]
  39. self.assertTrue(
  40. len(iter_volumes) == 1,
  41. "Iter volumes does not return the expected volume %s" %
  42. name)
  43. # check find
  44. find_volumes = self.provider.block_store.volumes.find(name=name)
  45. self.assertTrue(
  46. len(find_volumes) == 1,
  47. "Find volumes does not return the expected volume %s" %
  48. name)
  49. get_vol = self.provider.block_store.volumes.get(
  50. test_vol.id)
  51. self.assertTrue(
  52. list_volumes[0] ==
  53. get_vol == test_vol,
  54. "Ids returned by list: {0} and get: {1} are not as "
  55. " expected: {2}" .format(list_volumes[0].id,
  56. get_vol.id,
  57. test_vol.id))
  58. self.assertTrue(
  59. list_volumes[0].name ==
  60. get_vol.name == test_vol.name,
  61. "Names returned by list: {0} and get: {1} are not as "
  62. " expected: {2}" .format(list_volumes[0].name,
  63. get_vol.name,
  64. test_vol.name))
  65. volumes = self.provider.block_store.volumes.list()
  66. found_volumes = [vol for vol in volumes if vol.name == name]
  67. self.assertTrue(
  68. len(found_volumes) == 0,
  69. "Volume %s should have been deleted but still exists." %
  70. name)
  71. def test_attach_detach_volume(self):
  72. """
  73. Create a new volume, and attempt to attach it to an instance
  74. """
  75. instance_name = "CBVolOps-{0}-{1}".format(
  76. self.provider.name,
  77. uuid.uuid4())
  78. test_instance = helpers.get_test_instance(self.provider, instance_name)
  79. with helpers.cleanup_action(lambda: test_instance.terminate()):
  80. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  81. test_vol = self.provider.block_store.volumes.create(
  82. name, 1, test_instance.placement_zone)
  83. with helpers.cleanup_action(lambda: test_vol.delete()):
  84. test_vol.wait_till_ready()
  85. test_vol.attach(test_instance, '/dev/sda2')
  86. test_vol.wait_for(
  87. [VolumeState.IN_USE],
  88. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  89. test_vol.detach()
  90. test_vol.wait_for(
  91. [VolumeState.AVAILABLE],
  92. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  93. def test_crud_snapshot(self):
  94. """
  95. Create a new volume, create a snapshot of the volume, and check
  96. whether list_snapshots properly detects the new snapshot.
  97. Delete everything afterwards.
  98. """
  99. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  100. test_vol = self.provider.block_store.volumes.create(
  101. name,
  102. 1,
  103. helpers.get_provider_test_data(self.provider, "placement"))
  104. with helpers.cleanup_action(lambda: test_vol.delete()):
  105. test_vol.wait_till_ready()
  106. snap_name = "CBSnapshot-{0}".format(name)
  107. test_snap = test_vol.create_snapshot(name=snap_name,
  108. description=snap_name)
  109. def cleanup_snap(snap):
  110. snap.delete()
  111. snap.wait_for(
  112. [SnapshotState.UNKNOWN],
  113. terminal_states=[SnapshotState.ERROR])
  114. with helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
  115. test_snap.wait_till_ready()
  116. self.assertTrue(
  117. test_snap.id in repr(test_snap),
  118. "repr(obj) should contain the object id so that the object"
  119. " can be reconstructed, but does not.")
  120. snaps = self.provider.block_store.snapshots.list()
  121. list_snaps = [snap for snap in snaps
  122. if snap.name == snap_name]
  123. self.assertTrue(
  124. len(list_snaps) == 1,
  125. "List snapshots does not return the expected volume %s" %
  126. name)
  127. # check iteration
  128. iter_snaps = [
  129. snap for snap in self.provider.block_store.snapshots
  130. if snap.name == snap_name]
  131. self.assertTrue(
  132. len(iter_snaps) == 1,
  133. "Iter snapshots does not return the expected volume %s" %
  134. name)
  135. # check find
  136. find_snap = self.provider.block_store.snapshots.find(name=name)
  137. self.assertTrue(
  138. len(find_snap) == 1,
  139. "Find snaps does not return the expected snapshot %s" %
  140. name)
  141. get_snap = self.provider.block_store.snapshots.get(
  142. test_snap.id)
  143. self.assertTrue(
  144. list_snaps[0] ==
  145. get_snap == test_snap,
  146. "Ids returned by list: {0} and get: {1} are not as "
  147. " expected: {2}" .format(list_snaps[0].id,
  148. get_snap.id,
  149. test_snap.id))
  150. self.assertTrue(
  151. list_snaps[0].name ==
  152. get_snap.name == test_snap.name,
  153. "Names returned by list: {0} and get: {1} are not as "
  154. " expected: {2}" .format(list_snaps[0].name,
  155. get_snap.name,
  156. test_snap.name))
  157. snaps = self.provider.block_store.snapshots.list()
  158. found_snaps = [snap for snap in snaps
  159. if snap.name == snap_name]
  160. self.assertTrue(
  161. len(found_snaps) == 0,
  162. "Snapshot %s should have been deleted but still exists." %
  163. snap_name)