@@ -1,27 +1,74 @@
-import Anser from "anser";
-import { flatMap } from "lodash";
-import { useContext, useEffect, useMemo, useRef, useState } from "react";
+import Anser, { AnserJsonEntry } from "anser";
+import dayjs from "dayjs";
+import _ from "lodash";
+import { useContext, useEffect, useRef, useState } from "react";
 import api from "shared/api";
 import { Context } from "shared/Context";
 import { useWebsockets, NewWebsocketOptions } from "shared/hooks/useWebsockets";
+import { isJSON } from "shared/util";
 
-const MAX_LOGS = 250;
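+// Buffering strategy, in brief: live websocket lines pile up in a ref-backed
+// buffer and are flushed into React state in batches of up to MAX_BUFFER_LOGS;
+// state holds at most MAX_LOGS parsed lines; each paginated query requests
+// QUERY_LIMIT lines.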
+const MAX_LOGS = 5000;
+const MAX_BUFFER_LOGS = 1000;
+const QUERY_LIMIT = 1000;
+
+export enum Direction {
+  forward = "forward",
+  backward = "backward",
+}
+
+interface Log {
+  line: AnserJsonEntry[];
+  lineNumber: number;
+  timestamp: string;
+}
+
+interface LogLine {
+  log: string;
+  stream: string;
+  time: string;
+}
+
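+// Each raw line is a JSON-encoded LogLine, e.g. (illustrative sample only):
+//   {"log":"\u001b[32minfo\u001b[0m started","stream":"stdout","time":"2022-06-28T12:00:00Z"}
+// parseLogs drops empty or non-JSON entries, then converts the ANSI-colored
+// `log` field into Anser JSON entries for rendering.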
+const parseLogs = (logs: string[] = []): Log[] => {
+  return logs
+    .filter(Boolean)
+    .filter(isJSON)
+    .map((logLine: string, idx) => {
+      try {
+        const parsedLine: LogLine = JSON.parse(logLine);
+        // TODO Move log parsing to the render method
+        const ansiLog = Anser.ansiToJson(parsedLine.log);
+        return {
+          line: ansiLog,
+          lineNumber: idx + 1,
+          timestamp: parsedLine.time,
+        };
+      } catch (err) {
+        console.error(err, logLine);
+      }
+    });
+};
+
+interface PaginationInfo {
+  previousCursor: string | null;
+  nextCursor: string | null;
+}
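+// Cursors are the "continue" timestamps returned by the logs API:
+// previousCursor pages further back into history (null once history is
+// exhausted); nextCursor pages toward newer lines.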
 
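+/**
+ * Pages and live-streams pod logs. Hypothetical usage sketch (argument
+ * values below are illustrative):
+ *
+ *   const { logs, refresh, moveCursor, paginationInfo, loading } =
+ *     useLogs("web-abc123", "default", "", undefined); // live tail
+ */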
 export const useLogs = (
   currentPod: string,
   namespace: string,
   searchParam: string,
   // if setDate is set, results are not live
-  setDate: Date
+  setDate?: Date
 ) => {
-  var d = new Date();
-  d.setDate(d.getDate() - 14);
-
   const isLive = !setDate;
+  const logsBufferRef = useRef<Log[]>([]);
   const { currentCluster, currentProject } = useContext(Context);
-  const [logs, setLogs] = useState<Anser.AnserJsonEntry[][]>([]);
-  const [startDate, setStartDate] = useState<Date>(d);
-  const [endDate, setEndDate] = useState<Date>(setDate || new Date());
+  const [logs, setLogs] = useState<Log[]>([]);
+  const [paginationInfo, setPaginationInfo] = useState<PaginationInfo>({
+    previousCursor: null,
+    nextCursor: null,
+  });
+  const [loading, setLoading] = useState(true);
 
   // if we are live:
   // - start date is initially set to 2 weeks ago
@@ -41,16 +88,80 @@ export const useLogs = (
     closeAllWebsockets,
   } = useWebsockets();
 
-  useEffect(() => {
-    return refresh();
-  }, [currentPod, namespace, searchParam, setDate]);
+  const updateLogs = (
+    newLogs: Log[],
+    direction: Direction = Direction.forward
+  ) => {
+    // Nothing to update here
+    if (!newLogs.length) {
+      return;
+    }
 
-  useEffect(() => {
-    // if the streaming is no longer live, close all websockets
-    if (!isLive) {
-      closeAllWebsockets();
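+    // lineNumber is recomputed on every merge so entries stay consecutively
+    // numbered whether new logs are appended or prepended.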
+    setLogs((logs) => {
+      let updatedLogs = _.cloneDeep(logs);
+
+      /**
+       * If direction = Direction.forward, we want to append the new logs
+       * at the end of the current logs; otherwise we prepend them before
+       * the current logs.
+       */
+      if (direction === Direction.forward) {
+        const lastLineNumber = updatedLogs.at(-1)?.lineNumber ?? 0;
+
+        updatedLogs.push(
+          ...newLogs.map((log, idx) => ({
+            ...log,
+            lineNumber: lastLineNumber + idx + 1,
+          }))
+        );
+
+        // For direction = Direction.forward, remove logs from the front
+        if (updatedLogs.length > MAX_LOGS) {
+          const logsToBeRemoved =
+            newLogs.length < MAX_BUFFER_LOGS ? newLogs.length : MAX_BUFFER_LOGS;
+          updatedLogs = updatedLogs.slice(logsToBeRemoved);
+        }
+      } else {
+        updatedLogs = newLogs.concat(
+          updatedLogs.map((log) => ({
+            ...log,
+            lineNumber: log.lineNumber + newLogs.length,
+          }))
+        );
+
+        // For direction = Direction.backward, remove logs from the back
+        if (updatedLogs.length > MAX_LOGS) {
+          const logsToBeRemoved =
+            newLogs.length < MAX_BUFFER_LOGS ? newLogs.length : MAX_BUFFER_LOGS;
+
+          updatedLogs = updatedLogs.slice(0, updatedLogs.length - logsToBeRemoved);
+        }
+      }
+
+      return updatedLogs;
+    });
+  };
+
+  /**
+   * Flushes the logs buffer. Unless `discard` is true, the
+   * buffered logs are merged into the current logs before
+   * the buffer is cleared.
+   */
+  const flushLogsBuffer = (discard: boolean = false) => {
+    if (!discard) {
+      updateLogs(logsBufferRef.current ?? []);
     }
-  }, [isLive]);
+
+    logsBufferRef.current = [];
+  };
+
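+  // Websocket lines land here first; batching through the buffer keeps
+  // re-renders cheap instead of calling setLogs once per line.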
+  const pushLogs = (newLogs: Log[]) => {
+    logsBufferRef.current.push(...newLogs);
+
+    if (logsBufferRef.current.length >= MAX_BUFFER_LOGS) {
+      flushLogsBuffer();
+    }
+  };
 
   const setupWebsocket = (websocketKey: string) => {
     const endpoint = `/api/projects/${currentProject.id}/clusters/${currentCluster.id}/namespaces/${namespace}/logs/loki?pod_selector=${currentPod}&namespace=${namespace}&search_param=${searchParam}`;
@@ -60,20 +171,16 @@ export const useLogs = (
         console.log("Opened websocket:", websocketKey);
       },
       onmessage: (evt: MessageEvent) => {
-        let newLogs: Anser.AnserJsonEntry[][] = [];
-
-        evt?.data?.split("\n").forEach((logLine: string) => {
-          if (logLine) {
-            var parsedLine = JSON.parse(logLine);
+        // Nothing to do here
+        if (!evt?.data || typeof evt.data !== "string") {
+          return;
+        }
 
-            let ansiLog = Anser.ansiToJson(parsedLine.log);
-            newLogs.push(ansiLog);
-          }
-        });
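+        // A frame may carry several JSON objects separated by newlines, so we
+        // split on "}\n" and restore the brace (rather than splitting on
+        // "\n") to avoid breaking on newlines embedded in the payload itself;
+        // any fragment that is not valid JSON is dropped by parseLogs.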
+        const newLogs = parseLogs(
+          evt?.data?.split("}\n").map((line: string) => line + "}")
+        );
 
-        setLogs((prevLogs) => {
-          return prevLogs.concat(newLogs);
-        });
+        pushLogs(newLogs);
       },
       onclose: () => {
         console.log("Closed websocket:", websocketKey);
@@ -85,23 +192,22 @@ export const useLogs = (
   };
 
   const queryLogs = (
-    initLogs: Anser.AnserJsonEntry[][],
-    startDate: Date,
-    endDate: Date,
-    direction: string,
-    cb?: () => void
+    startDate: string,
+    endDate: string,
+    direction: Direction,
+    limit: number = QUERY_LIMIT
   ) => {
-    api
+    return api
      .getLogs(
        "<token>",
        {
          pod_selector: currentPod,
-          namespace: namespace,
+          namespace,
          search_param: searchParam,
-          start_range: startDate.toISOString(),
-          end_range: endDate.toISOString(),
-          limit: 1000,
-          direction: direction,
+          start_range: startDate,
+          end_range: endDate,
+          limit,
+          direction,
        },
        {
          cluster_id: currentCluster.id,
@@ -109,61 +215,84 @@ export const useLogs = (
        }
      )
      .then((res) => {
-        var newLogs: Anser.AnserJsonEntry[][] = [];
-        res.data.logs?.forEach((logLine: any) => {
-          if (logLine) {
-            var parsedLine = JSON.parse(logLine.line);
-
-            let ansiLog = Anser.ansiToJson(parsedLine.log);
-            newLogs.push(ansiLog);
-          }
-        });
-
-        var modifiedLogs: Anser.AnserJsonEntry[][] = initLogs;
-
-        if (direction == "forward") {
-          modifiedLogs.push(...newLogs);
-        } else if (direction == "backward") {
-          modifiedLogs.push(...newLogs.reverse());
+        const newLogs = parseLogs(
+          res.data.logs?.filter(Boolean).map((logLine: any) => logLine.line)
+        );
+
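+        // Backward queries come back newest-first; flip them so the merged
+        // log list stays chronological.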
+        if (direction === Direction.backward) {
+          newLogs.reverse();
        }
 
-        setLogs([...modifiedLogs]);
-        cb && cb();
+        return {
+          logs: newLogs,
+          previousCursor:
+            // There are no more historical logs, so don't set the previous cursor
+            newLogs.length < QUERY_LIMIT && direction === Direction.backward
+              ? null
+              : res.data.backward_continue_time,
+          nextCursor: res.data.forward_continue_time,
+        };
      });
   };
 
-  const refresh = () => {
+  const refresh = async () => {
     if (!currentPod) {
       return;
     }
 
+    setLoading(true);
+    setLogs([]);
+    flushLogsBuffer(true);
     const websocketKey = `${currentPod}-${namespace}-websocket`;
-    var newEndDate = setDate || new Date();
+    const endDate = dayjs(setDate);
+    const twoWeeksAgo = endDate.subtract(14, "days");
 
-    queryLogs([], startDate, newEndDate, "backward", () => {
-      setEndDate(newEndDate);
-      closeWebsocket(websocketKey);
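+    // Initial load: query backward from setDate (or "now" when live) so the
+    // most recent lines come back first.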
+    const { logs: initialLogs, previousCursor, nextCursor } = await queryLogs(
+      twoWeeksAgo.toISOString(),
+      endDate.toISOString(),
+      Direction.backward
+    );
 
-    if (isLive) {
-      setupWebsocket(websocketKey);
-      return () => {
-        closeWebsocket(websocketKey);
-      };
-    }
+    setPaginationInfo({
+      previousCursor,
+      nextCursor,
    });
+
+    updateLogs(initialLogs);
+
+    closeWebsocket(websocketKey);
+
+    setLoading(false);
+
+    if (isLive) {
+      setupWebsocket(websocketKey);
+    }
+
+    return () => isLive && closeWebsocket(websocketKey);
  };
 
-  const moveCursor = (direction: number) => {
-    if (direction < 0) {
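+  // Pagination entry point: Direction.backward fetches older history ending
+  // at previousCursor; Direction.forward fetches newer lines (a no-op while
+  // live, since the websocket is already streaming them).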
+  const moveCursor = async (direction: Direction) => {
+    if (direction === Direction.backward) {
      // we query by setting the endDate equal to the previous startDate, and setting the direction
      // to "backward"
-      var twoWeeksAgo = new Date();
-      twoWeeksAgo.setDate(twoWeeksAgo.getDate() - 14);
+      const refDate = paginationInfo.previousCursor ?? dayjs().toISOString();
+      const twoWeeksAgo = dayjs(refDate).subtract(14, "days");
 
-      queryLogs(logs, twoWeeksAgo, startDate, "backward", () => {
-        setEndDate(startDate);
-        setStartDate(twoWeeksAgo);
-      });
+      const { logs: newLogs, previousCursor } = await queryLogs(
+        twoWeeksAgo.toISOString(),
+        refDate,
+        Direction.backward
+      );
+
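+      // Mirror of the forward case below: when a previous cursor exists, the
+      // last line of this older page may duplicate a line we already have,
+      // so drop it.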
+      updateLogs(
+        paginationInfo.previousCursor ? newLogs.slice(0, -1) : newLogs,
+        direction
+      );
+
+      setPaginationInfo((paginationInfo) => ({
+        ...paginationInfo,
+        previousCursor,
+      }));
    } else {
      if (isLive) {
        return;
@@ -171,17 +300,66 @@ export const useLogs = (
 
      // we query by setting the startDate equal to the previous endDate, setting the endDate equal to the
      // current time, and setting the direction to "forward"
-      var currDate = new Date();
-      queryLogs(logs, endDate, currDate, "forward", () => {
-        setStartDate(endDate);
-        setEndDate(currDate);
-      });
+      const refDate = paginationInfo.nextCursor ?? dayjs(setDate).toISOString();
+      const currDate = dayjs();
+
+      const { logs: newLogs, nextCursor } = await queryLogs(
+        refDate,
+        currDate.toISOString(),
+        Direction.forward
+      );
+
+      // If a next cursor was previously set, the first line of this page is
+      // likely a duplicate of a line we already have, so we skip it
+      updateLogs(paginationInfo.nextCursor ? newLogs.slice(1) : newLogs);
+
+      setPaginationInfo((paginationInfo) => ({
+        ...paginationInfo,
+        nextCursor,
+      }));
    }
  };
 
+  useEffect(() => {
+    setLogs([]);
+    flushLogsBuffer(true);
+  }, []);
+
+  /**
+   * In some situations, we might never hit the limit for the max buffer size.
+   * An example is if the total logs for the pod < MAX_BUFFER_LOGS.
+   *
+   * To handle situations like this, we force a flush operation on the
+   * buffer so that we don't end up with stale logs.
+   */
+  useEffect(() => {
+    /**
+     * We don't want users to wait too long for the initial logs to
+     * appear, so we force one flush 500ms after mount and then keep
+     * flushing on an interval.
+     */
+    const initialFlushTimeout = setTimeout(flushLogsBuffer, 500);
+
+    const flushLogsBufferInterval = setInterval(flushLogsBuffer, 3000);
+
+    return () => {
+      clearTimeout(initialFlushTimeout);
+      clearInterval(flushLogsBufferInterval);
+    };
+  }, []);
+
+  useEffect(() => {
+    refresh();
+  }, [currentPod, namespace, searchParam, setDate]);
+
+  useEffect(() => {
+    // if the streaming is no longer live, close all websockets
+    if (!isLive) {
+      closeAllWebsockets();
+    }
+  }, [isLive]);
+
  return {
    logs,
    refresh,
    moveCursor,
+    paginationInfo,
+    loading,
  };
 };