agent_server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. package v2
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/porter-dev/porter/api/types"
  7. "github.com/porter-dev/porter/internal/telemetry"
  8. v1 "k8s.io/api/core/v1"
  9. "k8s.io/client-go/kubernetes"
  10. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  11. )
  12. // returns the agent service
  13. func GetAgentService(clientset kubernetes.Interface) (*v1.Service, error) {
  14. return clientset.CoreV1().Services("porter-agent-system").Get(
  15. context.TODO(),
  16. "porter-agent-controller-manager",
  17. metav1.GetOptions{},
  18. )
  19. }
  20. func ListPorterEvents(
  21. clientset kubernetes.Interface,
  22. service *v1.Service,
  23. req *types.ListEventsRequest,
  24. ) (*types.ListEventsResponse, error) {
  25. vals := make(map[string]string)
  26. if req.Type != nil {
  27. vals["type"] = *req.Type
  28. }
  29. if req.ReleaseName != nil {
  30. vals["release_name"] = *req.ReleaseName
  31. }
  32. if req.ReleaseNamespace != nil {
  33. vals["release_namespace"] = *req.ReleaseNamespace
  34. }
  35. if req.PaginationRequest != nil {
  36. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  37. }
  38. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  39. "http",
  40. service.Name,
  41. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  42. "/events",
  43. vals,
  44. )
  45. rawQuery, err := resp.DoRaw(context.Background())
  46. if err != nil {
  47. return nil, err
  48. }
  49. eventsResp := &types.ListEventsResponse{}
  50. err = json.Unmarshal(rawQuery, eventsResp)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return eventsResp, nil
  55. }
  56. func ListPorterJobEvents(
  57. clientset kubernetes.Interface,
  58. service *v1.Service,
  59. req *types.ListJobEventsRequest,
  60. ) (*types.ListEventsResponse, error) {
  61. vals := make(map[string]string)
  62. vals["job_name"] = req.JobName
  63. if req.Type != nil {
  64. vals["type"] = *req.Type
  65. }
  66. if req.ReleaseName != nil {
  67. vals["release_name"] = *req.ReleaseName
  68. }
  69. if req.ReleaseNamespace != nil {
  70. vals["release_namespace"] = *req.ReleaseNamespace
  71. }
  72. if req.PaginationRequest != nil {
  73. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  74. }
  75. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  76. "http",
  77. service.Name,
  78. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  79. "/events/job",
  80. vals,
  81. )
  82. rawQuery, err := resp.DoRaw(context.Background())
  83. if err != nil {
  84. return nil, err
  85. }
  86. eventsResp := &types.ListEventsResponse{}
  87. err = json.Unmarshal(rawQuery, eventsResp)
  88. if err != nil {
  89. return nil, err
  90. }
  91. return eventsResp, nil
  92. }
  93. func ListIncidents(
  94. clientset kubernetes.Interface,
  95. service *v1.Service,
  96. req *types.ListIncidentsRequest,
  97. ) (*types.ListIncidentsResponse, error) {
  98. vals := make(map[string]string)
  99. if req.Status != nil {
  100. vals["status"] = string(*req.Status)
  101. }
  102. if req.ReleaseName != nil {
  103. vals["release_name"] = *req.ReleaseName
  104. }
  105. if req.ReleaseNamespace != nil {
  106. vals["release_namespace"] = *req.ReleaseNamespace
  107. }
  108. if req.PaginationRequest != nil {
  109. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  110. }
  111. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  112. "http",
  113. service.Name,
  114. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  115. "/incidents",
  116. vals,
  117. )
  118. rawQuery, err := resp.DoRaw(context.Background())
  119. if err != nil {
  120. return nil, err
  121. }
  122. incidentsResp := &types.ListIncidentsResponse{}
  123. err = json.Unmarshal(rawQuery, incidentsResp)
  124. if err != nil {
  125. return nil, err
  126. }
  127. return incidentsResp, nil
  128. }
  129. func GetIncidentByID(
  130. clientset kubernetes.Interface,
  131. service *v1.Service,
  132. incidentID string,
  133. ) (*types.Incident, error) {
  134. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  135. "http",
  136. service.Name,
  137. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  138. fmt.Sprintf("/incidents/%s", incidentID),
  139. nil,
  140. )
  141. rawQuery, err := resp.DoRaw(context.Background())
  142. if err != nil {
  143. return nil, err
  144. }
  145. incident := &types.Incident{}
  146. if err := json.Unmarshal(rawQuery, incident); err != nil {
  147. return nil, err
  148. }
  149. return incident, nil
  150. }
  151. func ListIncidentEvents(
  152. clientset kubernetes.Interface,
  153. service *v1.Service,
  154. req *types.ListIncidentEventsRequest,
  155. ) (*types.ListIncidentEventsResponse, error) {
  156. vals := make(map[string]string)
  157. if req.IncidentID != nil {
  158. vals["incident_id"] = *req.IncidentID
  159. }
  160. if req.PodName != nil {
  161. vals["pod_name"] = *req.PodName
  162. }
  163. if req.PodNamespace != nil {
  164. vals["pod_namespace"] = *req.PodNamespace
  165. }
  166. if req.Summary != nil {
  167. vals["summary"] = *req.Summary
  168. }
  169. if req.PaginationRequest != nil {
  170. vals["page"] = fmt.Sprintf("%d", req.PaginationRequest.Page)
  171. }
  172. if req.PodPrefix != nil {
  173. vals["pod_prefix"] = *req.PodPrefix
  174. }
  175. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  176. "http",
  177. service.Name,
  178. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  179. "/incidents/events",
  180. vals,
  181. )
  182. rawQuery, err := resp.DoRaw(context.Background())
  183. if err != nil {
  184. return nil, err
  185. }
  186. events := &types.ListIncidentEventsResponse{}
  187. if err := json.Unmarshal(rawQuery, events); err != nil {
  188. return nil, err
  189. }
  190. return events, nil
  191. }
  192. func GetHistoricalLogs(
  193. ctx context.Context,
  194. clientset kubernetes.Interface,
  195. service *v1.Service,
  196. req *types.GetLogRequest,
  197. ) (*types.GetLogResponse, error) {
  198. ctx, span := telemetry.NewSpan(ctx, "agent-get-historical-logs")
  199. defer span.End()
  200. vals := make(map[string]string)
  201. if req.Limit != 0 {
  202. vals["limit"] = fmt.Sprintf("%d", req.Limit)
  203. }
  204. if req.StartRange != nil {
  205. startVal, err := req.StartRange.MarshalText()
  206. if err != nil {
  207. return nil, telemetry.Error(ctx, span, err, "unable to marshal start range")
  208. }
  209. vals["start_range"] = string(startVal)
  210. }
  211. if req.EndRange != nil {
  212. endVal, err := req.EndRange.MarshalText()
  213. if err != nil {
  214. return nil, err
  215. }
  216. vals["end_range"] = string(endVal)
  217. }
  218. vals["pod_selector"] = req.PodSelector
  219. if req.Namespace != "" {
  220. vals["namespace"] = req.Namespace
  221. }
  222. vals["revision"] = req.Revision
  223. if req.SearchParam != "" {
  224. vals["search_param"] = req.SearchParam
  225. }
  226. if req.Direction != "" {
  227. vals["direction"] = req.Direction
  228. }
  229. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  230. "http",
  231. service.Name,
  232. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  233. "/logs",
  234. vals,
  235. )
  236. rawQuery, err := resp.DoRaw(ctx)
  237. if err != nil {
  238. return nil, telemetry.Error(ctx, span, err, "unable to do get raw response")
  239. }
  240. logsResp := &types.GetLogResponse{}
  241. err = json.Unmarshal(rawQuery, logsResp)
  242. if err != nil {
  243. return nil, telemetry.Error(ctx, span, err, "unable to unmarshal logs response")
  244. }
  245. return logsResp, nil
  246. }
  247. // Logs returns logs from the porter agent matching the provided labels and other query parameters
  248. func Logs(
  249. ctx context.Context,
  250. clientset kubernetes.Interface,
  251. service *v1.Service,
  252. req *types.LogRequest,
  253. ) (*types.GetLogResponse, error) {
  254. ctx, span := telemetry.NewSpan(ctx, "agent-get-logs")
  255. defer span.End()
  256. vals := make(map[string]string)
  257. if req.Limit != 0 {
  258. vals["limit"] = fmt.Sprintf("%d", req.Limit)
  259. }
  260. if req.StartRange != nil {
  261. startVal, err := req.StartRange.MarshalText()
  262. if err != nil {
  263. return nil, telemetry.Error(ctx, span, err, "unable to marshal start range")
  264. }
  265. vals["start_range"] = string(startVal)
  266. }
  267. if req.EndRange != nil {
  268. endVal, err := req.EndRange.MarshalText()
  269. if err != nil {
  270. return nil, telemetry.Error(ctx, span, err, "unable to marshal end range")
  271. }
  272. vals["end_range"] = string(endVal)
  273. }
  274. if req.SearchParam != "" {
  275. vals["search_param"] = req.SearchParam
  276. }
  277. if req.Direction != "" {
  278. vals["direction"] = req.Direction
  279. }
  280. if req.MatchLabels != nil {
  281. json, err := json.Marshal(req.MatchLabels)
  282. if err != nil {
  283. return nil, fmt.Errorf("error marshalling match labels map to json: %w", err)
  284. }
  285. vals["match_labels_json"] = string(json)
  286. }
  287. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  288. "http",
  289. service.Name,
  290. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  291. "/logs",
  292. vals,
  293. )
  294. rawQuery, err := resp.DoRaw(ctx)
  295. if err != nil {
  296. return nil, telemetry.Error(ctx, span, err, "unable to get raw response")
  297. }
  298. logsResp := &types.GetLogResponse{}
  299. err = json.Unmarshal(rawQuery, logsResp)
  300. if err != nil {
  301. return nil, telemetry.Error(ctx, span, err, "unable to unmarshal logs response")
  302. }
  303. return logsResp, nil
  304. }
  305. func GetPodValues(
  306. clientset kubernetes.Interface,
  307. service *v1.Service,
  308. req *types.GetPodValuesRequest,
  309. ) ([]string, error) {
  310. vals := make(map[string]string)
  311. if req.StartRange != nil {
  312. startVal, err := req.StartRange.MarshalText()
  313. if err != nil {
  314. return nil, err
  315. }
  316. vals["start_range"] = string(startVal)
  317. }
  318. if req.EndRange != nil {
  319. endVal, err := req.EndRange.MarshalText()
  320. if err != nil {
  321. return nil, err
  322. }
  323. vals["end_range"] = string(endVal)
  324. }
  325. vals["match_prefix"] = req.MatchPrefix
  326. vals["revision"] = req.Revision
  327. vals["namespace"] = req.Namespace
  328. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  329. "http",
  330. service.Name,
  331. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  332. "/logs/pod_values",
  333. vals,
  334. )
  335. rawQuery, err := resp.DoRaw(context.Background())
  336. if err != nil {
  337. return nil, err
  338. }
  339. valsResp := make([]string, 0)
  340. err = json.Unmarshal(rawQuery, &valsResp)
  341. if err != nil {
  342. return nil, err
  343. }
  344. return valsResp, nil
  345. }
  346. func GetRevisionValues(
  347. clientset kubernetes.Interface,
  348. service *v1.Service,
  349. req *types.GetRevisionValuesRequest,
  350. ) ([]string, error) {
  351. vals := make(map[string]string)
  352. if req.StartRange != nil {
  353. startVal, err := req.StartRange.MarshalText()
  354. if err != nil {
  355. return nil, err
  356. }
  357. vals["start_range"] = string(startVal)
  358. }
  359. if req.EndRange != nil {
  360. endVal, err := req.EndRange.MarshalText()
  361. if err != nil {
  362. return nil, err
  363. }
  364. vals["end_range"] = string(endVal)
  365. }
  366. vals["match_prefix"] = req.MatchPrefix
  367. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  368. "http",
  369. service.Name,
  370. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  371. "/logs/revision_values",
  372. vals,
  373. )
  374. rawQuery, err := resp.DoRaw(context.Background())
  375. if err != nil {
  376. return nil, err
  377. }
  378. valsResp := make([]string, 0)
  379. err = json.Unmarshal(rawQuery, &valsResp)
  380. if err != nil {
  381. return nil, err
  382. }
  383. return valsResp, nil
  384. }
  385. func GetHistoricalKubernetesEvents(
  386. clientset kubernetes.Interface,
  387. service *v1.Service,
  388. req *types.GetKubernetesEventRequest,
  389. ) (*types.GetKubernetesEventResponse, error) {
  390. vals := make(map[string]string)
  391. if req.Limit != 0 {
  392. vals["limit"] = fmt.Sprintf("%d", req.Limit)
  393. }
  394. if req.StartRange != nil {
  395. startVal, err := req.StartRange.MarshalText()
  396. if err != nil {
  397. return nil, err
  398. }
  399. vals["start_range"] = string(startVal)
  400. }
  401. if req.EndRange != nil {
  402. endVal, err := req.EndRange.MarshalText()
  403. if err != nil {
  404. return nil, err
  405. }
  406. vals["end_range"] = string(endVal)
  407. }
  408. vals["pod_selector"] = req.PodSelector
  409. vals["namespace"] = req.Namespace
  410. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  411. "http",
  412. service.Name,
  413. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  414. "/events",
  415. vals,
  416. )
  417. rawQuery, err := resp.DoRaw(context.Background())
  418. if err != nil {
  419. return nil, err
  420. }
  421. eventsResp := &types.GetKubernetesEventResponse{}
  422. err = json.Unmarshal(rawQuery, eventsResp)
  423. if err != nil {
  424. return nil, err
  425. }
  426. return eventsResp, nil
  427. }
  428. func GetAgentStatus(
  429. clientset kubernetes.Interface,
  430. service *v1.Service,
  431. ) (*types.GetAgentStatusResponse, error) {
  432. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  433. "http",
  434. service.Name,
  435. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  436. "/status",
  437. nil,
  438. )
  439. rawQuery, err := resp.DoRaw(context.Background())
  440. if err != nil {
  441. return nil, err
  442. }
  443. statusResp := &types.GetAgentStatusResponse{}
  444. err = json.Unmarshal(rawQuery, statusResp)
  445. if err != nil {
  446. return nil, err
  447. }
  448. return statusResp, nil
  449. }