Przeglądaj źródła

Merge branch 'develop' into kaelan-aws-web-identity

Kaelan Patel 2 lat temu
rodzic
commit
e6db2c0aa3

+ 6 - 1
MAINTAINERS.md

@@ -10,7 +10,12 @@ Official list of [OpenCost Maintainers](https://github.com/orgs/opencost/teams/o
 | Artur Khantimirov | @r2k1 | Microsoft | |
 | Matt Bolt | @​mbolt35 | Kubecost | <matt@kubecost.com> |
 | Matt Ray | @mattray | Kubecost | <mattray@kubecost.com> |
-| Michael Dresser | @michaelmdresser | Kubecost | <michael@kubecost.com> |
 | Niko Kovacevic | @nikovacevic | Kubecost | <niko@kubecost.com> |
 | Sean Holcomb | @Sean-Holcomb | Kubecost | <Sean@kubecost.com> |
 | Thomas Evans | @teevans | Kubecost | <thomas@kubecost.com> |
+
+## OpenCost Emeritus Committers
+We would like to acknowledge previous committers and their huge contributions to our collective success:
+| Maintainer | GitHub ID | Affiliation | Email |
+| --------------- | --------- | ----------- | ----------- |
+| Michael Dresser | @michaelmdresser | Kubecost | <michaelmdresser@gmail.com> |

+ 2 - 3
core/go.mod

@@ -7,6 +7,7 @@ require (
 	github.com/goccy/go-json v0.9.11
 	github.com/google/go-cmp v0.6.0
 	github.com/hashicorp/go-multierror v1.1.1
+	github.com/hashicorp/go-plugin v1.6.0
 	github.com/json-iterator/go v1.1.12
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/rs/zerolog v1.26.1
@@ -14,6 +15,7 @@ require (
 	golang.org/x/exp v0.0.0-20221031165847-c99f073a8326
 	golang.org/x/sync v0.6.0
 	golang.org/x/text v0.14.0
+	google.golang.org/grpc v1.62.0
 	google.golang.org/protobuf v1.32.0
 	k8s.io/api v0.25.3
 	k8s.io/apimachinery v0.25.3
@@ -28,7 +30,6 @@ require (
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/hashicorp/errwrap v1.0.0 // indirect
 	github.com/hashicorp/go-hclog v1.6.2 // indirect
-	github.com/hashicorp/go-plugin v1.6.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/hashicorp/yamux v0.1.1 // indirect
 	github.com/magiconair/properties v1.8.5 // indirect
@@ -48,9 +49,7 @@ require (
 	github.com/subosito/gotenv v1.2.0 // indirect
 	golang.org/x/net v0.21.0 // indirect
 	golang.org/x/sys v0.17.0 // indirect
-	google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
-	google.golang.org/grpc v1.62.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect

+ 5 - 10
core/go.sum

@@ -45,6 +45,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
 github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
+github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
+github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
@@ -186,6 +188,8 @@ github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE
 github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
+github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
+github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
 github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
@@ -382,8 +386,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
-golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
 golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -409,8 +411,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
-golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
 golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -462,8 +463,6 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
-golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -606,8 +605,6 @@ google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6D
 google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
 google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
-google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y=
-google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c h1:NUsgEN92SQQqzfA+YtqYNqYmB3DMMYLlIwUZAQFVFbo=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@@ -644,8 +641,6 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
-google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

+ 90 - 5
core/pkg/util/typeutil/typeutil.go

@@ -3,15 +3,15 @@ package typeutil
 import (
 	"fmt"
 	"reflect"
+	"runtime"
+	"strings"
 )
 
 // TypeOf is a utility that can convert a T type to a package + type name for generic types.
 func TypeOf[T any]() string {
-	var inst T
 	var prefix string
 
-	// get a reflect.Type of a variable with type T
-	t := reflect.TypeOf(inst)
+	t := reflect.TypeFor[T]()
 
 	// pointer types do not carry the adequate type information, so we need to extract the
 	// underlying types until we reach the non-pointer type, we prepend a * each depth
@@ -22,9 +22,94 @@ func TypeOf[T any]() string {
 
 	// this should not be possible, but in the event that it does, we want to be loud about it
 	if t == nil {
-		panic(fmt.Sprintf("Unable to generate a key for type: %+v", reflect.TypeOf(inst)))
+		panic(fmt.Sprintf("failed to locate non-pointer type: %+v", reflect.TypeFor[T]()))
+	}
+
+	name := t.Name()
+
+	// special cases for built-ins struct{} and interface{}
+	if name == "" {
+		name = t.String()
+	}
+
+	// no package path, do not use a / separator
+	if t.PkgPath() == "" {
+		return prefix + name
 	}
 
 	// combine the prefix, package path, and the type name
-	return fmt.Sprintf("%s%s/%s", prefix, t.PkgPath(), t.Name())
+	return fmt.Sprintf("%s%s/%s", prefix, t.PkgPath(), name)
+}
+
+// TypeFor uses type inference to accept a value and returns the fully qualified package
+// and type name for the inferred type parameter T. The value itself is never inspected:
+// the result is exactly what TypeOf[T] returns.
+func TypeFor[T any](value T) string {
+	return TypeOf[T]()
+}
+
+// PackageOf reports the package path of the type parameter T. Pointer types are
+// dereferenced first, so *T and **T report the same package as T. Types without a
+// package path (such as built-ins) yield an empty string.
+func PackageOf[T any]() string {
+	root := reflect.TypeFor[T]()
+
+	// peel off pointer layers until the element type is reached
+	elem := root
+	for {
+		// this should not be possible, but in the event that it does, we want to be loud about it
+		if elem == nil {
+			panic(fmt.Sprintf("failed to locate package for: %+v", root))
+		}
+
+		if elem.Kind() != reflect.Pointer {
+			return elem.PkgPath()
+		}
+
+		elem = elem.Elem()
+	}
+}
+
+// PackageFor uses type inference to accept a value and returns the package name for the
+// inferred type parameter T. The value itself is never inspected: the result is exactly
+// what PackageOf[T] returns.
+func PackageFor[T any](value T) string {
+	return PackageOf[T]()
+}
+
+// PackageFromCaller returns the fully qualified package name of the function found at the
+// specified depth of the call stack. The depth is handed directly to runtime.Caller, so 0
+// identifies PackageFromCaller itself, 1 its caller, and so on.
+//
+// An empty string is returned when the frame or its function cannot be resolved. The
+// function panics if the resolved function name has no '.' after its final '/'.
+func PackageFromCaller(depth int) string {
+	// get program counter for the requested caller depth
+	pc, _, _, ok := runtime.Caller(depth)
+	if !ok {
+		return ""
+	}
+
+	f := runtime.FuncForPC(pc)
+	if f == nil {
+		return ""
+	}
+
+	parentPkg := ""
+	pkg := f.Name()
+
+	// if there are slashes in the fully qualified path, we want to split
+	// everything before the last slash as the parent package, and everything
+	// after is the package + calling convention. If there are no slashes, then
+	// it's a root level package, so it's just package + calling convention
+	if slashIndex := strings.LastIndex(pkg, "/"); slashIndex >= 0 {
+		parentPkg = pkg[:slashIndex]
+		pkg = pkg[slashIndex:]
+	}
+
+	// the package + calling convention can be in a few forms, but since we only
+	// care about the package, we can return everything up until a '.'.
+	// We can make a hard assertion here that unless the go spec changes, we can
+	// rely on the function calling convention to have the form <package>.<function>
+	dotIndex := strings.Index(pkg, ".")
+	if dotIndex < 0 {
+		panic("Unable to parse package name from function call convention: " + pkg)
+	}
+
+	// The toolchain escapes a '.' in the last element of an import path as "%2e" in symbol
+	// names (e.g. "gopkg.in/yaml%2ev2.Unmarshal"). Undo that after locating the separator so
+	// the result matches the import path reported by PackageOf. Other, much rarer, %xx
+	// escapes are left untouched.
+	return parentPkg + strings.ReplaceAll(pkg[:dotIndex], "%2e", ".")
+}
+
+// CurrentPackage returns the package name of the caller. This is especially handy for automatically
+// generating package scoped tracing identifiers.
+//
+// The lookup depends on call stack depth, so the value describes the function that calls
+// CurrentPackage directly, not any function further up the stack.
+func CurrentPackage() string {
+	// Depth is from: (2) Caller -> (1) CurrentPackage -> (0) PackageFromCaller
+	return PackageFromCaller(2)
+}

+ 123 - 0
core/pkg/util/typeutil/typeutil_test.go

@@ -0,0 +1,123 @@
+package typeutil_test
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/util/typeutil"
+)
+
+// TestType is a plain named struct used as a type argument in the name resolution tests.
+type TestType struct{}
+// GenericTestType is a generic named struct used to test name resolution of instantiated generic types.
+type GenericTestType[T any] struct{}
+
+// CurrentPackageTester provides methods that call typeutil.CurrentPackage from inside a
+// method and from inside a closure nested in a method.
+type CurrentPackageTester struct{}
+
+// TestFromInstance calls typeutil.CurrentPackage directly from a pointer-receiver method
+// and compares the result against expected.
+func (c *CurrentPackageTester) TestFromInstance(t *testing.T, expected string) {
+	cmp(t, typeutil.CurrentPackage(), expected)
+}
+
+// TestFromNestedInstance calls typeutil.CurrentPackage from an anonymous function declared
+// inside a pointer-receiver method and compares the result against expected.
+func (c *CurrentPackageTester) TestFromNestedInstance(t *testing.T, expected string) {
+	// the closure, not the method, is the direct caller of CurrentPackage
+	nested := func() string {
+		return typeutil.CurrentPackage()
+	}
+
+	result := nested()
+	cmp(t, result, expected)
+}
+
+// cmp compares two comparable values and reports a test error, without stopping the
+// test, when result is not equal to expected.
+func cmp[T comparable](t *testing.T, result, expected T) {
+	// mark cmp as a helper so a failure is attributed to the calling line rather than to cmp
+	t.Helper()
+
+	if result != expected {
+		t.Errorf("Expected: %+v. Got: %+v", expected, result)
+	}
+}
+
+// InterfaceType is a named empty interface used to test name resolution of named interface types.
+type InterfaceType interface{}
+
+// packageScoped captures the result of typeutil.CurrentPackage when it is called from a
+// package level variable initializer.
+var packageScoped = typeutil.CurrentPackage()
+
+// TestTypeOf verifies the names produced by typeutil.TypeOf and typeutil.TypeFor for
+// built-in types, named structs, pointers, generic instantiations and named interfaces.
+func TestTypeOf(t *testing.T) {
+	const packageName = "github.com/opencost/opencost/core/pkg/util/typeutil_test"
+	const testTypeName = packageName + "/TestType"
+	const genericTestTypeName = packageName + "/GenericTestType"
+	// inside a type argument list the expected name joins package and type with '.' instead of '/'
+	const genericTypeParameterTypeName = packageName + ".GenericTestType"
+	const interfaceTypeName = packageName + "/InterfaceType"
+
+	// Basic Types
+	cmp(t, typeutil.TypeOf[int](), "int")
+	cmp(t, typeutil.TypeOf[int8](), "int8")
+	cmp(t, typeutil.TypeOf[any](), "interface {}")
+	cmp(t, typeutil.TypeOf[interface{}](), "interface {}")
+	cmp(t, typeutil.TypeOf[struct{}](), "struct {}")
+
+	// Specific Types
+	cmp(t, typeutil.TypeOf[TestType](), testTypeName)
+	cmp(t, typeutil.TypeOf[*TestType](), "*"+testTypeName)
+	cmp(t, typeutil.TypeOf[**TestType](), "**"+testTypeName)
+	cmp(t, typeutil.TypeOf[GenericTestType[string]](), genericTestTypeName+"[string]")
+	cmp(t, typeutil.TypeOf[GenericTestType[GenericTestType[string]]](), genericTestTypeName+"["+genericTypeParameterTypeName+"[string]"+"]")
+	cmp(t, typeutil.TypeOf[GenericTestType[*GenericTestType[string]]](), genericTestTypeName+"[*"+genericTypeParameterTypeName+"[string]"+"]")
+	cmp(t, typeutil.TypeOf[GenericTestType[*GenericTestType[map[int][]float64]]](), genericTestTypeName+"[*"+genericTypeParameterTypeName+"[map[int][]float64]"+"]")
+
+	// interface types
+	cmp(t, typeutil.TypeOf[InterfaceType](), interfaceTypeName)
+	cmp(t, typeutil.TypeOf[*InterfaceType](), "*"+interfaceTypeName)
+	cmp(t, typeutil.TypeOf[**InterfaceType](), "**"+interfaceTypeName)
+
+	// TypeFor variants: the declared type of the variable is what gets resolved
+	var value any
+	cmp(t, typeutil.TypeFor(value), "interface {}")
+
+	var ivalue InterfaceType
+	cmp(t, typeutil.TypeFor(ivalue), interfaceTypeName)
+
+	var testType **TestType
+	cmp(t, typeutil.TypeFor(testType), "**"+testTypeName)
+}
+
+// DeferredCurrentPackage returns the value of typeutil.CurrentPackage as observed from
+// inside a deferred anonymous function, which assigns it to the named result.
+func DeferredCurrentPackage() (result string) {
+	defer func() {
+		result = typeutil.CurrentPackage()
+	}()
+
+	// naked return: the deferred function fills in result before the caller sees it
+	return
+}
+
+// TestPackageOf verifies typeutil.PackageOf and typeutil.PackageFor for local, standard
+// library and project types, and typeutil.CurrentPackage from several calling contexts.
+func TestPackageOf(t *testing.T) {
+	const currentPackageName = "github.com/opencost/opencost/core/pkg/util/typeutil_test"
+	const jsonEncoderPackageName = "encoding/json"
+	const opencostPackageName = "github.com/opencost/opencost/core/pkg/opencost"
+
+	cmp(t, typeutil.PackageOf[TestType](), currentPackageName)
+	cmp(t, typeutil.PackageOf[*TestType](), currentPackageName)
+	cmp(t, typeutil.PackageOf[json.Encoder](), jsonEncoderPackageName)
+	cmp(t, typeutil.PackageOf[*opencost.Allocation](), opencostPackageName)
+
+	cmp(t, typeutil.CurrentPackage(), currentPackageName)
+
+	deferredResult := DeferredCurrentPackage()
+	cmp(t, deferredResult, currentPackageName)
+
+	// Tests the CurrentPackage function within an instance function
+	// this will return something like:
+	// "github.com/opencost/opencost/core/pkg/util/typeutil_test.(*CurrentPackageTester).TestFromInstance"
+	new(CurrentPackageTester).TestFromInstance(t, currentPackageName)
+
+	// Tests the CurrentPackage function within an instance function that contains a nested anonymous function
+	// this will return something like:
+	// "github.com/opencost/opencost/core/pkg/util/typeutil_test.(*CurrentPackageTester).TestFromNestedInstance.func1"
+	new(CurrentPackageTester).TestFromNestedInstance(t, currentPackageName)
+
+	// This tests the package scoped variable, which calls the CurrentPackage function in the package scope
+	// this will normally return something like:
+	// "github.com/opencost/opencost/core/pkg/util/typeutil_test.init"
+	cmp(t, packageScoped, currentPackageName)
+
+	// PackageFor variants: the empty interface has no package path, a named interface does
+	var value any
+	cmp(t, typeutil.PackageFor(value), "")
+
+	var ivalue InterfaceType
+	cmp(t, typeutil.PackageFor(ivalue), currentPackageName)
+}

+ 3 - 3
pkg/cloud/azure/storagebillingparser.go

@@ -1,7 +1,6 @@
 package azure
 
 import (
-	"bytes"
 	"context"
 	"encoding/csv"
 	"fmt"
@@ -86,12 +85,13 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 				return err
 			}
 		} else {
-			blobBytes, err2 := asbp.DownloadBlob(blobName, client, ctx)
+			streamReader, err2 := asbp.StreamBlob(blobName, client)
 			if err2 != nil {
 				asbp.ConnectionStatus = cloud.FailedConnection
 				return err2
 			}
-			err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
+
+			err2 = asbp.parseCSV(start, end, csv.NewReader(streamReader), resultFn)
 			if err2 != nil {
 				asbp.ConnectionStatus = cloud.ParseError
 				return err2

+ 6 - 0
pkg/cloud/azure/storageconnection.go

@@ -70,6 +70,12 @@ func (sc *StorageConnection) DownloadBlob(blobName string, client *azblob.Client
 	return downloadedData.Bytes(), nil
 }
 
+// StreamBlob returns a *StreamReader, usable as an io.Reader, for the given blob in the
+// connection's container. The reader is built by NewStreamReader and uses a re-usable
+// double buffer approach to stream directly from blob storage. Any error returned by
+// NewStreamReader is passed back unchanged.
+func (sc *StorageConnection) StreamBlob(blobName string, client *azblob.Client) (*StreamReader, error) {
+	return NewStreamReader(client, sc.Container, blobName)
+}
+
 // DownloadBlobToFile downloads the Azure Billing CSV to a local file
 func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blobName string, client *azblob.Client, ctx context.Context) error {
 	// If file exists, don't download it again

+ 247 - 0
pkg/cloud/azure/streamreader.go

@@ -0,0 +1,247 @@
+package azure
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+)
+
+// defaultBlockSize is the number of bytes requested for each block download.
+const defaultBlockSize = int(8 * 1024 * 1024) // 8MB
+
+// StreamReader is a double buffered streaming reader for Azure Blob Storage.
+//
+// client, container and blobName identify the blob being read. block is the buffer that
+// Read currently drains, and next is the block being downloaded in the background.
+// position counts the bytes already handed out by Read, and size is the blob's content
+// length as reported when the reader was created.
+type StreamReader struct {
+	client    *azblob.Client
+	container string
+	blobName  string
+	block     *bytes.Buffer
+	next      *streamingBlock
+	position  int64
+	size      int64
+}
+
+// NewStreamReader creates a new streaming reader for the specified blob.
+//
+// The blob's properties are fetched up front to learn its size; no blob data is downloaded
+// until the first Read. An error is returned when the properties request fails or when the
+// response does not carry a content length.
+func NewStreamReader(client *azblob.Client, container string, blobName string) (*StreamReader, error) {
+	// get the size of the blob
+	blobClient := client.ServiceClient().NewContainerClient(container).NewBlobClient(blobName)
+	props, err := blobClient.GetProperties(context.Background(), nil)
+	if err != nil {
+		return nil, fmt.Errorf("getting properties for blob %q: %w", blobName, err)
+	}
+
+	// ContentLength is a pointer, guard against a response that omits it instead of panicking
+	if props.ContentLength == nil {
+		return nil, fmt.Errorf("getting properties for blob %q: content length missing from response", blobName)
+	}
+
+	return &StreamReader{
+		client:    client,
+		container: container,
+		blobName:  blobName,
+		size:      *props.ContentLength,
+	}, nil
+}
+
+// Read implements io.Reader. It copies up to len(p) bytes from the current block into p,
+// fetching the next block first when the current one is missing or drained, and returns
+// io.EOF once position has reached the size of the blob.
+func (r *StreamReader) Read(p []byte) (n int, err error) {
+	// the whole blob has already been handed out
+	if r.position >= r.size {
+		return 0, io.EOF
+	}
+
+	// fetch the blocks on demand
+	if r.block == nil || r.block.Len() == 0 {
+		if err = r.nextBlock(); err != nil {
+			return 0, err
+		}
+	}
+
+	// Next() never returns more than what is left in the block, so a large p
+	// simply results in a short read
+	chunk := r.block.Next(len(p))
+	n = copy(p, chunk)
+	r.position += int64(n)
+
+	return n, nil
+}
+
+// nextBlock makes the next block of data the current block and starts the download of
+// the block after it in the background.
+//
+// On the first call there is no prefetched block, so the first block is requested and
+// waited for. On later calls the previously started block is waited for and the drained
+// buffer is recycled for the following download. io.ErrUnexpectedEOF is returned when the
+// final block has already been handed out, which means the blob delivered fewer bytes
+// than its reported size.
+func (r *StreamReader) nextBlock() error {
+	loaded := r.next
+	var recycled *bytes.Buffer
+
+	if r.block == nil {
+		// if we don't have a block, we need to fetch the first block
+		loaded = newStreamBlock(
+			r.client,
+			r.container,
+			r.blobName,
+			nil,
+			r.position,
+			int64(defaultBlockSize),
+			r.size,
+		)
+	} else {
+		// the current block is drained but there is nothing left to swap in
+		if loaded == nil {
+			return io.ErrUnexpectedEOF
+		}
+
+		// save the current buffer to re-use in the next block
+		recycled = r.block
+	}
+
+	// wait for the block to finish buffering before touching its buffer
+	if err := loaded.Wait(); err != nil {
+		return err
+	}
+
+	r.block = loaded.buffer
+	r.next = nil
+
+	// The next range starts right after the block that was just loaded. This is derived
+	// from the block's own range rather than from Buffer.Cap(): io.Copy fills the buffer
+	// through bytes.Buffer.ReadFrom, which may grow it beyond the requested block size.
+	nextStart := loaded.start + loaded.capacity
+
+	// if the block size capacity was reduced to a value different than the default block size,
+	// or the block ended exactly at the end of the blob, there is no more data beyond this block
+	if loaded.capacity != int64(defaultBlockSize) || nextStart >= r.size {
+		return nil
+	}
+
+	// start next block stream
+	r.next = newStreamBlock(
+		r.client,
+		r.container,
+		r.blobName,
+		recycled, // nil for the first block, otherwise the drained buffer
+		nextStart,
+		int64(defaultBlockSize),
+		r.size,
+	)
+
+	return nil
+}
+
+// streamingBlock is a buffered block of data that runs in a separate goroutine
+// to allow the next block to download while the current block is being read.
+//
+// client, container and blob identify the blob to download from. done is closed by the
+// download goroutine when it finishes, buffer receives the downloaded bytes and err holds
+// the failure, if any. start is the byte offset of the block within the blob and capacity
+// is the number of bytes requested for it.
+type streamingBlock struct {
+	client    *azblob.Client
+	container string
+	blob      string
+
+	done   chan struct{}
+	buffer *bytes.Buffer
+	err    error
+
+	start    int64
+	capacity int64
+}
+
+// newStreamBlock creates a new buffered block of data that downloads the specified
+// range of the blob. While the block download runs in a separate goroutine,
+// we will never attempt to access the passed buffer until after the Wait()
+// returns. This just ensures that we will never attempt to swap buffers
+// mid-download.
+//
+// buffer may be nil, in which case a new buffer is allocated. start is the byte offset
+// of the block, capacity the requested block size and max the total size of the blob.
+// The block is shortened when start+capacity would run past max. When start is at or
+// beyond max there is nothing to download and an already completed, empty block is returned.
+func newStreamBlock(
+	client *azblob.Client,
+	container string,
+	blob string,
+	buffer *bytes.Buffer,
+	start int64,
+	capacity int64,
+	max int64,
+) *streamingBlock {
+	sb := &streamingBlock{
+		client:    client,
+		container: container,
+		blob:      blob,
+		done:      make(chan struct{}),
+		buffer:    buffer,
+		start:     start,
+		capacity:  capacity,
+	}
+
+	// nothing left in the blob: skip the request entirely instead of asking for an empty
+	// or negative range (a negative size would also panic in make below)
+	if start >= max {
+		if sb.buffer == nil {
+			sb.buffer = new(bytes.Buffer)
+		} else {
+			sb.buffer.Reset()
+		}
+		sb.capacity = 0
+		close(sb.done)
+		return sb
+	}
+
+	// determine if we need to reallocate a new block buffer or if we can re-use the existing storage
+	blockSize := capacity
+	if start+blockSize > max {
+		blockSize = max - start
+	}
+
+	// if the provided buffer is nil or the blockSize is different than the provided capacity, we need to reallocate
+	// reallocation will likely happen once at the end of the stream
+	if sb.buffer == nil || blockSize != capacity {
+		sb.buffer = bytes.NewBuffer(make([]byte, 0, blockSize))
+		sb.capacity = blockSize
+	} else {
+		sb.buffer.Reset()
+	}
+
+	// start a goroutine to fetch the block of data, close the done channel when the block
+	// is fetched or an error occurs
+	go func(block *streamingBlock) {
+		// closing done is what releases Wait(), so it must happen after err is set on every path
+		defer close(block.done)
+
+		ctx := context.Background()
+
+		opts := azblob.DownloadStreamOptions{
+			Range: azblob.HTTPRange{
+				Offset: block.start,
+				Count:  block.capacity,
+			},
+		}
+
+		resp, err := block.client.DownloadStream(ctx, block.container, block.blob, &opts)
+		if err != nil {
+			block.err = err
+			return
+		}
+
+		retryOpts := &azblob.RetryReaderOptions{
+			MaxRetries: 3,
+		}
+
+		var body io.ReadCloser = resp.NewRetryReader(ctx, retryOpts)
+
+		// always close the body, even when the copy fails, so the response is not leaked
+		_, copyErr := io.Copy(block.buffer, body)
+		closeErr := body.Close()
+
+		// the copy error takes precedence, a close error is only reported on its own
+		if copyErr != nil {
+			block.err = copyErr
+			return
+		}
+		if closeErr != nil {
+			block.err = closeErr
+		}
+	}(sb)
+
+	return sb
+}
+
+// Wait blocks until the block is downloaded and returns any error that occurred.
+// It returns once the done channel has been closed, so it may be called more than once.
+func (sb *streamingBlock) Wait() error {
+	// receiving from the closed channel returns immediately
+	<-sb.done
+
+	return sb.err
+}