|
|
@@ -1,27 +1,49 @@
|
|
|
-import Anser from "anser";
|
|
|
-import { flatMap } from "lodash";
|
|
|
-import { useContext, useEffect, useMemo, useRef, useState } from "react";
|
|
|
+import Anser, { AnserJsonEntry } from "anser";
|
|
|
+import dayjs from "dayjs";
|
|
|
+import _ from "lodash";
|
|
|
+import { useContext, useEffect, useRef, useState } from "react";
|
|
|
import api from "shared/api";
|
|
|
import { Context } from "shared/Context";
|
|
|
import { useWebsockets, NewWebsocketOptions } from "shared/hooks/useWebsockets";
|
|
|
|
|
|
-const MAX_LOGS = 250;
|
|
|
+const MAX_LOGS = 5000;
|
|
|
+const MAX_BUFFER_LOGS = 1000;
|
|
|
+
|
|
|
+export enum Direction {
|
|
|
+ forward = "forward",
|
|
|
+ backward = "backward",
|
|
|
+}
|
|
|
+
|
|
|
+interface Log {
|
|
|
+ line: AnserJsonEntry[];
|
|
|
+ lineNumber: number;
|
|
|
+ timestamp: string;
|
|
|
+}
|
|
|
+
|
|
|
+const parseLogs = (logs: string[] = []): AnserJsonEntry[][] => {
|
|
|
+ return logs.filter(Boolean).map((logLine: string) => {
|
|
|
+ const parsedLine = JSON.parse(logLine);
|
|
|
+ // TODO Move log parsing to the render method
|
|
|
+ const ansiLog = Anser.ansiToJson(parsedLine.log);
|
|
|
+ return ansiLog;
|
|
|
+ });
|
|
|
+};
|
|
|
|
|
|
export const useLogs = (
|
|
|
currentPod: string,
|
|
|
namespace: string,
|
|
|
searchParam: string,
|
|
|
// if setDate is set, results are not live
|
|
|
- setDate: Date
|
|
|
+ setDate?: Date
|
|
|
) => {
|
|
|
- var d = new Date();
|
|
|
- d.setDate(d.getDate() - 14);
|
|
|
+ const d = dayjs().subtract(14, "days");
|
|
|
|
|
|
const isLive = !setDate;
|
|
|
+ const logsBufferRef = useRef<AnserJsonEntry[][]>([]);
|
|
|
const { currentCluster, currentProject } = useContext(Context);
|
|
|
- const [logs, setLogs] = useState<Anser.AnserJsonEntry[][]>([]);
|
|
|
- const [startDate, setStartDate] = useState<Date>(d);
|
|
|
- const [endDate, setEndDate] = useState<Date>(setDate || new Date());
|
|
|
+ const [logs, setLogs] = useState<AnserJsonEntry[][]>([]);
|
|
|
+ const [startDate, setStartDate] = useState<dayjs.Dayjs>(d);
|
|
|
+ const [endDate, setEndDate] = useState<dayjs.Dayjs>(dayjs(setDate));
|
|
|
|
|
|
// if we are live:
|
|
|
// - start date is initially set to 2 weeks ago
|
|
|
@@ -41,16 +63,40 @@ export const useLogs = (
|
|
|
closeAllWebsockets,
|
|
|
} = useWebsockets();
|
|
|
|
|
|
- useEffect(() => {
|
|
|
- return refresh();
|
|
|
- }, [currentPod, namespace, searchParam, setDate]);
|
|
|
+ const updateLogs = (newLogs: AnserJsonEntry[][]) => {
|
|
|
+ setLogs((logs) => {
|
|
|
+ let currentLogs = [...logs];
|
|
|
+ currentLogs.push(...newLogs);
|
|
|
+ if (currentLogs.length > MAX_LOGS) {
|
|
|
+ const logsToBeRemoved =
|
|
|
+ newLogs.length < MAX_BUFFER_LOGS ? newLogs.length : MAX_BUFFER_LOGS;
|
|
|
+ currentLogs = currentLogs.slice(logsToBeRemoved);
|
|
|
+ }
|
|
|
|
|
|
- useEffect(() => {
|
|
|
- // if the streaming is no longer live, close all websockets
|
|
|
- if (!isLive) {
|
|
|
- closeAllWebsockets();
|
|
|
+ return currentLogs;
|
|
|
+ });
|
|
|
+ };
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Flushes the logs buffer. If `discard` is true,
|
|
|
+ * the buffered logs are dropped instead of being
|
|
|
+ * appended to the current logs
|
|
|
+ */
|
|
|
+ const flushLogsBuffer = (discard: boolean = false) => {
|
|
|
+ if (!discard) {
|
|
|
+ updateLogs(logsBufferRef.current ?? []);
|
|
|
}
|
|
|
- }, [isLive]);
|
|
|
+
|
|
|
+ logsBufferRef.current = [];
|
|
|
+ };
|
|
|
+
|
|
|
+ const pushLogs = (newLogs: AnserJsonEntry[][]) => {
|
|
|
+ logsBufferRef.current.push(...newLogs);
|
|
|
+
|
|
|
+ if (logsBufferRef.current.length > MAX_BUFFER_LOGS) {
|
|
|
+ flushLogsBuffer();
|
|
|
+ }
|
|
|
+ };
|
|
|
|
|
|
const setupWebsocket = (websocketKey: string) => {
|
|
|
const endpoint = `/api/projects/${currentProject.id}/clusters/${currentCluster.id}/namespaces/${namespace}/logs/loki?pod_selector=${currentPod}&namespace=${namespace}&search_param=${searchParam}`;
|
|
|
@@ -60,20 +106,9 @@ export const useLogs = (
|
|
|
console.log("Opened websocket:", websocketKey);
|
|
|
},
|
|
|
onmessage: (evt: MessageEvent) => {
|
|
|
- let newLogs: Anser.AnserJsonEntry[][] = [];
|
|
|
-
|
|
|
- evt?.data?.split("\n").forEach((logLine: string) => {
|
|
|
- if (logLine) {
|
|
|
- var parsedLine = JSON.parse(logLine);
|
|
|
-
|
|
|
- let ansiLog = Anser.ansiToJson(parsedLine.log);
|
|
|
- newLogs.push(ansiLog);
|
|
|
- }
|
|
|
- });
|
|
|
+ const newLogs = parseLogs(evt?.data?.split("\n"));
|
|
|
|
|
|
- setLogs((prevLogs) => {
|
|
|
- return prevLogs.concat(newLogs);
|
|
|
- });
|
|
|
+ pushLogs(newLogs);
|
|
|
},
|
|
|
onclose: () => {
|
|
|
console.log("Closed websocket:", websocketKey);
|
|
|
@@ -85,10 +120,9 @@ export const useLogs = (
|
|
|
};
|
|
|
|
|
|
const queryLogs = (
|
|
|
- initLogs: Anser.AnserJsonEntry[][],
|
|
|
startDate: Date,
|
|
|
endDate: Date,
|
|
|
- direction: string,
|
|
|
+ direction: Direction,
|
|
|
cb?: () => void
|
|
|
) => {
|
|
|
api
|
|
|
@@ -109,25 +143,13 @@ export const useLogs = (
|
|
|
}
|
|
|
)
|
|
|
.then((res) => {
|
|
|
- var newLogs: Anser.AnserJsonEntry[][] = [];
|
|
|
- res.data.logs?.forEach((logLine: any) => {
|
|
|
- if (logLine) {
|
|
|
- var parsedLine = JSON.parse(logLine.line);
|
|
|
-
|
|
|
- let ansiLog = Anser.ansiToJson(parsedLine.log);
|
|
|
- newLogs.push(ansiLog);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- var modifiedLogs: Anser.AnserJsonEntry[][] = initLogs;
|
|
|
-
|
|
|
- if (direction == "forward") {
|
|
|
- modifiedLogs.push(...newLogs);
|
|
|
- } else if (direction == "backward") {
|
|
|
- modifiedLogs.push(...newLogs.reverse());
|
|
|
- }
|
|
|
+ const newLogs = parseLogs(
|
|
|
+ res.data.logs?.filter(Boolean).map((logLine: any) => logLine.line)
|
|
|
+ );
|
|
|
|
|
|
- setLogs([...modifiedLogs]);
|
|
|
+ pushLogs(
|
|
|
+ direction === Direction.backward ? newLogs.reverse() : newLogs
|
|
|
+ );
|
|
|
cb && cb();
|
|
|
});
|
|
|
};
|
|
|
@@ -137,33 +159,43 @@ export const useLogs = (
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ flushLogsBuffer(true);
|
|
|
const websocketKey = `${currentPod}-${namespace}-websocket`;
|
|
|
- var newEndDate = setDate || new Date();
|
|
|
-
|
|
|
- queryLogs([], startDate, newEndDate, "backward", () => {
|
|
|
- setEndDate(newEndDate);
|
|
|
- closeWebsocket(websocketKey);
|
|
|
-
|
|
|
- if (isLive) {
|
|
|
- setupWebsocket(websocketKey);
|
|
|
- return () => {
|
|
|
- closeWebsocket(websocketKey);
|
|
|
- };
|
|
|
+ const newEndDate = dayjs(setDate);
|
|
|
+
|
|
|
+ queryLogs(
|
|
|
+ startDate.toDate(),
|
|
|
+ newEndDate.toDate(),
|
|
|
+ Direction.backward,
|
|
|
+ () => {
|
|
|
+ setEndDate(newEndDate);
|
|
|
+ closeWebsocket(websocketKey);
|
|
|
+
|
|
|
+ if (isLive) {
|
|
|
+ setupWebsocket(websocketKey);
|
|
|
+ return () => {
|
|
|
+ closeWebsocket(websocketKey);
|
|
|
+ };
|
|
|
+ }
|
|
|
}
|
|
|
- });
|
|
|
+ );
|
|
|
};
|
|
|
|
|
|
const moveCursor = (direction: number) => {
|
|
|
if (direction < 0) {
|
|
|
// we query by setting the endDate equal to the previous startDate, and setting the direction
|
|
|
// to "backward"
|
|
|
- var twoWeeksAgo = new Date();
|
|
|
- twoWeeksAgo.setDate(twoWeeksAgo.getDate() - 14);
|
|
|
-
|
|
|
- queryLogs(logs, twoWeeksAgo, startDate, "backward", () => {
|
|
|
- setEndDate(startDate);
|
|
|
- setStartDate(twoWeeksAgo);
|
|
|
- });
|
|
|
+ const twoWeeksAgo = dayjs().subtract(14, "days");
|
|
|
+
|
|
|
+ queryLogs(
|
|
|
+ twoWeeksAgo.toDate(),
|
|
|
+ startDate.toDate(),
|
|
|
+ Direction.backward,
|
|
|
+ () => {
|
|
|
+ setEndDate(startDate);
|
|
|
+ setStartDate(twoWeeksAgo);
|
|
|
+ }
|
|
|
+ );
|
|
|
} else {
|
|
|
if (isLive) {
|
|
|
return;
|
|
|
@@ -171,14 +203,49 @@ export const useLogs = (
|
|
|
|
|
|
// we query by setting the startDate equal to the previous endDate, setting the endDate equal to the
|
|
|
// current time, and setting the direction to "forward"
|
|
|
- var currDate = new Date();
|
|
|
- queryLogs(logs, endDate, currDate, "forward", () => {
|
|
|
+ const currDate = dayjs();
|
|
|
+ queryLogs(endDate.toDate(), currDate.toDate(), Direction.forward, () => {
|
|
|
setStartDate(endDate);
|
|
|
setEndDate(currDate);
|
|
|
});
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+ useEffect(() => {
|
|
|
+ flushLogsBuffer(true);
|
|
|
+ }, []);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * In some situations, we might never hit the limit for the max buffer size.
|
|
|
+ * An example is if the total logs for the pod < MAX_BUFFER_LOGS.
|
|
|
+ *
|
|
|
+ * For handling situations like this, we would want to force a flush operation
|
|
|
+ * on the buffer so that we don't have any stale logs
|
|
|
+ */
|
|
|
+ useEffect(() => {
|
|
|
+ /**
|
|
|
+ * We don't want users to wait for too long for the initial
|
|
|
+ * logs to appear. So we use a setTimeout for 500ms to force-flush
|
|
|
+ * logs after 500ms of load
|
|
|
+ */
|
|
|
+ setTimeout(flushLogsBuffer, 500);
|
|
|
+
|
|
|
+ const flushLogsBufferInterval = setInterval(flushLogsBuffer, 3000);
|
|
|
+
|
|
|
+ return () => clearInterval(flushLogsBufferInterval);
|
|
|
+ }, []);
|
|
|
+
|
|
|
+ useEffect(() => {
|
|
|
+ refresh();
|
|
|
+ }, [currentPod, namespace, searchParam, setDate]);
|
|
|
+
|
|
|
+ useEffect(() => {
|
|
|
+ // if the streaming is no longer live, close all websockets
|
|
|
+ if (!isLive) {
|
|
|
+ closeAllWebsockets();
|
|
|
+ }
|
|
|
+ }, [isLive]);
|
|
|
+
|
|
|
return {
|
|
|
logs,
|
|
|
refresh,
|