This is fairly stale, but getting back to this:

We ended up using the Operator API and implementing something similar to
the `readFile` API: one real source function that reads out splits, plus a
small abstraction over AbstractStreamOperator, a `MessagableSourceFunction`,
with an API similar to ProcessFunction.
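
For anyone finding this thread later, here is a rough sketch of the general
shape in plain Java, with no Flink dependencies. It is not the actual
implementation: `Split`, `QueueDrivenReader`, `SplitReaderDemo`, and the
record format are hypothetical stand-ins, and a real version would sit on
top of AbstractStreamOperator and receive splits through its input rather
than through a hand-rolled `send` call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical stand-in for a split: records [from, to) of one shard. */
final class Split {
    final String shard;
    final int from;
    final int to;

    Split(String shard, int from, int to) {
        this.shard = shard;
        this.from = from;
        this.to = to;
    }
}

/** Hypothetical reader: blocks on a queue of splits and emits their records. */
final class QueueDrivenReader implements Runnable {
    // Sentinel compared by identity; tells the read loop to stop.
    private static final Split END = new Split("", 0, 0);

    private final BlockingQueue<Split> pending = new LinkedBlockingQueue<>();
    private final Consumer<String> out;

    QueueDrivenReader(Consumer<String> out) {
        this.out = out;
    }

    /** Called by the upstream side; returns immediately. */
    void send(Split split) {
        pending.add(split);
    }

    /** Lets the read loop finish after the splits already queued. */
    void finish() {
        pending.add(END);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Split split = pending.take(); // blocks until a split arrives
                if (split == END) {
                    return;
                }
                for (int i = split.from; i < split.to; i++) {
                    out.accept(split.shard + "#" + i);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}

final class SplitReaderDemo {
    /** Feeds two splits to a reader thread and returns what it emitted. */
    static List<String> runDemo() {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        QueueDrivenReader reader = new QueueDrivenReader(emitted::add);
        Thread readerThread = new Thread(reader, "split-reader");
        readerThread.start();

        reader.send(new Split("shard-0", 0, 2));
        reader.send(new Split("shard-1", 5, 7));
        reader.finish();
        try {
            readerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

The point of the queue is that handing over a split never blocks the
caller, so further control messages can still arrive while reading is in
progress. A real operator would also have to coordinate emission with
checkpointing, which this sketch ignores.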

It is a little bit more to deal with, but wasn't too bad all told.

I am hoping to get the code cleaned up and post it at least as an example
of how to use the Operator API for some more advanced use cases.

Aljoscha: That looks really interesting! I actually saw that too late to
consider something like that, but it seems like a good change!

Thanks for the input.

Addison

On Thu, Aug 30, 2018 at 4:07 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Addison,
>
> for a while now different ideas about reworking the Source interface have
> been floated. I implemented a prototype that showcases my favoured approach
> for such a new interface:
> https://github.com/aljoscha/flink/commits/refactor-source-interface
>
> This basically splits the Source into two parts: a SplitEnumerator and a
> SplitReader. The enumerator is responsible for discovering what should be
> read and the reader is responsible for reading splits. In this model, the
> SplitReader does not necessarily have to sit at the beginning of the
> pipeline; it could sit somewhere in the middle, and the splits don't
> necessarily have to come from the enumerator but could come from a
> different source.
>
> I think this could fit the use case that you're describing.
>
> Best,
> Aljoscha
>
> On 25. Aug 2018, at 11:45, Chesnay Schepler <ches...@apache.org> wrote:
>
> The SourceFunction interface is rather flexible so you can do pretty much
> whatever you want. Exact implementation depends on whether control messages
> are pulled or pushed to the source; in the first case you'd simply block
> within "run()" on the external call, in the latter you'd have it block on a
> queue of some sort that is fed by another thread waiting for messages.
>
> AFAIK you should never use the collector outside of "processElement".
>
> On 25.08.2018 05:15, vino yang wrote:
>
> Hi Addison,
>
> There is a lot here I don't understand. Does your source generate its own
> messages? Why can't the source receive input? If the source can't accept
> input, why is it called a source? Doesn't the Kafka connector take input
> as a source?
>
> If you mean that a source normally can't receive a second input carrying
> control messages, there are some ways to solve that.
>
> 1) Access an external system, such as ZooKeeper, from your source to fetch
> or subscribe to control messages.
> 2) If your source is followed by a map or process operator, they can be
> chained together as one "big" source, and you can pass your control
> messages via Flink's new "broadcast state" feature. See this blog post for
> details.[1]
> 3) Mix control messages with normal messages in the same message flow.
> After a control message is parsed, the corresponding action is taken. Of
> course, this approach is not really recommended.
>
> [1]:
> https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>
> Thanks, vino.
>
> Addison Higham <addis...@gmail.com> wrote on Sat, Aug 25, 2018 at 12:46 AM:
>
>> Hi,
>>
>> I am writing a parallel source function that ideally needs to receive
>> some messages as control information (specifically, a state message on
>> where to start reading from a Kinesis stream). As far as I can tell, there
>> isn't a way to make a SourceFunction receive input (which makes sense), so
>> I am thinking it makes sense to use a ProcessFunction that will
>> occasionally receive control messages and mostly just output a lot of
>> messages.
>>
>> This works from an API perspective, with a few different options. I could
>> either:
>>
>> A) have the processElement function block on calling the loop that will
>> produce messages, or
>> B) have the processElement function return (by passing the collector to a
>> different thread and starting the reading there), but continue to produce
>> messages downstream.
>>
>> This obviously does raise some concerns though:
>>
>> 1. Does this break any assumptions Flink has about message lifecycle?
>> Option A, blocking in processElement for very long periods, seems
>> straightforward but less than ideal, not to mention that it can't handle
>> any other control messages.
>>
>> However, I am not sure whether a ProcessFunction sending messages after
>> the processElement function has returned would break some expectations
>> Flink has of operator lifecycles. Messages are also emitted by timers,
>> etc., but this would be completely outside any of those function calls, as
>> it is started on another thread. This is obviously how most
>> SourceFunctions work, but it isn't clear if the same technique holds for
>> ProcessFunctions.
>>
>> 2. Would this have a negative impact on backpressure downstream? Since I
>> am still going to be using the same collector instance, it seems like it
>> should ultimately work, but I wonder if there are other details I am not
>> aware of.
>>
>> 3. Is this just a terrible idea in general? It seems like I could maybe
>> do this by implementing directly on top of an Operator, but I am not as
>> familiar with that API.
>>
>> Thanks in advance for any thoughts!
>>
>> Addison
>>
>
>
>
