/* Copyright (C) 2017 Cloudbase Solutions SRL This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ // @flow import moment from 'moment' import Api from '../utils/ApiCaller' import { OptionsSchemaPlugin } from '../plugins/endpoint' import { servicesUrl } from '../constants' import type { MainItem, UpdateData } from '../types/MainItem' import type { Execution } from '../types/Execution' import type { Endpoint } from '../types/Endpoint' import type { Task, ProgressUpdate } from '../types/Task' import type { Field } from '../types/Field' export const sortTasks = (tasks: Task[], taskUpdatesSortFunction: (updates: ProgressUpdate[]) => void) => { if (!tasks) { return } let sortedTasks = [] let buffer = [] let runningBuffer = [] let completedBuffer = [] tasks.forEach(task => { taskUpdatesSortFunction(task.progress_updates) buffer.push(task) if (task.status === 'RUNNING') { runningBuffer.push(task) } else if (task.status === 'COMPLETED' || task.status === 'ERROR') { completedBuffer.push(task) } else { if (runningBuffer.length >= 2) { sortedTasks = sortedTasks.concat([...completedBuffer, ...runningBuffer, task]) } else { sortedTasks = sortedTasks.concat([...buffer]) } buffer = [] runningBuffer = [] completedBuffer = [] } }) if (buffer.length) { if (runningBuffer.length >= 2) { sortedTasks = sortedTasks.concat([...completedBuffer, ...runningBuffer]) } else { sortedTasks = sortedTasks.concat([...buffer]) } } tasks.splice(0, tasks.length, ...sortedTasks) } class ReplicaSourceUtils { static filterDeletedExecutionsInReplicas(replicas) { return replicas.map(replica => { replica.executions = ReplicaSourceUtils.filterDeletedExecutions(replica.executions) return replica }) } static filterDeletedExecutions(executions) { if (!executions || !executions.length) { return executions } return executions.filter(execution => execution.deleted_at == null) } static sortReplicas(replicas) { if (replicas.length === 1) { ReplicaSourceUtils.sortExecutions(replicas[0].executions) return } replicas.sort((a, b) => { ReplicaSourceUtils.sortExecutions(a.executions) ReplicaSourceUtils.sortExecutions(b.executions) let aLastExecution = a.executions && a.executions.length ? a.executions[a.executions.length - 1] : null let bLastExecution = b.executions && b.executions.length ? b.executions[b.executions.length - 1] : null let aLastTime = aLastExecution ? aLastExecution.updated_at || aLastExecution.created_at : null let bLastTime = bLastExecution ? bLastExecution.updated_at || bLastExecution.created_at : null let aTime = aLastTime || a.updated_at || a.created_at let bTime = bLastTime || b.updated_at || b.created_at return moment(bTime).diff(moment(aTime)) }) } static sortExecutions(executions) { if (executions) { executions.sort((a, b) => a.number - b.number) } } static sortExecutionsAndTasks(executions) { this.sortExecutions(executions) executions.forEach(execution => { sortTasks(execution.tasks, ReplicaSourceUtils.sortTaskUpdates) }) } static sortTaskUpdates(updates) { if (!updates) { return } updates.sort((a, b) => moment(a.created_at).toDate().getTime() - moment(b.created_at).toDate().getTime()) } } class ReplicaSource { async getReplicas(skipLog?: boolean): Promise { let response = await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/detail`, skipLog, }) let replicas = response.data.replicas replicas = ReplicaSourceUtils.filterDeletedExecutionsInReplicas(replicas) ReplicaSourceUtils.sortReplicas(replicas) return replicas } async getReplicaExecutions(replicaId: string, skipLog?: boolean): Promise { let response = await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replicaId}/executions/detail`, skipLog, }) let executions = response.data.executions ReplicaSourceUtils.sortExecutionsAndTasks(executions) return executions } async getReplica(replicaId: string, skipLog?: boolean): Promise { let response = await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replicaId}`, skipLog, }) let replica = response.data.replica replica.executions = ReplicaSourceUtils.filterDeletedExecutions(replica.executions) ReplicaSourceUtils.sortExecutions(replica.executions) return replica } async execute(replicaId: string, fields?: Field[]): Promise { let payload = { execution: { shutdown_instances: false } } if (fields) { fields.forEach(f => { payload.execution[f.name] = f.value || false }) } let response = await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replicaId}/executions`, method: 'POST', data: payload, }) let execution = response.data.execution sortTasks(execution.tasks, ReplicaSourceUtils.sortTaskUpdates) return execution } async cancelExecution(replicaId: string, executionId: string, force: ?boolean): Promise { let data: any = { cancel: null } if (force) { data.cancel = { force: true } } await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replicaId}/executions/${executionId}/actions`, method: 'POST', data, }) return replicaId } async deleteExecution(replicaId: string, executionId: string): Promise { await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replicaId}/executions/${executionId}`, method: 'DELETE', }) return replicaId } async delete(replicaId: string): Promise { await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replicaId}`, method: 'DELETE', }) return replicaId } async deleteDisks(replicaId: string): Promise { let response = await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replicaId}/actions`, method: 'POST', data: { 'delete-disks': null }, }) return response.data.execution } async update(replica: MainItem, destinationEndpoint: Endpoint, updateData: UpdateData, defaultStorage: ?string, storageConfigDefault: string): Promise { const parser = OptionsSchemaPlugin[destinationEndpoint.type] || OptionsSchemaPlugin.default let payload = { replica: {} } if (updateData.network.length > 0) { payload.replica.network_map = parser.getNetworkMap(updateData.network) } if (Object.keys(updateData.destination).length > 0) { payload.replica.destination_environment = parser.getDestinationEnv(updateData.destination, replica.destination_environment) } if (Object.keys(updateData.source).length > 0) { payload.replica.source_environment = parser.getDestinationEnv(updateData.source, replica.source_environment) } if (defaultStorage || updateData.storage.length > 0) { payload.replica.storage_mappings = parser.getStorageMap(defaultStorage, updateData.storage, storageConfigDefault) } let response = await Api.send({ url: `${servicesUrl.coriolis}/${Api.projectId}/replicas/${replica.id}`, method: 'PUT', data: payload, }) return response.data } } export default new ReplicaSource()