2
0

test_integration_azure_object_store_service.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import os
  2. import tempfile
  3. import uuid
  4. import integration_test.helpers as helpers
  5. class AzureIntegrationObjectStoreServiceTestCase(helpers.ProviderTestBase):
  6. def __init__(self, methodName, provider):
  7. super(AzureIntegrationObjectStoreServiceTestCase, self).__init__(
  8. methodName=methodName, provider=provider)
  9. @helpers.skipIfNoService(['object_store'])
  10. def test_azure_bucket_service(self):
  11. container_name = '{0}'.format(uuid.uuid4())
  12. object_name = '{0}'.format(uuid.uuid4())
  13. containers_count1 = len(self.provider.object_store.list())
  14. container = self.provider.object_store.create(container_name)
  15. self.assertTrue(container is not None , 'Container {0} not created'.format(container_name))
  16. containers_count2 = len(self.provider.object_store.list())
  17. self.assertTrue(containers_count2 > containers_count1, 'Container {0} not present in list'.format(container_name))
  18. find_container = self.provider.object_store.find(container_name)
  19. self.assertTrue(len(find_container) == 1, 'Container {0} not found'.format(container_name))
  20. get_container = self.provider.object_store.get(container.id)
  21. self.assertTrue(get_container is not None, 'Unable to get the container {0}'.format(container_name))
  22. obj = container.create_object(object_name)
  23. self.assertTrue(obj is not None, 'Object {0} not created'.format(container_name))
  24. obj_count = len(container.list())
  25. self.assertTrue(obj_count == 1, 'Object count should be 1')
  26. get_obj = container.get(object_name)
  27. self.assertTrue(get_obj is not None, 'Unable to get object {0} from container {1}.'.format(object_name, container_name))
  28. exits = container.exists(object_name)
  29. self.assertTrue(exits, 'Object {0} not exists in container {1}'.format(object_name, container_name))
  30. obj_content = 'abc'
  31. obj.upload(obj_content)
  32. content = obj.iter_content()
  33. self.assertTrue(content.getvalue().decode('utf-8') == obj_content,
  34. 'Object {0} content should be {1}'.format(object_name, obj_content))
  35. file_name = 'mytest.txt'
  36. file_content = 'defaults'
  37. tmp = os.path.join(tempfile.gettempdir(), file_name)
  38. try:
  39. if not os.path.exists(tmp):
  40. with open(tmp, "w") as file:
  41. file.write(file_content)
  42. obj.upload_from_file(tmp)
  43. content = obj.iter_content()
  44. self.assertTrue(content.getvalue().decode('utf-8') == file_content,
  45. 'Object {0} content should be {1}'.format(object_name, file_content))
  46. finally:
  47. print('Deleting file')
  48. os.remove(tmp)
  49. url = obj.generate_url()
  50. self.assertTrue(url is not None, 'Url should not be None')
  51. obj.delete()
  52. delete_obj = container.get(object_name)
  53. self.assertTrue(delete_obj is None, 'Object {0} not deleted from container {1}'.format(object_name,container_name))
  54. container.delete()
  55. deleted_container = self.provider.object_store.get(container.id)
  56. self.assertTrue(deleted_container is None, 'Container {0} not deleted'.format(container_name))