Hi Anil,
Yes you are right, split assignment is either proactive by split discovery
and then assignment as in the scheduled discovery tasks in Kafka, JDBC
sources or reactive where the reader itself submits a split request and the
enumerator assigns on requests like the example you added, or even a
mixture of both as in the file system source, In order to assign splits via
SplitEnumerator#handleSplitRequest implementation reader instances must
submit requests as in here[1]. I can't see your custom implementation [2]
submitting any split requests. Let me know if I get anything wrong here!


1-
https://github.com/apache/flink/blob/dd45e0522588ea594e4a92fd98d8115363a5700a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L92C13-L92C40

2-
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSplitReader.java
Best Regards
Ahmed Hamdy


On Sun, 6 Oct 2024 at 16:48, Anil Dasari <adas...@guidewire.com> wrote:

> Hi Ahmed,
> Thanks for the response.
> This is the part that I find unclear in the documentation and FLIP-27. The
> actual split assignment happens in the SplitEnumerator#handleSplitRequest
> method and not in SplitEnumerator#start. Both KafkaSource and JdbcSource
> use context.callAsync to identify new splits in SplitEnumerator#start.
> From what I understand, split assignment does not need to occur in the
> start method. SplitEmulator assigns the assigned splits to the defined
> reader in the source.
>
> SplitEnumerator#handleSplitRequest calls decided by parallelism ? i
> think, yes.
>
> Kafka and Jdbc Source split emulators a bit complex due to the nature of
> the auto discovery required for the given configuration. Here is the simple
> split emulator from flink repo
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java#L58
> where split assignment happen in handleRequest.
>
> Let me know if you have any questions.
>
> Thanks.
>
> On Sun, Oct 6, 2024 at 6:44 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
>> Hi Anil
>> I am glad the new (yet to be only) source API is getting attention,
>> according to the execution model the assignments of splits is the
>> responsibility of the enumerator and it seems that the enumerator is not
>> assigning the readers any splits.
>> Check kafka source for reference[1]
>> 1-
>> https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Sun, 6 Oct 2024 at 06:41, Anil Dasari <adas...@guidewire.com> wrote:
>>
>>> Hello,
>>> I have implemented a custom source that reads tables in parallel, with
>>> each split corresponding to a table and custom source implementation can be
>>> found here -
>>> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java
>>>
>>> However, it seems the source splits are not being scheduled and data is
>>> not being read from the tables. Can someone help me identify the issue in
>>> the implementation?
>>>
>>> Thanks
>>>
>>>

Reply via email to