agent.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. goerrors "errors"
  17. "github.com/porter-dev/porter/api/server/shared/websocket"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/registry"
  20. "github.com/porter-dev/porter/internal/repository"
  21. "golang.org/x/oauth2"
  22. "github.com/porter-dev/porter/internal/helm/grapher"
  23. appsv1 "k8s.io/api/apps/v1"
  24. batchv1 "k8s.io/api/batch/v1"
  25. v1 "k8s.io/api/core/v1"
  26. v1beta1 "k8s.io/api/extensions/v1beta1"
  27. netv1 "k8s.io/api/networking/v1"
  28. netv1beta1 "k8s.io/api/networking/v1beta1"
  29. "k8s.io/apimachinery/pkg/api/errors"
  30. "k8s.io/apimachinery/pkg/api/resource"
  31. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/apimachinery/pkg/fields"
  33. "k8s.io/apimachinery/pkg/runtime"
  34. "k8s.io/apimachinery/pkg/runtime/schema"
  35. "k8s.io/apimachinery/pkg/types"
  36. "k8s.io/apimachinery/pkg/watch"
  37. "k8s.io/cli-runtime/pkg/genericclioptions"
  38. "k8s.io/client-go/informers"
  39. "k8s.io/client-go/kubernetes"
  40. "k8s.io/client-go/rest"
  41. "k8s.io/client-go/tools/cache"
  42. "k8s.io/client-go/tools/remotecommand"
  43. "k8s.io/kubectl/pkg/scheme"
  44. rspb "github.com/stefanmcshane/helm/pkg/release"
  45. istiov1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
  46. versionedclient "istio.io/client-go/pkg/clientset/versioned"
  47. )
// Agent is a Kubernetes agent for performing operations that interact with the
// api server. Do not create this struct directly, use NewKubernetesAgent instead.
type Agent struct {
	// RESTClientGetter supplies the REST config from which UpdateClientset
	// rebuilds Clientset.
	RESTClientGetter genericclioptions.RESTClientGetter
	// Clientset is the client through which the methods on Agent talk to the
	// api server.
	Clientset kubernetes.Interface
	// context is used here as a workaround since RESTClientGetter and kubernetes.Interface do not support contexts
	context context.Context
}
// Message bundles an arbitrary object with its kind and the type of event
// that concerns it.
// NOTE(review): presumably the payload written to clients for watch/stream
// events; confirm against the callers that construct it.
type Message struct {
	// EventType carries the JSON tag "event_type".
	EventType string `json:"event_type"`
	// Object is the payload; it has no JSON tag.
	Object interface{}
	// Kind names the kind of Object; it has no JSON tag.
	Kind string
}
// ListOptions holds optional filters for list operations.
type ListOptions struct {
	// FieldSelector is a field selector expression.
	// NOTE(review): presumably forwarded to metav1.ListOptions.FieldSelector;
	// confirm against the code that consumes this struct.
	FieldSelector string
}
  64. type AuthError struct{}
  65. func (e *AuthError) Error() string {
  66. return "Unauthorized error"
  67. }
  68. // NewKubernetesAgent creates a new agent for accessing kubernetes on a cluster
  69. func NewKubernetesAgent(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, clientset kubernetes.Interface) Agent {
  70. return Agent{
  71. RESTClientGetter: restClientGetter,
  72. Clientset: clientset,
  73. context: ctx,
  74. }
  75. }
  76. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  77. func (a *Agent) UpdateClientset() error {
  78. restConf, err := a.RESTClientGetter.ToRESTConfig()
  79. if err != nil {
  80. return err
  81. }
  82. clientset, err := kubernetes.NewForConfig(restConf)
  83. if err != nil {
  84. return err
  85. }
  86. a.Clientset = clientset
  87. return nil
  88. }
  89. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  90. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  91. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  92. context.TODO(),
  93. &v1.ConfigMap{
  94. ObjectMeta: metav1.ObjectMeta{
  95. Name: name,
  96. Namespace: namespace,
  97. Labels: map[string]string{
  98. "porter": "true",
  99. },
  100. },
  101. Data: configMap,
  102. },
  103. metav1.CreateOptions{},
  104. )
  105. }
  106. func (a *Agent) CreateVersionedConfigMap(name, namespace string, version uint, configMap map[string]string, apps ...string) (*v1.ConfigMap, error) {
  107. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  108. context.TODO(),
  109. &v1.ConfigMap{
  110. ObjectMeta: metav1.ObjectMeta{
  111. Name: fmt.Sprintf("%s.v%d", name, version),
  112. Namespace: namespace,
  113. Labels: map[string]string{
  114. "owner": "porter",
  115. "envgroup": name,
  116. "version": fmt.Sprintf("%d", version),
  117. },
  118. Annotations: map[string]string{
  119. PorterAppAnnotationName: strings.Join(apps, ","),
  120. },
  121. },
  122. Data: configMap,
  123. },
  124. metav1.CreateOptions{},
  125. )
  126. }
// PorterAppAnnotationName is the annotation key under which a versioned
// configmap stores the comma-separated list of applications attached to it.
const PorterAppAnnotationName = "porter.run/apps"
  128. func (a *Agent) AddApplicationToVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  129. annons := cm.Annotations
  130. if annons == nil {
  131. annons = make(map[string]string)
  132. }
  133. appAnnon, appAnnonExists := annons[PorterAppAnnotationName]
  134. if !appAnnonExists || appAnnon == "" {
  135. annons[PorterAppAnnotationName] = appName
  136. } else {
  137. appStrArr := strings.Split(appAnnon, ",")
  138. foundApp := false
  139. for _, appStr := range appStrArr {
  140. if appStr == appName {
  141. foundApp = true
  142. }
  143. }
  144. if !foundApp {
  145. annons[PorterAppAnnotationName] = fmt.Sprintf("%s,%s", appAnnon, appName)
  146. }
  147. }
  148. cm.SetAnnotations(annons)
  149. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  150. context.TODO(),
  151. cm,
  152. metav1.UpdateOptions{},
  153. )
  154. }
  155. func (a *Agent) RemoveApplicationFromVersionedConfigMap(cm *v1.ConfigMap, appName string) (*v1.ConfigMap, error) {
  156. annons := cm.Annotations
  157. if annons == nil {
  158. annons = make(map[string]string)
  159. }
  160. appAnn, appAnnExists := annons[PorterAppAnnotationName]
  161. if !appAnnExists {
  162. return nil, IsNotFoundError
  163. }
  164. appStrArr := strings.Split(appAnn, ",")
  165. newStrArr := make([]string, 0)
  166. for _, appStr := range appStrArr {
  167. if appStr != appName {
  168. newStrArr = append(newStrArr, appStr)
  169. }
  170. }
  171. annons[PorterAppAnnotationName] = strings.Join(newStrArr, ",")
  172. cm.SetAnnotations(annons)
  173. return a.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(
  174. context.TODO(),
  175. cm,
  176. metav1.UpdateOptions{},
  177. )
  178. }
  179. func (a *Agent) CreateLinkedVersionedSecret(name, namespace, cmName string, version uint, data map[string][]byte) (*v1.Secret, error) {
  180. return a.Clientset.CoreV1().Secrets(namespace).Create(
  181. context.TODO(),
  182. &v1.Secret{
  183. ObjectMeta: metav1.ObjectMeta{
  184. Name: fmt.Sprintf("%s.v%d", name, version),
  185. Namespace: namespace,
  186. Labels: map[string]string{
  187. "owner": "porter",
  188. "envgroup": name,
  189. "version": fmt.Sprintf("%d", version),
  190. "configmap": cmName,
  191. },
  192. },
  193. Data: data,
  194. },
  195. metav1.CreateOptions{},
  196. )
  197. }
  198. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  199. // base64 encoded
  200. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  201. return a.Clientset.CoreV1().Secrets(namespace).Create(
  202. context.TODO(),
  203. &v1.Secret{
  204. ObjectMeta: metav1.ObjectMeta{
  205. Name: name,
  206. Namespace: namespace,
  207. Labels: map[string]string{
  208. "porter": "true",
  209. "configmap": cmName,
  210. },
  211. },
  212. Data: data,
  213. },
  214. metav1.CreateOptions{},
  215. )
  216. }
// mergeConfigMapData is the body of the merge patch sent by UpdateConfigMap.
type mergeConfigMapData struct {
	// Data maps each key to its new value. UpdateConfigMap stores a nil
	// pointer for keys whose value is empty, which marshals to JSON null.
	Data map[string]*string `json:"data"`
}
  220. // UpdateConfigMap updates the configmap given its name and namespace
  221. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  222. cmData := make(map[string]*string)
  223. for key, val := range configMap {
  224. valCopy := val
  225. cmData[key] = &valCopy
  226. if len(val) == 0 {
  227. cmData[key] = nil
  228. }
  229. }
  230. mergeCM := &mergeConfigMapData{
  231. Data: cmData,
  232. }
  233. patchBytes, err := json.Marshal(mergeCM)
  234. if err != nil {
  235. return nil, err
  236. }
  237. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  238. context.Background(),
  239. name,
  240. types.MergePatchType,
  241. patchBytes,
  242. metav1.PatchOptions{},
  243. )
  244. }
// mergeLinkedSecretData is the body of the merge patch sent by
// UpdateLinkedSecret.
type mergeLinkedSecretData struct {
	// Data maps each key to its new value. UpdateLinkedSecret stores a nil
	// pointer for keys whose value is empty, which marshals to JSON null.
	Data map[string]*[]byte `json:"data"`
}
  248. // UpdateLinkedSecret updates the secret given its name and namespace
  249. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  250. secretData := make(map[string]*[]byte)
  251. for key, val := range data {
  252. valCopy := val
  253. secretData[key] = &valCopy
  254. if len(val) == 0 {
  255. secretData[key] = nil
  256. }
  257. }
  258. mergeSecret := &mergeLinkedSecretData{
  259. Data: secretData,
  260. }
  261. patchBytes, err := json.Marshal(mergeSecret)
  262. if err != nil {
  263. return err
  264. }
  265. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  266. context.TODO(),
  267. name,
  268. types.MergePatchType,
  269. patchBytes,
  270. metav1.PatchOptions{},
  271. )
  272. return err
  273. }
  274. // DeleteConfigMap deletes the configmap given its name and namespace
  275. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  276. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  277. context.TODO(),
  278. name,
  279. metav1.DeleteOptions{},
  280. )
  281. }
  282. // DeleteLinkedSecret deletes the secret given its name and namespace
  283. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  284. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  285. context.TODO(),
  286. name,
  287. metav1.DeleteOptions{},
  288. )
  289. }
  290. func (a *Agent) ListVersionedConfigMaps(name string, namespace string) ([]v1.ConfigMap, error) {
  291. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  292. context.Background(),
  293. metav1.ListOptions{
  294. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  295. },
  296. )
  297. if err != nil {
  298. return nil, err
  299. }
  300. return listResp.Items, nil
  301. }
  302. func (a *Agent) DeleteVersionedConfigMap(name string, namespace string) error {
  303. return a.Clientset.CoreV1().ConfigMaps(namespace).DeleteCollection(
  304. context.Background(),
  305. metav1.DeleteOptions{},
  306. metav1.ListOptions{
  307. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  308. },
  309. )
  310. }
  311. func (a *Agent) DeleteVersionedSecret(name string, namespace string) error {
  312. return a.Clientset.CoreV1().Secrets(namespace).DeleteCollection(
  313. context.Background(),
  314. metav1.DeleteOptions{},
  315. metav1.ListOptions{
  316. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  317. },
  318. )
  319. }
  320. func (a *Agent) ListAllVersionedConfigMaps(namespace string) ([]v1.ConfigMap, error) {
  321. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  322. context.Background(),
  323. metav1.ListOptions{
  324. LabelSelector: fmt.Sprintf("envgroup"),
  325. },
  326. )
  327. if err != nil {
  328. return nil, err
  329. }
  330. // only keep the latest version for each configmap
  331. latestMap := make(map[string]v1.ConfigMap)
  332. for _, configmap := range listResp.Items {
  333. egName, egNameExists := configmap.Labels["envgroup"]
  334. if !egNameExists {
  335. continue
  336. }
  337. id := fmt.Sprintf("%s/%s", configmap.Namespace, egName)
  338. if currLatest, exists := latestMap[id]; exists {
  339. // get version
  340. currVersionStr, currVersionExists := currLatest.Labels["version"]
  341. versionStr, versionExists := configmap.Labels["version"]
  342. if versionExists && currVersionExists {
  343. currVersion, currErr := strconv.Atoi(currVersionStr)
  344. version, err := strconv.Atoi(versionStr)
  345. if currErr == nil && err == nil && currVersion < version {
  346. latestMap[id] = configmap
  347. }
  348. }
  349. } else {
  350. latestMap[id] = configmap
  351. }
  352. }
  353. res := make([]v1.ConfigMap, 0)
  354. for _, cm := range latestMap {
  355. res = append(res, cm)
  356. }
  357. return res, nil
  358. }
  359. // GetConfigMap retrieves the configmap given its name and namespace
  360. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  361. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  362. context.TODO(),
  363. name,
  364. metav1.GetOptions{},
  365. )
  366. }
  367. func (a *Agent) GetVersionedConfigMap(name, namespace string, version uint) (*v1.ConfigMap, error) {
  368. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  369. context.Background(),
  370. metav1.ListOptions{
  371. LabelSelector: fmt.Sprintf("envgroup=%s,version=%d", name, version),
  372. },
  373. )
  374. if err != nil {
  375. return nil, err
  376. }
  377. if listResp.Items == nil || len(listResp.Items) == 0 {
  378. return nil, IsNotFoundError
  379. }
  380. // if the length of the list is greater than 1, return an error -- this shouldn't happen
  381. if len(listResp.Items) > 1 {
  382. return nil, fmt.Errorf("multiple configmaps found while searching for %s/%s and version %d", namespace, name, version)
  383. }
  384. return &listResp.Items[0], nil
  385. }
  386. func (a *Agent) GetLatestVersionedConfigMap(name, namespace string) (*v1.ConfigMap, uint, error) {
  387. listResp, err := a.Clientset.CoreV1().ConfigMaps(namespace).List(
  388. context.Background(),
  389. metav1.ListOptions{
  390. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  391. },
  392. )
  393. if err != nil {
  394. return nil, 0, err
  395. }
  396. if listResp.Items == nil || len(listResp.Items) == 0 {
  397. return nil, 0, IsNotFoundError
  398. }
  399. // iterate through the configmaps and get the greatest version
  400. var res *v1.ConfigMap
  401. var latestVersion uint
  402. for _, configmap := range listResp.Items {
  403. if res == nil {
  404. versionStr, versionExists := configmap.Labels["version"]
  405. if !versionExists {
  406. continue
  407. }
  408. version, err := strconv.Atoi(versionStr)
  409. if err != nil {
  410. continue
  411. }
  412. latestV := configmap
  413. res = &latestV
  414. latestVersion = uint(version)
  415. } else {
  416. // get version
  417. versionStr, versionExists := configmap.Labels["version"]
  418. currVersionStr, currVersionExists := res.Labels["version"]
  419. if versionExists && currVersionExists {
  420. currVersion, currErr := strconv.Atoi(currVersionStr)
  421. version, err := strconv.Atoi(versionStr)
  422. if currErr == nil && err == nil && currVersion < version {
  423. latestV := configmap
  424. res = &latestV
  425. latestVersion = uint(version)
  426. }
  427. }
  428. }
  429. }
  430. if res == nil {
  431. return nil, 0, IsNotFoundError
  432. }
  433. return res, latestVersion, nil
  434. }
  435. func (a *Agent) GetLatestVersionedSecret(name, namespace string) (*v1.Secret, uint, error) {
  436. listResp, err := a.Clientset.CoreV1().Secrets(namespace).List(
  437. context.Background(),
  438. metav1.ListOptions{
  439. LabelSelector: fmt.Sprintf("envgroup=%s", name),
  440. },
  441. )
  442. if err != nil {
  443. return nil, 0, err
  444. }
  445. if listResp.Items == nil || len(listResp.Items) == 0 {
  446. return nil, 0, IsNotFoundError
  447. }
  448. // iterate through the configmaps and get the greatest version
  449. var res *v1.Secret
  450. var latestVersion uint
  451. for _, secret := range listResp.Items {
  452. if res == nil {
  453. versionStr, versionExists := secret.Labels["version"]
  454. if !versionExists {
  455. continue
  456. }
  457. version, err := strconv.Atoi(versionStr)
  458. if err != nil {
  459. continue
  460. }
  461. latestV := secret
  462. res = &latestV
  463. latestVersion = uint(version)
  464. } else {
  465. // get version
  466. versionStr, versionExists := secret.Labels["version"]
  467. currVersionStr, currVersionExists := res.Labels["version"]
  468. if versionExists && currVersionExists {
  469. currVersion, currErr := strconv.Atoi(currVersionStr)
  470. version, err := strconv.Atoi(versionStr)
  471. if currErr == nil && err == nil && currVersion < version {
  472. latestV := secret
  473. res = &latestV
  474. latestVersion = uint(version)
  475. }
  476. }
  477. }
  478. }
  479. if res == nil {
  480. return nil, 0, IsNotFoundError
  481. }
  482. return res, latestVersion, nil
  483. }
  484. // GetSecret retrieves the secret given its name and namespace
  485. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  486. return a.Clientset.CoreV1().Secrets(namespace).Get(
  487. context.TODO(),
  488. name,
  489. metav1.GetOptions{},
  490. )
  491. }
  492. // ListServices lists services in a namespace
  493. func (a *Agent) ListServices(ctx context.Context, namespace string, labelSelector string) (*v1.ServiceList, error) {
  494. return a.Clientset.CoreV1().Services(namespace).List(
  495. ctx,
  496. metav1.ListOptions{
  497. LabelSelector: labelSelector,
  498. },
  499. )
  500. }
  501. // CreateSecret creates the secret given its name and namespace
  502. func (a *Agent) CreateSecret(secret *v1.Secret, namespace string) (*v1.Secret, error) {
  503. _, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  504. context.TODO(),
  505. secret.Name,
  506. metav1.GetOptions{},
  507. )
  508. if err != nil {
  509. if !errors.IsNotFound(err) {
  510. return nil, err
  511. }
  512. return a.Clientset.CoreV1().Secrets(namespace).Create(
  513. context.TODO(),
  514. secret,
  515. metav1.CreateOptions{},
  516. )
  517. }
  518. return a.Clientset.CoreV1().Secrets(namespace).Update(
  519. context.TODO(),
  520. secret,
  521. metav1.UpdateOptions{},
  522. )
  523. }
  524. // ListConfigMaps simply lists namespaces
  525. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  526. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  527. context.TODO(),
  528. metav1.ListOptions{
  529. LabelSelector: "porter",
  530. },
  531. )
  532. }
  533. // ListEvents lists the events of a given object.
  534. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  535. return a.Clientset.CoreV1().Events(namespace).List(
  536. context.TODO(),
  537. metav1.ListOptions{
  538. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  539. },
  540. )
  541. }
  542. // ListNamespaces simply lists namespaces
  543. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  544. return a.Clientset.CoreV1().Namespaces().List(
  545. context.TODO(),
  546. metav1.ListOptions{},
  547. )
  548. }
  549. // CreateNamespace creates a namespace with the given name.
  550. func (a *Agent) CreateNamespace(name string, labels map[string]string) (*v1.Namespace, error) {
  551. // check if namespace exists
  552. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  553. context.TODO(),
  554. name,
  555. metav1.GetOptions{},
  556. )
  557. if err == nil && checkNS != nil {
  558. if checkNS.Status.Phase == v1.NamespaceTerminating {
  559. // edge case for when the same namespace was previously created
  560. // but was deleted and is currently in the "Terminating" phase
  561. // let us wait for a maximum of 10 seconds
  562. timeNow := time.Now().Add(10 * time.Second)
  563. stillTerminating := true
  564. for {
  565. _, err := a.Clientset.CoreV1().Namespaces().Get(
  566. context.TODO(),
  567. name,
  568. metav1.GetOptions{},
  569. )
  570. if err != nil && errors.IsNotFound(err) {
  571. stillTerminating = false
  572. break
  573. }
  574. time.Sleep(time.Second)
  575. if time.Now().After(timeNow) {
  576. break
  577. }
  578. }
  579. if stillTerminating {
  580. // the namespace has been in the "Terminating" phase
  581. return nil, fmt.Errorf("cannot create namespace %s, stuck in \"Terminating\" phase", name)
  582. }
  583. } else {
  584. return checkNS, nil
  585. }
  586. }
  587. namespace := &v1.Namespace{
  588. ObjectMeta: metav1.ObjectMeta{
  589. Name: name,
  590. },
  591. }
  592. if len(labels) > 0 {
  593. namespace.SetLabels(labels)
  594. }
  595. return a.Clientset.CoreV1().Namespaces().Create(
  596. context.TODO(),
  597. namespace,
  598. metav1.CreateOptions{},
  599. )
  600. }
  601. // GetNamespace gets the namespace given the name
  602. func (a *Agent) GetNamespace(name string) (*v1.Namespace, error) {
  603. ns, err := a.Clientset.CoreV1().Namespaces().Get(
  604. context.Background(),
  605. name,
  606. metav1.GetOptions{},
  607. )
  608. if err != nil {
  609. return nil, err
  610. }
  611. return ns, nil
  612. }
  613. // DeleteNamespace deletes the namespace given the name.
  614. func (a *Agent) DeleteNamespace(name string) error {
  615. // check if namespace exists
  616. checkNS, err := a.Clientset.CoreV1().Namespaces().Get(
  617. context.TODO(),
  618. name,
  619. metav1.GetOptions{},
  620. )
  621. // if the namespace is not found, don't return an error.
  622. if err != nil && errors.IsNotFound(err) {
  623. return nil
  624. }
  625. // if the namespace was found but is in the "Terminating" phase
  626. // we should ignore it and not return an error
  627. if checkNS != nil && checkNS.Status.Phase == v1.NamespaceTerminating {
  628. return nil
  629. }
  630. return a.Clientset.CoreV1().Namespaces().Delete(
  631. context.TODO(),
  632. name,
  633. metav1.DeleteOptions{},
  634. )
  635. }
  636. func (a *Agent) GetPorterAgent() (*appsv1.Deployment, error) {
  637. depl, err := a.Clientset.AppsV1().Deployments("porter-agent-system").Get(
  638. context.TODO(),
  639. "porter-agent-controller-manager",
  640. metav1.GetOptions{},
  641. )
  642. if err != nil && errors.IsNotFound(err) {
  643. return nil, IsNotFoundError
  644. }
  645. return depl, err
  646. }
  647. // ListJobsByLabel lists jobs in a namespace matching a label
  648. type Label struct {
  649. Key string
  650. Val string
  651. }
  652. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  653. selectors := make([]string, 0)
  654. for _, label := range labels {
  655. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  656. }
  657. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  658. context.TODO(),
  659. metav1.ListOptions{
  660. LabelSelector: strings.Join(selectors, ","),
  661. },
  662. )
  663. if err != nil {
  664. return nil, err
  665. }
  666. return resp.Items, nil
  667. }
// StreamJobs streams a list of jobs to the websocket writer, closing the connection once all jobs have been sent
//
// Jobs in the given namespace are listed in pages of up to 100 using
// selectors as the label selector, or "meta.helm.sh/release-name" when
// selectors is empty. Each job is written to rw as JSON, followed by a final
// {"streamStatus": "finished"} message. The work is wrapped in a closure and
// executed through RunWebsocketTask.
func (a *Agent) StreamJobs(namespace string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
	run := func() error {
		// errorchan is unbuffered: every send below blocks until the range
		// loop at the bottom receives it.
		errorchan := make(chan error)
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		var wg sync.WaitGroup
		var once sync.Once
		var err error
		wg.Add(2)
		// closes errorchan once both worker goroutines have returned, which
		// ends the range loop at the bottom
		go func() {
			wg.Wait()
			close(errorchan)
		}()
		go func() {
			defer func() {
				if r := recover(); r != nil {
					// TODO: add method to alert on panic
					return
				}
			}()
			// listens for websocket closing handshake
			defer wg.Done()
			for {
				// any read error ends this goroutine; it is reported as nil
				if _, _, err := rw.ReadMessage(); err != nil {
					errorchan <- nil
					return
				}
			}
		}()
		go func() {
			defer func() {
				if r := recover(); r != nil {
					// TODO: add method to alert on panic
					return
				}
			}()
			// lists the jobs page by page and writes each one to the websocket
			defer wg.Done()
			continueVal := ""
			for {
				// stop paging once the context has been cancelled
				if ctx.Err() != nil {
					errorchan <- nil
					return
				}
				labelSelector := "meta.helm.sh/release-name"
				if selectors != "" {
					labelSelector = selectors
				}
				jobs, err := a.Clientset.BatchV1().Jobs(namespace).List(
					ctx,
					metav1.ListOptions{
						Limit:         100,
						Continue:      continueVal,
						LabelSelector: labelSelector,
					},
				)
				if err != nil {
					errorchan <- err
					return
				}
				for _, job := range jobs.Items {
					err := rw.WriteJSON(job)
					if err != nil {
						errorchan <- err
						return
					}
				}
				if jobs.Continue == "" {
					// we have reached the end of the list of jobs
					break
				} else {
					// start pagination
					continueVal = jobs.Continue
				}
			}
			// at this point, we can return the status finished
			err := rw.WriteJSON(map[string]interface{}{
				"streamStatus": "finished",
			})
			errorchan <- err
		}()
		// The first value received closes the websocket and cancels ctx (once).
		// err is overwritten on every receive, so the value returned below is
		// the LAST one sent on errorchan.
		// NOTE(review): a nil sent later (e.g. by the reader goroutine after
		// rw.Close makes ReadMessage fail) would replace an earlier non-nil
		// error from the lister -- confirm whether that is intended.
		for err = range errorchan {
			once.Do(func() {
				rw.Close()
				cancel()
			})
		}
		return err
	}
	return a.RunWebsocketTask(run)
}
  760. // DeleteJob deletes the job in the given name and namespace.
  761. func (a *Agent) DeleteJob(name, namespace string) error {
  762. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  763. context.TODO(),
  764. name,
  765. metav1.DeleteOptions{},
  766. )
  767. }
  768. // GetJobPods lists all pods belonging to a job in a namespace
  769. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  770. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  771. context.TODO(),
  772. metav1.ListOptions{
  773. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  774. },
  775. )
  776. if err != nil {
  777. return nil, err
  778. }
  779. return resp.Items, nil
  780. }
  781. // GetIngress gets ingress given the name and namespace
  782. func (a *Agent) GetExtensionsV1Beta1Ingress(namespace string, name string) (*v1beta1.Ingress, error) {
  783. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  784. context.TODO(),
  785. name,
  786. metav1.GetOptions{},
  787. )
  788. if err != nil && errors.IsNotFound(err) {
  789. return nil, IsNotFoundError
  790. } else if err != nil {
  791. return nil, err
  792. }
  793. return resp, nil
  794. }
  795. func (a *Agent) GetNetworkingV1Ingress(namespace string, name string) (*netv1.Ingress, error) {
  796. resp, err := a.Clientset.NetworkingV1().Ingresses(namespace).Get(
  797. context.TODO(),
  798. name,
  799. metav1.GetOptions{},
  800. )
  801. if err != nil && errors.IsNotFound(err) {
  802. return nil, IsNotFoundError
  803. } else if err != nil {
  804. return nil, err
  805. }
  806. return resp, nil
  807. }
  808. func (a *Agent) GetNetworkingV1Beta1Ingress(namespace string, name string) (*netv1beta1.Ingress, error) {
  809. resp, err := a.Clientset.NetworkingV1beta1().Ingresses(namespace).Get(
  810. context.TODO(),
  811. name,
  812. metav1.GetOptions{},
  813. )
  814. if err != nil && errors.IsNotFound(err) {
  815. return nil, IsNotFoundError
  816. } else if err != nil {
  817. return nil, err
  818. }
  819. return resp, nil
  820. }
  821. func (a *Agent) GetIstioIngress(namespace, name string) (*istiov1beta1.Gateway, error) {
  822. restConf, err := a.RESTClientGetter.ToRESTConfig()
  823. if err != nil {
  824. return nil, err
  825. }
  826. clientset, err := versionedclient.NewForConfig(restConf)
  827. if err != nil {
  828. return nil, err
  829. }
  830. gateway, err := clientset.NetworkingV1beta1().Gateways(namespace).Get(
  831. context.Background(), name, metav1.GetOptions{},
  832. )
  833. if err != nil {
  834. return nil, err
  835. }
  836. return gateway, nil
  837. }
  838. var IsNotFoundError = fmt.Errorf("not found")
  839. type BadRequestError struct {
  840. msg string
  841. }
  842. func (e *BadRequestError) Error() string {
  843. return e.msg
  844. }
  845. // GetDeployment gets the deployment given the name and namespace
  846. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  847. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  848. context.TODO(),
  849. c.Name,
  850. metav1.GetOptions{},
  851. )
  852. if err != nil && errors.IsNotFound(err) {
  853. return nil, IsNotFoundError
  854. } else if err != nil {
  855. return nil, err
  856. }
  857. res.Kind = c.Kind
  858. return res, nil
  859. }
  860. // GetDeploymentsBySelector returns the deployments by label selector
  861. func (a *Agent) GetDeploymentsBySelector(ctx context.Context, namespace string, selector string) (*appsv1.DeploymentList, error) {
  862. res, err := a.Clientset.AppsV1().Deployments(namespace).List(
  863. ctx,
  864. metav1.ListOptions{
  865. LabelSelector: selector,
  866. },
  867. )
  868. if err != nil {
  869. return nil, err
  870. }
  871. return res, nil
  872. }
  873. // GetStatefulSet gets the statefulset given the name and namespace
  874. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  875. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  876. context.TODO(),
  877. c.Name,
  878. metav1.GetOptions{},
  879. )
  880. if err != nil && errors.IsNotFound(err) {
  881. return nil, IsNotFoundError
  882. } else if err != nil {
  883. return nil, err
  884. }
  885. res.Kind = c.Kind
  886. return res, nil
  887. }
  888. // GetReplicaSet gets the replicaset given the name and namespace
  889. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  890. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  891. context.TODO(),
  892. c.Name,
  893. metav1.GetOptions{},
  894. )
  895. if err != nil && errors.IsNotFound(err) {
  896. return nil, IsNotFoundError
  897. } else if err != nil {
  898. return nil, err
  899. }
  900. res.Kind = c.Kind
  901. return res, nil
  902. }
  903. // GetDaemonSet gets the daemonset by name and namespace
  904. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  905. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  906. context.TODO(),
  907. c.Name,
  908. metav1.GetOptions{},
  909. )
  910. if err != nil && errors.IsNotFound(err) {
  911. return nil, IsNotFoundError
  912. } else if err != nil {
  913. return nil, err
  914. }
  915. res.Kind = c.Kind
  916. return res, nil
  917. }
  918. // GetJob gets the job by name and namespace
  919. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  920. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  921. context.TODO(),
  922. c.Name,
  923. metav1.GetOptions{},
  924. )
  925. if err != nil && errors.IsNotFound(err) {
  926. return nil, IsNotFoundError
  927. } else if err != nil {
  928. return nil, err
  929. }
  930. res.Kind = c.Kind
  931. return res, nil
  932. }
  933. // GetCronJob gets the CronJob by name and namespace
  934. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1.CronJob, error) {
  935. res, err := a.Clientset.BatchV1().CronJobs(c.Namespace).Get(
  936. context.TODO(),
  937. c.Name,
  938. metav1.GetOptions{},
  939. )
  940. if err != nil && errors.IsNotFound(err) {
  941. return nil, IsNotFoundError
  942. } else if err != nil {
  943. return nil, err
  944. }
  945. res.Kind = c.Kind
  946. return res, nil
  947. }
  948. // GetPodsByLabel retrieves pods with matching labels
  949. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  950. // Search in all namespaces for matching pods
  951. return a.Clientset.CoreV1().Pods(namespace).List(
  952. context.TODO(),
  953. metav1.ListOptions{
  954. LabelSelector: selector,
  955. },
  956. )
  957. }
  958. // GetPodByName retrieves a single instance of pod with given name
  959. func (a *Agent) GetPodByName(name string, namespace string) (*v1.Pod, error) {
  960. // Get pod by name
  961. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  962. context.TODO(),
  963. name,
  964. metav1.GetOptions{},
  965. )
  966. if err != nil && errors.IsNotFound(err) {
  967. return nil, IsNotFoundError
  968. }
  969. if err != nil {
  970. return nil, err
  971. }
  972. return pod, nil
  973. }
  974. // DeletePod deletes a pod by name and namespace
  975. func (a *Agent) DeletePod(namespace string, name string) error {
  976. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  977. context.TODO(),
  978. name,
  979. metav1.DeleteOptions{},
  980. )
  981. if err != nil && errors.IsNotFound(err) {
  982. return IsNotFoundError
  983. }
  984. return err
  985. }
  986. // GetPodLogs streams real-time logs from a given pod.
  987. func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer string, rw *websocket.WebsocketSafeReadWriter) error {
  988. // get the pod to read in the list of contains
  989. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  990. context.Background(),
  991. name,
  992. metav1.GetOptions{},
  993. )
  994. if err != nil && errors.IsNotFound(err) {
  995. return IsNotFoundError
  996. } else if err != nil {
  997. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  998. }
  999. // see if container is ready and able to open a stream. If not, wait for container
  1000. // to be ready.
  1001. err, _ = a.waitForPod(pod)
  1002. if err != nil && goerrors.Is(err, IsNotFoundError) {
  1003. return IsNotFoundError
  1004. } else if err != nil {
  1005. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  1006. }
  1007. container := pod.Spec.Containers[0].Name
  1008. if len(selectedContainer) > 0 {
  1009. container = selectedContainer
  1010. }
  1011. tails := int64(400)
  1012. // follow logs
  1013. podLogOpts := v1.PodLogOptions{
  1014. Follow: true,
  1015. TailLines: &tails,
  1016. Container: container,
  1017. }
  1018. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  1019. podLogs, err := req.Stream(context.TODO())
  1020. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  1021. // we'd like to pass this through to the client.
  1022. if err != nil && errors.IsBadRequest(err) {
  1023. return &BadRequestError{err.Error()}
  1024. } else if err != nil {
  1025. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  1026. }
  1027. r := bufio.NewReader(podLogs)
  1028. errorchan := make(chan error)
  1029. var wg sync.WaitGroup
  1030. var once sync.Once
  1031. wg.Add(2)
  1032. go func() {
  1033. wg.Wait()
  1034. close(errorchan)
  1035. }()
  1036. go func() {
  1037. defer func() {
  1038. if r := recover(); r != nil {
  1039. // TODO: add method to alert on panic
  1040. return
  1041. }
  1042. }()
  1043. // listens for websocket closing handshake
  1044. defer wg.Done()
  1045. for {
  1046. if _, _, err := rw.ReadMessage(); err != nil {
  1047. errorchan <- nil
  1048. return
  1049. }
  1050. }
  1051. }()
  1052. go func() {
  1053. defer func() {
  1054. if r := recover(); r != nil {
  1055. // TODO: add method to alert on panic
  1056. return
  1057. }
  1058. }()
  1059. defer wg.Done()
  1060. for {
  1061. bytes, err := r.ReadBytes('\n')
  1062. if err != nil {
  1063. errorchan <- err
  1064. return
  1065. }
  1066. if _, writeErr := rw.Write(bytes); writeErr != nil {
  1067. errorchan <- writeErr
  1068. return
  1069. }
  1070. }
  1071. }()
  1072. for err = range errorchan {
  1073. // only call these methods a single time
  1074. once.Do(func() {
  1075. rw.Close()
  1076. podLogs.Close()
  1077. })
  1078. }
  1079. return err
  1080. }
  1081. // GetPodLogs streams real-time logs from a given pod.
  1082. func (a *Agent) GetPreviousPodLogs(namespace string, name string, selectedContainer string) ([]string, error) {
  1083. // get the pod to read in the list of contains
  1084. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  1085. context.Background(),
  1086. name,
  1087. metav1.GetOptions{},
  1088. )
  1089. if err != nil && errors.IsNotFound(err) {
  1090. return nil, IsNotFoundError
  1091. } else if err != nil {
  1092. return nil, fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  1093. }
  1094. container := pod.Spec.Containers[0].Name
  1095. if len(selectedContainer) > 0 {
  1096. container = selectedContainer
  1097. }
  1098. tails := int64(400)
  1099. // follow logs
  1100. podLogOpts := v1.PodLogOptions{
  1101. Follow: true,
  1102. TailLines: &tails,
  1103. Container: container,
  1104. Previous: true,
  1105. }
  1106. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  1107. podLogs, err := req.Stream(context.TODO())
  1108. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  1109. // we'd like to pass this through to the client.
  1110. if err != nil && strings.Contains(err.Error(), "not found") {
  1111. return nil, IsNotFoundError
  1112. }
  1113. if err != nil && errors.IsBadRequest(err) {
  1114. return nil, &BadRequestError{err.Error()}
  1115. } else if err != nil {
  1116. return nil, fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  1117. }
  1118. defer podLogs.Close()
  1119. r := bufio.NewReader(podLogs)
  1120. var logs []string
  1121. for {
  1122. line, err := r.ReadString('\n')
  1123. logs = append(logs, line)
  1124. if err == io.EOF {
  1125. break
  1126. } else if err != nil {
  1127. return nil, err
  1128. }
  1129. }
  1130. return logs, nil
  1131. }
  1132. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  1133. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  1134. jobPods, err := a.GetJobPods(namespace, name)
  1135. if err != nil {
  1136. return err
  1137. }
  1138. podName := jobPods[0].ObjectMeta.Name
  1139. restConf, err := a.RESTClientGetter.ToRESTConfig()
  1140. restConf.GroupVersion = &schema.GroupVersion{
  1141. Group: "api",
  1142. Version: "v1",
  1143. }
  1144. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  1145. restClient, err := rest.RESTClientFor(restConf)
  1146. if err != nil {
  1147. return err
  1148. }
  1149. req := restClient.Post().
  1150. Resource("pods").
  1151. Name(podName).
  1152. Namespace(namespace).
  1153. SubResource("exec")
  1154. req.Param("command", "./signal.sh")
  1155. req.Param("container", "sidecar")
  1156. req.Param("stdin", "true")
  1157. req.Param("stdout", "false")
  1158. req.Param("tty", "false")
  1159. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  1160. if err != nil {
  1161. return err
  1162. }
  1163. return exec.Stream(remotecommand.StreamOptions{
  1164. Tty: false,
  1165. Stdin: strings.NewReader("./signal.sh"),
  1166. })
  1167. }
  1168. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  1169. // the task some number of times until failing
  1170. func (a *Agent) RunWebsocketTask(task func() error) error {
  1171. lastTime := int64(0)
  1172. for {
  1173. if err := a.UpdateClientset(); err != nil {
  1174. return err
  1175. }
  1176. err := task()
  1177. if err == nil {
  1178. return nil
  1179. }
  1180. if !goerrors.Is(err, &AuthError{}) {
  1181. return err
  1182. }
  1183. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  1184. return err
  1185. }
  1186. lastTime = time.Now().Unix()
  1187. }
  1188. }
  1189. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  1190. // TODO: Support Jobs
  1191. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1192. run := func() error {
  1193. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  1194. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  1195. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1196. options.LabelSelector = selectors
  1197. }
  1198. factory := informers.NewSharedInformerFactoryWithOptions(
  1199. a.Clientset,
  1200. 0,
  1201. informers.WithTweakListOptions(tweakListOptionsFunc),
  1202. )
  1203. var informer cache.SharedInformer
  1204. // Spins up an informer depending on kind. Convert to lowercase for robustness
  1205. switch strings.ToLower(kind) {
  1206. case "deployment":
  1207. informer = factory.Apps().V1().Deployments().Informer()
  1208. case "statefulset":
  1209. informer = factory.Apps().V1().StatefulSets().Informer()
  1210. case "replicaset":
  1211. informer = factory.Apps().V1().ReplicaSets().Informer()
  1212. case "daemonset":
  1213. informer = factory.Apps().V1().DaemonSets().Informer()
  1214. case "job":
  1215. informer = factory.Batch().V1().Jobs().Informer()
  1216. case "cronjob":
  1217. informer = factory.Batch().V1beta1().CronJobs().Informer()
  1218. case "namespace":
  1219. informer = factory.Core().V1().Namespaces().Informer()
  1220. case "pod":
  1221. informer = factory.Core().V1().Pods().Informer()
  1222. }
  1223. stopper := make(chan struct{})
  1224. errorchan := make(chan error)
  1225. var wg sync.WaitGroup
  1226. var once sync.Once
  1227. var err error
  1228. wg.Add(2)
  1229. go func() {
  1230. wg.Wait()
  1231. close(errorchan)
  1232. }()
  1233. go func() {
  1234. defer func() {
  1235. if r := recover(); r != nil {
  1236. // TODO: add method to alert on panic
  1237. return
  1238. }
  1239. }()
  1240. // listens for websocket closing handshake
  1241. defer wg.Done()
  1242. for {
  1243. if _, _, err := rw.ReadMessage(); err != nil {
  1244. errorchan <- nil
  1245. return
  1246. }
  1247. }
  1248. }()
  1249. go func() {
  1250. defer func() {
  1251. if r := recover(); r != nil {
  1252. // TODO: add method to alert on panic
  1253. return
  1254. }
  1255. }()
  1256. // listens for websocket closing handshake
  1257. defer wg.Done()
  1258. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1259. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1260. errorchan <- &AuthError{}
  1261. }
  1262. })
  1263. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1264. UpdateFunc: func(oldObj, newObj interface{}) {
  1265. msg := Message{
  1266. EventType: "UPDATE",
  1267. Object: newObj,
  1268. Kind: strings.ToLower(kind),
  1269. }
  1270. err := rw.WriteJSON(msg)
  1271. if err != nil {
  1272. errorchan <- err
  1273. }
  1274. },
  1275. AddFunc: func(obj interface{}) {
  1276. msg := Message{
  1277. EventType: "ADD",
  1278. Object: obj,
  1279. Kind: strings.ToLower(kind),
  1280. }
  1281. err := rw.WriteJSON(msg)
  1282. if err != nil {
  1283. errorchan <- err
  1284. }
  1285. },
  1286. DeleteFunc: func(obj interface{}) {
  1287. msg := Message{
  1288. EventType: "DELETE",
  1289. Object: obj,
  1290. Kind: strings.ToLower(kind),
  1291. }
  1292. err := rw.WriteJSON(msg)
  1293. if err != nil {
  1294. errorchan <- err
  1295. }
  1296. },
  1297. })
  1298. informer.Run(stopper)
  1299. }()
  1300. for err = range errorchan {
  1301. once.Do(func() {
  1302. close(stopper)
  1303. rw.Close()
  1304. })
  1305. }
  1306. return err
  1307. }
  1308. return a.RunWebsocketTask(run)
  1309. }
  1310. var b64 = base64.StdEncoding
  1311. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  1312. func decodeRelease(data string) (*rspb.Release, error) {
  1313. // base64 decode string
  1314. b, err := b64.DecodeString(data)
  1315. if err != nil {
  1316. return nil, err
  1317. }
  1318. // For backwards compatibility with releases that were stored before
  1319. // compression was introduced we skip decompression if the
  1320. // gzip magic header is not found
  1321. if bytes.Equal(b[0:3], magicGzip) {
  1322. r, err := gzip.NewReader(bytes.NewReader(b))
  1323. if err != nil {
  1324. return nil, err
  1325. }
  1326. defer r.Close()
  1327. b2, err := ioutil.ReadAll(r)
  1328. if err != nil {
  1329. return nil, err
  1330. }
  1331. b = b2
  1332. }
  1333. var rls rspb.Release
  1334. // unmarshal release object bytes
  1335. if err := json.Unmarshal(b, &rls); err != nil {
  1336. return nil, err
  1337. }
  1338. return &rls, nil
  1339. }
  1340. func contains(s []string, str string) bool {
  1341. for _, v := range s {
  1342. if v == str {
  1343. return true
  1344. }
  1345. }
  1346. return false
  1347. }
  1348. func ParseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  1349. if secret.Type != "helm.sh/release.v1" {
  1350. return nil, true, nil
  1351. }
  1352. releaseData, ok := secret.Data["release"]
  1353. if !ok {
  1354. return nil, true, fmt.Errorf("release field not found")
  1355. }
  1356. helm_object, err := decodeRelease(string(releaseData))
  1357. if err != nil {
  1358. return nil, true, err
  1359. }
  1360. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  1361. return nil, true, nil
  1362. }
  1363. return helm_object, false, nil
  1364. }
  1365. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  1366. run := func() error {
  1367. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  1368. options.LabelSelector = selectors
  1369. }
  1370. factory := informers.NewSharedInformerFactoryWithOptions(
  1371. a.Clientset,
  1372. 0,
  1373. informers.WithTweakListOptions(tweakListOptionsFunc),
  1374. informers.WithNamespace(namespace),
  1375. )
  1376. informer := factory.Core().V1().Secrets().Informer()
  1377. stopper := make(chan struct{})
  1378. errorchan := make(chan error)
  1379. var wg sync.WaitGroup
  1380. var once sync.Once
  1381. var err error
  1382. wg.Add(2)
  1383. go func() {
  1384. wg.Wait()
  1385. close(errorchan)
  1386. }()
  1387. go func() {
  1388. defer func() {
  1389. if r := recover(); r != nil {
  1390. // TODO: add method to alert on panic
  1391. return
  1392. }
  1393. }()
  1394. // listens for websocket closing handshake
  1395. defer wg.Done()
  1396. for {
  1397. if _, _, err := rw.ReadMessage(); err != nil {
  1398. errorchan <- nil
  1399. return
  1400. }
  1401. }
  1402. }()
  1403. go func() {
  1404. defer func() {
  1405. if r := recover(); r != nil {
  1406. // TODO: add method to alert on panic
  1407. return
  1408. }
  1409. }()
  1410. // listens for websocket closing handshake
  1411. defer wg.Done()
  1412. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  1413. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  1414. errorchan <- &AuthError{}
  1415. }
  1416. })
  1417. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  1418. UpdateFunc: func(oldObj, newObj interface{}) {
  1419. secretObj, ok := newObj.(*v1.Secret)
  1420. if !ok {
  1421. errorchan <- fmt.Errorf("could not cast to secret")
  1422. return
  1423. }
  1424. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1425. if isNotHelmRelease && err == nil {
  1426. return
  1427. }
  1428. if err != nil {
  1429. errorchan <- err
  1430. return
  1431. }
  1432. msg := Message{
  1433. EventType: "UPDATE",
  1434. Object: helm_object,
  1435. }
  1436. rw.WriteJSON(msg)
  1437. },
  1438. AddFunc: func(obj interface{}) {
  1439. secretObj, ok := obj.(*v1.Secret)
  1440. if !ok {
  1441. errorchan <- fmt.Errorf("could not cast to secret")
  1442. return
  1443. }
  1444. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1445. if isNotHelmRelease && err == nil {
  1446. return
  1447. }
  1448. if err != nil {
  1449. errorchan <- err
  1450. return
  1451. }
  1452. msg := Message{
  1453. EventType: "ADD",
  1454. Object: helm_object,
  1455. }
  1456. rw.WriteJSON(msg)
  1457. },
  1458. DeleteFunc: func(obj interface{}) {
  1459. secretObj, ok := obj.(*v1.Secret)
  1460. if !ok {
  1461. errorchan <- fmt.Errorf("could not cast to secret")
  1462. return
  1463. }
  1464. helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
  1465. if isNotHelmRelease && err == nil {
  1466. return
  1467. }
  1468. if err != nil {
  1469. errorchan <- err
  1470. return
  1471. }
  1472. msg := Message{
  1473. EventType: "DELETE",
  1474. Object: helm_object,
  1475. }
  1476. rw.WriteJSON(msg)
  1477. },
  1478. })
  1479. informer.Run(stopper)
  1480. }()
  1481. for err = range errorchan {
  1482. once.Do(func() {
  1483. close(stopper)
  1484. rw.Close()
  1485. })
  1486. }
  1487. return err
  1488. }
  1489. return a.RunWebsocketTask(run)
  1490. }
  1491. func (a *Agent) StreamPorterAgentLokiLog(
  1492. ctx context.Context,
  1493. labels []string,
  1494. startTime string,
  1495. searchParam string,
  1496. limit uint32,
  1497. rw *websocket.WebsocketSafeReadWriter,
  1498. ) error {
  1499. run := func() error {
  1500. errorchan := make(chan error)
  1501. var wg sync.WaitGroup
  1502. var once sync.Once
  1503. var err error
  1504. wg.Add(2)
  1505. go func() {
  1506. wg.Wait()
  1507. close(errorchan)
  1508. }()
  1509. go func() {
  1510. defer func() {
  1511. if r := recover(); r != nil {
  1512. // TODO: add method to alert on panic
  1513. return
  1514. }
  1515. }()
  1516. defer wg.Done()
  1517. // listens for websocket closing handshake
  1518. for {
  1519. if _, _, err := rw.ReadMessage(); err != nil {
  1520. errorchan <- nil
  1521. return
  1522. }
  1523. }
  1524. }()
  1525. go func() {
  1526. defer func() {
  1527. if r := recover(); r != nil {
  1528. // TODO: add method to alert on panic
  1529. return
  1530. }
  1531. }()
  1532. defer wg.Done()
  1533. podList, err := a.Clientset.CoreV1().Pods("porter-agent-system").List(ctx, metav1.ListOptions{
  1534. LabelSelector: "control-plane=controller-manager",
  1535. })
  1536. if err != nil {
  1537. errorchan <- err
  1538. return
  1539. }
  1540. if len(podList.Items) == 0 {
  1541. errorchan <- fmt.Errorf("no porter agent pods found")
  1542. return
  1543. }
  1544. pod := podList.Items[0]
  1545. restConf, err := a.RESTClientGetter.ToRESTConfig()
  1546. if err != nil {
  1547. errorchan <- err
  1548. return
  1549. }
  1550. req := a.Clientset.CoreV1().RESTClient().
  1551. Post().
  1552. Resource("pods").
  1553. Name(pod.Name).
  1554. Namespace(pod.Namespace).
  1555. SubResource("exec")
  1556. cmd := []string{
  1557. "/porter/agent-cli",
  1558. "--start",
  1559. startTime,
  1560. }
  1561. for _, label := range labels {
  1562. cmd = append(cmd, "--label", label)
  1563. }
  1564. if searchParam != "" {
  1565. cmd = append(cmd, "--search", searchParam)
  1566. }
  1567. if limit > 0 {
  1568. cmd = append(cmd, "--limit", fmt.Sprintf("%d", limit))
  1569. }
  1570. opts := &v1.PodExecOptions{
  1571. Command: cmd,
  1572. Stdout: true,
  1573. Stderr: true,
  1574. }
  1575. req.VersionedParams(
  1576. opts,
  1577. scheme.ParameterCodec,
  1578. )
  1579. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  1580. if err != nil {
  1581. errorchan <- err
  1582. return
  1583. }
  1584. err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
  1585. Stdin: nil,
  1586. Stdout: rw,
  1587. Stderr: rw,
  1588. })
  1589. if err != nil {
  1590. errorchan <- err
  1591. return
  1592. }
  1593. }()
  1594. for err = range errorchan {
  1595. once.Do(func() {
  1596. rw.Close()
  1597. })
  1598. }
  1599. return err
  1600. }
  1601. return a.RunWebsocketTask(run)
  1602. }
  1603. // CreateImagePullSecrets will create the required image pull secrets and
  1604. // return a map from the registry name to the name of the secret.
  1605. func (a *Agent) CreateImagePullSecrets(
  1606. repo repository.Repository,
  1607. namespace string,
  1608. linkedRegs map[string]*models.Registry,
  1609. doAuth *oauth2.Config,
  1610. ) (map[string]string, error) {
  1611. res := make(map[string]string)
  1612. for key, val := range linkedRegs {
  1613. _reg := registry.Registry(*val)
  1614. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1615. if err != nil {
  1616. return nil, err
  1617. }
  1618. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1619. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1620. context.TODO(),
  1621. secretName,
  1622. metav1.GetOptions{},
  1623. )
  1624. // if not found, create the secret
  1625. if err != nil && errors.IsNotFound(err) {
  1626. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1627. context.TODO(),
  1628. &v1.Secret{
  1629. ObjectMeta: metav1.ObjectMeta{
  1630. Name: secretName,
  1631. },
  1632. Data: map[string][]byte{
  1633. string(v1.DockerConfigJsonKey): data,
  1634. },
  1635. Type: v1.SecretTypeDockerConfigJson,
  1636. },
  1637. metav1.CreateOptions{},
  1638. )
  1639. if err != nil {
  1640. return nil, err
  1641. }
  1642. // add secret name to the map
  1643. res[key] = secretName
  1644. continue
  1645. } else if err != nil {
  1646. return nil, err
  1647. }
  1648. // otherwise, check that the secret contains the correct data: if
  1649. // if doesn't, update it
  1650. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1651. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1652. context.TODO(),
  1653. &v1.Secret{
  1654. ObjectMeta: metav1.ObjectMeta{
  1655. Name: secretName,
  1656. },
  1657. Data: map[string][]byte{
  1658. string(v1.DockerConfigJsonKey): data,
  1659. },
  1660. Type: v1.SecretTypeDockerConfigJson,
  1661. },
  1662. metav1.UpdateOptions{},
  1663. )
  1664. if err != nil {
  1665. return nil, err
  1666. }
  1667. }
  1668. // add secret name to the map
  1669. res[key] = secretName
  1670. }
  1671. return res, nil
  1672. }
  1673. // RunCommandOnPod creates an ephemeral pod from the given pod with the given args as its start command.
  1674. func (a *Agent) RunCommandOnPod(ctx context.Context, p *v1.Pod, args []string) error {
  1675. container := p.Spec.Containers[0].Name
  1676. newPod, err := a.createEphemeralPodFromExisting(ctx, p, container, args)
  1677. if err != nil {
  1678. return err
  1679. }
  1680. podName := newPod.ObjectMeta.Name
  1681. err, _ = a.waitForPod(newPod)
  1682. if err != nil {
  1683. return err
  1684. }
  1685. // refresh pod info for latest status
  1686. newPod, err = a.Clientset.CoreV1().
  1687. Pods(newPod.Namespace).
  1688. Get(ctx, newPod.Name, metav1.GetOptions{})
  1689. if err != nil {
  1690. return err
  1691. }
  1692. req := a.Clientset.CoreV1().RESTClient().Post().
  1693. Resource("pods").
  1694. Name(podName).
  1695. Namespace(newPod.Namespace).
  1696. SubResource("attach")
  1697. req.Param("stdin", "true")
  1698. req.Param("stdout", "true")
  1699. req.Param("tty", "true")
  1700. req.Param("container", container)
  1701. restConf, err := a.RESTClientGetter.ToRESTConfig()
  1702. if err != nil {
  1703. return err
  1704. }
  1705. _, err = remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  1706. if err != nil {
  1707. return err
  1708. }
  1709. return nil
  1710. }
  1711. // helper that waits for pod to be ready
  1712. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1713. var (
  1714. w watch.Interface
  1715. err error
  1716. ok bool
  1717. )
  1718. // immediately after creating a pod, the API may return a 404. heuristically 1
  1719. // second seems to be plenty.
  1720. watchRetries := 3
  1721. for i := 0; i < watchRetries; i++ {
  1722. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1723. w, err = a.Clientset.CoreV1().
  1724. Pods(pod.Namespace).
  1725. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1726. if err == nil {
  1727. break
  1728. }
  1729. time.Sleep(time.Second)
  1730. }
  1731. if err != nil {
  1732. return err, false
  1733. }
  1734. defer w.Stop()
  1735. expireTime := time.Now().Add(time.Second * 30)
  1736. for time.Now().Unix() <= expireTime.Unix() {
  1737. select {
  1738. case <-time.NewTicker(time.Second).C:
  1739. // poll every second in case we already missed the ready event while
  1740. // creating the listener.
  1741. pod, err = a.Clientset.CoreV1().
  1742. Pods(pod.Namespace).
  1743. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1744. if err != nil && errors.IsNotFound(err) {
  1745. return IsNotFoundError, false
  1746. } else if err != nil {
  1747. return err, false
  1748. }
  1749. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1750. return nil, isExited
  1751. }
  1752. case evt := <-w.ResultChan():
  1753. pod, ok = evt.Object.(*v1.Pod)
  1754. if !ok {
  1755. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1756. }
  1757. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1758. return nil, isExited
  1759. }
  1760. }
  1761. }
  1762. return goerrors.New("timed out waiting for pod"), false
  1763. }
  1764. func isPodReady(pod *v1.Pod) bool {
  1765. ready := false
  1766. conditions := pod.Status.Conditions
  1767. for i := range conditions {
  1768. if conditions[i].Type == v1.PodReady {
  1769. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1770. }
  1771. }
  1772. return ready
  1773. }
  1774. func isPodExited(pod *v1.Pod) bool {
  1775. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1776. }
  1777. func (a *Agent) createEphemeralPodFromExisting(
  1778. ctx context.Context,
  1779. existing *v1.Pod,
  1780. container string,
  1781. args []string,
  1782. ) (*v1.Pod, error) {
  1783. newPod := existing.DeepCopy()
  1784. // only copy the pod spec, overwrite metadata
  1785. suffix, err := RandomString(4)
  1786. if err != nil {
  1787. return nil, err
  1788. }
  1789. newPod.ObjectMeta = metav1.ObjectMeta{
  1790. Name: strings.ToLower(fmt.Sprintf("%s-copy-%s", existing.ObjectMeta.Name, suffix)),
  1791. Namespace: existing.ObjectMeta.Namespace,
  1792. }
  1793. newPod.Status = v1.PodStatus{}
  1794. // set restart policy to never
  1795. newPod.Spec.RestartPolicy = v1.RestartPolicyNever
  1796. // change the command in the pod to the passed in pod command
  1797. cmdRoot := args[0]
  1798. cmdArgs := make([]string, 0)
  1799. // annotate with the ephemeral pod tag
  1800. newPod.Labels = make(map[string]string)
  1801. newPod.Labels["porter/ephemeral-pod"] = "true"
  1802. if len(args) > 1 {
  1803. cmdArgs = args[1:]
  1804. }
  1805. for i := 0; i < len(newPod.Spec.Containers); i++ {
  1806. if newPod.Spec.Containers[i].Name == container {
  1807. newPod.Spec.Containers[i].Command = []string{cmdRoot}
  1808. newPod.Spec.Containers[i].Args = cmdArgs
  1809. newPod.Spec.Containers[i].TTY = true
  1810. newPod.Spec.Containers[i].Stdin = true
  1811. newPod.Spec.Containers[i].StdinOnce = true
  1812. var newCpu int
  1813. if newPod.Spec.Containers[i].Resources.Requests.Cpu() != nil && newPod.Spec.Containers[i].Resources.Requests.Cpu().MilliValue() > 500 {
  1814. newCpu = 500
  1815. }
  1816. if newCpu != 0 {
  1817. newPod.Spec.Containers[i].Resources.Limits[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%dm", newCpu))
  1818. newPod.Spec.Containers[i].Resources.Requests[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%dm", newCpu))
  1819. for j := 0; j < len(newPod.Spec.Containers[i].Env); j++ {
  1820. if newPod.Spec.Containers[i].Env[j].Name == "PORTER_RESOURCES_CPU" {
  1821. newPod.Spec.Containers[i].Env[j].Value = fmt.Sprintf("%dm", newCpu)
  1822. break
  1823. }
  1824. }
  1825. }
  1826. var newMemory int
  1827. if newPod.Spec.Containers[i].Resources.Requests.Memory() != nil && newPod.Spec.Containers[i].Resources.Requests.Memory().Value() > 1000*1024*1024 {
  1828. newMemory = 1000
  1829. }
  1830. if newMemory != 0 {
  1831. newPod.Spec.Containers[i].Resources.Limits[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", newMemory))
  1832. newPod.Spec.Containers[i].Resources.Requests[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", newMemory))
  1833. for j := 0; j < len(newPod.Spec.Containers[i].Env); j++ {
  1834. if newPod.Spec.Containers[i].Env[j].Name == "PORTER_RESOURCES_RAM" {
  1835. newPod.Spec.Containers[i].Env[j].Value = fmt.Sprintf("%dMi", newMemory)
  1836. break
  1837. }
  1838. }
  1839. }
  1840. }
  1841. // remove health checks and probes
  1842. newPod.Spec.Containers[i].LivenessProbe = nil
  1843. newPod.Spec.Containers[i].ReadinessProbe = nil
  1844. newPod.Spec.Containers[i].StartupProbe = nil
  1845. }
  1846. newPod.Spec.NodeName = ""
  1847. // create the pod and return it
  1848. return a.Clientset.CoreV1().Pods(existing.ObjectMeta.Namespace).Create(
  1849. ctx,
  1850. newPod,
  1851. metav1.CreateOptions{},
  1852. )
  1853. }