agent.go 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. goerrors "errors"
  17. "github.com/porter-dev/porter/api/server/shared/websocket"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/registry"
  20. "github.com/porter-dev/porter/internal/repository"
  21. "golang.org/x/oauth2"
  22. errors2 "errors"
  23. "github.com/porter-dev/porter/internal/helm/grapher"
  24. appsv1 "k8s.io/api/apps/v1"
  25. batchv1 "k8s.io/api/batch/v1"
  26. batchv1beta1 "k8s.io/api/batch/v1beta1"
  27. v1 "k8s.io/api/core/v1"
  28. v1beta1 "k8s.io/api/extensions/v1beta1"
  29. "k8s.io/apimachinery/pkg/api/errors"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/fields"
  32. "k8s.io/apimachinery/pkg/runtime"
  33. "k8s.io/apimachinery/pkg/runtime/schema"
  34. "k8s.io/apimachinery/pkg/types"
  35. "k8s.io/apimachinery/pkg/watch"
  36. "k8s.io/cli-runtime/pkg/genericclioptions"
  37. "k8s.io/client-go/informers"
  38. "k8s.io/client-go/kubernetes"
  39. "k8s.io/client-go/rest"
  40. "k8s.io/client-go/tools/cache"
  41. "k8s.io/client-go/tools/remotecommand"
  42. rspb "helm.sh/helm/v3/pkg/release"
  43. )
  44. // Agent is a Kubernetes agent for performing operations that interact with the
  45. // api server
  46. type Agent struct {
  47. RESTClientGetter genericclioptions.RESTClientGetter
  48. Clientset kubernetes.Interface
  49. }
  50. type Message struct {
  51. EventType string `json:"event_type"`
  52. Object interface{}
  53. Kind string
  54. }
  55. type ListOptions struct {
  56. FieldSelector string
  57. }
  58. type AuthError struct{}
  59. func (e *AuthError) Error() string {
  60. return "Unauthorized error"
  61. }
  62. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  63. func (a *Agent) UpdateClientset() error {
  64. restConf, err := a.RESTClientGetter.ToRESTConfig()
  65. if err != nil {
  66. return err
  67. }
  68. clientset, err := kubernetes.NewForConfig(restConf)
  69. if err != nil {
  70. return err
  71. }
  72. a.Clientset = clientset
  73. return nil
  74. }
  75. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  76. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  77. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  78. context.TODO(),
  79. &v1.ConfigMap{
  80. ObjectMeta: metav1.ObjectMeta{
  81. Name: name,
  82. Namespace: namespace,
  83. Labels: map[string]string{
  84. "porter": "true",
  85. },
  86. },
  87. Data: configMap,
  88. },
  89. metav1.CreateOptions{},
  90. )
  91. }
  92. func (a *Agent) CreateVersionedConfigMap(name, namespace string, version uint, configMap map[string]string, apps ...string) (*v1.ConfigMap, error) {
  93. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  94. context.TODO(),
  95. &v1.ConfigMap{
  96. ObjectMeta: metav1.ObjectMeta{
  97. Name: fmt.Sprintf("%s.v%d", name, version),
  98. Namespace: namespace,
  99. Labels: map[string]string{
  100. "owner": "porter",
  101. "envgroup": name,
  102. "version": fmt.Sprintf("%d", version),
  103. },
  104. Annotations: map[string]string{
  105. PorterAppAnnotationName: strings.Join(apps, ","),
  106. },
  107. },
  108. Data: configMap,
  109. },
  110. metav1.CreateOptions{},
  111. )
  112. }
  113. const PorterAppAnnotationName = "porter.run/apps"
  114. func (a *Agent) AddApplicationToVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  115. annons := cm.Annotations
  116. if annons == nil {
  117. annons = make(map[string]string)
  118. }
  119. appAnnon, appAnnonExists := annons[PorterAppAnnotationName]
  120. if !appAnnonExists || appAnnon == "" {
  121. annons[PorterAppAnnotationName] = appName
  122. } else {
  123. appStrArr := strings.Split(appAnnon, ",")
  124. foundApp := false
  125. for _, appStr := range appStrArr {
  126. if appStr == appName {
  127. foundApp = true
  128. }
  129. }
  130. if !foundApp {
  131. annons[PorterAppAnnotationName] = fmt.Sprintf("%s,%s", appAnnon, appName)
  132. }
  133. }
  134. cm.SetAnnotations(annons)
  135. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  136. context.TODO(),
  137. cm,
  138. metav1.UpdateOptions{},
  139. )
  140. }
  141. func (a *Agent) RemoveApplicationFromVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  142. annons := cm.Annotations
  143. if annons == nil {
  144. annons = make(map[string]string)
  145. }
  146. appAnn, appAnnExists := annons[PorterAppAnnotationName]
  147. if !appAnnExists {
  148. return nil, IsNotFoundError
  149. }
  150. appStrArr := strings.Split(appAnn, ",")
  151. newStrArr := make([]string, 0)
  152. for _, appStr := range appStrArr {
  153. if appStr != appName {
  154. newStrArr = append(newStrArr, appStr)
  155. }
  156. }
  157. annons[PorterAppAnnotationName] = strings.Join(newStrArr, ",")
  158. cm.SetAnnotations(annons)
  159. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  160. context.TODO(),
  161. cm,
  162. metav1.UpdateOptions{},
  163. )
  164. }
  165. func (a *Agent) CreateLinkedVersionedSecret(name, namespace, cmName string, version uint, data map[string][]byte) (*v1.Secret, error) {
  166. return a.Clientset.CoreV1().Secrets(namespace).Create(
  167. context.TODO(),
  168. &v1.Secret{
  169. ObjectMeta: metav1.ObjectMeta{
  170. Name: fmt.Sprintf("%s.v%d", name, version),
  171. Namespace: namespace,
  172. Labels: map[string]string{
  173. "owner": "porter",
  174. "envgroup": name,
  175. "version": fmt.Sprintf("%d", version),
  176. "configmap": cmName,
  177. },
  178. },
  179. Data: data,
  180. },
  181. metav1.CreateOptions{},
  182. )
  183. }
  184. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  185. // base64 encoded
  186. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  187. return a.Clientset.CoreV1().Secrets(namespace).Create(
  188. context.TODO(),
  189. &v1.Secret{
  190. ObjectMeta: metav1.ObjectMeta{
  191. Name: name,
  192. Namespace: namespace,
  193. Labels: map[string]string{
  194. "porter": "true",
  195. "configmap": cmName,
  196. },
  197. },
  198. Data: data,
  199. },
  200. metav1.CreateOptions{},
  201. )
  202. }
  203. type mergeConfigMapData struct {
  204. Data map[string]*string `json:"data"`
  205. }
  206. // UpdateConfigMap updates the configmap given its name and namespace
  207. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  208. cmData := make(map[string]*string)
  209. for key, val := range configMap {
  210. valCopy := val
  211. cmData[key] = &valCopy
  212. if len(val) == 0 {
  213. cmData[key] = nil
  214. }
  215. }
  216. mergeCM := &mergeConfigMapData{
  217. Data: cmData,
  218. }
  219. patchBytes, err := json.Marshal(mergeCM)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  224. context.Background(),
  225. name,
  226. types.MergePatchType,
  227. patchBytes,
  228. metav1.PatchOptions{},
  229. )
  230. }
  231. type mergeLinkedSecretData struct {
  232. Data map[string]*[]byte `json:"data"`
  233. }
  234. // UpdateLinkedSecret updates the secret given its name and namespace
  235. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  236. secretData := make(map[string]*[]byte)
  237. for key, val := range data {
  238. valCopy := val
  239. secretData[key] = &valCopy
  240. if len(val) == 0 {
  241. secretData[key] = nil
  242. }
  243. }
  244. mergeSecret := &mergeLinkedSecretData{
  245. Data: secretData,
  246. }
  247. patchBytes, err := json.Marshal(mergeSecret)
  248. if err != nil {
  249. return err
  250. }
  251. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  252. context.TODO(),
  253. name,
  254. types.MergePatchType,
  255. patchBytes,
  256. metav1.PatchOptions{},
  257. )
  258. return err
  259. }
  260. // DeleteConfigMap deletes the configmap given its name and namespace
  261. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  262. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  263. context.TODO(),
  264. name,
  265. metav1.DeleteOptions{},
  266. )
  267. }
  268. // DeleteLinkedSecret deletes the secret given its name and namespace
  269. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  270. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  271. context.TODO(),
  272. name,
  273. metav1.DeleteOptions{},
  274. )
  275. }
  276. func (a *Agent) ListVersionedConfigMaps(name string, namespace string) ([]v1.ConfigMap, error) {
  277. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  278. context.Background(),
  279. metav1.ListOptions{
  280. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  281. },
  282. )
  283. if err != nil {
  284. return nil, err
  285. }
  286. return listResp.Items, nil
  287. }
  288. func (a *Agent) DeleteVersionedConfigMap(name string, namespace string) error {
  289. return a.Clientset.CoreV1().ConfigMaps(namespace).DeleteCollection(
  290. context.Background(),
  291. metav1.DeleteOptions{},
  292. metav1.ListOptions{
  293. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  294. },
  295. )
  296. }
  297. func (a *Agent) DeleteVersionedSecret(name string, namespace string) error {
  298. return a.Clientset.CoreV1().Secrets(namespace).DeleteCollection(
  299. context.Background(),
  300. metav1.DeleteOptions{},
  301. metav1.ListOptions{
  302. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  303. },
  304. )
  305. }
  306. func (a *Agent) ListAllVersionedConfigMaps(namespace string) ([]v1.ConfigMap, error) {
  307. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  308. context.Background(),
  309. metav1.ListOptions{
  310. LabelSelector: fmt.Sprintf("envgroup"),
  311. },
  312. )
  313. if err != nil {
  314. return nil, err
  315. }
  316. // only keep the latest version for each configmap
  317. latestMap := make(map[string]v1.ConfigMap)
  318. for _, configmap := range listResp.Items {
  319. egName, egNameExists := configmap.Labels["envgroup"]
  320. if !egNameExists {
  321. continue
  322. }
  323. id := fmt.Sprintf("%s/%s", configmap.Namespace, egName)
  324. if currLatest, exists := latestMap[id]; exists {
  325. // get version
  326. currVersionStr, currVersionExists := currLatest.Labels["version"]
  327. versionStr, versionExists := configmap.Labels["version"]
  328. if versionExists && currVersionExists {
  329. currVersion, currErr := strconv.Atoi(currVersionStr)
  330. version, err := strconv.Atoi(versionStr)
  331. if currErr == nil && err == nil && currVersion < version {
  332. latestMap[id] = configmap
  333. }
  334. }
  335. } else {
  336. latestMap[id] = configmap
  337. }
  338. }
  339. res := make([]v1.ConfigMap, 0)
  340. for _, cm := range latestMap {
  341. res = append(res, cm)
  342. }
  343. return res, nil
  344. }
  345. // GetConfigMap retrieves the configmap given its name and namespace
  346. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  347. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  348. context.TODO(),
  349. name,
  350. metav1.GetOptions{},
  351. )
  352. }
  353. func (a *Agent) GetVersionedConfigMap(name, namespace string, version uint) (*v1.ConfigMap, error) {
  354. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  355. context.Background(),
  356. metav1.ListOptions{
  357. LabelSelector: fmt.Sprintf("envgroup=%s,version=%d", name, version),
  358. },
  359. )
  360. if err != nil {
  361. return nil, err
  362. }
  363. if listResp.Items == nil || len(listResp.Items) == 0 {
  364. return nil, IsNotFoundError
  365. }
  366. // if the length of the list is greater than 1, return an error -- this shouldn't happen
  367. if len(listResp.Items) > 1 {
  368. return nil, fmt.Errorf("multiple configmaps found while searching for %s/%s and version %d", namespace, name, version)
  369. }
  370. return &listResp.Items[0], nil
  371. }
  372. func (a *Agent) GetLatestVersionedConfigMap(name, namespace string) (*v1.ConfigMap, uint, error) {
  373. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  374. context.Background(),
  375. metav1.ListOptions{
  376. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  377. },
  378. )
  379. if err != nil {
  380. return nil, 0, err
  381. }
  382. if listResp.Items == nil || len(listResp.Items) == 0 {
  383. return nil, 0, IsNotFoundError
  384. }
  385. // iterate through the configmaps and get the greatest version
  386. var res *v1.ConfigMap
  387. var latestVersion uint
  388. for _, configmap := range listResp.Items {
  389. if res == nil {
  390. versionStr, versionExists := configmap.Labels["version"]
  391. if !versionExists {
  392. continue
  393. }
  394. version, err := strconv.Atoi(versionStr)
  395. if err != nil {
  396. continue
  397. }
  398. latestV := configmap
  399. res = &latestV
  400. latestVersion = uint(version)
  401. } else {
  402. // get version
  403. versionStr, versionExists := configmap.Labels["version"]
  404. currVersionStr, currVersionExists := res.Labels["version"]
  405. if versionExists && currVersionExists {
  406. currVersion, currErr := strconv.Atoi(currVersionStr)
  407. version, err := strconv.Atoi(versionStr)
  408. if currErr == nil && err == nil && currVersion < version {
  409. latestV := configmap
  410. res = &latestV
  411. latestVersion = uint(version)
  412. }
  413. }
  414. }
  415. }
  416. if res == nil {
  417. return nil, 0, IsNotFoundError
  418. }
  419. return res, latestVersion, nil
  420. }
  421. func (a *Agent) GetLatestVersionedSecret(name, namespace string) (*v1.Secret, uint, error) {
  422. listResp, err := a.Clientset.CoreV1().Secrets(namespace).List(
  423. context.Background(),
  424. metav1.ListOptions{
  425. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  426. },
  427. )
  428. if err != nil {
  429. return nil, 0, err
  430. }
  431. if listResp.Items == nil || len(listResp.Items) == 0 {
  432. return nil, 0, IsNotFoundError
  433. }
  434. // iterate through the configmaps and get the greatest version
  435. var res *v1.Secret
  436. var latestVersion uint
  437. for _, secret := range listResp.Items {
  438. if res == nil {
  439. versionStr, versionExists := secret.Labels["version"]
  440. if !versionExists {
  441. continue
  442. }
  443. version, err := strconv.Atoi(versionStr)
  444. if err != nil {
  445. continue
  446. }
  447. latestV := secret
  448. res = &latestV
  449. latestVersion = uint(version)
  450. } else {
  451. // get version
  452. versionStr, versionExists := secret.Labels["version"]
  453. currVersionStr, currVersionExists := res.Labels["version"]
  454. if versionExists && currVersionExists {
  455. currVersion, currErr := strconv.Atoi(currVersionStr)
  456. version, err := strconv.Atoi(versionStr)
  457. if currErr == nil && err == nil && currVersion < version {
  458. latestV := secret
  459. res = &latestV
  460. latestVersion = uint(version)
  461. }
  462. }
  463. }
  464. }
  465. if res == nil {
  466. return nil, 0, IsNotFoundError
  467. }
  468. return res, latestVersion, nil
  469. }
  470. // GetSecret retrieves the secret given its name and namespace
  471. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  472. return a.Clientset.CoreV1().Secrets(namespace).Get(
  473. context.TODO(),
  474. name,
  475. metav1.GetOptions{},
  476. )
  477. }
  478. // ListConfigMaps simply lists namespaces
  479. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  480. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  481. context.TODO(),
  482. metav1.ListOptions{
  483. LabelSelector: "porter",
  484. },
  485. )
  486. }
  487. // ListEvents lists the events of a given object.
  488. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  489. return a.Clientset.CoreV1().Events(namespace).List(
  490. context.TODO(),
  491. metav1.ListOptions{
  492. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  493. },
  494. )
  495. }
  496. // ListNamespaces simply lists namespaces
  497. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  498. return a.Clientset.CoreV1().Namespaces().List(
  499. context.TODO(),
  500. metav1.ListOptions{},
  501. )
  502. }
  503. // CreateNamespace creates a namespace with the given name.
  504. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  505. // check if namespace exists
  506. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  507. context.TODO(),
  508. name,
  509. metav1.GetOptions{},
  510. )
  511. if err == nil && checkNS != nil {
  512. return checkNS, nil
  513. }
  514. namespace := v1.Namespace{
  515. ObjectMeta: metav1.ObjectMeta{
  516. Name: name,
  517. },
  518. }
  519. return a.Clientset.CoreV1().Namespaces().Create(
  520. context.TODO(),
  521. &namespace,
  522. metav1.CreateOptions{},
  523. )
  524. }
  525. // DeleteNamespace deletes the namespace given the name.
  526. func (a *Agent) DeleteNamespace(name string) error {
  527. return a.Clientset.CoreV1().Namespaces().Delete(
  528. context.TODO(),
  529. name,
  530. metav1.DeleteOptions{},
  531. )
  532. }
  533. func (a *Agent) GetPorterAgent() (*appsv1.Deployment, error) {
  534. depl, err := a.Clientset.AppsV1().Deployments("porter-agent-system").Get(
  535. context.TODO(),
  536. "porter-agent-controller-manager",
  537. metav1.GetOptions{},
  538. )
  539. if err != nil && errors.IsNotFound(err) {
  540. return nil, IsNotFoundError
  541. }
  542. return depl, err
  543. }
  544. // ListJobsByLabel lists jobs in a namespace matching a label
  545. type Label struct {
  546. Key string
  547. Val string
  548. }
  549. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  550. selectors := make([]string, 0)
  551. for _, label := range labels {
  552. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  553. }
  554. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  555. context.TODO(),
  556. metav1.ListOptions{
  557. LabelSelector: strings.Join(selectors, ","),
  558. },
  559. )
  560. if err != nil {
  561. return nil, err
  562. }
  563. return resp.Items, nil
  564. }
  565. // StreamJobs streams a list of jobs to the websocket writer, closing the connection once all jobs have been sent
  566. func (a *Agent) StreamJobs(namespace string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  567. run := func() error {
  568. errorchan := make(chan error)
  569. ctx, cancel := context.WithCancel(context.Background())
  570. defer cancel()
  571. var wg sync.WaitGroup
  572. var once sync.Once
  573. var err error
  574. wg.Add(2)
  575. go func() {
  576. wg.Wait()
  577. close(errorchan)
  578. }()
  579. go func() {
  580. defer func() {
  581. if r := recover(); r != nil {
  582. // TODO: add method to alert on panic
  583. return
  584. }
  585. }()
  586. // listens for websocket closing handshake
  587. defer wg.Done()
  588. for {
  589. if _, _, err := rw.ReadMessage(); err != nil {
  590. errorchan <- nil
  591. return
  592. }
  593. }
  594. }()
  595. go func() {
  596. defer func() {
  597. if r := recover(); r != nil {
  598. // TODO: add method to alert on panic
  599. return
  600. }
  601. }()
  602. // listens for websocket closing handshake
  603. defer wg.Done()
  604. continueVal := ""
  605. for {
  606. if ctx.Err() != nil {
  607. errorchan <- nil
  608. return
  609. }
  610. jobs, err := a.Clientset.BatchV1().Jobs(namespace).List(
  611. ctx,
  612. metav1.ListOptions{
  613. Limit: 100,
  614. Continue: continueVal,
  615. LabelSelector: "meta.helm.sh/release-name",
  616. },
  617. )
  618. if err != nil {
  619. errorchan <- err
  620. return
  621. }
  622. for _, job := range jobs.Items {
  623. err := rw.WriteJSON(job)
  624. if err != nil {
  625. errorchan <- err
  626. return
  627. }
  628. }
  629. if jobs.Continue == "" {
  630. // we have reached the end of the list of jobs
  631. break
  632. } else {
  633. // start pagination
  634. continueVal = jobs.Continue
  635. }
  636. }
  637. // at this point, we can return the status finished
  638. err := rw.WriteJSON(map[string]interface{}{
  639. "streamStatus": "finished",
  640. })
  641. errorchan <- err
  642. }()
  643. for err = range errorchan {
  644. once.Do(func() {
  645. rw.Close()
  646. cancel()
  647. })
  648. }
  649. return err
  650. }
  651. return a.RunWebsocketTask(run)
  652. }
  653. // DeleteJob deletes the job in the given name and namespace.
  654. func (a *Agent) DeleteJob(name, namespace string) error {
  655. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  656. context.TODO(),
  657. name,
  658. metav1.DeleteOptions{},
  659. )
  660. }
  661. // GetJobPods lists all pods belonging to a job in a namespace
  662. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  663. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  664. context.TODO(),
  665. metav1.ListOptions{
  666. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  667. },
  668. )
  669. if err != nil {
  670. return nil, err
  671. }
  672. return resp.Items, nil
  673. }
  674. // GetIngress gets ingress given the name and namespace
  675. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  676. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  677. context.TODO(),
  678. name,
  679. metav1.GetOptions{},
  680. )
  681. if err != nil && errors.IsNotFound(err) {
  682. return nil, IsNotFoundError
  683. } else if err != nil {
  684. return nil, err
  685. }
  686. return resp, nil
  687. }
  688. var IsNotFoundError = fmt.Errorf("not found")
  689. type BadRequestError struct {
  690. msg string
  691. }
  692. func (e *BadRequestError) Error() string {
  693. return e.msg
  694. }
  695. // GetDeployment gets the deployment given the name and namespace
  696. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  697. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  698. context.TODO(),
  699. c.Name,
  700. metav1.GetOptions{},
  701. )
  702. if err != nil && errors.IsNotFound(err) {
  703. return nil, IsNotFoundError
  704. } else if err != nil {
  705. return nil, err
  706. }
  707. res.Kind = c.Kind
  708. return res, nil
  709. }
  710. // GetStatefulSet gets the statefulset given the name and namespace
  711. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  712. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  713. context.TODO(),
  714. c.Name,
  715. metav1.GetOptions{},
  716. )
  717. if err != nil && errors.IsNotFound(err) {
  718. return nil, IsNotFoundError
  719. } else if err != nil {
  720. return nil, err
  721. }
  722. res.Kind = c.Kind
  723. return res, nil
  724. }
  725. // GetReplicaSet gets the replicaset given the name and namespace
  726. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  727. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  728. context.TODO(),
  729. c.Name,
  730. metav1.GetOptions{},
  731. )
  732. if err != nil && errors.IsNotFound(err) {
  733. return nil, IsNotFoundError
  734. } else if err != nil {
  735. return nil, err
  736. }
  737. res.Kind = c.Kind
  738. return res, nil
  739. }
  740. // GetDaemonSet gets the daemonset by name and namespace
  741. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  742. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  743. context.TODO(),
  744. c.Name,
  745. metav1.GetOptions{},
  746. )
  747. if err != nil && errors.IsNotFound(err) {
  748. return nil, IsNotFoundError
  749. } else if err != nil {
  750. return nil, err
  751. }
  752. res.Kind = c.Kind
  753. return res, nil
  754. }
  755. // GetJob gets the job by name and namespace
  756. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  757. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  758. context.TODO(),
  759. c.Name,
  760. metav1.GetOptions{},
  761. )
  762. if err != nil && errors.IsNotFound(err) {
  763. return nil, IsNotFoundError
  764. } else if err != nil {
  765. return nil, err
  766. }
  767. res.Kind = c.Kind
  768. return res, nil
  769. }
  770. // GetCronJob gets the CronJob by name and namespace
  771. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  772. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  773. context.TODO(),
  774. c.Name,
  775. metav1.GetOptions{},
  776. )
  777. if err != nil && errors.IsNotFound(err) {
  778. return nil, IsNotFoundError
  779. } else if err != nil {
  780. return nil, err
  781. }
  782. res.Kind = c.Kind
  783. return res, nil
  784. }
  785. // GetPodsByLabel retrieves pods with matching labels
  786. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  787. // Search in all namespaces for matching pods
  788. return a.Clientset.CoreV1().Pods(namespace).List(
  789. context.TODO(),
  790. metav1.ListOptions{
  791. LabelSelector: selector,
  792. },
  793. )
  794. }
  795. // GetPodByName retrieves a single instance of pod with given name
  796. func (a *Agent) GetPodByName(name string, namespace string) (*v1.Pod, error) {
  797. // Get pod by name
  798. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  799. context.TODO(),
  800. name,
  801. metav1.GetOptions{},
  802. )
  803. if err != nil && errors.IsNotFound(err) {
  804. return nil, IsNotFoundError
  805. }
  806. if err != nil {
  807. return nil, err
  808. }
  809. return pod, nil
  810. }
  811. // DeletePod deletes a pod by name and namespace
  812. func (a *Agent) DeletePod(namespace string, name string) error {
  813. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  814. context.TODO(),
  815. name,
  816. metav1.DeleteOptions{},
  817. )
  818. if err != nil && errors.IsNotFound(err) {
  819. return IsNotFoundError
  820. }
  821. return err
  822. }
  823. // GetPodLogs streams real-time logs from a given pod.
  824. func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer string, rw *websocket.WebsocketSafeReadWriter) error {
  825. // get the pod to read in the list of contains
  826. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  827. context.Background(),
  828. name,
  829. metav1.GetOptions{},
  830. )
  831. if err != nil && errors.IsNotFound(err) {
  832. return IsNotFoundError
  833. } else if err != nil {
  834. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  835. }
  836. // see if container is ready and able to open a stream. If not, wait for container
  837. // to be ready.
  838. err, _ = a.waitForPod(pod)
  839. if err != nil && goerrors.Is(err, IsNotFoundError) {
  840. return IsNotFoundError
  841. } else if err != nil {
  842. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  843. }
  844. container := pod.Spec.Containers[0].Name
  845. if len(selectedContainer) > 0 {
  846. container = selectedContainer
  847. }
  848. tails := int64(400)
  849. // follow logs
  850. podLogOpts := v1.PodLogOptions{
  851. Follow: true,
  852. TailLines: &tails,
  853. Container: container,
  854. }
  855. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  856. podLogs, err := req.Stream(context.TODO())
  857. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  858. // we'd like to pass this through to the client.
  859. if err != nil && errors.IsBadRequest(err) {
  860. return &BadRequestError{err.Error()}
  861. } else if err != nil {
  862. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  863. }
  864. r := bufio.NewReader(podLogs)
  865. errorchan := make(chan error)
  866. var wg sync.WaitGroup
  867. var once sync.Once
  868. wg.Add(2)
  869. go func() {
  870. wg.Wait()
  871. close(errorchan)
  872. }()
  873. go func() {
  874. defer func() {
  875. if r := recover(); r != nil {
  876. // TODO: add method to alert on panic
  877. return
  878. }
  879. }()
  880. // listens for websocket closing handshake
  881. defer wg.Done()
  882. for {
  883. if _, _, err := rw.ReadMessage(); err != nil {
  884. errorchan <- nil
  885. return
  886. }
  887. }
  888. }()
  889. go func() {
  890. defer func() {
  891. if r := recover(); r != nil {
  892. // TODO: add method to alert on panic
  893. return
  894. }
  895. }()
  896. defer wg.Done()
  897. for {
  898. bytes, err := r.ReadBytes('\n')
  899. if err != nil {
  900. errorchan <- err
  901. return
  902. }
  903. if _, writeErr := rw.Write(bytes); writeErr != nil {
  904. errorchan <- writeErr
  905. return
  906. }
  907. }
  908. }()
  909. for err = range errorchan {
  910. // only call these methods a single time
  911. once.Do(func() {
  912. rw.Close()
  913. podLogs.Close()
  914. })
  915. }
  916. return err
  917. }
  918. // GetPodLogs streams real-time logs from a given pod.
  919. func (a *Agent) GetPreviousPodLogs(namespace string, name string, selectedContainer string) ([]string, error) {
  920. // get the pod to read in the list of contains
  921. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  922. context.Background(),
  923. name,
  924. metav1.GetOptions{},
  925. )
  926. if err != nil && errors.IsNotFound(err) {
  927. return nil, IsNotFoundError
  928. } else if err != nil {
  929. return nil, fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  930. }
  931. container := pod.Spec.Containers[0].Name
  932. if len(selectedContainer) > 0 {
  933. container = selectedContainer
  934. }
  935. tails := int64(400)
  936. // follow logs
  937. podLogOpts := v1.PodLogOptions{
  938. Follow: true,
  939. TailLines: &tails,
  940. Container: container,
  941. Previous: true,
  942. }
  943. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  944. podLogs, err := req.Stream(context.TODO())
  945. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  946. // we'd like to pass this through to the client.
  947. if err != nil && strings.Contains(err.Error(), "not found") {
  948. return nil, IsNotFoundError
  949. }
  950. if err != nil && errors.IsBadRequest(err) {
  951. return nil, &BadRequestError{err.Error()}
  952. } else if err != nil {
  953. return nil, fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  954. }
  955. defer podLogs.Close()
  956. r := bufio.NewReader(podLogs)
  957. var logs []string
  958. for {
  959. line, err := r.ReadString('\n')
  960. logs = append(logs, line)
  961. if err == io.EOF {
  962. break
  963. } else if err != nil {
  964. return nil, err
  965. }
  966. }
  967. return logs, nil
  968. }
  969. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  970. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  971. jobPods, err := a.GetJobPods(namespace, name)
  972. if err != nil {
  973. return err
  974. }
  975. podName := jobPods[0].ObjectMeta.Name
  976. restConf, err := a.RESTClientGetter.ToRESTConfig()
  977. restConf.GroupVersion = &schema.GroupVersion{
  978. Group: "api",
  979. Version: "v1",
  980. }
  981. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  982. restClient, err := rest.RESTClientFor(restConf)
  983. if err != nil {
  984. return err
  985. }
  986. req := restClient.Post().
  987. Resource("pods").
  988. Name(podName).
  989. Namespace(namespace).
  990. SubResource("exec")
  991. req.Param("command", "./signal.sh")
  992. req.Param("container", "sidecar")
  993. req.Param("stdin", "true")
  994. req.Param("stdout", "false")
  995. req.Param("tty", "false")
  996. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  997. if err != nil {
  998. return err
  999. }
  1000. return exec.Stream(remotecommand.StreamOptions{
  1001. Tty: false,
  1002. Stdin: strings.NewReader("./signal.sh"),
  1003. })
  1004. }
  1005. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  1006. // the task some number of times until failing
  1007. func (a *Agent) RunWebsocketTask(task func() error) error {
  1008. lastTime := int64(0)
  1009. for {
  1010. if err := a.UpdateClientset(); err != nil {
  1011. return err
  1012. }
  1013. err := task()
  1014. if err == nil {
  1015. return nil
  1016. }
  1017. if !errors2.Is(err, &AuthError{}) {
  1018. return err
  1019. }
  1020. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  1021. return err
  1022. }
  1023. lastTime = time.Now().Unix()
  1024. }
  1025. }
  1026. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  1027. // TODO: Support Jobs
  1028. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1029. run := func() error {
  1030. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  1031. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  1032. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1033. options.LabelSelector = selectors
  1034. }
  1035. factory := informers.NewSharedInformerFactoryWithOptions(
  1036. a.Clientset,
  1037. 0,
  1038. informers.WithTweakListOptions(tweakListOptionsFunc),
  1039. )
  1040. var informer cache.SharedInformer
  1041. // Spins up an informer depending on kind. Convert to lowercase for robustness
  1042. switch strings.ToLower(kind) {
  1043. case "deployment":
  1044. informer = factory.Apps().V1().Deployments().Informer()
  1045. case "statefulset":
  1046. informer = factory.Apps().V1().StatefulSets().Informer()
  1047. case "replicaset":
  1048. informer = factory.Apps().V1().ReplicaSets().Informer()
  1049. case "daemonset":
  1050. informer = factory.Apps().V1().DaemonSets().Informer()
  1051. case "job":
  1052. informer = factory.Batch().V1().Jobs().Informer()
  1053. case "cronjob":
  1054. informer = factory.Batch().V1beta1().CronJobs().Informer()
  1055. case "namespace":
  1056. informer = factory.Core().V1().Namespaces().Informer()
  1057. case "pod":
  1058. informer = factory.Core().V1().Pods().Informer()
  1059. }
  1060. stopper := make(chan struct{})
  1061. errorchan := make(chan error)
  1062. var wg sync.WaitGroup
  1063. var once sync.Once
  1064. var err error
  1065. wg.Add(2)
  1066. go func() {
  1067. wg.Wait()
  1068. close(errorchan)
  1069. }()
  1070. go func() {
  1071. defer func() {
  1072. if r := recover(); r != nil {
  1073. // TODO: add method to alert on panic
  1074. return
  1075. }
  1076. }()
  1077. // listens for websocket closing handshake
  1078. defer wg.Done()
  1079. for {
  1080. if _, _, err := rw.ReadMessage(); err != nil {
  1081. errorchan <- nil
  1082. return
  1083. }
  1084. }
  1085. }()
  1086. go func() {
  1087. defer func() {
  1088. if r := recover(); r != nil {
  1089. // TODO: add method to alert on panic
  1090. return
  1091. }
  1092. }()
  1093. // listens for websocket closing handshake
  1094. defer wg.Done()
  1095. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1096. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1097. errorchan <- &AuthError{}
  1098. }
  1099. })
  1100. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1101. UpdateFunc: func(oldObj, newObj interface{}) {
  1102. msg := Message{
  1103. EventType: "UPDATE",
  1104. Object: newObj,
  1105. Kind: strings.ToLower(kind),
  1106. }
  1107. err := rw.WriteJSON(msg)
  1108. if err != nil {
  1109. errorchan <- err
  1110. }
  1111. },
  1112. AddFunc: func(obj interface{}) {
  1113. msg := Message{
  1114. EventType: "ADD",
  1115. Object: obj,
  1116. Kind: strings.ToLower(kind),
  1117. }
  1118. err := rw.WriteJSON(msg)
  1119. if err != nil {
  1120. errorchan <- err
  1121. }
  1122. },
  1123. DeleteFunc: func(obj interface{}) {
  1124. msg := Message{
  1125. EventType: "DELETE",
  1126. Object: obj,
  1127. Kind: strings.ToLower(kind),
  1128. }
  1129. err := rw.WriteJSON(msg)
  1130. if err != nil {
  1131. errorchan <- err
  1132. }
  1133. },
  1134. })
  1135. informer.Run(stopper)
  1136. }()
  1137. for err = range errorchan {
  1138. once.Do(func() {
  1139. close(stopper)
  1140. rw.Close()
  1141. })
  1142. }
  1143. return err
  1144. }
  1145. return a.RunWebsocketTask(run)
  1146. }
  1147. var b64 = base64.StdEncoding
  1148. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  1149. func decodeRelease(data string) (*rspb.Release, error) {
  1150. // base64 decode string
  1151. b, err := b64.DecodeString(data)
  1152. if err != nil {
  1153. return nil, err
  1154. }
  1155. // For backwards compatibility with releases that were stored before
  1156. // compression was introduced we skip decompression if the
  1157. // gzip magic header is not found
  1158. if bytes.Equal(b[0:3], magicGzip) {
  1159. r, err := gzip.NewReader(bytes.NewReader(b))
  1160. if err != nil {
  1161. return nil, err
  1162. }
  1163. defer r.Close()
  1164. b2, err := ioutil.ReadAll(r)
  1165. if err != nil {
  1166. return nil, err
  1167. }
  1168. b = b2
  1169. }
  1170. var rls rspb.Release
  1171. // unmarshal release object bytes
  1172. if err := json.Unmarshal(b, &rls); err != nil {
  1173. return nil, err
  1174. }
  1175. return &rls, nil
  1176. }
  1177. func contains(s []string, str string) bool {
  1178. for _, v := range s {
  1179. if v == str {
  1180. return true
  1181. }
  1182. }
  1183. return false
  1184. }
  1185. func ParseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  1186. if secret.Type != "helm.sh/release.v1" {
  1187. return nil, true, nil
  1188. }
  1189. releaseData, ok := secret.Data["release"]
  1190. if !ok {
  1191. return nil, true, fmt.Errorf("release field not found")
  1192. }
  1193. helm_object, err := decodeRelease(string(releaseData))
  1194. if err != nil {
  1195. return nil, true, err
  1196. }
  1197. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  1198. return nil, true, nil
  1199. }
  1200. return helm_object, false, nil
  1201. }
  1202. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1203. run := func() error {
  1204. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1205. options.LabelSelector = selectors
  1206. }
  1207. factory := informers.NewSharedInformerFactoryWithOptions(
  1208. a.Clientset,
  1209. 0,
  1210. informers.WithTweakListOptions(tweakListOptionsFunc),
  1211. informers.WithNamespace(namespace),
  1212. )
  1213. informer := factory.Core().V1().Secrets().Informer()
  1214. stopper := make(chan struct{})
  1215. errorchan := make(chan error)
  1216. var wg sync.WaitGroup
  1217. var once sync.Once
  1218. var err error
  1219. wg.Add(2)
  1220. go func() {
  1221. wg.Wait()
  1222. close(errorchan)
  1223. }()
  1224. go func() {
  1225. defer func() {
  1226. if r := recover(); r != nil {
  1227. // TODO: add method to alert on panic
  1228. return
  1229. }
  1230. }()
  1231. // listens for websocket closing handshake
  1232. defer wg.Done()
  1233. for {
  1234. if _, _, err := rw.ReadMessage(); err != nil {
  1235. errorchan <- nil
  1236. return
  1237. }
  1238. }
  1239. }()
  1240. go func() {
  1241. defer func() {
  1242. if r := recover(); r != nil {
  1243. // TODO: add method to alert on panic
  1244. return
  1245. }
  1246. }()
  1247. // listens for websocket closing handshake
  1248. defer wg.Done()
  1249. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1250. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1251. errorchan <- &AuthError{}
  1252. }
  1253. })
  1254. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1255. UpdateFunc: func(oldObj, newObj interface{}) {
  1256. secretObj, ok := newObj.(*v1.Secret)
  1257. if !ok {
  1258. errorchan <- fmt.Errorf("could not cast to secret")
  1259. return
  1260. }
  1261. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1262. if isNotHelmRelease && err == nil {
  1263. return
  1264. }
  1265. if err != nil {
  1266. errorchan <- err
  1267. return
  1268. }
  1269. msg := Message{
  1270. EventType: "UPDATE",
  1271. Object: helm_object,
  1272. }
  1273. rw.WriteJSON(msg)
  1274. },
  1275. AddFunc: func(obj interface{}) {
  1276. secretObj, ok := obj.(*v1.Secret)
  1277. if !ok {
  1278. errorchan <- fmt.Errorf("could not cast to secret")
  1279. return
  1280. }
  1281. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1282. if isNotHelmRelease && err == nil {
  1283. return
  1284. }
  1285. if err != nil {
  1286. errorchan <- err
  1287. return
  1288. }
  1289. msg := Message{
  1290. EventType: "ADD",
  1291. Object: helm_object,
  1292. }
  1293. rw.WriteJSON(msg)
  1294. },
  1295. DeleteFunc: func(obj interface{}) {
  1296. secretObj, ok := obj.(*v1.Secret)
  1297. if !ok {
  1298. errorchan <- fmt.Errorf("could not cast to secret")
  1299. return
  1300. }
  1301. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1302. if isNotHelmRelease && err == nil {
  1303. return
  1304. }
  1305. if err != nil {
  1306. errorchan <- err
  1307. return
  1308. }
  1309. msg := Message{
  1310. EventType: "DELETE",
  1311. Object: helm_object,
  1312. }
  1313. rw.WriteJSON(msg)
  1314. },
  1315. })
  1316. informer.Run(stopper)
  1317. }()
  1318. for err = range errorchan {
  1319. once.Do(func() {
  1320. close(stopper)
  1321. rw.Close()
  1322. })
  1323. }
  1324. return err
  1325. }
  1326. return a.RunWebsocketTask(run)
  1327. }
  1328. // CreateImagePullSecrets will create the required image pull secrets and
  1329. // return a map from the registry name to the name of the secret.
  1330. func (a *Agent) CreateImagePullSecrets(
  1331. repo repository.Repository,
  1332. namespace string,
  1333. linkedRegs map[string]*models.Registry,
  1334. doAuth *oauth2.Config,
  1335. ) (map[string]string, error) {
  1336. res := make(map[string]string)
  1337. for key, val := range linkedRegs {
  1338. _reg := registry.Registry(*val)
  1339. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1340. if err != nil {
  1341. return nil, err
  1342. }
  1343. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1344. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1345. context.TODO(),
  1346. secretName,
  1347. metav1.GetOptions{},
  1348. )
  1349. // if not found, create the secret
  1350. if err != nil && errors.IsNotFound(err) {
  1351. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1352. context.TODO(),
  1353. &v1.Secret{
  1354. ObjectMeta: metav1.ObjectMeta{
  1355. Name: secretName,
  1356. },
  1357. Data: map[string][]byte{
  1358. string(v1.DockerConfigJsonKey): data,
  1359. },
  1360. Type: v1.SecretTypeDockerConfigJson,
  1361. },
  1362. metav1.CreateOptions{},
  1363. )
  1364. if err != nil {
  1365. return nil, err
  1366. }
  1367. // add secret name to the map
  1368. res[key] = secretName
  1369. continue
  1370. } else if err != nil {
  1371. return nil, err
  1372. }
  1373. // otherwise, check that the secret contains the correct data: if
  1374. // if doesn't, update it
  1375. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1376. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1377. context.TODO(),
  1378. &v1.Secret{
  1379. ObjectMeta: metav1.ObjectMeta{
  1380. Name: secretName,
  1381. },
  1382. Data: map[string][]byte{
  1383. string(v1.DockerConfigJsonKey): data,
  1384. },
  1385. Type: v1.SecretTypeDockerConfigJson,
  1386. },
  1387. metav1.UpdateOptions{},
  1388. )
  1389. if err != nil {
  1390. return nil, err
  1391. }
  1392. }
  1393. // add secret name to the map
  1394. res[key] = secretName
  1395. }
  1396. return res, nil
  1397. }
  1398. // helper that waits for pod to be ready
  1399. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1400. var (
  1401. w watch.Interface
  1402. err error
  1403. ok bool
  1404. )
  1405. // immediately after creating a pod, the API may return a 404. heuristically 1
  1406. // second seems to be plenty.
  1407. watchRetries := 3
  1408. for i := 0; i < watchRetries; i++ {
  1409. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1410. w, err = a.Clientset.CoreV1().
  1411. Pods(pod.Namespace).
  1412. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1413. if err == nil {
  1414. break
  1415. }
  1416. time.Sleep(time.Second)
  1417. }
  1418. if err != nil {
  1419. return err, false
  1420. }
  1421. defer w.Stop()
  1422. expireTime := time.Now().Add(time.Second * 30)
  1423. for time.Now().Unix() <= expireTime.Unix() {
  1424. select {
  1425. case <-time.NewTicker(time.Second).C:
  1426. // poll every second in case we already missed the ready event while
  1427. // creating the listener.
  1428. pod, err = a.Clientset.CoreV1().
  1429. Pods(pod.Namespace).
  1430. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1431. if err != nil && errors.IsNotFound(err) {
  1432. return IsNotFoundError, false
  1433. } else if err != nil {
  1434. return err, false
  1435. }
  1436. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1437. return nil, isExited
  1438. }
  1439. case evt := <-w.ResultChan():
  1440. pod, ok = evt.Object.(*v1.Pod)
  1441. if !ok {
  1442. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1443. }
  1444. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1445. return nil, isExited
  1446. }
  1447. }
  1448. }
  1449. return goerrors.New("timed out waiting for pod"), false
  1450. }
  1451. func isPodReady(pod *v1.Pod) bool {
  1452. ready := false
  1453. conditions := pod.Status.Conditions
  1454. for i := range conditions {
  1455. if conditions[i].Type == v1.PodReady {
  1456. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1457. }
  1458. }
  1459. return ready
  1460. }
  1461. func isPodExited(pod *v1.Pod) bool {
  1462. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1463. }