Просмотр исходного кода

broken: Started implementation of websocket for getting all job runs

jnfrati 4 лет назад
Родитель
Сommit
8a07d22c8c
2 измененных файлов с 134 добавлено и 80 удалено
  1. 134 65
      dashboard/src/main/home/cluster-dashboard/chart/ChartList.tsx
  2. 0 15
      dashboard/src/shared/api.tsx

+ 134 - 65
dashboard/src/main/home/cluster-dashboard/chart/ChartList.tsx

@@ -1,4 +1,4 @@
-import React, { useContext, useEffect, useMemo, useState } from "react";
+import React, { useContext, useEffect, useMemo, useRef, useState } from "react";
 import styled from "styled-components";
 import styled from "styled-components";
 import _, { differenceBy, differenceWith, isEqual } from "lodash";
 import _, { differenceBy, differenceWith, isEqual } from "lodash";
 
 
@@ -15,7 +15,7 @@ import { PorterUrl } from "shared/routing";
 
 
 import Chart from "./Chart";
 import Chart from "./Chart";
 import Loading from "components/Loading";
 import Loading from "components/Loading";
-import { useWebsockets } from "shared/hooks/useWebsockets";
+import { NewWebsocketOptions, useWebsockets } from "shared/hooks/useWebsockets";
 import { usePrevious } from "shared/hooks/usePrevious";
 import { usePrevious } from "shared/hooks/usePrevious";
 
 
 type Props = {
 type Props = {
@@ -48,6 +48,8 @@ const ChartList: React.FunctionComponent<Props> = ({
     closeAllWebsockets,
     closeAllWebsockets,
   } = useWebsockets();
   } = useWebsockets();
   const [charts, setCharts] = useState<ChartType[]>([]);
   const [charts, setCharts] = useState<ChartType[]>([]);
+  const tmpJobRuns = useRef([]);
+  const lastStreamStatus = useRef("");
 
 
   const previousCharts = usePrevious<ChartType[]>(charts, []);
   const previousCharts = usePrevious<ChartType[]>(charts, []);
 
 
@@ -276,64 +278,130 @@ const ChartList: React.FunctionComponent<Props> = ({
     openWebsocket(websocketID);
     openWebsocket(websocketID);
   };
   };
 
 
-  const getLatestJobRunFromRelease = async (chart: ChartType) => {
-    try {
-      const job = await api
-        .getLatestJobRunFromRelease(
-          "<token>",
-          {},
-          {
-            project_id: context.currentProject.id,
-            cluster_id: context.currentCluster.id,
-            namespace: chart.namespace,
-            release_name: chart.name,
-          }
-        )
-        .then((res) => res.data);
-
-      const chartName = job.metadata.labels["app.kubernetes.io/instance"];
-      const chartNamespace = job.metadata.namespace;
-      const key = getChartKey(chartName, chartNamespace);
-
-      setJobStatus((currentStatus) => {
-        let nextStatus: JobStatusType = null;
-        for (const status of Object.values(JobStatusType)) {
-          if (_.get(job.status, status, 0) > 0) {
-            nextStatus = status;
-            break;
-          }
+  const getJobRuns = () => {
+    const { currentProject, currentCluster, setCurrentError } = context;
+    closeAllWebsockets();
+    tmpJobRuns.current = [];
+    lastStreamStatus.current = "";
+    const websocketId = `job-runs-for-all-charts-ws`;
+    const endpoint = `/api/projects/${currentProject.id}/clusters/${currentCluster.id}/namespaces/${namespace}/jobs/stream`;
+
+    const config: NewWebsocketOptions = {
+      onopen: console.log,
+      onmessage: (message) => {
+        const data = JSON.parse(message.data);
+
+        if (data.streamStatus === "finished") {
+          const processedJobs = processLastJobs(tmpJobRuns.current);
+          setJobStatus(processedJobs);
+          setupJobWebsocket("jobs-live-update");
+          lastStreamStatus.current = data.streamStatus;
+          return;
         }
         }
 
 
-        const existingValue: JobStatusWithTimeAndVersion = _.get(
-          currentStatus,
-          key,
-          null
-        );
-        const newValue: JobStatusWithTimeAndVersion = {
-          status: nextStatus,
-          start_time: job.status.startTime,
-          resource_version: job.metadata.resourceVersion,
-        };
-
-        if (
-          !existingValue ||
-          Number(newValue.resource_version) >
-            Number(existingValue.resource_version)
-        ) {
-          return {
-            ...currentStatus,
-            [key]: newValue,
-          };
+        if (data.streamStatus === "errored") {
+          tmpJobRuns.current = [];
+          return;
         }
         }
 
 
-        return currentStatus;
-      });
-    } catch (error) {
-      console.log("failed");
-    } finally {
-    }
+        tmpJobRuns.current = [...tmpJobRuns.current, data];
+      },
+      onclose: (event) => {
+        console.log(event);
+        closeAllWebsockets();
+      },
+      onerror: (error) => {
+        setCurrentError(
+          "We couldn't get the latest status for the jobs, try reolading or opening a job to see their status."
+        );
+        console.log(error);
+        closeAllWebsockets();
+      },
+    };
+    newWebsocket(websocketId, endpoint, config);
+    openWebsocket(websocketId);
   };
   };
 
 
+  const processLastJobs = (jobRuns: any[]) => {
+    const jobs = charts
+      .filter((chart) => chart.chart?.metadata?.name === "job")
+      .reduce<{ [key: string]: JobStatusWithTimeAndVersion }>((acc, chart) => {
+        const chartName = chart.name;
+        acc[chartName] = jobRuns
+          .filter(
+            (job) =>
+              job.metadata.labels["app.kubernetes.io/instance"] === chartName
+          )
+          .map((job) => {
+            let nextStatus: JobStatusType = null;
+            for (const status of Object.values(JobStatusType)) {
+              if (_.get(job.status, status, 0) > 0) {
+                nextStatus = status;
+                break;
+              }
+            }
+
+            return {
+              resource_version: job.metadata.resourceVersion,
+              start_time: job.status.startTime,
+              status: nextStatus,
+            };
+          })
+          .sort(
+            (prev, next) =>
+              Number(next.resource_version) - Number(prev.resource_version)
+          )
+          .slice(1)[0];
+        return acc;
+      }, {});
+    return jobs;
+  };
+
+  // const getLatestJobRunFromRelease = async (chart: ChartType) => {
+  //   try {
+  //     const chartName = job.metadata.labels["app.kubernetes.io/instance"];
+  //     const chartNamespace = job.metadata.namespace;
+  //     const key = getChartKey(chartName, chartNamespace);
+
+  //     setJobStatus((currentStatus) => {
+  //       let nextStatus: JobStatusType = null;
+  //       for (const status of Object.values(JobStatusType)) {
+  //         if (_.get(job.status, status, 0) > 0) {
+  //           nextStatus = status;
+  //           break;
+  //         }
+  //       }
+
+  //       const existingValue: JobStatusWithTimeAndVersion = _.get(
+  //         currentStatus,
+  //         key,
+  //         null
+  //       );
+  //       const newValue: JobStatusWithTimeAndVersion = {
+  //         status: nextStatus,
+  //         start_time: job.status.startTime,
+  //         resource_version: job.metadata.resourceVersion,
+  //       };
+
+  //       if (
+  //         !existingValue ||
+  //         Number(newValue.resource_version) >
+  //           Number(existingValue.resource_version)
+  //       ) {
+  //         return {
+  //           ...currentStatus,
+  //           [key]: newValue,
+  //         };
+  //       }
+
+  //       return currentStatus;
+  //     });
+  //   } catch (error) {
+  //     console.log("failed");
+  //   } finally {
+  //   }
+  // };
+
   useEffect(() => {
   useEffect(() => {
     if (!charts?.length) {
     if (!charts?.length) {
       return () => {};
       return () => {};
@@ -348,22 +416,23 @@ const ChartList: React.FunctionComponent<Props> = ({
       return () => {};
       return () => {};
     }
     }
 
 
-    const jobCharts = changedCharts.filter(
-      (chart) => chart.chart.metadata.name === "job"
-    );
+    getJobRuns();
+    // const jobCharts = changedCharts.filter(
+    //   (chart) => chart.chart.metadata.name === "job"
+    // );
 
 
-    const promises = jobCharts.map((chart) => {
-      getLatestJobRunFromRelease(chart);
-    });
+    // const promises = jobCharts.map((chart) => {
+    //   getLatestJobRunFromRelease(chart);
+    // });
 
 
-    const jobWebsocketID = "job";
+    // const jobWebsocketID = "job";
 
 
-    Promise.all(promises).then(() => {
-      setupJobWebsocket(jobWebsocketID);
-    });
+    // Promise.all(promises).then(() => {
+    //   setupJobWebsocket(jobWebsocketID);
+    // });
 
 
     return () => {
     return () => {
-      closeWebsocket(jobWebsocketID);
+      // closeWebsocket(jobWebsocketID);
     };
     };
   }, [charts]);
   }, [charts]);
 
 

+ 0 - 15
dashboard/src/shared/api.tsx

@@ -1588,20 +1588,6 @@ const getPreviousLogsForContainer = baseApi<
     `/api/projects/${project_id}/clusters/${cluster_id}/namespaces/${namespace}/pod/${name}/previous_logs`
     `/api/projects/${project_id}/clusters/${cluster_id}/namespaces/${namespace}/pod/${name}/previous_logs`
 );
 );
 
 
-const getLatestJobRunFromRelease = baseApi<
-  {},
-  {
-    project_id: number;
-    cluster_id: number;
-    namespace: string;
-    release_name: string;
-  }
->(
-  "GET",
-  ({ project_id, cluster_id, namespace, release_name }) =>
-    `/api/projects/${project_id}/clusters/${cluster_id}/namespaces/${namespace}/releases/${release_name}/0/latest_job_run`
-);
-
 // Bundle export to allow default api import (api.<method> is more readable)
 // Bundle export to allow default api import (api.<method> is more readable)
 export default {
 export default {
   checkAuth,
   checkAuth,
@@ -1755,5 +1741,4 @@ export default {
   provisionDatabase,
   provisionDatabase,
   getDatabases,
   getDatabases,
   getPreviousLogsForContainer,
   getPreviousLogsForContainer,
-  getLatestJobRunFromRelease,
 };
 };