# test_azure_object_store_service.py
  1. import test.helpers as helpers
  2. from test.helpers import ProviderTestBase
  3. class AzureObjectStoreServiceTestCase(ProviderTestBase):
  4. def __init__(self, methodName, provider):
  5. super(AzureObjectStoreServiceTestCase, self).__init__(
  6. methodName=methodName, provider=provider)
  7. @helpers.skipIfNoService(['object_store'])
  8. def test_azure_bucket_create(self):
  9. container = self.provider.object_store.create("container3")
  10. print("Create - " + str(container))
  11. self.assertEqual(
  12. str(container), "<CB-AzureBucket: container3>")
  13. @helpers.skipIfNoService(['object_store'])
  14. def test_azure_bucket_list(self):
  15. containerList = self.provider.object_store.list()
  16. print("List Container - " + str(containerList))
  17. self.assertEqual(
  18. len(containerList), 2)
  19. @helpers.skipIfNoService(['object_store'])
  20. def test_azure_bucket_find_Exist(self):
  21. container = self.provider.object_store.find("container")
  22. print("Find Exist - " + str(container))
  23. self.assertEqual(
  24. len(container), 2)
  25. @helpers.skipIfNoService(['object_store'])
  26. def test_azure_bucket_find_NotExist(self):
  27. # For testing the case when container does not exist
  28. container = self.provider.object_store.find("container23")
  29. print("Find Not Exist - " + str(container))
  30. self.assertEqual(
  31. len(container), 0)
  32. @helpers.skipIfNoService(['object_store'])
  33. def test_azure_bucket_get_Exist(self):
  34. container = self.provider.object_store.get("container2")
  35. print("Get Exist - " + str(container))
  36. self.assertTrue(
  37. str(container) == "<CB-AzureBucket: container2>",
  38. "Object find returned value should be container3")
  39. @helpers.skipIfNoService(['object_store'])
  40. def test_azure_bucket_get_NotExist(self):
  41. container = self.provider.object_store.get("container23")
  42. print("Get Not Exist - " + str(container))
  43. self.assertEqual(
  44. str(container), 'None')
  45. @helpers.skipIfNoService(['object_store'])
  46. def test_azure_bucket_delete(self):
  47. containers = self.provider.object_store.find("container1")
  48. cont = containers[0]
  49. contDel = cont.delete()
  50. print("Bucket delete - " + str(contDel))
  51. self.assertEqual(
  52. contDel, True)
  53. @helpers.skipIfNoService(['object_store'])
  54. def test_azure_bucket_create_object(self):
  55. containers = self.provider.object_store.find("container1")
  56. cont = containers[0]
  57. contDel = cont.create_object("block1")
  58. print("Create object - " + str(contDel))
  59. self.assertEqual(
  60. str(contDel), '<CB-AzureBucketObject: block1>')
  61. @helpers.skipIfNoService(['object_store'])
  62. def test_azure_bucket_object_exists__internalE(self):
  63. containers = self.provider.object_store.find("container2")
  64. cont = containers[0]
  65. contDel = cont.exists("block2")
  66. print("List object - " + str(contDel))
  67. self.assertEqual(
  68. str(contDel), 'True')
  69. @helpers.skipIfNoService(['object_store'])
  70. def test_azure_bucket_object_exists__internalNE(self):
  71. containers = self.provider.object_store.find("container2")
  72. cont = containers[0]
  73. contDel = cont.exists("blob3")
  74. print("List object - " + str(contDel))
  75. self.assertEqual(
  76. str(contDel), 'False')
  77. @helpers.skipIfNoService(['object_store'])
  78. def test_azure_bucket_object_list(self):
  79. containers = self.provider.object_store.find("container2")
  80. cont = containers[0]
  81. contDel = cont.list()
  82. print("List object - " + str(contDel))
  83. self.assertEqual(
  84. len(contDel), 2)
  85. @helpers.skipIfNoService(['object_store'])
  86. def test_azure_bucket_object_get(self):
  87. containers = self.provider.object_store.find("container2")
  88. cont = containers[0]
  89. contDel = cont.get("block2")
  90. print("List object - " + str(contDel))
  91. self.assertEqual(
  92. str(contDel), "<CB-AzureBucketObject: block2>")
  93. @helpers.skipIfNoService(['object_store'])
  94. def test_azure_bucket_object_iter_content(self):
  95. containers = self.provider.object_store.find("container2")
  96. cont = containers[0]
  97. blocks = cont.list()
  98. block = blocks[0]
  99. content = block.iter_content()
  100. print("Iter content - " + str(content))
  101. self.assertEqual(
  102. content.getvalue(), b'blob2Content')
  103. @helpers.skipIfNoService(['object_store'])
  104. def test_azure_bucket_object_iter_content_ifBlobNotExists(self):
  105. containers = self.provider.object_store.find("container2")
  106. cont = containers[0]
  107. block = cont.get('block3')
  108. content = block.iter_content()
  109. self.assertEqual(
  110. content, None, 'content should be None')
  111. @helpers.skipIfNoService(['object_store'])
  112. def test_azure_bucket_object_upload(self):
  113. containers = self.provider.object_store.find("container2")
  114. cont = containers[0]
  115. blocks = cont.list()
  116. block = blocks[0]
  117. block.upload('blob1Content')
  118. self.assertEqual(
  119. block.iter_content().getvalue(), b'blob1Content')
  120. @helpers.skipIfNoService(['object_store'])
  121. def test_azure_bucket_object_delete(self):
  122. containers = self.provider.object_store.find("container2")
  123. cont = containers[0]
  124. blocks = cont.list()
  125. block = blocks[0]
  126. block.delete()
  127. self.assertEqual(
  128. len(cont.list()), 2)
  129. @helpers.skipIfNoService(['object_store'])
  130. def test_azure_bucket_object_upload_from_file(self):
  131. containers = self.provider.object_store.find("container2")
  132. cont = containers[0]
  133. blocks = cont.list()
  134. block = blocks[0]
  135. block.upload_from_file('blob2Content')
  136. self.assertEqual(
  137. block.iter_content().getvalue(), b'blob2Content')
  138. @helpers.skipIfNoService(['object_store'])
  139. def test_azure_bucket_object_generate_url(self):
  140. containers = self.provider.object_store.find("container2")
  141. cont = containers[0]
  142. blocks = cont.list()
  143. block = blocks[0]
  144. url = block.generate_url()
  145. print(str(url))
  146. self.assertEqual(
  147. str(url), 'https://cloudbridgeazure.blob.core.windows.net/vhds/block1')