test_provider_block_store_service.py 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import uuid
  2. from cloudbridge.providers.interfaces import SnapshotState
  3. from cloudbridge.providers.interfaces import VolumeState
  4. from test.helpers import ProviderTestBase
  5. import test.helpers as helpers
  6. class ProviderBlockStoreServiceTestCase(ProviderTestBase):
  7. def __init__(self, methodName, provider):
  8. super(ProviderBlockStoreServiceTestCase, self).__init__(
  9. methodName=methodName, provider=provider)
  10. def setUp(self):
  11. self.instance = helpers.get_test_instance(self.provider)
  12. def tearDown(self):
  13. self.instance.terminate()
  14. def test_crud_volume(self):
  15. """
  16. Create a new volume, check whether the expected values are set,
  17. and delete it
  18. """
  19. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  20. test_vol = self.provider.block_store.volumes.create_volume(
  21. name,
  22. 1,
  23. self.instance.placement_zone)
  24. with helpers.exception_action(lambda x: test_vol.delete()):
  25. test_vol.wait_till_ready()
  26. volumes = self.provider.block_store.volumes.list_volumes()
  27. found_volumes = [vol for vol in volumes if vol.name == name]
  28. self.assertTrue(
  29. len(found_volumes) == 1,
  30. "List volumes does not return the expected volume %s" %
  31. name)
  32. test_vol.delete()
  33. def test_attach_detach_volume(self):
  34. """
  35. Create a new volume, and attempt to attach it to an instance
  36. """
  37. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  38. test_vol = self.provider.block_store.volumes.create_volume(
  39. name, 1, self.instance.placement_zone)
  40. with helpers.exception_action(lambda x: test_vol.delete()):
  41. test_vol.wait_till_ready()
  42. test_vol.attach(self.instance, '/dev/sda2')
  43. test_vol.wait_for(
  44. [VolumeState.IN_USE], terminal_states=[VolumeState.ERROR,
  45. VolumeState.DELETED])
  46. test_vol.detach()
  47. test_vol.wait_for(
  48. [VolumeState.AVAILABLE], terminal_states=[VolumeState.ERROR,
  49. VolumeState.DELETED])
  50. test_vol.delete()
  51. def test_crud_snapshot(self):
  52. """
  53. Create a new volume, create a snapshot of the volume, and check
  54. whether list_snapshots properly detects the new snapshot.
  55. Delete everything afterwards.
  56. """
  57. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  58. test_vol = self.provider.block_store.volumes.create_volume(
  59. name,
  60. 1,
  61. self.instance.placement_zone)
  62. with helpers.exception_action(lambda x: test_vol.delete()):
  63. test_vol.wait_till_ready()
  64. snap_name = "CBSnapshot-{0}".format(name)
  65. test_snap = test_vol.create_snapshot(name=snap_name,
  66. description=snap_name)
  67. def cleanup_snap(snap):
  68. snap.delete()
  69. snap.wait_for(
  70. [SnapshotState.UNKNOWN],
  71. terminal_states=[SnapshotState.ERROR])
  72. with helpers.exception_action(lambda x: cleanup_snap(test_snap)):
  73. test_snap.wait_till_ready()
  74. snaps = self.provider.block_store.snapshots.list_snapshots()
  75. found_snaps = [snap for snap in snaps
  76. if snap.name == snap_name]
  77. self.assertTrue(
  78. len(found_snaps) == 1,
  79. "List snapshots does not return the expected volume %s" %
  80. name)
  81. cleanup_snap(test_snap)