fhueske commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3387123212
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row
partitionKey) {
stateManager.clearStateForKey(stateName, partitionKey);
}
+ //
-------------------------------------------------------------------------
+ // Watermark & Timer API
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Sets the watermark for all tables to the given {@link LocalDateTime}
and fires eligible
+ * timers.
+ */
+ public void setWatermark(LocalDateTime watermark) throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+ }
+
+ /** Sets the watermark for all tables to the given {@link Instant} and
fires eligible timers. */
+ public void setWatermark(Instant watermark) throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkMillis(watermark.toEpochMilli());
+ }
+
+ /**
+ * Sets the watermark for a specific table to the given {@link
LocalDateTime} and fires eligible
+ * timers.
+ */
+ public void setWatermarkForTable(String tableArgument, LocalDateTime
watermark)
+ throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkForTableMillis(tableArgument,
DateTimeUtils.toTimestampMillis(watermark));
+ }
+
+ /**
+ * Sets the watermark for a specific table to the given {@link Instant}
and fires eligible
+ * timers.
+ */
+ public void setWatermarkForTable(String tableArgument, Instant watermark)
throws Exception {
+ checkNotNull(watermark, "watermark must not be null");
+ setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+ }
+
+ /** Returns all timers (both pending and fired), sorted by timestamp then
name. */
+ public List<Timer> getTimers() {
+ return Stream.concat(
+ timerManager.getPendingTimers().stream(),
+ timerManager.getFiredTimers().stream())
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ /** Returns all pending (not yet fired) timers, sorted by timestamp then
name. */
+ public List<Timer> getPendingTimers() {
+ return timerManager.getPendingTimers();
+ }
+
+ /** Returns all pending timers with the given name. */
+ public List<Timer> getPendingTimers(String timerName) {
+ return timerManager.getPendingTimers().stream()
+ .filter(t -> timerName.equals(t.getName()))
+ .collect(Collectors.toList());
+ }
+
+ /** Returns all timers that have fired, in the order they fired. */
+ public List<Timer> getFiredTimers() {
+ return timerManager.getFiredTimers();
+ }
+
+ /** Returns all fired timers with the given name. */
+ public List<Timer> getFiredTimers(String timerName) {
+ return timerManager.getFiredTimers().stream()
+ .filter(t -> timerName.equals(t.getName()))
+ .collect(Collectors.toList());
+ }
+
+ /** Clears the fired timer history. */
+ public void clearFiredTimers() {
+ timerManager.clearFiredTimers();
+ }
+
+ private void setWatermarkMillis(long millis) throws Exception {
+ checkState(isOpen, "Harness is not open");
+ for (TableArgumentInfo tableArg :
ArgumentInfo.filterTableArguments(arguments)) {
+ timerManager.setTableWatermark(tableArg.name, millis);
+ }
+ timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+ }
+
+ private void setWatermarkForTableMillis(String tableArgument, long millis)
throws Exception {
+ checkState(isOpen, "Harness is not open");
+ checkNotNull(tableArgument, "tableArgument must not be null");
+ checkArgument(
+ argumentsByName.get(tableArgument) instanceof
TableArgumentInfo,
+ "Unknown or non-table argument: %s",
+ tableArgument);
+ timerManager.setTableWatermark(tableArgument, millis);
+ timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+ }
+
+ private void fireTimer(Timer timer) throws Exception {
+ if (onTimer == null) {
+ throw new IllegalStateException(
+ "Timer fired but no onTimer() method is defined in "
+ + function.getClass().getSimpleName());
+ }
+
+ currentInvocation = InvocationContext.forTimer(timer);
+
+ try {
+ Map<String, Object> stateMap =
stateManager.loadStateForKey(timer.partitionKey);
+
+ List<StateArgumentInfo> stateArgs =
ArgumentInfo.filterStateArguments(arguments);
+ Object[] methodArgs = new Object[stateArgs.size()];
+ for (int i = 0; i < stateArgs.size(); i++) {
+ methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+ }
+
+ onTimer.invoke(function, new TestOnTimerContext(stateMap),
methodArgs);
+ stateManager.updateStateForKey(timer.partitionKey, stateMap);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof Exception) {
+ throw (Exception) cause;
+ }
+ throw new RuntimeException("onTimer() invocation failed", e);
+ } finally {
+ currentInvocation = null;
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Context implementations
+ //
-------------------------------------------------------------------------
+
+ private class TestContext implements ProcessTableFunction.Context {
+ final Map<String, Object> stateMap;
+
+ TestContext(Map<String, Object> stateMap) {
+ this.stateMap = stateMap;
+ }
+
+ @Override
+ public <TimeType> ProcessTableFunction.TimeContext<TimeType>
timeContext(
+ Class<TimeType> conversionClass) {
+ return new TestTimeContext<>(conversionClass);
+ }
+
+ @Override
+ public TableSemantics tableSemanticsFor(String argName) {
+ ArgumentInfo argInfo = argumentsByName.get(argName);
+ if (argInfo == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Argument '%s' not found. Available arguments:
%s",
+ argName, argumentsByName.keySet()));
+ }
+ if (!(argInfo instanceof TableArgumentInfo)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Argument '%s' is not a table argument (type:
%s)",
+ argName, argInfo.getClass().getSimpleName()));
+ }
+ TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+ int[] partitionIndices = getPartitionColumnIndices(tableArg);
+ int timeColumn =
+ onTimeColumnName != null
+ ?
getFieldNames(tableArg.dataType).indexOf(onTimeColumnName)
+ : -1;
+ return new TestHarnessTableSemantics(tableArg.dataType,
partitionIndices, timeColumn);
+ }
+
+ @Override
+ public void clearState(String stateName) {
+ stateMap.remove(stateName);
+ }
+
+ @Override
+ public void clearAllState() {
+ stateMap.clear();
Review Comment:
Ah, sorry! I thought this was the full state...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]