|
|
@@ -10,7 +10,9 @@ import (
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"io/ioutil"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
goerrors "errors"
|
|
|
@@ -107,6 +109,118 @@ func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[str
|
|
|
)
|
|
|
}
|
|
|
|
|
|
+func (a *Agent) CreateVersionedConfigMap(name, namespace string, version uint, configMap map[string]string, apps ...string) (*v1.ConfigMap, error) {
|
|
|
+ return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
|
|
|
+ context.TODO(),
|
|
|
+ &v1.ConfigMap{
|
|
|
+ ObjectMeta: metav1.ObjectMeta{
|
|
|
+ Name: fmt.Sprintf("%s.v%d", name, version),
|
|
|
+ Namespace: namespace,
|
|
|
+ Labels: map[string]string{
|
|
|
+ "owner": "porter",
|
|
|
+ "envgroup": name,
|
|
|
+ "version": fmt.Sprintf("%d", version),
|
|
|
+ },
|
|
|
+ Annotations: map[string]string{
|
|
|
+ PorterAppAnnotationName: strings.Join(apps, ","),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ Data: configMap,
|
|
|
+ },
|
|
|
+ metav1.CreateOptions{},
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+const PorterAppAnnotationName = "porter.run/apps"
|
|
|
+
|
|
|
+func (a *Agent) AddApplicationToVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
|
|
|
+ annons := cm.Annotations
|
|
|
+
|
|
|
+ if annons == nil {
|
|
|
+ annons = make(map[string]string)
|
|
|
+ }
|
|
|
+
|
|
|
+ appAnnon, appAnnonExists := annons[PorterAppAnnotationName]
|
|
|
+
|
|
|
+ if !appAnnonExists || appAnnon == "" {
|
|
|
+ annons[PorterAppAnnotationName] = appName
|
|
|
+ } else {
|
|
|
+ appStrArr := strings.Split(appAnnon, ",")
|
|
|
+ foundApp := false
|
|
|
+
|
|
|
+ for _, appStr := range appStrArr {
|
|
|
+ if appStr == appName {
|
|
|
+ foundApp = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if !foundApp {
|
|
|
+ annons[PorterAppAnnotationName] = fmt.Sprintf("%s,%s", appAnnon, appName)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ cm.SetAnnotations(annons)
|
|
|
+
|
|
|
+ return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
|
|
|
+ context.TODO(),
|
|
|
+ cm,
|
|
|
+ metav1.UpdateOptions{},
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+func (a *Agent) RemoveApplicationFromVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
|
|
|
+ annons := cm.Annotations
|
|
|
+
|
|
|
+ if annons == nil {
|
|
|
+ annons = make(map[string]string)
|
|
|
+ }
|
|
|
+
|
|
|
+ appAnn, appAnnExists := annons[PorterAppAnnotationName]
|
|
|
+
|
|
|
+ if !appAnnExists {
|
|
|
+ return nil, IsNotFoundError
|
|
|
+ }
|
|
|
+
|
|
|
+ appStrArr := strings.Split(appAnn, ",")
|
|
|
+ newStrArr := make([]string, 0)
|
|
|
+
|
|
|
+ for _, appStr := range appStrArr {
|
|
|
+ if appStr != appName {
|
|
|
+ newStrArr = append(newStrArr, appStr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ annons[PorterAppAnnotationName] = strings.Join(newStrArr, ",")
|
|
|
+
|
|
|
+ cm.SetAnnotations(annons)
|
|
|
+
|
|
|
+ return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
|
|
|
+ context.TODO(),
|
|
|
+ cm,
|
|
|
+ metav1.UpdateOptions{},
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+func (a *Agent) CreateLinkedVersionedSecret(name, namespace, cmName string, version uint, data map[string][]byte) (*v1.Secret, error) {
|
|
|
+ return a.Clientset.CoreV1().Secrets(namespace).Create(
|
|
|
+ context.TODO(),
|
|
|
+ &v1.Secret{
|
|
|
+ ObjectMeta: metav1.ObjectMeta{
|
|
|
+ Name: fmt.Sprintf("%s.v%d", name, version),
|
|
|
+ Namespace: namespace,
|
|
|
+ Labels: map[string]string{
|
|
|
+ "owner": "porter",
|
|
|
+ "envgroup": name,
|
|
|
+ "version": fmt.Sprintf("%d", version),
|
|
|
+ "configmap": cmName,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ Data: data,
|
|
|
+ },
|
|
|
+ metav1.CreateOptions{},
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
// CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
|
|
|
// base64 encoded
|
|
|
func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
|
|
|
@@ -219,6 +333,92 @@ func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
|
|
|
)
|
|
|
}
|
|
|
|
|
|
+func (a *Agent) ListVersionedConfigMaps(name string, namespace string) ([]v1.ConfigMap, error) {
|
|
|
+ listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
|
|
|
+ context.Background(),
|
|
|
+ metav1.ListOptions{
|
|
|
+ LabelSelector: fmt.Sprintf("envgroup=%s", name),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return listResp.Items, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (a *Agent) DeleteVersionedConfigMap(name string, namespace string) error {
|
|
|
+ return a.Clientset.CoreV1().ConfigMaps(namespace).DeleteCollection(
|
|
|
+ context.Background(),
|
|
|
+ metav1.DeleteOptions{},
|
|
|
+ metav1.ListOptions{
|
|
|
+ LabelSelector: fmt.Sprintf("envgroup=%s", name),
|
|
|
+ },
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+func (a *Agent) DeleteVersionedSecret(name string, namespace string) error {
|
|
|
+ return a.Clientset.CoreV1().Secrets(namespace).DeleteCollection(
|
|
|
+ context.Background(),
|
|
|
+ metav1.DeleteOptions{},
|
|
|
+ metav1.ListOptions{
|
|
|
+ LabelSelector: fmt.Sprintf("envgroup=%s", name),
|
|
|
+ },
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+func (a *Agent) ListAllVersionedConfigMaps(namespace string) ([]v1.ConfigMap, error) {
|
|
|
+ listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
|
|
|
+ context.Background(),
|
|
|
+ metav1.ListOptions{
|
|
|
+ LabelSelector: fmt.Sprintf("envgroup"),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // only keep the latest version for each configmap
|
|
|
+ latestMap := make(map[string]v1.ConfigMap)
|
|
|
+
|
|
|
+ for _, configmap := range listResp.Items {
|
|
|
+ egName, egNameExists := configmap.Labels["envgroup"]
|
|
|
+
|
|
|
+ if !egNameExists {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ id := fmt.Sprintf("%s/%s", configmap.Namespace, egName)
|
|
|
+
|
|
|
+ if currLatest, exists := latestMap[id]; exists {
|
|
|
+ // get version
|
|
|
+ currVersionStr, currVersionExists := currLatest.Labels["version"]
|
|
|
+ versionStr, versionExists := configmap.Labels["version"]
|
|
|
+
|
|
|
+ if versionExists && currVersionExists {
|
|
|
+ currVersion, currErr := strconv.Atoi(currVersionStr)
|
|
|
+ version, err := strconv.Atoi(versionStr)
|
|
|
+
|
|
|
+ if currErr == nil && err == nil && currVersion < version {
|
|
|
+ latestMap[id] = configmap
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ latestMap[id] = configmap
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ res := make([]v1.ConfigMap, 0)
|
|
|
+
|
|
|
+ for _, cm := range latestMap {
|
|
|
+ res = append(res, cm)
|
|
|
+ }
|
|
|
+
|
|
|
+ return res, nil
|
|
|
+}
|
|
|
+
|
|
|
// GetConfigMap retrieves the configmap given its name and namespace
|
|
|
func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
|
|
|
return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
|
|
|
@@ -228,6 +428,154 @@ func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, erro
|
|
|
)
|
|
|
}
|
|
|
|
|
|
+func (a *Agent) GetVersionedConfigMap(name, namespace string, version uint) (*v1.ConfigMap, error) {
|
|
|
+ listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
|
|
|
+ context.Background(),
|
|
|
+ metav1.ListOptions{
|
|
|
+ LabelSelector: fmt.Sprintf("envgroup=%s,version=%d", name, version),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if listResp.Items == nil || len(listResp.Items) == 0 {
|
|
|
+ return nil, IsNotFoundError
|
|
|
+ }
|
|
|
+
|
|
|
+ // if the length of the list is greater than 1, return an error -- this shouldn't happen
|
|
|
+ if len(listResp.Items) > 1 {
|
|
|
+ return nil, fmt.Errorf("multiple configmaps found while searching for %s/%s and version %d", namespace, name, version)
|
|
|
+ }
|
|
|
+
|
|
|
+ return &listResp.Items[0], nil
|
|
|
+}
|
|
|
+
|
|
|
+func (a *Agent) GetLatestVersionedConfigMap(name, namespace string) (*v1.ConfigMap, uint, error) {
|
|
|
+ listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
|
|
|
+ context.Background(),
|
|
|
+ metav1.ListOptions{
|
|
|
+ LabelSelector: fmt.Sprintf("envgroup=%s", name),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return nil, 0, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if listResp.Items == nil || len(listResp.Items) == 0 {
|
|
|
+ return nil, 0, IsNotFoundError
|
|
|
+ }
|
|
|
+
|
|
|
+ // iterate through the configmaps and get the greatest version
|
|
|
+ var res *v1.ConfigMap
|
|
|
+ var latestVersion uint
|
|
|
+
|
|
|
+ for _, configmap := range listResp.Items {
|
|
|
+ if res == nil {
|
|
|
+ versionStr, versionExists := configmap.Labels["version"]
|
|
|
+
|
|
|
+ if !versionExists {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ version, err := strconv.Atoi(versionStr)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ latestV := configmap
|
|
|
+ res = &latestV
|
|
|
+ latestVersion = uint(version)
|
|
|
+ } else {
|
|
|
+ // get version
|
|
|
+ versionStr, versionExists := configmap.Labels["version"]
|
|
|
+ currVersionStr, currVersionExists := res.Labels["version"]
|
|
|
+
|
|
|
+ if versionExists && currVersionExists {
|
|
|
+ currVersion, currErr := strconv.Atoi(currVersionStr)
|
|
|
+ version, err := strconv.Atoi(versionStr)
|
|
|
+ if currErr == nil && err == nil && currVersion < version {
|
|
|
+ latestV := configmap
|
|
|
+ res = &latestV
|
|
|
+ latestVersion = uint(version)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if res == nil {
|
|
|
+ return nil, 0, IsNotFoundError
|
|
|
+ }
|
|
|
+
|
|
|
+ return res, latestVersion, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (a *Agent) GetLatestVersionedSecret(name, namespace string) (*v1.Secret, uint, error) {
|
|
|
+ listResp, err := a.Clientset.CoreV1().Secrets(namespace).List(
|
|
|
+ context.Background(),
|
|
|
+ metav1.ListOptions{
|
|
|
+ LabelSelector: fmt.Sprintf("envgroup=%s", name),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return nil, 0, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if listResp.Items == nil || len(listResp.Items) == 0 {
|
|
|
+ return nil, 0, IsNotFoundError
|
|
|
+ }
|
|
|
+
|
|
|
+ // iterate through the configmaps and get the greatest version
|
|
|
+ var res *v1.Secret
|
|
|
+ var latestVersion uint
|
|
|
+
|
|
|
+ for _, secret := range listResp.Items {
|
|
|
+ if res == nil {
|
|
|
+ versionStr, versionExists := secret.Labels["version"]
|
|
|
+
|
|
|
+ if !versionExists {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ version, err := strconv.Atoi(versionStr)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ latestV := secret
|
|
|
+ res = &latestV
|
|
|
+ latestVersion = uint(version)
|
|
|
+ } else {
|
|
|
+ // get version
|
|
|
+ versionStr, versionExists := secret.Labels["version"]
|
|
|
+ currVersionStr, currVersionExists := res.Labels["version"]
|
|
|
+
|
|
|
+ if versionExists && currVersionExists {
|
|
|
+ currVersion, currErr := strconv.Atoi(currVersionStr)
|
|
|
+ version, err := strconv.Atoi(versionStr)
|
|
|
+ if currErr == nil && err == nil && currVersion < version {
|
|
|
+ latestV := secret
|
|
|
+ res = &latestV
|
|
|
+ latestVersion = uint(version)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if res == nil {
|
|
|
+ return nil, 0, IsNotFoundError
|
|
|
+ }
|
|
|
+
|
|
|
+ return res, latestVersion, nil
|
|
|
+}
|
|
|
+
|
|
|
// GetSecret retrieves the secret given its name and namespace
|
|
|
func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
|
|
|
return a.Clientset.CoreV1().Secrets(namespace).Get(
|
|
|
@@ -242,7 +590,7 @@ func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
|
|
|
return a.Clientset.CoreV1().ConfigMaps(namespace).List(
|
|
|
context.TODO(),
|
|
|
metav1.ListOptions{
|
|
|
- LabelSelector: "porter=true",
|
|
|
+ LabelSelector: "porter",
|
|
|
},
|
|
|
)
|
|
|
}
|
|
|
@@ -605,13 +953,29 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
|
|
|
return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
|
|
|
}
|
|
|
|
|
|
- defer podLogs.Close()
|
|
|
-
|
|
|
r := bufio.NewReader(podLogs)
|
|
|
errorchan := make(chan error)
|
|
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ var once sync.Once
|
|
|
+ wg.Add(2)
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ wg.Wait()
|
|
|
+ close(errorchan)
|
|
|
+ }()
|
|
|
+
|
|
|
go func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ // TODO: add method to alert on panic
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
// listens for websocket closing handshake
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
for {
|
|
|
if _, _, err := rw.ReadMessage(); err != nil {
|
|
|
errorchan <- nil
|
|
|
@@ -621,36 +985,39 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
|
|
|
}()
|
|
|
|
|
|
go func() {
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-errorchan:
|
|
|
- defer close(errorchan)
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ // TODO: add method to alert on panic
|
|
|
return
|
|
|
- default:
|
|
|
}
|
|
|
+ }()
|
|
|
+
|
|
|
+ defer wg.Done()
|
|
|
|
|
|
+ for {
|
|
|
bytes, err := r.ReadBytes('\n')
|
|
|
- if _, writeErr := rw.Write(bytes); writeErr != nil {
|
|
|
- errorchan <- writeErr
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
return
|
|
|
}
|
|
|
- if err != nil {
|
|
|
- if err != io.EOF {
|
|
|
- errorchan <- err
|
|
|
- return
|
|
|
- }
|
|
|
- errorchan <- nil
|
|
|
+
|
|
|
+ if _, writeErr := rw.Write(bytes); writeErr != nil {
|
|
|
+ errorchan <- writeErr
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- for {
|
|
|
- select {
|
|
|
- case err = <-errorchan:
|
|
|
- return err
|
|
|
- }
|
|
|
+ for err = range errorchan {
|
|
|
+ // only call these methods a single time
|
|
|
+ once.Do(func() {
|
|
|
+ rw.Close()
|
|
|
+ podLogs.Close()
|
|
|
+ })
|
|
|
}
|
|
|
+
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
// GetPodLogs streams real-time logs from a given pod.
|
|
|
@@ -838,43 +1205,29 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
|
|
|
|
|
|
stopper := make(chan struct{})
|
|
|
errorchan := make(chan error)
|
|
|
- defer close(stopper)
|
|
|
|
|
|
- informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
|
|
- if strings.HasSuffix(err.Error(), ": Unauthorized") {
|
|
|
- errorchan <- &AuthError{}
|
|
|
- }
|
|
|
- })
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ var once sync.Once
|
|
|
+ var err error
|
|
|
|
|
|
- informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
- UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
- msg := Message{
|
|
|
- EventType: "UPDATE",
|
|
|
- Object: newObj,
|
|
|
- Kind: strings.ToLower(kind),
|
|
|
- }
|
|
|
- rw.WriteJSONWithChannel(msg, errorchan)
|
|
|
- },
|
|
|
- AddFunc: func(obj interface{}) {
|
|
|
- msg := Message{
|
|
|
- EventType: "ADD",
|
|
|
- Object: obj,
|
|
|
- Kind: strings.ToLower(kind),
|
|
|
- }
|
|
|
- rw.WriteJSONWithChannel(msg, errorchan)
|
|
|
- },
|
|
|
- DeleteFunc: func(obj interface{}) {
|
|
|
- msg := Message{
|
|
|
- EventType: "DELETE",
|
|
|
- Object: obj,
|
|
|
- Kind: strings.ToLower(kind),
|
|
|
- }
|
|
|
- rw.WriteJSONWithChannel(msg, errorchan)
|
|
|
- },
|
|
|
- })
|
|
|
+ wg.Add(2)
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ wg.Wait()
|
|
|
+ close(errorchan)
|
|
|
+ }()
|
|
|
|
|
|
go func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ // TODO: add method to alert on panic
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
// listens for websocket closing handshake
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
for {
|
|
|
if _, _, err := rw.ReadMessage(); err != nil {
|
|
|
errorchan <- nil
|
|
|
@@ -883,14 +1236,75 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- go informer.Run(stopper)
|
|
|
+ go func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ // TODO: add method to alert on panic
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
- for {
|
|
|
- select {
|
|
|
- case err := <-errorchan:
|
|
|
- return err
|
|
|
- }
|
|
|
+ // listens for websocket closing handshake
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
|
|
+ if strings.HasSuffix(err.Error(), ": Unauthorized") {
|
|
|
+ errorchan <- &AuthError{}
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
+ UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
+ msg := Message{
|
|
|
+ EventType: "UPDATE",
|
|
|
+ Object: newObj,
|
|
|
+ Kind: strings.ToLower(kind),
|
|
|
+ }
|
|
|
+ err := rw.WriteJSON(msg)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ }
|
|
|
+ },
|
|
|
+ AddFunc: func(obj interface{}) {
|
|
|
+ msg := Message{
|
|
|
+ EventType: "ADD",
|
|
|
+ Object: obj,
|
|
|
+ Kind: strings.ToLower(kind),
|
|
|
+ }
|
|
|
+
|
|
|
+ err := rw.WriteJSON(msg)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ }
|
|
|
+ },
|
|
|
+ DeleteFunc: func(obj interface{}) {
|
|
|
+ msg := Message{
|
|
|
+ EventType: "DELETE",
|
|
|
+ Object: obj,
|
|
|
+ Kind: strings.ToLower(kind),
|
|
|
+ }
|
|
|
+
|
|
|
+ err := rw.WriteJSON(msg)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ }
|
|
|
+ },
|
|
|
+ })
|
|
|
+
|
|
|
+ informer.Run(stopper)
|
|
|
+ }()
|
|
|
+
|
|
|
+ for err = range errorchan {
|
|
|
+ once.Do(func() {
|
|
|
+ close(stopper)
|
|
|
+ rw.Close()
|
|
|
+ })
|
|
|
}
|
|
|
+
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
return a.RunWebsocketTask(run)
|
|
|
@@ -982,113 +1396,146 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
|
|
|
|
|
|
stopper := make(chan struct{})
|
|
|
errorchan := make(chan error)
|
|
|
- defer close(stopper)
|
|
|
|
|
|
- informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
|
|
- if strings.HasSuffix(err.Error(), ": Unauthorized") {
|
|
|
- errorchan <- &AuthError{}
|
|
|
- }
|
|
|
- })
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ var once sync.Once
|
|
|
+ var err error
|
|
|
|
|
|
- informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
- UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
- secretObj, ok := newObj.(*v1.Secret)
|
|
|
+ wg.Add(2)
|
|
|
|
|
|
- if !ok {
|
|
|
- errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
+ go func() {
|
|
|
+ wg.Wait()
|
|
|
+ close(errorchan)
|
|
|
+ }()
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ // TODO: add method to alert on panic
|
|
|
return
|
|
|
}
|
|
|
+ }()
|
|
|
|
|
|
- helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
|
|
|
+ // listens for websocket closing handshake
|
|
|
+ defer wg.Done()
|
|
|
|
|
|
- if isNotHelmRelease && err == nil {
|
|
|
+ for {
|
|
|
+ if _, _, err := rw.ReadMessage(); err != nil {
|
|
|
+ errorchan <- nil
|
|
|
return
|
|
|
}
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
- if err != nil {
|
|
|
- errorchan <- err
|
|
|
+ go func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ // TODO: add method to alert on panic
|
|
|
return
|
|
|
}
|
|
|
+ }()
|
|
|
+
|
|
|
+ // listens for websocket closing handshake
|
|
|
+ defer wg.Done()
|
|
|
|
|
|
- msg := Message{
|
|
|
- EventType: "UPDATE",
|
|
|
- Object: helm_object,
|
|
|
+ informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
|
|
+ if strings.HasSuffix(err.Error(), ": Unauthorized") {
|
|
|
+ errorchan <- &AuthError{}
|
|
|
}
|
|
|
+ })
|
|
|
|
|
|
- rw.WriteJSONWithChannel(msg, errorchan)
|
|
|
- },
|
|
|
- AddFunc: func(obj interface{}) {
|
|
|
- secretObj, ok := obj.(*v1.Secret)
|
|
|
+ informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
+ UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
+ secretObj, ok := newObj.(*v1.Secret)
|
|
|
|
|
|
- if !ok {
|
|
|
- errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
- return
|
|
|
- }
|
|
|
+ if !ok {
|
|
|
+ errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
|
|
|
+ helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
|
|
|
|
|
|
- if isNotHelmRelease && err == nil {
|
|
|
- return
|
|
|
- }
|
|
|
+ if isNotHelmRelease && err == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
- errorchan <- err
|
|
|
- return
|
|
|
- }
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- msg := Message{
|
|
|
- EventType: "ADD",
|
|
|
- Object: helm_object,
|
|
|
- }
|
|
|
+ msg := Message{
|
|
|
+ EventType: "UPDATE",
|
|
|
+ Object: helm_object,
|
|
|
+ }
|
|
|
|
|
|
- rw.WriteJSONWithChannel(msg, errorchan)
|
|
|
- },
|
|
|
- DeleteFunc: func(obj interface{}) {
|
|
|
- secretObj, ok := obj.(*v1.Secret)
|
|
|
+ rw.WriteJSON(msg)
|
|
|
+ },
|
|
|
+ AddFunc: func(obj interface{}) {
|
|
|
+ secretObj, ok := obj.(*v1.Secret)
|
|
|
|
|
|
- if !ok {
|
|
|
- errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
- return
|
|
|
- }
|
|
|
+ if !ok {
|
|
|
+ errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
|
|
|
+ helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
|
|
|
|
|
|
- if isNotHelmRelease && err == nil {
|
|
|
- return
|
|
|
- }
|
|
|
+ if isNotHelmRelease && err == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
- errorchan <- err
|
|
|
- return
|
|
|
- }
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- msg := Message{
|
|
|
- EventType: "DELETE",
|
|
|
- Object: helm_object,
|
|
|
- }
|
|
|
+ msg := Message{
|
|
|
+ EventType: "ADD",
|
|
|
+ Object: helm_object,
|
|
|
+ }
|
|
|
|
|
|
- rw.WriteJSONWithChannel(msg, errorchan)
|
|
|
- },
|
|
|
- })
|
|
|
+ rw.WriteJSON(msg)
|
|
|
+ },
|
|
|
+ DeleteFunc: func(obj interface{}) {
|
|
|
+ secretObj, ok := obj.(*v1.Secret)
|
|
|
|
|
|
- go func() {
|
|
|
- // listens for websocket closing handshake
|
|
|
- for {
|
|
|
- if _, _, err := rw.ReadMessage(); err != nil {
|
|
|
- errorchan <- nil
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
+ if !ok {
|
|
|
+ errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- go informer.Run(stopper)
|
|
|
+ helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
|
|
|
|
|
|
- for {
|
|
|
- select {
|
|
|
- case err := <-errorchan:
|
|
|
- return err
|
|
|
- }
|
|
|
+ if isNotHelmRelease && err == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ msg := Message{
|
|
|
+ EventType: "DELETE",
|
|
|
+ Object: helm_object,
|
|
|
+ }
|
|
|
+
|
|
|
+ rw.WriteJSON(msg)
|
|
|
+ },
|
|
|
+ })
|
|
|
+
|
|
|
+ informer.Run(stopper)
|
|
|
+ }()
|
|
|
+
|
|
|
+ for err = range errorchan {
|
|
|
+ once.Do(func() {
|
|
|
+ close(stopper)
|
|
|
+ rw.Close()
|
|
|
+ })
|
|
|
}
|
|
|
+
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
return a.RunWebsocketTask(run)
|
|
|
@@ -1099,7 +1546,13 @@ func (a *Agent) Provision(
|
|
|
) error {
|
|
|
// get the provisioner job template
|
|
|
job, err := provisioner.GetProvisionerJobTemplate(opts)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
+ // clearExistingJob with the same name
|
|
|
+ // this is required in case of a job retry
|
|
|
+ err = a.clearExistingJobs(job)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -1114,6 +1567,67 @@ func (a *Agent) Provision(
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+func (a *Agent) clearExistingJobs(j *batchv1.Job) error {
|
|
|
+ // find if existingJob already exists
|
|
|
+ existingJob, err := a.Clientset.BatchV1().Jobs(j.Namespace).Get(
|
|
|
+ context.TODO(),
|
|
|
+ j.Name,
|
|
|
+ metav1.GetOptions{},
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ // job not found, no further action needed
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
+ w, err := a.Clientset.BatchV1().Jobs(existingJob.Namespace).Watch(
|
|
|
+ context.TODO(),
|
|
|
+ metav1.ListOptions{
|
|
|
+ ResourceVersion: existingJob.ResourceVersion,
|
|
|
+ // ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ // most probably the job has already been deleted
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ deleteErrorChan := make(chan error)
|
|
|
+
|
|
|
+ go func(errChan chan<- error) {
|
|
|
+ // job exists, delete it and wait for its deletion
|
|
|
+ // delete job if it already exists
|
|
|
+ err = a.Clientset.BatchV1().Jobs(existingJob.Namespace).Delete(
|
|
|
+ context.TODO(),
|
|
|
+ j.Name,
|
|
|
+ metav1.DeleteOptions{},
|
|
|
+ )
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ // unable to delete job
|
|
|
+ errChan <- err
|
|
|
+ }
|
|
|
+ }(deleteErrorChan)
|
|
|
+
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return fmt.Errorf("timedout waiting for existing job deletion")
|
|
|
+ case event := <-w.ResultChan():
|
|
|
+ switch event.Type {
|
|
|
+ case watch.Deleted:
|
|
|
+ // job has been successfully delete
|
|
|
+ // return without error
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// CreateImagePullSecrets will create the required image pull secrets and
|
|
|
// return a map from the registry name to the name of the secret.
|
|
|
func (a *Agent) CreateImagePullSecrets(
|
|
|
@@ -1223,11 +1737,12 @@ func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
|
|
|
return err, false
|
|
|
}
|
|
|
defer w.Stop()
|
|
|
- for {
|
|
|
+
|
|
|
+ expireTime := time.Now().Add(time.Second * 30)
|
|
|
+
|
|
|
+ for time.Now().Unix() <= expireTime.Unix() {
|
|
|
select {
|
|
|
- case <-time.After(time.Second * 30):
|
|
|
- return goerrors.New("timed out waiting for pod"), false
|
|
|
- case <-time.Tick(time.Second):
|
|
|
+ case <-time.NewTicker(time.Second).C:
|
|
|
// poll every second in case we already missed the ready event while
|
|
|
// creating the listener.
|
|
|
pod, err = a.Clientset.CoreV1().
|
|
|
@@ -1253,6 +1768,8 @@ func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return goerrors.New("timed out waiting for pod"), false
|
|
|
}
|
|
|
|
|
|
func isPodReady(pod *v1.Pod) bool {
|