test_integration_azure_object_store_service.py 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import os
  2. import tempfile
  3. import uuid
  4. import azure_integration_test.helpers as helpers
  5. class AzureIntegrationObjectStoreServiceTestCase(helpers.ProviderTestBase):
  6. @helpers.skipIfNoService(['object_store'])
  7. def test_azure_bucket_service(self):
  8. container_name = '{0}'.format(uuid.uuid4())
  9. object_name = '{0}'.format(uuid.uuid4())
  10. containers_count1 = len(self.provider.object_store.list())
  11. container = self.provider.object_store.create(container_name)
  12. self.assertTrue(container is not None,
  13. 'Container {0} not created'.format(container_name))
  14. containers_count2 = len(self.provider.object_store.list())
  15. self.assertTrue(containers_count2 > containers_count1,
  16. 'Container {0} not present in list'.
  17. format(container_name))
  18. find_container = self.provider.object_store.find(container_name)
  19. self.assertTrue(len(find_container) == 1,
  20. 'Container {0} not found'.format(container_name))
  21. get_container = self.provider.object_store.get(container.id)
  22. self.assertTrue(get_container is not None,
  23. 'Unable to get the container {0}'.
  24. format(container_name))
  25. obj = container.create_object(object_name)
  26. self.assertTrue(obj is not None,
  27. 'Object {0} not created'.format(container_name))
  28. obj_count = len(container.list())
  29. self.assertTrue(obj_count == 1, 'Object count should be 1')
  30. get_obj = container.get(object_name)
  31. self.assertTrue(get_obj is not None,
  32. 'Unable to get object {0} from container {1}.'.
  33. format(object_name, container_name))
  34. exits = container.exists(object_name)
  35. self.assertTrue(exits, 'Object {0} not exists in container {1}'.
  36. format(object_name, container_name))
  37. obj_content = 'abc'
  38. obj.upload(obj_content)
  39. content = obj.iter_content()
  40. self.assertTrue(content.getvalue().decode('utf-8') == obj_content,
  41. 'Object {0} content should be {1}'.
  42. format(object_name, obj_content))
  43. file_name = 'mytest.txt'
  44. file_content = 'defaults'
  45. tmp = os.path.join(tempfile.gettempdir(), file_name)
  46. try:
  47. if not os.path.exists(tmp):
  48. with open(tmp, "w") as file:
  49. file.write(file_content)
  50. obj.upload_from_file(tmp)
  51. content = obj.iter_content()
  52. self.assertTrue(content.getvalue().decode('utf-8') == file_content,
  53. 'Object {0} content should be {1}'.
  54. format(object_name, file_content))
  55. finally:
  56. print('Deleting file')
  57. os.remove(tmp)
  58. url = obj.generate_url()
  59. self.assertTrue(url is not None, 'Url should not be None')
  60. obj.delete()
  61. delete_obj = container.get(object_name)
  62. self.assertTrue(delete_obj is None,
  63. 'Object {0} not deleted from container {1}'.
  64. format(object_name, container_name))
  65. container.delete()
  66. deleted_container = self.provider.object_store.get(container.id)
  67. self.assertTrue(deleted_container is None, 'Container {0} not deleted'.
  68. format(container_name))