test_block_store_service.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import uuid
  2. from cloudbridge.cloud.interfaces import SnapshotState
  3. from cloudbridge.cloud.interfaces import VolumeState
  4. from test.helpers import ProviderTestBase
  5. import test.helpers as helpers
  6. class CloudBlockStoreServiceTestCase(ProviderTestBase):
  7. def __init__(self, methodName, provider):
  8. super(CloudBlockStoreServiceTestCase, self).__init__(
  9. methodName=methodName, provider=provider)
  10. def test_crud_volume(self):
  11. """
  12. Create a new volume, check whether the expected values are set,
  13. and delete it
  14. """
  15. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  16. test_vol = self.provider.block_store.volumes.create(
  17. name,
  18. 1,
  19. helpers.get_provider_test_data(self.provider, "placement"))
  20. def cleanup_vol(vol):
  21. vol.delete()
  22. vol.wait_for(
  23. [VolumeState.DELETED, VolumeState.UNKNOWN],
  24. terminal_states=[VolumeState.ERROR],
  25. interval=self.get_test_wait_interval())
  26. with helpers.cleanup_action(lambda: cleanup_vol(test_vol)):
  27. test_vol.wait_till_ready(interval=self.get_test_wait_interval())
  28. self.assertTrue(
  29. test_vol.id in repr(test_vol),
  30. "repr(obj) should contain the object id so that the object"
  31. " can be reconstructed, but does not. eval(repr(obj)) == obj")
  32. volumes = self.provider.block_store.volumes.list()
  33. found_volumes = [vol for vol in volumes if vol.name == name]
  34. self.assertTrue(
  35. len(found_volumes) == 1,
  36. "List volumes does not return the expected volume %s" %
  37. name)
  38. # check iteration
  39. found_volumes = [vol for vol in self.provider.block_store.volumes
  40. if vol.name == name]
  41. self.assertTrue(
  42. len(found_volumes) == 1,
  43. "Iter volumes does not return the expected volume %s" %
  44. name)
  45. get_vol = self.provider.block_store.volumes.get(
  46. test_vol.id)
  47. self.assertTrue(
  48. found_volumes[0] ==
  49. get_vol == test_vol,
  50. "Ids returned by list: {0} and get: {1} are not as "
  51. " expected: {2}" .format(found_volumes[0].id,
  52. get_vol.id,
  53. test_vol.id))
  54. self.assertTrue(
  55. found_volumes[0].name ==
  56. get_vol.name == test_vol.name,
  57. "Names returned by list: {0} and get: {1} are not as "
  58. " expected: {2}" .format(found_volumes[0].name,
  59. get_vol.name,
  60. test_vol.name))
  61. volumes = self.provider.block_store.volumes.list()
  62. found_volumes = [vol for vol in volumes if vol.name == name]
  63. self.assertTrue(
  64. len(found_volumes) == 0,
  65. "Volume %s should have been deleted but still exists." %
  66. name)
  67. def test_attach_detach_volume(self):
  68. """
  69. Create a new volume, and attempt to attach it to an instance
  70. """
  71. instance_name = "CBVolOps-{0}-{1}".format(
  72. self.provider.name,
  73. uuid.uuid4())
  74. test_instance = helpers.get_test_instance(self.provider, instance_name)
  75. with helpers.cleanup_action(lambda: test_instance.terminate()):
  76. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  77. test_vol = self.provider.block_store.volumes.create(
  78. name, 1, test_instance.placement_zone)
  79. with helpers.cleanup_action(lambda: test_vol.delete()):
  80. test_vol.wait_till_ready(
  81. interval=self.get_test_wait_interval())
  82. test_vol.attach(test_instance, '/dev/sda2')
  83. test_vol.wait_for(
  84. [VolumeState.IN_USE],
  85. terminal_states=[VolumeState.ERROR, VolumeState.DELETED],
  86. interval=self.get_test_wait_interval())
  87. test_vol.detach()
  88. test_vol.wait_for(
  89. [VolumeState.AVAILABLE],
  90. terminal_states=[VolumeState.ERROR, VolumeState.DELETED],
  91. interval=self.get_test_wait_interval())
  92. def test_crud_snapshot(self):
  93. """
  94. Create a new volume, create a snapshot of the volume, and check
  95. whether list_snapshots properly detects the new snapshot.
  96. Delete everything afterwards.
  97. """
  98. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  99. test_vol = self.provider.block_store.volumes.create(
  100. name,
  101. 1,
  102. helpers.get_provider_test_data(self.provider, "placement"))
  103. with helpers.cleanup_action(lambda: test_vol.delete()):
  104. test_vol.wait_till_ready(interval=self.get_test_wait_interval())
  105. snap_name = "CBSnapshot-{0}".format(name)
  106. test_snap = test_vol.create_snapshot(name=snap_name,
  107. description=snap_name)
  108. def cleanup_snap(snap):
  109. snap.delete()
  110. snap.wait_for(
  111. [SnapshotState.UNKNOWN],
  112. terminal_states=[SnapshotState.ERROR],
  113. interval=self.get_test_wait_interval())
  114. with helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
  115. test_snap.wait_till_ready(
  116. interval=self.get_test_wait_interval())
  117. self.assertTrue(
  118. test_snap.id in repr(test_snap),
  119. "repr(obj) should contain the object id so that the object"
  120. " can be reconstructed, but does not.")
  121. snaps = self.provider.block_store.snapshots.list()
  122. found_snaps = [snap for snap in snaps
  123. if snap.name == snap_name]
  124. self.assertTrue(
  125. len(found_snaps) == 1,
  126. "List snapshots does not return the expected volume %s" %
  127. name)
  128. # check iteration
  129. found_snaps = [
  130. snap for snap in self.provider.block_store.snapshots
  131. if snap.name == snap_name]
  132. self.assertTrue(
  133. len(found_snaps) == 1,
  134. "Iter snapshots does not return the expected volume %s" %
  135. name)
  136. get_snap = self.provider.block_store.snapshots.get(
  137. test_snap.id)
  138. self.assertTrue(
  139. found_snaps[0] ==
  140. get_snap == test_snap,
  141. "Ids returned by list: {0} and get: {1} are not as "
  142. " expected: {2}" .format(found_snaps[0].id,
  143. get_snap.id,
  144. test_snap.id))
  145. self.assertTrue(
  146. found_snaps[0].name ==
  147. get_snap.name == test_snap.name,
  148. "Names returned by list: {0} and get: {1} are not as "
  149. " expected: {2}" .format(found_snaps[0].name,
  150. get_snap.name,
  151. test_snap.name))
  152. snaps = self.provider.block_store.snapshots.list()
  153. found_snaps = [snap for snap in snaps
  154. if snap.name == snap_name]
  155. self.assertTrue(
  156. len(found_snaps) == 0,
  157. "Snapshot %s should have been deleted but still exists." %
  158. snap_name)