test_provider_block_store_service.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import uuid
  2. from cloudbridge.providers.interfaces import SnapshotState
  3. from cloudbridge.providers.interfaces import VolumeState
  4. from test.helpers import ProviderTestBase
  5. import test.helpers as helpers
  6. class ProviderBlockStoreServiceTestCase(ProviderTestBase):
  7. def __init__(self, methodName, provider):
  8. super(ProviderBlockStoreServiceTestCase, self).__init__(
  9. methodName=methodName, provider=provider)
  10. def test_crud_volume(self):
  11. """
  12. Create a new volume, check whether the expected values are set,
  13. and delete it
  14. """
  15. name = "CBUnitTestCreateVol-{0}".format(uuid.uuid4())
  16. test_vol = self.provider.block_store.volumes.create_volume(
  17. name,
  18. 1,
  19. helpers.get_provider_test_data(self.provider, "placement"))
  20. with helpers.exception_action(lambda: test_vol.delete()):
  21. test_vol.wait_till_ready()
  22. volumes = self.provider.block_store.volumes.list_volumes()
  23. found_volumes = [vol for vol in volumes if vol.name == name]
  24. self.assertTrue(
  25. len(found_volumes) == 1,
  26. "List volumes does not return the expected volume %s" %
  27. name)
  28. test_vol.delete()
  29. test_vol.wait_for(
  30. [VolumeState.DELETED, VolumeState.UNKNOWN],
  31. terminal_states=[VolumeState.ERROR])
  32. volumes = self.provider.block_store.volumes.list_volumes()
  33. found_volumes = [vol for vol in volumes if vol.name == name]
  34. self.assertTrue(
  35. len(found_volumes) == 0,
  36. "Volume %s should have been deleted but still exists." %
  37. name)
  38. def test_attach_detach_volume(self):
  39. """
  40. Create a new volume, and attempt to attach it to an instance
  41. """
  42. instance_name = "CBVolOps-{0}-{1}".format(
  43. self.provider.name,
  44. uuid.uuid4())
  45. test_instance = helpers.get_test_instance(self.provider, instance_name)
  46. with helpers.exception_action(lambda: test_instance.terminate()):
  47. name = "CBUnitTestAttachVol-{0}".format(uuid.uuid4())
  48. test_vol = self.provider.block_store.volumes.create_volume(
  49. name, 1, test_instance.placement_zone)
  50. with helpers.exception_action(lambda: test_vol.delete()):
  51. test_vol.wait_till_ready()
  52. test_vol.attach(test_instance, '/dev/sda2')
  53. test_vol.wait_for(
  54. [VolumeState.IN_USE],
  55. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  56. test_vol.detach()
  57. test_vol.wait_for(
  58. [VolumeState.AVAILABLE],
  59. terminal_states=[VolumeState.ERROR, VolumeState.DELETED])
  60. test_vol.delete()
  61. def test_crud_snapshot(self):
  62. """
  63. Create a new volume, create a snapshot of the volume, and check
  64. whether list_snapshots properly detects the new snapshot.
  65. Delete everything afterwards.
  66. """
  67. name = "CBUnitTestCreateSnap-{0}".format(uuid.uuid4())
  68. test_vol = self.provider.block_store.volumes.create_volume(
  69. name,
  70. 1,
  71. helpers.get_provider_test_data(self.provider, "placement"))
  72. with helpers.exception_action(lambda: test_vol.delete()):
  73. test_vol.wait_till_ready()
  74. snap_name = "CBSnapshot-{0}".format(name)
  75. test_snap = test_vol.create_snapshot(name=snap_name,
  76. description=snap_name)
  77. def cleanup_snap(snap):
  78. snap.delete()
  79. snap.wait_for(
  80. [SnapshotState.UNKNOWN],
  81. terminal_states=[SnapshotState.ERROR])
  82. with helpers.exception_action(lambda: cleanup_snap(test_snap)):
  83. test_snap.wait_till_ready()
  84. snaps = self.provider.block_store.snapshots.list_snapshots()
  85. found_snaps = [snap for snap in snaps
  86. if snap.name == snap_name]
  87. self.assertTrue(
  88. len(found_snaps) == 1,
  89. "List snapshots does not return the expected volume %s" %
  90. name)
  91. cleanup_snap(test_snap)
  92. snaps = self.provider.block_store.snapshots.list_snapshots()
  93. found_snaps = [snap for snap in snaps
  94. if snap.name == snap_name]
  95. self.assertTrue(
  96. len(found_snaps) == 0,
  97. "Snapshot %s should have been deleted but still exists." %
  98. snap_name)