Просмотр исходного кода

Merge pull request #431 from kubecost/bolt/error-reporting

Cost Model Error Reporting
Matt Bolt 6 лет назад
Родитель
Commit
7e44256f88

+ 2 - 1
cmd/costmodel/main.go

@@ -5,6 +5,7 @@ import (
 
 	"github.com/julienschmidt/httprouter"
 	"github.com/kubecost/cost-model/pkg/costmodel"
+	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"k8s.io/klog"
 )
@@ -22,5 +23,5 @@ func main() {
 	costmodel.Router.GET("/healthz", Healthz)
 	rootMux.Handle("/", costmodel.Router)
 	rootMux.Handle("/metrics", promhttp.Handler())
-	klog.Fatal(http.ListenAndServe(":9003", rootMux))
+	klog.Fatal(http.ListenAndServe(":9003", errors.PanicHandlerMiddleware(rootMux)))
 }

+ 1 - 7
go.mod

@@ -10,34 +10,28 @@ require (
 	github.com/aws/aws-sdk-go v1.28.9
 	github.com/dimchansky/utfbom v1.1.0 // indirect
 	github.com/etcd-io/bbolt v1.3.3
-	github.com/golang/mock v1.2.0
 	github.com/google/martian v2.1.0+incompatible // indirect
 	github.com/google/uuid v1.1.1
 	github.com/googleapis/gax-go v2.0.2+incompatible // indirect
 	github.com/gophercloud/gophercloud v0.2.0 // indirect
-	github.com/imdario/mergo v0.3.7 // indirect
 	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.2.0
 	github.com/lib/pq v1.2.0
-	github.com/mitchellh/go-homedir v1.1.0
 	github.com/patrickmn/go-cache v2.1.0+incompatible
-	github.com/pkg/errors v0.8.1 // indirect
 	github.com/prometheus/client_golang v1.0.0
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
 	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
 	go.etcd.io/bbolt v1.3.3 // indirect
-	golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529 // indirect
-	golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac // indirect
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
 	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	google.golang.org/api v0.4.0
-	gotest.tools v2.2.0+incompatible
 	k8s.io/api v0.0.0-20190913080256-21721929cffa
 	k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6
 	k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
 	k8s.io/klog v0.4.0
 	sigs.k8s.io/yaml v1.1.0
+	github.com/getsentry/sentry-go v0.6.1
 )
 
 go 1.13

+ 9 - 0
pkg/cloud/awsprovider.go

@@ -20,6 +20,7 @@ import (
 	"k8s.io/klog"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/util"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -563,6 +564,8 @@ func (aws *AWS) DownloadPricingData() error {
 			klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
 		} else { // If we make one successful run, check on new reservation data every hour
 			go func() {
+				defer errors.HandlePanic()
+
 				for {
 					aws.RIDataRunning = true
 					klog.Infof("Reserved Instance watcher running... next update in 1h")
@@ -1132,6 +1135,7 @@ func (a *AWS) GetAddresses() ([]byte, error) {
 		// respective channels
 		go func(region string) {
 			defer wg.Done()
+			defer errors.HandlePanic()
 
 			// Query for first page of volume results
 			resp, err := a.getAddressesForRegion(region)
@@ -1153,6 +1157,8 @@ func (a *AWS) GetAddresses() ([]byte, error) {
 
 	// Close the result channels after everything has been sent
 	go func() {
+		defer errors.HandlePanic()
+
 		wg.Wait()
 		close(errorCh)
 		close(addressCh)
@@ -1216,6 +1222,7 @@ func (a *AWS) GetDisks() ([]byte, error) {
 		// respective channels
 		go func(region string) {
 			defer wg.Done()
+			defer errors.HandlePanic()
 
 			// Query for first page of volume results
 			resp, err := a.getDisksForRegion(region, 1000, nil)
@@ -1256,6 +1263,8 @@ func (a *AWS) GetDisks() ([]byte, error) {
 
 	// Close the result channels after everything has been sent
 	go func() {
+		defer errors.HandlePanic()
+
 		wg.Wait()
 		close(errorCh)
 		close(volumeCh)

+ 4 - 1
pkg/costmodel/cluster.go

@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
@@ -40,7 +41,7 @@ const (
 // TODO move this to a package-accessible helper
 type PromQueryContext struct {
 	Client         prometheus.Client
-	ErrorCollector *util.ErrorCollector
+	ErrorCollector *errors.ErrorCollector
 	WaitGroup      *sync.WaitGroup
 }
 
@@ -51,6 +52,8 @@ func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQuer
 		defer ctx.WaitGroup.Done()
 	}
 
+	defer errors.HandlePanic()
+
 	raw, promErr := Query(ctx.Client, query)
 	ctx.ErrorCollector.Report(promErr)
 

+ 35 - 38
pkg/costmodel/costmodel.go

@@ -14,6 +14,7 @@ import (
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheusClient "github.com/prometheus/client_golang/api"
@@ -122,42 +123,6 @@ func (cd *CostData) GetController() (name string, kind string, hasController boo
 	return name, kind, hasController
 }
 
-// Error collection helper
-type ErrorCollector struct {
-	m      sync.Mutex
-	errors []error
-}
-
-// Reports an error to the collector. Ignores if the error is nil.
-func (ec *ErrorCollector) Report(e error) {
-	if e == nil {
-		return
-	}
-
-	ec.m.Lock()
-	defer ec.m.Unlock()
-
-	ec.errors = append(ec.errors, e)
-}
-
-// Whether or not the collector caught errors
-func (ec *ErrorCollector) IsError() bool {
-	ec.m.Lock()
-	defer ec.m.Unlock()
-
-	return len(ec.errors) > 0
-}
-
-// Errors caught by the collector
-func (ec *ErrorCollector) Errors() []error {
-	ec.m.Lock()
-	defer ec.m.Unlock()
-
-	errs := make([]error, len(ec.errors))
-	copy(errs, ec.errors)
-	return errs
-}
-
 const (
 	queryRAMRequestsStr = `avg(
 		label_replace(
@@ -370,10 +335,11 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var wg sync.WaitGroup
 	wg.Add(11)
 
-	var ec ErrorCollector
+	var ec errors.ErrorCollector
 	var resultRAMRequests interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultRAMRequests, promErr = Query(cli, queryRAMRequests)
@@ -386,6 +352,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultRAMUsage interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultRAMUsage, promErr = Query(cli, queryRAMUsage)
@@ -397,6 +364,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultCPURequests interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultCPURequests, promErr = Query(cli, queryCPURequests)
@@ -408,6 +376,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultCPUUsage interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultCPUUsage, promErr = Query(cli, queryCPUUsage)
@@ -419,6 +388,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultGPURequests interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultGPURequests, promErr = Query(cli, queryGPURequests)
@@ -430,6 +400,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultPVRequests interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultPVRequests, promErr = Query(cli, queryPVRequests)
@@ -441,6 +412,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultNetZoneRequests interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultNetZoneRequests, promErr = Query(cli, queryNetZoneRequests)
@@ -452,6 +424,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultNetRegionRequests interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultNetRegionRequests, promErr = Query(cli, queryNetRegionRequests)
@@ -463,6 +436,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var resultNetInternetRequests interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultNetInternetRequests, promErr = Query(cli, queryNetInternetRequests)
@@ -474,6 +448,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var normalizationResult interface{}
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		var promErr error
 		normalizationResult, promErr = Query(cli, normalization)
@@ -490,6 +465,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var k8sErr error
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {
@@ -1772,11 +1748,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	queryProfileStart := time.Now()
 	queryProfileCh := make(chan string, numQueries)
 
-	var ec ErrorCollector
+	var ec errors.ErrorCollector
 	var resultRAMRequests interface{}
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "RAMRequests", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
@@ -1789,6 +1766,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "RAMUsage", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
@@ -1801,6 +1779,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "CPURequests", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
@@ -1813,6 +1792,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "CPUUsage", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
@@ -1825,6 +1805,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "RAMAllocations", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultRAMAllocations, promErr = QueryRange(cli, queryRAMAlloc, start, end, window)
@@ -1837,6 +1818,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "CPUAllocations", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultCPUAllocations, promErr = QueryRange(cli, queryCPUAlloc, start, end, window)
@@ -1849,6 +1831,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "GPURequests", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
@@ -1861,6 +1844,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "PVRequests", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
@@ -1873,6 +1857,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "NetZoneRequests", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultNetZoneRequests, promErr = QueryRange(cli, queryNetZoneRequests, start, end, window)
@@ -1885,6 +1870,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "NetRegionRequests", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultNetRegionRequests, promErr = QueryRange(cli, queryNetRegionRequests, start, end, window)
@@ -1897,6 +1883,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "NetInternetRequests", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
@@ -1909,6 +1896,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "PVPodAllocation", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		pvPodAllocationResults, promErr = QueryRange(cli, queryPVCAllocation, start, end, window)
@@ -1921,6 +1909,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "PVCost", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		pvCostResults, promErr = QueryRange(cli, queryPVHourlyCost, start, end, window)
@@ -1933,6 +1922,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "NSLabels", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		nsLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryNSLabels, windowString), start, end, window)
@@ -1945,6 +1935,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "PodLabels", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		podLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodLabels, windowString), start, end, window)
@@ -1957,6 +1948,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "ServiceLabels", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		serviceLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
@@ -1969,6 +1961,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "DeploymentLabels", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
@@ -1981,6 +1974,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "Daemonsets", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		daemonsetResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodDaemonsets), start, end, window)
@@ -1993,6 +1987,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "StatefulSetLabels", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		statefulsetLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryStatefulsetLabels, windowString), start, end, window)
@@ -2005,6 +2000,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	go func() {
 		defer wg.Done()
 		defer measureTimeAsync(time.Now(), profileThreshold, "Normalization", queryProfileCh)
+		defer errors.HandlePanic()
 
 		var promErr error
 		normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
@@ -2022,6 +2018,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	var k8sErr error
 	go func() {
 		defer wg.Done()
+		defer errors.HandlePanic()
 
 		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {

+ 45 - 1
pkg/costmodel/router.go

@@ -17,9 +17,13 @@ import (
 	"k8s.io/klog"
 
 	"github.com/julienschmidt/httprouter"
+
+	sentry "github.com/getsentry/sentry-go"
+
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	cm "github.com/kubecost/cost-model/pkg/clustermanager"
+	"github.com/kubecost/cost-model/pkg/errors"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -634,6 +638,8 @@ func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request,
 
 func (a *Accesses) recordPrices() {
 	go func() {
+		defer errors.HandlePanic()
+
 		containerSeen := make(map[string]bool)
 		nodeSeen := make(map[string]bool)
 		pvSeen := make(map[string]bool)
@@ -873,12 +879,50 @@ type ConfigWatchers struct {
 	WatchFunc     func(string, map[string]string) error
 }
 
+// handle any panics reported by the errors package
+func handlePanic(p errors.Panic) bool {
+	err := p.Error
+
+	if err != nil {
+		if err, ok := err.(error); ok {
+			sentry.CurrentHub().CaptureException(err)
+			sentry.Flush(5 * time.Second)
+		}
+
+		if err, ok := err.(string); ok {
+			msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, p.Stack)
+			sentry.CurrentHub().CaptureEvent(&sentry.Event{
+				Level:   sentry.LevelError,
+				Message: msg,
+			})
+			sentry.Flush(5 * time.Second)
+		}
+	}
+
+	// Return true to recover iff the type is http, otherwise allow kubernetes
+	// to recover.
+	return p.Type == errors.PanicTypeHTTP
+}
+
 func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	klog.InitFlags(nil)
 	flag.Set("v", "3")
 	flag.Parse()
 	klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
 
+	var err error
+	if errorReportingEnabled {
+		err = sentry.Init(sentry.ClientOptions{Release: gitCommit})
+		if err != nil {
+			klog.Infof("Failed to initialize sentry for error reporting")
+		} else {
+			err = errors.SetPanicHandler(handlePanic)
+			if err != nil {
+				klog.Infof("Failed to set panic handler: %s", err)
+			}
+		}
+	}
+
 	address := os.Getenv(prometheusServerEndpointEnvVar)
 	if address == "" {
 		klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
@@ -900,7 +944,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	promCli, _ := prometheusClient.NewClient(pc)
 
 	api := prometheusAPI.NewAPI(promCli)
-	_, err := api.Config(context.Background())
+	_, err = api.Config(context.Background())
 	if err != nil {
 		klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
 	}

+ 1 - 1
pkg/util/errors.go → pkg/errors/errors.go

@@ -1,4 +1,4 @@
-package util
+package errors
 
 import "sync"
 

+ 122 - 0
pkg/errors/panic.go

@@ -0,0 +1,122 @@
+package errors
+
+import (
+	"fmt"
+	"net/http"
+	"runtime"
+)
+
+//--------------------------------------------------------------------------
+//  PanicType
+//--------------------------------------------------------------------------
+
+// PanicType defines the context in which the panic occurred.
+type PanicType int
+
+const (
+	// PanicTypeDefault is a panic captured in an ordinary goroutine.
+	PanicTypeDefault PanicType = iota
+	// PanicTypeHTTP is a panic captured inside an HTTP handler.
+	PanicTypeHTTP
+)
+
+// String returns the string representation of the PanicType.
+// Unlike the slice-index form, this cannot panic on out-of-range values.
+func (pt PanicType) String() string {
+	switch pt {
+	case PanicTypeDefault:
+		return "PanicTypeDefault"
+	case PanicTypeHTTP:
+		return "PanicTypeHTTP"
+	default:
+		return fmt.Sprintf("PanicType(%d)", int(pt))
+	}
+}
+
+//--------------------------------------------------------------------------
+//  Panic
+//--------------------------------------------------------------------------
+
+// Panic represents a panic that occurred, captured by a recovery.
+type Panic struct {
+	// Error is the value passed to panic(); it may be any type.
+	Error interface{}
+	// Stack is the formatted stack trace captured when the panic was dispatched.
+	Stack string
+	// Type records the context in which the panic occurred (default or HTTP).
+	Type  PanicType
+}
+
+// PanicHandler receives a captured Panic and returns true if the panic
+// should be recovered (swallowed), or false to re-panic with the same value.
+type PanicHandler = func(p Panic) bool
+
+var (
+	// enabled is set exactly once by SetPanicHandler.
+	// NOTE(review): read by HandlePanic/HandleHTTPPanic from many goroutines
+	// without synchronization — safe only if SetPanicHandler completes before
+	// any instrumented goroutines start; confirm init ordering at call sites.
+	enabled    = false
+	// dispatcher carries captured panics to the goroutine started by
+	// SetPanicHandler. It is unbuffered, so each send blocks until received.
+	dispatcher = make(chan Panic)
+)
+
+// SetPanicHandler sets the handler that is executed when any panic is captured by
+// HandlePanic(). Without setting a handler, the panic reporting is disabled.
+// It may be called successfully at most once; subsequent calls return an error.
+func SetPanicHandler(handler PanicHandler) error {
+	if enabled {
+		return fmt.Errorf("Panic Handler has already been set")
+	}
+
+	enabled = true
+
+	// Setup a go routine which receives via the panic channel, passes
+	// resulting Panic to the handler passed. This goroutine runs for the
+	// remainder of the process lifetime.
+	go func() {
+		for {
+			p := <-dispatcher
+
+			// If we do not wish to recover, panic using same error.
+			// This re-panic occurs on this dispatcher goroutine and is
+			// never recovered, so it terminates the whole process —
+			// which is the intended "let kubernetes restart us" path.
+			if !handler(p) {
+				panic(p.Error)
+			}
+		}
+	}()
+
+	return nil
+}
+
+// PanicHandlerMiddleware should wrap any of the http handlers to capture panics.
+// Panics raised while serving a request are recovered by HandleHTTPPanic,
+// which writes a 500 status and reports the panic with PanicTypeHTTP.
+func PanicHandlerMiddleware(handler http.Handler) http.Handler {
+	return http.HandlerFunc(func(rw http.ResponseWriter, rq *http.Request) {
+		// Deferred so it runs (and can recover) if handler.ServeHTTP panics.
+		defer HandleHTTPPanic(rw, rq)
+
+		handler.ServeHTTP(rw, rq)
+	})
+}
+
+// HandlePanic should be executed in a deferred method (or deferred directly). It will
+// capture any panics that occur in the goroutine it exists, and report to the registered
+// global panic handler. If no handler has been registered (enabled == false),
+// it returns without calling recover(), so the panic propagates normally.
+func HandlePanic() {
+	// NOTE: For each "special" type of panic that is added, you must repeat this pattern. The recover()
+	// NOTE: call cannot exist in a func outside of the deferred func.
+	if !enabled {
+		return
+	}
+
+	if err := recover(); err != nil {
+		// dispatch blocks on the unbuffered channel until the handler
+		// goroutine receives the Panic.
+		dispatch(err, PanicTypeDefault)
+	}
+}
+
+// HandleHTTPPanic should be executed in a deferred method (or deferred directly) in http middleware.
+// It will capture any panics that occur in the goroutine it exists, and report to the registered
+// global panic handler. HTTP handler panics will have the errors.PanicTypeHTTP Type.
+// If no handler has been registered, the panic propagates (no recover, no 500).
+func HandleHTTPPanic(rw http.ResponseWriter, rq *http.Request) {
+	// NOTE: For each "special" type of panic that is added, you must repeat this pattern. The recover()
+	// NOTE: call cannot exist in a func outside of the deferred func.
+	if !enabled {
+		return
+	}
+
+	if err := recover(); err != nil {
+		// NOTE(review): if the handler already wrote headers before
+		// panicking, this WriteHeader is a no-op superfluous call — confirm
+		// acceptable for the handlers being wrapped.
+		rw.WriteHeader(http.StatusInternalServerError)
+
+		dispatch(err, PanicTypeHTTP)
+	}
+}
+
+// dispatch generates a stack trace for the current goroutine and sends the
+// resulting Panic to the handler goroutine via the dispatcher channel.
+// The send blocks until received (unbuffered channel).
+func dispatch(err interface{}, panicType PanicType) {
+	// Capture up to 8 KiB of the current goroutine's stack (false = this
+	// goroutine only); longer traces are truncated.
+	// NOTE(review): this runs inside the deferred recovery, so the trace
+	// reflects the recovery point rather than the exact panic site — confirm
+	// that is sufficient for reporting.
+	stack := make([]byte, 1024*8)
+	stack = stack[:runtime.Stack(stack, false)]
+
+	dispatcher <- Panic{
+		Error: err,
+		Stack: string(stack),
+		Type:  panicType,
+	}
+}

+ 5 - 2
pkg/prom/query.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"net/http"
 
+	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
@@ -20,13 +21,13 @@ const (
 // parsing query responses and errors.
 type Context struct {
 	Client         prometheus.Client
-	ErrorCollector *util.ErrorCollector
+	ErrorCollector *errors.ErrorCollector
 	semaphore      *util.Semaphore
 }
 
 // NewContext creates a new Promethues querying context from the given client
 func NewContext(client prometheus.Client) *Context {
-	var ec util.ErrorCollector
+	var ec errors.ErrorCollector
 
 	// By deafult, allow 20 concurrent queries, which is the Prometheus default
 	sem := util.NewSemaphore(20)
@@ -66,6 +67,8 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
 	go func(ctx *Context, resCh QueryResultsChan) {
+		defer errors.HandlePanic()
+
 		raw, promErr := ctx.query(query)
 		ctx.ErrorCollector.Report(promErr)