소스 검색

ETL: cascading and multiplexing: port time utilities from dirty branch

Niko Kovacevic 5 년 전
부모
커밋
11f5060e56
1개의 변경된 파일87개의 추가작업 그리고 0개의 파일을 삭제
  1. 87 0
      pkg/util/time.go

+ 87 - 0
pkg/util/time.go

@@ -3,6 +3,7 @@ package util
 import (
 	"fmt"
 	"strconv"
+	"sync"
 	"time"
 )
 
@@ -136,3 +137,89 @@ func normalizeTimeParam(param string) (string, error) {
 
 	return param, nil
 }
+
+// FormatStoreResolution provides a clean notation for ETL store resolutions.
+// e.g. daily => 1d; hourly => 1h
+func FormatStoreResolution(dur time.Duration) string {
+	if dur >= 24*time.Hour {
+		return fmt.Sprintf("%dd", int(dur.Hours()/24.0))
+	} else if dur >= time.Hour {
+		return fmt.Sprintf("%dh", int(dur.Hours()))
+	}
+	return fmt.Sprint(dur)
+}
+
+// JobTicker is a ticker used to synchronize the next run of a repeating
+// process. The designated use-case is for infinitely-looping selects,
+// where a timeout or an exit channel might cancel the process, but otherwise
+// the intent is to wait at the select for some amount of time until the
+// next run. This differs from a standard ticker, which ticks without
+// waiting and drops any missed ticks; rather, this ticker must be kicked
+// off manually for each tick, so that after the current run of the job
+// completes, the timer starts again.
+type JobTicker struct {
+	Ch     <-chan time.Time
+	ch     chan time.Time
+	closed bool
+	mx     sync.Mutex
+}
+
+// NewJobTicker instantiates a new JobTicker.
+func NewJobTicker() *JobTicker {
+	c := make(chan time.Time)
+
+	return &JobTicker{
+		Ch:     c,
+		ch:     c,
+		closed: false,
+	}
+}
+
+// Close closes the JobTicker channels
+func (jt *JobTicker) Close() {
+	jt.mx.Lock()
+	defer jt.mx.Unlock()
+
+	if jt.closed {
+		return
+	}
+
+	jt.closed = true
+	close(jt.ch)
+}
+
+// TickAt schedules the next tick of the ticker for the given time in the
+// future. If the time is not in the future, the ticker will tick immediately.
+func (jt *JobTicker) TickAt(t time.Time) {
+	go func(t time.Time) {
+		n := time.Now()
+		if t.After(n) {
+			time.Sleep(t.Sub(n))
+		}
+
+		jt.mx.Lock()
+		defer jt.mx.Unlock()
+
+		if !jt.closed {
+			jt.ch <- time.Now()
+		}
+	}(t)
+}
+
+// TickIn schedules the next tick of the ticker for the given duration into
+// the future. If the duration is less than or equal to zero, the ticker will
+// tick immediately.
+func (jt *JobTicker) TickIn(d time.Duration) {
+	go func(d time.Duration) {
+		if d > 0 {
+			time.Sleep(d)
+		}
+
+		jt.mx.Lock()
+		defer jt.mx.Unlock()
+
+		if !jt.closed {
+			jt.ch <- time.Now()
+		}
+	}(d)
+}