Просмотр исходного кода

handle pod log connections with concurrent goroutines

sunguroku 5 лет назад
Родитель
Сommit
04b6cd3c35

+ 27 - 9
dashboard/src/main/home/cluster-dashboard/expanded-chart/log/Logs.tsx

@@ -7,13 +7,15 @@ type PropsType = {
 };
 
 type StateType = {
-  logs: string[]
+  logs: string[],
+  ws: any
 };
 
 export default class Logs extends Component<PropsType, StateType> {
   
   state = {
     logs: [] as string[],
+    ws : null as any
   }
 
   scrollRef = React.createRef<HTMLDivElement>()
@@ -30,17 +32,33 @@ export default class Logs extends Component<PropsType, StateType> {
 
   componentDidMount() {
     let { currentCluster, currentProject } = this.context;
+    if (!this.props.selectedPod) return
+
     let ws = new WebSocket(`ws://localhost:8080/api/projects/${currentProject.id}/k8s/default/pod/${this.props.selectedPod}/logs?cluster_id=${currentCluster.id}&service_account_id=${currentCluster.service_account_id}`)
     // let ws = new WebSocket(`ws://localhost:8080/api/projects/${currentProject.id}/k8s/deployment/status?cluster_id=${currentCluster.id}&service_account_id=${currentCluster.service_account_id}`)
+    this.setState({ ws }, () => {
+      if (!this.state.ws) return;
+  
+      this.state.ws.onopen = () => {
+        console.log('connected to websocket')
+      }
+  
+      this.state.ws.onmessage = (evt: MessageEvent) => {
+        this.setState({ logs: [...this.state.logs, evt.data] }, () => {
+          this.scrollToBottom()
+        })
+      }
+  
+      this.state.ws.onerror = (err: ErrorEvent) => {
+        console.log(err)
+      }
+    })
+  }
 
-    ws.onopen = () => {
-      console.log('connected to websocket')
-    }
-
-    ws.onmessage = evt => {
-      this.setState({ logs: [...this.state.logs, evt.data] }, () => {
-        this.scrollToBottom()
-      })
+  componentWillUnmount() {
+    if (this.state.ws) {
+      console.log('unmounting')
+      this.state.ws.close()
     }
   }
 

+ 38 - 9
internal/kubernetes/agent.go

@@ -55,20 +55,49 @@ func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn)
 	if err != nil {
 		return fmt.Errorf("Cannot open log stream for pod %s", name)
 	}
+	defer podLogs.Close()
 
 	r := bufio.NewReader(podLogs)
-	for {
-		bytes, err := r.ReadBytes('\n')
-		fmt.Println(bytes)
-		if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
-			return writeErr
+	errorchan := make(chan error)
+
+	go func() {
+		for {
+			if _, _, err := conn.ReadMessage(); err != nil {
+				conn.Close()
+				errorchan <- nil
+				return
+			}
 		}
+	}()
 
-		if err != nil {
-			if err != io.EOF {
-				return err
+	go func() {
+		for {
+			select {
+			case <-errorchan:
+				return
+			default:
+			}
+			bytes, err := r.ReadBytes('\n')
+			fmt.Println("BYTES", bytes)
+			if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
+				errorchan <- writeErr
+				return
 			}
-			return nil
+			if err != nil {
+				if err != io.EOF {
+					errorchan <- err
+					return
+				}
+				errorchan <- nil
+				return
+			}
+		}
+	}()
+
+	for {
+		select {
+		case err = <-errorchan:
+			return err
 		}
 	}
 }