... the FLIP-238 to be accepted?

-----Original Message-----
From: Qingsheng Ren
Sent: Thursday, 9 June 2022 12:16
To: Sanabria, Carlos
Cc: user
Subject: Re: [External] Re: Source vs SourceFunction and testing

Hi Carlos,

FLIP-238 [1] is proposing a FLIP-27-based data generator source and I think
this is what you are looking for.
> ... class of the job, and also create the test Sources and Sinks in the
> JUnit tests, and inject them in the AbstractDataStreamJob class.
>
> The problem comes with the new Source interface and the end-to-end tests
> against the local embedded mini cluster. Prior to Flink 1.15, we used the
> FromElementsFunction to create the test SourceFunction. Now that we changed
> the code to use the new Source interface, we cannot use the
> FromElementsFunction anymore, and we haven't found an equivalent
> FromElementsSource ...
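Until FLIP-238's generator source lands, one hedged workaround for such tests
is the bounded FLIP-27 NumberSequenceSource that ships with Flink 1.15:
map the generated indices back to fixed test elements. A minimal sketch, with
illustrative names (not code from this thread):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class FromElementsWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fixed test data that FromElementsFunction used to emit.
        List<String> testElements = Arrays.asList("a", "b", "c");

        // NumberSequenceSource is a bounded FLIP-27 source; map each
        // generated index back to the corresponding test element.
        DataStream<String> elements =
                env.fromSource(new NumberSequenceSource(0, testElements.size() - 1),
                                WatermarkStrategy.noWatermarks(),
                                "test-elements")
                        .map(i -> testElements.get(i.intValue()))
                        .returns(Types.STRING);

        elements.print();
        env.execute("from-elements-workaround");
    }
}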
Hello,

Is there a plan to deprecate SourceFunction in favor of the Source API? We
have a custom SourceFunction-based source; do we need to plan to rewrite it
using the new Source API?

Thanks,
Alexey
On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren wrote:

Hi Piotr,

I'd like to share my understanding about this. Source and SourceFunction are
both interfaces to data sources. SourceFunction was designed and introduced
earlier, and as the project evolved, many shortcomings emerged. Therefore,
the community re-designed the source interface and introduced the new Source
API in FLIP-27 ...
Hi Ken,

Thanks! I guess the problem I had was, as a complete newbie to Flink,
navigating the type system and still being confused about the differences
between Source, SourceFunction, DataStream, DataStreamOperator, etc.

I think the DataStream<> type is what I'm looking for? That is, then I can
use:

DataStream source = env.fromSource(getKa...
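A hedged sketch of how those types line up (not the poster's actual code;
broker, topic, and group id are placeholders): a KafkaSource<String> is a
FLIP-27 Source, and env.fromSource turns it into a typed DataStream<String>.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTypesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // KafkaSource<String> implements the FLIP-27 Source interface.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("input-topic")                // placeholder
                .setGroupId("my-group")                  // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // env.fromSource yields a typed DataStream<String>.
        DataStream<String> stream =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");

        stream.print();
        env.execute("source-types-example");
    }
}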
Hi,

I'm wondering: what is the recommended way to structure a job which one
would like to test later on with `MiniCluster`?

I've looked at the flink-training repository examples [1] and they tend to
expose the main job as a class that accepts a `SourceFunction` and a
`SinkFunction`, which makes sense. But then, my job is normally constructed
with `KafkaSource` which is then passed to `env.fromSource(...)`.
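One way to keep that injectable structure with the new API is to have the job
class accept any FLIP-27 Source, so production wires in a KafkaSource while
tests wire in a bounded source. A hedged sketch, not the flink-training code;
class and method names are illustrative:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MyStreamingJob {
    private final Source<String, ?, ?> source;
    private final SinkFunction<String> sink;

    // Production passes a KafkaSource; tests pass a bounded test source.
    public MyStreamingJob(Source<String, ?, ?> source, SinkFunction<String> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void build(StreamExecutionEnvironment env) {
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "input");
        // ... business-logic transformations go here ...
        stream.addSink(sink);
    }
}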
Hi Darren,
No, you cannot access the Task from the operator. You can access some metrics
via the RuntimeContext.
getRuntimeContext().getMetricGroup()
How does the backpressure help you here? Backpressure can originate in any
operator or network connection. If it's an operator further downstream ...
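A minimal sketch of that metrics route, assuming a simple counter (names are
illustrative, not from this thread):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class MeteredSource extends RichParallelSourceFunction<String> {
    private transient Counter emitted;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        // Metrics are reachable from the UDF through the RuntimeContext.
        emitted = getRuntimeContext().getMetricGroup().counter("recordsEmitted");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("tick");
                emitted.inc();
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}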
Hi, is there a way for the UDF of a source function, extended from
RichParallelSourceFunction, to access its Task instance, so as to call
Task.isBackPressured()?
I'm trying to give priorities to different input sources that need to be
managed from within the same source function, and want to stop ...
Hi Oscar,
I think you'll find your answers in [1], have a look at Yun's response a
couple emails down. Basically, SourceFunction is the legacy source stack,
and ideally you'd instead implement your source using the FLIP-27 stack [2],
where you can directly define the boundedness ...
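To make the boundedness point concrete, a hedged sketch using a bounded
FLIP-27 source that Flink ships: NumberSequenceSource reports
Boundedness.BOUNDED, so with AUTOMATIC runtime mode the job runs in batch
execution mode.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedSourceBatchMode {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With AUTOMATIC mode, a job whose sources all report
        // Boundedness.BOUNDED is executed in batch mode.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        env.fromSource(new NumberSequenceSource(0, 999),
                        WatermarkStrategy.noWatermarks(),
                        "bounded-numbers")
                .print();

        env.execute("bounded-source-batch-mode");
    }
}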
Sorry, there are some typos that may be misleading.
The SourceFunction will be detected as *Streaming Mode*.

On Thu, Jun 3, 2021 at 1:29 PM, 陳樺威 wrote:
> Hi,
>
> Currently, we want to use batch execution mode [0] to consume historical
> data and rebuild states for our streaming application.
>
> ... However, the function will be detected as Batch Mode.
>
> Our question is, how to implement a SourceFunction as a bounded DataStream?
>
> Thanks!
> Oscar
>
> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
> [1] https://ci.apache.org/projects/flink/flink-docs ...
Hi,

I think it should be working. At least off the top of my head I do not see
any reason why it shouldn't be.

Just make sure that you are proxying all relevant methods, not only those
defined in `SourceFunction`. For example `FlinkKafkaConsumer` is
implementing/extending more than `SourceFunction` alone (among others, the
`CheckpointedFunction` and `CheckpointListener` interfaces).
Hi!

To work around FLINK-2491 (https://issues.apache.org/jira/browse/FLINK-2491),
which causes checkpointing issues for us, I am trying to chain
SourceFunctions so that the first one never quits. The basic idea is as
follows:

class WrappingSourceFunction(innerSourceFunction: SourceFunction[T]) ...
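A minimal Java sketch of that wrapping idea (the original is Scala; names are
illustrative). Note the caveat above: this version only delegates the
`SourceFunction` methods, so a real wrapper must also proxy any other
interfaces the inner source implements.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WrappingSourceFunction<T> implements SourceFunction<T> {
    private final SourceFunction<T> inner;
    private volatile boolean running = true;

    public WrappingSourceFunction(SourceFunction<T> inner) {
        this.inner = inner;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // Emit everything the inner source produces, then idle instead of
        // returning, so the enclosing source never quits on its own.
        inner.run(ctx);
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
        inner.cancel();
    }
}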
You cannot control what kind of SourceContext is passed into your function.
What are you trying to achieve?

On 15/05/2019 09:30, Debasish Ghosh wrote:

Hi -

I have a custom SourceFunction:

class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    data.foreach(d => ctx.collect(d))
  }
  override def cancel(): Unit = {}
}

When this function is run during job execution, the SourceContext that gets
passed serializes the data ...
... sources/sinks implementations.

Piotrek

On 1 Mar 2019, at 09:39, Siew Wai Yow wrote:

Hi guys,

I have a question regarding the title that needs your expertise:

1. I need to build an SFTP SourceFunction; may I know if Hadoop's
SFTPFileSystem is suitable?
2. I need to build an SFTP SinkFunction as well; may I know if the
predefined HDFS rolling file sink accepts SFTP connections, since ...
On 1. Nov 2018, at 16:50, Aaron Levin wrote:

Hey,

Thanks for reaching out! I'd love to take a step back and find a better
solution, so I'll try to be succinct in what I'm trying to accomplish:

We're trying to write a SourceFunction which:

* reads some Sequence files from S3 in a particular order (each task gets
  files ...
...
      if (split != null) {
        nextSplit = split
        true
      } else {
        exhausted = true
        false
      }
    }

    override def next(): InputSplit = {
      if (nextSplit == null && !hasNext) {
        throw new NoSuchElementException()
      }
      val tmp: InputSplit = nextSplit
      nextSplit = null
      tmp
    }
  }
}
On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz wrote:
> Hi Aaron,
>
> Could you share the code of your custom function?
>
> I am also adding Aljoscha and Kostas to cc, who should be more helpful on
> that topic.
>
> Best,
>
> Dawid
... in order to initialize the underlying source correctly.

Best regards,
Kien

On 10/20/2018 1:06 AM, Aaron Levin wrote:

Hi,

I'm writing a custom `SourceFunction` which wraps an underlying
`InputFormatSourceFunction`. When I try to use this `SourceFunction`
in a stream (via `env.addSource` ...
Hi,
I'm writing a custom `SourceFunction` which wraps an underlying
`InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
stream (via `env.addSource` and a subsequent sink) I get errors related to
the `InputSplitAssigner` not being initialized for a particular vertex ID.
Hi all,

We're using the (Keyed)(One|Two)InputStreamOperatorTestHarness classes to
test checkpointing of some custom functions.

But in looking through the Flink source, I didn't see anything comparable
for testing a custom SourceFunction (which implements the ListCheckpointed
interface).

What's the recommended ...
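One hedged possibility, modeled on how Flink's own source tests work: wrap
the function in the StreamSource operator and drive it with
AbstractStreamOperatorTestHarness from Flink's test utilities. A sketch with
a toy ListCheckpointed source; all names are illustrative, not from this
thread:

import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;

import java.util.Collections;
import java.util.List;

public class SourceCheckpointTestSketch {

    // Toy ListCheckpointed source, standing in for the custom function.
    static class CountingSource implements SourceFunction<Long>, ListCheckpointed<Long> {
        private volatile boolean running = true;
        private long count;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count++);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Long> state) {
            for (Long c : state) {
                count = c;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Wrap the function in the StreamSource operator and snapshot it.
        AbstractStreamOperatorTestHarness<Long> harness =
                new AbstractStreamOperatorTestHarness<>(
                        new StreamSource<>(new CountingSource()), 1, 1, 0);
        harness.open();
        OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
        harness.close();
        // The snapshot can be handed to initializeState() on a fresh
        // harness to test restore behavior.
    }
}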
Hi,
The problem is that most of the exceptions appear when my job has been
running for some hours.
The only way for me to reproduce some of those errors is by using the web UI
and hitting the cancel button of my job. So if I can find a way to generate
this action locally in a test, maybe I can ...
Hi,
Is it possible to go in there with a debugger and see where exactly the code is
invoking the ClassLoader?
Best,
Aljoscha
On 8. Dec 2017, at 14:13, romain.jln wrote:
Hi,
FYI, I edited my message on the Nabble archive website because I realised I
sent the wrong stack trace at first (but I don't know if you've noticed the
modification). The first one was actually related to a custom Sink function
that sends data to the Eventhub (not sure whether they are related ...
Hi,
Is the code that is throwing the exception trying to use the Thread Context
ClassLoader? If yes, that might explain it because a Thread that you create
will not have the correct ClassLoader set.
Best,
Aljoscha
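To illustrate the point, a hedged sketch (not code from this thread): a
thread spawned by user code can be handed the user-code classloader
explicitly, since the function's own class was loaded by it.

public final class ContextClassLoaderUtil {
    private ContextClassLoaderUtil() {}

    /**
     * Spawn a thread whose context classloader is the given user-code
     * classloader, so libraries resolving classes through the thread
     * context classloader can see classes from the job's fat jar.
     * Inside a RichFunction, getClass().getClassLoader() is the
     * user-code classloader that loaded the function.
     */
    public static Thread spawnWithClassLoader(Runnable task, ClassLoader userCodeClassLoader) {
        Thread t = new Thread(task);
        t.setContextClassLoader(userCodeClassLoader);
        return t;
    }
}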
On 8. Dec 2017, at 12:24, Fabian Hueske wrote:
Hi,
thanks a lot for investigating these problems and the results you shared.
This looks like a bug to me. I'm CCing Aljoscha who knows the internals of
the DataStream API very well.
Which Flink version are you using?
Would you mind creating a JIRA issue [1] with all the info you provided so
far?
Hi,
The stack trace is usually something like:

Exception in thread "Thread-49" java.lang.NoClassDefFoundError:
com/microsoft/azure/eventhubs/amqp/AmqpErrorCode
    at com.microsoft.azure.eventhubs.ExceptionUtil.toException(ExceptionUtil.java:30)
    at com.microsoft.azure.eventhubs.Messa...
Hi,
A ClassNotFoundException should not be expected behavior.
Can you post the stacktrace of the exception?
We had a few issues in the past where Flink didn't use the correct
classloader.
So this would not be an unusual bug.
Thanks,
Fabian
2017-12-07 10:44 GMT+01:00 Tugdual Grall:

ok

On Thu, Dec 7, 2017 at 10:35 AM, romain.jln wrote:
Hi all,

I am experiencing some problems with a custom source that I have
implemented. I am getting some ClassNotFoundException randomly during the
execution of the job, while the fat jar submitted to Flink contains the
given classes.

After several hours of investigation, I think I have been able ...
For your use case you should use the close() method which is always
called upon shutdown of your source. The cancel() is only called when
you explicitly cancel your job.
-Max
On Thu, Nov 3, 2016 at 2:45 PM, Yury Ruchin wrote:
Hello,
I'm writing a custom source function for my streaming job. The source
function manages some connection pool. I want to close that pool once my
job is "finished" (since the stream is unbounded, the only way I see is to
cancel the streaming job). Since I inherit RichSourceFunction, there are ...
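A minimal sketch of that advice (the pool type is a hypothetical stand-in):
release resources in close(), which runs on every shutdown path, and keep
cancel() to just flipping the running flag.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class PooledSource extends RichSourceFunction<String> {
    private transient ConnectionPool pool;   // hypothetical pool type
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        pool = new ConnectionPool();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect(pool.fetch());
        }
    }

    @Override
    public void cancel() {
        // Only called on explicit job cancellation.
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Reached whether the job finishes, fails, or is cancelled.
        if (pool != null) {
            pool.shutdown();
        }
        super.close();
    }

    // Hypothetical stand-in so the sketch compiles.
    static class ConnectionPool {
        String fetch() { return "record"; }
        void shutdown() {}
    }
}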
Error:(14, 31) not enough arguments for method addSource:
(implicit evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple],
 implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]).
Unspecified value parameter evidence$16.

  val stream = env.addSource(new QueryOneSource(args))
                             ^

class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
  val nextTuple: Tuple // case class Tuple(id: Long, value: Int)

  override def run(ctx: SourceContext[Tuple]) = {
    while (true) {
      nextRecord()
      ctx.collect(this.nextTuple)
    }
  }

  override def cancel ...
}