Hi Thomas,

It’s great that you’ve brought up these issues, which IMO are all very valid.
They have also been on my mind for a while.

Here’s a list of things, off the top of my head, that I would really like to
improve as part of a major Kafka / Kinesis connector rework.
Some already have JIRAs, or were discussed in other indirectly related JIRAs.
It might make sense to open an umbrella ticket and consolidate all of them there.

- Common abstraction for partition-based, replayable sources, which handles 1)
position checkpointing, 2) partition discovery / topic subscription (using the
file source pattern), 3) custom startup positions, 4) per-partition watermarks,
and 5) partition idleness (see the rough sketch after this list).
- Configuration for the connectors is not well-defined. Some options go through
the provided properties, some require setter methods, etc. Moreover, it is
actually quite confusing for some users that we use the same Properties object
to carry both Flink connector-specific configuration and the internally used
client configuration [1]. I think Beam’s KafkaIO has a nice API in this
respect [2].
- Some default behaviors of the current connectors, such as partitioning and
flushing on the producer side, and offset committing for the Kafka consumer,
do not make sense [3] [4].
- The deserialization / serialization schema and the partitioner interfaces
don’t really play well together. For example, the `getTargetTopic` method
should really be part of the partitioner [5].
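
To make the first two points a bit more concrete, here is a very rough Java
sketch of the kind of common abstraction and builder-style configuration I have
in mind. All names below are made up for illustration and don’t exist in Flink
today; please treat it as a direction, not a concrete proposal.

  // Purely hypothetical sketch -- none of these types exist in Flink yet.

  /** A partition of a replayable source, e.g. a Kafka partition or a Kinesis shard. */
  interface SourcePartition extends java.io.Serializable {}

  /** A position within a partition, e.g. a Kafka offset or a Kinesis sequence number. */
  interface PartitionPosition extends java.io.Serializable {}

  /** Common contract for partition-based, replayable sources. */
  interface PartitionedSource<T, P extends SourcePartition, POS extends PartitionPosition> {

      /** Partition / shard discovery; could run in a single task and be broadcast
       *  to the parallel readers, following the file source pattern. */
      java.util.List<P> discoverPartitions() throws Exception;

      /** Custom startup positions (earliest, latest, specific offsets / timestamps, ...). */
      POS initialPosition(P partition);

      /** Emits a record and advances the position that will be checkpointed. */
      void emitRecord(T record, P partition, POS position);

      /** Position checkpointing: the per-partition positions to snapshot. */
      java.util.Map<P, POS> snapshotPositions();

      /** Per-partition watermarks and idleness. */
      void emitWatermark(P partition, org.apache.flink.streaming.api.watermark.Watermark watermark);
      void markIdle(P partition);
  }

  /** Builder-style configuration (in the spirit of Beam's KafkaIO), keeping
   *  connector options separate from the client Properties. Methods are stubs. */
  final class PartitionedSourceBuilder<T> {
      private final java.util.Properties clientProperties = new java.util.Properties();

      public PartitionedSourceBuilder<T> withTopic(String topic) { return this; }
      public PartitionedSourceBuilder<T> withStartupMode(String mode) { return this; }
      public PartitionedSourceBuilder<T> withDeserializer(
              java.util.function.Function<byte[], T> deserializer) { return this; }
      public PartitionedSourceBuilder<T> withClientProperty(String key, String value) {
          clientProperties.setProperty(key, value);
          return this;
      }
      // build() would assemble the actual source instance.
  }

The important part for me is that connector-specific options (topic, startup
mode, etc.) are expressed as explicit methods, while the client configuration
stays in its own Properties object instead of being mixed into one shared bag.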

I think we are now in a good position to try to make this happen for 1.6. Once
1.5 is out of the way, I can open an umbrella JIRA and collect everything there
so we can continue the discussion.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4280?focusedCommentId=15399648&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648
[2] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171
[3] https://issues.apache.org/jira/browse/FLINK-5728
[4] https://issues.apache.org/jira/browse/FLINK-5704
[5] https://issues.apache.org/jira/browse/FLINK-6288

On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:

Generalizing the pattern would be great. I was also wondering if there aren't 
other commonalities between sources that would benefit from a shared framework. 
Kafka and Kinesis don't look all that different from a consumer perspective: 
replayable source, topic -> stream, partition -> shard, offset -> sequence, 
dynamic discovery, state saving - shouldn't there be more common code?

Meanwhile, we need to find a way to address the shortcomings in the current
Kinesis connector to enable the use case. I would prefer to do that without
permanently forking the connector code, so here are some more thoughts:
- Provide a hook to override discovery (so that not every subtask hits Kinesis).
- Provide a hook to support custom watermark generation (somewhere around
KinesisDataFetcher.emitRecordAndUpdateState).
If we can accomplish these in short order, it would be great; a rough sketch of
what I mean follows below. The current implementation makes it hard/impossible
to override certain behaviors (final protected methods and the like). If there
is agreement, then I would like to address those in a quick PR.
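
To illustrate, something along these lines would already be enough for our use
case. The names below are made up; only KinesisDataFetcher.emitRecordAndUpdateState
refers to existing code, and the actual shape would of course be whatever fits
the connector best.

  // Hypothetical sketch -- not the current flink-connector-kinesis API.

  /** Hook 1: pluggable shard discovery, so that not every subtask has to hit Kinesis.
   *  A custom implementation could, for example, receive the shard list from a single
   *  discovery task via broadcast instead of calling the Kinesis API from every subtask. */
  interface ShardDiscoveryHook {
      java.util.List<String> discoverShards(int subtaskIndex, int parallelism) throws Exception;
  }

  /** Hook 2: per-shard timestamp extraction and watermark generation, invoked roughly
   *  where KinesisDataFetcher.emitRecordAndUpdateState handles records today. */
  interface ShardWatermarkHook<T> {
      /** Event timestamp for a record read from the given shard. */
      long extractTimestamp(T record, String shardId, long approximateArrivalTime);

      /** Current watermark across the shards owned by this subtask; an implementation
       *  could also choose to treat shards that have been silent for a while as idle. */
      long currentWatermark();
  }

The consumer would accept optional implementations of these (via setters or
constructor arguments), with the current behavior as the default, so nothing
changes for existing users.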

Thanks,
Thomas


On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,

That last point is very valid. For a while now I've wanted to generalise the 
pattern of our file source to other sources. (This is related to how Beam 
sources are being refactored to use Splittable DoFn.)

I'm very eager for design work to start on this once 1.5 is out the door. There 
are some other folks (cc'ed) who have also talked/thought about this before.

Best,
Aljoscha

> On 7. Feb 2018, at 01:44, Thomas Weise <t...@apache.org> wrote:
>
> In addition to lack of watermark support, the Kinesis consumer suffers from
> a discovery-related issue that also needs to be resolved. Shard discovery
> runs periodically in all subtasks. That's not just inefficient but becomes
> a real problem when there is a large number of subtasks due to rate
> limiting (
> https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
> The discovery interval should be minimized to cap latency (new shards not
> consumed until discovered).
>
> How about moving discovery out of the fetcher into a separate singleton
> source and then broadcast the result to the parallel fetchers, following
> the pattern applied to file input?
>
> https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
>
> This would also ensure that all subtasks consistently see the same shard
> list.
>
> Thoughts?
>
> Thanks,
> Thomas
>
>
> On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise <t...@apache.org> wrote:
>
>> Hi,
>>
>> The Kinesis consumer currently does not emit watermarks, and this can lead
>> to problems when a single subtask reads from multiple shards and offsets
>> are not closely aligned with respect to the event time.
>>
>> The Kafka consumer has support for periodic and punctuated watermarks,
>> although there is also the unresolved issue
>> https://issues.apache.org/jira/browse/FLINK-5479 that would equally apply to Kinesis.
>>
>> I propose adding support for timestamp assigner and watermark generator to
>> the Kinesis consumer.
>>
>> As for handling of idle shards, is there a preference? Perhaps a
>> customization point on the assigner that defers the decision to the user
>> would be appropriate?
>>
>> Thanks,
>> Thomas
>>
>>

