| 1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- import uuid
- from cloudbridge.cloud.interfaces import VolumeState
- from cloudbridge.cloud.interfaces.exceptions import WaitStateException
- from test.helpers import ProviderTestBase
- import test.helpers as helpers
- class CloudObjectLifeCycleTestCase(ProviderTestBase):
- def __init__(self, methodName, provider):
- super(CloudObjectLifeCycleTestCase, self).__init__(
- methodName=methodName, provider=provider)
- def test_object_life_cycle(self):
- """
- Test object life cycle methods by using a volume.
- """
- name = "CBUnitTestLifeCycle-{0}".format(uuid.uuid4())
- test_vol = self.provider.block_store.volumes.create(
- name,
- 1,
- helpers.get_provider_test_data(self.provider, "placement"))
- # Waiting for an invalid timeout should raise an exception
- with self.assertRaises(AssertionError):
- test_vol.wait_for([VolumeState.ERROR], timeout=-1, interval=1)
- with self.assertRaises(AssertionError):
- test_vol.wait_for([VolumeState.ERROR], timeout=1, interval=-1)
- # If interval < timeout, an exception should be raised
- with self.assertRaises(AssertionError):
- test_vol.wait_for([VolumeState.ERROR], timeout=10, interval=20)
- with helpers.cleanup_action(lambda: test_vol.delete()):
- test_vol.wait_till_ready()
- # Hitting a terminal state should raise an exception
- with self.assertRaises(WaitStateException):
- test_vol.wait_for([VolumeState.ERROR],
- terminal_states=[VolumeState.AVAILABLE])
- # Hitting the timeout should raise an exception
- with self.assertRaises(WaitStateException):
- test_vol.wait_for([VolumeState.ERROR], timeout=0, interval=0)
|