This is fairly stale, but getting back to this: we ended up going the route of using the Operator API and implementing something similar to the `readFile` API, with one real source function that reads out splits and a small abstraction over AbstractStreamOperator, a `MessagableSourceFunction`, that had an API similar to ProcessFunction.
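Roughly, the kind of shape such a wrapper can take is sketched below. To be clear, this is only an illustrative sketch and not the actual code: the processControlMessage method, the generic parameters, and the Collector adapter are invented here to show the pattern of building a ProcessFunction-like API directly on the Operator API.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

// Sketch: a ProcessFunction-like abstraction built directly on the Operator API.
// Control messages arrive as the operator's input; subclasses react to them and
// emit records downstream through the operator's Output.
public abstract class MessagableSourceFunction<CTRL, OUT>
        extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<CTRL, OUT> {

    // Subclasses implement this, analogous to ProcessFunction#processElement.
    protected abstract void processControlMessage(CTRL message, Collector<OUT> out) throws Exception;

    @Override
    public void processElement(StreamRecord<CTRL> element) throws Exception {
        // Adapt the operator's Output to the familiar Collector interface.
        final Output<StreamRecord<OUT>> rawOutput = output;
        processControlMessage(element.getValue(), new Collector<OUT>() {
            @Override
            public void collect(OUT record) {
                rawOutput.collect(new StreamRecord<>(record));
            }

            @Override
            public void close() {
                // nothing to release in this sketch
            }
        });
    }
}

An operator like this gets wired into a job with DataStream#transform(name, outputTypeInfo, operator) rather than with process(...).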
It is a little bit more to deal with, but it wasn't too bad all told. I am hoping to get the code cleaned up and post it, at least as an example of how to use the Operator API for some more advanced use cases.

Aljoscha: That looks really interesting! I actually saw it too late to consider something like that, but it seems like a good change! Thanks for the input.

Addison

On Thu, Aug 30, 2018 at 4:07 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Addison,
>
> For a while now, different ideas about reworking the Source interface have been floated. I implemented a prototype that showcases my favoured approach for such a new interface:
> https://github.com/aljoscha/flink/commits/refactor-source-interface
>
> This basically splits the Source into two parts: a SplitEnumerator and a SplitReader. The enumerator is responsible for discovering what should be read, and the reader is responsible for reading splits. In this model, the SplitReader does not necessarily have to sit at the beginning of the pipeline; it could sit somewhere in the middle, and the splits don't necessarily have to come from the enumerator but could come from a different source.
>
> I think this could fit the use case that you're describing.
>
> Best,
> Aljoscha
>
> On 25. Aug 2018, at 11:45, Chesnay Schepler <ches...@apache.org> wrote:
>
> The SourceFunction interface is rather flexible, so you can do pretty much whatever you want. The exact implementation depends on whether control messages are pulled or pushed to the source: in the first case you'd simply block within "run()" on the external call; in the latter you'd have it block on a queue of some sort that is fed by another thread waiting for messages.
>
> AFAIK you should never use the collector outside of "processElement".
>
> On 25.08.2018 05:15, vino yang wrote:
>
> Hi Addison,
>
> There are a few things I don't understand. Does your source generate its own messages? Why can't the source receive input? If it cannot accept input, why is it still called a source? Isn't the Kafka connector an example of input acting as a source?
>
> If you mean that under normal circumstances it cannot receive a second input carrying control messages, there are some ways to solve that:
>
> 1) Access an external system, such as ZooKeeper, from inside your source to fetch or subscribe to control messages.
> 2) If your source is followed by a map or process operator, they can be chained together as one "big" source, and you can then pass your control messages via Flink's new "broadcast state" feature. See this blog post for details. [1]
> 3) Mix control messages with normal messages in the same message flow, and take the corresponding action once a control message is parsed. Of course, this kind of program is not really recommended.
>
> [1]: https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>
> Thanks, vino.
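To make option 2) above concrete, a minimal sketch of wiring a broadcast control stream into a downstream function is shown below. This is only an illustration of the broadcast state pattern: the String types, the method and stream names, and the "latest" key are placeholders, not anything taken from this thread.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ControlBroadcastSketch {

    // Connects a (low-volume) control stream to a data stream via broadcast state,
    // so that every parallel instance sees every control message.
    public static DataStream<String> wireControlStream(
            DataStream<String> events, DataStream<String> controlMessages) {

        // Descriptor for the broadcast state that holds the latest control message.
        final MapStateDescriptor<String, String> controlStateDescriptor =
                new MapStateDescriptor<>("control-state", Types.STRING, Types.STRING);

        // Broadcast the control stream to all parallel instances of the downstream function.
        BroadcastStream<String> controlBroadcast = controlMessages.broadcast(controlStateDescriptor);

        return events
                .connect(controlBroadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // Regular elements get read-only access to the broadcast state.
                        String control = ctx.getBroadcastState(controlStateDescriptor).get("latest");
                        out.collect(control == null ? value : control + ":" + value);
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Control messages update the broadcast state on every parallel instance.
                        ctx.getBroadcastState(controlStateDescriptor).put("latest", value);
                    }
                });
    }
}

Because the control stream is broadcast, each parallel instance of the function receives every control message, which makes this a reasonable way to distribute coordination information such as read positions.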
> Addison Higham <addis...@gmail.com> wrote on Sat, Aug 25, 2018 at 12:46 AM:
>
>> Hi,
>>
>> I am writing a parallel source function that ideally needs to receive some messages as control information (specifically, a state message on where to start reading from a Kinesis stream). As far as I can tell, there isn't a way to make a SourceFunction receive input (which makes sense), so I am thinking it makes sense to use a ProcessFunction that will occasionally receive control messages and mostly just output a lot of messages.
>>
>> This works from an API perspective, with a few different options. I could either:
>>
>> A) have the processElement function block on the loop that produces messages, or
>> B) have the processElement function return (passing the collector to another thread and starting the reading there), but continue to produce messages downstream.
>>
>> This obviously does raise some concerns, though:
>>
>> 1. Does this break any assumptions Flink has about the message lifecycle? Option A, blocking in processElement for very long periods, seems straightforward but less than ideal, not to mention that it cannot handle any other control messages.
>>
>> However, I am not sure whether a ProcessFunction sending messages after processElement has returned would break some expectations Flink has of operator lifecycles. Messages are also emitted by timers, etc., but this would be completely outside any of those function calls, since it is started on another thread. This is obviously how most SourceFunctions work, but it isn't clear whether the same technique holds for ProcessFunctions.
>>
>> 2. Would this have a negative impact on backpressure downstream? Since I am still going to be using the same collector instance, it seems like it should ultimately work, but I wonder if there are other details I am not aware of.
>>
>> 3. Is this just a terrible idea in general? It seems like I could maybe do this by implementing directly on top of an Operator, but I am not as familiar with that API.
>>
>> Thanks in advance for any thoughts!
>>
>> Addison
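For reference, the push-based pattern Chesnay describes above (a SourceFunction whose run() blocks on a queue of control messages fed by a separate thread) could look roughly like the sketch below. The class name, the listener thread, and the String control-message type are invented for illustration; a real implementation would also checkpoint its read position.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Sketch: a source whose run() loop blocks on a queue of control messages
// that is fed by a separate listener thread.
public class ControlledSourceSketch implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        final BlockingQueue<String> controlMessages = new LinkedBlockingQueue<>();

        // Hypothetical listener thread; in a real job it would subscribe to an external
        // system (e.g. ZooKeeper) and push control messages onto the queue.
        Thread listener = new Thread(() -> {
            // controlMessages.offer(...) whenever a control message arrives
        });
        listener.setDaemon(true);
        listener.start();

        while (running) {
            // Block until a control message arrives; poll with a timeout so cancel() is noticed.
            String control = controlMessages.poll(1, TimeUnit.SECONDS);
            if (control == null) {
                continue;
            }
            // React to the control message (e.g. start reading from the indicated position)
            // and emit records under the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("read-from:" + control);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Emitting under ctx.getCheckpointLock() keeps record emission consistent with checkpointing, which is the usual contract for SourceFunction implementations that emit from their own loop.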