# replicator.py
  1. # Copyright 2019 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import json
  4. import os
  5. import shutil
  6. import tempfile
  7. import time
  8. from oslo_config import cfg
  9. from oslo_log import log as logging
  10. from oslo_utils import units
  11. import paramiko
  12. from sshtunnel import SSHTunnelForwarder
  13. from coriolis import exception
  14. from coriolis.providers import provider_utils
  15. from coriolis import utils
  16. LOG = logging.getLogger(__name__)
  17. HASH_METHOD_SHA256 = "sha256"
  18. HASH_METHOD_XXHASH = "xxhash"
  19. REPLICATOR_PATH = "/usr/bin/replicator"
  20. REPLICATOR_DIR = "/etc/coriolis-replicator"
  21. REPLICATOR_STATE = "/tmp/replicator_state.json"
  22. REPLICATOR_USERNAME = "replicator"
  23. REPLICATOR_GROUP_NAME = "replicator"
  24. REPLICATOR_SVC_NAME = "coriolis-replicator"
  25. DEFAULT_REPLICATOR_PORT = 4433
  26. replicator_opts = [
  27. cfg.IntOpt('port',
  28. default=DEFAULT_REPLICATOR_PORT,
  29. help='The replicator TCP port.'),
  30. cfg.IntOpt('default_requests_timeout',
  31. default=60,
  32. help='Number of seconds for HTTP request timeouts.'),
  33. ]
  34. CONF = cfg.CONF
  35. CONF.register_opts(replicator_opts, 'replicator')
  36. class Client(object):
  37. def __init__(self, ip, port, credentials, ssh_conn_info,
  38. event_handler, use_compression=False,
  39. use_tunnel=False):
  40. self._ip = ip
  41. self._use_tunnel = use_tunnel
  42. self._port = port
  43. self._event_manager = event_handler
  44. self._creds = credentials
  45. self._ssh_conn_info = ssh_conn_info
  46. self._use_compression = use_compression
  47. self._cli = self._get_session()
  48. self._tunnel = None
  49. self._ip_via_tunnel = None
  50. self._port_via_tunnel = None
  51. self._test_connection()
  52. @property
  53. def repl_host(self):
  54. if self._ip_via_tunnel is not None:
  55. return self._ip_via_tunnel
  56. return self._ip
  57. @property
  58. def repl_port(self):
  59. if self._port_via_tunnel is not None:
  60. return self._port_via_tunnel
  61. return self._port
  62. @property
  63. def _base_uri(self):
  64. return "https://%s:%s" % (self.repl_host, self.repl_port)
  65. def _setup_tunnel_connection(self):
  66. self._tunnel = self._get_ssh_tunnel()
  67. self._tunnel.start()
  68. host, port = self._tunnel.local_bind_address
  69. self._ip_via_tunnel = host
  70. self._port_via_tunnel = port
  71. def _test_connection(self):
  72. """
  73. Attempt to connect to the IP/port pair. If direct connection
  74. fails, set up a SSH tunnel and attempt a connection through that.
  75. """
  76. if self._use_tunnel:
  77. # It was explicitly requested to use a tunnel
  78. self._setup_tunnel_connection()
  79. else:
  80. self._event_manager.progress_update(
  81. "Testing direct connection to replicator (%s:%s)" % (
  82. self._ip, self._port))
  83. try:
  84. utils.wait_for_port_connectivity(
  85. self._ip, self._port, max_wait=30)
  86. return
  87. except BaseException as err:
  88. LOG.debug("failed to connect to %s:%s Error: %s "
  89. "Trying tunneled connection" % (
  90. self._ip, self._port, err))
  91. self._event_manager.progress_update(
  92. "Direct connection to replicator failed. "
  93. "Setting up tunnel.")
  94. self._setup_tunnel_connection()
  95. self._event_manager.progress_update(
  96. "Testing connection to replicator (%s:%s)" % (
  97. self.repl_host, self.repl_port))
  98. try:
  99. utils.wait_for_port_connectivity(
  100. self.repl_host, self.repl_port, max_wait=30)
  101. except BaseException as err:
  102. self._tunnel.stop()
  103. LOG.warning(
  104. "failed to connect to replicator: %s" % err)
  105. raise
  106. def _get_ssh_tunnel(self):
  107. """
  108. gets a SSH tunnel object. Note, this does not start the tunnel,
  109. it simply creates the object, without actually connecting.
  110. """
  111. remote_host = self._ssh_conn_info["hostname"]
  112. remote_user = self._ssh_conn_info["username"]
  113. local_host = "127.0.0.1"
  114. remote_port = self._ssh_conn_info.get("port", 22)
  115. pkey = self._ssh_conn_info.get("pkey")
  116. password = self._ssh_conn_info.get("password")
  117. if any([pkey, password]) is False:
  118. raise exception.CoriolisException(
  119. "Either password or pkey is required")
  120. server = SSHTunnelForwarder(
  121. (remote_host, remote_port),
  122. ssh_username=remote_user,
  123. ssh_pkey=pkey,
  124. ssh_password=password,
  125. # bind to remote replicator port
  126. remote_bind_address=(local_host, self._port),
  127. # select random port on this end.
  128. local_bind_address=(local_host, 0),
  129. )
  130. return server
  131. def raw_disk_uri(self, disk_name):
  132. diskUri = "%s/device/%s" % (self._base_uri, disk_name)
  133. return diskUri
  134. def _get_session(self):
  135. sess = provider_utils.ProviderSession()
  136. sess.cert = (
  137. self._creds["client_cert"],
  138. self._creds["client_key"])
  139. sess.verify = self._creds["ca_cert"]
  140. return sess
  141. @utils.retry_on_error()
  142. def get_status(self, device=None, brief=True):
  143. uri = "%s/api/v1/dev" % (self._base_uri)
  144. if device is not None:
  145. uri = "%s/%s/" % (uri, device)
  146. params = {
  147. "brief": brief,
  148. }
  149. status = self._cli.get(
  150. uri, params=params,
  151. timeout=CONF.replicator.default_requests_timeout)
  152. status.raise_for_status()
  153. return status.json()
  154. @utils.retry_on_error()
  155. def get_chunks(self, device, skip_zeros=False):
  156. uri = "%s/api/v1/dev/%s/chunks/" % (self._base_uri, device)
  157. params = {
  158. "skipZeros": skip_zeros,
  159. }
  160. chunks = self._cli.get(
  161. uri, params=params,
  162. timeout=CONF.replicator.default_requests_timeout)
  163. chunks.raise_for_status()
  164. return chunks.json()
  165. @utils.retry_on_error()
  166. def get_changes(self, device):
  167. uri = "%s/api/v1/dev/%s/chunks/changes/" % (self._base_uri, device)
  168. chunks = self._cli.get(
  169. uri, timeout=CONF.replicator.default_requests_timeout)
  170. chunks.raise_for_status()
  171. return chunks.json()
  172. @utils.retry_on_error()
  173. def get_disk_size(self, disk):
  174. diskUri = self.raw_disk_uri(disk)
  175. info = self._cli.head(
  176. diskUri, timeout=CONF.replicator.default_requests_timeout)
  177. info.raise_for_status()
  178. return int(info.headers["Content-Length"])
  179. @utils.retry_on_error()
  180. def download_chunk(self, disk, chunk):
  181. diskUri = self.raw_disk_uri(disk)
  182. offset = int(chunk["offset"])
  183. end = offset + int(chunk["length"]) - 1
  184. headers = {
  185. "Range": "bytes=%s-%s" % (offset, end),
  186. }
  187. if self._use_compression is False:
  188. headers["Accept-encoding"] = "identity"
  189. data = self._cli.get(
  190. diskUri, headers=headers,
  191. timeout=CONF.replicator.default_requests_timeout)
  192. data.raise_for_status()
  193. return data.content
  194. class Replicator(object):
  195. def __init__(self, conn_info, event_manager, volumes_info, replica_state,
  196. use_compression=None, ignore_mounted=True,
  197. hash_method=HASH_METHOD_SHA256, watch_devices=True,
  198. chunk_size=10485760, use_tunnel=False):
  199. self._event_manager = event_manager
  200. self._repl_state = replica_state
  201. self._conn_info = conn_info
  202. self._config_dir = None
  203. self._cert_dir = tempfile.mkdtemp()
  204. self._volumes_info = volumes_info
  205. self._use_compression = use_compression
  206. if self._use_compression is None:
  207. self._use_compression = CONF.compress_transfers
  208. self._watch_devices = watch_devices
  209. self._hash_method = hash_method
  210. self._ignore_mounted = ignore_mounted
  211. self._chunk_size = chunk_size
  212. self._ssh = self._setup_ssh()
  213. self._credentials = None
  214. self._cli = None
  215. self._use_tunnel = use_tunnel
  216. def __del__(self):
  217. if self._cert_dir is not None:
  218. utils.ignore_exceptions(
  219. shutil.rmtree)(self._cert_dir)
  220. def _init_replicator_client(self, credentials):
  221. """
  222. Helper function to create an instance of the replicator
  223. client.
  224. """
  225. ssh_conn_info = self._parse_source_ssh_conn_info(
  226. self._conn_info)
  227. args = self._parse_replicator_conn_info(
  228. self._conn_info)
  229. self._cli = Client(
  230. args["ip"], args["port"],
  231. credentials, ssh_conn_info,
  232. self._event_manager,
  233. use_compression=self._use_compression,
  234. use_tunnel=self._use_tunnel)
  235. def _setup_ssh(self):
  236. args = self._parse_source_ssh_conn_info(
  237. self._conn_info)
  238. ssh = self._get_ssh_client(args)
  239. return ssh
  240. def _reconnect_ssh(self):
  241. if self._ssh:
  242. utils.ignore_exceptions(self._ssh.close)()
  243. self._ssh = self._setup_ssh()
  244. return self._ssh
  245. def init_replicator(self):
  246. self._credentials = utils.retry_on_error()(
  247. self._setup_replicator)(self._ssh)
  248. utils.retry_on_error()(
  249. self._init_replicator_client)(self._credentials)
  250. LOG.debug(
  251. "Disk status after Replicator initialization: %s",
  252. self._cli.get_status(device=None, brief=True))
  253. def get_current_disks_status(self):
  254. """ Returns a list of the current status of the attached data disks.
  255. The root disk of the worker VM is NOT returned.
  256. The result is a list with elements of the following format:
  257. [{
  258. 'device-path': '/dev/xvdf',
  259. 'device-name': 'xvdf',
  260. 'size': 10737418240,
  261. 'checksum-algorithm': 'sha256',
  262. 'chunk-size': 10485760,
  263. 'logical-sector-size': 512,
  264. 'physical-sector-size': 512,
  265. 'alignment-offset': 0,
  266. 'has-mounted-partitions': False,
  267. 'checksum-status': {
  268. 'status': 'running',
  269. 'total-chunks': 1024,
  270. 'completed-chunks': 0,
  271. 'percentage': 0},
  272. 'partitions': [{
  273. 'name': 'xvdf1',
  274. 'sectors': 20969439,
  275. 'uuid': '73e9659d-2fd9-46ca-a341-5a8637c416ee',
  276. 'fs': 'ext4',
  277. 'start-sector': 2048,
  278. 'end-sector': 20971486,
  279. 'alignment-offset': 0}]
  280. }]
  281. """
  282. return self._cli.get_status()
  283. def attach_new_disk(
  284. self, disk_id, attachf, previous_disks_status=None,
  285. retry_period=30, retry_count=10):
  286. """ Returns the volumes_info for the disk attached by running
  287. `attachf`. This is achieved by comparing the disk statuses before and
  288. after the execution of the attachment operation.
  289. param disk_id: str(): the 'disk_id' of the info from self._volumes_info
  290. for the disk which shall be attached by `attachf()`.
  291. param attachf: fun(): argument-less function which attaches the disk.
  292. The function should perform any waits required to reasonably expect
  293. the worker OS to have noticed the new disk, or reboot the worker VM
  294. and re-run `init_replicator()` if deemed necessary.
  295. param previous_disks_status: dict(): previous status of the disks as
  296. returned by `get_current_disks_status()`.
  297. return: dict(): returns the volumes_info associated to the new disk.
  298. """
  299. # check if volume with given ID is present in self._volumes_info:
  300. matching_vols = [
  301. vol for vol in self._volumes_info
  302. if vol['disk_id'] == disk_id]
  303. if not matching_vols:
  304. raise exception.CoriolisException(
  305. "No information regarding volume with ID '%s'. "
  306. "Cannot track its attachment." % disk_id)
  307. if len(matching_vols) > 1:
  308. raise exception.CoriolisException(
  309. "Multiple volumes info with ID '%s' found: %s" % (
  310. disk_id, matching_vols))
  311. vol_info = matching_vols[0]
  312. # get/refresh current device paths:
  313. if not previous_disks_status:
  314. previous_disks_status = self._cli.get_status()
  315. LOG.debug(
  316. "Disks status pre-attachment of %s: %s",
  317. disk_id, previous_disks_status)
  318. previous_device_paths = [
  319. dev['device-path'] for dev in previous_disks_status]
  320. # run attachment function and get new device paths:
  321. attachf()
  322. # graciously wait for disk to appear:
  323. new_disks_status = None
  324. new_device_paths = None
  325. for i in range(retry_count):
  326. new_disks_status = self._cli.get_status()
  327. new_device_paths = [dev['device-path']
  328. for dev in new_disks_status]
  329. LOG.debug(
  330. "Polled devices while waiting for disk '%s' to attach "
  331. "(try %d/%d): %s", disk_id, i + 1, retry_count,
  332. new_device_paths)
  333. # check for missing/multiple new device paths:
  334. missing_device_paths = (
  335. set(previous_device_paths) - set(new_device_paths))
  336. if missing_device_paths:
  337. LOG.warn(
  338. "The following devices from the previous disk state qeury "
  339. "are no longer detected: %s", [
  340. dev for dev in previous_disks_status
  341. if dev['device-path'] in missing_device_paths])
  342. new_device_paths = set(
  343. new_device_paths) - set(previous_device_paths)
  344. if new_device_paths:
  345. break
  346. else:
  347. LOG.debug(
  348. "Sleeping %d seconds for disk '%s' to get attached.",
  349. retry_period, disk_id)
  350. time.sleep(retry_period)
  351. if not new_device_paths:
  352. raise exception.CoriolisException(
  353. "No new device paths have appeared after volume attachment of "
  354. "disk with ID: %s" % disk_id)
  355. if len(new_device_paths) > 1:
  356. raise exception.CoriolisException(
  357. "Multiple device paths have appeared after attachment of disk "
  358. "with ID %s: %s" % (
  359. disk_id,
  360. [dev for dev in new_disks_status
  361. if dev['device-path'] in new_device_paths]))
  362. # record the new 'disk_path' for the volume:
  363. vol_info['disk_path'] = new_device_paths.pop()
  364. LOG.debug(
  365. "New device following attachment of disk '%s': %s",
  366. disk_id, vol_info['disk_path'])
  367. return vol_info
  368. def wait_for_chunks(self):
  369. if self._cli is None:
  370. raise exception.CoriolisException(
  371. "replicator not initialized. Run init_replicator()")
  372. perc_steps = {}
  373. while True:
  374. status = self._cli.get_status()
  375. done = []
  376. for vol in status:
  377. devName = vol["device-path"]
  378. dev_size = vol['size'] / units.Mi
  379. perc_step = perc_steps.get(devName)
  380. if perc_step is None:
  381. perc_step = self._event_manager.add_percentage_step(
  382. "Performing chunking for disk %s (total size %.2f MB)"
  383. % (devName, dev_size), 100)
  384. perc_steps[devName] = perc_step
  385. perc_done = vol["checksum-status"]["percentage"]
  386. self._event_manager.set_percentage_step(
  387. perc_step, perc_done)
  388. done.append(int(perc_done) == 100)
  389. if all(done):
  390. break
  391. else:
  392. time.sleep(5)
  393. def start(self):
  394. utils.start_service(self._ssh, REPLICATOR_SVC_NAME)
  395. def stop(self):
  396. utils.stop_service(self._ssh, REPLICATOR_SVC_NAME)
  397. def restart(self):
  398. utils.restart_service(self._ssh, REPLICATOR_SVC_NAME)
  399. def update_state(self, state, restart=False):
  400. state_file = tempfile.mkstemp()[1]
  401. with open(state_file, 'w') as fp:
  402. json.dump(state, fp)
  403. self._copy_file(self._ssh, state_file, REPLICATOR_STATE)
  404. if restart:
  405. self.restart()
  406. self._cli._test_connection()
  407. @utils.retry_on_error()
  408. def _get_ssh_client(self, args):
  409. """
  410. gets a paramiko SSH client
  411. """
  412. try:
  413. ssh = paramiko.SSHClient()
  414. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  415. ssh.connect(**args)
  416. return ssh
  417. except paramiko.ssh_exception.SSHException as ex:
  418. raise exception.CoriolisException(
  419. "Failed to setup SSH client: %s" % str(ex)) from ex
  420. def _parse_source_ssh_conn_info(self, conn_info):
  421. # if we get valid SSH connection info we can
  422. # use it to copy the binary, and potentially
  423. # create a SSH tunnel through which we will
  424. # connect to the coriolis replicator
  425. required = ('ip', 'username')
  426. port = conn_info.get('port', 22)
  427. password = conn_info.get('password', None)
  428. pkey = conn_info.get('pkey', None)
  429. missing = [
  430. field for field in required
  431. if field not in conn_info]
  432. if missing:
  433. raise exception.CoriolisException(
  434. "Missing some required fields from source replication "
  435. "worker VM connection info: %s" % missing)
  436. if any([password, pkey]) is False:
  437. raise exception.CoriolisException(
  438. "Either 'password' or 'pkey' for source worker VM is required "
  439. "to initialize the Coriolis replicator.")
  440. if pkey:
  441. if type(pkey) is str:
  442. pkey = utils.deserialize_key(
  443. pkey, CONF.serialization.temp_keypair_password)
  444. args = {
  445. "hostname": conn_info["ip"],
  446. "username": conn_info["username"],
  447. "password": password,
  448. "pkey": pkey,
  449. "port": port,
  450. "banner_timeout": CONF.replicator.default_requests_timeout,
  451. }
  452. return args
  453. def _get_replicator_state_file(self):
  454. """
  455. Looks for replicator state in volumes_info. If none
  456. is found, replicator will be initialized without it.
  457. """
  458. # if state is not present, just return an empty array
  459. # saves us the trouble of an extra if during the setup
  460. # of the replicator process
  461. state = self._repl_state
  462. filename = tempfile.mkstemp()[1]
  463. with open(filename, 'w') as fp:
  464. json.dump(state, fp)
  465. return filename
  466. def _parse_replicator_conn_info(self, conn_info):
  467. # The IP should be the same one as the SSH IP.
  468. # Only the port will differ, and that is configurable.
  469. ip = conn_info.get("ip", None)
  470. return {
  471. "ip": ip,
  472. "port": CONF.replicator.port,
  473. }
  474. @utils.retry_on_error()
  475. def _copy_file(self, ssh, localPath, remotePath):
  476. tmp = tempfile.mktemp(dir="/tmp")
  477. sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
  478. sftp.put(localPath, tmp)
  479. utils.exec_ssh_cmd(
  480. ssh, "sudo mv %s %s" % (tmp, remotePath), get_pty=True)
  481. sftp.close()
  482. @utils.retry_on_error()
  483. def _copy_replicator_cmd(self, ssh):
  484. local_path = os.path.join(
  485. utils.get_resources_bin_dir(), 'replicator')
  486. self._copy_file(ssh, local_path, REPLICATOR_PATH)
  487. utils.exec_ssh_cmd(
  488. ssh, "sudo chmod +x %s" % REPLICATOR_PATH, get_pty=True)
  489. def _setup_replicator_group(self, ssh, group_name=REPLICATOR_GROUP_NAME):
  490. """ Sets up a group with the given name and adds the
  491. user we're connected as to it.
  492. Returns True if the group already existed, else False.
  493. """
  494. group_exists = utils.exec_ssh_cmd(
  495. ssh,
  496. "getent group %(group)s > /dev/null && echo 1 || echo 0" % {
  497. "group": REPLICATOR_GROUP_NAME})
  498. if int(group_exists) == 0:
  499. utils.exec_ssh_cmd(
  500. ssh, "sudo groupadd %s" % group_name, get_pty=True)
  501. # NOTE: this is required in order for the user we connected
  502. # as to be able to read the certs:
  503. # NOTE2: the group change will only take effect after we reconnect:
  504. utils.exec_ssh_cmd(
  505. ssh, "sudo usermod -aG %s %s" % (
  506. REPLICATOR_GROUP_NAME, self._conn_info['username']),
  507. get_pty=True)
  508. return int(group_exists) == 1
  509. def _setup_replicator_user(self, ssh):
  510. # check for and create replicator user:
  511. user_exists = utils.exec_ssh_cmd(
  512. ssh,
  513. "getent passwd %(user)s > /dev/null && echo 1 || echo 0" % {
  514. "user": REPLICATOR_USERNAME})
  515. if int(user_exists) == 0:
  516. utils.exec_ssh_cmd(
  517. ssh, "sudo useradd -m -s /bin/bash -g %s %s" % (
  518. REPLICATOR_GROUP_NAME, REPLICATOR_USERNAME),
  519. get_pty=True)
  520. utils.exec_ssh_cmd(
  521. ssh, "sudo usermod -aG disk %s" % REPLICATOR_USERNAME,
  522. get_pty=True)
  523. @utils.retry_on_error()
  524. def _exec_replicator(self, ssh, port, certs, state_file):
  525. cmdline = ("%(replicator_path)s run -hash-method=%(hash_method)s "
  526. "-ignore-mounted-disks=%(ignore_mounted)s "
  527. "-listen-port=%(listen_port)s "
  528. "-chunk-size=%(chunk_size)s "
  529. "-watch-devices=%(watch_devs)s "
  530. "-state-file=%(state_file)s "
  531. "-ca-cert=%(ca_cert)s -cert=%(srv_cert)s "
  532. "-key=%(srv_key)s" % {
  533. "replicator_path": REPLICATOR_PATH,
  534. "hash_method": self._hash_method,
  535. "ignore_mounted": json.dumps(self._ignore_mounted),
  536. "watch_devs": json.dumps(self._watch_devices),
  537. "listen_port": str(port),
  538. "state_file": state_file,
  539. "chunk_size": self._chunk_size,
  540. "ca_cert": certs["ca_crt"],
  541. "srv_cert": certs["srv_crt"],
  542. "srv_key": certs["srv_key"],
  543. })
  544. utils.create_service(
  545. ssh, cmdline, REPLICATOR_SVC_NAME,
  546. run_as=REPLICATOR_USERNAME)
  547. def _fetch_remote_file(self, ssh, remote_file, local_file):
  548. # TODO(gsamfira): make this re-usable
  549. with open(local_file, 'wb') as fd:
  550. data = utils.retry_on_error()(
  551. utils.read_ssh_file)(ssh, remote_file)
  552. fd.write(data)
  553. @utils.retry_on_error(sleep_seconds=5)
  554. def _setup_certificates(self, ssh, args):
  555. # TODO(gsamfira): coriolis-replicator and coriolis-writer share
  556. # the functionality of being able to generate certificates
  557. # This will either be replaced with proper certificate management
  558. # in Coriolis, and the needed files will be pushed to the services
  559. # that need them (userdata or ssh), or the two applications will be
  560. # merged into one, and we will deduplicate this functionallity.
  561. remote_base_dir = REPLICATOR_DIR
  562. ip = args["ip"]
  563. ca_crt_name = "ca-cert.pem"
  564. client_crt_name = "client-cert.pem"
  565. client_key_name = "client-key.pem"
  566. srv_crt_name = "srv-cert.pem"
  567. srv_key_name = "srv-key.pem"
  568. remote_ca_crt = os.path.join(remote_base_dir, ca_crt_name)
  569. remote_client_crt = os.path.join(remote_base_dir, client_crt_name)
  570. remote_client_key = os.path.join(remote_base_dir, client_key_name)
  571. remote_srv_crt = os.path.join(remote_base_dir, srv_crt_name)
  572. remote_srv_key = os.path.join(remote_base_dir, srv_key_name)
  573. ca_crt = os.path.join(self._cert_dir, ca_crt_name)
  574. client_crt = os.path.join(self._cert_dir, client_crt_name)
  575. client_key = os.path.join(self._cert_dir, client_key_name)
  576. exist = []
  577. for i in (remote_ca_crt, remote_client_crt, remote_client_key,
  578. remote_srv_crt, remote_srv_key):
  579. exist.append(utils.test_ssh_path(ssh, i))
  580. force_fetch = False
  581. if not all(exist):
  582. utils.exec_ssh_cmd(
  583. ssh, "sudo mkdir -p %s" % remote_base_dir, get_pty=True)
  584. utils.exec_ssh_cmd(
  585. ssh,
  586. "sudo %(replicator_cmd)s gen-certs -output-dir "
  587. "%(cert_dir)s -certificate-hosts 127.0.0.1,%(extra_hosts)s" % {
  588. "replicator_cmd": REPLICATOR_PATH,
  589. "cert_dir": remote_base_dir,
  590. "extra_hosts": ip,
  591. },
  592. get_pty=True)
  593. utils.exec_ssh_cmd(
  594. ssh, "sudo chown -R %(user)s:%(group)s %(cert_dir)s" % {
  595. "cert_dir": remote_base_dir,
  596. "user": REPLICATOR_USERNAME,
  597. "group": REPLICATOR_GROUP_NAME
  598. }, get_pty=True)
  599. utils.exec_ssh_cmd(
  600. ssh, "sudo chmod -R g+r %(cert_dir)s" % {
  601. "cert_dir": remote_base_dir,
  602. }, get_pty=True)
  603. force_fetch = True
  604. exists = []
  605. for i in (ca_crt, client_crt, client_key):
  606. exists.append(os.path.isfile(i))
  607. if not all(exists) or force_fetch:
  608. # certificates either are missing, or have been regenerated
  609. # on the writer worker. We need to fetch them.
  610. self._fetch_remote_file(ssh, remote_ca_crt, ca_crt)
  611. self._fetch_remote_file(ssh, remote_client_crt, client_crt)
  612. self._fetch_remote_file(ssh, remote_client_key, client_key)
  613. return {
  614. "local": {
  615. "client_cert": client_crt,
  616. "client_key": client_key,
  617. "ca_cert": ca_crt,
  618. },
  619. "remote": {
  620. "srv_crt": remote_srv_crt,
  621. "srv_key": remote_srv_key,
  622. "ca_crt": remote_ca_crt,
  623. },
  624. }
  625. def _change_binary_se_context(self, ssh):
  626. cmd = "sudo chcon -t bin_t %s" % REPLICATOR_PATH
  627. try:
  628. utils.exec_ssh_cmd(ssh, cmd, get_pty=True)
  629. except exception.CoriolisException:
  630. LOG.warn("Could not change SELinux context of replicator binary. "
  631. "Error was:%s", utils.get_exception_details())
  632. @utils.retry_on_error()
  633. def _setup_replicator(self, ssh):
  634. # copy the binary, set up the service, generate certificates,
  635. # start service
  636. state_file = self._get_replicator_state_file()
  637. self._copy_file(ssh, state_file, REPLICATOR_STATE)
  638. utils.exec_ssh_cmd(
  639. ssh, "sudo chmod 755 %s" % REPLICATOR_STATE, get_pty=True)
  640. os.remove(state_file)
  641. args = self._parse_replicator_conn_info(self._conn_info)
  642. self._copy_replicator_cmd(ssh)
  643. self._change_binary_se_context(ssh)
  644. group_existed = self._setup_replicator_group(
  645. ssh, group_name=REPLICATOR_GROUP_NAME)
  646. if not group_existed:
  647. # NOTE: we must reconnect so that our user being added to the new
  648. # Replicator group can take effect:
  649. ssh = self._reconnect_ssh()
  650. self._setup_replicator_user(ssh)
  651. certs = self._setup_certificates(ssh, args)
  652. self._exec_replicator(
  653. ssh, args["port"], certs["remote"], REPLICATOR_STATE)
  654. self.start()
  655. return certs["local"]
  656. def _get_size_from_chunks(self, chunks):
  657. ret = 0
  658. for chunk in chunks:
  659. ret += chunk["length"]
  660. return ret / units.Mi
  661. def _find_vol_state(self, name, state):
  662. for vol in state:
  663. if vol["device-name"] == name:
  664. return vol
  665. return None
  666. def replicate_disks(self, source_volumes_info, backup_writer):
  667. """
  668. Fetch the block diff and send it to the backup_writer.
  669. If the target_is_zeroed parameter is set to True, on initial
  670. sync, zero blocks will be skipped. On subsequent syncs, even
  671. zero blocks will be synced, as we cannot tell if those zeros
  672. are part of a file or not.
  673. source_volumes_info should be of the following format:
  674. [
  675. {
  676. "disk_id": the_provider_ID_of_the_volume,
  677. "disk_path": /dev/sdb,
  678. },
  679. ]
  680. """
  681. LOG.warning("Source volumes info is: %r" % source_volumes_info)
  682. state = self._repl_state
  683. isInitial = False
  684. if state is None or len(state) == 0:
  685. isInitial = True
  686. curr_state = self._cli.get_status(brief=False)
  687. for volume in source_volumes_info:
  688. dst_vol_idx = None
  689. for idx, vol in enumerate(self._volumes_info):
  690. if vol["disk_id"] == volume["disk_id"]:
  691. dst_vol_idx = idx
  692. break
  693. if dst_vol_idx is None:
  694. raise exception.CoriolisException(
  695. "failed to find a coresponding volume in volumes_info"
  696. " for %s" % volume["disk_id"])
  697. dst_vol = self._volumes_info[dst_vol_idx]
  698. devName = volume["disk_path"]
  699. if devName.startswith('/dev'):
  700. devName = devName[5:]
  701. state_for_vol = self._find_vol_state(devName, curr_state)
  702. if isInitial and dst_vol.get("zeroed", False) is True:
  703. # This is an initial sync of the disk, and we can
  704. # skip zero chunks
  705. chunks = self._cli.get_chunks(
  706. devName, skip_zeros=True)
  707. else:
  708. # subsequent sync. Get changes.
  709. chunks = self._cli.get_changes(devName)
  710. if not chunks:
  711. self._event_manager.progress_update(
  712. "No new chunks to replicate for disk \"%s\" (%s)" % (
  713. volume['disk_id'], devName))
  714. self._repl_state = curr_state
  715. return self._repl_state
  716. size = self._get_size_from_chunks(chunks)
  717. msg = (
  718. "Replicating changed data for disk \"%s\" (device \"%s\", "
  719. "written chunks: %.2f MB)") % (
  720. volume["disk_id"], devName, size)
  721. perc_step = self._event_manager.add_percentage_step(
  722. msg, len(chunks))
  723. total = 0
  724. with backup_writer.open("", volume['disk_id']) as destination:
  725. for chunk in chunks:
  726. offset = int(chunk["offset"])
  727. destination.seek(offset)
  728. data = self._cli.download_chunk(devName, chunk)
  729. destination.write(data)
  730. total += 1
  731. self._event_manager.set_percentage_step(
  732. perc_step, total)
  733. dst_vol["replica_state"] = state_for_vol
  734. self._repl_state = curr_state
  735. return self._repl_state
  736. def _download_full_disk(self, disk, path):
  737. self._event_manager.progress_update(
  738. "Downloading %s as thick file" % disk)
  739. diskUri = self._cli.raw_disk_uri(disk)
  740. size = self._cli.get_disk_size(disk)
  741. perc_step = self._event_manager.add_percentage_step(
  742. "Downloading full disk /dev/%s" % disk, size)
  743. total = 0
  744. with self._cli._cli.get(diskUri, stream=True,
  745. timeout=(CONF.replicator.
  746. default_requests_timeout)) as dw:
  747. with open(path, 'wb') as dsk:
  748. for chunk in dw.iter_content(chunk_size=self._chunk_size):
  749. if chunk:
  750. writen = dsk.write(chunk)
  751. total += writen
  752. self._event_manager.set_percentage_step(
  753. perc_step, total)
  754. def _download_sparse_disk(self, disk, path, chunks):
  755. self._event_manager.progress_update(
  756. "Downloading %s as sparse file" % disk)
  757. size = self._cli.get_disk_size(disk)
  758. size_from_chunks = self._get_size_from_chunks(chunks)
  759. total = 0
  760. with open(path, 'wb') as fp:
  761. # create sparse file
  762. fp.truncate(size)
  763. perc_step = self._event_manager.add_percentage_step(
  764. "Downloading spart disk /dev/%s (%s MB)" % (
  765. disk, size_from_chunks), len(chunks))
  766. for chunk in chunks:
  767. offset = int(chunk["offset"])
  768. # seek to offset
  769. fp.seek(offset)
  770. data = self._cli.download_chunk(disk, chunk)
  771. fp.write(data)
  772. total += 1
  773. self._event_manager.set_percentage_step(
  774. perc_step, total)
  775. def download_disk(self, disk, path):
  776. """
  777. Download the disk from source, into path. If client calls
  778. wait_for_chunks() before executing this function, a sparse
  779. file will be created with written chunks, otherwise, the result
  780. will be a thickly provisioned disk file.
  781. """
  782. diskName = disk
  783. if diskName.startswith("/dev"):
  784. # just get the name
  785. diskName = diskName[5:]
  786. chunks = self._cli.get_chunks(
  787. device=diskName, skip_zeros=True)
  788. if len(chunks) == 0:
  789. self._download_full_disk(diskName, path)
  790. else:
  791. self._download_sparse_disk(diskName, path, chunks)