test_azure_object_store_service.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. import azure_test.helpers as helpers
  2. from azure_test.helpers import ProviderTestBase
  3. class AzureObjectStoreServiceTestCase(ProviderTestBase):
  4. @helpers.skipIfNoService(['object_store'])
  5. def test_azure_bucket_create(self):
  6. container = self.provider.object_store.create("container3")
  7. print("Create - " + str(container))
  8. self.assertEqual(
  9. str(container), "<CB-AzureBucket: container3>")
  10. self.assertIsNotNone(container.id)
  11. @helpers.skipIfNoService(['object_store'])
  12. def test_azure_bucket_list(self):
  13. containerList = self.provider.object_store.list()
  14. print("List Container - " + str(containerList))
  15. self.assertEqual(
  16. len(containerList), 2)
  17. @helpers.skipIfNoService(['object_store'])
  18. def test_azure_bucket_find_Exist(self):
  19. container = self.provider.object_store.find("container")
  20. print("Find Exist - " + str(container))
  21. self.assertEqual(
  22. len(container), 2)
  23. @helpers.skipIfNoService(['object_store'])
  24. def test_azure_bucket_find_NotExist(self):
  25. # For testing the case when container does not exist
  26. container = self.provider.object_store.find("container23")
  27. print("Find Not Exist - " + str(container))
  28. self.assertEqual(
  29. len(container), 0)
  30. @helpers.skipIfNoService(['object_store'])
  31. def test_azure_bucket_get_Exist(self):
  32. container = self.provider.object_store.get("container2")
  33. print("Get Exist - " + str(container))
  34. self.assertTrue(
  35. str(container) == "<CB-AzureBucket: container2>",
  36. "Object find returned value should be container3")
  37. @helpers.skipIfNoService(['object_store'])
  38. def test_azure_bucket_get_NotExist(self):
  39. container = self.provider.object_store.get("container23")
  40. print("Get Not Exist - " + str(container))
  41. self.assertEqual(
  42. str(container), 'None')
  43. @helpers.skipIfNoService(['object_store'])
  44. def test_azure_bucket_delete(self):
  45. containers = self.provider.object_store.find("container1")
  46. cont = containers[0]
  47. contDel = cont.delete()
  48. print("Bucket delete - " + str(contDel))
  49. self.assertEqual(
  50. contDel, True)
  51. contDel = cont.delete()
  52. self.assertEqual(
  53. contDel, False)
  54. @helpers.skipIfNoService(['object_store'])
  55. def test_azure_bucket_create_object(self):
  56. containers = self.provider.object_store.find("container1")
  57. cont = containers[0]
  58. obj = cont.create_object("block1")
  59. self.assertIsNotNone(obj.id)
  60. self.assertIsNotNone(obj.size)
  61. self.assertIsNotNone(obj.last_modified)
  62. print("Create object - " + str(obj))
  63. self.assertEqual(
  64. str(obj), '<CB-AzureBucketObject: block1>')
  65. @helpers.skipIfNoService(['object_store'])
  66. def test_azure_bucket_object_exists__internalE(self):
  67. containers = self.provider.object_store.find("container2")
  68. cont = containers[0]
  69. contDel = cont.exists("block2")
  70. print("List object - " + str(contDel))
  71. self.assertEqual(
  72. str(contDel), 'True')
  73. @helpers.skipIfNoService(['object_store'])
  74. def test_azure_bucket_object_exists__internalNE(self):
  75. containers = self.provider.object_store.find("container2")
  76. cont = containers[0]
  77. contDel = cont.exists("blob3")
  78. print("List object - " + str(contDel))
  79. self.assertEqual(
  80. str(contDel), 'False')
  81. @helpers.skipIfNoService(['object_store'])
  82. def test_azure_bucket_object_list(self):
  83. containers = self.provider.object_store.find("container2")
  84. cont = containers[0]
  85. contDel = cont.list()
  86. print("List object - " + str(contDel))
  87. self.assertEqual(
  88. len(contDel), 3)
  89. @helpers.skipIfNoService(['object_store'])
  90. def test_azure_bucket_object_get(self):
  91. containers = self.provider.object_store.find("container2")
  92. cont = containers[0]
  93. contDel = cont.get("block2")
  94. print("List object - " + str(contDel))
  95. self.assertEqual(
  96. str(contDel), "<CB-AzureBucketObject: block2>")
  97. @helpers.skipIfNoService(['object_store'])
  98. def test_azure_bucket_object_iter_content(self):
  99. containers = self.provider.object_store.find("container2")
  100. cont = containers[0]
  101. blocks = cont.list()
  102. block = blocks[0]
  103. content = block.iter_content()
  104. print("Iter content - " + str(content))
  105. self.assertEqual(
  106. content.getvalue(), b'blob2Content')
  107. @helpers.skipIfNoService(['object_store'])
  108. def test_azure_bucket_object_iter_content_ifBlobNotExists(self):
  109. containers = self.provider.object_store.find("container2")
  110. cont = containers[0]
  111. block = cont.get('block3')
  112. content = block.iter_content()
  113. self.assertEqual(
  114. content, None, 'content should be None')
  115. @helpers.skipIfNoService(['object_store'])
  116. def test_azure_bucket_object_upload(self):
  117. containers = self.provider.object_store.find("container2")
  118. cont = containers[0]
  119. blocks = cont.list()
  120. block = blocks[0]
  121. block.upload('blob1Content')
  122. self.assertEqual(
  123. block.iter_content().getvalue(), b'blob1Content')
  124. @helpers.skipIfNoService(['object_store'])
  125. def test_azure_bucket_object_delete(self):
  126. containers = self.provider.object_store.find("container2")
  127. cont = containers[0]
  128. blocks = cont.list()
  129. block = blocks[0]
  130. block.delete()
  131. self.assertEqual(
  132. len(cont.list()), 3)
  133. deleted = block.delete()
  134. self.assertEqual(deleted, False)
  135. @helpers.skipIfNoService(['object_store'])
  136. def test_azure_bucket_object_upload_from_file(self):
  137. containers = self.provider.object_store.find("container2")
  138. cont = containers[0]
  139. blocks = cont.list()
  140. block = blocks[0]
  141. block.upload_from_file('blob2Content')
  142. self.assertEqual(
  143. block.iter_content().getvalue(), b'blob2Content')
  144. @helpers.skipIfNoService(['object_store'])
  145. def test_azure_bucket_object_upload_exception(self):
  146. containers = self.provider.object_store.find("container2")
  147. cont = containers[0]
  148. blob = cont.create_object('block4')
  149. uploaded = blob.upload_from_file('E:\\blob2Content.txt')
  150. self.assertEqual(uploaded, False)
  151. uploaded = blob.upload('blob2Content')
  152. self.assertEqual(uploaded, False)
  153. blob.delete()
  154. @helpers.skipIfNoService(['object_store'])
  155. def test_azure_bucket_object_generate_url(self):
  156. containers = self.provider.object_store.find("container2")
  157. cont = containers[0]
  158. blocks = cont.list()
  159. block = blocks[0]
  160. url = block.generate_url()
  161. print(str(url))
  162. self.assertEqual(
  163. str(url),
  164. 'https://cloudbridgeazure.blob.core.windows.net/vhds/block1')