Hi Marco,

That's a lot of code to digest, so I'm sorry if I got something wrong.

From your example, it looks like you want to compute the average within a
tumbling window. If no record for a particular key has been emitted in that
time, you want to repeat the last value. I'd use a dummy record to force the
windows to be triggered and then ignore it during aggregation. Here is a
sketch:

aggregateTimeSeriesStream.keyBy(1)
    .process(<store all keys in a state, on 15 min timer, emit dummy record for all keys>)
    .window(<tumble 15 min>)
    .reduce(<average, ignore dummy record, create dummy record if empty>)
    .process(<store last record per key, replace dummy record with last value>)

This approach needs more functions, but they can mostly be chained. It also
breaks the work into smaller pieces that are easier to maintain and test, and
you especially save the trouble of writing the window logic yourself.
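In case it helps, the pieces could look roughly like this. It is only an
untested sketch, not a drop-in implementation: the Tuple4 layout
(name, sum, count, isDummy) and the class names DummyEmitter, AveragingReduce,
and FillInDummies are made up for illustration, and I let the reduce merge
sum/count and compute the average in the last step, since a plain
ReduceFunction cannot produce an average directly.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class BackfillSketch {

    // element layout: f0 = name, f1 = sum, f2 = count, f3 = isDummy
    // (a raw reading would be mapped to (name, value, 1, false) before entering this chain)
    private static final long WINDOW_MS = Time.minutes(15).toMilliseconds();

    public static DataStream<Tuple4<String, Double, Integer, Boolean>> backfill(
            DataStream<Tuple4<String, Double, Integer, Boolean>> input) {
        return input
                .keyBy(t -> t.f0)
                .process(new DummyEmitter())    // keeps every window firing per key
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(15)))
                .reduce(new AveragingReduce())  // merges sum/count, ignores dummies
                .keyBy(t -> t.f0)
                .process(new FillInDummies());  // computes the average, repeats the last value for dummies
    }

    // Forwards every element and registers an event-time timer at the end of its window;
    // on the timer it emits a dummy for the next window and re-arms, so downstream windows
    // fire even when the key is silent. (Assumes event-time timestamps/watermarks are
    // assigned upstream; a real job would also stop re-arming after some idle period.)
    public static class DummyEmitter extends KeyedProcessFunction<
            String, Tuple4<String, Double, Integer, Boolean>, Tuple4<String, Double, Integer, Boolean>> {

        @Override
        public void processElement(Tuple4<String, Double, Integer, Boolean> value, Context ctx,
                Collector<Tuple4<String, Double, Integer, Boolean>> out) {
            long windowEnd = ctx.timestamp() - (ctx.timestamp() % WINDOW_MS) + WINDOW_MS;
            ctx.timerService().registerEventTimeTimer(windowEnd);
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                Collector<Tuple4<String, Double, Integer, Boolean>> out) {
            // records emitted here carry the timer timestamp, i.e. the start of the next window
            out.collect(Tuple4.of(ctx.getCurrentKey(), 0.0, 0, true));
            ctx.timerService().registerEventTimeTimer(timestamp + WINDOW_MS);
        }
    }

    // Merges sums and counts; dummies are dropped unless they are the only elements in the window.
    public static class AveragingReduce implements ReduceFunction<Tuple4<String, Double, Integer, Boolean>> {
        @Override
        public Tuple4<String, Double, Integer, Boolean> reduce(
                Tuple4<String, Double, Integer, Boolean> a, Tuple4<String, Double, Integer, Boolean> b) {
            if (a.f3) return b;
            if (b.f3) return a;
            return Tuple4.of(a.f0, a.f1 + b.f1, a.f2 + b.f2, false);
        }
    }

    // Turns sum/count into the average, remembers it per key, and substitutes the last
    // known average whenever a dummy (i.e. an otherwise empty window) comes through.
    public static class FillInDummies extends KeyedProcessFunction<
            String, Tuple4<String, Double, Integer, Boolean>, Tuple4<String, Double, Integer, Boolean>> {

        private transient ValueState<Double> lastAverage;

        @Override
        public void open(Configuration parameters) {
            lastAverage = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-average", Double.class));
        }

        @Override
        public void processElement(Tuple4<String, Double, Integer, Boolean> value, Context ctx,
                Collector<Tuple4<String, Double, Integer, Boolean>> out) throws Exception {
            if (value.f3) {
                Double last = lastAverage.value();
                if (last != null) {
                    out.collect(Tuple4.of(value.f0, last, 0, true)); // backfilled value
                }
            } else {
                double average = value.f1 / value.f2;
                lastAverage.update(average);
                out.collect(Tuple4.of(value.f0, average, value.f2, false));
            }
        }
    }
}

Everything besides the dummy-record handling is standard window machinery,
which is the main reason I'd prefer this over hand-rolling the window logic.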
On Thu, Jun 18, 2020 at 7:11 PM Marco Villalobos <mvillalo...@kineteque.com> wrote:

> I came up with a solution for backfills. However, at this moment, I am not
> happy with my solution. I think there might be other facilities within
> Flink which would allow me to implement a better, more efficient, or more
> scalable solution.
>
> In another post, rmetz...@apache.org suggested that I use a process
> function and a timer. He was right in that I should use that approach, and
> I want to thank him.
>
> The averages are computed by a ProcessWindowFunction that keys by the name
> and window size and uses a tumbling event time window.
>
> However, after that average is complete, I then use a KeyedProcessFunction
> that is keyed by window size. I use a somewhat brute-force approach with a
> ValueState<Set<String>> to track names that need a value and a MapState to
> determine which values exist and which ones are backfilled. It also cleans
> up stale values.
>
> I committed my code to a branch,
> https://github.com/minmay/flink-patterns/tree/feature/backfill, and I also
> created a pull request,
> https://github.com/minmay/flink-patterns/pull/1/files, to share my
> experience.
>
> I am open to critical comments on my approach, my lack of understanding of
> Flink, and the algorithms and data structures used. Please refrain from
> comments on my code style, though.
>
> I'll also copy and paste my solution below.
>
> package mvillalobos.flink.patterns.timeseries.average;
>
> import com.google.common.collect.ImmutableList;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.io.jdbc.JDBCOptions;
> import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple7;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import picocli.CommandLine;
>
> import java.io.File;
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.ResultSet;
> import java.sql.Statement;
> import java.sql.Timestamp;
> import java.time.Instant;
> import java.time.temporal.ChronoUnit;
> import java.util.Comparator;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.TreeSet;
> import java.util.concurrent.Callable;
> import java.util.stream.Collectors;
> import java.util.stream.StreamSupport;
>
> @CommandLine.Command(name = "Time Series Average",
>         mixinStandardHelpOptions = true,
>         description = "Compute the average of the time series with a 15 minute tumbling event time window and upsert the results into an Apache Derby database.")
> public class TimeSeriesAverageApp implements Callable<Integer> {
>
>     private final static Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class);
>
>     @CommandLine.Option(names = {"-f", "--input-file"}, description = "The CSV input file of time series data. Each line must be in the format: String, double, Instant.")
>     private File inputFile;
>
>     @Override
>     public Integer call() throws Exception {
>         stream(inputFile.toString());
>         return 0;
>     }
>
>     public void stream(String inputFilePath) throws Exception {
>
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // GIVEN a SOURCE with a CSV input file
>         // in which each line has a: String, double, Instant
>         // THEN the MAP operator transforms the line into a Tuple7
>         //     f0: name: String
>         //     f1: window_size: int
>         //     f2: value: double
>         //     f3: event_timestamp: Instant
>         //     f4: aggregate_sum: double
>         //     f5: aggregate_count: double
>         //     f6: is_backfill: boolean
>         // WHEN the map operation finishes
>         // THEN the event time is assigned using field f3
>         final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream = env.readTextFile(inputFilePath)
>                 .map(line -> {
>                     final String[] split = line.split(",");
>                     final String name = split[0];
>                     final double value = Double.parseDouble(split[1]);
>                     final Instant timestamp = Instant.parse(split[2]);
>                     return Tuple7.of(name, 1, value, timestamp, value, 1, false);
>                 }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, Types.BOOLEAN))
>                 .name("time series stream")
>                 .assignTimestampsAndWatermarks(
>                         new AscendingTimestampExtractor<>() {
>                             @Override
>                             public long extractAscendingTimestamp(Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) {
>                                 return element.f3.toEpochMilli();
>                             }
>                         }
>                 );
>
>         final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink();
>
>         upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream);
>
>         // GIVEN a data stream with Tuple7
>         //     f0: name: String
>         //     f1: window_size: int
>         //     f2: value: double
>         //     f3: event_timestamp: Instant
>         //     f4: aggregate_sum: double
>         //     f5: aggregate_count: double
>         //     f6: is_backfill: boolean
>         // THEN the stream is KEYED BY: f0: name: String, f1: window_size: int
>         // THEN the stream is WINDOWED into a tumbling event time window of 15 minutes
>         // THEN the window is configured to allow elements late by 1 hour
>         // THEN a low-level process window function is applied to the window that
>         //     aggregates the time series by assigning the following tuple fields:
>         //     f1: window_size = 15 minutes in milliseconds
>         //     f2: value = average value in this 15 minute window
>         //     f3: event_timestamp = the first epoch millisecond in this 15 minute window
>         //     f4: aggregate_sum = sum of f2 values in this 15 minute window
>         //     f5: aggregate_count = number of values in this 15 minute window
>         final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>
>                 aggregateTimeSeriesStream = timeSeriesStream.keyBy(0, 1)
>                 .window(TumblingEventTimeWindows.of(Time.minutes(15)))
>                 .allowedLateness(Time.hours(1))
>                 .process(new ProcessWindowFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple, TimeWindow>() {
>                     @Override
>                     public void process(
>                             Tuple tuple,
>                             Context context,
>                             Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements,
>                             Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
>                     ) throws Exception {
>
>                         final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> aggregation = new Tuple7<>();
>
>                         boolean is_window_initialized = false;
>                         for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : ImmutableList.copyOf(elements).reverse()) {
>
>                             if (!is_window_initialized) {
>
>                                 final Instant timestamp = Instant.ofEpochMilli(context.window().getStart());
>
>                                 aggregation.f0 = element.f0;
>                                 aggregation.f1 = (int) Time.minutes(15).toMilliseconds();
>                                 aggregation.f2 = element.f2;
>                                 aggregation.f3 = timestamp;
>                                 aggregation.f4 = 0D;
>                                 aggregation.f5 = 0;
>                                 aggregation.f6 = false;
>                                 is_window_initialized = true;
>                             }
>
>                             aggregation.f4 += element.f2;
>                             aggregation.f5++;
>                             aggregation.f2 = aggregation.f4 / aggregation.f5;
>                         }
>
>                         logger.info("Added aggregation: {}", aggregation);
>                         out.collect(aggregation);
>                     }
>                 }).name("averaged keyed tumbling window event time stream");
>
>         // GIVEN a data-stream of Tuple7
>         //     f0: name: String
>         //     f1: window_size: int
>         //     f2: value: double
>         //     f3: event_timestamp: Instant
>         //     f4: aggregate_sum: double
>         //     f5: aggregate_count: double
>         //     f6: is_backfill: boolean
>         // THAT was aggregated to compute the average on f2: value: double
>         //     WITH a grouping of: f0: name: String, f1: window_size: int
>         //     WITH a tumbling event time window of 15 minutes
>         // THEN the stream is KEYED BY: f1: window_size: int
>         // THEN a low-level keyed process function is applied to the window that
>         //     WHEN the keyed process function opens it
>         //         initializes a VALUE STATE of TreeSet<String> called "nameSet"
>         //         initializes a MAP STATE of
>         //             KEY of Tuple2: f0: String, f1: Instant
>         //             VALUE of Tuple7:
>         //                 f0: String
>         //                 f1: int
>         //                 f2: double
>         //                 f3: Instant
>         //                 f4: double
>         //                 f5: double
>         //                 f6: boolean
>         //         called "backfillState"
>         //     WHEN the keyed process function processes an element it
>         //         adds each f0: name: String into the VALUE STATE "nameSet"
>         //         adds each
>         //             KEY of Tuple2: f0: name: String, f3: event_timestamp: Instant
>         //             VALUE of Tuple7:
>         //                 f0: name: String
>         //                 f1: window_size: int
>         //                 f2: value: double
>         //                 f3: event_timestamp: Instant
>         //                 f4: aggregate_sum: double
>         //                 f5: aggregate_count: double
>         //                 f6: is_backfill: boolean
>         //         to the MAP STATE "backfillState"
>         //         fires a timer to occur at f3: event_timestamp: Instant + 15 minutes (at the end of a window)
>         //     WHEN the keyed process function's coalesced timers are handled it
>         //         calculates the current "event_time" to handle, which is the timestamp - 15 minutes
>         //         iterates over each time series name in the "nameSet"; for each "name":
>         //             IF MAP STATE "backfillState" contains a KEY of Tuple2: "name", "event_time" THEN
>         //                 collect the VALUE as an OUT result because it is not a back fill
>         //             ELSE
>         //                 the KEY of Tuple2: "name", "event_time" requires a back fill
>         //                 iterate over the MAP STATE "backfillState"
>         //                     filter by "name" = Tuple2.f0
>         //                     filter by timestamp "event_time" > Tuple2.f1
>         //                     sort by key timestamp Tuple2.f1 in ascending order
>         //                     collect into a List named "backfills"
>         //                 IF "backfills" is empty THEN there is no backfill
>         //                 ELSE
>         //                     the back fill is the last value in the list
>         //                     remove the other values in the list from MAP STATE "backfillState" as they are no longer needed
>         final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfilledAggregateTimeSeriesStream = aggregateTimeSeriesStream.keyBy(1)
>                 .process(new KeyedProcessFunction<>() {
>
>                     private ValueState<Set<String>> namesState;
>
>                     private MapState<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillState;
>
>                     @Override
>                     public void open(Configuration parameters) {
>                         MapStateDescriptor<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillDescriptor =
>                                 new MapStateDescriptor<>(
>                                         "backfill-state",
>                                         TypeInformation.of(new TypeHint<>() {}),
>                                         TypeInformation.of(new TypeHint<>() {})
>                                 );
>
>                         backfillState = getRuntimeContext().getMapState(backfillDescriptor);
>
>                         ValueStateDescriptor<Set<String>> namesDescriptor =
>                                 new ValueStateDescriptor<>("names-value-state", TypeInformation.of(new TypeHint<>() {}));
>
>                         namesState = getRuntimeContext().getState(namesDescriptor);
>                     }
>
>                     @Override
>                     public void processElement(
>                             Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value,
>                             Context ctx,
>                             Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
>                     ) throws Exception {
>
>                         if (namesState.value() == null) {
>                             namesState.update(new TreeSet<>());
>                         }
>
>                         namesState.value().add(value.f0);
>
>                         final Instant eventTime = value.f3;
>                         final long timer = eventTime.toEpochMilli() + Time.minutes(15).toMilliseconds();
>
>                         logger.info(
>                                 "processElement with key: {}, value: {}. registering timer: {}",
>                                 ctx.getCurrentKey(),
>                                 value,
>                                 Instant.ofEpochMilli(timer)
>                         );
>
>                         ctx.timerService().registerEventTimeTimer(timer);
>
>                         final Tuple2<String, Instant> currentKey = new Tuple2<>(value.f0, value.f3);
>                         backfillState.put(currentKey, value);
>                     }
>
>                     @Override
>                     public void onTimer(
>                             long timestamp,
>                             OnTimerContext ctx,
>                             Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
>                     ) throws Exception {
>
>                         final Instant event_time = Instant.ofEpochMilli(timestamp).minus(15, ChronoUnit.MINUTES);
>
>                         for (String name : namesState.value()) {
>                             Tuple2<String, Instant> key = new Tuple2<>(name, event_time);
>                             if (backfillState.contains(key)) {
>                                 final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value = backfillState.get(key);
>                                 logger.info(
>                                         "onTimer with key: {} timestamp: {}, event_time: {}, has value: {}",
>                                         ctx.getCurrentKey(),
>                                         Instant.ofEpochMilli(timestamp),
>                                         event_time,
>                                         value
>                                 );
>                                 out.collect(value);
>                             } else {
>                                 final List<Map.Entry<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>> backfills
>                                         = StreamSupport.stream(backfillState.entries().spliterator(), false)
>                                         .filter(entry -> name.equals(entry.getKey().f0))
>                                         .filter(entry -> event_time.isAfter(entry.getKey().f1))
>                                         .sorted(Comparator.comparing(entry -> entry.getKey().f1))
>                                         .collect(Collectors.toList());
>
>                                 if (!backfills.isEmpty()) {
>                                     final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value = backfills.get(backfills.size() - 1).getValue();
>                                     final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> backfill = new Tuple7<>(
>                                             value.f0, value.f1, value.f2, event_time, value.f4, value.f5, true
>                                     );
>                                     out.collect(backfill);
>
>                                     for (int i = 0; i < backfills.size() - 1; i++) {
>                                         backfillState.remove(backfills.get(i).getKey());
>                                     }
>                                     logger.info("onTimer with key: {} timestamp: {}, step: {}, has backfill: {}",
>                                             ctx.getCurrentKey(), Instant.ofEpochMilli(timestamp), event_time, backfill);
>                                 }
>                             }
>                         }
>                         logger.info("*****************");
>                     }
>                 });
>
>         upsertToJDBC(jdbcUpsertTableSink, backfilledAggregateTimeSeriesStream);
>
>         env.execute("time series");
>     }
>
>     private JDBCUpsertTableSink buildJdbcUpsertTableSink() {
>         final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder()
>                 .setOptions(JDBCOptions.builder()
>                         .setDBUrl("jdbc:derby:memory:flink")
>                         .setTableName("time_series")
>                         .build())
>                 .setTableSchema(TableSchema.builder()
>                         .field("name", DataTypes.VARCHAR(50).notNull())
>                         .field("window_size", DataTypes.INT().notNull())
>                         .field("value", DataTypes.DOUBLE().notNull())
>                         .field("event_timestamp", DataTypes.TIMESTAMP().notNull())
>                         .field("aggregate_sum", DataTypes.DOUBLE().notNull())
>                         .field("aggregate_count", DataTypes.INT().notNull())
>                         .field("is_backfill", DataTypes.BOOLEAN().notNull())
>                         .primaryKey("name", "window_size", "event_timestamp")
>                         .build())
>                 .build();
>         jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"});
>         return jdbcUpsertTableSink;
>     }
>
>     private void upsertToJDBC(JDBCUpsertTableSink jdbcUpsertTableSink, DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) {
>         jdbcUpsertTableSink.consumeDataStream(timeSeriesStream.map(t -> {
>             final Row row = new Row(7);
>             row.setField(0, t.f0);
>             row.setField(1, t.f1);
>             row.setField(2, t.f2);
>             row.setField(3, Timestamp.from(t.f3));
>             row.setField(4, t.f4);
>             row.setField(5, t.f5);
>             row.setField(6, t.f6);
>             return new Tuple2<>(true, row);
>         }).returns(new TypeHint<Tuple2<Boolean, Row>>() {
>         })).name("upsert to JDBC");
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         final String databaseURL = "jdbc:derby:memory:flink;create=true";
>         int exitCode;
>         try (final Connection con = DriverManager.getConnection(databaseURL)) {
>             try (final Statement stmt = con.createStatement()) {
>                 stmt.execute("CREATE TABLE time_series (\n" +
>                         "   id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" +
>                         "   name VARCHAR(50) NOT NULL,\n" +
>                         "   window_size INTEGER NOT NULL DEFAULT 1,\n" +
>                         "   event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
>                         "   value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
>                         "   aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
>                         "   aggregate_count INTEGER NOT NULL DEFAULT 1,\n" +
>                         "   is_backfill BOOLEAN NOT NULL DEFAULT false,\n" +
>                         "   version INTEGER NOT NULL DEFAULT 1,\n" +
>                         "   create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
>                         "   modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
>                         "   UNIQUE (name, window_size, event_timestamp)\n" +
>                         ")");
>             }
>
>             exitCode = new CommandLine(new TimeSeriesAverageApp()).execute(args);
>
>             try (final Statement stmt = con.createStatement()) {
>                 final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series ORDER BY window_size, event_timestamp, name");
>                 while (rs.next()) {
>                     final long id = rs.getLong(1);
>                     final String name = rs.getString(2);
>                     final int window_size = rs.getInt(3);
>                     final Timestamp event_timestamp = rs.getTimestamp(4);
>                     final double value = rs.getDouble(5);
>                     final double aggregate_sum = rs.getDouble(6);
>                     final int aggregate_count = rs.getInt(7);
>                     final boolean is_backfill = rs.getBoolean(8);
>                     final int version = rs.getInt(9);
>                     final Timestamp create_time = rs.getTimestamp(10);
>                     final Timestamp modify_time = rs.getTimestamp(11);
>                     logger.info(
>                             "id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"",
>                             id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time
>                     );
>                 }
>             }
>         }
>
>         System.exit(exitCode);
>     }
> }

--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng