# test_provider_block_store_service.py (6.7 KB)
  1. import uuid
  2. from cloudbridge.providers.interfaces import SnapshotState
  3. from cloudbridge.providers.interfaces import VolumeState
  4. from test.helpers import ProviderTestBase
  5. import test.helpers as helpers
  6. class ProviderBlockStoreServiceTestCase(ProviderTestBase):
  7. def __init__(self, methodName, provider):
  8. super(ProviderBlockStoreServiceTestCase, self).__init__(
  9. methodName=methodName, provider=provider)
  10. def test_crud_volume(self):
  11. """
  12. Create a new volume, check whether the expected values are set,
  13. and delete it
  14. """
  15. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  16. test_vol = self.provider.block_store.volumes.create(
  17. name,
  18. 1,
  19. helpers.get_provider_test_data(self.provider, "placement"))
  20. with helpers.exception_action(lambda: test_vol.delete()):
  21. test_vol.wait_till_ready(interval=self.get_test_wait_interval())
  22. volumes = self.provider.block_store.volumes.list()
  23. found_volumes = [vol for vol in volumes if vol.name == name]
  24. self.assertTrue(
  25. len(found_volumes) == 1,
  26. "List volumes does not return the expected volume %s" %
  27. name)
  28. get_vol = self.provider.block_store.volumes.get(
  29. test_vol.volume_id)
  30. self.assertTrue(
  31. found_volumes[0].volume_id ==
  32. get_vol.volume_id == test_vol.volume_id,
  33. "Ids returned by list: {0} and get: {1} are not as "
  34. " expected: {2}" .format(found_volumes[0].volume_id,
  35. get_vol.volume_id,
  36. test_vol.volume_id))
  37. self.assertTrue(
  38. found_volumes[0].name ==
  39. get_vol.name == test_vol.name,
  40. "Names returned by list: {0} and get: {1} are not as "
  41. " expected: {2}" .format(found_volumes[0].name,
  42. get_vol.name,
  43. test_vol.name))
  44. test_vol.delete()
  45. test_vol.wait_for(
  46. [VolumeState.DELETED, VolumeState.UNKNOWN],
  47. terminal_states=[VolumeState.ERROR],
  48. interval=self.get_test_wait_interval())
  49. volumes = self.provider.block_store.volumes.list()
  50. found_volumes = [vol for vol in volumes if vol.name == name]
  51. self.assertTrue(
  52. len(found_volumes) == 0,
  53. "Volume %s should have been deleted but still exists." %
  54. name)
  55. def test_attach_detach_volume(self):
  56. """
  57. Create a new volume, and attempt to attach it to an instance
  58. """
  59. instance_name = "CBVolOps-{0}-{1}".format(
  60. self.provider.name,
  61. uuid.uuid4())
  62. test_instance = helpers.get_test_instance(self.provider, instance_name)
  63. with helpers.exception_action(lambda: test_instance.terminate()):
  64. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  65. test_vol = self.provider.block_store.volumes.create(
  66. name, 1, test_instance.placement_zone)
  67. with helpers.exception_action(lambda: test_vol.delete()):
  68. test_vol.wait_till_ready(
  69. interval=self.get_test_wait_interval())
  70. test_vol.attach(test_instance, '/dev/sda2')
  71. test_vol.wait_for(
  72. [VolumeState.IN_USE],
  73. terminal_states=[VolumeState.ERROR, VolumeState.DELETED],
  74. interval=self.get_test_wait_interval())
  75. test_vol.detach()
  76. test_vol.wait_for(
  77. [VolumeState.AVAILABLE],
  78. terminal_states=[VolumeState.ERROR, VolumeState.DELETED],
  79. interval=self.get_test_wait_interval())
  80. test_vol.delete()
  81. def test_crud_snapshot(self):
  82. """
  83. Create a new volume, create a snapshot of the volume, and check
  84. whether list_snapshots properly detects the new snapshot.
  85. Delete everything afterwards.
  86. """
  87. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  88. test_vol = self.provider.block_store.volumes.create(
  89. name,
  90. 1,
  91. helpers.get_provider_test_data(self.provider, "placement"))
  92. with helpers.exception_action(lambda: test_vol.delete()):
  93. test_vol.wait_till_ready(interval=self.get_test_wait_interval())
  94. snap_name = "CBSnapshot-{0}".format(name)
  95. test_snap = test_vol.create_snapshot(name=snap_name,
  96. description=snap_name)
  97. def cleanup_snap(snap):
  98. snap.delete()
  99. snap.wait_for(
  100. [SnapshotState.UNKNOWN],
  101. terminal_states=[SnapshotState.ERROR],
  102. interval=self.get_test_wait_interval())
  103. with helpers.exception_action(lambda: cleanup_snap(test_snap)):
  104. test_snap.wait_till_ready(
  105. interval=self.get_test_wait_interval())
  106. snaps = self.provider.block_store.snapshots.list()
  107. found_snaps = [snap for snap in snaps
  108. if snap.name == snap_name]
  109. self.assertTrue(
  110. len(found_snaps) == 1,
  111. "List snapshots does not return the expected volume %s" %
  112. name)
  113. get_snap = self.provider.block_store.snapshots.get(
  114. test_snap.snapshot_id)
  115. self.assertTrue(
  116. found_snaps[0].snapshot_id ==
  117. get_snap.snapshot_id == test_snap.snapshot_id,
  118. "Ids returned by list: {0} and get: {1} are not as "
  119. " expected: {2}" .format(found_snaps[0].snapshot_id,
  120. get_snap.snapshot_id,
  121. test_snap.snapshot_id))
  122. self.assertTrue(
  123. found_snaps[0].name ==
  124. get_snap.name == test_snap.name,
  125. "Names returned by list: {0} and get: {1} are not as "
  126. " expected: {2}" .format(found_snaps[0].name,
  127. get_snap.name,
  128. test_snap.name))
  129. cleanup_snap(test_snap)
  130. snaps = self.provider.block_store.snapshots.list()
  131. found_snaps = [snap for snap in snaps
  132. if snap.name == snap_name]
  133. self.assertTrue(
  134. len(found_snaps) == 0,
  135. "Snapshot %s should have been deleted but still exists." %
  136. snap_name)