Re: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Jing Ge
the FLIP-238 to be accepted? > > -Original Message- > From: Qingsheng Ren > Sent: Thursday, June 9, 2022 12:16 > To: Sanabria, Carlos > Cc: user > Subject: Re: [External] Re: Source vs SourceFunction and testing > > Hi Carlos, > > FLIP-238 [1] is proposing

RE: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Sanabria, Carlos
? -Original Message- From: Qingsheng Ren Sent: Thursday, June 9, 2022 12:16 To: Sanabria, Carlos Cc: user Subject: Re: [External] Re: Source vs SourceFunction and testing Hi Carlos, FLIP-238 [1] is proposing a FLIP-27-based data generator source and I think this is what you are looking

Re: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Qingsheng Ren
class of > the job, and also create the test Sources and Sinks in the JUnit tests, and > inject them into the AbstractDataStreamJob class. > > The problem comes with the new Source interface and the end-to-end tests > against the local embedded mini cluster. Prior to Flink 1.15, we use

RE: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Sanabria, Carlos
gainst the local embedded mini cluster. Prior to Flink 1.15, we used the FromElementsFunction to create the test SourceFunction. Now that we changed the code to use the new Source interface, we cannot use the FromElementsFunction anymore, and we haven't found an equivalent FromElementsSource

Re: SourceFunction

2022-06-08 Thread Jing Ge
: > Hello, > Is there a plan to deprecate SourceFunction in favor of the Source API? We have a > custom SourceFunction-based source; do we need to plan to rewrite it using > the new Source API? > > Thanks, > Alexey >

SourceFunction

2022-06-08 Thread Alexey Trenikhun
Hello, Is there a plan to deprecate SourceFunction in favor of the Source API? We have a custom SourceFunction-based source; do we need to plan to rewrite it using the new Source API? Thanks, Alexey

Re: Source vs SourceFunction and testing

2022-05-25 Thread Qingsheng Ren
:04 AM Qingsheng Ren wrote: > Hi Piotr, > > I’d like to share my understanding about this. Source and SourceFunction are > both interfaces to data sources. SourceFunction was designed and introduced > earlier and as the project evolved, many shortcomings emerged. Therefore, the > com

Re: Source vs SourceFunction and testing

2022-05-25 Thread Piotr Domagalski
acepalm: On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren wrote: > Hi Piotr, > > I’d like to share my understanding about this. Source and SourceFunction > are both interfaces to data sources. SourceFunction was designed and > introduced earlier and as the project evolved, many sh

Re: Source vs SourceFunction and testing

2022-05-24 Thread Qingsheng Ren
Hi Piotr, I’d like to share my understanding about this. Source and SourceFunction are both interfaces to data sources. SourceFunction was designed and introduced earlier and as the project evolved, many shortcomings emerged. Therefore, the community re-designed the source interface and

Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
he type system and being still confused about differences between > Source, SourceFunction, DataStream, DataStreamOperator, etc. > > I think the DataStream<> type is what I'm looking for? That is, then I can > use: > > DataStream source = env.fromSource(getKa

Re: Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi Ken, Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, navigating the type system and being still confused about differences between Source, SourceFunction, DataStream, DataStreamOperator, etc. I think the DataStream<> type is what I'm looking for? That is,

Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
ould > like to test later on with `MiniCluster`. > > I've looked at the flink-training repository examples [1] and they tend to > expose the main job as a class that accepts a `SourceFunction` and a > `SinkFunction`, which makes sense. But then, my job is normally constructed

Re: Source vs SourceFunction and testing

2022-05-24 Thread Aeden Jameson
; > I've looked at the flink-training repository examples [1] and they tend to > expose the main job as a class that accepts a `SourceFunction` and a > `SinkFunction`, which makes sense. But then, my job is normally constructed > with `KafkaSource` which is then passed to `env.fr

Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi, I'm wondering: what is the recommended way to structure a job that one would like to test later on with `MiniCluster`? I've looked at the flink-training repository examples [1] and they tend to expose the main job as a class that accepts a `SourceFunction` and a `SinkFunction`,
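The structure asked about here (a job class that receives its source and sink from outside, so a test can swap them) can be sketched without any Flink dependency. This is only an illustration of the injection idea: `InjectableJobSketch` is a hypothetical name, and `Iterable<String>` and `Consumer<String>` are plain-Java stand-ins for the source and sink types, not Flink API.

```java
import java.util.Locale;
import java.util.function.Consumer;

// Hypothetical stand-in for a job class; not Flink API.
final class InjectableJobSketch {
    private final Iterable<String> source; // stand-in for an injected source
    private final Consumer<String> sink;   // stand-in for an injected sink

    InjectableJobSketch(Iterable<String> source, Consumer<String> sink) {
        this.source = source;
        this.sink = sink;
    }

    // The "pipeline": read every record, transform it, hand it to the sink.
    void execute() {
        for (String record : source) {
            sink.accept(record.toUpperCase(Locale.ROOT));
        }
    }
}
```

In production the constructor would receive the real connectors; in a test it receives an in-memory list and a collecting sink, so the transformation logic is exercised unchanged.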

Re: How to access Task.isBackPressured() from a SourceFunction?

2022-02-14 Thread Niklas Semmler
Hi Darren, No, you cannot access the Task from the operator. You can access some metrics via the RuntimeContext: getRuntimeContext().getMetricGroup(). How does the backpressure help you here? Backpressure can originate in any operator or network connection. If it's an operator further downstre

How to access Task.isBackPressured() from a SourceFunction?

2022-02-08 Thread Darren Whobrey
Hi, is there a way for the UDF of a source function, extended from RichParallelSourceFunction, to access its Task instance, so as to call Task.isBackPressured()? I'm trying to give priorities to different input sources that need to be managed from within the same source function and want to stop

Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread Ingo Bürk
Hi Oscar, I think you'll find your answers in [1], have a look at Yun's response a couple emails down. Basically, SourceFunction is the legacy source stack, and ideally you'd instead implement your source using the FLIP-27 stack[2] where you can directly define the boundedne

Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread 陳樺威
Sorry, there are some typos that may be misleading. The SourceFunction will be detected as *Streaming Mode*. On Thu, Jun 3, 2021 at 1:29 PM, 陳樺威 wrote: > Hi, > > Currently, we want to use batch execution mode [0] to consume historical > data and rebuild states for our streaming application. >

SourceFunction cannot run in Batch Mode

2021-06-02 Thread 陳樺威
. However, the function will be detected as Batch Mode. Our question is, how to implement a SourceFunction as a Bounded DataStream? Thanks! Oscar [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/ [1] https://ci.apache.org/projects/flink/flink-docs

Re: Call run() of another SourceFunction inside run()?

2021-04-14 Thread Piotr Nowojski
Hi, I think it should work. At least off the top of my head I do not see any reason why it shouldn't. Just make sure that you are proxying all relevant methods, not only those defined in `SourceFunction`. For example `FlinkKafkaConsumer` is implementing/exte

Call run() of another SourceFunction inside run()?

2021-04-14 Thread Schneider, Jochen
Hi! To work around FLINK-2491<https://issues.apache.org/jira/browse/FLINK-2491> which causes checkpointing issues for us I am trying to chain SourceFunctions so that the first one never quits. The basic idea is as follows: class WrappingSourceFunction(innerSourceFunction: SourceFunction

Re: Passing a custom SourceContext to a SourceFunction

2019-05-16 Thread Chesnay Schepler
You cannot control what kind of SourceContext is passed into your function. What are you trying to achieve? On 15/05/2019 09:30, Debasish Ghosh wrote: Hi - I have a custom SourceFunction .. class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] { def run(ctx: SourceContext[T

Passing a custom SourceContext to a SourceFunction

2019-05-15 Thread Debasish Ghosh
Hi - I have a custom SourceFunction .. class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] { def run(ctx: SourceContext[T]): Unit = { data.foreach(d ⇒ ctx.collect(d)) } } When this function is run during job execution, the SourceContext that gets passed serializes the data

Re: Flink Custom SourceFunction and SinkFunction

2019-03-04 Thread Piotr Nowojski
/sinks implementations. Piotrek > On 1 Mar 2019, at 09:39, Siew Wai Yow wrote: > > Hi guys, > > I have a question regarding the title that needs your expertise: > > I need to build an SFTP SourceFunction; may I know if Hadoop's SFTPFileSystem is > suitable? > I need to build

Flink Custom SourceFunction and SinkFunction

2019-03-01 Thread Siew Wai Yow
Hi guys, I have a question regarding the title that needs your expertise: 1. I need to build an SFTP SourceFunction; may I know if Hadoop's SFTPFileSystem is suitable? 2. I need to build an SFTP SinkFunction as well; may I know if the pre-defined HDFS rolling file sink accepts SFTP connections since

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-12 Thread Aaron Levin
for reaching out! I'd love to take a step back and find a better > solution, so I'll try to be succinct in what I'm trying to accomplish: > > We're trying to write a SourceFunction which: > * reads some Sequence files from S3 in a particular order (each task gets > f

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-09 Thread Aljoscha Krettek
/flink/commits/refactor-source-interface> > On 1. Nov 2018, at 16:50, Aaron Levin wrote: > > Hey, > > Thanks for reaching out! I'd love to take a step back and find a better > solution, so I'll try to be succinct in what I'm trying to accomplish: > >

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aaron Levin
Hey, Thanks for reaching out! I'd love to take a step back and find a better solution, so I'll try to be succinct in what I'm trying to accomplish: We're trying to write a SourceFunction which: * reads some Sequence files from S3 in a particular order (each task gets files

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aljoscha Krettek
InputSplit = { > if(nextSplit == null && !hasNext) { > throw new NoSuchElementException() > } > val tmp: InputSplit = nextSplit > nextSplit = null > tmp > } > > } > } > } > > > On Wed, Oct 24, 201
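The quoted fragment follows a look-ahead iterator pattern: `hasNext` fetches and caches the next element, and `next` hands out the cached element and clears it. A self-contained Java sketch of that pattern is below. It is not the poster's code: `LookAheadIterator` and the `Supplier` that returns `null` when nothing is left are assumptions made for illustration, standing in for whatever supplies the next `InputSplit`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical look-ahead iterator; the Supplier stands in for the split provider.
final class LookAheadIterator<T> implements Iterator<T> {
    private final Supplier<T> fetch; // assumed to return null once exhausted
    private T next;                  // element fetched by hasNext() but not yet handed out
    private boolean exhausted;

    LookAheadIterator(Supplier<T> fetch) {
        this.fetch = fetch;
    }

    @Override
    public boolean hasNext() {
        if (exhausted) {
            return false;
        }
        if (next != null) {
            return true; // already looked ahead; do not fetch again
        }
        next = fetch.get();
        if (next == null) {
            exhausted = true;
            return false;
        }
        return true;
    }

    @Override
    public T next() {
        // Calling hasNext() here lets callers use next() without checking first.
        if (next == null && !hasNext()) {
            throw new NoSuchElementException();
        }
        T tmp = next;
        next = null;
        return tmp;
    }
}
```

The `exhausted` flag makes sure the supplier is never asked again after it has reported the end, which matters if fetching has side effects.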

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aaron Levin
null) { > nextSplit = split > true > } else { > exhausted = true > false > } > } > > override def next(): InputSplit = { > if(nextSplit == null && !hasNext) {

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-26 Thread Aaron Levin
tmp } } } } On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz wrote: > Hi Aaron, > > Could you share the code of you custom function? > > I am also adding Aljosha and Kostas to cc, who should be more helpful on > that topic. > > Best, > > Dawid >

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Aaron Levin
r to > initialize the underlying source correctly. > > > Best regards, > > Kien > > > On 10/20/2018 1:06 AM, Aaron Levin wrote: > > Hi, > > I'm writing a custom `SourceFunction` which wraps an underlying > `InputFormatSourceFunction`. When I try to use thi

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Kien Truong
to initialize the underlying source correctly. Best regards, Kien On 10/20/2018 1:06 AM, Aaron Levin wrote: Hi, I'm writing a custom `SourceFunction` which wraps an underlying `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a stream (via `env.addSource`

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Dawid Wysakowicz
Hi Aaron, Could you share the code of your custom function? I am also adding Aljoscha and Kostas to cc, who should be more helpful on that topic. Best, Dawid On 19/10/2018 20:06, Aaron Levin wrote: > Hi, > > I'm writing a custom `SourceFunction` which wraps

Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-19 Thread Aaron Levin
Hi, I'm writing a custom `SourceFunction` which wraps an underlying `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a stream (via `env.addSource` and a subsequent sink) I get errors related to the `InputSplitAssigner` not being initialized for a particular vertex ID.

Re: Test harness for validating proper checkpointing of custom SourceFunction

2018-09-13 Thread Aljoscha Krettek
the (Keyed)(One|Two)InputStreamOperatorTestHarness classes to > test checkpointing of some custom functions. > > But in looking through the Flink source, I didn’t see anything comparable for > testing a custom SourceFunction (which implements the ListCheckpointed > interface). > > What’s the reco

Test harness for validating proper checkpointing of custom SourceFunction

2018-09-12 Thread Ken Krugler
Hi all, We’re using the (Keyed)(One|Two)InputStreamOperatorTestHarness classes to test checkpointing of some custom functions. But in looking through the Flink source, I didn’t see anything comparable for testing a custom SourceFunction (which implements the ListCheckpointed interface

Re: ClassNotFoundException in custom SourceFunction

2017-12-09 Thread romain.jln
Hi, The problem is that most of the exceptions appear when my job has been running for some hours. The only way for me to reproduce some of those errors is by using the web UI and hitting the cancel button of my job. So if I can find a way to generate this action locally in a test, maybe I can

Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread Aljoscha Krettek
Hi, Is it possible to go in there with a debugger and see where exactly the code is invoking the ClassLoader? Best, Aljoscha > On 8. Dec 2017, at 14:13, romain.jln wrote: > > Hi, > > FYI, I edited my message on the Nabble archive website because I realised I > sent the wrong stack trace at f

Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread romain.jln
Hi, FYI, I edited my message on the Nabble archive website because I realised I sent the wrong stack trace at first (but I don't know if you've noticed the modification). The first one was actually related to a custom Sink function that sends data to the Eventhub (not sure whether they are related

Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread Aljoscha Krettek
Hi, Is the code that is throwing the exception trying to use the Thread Context ClassLoader? If yes, that might explain it because a Thread that you create will not have the correct ClassLoader set. Best, Aljoscha > On 8. Dec 2017, at 12:24, Fabian Hueske wrote: > > Hi, > > thanks a lot for
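If the context ClassLoader of a self-created thread is indeed the cause, one possible remedy is to set it explicitly before starting the thread. The plain-Java sketch below shows only the mechanism, using the standard `Thread.setContextClassLoader` and `Thread.getContextClassLoader` calls. `ContextClassLoaderSketch` and `loaderSeenByWorker` are hypothetical names, and which loader to pass in a real job (for example the one that loaded the source class) is an assumption, not something confirmed in this thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper showing how a worker thread picks up an explicit context ClassLoader.
final class ContextClassLoaderSketch {

    // Starts a worker with the given context ClassLoader and returns the loader the worker observed.
    static ClassLoader loaderSeenByWorker(ClassLoader userCodeLoader) {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread worker = new Thread(
                () -> seen.set(Thread.currentThread().getContextClassLoader()));
        // Without this call the worker inherits the context ClassLoader of the creating thread.
        worker.setContextClassLoader(userCodeLoader);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }
}
```

Libraries that resolve classes through the context ClassLoader will then use the loader passed in, rather than whatever the creating thread happened to carry.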

Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread Fabian Hueske
Hi, thanks a lot for investigating this problem and the results you shared. This looks like a bug to me. I'm CCing Aljoscha who knows the internals of the DataStream API very well. Which Flink version are you using? Would you mind creating a JIRA issue [1] with all the info you provided so far?

Re: ClassNotFoundException in custom SourceFunction

2017-12-08 Thread romain.jln
Hi, The stack trace is usually something like : Exception in thread "Thread-49" java.lang.NoClassDefFoundError: com/microsoft/azure/eventhubs/amqp/AmqpErrorCode at com.microsoft.azure.eventhubs.ExceptionUtil.toException(ExceptionUtil.java:30) at com.microsoft.azure.eventhubs.Messa

Re: ClassNotFoundException in custom SourceFunction

2017-12-07 Thread Fabian Hueske
Hi, A ClassNotFoundException should not be expected behavior. Can you post the stacktrace of the exception? We had a few issues in the past where Flink didn't use the correct classloader. So this would not be an unusual bug. Thanks, Fabian 2017-12-07 10:44 GMT+01:00 Tugdual Grall : > ok > > On

Re: ClassNotFoundException in custom SourceFunction

2017-12-07 Thread Tugdual Grall
ok On Thu, Dec 7, 2017 at 10:35 AM, romain.jln wrote: > Hi all, > > I am experiencing some problems with a custom source that I have > implemented. I am getting some ClassNotFoundException randomly during the > execution of the job even though the fat jar submitted to Flink contains the > given cl

ClassNotFoundException in custom SourceFunction

2017-12-07 Thread romain.jln
Hi all, I am experiencing some problems with a custom source that I have implemented. I am getting some ClassNotFoundException randomly during the execution of the job even though the fat jar submitted to Flink contains the given classes. After several hours of investigation, I think I have been ab

Re: Freeing resources in SourceFunction

2016-11-03 Thread Maximilian Michels
For your use case you should use the close() method, which is always called upon shutdown of your source. cancel() is only called when you explicitly cancel your job. -Max On Thu, Nov 3, 2016 at 2:45 PM, Yury Ruchin wrote: > Hello, > > I'm writing a custom source function for my streaming jo
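The division of labour described here can be sketched in plain Java: cancel() only signals the emit loop to stop, while close() is where resources are released. The class below is a library-free illustration, not Flink code. `PooledSourceSketch`, `FakePool` and the `maxRecords` bound are hypothetical, added so the sketch terminates on its own; in a real source the runtime, not the caller, would invoke these methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a rich source function that owns a connection pool.
final class PooledSourceSketch {

    // Hypothetical pool; only tracks whether it has been closed.
    static final class FakePool implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    private final FakePool pool = new FakePool();
    private final List<String> emitted = new ArrayList<>();
    private volatile boolean running = true; // written by cancel(), read by the emit loop

    // Emit loop; the maxRecords bound exists only to keep the sketch finite.
    void run(int maxRecords) {
        for (int i = 0; running && i < maxRecords; i++) {
            emitted.add("record-" + i);
        }
    }

    // Only asks the loop to stop; frees nothing.
    void cancel() {
        running = false;
    }

    // Releases the pool, whether the source finished normally or was cancelled.
    void close() {
        pool.close();
    }

    int emittedCount() {
        return emitted.size();
    }

    boolean poolClosed() {
        return pool.closed;
    }
}
```

Keeping cleanup out of cancel() means the pool is released exactly once on every shutdown path, as long as close() is called on each of them.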

Freeing resources in SourceFunction

2016-11-03 Thread Yury Ruchin
Hello, I'm writing a custom source function for my streaming job. The source function manages some connection pool. I want to close that pool once my job is "finished" (since the stream is unbounded, the only way I see is to cancel the streaming job). Since I inherit RichSourceFunction, there are

Re: SourceFunction Scala

2016-03-12 Thread Stefano Baghino
Source(args)) >> >> ^ >> >> Error:(14, 31) not enough arguments for method addSource: (implicit >> evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit >> evidence$16: >> org.apache.flink.api.common.typeinfo.TypeInformation

Re: SourceFunction Scala

2016-03-07 Thread Ankur Sharma
evidence$16. > val stream = env.addSource(new QueryOneSource(args)) > ^ > > class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] { > val nextTuple: Tuple // case class Tuple(id: Long, value: Int) > override de

Re: SourceFunction Scala

2016-03-06 Thread Márton Balassi
m[org.mpi.debs.Tuple]. > > Unspecified value parameter evidence$16. > > val stream = env.addSource(new QueryOneSource(args)) > > ^ > > > class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] { > > val nextT

SourceFunction Scala

2016-03-06 Thread Ankur Sharma
)) ^ class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] { val nextTuple: Tuple // case class Tuple(id: Long, value: Int) override def run(ctx: SourceContext[Tuple]) = { while (true) { nextRecord() ctx.collect(this.nextTuple) } } override def cancel