Hi Jose,

I had a look at your program but did not spot anything.
The query is a simple "SELECT FROM WHERE" query that does not have any
state.
So the only state is the state of the Kafka source, i.e, the offset.

How much time did pass between taking the savepoint and resuming?
Did you see any exceptions in the log files (TM, JM)?

Thanks, Fabian

2017-10-06 15:48 GMT+02:00 Jose Miguel Tejedor Fernandez <
jose.fernan...@rovio.com>:

> Hi,
>
> I am running a simple stream Flink job (Flink version 1.3.2 and 1.3.1)
> whose source and sink is a Kafka cluster 0.10.0.1.
>
> I am testing savepoints by stopping/resuming the job and when I checked
> the validity of the data sunk during the stop time I observed that some of
> the events have been lost.
>
> The stream of events is around 6K per 10 minutes and around 50% are lost.
> I share the code in case you can indicate me any hint.
>
>
> Job is resumed correctly from last savepoint and checkpoints configuration
> is as follow:
>
> env.setStateBackend(new FsStateBackend("s3://my_
> bucket/flink/checkpoints/"));
> env.enableCheckpointing(params.getLong("checkpoint.interval", 300000));
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
> params.getLong("checkpoint.minPause", 60 * 1000));
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(
> params.getInt("checkpoint.maxConcurrent", 1));
> env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
> 10 * 60 * 1000));
>  env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.
> ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
> The Kafka consumer:
>
>         SingleOutputStreamOperator<Map<String, String>>
> createKafkaStream(Collection<String> topics, int parallelism, Properties
> kafkaProps, StreamExecutionEnvironment env,
>             int resetOffsetsTo, String eventTimeField)
>         {
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>         FlinkKafkaConsumer010<Map<String, String>> consumer = new
> FlinkKafkaConsumer010<>(
>                 new LinkedList<>(topics),
>                 new EventMapSchema(), kafkaProps);
>
>         DataStream<Map<String, String>> messageStream = env
>                 .addSource(consumer)
>                 .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
>                 .setParallelism(parallelism);
>
>         return messageStream
>                 // discard events that don't have the event time field
>                 .filter(new MissingTimeStampFilter(eventTimeField))
>                 // provide ascending timestamps for
> TimeCharacteristic.EventTime
>                 .assignTimestampsAndWatermarks(new
> EventMapTimestampExtractor(eventTimeField));
>
>          }
>
>         ....
>
>         StreamTableUtils.registerTable("events", kafkaStream, fieldNames,
> tableEnv);
>
>         String sql = "SELECT\n" +
>                 "  field_1 AS a,\n" +
>                 "  field_2 AS b,\n" +
>                 "  field_3 AS c,\n" +
>                 "  field_4 AS d,\n" +
>                 "  field_5 AS e,\n" +
>                 " FROM events\n" +
>                 " WHERE field_1 IS NOT NULL";
>         LOG.info("sql: {}", sql);
>
>
>         Table result = tableEnv.sql(sql);
>         System.err.println("output fields: " + Arrays.toString(result.
> getSchema().getColumnNames()));
>
>         if (printToErr) {≤
>             printRows(result, tableEnv);
>         }
>
>         if (!StringUtils.isBlank(outputTopic)) {
>             TableSink<?> tableSink = new 
> Kafka09CustomJsonTableSink(outputTopic,
> KafkaStreams.getProperties(params),
>                     new FlinkFixedPartitioner<>(), timestampColumn);
>             result.writeToSink(tableSink);
>         }
>
>         env.execute();
>
> Cheers
>
> BR
>
>
> *JM*
>
>
>

Reply via email to