2
0

recommender.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. //go:build ee
  2. /*
  3. === Recommender Job ===
  4. This job checks to see if a cluster matches policies set by the OPA config file.
  5. */
  6. package jobs
  7. import (
  8. "errors"
  9. "fmt"
  10. "log"
  11. "time"
  12. "github.com/mitchellh/mapstructure"
  13. "github.com/porter-dev/porter/api/server/shared/config/env"
  14. "github.com/porter-dev/porter/api/server/shared/requestutils"
  15. "github.com/porter-dev/porter/api/types"
  16. "github.com/porter-dev/porter/ee/integrations/vault"
  17. "github.com/porter-dev/porter/internal/encryption"
  18. "github.com/porter-dev/porter/internal/kubernetes"
  19. "github.com/porter-dev/porter/internal/models"
  20. "github.com/porter-dev/porter/internal/oauth"
  21. "github.com/porter-dev/porter/internal/opa"
  22. "github.com/porter-dev/porter/internal/repository"
  23. rcreds "github.com/porter-dev/porter/internal/repository/credentials"
  24. rgorm "github.com/porter-dev/porter/internal/repository/gorm"
  25. "golang.org/x/oauth2"
  26. "gorm.io/gorm"
  27. )
  28. type recommender struct {
  29. enqueueTime time.Time
  30. db *gorm.DB
  31. repo repository.Repository
  32. doConf *oauth2.Config
  33. clusterAndProjectIDs []clusterAndProjectID
  34. categories []string
  35. policies *opa.KubernetesPolicies
  36. runRecommenderID string
  37. }
  38. // RecommenderOpts holds the options required to run this job
  39. type RecommenderOpts struct {
  40. DBConf *env.DBConf
  41. DOClientID string
  42. DOClientSecret string
  43. DOScopes []string
  44. ServerURL string
  45. LegacyProjectIDs []uint
  46. Input map[string]interface{}
  47. }
  48. type recommenderInput struct {
  49. Projects []uint `mapstructure:"projects"`
  50. ClusterID uint `mapstructure:"cluster_id"`
  51. Priority string `mapstructure:"priority"`
  52. Categories []string `mapstructure:"categories"`
  53. }
  54. type clusterAndProjectID struct {
  55. clusterID uint
  56. projectID uint
  57. }
  58. func NewRecommender(
  59. db *gorm.DB,
  60. enqueueTime time.Time,
  61. opts *RecommenderOpts,
  62. opaPolicies *opa.KubernetesPolicies,
  63. ) (*recommender, error) {
  64. var credBackend rcreds.CredentialStorage
  65. if opts.DBConf.VaultAPIKey != "" && opts.DBConf.VaultServerURL != "" && opts.DBConf.VaultPrefix != "" {
  66. credBackend = vault.NewClient(
  67. opts.DBConf.VaultServerURL,
  68. opts.DBConf.VaultAPIKey,
  69. opts.DBConf.VaultPrefix,
  70. )
  71. }
  72. var key [32]byte
  73. for i, b := range []byte(opts.DBConf.EncryptionKey) {
  74. key[i] = b
  75. }
  76. repo := rgorm.NewRepository(db, &key, credBackend)
  77. doConf := oauth.NewDigitalOceanClient(&oauth.Config{
  78. ClientID: opts.DOClientID,
  79. ClientSecret: opts.DOClientSecret,
  80. Scopes: opts.DOScopes,
  81. BaseURL: opts.ServerURL,
  82. })
  83. // parse input
  84. parsedInput := &recommenderInput{}
  85. err := mapstructure.Decode(opts.Input, parsedInput)
  86. if err != nil {
  87. return nil, err
  88. }
  89. // validate
  90. validator := requestutils.NewDefaultValidator()
  91. if requestErr := validator.Validate(parsedInput); requestErr != nil {
  92. return nil, fmt.Errorf(requestErr.Error())
  93. }
  94. clusterIDs, err := getClustersToParse(db, repo.Cluster(), parsedInput, opts.LegacyProjectIDs)
  95. if err != nil {
  96. return nil, err
  97. }
  98. recommenderID, err := encryption.GenerateRandomBytes(32)
  99. if err != nil {
  100. return nil, err
  101. }
  102. return &recommender{
  103. enqueueTime, db, repo, doConf, clusterIDs, parsedInput.Categories, opaPolicies, string(recommenderID),
  104. }, nil
  105. }
  106. func getClustersToParse(db *gorm.DB, clusterRepo repository.ClusterRepository, input *recommenderInput, legacyProjects []uint) ([]clusterAndProjectID, error) {
  107. // if the project and cluster ID is set, make sure that the project id matches the cluster's
  108. // project id
  109. if input.ClusterID != 0 {
  110. if len(input.Projects) != 1 {
  111. return nil, fmt.Errorf("if cluster ID is passed, you must pass the matching project ID")
  112. }
  113. _, err := clusterRepo.ReadCluster(input.Projects[0], input.ClusterID)
  114. if err != nil {
  115. return nil, err
  116. }
  117. return []clusterAndProjectID{{
  118. clusterID: input.ClusterID,
  119. projectID: input.Projects[0],
  120. }}, nil
  121. }
  122. // if there are no projects set, query for all clusters within the relevant projects
  123. clusters := make([]*models.Cluster, 0)
  124. query := db.Where(`clusters.project_id IN (?) OR clusters.project_id IN (
  125. SELECT p2.id FROM projects AS p2
  126. INNER JOIN project_usages ON p2.id=project_usages.project_id
  127. WHERE project_usages.resource_cpu != 10 AND project_usages.resource_memory != 20000 AND project_usages.clusters != 1 AND project_usages.users != 1
  128. )`, legacyProjects)
  129. if err := query.Find(&clusters).Error; err != nil {
  130. return nil, err
  131. }
  132. res := make([]clusterAndProjectID, 0)
  133. for _, cluster := range clusters {
  134. res = append(res, clusterAndProjectID{
  135. clusterID: cluster.ID,
  136. projectID: cluster.ProjectID,
  137. })
  138. }
  139. return res, nil
  140. }
  141. func (n *recommender) ID() string {
  142. return "recommender"
  143. }
  144. func (n *recommender) EnqueueTime() time.Time {
  145. return n.enqueueTime
  146. }
  147. func (n *recommender) Run() error {
  148. for _, ids := range n.clusterAndProjectIDs {
  149. fmt.Println(ids.projectID, ids.clusterID)
  150. cluster, err := n.repo.Cluster().ReadCluster(ids.projectID, ids.clusterID)
  151. if err != nil {
  152. log.Printf("error reading cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  153. continue
  154. }
  155. k8sAgent, err := kubernetes.GetAgentOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
  156. Cluster: cluster,
  157. Repo: n.repo,
  158. DigitalOceanOAuth: n.doConf,
  159. AllowInClusterConnections: false,
  160. Timeout: 5 * time.Second,
  161. })
  162. if err != nil {
  163. log.Printf("error getting k8s agent for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  164. continue
  165. }
  166. dynamicClient, err := kubernetes.GetDynamicClientOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
  167. Cluster: cluster,
  168. Repo: n.repo,
  169. DigitalOceanOAuth: n.doConf,
  170. AllowInClusterConnections: false,
  171. })
  172. if err != nil {
  173. log.Printf("error getting dynamic client for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  174. continue
  175. }
  176. runner := opa.NewRunner(n.policies, cluster, k8sAgent, dynamicClient)
  177. queryResults, err := runner.GetRecommendations(n.categories)
  178. if err != nil {
  179. log.Printf("error querying opa policies for cluster ID %d: %v. skipping cluster ...", ids.clusterID, err)
  180. continue
  181. }
  182. for _, queryRes := range queryResults {
  183. fmt.Println(queryRes.ObjectID, queryRes.Allow, queryRes.PolicyTitle, queryRes.PolicyMessage)
  184. monitor, err := n.repo.MonitorTestResult().ReadMonitorTestResult(ids.projectID, ids.clusterID, queryRes.ObjectID)
  185. if err != nil {
  186. if errors.Is(err, gorm.ErrRecordNotFound) {
  187. monitor, err = n.repo.MonitorTestResult().CreateMonitorTestResult(n.getMonitorTestResultFromQueryResult(cluster, queryRes, n.runRecommenderID))
  188. } else {
  189. continue
  190. }
  191. } else {
  192. monitor, err = n.repo.MonitorTestResult().UpdateMonitorTestResult(mergeMonitorTestResultFromQueryResult(monitor, queryRes, n.runRecommenderID))
  193. }
  194. if err != nil {
  195. continue
  196. }
  197. }
  198. err = n.repo.MonitorTestResult().ArchiveMonitorTestResults(ids.projectID, ids.clusterID, n.runRecommenderID)
  199. if err != nil {
  200. log.Printf("error archiving test results for cluster ID %d: %v", ids.clusterID, err)
  201. continue
  202. }
  203. err = n.repo.MonitorTestResult().DeleteOldMonitorTestResults(ids.projectID, ids.clusterID, n.runRecommenderID)
  204. if err != nil {
  205. log.Printf("error deleting old test results for cluster ID %d: %v", ids.clusterID, err)
  206. continue
  207. }
  208. }
  209. return nil
  210. }
  211. func (n *recommender) getMonitorTestResultFromQueryResult(cluster *models.Cluster, queryRes *opa.OPARecommenderQueryResult, recommenderID string) *models.MonitorTestResult {
  212. runResult := types.MonitorTestStatusSuccess
  213. if !queryRes.Allow {
  214. runResult = types.MonitorTestStatusFailed
  215. }
  216. currTime := time.Now()
  217. return &models.MonitorTestResult{
  218. ProjectID: cluster.ProjectID,
  219. ClusterID: cluster.ID,
  220. Category: queryRes.CategoryName,
  221. ObjectID: queryRes.ObjectID,
  222. LastStatusChange: &currTime,
  223. LastTested: &currTime,
  224. LastRunResult: string(runResult),
  225. LastRunResultEnum: models.GetLastRunResultEnum(string(runResult)),
  226. LastRecommenderRunID: recommenderID,
  227. Title: queryRes.PolicyTitle,
  228. Message: queryRes.PolicyMessage,
  229. Severity: queryRes.PolicySeverity,
  230. SeverityEnum: models.GetSeverityEnum(queryRes.PolicySeverity),
  231. Archived: false,
  232. }
  233. }
  234. func mergeMonitorTestResultFromQueryResult(monitor *models.MonitorTestResult, queryRes *opa.OPARecommenderQueryResult, recommenderID string) *models.MonitorTestResult {
  235. runResult := types.MonitorTestStatusSuccess
  236. if !queryRes.Allow {
  237. runResult = types.MonitorTestStatusFailed
  238. }
  239. currTime := time.Now()
  240. if isStatusChange := monitor.LastRunResult == string(runResult); isStatusChange {
  241. monitor.LastStatusChange = &currTime
  242. }
  243. monitor.LastTested = &currTime
  244. monitor.LastRunResult = string(runResult)
  245. monitor.Title = queryRes.PolicyTitle
  246. monitor.Message = queryRes.PolicyMessage
  247. monitor.Severity = queryRes.PolicySeverity
  248. monitor.SeverityEnum = models.GetSeverityEnum(queryRes.PolicySeverity)
  249. monitor.LastRunResultEnum = models.GetLastRunResultEnum(string(runResult))
  250. monitor.LastRecommenderRunID = recommenderID
  251. monitor.Archived = false
  252. return monitor
  253. }
  254. func (n *recommender) SetData([]byte) {}