logs.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package porter_agent
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. v1 "k8s.io/api/core/v1"
  7. "k8s.io/client-go/kubernetes"
  8. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  9. )
  10. // returns the agent service
  11. func GetAgentService(clientset kubernetes.Interface) (*v1.Service, error) {
  12. return clientset.CoreV1().Services("porter-agent-system").Get(
  13. context.TODO(),
  14. "porter-agent-controller-manager",
  15. metav1.GetOptions{},
  16. )
  17. }
  // SimpleIngress identifies a resource by name and namespace (an ingress,
  // judging by the type name). Both fields are serialized to JSON under
  // lowercase keys.
  type SimpleIngress struct {
  	// Name is encoded under the JSON key "name".
  	Name string `json:"name"`

  	// Namespace is encoded under the JSON key "namespace".
  	Namespace string `json:"namespace"`
  }
  // LogPathOpts carries the values that GetLogsFromPorterAgent interpolates into
  // the request path "/pod/<Pod>/ns/<Namespace>/logbucket/<Timestamp>".
  type LogPathOpts struct {
  	// Timestamp is formatted as a decimal integer in the last path segment.
  	Timestamp int

  	// Pod fills the segment following "/pod/".
  	Pod string

  	// Namespace fills the segment following "/ns/".
  	Namespace string
  }
  27. func GetLogsFromPorterAgent(
  28. clientset kubernetes.Interface,
  29. service *v1.Service,
  30. opts *LogPathOpts,
  31. ) (*AgentLogsResp, error) {
  32. if len(service.Spec.Ports) == 0 {
  33. return nil, fmt.Errorf("agent service has no exposed ports to query")
  34. }
  35. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  36. "http",
  37. service.Name,
  38. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  39. fmt.Sprintf("/pod/%s/ns/%s/logbucket/%d", opts.Pod, opts.Namespace, opts.Timestamp),
  40. nil,
  41. )
  42. rawQuery, err := resp.DoRaw(context.TODO())
  43. if err != nil {
  44. return nil, err
  45. }
  46. return parseLogQuery(rawQuery)
  47. }
  // LogBucketPathOpts carries the values that GetLogBucketsFromPorterAgent
  // interpolates into the request path "/pod/<Pod>/ns/<Namespace>/logbucket".
  type LogBucketPathOpts struct {
  	// Pod fills the segment following "/pod/".
  	Pod string

  	// Namespace fills the segment following "/ns/".
  	Namespace string
  }
  52. func GetLogBucketsFromPorterAgent(
  53. clientset kubernetes.Interface,
  54. service *v1.Service,
  55. opts *LogBucketPathOpts,
  56. ) (*AgentLogBucketsResp, error) {
  57. if len(service.Spec.Ports) == 0 {
  58. return nil, fmt.Errorf("agent service has no exposed ports to query")
  59. }
  60. resp := clientset.CoreV1().Services(service.Namespace).ProxyGet(
  61. "http",
  62. service.Name,
  63. fmt.Sprintf("%d", service.Spec.Ports[0].Port),
  64. fmt.Sprintf("/pod/%s/ns/%s/logbucket", opts.Pod, opts.Namespace),
  65. nil,
  66. )
  67. rawQuery, err := resp.DoRaw(context.TODO())
  68. if err != nil {
  69. return nil, err
  70. }
  71. return parseLogBucketsQuery(rawQuery)
  72. }
  // AgentLogsResp is the JSON document that parseLogQuery decodes from the
  // agent's response to a log query.
  type AgentLogsResp struct {
  	// Logs is decoded from the JSON key "logs".
  	Logs []string `json:"logs"`

  	// MatchedBucket is decoded from the JSON key "matchedBucket".
  	MatchedBucket string `json:"matchedBucket"`

  	// Error is decoded from the JSON key "error"; presumably an error message
  	// reported by the agent (confirm against the agent's API).
  	Error string `json:"error"`
  }
  78. func parseLogQuery(rawQuery []byte) (*AgentLogsResp, error) {
  79. resp := &AgentLogsResp{}
  80. err := json.Unmarshal(rawQuery, resp)
  81. if err != nil {
  82. return nil, err
  83. }
  84. return resp, nil
  85. }
  // AgentLogBucketsResp is the JSON document that parseLogBucketsQuery decodes
  // from the agent's response to a log bucket listing.
  type AgentLogBucketsResp struct {
  	// AvailableBuckets is decoded from the JSON key "availableLogBuckets".
  	AvailableBuckets []string `json:"availableLogBuckets"`

  	// Error is decoded from the JSON key "error"; presumably an error message
  	// reported by the agent (confirm against the agent's API).
  	Error string `json:"error"`
  }
  90. func parseLogBucketsQuery(rawQuery []byte) (*AgentLogBucketsResp, error) {
  91. resp := &AgentLogBucketsResp{}
  92. err := json.Unmarshal(rawQuery, resp)
  93. if err != nil {
  94. return nil, err
  95. }
  96. return resp, nil
  97. }