# test_provider_block_store_service.py (5.0 KB)
  1. import uuid
  2. from cloudbridge.providers.interfaces import SnapshotState
  3. from cloudbridge.providers.interfaces import VolumeState
  4. from test.helpers import ProviderTestBase
  5. import test.helpers as helpers
  6. class ProviderBlockStoreServiceTestCase(ProviderTestBase):
  7. def __init__(self, methodName, provider):
  8. super(ProviderBlockStoreServiceTestCase, self).__init__(
  9. methodName=methodName, provider=provider)
  10. def test_crud_volume(self):
  11. """
  12. Create a new volume, check whether the expected values are set,
  13. and delete it
  14. """
  15. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  16. test_vol = self.provider.block_store.volumes.create_volume(
  17. name,
  18. 1,
  19. helpers.get_provider_test_data(self.provider, "placement"))
  20. with helpers.exception_action(lambda: test_vol.delete()):
  21. test_vol.wait_till_ready(interval=helpers.TEST_WAIT_INTERVAL)
  22. volumes = self.provider.block_store.volumes.list_volumes()
  23. found_volumes = [vol for vol in volumes if vol.name == name]
  24. self.assertTrue(
  25. len(found_volumes) == 1,
  26. "List volumes does not return the expected volume %s" %
  27. name)
  28. test_vol.delete()
  29. test_vol.wait_for(
  30. [VolumeState.DELETED, VolumeState.UNKNOWN],
  31. terminal_states=[VolumeState.ERROR],
  32. interval=helpers.TEST_WAIT_INTERVAL)
  33. volumes = self.provider.block_store.volumes.list_volumes()
  34. found_volumes = [vol for vol in volumes if vol.name == name]
  35. self.assertTrue(
  36. len(found_volumes) == 0,
  37. "Volume %s should have been deleted but still exists." %
  38. name)
  39. def test_attach_detach_volume(self):
  40. """
  41. Create a new volume, and attempt to attach it to an instance
  42. """
  43. instance_name = "CBVolOps-{0}-{1}".format(
  44. self.provider.name,
  45. uuid.uuid4())
  46. test_instance = helpers.get_test_instance(self.provider, instance_name)
  47. with helpers.exception_action(lambda: test_instance.terminate()):
  48. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  49. test_vol = self.provider.block_store.volumes.create_volume(
  50. name, 1, test_instance.placement_zone)
  51. with helpers.exception_action(lambda: test_vol.delete()):
  52. test_vol.wait_till_ready(interval=helpers.TEST_WAIT_INTERVAL)
  53. test_vol.attach(test_instance, '/dev/sda2')
  54. test_vol.wait_for(
  55. [VolumeState.IN_USE],
  56. terminal_states=[VolumeState.ERROR, VolumeState.DELETED],
  57. interval=helpers.TEST_WAIT_INTERVAL)
  58. test_vol.detach()
  59. test_vol.wait_for(
  60. [VolumeState.AVAILABLE],
  61. terminal_states=[VolumeState.ERROR, VolumeState.DELETED],
  62. interval=helpers.TEST_WAIT_INTERVAL)
  63. test_vol.delete()
  64. def test_crud_snapshot(self):
  65. """
  66. Create a new volume, create a snapshot of the volume, and check
  67. whether list_snapshots properly detects the new snapshot.
  68. Delete everything afterwards.
  69. """
  70. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  71. test_vol = self.provider.block_store.volumes.create_volume(
  72. name,
  73. 1,
  74. helpers.get_provider_test_data(self.provider, "placement"))
  75. with helpers.exception_action(lambda: test_vol.delete()):
  76. test_vol.wait_till_ready(interval=helpers.TEST_WAIT_INTERVAL)
  77. snap_name = "CBSnapshot-{0}".format(name)
  78. test_snap = test_vol.create_snapshot(name=snap_name,
  79. description=snap_name)
  80. def cleanup_snap(snap):
  81. snap.delete()
  82. snap.wait_for(
  83. [SnapshotState.UNKNOWN],
  84. terminal_states=[SnapshotState.ERROR],
  85. interval=helpers.TEST_WAIT_INTERVAL)
  86. with helpers.exception_action(lambda: cleanup_snap(test_snap)):
  87. test_snap.wait_till_ready(interval=helpers.TEST_WAIT_INTERVAL)
  88. snaps = self.provider.block_store.snapshots.list_snapshots()
  89. found_snaps = [snap for snap in snaps
  90. if snap.name == snap_name]
  91. self.assertTrue(
  92. len(found_snaps) == 1,
  93. "List snapshots does not return the expected volume %s" %
  94. name)
  95. cleanup_snap(test_snap)
  96. snaps = self.provider.block_store.snapshots.list_snapshots()
  97. found_snaps = [snap for snap in snaps
  98. if snap.name == snap_name]
  99. self.assertTrue(
  100. len(found_snaps) == 0,
  101. "Snapshot %s should have been deleted but still exists." %
  102. snap_name)