test_integration_azure_volume_service.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import uuid
  2. import azure_integration_test.helpers as helpers
  3. class AzureIntegrationVolumeServiceTestCase(helpers.ProviderTestBase):
  4. @helpers.skipIfNoService(['block_store'])
  5. def test_azure_volume_service(self):
  6. volume_name = '{0}'.format(uuid.uuid4())
  7. snapshot_name = '{0}'.format(uuid.uuid4())
  8. volume_list_before_create = self.provider.block_store.volumes.list()
  9. print(str(len(volume_list_before_create)))
  10. volume = self.provider.block_store.volumes.create(volume_name, 1)
  11. volume.wait_till_ready()
  12. self.assertTrue(volume is not None, 'Volume not created')
  13. volume_id = volume.id
  14. volume_list_after_create = self.provider.block_store.volumes.list()
  15. print(str(len(volume_list_after_create)))
  16. self.assertTrue(
  17. len(volume_list_after_create), len(volume_list_before_create) + 1)
  18. volume = self.provider.block_store.volumes.get(volume_id)
  19. print("Get Volume - " + str(volume))
  20. self.assertTrue(
  21. volume.name == volume_name, "Volume name should be MyVolume")
  22. volume_find = self.provider.block_store.volumes.find(volume_name)
  23. print("Find Volume - " + str(volume))
  24. self.assertEqual(
  25. len(volume_find), 1)
  26. # volume.attach('/subscriptions/7904d702-e01c-4826-8519-f5a25c866a96/resourceGroups/CloudBridge-Azure/providers/Microsoft.Compute/virtualMachines/ubuntu-intro2')
  27. # TODO: Add logic to verify that disk is attached to instance
  28. # volume.detach()
  29. # TODO: Add logic to verify that disk is not in use
  30. with self.assertRaises(NotImplementedError):
  31. volume.create_snapshot(snapshot_name)
  32. # self.assertTrue(snapshot is not None,
  33. # 'Snapshot {0} not created'.format(snapshot_name))
  34. volume.refresh()
  35. self.assertTrue(volume.id == volume_id,
  36. 'Volume id should match on refresh')
  37. volume_list_before_delete = self.provider.block_store.volumes.list()
  38. print(str(len(volume_list_before_delete)))
  39. volume.delete()
  40. volume_list_after_delete = self.provider.block_store.volumes.list()
  41. print(str(len(volume_list_after_delete)))
  42. self.assertTrue(len(volume_list_after_delete),
  43. len(volume_list_before_delete) - 1)