test_azure_object_store_service.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import azure_test.helpers as helpers
  2. from azure_test.helpers import ProviderTestBase
  3. class AzureObjectStoreServiceTestCase(ProviderTestBase):
  4. @helpers.skipIfNoService(['object_store'])
  5. def test_azure_bucket_create(self):
  6. container = self.provider.object_store.create("container3")
  7. print("Create - " + str(container))
  8. self.assertEqual(
  9. str(container), "<CB-AzureBucket: container3>")
  10. @helpers.skipIfNoService(['object_store'])
  11. def test_azure_bucket_list(self):
  12. containerList = self.provider.object_store.list()
  13. print("List Container - " + str(containerList))
  14. self.assertEqual(
  15. len(containerList), 2)
  16. @helpers.skipIfNoService(['object_store'])
  17. def test_azure_bucket_find_Exist(self):
  18. container = self.provider.object_store.find("container")
  19. print("Find Exist - " + str(container))
  20. self.assertEqual(
  21. len(container), 2)
  22. @helpers.skipIfNoService(['object_store'])
  23. def test_azure_bucket_find_NotExist(self):
  24. # For testing the case when container does not exist
  25. container = self.provider.object_store.find("container23")
  26. print("Find Not Exist - " + str(container))
  27. self.assertEqual(
  28. len(container), 0)
  29. @helpers.skipIfNoService(['object_store'])
  30. def test_azure_bucket_get_Exist(self):
  31. container = self.provider.object_store.get("container2")
  32. print("Get Exist - " + str(container))
  33. self.assertTrue(
  34. str(container) == "<CB-AzureBucket: container2>",
  35. "Object find returned value should be container3")
  36. @helpers.skipIfNoService(['object_store'])
  37. def test_azure_bucket_get_NotExist(self):
  38. container = self.provider.object_store.get("container23")
  39. print("Get Not Exist - " + str(container))
  40. self.assertEqual(
  41. str(container), 'None')
  42. @helpers.skipIfNoService(['object_store'])
  43. def test_azure_bucket_delete(self):
  44. containers = self.provider.object_store.find("container1")
  45. cont = containers[0]
  46. contDel = cont.delete()
  47. print("Bucket delete - " + str(contDel))
  48. self.assertEqual(
  49. contDel, True)
  50. @helpers.skipIfNoService(['object_store'])
  51. def test_azure_bucket_create_object(self):
  52. containers = self.provider.object_store.find("container1")
  53. cont = containers[0]
  54. contDel = cont.create_object("block1")
  55. print("Create object - " + str(contDel))
  56. self.assertEqual(
  57. str(contDel), '<CB-AzureBucketObject: block1>')
  58. @helpers.skipIfNoService(['object_store'])
  59. def test_azure_bucket_object_exists__internalE(self):
  60. containers = self.provider.object_store.find("container2")
  61. cont = containers[0]
  62. contDel = cont.exists("block2")
  63. print("List object - " + str(contDel))
  64. self.assertEqual(
  65. str(contDel), 'True')
  66. @helpers.skipIfNoService(['object_store'])
  67. def test_azure_bucket_object_exists__internalNE(self):
  68. containers = self.provider.object_store.find("container2")
  69. cont = containers[0]
  70. contDel = cont.exists("blob3")
  71. print("List object - " + str(contDel))
  72. self.assertEqual(
  73. str(contDel), 'False')
  74. @helpers.skipIfNoService(['object_store'])
  75. def test_azure_bucket_object_list(self):
  76. containers = self.provider.object_store.find("container2")
  77. cont = containers[0]
  78. contDel = cont.list()
  79. print("List object - " + str(contDel))
  80. self.assertEqual(
  81. len(contDel), 2)
  82. @helpers.skipIfNoService(['object_store'])
  83. def test_azure_bucket_object_get(self):
  84. containers = self.provider.object_store.find("container2")
  85. cont = containers[0]
  86. contDel = cont.get("block2")
  87. print("List object - " + str(contDel))
  88. self.assertEqual(
  89. str(contDel), "<CB-AzureBucketObject: block2>")
  90. @helpers.skipIfNoService(['object_store'])
  91. def test_azure_bucket_object_iter_content(self):
  92. containers = self.provider.object_store.find("container2")
  93. cont = containers[0]
  94. blocks = cont.list()
  95. block = blocks[0]
  96. content = block.iter_content()
  97. print("Iter content - " + str(content))
  98. self.assertEqual(
  99. content.getvalue(), b'blob2Content')
  100. @helpers.skipIfNoService(['object_store'])
  101. def test_azure_bucket_object_iter_content_ifBlobNotExists(self):
  102. containers = self.provider.object_store.find("container2")
  103. cont = containers[0]
  104. block = cont.get('block3')
  105. content = block.iter_content()
  106. self.assertEqual(
  107. content, None, 'content should be None')
  108. @helpers.skipIfNoService(['object_store'])
  109. def test_azure_bucket_object_upload(self):
  110. containers = self.provider.object_store.find("container2")
  111. cont = containers[0]
  112. blocks = cont.list()
  113. block = blocks[0]
  114. block.upload('blob1Content')
  115. self.assertEqual(
  116. block.iter_content().getvalue(), b'blob1Content')
  117. @helpers.skipIfNoService(['object_store'])
  118. def test_azure_bucket_object_delete(self):
  119. containers = self.provider.object_store.find("container2")
  120. cont = containers[0]
  121. blocks = cont.list()
  122. block = blocks[0]
  123. block.delete()
  124. self.assertEqual(
  125. len(cont.list()), 2)
  126. @helpers.skipIfNoService(['object_store'])
  127. def test_azure_bucket_object_upload_from_file(self):
  128. containers = self.provider.object_store.find("container2")
  129. cont = containers[0]
  130. blocks = cont.list()
  131. block = blocks[0]
  132. block.upload_from_file('blob2Content')
  133. self.assertEqual(
  134. block.iter_content().getvalue(), b'blob2Content')
  135. @helpers.skipIfNoService(['object_store'])
  136. def test_azure_bucket_object_generate_url(self):
  137. containers = self.provider.object_store.find("container2")
  138. cont = containers[0]
  139. blocks = cont.list()
  140. block = blocks[0]
  141. url = block.generate_url()
  142. print(str(url))
  143. self.assertEqual(
  144. str(url), 'https://cloudbridgeazure.blob.core.windows.net/vhds/block1')