agent_server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. package v2
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/porter-dev/porter/api/types"
  7. "github.com/porter-dev/porter/internal/telemetry"
  8. v1 "k8s.io/api/core/v1"
  9. "k8s.io/client-go/kubernetes"
  10. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  11. )
  12. // returns the agent service
  13. func GetAgentService(clientset kubernetes.Interface) (*v1.Service, error) {
  14. return clientset.CoreV1().Services("porter-agent-system").Get(
  15. context.TODO(),
  16. "porter-agent-controller-manager",
  17. metav1.GetOptions{},
  18. )
  19. }
  20. func ListPorterEvents(
  21. clientset kubernetes.Interface,
  22. service *v1.Service,
  23. req *types.ListEventsRequest,
  24. ) (*types.ListEventsResponse, error) {
  25. vals := make(map[string]string)
  26. if req.Type != nil {
  27. vals["type"] = *req.Type
  28. }
  29. if req.ReleaseName != nil {
  30. vals["release_name"] = *req.ReleaseName
  31. }
  32. if req.ReleaseNamespace != nil {
  33. vals["release_namespace"] = *req.ReleaseNamespace
  34. }
  35. if req.PaginationRequest != nil {
  36. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  37. }
  38. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  39. "http",
  40. service.Name,
  41. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  42. "/events",
  43. vals,
  44. )
  45. rawQuery, err := resp.DoRaw(context.Background())
  46. if err != nil {
  47. return nil, err
  48. }
  49. eventsResp := &types.ListEventsResponse{}
  50. err = json.Unmarshal(rawQuery, eventsResp)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return eventsResp, nil
  55. }
  56. func ListPorterJobEvents(
  57. clientset kubernetes.Interface,
  58. service *v1.Service,
  59. req *types.ListJobEventsRequest,
  60. ) (*types.ListEventsResponse, error) {
  61. vals := make(map[string]string)
  62. vals["job_name"] = req.JobName
  63. if req.Type != nil {
  64. vals["type"] = *req.Type
  65. }
  66. if req.ReleaseName != nil {
  67. vals["release_name"] = *req.ReleaseName
  68. }
  69. if req.ReleaseNamespace != nil {
  70. vals["release_namespace"] = *req.ReleaseNamespace
  71. }
  72. if req.PaginationRequest != nil {
  73. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  74. }
  75. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  76. "http",
  77. service.Name,
  78. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  79. "/events/job",
  80. vals,
  81. )
  82. rawQuery, err := resp.DoRaw(context.Background())
  83. if err != nil {
  84. return nil, err
  85. }
  86. eventsResp := &types.ListEventsResponse{}
  87. err = json.Unmarshal(rawQuery, eventsResp)
  88. if err != nil {
  89. return nil, err
  90. }
  91. return eventsResp, nil
  92. }
  93. func ListIncidents(
  94. clientset kubernetes.Interface,
  95. service *v1.Service,
  96. req *types.ListIncidentsRequest,
  97. ) (*types.ListIncidentsResponse, error) {
  98. vals := make(map[string]string)
  99. if req.Status != nil {
  100. vals["status"] = string(*req.Status)
  101. }
  102. if req.ReleaseName != nil {
  103. vals["release_name"] = *req.ReleaseName
  104. }
  105. if req.ReleaseNamespace != nil {
  106. vals["release_namespace"] = *req.ReleaseNamespace
  107. }
  108. if req.PaginationRequest != nil {
  109. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  110. }
  111. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  112. "http",
  113. service.Name,
  114. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  115. "/incidents",
  116. vals,
  117. )
  118. rawQuery, err := resp.DoRaw(context.Background())
  119. if err != nil {
  120. return nil, err
  121. }
  122. incidentsResp := &types.ListIncidentsResponse{}
  123. err = json.Unmarshal(rawQuery, incidentsResp)
  124. if err != nil {
  125. return nil, err
  126. }
  127. return incidentsResp, nil
  128. }
  129. func GetIncidentByID(
  130. clientset kubernetes.Interface,
  131. service *v1.Service,
  132. incidentID string,
  133. ) (*types.Incident, error) {
  134. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  135. "http",
  136. service.Name,
  137. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  138. fmt.Sprintf("/incidents/%s", incidentID),
  139. nil,
  140. )
  141. rawQuery, err := resp.DoRaw(context.Background())
  142. if err != nil {
  143. return nil, err
  144. }
  145. incident := &types.Incident{}
  146. if err := json.Unmarshal(rawQuery, incident); err != nil {
  147. return nil, err
  148. }
  149. return incident, nil
  150. }
  151. func ListIncidentEvents(
  152. clientset kubernetes.Interface,
  153. service *v1.Service,
  154. req *types.ListIncidentEventsRequest,
  155. ) (*types.ListIncidentEventsResponse, error) {
  156. vals := make(map[string]string)
  157. if req.IncidentID != nil {
  158. vals["incident_id"] = *req.IncidentID
  159. }
  160. if req.PodName != nil {
  161. vals["pod_name"] = *req.PodName
  162. }
  163. if req.PodNamespace != nil {
  164. vals["pod_namespace"] = *req.PodNamespace
  165. }
  166. if req.Summary != nil {
  167. vals["summary"] = *req.Summary
  168. }
  169. if req.PaginationRequest != nil {
  170. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  171. }
  172. if req.PodPrefix != nil {
  173. vals["pod_prefix"] = *req.PodPrefix
  174. }
  175. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  176. "http",
  177. service.Name,
  178. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  179. "/incidents/events",
  180. vals,
  181. )
  182. rawQuery, err := resp.DoRaw(context.Background())
  183. if err != nil {
  184. return nil, err
  185. }
  186. events := &types.ListIncidentEventsResponse{}
  187. if err := json.Unmarshal(rawQuery, events); err != nil {
  188. return nil, err
  189. }
  190. return events, nil
  191. }
  192. func GetHistoricalLogs(
  193. ctx context.Context,
  194. clientset kubernetes.Interface,
  195. service *v1.Service,
  196. req *types.GetLogRequest,
  197. ) (*types.GetLogResponse, error) {
  198. ctx, span := telemetry.NewSpan(ctx, "agent-get-historical-logs")
  199. defer span.End()
  200. vals := make(map[string]string)
  201. if req.Limit != 0 {
  202. vals["limit"] = fmt.Sprintf("%d", req.Limit)
  203. }
  204. if req.StartRange != nil {
  205. startVal, err := req.StartRange.MarshalText()
  206. if err != nil {
  207. return nil, telemetry.Error(ctx, span, err, "unable to marshal start range")
  208. }
  209. vals["start_range"] = string(startVal)
  210. }
  211. if req.EndRange != nil {
  212. endVal, err := req.EndRange.MarshalText()
  213. if err != nil {
  214. return nil, err
  215. }
  216. vals["end_range"] = string(endVal)
  217. }
  218. vals["pod_selector"] = req.PodSelector
  219. if req.Namespace != "" {
  220. vals["namespace"] = req.Namespace
  221. }
  222. vals["revision"] = req.Revision
  223. if req.SearchParam != "" {
  224. vals["search_param"] = req.SearchParam
  225. }
  226. if req.Direction != "" {
  227. vals["direction"] = req.Direction
  228. }
  229. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  230. "http",
  231. service.Name,
  232. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  233. "/logs",
  234. vals,
  235. )
  236. rawQuery, err := resp.DoRaw(ctx)
  237. if err != nil {
  238. return nil, telemetry.Error(ctx, span, err, "unable to do get raw response")
  239. }
  240. logsResp := &types.GetLogResponse{}
  241. err = json.Unmarshal(rawQuery, logsResp)
  242. if err != nil {
  243. return nil, telemetry.Error(ctx, span, err, "unable to unmarshal logs response")
  244. }
  245. return logsResp, nil
  246. }
  247. // Logs returns logs from the porter agent matching the provided labels and other query parameters
  248. func Logs(
  249. clientset kubernetes.Interface,
  250. service *v1.Service,
  251. req *types.LogRequest,
  252. ) (*types.GetLogResponse, error) {
  253. vals := make(map[string]string)
  254. if req.Limit != 0 {
  255. vals["limit"] = fmt.Sprintf("%d", req.Limit)
  256. }
  257. if req.StartRange != nil {
  258. startVal, err := req.StartRange.MarshalText()
  259. if err != nil {
  260. return nil, err
  261. }
  262. vals["start_range"] = string(startVal)
  263. }
  264. if req.EndRange != nil {
  265. endVal, err := req.EndRange.MarshalText()
  266. if err != nil {
  267. return nil, err
  268. }
  269. vals["end_range"] = string(endVal)
  270. }
  271. if req.SearchParam != "" {
  272. vals["search_param"] = req.SearchParam
  273. }
  274. if req.Direction != "" {
  275. vals["direction"] = req.Direction
  276. }
  277. if req.MatchLabels != nil {
  278. json, err := json.Marshal(req.MatchLabels)
  279. if err != nil {
  280. return nil, fmt.Errorf("error marshalling match labels map to json: %w", err)
  281. }
  282. vals["match_labels_json"] = string(json)
  283. }
  284. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  285. "http",
  286. service.Name,
  287. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  288. "/logs",
  289. vals,
  290. )
  291. rawQuery, err := resp.DoRaw(context.Background())
  292. if err != nil {
  293. return nil, err
  294. }
  295. logsResp := &types.GetLogResponse{}
  296. err = json.Unmarshal(rawQuery, logsResp)
  297. if err != nil {
  298. return nil, err
  299. }
  300. return logsResp, nil
  301. }
  302. func GetPodValues(
  303. clientset kubernetes.Interface,
  304. service *v1.Service,
  305. req *types.GetPodValuesRequest,
  306. ) ([]string, error) {
  307. vals := make(map[string]string)
  308. if req.StartRange != nil {
  309. startVal, err := req.StartRange.MarshalText()
  310. if err != nil {
  311. return nil, err
  312. }
  313. vals["start_range"] = string(startVal)
  314. }
  315. if req.EndRange != nil {
  316. endVal, err := req.EndRange.MarshalText()
  317. if err != nil {
  318. return nil, err
  319. }
  320. vals["end_range"] = string(endVal)
  321. }
  322. vals["match_prefix"] = req.MatchPrefix
  323. vals["revision"] = req.Revision
  324. vals["namespace"] = req.Namespace
  325. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  326. "http",
  327. service.Name,
  328. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  329. "/logs/pod_values",
  330. vals,
  331. )
  332. rawQuery, err := resp.DoRaw(context.Background())
  333. if err != nil {
  334. return nil, err
  335. }
  336. valsResp := make([]string, 0)
  337. err = json.Unmarshal(rawQuery, &valsResp)
  338. if err != nil {
  339. return nil, err
  340. }
  341. return valsResp, nil
  342. }
  343. func GetRevisionValues(
  344. clientset kubernetes.Interface,
  345. service *v1.Service,
  346. req *types.GetRevisionValuesRequest,
  347. ) ([]string, error) {
  348. vals := make(map[string]string)
  349. if req.StartRange != nil {
  350. startVal, err := req.StartRange.MarshalText()
  351. if err != nil {
  352. return nil, err
  353. }
  354. vals["start_range"] = string(startVal)
  355. }
  356. if req.EndRange != nil {
  357. endVal, err := req.EndRange.MarshalText()
  358. if err != nil {
  359. return nil, err
  360. }
  361. vals["end_range"] = string(endVal)
  362. }
  363. vals["match_prefix"] = req.MatchPrefix
  364. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  365. "http",
  366. service.Name,
  367. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  368. "/logs/revision_values",
  369. vals,
  370. )
  371. rawQuery, err := resp.DoRaw(context.Background())
  372. if err != nil {
  373. return nil, err
  374. }
  375. valsResp := make([]string, 0)
  376. err = json.Unmarshal(rawQuery, &valsResp)
  377. if err != nil {
  378. return nil, err
  379. }
  380. return valsResp, nil
  381. }
  382. func GetHistoricalKubernetesEvents(
  383. clientset kubernetes.Interface,
  384. service *v1.Service,
  385. req *types.GetKubernetesEventRequest,
  386. ) (*types.GetKubernetesEventResponse, error) {
  387. vals := make(map[string]string)
  388. if req.Limit != 0 {
  389. vals["limit"] = fmt.Sprintf("%d", req.Limit)
  390. }
  391. if req.StartRange != nil {
  392. startVal, err := req.StartRange.MarshalText()
  393. if err != nil {
  394. return nil, err
  395. }
  396. vals["start_range"] = string(startVal)
  397. }
  398. if req.EndRange != nil {
  399. endVal, err := req.EndRange.MarshalText()
  400. if err != nil {
  401. return nil, err
  402. }
  403. vals["end_range"] = string(endVal)
  404. }
  405. vals["pod_selector"] = req.PodSelector
  406. vals["namespace"] = req.Namespace
  407. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  408. "http",
  409. service.Name,
  410. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  411. "/events",
  412. vals,
  413. )
  414. rawQuery, err := resp.DoRaw(context.Background())
  415. if err != nil {
  416. return nil, err
  417. }
  418. eventsResp := &types.GetKubernetesEventResponse{}
  419. err = json.Unmarshal(rawQuery, eventsResp)
  420. if err != nil {
  421. return nil, err
  422. }
  423. return eventsResp, nil
  424. }
  425. func GetAgentStatus(
  426. clientset kubernetes.Interface,
  427. service *v1.Service,
  428. ) (*types.GetAgentStatusResponse, error) {
  429. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  430. "http",
  431. service.Name,
  432. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  433. "/status",
  434. nil,
  435. )
  436. rawQuery, err := resp.DoRaw(context.Background())
  437. if err != nil {
  438. return nil, err
  439. }
  440. statusResp := &types.GetAgentStatusResponse{}
  441. err = json.Unmarshal(rawQuery, statusResp)
  442. if err != nil {
  443. return nil, err
  444. }
  445. return statusResp, nil
  446. }