2
0
Эх сурвалжийг харах

basic stop command functionality

Alexander Belanger 5 жил өмнө
parent
commit
697302cbd6

+ 0 - 2
dashboard/src/main/home/cluster-dashboard/expanded-chart/ExpandedJobChart.tsx

@@ -230,8 +230,6 @@ export default class ExpandedJobChart extends Component<PropsType, StateType> {
       return date2.getTime() - date1.getTime();
     });
 
-    console.log("JOBS ARE", jobs);
-
     this.setState({ jobs });
   };
 

+ 34 - 1
dashboard/src/main/home/cluster-dashboard/expanded-chart/jobs/JobResource.tsx

@@ -20,11 +20,35 @@ export default class JobResource extends Component<PropsType, StateType> {
     pods: [] as any[],
   };
 
-  expandJob = () => {
+  expandJob = (event: MouseEvent) => {
+    if (event) {
+      event.stopPropagation();
+    }
+    
     this.getPods(() => {
       this.setState({ expanded: !this.state.expanded });
     });
   };
+  
+  stopJob = (event: MouseEvent) => {
+    if (event) {
+      event.stopPropagation();
+    }
+    
+    let { currentCluster, currentProject, setCurrentError } = this.context;
+
+    api.stopJob(
+      "<token>",
+        {},
+        {
+          id: currentProject.id,
+          name: this.props.job.metadata?.name,
+          namespace: this.props.job.metadata?.namespace,
+          cluster_id: currentCluster.id,
+        }
+    ).then((res) => {})
+      .catch((err) => setCurrentError(JSON.stringify(err)));
+  }
 
   getPods = (callback: () => void) => {
     let { currentCluster, currentProject, setCurrentError } = this.context;
@@ -129,6 +153,14 @@ export default class JobResource extends Component<PropsType, StateType> {
     return <Status color="#ffffff11">Running</Status>;
   };
 
+  renderStopButton = () => {
+    if (!this.props.job.status?.succeeded && !this.props.job.status?.failed) {
+      return <i className="material-icons" onClick={this.stopJob}>
+        stop
+      </i>
+    }
+  }
+
   render() {
     let icon =
       "https://user-images.githubusercontent.com/65516095/111258413-4e2c3800-85f3-11eb-8a6a-88e03460f8fe.png";
@@ -150,6 +182,7 @@ export default class JobResource extends Component<PropsType, StateType> {
             <MaterialIconTray disabled={false}>
               {/* <i className="material-icons"
               onClick={this.editButtonOnClick}>mode_edit</i> */}
+              {this.renderStopButton()}
               <i className="material-icons" onClick={this.expandJob}>
                 {this.state.expanded ? "expand_less" : "expand_more"}
               </i>

+ 0 - 1
dashboard/src/main/home/cluster-dashboard/expanded-chart/status/Logs.tsx

@@ -123,7 +123,6 @@ export default class Logs extends Component<PropsType, StateType> {
   }
 
   componentWillUnmount() {
-    console.log("log unmount");
     if (this.ws) {
       this.ws.close();
     }

+ 10 - 0
dashboard/src/shared/api.tsx

@@ -789,6 +789,15 @@ const deleteConfigMap = baseApi<
   return `/api/projects/${pathParams.id}/k8s/configmap/delete`;
 });
 
+const stopJob = baseApi<
+  {},
+  { name: string; namespace: string; id: number, cluster_id: number }
+>("POST", (pathParams) => {
+  let { id, name, namespace, cluster_id } = pathParams
+  return `/api/projects/${id}/k8s/jobs/${namespace}/${name}/stop?cluster_id=${cluster_id}`;
+});
+
+
 // Bundle export to allow default api import (api.<method> is more readable)
 export default {
   checkAuth,
@@ -871,4 +880,5 @@ export default {
   updateUser,
   updateConfigMap,
   upgradeChartValues,
+  stopJob,
 };

+ 69 - 1
internal/kubernetes/agent.go

@@ -34,11 +34,15 @@ import (
 	v1beta1 "k8s.io/api/extensions/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/remotecommand"
 
 	"github.com/porter-dev/porter/internal/config"
 )
@@ -347,19 +351,34 @@ func (a *Agent) DeletePod(namespace string, name string) error {
 
 // GetPodLogs streams real-time logs from a given pod.
 func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
+	// get the pod to read in the list of contains
+	pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
+		context.Background(),
+		name,
+		metav1.GetOptions{},
+	)
+
+	if err != nil {
+		return fmt.Errorf("Cannot get pod %s: %s", name, err.Error())
+	}
+
+	container := pod.Spec.Containers[0].Name
+
 	tails := int64(400)
 
 	// follow logs
 	podLogOpts := v1.PodLogOptions{
 		Follow:    true,
 		TailLines: &tails,
+		Container: container,
 	}
+
 	req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
 
 	podLogs, err := req.Stream(context.TODO())
 
 	if err != nil {
-		return fmt.Errorf("Cannot open log stream for pod %s", name)
+		return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
 	}
 	defer podLogs.Close()
 
@@ -410,6 +429,55 @@ func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn)
 	}
 }
 
+// StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
+func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
+	jobPods, err := a.GetJobPods(namespace, name)
+
+	if err != nil {
+		return err
+	}
+
+	podName := jobPods[0].ObjectMeta.Name
+
+	restConf, err := a.RESTClientGetter.ToRESTConfig()
+
+	restConf.GroupVersion = &schema.GroupVersion{
+		Group:   "api",
+		Version: "v1",
+	}
+
+	restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
+
+	restClient, err := rest.RESTClientFor(restConf)
+
+	if err != nil {
+		return err
+	}
+
+	req := restClient.Post().
+		Resource("pods").
+		Name(podName).
+		Namespace(namespace).
+		SubResource("exec")
+
+	req.Param("command", "./signal.sh")
+	req.Param("container", "sidecar")
+	req.Param("stdin", "true")
+	req.Param("stdout", "false")
+	req.Param("tty", "false")
+
+	exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
+
+	if err != nil {
+		return err
+	}
+
+	return exec.Stream(remotecommand.StreamOptions{
+		Tty:   false,
+		Stdin: strings.NewReader("./signal.sh"),
+	})
+}
+
 // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
 // TODO: Support Jobs
 func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string) error {

+ 49 - 0
server/api/k8s_handler.go

@@ -691,6 +691,55 @@ func (app *App) HandleListJobsByChart(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
+// HandleStopJob stops a running job
+func (app *App) HandleStopJob(w http.ResponseWriter, r *http.Request) {
+	// get path parameters
+	namespace := chi.URLParam(r, "namespace")
+	name := chi.URLParam(r, "name")
+
+	vals, err := url.ParseQuery(r.URL.RawQuery)
+
+	if err != nil {
+		app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
+		return
+	}
+
+	// get the filter options
+	form := &forms.K8sForm{
+		OutOfClusterConfig: &kubernetes.OutOfClusterConfig{
+			Repo:              app.Repo,
+			DigitalOceanOAuth: app.DOConf,
+		},
+	}
+
+	form.PopulateK8sOptionsFromQueryParams(vals, app.Repo.Cluster)
+
+	// validate the form
+	if err := app.validator.Struct(form); err != nil {
+		app.handleErrorFormValidation(err, ErrK8sValidate, w)
+		return
+	}
+
+	// create a new agent
+	var agent *kubernetes.Agent
+
+	if app.ServerConf.IsTesting {
+		agent = app.TestAgents.K8sAgent
+	} else {
+		agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
+	}
+
+	err = agent.StopJobWithJobSidecar(namespace, name)
+
+	if err != nil {
+		app.handleErrorInternal(err, w)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+	return
+}
+
 // HandleListJobPods lists all pods belonging to a specific job
 func (app *App) HandleListJobPods(w http.ResponseWriter, r *http.Request) {
 	// get path parameters

+ 14 - 0
server/router/router.go

@@ -1338,6 +1338,20 @@ func New(a *api.App) *chi.Mux {
 			),
 		)
 
+		r.Method(
+			"POST",
+			"/projects/{project_id}/k8s/jobs/{namespace}/{name}/stop",
+			auth.DoesUserHaveProjectAccess(
+				auth.DoesUserHaveClusterAccess(
+					requestlog.NewHandler(a.HandleStopJob, l),
+					mw.URLParam,
+					mw.QueryParam,
+				),
+				mw.URLParam,
+				mw.ReadAccess,
+			),
+		)
+
 		// /api/projects/{project_id}/subdomain routes
 		r.Method(
 			"POST",

+ 1 - 1
services/job_sidecar_container/job_killer.sh

@@ -72,7 +72,7 @@ graceful_shutdown() {
     [ -z "$target_pid" ] && echo "Exit Gracefully (0)" && exit 0 || echo "Dirty Exit (1)" && exit 1
 }
 
-trap 'graceful_shutdown $1 $2' SIGTERM SIGINT SIGHUP
+trap 'graceful_shutdown $grace_period_seconds $target' SIGTERM SIGINT SIGHUP
 
 echo "waiting for job kill signal..."
 sleep infinity &