agent.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. goerrors "errors"
  17. "github.com/porter-dev/porter/api/server/shared/websocket"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/registry"
  20. "github.com/porter-dev/porter/internal/repository"
  21. "golang.org/x/oauth2"
  22. errors2 "errors"
  23. "github.com/porter-dev/porter/internal/helm/grapher"
  24. appsv1 "k8s.io/api/apps/v1"
  25. batchv1 "k8s.io/api/batch/v1"
  26. batchv1beta1 "k8s.io/api/batch/v1beta1"
  27. v1 "k8s.io/api/core/v1"
  28. v1beta1 "k8s.io/api/extensions/v1beta1"
  29. "k8s.io/apimachinery/pkg/api/errors"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/fields"
  32. "k8s.io/apimachinery/pkg/runtime"
  33. "k8s.io/apimachinery/pkg/runtime/schema"
  34. "k8s.io/apimachinery/pkg/types"
  35. "k8s.io/apimachinery/pkg/watch"
  36. "k8s.io/cli-runtime/pkg/genericclioptions"
  37. "k8s.io/client-go/informers"
  38. "k8s.io/client-go/kubernetes"
  39. "k8s.io/client-go/rest"
  40. "k8s.io/client-go/tools/cache"
  41. "k8s.io/client-go/tools/remotecommand"
  42. rspb "helm.sh/helm/v3/pkg/release"
  43. )
  44. // Agent is a Kubernetes agent for performing operations that interact with the
  45. // api server
  46. type Agent struct {
  47. RESTClientGetter genericclioptions.RESTClientGetter
  48. Clientset kubernetes.Interface
  49. }
  // Message is an event envelope. Only EventType carries an explicit JSON tag
// ("event_type"); Object and Kind serialize under their Go field names.
type Message struct {
	// EventType names the kind of event being reported.
	EventType string `json:"event_type"`

	// Object is the arbitrary payload attached to the event.
	Object interface{}

	// Kind labels the type of Object.
	Kind string
}
  // ListOptions carries optional filters for list-style calls.
type ListOptions struct {
	// FieldSelector is a selector expression; presumably forwarded to the
	// Kubernetes API as a field selector (usage is outside this view).
	FieldSelector string
}
  // AuthError is a data-less error type signalling an authorization failure.
type AuthError struct{}

// Error implements the error interface and always yields the same message.
func (*AuthError) Error() string {
	const msg = "Unauthorized error"
	return msg
}
  62. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  63. func (a *Agent) UpdateClientset() error {
  64. restConf, err := a.RESTClientGetter.ToRESTConfig()
  65. if err != nil {
  66. return err
  67. }
  68. clientset, err := kubernetes.NewForConfig(restConf)
  69. if err != nil {
  70. return err
  71. }
  72. a.Clientset = clientset
  73. return nil
  74. }
  75. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  76. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  77. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  78. context.TODO(),
  79. &v1.ConfigMap{
  80. ObjectMeta: metav1.ObjectMeta{
  81. Name: name,
  82. Namespace: namespace,
  83. Labels: map[string]string{
  84. "porter": "true",
  85. },
  86. },
  87. Data: configMap,
  88. },
  89. metav1.CreateOptions{},
  90. )
  91. }
  92. func (a *Agent) CreateVersionedConfigMap(name, namespace string, version uint, configMap map[string]string, apps ...string) (*v1.ConfigMap, error) {
  93. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  94. context.TODO(),
  95. &v1.ConfigMap{
  96. ObjectMeta: metav1.ObjectMeta{
  97. Name: fmt.Sprintf("%s.v%d", name, version),
  98. Namespace: namespace,
  99. Labels: map[string]string{
  100. "owner": "porter",
  101. "envgroup": name,
  102. "version": fmt.Sprintf("%d", version),
  103. },
  104. Annotations: map[string]string{
  105. PorterAppAnnotationName: strings.Join(apps, ","),
  106. },
  107. },
  108. Data: configMap,
  109. },
  110. metav1.CreateOptions{},
  111. )
  112. }
  113. const PorterAppAnnotationName = "porter.run/apps"
  114. func (a *Agent) AddApplicationToVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  115. annons := cm.Annotations
  116. if annons == nil {
  117. annons = make(map[string]string)
  118. }
  119. appAnnon, appAnnonExists := annons[PorterAppAnnotationName]
  120. if !appAnnonExists || appAnnon == "" {
  121. annons[PorterAppAnnotationName] = appName
  122. } else {
  123. appStrArr := strings.Split(appAnnon, ",")
  124. foundApp := false
  125. for _, appStr := range appStrArr {
  126. if appStr == appName {
  127. foundApp = true
  128. }
  129. }
  130. if !foundApp {
  131. annons[PorterAppAnnotationName] = fmt.Sprintf("%s,%s", appAnnon, appName)
  132. }
  133. }
  134. cm.SetAnnotations(annons)
  135. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  136. context.TODO(),
  137. cm,
  138. metav1.UpdateOptions{},
  139. )
  140. }
  141. func (a *Agent) RemoveApplicationFromVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  142. annons := cm.Annotations
  143. if annons == nil {
  144. annons = make(map[string]string)
  145. }
  146. appAnn, appAnnExists := annons[PorterAppAnnotationName]
  147. if !appAnnExists {
  148. return nil, IsNotFoundError
  149. }
  150. appStrArr := strings.Split(appAnn, ",")
  151. newStrArr := make([]string, 0)
  152. for _, appStr := range appStrArr {
  153. if appStr != appName {
  154. newStrArr = append(newStrArr, appStr)
  155. }
  156. }
  157. annons[PorterAppAnnotationName] = strings.Join(newStrArr, ",")
  158. cm.SetAnnotations(annons)
  159. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  160. context.TODO(),
  161. cm,
  162. metav1.UpdateOptions{},
  163. )
  164. }
  165. func (a *Agent) CreateLinkedVersionedSecret(name, namespace, cmName string, version uint, data map[string][]byte) (*v1.Secret, error) {
  166. return a.Clientset.CoreV1().Secrets(namespace).Create(
  167. context.TODO(),
  168. &v1.Secret{
  169. ObjectMeta: metav1.ObjectMeta{
  170. Name: fmt.Sprintf("%s.v%d", name, version),
  171. Namespace: namespace,
  172. Labels: map[string]string{
  173. "owner": "porter",
  174. "envgroup": name,
  175. "version": fmt.Sprintf("%d", version),
  176. "configmap": cmName,
  177. },
  178. },
  179. Data: data,
  180. },
  181. metav1.CreateOptions{},
  182. )
  183. }
  184. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  185. // base64 encoded
  186. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  187. return a.Clientset.CoreV1().Secrets(namespace).Create(
  188. context.TODO(),
  189. &v1.Secret{
  190. ObjectMeta: metav1.ObjectMeta{
  191. Name: name,
  192. Namespace: namespace,
  193. Labels: map[string]string{
  194. "porter": "true",
  195. "configmap": cmName,
  196. },
  197. },
  198. Data: data,
  199. },
  200. metav1.CreateOptions{},
  201. )
  202. }
  203. type mergeConfigMapData struct {
  204. Data map[string]*string `json:"data"`
  205. }
  206. // UpdateConfigMap updates the configmap given its name and namespace
  207. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  208. cmData := make(map[string]*string)
  209. for key, val := range configMap {
  210. valCopy := val
  211. cmData[key] = &valCopy
  212. if len(val) == 0 {
  213. cmData[key] = nil
  214. }
  215. }
  216. mergeCM := &mergeConfigMapData{
  217. Data: cmData,
  218. }
  219. patchBytes, err := json.Marshal(mergeCM)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  224. context.Background(),
  225. name,
  226. types.MergePatchType,
  227. patchBytes,
  228. metav1.PatchOptions{},
  229. )
  230. }
  231. type mergeLinkedSecretData struct {
  232. Data map[string]*[]byte `json:"data"`
  233. }
  234. // UpdateLinkedSecret updates the secret given its name and namespace
  235. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  236. secretData := make(map[string]*[]byte)
  237. for key, val := range data {
  238. valCopy := val
  239. secretData[key] = &valCopy
  240. if len(val) == 0 {
  241. secretData[key] = nil
  242. }
  243. }
  244. mergeSecret := &mergeLinkedSecretData{
  245. Data: secretData,
  246. }
  247. patchBytes, err := json.Marshal(mergeSecret)
  248. if err != nil {
  249. return err
  250. }
  251. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  252. context.TODO(),
  253. name,
  254. types.MergePatchType,
  255. patchBytes,
  256. metav1.PatchOptions{},
  257. )
  258. return err
  259. }
  260. // DeleteConfigMap deletes the configmap given its name and namespace
  261. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  262. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  263. context.TODO(),
  264. name,
  265. metav1.DeleteOptions{},
  266. )
  267. }
  268. // DeleteLinkedSecret deletes the secret given its name and namespace
  269. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  270. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  271. context.TODO(),
  272. name,
  273. metav1.DeleteOptions{},
  274. )
  275. }
  276. func (a *Agent) ListVersionedConfigMaps(name string, namespace string) ([]v1.ConfigMap, error) {
  277. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  278. context.Background(),
  279. metav1.ListOptions{
  280. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  281. },
  282. )
  283. if err != nil {
  284. return nil, err
  285. }
  286. return listResp.Items, nil
  287. }
  288. func (a *Agent) DeleteVersionedConfigMap(name string, namespace string) error {
  289. return a.Clientset.CoreV1().ConfigMaps(namespace).DeleteCollection(
  290. context.Background(),
  291. metav1.DeleteOptions{},
  292. metav1.ListOptions{
  293. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  294. },
  295. )
  296. }
  297. func (a *Agent) DeleteVersionedSecret(name string, namespace string) error {
  298. return a.Clientset.CoreV1().Secrets(namespace).DeleteCollection(
  299. context.Background(),
  300. metav1.DeleteOptions{},
  301. metav1.ListOptions{
  302. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  303. },
  304. )
  305. }
  306. func (a *Agent) ListAllVersionedConfigMaps(namespace string) ([]v1.ConfigMap, error) {
  307. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  308. context.Background(),
  309. metav1.ListOptions{
  310. LabelSelector: fmt.Sprintf("envgroup"),
  311. },
  312. )
  313. if err != nil {
  314. return nil, err
  315. }
  316. // only keep the latest version for each configmap
  317. latestMap := make(map[string]v1.ConfigMap)
  318. for _, configmap := range listResp.Items {
  319. egName, egNameExists := configmap.Labels["envgroup"]
  320. if !egNameExists {
  321. continue
  322. }
  323. id := fmt.Sprintf("%s/%s", configmap.Namespace, egName)
  324. if currLatest, exists := latestMap[id]; exists {
  325. // get version
  326. currVersionStr, currVersionExists := currLatest.Labels["version"]
  327. versionStr, versionExists := configmap.Labels["version"]
  328. if versionExists && currVersionExists {
  329. currVersion, currErr := strconv.Atoi(currVersionStr)
  330. version, err := strconv.Atoi(versionStr)
  331. if currErr == nil && err == nil && currVersion < version {
  332. latestMap[id] = configmap
  333. }
  334. }
  335. } else {
  336. latestMap[id] = configmap
  337. }
  338. }
  339. res := make([]v1.ConfigMap, 0)
  340. for _, cm := range latestMap {
  341. res = append(res, cm)
  342. }
  343. return res, nil
  344. }
  345. // GetConfigMap retrieves the configmap given its name and namespace
  346. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  347. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  348. context.TODO(),
  349. name,
  350. metav1.GetOptions{},
  351. )
  352. }
  353. func (a *Agent) GetVersionedConfigMap(name, namespace string, version uint) (*v1.ConfigMap, error) {
  354. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  355. context.Background(),
  356. metav1.ListOptions{
  357. LabelSelector: fmt.Sprintf("envgroup=%s,version=%d", name, version),
  358. },
  359. )
  360. if err != nil {
  361. return nil, err
  362. }
  363. if listResp.Items == nil || len(listResp.Items) == 0 {
  364. return nil, IsNotFoundError
  365. }
  366. // if the length of the list is greater than 1, return an error -- this shouldn't happen
  367. if len(listResp.Items) > 1 {
  368. return nil, fmt.Errorf("multiple configmaps found while searching for %s/%s and version %d", namespace, name, version)
  369. }
  370. return &listResp.Items[0], nil
  371. }
  372. func (a *Agent) GetLatestVersionedConfigMap(name, namespace string) (*v1.ConfigMap, uint, error) {
  373. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  374. context.Background(),
  375. metav1.ListOptions{
  376. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  377. },
  378. )
  379. if err != nil {
  380. return nil, 0, err
  381. }
  382. if listResp.Items == nil || len(listResp.Items) == 0 {
  383. return nil, 0, IsNotFoundError
  384. }
  385. // iterate through the configmaps and get the greatest version
  386. var res *v1.ConfigMap
  387. var latestVersion uint
  388. for _, configmap := range listResp.Items {
  389. if res == nil {
  390. versionStr, versionExists := configmap.Labels["version"]
  391. if !versionExists {
  392. continue
  393. }
  394. version, err := strconv.Atoi(versionStr)
  395. if err != nil {
  396. continue
  397. }
  398. latestV := configmap
  399. res = &latestV
  400. latestVersion = uint(version)
  401. } else {
  402. // get version
  403. versionStr, versionExists := configmap.Labels["version"]
  404. currVersionStr, currVersionExists := res.Labels["version"]
  405. if versionExists && currVersionExists {
  406. currVersion, currErr := strconv.Atoi(currVersionStr)
  407. version, err := strconv.Atoi(versionStr)
  408. if currErr == nil && err == nil && currVersion < version {
  409. latestV := configmap
  410. res = &latestV
  411. latestVersion = uint(version)
  412. }
  413. }
  414. }
  415. }
  416. if res == nil {
  417. return nil, 0, IsNotFoundError
  418. }
  419. return res, latestVersion, nil
  420. }
  421. func (a *Agent) GetLatestVersionedSecret(name, namespace string) (*v1.Secret, uint, error) {
  422. listResp, err := a.Clientset.CoreV1().Secrets(namespace).List(
  423. context.Background(),
  424. metav1.ListOptions{
  425. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  426. },
  427. )
  428. if err != nil {
  429. return nil, 0, err
  430. }
  431. if listResp.Items == nil || len(listResp.Items) == 0 {
  432. return nil, 0, IsNotFoundError
  433. }
  434. // iterate through the configmaps and get the greatest version
  435. var res *v1.Secret
  436. var latestVersion uint
  437. for _, secret := range listResp.Items {
  438. if res == nil {
  439. versionStr, versionExists := secret.Labels["version"]
  440. if !versionExists {
  441. continue
  442. }
  443. version, err := strconv.Atoi(versionStr)
  444. if err != nil {
  445. continue
  446. }
  447. latestV := secret
  448. res = &latestV
  449. latestVersion = uint(version)
  450. } else {
  451. // get version
  452. versionStr, versionExists := secret.Labels["version"]
  453. currVersionStr, currVersionExists := res.Labels["version"]
  454. if versionExists && currVersionExists {
  455. currVersion, currErr := strconv.Atoi(currVersionStr)
  456. version, err := strconv.Atoi(versionStr)
  457. if currErr == nil && err == nil && currVersion < version {
  458. latestV := secret
  459. res = &latestV
  460. latestVersion = uint(version)
  461. }
  462. }
  463. }
  464. }
  465. if res == nil {
  466. return nil, 0, IsNotFoundError
  467. }
  468. return res, latestVersion, nil
  469. }
  470. // GetSecret retrieves the secret given its name and namespace
  471. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  472. return a.Clientset.CoreV1().Secrets(namespace).Get(
  473. context.TODO(),
  474. name,
  475. metav1.GetOptions{},
  476. )
  477. }
  478. // ListConfigMaps simply lists namespaces
  479. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  480. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  481. context.TODO(),
  482. metav1.ListOptions{
  483. LabelSelector: "porter",
  484. },
  485. )
  486. }
  487. // ListEvents lists the events of a given object.
  488. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  489. return a.Clientset.CoreV1().Events(namespace).List(
  490. context.TODO(),
  491. metav1.ListOptions{
  492. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  493. },
  494. )
  495. }
  496. // ListNamespaces simply lists namespaces
  497. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  498. return a.Clientset.CoreV1().Namespaces().List(
  499. context.TODO(),
  500. metav1.ListOptions{},
  501. )
  502. }
  503. // CreateNamespace creates a namespace with the given name.
  504. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  505. // check if namespace exists
  506. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  507. context.TODO(),
  508. name,
  509. metav1.GetOptions{},
  510. )
  511. if err == nil && checkNS != nil {
  512. return checkNS, nil
  513. }
  514. namespace := v1.Namespace{
  515. ObjectMeta: metav1.ObjectMeta{
  516. Name: name,
  517. },
  518. }
  519. return a.Clientset.CoreV1().Namespaces().Create(
  520. context.TODO(),
  521. &namespace,
  522. metav1.CreateOptions{},
  523. )
  524. }
  525. // DeleteNamespace deletes the namespace given the name.
  526. func (a *Agent) DeleteNamespace(name string) error {
  527. return a.Clientset.CoreV1().Namespaces().Delete(
  528. context.TODO(),
  529. name,
  530. metav1.DeleteOptions{},
  531. )
  532. }
  533. func (a *Agent) GetPorterAgent() (*appsv1.Deployment, error) {
  534. depl, err := a.Clientset.AppsV1().Deployments("porter-agent-system").Get(
  535. context.TODO(),
  536. "porter-agent-controller-manager",
  537. metav1.GetOptions{},
  538. )
  539. if err != nil && errors.IsNotFound(err) {
  540. return nil, IsNotFoundError
  541. }
  542. return depl, err
  543. }
  544. // ListJobsByLabel lists jobs in a namespace matching a label
  545. type Label struct {
  546. Key string
  547. Val string
  548. }
  549. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  550. selectors := make([]string, 0)
  551. for _, label := range labels {
  552. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  553. }
  554. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  555. context.TODO(),
  556. metav1.ListOptions{
  557. LabelSelector: strings.Join(selectors, ","),
  558. },
  559. )
  560. if err != nil {
  561. return nil, err
  562. }
  563. return resp.Items, nil
  564. }
  565. // DeleteJob deletes the job in the given name and namespace.
  566. func (a *Agent) DeleteJob(name, namespace string) error {
  567. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  568. context.TODO(),
  569. name,
  570. metav1.DeleteOptions{},
  571. )
  572. }
  573. // GetJobPods lists all pods belonging to a job in a namespace
  574. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  575. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  576. context.TODO(),
  577. metav1.ListOptions{
  578. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  579. },
  580. )
  581. if err != nil {
  582. return nil, err
  583. }
  584. return resp.Items, nil
  585. }
  586. // GetIngress gets ingress given the name and namespace
  587. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  588. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  589. context.TODO(),
  590. name,
  591. metav1.GetOptions{},
  592. )
  593. if err != nil && errors.IsNotFound(err) {
  594. return nil, IsNotFoundError
  595. } else if err != nil {
  596. return nil, err
  597. }
  598. return resp, nil
  599. }
  // IsNotFoundError is the sentinel error returned by Agent methods when the
// requested object does not exist.
var IsNotFoundError = fmt.Errorf("not found")

// BadRequestError carries the message of a request that was rejected as
// malformed.
type BadRequestError struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (b *BadRequestError) Error() string {
	return b.msg
}
  607. // GetDeployment gets the deployment given the name and namespace
  608. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  609. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  610. context.TODO(),
  611. c.Name,
  612. metav1.GetOptions{},
  613. )
  614. if err != nil && errors.IsNotFound(err) {
  615. return nil, IsNotFoundError
  616. } else if err != nil {
  617. return nil, err
  618. }
  619. res.Kind = c.Kind
  620. return res, nil
  621. }
  622. // GetStatefulSet gets the statefulset given the name and namespace
  623. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  624. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  625. context.TODO(),
  626. c.Name,
  627. metav1.GetOptions{},
  628. )
  629. if err != nil && errors.IsNotFound(err) {
  630. return nil, IsNotFoundError
  631. } else if err != nil {
  632. return nil, err
  633. }
  634. res.Kind = c.Kind
  635. return res, nil
  636. }
  637. // GetReplicaSet gets the replicaset given the name and namespace
  638. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  639. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  640. context.TODO(),
  641. c.Name,
  642. metav1.GetOptions{},
  643. )
  644. if err != nil && errors.IsNotFound(err) {
  645. return nil, IsNotFoundError
  646. } else if err != nil {
  647. return nil, err
  648. }
  649. res.Kind = c.Kind
  650. return res, nil
  651. }
  652. // GetDaemonSet gets the daemonset by name and namespace
  653. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  654. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  655. context.TODO(),
  656. c.Name,
  657. metav1.GetOptions{},
  658. )
  659. if err != nil && errors.IsNotFound(err) {
  660. return nil, IsNotFoundError
  661. } else if err != nil {
  662. return nil, err
  663. }
  664. res.Kind = c.Kind
  665. return res, nil
  666. }
  667. // GetJob gets the job by name and namespace
  668. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  669. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  670. context.TODO(),
  671. c.Name,
  672. metav1.GetOptions{},
  673. )
  674. if err != nil && errors.IsNotFound(err) {
  675. return nil, IsNotFoundError
  676. } else if err != nil {
  677. return nil, err
  678. }
  679. res.Kind = c.Kind
  680. return res, nil
  681. }
  682. // GetCronJob gets the CronJob by name and namespace
  683. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  684. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  685. context.TODO(),
  686. c.Name,
  687. metav1.GetOptions{},
  688. )
  689. if err != nil && errors.IsNotFound(err) {
  690. return nil, IsNotFoundError
  691. } else if err != nil {
  692. return nil, err
  693. }
  694. res.Kind = c.Kind
  695. return res, nil
  696. }
  697. // GetPodsByLabel retrieves pods with matching labels
  698. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  699. // Search in all namespaces for matching pods
  700. return a.Clientset.CoreV1().Pods(namespace).List(
  701. context.TODO(),
  702. metav1.ListOptions{
  703. LabelSelector: selector,
  704. },
  705. )
  706. }
  707. // GetPodByName retrieves a single instance of pod with given name
  708. func (a *Agent) GetPodByName(name string, namespace string) (*v1.Pod, error) {
  709. // Get pod by name
  710. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  711. context.TODO(),
  712. name,
  713. metav1.GetOptions{},
  714. )
  715. if err != nil && errors.IsNotFound(err) {
  716. return nil, IsNotFoundError
  717. }
  718. if err != nil {
  719. return nil, err
  720. }
  721. return pod, nil
  722. }
  723. // DeletePod deletes a pod by name and namespace
  724. func (a *Agent) DeletePod(namespace string, name string) error {
  725. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  726. context.TODO(),
  727. name,
  728. metav1.DeleteOptions{},
  729. )
  730. if err != nil && errors.IsNotFound(err) {
  731. return IsNotFoundError
  732. }
  733. return err
  734. }
  735. // GetPodLogs streams real-time logs from a given pod.
  736. func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer string, rw *websocket.WebsocketSafeReadWriter) error {
  737. // get the pod to read in the list of contains
  738. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  739. context.Background(),
  740. name,
  741. metav1.GetOptions{},
  742. )
  743. if err != nil && errors.IsNotFound(err) {
  744. return IsNotFoundError
  745. } else if err != nil {
  746. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  747. }
  748. // see if container is ready and able to open a stream. If not, wait for container
  749. // to be ready.
  750. err, _ = a.waitForPod(pod)
  751. if err != nil && goerrors.Is(err, IsNotFoundError) {
  752. return IsNotFoundError
  753. } else if err != nil {
  754. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  755. }
  756. container := pod.Spec.Containers[0].Name
  757. if len(selectedContainer) > 0 {
  758. container = selectedContainer
  759. }
  760. tails := int64(400)
  761. // follow logs
  762. podLogOpts := v1.PodLogOptions{
  763. Follow: true,
  764. TailLines: &tails,
  765. Container: container,
  766. }
  767. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  768. podLogs, err := req.Stream(context.TODO())
  769. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  770. // we'd like to pass this through to the client.
  771. if err != nil && errors.IsBadRequest(err) {
  772. return &BadRequestError{err.Error()}
  773. } else if err != nil {
  774. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  775. }
  776. r := bufio.NewReader(podLogs)
  777. errorchan := make(chan error)
  778. var wg sync.WaitGroup
  779. var once sync.Once
  780. wg.Add(2)
  781. go func() {
  782. wg.Wait()
  783. close(errorchan)
  784. }()
  785. go func() {
  786. defer func() {
  787. if r := recover(); r != nil {
  788. // TODO: add method to alert on panic
  789. return
  790. }
  791. }()
  792. // listens for websocket closing handshake
  793. defer wg.Done()
  794. for {
  795. if _, _, err := rw.ReadMessage(); err != nil {
  796. errorchan <- nil
  797. return
  798. }
  799. }
  800. }()
  801. go func() {
  802. defer func() {
  803. if r := recover(); r != nil {
  804. // TODO: add method to alert on panic
  805. return
  806. }
  807. }()
  808. defer wg.Done()
  809. for {
  810. bytes, err := r.ReadBytes('\n')
  811. if err != nil {
  812. errorchan <- err
  813. return
  814. }
  815. if _, writeErr := rw.Write(bytes); writeErr != nil {
  816. errorchan <- writeErr
  817. return
  818. }
  819. }
  820. }()
  821. for err = range errorchan {
  822. // only call these methods a single time
  823. once.Do(func() {
  824. rw.Close()
  825. podLogs.Close()
  826. })
  827. }
  828. return err
  829. }
  830. // GetPodLogs streams real-time logs from a given pod.
  831. func (a *Agent) GetPreviousPodLogs(namespace string, name string, selectedContainer string) ([]string, error) {
  832. // get the pod to read in the list of contains
  833. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  834. context.Background(),
  835. name,
  836. metav1.GetOptions{},
  837. )
  838. if err != nil && errors.IsNotFound(err) {
  839. return nil, IsNotFoundError
  840. } else if err != nil {
  841. return nil, fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  842. }
  843. container := pod.Spec.Containers[0].Name
  844. if len(selectedContainer) > 0 {
  845. container = selectedContainer
  846. }
  847. tails := int64(400)
  848. // follow logs
  849. podLogOpts := v1.PodLogOptions{
  850. Follow: true,
  851. TailLines: &tails,
  852. Container: container,
  853. Previous: true,
  854. }
  855. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  856. podLogs, err := req.Stream(context.TODO())
  857. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  858. // we'd like to pass this through to the client.
  859. if err != nil && strings.Contains(err.Error(), "not found") {
  860. return nil, IsNotFoundError
  861. }
  862. if err != nil && errors.IsBadRequest(err) {
  863. return nil, &BadRequestError{err.Error()}
  864. } else if err != nil {
  865. return nil, fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  866. }
  867. defer podLogs.Close()
  868. r := bufio.NewReader(podLogs)
  869. var logs []string
  870. for {
  871. line, err := r.ReadString('\n')
  872. logs = append(logs, line)
  873. if err == io.EOF {
  874. break
  875. } else if err != nil {
  876. return nil, err
  877. }
  878. }
  879. return logs, nil
  880. }
  881. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  882. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  883. jobPods, err := a.GetJobPods(namespace, name)
  884. if err != nil {
  885. return err
  886. }
  887. podName := jobPods[0].ObjectMeta.Name
  888. restConf, err := a.RESTClientGetter.ToRESTConfig()
  889. restConf.GroupVersion = &schema.GroupVersion{
  890. Group: "api",
  891. Version: "v1",
  892. }
  893. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  894. restClient, err := rest.RESTClientFor(restConf)
  895. if err != nil {
  896. return err
  897. }
  898. req := restClient.Post().
  899. Resource("pods").
  900. Name(podName).
  901. Namespace(namespace).
  902. SubResource("exec")
  903. req.Param("command", "./signal.sh")
  904. req.Param("container", "sidecar")
  905. req.Param("stdin", "true")
  906. req.Param("stdout", "false")
  907. req.Param("tty", "false")
  908. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  909. if err != nil {
  910. return err
  911. }
  912. return exec.Stream(remotecommand.StreamOptions{
  913. Tty: false,
  914. Stdin: strings.NewReader("./signal.sh"),
  915. })
  916. }
  917. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  918. // the task some number of times until failing
  919. func (a *Agent) RunWebsocketTask(task func() error) error {
  920. lastTime := int64(0)
  921. for {
  922. if err := a.UpdateClientset(); err != nil {
  923. return err
  924. }
  925. err := task()
  926. if err == nil {
  927. return nil
  928. }
  929. if !errors2.Is(err, &AuthError{}) {
  930. return err
  931. }
  932. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  933. return err
  934. }
  935. lastTime = time.Now().Unix()
  936. }
  937. }
  938. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  939. // TODO: Support Jobs
  940. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  941. run := func() error {
  942. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  943. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  944. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  945. options.LabelSelector = selectors
  946. }
  947. factory := informers.NewSharedInformerFactoryWithOptions(
  948. a.Clientset,
  949. 0,
  950. informers.WithTweakListOptions(tweakListOptionsFunc),
  951. )
  952. var informer cache.SharedInformer
  953. // Spins up an informer depending on kind. Convert to lowercase for robustness
  954. switch strings.ToLower(kind) {
  955. case "deployment":
  956. informer = factory.Apps().V1().Deployments().Informer()
  957. case "statefulset":
  958. informer = factory.Apps().V1().StatefulSets().Informer()
  959. case "replicaset":
  960. informer = factory.Apps().V1().ReplicaSets().Informer()
  961. case "daemonset":
  962. informer = factory.Apps().V1().DaemonSets().Informer()
  963. case "job":
  964. informer = factory.Batch().V1().Jobs().Informer()
  965. case "cronjob":
  966. informer = factory.Batch().V1beta1().CronJobs().Informer()
  967. case "namespace":
  968. informer = factory.Core().V1().Namespaces().Informer()
  969. case "pod":
  970. informer = factory.Core().V1().Pods().Informer()
  971. }
  972. stopper := make(chan struct{})
  973. errorchan := make(chan error)
  974. var wg sync.WaitGroup
  975. var once sync.Once
  976. var err error
  977. wg.Add(2)
  978. go func() {
  979. wg.Wait()
  980. close(errorchan)
  981. }()
  982. go func() {
  983. defer func() {
  984. if r := recover(); r != nil {
  985. // TODO: add method to alert on panic
  986. return
  987. }
  988. }()
  989. // listens for websocket closing handshake
  990. defer wg.Done()
  991. for {
  992. if _, _, err := rw.ReadMessage(); err != nil {
  993. errorchan <- nil
  994. return
  995. }
  996. }
  997. }()
  998. go func() {
  999. defer func() {
  1000. if r := recover(); r != nil {
  1001. // TODO: add method to alert on panic
  1002. return
  1003. }
  1004. }()
  1005. // listens for websocket closing handshake
  1006. defer wg.Done()
  1007. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1008. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1009. errorchan <- &AuthError{}
  1010. }
  1011. })
  1012. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1013. UpdateFunc: func(oldObj, newObj interface{}) {
  1014. msg := Message{
  1015. EventType: "UPDATE",
  1016. Object: newObj,
  1017. Kind: strings.ToLower(kind),
  1018. }
  1019. err := rw.WriteJSON(msg)
  1020. if err != nil {
  1021. errorchan <- err
  1022. }
  1023. },
  1024. AddFunc: func(obj interface{}) {
  1025. msg := Message{
  1026. EventType: "ADD",
  1027. Object: obj,
  1028. Kind: strings.ToLower(kind),
  1029. }
  1030. err := rw.WriteJSON(msg)
  1031. if err != nil {
  1032. errorchan <- err
  1033. }
  1034. },
  1035. DeleteFunc: func(obj interface{}) {
  1036. msg := Message{
  1037. EventType: "DELETE",
  1038. Object: obj,
  1039. Kind: strings.ToLower(kind),
  1040. }
  1041. err := rw.WriteJSON(msg)
  1042. if err != nil {
  1043. errorchan <- err
  1044. }
  1045. },
  1046. })
  1047. informer.Run(stopper)
  1048. }()
  1049. for err = range errorchan {
  1050. once.Do(func() {
  1051. close(stopper)
  1052. rw.Close()
  1053. })
  1054. }
  1055. return err
  1056. }
  1057. return a.RunWebsocketTask(run)
  1058. }
  1059. var b64 = base64.StdEncoding
  1060. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  1061. func decodeRelease(data string) (*rspb.Release, error) {
  1062. // base64 decode string
  1063. b, err := b64.DecodeString(data)
  1064. if err != nil {
  1065. return nil, err
  1066. }
  1067. // For backwards compatibility with releases that were stored before
  1068. // compression was introduced we skip decompression if the
  1069. // gzip magic header is not found
  1070. if bytes.Equal(b[0:3], magicGzip) {
  1071. r, err := gzip.NewReader(bytes.NewReader(b))
  1072. if err != nil {
  1073. return nil, err
  1074. }
  1075. defer r.Close()
  1076. b2, err := ioutil.ReadAll(r)
  1077. if err != nil {
  1078. return nil, err
  1079. }
  1080. b = b2
  1081. }
  1082. var rls rspb.Release
  1083. // unmarshal release object bytes
  1084. if err := json.Unmarshal(b, &rls); err != nil {
  1085. return nil, err
  1086. }
  1087. return &rls, nil
  1088. }
  1089. func contains(s []string, str string) bool {
  1090. for _, v := range s {
  1091. if v == str {
  1092. return true
  1093. }
  1094. }
  1095. return false
  1096. }
  1097. func ParseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  1098. if secret.Type != "helm.sh/release.v1" {
  1099. return nil, true, nil
  1100. }
  1101. releaseData, ok := secret.Data["release"]
  1102. if !ok {
  1103. return nil, true, fmt.Errorf("release field not found")
  1104. }
  1105. helm_object, err := decodeRelease(string(releaseData))
  1106. if err != nil {
  1107. return nil, true, err
  1108. }
  1109. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  1110. return nil, true, nil
  1111. }
  1112. return helm_object, false, nil
  1113. }
  1114. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1115. run := func() error {
  1116. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1117. options.LabelSelector = selectors
  1118. }
  1119. factory := informers.NewSharedInformerFactoryWithOptions(
  1120. a.Clientset,
  1121. 0,
  1122. informers.WithTweakListOptions(tweakListOptionsFunc),
  1123. informers.WithNamespace(namespace),
  1124. )
  1125. informer := factory.Core().V1().Secrets().Informer()
  1126. stopper := make(chan struct{})
  1127. errorchan := make(chan error)
  1128. var wg sync.WaitGroup
  1129. var once sync.Once
  1130. var err error
  1131. wg.Add(2)
  1132. go func() {
  1133. wg.Wait()
  1134. close(errorchan)
  1135. }()
  1136. go func() {
  1137. defer func() {
  1138. if r := recover(); r != nil {
  1139. // TODO: add method to alert on panic
  1140. return
  1141. }
  1142. }()
  1143. // listens for websocket closing handshake
  1144. defer wg.Done()
  1145. for {
  1146. if _, _, err := rw.ReadMessage(); err != nil {
  1147. errorchan <- nil
  1148. return
  1149. }
  1150. }
  1151. }()
  1152. go func() {
  1153. defer func() {
  1154. if r := recover(); r != nil {
  1155. // TODO: add method to alert on panic
  1156. return
  1157. }
  1158. }()
  1159. // listens for websocket closing handshake
  1160. defer wg.Done()
  1161. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1162. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1163. errorchan <- &AuthError{}
  1164. }
  1165. })
  1166. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1167. UpdateFunc: func(oldObj, newObj interface{}) {
  1168. secretObj, ok := newObj.(*v1.Secret)
  1169. if !ok {
  1170. errorchan <- fmt.Errorf("could not cast to secret")
  1171. return
  1172. }
  1173. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1174. if isNotHelmRelease && err == nil {
  1175. return
  1176. }
  1177. if err != nil {
  1178. errorchan <- err
  1179. return
  1180. }
  1181. msg := Message{
  1182. EventType: "UPDATE",
  1183. Object: helm_object,
  1184. }
  1185. rw.WriteJSON(msg)
  1186. },
  1187. AddFunc: func(obj interface{}) {
  1188. secretObj, ok := obj.(*v1.Secret)
  1189. if !ok {
  1190. errorchan <- fmt.Errorf("could not cast to secret")
  1191. return
  1192. }
  1193. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1194. if isNotHelmRelease && err == nil {
  1195. return
  1196. }
  1197. if err != nil {
  1198. errorchan <- err
  1199. return
  1200. }
  1201. msg := Message{
  1202. EventType: "ADD",
  1203. Object: helm_object,
  1204. }
  1205. rw.WriteJSON(msg)
  1206. },
  1207. DeleteFunc: func(obj interface{}) {
  1208. secretObj, ok := obj.(*v1.Secret)
  1209. if !ok {
  1210. errorchan <- fmt.Errorf("could not cast to secret")
  1211. return
  1212. }
  1213. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1214. if isNotHelmRelease && err == nil {
  1215. return
  1216. }
  1217. if err != nil {
  1218. errorchan <- err
  1219. return
  1220. }
  1221. msg := Message{
  1222. EventType: "DELETE",
  1223. Object: helm_object,
  1224. }
  1225. rw.WriteJSON(msg)
  1226. },
  1227. })
  1228. informer.Run(stopper)
  1229. }()
  1230. for err = range errorchan {
  1231. once.Do(func() {
  1232. close(stopper)
  1233. rw.Close()
  1234. })
  1235. }
  1236. return err
  1237. }
  1238. return a.RunWebsocketTask(run)
  1239. }
  1240. // CreateImagePullSecrets will create the required image pull secrets and
  1241. // return a map from the registry name to the name of the secret.
  1242. func (a *Agent) CreateImagePullSecrets(
  1243. repo repository.Repository,
  1244. namespace string,
  1245. linkedRegs map[string]*models.Registry,
  1246. doAuth *oauth2.Config,
  1247. ) (map[string]string, error) {
  1248. res := make(map[string]string)
  1249. for key, val := range linkedRegs {
  1250. _reg := registry.Registry(*val)
  1251. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1252. if err != nil {
  1253. return nil, err
  1254. }
  1255. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1256. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1257. context.TODO(),
  1258. secretName,
  1259. metav1.GetOptions{},
  1260. )
  1261. // if not found, create the secret
  1262. if err != nil && errors.IsNotFound(err) {
  1263. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1264. context.TODO(),
  1265. &v1.Secret{
  1266. ObjectMeta: metav1.ObjectMeta{
  1267. Name: secretName,
  1268. },
  1269. Data: map[string][]byte{
  1270. string(v1.DockerConfigJsonKey): data,
  1271. },
  1272. Type: v1.SecretTypeDockerConfigJson,
  1273. },
  1274. metav1.CreateOptions{},
  1275. )
  1276. if err != nil {
  1277. return nil, err
  1278. }
  1279. // add secret name to the map
  1280. res[key] = secretName
  1281. continue
  1282. } else if err != nil {
  1283. return nil, err
  1284. }
  1285. // otherwise, check that the secret contains the correct data: if
  1286. // if doesn't, update it
  1287. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1288. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1289. context.TODO(),
  1290. &v1.Secret{
  1291. ObjectMeta: metav1.ObjectMeta{
  1292. Name: secretName,
  1293. },
  1294. Data: map[string][]byte{
  1295. string(v1.DockerConfigJsonKey): data,
  1296. },
  1297. Type: v1.SecretTypeDockerConfigJson,
  1298. },
  1299. metav1.UpdateOptions{},
  1300. )
  1301. if err != nil {
  1302. return nil, err
  1303. }
  1304. }
  1305. // add secret name to the map
  1306. res[key] = secretName
  1307. }
  1308. return res, nil
  1309. }
  1310. // helper that waits for pod to be ready
  1311. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1312. var (
  1313. w watch.Interface
  1314. err error
  1315. ok bool
  1316. )
  1317. // immediately after creating a pod, the API may return a 404. heuristically 1
  1318. // second seems to be plenty.
  1319. watchRetries := 3
  1320. for i := 0; i < watchRetries; i++ {
  1321. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1322. w, err = a.Clientset.CoreV1().
  1323. Pods(pod.Namespace).
  1324. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1325. if err == nil {
  1326. break
  1327. }
  1328. time.Sleep(time.Second)
  1329. }
  1330. if err != nil {
  1331. return err, false
  1332. }
  1333. defer w.Stop()
  1334. expireTime := time.Now().Add(time.Second * 30)
  1335. for time.Now().Unix() <= expireTime.Unix() {
  1336. select {
  1337. case <-time.NewTicker(time.Second).C:
  1338. // poll every second in case we already missed the ready event while
  1339. // creating the listener.
  1340. pod, err = a.Clientset.CoreV1().
  1341. Pods(pod.Namespace).
  1342. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1343. if err != nil && errors.IsNotFound(err) {
  1344. return IsNotFoundError, false
  1345. } else if err != nil {
  1346. return err, false
  1347. }
  1348. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1349. return nil, isExited
  1350. }
  1351. case evt := <-w.ResultChan():
  1352. pod, ok = evt.Object.(*v1.Pod)
  1353. if !ok {
  1354. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1355. }
  1356. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1357. return nil, isExited
  1358. }
  1359. }
  1360. }
  1361. return goerrors.New("timed out waiting for pod"), false
  1362. }
  1363. func isPodReady(pod *v1.Pod) bool {
  1364. ready := false
  1365. conditions := pod.Status.Conditions
  1366. for i := range conditions {
  1367. if conditions[i].Type == v1.PodReady {
  1368. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1369. }
  1370. }
  1371. return ready
  1372. }
  1373. func isPodExited(pod *v1.Pod) bool {
  1374. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1375. }