Skip to content

Commit a7fdab8

Browse files
committed
[FLINK-30090] Support timespan for TimerGauges
1 parent 026a53e commit a7fdab8

2 files changed

Lines changed: 92 additions & 6 deletions

File tree

flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,21 @@
3232
* happen in a couple of hours, the returned value will account for this ongoing measurement.
3333
*/
3434
public class TimerGauge implements Gauge<Long>, View {
35+
36+
private static final int DEFAULT_TIME_SPAN_IN_SECONDS = 60;
37+
3538
private final Clock clock;
3639

37-
private long previousCount;
40+
/** The time-span over which the average is calculated. */
41+
private final int timeSpanInSeconds;
42+
/** Circular array containing the history of values. */
43+
private final long[] values;
44+
/** The index in the array for the current time. */
45+
private int idx = 0;
46+
47+
private boolean fullWindow = false;
48+
49+
private long currentValue;
3850
private long currentCount;
3951
private long currentMeasurementStartTS;
4052
/**
@@ -50,11 +62,24 @@ public class TimerGauge implements Gauge<Long>, View {
5062
private long accumulatedCount;
5163

5264
public TimerGauge() {
53-
this(SystemClock.getInstance());
65+
this(DEFAULT_TIME_SPAN_IN_SECONDS);
66+
}
67+
68+
public TimerGauge(int timeSpanInSeconds) {
69+
this(SystemClock.getInstance(), timeSpanInSeconds);
5470
}
5571

5672
public TimerGauge(Clock clock) {
73+
this(clock, DEFAULT_TIME_SPAN_IN_SECONDS);
74+
}
75+
76+
public TimerGauge(Clock clock, int timeSpanInSeconds) {
5777
this.clock = clock;
78+
this.timeSpanInSeconds =
79+
Math.max(
80+
timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS),
81+
UPDATE_INTERVAL_SECONDS);
82+
this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS];
5883
}
5984

6085
public synchronized void markStart() {
@@ -89,15 +114,32 @@ public synchronized void update() {
89114
currentMaxSingleMeasurement =
90115
Math.max(currentMaxSingleMeasurement, now - currentMeasurementStartTS);
91116
}
92-
previousCount = Math.max(Math.min(currentCount / UPDATE_INTERVAL_SECONDS, 1000), 0);
117+
updateCurrentValue();
93118
previousMaxSingleMeasurement = currentMaxSingleMeasurement;
94119
currentCount = 0;
95120
currentMaxSingleMeasurement = 0;
96121
}
97122

123+
private void updateCurrentValue() {
124+
if (idx == values.length - 1) {
125+
fullWindow = true;
126+
}
127+
values[idx] = currentCount;
128+
idx = (idx + 1) % values.length;
129+
130+
int maxIndex = fullWindow ? values.length : idx;
131+
long totalTime = 0;
132+
for (int i = 0; i < maxIndex; i++) {
133+
totalTime += values[i];
134+
}
135+
136+
currentValue =
137+
Math.max(Math.min(totalTime / (UPDATE_INTERVAL_SECONDS * maxIndex), 1000), 0);
138+
}
139+
98140
@Override
99141
public synchronized Long getValue() {
100-
return previousCount;
142+
return currentValue;
101143
}
102144

103145
/**

flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class TimerGaugeTest {
4545
@Test
4646
public void testBasicUsage() {
4747
ManualClock clock = new ManualClock(42_000_000);
48-
TimerGauge gauge = new TimerGauge(clock);
48+
TimerGauge gauge = new TimerGauge(clock, View.UPDATE_INTERVAL_SECONDS);
4949

5050
gauge.update();
5151
assertThat(gauge.getValue(), is(0L));
@@ -74,7 +74,7 @@ public void testBasicUsage() {
7474
@Test
7575
public void testUpdateWithoutMarkingEnd() {
7676
ManualClock clock = new ManualClock(42_000_000);
77-
TimerGauge gauge = new TimerGauge(clock);
77+
TimerGauge gauge = new TimerGauge(clock, View.UPDATE_INTERVAL_SECONDS);
7878

7979
gauge.markStart();
8080
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
@@ -107,4 +107,48 @@ public void testGetWithoutUpdate() {
107107
assertThat(gauge.getValue(), is(0L));
108108
assertThat(gauge.getMaxSingleMeasurement(), is(0L));
109109
}
110+
111+
@Test
112+
public void testLargerTimespan() {
113+
ManualClock clock = new ManualClock(42_000_000);
114+
TimerGauge gauge = new TimerGauge(clock, 2 * View.UPDATE_INTERVAL_SECONDS);
115+
116+
gauge.markStart();
117+
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
118+
gauge.markEnd();
119+
gauge.update();
120+
121+
assertThat(gauge.getValue(), is(SLEEP / View.UPDATE_INTERVAL_SECONDS));
122+
assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP));
123+
assertEquals(gauge.getAccumulatedCount(), SLEEP);
124+
125+
gauge.update();
126+
// One sleep in 2 intervals
127+
assertThat(gauge.getValue(), is(SLEEP / (View.UPDATE_INTERVAL_SECONDS * 2)));
128+
assertThat(gauge.getMaxSingleMeasurement(), is(0L));
129+
assertEquals(gauge.getAccumulatedCount(), SLEEP);
130+
131+
// One sleep in each interval
132+
133+
gauge.markStart();
134+
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
135+
gauge.markEnd();
136+
gauge.update();
137+
138+
gauge.markStart();
139+
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
140+
gauge.markEnd();
141+
gauge.update();
142+
143+
assertThat(gauge.getValue(), is(SLEEP / (View.UPDATE_INTERVAL_SECONDS)));
144+
145+
// Check that the getMaxSingleMeasurement can go down after an update
146+
gauge.markStart();
147+
clock.advanceTime(SLEEP / 2, TimeUnit.MILLISECONDS);
148+
gauge.markEnd();
149+
gauge.update();
150+
151+
assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP / 2));
152+
assertEquals(gauge.getAccumulatedCount(), 3 * SLEEP + SLEEP / 2);
153+
}
110154
}

0 commit comments

Comments
 (0)