Kaynağa Gözat

implemented plugin registration mechanism

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
Alex Meijer 2 yıl önce
ebeveyn
işleme
71aaa86a2b
2 değiştirilmiş dosya ile 89 ekleme ve 5 silme
  1. 3 1
      pkg/customcost/ingestor.go
  2. 86 4
      pkg/customcost/pipelineservice.go

+ 3 - 1
pkg/customcost/ingestor.go

@@ -60,10 +60,11 @@ type CustomCostIngestor struct {
 	isStopping   atomic.Bool
 	exitBuildCh  chan string
 	exitRunCh    chan string
+	plugins      map[string]*plugin.ClientProtocol
 }
 
 // NewIngestor is an initializer for ingestor
-func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client) (*CustomCostIngestor, error) {
+func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.ClientProtocol) (*CustomCostIngestor, error) {
 	if repo == nil {
 		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
 	}
@@ -79,6 +80,7 @@ func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Reposi
 		creationTime: now,
 		lastRun:      now,
 		coverage:     opencost.NewClosedWindow(midnight, midnight),
+		plugins:      plugins,
 	}, nil
 }
 

+ 86 - 4
pkg/customcost/pipelineservice.go

@@ -3,12 +3,18 @@ package customcost
 import (
 	"fmt"
 	"net/http"
+	"os"
+	"os/exec"
+	"strings"
 	"time"
 
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-plugin"
 	"github.com/julienschmidt/httprouter"
 	"github.com/opencost/opencost/core/pkg/log"
+	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
 	proto "github.com/opencost/opencost/core/pkg/protocol"
+	"github.com/opencost/opencost/core/pkg/version"
 	"github.com/opencost/opencost/pkg/env"
 )
 
@@ -20,15 +26,91 @@ type PipelineService struct {
 	hourlyStore, dailyStore       Repository
 }
 
-func getRegisteredPlugins(configDir string, execDir string) map[string]*plugin.Client {
-	plugins := map[string]*plugin.Client{}
-	return plugins
+func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.ClientProtocol, error) {
+
+	pluginNames := map[string]string{}
+	// scan plugin config directory for all file names
+	configFiles, err := os.ReadDir(configDir)
+	if err != nil {
+		log.Errorf("error reading files in directory %s: %v", configDir, err)
+	}
+
+	// list of plugins that we must run are the strings before _
+	for _, file := range configFiles {
+		log.Tracef("parsing config file name: %s", file.Name())
+		fileParts := strings.Split(file.Name(), "_")
+
+		if len(fileParts) != 2 || fileParts[1] == "_config.json" {
+			return nil, fmt.Errorf("plugin config file name %s invalid. Config files must have the form <plugin name>_config.json", file.Name())
+		}
+
+		pluginNames[fileParts[0]] = configDir + "/" + file.Name()
+	}
+
+	if len(pluginNames) == 0 {
+		log.Infof("no plugins detected.")
+		return nil, nil
+	}
+
+	log.Infof("requiring plugins matching your architecture: " + version.Architecture)
+	configs := map[string]*plugin.ClientConfig{}
+	// set up the client config
+	for name, config := range pluginNames {
+		if _, err := os.Stat(execDir + "/" + name + ".ocplugin." + version.Architecture); err != nil {
+			msg := fmt.Sprintf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
+			log.Errorf(msg)
+			return nil, fmt.Errorf(msg)
+		}
+
+		var handshakeConfig = plugin.HandshakeConfig{
+			ProtocolVersion:  1,
+			MagicCookieKey:   "PLUGIN_NAME",
+			MagicCookieValue: name,
+		}
+
+		logger := hclog.New(&hclog.LoggerOptions{
+			Name:   "plugin[" + name + "]",
+			Output: os.Stdout,
+			Level:  hclog.Debug,
+		})
+
+		// pluginMap is the map of plugins we can dispense.
+		var pluginMap = map[string]plugin.Plugin{
+			"CustomCostSource": &ocplugin.CustomCostPlugin{},
+		}
+		configs[name] = &plugin.ClientConfig{
+			HandshakeConfig: handshakeConfig,
+			Plugins:         pluginMap,
+			Cmd:             exec.Command(execDir+"/"+name+".ocplugin."+version.Architecture, config),
+			Logger:          logger,
+		}
+	}
+
+	plugins := map[string]*plugin.ClientProtocol{}
+
+	for name, config := range configs {
+		client := plugin.NewClient(config)
+		// connect the client
+		rpcClient, err := client.Client()
+		if err != nil {
+			log.Errorf("error connecting client for plugin %s: %v", name, err)
+			return nil, fmt.Errorf("error connecting client for plugin %s: %v", name, err)
+		}
+		// add the connected, initialized client to the ma
+		plugins[name] = &rpcClient
+	}
+
+	return plugins, nil
 }
 
 // NewPipelineService is a constructor for a PipelineService
 func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
 
-	registeredPlugins := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
+	registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
+	if err != nil {
+		log.Errorf("error getting registered plugins: %v", err)
+		return nil, fmt.Errorf("error getting registered plugins: %v", err)
+	}
 
 	hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins)
 	if err != nil {