Hi Ahmed, Thanks again. You are right: sending a split request from the
reader fixed the issue.
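For anyone finding this thread later: the FLIP-27 pull protocol means the
enumerator's handleSplitRequest only runs when a reader asks for work. Below
is a minimal, self-contained model of that handshake. The class names
(MiniEnumerator, MiniReader) are hypothetical stand-ins, not the real Flink
interfaces; in the actual API the reader triggers this by calling
SourceReaderContext#sendSplitRequest().

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-ins modeling the FLIP-27 pull protocol; not real Flink classes.
class MiniEnumerator {
    private final Queue<String> unassigned = new ArrayDeque<>();

    MiniEnumerator(String... tables) {
        for (String t : tables) unassigned.add(t);
    }

    // Analogue of SplitEnumerator#handleSplitRequest: only invoked when a reader asks.
    String handleSplitRequest() {
        return unassigned.poll();
    }
}

class MiniReader {
    private final MiniEnumerator enumerator;
    String currentSplit; // stays null until the reader actually requests work

    MiniReader(MiniEnumerator enumerator) {
        this.enumerator = enumerator;
    }

    // The fix discussed in this thread: request a split when the reader starts,
    // analogous to SourceReaderContext#sendSplitRequest() in the real API.
    void start() {
        currentSplit = enumerator.handleSplitRequest();
    }
}
```

Without the request in start(), currentSplit stays null forever, which
matches the "splits are not being scheduled" symptom described below.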
Thanks

On Sun, Oct 6, 2024 at 11:30 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Anil,
> Yes, you are right. Split assignment is either proactive, where splits are
> discovered and then assigned, as in the scheduled discovery tasks of the
> Kafka and JDBC sources; reactive, where the reader itself submits a split
> request and the enumerator assigns on request, as in the example you
> added; or a mixture of both, as in the file system source. In order to
> assign splits via a SplitEnumerator#handleSplitRequest implementation,
> reader instances must submit requests, as done here [1]. I can't see your
> custom implementation [2] submitting any split requests. Let me know if I
> got anything wrong here!
>
> 1-
> https://github.com/apache/flink/blob/dd45e0522588ea594e4a92fd98d8115363a5700a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L92C13-L92C40
> 2-
> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSplitReader.java
>
> Best Regards
> Ahmed Hamdy
>
> On Sun, 6 Oct 2024 at 16:48, Anil Dasari <adas...@guidewire.com> wrote:
>
>> Hi Ahmed,
>> Thanks for the response.
>> This is the part that I find unclear in the documentation and FLIP-27.
>> The actual split assignment happens in the
>> SplitEnumerator#handleSplitRequest method and not in
>> SplitEnumerator#start. Both KafkaSource and JdbcSource use
>> context.callAsync to identify new splits in SplitEnumerator#start. From
>> what I understand, split assignment does not need to occur in the start
>> method. The SplitEnumerator assigns splits to the readers defined in
>> the source.
>>
>> Are SplitEnumerator#handleSplitRequest calls decided by parallelism? I
>> think, yes.
>>
>> The Kafka and JDBC source split enumerators are a bit complex due to the
>> nature of the auto discovery required for the given configuration. Here
>> is a simple split enumerator from the Flink repo
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java#L58
>> where split assignment happens in handleSplitRequest.
>>
>> Let me know if you have any questions.
>>
>> Thanks.
>>
>> On Sun, Oct 6, 2024 at 6:44 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>>> Hi Anil,
>>> I am glad the new (yet to be the only) source API is getting attention.
>>> According to the execution model, the assignment of splits is the
>>> responsibility of the enumerator, and it seems that your enumerator is
>>> not assigning the readers any splits.
>>> Check the Kafka source for reference [1].
>>>
>>> 1-
>>> https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286
>>>
>>> Best Regards
>>> Ahmed Hamdy
>>>
>>> On Sun, 6 Oct 2024 at 06:41, Anil Dasari <adas...@guidewire.com> wrote:
>>>
>>>> Hello,
>>>> I have implemented a custom source that reads tables in parallel, with
>>>> each split corresponding to a table. The custom source implementation
>>>> can be found here:
>>>> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java
>>>>
>>>> However, it seems the source splits are not being scheduled and data
>>>> is not being read from the tables. Can someone help me identify the
>>>> issue in the implementation?
>>>>
>>>> Thanks
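For the archive, the reactive pattern the thread converges on — the
enumerator handing out one split per reader request and signalling when it
runs out, as IteratorSourceEnumerator does — can be sketched with a
self-contained model. The class name below is hypothetical, and the strings
it records stand in for the real calls, context.assignSplit(split, subtask)
and context.signalNoMoreSplits(subtask), on SplitEnumeratorContext.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of an IteratorSourceEnumerator-style enumerator; the
// `actions` list stands in for calls made on a SplitEnumeratorContext.
class RequestDrivenEnumerator {
    private final Queue<String> unassigned;
    private final List<String> actions = new ArrayList<>();

    RequestDrivenEnumerator(List<String> splits) {
        this.unassigned = new ArrayDeque<>(splits);
    }

    // Analogue of SplitEnumerator#handleSplitRequest(int subtaskId, String host):
    // pop one pending split and hand it to the requesting subtask.
    void handleSplitRequest(int subtaskId) {
        String split = unassigned.poll();
        if (split != null) {
            actions.add("assign " + split + " -> subtask " + subtaskId);
        } else {
            // No work left: tell the reader so it can finish instead of idling.
            actions.add("noMoreSplits -> subtask " + subtaskId);
        }
    }

    List<String> actions() {
        return actions;
    }
}
```

Each reader request drains one split; once the queue is empty, the requesting
subtask is told there is nothing more, which is what lets a bounded source
terminate cleanly.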