agent.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. goerrors "errors"
  17. "github.com/porter-dev/porter/api/server/shared/websocket"
  18. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  19. "github.com/porter-dev/porter/internal/models"
  20. "github.com/porter-dev/porter/internal/registry"
  21. "github.com/porter-dev/porter/internal/repository"
  22. "golang.org/x/oauth2"
  23. errors2 "errors"
  24. "github.com/porter-dev/porter/internal/helm/grapher"
  25. appsv1 "k8s.io/api/apps/v1"
  26. batchv1 "k8s.io/api/batch/v1"
  27. batchv1beta1 "k8s.io/api/batch/v1beta1"
  28. v1 "k8s.io/api/core/v1"
  29. v1beta1 "k8s.io/api/extensions/v1beta1"
  30. "k8s.io/apimachinery/pkg/api/errors"
  31. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/apimachinery/pkg/fields"
  33. "k8s.io/apimachinery/pkg/runtime"
  34. "k8s.io/apimachinery/pkg/runtime/schema"
  35. "k8s.io/apimachinery/pkg/types"
  36. "k8s.io/apimachinery/pkg/watch"
  37. "k8s.io/cli-runtime/pkg/genericclioptions"
  38. "k8s.io/client-go/informers"
  39. "k8s.io/client-go/kubernetes"
  40. "k8s.io/client-go/rest"
  41. "k8s.io/client-go/tools/cache"
  42. "k8s.io/client-go/tools/remotecommand"
  43. rspb "helm.sh/helm/v3/pkg/release"
  44. )
  45. // Agent is a Kubernetes agent for performing operations that interact with the
  46. // api server
  47. type Agent struct {
  48. RESTClientGetter genericclioptions.RESTClientGetter
  49. Clientset kubernetes.Interface
  50. }
  51. type Message struct {
  52. EventType string `json:"event_type"`
  53. Object interface{}
  54. Kind string
  55. }
  56. type ListOptions struct {
  57. FieldSelector string
  58. }
  59. type AuthError struct{}
  60. func (e *AuthError) Error() string {
  61. return "Unauthorized error"
  62. }
  63. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  64. func (a *Agent) UpdateClientset() error {
  65. restConf, err := a.RESTClientGetter.ToRESTConfig()
  66. if err != nil {
  67. return err
  68. }
  69. clientset, err := kubernetes.NewForConfig(restConf)
  70. if err != nil {
  71. return err
  72. }
  73. a.Clientset = clientset
  74. return nil
  75. }
  76. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  77. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  78. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  79. context.TODO(),
  80. &v1.ConfigMap{
  81. ObjectMeta: metav1.ObjectMeta{
  82. Name: name,
  83. Namespace: namespace,
  84. Labels: map[string]string{
  85. "porter": "true",
  86. },
  87. },
  88. Data: configMap,
  89. },
  90. metav1.CreateOptions{},
  91. )
  92. }
  93. func (a *Agent) CreateVersionedConfigMap(name, namespace string, version uint, configMap map[string]string, apps ...string) (*v1.ConfigMap, error) {
  94. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  95. context.TODO(),
  96. &v1.ConfigMap{
  97. ObjectMeta: metav1.ObjectMeta{
  98. Name: fmt.Sprintf("%s.v%d", name, version),
  99. Namespace: namespace,
  100. Labels: map[string]string{
  101. "owner": "porter",
  102. "envgroup": name,
  103. "version": fmt.Sprintf("%d", version),
  104. },
  105. Annotations: map[string]string{
  106. PorterAppAnnotationName: strings.Join(apps, ","),
  107. },
  108. },
  109. Data: configMap,
  110. },
  111. metav1.CreateOptions{},
  112. )
  113. }
  114. const PorterAppAnnotationName = "porter.run/apps"
  115. func (a *Agent) AddApplicationToVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  116. annons := cm.Annotations
  117. if annons == nil {
  118. annons = make(map[string]string)
  119. }
  120. appAnnon, appAnnonExists := annons[PorterAppAnnotationName]
  121. if !appAnnonExists || appAnnon == "" {
  122. annons[PorterAppAnnotationName] = appName
  123. } else {
  124. appStrArr := strings.Split(appAnnon, ",")
  125. foundApp := false
  126. for _, appStr := range appStrArr {
  127. if appStr == appName {
  128. foundApp = true
  129. }
  130. }
  131. if !foundApp {
  132. annons[PorterAppAnnotationName] = fmt.Sprintf("%s,%s", appAnnon, appName)
  133. }
  134. }
  135. cm.SetAnnotations(annons)
  136. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  137. context.TODO(),
  138. cm,
  139. metav1.UpdateOptions{},
  140. )
  141. }
  142. func (a *Agent) RemoveApplicationFromVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  143. annons := cm.Annotations
  144. if annons == nil {
  145. annons = make(map[string]string)
  146. }
  147. appAnn, appAnnExists := annons[PorterAppAnnotationName]
  148. if !appAnnExists {
  149. return nil, IsNotFoundError
  150. }
  151. appStrArr := strings.Split(appAnn, ",")
  152. newStrArr := make([]string, 0)
  153. for _, appStr := range appStrArr {
  154. if appStr != appName {
  155. newStrArr = append(newStrArr, appStr)
  156. }
  157. }
  158. annons[PorterAppAnnotationName] = strings.Join(newStrArr, ",")
  159. cm.SetAnnotations(annons)
  160. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  161. context.TODO(),
  162. cm,
  163. metav1.UpdateOptions{},
  164. )
  165. }
  166. func (a *Agent) CreateLinkedVersionedSecret(name, namespace, cmName string, version uint, data map[string][]byte) (*v1.Secret, error) {
  167. return a.Clientset.CoreV1().Secrets(namespace).Create(
  168. context.TODO(),
  169. &v1.Secret{
  170. ObjectMeta: metav1.ObjectMeta{
  171. Name: fmt.Sprintf("%s.v%d", name, version),
  172. Namespace: namespace,
  173. Labels: map[string]string{
  174. "owner": "porter",
  175. "envgroup": name,
  176. "version": fmt.Sprintf("%d", version),
  177. "configmap": cmName,
  178. },
  179. },
  180. Data: data,
  181. },
  182. metav1.CreateOptions{},
  183. )
  184. }
  185. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  186. // base64 encoded
  187. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  188. return a.Clientset.CoreV1().Secrets(namespace).Create(
  189. context.TODO(),
  190. &v1.Secret{
  191. ObjectMeta: metav1.ObjectMeta{
  192. Name: name,
  193. Namespace: namespace,
  194. Labels: map[string]string{
  195. "porter": "true",
  196. "configmap": cmName,
  197. },
  198. },
  199. Data: data,
  200. },
  201. metav1.CreateOptions{},
  202. )
  203. }
  204. type mergeConfigMapData struct {
  205. Data map[string]*string `json:"data"`
  206. }
  207. // UpdateConfigMap updates the configmap given its name and namespace
  208. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  209. cmData := make(map[string]*string)
  210. for key, val := range configMap {
  211. valCopy := val
  212. cmData[key] = &valCopy
  213. if len(val) == 0 {
  214. cmData[key] = nil
  215. }
  216. }
  217. mergeCM := &mergeConfigMapData{
  218. Data: cmData,
  219. }
  220. patchBytes, err := json.Marshal(mergeCM)
  221. if err != nil {
  222. return nil, err
  223. }
  224. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  225. context.Background(),
  226. name,
  227. types.MergePatchType,
  228. patchBytes,
  229. metav1.PatchOptions{},
  230. )
  231. }
  232. type mergeLinkedSecretData struct {
  233. Data map[string]*[]byte `json:"data"`
  234. }
  235. // UpdateLinkedSecret updates the secret given its name and namespace
  236. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  237. secretData := make(map[string]*[]byte)
  238. for key, val := range data {
  239. valCopy := val
  240. secretData[key] = &valCopy
  241. if len(val) == 0 {
  242. secretData[key] = nil
  243. }
  244. }
  245. mergeSecret := &mergeLinkedSecretData{
  246. Data: secretData,
  247. }
  248. patchBytes, err := json.Marshal(mergeSecret)
  249. if err != nil {
  250. return err
  251. }
  252. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  253. context.TODO(),
  254. name,
  255. types.MergePatchType,
  256. patchBytes,
  257. metav1.PatchOptions{},
  258. )
  259. return err
  260. }
  261. // DeleteConfigMap deletes the configmap given its name and namespace
  262. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  263. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  264. context.TODO(),
  265. name,
  266. metav1.DeleteOptions{},
  267. )
  268. }
  269. // DeleteLinkedSecret deletes the secret given its name and namespace
  270. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  271. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  272. context.TODO(),
  273. name,
  274. metav1.DeleteOptions{},
  275. )
  276. }
  277. func (a *Agent) ListVersionedConfigMaps(name string, namespace string) ([]v1.ConfigMap, error) {
  278. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  279. context.Background(),
  280. metav1.ListOptions{
  281. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  282. },
  283. )
  284. if err != nil {
  285. return nil, err
  286. }
  287. return listResp.Items, nil
  288. }
  289. func (a *Agent) DeleteVersionedConfigMap(name string, namespace string) error {
  290. return a.Clientset.CoreV1().ConfigMaps(namespace).DeleteCollection(
  291. context.Background(),
  292. metav1.DeleteOptions{},
  293. metav1.ListOptions{
  294. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  295. },
  296. )
  297. }
  298. func (a *Agent) DeleteVersionedSecret(name string, namespace string) error {
  299. return a.Clientset.CoreV1().Secrets(namespace).DeleteCollection(
  300. context.Background(),
  301. metav1.DeleteOptions{},
  302. metav1.ListOptions{
  303. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  304. },
  305. )
  306. }
  307. func (a *Agent) ListAllVersionedConfigMaps(namespace string) ([]v1.ConfigMap, error) {
  308. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  309. context.Background(),
  310. metav1.ListOptions{
  311. LabelSelector: fmt.Sprintf("envgroup"),
  312. },
  313. )
  314. if err != nil {
  315. return nil, err
  316. }
  317. // only keep the latest version for each configmap
  318. latestMap := make(map[string]v1.ConfigMap)
  319. for _, configmap := range listResp.Items {
  320. egName, egNameExists := configmap.Labels["envgroup"]
  321. if !egNameExists {
  322. continue
  323. }
  324. id := fmt.Sprintf("%s/%s", configmap.Namespace, egName)
  325. if currLatest, exists := latestMap[id]; exists {
  326. // get version
  327. currVersionStr, currVersionExists := currLatest.Labels["version"]
  328. versionStr, versionExists := configmap.Labels["version"]
  329. if versionExists && currVersionExists {
  330. currVersion, currErr := strconv.Atoi(currVersionStr)
  331. version, err := strconv.Atoi(versionStr)
  332. if currErr == nil && err == nil && currVersion < version {
  333. latestMap[id] = configmap
  334. }
  335. }
  336. } else {
  337. latestMap[id] = configmap
  338. }
  339. }
  340. res := make([]v1.ConfigMap, 0)
  341. for _, cm := range latestMap {
  342. res = append(res, cm)
  343. }
  344. return res, nil
  345. }
  346. // GetConfigMap retrieves the configmap given its name and namespace
  347. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  348. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  349. context.TODO(),
  350. name,
  351. metav1.GetOptions{},
  352. )
  353. }
  354. func (a *Agent) GetVersionedConfigMap(name, namespace string, version uint) (*v1.ConfigMap, error) {
  355. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  356. context.Background(),
  357. metav1.ListOptions{
  358. LabelSelector: fmt.Sprintf("envgroup=%s,version=%d", name, version),
  359. },
  360. )
  361. if err != nil {
  362. return nil, err
  363. }
  364. if listResp.Items == nil || len(listResp.Items) == 0 {
  365. return nil, IsNotFoundError
  366. }
  367. // if the length of the list is greater than 1, return an error -- this shouldn't happen
  368. if len(listResp.Items) > 1 {
  369. return nil, fmt.Errorf("multiple configmaps found while searching for %s/%s and version %d", namespace, name, version)
  370. }
  371. return &listResp.Items[0], nil
  372. }
  373. func (a *Agent) GetLatestVersionedConfigMap(name, namespace string) (*v1.ConfigMap, uint, error) {
  374. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  375. context.Background(),
  376. metav1.ListOptions{
  377. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  378. },
  379. )
  380. if err != nil {
  381. return nil, 0, err
  382. }
  383. if listResp.Items == nil || len(listResp.Items) == 0 {
  384. return nil, 0, IsNotFoundError
  385. }
  386. // iterate through the configmaps and get the greatest version
  387. var res *v1.ConfigMap
  388. var latestVersion uint
  389. for _, configmap := range listResp.Items {
  390. if res == nil {
  391. versionStr, versionExists := configmap.Labels["version"]
  392. if !versionExists {
  393. continue
  394. }
  395. version, err := strconv.Atoi(versionStr)
  396. if err != nil {
  397. continue
  398. }
  399. latestV := configmap
  400. res = &latestV
  401. latestVersion = uint(version)
  402. } else {
  403. // get version
  404. versionStr, versionExists := configmap.Labels["version"]
  405. currVersionStr, currVersionExists := res.Labels["version"]
  406. if versionExists && currVersionExists {
  407. currVersion, currErr := strconv.Atoi(currVersionStr)
  408. version, err := strconv.Atoi(versionStr)
  409. if currErr == nil && err == nil && currVersion < version {
  410. latestV := configmap
  411. res = &latestV
  412. latestVersion = uint(version)
  413. }
  414. }
  415. }
  416. }
  417. if res == nil {
  418. return nil, 0, IsNotFoundError
  419. }
  420. return res, latestVersion, nil
  421. }
  422. func (a *Agent) GetLatestVersionedSecret(name, namespace string) (*v1.Secret, uint, error) {
  423. listResp, err := a.Clientset.CoreV1().Secrets(namespace).List(
  424. context.Background(),
  425. metav1.ListOptions{
  426. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  427. },
  428. )
  429. if err != nil {
  430. return nil, 0, err
  431. }
  432. if listResp.Items == nil || len(listResp.Items) == 0 {
  433. return nil, 0, IsNotFoundError
  434. }
  435. // iterate through the configmaps and get the greatest version
  436. var res *v1.Secret
  437. var latestVersion uint
  438. for _, secret := range listResp.Items {
  439. if res == nil {
  440. versionStr, versionExists := secret.Labels["version"]
  441. if !versionExists {
  442. continue
  443. }
  444. version, err := strconv.Atoi(versionStr)
  445. if err != nil {
  446. continue
  447. }
  448. latestV := secret
  449. res = &latestV
  450. latestVersion = uint(version)
  451. } else {
  452. // get version
  453. versionStr, versionExists := secret.Labels["version"]
  454. currVersionStr, currVersionExists := res.Labels["version"]
  455. if versionExists && currVersionExists {
  456. currVersion, currErr := strconv.Atoi(currVersionStr)
  457. version, err := strconv.Atoi(versionStr)
  458. if currErr == nil && err == nil && currVersion < version {
  459. latestV := secret
  460. res = &latestV
  461. latestVersion = uint(version)
  462. }
  463. }
  464. }
  465. }
  466. if res == nil {
  467. return nil, 0, IsNotFoundError
  468. }
  469. return res, latestVersion, nil
  470. }
  471. // GetSecret retrieves the secret given its name and namespace
  472. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  473. return a.Clientset.CoreV1().Secrets(namespace).Get(
  474. context.TODO(),
  475. name,
  476. metav1.GetOptions{},
  477. )
  478. }
  479. // ListConfigMaps simply lists namespaces
  480. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  481. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  482. context.TODO(),
  483. metav1.ListOptions{
  484. LabelSelector: "porter",
  485. },
  486. )
  487. }
  488. // ListEvents lists the events of a given object.
  489. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  490. return a.Clientset.CoreV1().Events(namespace).List(
  491. context.TODO(),
  492. metav1.ListOptions{
  493. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  494. },
  495. )
  496. }
  497. // ListNamespaces simply lists namespaces
  498. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  499. return a.Clientset.CoreV1().Namespaces().List(
  500. context.TODO(),
  501. metav1.ListOptions{},
  502. )
  503. }
  504. // CreateNamespace creates a namespace with the given name.
  505. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  506. // check if namespace exists
  507. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  508. context.TODO(),
  509. name,
  510. metav1.GetOptions{},
  511. )
  512. if err == nil && checkNS != nil {
  513. return checkNS, nil
  514. }
  515. namespace := v1.Namespace{
  516. ObjectMeta: metav1.ObjectMeta{
  517. Name: name,
  518. },
  519. }
  520. return a.Clientset.CoreV1().Namespaces().Create(
  521. context.TODO(),
  522. &namespace,
  523. metav1.CreateOptions{},
  524. )
  525. }
  526. // DeleteNamespace deletes the namespace given the name.
  527. func (a *Agent) DeleteNamespace(name string) error {
  528. return a.Clientset.CoreV1().Namespaces().Delete(
  529. context.TODO(),
  530. name,
  531. metav1.DeleteOptions{},
  532. )
  533. }
  534. func (a *Agent) GetPorterAgent() (*appsv1.Deployment, error) {
  535. depl, err := a.Clientset.AppsV1().Deployments("porter-agent-system").Get(
  536. context.TODO(),
  537. "porter-agent-controller-manager",
  538. metav1.GetOptions{},
  539. )
  540. if err != nil && errors.IsNotFound(err) {
  541. return nil, IsNotFoundError
  542. }
  543. return depl, err
  544. }
  545. // ListJobsByLabel lists jobs in a namespace matching a label
  546. type Label struct {
  547. Key string
  548. Val string
  549. }
  550. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  551. selectors := make([]string, 0)
  552. for _, label := range labels {
  553. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  554. }
  555. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  556. context.TODO(),
  557. metav1.ListOptions{
  558. LabelSelector: strings.Join(selectors, ","),
  559. },
  560. )
  561. if err != nil {
  562. return nil, err
  563. }
  564. return resp.Items, nil
  565. }
  566. // DeleteJob deletes the job in the given name and namespace.
  567. func (a *Agent) DeleteJob(name, namespace string) error {
  568. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  569. context.TODO(),
  570. name,
  571. metav1.DeleteOptions{},
  572. )
  573. }
  574. // GetJobPods lists all pods belonging to a job in a namespace
  575. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  576. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  577. context.TODO(),
  578. metav1.ListOptions{
  579. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  580. },
  581. )
  582. if err != nil {
  583. return nil, err
  584. }
  585. return resp.Items, nil
  586. }
  587. // GetIngress gets ingress given the name and namespace
  588. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  589. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  590. context.TODO(),
  591. name,
  592. metav1.GetOptions{},
  593. )
  594. if err != nil && errors.IsNotFound(err) {
  595. return nil, IsNotFoundError
  596. } else if err != nil {
  597. return nil, err
  598. }
  599. return resp, nil
  600. }
  601. var IsNotFoundError = fmt.Errorf("not found")
  602. type BadRequestError struct {
  603. msg string
  604. }
  605. func (e *BadRequestError) Error() string {
  606. return e.msg
  607. }
  608. // GetDeployment gets the deployment given the name and namespace
  609. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  610. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  611. context.TODO(),
  612. c.Name,
  613. metav1.GetOptions{},
  614. )
  615. if err != nil && errors.IsNotFound(err) {
  616. return nil, IsNotFoundError
  617. } else if err != nil {
  618. return nil, err
  619. }
  620. res.Kind = c.Kind
  621. return res, nil
  622. }
  623. // GetStatefulSet gets the statefulset given the name and namespace
  624. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  625. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  626. context.TODO(),
  627. c.Name,
  628. metav1.GetOptions{},
  629. )
  630. if err != nil && errors.IsNotFound(err) {
  631. return nil, IsNotFoundError
  632. } else if err != nil {
  633. return nil, err
  634. }
  635. res.Kind = c.Kind
  636. return res, nil
  637. }
  638. // GetReplicaSet gets the replicaset given the name and namespace
  639. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  640. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  641. context.TODO(),
  642. c.Name,
  643. metav1.GetOptions{},
  644. )
  645. if err != nil && errors.IsNotFound(err) {
  646. return nil, IsNotFoundError
  647. } else if err != nil {
  648. return nil, err
  649. }
  650. res.Kind = c.Kind
  651. return res, nil
  652. }
  653. // GetDaemonSet gets the daemonset by name and namespace
  654. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  655. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  656. context.TODO(),
  657. c.Name,
  658. metav1.GetOptions{},
  659. )
  660. if err != nil && errors.IsNotFound(err) {
  661. return nil, IsNotFoundError
  662. } else if err != nil {
  663. return nil, err
  664. }
  665. res.Kind = c.Kind
  666. return res, nil
  667. }
  668. // GetJob gets the job by name and namespace
  669. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  670. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  671. context.TODO(),
  672. c.Name,
  673. metav1.GetOptions{},
  674. )
  675. if err != nil && errors.IsNotFound(err) {
  676. return nil, IsNotFoundError
  677. } else if err != nil {
  678. return nil, err
  679. }
  680. res.Kind = c.Kind
  681. return res, nil
  682. }
  683. // GetCronJob gets the CronJob by name and namespace
  684. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  685. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  686. context.TODO(),
  687. c.Name,
  688. metav1.GetOptions{},
  689. )
  690. if err != nil && errors.IsNotFound(err) {
  691. return nil, IsNotFoundError
  692. } else if err != nil {
  693. return nil, err
  694. }
  695. res.Kind = c.Kind
  696. return res, nil
  697. }
  698. // GetPodsByLabel retrieves pods with matching labels
  699. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  700. // Search in all namespaces for matching pods
  701. return a.Clientset.CoreV1().Pods(namespace).List(
  702. context.TODO(),
  703. metav1.ListOptions{
  704. LabelSelector: selector,
  705. },
  706. )
  707. }
  708. // GetPodByName retrieves a single instance of pod with given name
  709. func (a *Agent) GetPodByName(name string, namespace string) (*v1.Pod, error) {
  710. // Get pod by name
  711. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  712. context.TODO(),
  713. name,
  714. metav1.GetOptions{},
  715. )
  716. if err != nil && errors.IsNotFound(err) {
  717. return nil, IsNotFoundError
  718. }
  719. if err != nil {
  720. return nil, err
  721. }
  722. return pod, nil
  723. }
  724. // DeletePod deletes a pod by name and namespace
  725. func (a *Agent) DeletePod(namespace string, name string) error {
  726. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  727. context.TODO(),
  728. name,
  729. metav1.DeleteOptions{},
  730. )
  731. if err != nil && errors.IsNotFound(err) {
  732. return IsNotFoundError
  733. }
  734. return err
  735. }
  736. // GetPodLogs streams real-time logs from a given pod.
  737. func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer string, rw *websocket.WebsocketSafeReadWriter) error {
  738. // get the pod to read in the list of contains
  739. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  740. context.Background(),
  741. name,
  742. metav1.GetOptions{},
  743. )
  744. if err != nil && errors.IsNotFound(err) {
  745. return IsNotFoundError
  746. } else if err != nil {
  747. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  748. }
  749. // see if container is ready and able to open a stream. If not, wait for container
  750. // to be ready.
  751. err, _ = a.waitForPod(pod)
  752. if err != nil && goerrors.Is(err, IsNotFoundError) {
  753. return IsNotFoundError
  754. } else if err != nil {
  755. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  756. }
  757. container := pod.Spec.Containers[0].Name
  758. if len(selectedContainer) > 0 {
  759. container = selectedContainer
  760. }
  761. tails := int64(400)
  762. // follow logs
  763. podLogOpts := v1.PodLogOptions{
  764. Follow: true,
  765. TailLines: &tails,
  766. Container: container,
  767. }
  768. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  769. podLogs, err := req.Stream(context.TODO())
  770. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  771. // we'd like to pass this through to the client.
  772. if err != nil && errors.IsBadRequest(err) {
  773. return &BadRequestError{err.Error()}
  774. } else if err != nil {
  775. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  776. }
  777. r := bufio.NewReader(podLogs)
  778. errorchan := make(chan error)
  779. var wg sync.WaitGroup
  780. var once sync.Once
  781. wg.Add(2)
  782. go func() {
  783. wg.Wait()
  784. close(errorchan)
  785. }()
  786. go func() {
  787. defer func() {
  788. if r := recover(); r != nil {
  789. // TODO: add method to alert on panic
  790. return
  791. }
  792. }()
  793. // listens for websocket closing handshake
  794. defer wg.Done()
  795. for {
  796. if _, _, err := rw.ReadMessage(); err != nil {
  797. errorchan <- nil
  798. return
  799. }
  800. }
  801. }()
  802. go func() {
  803. defer func() {
  804. if r := recover(); r != nil {
  805. // TODO: add method to alert on panic
  806. return
  807. }
  808. }()
  809. defer wg.Done()
  810. for {
  811. bytes, err := r.ReadBytes('\n')
  812. if err != nil {
  813. errorchan <- err
  814. return
  815. }
  816. if _, writeErr := rw.Write(bytes); writeErr != nil {
  817. errorchan <- writeErr
  818. return
  819. }
  820. }
  821. }()
  822. for err = range errorchan {
  823. // only call these methods a single time
  824. once.Do(func() {
  825. rw.Close()
  826. podLogs.Close()
  827. })
  828. }
  829. return err
  830. }
  831. // GetPodLogs streams real-time logs from a given pod.
  832. func (a *Agent) GetPreviousPodLogs(namespace string, name string, selectedContainer string) ([]string, error) {
  833. // get the pod to read in the list of contains
  834. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  835. context.Background(),
  836. name,
  837. metav1.GetOptions{},
  838. )
  839. if err != nil && errors.IsNotFound(err) {
  840. return nil, IsNotFoundError
  841. } else if err != nil {
  842. return nil, fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  843. }
  844. container := pod.Spec.Containers[0].Name
  845. if len(selectedContainer) > 0 {
  846. container = selectedContainer
  847. }
  848. tails := int64(400)
  849. // follow logs
  850. podLogOpts := v1.PodLogOptions{
  851. Follow: true,
  852. TailLines: &tails,
  853. Container: container,
  854. Previous: true,
  855. }
  856. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  857. podLogs, err := req.Stream(context.TODO())
  858. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  859. // we'd like to pass this through to the client.
  860. if err != nil && strings.Contains(err.Error(), "not found") {
  861. return nil, IsNotFoundError
  862. }
  863. if err != nil && errors.IsBadRequest(err) {
  864. return nil, &BadRequestError{err.Error()}
  865. } else if err != nil {
  866. return nil, fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  867. }
  868. defer podLogs.Close()
  869. r := bufio.NewReader(podLogs)
  870. var logs []string
  871. for {
  872. line, err := r.ReadString('\n')
  873. logs = append(logs, line)
  874. if err == io.EOF {
  875. break
  876. } else if err != nil {
  877. return nil, err
  878. }
  879. }
  880. return logs, nil
  881. }
  882. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  883. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  884. jobPods, err := a.GetJobPods(namespace, name)
  885. if err != nil {
  886. return err
  887. }
  888. podName := jobPods[0].ObjectMeta.Name
  889. restConf, err := a.RESTClientGetter.ToRESTConfig()
  890. restConf.GroupVersion = &schema.GroupVersion{
  891. Group: "api",
  892. Version: "v1",
  893. }
  894. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  895. restClient, err := rest.RESTClientFor(restConf)
  896. if err != nil {
  897. return err
  898. }
  899. req := restClient.Post().
  900. Resource("pods").
  901. Name(podName).
  902. Namespace(namespace).
  903. SubResource("exec")
  904. req.Param("command", "./signal.sh")
  905. req.Param("container", "sidecar")
  906. req.Param("stdin", "true")
  907. req.Param("stdout", "false")
  908. req.Param("tty", "false")
  909. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  910. if err != nil {
  911. return err
  912. }
  913. return exec.Stream(remotecommand.StreamOptions{
  914. Tty: false,
  915. Stdin: strings.NewReader("./signal.sh"),
  916. })
  917. }
  918. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  919. // the task some number of times until failing
  920. func (a *Agent) RunWebsocketTask(task func() error) error {
  921. lastTime := int64(0)
  922. for {
  923. if err := a.UpdateClientset(); err != nil {
  924. return err
  925. }
  926. err := task()
  927. if err == nil {
  928. return nil
  929. }
  930. if !errors2.Is(err, &AuthError{}) {
  931. return err
  932. }
  933. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  934. return err
  935. }
  936. lastTime = time.Now().Unix()
  937. }
  938. }
  939. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  940. // TODO: Support Jobs
  941. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  942. run := func() error {
  943. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  944. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  945. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  946. options.LabelSelector = selectors
  947. }
  948. factory := informers.NewSharedInformerFactoryWithOptions(
  949. a.Clientset,
  950. 0,
  951. informers.WithTweakListOptions(tweakListOptionsFunc),
  952. )
  953. var informer cache.SharedInformer
  954. // Spins up an informer depending on kind. Convert to lowercase for robustness
  955. switch strings.ToLower(kind) {
  956. case "deployment":
  957. informer = factory.Apps().V1().Deployments().Informer()
  958. case "statefulset":
  959. informer = factory.Apps().V1().StatefulSets().Informer()
  960. case "replicaset":
  961. informer = factory.Apps().V1().ReplicaSets().Informer()
  962. case "daemonset":
  963. informer = factory.Apps().V1().DaemonSets().Informer()
  964. case "job":
  965. informer = factory.Batch().V1().Jobs().Informer()
  966. case "cronjob":
  967. informer = factory.Batch().V1beta1().CronJobs().Informer()
  968. case "namespace":
  969. informer = factory.Core().V1().Namespaces().Informer()
  970. case "pod":
  971. informer = factory.Core().V1().Pods().Informer()
  972. }
  973. stopper := make(chan struct{})
  974. errorchan := make(chan error)
  975. var wg sync.WaitGroup
  976. var once sync.Once
  977. var err error
  978. wg.Add(2)
  979. go func() {
  980. wg.Wait()
  981. close(errorchan)
  982. }()
  983. go func() {
  984. defer func() {
  985. if r := recover(); r != nil {
  986. // TODO: add method to alert on panic
  987. return
  988. }
  989. }()
  990. // listens for websocket closing handshake
  991. defer wg.Done()
  992. for {
  993. if _, _, err := rw.ReadMessage(); err != nil {
  994. errorchan <- nil
  995. return
  996. }
  997. }
  998. }()
  999. go func() {
  1000. defer func() {
  1001. if r := recover(); r != nil {
  1002. // TODO: add method to alert on panic
  1003. return
  1004. }
  1005. }()
  1006. // listens for websocket closing handshake
  1007. defer wg.Done()
  1008. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1009. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1010. errorchan <- &AuthError{}
  1011. }
  1012. })
  1013. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1014. UpdateFunc: func(oldObj, newObj interface{}) {
  1015. msg := Message{
  1016. EventType: "UPDATE",
  1017. Object: newObj,
  1018. Kind: strings.ToLower(kind),
  1019. }
  1020. err := rw.WriteJSON(msg)
  1021. if err != nil {
  1022. errorchan <- err
  1023. }
  1024. },
  1025. AddFunc: func(obj interface{}) {
  1026. msg := Message{
  1027. EventType: "ADD",
  1028. Object: obj,
  1029. Kind: strings.ToLower(kind),
  1030. }
  1031. err := rw.WriteJSON(msg)
  1032. if err != nil {
  1033. errorchan <- err
  1034. }
  1035. },
  1036. DeleteFunc: func(obj interface{}) {
  1037. msg := Message{
  1038. EventType: "DELETE",
  1039. Object: obj,
  1040. Kind: strings.ToLower(kind),
  1041. }
  1042. err := rw.WriteJSON(msg)
  1043. if err != nil {
  1044. errorchan <- err
  1045. }
  1046. },
  1047. })
  1048. informer.Run(stopper)
  1049. }()
  1050. for err = range errorchan {
  1051. once.Do(func() {
  1052. close(stopper)
  1053. rw.Close()
  1054. })
  1055. }
  1056. return err
  1057. }
  1058. return a.RunWebsocketTask(run)
  1059. }
  var (
  	// b64 is the base64 encoding used when decoding stored release data.
  	b64 = base64.StdEncoding

  	// magicGzip is the three-byte prefix that decodeRelease looks for to
  	// decide whether a decoded payload is gzip-compressed.
  	magicGzip = []byte{0x1f, 0x8b, 0x08}
  )
  1062. func decodeRelease(data string) (*rspb.Release, error) {
  1063. // base64 decode string
  1064. b, err := b64.DecodeString(data)
  1065. if err != nil {
  1066. return nil, err
  1067. }
  1068. // For backwards compatibility with releases that were stored before
  1069. // compression was introduced we skip decompression if the
  1070. // gzip magic header is not found
  1071. if bytes.Equal(b[0:3], magicGzip) {
  1072. r, err := gzip.NewReader(bytes.NewReader(b))
  1073. if err != nil {
  1074. return nil, err
  1075. }
  1076. defer r.Close()
  1077. b2, err := ioutil.ReadAll(r)
  1078. if err != nil {
  1079. return nil, err
  1080. }
  1081. b = b2
  1082. }
  1083. var rls rspb.Release
  1084. // unmarshal release object bytes
  1085. if err := json.Unmarshal(b, &rls); err != nil {
  1086. return nil, err
  1087. }
  1088. return &rls, nil
  1089. }
  // contains reports whether str is equal to at least one element of s.
  // A nil or empty slice never contains anything.
  func contains(s []string, str string) bool {
  	for i := 0; i < len(s); i++ {
  		if s[i] == str {
  			return true
  		}
  	}

  	return false
  }
  1098. func ParseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  1099. if secret.Type != "helm.sh/release.v1" {
  1100. return nil, true, nil
  1101. }
  1102. releaseData, ok := secret.Data["release"]
  1103. if !ok {
  1104. return nil, true, fmt.Errorf("release field not found")
  1105. }
  1106. helm_object, err := decodeRelease(string(releaseData))
  1107. if err != nil {
  1108. return nil, true, err
  1109. }
  1110. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  1111. return nil, true, nil
  1112. }
  1113. return helm_object, false, nil
  1114. }
  1115. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1116. run := func() error {
  1117. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1118. options.LabelSelector = selectors
  1119. }
  1120. factory := informers.NewSharedInformerFactoryWithOptions(
  1121. a.Clientset,
  1122. 0,
  1123. informers.WithTweakListOptions(tweakListOptionsFunc),
  1124. informers.WithNamespace(namespace),
  1125. )
  1126. informer := factory.Core().V1().Secrets().Informer()
  1127. stopper := make(chan struct{})
  1128. errorchan := make(chan error)
  1129. var wg sync.WaitGroup
  1130. var once sync.Once
  1131. var err error
  1132. wg.Add(2)
  1133. go func() {
  1134. wg.Wait()
  1135. close(errorchan)
  1136. }()
  1137. go func() {
  1138. defer func() {
  1139. if r := recover(); r != nil {
  1140. // TODO: add method to alert on panic
  1141. return
  1142. }
  1143. }()
  1144. // listens for websocket closing handshake
  1145. defer wg.Done()
  1146. for {
  1147. if _, _, err := rw.ReadMessage(); err != nil {
  1148. errorchan <- nil
  1149. return
  1150. }
  1151. }
  1152. }()
  1153. go func() {
  1154. defer func() {
  1155. if r := recover(); r != nil {
  1156. // TODO: add method to alert on panic
  1157. return
  1158. }
  1159. }()
  1160. // listens for websocket closing handshake
  1161. defer wg.Done()
  1162. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1163. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1164. errorchan <- &AuthError{}
  1165. }
  1166. })
  1167. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1168. UpdateFunc: func(oldObj, newObj interface{}) {
  1169. secretObj, ok := newObj.(*v1.Secret)
  1170. if !ok {
  1171. errorchan <- fmt.Errorf("could not cast to secret")
  1172. return
  1173. }
  1174. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1175. if isNotHelmRelease && err == nil {
  1176. return
  1177. }
  1178. if err != nil {
  1179. errorchan <- err
  1180. return
  1181. }
  1182. msg := Message{
  1183. EventType: "UPDATE",
  1184. Object: helm_object,
  1185. }
  1186. rw.WriteJSON(msg)
  1187. },
  1188. AddFunc: func(obj interface{}) {
  1189. secretObj, ok := obj.(*v1.Secret)
  1190. if !ok {
  1191. errorchan <- fmt.Errorf("could not cast to secret")
  1192. return
  1193. }
  1194. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1195. if isNotHelmRelease && err == nil {
  1196. return
  1197. }
  1198. if err != nil {
  1199. errorchan <- err
  1200. return
  1201. }
  1202. msg := Message{
  1203. EventType: "ADD",
  1204. Object: helm_object,
  1205. }
  1206. rw.WriteJSON(msg)
  1207. },
  1208. DeleteFunc: func(obj interface{}) {
  1209. secretObj, ok := obj.(*v1.Secret)
  1210. if !ok {
  1211. errorchan <- fmt.Errorf("could not cast to secret")
  1212. return
  1213. }
  1214. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1215. if isNotHelmRelease && err == nil {
  1216. return
  1217. }
  1218. if err != nil {
  1219. errorchan <- err
  1220. return
  1221. }
  1222. msg := Message{
  1223. EventType: "DELETE",
  1224. Object: helm_object,
  1225. }
  1226. rw.WriteJSON(msg)
  1227. },
  1228. })
  1229. informer.Run(stopper)
  1230. }()
  1231. for err = range errorchan {
  1232. once.Do(func() {
  1233. close(stopper)
  1234. rw.Close()
  1235. })
  1236. }
  1237. return err
  1238. }
  1239. return a.RunWebsocketTask(run)
  1240. }
  1241. func (a *Agent) Provision(
  1242. opts *provisioner.ProvisionOpts,
  1243. ) error {
  1244. // get the provisioner job template
  1245. job, err := provisioner.GetProvisionerJobTemplate(opts)
  1246. if err != nil {
  1247. return err
  1248. }
  1249. // clearExistingJob with the same name
  1250. // this is required in case of a job retry
  1251. err = a.clearExistingJobs(job)
  1252. if err != nil {
  1253. return err
  1254. }
  1255. // apply the provisioner job template
  1256. _, err = a.Clientset.BatchV1().Jobs(opts.ProvJobNamespace).Create(
  1257. context.TODO(),
  1258. job,
  1259. metav1.CreateOptions{},
  1260. )
  1261. return err
  1262. }
  1263. func (a *Agent) clearExistingJobs(j *batchv1.Job) error {
  1264. // find if existingJob already exists
  1265. existingJob, err := a.Clientset.BatchV1().Jobs(j.Namespace).Get(
  1266. context.TODO(),
  1267. j.Name,
  1268. metav1.GetOptions{},
  1269. )
  1270. if err != nil {
  1271. // job not found, no further action needed
  1272. return nil
  1273. }
  1274. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  1275. defer cancel()
  1276. w, err := a.Clientset.BatchV1().Jobs(existingJob.Namespace).Watch(
  1277. context.TODO(),
  1278. metav1.ListOptions{
  1279. ResourceVersion: existingJob.ResourceVersion,
  1280. // ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
  1281. },
  1282. )
  1283. if err != nil {
  1284. // most probably the job has already been deleted
  1285. return nil
  1286. }
  1287. deleteErrorChan := make(chan error)
  1288. go func(errChan chan<- error) {
  1289. // job exists, delete it and wait for its deletion
  1290. // delete job if it already exists
  1291. err = a.Clientset.BatchV1().Jobs(existingJob.Namespace).Delete(
  1292. context.TODO(),
  1293. j.Name,
  1294. metav1.DeleteOptions{},
  1295. )
  1296. if err != nil {
  1297. // unable to delete job
  1298. errChan <- err
  1299. }
  1300. }(deleteErrorChan)
  1301. for {
  1302. select {
  1303. case <-ctx.Done():
  1304. return fmt.Errorf("timedout waiting for existing job deletion")
  1305. case event := <-w.ResultChan():
  1306. switch event.Type {
  1307. case watch.Deleted:
  1308. // job has been successfully delete
  1309. // return without error
  1310. return nil
  1311. }
  1312. }
  1313. }
  1314. }
  1315. // CreateImagePullSecrets will create the required image pull secrets and
  1316. // return a map from the registry name to the name of the secret.
  1317. func (a *Agent) CreateImagePullSecrets(
  1318. repo repository.Repository,
  1319. namespace string,
  1320. linkedRegs map[string]*models.Registry,
  1321. doAuth *oauth2.Config,
  1322. ) (map[string]string, error) {
  1323. res := make(map[string]string)
  1324. for key, val := range linkedRegs {
  1325. _reg := registry.Registry(*val)
  1326. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1327. if err != nil {
  1328. return nil, err
  1329. }
  1330. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1331. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1332. context.TODO(),
  1333. secretName,
  1334. metav1.GetOptions{},
  1335. )
  1336. // if not found, create the secret
  1337. if err != nil && errors.IsNotFound(err) {
  1338. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1339. context.TODO(),
  1340. &v1.Secret{
  1341. ObjectMeta: metav1.ObjectMeta{
  1342. Name: secretName,
  1343. },
  1344. Data: map[string][]byte{
  1345. string(v1.DockerConfigJsonKey): data,
  1346. },
  1347. Type: v1.SecretTypeDockerConfigJson,
  1348. },
  1349. metav1.CreateOptions{},
  1350. )
  1351. if err != nil {
  1352. return nil, err
  1353. }
  1354. // add secret name to the map
  1355. res[key] = secretName
  1356. continue
  1357. } else if err != nil {
  1358. return nil, err
  1359. }
  1360. // otherwise, check that the secret contains the correct data: if
  1361. // if doesn't, update it
  1362. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1363. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1364. context.TODO(),
  1365. &v1.Secret{
  1366. ObjectMeta: metav1.ObjectMeta{
  1367. Name: secretName,
  1368. },
  1369. Data: map[string][]byte{
  1370. string(v1.DockerConfigJsonKey): data,
  1371. },
  1372. Type: v1.SecretTypeDockerConfigJson,
  1373. },
  1374. metav1.UpdateOptions{},
  1375. )
  1376. if err != nil {
  1377. return nil, err
  1378. }
  1379. }
  1380. // add secret name to the map
  1381. res[key] = secretName
  1382. }
  1383. return res, nil
  1384. }
  1385. // helper that waits for pod to be ready
  1386. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1387. var (
  1388. w watch.Interface
  1389. err error
  1390. ok bool
  1391. )
  1392. // immediately after creating a pod, the API may return a 404. heuristically 1
  1393. // second seems to be plenty.
  1394. watchRetries := 3
  1395. for i := 0; i < watchRetries; i++ {
  1396. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1397. w, err = a.Clientset.CoreV1().
  1398. Pods(pod.Namespace).
  1399. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1400. if err == nil {
  1401. break
  1402. }
  1403. time.Sleep(time.Second)
  1404. }
  1405. if err != nil {
  1406. return err, false
  1407. }
  1408. defer w.Stop()
  1409. expireTime := time.Now().Add(time.Second * 30)
  1410. for time.Now().Unix() <= expireTime.Unix() {
  1411. select {
  1412. case <-time.NewTicker(time.Second).C:
  1413. // poll every second in case we already missed the ready event while
  1414. // creating the listener.
  1415. pod, err = a.Clientset.CoreV1().
  1416. Pods(pod.Namespace).
  1417. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1418. if err != nil && errors.IsNotFound(err) {
  1419. return IsNotFoundError, false
  1420. } else if err != nil {
  1421. return err, false
  1422. }
  1423. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1424. return nil, isExited
  1425. }
  1426. case evt := <-w.ResultChan():
  1427. pod, ok = evt.Object.(*v1.Pod)
  1428. if !ok {
  1429. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1430. }
  1431. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1432. return nil, isExited
  1433. }
  1434. }
  1435. }
  1436. return goerrors.New("timed out waiting for pod"), false
  1437. }
  1438. func isPodReady(pod *v1.Pod) bool {
  1439. ready := false
  1440. conditions := pod.Status.Conditions
  1441. for i := range conditions {
  1442. if conditions[i].Type == v1.PodReady {
  1443. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1444. }
  1445. }
  1446. return ready
  1447. }
  1448. func isPodExited(pod *v1.Pod) bool {
  1449. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1450. }