# test_object_store_service.py
  1. import filecmp
  2. import os
  3. import tempfile
  4. from datetime import datetime
  5. from io import BytesIO
  6. from unittest import mock
  7. from unittest import skip
  8. import requests
  9. from cloudbridge.base import helpers as cb_helpers
  10. from cloudbridge.base.resources import BaseBucketObject
  11. from cloudbridge.interfaces.exceptions import DuplicateResourceException
  12. from cloudbridge.interfaces.provider import TestMockHelperMixin
  13. from cloudbridge.interfaces.resources import Bucket
  14. from cloudbridge.interfaces.resources import BucketObject
  15. from tests import helpers
  16. from tests.helpers import ProviderTestBase
  17. from tests.helpers import standard_interface_tests as sit
  18. # S3 (and Swift) require every part except the last to be >= 5 MiB. Tests use
  19. # this size so they remain valid against real cloud providers, not just moto.
  20. MIN_PART_SIZE = 5 * 1024 * 1024
class CloudObjectStoreServiceTestCase(ProviderTestBase):
    """Provider-agnostic tests for the object store service.

    Exercises the bucket service (``provider.storage.buckets``) and the
    per-bucket object service (``bucket.objects``): CRUD, uploads and
    downloads, generated URLs and multipart uploads.
    """

    # NOTE(review): presumably read by nose's multiprocess plugin so that this
    # class's test methods may be split across worker processes — confirm.
    _multiprocess_can_split_ = True
  23. @helpers.skipIfNoService(['storage._bucket_objects', 'storage.buckets'])
  24. def test_storage_services_event_pattern(self):
  25. # pylint:disable=protected-access
  26. self.assertEqual(
  27. self.provider.storage.buckets._service_event_pattern,
  28. "provider.storage.buckets",
  29. "Event pattern for {} service should be '{}', "
  30. "but found '{}'.".format("buckets",
  31. "provider.storage.buckets",
  32. self.provider.storage.buckets.
  33. _service_event_pattern))
  34. # pylint:disable=protected-access
  35. self.assertEqual(
  36. self.provider.storage._bucket_objects._service_event_pattern,
  37. "provider.storage._bucket_objects",
  38. "Event pattern for {} service should be '{}', "
  39. "but found '{}'.".format("bucket_objects",
  40. "provider.storage._bucket_objects",
  41. self.provider.storage._bucket_objects.
  42. _service_event_pattern))
  43. @helpers.skipIfNoService(['storage.buckets'])
  44. def test_crud_bucket(self):
  45. def create_bucket(name):
  46. return self.provider.storage.buckets.create(name)
  47. def cleanup_bucket(bucket):
  48. if bucket:
  49. bucket.delete()
  50. def extra_tests(bucket):
  51. # Recreating existing bucket should raise an exception
  52. with self.assertRaises(DuplicateResourceException):
  53. self.provider.storage.buckets.create(name=bucket.name)
  54. sit.check_crud(self, self.provider.storage.buckets, Bucket,
  55. "cb-crudbucket", create_bucket, cleanup_bucket,
  56. extra_test_func=extra_tests)
  57. @helpers.skipIfNoService(['storage.buckets'])
  58. def test_crud_bucket_object(self):
  59. test_bucket = None
  60. def create_bucket_obj(name):
  61. obj = test_bucket.objects.create(name)
  62. # TODO: This is wrong. We shouldn't have to have a separate
  63. # call to upload some content before being able to delete
  64. # the content. Maybe the create_object method should accept
  65. # the file content as a parameter.
  66. obj.upload("dummy content")
  67. return obj
  68. def cleanup_bucket_obj(bucket_obj):
  69. if bucket_obj:
  70. bucket_obj.delete()
  71. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  72. name = "cb-crudbucketobj-{0}".format(helpers.get_uuid())
  73. test_bucket = self.provider.storage.buckets.create(name)
  74. sit.check_crud(self, test_bucket.objects, BucketObject,
  75. "cb-bucketobj", create_bucket_obj,
  76. cleanup_bucket_obj, skip_name_check=True)
  77. @helpers.skipIfNoService(['storage.buckets'])
  78. def test_crud_bucket_object_properties(self):
  79. # Create a new bucket, upload some contents into the bucket, and
  80. # check whether list properly detects the new content.
  81. # Delete everything afterwards.
  82. name = "cbtestbucketobjs-{0}".format(helpers.get_uuid())
  83. test_bucket = self.provider.storage.buckets.create(name)
  84. # ensure that the bucket is empty
  85. objects = test_bucket.objects.list()
  86. self.assertEqual([], objects)
  87. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  88. obj_name_prefix = "hello"
  89. obj_name = obj_name_prefix + "_world.txt"
  90. obj = test_bucket.objects.create(obj_name)
  91. with cb_helpers.cleanup_action(lambda: obj.delete()):
  92. # TODO: This is wrong. We shouldn't have to have a separate
  93. # call to upload some content before being able to delete
  94. # the content. Maybe the create_object method should accept
  95. # the file content as a parameter.
  96. obj.upload("dummy content")
  97. objs = test_bucket.objects.list()
  98. self.assertTrue(
  99. isinstance(objs[0].size, int),
  100. "Object size property needs to be a int, not {0}".format(
  101. type(objs[0].size)))
  102. # GET an object as the size property implementation differs
  103. # for objects returned by LIST and GET.
  104. obj = test_bucket.objects.get(objs[0].id)
  105. self.assertTrue(
  106. isinstance(objs[0].size, int),
  107. "Object size property needs to be an int, not {0}".format(
  108. type(obj.size)))
  109. self.assertTrue(
  110. datetime.strptime(objs[0].last_modified[:23],
  111. "%Y-%m-%dT%H:%M:%S.%f"),
  112. "Object's last_modified field format {0} not matching."
  113. .format(objs[0].last_modified))
  114. # check iteration
  115. iter_objs = list(test_bucket.objects)
  116. self.assertListEqual(iter_objs, objs)
  117. obj_too = test_bucket.objects.get(obj_name)
  118. self.assertTrue(
  119. isinstance(obj_too, BucketObject),
  120. "Did not get object {0} of expected type.".format(obj_too))
  121. prefix_filtered_list = test_bucket.objects.list(
  122. prefix=obj_name_prefix)
  123. self.assertTrue(
  124. len(objs) == len(prefix_filtered_list) == 1,
  125. 'The number of objects returned by list function, '
  126. 'with and without a prefix, are expected to be equal, '
  127. 'but its detected otherwise.')
  128. sit.check_delete(self, test_bucket.objects, obj)
  129. @helpers.skipIfNoService(['storage.buckets'])
  130. def test_upload_download_bucket_content(self):
  131. name = "cbtestbucketobjs-{0}".format(helpers.get_uuid())
  132. test_bucket = self.provider.storage.buckets.create(name)
  133. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  134. obj_name = "hello_upload_download.txt"
  135. obj = test_bucket.objects.create(obj_name)
  136. with cb_helpers.cleanup_action(lambda: obj.delete()):
  137. content = b"Hello World. Here's some content."
  138. # TODO: Upload and download methods accept different parameter
  139. # types. Need to make this consistent - possibly provider
  140. # multiple methods like upload_from_file, from_stream etc.
  141. obj.upload(content)
  142. target_stream = BytesIO()
  143. obj.save_content(target_stream)
  144. self.assertEqual(target_stream.getvalue(), content)
  145. target_stream2 = BytesIO()
  146. for data in obj.iter_content():
  147. target_stream2.write(data)
  148. self.assertEqual(target_stream2.getvalue(), content)
  149. @helpers.skipIfNoService(['storage.buckets'])
  150. def test_generate_url(self):
  151. name = "cbtestbucketobjs-{0}".format(helpers.get_uuid())
  152. test_bucket = self.provider.storage.buckets.create(name)
  153. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  154. obj_name = "hello_upload_download.txt"
  155. obj = test_bucket.objects.create(obj_name)
  156. with cb_helpers.cleanup_action(lambda: obj.delete()):
  157. content = b"Hello World. Generate a url."
  158. obj.upload(content)
  159. target_stream = BytesIO()
  160. obj.save_content(target_stream)
  161. url = obj.generate_url(100)
  162. if isinstance(self.provider, TestMockHelperMixin):
  163. raise self.skipTest(
  164. "Skipping rest of test - mock providers can't"
  165. " access generated url")
  166. self.assertEqual(requests.get(url).content, content)
  167. @helpers.skipIfNoService(['storage.buckets'])
  168. def test_generate_url_write_permissions(self):
  169. name = "cbtestbucketobjs-{0}".format(helpers.get_uuid())
  170. test_bucket = self.provider.storage.buckets.create(name)
  171. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  172. obj_name = "hello_upload_download.txt"
  173. obj = test_bucket.objects.create(obj_name)
  174. with cb_helpers.cleanup_action(lambda: obj.delete()):
  175. content = b"Hello World. Generate a url."
  176. url = obj.generate_url(100, writable=True)
  177. if isinstance(self.provider, TestMockHelperMixin):
  178. raise self.skipTest(
  179. "Skipping rest of test - mock providers can't"
  180. " access generated url")
  181. # Only Azure requires the x-ms-blob-type header to be present, but there's no harm
  182. # in sending this in for all providers.
  183. headers = {'x-ms-blob-type': 'BlockBlob'}
  184. response = requests.put(url, headers=headers, data=content)
  185. response.raise_for_status()
  186. obj = test_bucket.objects.get(obj_name)
  187. target_stream = BytesIO()
  188. obj.save_content(target_stream)
  189. self.assertEqual(target_stream.getvalue(), content)
  190. @helpers.skipIfNoService(['storage.buckets'])
  191. def test_upload_download_bucket_content_from_file(self):
  192. name = "cbtestbucketobjs-{0}".format(helpers.get_uuid())
  193. test_bucket = self.provider.storage.buckets.create(name)
  194. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  195. obj_name = "hello_upload_download.txt"
  196. obj = test_bucket.objects.create(obj_name)
  197. with cb_helpers.cleanup_action(lambda: obj.delete()):
  198. test_file = os.path.join(
  199. helpers.get_test_fixtures_folder(), 'logo.jpg')
  200. obj.upload_from_file(test_file)
  201. target_stream = BytesIO()
  202. obj.save_content(target_stream)
  203. with open(test_file, 'rb') as f:
  204. self.assertEqual(target_stream.getvalue(), f.read())
  205. @helpers.skipIfNoService(['storage.buckets'])
  206. def test_explicit_multipart_upload_roundtrip(self):
  207. name = "cbtest-mpu-{0}".format(helpers.get_uuid())
  208. test_bucket = self.provider.storage.buckets.create(name)
  209. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  210. obj_name = "mpu-roundtrip.bin"
  211. obj = test_bucket.objects.create(obj_name)
  212. with cb_helpers.cleanup_action(lambda: obj.delete()):
  213. part1 = b"a" * MIN_PART_SIZE
  214. part2 = b"b" * MIN_PART_SIZE
  215. part3 = b"c" * 1024 # final part may be smaller than the min
  216. expected = part1 + part2 + part3
  217. upload = obj.create_multipart_upload()
  218. parts = [upload.upload_part(1, part1),
  219. upload.upload_part(2, part2),
  220. upload.upload_part(3, part3)]
  221. upload.complete(parts)
  222. stored = test_bucket.objects.get(obj_name)
  223. self.assertIsNotNone(
  224. stored, "Object should exist after multipart completion")
  225. self.assertEqual(stored.size, len(expected))
  226. target_stream = BytesIO()
  227. stored.save_content(target_stream)
  228. self.assertEqual(target_stream.getvalue(), expected)
  229. @helpers.skipIfNoService(['storage.buckets'])
  230. def test_multipart_upload_out_of_order_parts(self):
  231. name = "cbtest-mpu-{0}".format(helpers.get_uuid())
  232. test_bucket = self.provider.storage.buckets.create(name)
  233. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  234. obj_name = "mpu-ooo.bin"
  235. obj = test_bucket.objects.create(obj_name)
  236. with cb_helpers.cleanup_action(lambda: obj.delete()):
  237. part1 = b"1" * MIN_PART_SIZE
  238. part2 = b"2" * MIN_PART_SIZE
  239. part3 = b"3" * 1024
  240. expected = part1 + part2 + part3
  241. upload = obj.create_multipart_upload()
  242. # Upload and collect parts out of order; complete must
  243. # assemble them in ascending part-number order regardless.
  244. p3 = upload.upload_part(3, part3)
  245. p1 = upload.upload_part(1, part1)
  246. p2 = upload.upload_part(2, part2)
  247. upload.complete([p3, p1, p2])
  248. stored = test_bucket.objects.get(obj_name)
  249. target_stream = BytesIO()
  250. stored.save_content(target_stream)
  251. self.assertEqual(target_stream.getvalue(), expected)
  252. @helpers.skipIfNoService(['storage.buckets'])
  253. def test_multipart_upload_abort(self):
  254. name = "cbtest-mpu-{0}".format(helpers.get_uuid())
  255. test_bucket = self.provider.storage.buckets.create(name)
  256. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  257. obj_name = "mpu-abort.bin"
  258. obj = test_bucket.objects.create(obj_name)
  259. upload = obj.create_multipart_upload()
  260. upload.upload_part(1, b"a" * MIN_PART_SIZE)
  261. upload.abort()
  262. # Aborting must not materialise the target object.
  263. self.assertIsNone(
  264. test_bucket.objects.get(obj_name),
  265. "Object should not exist after a multipart upload is aborted")
  266. @helpers.skipIfNoService(['storage.buckets'])
  267. def test_transparent_upload_large_stream_uses_multipart(self):
  268. name = "cbtest-mpu-{0}".format(helpers.get_uuid())
  269. test_bucket = self.provider.storage.buckets.create(name)
  270. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  271. obj_name = "transparent.bin"
  272. obj = test_bucket.objects.create(obj_name)
  273. with cb_helpers.cleanup_action(lambda: obj.delete()):
  274. content = b"x" * (MIN_PART_SIZE * 2 + 1024)
  275. # Lower the threshold/part size so a modest stream crosses it,
  276. # and assert the multipart path is taken (each provider routes
  277. # its own way underneath) and the object round-trips exactly.
  278. with mock.patch.object(
  279. BaseBucketObject, 'CB_MULTIPART_THRESHOLD',
  280. MIN_PART_SIZE), \
  281. mock.patch.object(
  282. BaseBucketObject, 'CB_MULTIPART_PART_SIZE',
  283. MIN_PART_SIZE), \
  284. mock.patch.object(
  285. obj, '_upload_multipart',
  286. wraps=obj._upload_multipart) as spy:
  287. obj.upload(BytesIO(content))
  288. spy.assert_called_once()
  289. stored = test_bucket.objects.get(obj_name)
  290. self.assertEqual(stored.size, len(content))
  291. target_stream = BytesIO()
  292. stored.save_content(target_stream)
  293. self.assertEqual(target_stream.getvalue(), content)
  294. @helpers.skipIfNoService(['storage.buckets'])
  295. def test_small_upload_stays_single_shot(self):
  296. name = "cbtest-mpu-{0}".format(helpers.get_uuid())
  297. test_bucket = self.provider.storage.buckets.create(name)
  298. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  299. obj = test_bucket.objects.create("small.txt")
  300. with cb_helpers.cleanup_action(lambda: obj.delete()):
  301. content = b"a small payload below the multipart threshold"
  302. # A payload below the threshold must not trigger multipart.
  303. svc = self.provider.storage._bucket_objects
  304. with mock.patch.object(
  305. svc, 'create_multipart_upload',
  306. wraps=svc.create_multipart_upload) as spy:
  307. obj.upload(content)
  308. spy.assert_not_called()
  309. target_stream = BytesIO()
  310. obj.save_content(target_stream)
  311. self.assertEqual(target_stream.getvalue(), content)
  312. @skip("Skip unless you want to test objects bigger than 5GB")
  313. @helpers.skipIfNoService(['storage.buckets'])
  314. def test_upload_download_bucket_content_with_large_file(self):
  315. # Creates a 6 Gig file in the temp directory, then uploads it to
  316. # Swift. Once uploaded, then downloads to a new file in the temp
  317. # directory and compares the two files to see if they match.
  318. temp_dir = tempfile.gettempdir()
  319. file_name = '6GigTest.tmp'
  320. six_gig_file = os.path.join(temp_dir, file_name)
  321. with open(six_gig_file, "wb") as out:
  322. out.truncate(6 * 1024 * 1024 * 1024) # 6 Gig...
  323. with cb_helpers.cleanup_action(lambda: os.remove(six_gig_file)):
  324. download_file = "{0}/cbtestfile-{1}".format(temp_dir, file_name)
  325. bucket_name = "cbtestbucketlargeobjs-{0}".format(
  326. helpers.get_uuid())
  327. test_bucket = self.provider.storage.buckets.create(bucket_name)
  328. with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
  329. test_obj = test_bucket.objects.create(file_name)
  330. with cb_helpers.cleanup_action(lambda: test_obj.delete()):
  331. file_uploaded = test_obj.upload_from_file(six_gig_file)
  332. self.assertTrue(file_uploaded, "Could not upload object?")
  333. with cb_helpers.cleanup_action(
  334. lambda: os.remove(download_file)):
  335. with open(download_file, 'wb') as f:
  336. test_obj.save_content(f)
  337. self.assertTrue(
  338. filecmp.cmp(six_gig_file, download_file),
  339. "Uploaded file != downloaded")