agent.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. goerrors "errors"
  17. "github.com/porter-dev/porter/api/server/shared/websocket"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/registry"
  20. "github.com/porter-dev/porter/internal/repository"
  21. "golang.org/x/oauth2"
  22. errors2 "errors"
  23. "github.com/porter-dev/porter/internal/helm/grapher"
  24. appsv1 "k8s.io/api/apps/v1"
  25. batchv1 "k8s.io/api/batch/v1"
  26. batchv1beta1 "k8s.io/api/batch/v1beta1"
  27. v1 "k8s.io/api/core/v1"
  28. v1beta1 "k8s.io/api/extensions/v1beta1"
  29. "k8s.io/apimachinery/pkg/api/errors"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/fields"
  32. "k8s.io/apimachinery/pkg/runtime"
  33. "k8s.io/apimachinery/pkg/runtime/schema"
  34. "k8s.io/apimachinery/pkg/types"
  35. "k8s.io/apimachinery/pkg/watch"
  36. "k8s.io/cli-runtime/pkg/genericclioptions"
  37. "k8s.io/client-go/informers"
  38. "k8s.io/client-go/kubernetes"
  39. "k8s.io/client-go/rest"
  40. "k8s.io/client-go/tools/cache"
  41. "k8s.io/client-go/tools/remotecommand"
  42. rspb "helm.sh/helm/v3/pkg/release"
  43. )
  44. // Agent is a Kubernetes agent for performing operations that interact with the
  45. // api server
  46. type Agent struct {
  47. RESTClientGetter genericclioptions.RESTClientGetter
  48. Clientset kubernetes.Interface
  49. }
  // Message pairs an event type with an arbitrary object and that object's
  // kind. Only EventType declares a JSON key ("event_type"); Object and Kind
  // are encoded under their Go field names.
  type Message struct {
  	// EventType names the event being reported.
  	EventType string `json:"event_type"`

  	// Object is the payload; its concrete type is not constrained here.
  	Object interface{}

  	// Kind names the kind of Object.
  	Kind string
  }
  // ListOptions carries the options for a list request.
  type ListOptions struct {
  	// FieldSelector is a field selector expression.
  	// NOTE(review): presumably passed through to metav1.ListOptions by a
  	// caller outside this excerpt -- confirm.
  	FieldSelector string
  }
  // AuthError is an error value that carries no data of its own; it always
  // renders as the same fixed message.
  type AuthError struct{}

  // Error implements the error interface.
  func (e *AuthError) Error() string {
  	const text = "Unauthorized error"
  	return text
  }
  62. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  63. func (a *Agent) UpdateClientset() error {
  64. restConf, err := a.RESTClientGetter.ToRESTConfig()
  65. if err != nil {
  66. return err
  67. }
  68. clientset, err := kubernetes.NewForConfig(restConf)
  69. if err != nil {
  70. return err
  71. }
  72. a.Clientset = clientset
  73. return nil
  74. }
  75. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  76. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  77. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  78. context.TODO(),
  79. &v1.ConfigMap{
  80. ObjectMeta: metav1.ObjectMeta{
  81. Name: name,
  82. Namespace: namespace,
  83. Labels: map[string]string{
  84. "porter": "true",
  85. },
  86. },
  87. Data: configMap,
  88. },
  89. metav1.CreateOptions{},
  90. )
  91. }
  92. func (a *Agent) CreateVersionedConfigMap(name, namespace string, version uint, configMap map[string]string, apps ...string) (*v1.ConfigMap, error) {
  93. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  94. context.TODO(),
  95. &v1.ConfigMap{
  96. ObjectMeta: metav1.ObjectMeta{
  97. Name: fmt.Sprintf("%s.v%d", name, version),
  98. Namespace: namespace,
  99. Labels: map[string]string{
  100. "owner": "porter",
  101. "envgroup": name,
  102. "version": fmt.Sprintf("%d", version),
  103. },
  104. Annotations: map[string]string{
  105. PorterAppAnnotationName: strings.Join(apps, ","),
  106. },
  107. },
  108. Data: configMap,
  109. },
  110. metav1.CreateOptions{},
  111. )
  112. }
  113. const PorterAppAnnotationName = "porter.run/apps"
  114. func (a *Agent) AddApplicationToVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  115. annons := cm.Annotations
  116. if annons == nil {
  117. annons = make(map[string]string)
  118. }
  119. appAnnon, appAnnonExists := annons[PorterAppAnnotationName]
  120. if !appAnnonExists || appAnnon == "" {
  121. annons[PorterAppAnnotationName] = appName
  122. } else {
  123. appStrArr := strings.Split(appAnnon, ",")
  124. foundApp := false
  125. for _, appStr := range appStrArr {
  126. if appStr == appName {
  127. foundApp = true
  128. }
  129. }
  130. if !foundApp {
  131. annons[PorterAppAnnotationName] = fmt.Sprintf("%s,%s", appAnnon, appName)
  132. }
  133. }
  134. cm.SetAnnotations(annons)
  135. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  136. context.TODO(),
  137. cm,
  138. metav1.UpdateOptions{},
  139. )
  140. }
  141. func (a *Agent) RemoveApplicationFromVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  142. annons := cm.Annotations
  143. if annons == nil {
  144. annons = make(map[string]string)
  145. }
  146. appAnn, appAnnExists := annons[PorterAppAnnotationName]
  147. if !appAnnExists {
  148. return nil, IsNotFoundError
  149. }
  150. appStrArr := strings.Split(appAnn, ",")
  151. newStrArr := make([]string, 0)
  152. for _, appStr := range appStrArr {
  153. if appStr != appName {
  154. newStrArr = append(newStrArr, appStr)
  155. }
  156. }
  157. annons[PorterAppAnnotationName] = strings.Join(newStrArr, ",")
  158. cm.SetAnnotations(annons)
  159. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  160. context.TODO(),
  161. cm,
  162. metav1.UpdateOptions{},
  163. )
  164. }
  165. func (a *Agent) CreateLinkedVersionedSecret(name, namespace, cmName string, version uint, data map[string][]byte) (*v1.Secret, error) {
  166. return a.Clientset.CoreV1().Secrets(namespace).Create(
  167. context.TODO(),
  168. &v1.Secret{
  169. ObjectMeta: metav1.ObjectMeta{
  170. Name: fmt.Sprintf("%s.v%d", name, version),
  171. Namespace: namespace,
  172. Labels: map[string]string{
  173. "owner": "porter",
  174. "envgroup": name,
  175. "version": fmt.Sprintf("%d", version),
  176. "configmap": cmName,
  177. },
  178. },
  179. Data: data,
  180. },
  181. metav1.CreateOptions{},
  182. )
  183. }
  184. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  185. // base64 encoded
  186. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  187. return a.Clientset.CoreV1().Secrets(namespace).Create(
  188. context.TODO(),
  189. &v1.Secret{
  190. ObjectMeta: metav1.ObjectMeta{
  191. Name: name,
  192. Namespace: namespace,
  193. Labels: map[string]string{
  194. "porter": "true",
  195. "configmap": cmName,
  196. },
  197. },
  198. Data: data,
  199. },
  200. metav1.CreateOptions{},
  201. )
  202. }
  203. type mergeConfigMapData struct {
  204. Data map[string]*string `json:"data"`
  205. }
  206. // UpdateConfigMap updates the configmap given its name and namespace
  207. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  208. cmData := make(map[string]*string)
  209. for key, val := range configMap {
  210. valCopy := val
  211. cmData[key] = &valCopy
  212. if len(val) == 0 {
  213. cmData[key] = nil
  214. }
  215. }
  216. mergeCM := &mergeConfigMapData{
  217. Data: cmData,
  218. }
  219. patchBytes, err := json.Marshal(mergeCM)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  224. context.Background(),
  225. name,
  226. types.MergePatchType,
  227. patchBytes,
  228. metav1.PatchOptions{},
  229. )
  230. }
  231. type mergeLinkedSecretData struct {
  232. Data map[string]*[]byte `json:"data"`
  233. }
  234. // UpdateLinkedSecret updates the secret given its name and namespace
  235. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  236. secretData := make(map[string]*[]byte)
  237. for key, val := range data {
  238. valCopy := val
  239. secretData[key] = &valCopy
  240. if len(val) == 0 {
  241. secretData[key] = nil
  242. }
  243. }
  244. mergeSecret := &mergeLinkedSecretData{
  245. Data: secretData,
  246. }
  247. patchBytes, err := json.Marshal(mergeSecret)
  248. if err != nil {
  249. return err
  250. }
  251. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  252. context.TODO(),
  253. name,
  254. types.MergePatchType,
  255. patchBytes,
  256. metav1.PatchOptions{},
  257. )
  258. return err
  259. }
  260. // DeleteConfigMap deletes the configmap given its name and namespace
  261. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  262. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  263. context.TODO(),
  264. name,
  265. metav1.DeleteOptions{},
  266. )
  267. }
  268. // DeleteLinkedSecret deletes the secret given its name and namespace
  269. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  270. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  271. context.TODO(),
  272. name,
  273. metav1.DeleteOptions{},
  274. )
  275. }
  276. func (a *Agent) ListVersionedConfigMaps(name string, namespace string) ([]v1.ConfigMap, error) {
  277. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  278. context.Background(),
  279. metav1.ListOptions{
  280. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  281. },
  282. )
  283. if err != nil {
  284. return nil, err
  285. }
  286. return listResp.Items, nil
  287. }
  288. func (a *Agent) DeleteVersionedConfigMap(name string, namespace string) error {
  289. return a.Clientset.CoreV1().ConfigMaps(namespace).DeleteCollection(
  290. context.Background(),
  291. metav1.DeleteOptions{},
  292. metav1.ListOptions{
  293. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  294. },
  295. )
  296. }
  297. func (a *Agent) DeleteVersionedSecret(name string, namespace string) error {
  298. return a.Clientset.CoreV1().Secrets(namespace).DeleteCollection(
  299. context.Background(),
  300. metav1.DeleteOptions{},
  301. metav1.ListOptions{
  302. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  303. },
  304. )
  305. }
  306. func (a *Agent) ListAllVersionedConfigMaps(namespace string) ([]v1.ConfigMap, error) {
  307. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  308. context.Background(),
  309. metav1.ListOptions{
  310. LabelSelector: fmt.Sprintf("envgroup"),
  311. },
  312. )
  313. if err != nil {
  314. return nil, err
  315. }
  316. // only keep the latest version for each configmap
  317. latestMap := make(map[string]v1.ConfigMap)
  318. for _, configmap := range listResp.Items {
  319. egName, egNameExists := configmap.Labels["envgroup"]
  320. if !egNameExists {
  321. continue
  322. }
  323. id := fmt.Sprintf("%s/%s", configmap.Namespace, egName)
  324. if currLatest, exists := latestMap[id]; exists {
  325. // get version
  326. currVersionStr, currVersionExists := currLatest.Labels["version"]
  327. versionStr, versionExists := configmap.Labels["version"]
  328. if versionExists && currVersionExists {
  329. currVersion, currErr := strconv.Atoi(currVersionStr)
  330. version, err := strconv.Atoi(versionStr)
  331. if currErr == nil && err == nil && currVersion < version {
  332. latestMap[id] = configmap
  333. }
  334. }
  335. } else {
  336. latestMap[id] = configmap
  337. }
  338. }
  339. res := make([]v1.ConfigMap, 0)
  340. for _, cm := range latestMap {
  341. res = append(res, cm)
  342. }
  343. return res, nil
  344. }
  345. // GetConfigMap retrieves the configmap given its name and namespace
  346. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  347. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  348. context.TODO(),
  349. name,
  350. metav1.GetOptions{},
  351. )
  352. }
  353. func (a *Agent) GetVersionedConfigMap(name, namespace string, version uint) (*v1.ConfigMap, error) {
  354. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  355. context.Background(),
  356. metav1.ListOptions{
  357. LabelSelector: fmt.Sprintf("envgroup=%s,version=%d", name, version),
  358. },
  359. )
  360. if err != nil {
  361. return nil, err
  362. }
  363. if listResp.Items == nil || len(listResp.Items) == 0 {
  364. return nil, IsNotFoundError
  365. }
  366. // if the length of the list is greater than 1, return an error -- this shouldn't happen
  367. if len(listResp.Items) > 1 {
  368. return nil, fmt.Errorf("multiple configmaps found while searching for %s/%s and version %d", namespace, name, version)
  369. }
  370. return &listResp.Items[0], nil
  371. }
  372. func (a *Agent) GetLatestVersionedConfigMap(name, namespace string) (*v1.ConfigMap, uint, error) {
  373. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  374. context.Background(),
  375. metav1.ListOptions{
  376. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  377. },
  378. )
  379. if err != nil {
  380. return nil, 0, err
  381. }
  382. if listResp.Items == nil || len(listResp.Items) == 0 {
  383. return nil, 0, IsNotFoundError
  384. }
  385. // iterate through the configmaps and get the greatest version
  386. var res *v1.ConfigMap
  387. var latestVersion uint
  388. for _, configmap := range listResp.Items {
  389. if res == nil {
  390. versionStr, versionExists := configmap.Labels["version"]
  391. if !versionExists {
  392. continue
  393. }
  394. version, err := strconv.Atoi(versionStr)
  395. if err != nil {
  396. continue
  397. }
  398. latestV := configmap
  399. res = &latestV
  400. latestVersion = uint(version)
  401. } else {
  402. // get version
  403. versionStr, versionExists := configmap.Labels["version"]
  404. currVersionStr, currVersionExists := res.Labels["version"]
  405. if versionExists && currVersionExists {
  406. currVersion, currErr := strconv.Atoi(currVersionStr)
  407. version, err := strconv.Atoi(versionStr)
  408. if currErr == nil && err == nil && currVersion < version {
  409. latestV := configmap
  410. res = &latestV
  411. latestVersion = uint(version)
  412. }
  413. }
  414. }
  415. }
  416. if res == nil {
  417. return nil, 0, IsNotFoundError
  418. }
  419. return res, latestVersion, nil
  420. }
  421. func (a *Agent) GetLatestVersionedSecret(name, namespace string) (*v1.Secret, uint, error) {
  422. listResp, err := a.Clientset.CoreV1().Secrets(namespace).List(
  423. context.Background(),
  424. metav1.ListOptions{
  425. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  426. },
  427. )
  428. if err != nil {
  429. return nil, 0, err
  430. }
  431. if listResp.Items == nil || len(listResp.Items) == 0 {
  432. return nil, 0, IsNotFoundError
  433. }
  434. // iterate through the configmaps and get the greatest version
  435. var res *v1.Secret
  436. var latestVersion uint
  437. for _, secret := range listResp.Items {
  438. if res == nil {
  439. versionStr, versionExists := secret.Labels["version"]
  440. if !versionExists {
  441. continue
  442. }
  443. version, err := strconv.Atoi(versionStr)
  444. if err != nil {
  445. continue
  446. }
  447. latestV := secret
  448. res = &latestV
  449. latestVersion = uint(version)
  450. } else {
  451. // get version
  452. versionStr, versionExists := secret.Labels["version"]
  453. currVersionStr, currVersionExists := res.Labels["version"]
  454. if versionExists && currVersionExists {
  455. currVersion, currErr := strconv.Atoi(currVersionStr)
  456. version, err := strconv.Atoi(versionStr)
  457. if currErr == nil && err == nil && currVersion < version {
  458. latestV := secret
  459. res = &latestV
  460. latestVersion = uint(version)
  461. }
  462. }
  463. }
  464. }
  465. if res == nil {
  466. return nil, 0, IsNotFoundError
  467. }
  468. return res, latestVersion, nil
  469. }
  470. // GetSecret retrieves the secret given its name and namespace
  471. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  472. return a.Clientset.CoreV1().Secrets(namespace).Get(
  473. context.TODO(),
  474. name,
  475. metav1.GetOptions{},
  476. )
  477. }
  478. // ListConfigMaps simply lists namespaces
  479. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  480. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  481. context.TODO(),
  482. metav1.ListOptions{
  483. LabelSelector: "porter",
  484. },
  485. )
  486. }
  487. // ListEvents lists the events of a given object.
  488. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  489. return a.Clientset.CoreV1().Events(namespace).List(
  490. context.TODO(),
  491. metav1.ListOptions{
  492. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  493. },
  494. )
  495. }
  496. // ListNamespaces simply lists namespaces
  497. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  498. return a.Clientset.CoreV1().Namespaces().List(
  499. context.TODO(),
  500. metav1.ListOptions{},
  501. )
  502. }
  503. // CreateNamespace creates a namespace with the given name.
  504. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  505. // check if namespace exists
  506. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  507. context.TODO(),
  508. name,
  509. metav1.GetOptions{},
  510. )
  511. if err == nil && checkNS != nil {
  512. return checkNS, nil
  513. }
  514. namespace := v1.Namespace{
  515. ObjectMeta: metav1.ObjectMeta{
  516. Name: name,
  517. },
  518. }
  519. return a.Clientset.CoreV1().Namespaces().Create(
  520. context.TODO(),
  521. &namespace,
  522. metav1.CreateOptions{},
  523. )
  524. }
  525. // DeleteNamespace deletes the namespace given the name.
  526. func (a *Agent) DeleteNamespace(name string) error {
  527. return a.Clientset.CoreV1().Namespaces().Delete(
  528. context.TODO(),
  529. name,
  530. metav1.DeleteOptions{},
  531. )
  532. }
  533. func (a *Agent) GetPorterAgent() (*appsv1.Deployment, error) {
  534. depl, err := a.Clientset.AppsV1().Deployments("porter-agent-system").Get(
  535. context.TODO(),
  536. "porter-agent-controller-manager",
  537. metav1.GetOptions{},
  538. )
  539. if err != nil && errors.IsNotFound(err) {
  540. return nil, IsNotFoundError
  541. }
  542. return depl, err
  543. }
  544. // ListJobsByLabel lists jobs in a namespace matching a label
  545. type Label struct {
  546. Key string
  547. Val string
  548. }
  549. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  550. selectors := make([]string, 0)
  551. for _, label := range labels {
  552. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  553. }
  554. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  555. context.TODO(),
  556. metav1.ListOptions{
  557. LabelSelector: strings.Join(selectors, ","),
  558. },
  559. )
  560. if err != nil {
  561. return nil, err
  562. }
  563. return resp.Items, nil
  564. }
  565. // StreamJobs streams a list of jobs to the websocket writer, closing the connection once all jobs have been sent
  566. func (a *Agent) StreamJobs(namespace string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  567. run := func() error {
  568. errorchan := make(chan error)
  569. ctx, cancel := context.WithCancel(context.Background())
  570. defer cancel()
  571. var wg sync.WaitGroup
  572. var once sync.Once
  573. var err error
  574. wg.Add(2)
  575. go func() {
  576. wg.Wait()
  577. close(errorchan)
  578. }()
  579. go func() {
  580. defer func() {
  581. if r := recover(); r != nil {
  582. // TODO: add method to alert on panic
  583. return
  584. }
  585. }()
  586. // listens for websocket closing handshake
  587. defer wg.Done()
  588. for {
  589. if _, _, err := rw.ReadMessage(); err != nil {
  590. errorchan <- nil
  591. return
  592. }
  593. }
  594. }()
  595. go func() {
  596. defer func() {
  597. if r := recover(); r != nil {
  598. // TODO: add method to alert on panic
  599. return
  600. }
  601. }()
  602. // listens for websocket closing handshake
  603. defer wg.Done()
  604. continueVal := ""
  605. for {
  606. if ctx.Err() != nil {
  607. errorchan <- nil
  608. return
  609. }
  610. labelSelector := "meta.helm.sh/release-name"
  611. if selectors != "" {
  612. labelSelector = selectors
  613. }
  614. jobs, err := a.Clientset.BatchV1().Jobs(namespace).List(
  615. ctx,
  616. metav1.ListOptions{
  617. Limit: 100,
  618. Continue: continueVal,
  619. LabelSelector: labelSelector,
  620. },
  621. )
  622. if err != nil {
  623. errorchan <- err
  624. return
  625. }
  626. for _, job := range jobs.Items {
  627. err := rw.WriteJSON(job)
  628. if err != nil {
  629. errorchan <- err
  630. return
  631. }
  632. }
  633. if jobs.Continue == "" {
  634. // we have reached the end of the list of jobs
  635. break
  636. } else {
  637. // start pagination
  638. continueVal = jobs.Continue
  639. }
  640. }
  641. // at this point, we can return the status finished
  642. err := rw.WriteJSON(map[string]interface{}{
  643. "streamStatus": "finished",
  644. })
  645. errorchan <- err
  646. }()
  647. for err = range errorchan {
  648. once.Do(func() {
  649. rw.Close()
  650. cancel()
  651. })
  652. }
  653. return err
  654. }
  655. return a.RunWebsocketTask(run)
  656. }
  657. // DeleteJob deletes the job in the given name and namespace.
  658. func (a *Agent) DeleteJob(name, namespace string) error {
  659. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  660. context.TODO(),
  661. name,
  662. metav1.DeleteOptions{},
  663. )
  664. }
  665. // GetJobPods lists all pods belonging to a job in a namespace
  666. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  667. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  668. context.TODO(),
  669. metav1.ListOptions{
  670. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  671. },
  672. )
  673. if err != nil {
  674. return nil, err
  675. }
  676. return resp.Items, nil
  677. }
  678. // GetIngress gets ingress given the name and namespace
  679. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  680. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  681. context.TODO(),
  682. name,
  683. metav1.GetOptions{},
  684. )
  685. if err != nil && errors.IsNotFound(err) {
  686. return nil, IsNotFoundError
  687. } else if err != nil {
  688. return nil, err
  689. }
  690. return resp, nil
  691. }
  // IsNotFoundError is the sentinel value this package returns when a
  // requested object does not exist.
  var IsNotFoundError error = fmt.Errorf("not found")

  // BadRequestError is an error that carries a caller-supplied message.
  type BadRequestError struct {
  	msg string
  }

  // Error implements the error interface by returning the stored message.
  func (e *BadRequestError) Error() string {
  	text := e.msg
  	return text
  }
  699. // GetDeployment gets the deployment given the name and namespace
  700. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  701. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  702. context.TODO(),
  703. c.Name,
  704. metav1.GetOptions{},
  705. )
  706. if err != nil && errors.IsNotFound(err) {
  707. return nil, IsNotFoundError
  708. } else if err != nil {
  709. return nil, err
  710. }
  711. res.Kind = c.Kind
  712. return res, nil
  713. }
  714. // GetStatefulSet gets the statefulset given the name and namespace
  715. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  716. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  717. context.TODO(),
  718. c.Name,
  719. metav1.GetOptions{},
  720. )
  721. if err != nil && errors.IsNotFound(err) {
  722. return nil, IsNotFoundError
  723. } else if err != nil {
  724. return nil, err
  725. }
  726. res.Kind = c.Kind
  727. return res, nil
  728. }
  729. // GetReplicaSet gets the replicaset given the name and namespace
  730. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  731. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  732. context.TODO(),
  733. c.Name,
  734. metav1.GetOptions{},
  735. )
  736. if err != nil && errors.IsNotFound(err) {
  737. return nil, IsNotFoundError
  738. } else if err != nil {
  739. return nil, err
  740. }
  741. res.Kind = c.Kind
  742. return res, nil
  743. }
  744. // GetDaemonSet gets the daemonset by name and namespace
  745. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  746. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  747. context.TODO(),
  748. c.Name,
  749. metav1.GetOptions{},
  750. )
  751. if err != nil && errors.IsNotFound(err) {
  752. return nil, IsNotFoundError
  753. } else if err != nil {
  754. return nil, err
  755. }
  756. res.Kind = c.Kind
  757. return res, nil
  758. }
  759. // GetJob gets the job by name and namespace
  760. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  761. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  762. context.TODO(),
  763. c.Name,
  764. metav1.GetOptions{},
  765. )
  766. if err != nil && errors.IsNotFound(err) {
  767. return nil, IsNotFoundError
  768. } else if err != nil {
  769. return nil, err
  770. }
  771. res.Kind = c.Kind
  772. return res, nil
  773. }
  774. // GetCronJob gets the CronJob by name and namespace
  775. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  776. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  777. context.TODO(),
  778. c.Name,
  779. metav1.GetOptions{},
  780. )
  781. if err != nil && errors.IsNotFound(err) {
  782. return nil, IsNotFoundError
  783. } else if err != nil {
  784. return nil, err
  785. }
  786. res.Kind = c.Kind
  787. return res, nil
  788. }
  789. // GetPodsByLabel retrieves pods with matching labels
  790. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  791. // Search in all namespaces for matching pods
  792. return a.Clientset.CoreV1().Pods(namespace).List(
  793. context.TODO(),
  794. metav1.ListOptions{
  795. LabelSelector: selector,
  796. },
  797. )
  798. }
  799. // GetPodByName retrieves a single instance of pod with given name
  800. func (a *Agent) GetPodByName(name string, namespace string) (*v1.Pod, error) {
  801. // Get pod by name
  802. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  803. context.TODO(),
  804. name,
  805. metav1.GetOptions{},
  806. )
  807. if err != nil && errors.IsNotFound(err) {
  808. return nil, IsNotFoundError
  809. }
  810. if err != nil {
  811. return nil, err
  812. }
  813. return pod, nil
  814. }
  815. // DeletePod deletes a pod by name and namespace
  816. func (a *Agent) DeletePod(namespace string, name string) error {
  817. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  818. context.TODO(),
  819. name,
  820. metav1.DeleteOptions{},
  821. )
  822. if err != nil && errors.IsNotFound(err) {
  823. return IsNotFoundError
  824. }
  825. return err
  826. }
// GetPodLogs streams real-time logs from a given pod.
//
// The pod is looked up by name and namespace, waitForPod is called on it,
// and then a following log stream (starting from the last 400 lines) is
// opened for selectedContainer, or for the first container in the pod spec
// when selectedContainer is empty. Each line read from the stream is written
// to rw until either side fails or the websocket read loop ends.
//
// It returns IsNotFoundError when the pod lookup or waitForPod reports the
// pod as missing, a *BadRequestError when opening the log stream is rejected
// as a bad request, and otherwise the last value received on the internal
// error channel before it was closed.
func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer string, rw *websocket.WebsocketSafeReadWriter) error {
	// get the pod to read in the list of containers
	pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
		context.Background(),
		name,
		metav1.GetOptions{},
	)

	if err != nil && errors.IsNotFound(err) {
		return IsNotFoundError
	} else if err != nil {
		return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
	}

	// see if container is ready and able to open a stream. If not, wait for container
	// to be ready.
	// The boolean result of waitForPod is discarded here.
	err, _ = a.waitForPod(pod)

	if err != nil && goerrors.Is(err, IsNotFoundError) {
		return IsNotFoundError
	} else if err != nil {
		return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
	}

	// default to the first container of the pod spec unless the caller chose one
	container := pod.Spec.Containers[0].Name

	if len(selectedContainer) > 0 {
		container = selectedContainer
	}

	// number of trailing log lines requested before following
	tails := int64(400)

	// follow logs
	podLogOpts := v1.PodLogOptions{
		Follow:    true,
		TailLines: &tails,
		Container: container,
	}

	req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)

	podLogs, err := req.Stream(context.TODO())

	// in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
	// we'd like to pass this through to the client.
	if err != nil && errors.IsBadRequest(err) {
		return &BadRequestError{err.Error()}
	} else if err != nil {
		return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
	}

	r := bufio.NewReader(podLogs)
	errorchan := make(chan error)

	var wg sync.WaitGroup
	var once sync.Once
	wg.Add(2)

	// closes errorchan once both worker goroutines below have finished, which
	// ends the range loop at the bottom of this function
	go func() {
		wg.Wait()
		close(errorchan)
	}()

	go func() {
		defer func() {
			if r := recover(); r != nil {
				// TODO: add method to alert on panic
				return
			}
		}()

		// listens for websocket closing handshake
		defer wg.Done()

		for {
			// any read error ends this loop; it is reported as nil rather
			// than as an error
			if _, _, err := rw.ReadMessage(); err != nil {
				errorchan <- nil
				return
			}
		}
	}()

	go func() {
		defer func() {
			if r := recover(); r != nil {
				// TODO: add method to alert on panic
				return
			}
		}()

		defer wg.Done()

		// copies the log stream to the websocket one newline-terminated chunk
		// at a time; the first read or write error stops the copy
		for {
			bytes, err := r.ReadBytes('\n')

			if err != nil {
				errorchan <- err
				return
			}

			if _, writeErr := rw.Write(bytes); writeErr != nil {
				errorchan <- writeErr
				return
			}
		}
	}()

	// drains errorchan until it is closed; err ends up holding the last value
	// received
	for err = range errorchan {
		// only call these methods a single time
		once.Do(func() {
			rw.Close()
			podLogs.Close()
		})
	}

	return err
}
  922. // GetPodLogs streams real-time logs from a given pod.
  923. func (a *Agent) GetPreviousPodLogs(namespace string, name string, selectedContainer string) ([]string, error) {
  924. // get the pod to read in the list of contains
  925. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  926. context.Background(),
  927. name,
  928. metav1.GetOptions{},
  929. )
  930. if err != nil && errors.IsNotFound(err) {
  931. return nil, IsNotFoundError
  932. } else if err != nil {
  933. return nil, fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  934. }
  935. container := pod.Spec.Containers[0].Name
  936. if len(selectedContainer) > 0 {
  937. container = selectedContainer
  938. }
  939. tails := int64(400)
  940. // follow logs
  941. podLogOpts := v1.PodLogOptions{
  942. Follow: true,
  943. TailLines: &tails,
  944. Container: container,
  945. Previous: true,
  946. }
  947. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  948. podLogs, err := req.Stream(context.TODO())
  949. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  950. // we'd like to pass this through to the client.
  951. if err != nil && strings.Contains(err.Error(), "not found") {
  952. return nil, IsNotFoundError
  953. }
  954. if err != nil && errors.IsBadRequest(err) {
  955. return nil, &BadRequestError{err.Error()}
  956. } else if err != nil {
  957. return nil, fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  958. }
  959. defer podLogs.Close()
  960. r := bufio.NewReader(podLogs)
  961. var logs []string
  962. for {
  963. line, err := r.ReadString('\n')
  964. logs = append(logs, line)
  965. if err == io.EOF {
  966. break
  967. } else if err != nil {
  968. return nil, err
  969. }
  970. }
  971. return logs, nil
  972. }
  973. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  974. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  975. jobPods, err := a.GetJobPods(namespace, name)
  976. if err != nil {
  977. return err
  978. }
  979. podName := jobPods[0].ObjectMeta.Name
  980. restConf, err := a.RESTClientGetter.ToRESTConfig()
  981. restConf.GroupVersion = &schema.GroupVersion{
  982. Group: "api",
  983. Version: "v1",
  984. }
  985. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  986. restClient, err := rest.RESTClientFor(restConf)
  987. if err != nil {
  988. return err
  989. }
  990. req := restClient.Post().
  991. Resource("pods").
  992. Name(podName).
  993. Namespace(namespace).
  994. SubResource("exec")
  995. req.Param("command", "./signal.sh")
  996. req.Param("container", "sidecar")
  997. req.Param("stdin", "true")
  998. req.Param("stdout", "false")
  999. req.Param("tty", "false")
  1000. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  1001. if err != nil {
  1002. return err
  1003. }
  1004. return exec.Stream(remotecommand.StreamOptions{
  1005. Tty: false,
  1006. Stdin: strings.NewReader("./signal.sh"),
  1007. })
  1008. }
  1009. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  1010. // the task some number of times until failing
  1011. func (a *Agent) RunWebsocketTask(task func() error) error {
  1012. lastTime := int64(0)
  1013. for {
  1014. if err := a.UpdateClientset(); err != nil {
  1015. return err
  1016. }
  1017. err := task()
  1018. if err == nil {
  1019. return nil
  1020. }
  1021. if !errors2.Is(err, &AuthError{}) {
  1022. return err
  1023. }
  1024. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  1025. return err
  1026. }
  1027. lastTime = time.Now().Unix()
  1028. }
  1029. }
  1030. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  1031. // TODO: Support Jobs
  1032. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1033. run := func() error {
  1034. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  1035. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  1036. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1037. options.LabelSelector = selectors
  1038. }
  1039. factory := informers.NewSharedInformerFactoryWithOptions(
  1040. a.Clientset,
  1041. 0,
  1042. informers.WithTweakListOptions(tweakListOptionsFunc),
  1043. )
  1044. var informer cache.SharedInformer
  1045. // Spins up an informer depending on kind. Convert to lowercase for robustness
  1046. switch strings.ToLower(kind) {
  1047. case "deployment":
  1048. informer = factory.Apps().V1().Deployments().Informer()
  1049. case "statefulset":
  1050. informer = factory.Apps().V1().StatefulSets().Informer()
  1051. case "replicaset":
  1052. informer = factory.Apps().V1().ReplicaSets().Informer()
  1053. case "daemonset":
  1054. informer = factory.Apps().V1().DaemonSets().Informer()
  1055. case "job":
  1056. informer = factory.Batch().V1().Jobs().Informer()
  1057. case "cronjob":
  1058. informer = factory.Batch().V1beta1().CronJobs().Informer()
  1059. case "namespace":
  1060. informer = factory.Core().V1().Namespaces().Informer()
  1061. case "pod":
  1062. informer = factory.Core().V1().Pods().Informer()
  1063. }
  1064. stopper := make(chan struct{})
  1065. errorchan := make(chan error)
  1066. var wg sync.WaitGroup
  1067. var once sync.Once
  1068. var err error
  1069. wg.Add(2)
  1070. go func() {
  1071. wg.Wait()
  1072. close(errorchan)
  1073. }()
  1074. go func() {
  1075. defer func() {
  1076. if r := recover(); r != nil {
  1077. // TODO: add method to alert on panic
  1078. return
  1079. }
  1080. }()
  1081. // listens for websocket closing handshake
  1082. defer wg.Done()
  1083. for {
  1084. if _, _, err := rw.ReadMessage(); err != nil {
  1085. errorchan <- nil
  1086. return
  1087. }
  1088. }
  1089. }()
  1090. go func() {
  1091. defer func() {
  1092. if r := recover(); r != nil {
  1093. // TODO: add method to alert on panic
  1094. return
  1095. }
  1096. }()
  1097. // listens for websocket closing handshake
  1098. defer wg.Done()
  1099. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1100. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1101. errorchan <- &AuthError{}
  1102. }
  1103. })
  1104. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1105. UpdateFunc: func(oldObj, newObj interface{}) {
  1106. msg := Message{
  1107. EventType: "UPDATE",
  1108. Object: newObj,
  1109. Kind: strings.ToLower(kind),
  1110. }
  1111. err := rw.WriteJSON(msg)
  1112. if err != nil {
  1113. errorchan <- err
  1114. }
  1115. },
  1116. AddFunc: func(obj interface{}) {
  1117. msg := Message{
  1118. EventType: "ADD",
  1119. Object: obj,
  1120. Kind: strings.ToLower(kind),
  1121. }
  1122. err := rw.WriteJSON(msg)
  1123. if err != nil {
  1124. errorchan <- err
  1125. }
  1126. },
  1127. DeleteFunc: func(obj interface{}) {
  1128. msg := Message{
  1129. EventType: "DELETE",
  1130. Object: obj,
  1131. Kind: strings.ToLower(kind),
  1132. }
  1133. err := rw.WriteJSON(msg)
  1134. if err != nil {
  1135. errorchan <- err
  1136. }
  1137. },
  1138. })
  1139. informer.Run(stopper)
  1140. }()
  1141. for err = range errorchan {
  1142. once.Do(func() {
  1143. close(stopper)
  1144. rw.Close()
  1145. })
  1146. }
  1147. return err
  1148. }
  1149. return a.RunWebsocketTask(run)
  1150. }
// b64 is the standard base64 encoding used by decodeRelease to decode stored
// release data.
var b64 = base64.StdEncoding

// magicGzip holds the three leading bytes that decodeRelease compares against
// the decoded release data to decide whether it must be gunzipped.
var magicGzip = []byte{0x1f, 0x8b, 0x08}
  1153. func decodeRelease(data string) (*rspb.Release, error) {
  1154. // base64 decode string
  1155. b, err := b64.DecodeString(data)
  1156. if err != nil {
  1157. return nil, err
  1158. }
  1159. // For backwards compatibility with releases that were stored before
  1160. // compression was introduced we skip decompression if the
  1161. // gzip magic header is not found
  1162. if bytes.Equal(b[0:3], magicGzip) {
  1163. r, err := gzip.NewReader(bytes.NewReader(b))
  1164. if err != nil {
  1165. return nil, err
  1166. }
  1167. defer r.Close()
  1168. b2, err := ioutil.ReadAll(r)
  1169. if err != nil {
  1170. return nil, err
  1171. }
  1172. b = b2
  1173. }
  1174. var rls rspb.Release
  1175. // unmarshal release object bytes
  1176. if err := json.Unmarshal(b, &rls); err != nil {
  1177. return nil, err
  1178. }
  1179. return &rls, nil
  1180. }
  1181. func contains(s []string, str string) bool {
  1182. for _, v := range s {
  1183. if v == str {
  1184. return true
  1185. }
  1186. }
  1187. return false
  1188. }
  1189. func ParseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  1190. if secret.Type != "helm.sh/release.v1" {
  1191. return nil, true, nil
  1192. }
  1193. releaseData, ok := secret.Data["release"]
  1194. if !ok {
  1195. return nil, true, fmt.Errorf("release field not found")
  1196. }
  1197. helm_object, err := decodeRelease(string(releaseData))
  1198. if err != nil {
  1199. return nil, true, err
  1200. }
  1201. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  1202. return nil, true, nil
  1203. }
  1204. return helm_object, false, nil
  1205. }
  1206. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1207. run := func() error {
  1208. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1209. options.LabelSelector = selectors
  1210. }
  1211. factory := informers.NewSharedInformerFactoryWithOptions(
  1212. a.Clientset,
  1213. 0,
  1214. informers.WithTweakListOptions(tweakListOptionsFunc),
  1215. informers.WithNamespace(namespace),
  1216. )
  1217. informer := factory.Core().V1().Secrets().Informer()
  1218. stopper := make(chan struct{})
  1219. errorchan := make(chan error)
  1220. var wg sync.WaitGroup
  1221. var once sync.Once
  1222. var err error
  1223. wg.Add(2)
  1224. go func() {
  1225. wg.Wait()
  1226. close(errorchan)
  1227. }()
  1228. go func() {
  1229. defer func() {
  1230. if r := recover(); r != nil {
  1231. // TODO: add method to alert on panic
  1232. return
  1233. }
  1234. }()
  1235. // listens for websocket closing handshake
  1236. defer wg.Done()
  1237. for {
  1238. if _, _, err := rw.ReadMessage(); err != nil {
  1239. errorchan <- nil
  1240. return
  1241. }
  1242. }
  1243. }()
  1244. go func() {
  1245. defer func() {
  1246. if r := recover(); r != nil {
  1247. // TODO: add method to alert on panic
  1248. return
  1249. }
  1250. }()
  1251. // listens for websocket closing handshake
  1252. defer wg.Done()
  1253. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1254. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1255. errorchan <- &AuthError{}
  1256. }
  1257. })
  1258. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1259. UpdateFunc: func(oldObj, newObj interface{}) {
  1260. secretObj, ok := newObj.(*v1.Secret)
  1261. if !ok {
  1262. errorchan <- fmt.Errorf("could not cast to secret")
  1263. return
  1264. }
  1265. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1266. if isNotHelmRelease && err == nil {
  1267. return
  1268. }
  1269. if err != nil {
  1270. errorchan <- err
  1271. return
  1272. }
  1273. msg := Message{
  1274. EventType: "UPDATE",
  1275. Object: helm_object,
  1276. }
  1277. rw.WriteJSON(msg)
  1278. },
  1279. AddFunc: func(obj interface{}) {
  1280. secretObj, ok := obj.(*v1.Secret)
  1281. if !ok {
  1282. errorchan <- fmt.Errorf("could not cast to secret")
  1283. return
  1284. }
  1285. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1286. if isNotHelmRelease && err == nil {
  1287. return
  1288. }
  1289. if err != nil {
  1290. errorchan <- err
  1291. return
  1292. }
  1293. msg := Message{
  1294. EventType: "ADD",
  1295. Object: helm_object,
  1296. }
  1297. rw.WriteJSON(msg)
  1298. },
  1299. DeleteFunc: func(obj interface{}) {
  1300. secretObj, ok := obj.(*v1.Secret)
  1301. if !ok {
  1302. errorchan <- fmt.Errorf("could not cast to secret")
  1303. return
  1304. }
  1305. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1306. if isNotHelmRelease && err == nil {
  1307. return
  1308. }
  1309. if err != nil {
  1310. errorchan <- err
  1311. return
  1312. }
  1313. msg := Message{
  1314. EventType: "DELETE",
  1315. Object: helm_object,
  1316. }
  1317. rw.WriteJSON(msg)
  1318. },
  1319. })
  1320. informer.Run(stopper)
  1321. }()
  1322. for err = range errorchan {
  1323. once.Do(func() {
  1324. close(stopper)
  1325. rw.Close()
  1326. })
  1327. }
  1328. return err
  1329. }
  1330. return a.RunWebsocketTask(run)
  1331. }
  1332. // CreateImagePullSecrets will create the required image pull secrets and
  1333. // return a map from the registry name to the name of the secret.
  1334. func (a *Agent) CreateImagePullSecrets(
  1335. repo repository.Repository,
  1336. namespace string,
  1337. linkedRegs map[string]*models.Registry,
  1338. doAuth *oauth2.Config,
  1339. ) (map[string]string, error) {
  1340. res := make(map[string]string)
  1341. for key, val := range linkedRegs {
  1342. _reg := registry.Registry(*val)
  1343. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1344. if err != nil {
  1345. return nil, err
  1346. }
  1347. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1348. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1349. context.TODO(),
  1350. secretName,
  1351. metav1.GetOptions{},
  1352. )
  1353. // if not found, create the secret
  1354. if err != nil && errors.IsNotFound(err) {
  1355. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1356. context.TODO(),
  1357. &v1.Secret{
  1358. ObjectMeta: metav1.ObjectMeta{
  1359. Name: secretName,
  1360. },
  1361. Data: map[string][]byte{
  1362. string(v1.DockerConfigJsonKey): data,
  1363. },
  1364. Type: v1.SecretTypeDockerConfigJson,
  1365. },
  1366. metav1.CreateOptions{},
  1367. )
  1368. if err != nil {
  1369. return nil, err
  1370. }
  1371. // add secret name to the map
  1372. res[key] = secretName
  1373. continue
  1374. } else if err != nil {
  1375. return nil, err
  1376. }
  1377. // otherwise, check that the secret contains the correct data: if
  1378. // if doesn't, update it
  1379. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1380. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1381. context.TODO(),
  1382. &v1.Secret{
  1383. ObjectMeta: metav1.ObjectMeta{
  1384. Name: secretName,
  1385. },
  1386. Data: map[string][]byte{
  1387. string(v1.DockerConfigJsonKey): data,
  1388. },
  1389. Type: v1.SecretTypeDockerConfigJson,
  1390. },
  1391. metav1.UpdateOptions{},
  1392. )
  1393. if err != nil {
  1394. return nil, err
  1395. }
  1396. }
  1397. // add secret name to the map
  1398. res[key] = secretName
  1399. }
  1400. return res, nil
  1401. }
  1402. // helper that waits for pod to be ready
  1403. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1404. var (
  1405. w watch.Interface
  1406. err error
  1407. ok bool
  1408. )
  1409. // immediately after creating a pod, the API may return a 404. heuristically 1
  1410. // second seems to be plenty.
  1411. watchRetries := 3
  1412. for i := 0; i < watchRetries; i++ {
  1413. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1414. w, err = a.Clientset.CoreV1().
  1415. Pods(pod.Namespace).
  1416. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1417. if err == nil {
  1418. break
  1419. }
  1420. time.Sleep(time.Second)
  1421. }
  1422. if err != nil {
  1423. return err, false
  1424. }
  1425. defer w.Stop()
  1426. expireTime := time.Now().Add(time.Second * 30)
  1427. for time.Now().Unix() <= expireTime.Unix() {
  1428. select {
  1429. case <-time.NewTicker(time.Second).C:
  1430. // poll every second in case we already missed the ready event while
  1431. // creating the listener.
  1432. pod, err = a.Clientset.CoreV1().
  1433. Pods(pod.Namespace).
  1434. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1435. if err != nil && errors.IsNotFound(err) {
  1436. return IsNotFoundError, false
  1437. } else if err != nil {
  1438. return err, false
  1439. }
  1440. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1441. return nil, isExited
  1442. }
  1443. case evt := <-w.ResultChan():
  1444. pod, ok = evt.Object.(*v1.Pod)
  1445. if !ok {
  1446. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1447. }
  1448. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1449. return nil, isExited
  1450. }
  1451. }
  1452. }
  1453. return goerrors.New("timed out waiting for pod"), false
  1454. }
  1455. func isPodReady(pod *v1.Pod) bool {
  1456. ready := false
  1457. conditions := pod.Status.Conditions
  1458. for i := range conditions {
  1459. if conditions[i].Type == v1.PodReady {
  1460. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1461. }
  1462. }
  1463. return ready
  1464. }
  1465. func isPodExited(pod *v1.Pod) bool {
  1466. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1467. }