test_integration_azure_volume_service.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import os
  2. import tempfile
  3. import uuid
  4. import integration_test.helpers as helpers
  5. class AzureIntegrationVolumeServiceTestCase(helpers.ProviderTestBase):
  6. def __init__(self, methodName, provider):
  7. super(AzureIntegrationVolumeServiceTestCase, self).__init__(
  8. methodName=methodName, provider=provider)
  9. @helpers.skipIfNoService(['block_store'])
  10. def test_azure_volume_service(self):
  11. volume_name = '{0}'.format(uuid.uuid4())
  12. snapshot_name = '{0}'.format(uuid.uuid4())
  13. volume_list_before_create = self.provider.block_store.volumes.list()
  14. print(str(len(volume_list_before_create)))
  15. volume = self.provider.block_store.volumes.create(volume_name, 1)
  16. volume_id= volume.id
  17. volume_list_after_create = self.provider.block_store.volumes.list()
  18. print(str(len(volume_list_after_create)))
  19. self.assertTrue(len(volume_list_after_create), len(volume_list_before_create) + 1)
  20. volume = self.provider.block_store.volumes.get(volume_name)
  21. print("Get Volume - " + str(volume))
  22. self.assertTrue(
  23. volume.name == volume_name, "Volume name should be MyVolume")
  24. volume.attach('/subscriptions/7904d702-e01c-4826-8519-f5a25c866a96/resourceGroups/CloudBridge-Azure/providers/Microsoft.Compute/virtualMachines/ubuntu-intro2')
  25. #TODO: Add logic to verify that disk is attached to instance
  26. volume.detach()
  27. #TODO: Add logic to verify that disk is not in use
  28. # snapshot = volume.create_snapshot(snapshot_name)
  29. # self.assertTrue(snapshot is not None, 'Snapshot {0} not created'.format(snapshot_name))
  30. volume.refresh()
  31. self.assertTrue(volume.id == volume_id, 'Volume id should match on refresh')
  32. volume_list_before_delete = self.provider.block_store.volumes.list()
  33. print(str(len(volume_list_before_delete)))
  34. volume.delete()
  35. volume_list_after_delete = self.provider.block_store.volumes.list()
  36. print(str(len(volume_list_after_delete)))
  37. self.assertTrue(len(volume_list_after_delete), len(volume_list_before_delete) - 1)