Hi Ahmed,
Thanks again. You are right. send a split request call in the reader fixed
the issue.

Thanks




On Sun, Oct 6, 2024 at 11:30 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Anil,
> Yes you are right, split assignment is either proactive by split discovery
> and then assignment as in the scheduled discovery tasks in Kafka, JDBC
> sources or reactive where the reader itself submits a split request and the
> enumerator assigns on requests like the example you added, or even a
> mixture of both as in the file system source, In order to assign splits via
> SplitEnumerator#handleSplitRequest implementation reader instances must
> submit requests as in here[1]. I can't see your custom implementation [2]
> submitting any split requests. Let me know if I get anything wrong here!
>
>
> 1-
> https://github.com/apache/flink/blob/dd45e0522588ea594e4a92fd98d8115363a5700a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L92C13-L92C40
> <https://github.com/apache/flink/blob/dd45e0522588ea594e4a92fd98d8115363a5700a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L92C13-L92C40>
>
> 2-
> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSplitReader.java
> <https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSplitReader.java>
> Best Regards
> Ahmed Hamdy
>
>
> On Sun, 6 Oct 2024 at 16:48, Anil Dasari <adas...@guidewire.com> wrote:
>
>> Hi Ahmed,
>> Thanks for the response.
>> This is the part that I find unclear in the documentation and FLIP-27.
>> The actual split assignment happens in the
>> SplitEnumerator#handleSplitRequest method and not in
>> SplitEnumerator#start. Both KafkaSource and JdbcSource use
>> context.callAsync to identify new splits in SplitEnumerator#start. From
>> what I understand, split assignment does not need to occur in the start
>> method. SplitEmulator assigns the assigned splits to the defined reader in
>> the source.
>>
>> SplitEnumerator#handleSplitRequest calls decided by parallelism ? i
>> think, yes.
>>
>> Kafka and Jdbc Source split emulators a bit complex due to the nature of
>> the auto discovery required for the given configuration. Here is the simple
>> split emulator from flink repo
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java#L58
>> <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java#L58>
>> where split assignment happen in handleRequest.
>>
>> Let me know if you have any questions.
>>
>> Thanks.
>>
>> On Sun, Oct 6, 2024 at 6:44 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>>> Hi Anil
>>> I am glad the new (yet to be only) source API is getting attention,
>>> according to the execution model the assignments of splits is the
>>> responsibility of the enumerator and it seems that the enumerator is not
>>> assigning the readers any splits.
>>> Check kafka source for reference[1]
>>> 1-
>>> https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286
>>> <https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286>
>>> Best Regards
>>> Ahmed Hamdy
>>>
>>>
>>> On Sun, 6 Oct 2024 at 06:41, Anil Dasari <adas...@guidewire.com> wrote:
>>>
>>>> Hello,
>>>> I have implemented a custom source that reads tables in parallel, with
>>>> each split corresponding to a table and custom source implementation can be
>>>> found here -
>>>> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java
>>>> <https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java>
>>>>
>>>> However, it seems the source splits are not being scheduled and data is
>>>> not being read from the tables. Can someone help me identify the issue in
>>>> the implementation?
>>>>
>>>> Thanks
>>>>
>>>>

Reply via email to