agent.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. goerrors "errors"
  17. "github.com/porter-dev/porter/api/server/shared/websocket"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/registry"
  20. "github.com/porter-dev/porter/internal/repository"
  21. "golang.org/x/oauth2"
  22. "github.com/porter-dev/porter/internal/helm/grapher"
  23. appsv1 "k8s.io/api/apps/v1"
  24. batchv1 "k8s.io/api/batch/v1"
  25. v1 "k8s.io/api/core/v1"
  26. v1beta1 "k8s.io/api/extensions/v1beta1"
  27. netv1 "k8s.io/api/networking/v1"
  28. netv1beta1 "k8s.io/api/networking/v1beta1"
  29. "k8s.io/apimachinery/pkg/api/errors"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/fields"
  32. "k8s.io/apimachinery/pkg/runtime"
  33. "k8s.io/apimachinery/pkg/runtime/schema"
  34. "k8s.io/apimachinery/pkg/types"
  35. "k8s.io/apimachinery/pkg/watch"
  36. "k8s.io/cli-runtime/pkg/genericclioptions"
  37. "k8s.io/client-go/informers"
  38. "k8s.io/client-go/kubernetes"
  39. "k8s.io/client-go/rest"
  40. "k8s.io/client-go/tools/cache"
  41. "k8s.io/client-go/tools/remotecommand"
  42. "k8s.io/kubectl/pkg/scheme"
  43. rspb "github.com/stefanmcshane/helm/pkg/release"
  44. istiov1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
  45. versionedclient "istio.io/client-go/pkg/clientset/versioned"
  46. )
  47. // Agent is a Kubernetes agent for performing operations that interact with the
  48. // api server. Do not create this struct directly, use NewKubernetesAgent instead.
  49. type Agent struct {
  50. RESTClientGetter genericclioptions.RESTClientGetter
  51. Clientset kubernetes.Interface
  52. // context is used here as a workaround since RESTClientGetter and kubernetes.Interface do not support contexts
  53. context context.Context
  54. }
  55. type Message struct {
  56. EventType string `json:"event_type"`
  57. Object interface{}
  58. Kind string
  59. }
  60. type ListOptions struct {
  61. FieldSelector string
  62. }
  63. type AuthError struct{}
  64. func (e *AuthError) Error() string {
  65. return "Unauthorized error"
  66. }
  67. // NewKubernetesAgent creates a new agent for accessing kubernetes on a cluster
  68. func NewKubernetesAgent(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, clientset kubernetes.Interface) Agent {
  69. return Agent{
  70. RESTClientGetter: restClientGetter,
  71. Clientset: clientset,
  72. context: ctx,
  73. }
  74. }
  75. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  76. func (a *Agent) UpdateClientset() error {
  77. restConf, err := a.RESTClientGetter.ToRESTConfig()
  78. if err != nil {
  79. return err
  80. }
  81. clientset, err := kubernetes.NewForConfig(restConf)
  82. if err != nil {
  83. return err
  84. }
  85. a.Clientset = clientset
  86. return nil
  87. }
  88. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  89. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  90. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  91. context.TODO(),
  92. &v1.ConfigMap{
  93. ObjectMeta: metav1.ObjectMeta{
  94. Name: name,
  95. Namespace: namespace,
  96. Labels: map[string]string{
  97. "porter": "true",
  98. },
  99. },
  100. Data: configMap,
  101. },
  102. metav1.CreateOptions{},
  103. )
  104. }
  105. func (a *Agent) CreateVersionedConfigMap(name, namespace string, version uint, configMap map[string]string, apps ...string) (*v1.ConfigMap, error) {
  106. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  107. context.TODO(),
  108. &v1.ConfigMap{
  109. ObjectMeta: metav1.ObjectMeta{
  110. Name: fmt.Sprintf("%s.v%d", name, version),
  111. Namespace: namespace,
  112. Labels: map[string]string{
  113. "owner": "porter",
  114. "envgroup": name,
  115. "version": fmt.Sprintf("%d", version),
  116. },
  117. Annotations: map[string]string{
  118. PorterAppAnnotationName: strings.Join(apps, ","),
  119. },
  120. },
  121. Data: configMap,
  122. },
  123. metav1.CreateOptions{},
  124. )
  125. }
  126. const PorterAppAnnotationName = "porter.run/apps"
  127. func (a *Agent) AddApplicationToVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  128. annons := cm.Annotations
  129. if annons == nil {
  130. annons = make(map[string]string)
  131. }
  132. appAnnon, appAnnonExists := annons[PorterAppAnnotationName]
  133. if !appAnnonExists || appAnnon == "" {
  134. annons[PorterAppAnnotationName] = appName
  135. } else {
  136. appStrArr := strings.Split(appAnnon, ",")
  137. foundApp := false
  138. for _, appStr := range appStrArr {
  139. if appStr == appName {
  140. foundApp = true
  141. }
  142. }
  143. if !foundApp {
  144. annons[PorterAppAnnotationName] = fmt.Sprintf("%s,%s", appAnnon, appName)
  145. }
  146. }
  147. cm.SetAnnotations(annons)
  148. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  149. context.TODO(),
  150. cm,
  151. metav1.UpdateOptions{},
  152. )
  153. }
  154. func (a *Agent) RemoveApplicationFromVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  155. annons := cm.Annotations
  156. if annons == nil {
  157. annons = make(map[string]string)
  158. }
  159. appAnn, appAnnExists := annons[PorterAppAnnotationName]
  160. if !appAnnExists {
  161. return nil, IsNotFoundError
  162. }
  163. appStrArr := strings.Split(appAnn, ",")
  164. newStrArr := make([]string, 0)
  165. for _, appStr := range appStrArr {
  166. if appStr != appName {
  167. newStrArr = append(newStrArr, appStr)
  168. }
  169. }
  170. annons[PorterAppAnnotationName] = strings.Join(newStrArr, ",")
  171. cm.SetAnnotations(annons)
  172. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  173. context.TODO(),
  174. cm,
  175. metav1.UpdateOptions{},
  176. )
  177. }
  178. func (a *Agent) CreateLinkedVersionedSecret(name, namespace, cmName string, version uint, data map[string][]byte) (*v1.Secret, error) {
  179. return a.Clientset.CoreV1().Secrets(namespace).Create(
  180. context.TODO(),
  181. &v1.Secret{
  182. ObjectMeta: metav1.ObjectMeta{
  183. Name: fmt.Sprintf("%s.v%d", name, version),
  184. Namespace: namespace,
  185. Labels: map[string]string{
  186. "owner": "porter",
  187. "envgroup": name,
  188. "version": fmt.Sprintf("%d", version),
  189. "configmap": cmName,
  190. },
  191. },
  192. Data: data,
  193. },
  194. metav1.CreateOptions{},
  195. )
  196. }
  197. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  198. // base64 encoded
  199. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  200. return a.Clientset.CoreV1().Secrets(namespace).Create(
  201. context.TODO(),
  202. &v1.Secret{
  203. ObjectMeta: metav1.ObjectMeta{
  204. Name: name,
  205. Namespace: namespace,
  206. Labels: map[string]string{
  207. "porter": "true",
  208. "configmap": cmName,
  209. },
  210. },
  211. Data: data,
  212. },
  213. metav1.CreateOptions{},
  214. )
  215. }
  216. type mergeConfigMapData struct {
  217. Data map[string]*string `json:"data"`
  218. }
  219. // UpdateConfigMap updates the configmap given its name and namespace
  220. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  221. cmData := make(map[string]*string)
  222. for key, val := range configMap {
  223. valCopy := val
  224. cmData[key] = &valCopy
  225. if len(val) == 0 {
  226. cmData[key] = nil
  227. }
  228. }
  229. mergeCM := &mergeConfigMapData{
  230. Data: cmData,
  231. }
  232. patchBytes, err := json.Marshal(mergeCM)
  233. if err != nil {
  234. return nil, err
  235. }
  236. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  237. context.Background(),
  238. name,
  239. types.MergePatchType,
  240. patchBytes,
  241. metav1.PatchOptions{},
  242. )
  243. }
  244. type mergeLinkedSecretData struct {
  245. Data map[string]*[]byte `json:"data"`
  246. }
  247. // UpdateLinkedSecret updates the secret given its name and namespace
  248. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  249. secretData := make(map[string]*[]byte)
  250. for key, val := range data {
  251. valCopy := val
  252. secretData[key] = &valCopy
  253. if len(val) == 0 {
  254. secretData[key] = nil
  255. }
  256. }
  257. mergeSecret := &mergeLinkedSecretData{
  258. Data: secretData,
  259. }
  260. patchBytes, err := json.Marshal(mergeSecret)
  261. if err != nil {
  262. return err
  263. }
  264. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  265. context.TODO(),
  266. name,
  267. types.MergePatchType,
  268. patchBytes,
  269. metav1.PatchOptions{},
  270. )
  271. return err
  272. }
  273. // DeleteConfigMap deletes the configmap given its name and namespace
  274. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  275. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  276. context.TODO(),
  277. name,
  278. metav1.DeleteOptions{},
  279. )
  280. }
  281. // DeleteLinkedSecret deletes the secret given its name and namespace
  282. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  283. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  284. context.TODO(),
  285. name,
  286. metav1.DeleteOptions{},
  287. )
  288. }
  289. func (a *Agent) ListVersionedConfigMaps(name string, namespace string) ([]v1.ConfigMap, error) {
  290. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  291. context.Background(),
  292. metav1.ListOptions{
  293. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  294. },
  295. )
  296. if err != nil {
  297. return nil, err
  298. }
  299. return listResp.Items, nil
  300. }
  301. func (a *Agent) DeleteVersionedConfigMap(name string, namespace string) error {
  302. return a.Clientset.CoreV1().ConfigMaps(namespace).DeleteCollection(
  303. context.Background(),
  304. metav1.DeleteOptions{},
  305. metav1.ListOptions{
  306. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  307. },
  308. )
  309. }
  310. func (a *Agent) DeleteVersionedSecret(name string, namespace string) error {
  311. return a.Clientset.CoreV1().Secrets(namespace).DeleteCollection(
  312. context.Background(),
  313. metav1.DeleteOptions{},
  314. metav1.ListOptions{
  315. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  316. },
  317. )
  318. }
  319. func (a *Agent) ListAllVersionedConfigMaps(namespace string) ([]v1.ConfigMap, error) {
  320. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  321. context.Background(),
  322. metav1.ListOptions{
  323. LabelSelector: fmt.Sprintf("envgroup"),
  324. },
  325. )
  326. if err != nil {
  327. return nil, err
  328. }
  329. // only keep the latest version for each configmap
  330. latestMap := make(map[string]v1.ConfigMap)
  331. for _, configmap := range listResp.Items {
  332. egName, egNameExists := configmap.Labels["envgroup"]
  333. if !egNameExists {
  334. continue
  335. }
  336. id := fmt.Sprintf("%s/%s", configmap.Namespace, egName)
  337. if currLatest, exists := latestMap[id]; exists {
  338. // get version
  339. currVersionStr, currVersionExists := currLatest.Labels["version"]
  340. versionStr, versionExists := configmap.Labels["version"]
  341. if versionExists && currVersionExists {
  342. currVersion, currErr := strconv.Atoi(currVersionStr)
  343. version, err := strconv.Atoi(versionStr)
  344. if currErr == nil && err == nil && currVersion < version {
  345. latestMap[id] = configmap
  346. }
  347. }
  348. } else {
  349. latestMap[id] = configmap
  350. }
  351. }
  352. res := make([]v1.ConfigMap, 0)
  353. for _, cm := range latestMap {
  354. res = append(res, cm)
  355. }
  356. return res, nil
  357. }
  358. // GetConfigMap retrieves the configmap given its name and namespace
  359. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  360. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  361. context.TODO(),
  362. name,
  363. metav1.GetOptions{},
  364. )
  365. }
  366. func (a *Agent) GetVersionedConfigMap(name, namespace string, version uint) (*v1.ConfigMap, error) {
  367. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  368. context.Background(),
  369. metav1.ListOptions{
  370. LabelSelector: fmt.Sprintf("envgroup=%s,version=%d", name, version),
  371. },
  372. )
  373. if err != nil {
  374. return nil, err
  375. }
  376. if listResp.Items == nil || len(listResp.Items) == 0 {
  377. return nil, IsNotFoundError
  378. }
  379. // if the length of the list is greater than 1, return an error -- this shouldn't happen
  380. if len(listResp.Items) > 1 {
  381. return nil, fmt.Errorf("multiple configmaps found while searching for %s/%s and version %d", namespace, name, version)
  382. }
  383. return &listResp.Items[0], nil
  384. }
  385. func (a *Agent) GetLatestVersionedConfigMap(name, namespace string) (*v1.ConfigMap, uint, error) {
  386. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  387. context.Background(),
  388. metav1.ListOptions{
  389. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  390. },
  391. )
  392. if err != nil {
  393. return nil, 0, err
  394. }
  395. if listResp.Items == nil || len(listResp.Items) == 0 {
  396. return nil, 0, IsNotFoundError
  397. }
  398. // iterate through the configmaps and get the greatest version
  399. var res *v1.ConfigMap
  400. var latestVersion uint
  401. for _, configmap := range listResp.Items {
  402. if res == nil {
  403. versionStr, versionExists := configmap.Labels["version"]
  404. if !versionExists {
  405. continue
  406. }
  407. version, err := strconv.Atoi(versionStr)
  408. if err != nil {
  409. continue
  410. }
  411. latestV := configmap
  412. res = &latestV
  413. latestVersion = uint(version)
  414. } else {
  415. // get version
  416. versionStr, versionExists := configmap.Labels["version"]
  417. currVersionStr, currVersionExists := res.Labels["version"]
  418. if versionExists && currVersionExists {
  419. currVersion, currErr := strconv.Atoi(currVersionStr)
  420. version, err := strconv.Atoi(versionStr)
  421. if currErr == nil && err == nil && currVersion < version {
  422. latestV := configmap
  423. res = &latestV
  424. latestVersion = uint(version)
  425. }
  426. }
  427. }
  428. }
  429. if res == nil {
  430. return nil, 0, IsNotFoundError
  431. }
  432. return res, latestVersion, nil
  433. }
  434. func (a *Agent) GetLatestVersionedSecret(name, namespace string) (*v1.Secret, uint, error) {
  435. listResp, err := a.Clientset.CoreV1().Secrets(namespace).List(
  436. context.Background(),
  437. metav1.ListOptions{
  438. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  439. },
  440. )
  441. if err != nil {
  442. return nil, 0, err
  443. }
  444. if listResp.Items == nil || len(listResp.Items) == 0 {
  445. return nil, 0, IsNotFoundError
  446. }
  447. // iterate through the configmaps and get the greatest version
  448. var res *v1.Secret
  449. var latestVersion uint
  450. for _, secret := range listResp.Items {
  451. if res == nil {
  452. versionStr, versionExists := secret.Labels["version"]
  453. if !versionExists {
  454. continue
  455. }
  456. version, err := strconv.Atoi(versionStr)
  457. if err != nil {
  458. continue
  459. }
  460. latestV := secret
  461. res = &latestV
  462. latestVersion = uint(version)
  463. } else {
  464. // get version
  465. versionStr, versionExists := secret.Labels["version"]
  466. currVersionStr, currVersionExists := res.Labels["version"]
  467. if versionExists && currVersionExists {
  468. currVersion, currErr := strconv.Atoi(currVersionStr)
  469. version, err := strconv.Atoi(versionStr)
  470. if currErr == nil && err == nil && currVersion < version {
  471. latestV := secret
  472. res = &latestV
  473. latestVersion = uint(version)
  474. }
  475. }
  476. }
  477. }
  478. if res == nil {
  479. return nil, 0, IsNotFoundError
  480. }
  481. return res, latestVersion, nil
  482. }
  483. // GetSecret retrieves the secret given its name and namespace
  484. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  485. return a.Clientset.CoreV1().Secrets(namespace).Get(
  486. context.TODO(),
  487. name,
  488. metav1.GetOptions{},
  489. )
  490. }
  491. // ListConfigMaps simply lists namespaces
  492. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  493. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  494. context.TODO(),
  495. metav1.ListOptions{
  496. LabelSelector: "porter",
  497. },
  498. )
  499. }
  500. // ListEvents lists the events of a given object.
  501. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  502. return a.Clientset.CoreV1().Events(namespace).List(
  503. context.TODO(),
  504. metav1.ListOptions{
  505. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  506. },
  507. )
  508. }
  509. // ListNamespaces simply lists namespaces
  510. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  511. return a.Clientset.CoreV1().Namespaces().List(
  512. context.TODO(),
  513. metav1.ListOptions{},
  514. )
  515. }
  516. // CreateNamespace creates a namespace with the given name.
  517. func (a *Agent) CreateNamespace(name string, labels map[string]string) (*v1.Namespace, error) {
  518. // check if namespace exists
  519. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  520. context.TODO(),
  521. name,
  522. metav1.GetOptions{},
  523. )
  524. if err == nil && checkNS != nil {
  525. if checkNS.Status.Phase == v1.NamespaceTerminating {
  526. // edge case for when the same namespace was previously created
  527. // but was deleted and is currently in the "Terminating" phase
  528. // let us wait for a maximum of 10 seconds
  529. timeNow := time.Now().Add(10 * time.Second)
  530. stillTerminating := true
  531. for {
  532. _, err := a.Clientset.CoreV1().Namespaces().Get(
  533. context.TODO(),
  534. name,
  535. metav1.GetOptions{},
  536. )
  537. if err != nil && errors.IsNotFound(err) {
  538. stillTerminating = false
  539. break
  540. }
  541. time.Sleep(time.Second)
  542. if time.Now().After(timeNow) {
  543. break
  544. }
  545. }
  546. if stillTerminating {
  547. // the namespace has been in the "Terminating" phase
  548. return nil, fmt.Errorf("cannot create namespace %s, stuck in \"Terminating\" phase", name)
  549. }
  550. } else {
  551. return checkNS, nil
  552. }
  553. }
  554. namespace := &v1.Namespace{
  555. ObjectMeta: metav1.ObjectMeta{
  556. Name: name,
  557. },
  558. }
  559. if len(labels) > 0 {
  560. namespace.SetLabels(labels)
  561. }
  562. return a.Clientset.CoreV1().Namespaces().Create(
  563. context.TODO(),
  564. namespace,
  565. metav1.CreateOptions{},
  566. )
  567. }
  568. // GetNamespace gets the namespace given the name
  569. func (a *Agent) GetNamespace(name string) (*v1.Namespace, error) {
  570. ns, err := a.Clientset.CoreV1().Namespaces().Get(
  571. context.Background(),
  572. name,
  573. metav1.GetOptions{},
  574. )
  575. if err != nil {
  576. return nil, err
  577. }
  578. return ns, nil
  579. }
  580. // DeleteNamespace deletes the namespace given the name.
  581. func (a *Agent) DeleteNamespace(name string) error {
  582. // check if namespace exists
  583. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  584. context.TODO(),
  585. name,
  586. metav1.GetOptions{},
  587. )
  588. // if the namespace is not found, don't return an error.
  589. if err != nil && errors.IsNotFound(err) {
  590. return nil
  591. }
  592. // if the namespace was found but is in the "Terminating" phase
  593. // we should ignore it and not return an error
  594. if checkNS != nil && checkNS.Status.Phase == v1.NamespaceTerminating {
  595. return nil
  596. }
  597. return a.Clientset.CoreV1().Namespaces().Delete(
  598. context.TODO(),
  599. name,
  600. metav1.DeleteOptions{},
  601. )
  602. }
  603. func (a *Agent) GetPorterAgent() (*appsv1.Deployment, error) {
  604. depl, err := a.Clientset.AppsV1().Deployments("porter-agent-system").Get(
  605. context.TODO(),
  606. "porter-agent-controller-manager",
  607. metav1.GetOptions{},
  608. )
  609. if err != nil && errors.IsNotFound(err) {
  610. return nil, IsNotFoundError
  611. }
  612. return depl, err
  613. }
  614. // ListJobsByLabel lists jobs in a namespace matching a label
  615. type Label struct {
  616. Key string
  617. Val string
  618. }
  619. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  620. selectors := make([]string, 0)
  621. for _, label := range labels {
  622. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  623. }
  624. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  625. context.TODO(),
  626. metav1.ListOptions{
  627. LabelSelector: strings.Join(selectors, ","),
  628. },
  629. )
  630. if err != nil {
  631. return nil, err
  632. }
  633. return resp.Items, nil
  634. }
  635. // StreamJobs streams a list of jobs to the websocket writer, closing the connection once all jobs have been sent
  636. func (a *Agent) StreamJobs(namespace string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  637. run := func() error {
  638. errorchan := make(chan error)
  639. ctx, cancel := context.WithCancel(context.Background())
  640. defer cancel()
  641. var wg sync.WaitGroup
  642. var once sync.Once
  643. var err error
  644. wg.Add(2)
  645. go func() {
  646. wg.Wait()
  647. close(errorchan)
  648. }()
  649. go func() {
  650. defer func() {
  651. if r := recover(); r != nil {
  652. // TODO: add method to alert on panic
  653. return
  654. }
  655. }()
  656. // listens for websocket closing handshake
  657. defer wg.Done()
  658. for {
  659. if _, _, err := rw.ReadMessage(); err != nil {
  660. errorchan <- nil
  661. return
  662. }
  663. }
  664. }()
  665. go func() {
  666. defer func() {
  667. if r := recover(); r != nil {
  668. // TODO: add method to alert on panic
  669. return
  670. }
  671. }()
  672. // listens for websocket closing handshake
  673. defer wg.Done()
  674. continueVal := ""
  675. for {
  676. if ctx.Err() != nil {
  677. errorchan <- nil
  678. return
  679. }
  680. labelSelector := "meta.helm.sh/release-name"
  681. if selectors != "" {
  682. labelSelector = selectors
  683. }
  684. jobs, err := a.Clientset.BatchV1().Jobs(namespace).List(
  685. ctx,
  686. metav1.ListOptions{
  687. Limit: 100,
  688. Continue: continueVal,
  689. LabelSelector: labelSelector,
  690. },
  691. )
  692. if err != nil {
  693. errorchan <- err
  694. return
  695. }
  696. for _, job := range jobs.Items {
  697. err := rw.WriteJSON(job)
  698. if err != nil {
  699. errorchan <- err
  700. return
  701. }
  702. }
  703. if jobs.Continue == "" {
  704. // we have reached the end of the list of jobs
  705. break
  706. } else {
  707. // start pagination
  708. continueVal = jobs.Continue
  709. }
  710. }
  711. // at this point, we can return the status finished
  712. err := rw.WriteJSON(map[string]interface{}{
  713. "streamStatus": "finished",
  714. })
  715. errorchan <- err
  716. }()
  717. for err = range errorchan {
  718. once.Do(func() {
  719. rw.Close()
  720. cancel()
  721. })
  722. }
  723. return err
  724. }
  725. return a.RunWebsocketTask(run)
  726. }
  727. // DeleteJob deletes the job in the given name and namespace.
  728. func (a *Agent) DeleteJob(name, namespace string) error {
  729. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  730. context.TODO(),
  731. name,
  732. metav1.DeleteOptions{},
  733. )
  734. }
  735. // GetJobPods lists all pods belonging to a job in a namespace
  736. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  737. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  738. context.TODO(),
  739. metav1.ListOptions{
  740. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  741. },
  742. )
  743. if err != nil {
  744. return nil, err
  745. }
  746. return resp.Items, nil
  747. }
  748. // GetIngress gets ingress given the name and namespace
  749. func (a *Agent) GetExtensionsV1Beta1Ingress(namespace string, name string) (*v1beta1.Ingress, error) {
  750. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  751. context.TODO(),
  752. name,
  753. metav1.GetOptions{},
  754. )
  755. if err != nil && errors.IsNotFound(err) {
  756. return nil, IsNotFoundError
  757. } else if err != nil {
  758. return nil, err
  759. }
  760. return resp, nil
  761. }
  762. func (a *Agent) GetNetworkingV1Ingress(namespace string, name string) (*netv1.Ingress, error) {
  763. resp, err := a.Clientset.NetworkingV1().Ingresses(namespace).Get(
  764. context.TODO(),
  765. name,
  766. metav1.GetOptions{},
  767. )
  768. if err != nil && errors.IsNotFound(err) {
  769. return nil, IsNotFoundError
  770. } else if err != nil {
  771. return nil, err
  772. }
  773. return resp, nil
  774. }
  775. func (a *Agent) GetNetworkingV1Beta1Ingress(namespace string, name string) (*netv1beta1.Ingress, error) {
  776. resp, err := a.Clientset.NetworkingV1beta1().Ingresses(namespace).Get(
  777. context.TODO(),
  778. name,
  779. metav1.GetOptions{},
  780. )
  781. if err != nil && errors.IsNotFound(err) {
  782. return nil, IsNotFoundError
  783. } else if err != nil {
  784. return nil, err
  785. }
  786. return resp, nil
  787. }
  788. func (a *Agent) GetIstioIngress(namespace, name string) (*istiov1beta1.Gateway, error) {
  789. restConf, err := a.RESTClientGetter.ToRESTConfig()
  790. if err != nil {
  791. return nil, err
  792. }
  793. clientset, err := versionedclient.NewForConfig(restConf)
  794. if err != nil {
  795. return nil, err
  796. }
  797. gateway, err := clientset.NetworkingV1beta1().Gateways(namespace).Get(
  798. context.Background(), name, metav1.GetOptions{},
  799. )
  800. if err != nil {
  801. return nil, err
  802. }
  803. return gateway, nil
  804. }
  805. var IsNotFoundError = fmt.Errorf("not found")
  806. type BadRequestError struct {
  807. msg string
  808. }
  809. func (e *BadRequestError) Error() string {
  810. return e.msg
  811. }
  812. // GetDeployment gets the deployment given the name and namespace
  813. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  814. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  815. context.TODO(),
  816. c.Name,
  817. metav1.GetOptions{},
  818. )
  819. if err != nil && errors.IsNotFound(err) {
  820. return nil, IsNotFoundError
  821. } else if err != nil {
  822. return nil, err
  823. }
  824. res.Kind = c.Kind
  825. return res, nil
  826. }
  827. // GetStatefulSet gets the statefulset given the name and namespace
  828. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  829. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  830. context.TODO(),
  831. c.Name,
  832. metav1.GetOptions{},
  833. )
  834. if err != nil && errors.IsNotFound(err) {
  835. return nil, IsNotFoundError
  836. } else if err != nil {
  837. return nil, err
  838. }
  839. res.Kind = c.Kind
  840. return res, nil
  841. }
  842. // GetReplicaSet gets the replicaset given the name and namespace
  843. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  844. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  845. context.TODO(),
  846. c.Name,
  847. metav1.GetOptions{},
  848. )
  849. if err != nil && errors.IsNotFound(err) {
  850. return nil, IsNotFoundError
  851. } else if err != nil {
  852. return nil, err
  853. }
  854. res.Kind = c.Kind
  855. return res, nil
  856. }
  857. // GetDaemonSet gets the daemonset by name and namespace
  858. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  859. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  860. context.TODO(),
  861. c.Name,
  862. metav1.GetOptions{},
  863. )
  864. if err != nil && errors.IsNotFound(err) {
  865. return nil, IsNotFoundError
  866. } else if err != nil {
  867. return nil, err
  868. }
  869. res.Kind = c.Kind
  870. return res, nil
  871. }
  872. // GetJob gets the job by name and namespace
  873. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  874. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  875. context.TODO(),
  876. c.Name,
  877. metav1.GetOptions{},
  878. )
  879. if err != nil && errors.IsNotFound(err) {
  880. return nil, IsNotFoundError
  881. } else if err != nil {
  882. return nil, err
  883. }
  884. res.Kind = c.Kind
  885. return res, nil
  886. }
  887. // GetCronJob gets the CronJob by name and namespace
  888. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1.CronJob, error) {
  889. res, err := a.Clientset.BatchV1().CronJobs(c.Namespace).Get(
  890. context.TODO(),
  891. c.Name,
  892. metav1.GetOptions{},
  893. )
  894. if err != nil && errors.IsNotFound(err) {
  895. return nil, IsNotFoundError
  896. } else if err != nil {
  897. return nil, err
  898. }
  899. res.Kind = c.Kind
  900. return res, nil
  901. }
  902. // GetPodsByLabel retrieves pods with matching labels
  903. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  904. // Search in all namespaces for matching pods
  905. return a.Clientset.CoreV1().Pods(namespace).List(
  906. context.TODO(),
  907. metav1.ListOptions{
  908. LabelSelector: selector,
  909. },
  910. )
  911. }
  912. // GetPodByName retrieves a single instance of pod with given name
  913. func (a *Agent) GetPodByName(name string, namespace string) (*v1.Pod, error) {
  914. // Get pod by name
  915. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  916. context.TODO(),
  917. name,
  918. metav1.GetOptions{},
  919. )
  920. if err != nil && errors.IsNotFound(err) {
  921. return nil, IsNotFoundError
  922. }
  923. if err != nil {
  924. return nil, err
  925. }
  926. return pod, nil
  927. }
  928. // DeletePod deletes a pod by name and namespace
  929. func (a *Agent) DeletePod(namespace string, name string) error {
  930. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  931. context.TODO(),
  932. name,
  933. metav1.DeleteOptions{},
  934. )
  935. if err != nil && errors.IsNotFound(err) {
  936. return IsNotFoundError
  937. }
  938. return err
  939. }
// GetPodLogs streams real-time logs from a given pod over the websocket rw.
//
// The pod is looked up first, then waitForPod is used to wait until it is
// ready (or has exited). Logs are followed starting from the last 400 lines
// of selectedContainer, or of the pod's first container when
// selectedContainer is empty.
//
// Returns IsNotFoundError when the pod does not exist, a *BadRequestError
// when the API rejects the log request as a bad request, and otherwise the
// last value received on the internal error channel (nil when the websocket
// reader was the last to report).
func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer string, rw *websocket.WebsocketSafeReadWriter) error {
	// get the pod to read in the list of containers
	pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
		context.Background(),
		name,
		metav1.GetOptions{},
	)

	if err != nil && errors.IsNotFound(err) {
		return IsNotFoundError
	} else if err != nil {
		return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
	}

	// see if container is ready and able to open a stream. If not, wait for container
	// to be ready. The "exited" flag returned by waitForPod is ignored here.
	err, _ = a.waitForPod(pod)

	if err != nil && goerrors.Is(err, IsNotFoundError) {
		return IsNotFoundError
	} else if err != nil {
		return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
	}

	// default to the first container in the pod spec unless the caller chose one
	container := pod.Spec.Containers[0].Name

	if len(selectedContainer) > 0 {
		container = selectedContainer
	}

	// number of trailing log lines to send before following
	tails := int64(400)

	// follow logs
	podLogOpts := v1.PodLogOptions{
		Follow:    true,
		TailLines: &tails,
		Container: container,
	}

	req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)

	podLogs, err := req.Stream(context.TODO())

	// in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
	// we'd like to pass this through to the client.
	if err != nil && errors.IsBadRequest(err) {
		return &BadRequestError{err.Error()}
	} else if err != nil {
		return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
	}

	r := bufio.NewReader(podLogs)
	// unbuffered: every send below blocks until the range loop at the bottom receives it
	errorchan := make(chan error)

	var wg sync.WaitGroup
	var once sync.Once
	// two workers: the websocket reader and the log forwarder
	wg.Add(2)

	// close errorchan once both workers have returned, which ends the range loop below
	go func() {
		wg.Wait()
		close(errorchan)
	}()

	go func() {
		// a panic in this goroutine is swallowed; wg.Done (deferred later) still runs first
		defer func() {
			if r := recover(); r != nil {
				// TODO: add method to alert on panic
				return
			}
		}()

		// listens for websocket closing handshake
		defer wg.Done()

		for {
			// any read error is treated as the client going away and reported as nil
			if _, _, err := rw.ReadMessage(); err != nil {
				errorchan <- nil
				return
			}
		}
	}()

	go func() {
		// a panic in this goroutine is swallowed; wg.Done (deferred later) still runs first
		defer func() {
			if r := recover(); r != nil {
				// TODO: add method to alert on panic
				return
			}
		}()

		defer wg.Done()

		// forward the log stream to the websocket one newline-terminated chunk at a time
		for {
			bytes, err := r.ReadBytes('\n')

			if err != nil {
				errorchan <- err
				return
			}

			if _, writeErr := rw.Write(bytes); writeErr != nil {
				errorchan <- writeErr
				return
			}
		}
	}()

	// the first value received triggers teardown; closing both ends makes the
	// other worker fail and exit. err ends up holding the last value received.
	for err = range errorchan {
		// only call these methods a single time
		once.Do(func() {
			rw.Close()
			podLogs.Close()
		})
	}

	return err
}
  1035. // GetPodLogs streams real-time logs from a given pod.
  1036. func (a *Agent) GetPreviousPodLogs(namespace string, name string, selectedContainer string) ([]string, error) {
  1037. // get the pod to read in the list of contains
  1038. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  1039. context.Background(),
  1040. name,
  1041. metav1.GetOptions{},
  1042. )
  1043. if err != nil && errors.IsNotFound(err) {
  1044. return nil, IsNotFoundError
  1045. } else if err != nil {
  1046. return nil, fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  1047. }
  1048. container := pod.Spec.Containers[0].Name
  1049. if len(selectedContainer) > 0 {
  1050. container = selectedContainer
  1051. }
  1052. tails := int64(400)
  1053. // follow logs
  1054. podLogOpts := v1.PodLogOptions{
  1055. Follow: true,
  1056. TailLines: &tails,
  1057. Container: container,
  1058. Previous: true,
  1059. }
  1060. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  1061. podLogs, err := req.Stream(context.TODO())
  1062. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  1063. // we'd like to pass this through to the client.
  1064. if err != nil && strings.Contains(err.Error(), "not found") {
  1065. return nil, IsNotFoundError
  1066. }
  1067. if err != nil && errors.IsBadRequest(err) {
  1068. return nil, &BadRequestError{err.Error()}
  1069. } else if err != nil {
  1070. return nil, fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  1071. }
  1072. defer podLogs.Close()
  1073. r := bufio.NewReader(podLogs)
  1074. var logs []string
  1075. for {
  1076. line, err := r.ReadString('\n')
  1077. logs = append(logs, line)
  1078. if err == io.EOF {
  1079. break
  1080. } else if err != nil {
  1081. return nil, err
  1082. }
  1083. }
  1084. return logs, nil
  1085. }
  1086. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  1087. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  1088. jobPods, err := a.GetJobPods(namespace, name)
  1089. if err != nil {
  1090. return err
  1091. }
  1092. podName := jobPods[0].ObjectMeta.Name
  1093. restConf, err := a.RESTClientGetter.ToRESTConfig()
  1094. restConf.GroupVersion = &schema.GroupVersion{
  1095. Group: "api",
  1096. Version: "v1",
  1097. }
  1098. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  1099. restClient, err := rest.RESTClientFor(restConf)
  1100. if err != nil {
  1101. return err
  1102. }
  1103. req := restClient.Post().
  1104. Resource("pods").
  1105. Name(podName).
  1106. Namespace(namespace).
  1107. SubResource("exec")
  1108. req.Param("command", "./signal.sh")
  1109. req.Param("container", "sidecar")
  1110. req.Param("stdin", "true")
  1111. req.Param("stdout", "false")
  1112. req.Param("tty", "false")
  1113. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  1114. if err != nil {
  1115. return err
  1116. }
  1117. return exec.Stream(remotecommand.StreamOptions{
  1118. Tty: false,
  1119. Stdin: strings.NewReader("./signal.sh"),
  1120. })
  1121. }
  1122. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  1123. // the task some number of times until failing
  1124. func (a *Agent) RunWebsocketTask(task func() error) error {
  1125. lastTime := int64(0)
  1126. for {
  1127. if err := a.UpdateClientset(); err != nil {
  1128. return err
  1129. }
  1130. err := task()
  1131. if err == nil {
  1132. return nil
  1133. }
  1134. if !goerrors.Is(err, &AuthError{}) {
  1135. return err
  1136. }
  1137. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  1138. return err
  1139. }
  1140. lastTime = time.Now().Unix()
  1141. }
  1142. }
  1143. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  1144. // TODO: Support Jobs
  1145. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1146. run := func() error {
  1147. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  1148. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  1149. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1150. options.LabelSelector = selectors
  1151. }
  1152. factory := informers.NewSharedInformerFactoryWithOptions(
  1153. a.Clientset,
  1154. 0,
  1155. informers.WithTweakListOptions(tweakListOptionsFunc),
  1156. )
  1157. var informer cache.SharedInformer
  1158. // Spins up an informer depending on kind. Convert to lowercase for robustness
  1159. switch strings.ToLower(kind) {
  1160. case "deployment":
  1161. informer = factory.Apps().V1().Deployments().Informer()
  1162. case "statefulset":
  1163. informer = factory.Apps().V1().StatefulSets().Informer()
  1164. case "replicaset":
  1165. informer = factory.Apps().V1().ReplicaSets().Informer()
  1166. case "daemonset":
  1167. informer = factory.Apps().V1().DaemonSets().Informer()
  1168. case "job":
  1169. informer = factory.Batch().V1().Jobs().Informer()
  1170. case "cronjob":
  1171. informer = factory.Batch().V1beta1().CronJobs().Informer()
  1172. case "namespace":
  1173. informer = factory.Core().V1().Namespaces().Informer()
  1174. case "pod":
  1175. informer = factory.Core().V1().Pods().Informer()
  1176. }
  1177. stopper := make(chan struct{})
  1178. errorchan := make(chan error)
  1179. var wg sync.WaitGroup
  1180. var once sync.Once
  1181. var err error
  1182. wg.Add(2)
  1183. go func() {
  1184. wg.Wait()
  1185. close(errorchan)
  1186. }()
  1187. go func() {
  1188. defer func() {
  1189. if r := recover(); r != nil {
  1190. // TODO: add method to alert on panic
  1191. return
  1192. }
  1193. }()
  1194. // listens for websocket closing handshake
  1195. defer wg.Done()
  1196. for {
  1197. if _, _, err := rw.ReadMessage(); err != nil {
  1198. errorchan <- nil
  1199. return
  1200. }
  1201. }
  1202. }()
  1203. go func() {
  1204. defer func() {
  1205. if r := recover(); r != nil {
  1206. // TODO: add method to alert on panic
  1207. return
  1208. }
  1209. }()
  1210. // listens for websocket closing handshake
  1211. defer wg.Done()
  1212. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1213. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1214. errorchan <- &AuthError{}
  1215. }
  1216. })
  1217. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1218. UpdateFunc: func(oldObj, newObj interface{}) {
  1219. msg := Message{
  1220. EventType: "UPDATE",
  1221. Object: newObj,
  1222. Kind: strings.ToLower(kind),
  1223. }
  1224. err := rw.WriteJSON(msg)
  1225. if err != nil {
  1226. errorchan <- err
  1227. }
  1228. },
  1229. AddFunc: func(obj interface{}) {
  1230. msg := Message{
  1231. EventType: "ADD",
  1232. Object: obj,
  1233. Kind: strings.ToLower(kind),
  1234. }
  1235. err := rw.WriteJSON(msg)
  1236. if err != nil {
  1237. errorchan <- err
  1238. }
  1239. },
  1240. DeleteFunc: func(obj interface{}) {
  1241. msg := Message{
  1242. EventType: "DELETE",
  1243. Object: obj,
  1244. Kind: strings.ToLower(kind),
  1245. }
  1246. err := rw.WriteJSON(msg)
  1247. if err != nil {
  1248. errorchan <- err
  1249. }
  1250. },
  1251. })
  1252. informer.Run(stopper)
  1253. }()
  1254. for err = range errorchan {
  1255. once.Do(func() {
  1256. close(stopper)
  1257. rw.Close()
  1258. })
  1259. }
  1260. return err
  1261. }
  1262. return a.RunWebsocketTask(run)
  1263. }
  1264. var b64 = base64.StdEncoding
  1265. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  1266. func decodeRelease(data string) (*rspb.Release, error) {
  1267. // base64 decode string
  1268. b, err := b64.DecodeString(data)
  1269. if err != nil {
  1270. return nil, err
  1271. }
  1272. // For backwards compatibility with releases that were stored before
  1273. // compression was introduced we skip decompression if the
  1274. // gzip magic header is not found
  1275. if bytes.Equal(b[0:3], magicGzip) {
  1276. r, err := gzip.NewReader(bytes.NewReader(b))
  1277. if err != nil {
  1278. return nil, err
  1279. }
  1280. defer r.Close()
  1281. b2, err := ioutil.ReadAll(r)
  1282. if err != nil {
  1283. return nil, err
  1284. }
  1285. b = b2
  1286. }
  1287. var rls rspb.Release
  1288. // unmarshal release object bytes
  1289. if err := json.Unmarshal(b, &rls); err != nil {
  1290. return nil, err
  1291. }
  1292. return &rls, nil
  1293. }
  1294. func contains(s []string, str string) bool {
  1295. for _, v := range s {
  1296. if v == str {
  1297. return true
  1298. }
  1299. }
  1300. return false
  1301. }
  1302. func ParseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  1303. if secret.Type != "helm.sh/release.v1" {
  1304. return nil, true, nil
  1305. }
  1306. releaseData, ok := secret.Data["release"]
  1307. if !ok {
  1308. return nil, true, fmt.Errorf("release field not found")
  1309. }
  1310. helm_object, err := decodeRelease(string(releaseData))
  1311. if err != nil {
  1312. return nil, true, err
  1313. }
  1314. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  1315. return nil, true, nil
  1316. }
  1317. return helm_object, false, nil
  1318. }
  1319. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1320. run := func() error {
  1321. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1322. options.LabelSelector = selectors
  1323. }
  1324. factory := informers.NewSharedInformerFactoryWithOptions(
  1325. a.Clientset,
  1326. 0,
  1327. informers.WithTweakListOptions(tweakListOptionsFunc),
  1328. informers.WithNamespace(namespace),
  1329. )
  1330. informer := factory.Core().V1().Secrets().Informer()
  1331. stopper := make(chan struct{})
  1332. errorchan := make(chan error)
  1333. var wg sync.WaitGroup
  1334. var once sync.Once
  1335. var err error
  1336. wg.Add(2)
  1337. go func() {
  1338. wg.Wait()
  1339. close(errorchan)
  1340. }()
  1341. go func() {
  1342. defer func() {
  1343. if r := recover(); r != nil {
  1344. // TODO: add method to alert on panic
  1345. return
  1346. }
  1347. }()
  1348. // listens for websocket closing handshake
  1349. defer wg.Done()
  1350. for {
  1351. if _, _, err := rw.ReadMessage(); err != nil {
  1352. errorchan <- nil
  1353. return
  1354. }
  1355. }
  1356. }()
  1357. go func() {
  1358. defer func() {
  1359. if r := recover(); r != nil {
  1360. // TODO: add method to alert on panic
  1361. return
  1362. }
  1363. }()
  1364. // listens for websocket closing handshake
  1365. defer wg.Done()
  1366. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1367. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1368. errorchan <- &AuthError{}
  1369. }
  1370. })
  1371. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1372. UpdateFunc: func(oldObj, newObj interface{}) {
  1373. secretObj, ok := newObj.(*v1.Secret)
  1374. if !ok {
  1375. errorchan <- fmt.Errorf("could not cast to secret")
  1376. return
  1377. }
  1378. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1379. if isNotHelmRelease && err == nil {
  1380. return
  1381. }
  1382. if err != nil {
  1383. errorchan <- err
  1384. return
  1385. }
  1386. msg := Message{
  1387. EventType: "UPDATE",
  1388. Object: helm_object,
  1389. }
  1390. rw.WriteJSON(msg)
  1391. },
  1392. AddFunc: func(obj interface{}) {
  1393. secretObj, ok := obj.(*v1.Secret)
  1394. if !ok {
  1395. errorchan <- fmt.Errorf("could not cast to secret")
  1396. return
  1397. }
  1398. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1399. if isNotHelmRelease && err == nil {
  1400. return
  1401. }
  1402. if err != nil {
  1403. errorchan <- err
  1404. return
  1405. }
  1406. msg := Message{
  1407. EventType: "ADD",
  1408. Object: helm_object,
  1409. }
  1410. rw.WriteJSON(msg)
  1411. },
  1412. DeleteFunc: func(obj interface{}) {
  1413. secretObj, ok := obj.(*v1.Secret)
  1414. if !ok {
  1415. errorchan <- fmt.Errorf("could not cast to secret")
  1416. return
  1417. }
  1418. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1419. if isNotHelmRelease && err == nil {
  1420. return
  1421. }
  1422. if err != nil {
  1423. errorchan <- err
  1424. return
  1425. }
  1426. msg := Message{
  1427. EventType: "DELETE",
  1428. Object: helm_object,
  1429. }
  1430. rw.WriteJSON(msg)
  1431. },
  1432. })
  1433. informer.Run(stopper)
  1434. }()
  1435. for err = range errorchan {
  1436. once.Do(func() {
  1437. close(stopper)
  1438. rw.Close()
  1439. })
  1440. }
  1441. return err
  1442. }
  1443. return a.RunWebsocketTask(run)
  1444. }
// StreamPorterAgentLokiLog runs "/porter/agent-cli" inside a porter agent pod
// and streams the command's stdout and stderr over the websocket rw.
//
// The pod is the first one listed in the "porter-agent-system" namespace with
// the label "control-plane=controller-manager". startTime is always passed as
// "--start"; each entry of labels is passed as "--label"; searchParam is
// passed as "--search" when non-empty; limit is passed as "--limit" when
// greater than zero.
//
// The task runs through RunWebsocketTask. It returns the last value received
// on the internal error channel.
func (a *Agent) StreamPorterAgentLokiLog(
	labels []string,
	startTime string,
	searchParam string,
	limit uint32,
	rw *websocket.WebsocketSafeReadWriter,
) error {
	run := func() error {
		// unbuffered: every send blocks until the range loop at the bottom receives it
		errorchan := make(chan error)

		var wg sync.WaitGroup
		var once sync.Once
		var err error

		// two workers: the websocket reader and the exec streamer
		wg.Add(2)

		// close errorchan once both workers have returned, which ends the range loop below
		go func() {
			wg.Wait()
			close(errorchan)
		}()

		go func() {
			defer func() {
				if r := recover(); r != nil {
					// TODO: add method to alert on panic
					return
				}
			}()

			defer wg.Done()

			// listens for websocket closing handshake
			for {
				// any read error is treated as the client going away and reported as nil
				if _, _, err := rw.ReadMessage(); err != nil {
					errorchan <- nil
					return
				}
			}
		}()

		go func() {
			defer func() {
				if r := recover(); r != nil {
					// TODO: add method to alert on panic
					return
				}
			}()

			defer wg.Done()

			// find the porter agent pod to exec into
			podList, err := a.Clientset.CoreV1().Pods("porter-agent-system").List(context.Background(), metav1.ListOptions{
				LabelSelector: "control-plane=controller-manager",
			})

			if err != nil {
				errorchan <- err
				return
			}

			if len(podList.Items) == 0 {
				errorchan <- fmt.Errorf("no porter agent pods found")
				return
			}

			// the first pod in the list is used
			pod := podList.Items[0]

			restConf, err := a.RESTClientGetter.ToRESTConfig()

			if err != nil {
				errorchan <- err
				return
			}

			// build the pod "exec" subresource request
			req := a.Clientset.CoreV1().RESTClient().
				Post().
				Resource("pods").
				Name(pod.Name).
				Namespace(pod.Namespace).
				SubResource("exec")

			// assemble the agent-cli command line from the arguments
			cmd := []string{
				"/porter/agent-cli",
				"--start",
				startTime,
			}

			for _, label := range labels {
				cmd = append(cmd, "--label", label)
			}

			if searchParam != "" {
				cmd = append(cmd, "--search", searchParam)
			}

			if limit > 0 {
				cmd = append(cmd, "--limit", fmt.Sprintf("%d", limit))
			}

			opts := &v1.PodExecOptions{
				Command: cmd,
				Stdout:  true,
				Stderr:  true,
			}

			req.VersionedParams(
				opts,
				scheme.ParameterCodec,
			)

			exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())

			if err != nil {
				errorchan <- err
				return
			}

			// stdout and stderr of the command are both written to the websocket
			err = exec.Stream(remotecommand.StreamOptions{
				Stdin:  nil,
				Stdout: rw,
				Stderr: rw,
			})

			// NOTE(review): when the command finishes without error nothing is
			// sent on errorchan, so rw is only closed once the websocket reader
			// reports — confirm this is intended.
			if err != nil {
				errorchan <- err
				return
			}
		}()

		// the first value received closes the websocket exactly once; err ends
		// up holding the last value received before errorchan was closed.
		for err = range errorchan {
			once.Do(func() {
				rw.Close()
			})
		}

		return err
	}

	return a.RunWebsocketTask(run)
}
  1556. // CreateImagePullSecrets will create the required image pull secrets and
  1557. // return a map from the registry name to the name of the secret.
  1558. func (a *Agent) CreateImagePullSecrets(
  1559. repo repository.Repository,
  1560. namespace string,
  1561. linkedRegs map[string]*models.Registry,
  1562. doAuth *oauth2.Config,
  1563. ) (map[string]string, error) {
  1564. res := make(map[string]string)
  1565. for key, val := range linkedRegs {
  1566. _reg := registry.Registry(*val)
  1567. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1568. if err != nil {
  1569. return nil, err
  1570. }
  1571. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1572. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1573. context.TODO(),
  1574. secretName,
  1575. metav1.GetOptions{},
  1576. )
  1577. // if not found, create the secret
  1578. if err != nil && errors.IsNotFound(err) {
  1579. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1580. context.TODO(),
  1581. &v1.Secret{
  1582. ObjectMeta: metav1.ObjectMeta{
  1583. Name: secretName,
  1584. },
  1585. Data: map[string][]byte{
  1586. string(v1.DockerConfigJsonKey): data,
  1587. },
  1588. Type: v1.SecretTypeDockerConfigJson,
  1589. },
  1590. metav1.CreateOptions{},
  1591. )
  1592. if err != nil {
  1593. return nil, err
  1594. }
  1595. // add secret name to the map
  1596. res[key] = secretName
  1597. continue
  1598. } else if err != nil {
  1599. return nil, err
  1600. }
  1601. // otherwise, check that the secret contains the correct data: if
  1602. // if doesn't, update it
  1603. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1604. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1605. context.TODO(),
  1606. &v1.Secret{
  1607. ObjectMeta: metav1.ObjectMeta{
  1608. Name: secretName,
  1609. },
  1610. Data: map[string][]byte{
  1611. string(v1.DockerConfigJsonKey): data,
  1612. },
  1613. Type: v1.SecretTypeDockerConfigJson,
  1614. },
  1615. metav1.UpdateOptions{},
  1616. )
  1617. if err != nil {
  1618. return nil, err
  1619. }
  1620. }
  1621. // add secret name to the map
  1622. res[key] = secretName
  1623. }
  1624. return res, nil
  1625. }
  1626. // helper that waits for pod to be ready
  1627. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1628. var (
  1629. w watch.Interface
  1630. err error
  1631. ok bool
  1632. )
  1633. // immediately after creating a pod, the API may return a 404. heuristically 1
  1634. // second seems to be plenty.
  1635. watchRetries := 3
  1636. for i := 0; i < watchRetries; i++ {
  1637. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1638. w, err = a.Clientset.CoreV1().
  1639. Pods(pod.Namespace).
  1640. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1641. if err == nil {
  1642. break
  1643. }
  1644. time.Sleep(time.Second)
  1645. }
  1646. if err != nil {
  1647. return err, false
  1648. }
  1649. defer w.Stop()
  1650. expireTime := time.Now().Add(time.Second * 30)
  1651. for time.Now().Unix() <= expireTime.Unix() {
  1652. select {
  1653. case <-time.NewTicker(time.Second).C:
  1654. // poll every second in case we already missed the ready event while
  1655. // creating the listener.
  1656. pod, err = a.Clientset.CoreV1().
  1657. Pods(pod.Namespace).
  1658. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1659. if err != nil && errors.IsNotFound(err) {
  1660. return IsNotFoundError, false
  1661. } else if err != nil {
  1662. return err, false
  1663. }
  1664. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1665. return nil, isExited
  1666. }
  1667. case evt := <-w.ResultChan():
  1668. pod, ok = evt.Object.(*v1.Pod)
  1669. if !ok {
  1670. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1671. }
  1672. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1673. return nil, isExited
  1674. }
  1675. }
  1676. }
  1677. return goerrors.New("timed out waiting for pod"), false
  1678. }
  1679. func isPodReady(pod *v1.Pod) bool {
  1680. ready := false
  1681. conditions := pod.Status.Conditions
  1682. for i := range conditions {
  1683. if conditions[i].Type == v1.PodReady {
  1684. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1685. }
  1686. }
  1687. return ready
  1688. }
  1689. func isPodExited(pod *v1.Pod) bool {
  1690. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1691. }