Sfoglia il codice sorgente

Fix the parsing of spotInfo and its Version info

Previously it was assuming the Version column of the spot feed TSV contained the version of the feed data, but it turns that column is the file number within the publication hour

This also takes the opportunity to switch from a `[]string` index decoding system over to the actual TSV fields mapping into the struct as was the original intention. It seems the missing magic sauce was the `.FieldsPerRecord` in `csvReader`, without which `csvReader` returns `csv.ErrFieldCount` causing `csvutil` to similarly bail out, meaning the struct fields were never assigned
Matthew L Daniel 7 anni fa
parent
commit
722b387a12
1 ha cambiato i file con 37 aggiunte e 23 eliminazioni
  1. 37 23
      cloud/awsprovider.go

+ 37 - 23
cloud/awsprovider.go

@@ -793,6 +793,13 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 		keys = append(keys, obj.Key)
 	}
 
+	versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
+	header, err := csvutil.Header(spotInfo{}, "csv")
+	if err != nil {
+		return nil, err
+	}
+	fieldsPerRecord := len(header)
+
 	spots := make(map[string]*spotInfo)
 	for _, key := range keys {
 		getObj := &s3.GetObjectInput{
@@ -815,38 +822,45 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 
 		csvReader := csv.NewReader(gr)
 		csvReader.Comma = '\t'
-		header, err := csvutil.Header(spotInfo{}, "csv")
-		if err != nil {
-			return nil, err
-		}
+		csvReader.FieldsPerRecord = fieldsPerRecord
+
 		dec, err := csvutil.NewDecoder(csvReader, header...)
 		if err != nil {
 			return nil, err
 		}
 
+		var foundVersion string
 		for {
-			if err := dec.Decode(&spotInfo{}); err == io.EOF {
+			spot := spotInfo{}
+			err := dec.Decode(&spot)
+			if err == io.EOF {
 				break
-			}
-			st := dec.Record()
-			if len(st) == 9 { // it's tab separated but not quite a tsv
-				klog.V(3).Infof("Found spot info %+v", st)
-				spot := &spotInfo{
-					Timestamp:   st[0],
-					UsageType:   st[1],
-					Operation:   st[2],
-					InstanceID:  st[3],
-					MyBidID:     st[4],
-					MyMaxPrice:  st[5],
-					MarketPrice: st[6],
-					Charge:      st[7],
-					Version:     st[8],
-				}
-				if spot.Version != supportedSpotFeedVersion {
-					klog.V(1).Infof("Possibly unsupported spot data feed version " + spot.Version)
+			} else if err == csvutil.ErrFieldCount {
+				rec := dec.Record()
+				// the first two "Record()" will be the comment lines
+				// the first of which is "#Version"
+				if len(rec) == 1 && len(foundVersion) == 0 {
+					spotFeedVersion := rec[0]
+					klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
+					matches := versionRx.FindStringSubmatch(spotFeedVersion)
+					if matches != nil {
+						foundVersion = matches[1]
+						if foundVersion != supportedSpotFeedVersion {
+							klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
+							break
+						}
+					}
+					continue
 				}
-				spots[spot.InstanceID] = spot
+				klog.V(3).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
+				continue
+			} else if err != nil {
+				klog.V(2).Infof("Error during spot info decode: %s", err)
+				continue
 			}
+
+			klog.V(3).Infof("Found spot info %+v", spot)
+			spots[spot.InstanceID] = &spot
 		}
 		gr.Close()
 	}