test_integration_azure_volume_service.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import os
  2. import tempfile
  3. import uuid
  4. import integration_test.helpers as helpers
  5. class AzureIntegrationVolumeServiceTestCase(helpers.ProviderTestBase):
  6. def __init__(self, methodName, provider):
  7. super(AzureIntegrationVolumeServiceTestCase, self).__init__(
  8. methodName=methodName, provider=provider)
  9. @helpers.skipIfNoService(['block_store'])
  10. def test_azure_volume_service(self):
  11. volume_name = '{0}'.format(uuid.uuid4())
  12. snapshot_name = '{0}'.format(uuid.uuid4())
  13. # volumes_count1 = len(self.provider.block_store.volumes.list())
  14. volume = self.provider.block_store.volumes.create(volume_name, 1)
  15. volume_id= volume.id
  16. self.assertTrue(volume is not None , 'Volume {0} not created'.format(volume_name))
  17. # volumes_count2 = len(self.provider.block_store.volumes.list())
  18. # self.assertTrue(volumes_count2 > volumes_count1, 'Volume {0} not present in list'.format(volume_name))
  19. # find_volume = self.provider.block_store.volumes.find(volume_name)
  20. # self.assertTrue(len(find_volume) == 1, 'Volume {0} not found'.format(volume_name))
  21. # get_volume = self.provider.block_store.volumes.get(volume.id)
  22. # self.assertTrue(get_volume is not None, 'Unable to get the volume {0}'.format(volume_name))
  23. volume.attach('/subscriptions/7904d702-e01c-4826-8519-f5a25c866a96/resourceGroups/CloudBridge-Azure/providers/Microsoft.Compute/virtualMachines/ubuntu-intro2')
  24. #TODO: Add logic to verify that disk is attached to instance
  25. volume.detach()
  26. #TODO: Add logic to verify that disk is not in use
  27. # snapshot = volume.create_snapshot(snapshot_name)
  28. # self.assertTrue(snapshot is not None, 'Snapshot {0} not created'.format(snapshot_name))
  29. volume.refresh()
  30. self.assertTrue(volume.id == volume_id, 'Volume id should match on refresh')
  31. volume.delete()
  32. # deleted_volume = self.provider.block_store.volumes.get(volume.id)
  33. # self.assertTrue(deleted_volume is None, 'Volume {0} not deleted'.format(volume_name))