|
|
@@ -133,76 +133,6 @@ class WorkerServerEndpoint(object):
|
|
|
self._rpc_conductor_client.confirm_task_cancellation(
|
|
|
ctxt, task_id, msg)
|
|
|
|
|
|
- def _deploy_after_execution(
|
|
|
- self, ctxt, deployment, transfer_id, execution_id, **kwargs):
|
|
|
- ctxt.trust_id = None
|
|
|
- active_statuses = [
|
|
|
- constants.EXECUTION_STATUS_UNEXECUTED,
|
|
|
- constants.EXECUTION_STATUS_RUNNING,
|
|
|
- constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS
|
|
|
- ]
|
|
|
- error_statuses = [
|
|
|
- constants.EXECUTION_STATUS_ERROR,
|
|
|
- constants.EXECUTION_STATUS_DEADLOCKED,
|
|
|
- constants.EXECUTION_STATUS_CANCELED,
|
|
|
- constants.EXECUTION_STATUS_CANCELLING,
|
|
|
- constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
|
|
|
- constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS,
|
|
|
- ]
|
|
|
- LOG.debug(f"Waiting for deployer '{execution_id}' to complete.")
|
|
|
- while True:
|
|
|
- execution = (
|
|
|
- self._rpc_conductor_client.get_transfer_tasks_execution(
|
|
|
- ctxt, transfer_id, execution_id))
|
|
|
- ex_status = execution['status']
|
|
|
- LOG.debug(
|
|
|
- f"Deployer '{execution_id}' status is {ex_status}")
|
|
|
- if ex_status in active_statuses:
|
|
|
- time.sleep(5)
|
|
|
- elif ex_status == constants.EXECUTION_STATUS_COMPLETED:
|
|
|
- LOG.debug(f"Confirming deployer '{execution_id}' completed.")
|
|
|
- return self._rpc_conductor_client.confirm_deployer_completed(
|
|
|
- ctxt, deployment['id'], **kwargs)
|
|
|
- else:
|
|
|
- if ex_status in error_statuses:
|
|
|
- raise exception.InvalidTransferState(
|
|
|
- f"Got status '{ex_status}' for execution with "
|
|
|
- "ID '{execution.id}'. Deployment cannot occur.")
|
|
|
- else:
|
|
|
- raise exception.InvalidTransferState(
|
|
|
- f"Unknown status '{ex_status}' of execution "
|
|
|
- f"with ID '{execution['id']}'. Deployment cannot "
|
|
|
- f"occur.")
|
|
|
-
|
|
|
- def _execute_auto_deployment(
|
|
|
- self, ctxt, transfer_id, deployer_execution, **kwargs):
|
|
|
- deployment = None
|
|
|
- try:
|
|
|
- LOG.debug(
|
|
|
- f"Creating deployment for deployer ID '{deployer_execution}' "
|
|
|
- f"of transfer '{transfer_id}'")
|
|
|
- deployment = self._rpc_conductor_client.deploy_transfer_instances(
|
|
|
- ctxt, transfer_id, wait_for_execution=deployer_execution,
|
|
|
- **kwargs)
|
|
|
- self._deploy_after_execution(
|
|
|
- ctxt, deployment, transfer_id, deployer_execution, **kwargs)
|
|
|
- except BaseException as ex:
|
|
|
- LOG.warning(
|
|
|
- f"Error occurred while waiting for deployer "
|
|
|
- f"'{deployer_execution}'. Error was: "
|
|
|
- f"{utils.get_exception_details()}")
|
|
|
- if deployment:
|
|
|
- LOG.debug(
|
|
|
- f"Reporting deployer '{deployer_execution}' failure")
|
|
|
- return self._rpc_conductor_client.report_deployer_failure(
|
|
|
- ctxt, deployment['id'], str(ex))
|
|
|
-
|
|
|
- def execute_auto_deployment(
|
|
|
- self, ctxt, transfer_id, deployer_execution, **kwargs):
|
|
|
- eventlet.spawn(
|
|
|
- self._execute_auto_deployment, ctxt, transfer_id,
|
|
|
- deployer_execution, **kwargs)
|
|
|
-
|
|
|
def _handle_mp_log_events(self, p, mp_log_q):
|
|
|
while True:
|
|
|
try:
|