# test_block_store_service.py
  1. import time
  2. import six
  3. from cloudbridge.cloud.factory import ProviderList
  4. from cloudbridge.cloud.interfaces import SnapshotState
  5. from cloudbridge.cloud.interfaces import VolumeState
  6. from cloudbridge.cloud.interfaces.provider import TestMockHelperMixin
  7. from cloudbridge.cloud.interfaces.resources import AttachmentInfo
  8. from cloudbridge.cloud.interfaces.resources import Snapshot
  9. from cloudbridge.cloud.interfaces.resources import Volume
  10. from test import helpers
  11. from test.helpers import ProviderTestBase
  12. from test.helpers import standard_interface_tests as sit
  class CloudBlockStoreServiceTestCase(ProviderTestBase):
      """
      Tests for the volume and snapshot services reachable through
      ``self.provider.storage``.

      Each test is skipped (via ``helpers.skipIfNoService``) when the
      provider under test does not offer the named service.
      """

      # NOTE(review): presumably the nose multiprocess-plugin flag that lets
      # the tests of this class be split across workers -- confirm.
      _multiprocess_can_split_ = True

      @helpers.skipIfNoService(['storage.volumes'])
      def test_crud_volume(self):
          """
          Create a new volume, check whether the expected values are set,
          and delete it.

          The checks themselves are delegated to ``sit.check_crud``, which
          is handed the volume service, the ``Volume`` interface and the
          create/cleanup callbacks defined below.
          """
          def create_vol(label):
              # 1 is the requested volume size; the placement comes from
              # the provider's test data.
              return self.provider.storage.volumes.create(
                  label, 1,
                  helpers.get_provider_test_data(self.provider, "placement"))

          def cleanup_vol(vol):
              # Nothing to clean up if creation never produced a volume.
              if vol:
                  vol.delete()
                  vol.wait_for([VolumeState.DELETED, VolumeState.UNKNOWN],
                               terminal_states=[VolumeState.ERROR])

          sit.check_crud(self, self.provider.storage.volumes, Volume,
                         "cb-createvol", create_vol, cleanup_vol)

      @helpers.skipIfNoService(['storage.volumes'])
      def test_attach_detach_volume(self):
          """
          Create a new volume, attach it to a test instance, then detach it,
          waiting for the matching volume state after each step.
          """
          label = "cb-attachvol-{0}".format(helpers.get_uuid())
          # Declare this variable up front: late binding lets the cleanup
          # lambda see whatever value it holds when cleanup finally runs.
          test_instance = None
          with helpers.cleanup_action(
                  lambda: helpers.cleanup_test_resources(test_instance)):
              subnet = helpers.get_or_create_default_subnet(self.provider)
              test_instance = helpers.get_test_instance(
                  self.provider, label, subnet=subnet)

              # Create the volume in the instance's zone.
              test_vol = self.provider.storage.volumes.create(
                  label, 1, test_instance.zone_id)
              with helpers.cleanup_action(lambda: test_vol.delete()):
                  test_vol.wait_till_ready()
                  test_vol.attach(test_instance, '/dev/sda2')
                  test_vol.wait_for(
                      [VolumeState.IN_USE],
                      terminal_states=[VolumeState.ERROR,
                                       VolumeState.DELETED])
                  test_vol.detach()
                  test_vol.wait_for(
                      [VolumeState.AVAILABLE],
                      terminal_states=[VolumeState.ERROR,
                                       VolumeState.DELETED])

      @helpers.skipIfNoService(['storage.volumes'])
      def test_volume_properties(self):
          """
          Test volume properties.

          Checks the properties of a freshly created volume, then the
          attachment information while it is attached to a test instance,
          and finally the label, description and attachments after it has
          been detached and relabelled.
          """
          label = "cb-volprops-{0}".format(helpers.get_uuid())
          vol_desc = 'newvoldesc1'
          # Declare this variable up front: late binding lets the cleanup
          # lambda see whatever value it holds when cleanup finally runs.
          test_instance = None
          with helpers.cleanup_action(
                  lambda: helpers.cleanup_test_resources(test_instance)):
              subnet = helpers.get_or_create_default_subnet(self.provider)
              test_instance = helpers.get_test_instance(
                  self.provider, label, subnet=subnet)

              test_vol = self.provider.storage.volumes.create(
                  label, 1, test_instance.zone_id, description=vol_desc)
              with helpers.cleanup_action(lambda: test_vol.delete()):
                  test_vol.wait_till_ready()

                  # --- properties of the new, unattached volume ---
                  self.assertTrue(
                      isinstance(test_vol.size, six.integer_types) and
                      test_vol.size >= 0,
                      "Volume.size must be a non-negative integer, "
                      "but got %s" % test_vol.size)
                  self.assertTrue(
                      test_vol.description is None or
                      isinstance(test_vol.description, six.string_types),
                      "Volume.description must be None or a string. Got: %s"
                      % test_vol.description)
                  # The volume was not created from a snapshot.
                  self.assertIsNone(test_vol.source)
                  self.assertIsNotNone(test_vol.create_time)
                  self.assertIsNotNone(test_vol.zone_id)
                  self.assertIsNone(test_vol.attachments)

                  # --- attachment information while attached ---
                  test_vol.attach(test_instance, '/dev/sda2')
                  test_vol.wait_for(
                      [VolumeState.IN_USE],
                      terminal_states=[VolumeState.ERROR,
                                       VolumeState.DELETED])
                  self.assertIsNotNone(test_vol.attachments)
                  self.assertIsInstance(test_vol.attachments, AttachmentInfo)
                  self.assertEqual(test_vol.attachments.volume, test_vol)
                  self.assertEqual(test_vol.attachments.instance_id,
                                   test_instance.id)
                  # NOTE(review): Azure is exempt from the device check,
                  # presumably because it does not report the requested
                  # device path -- confirm.
                  if self.provider.PROVIDER_ID != 'azure':
                      self.assertEqual(test_vol.attachments.device,
                                       "/dev/sda2")

                  # --- state after detaching and relabelling ---
                  test_vol.detach()
                  test_vol.label = 'newvolname1'
                  test_vol.wait_for(
                      [VolumeState.AVAILABLE],
                      terminal_states=[VolumeState.ERROR,
                                       VolumeState.DELETED])
                  self.assertEqual(test_vol.label, 'newvolname1')
                  self.assertEqual(test_vol.description, vol_desc)
                  self.assertIsNone(test_vol.attachments)

      @helpers.skipIfNoService(['storage.snapshots'])
      def test_crud_snapshot(self):
          """
          Create a new volume and run the standard CRUD checks on snapshots
          of it, once creating the snapshot through the volume and once
          through the snapshot service. Delete everything afterwards.
          """
          label = "cb-crudsnap-{0}".format(helpers.get_uuid())
          test_vol = self.provider.storage.volumes.create(
              label, 1,
              helpers.get_provider_test_data(self.provider, "placement"))
          with helpers.cleanup_action(lambda: test_vol.delete()):
              test_vol.wait_till_ready()

              def create_snap(label):
                  # Snapshot creation via the Volume object.
                  return test_vol.create_snapshot(label=label,
                                                  description=label)

              def cleanup_snap(snap):
                  # Nothing to clean up if creation never produced a
                  # snapshot.
                  if snap:
                      snap.delete()
                      snap.wait_for([SnapshotState.UNKNOWN],
                                    terminal_states=[SnapshotState.ERROR])

              sit.check_crud(self, self.provider.storage.snapshots, Snapshot,
                             "cb-snap", create_snap, cleanup_snap)

              # Test creation of a snap via SnapshotService
              def create_snap2(label):
                  return self.provider.storage.snapshots.create(
                      label=label, volume=test_vol, description=label)

              # Only when running against real AWS (not the mock provider):
              # pause between the two rounds, or the second one fails with
              # SnapshotCreationPerVolumeRateExceeded.
              if (self.provider.PROVIDER_ID == ProviderList.AWS and
                      not isinstance(self.provider, TestMockHelperMixin)):
                  time.sleep(15)
              sit.check_crud(self, self.provider.storage.snapshots, Snapshot,
                             "cb-snaptwo", create_snap2, cleanup_snap)

      @helpers.skipIfNoService(['storage.snapshots'])
      def test_snapshot_properties(self):
          """
          Test snapshot properties.

          Creates a volume and a snapshot of it, checks the snapshot's
          size, description, source volume id and creation time, updates
          its label and description, and finally creates volumes from the
          snapshot both through the volume service and through the
          snapshot itself.
          """
          label = "cb-snapprop-{0}".format(helpers.get_uuid())
          test_vol = self.provider.storage.volumes.create(
              label, 1,
              helpers.get_provider_test_data(self.provider, "placement"))
          with helpers.cleanup_action(lambda: test_vol.delete()):
              test_vol.wait_till_ready()
              snap_label = "cb-snap-{0}".format(label)
              test_snap = test_vol.create_snapshot(label=snap_label,
                                                   description=snap_label)

              def cleanup_snap(snap):
                  # Nothing to clean up if creation never produced a
                  # snapshot.
                  if snap:
                      snap.delete()
                      snap.wait_for([SnapshotState.UNKNOWN],
                                    terminal_states=[SnapshotState.ERROR])

              with helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
                  test_snap.wait_till_ready()
                  self.assertTrue(
                      isinstance(test_vol.size, six.integer_types))
                  self.assertEqual(
                      test_snap.size, test_vol.size,
                      "Snapshot.size must match original volume's size: %s"
                      " but is: %s" % (test_vol.size, test_snap.size))
                  # Inspect the snapshot itself here, as the failure
                  # message states, rather than the source volume.
                  self.assertTrue(
                      test_snap.description is None or
                      isinstance(test_snap.description, six.string_types),
                      "Snapshot.description must be None or a string. "
                      "Got: %s" % test_snap.description)
                  self.assertEqual(test_vol.id, test_snap.volume_id)
                  self.assertIsNotNone(test_snap.create_time)

                  # Updated label/description must be visible after a
                  # refresh.
                  test_snap.label = 'snapnewname1'
                  test_snap.description = 'snapnewdescription1'
                  test_snap.refresh()
                  self.assertEqual(test_snap.label, 'snapnewname1')
                  self.assertEqual(test_snap.description,
                                   'snapnewdescription1')

                  # Test volume creation from a snapshot (via VolumeService)
                  sv_label = "cb-snapvol-{0}".format(test_snap.name)
                  snap_vol = self.provider.storage.volumes.create(
                      sv_label, 1,
                      helpers.get_provider_test_data(self.provider,
                                                     "placement"),
                      snapshot=test_snap)
                  with helpers.cleanup_action(lambda: snap_vol.delete()):
                      snap_vol.wait_till_ready()

                  # Test volume creation from a snapshot (via Snapshot)
                  snap_vol2 = test_snap.create_volume(
                      helpers.get_provider_test_data(self.provider,
                                                     "placement"))
                  with helpers.cleanup_action(lambda: snap_vol2.delete()):
                      snap_vol2.wait_till_ready()