Re: Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Yun Gao
+1 for removing the methods that have been deprecated for a while and have alternative 
methods.

One specific question: if we remove DataStream#split, do we plan to enable 
side outputs in more operators in the future? Currently they are only available in 
ProcessFunctions, but not in other commonly used UDFs such as sources or 
AsyncFunction [1].

One temporary workaround that occurs to me is to add a ProcessFunction right after the 
operator that needs the side output. However, this workaround is not very obvious to 
come up with, and if it really works we might add it to the side-output documentation. 

[1] https://issues.apache.org/jira/browse/FLINK-7954
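For illustration, a minimal sketch of the workaround mentioned above: a pass-through 
ProcessFunction appended right after the operator that cannot emit side outputs itself 
(for example, after an AsyncFunction's result stream) routes some records to a side 
output. The tag name and the routing condition are assumptions made up for this example.

```
// Sketch only: route records from an upstream operator's output to a side output
// by appending a pass-through ProcessFunction.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputAfterAsync {

    // Tag for the records we want to divert; the anonymous subclass keeps the type information.
    static final OutputTag<String> FAILED = new OutputTag<String>("failed-records") {};

    public static SingleOutputStreamOperator<String> route(DataStream<String> asyncResults) {
        return asyncResults.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.startsWith("ERROR")) {   // hypothetical routing condition
                    ctx.output(FAILED, value);     // goes to the side output
                } else {
                    out.collect(value);            // stays on the main output
                }
            }
        });
    }
}
```

The diverted records can then be obtained with route(results).getSideOutput(FAILED).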

Best,
 Yun


 --Original Mail --
Sender:Kostas Kloudas 
Send Date:Tue Aug 18 03:52:44 2020
Recipients:Dawid Wysakowicz 
CC:dev , user 
Subject:Re: [DISCUSS] Removing deprecated methods from DataStream API
+1 for removing them.



From a quick look, most of them (not all) have been deprecated a long time ago.



Cheers,

Kostas



On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz  wrote:

>

> @David Yes, my idea was to remove any use of the fold method and all related 
> classes, including WindowedStream#fold

>

> @Klou Good idea to also remove the deprecated enableCheckpointing() & 
> StreamExecutionEnvironment#readFile and the like. I did another pass over some 
> of the classes and thought we could also drop:

>

> ExecutionConfig#set/getCodeAnalysisMode

> ExecutionConfig#disable/enableSysoutLogging

> ExecutionConfig#set/isFailTaskOnCheckpointError

> ExecutionConfig#isLatencyTrackingEnabled

>

> As for `forceCheckpointing`, I am not fully convinced we should do it. As far 
> as I know iterations still do not participate in checkpointing correctly. 
> Therefore it still might make sense to force it. In other words, there is no 
> real alternative to that method, unless we only remove the methods from 
> StreamExecutionEnvironment and redirect to the setter in CheckpointConfig. 
> WDYT?

>

> An updated list of methods I suggest to remove:

>

> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)

> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)

> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)

> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)

> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)

> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)

> RuntimeContext#getAllAccumulators (deprecated in 0.10)

> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)

> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)

> DataStream#split (deprecated in 1.8)

> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)

>

> Bear in mind that the majority of the ExecutionConfig options listed above have 
> no effect. They were left in place purely to preserve binary compatibility. 
> Personally I don't see any benefit in keeping a method while silently dropping 
> the underlying feature. The only configuration that is still respected is the 
> number of execution retries.

>

> I also wanted to make it explicit that most of the changes above would result 
> in binary incompatibilities and require additional exclusions in the japicmp 
> configuration. Those are:

>

> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)

> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)

> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)

> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)

> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)

> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)

> DataStream#split (deprecated in 1.8)

> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)

> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)

>

> Looking forward to more opinions on the issue.

>

> Best,

>

> Dawid

>

>

> On 17/08/2020 12:49, Kostas Kloudas wrote:

>

> Thanks a lot for starting this Dawid,

>

> Big +1 for the proposed clean-up, and I would also add the deprecated

> methods of the StreamExecutionEnvironment like:

>

> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)

> enableCheckpointing()

> isForceCheckpointing()

>

> readFile(FileInputFormat inputFormat,String

> filePath,FileProcessingMode watchType,long interval, FilePathFilter

> fi

Re: Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread Yun Gao
Hi, 

Many thanks for bringing up this discussion!

One more question: do the BATCH and STREAMING modes also decide the shuffle 
types and operators? I'm asking because even in blocking mode, a job should 
benefit from keeping some edges pipelined if the resources are known to be 
sufficient. Do we also plan to expose more fine-grained control over the 
shuffle types? 

Best,
 Yun 



 --Original Mail --
Sender:Kostas Kloudas 
Send Date:Tue Aug 18 02:24:21 2020
Recipients:David Anderson 
CC:dev , user 
Subject:Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input
Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what the plans are at the scheduling level, as one could imagine
in the future that in mixed workloads, we schedule first all the
bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
subgraph per application, which is going to be scheduled after all
Bounded ones have finished. Essentially the bounded subgraphs will be
used to bootstrap the unbounded one. But, I am not aware of any plans
towards that direction.


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests to not allow processing time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming this is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are essentially saying that batch execution is assumed to be
instantaneous and refers to a single "point" in time, and any
processing-time timers for the future may fire at the end of execution or be
ignored (but not throw an exception). I could also see ignoring the timers in
batch as the default, if this makes more sense.

By the way, do you have any use cases in mind that would help us better
shape our processing time timer handling?

Kostas

On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>
> Kostas,
>
> I'm pleased to see some concrete details in this FLIP.
>
> I wonder if the current proposal goes far enough in the direction of 
> recognizing the need some users may have for "batch" and "bounded streaming" 
> to be treated differently. If I've understood it correctly, the section on 
> scheduling allows me to choose STREAMING scheduling even if I have bounded 
> sources. I like that approach, because it recognizes that even though I have 
> bounded inputs, I don't necessarily want batch processing semantics. I think 
> it makes sense to extend this idea to processing time support as well.
>
> My thinking is that sometimes in development and testing it's reasonable to 
> run exactly the same job as in production, except with different sources and 
> sinks. While it might be a reasonable default, I'm not convinced that 
> switching a processing time streaming job to read from a bounded source 
> should always cause it to fail.
>
> David
>
> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> API in favour of the DataStream API and the Table API. After this work
>> is done, the user will be able to write a program using the DataStream
>> API and this will execute efficiently on both bounded and unbounded
>> data. But before we reach this point, it is worth discussing and
>> agreeing on the semantics of some operations as we transition from the
>> streaming world to the batch one.
>>
>> This thread and the associated FLIP [2] aim at discussing these issues
>> as these topics are pretty important to users and can lead to
>> unpleasant surprises if we do not pay attention.
>>
>> Let's have a healthy discussion here and I will be updating the FLIP
>> accordingly.
>>
>> Cheers,
>> Kostas
>>
>> [1] 
>> https://cwik

Re: SDK vs Connectors

2020-08-22 Thread Yun Gao
Hi Prasanna,

   1) Semantically both a) and b) would be OK. If the custom sink can be chained 
with the map operator (I assume the map operator is the "Processing" step in the 
graph), there is also not much difference physically. If they cannot be chained, 
then writing a custom sink would incur another pass of network transfer, but the 
custom sink would run in a different thread, so more computational resources 
could be exploited. 
   2) To achieve at-least-once, you need to implement the "CheckpointedFunction" 
interface and make sure all pending data is flushed to the external system when 
snapshotting state (see the sketch below). Once a checkpoint succeeds, the data 
before it will not be replayed after a failover, so those records must be written 
out before the checkpoint completes.
   3) From my side I don't see significant disadvantages to writing custom sink 
functions. 
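For illustration, a minimal sketch of point 2 above: an at-least-once SNS sink that 
buffers records and flushes them in snapshotState(). This is not an official connector; 
the AWS SDK v1 client, the topic ARN, and the buffer threshold are assumptions made up 
for this example.

```
// Sketch only: custom SNS sink with at-least-once semantics via CheckpointedFunction.
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class SnsSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final String topicArn;              // e.g. "arn:aws:sns:...:my-topic" (hypothetical)
    private transient AmazonSNS snsClient;
    private transient List<String> buffer;      // records not yet published to SNS

    public SnsSink(String topicArn) {
        this.topicArn = topicArn;
    }

    @Override
    public void open(Configuration parameters) {
        snsClient = AmazonSNSClientBuilder.defaultClient();
        buffer = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= 100) {             // flush eagerly once the buffer grows
            flush();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Everything buffered so far must reach SNS before the checkpoint completes,
        // otherwise those records would be lost after a failover (at-least-once).
        flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Nothing to restore: records are never kept across checkpoints.
    }

    private void flush() {
        for (String message : buffer) {
            snsClient.publish(topicArn, message);
        }
        buffer.clear();
    }
}
```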

Best,
 Yun


--
Sender:Prasanna kumar
Date:2020/08/22 02:00:51
Recipient:user; 
Theme:SDK vs Connectors

Hi Team,

Following is the pipeline 
Kafka => Processing => SNS Topics .

Flink does not provide an SNS connector out of the box. 

a) I implemented the above by using the AWS SDK and publishing the messages in the 
Map operator itself.  
The pipeline is working well. I see messages flowing to SNS topics.

b) Another approach is that I could write a custom sink function and still 
publish to SNS using SDK in this stage. 

Questions
1) What would be the primary difference between approaches a) and b)? Is there 
any significant advantage of one over the other?

2) Would the at-least-once guarantee still be preserved if we follow the above approach?

3) Would there be any significant disadvantages (rather, what do we need to be 
careful about) of writing our own custom sink functions?

Thanks,
Prasanna. 


Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Yun Gao
Congratulations Dian !

 Best
 Yun


--
Sender:Marta Paes Moreira
Date:2020/08/27 17:42:34
Recipient:Yuan Mei
Cc:Xingbo Huang; jincheng sun; 
dev; Dian Fu; 
user; user-zh
Theme:Re: [ANNOUNCE] New PMC member: Dian Fu

Congrats, Dian!
On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:

Congrats!
On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:

Congratulations Dian!

Best,
Xingbo
jincheng sun wrote on Thu, Aug 27, 2020 at 5:24 PM:

Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on the PyFlink component, working on various important 
features such as the Python UDF and Pandas integration. He keeps checking and voting 
for our releases, has successfully produced two releases (1.9.3 & 1.11.1) as release 
manager, and is currently working as RM to push forward the release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC Member!

Best,
Jincheng(on behalf of the Flink PMC)


Re: Implementation of setBufferTimeout(timeoutMillis)

2020-08-30 Thread Yun Gao
Hi Pankaj,

I think it should be in 
org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.

Best,
 Yun



--
Sender:Pankaj Chand
Date:2020/08/31 02:40:15
Recipient:user
Theme:Implementation of setBufferTimeout(timeoutMillis)

Hello,

The documentation gives the following two sample lines for setting the buffer 
timeout for the streaming environment or transformation.

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

I have been trying to find where (file and method) in the Flink source code 
the buffers are flushed based on the value of 
timeoutMillis (or the default value), but have been unsuccessful. Please help.

Thanks,

Pankaj 


Re: Exception on s3 committer

2020-08-30 Thread Yun Gao

Hi Ivan,

   I think there might be some points to check:

   1. Is the job restored from the latest successful checkpoint after restart ? 
   2. Have you ever changed the timeout settings for uncompleted multipart 
upload ?
   3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 
exist or not ?

Best,
 Yun


 --Original Mail --
Sender:Ivan Yang 
Send Date:Sat Aug 29 12:43:28 2020
Recipients:user 
Subject:Exception on s3 committer
Hi all,

We got this exception after a job restart. Does anyone know what may lead to 
this situation, and how to get past this checkpoint issue? Prior to this, the 
job failed due to “Checkpoint expired before completing.” We are S3-heavy, 
writing out 10K files to S3 every 10 minutes using StreamingFileSink/BulkFormat 
to various S3 prefixes. Thanks in advance. -Ivan

2020-08-28 15:17:58
java.io.IOException: Recovering commit failed for object 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object does 
not exist and MultiPart Upload 
3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw-
 is not valid.
at 
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:122)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:74)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804: 
com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does 
not exist. The upload ID may be invalid, or the upload may have been aborted or 
completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; 
Request ID: 9A99AFAD80A8F202; S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=), 
S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUplo

Re: Re: Exception on s3 committer

2020-09-01 Thread Yun Gao
Hi Ivan,

   The Flink sink might commit a single file multiple times. This happens if 
there is a failover after committing a file: the job will be restarted from the 
latest checkpoint, the file's state will go back to pending, and it will be 
committed again. In the normal case, when re-committing a file fails, Flink first 
checks whether the file exists; if it does, Flink considers it committed before, 
ignores the error, and continues. 

However, based on the current description, it seems that there is another job 
which deletes the committed files. If a file is deleted after the first 
commit but before it is re-committed, the file will be detected as not existing 
when re-committing and thus will cause an error.


 --Original Mail --
Sender:Ivan Yang 
Send Date:Tue Sep 1 08:06:55 2020
Recipients:Yun Gao 
CC:user 
Subject:Re: Exception on s3 committer
Hi Yun,

Thank you so much for you suggestion.

(1) The job couldn’t restore from the last checkpoint. The exception is in my 
original email.
(2) No, I didn’t change any multipart upload settings. 
(3) The file is gone. I have another batch process that reads the Flink output S3 
bucket and pushes objects to another bucket. Upon a successful read and write, the 
batch job deletes the file. What’s puzzling me is that if Flink hadn’t 
successfully committed the multipart file, it should not have been visible to the 
batch job. It looks like the situation is: while Flink tried to commit the multipart 
file, it crashed and restarted. The file was committed on S3 successfully, but the 
acknowledgement was not recorded on the Flink side. In between, the batch job 
consumed the file. I don’t know if that’s possible.

Thanks
Ivan


On Aug 30, 2020, at 11:10 PM, Yun Gao  wrote:

Hi Ivan,

   I think there might be some points to check:

   1. Is the job restored from the latest successful checkpoint after restart ? 
   2. Have you ever changed the timeout settings for uncompleted multipart 
upload ?
   3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 
exist or not ?

Best,
 Yun


 --Original Mail --
Sender:Ivan Yang 
Send Date:Sat Aug 29 12:43:28 2020
Recipients:user 
Subject:Exception on s3 committer
Hi all,

We got this exception after a job restart. Does anyone know what may lead to 
this situation? and how to get pass this Checkpoint issue? Prior to this, the 
job failed due to “Checkpoint expired before completing.” We are s3 heavy, 
writing out 10K files to s3 every 10 minutes using StreamingFileSink/BulkFormat 
to various s3 prefixes. Thanks in advance. -Ivan

2020-08-28 15:17:58
java.io.IOException: Recovering commit failed for object 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object does 
not exist and MultiPart Upload 
3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw-
 is not valid.
at 
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:122)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:74)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
 

Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Thread Yun Gao
Great! Many thanks @ZhuZhu for driving this, and thanks to everyone who contributed to 
the release!

Best,
 Yun


 --Original Mail --
Sender:Jingsong Li 
Send Date:Thu Sep 17 13:31:41 2020
Recipients:user-zh 
CC:dev , user , Apache Announce 
List 
Subject:Re: [ANNOUNCE] Apache Flink 1.11.2 released

Thanks ZhuZhu for driving the release.

Best,
Jingsong
On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/09/17/release-1.11.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


-- 
Best, Jingsong Lee

[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-08 Thread Yun Gao
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks finished, which causes some 
problems for bounded or mixed jobs:
Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed 
before being committed to external systems in streaming mode. If sources are bounded 
and checkpoints are disabled after some tasks are finished, the data sent after 
the last checkpoint would never be able to be committed. This issue has 
already been reported several times on the user ML [2][3][4] and was further 
brought up while working on FLIP-143: Unified Sink API [5]. 
Jobs with both bounded and unbounded sources might also have to replay a large 
amount of records after a failover because no periodic checkpoints are taken after 
the bounded sources finish.
Therefore, we propose to also support checkpoints after some tasks have finished. 
You can find more details in FLIP-147 [6]. 
Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-08 Thread Yun Gao
Hi, devs & users

Very sorry for the spoiled format; I am resending the discussion as follows.

As discussed in FLIP-131[1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks have finished, which causes 
some problems for bounded or mixed jobs:
1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be 
replayed before being committed to external systems in streaming mode. If sources 
are bounded and checkpoints are disabled after some tasks are finished, the data 
sent after the last checkpoint would never be able to be committed. This issue 
has already been reported several times on the user ML [2][3][4] and was further 
brought up while working on FLIP-143: Unified Sink API [5]. 
2. Jobs with both bounded and unbounded sources might have to 
replay a large amount of records after a failover because no periodic checkpoints 
are taken after the bounded sources finish.

Therefore, we propose to also support checkpoints after some tasks have finished. 
You can find more details in FLIP-147 [6]. 

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
 --Original Mail --
Sender:Yun Gao 
Send Date:Fri Oct 9 14:16:52 2020
Recipients:Flink Dev , User-Flink 
Subject:[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks finished, which causes some 
problems for bounded or mixed jobs:
Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed 
before committed to external systems in streaming mode. If sources are bounded 
and checkpoints are disabled after some tasks are finished, the data sent after 
the last checkpoint would always not be able to be committed. This issue has 
already been reported some times in the user ML[2][3][4] and is future brought 
up when working on FLIP-143: Unified Sink API [5]. 
The jobs with both bounded and unbounded sources might have to replay a large 
amount of records after failover due to no periodic checkpoints are taken after 
the bounded sources finished.
Therefore, we propose to also support checkpoints after some tasks finished. 
Your Could find more details in FLIP-147[6]. 
Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Yun Gao
exit directly? The new Source 
API would terminate directly since there are no pending splits, and the legacy 
sources would be handled specially by skipping execution if the source operator was 
fully finished before. We would be able to move to the final solution gradually 
in the next steps. 

Best,
Yun


--
From:Arvid Heise 
Send Time:2020 Oct. 12 (Mon.) 15:38
To:Yun Gao 
Cc:Flink Dev ; User-Flink 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

Thank you for starting the discussion. This will solve one of the long-standing 
issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit 
closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which would 
make the sinks the root nodes. That is very similar to how graphs in relational 
algebra are labeled. However, I got the feeling that in Flink, we rather 
iterate from sources to sink, making the sources root nodes and the sinks the 
leaf nodes. However, I have no clue how it's done in similar cases, so please 
take that hint cautiously.
2) I'd make the algorithm to find the subtasks iterative and react in 
CheckpointCoordinator (see the sketch below). Let's assume that we inject the barrier 
at all root subtasks (initially all sources). So in the iterative algorithm, whenever 
root A finishes, it looks at all connected subtasks B and checks whether they have any 
upstream task left. If not, B becomes a new root. That would require touching only a 
part of the job graph, but would require some callback from JobManager to 
CheckpointCoordinator. 
2b) We also need to be careful for out-of-sync updates: if the root is about to 
finish, we could send the barrier to it from CheckpointCoordinator, but at the 
time it arrives, the subtask is finished already.
3) An implied change is that checkpoints are not aborted anymore at 
EndOfPartition, which is good, but might be explicitly added.
4) The interaction between unaligned checkpoint and EndOfPartition is a bit 
ambiguous: What happens when an unaligned checkpoint is started and then one 
input channel contains the EndOfPartition event? From the written description, 
it sounds to me like, we move back to an aligned checkpoint for the whole 
receiving task. However, that is neither easily possible nor necessary. Imho it 
would be enough to also store the EndOfPartition in the channel state.
5) I'd expand the recovery section a bit. It would be the first time that we 
recover an incomplete DAG. Afaik the subtasks are deployed before the state is 
recovered, so at some point, the subtasks either need to be removed again or 
maybe we could even avoid them being created in the first place.

[1] https://issues.apache.org/jira/browse/FLINK-2491
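
For illustration, an abstract sketch of the iterative root computation described in 
point 2: a subtask becomes a new "root" (and gets the barrier injected) once all of its 
upstream subtasks have finished. The graph representation and class names here are 
stand-ins made up for this example, not Flink's internal data structures.

```
// Sketch only: iterative tracking of which subtasks become new roots as upstreams finish.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RootTracker {
    // subtask -> the set of its not-yet-finished upstream subtasks
    private final Map<String, Set<String>> pendingUpstreams;
    // subtask -> its direct downstream subtasks
    private final Map<String, Set<String>> downstreams;

    public RootTracker(Map<String, Set<String>> pendingUpstreams,
                       Map<String, Set<String>> downstreams) {
        this.pendingUpstreams = pendingUpstreams;
        this.downstreams = downstreams;
    }

    /** Called when a subtask finishes; returns the subtasks that just became new roots. */
    public List<String> onSubtaskFinished(String finished) {
        List<String> newRoots = new ArrayList<>();
        for (String down : downstreams.getOrDefault(finished, Collections.emptySet())) {
            Set<String> pending = pendingUpstreams.get(down);
            pending.remove(finished);
            if (pending.isEmpty()) {   // no running upstream left -> inject the barrier here
                newRoots.add(down);
            }
        }
        return newRoots;
    }
}
```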
On Fri, Oct 9, 2020 at 8:22 AM Yun Gao  wrote:

Hi, devs & users

Very sorry for the spoiled formats, I resent the discussion as follows.

As discussed in FLIP-131[1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks finished, which causes some 
problems for bounded or mixed jobs:
1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be 
replayed before committed to external systems in streaming mode. If sources are 
bounded and checkpoints are disabled after some tasks are finished, the data 
sent after the last checkpoint would always not be able to be committed. This 
issue has already been reported some times in the user ML[2][3][4] and is 
future brought up when working on FLIP-143: Unified Sink API [5]. 
2. The jobs with both bounded and unbounded sources might have to 
replay a large amount of records after failover due to no periodic checkpoints 
are taken after the bounded sources finished.

Therefore, we propose to also support checkpoints after some tasks finished. 
Your Could find more details in FLIP-147[6]. 

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
 --Original Mail --
Sender:Yun Gao 
Send Date:Fri Oct 9 14:16:52 2020
Recipients:Flink Dev , User-Flink 
Subject:[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finishe

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Yun Gao
Hi Arvid,
Many thanks for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely 
>>> thought it through. But in general, I'm currently at the point where I 
>>> think that we also need non-checkpoint related events in unaligned 
>>> checkpoints. So just keep that in mind, that we might converge anyhow at 
>>> this point.
I also agree that it would be better to keep the unaligned checkpoint 
behavior on EndOfPartition; I will double-check this issue again. 

>>> In general, what is helping in this case is to remember that there no 
>>> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we 
>>> can completely ignore the problem on how to store and restore output 
>>> buffers of a completed task (also important for the next point).
Exactly, we should not need to persist the output buffers for the completed 
tasks, and that would simplify the implementation a lot.

>>> 5) I think we are on the same page and I completely agree that for the 
>>> MVP/first version, it's completely fine to start and immediately stop. A 
>>> tad better would be even to not even start the procession loop. 
I also agree with this part. We would keep optimizing the implementation after 
the first version. 

Best,
Yun   




--
From:Arvid Heise 
Send Time:2020 Oct. 13 (Tue.) 03:39
To:Yun Gao 
Cc:Flink Dev ; User-Flink 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

4) Yes, the interaction is not trivial and also I have not completely thought 
it through. But in general, I'm currently at the point where I think that we 
also need non-checkpoint related events in unaligned checkpoints. So just keep 
that in mind, that we might converge anyhow at this point.

In general, what is helping in this case is to remember that there is no unaligned 
checkpoint barrier ever going to overtake EndOfPartition. So, we can completely 
ignore the problem on how to store and restore output buffers of a completed 
task (also important for the next point).

5) I think we are on the same page and I completely agree that for the 
MVP/first version, it's completely fine to start and immediately stop. A tad 
better would be to not even start the processing loop. 

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao  wrote:

Hi Arvid,
Many thanks for the insightful comments! I added the responses 
inline under the quotes: 
>> 1) You call the tasks that get the barriers injected leaf nodes, which would 
>> make the > sinks the root nodes. That is very similar to how graphs in 
>> relational algebra are labeled. However, I got the feeling that in Flink, we 
>> rather iterate from sources to sink, making the sources root nodes and the 
>> sinks the leaf nodes. However, I have no clue how it's done in similar 
>> cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in 
>> CheckpointCoordinator. Let's assume that we inject the barrier at all root 
>> subtasks (initially all sources). So in the iterative algorithm, whenever 
>> root A finishes, it looks at all connected subtasks B if they have any 
>> upstream task left. If not B becomes a new root. That would require to only 
>> touch a part of the job graph, but would require some callback from 
>> JobManager to CheckpointCoordinator.

I think "leaf nodes" was a poor choice of name on my part; in fact I think we 
have the same idea: we start from the source nodes to find all the nodes whose 
predecessor nodes have all finished. It would be much better to call these nodes 
(which we would trigger) "root nodes". I'll modify the FLIP to change the names 
to "root nodes".
>> 2b) We also need to be careful for out-of-sync updates: if the root is about 
>> to finish, we could send the barrier to it from CheckpointCoordinator, but 
>> at the time it arrives, the subtask is finished already.
Exactly. When the checkpoint triggers a task but finds the task is not there, 
it can further check whether the task has finished; if so, it should then 
re-check its descendants to see if there are new "root nodes" to trigger.
>> 3) An implied change is that checkpoints are not aborted anymore at 
>> EndOfPartition, which is good, but might be explicitly added.
Yes, currently barrier alignment would fail the current checkpoint on 
EndOfPartition, and we would modify the behavior.
>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit 
>> ambiguous: What happens when an unaligned checkpoint is started and then one 
>> input channel contains the EndOfPartition e

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Yun Gao
Hi Till,
Many thanks for the feedback!
> 1) When restarting all tasks independent of the status at checkpoint time 
> (finished, running, scheduled), we might allocate more resources than we 
> actually need to run the remaining job. From a scheduling perspective it 
> would be easier if we already know that certain subtasks don't need to be 
> rescheduled. I believe this can be an optimization, though.
> 2) In the section Compatibility, Deprecation and Migration Plan you mentioned 
> that you want to record operators in the CompletedCheckpoint which are fully 
> finished. How will this information be used for constructing a recovered 
> ExecutionGraph? Why wouldn't the same principle work for the task level?

I think the first two issues are related. The main reason is that with 
external checkpoints, a checkpoint might be taken from one job and used in 
another job, but we do not have a unique ID to match tasks across jobs. 
Furthermore, users may also change the parallelism of a JobVertex, or even modify 
the graph structure by adding/removing operators or changing the chaining 
relationship between operators. 
On the other side, Flink already provides custom UIDs for operators, 
which make operators a stable unit for recovery. The current checkpoints 
are also organized in units of operators to support rescaling and job 
upgrading. 
When restarting from a checkpoint with finished operators, we would only start 
the tasks containing operators that are not fully finished (namely, some subtasks 
were still running when the checkpoint was taken). Then during the execution of a 
single task, we only initialize/open/run/close the operators that are not fully 
finished. The scheduler should be able to compute whether a task contains 
operators that are not fully finished from the current JobGraph and the operator 
finish states restored from the checkpoint.

> 3) How will checkpointing work together with fully bounded jobs and FLIP-1 
> (fine grained recovery)?
Currently I think it should be compatible with fully bounded jobs and FLIP-1, 
since it can be viewed as a completion of the current checkpoint mechanism. 
Concretely:
1. Batch jobs (with blocking execution mode) are not affected, since 
checkpoints are not enabled in this case.
2. With this modification, bounded jobs running in pipelined mode would also be 
supported with checkpoints while they are finishing. As discussed in the 
FLIP, it should not affect the current behavior after restore for almost all 
jobs.
3. Region failover and more fine-grained recovery should also not be affected: 
similar to the previous behavior, after a failover the failover policy 
(full/region/fine-grained) decides which tasks to restart, and the checkpoint 
only decides what state is restored for these tasks. The only difference with 
this modification is that these tasks might now be restored from a checkpoint 
taken after some tasks have finished. Since the previously finished tasks would 
always be skipped by not being started or by running an empty execution, and the 
behavior of the previously running tasks remains unchanged, the overall behavior 
should not be affected.


Best,
Yun


--
From:Till Rohrmann 
Send Time:2020 Oct. 13 (Tue.) 17:25
To:Yun Gao 
Cc:Arvid Heise ; Flink Dev ; 
User-Flink 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time 
(finished, running, scheduled), we might allocate more resources than we 
actually need to run the remaining job. From a scheduling perspective it would 
be easier if we already know that certain subtasks don't need to be 
rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you mentioned 
that you want to record operators in the CompletedCheckpoint which are fully 
finished. How will this information be used for constructing a recovered 
ExecutionGraph? Why wouldn't the same principle work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1 
(fine grained recovery)?

Cheers,
Till
On Tue, Oct 13, 2020 at 9:30 AM Yun Gao  wrote:

Hi Arvid,
Very thanks for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely 
>>> thought it through. But in general, I'm currently at the point where I 
>>> think that we also need non-checkpoint related events in unaligned 
>>> checkpoints. So just keep that in mind, that we might converge anyhow at 
>>> this point.
I also agree with that it would be better to keep the unaligned checkpoints 
behavior on EndOfPartition, I will then double check on this issue again. 

>>> In general, what is helping in this c

Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Yun Gao
Hi Alexander, 

The signature of createRemoteEnvironment is:

public static StreamExecutionEnvironment createRemoteEnvironment(
      String host, int port, String... jarFiles);

which can also ship the jars to execute to the remote cluster. Could you try also 
passing the jar files to the remote environment?
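
For illustration, a minimal, self-contained sketch of shipping the job jar via 
createRemoteEnvironment; the host, port, and jar path are hypothetical placeholders for 
your own setup.

```
// Sketch only: submit a dynamically configured job to a session cluster,
// shipping the jar that contains the user functions.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "flink-jobmanager.example.com",       // hypothetical JobManager host
                8081,                                 // submission port of the session cluster (assumed)
                "/path/to/my-job-with-udfs.jar");     // jar containing the map functions

        // ... configure the job workflow here, using only classes packaged in the jar above ...
        env.fromElements(1, 2, 3).print();

        env.execute("dynamically-configured-job");
    }
}
```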

Best,
 Yun
--
Sender:Alexander Bagerman
Date:2020/10/29 10:43:16
Recipient:
Theme:How to deploy dynamically generated flink jobs?

Hi,
I am trying to build functionality to dynamically configure a Flink job (Java) in 
my code based on some additional metadata and submit it to a Flink session 
cluster.
Flink version is 1.11.2
The problem I have is how to provide a packaged job to the cluster. When I try 
the following code:
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
... configuring job workflow here...
env.execute(jobName);

I am getting a ClassNotFoundException stating that the code for my mapping functions 
did not make it to the cluster, which makes sense.
What would be the right way to deploy dynamically configured Flink jobs which 
are not packaged as a jar file but rather generated ad hoc?
Thanks



Re: Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Yun Gao
Hi Alexander,

 From my side I still think it is reasonable to have a jar that contains 
the code that runs in the client and is also shipped to the cluster. Then 
this jar could also be included in the list of shipped jars.

 For the second issue, similarly, I think you may first build the project to get 
the jar containing the code, then fill in the path of the generated jar to test 
the submission.

Best,
 Yun



 --Original Mail --
Sender:Alexander Bagerman 
Send Date:Thu Oct 29 11:38:45 2020
Recipients:Yun Gao 
CC:Flink ML 
Subject:Re: How to deploy dynamically generated flink jobs?

I did try it, but this option seems to be for a third-party jar. In my case I 
would need to specify/ship a jar that contains the code where the job is being 
constructed. I'm not clear on
1. how to point to the containing jar 
2. how to test such a submission from my project running in Eclipse 
Alex 

On Wed, Oct 28, 2020 at 8:21 PM Yun Gao  wrote:

Hi Alexander, 

The signature of the createRemoteEnvironment is 
public static StreamExecutionEnvironment createRemoteEnvironment(
  String host, int port, String... jarFiles);
Which could also ship the jars to execute to remote cluster. Could you have a 
try to also pass the jar files to the remote environment ?

Best,
 Yun
--
Sender:Alexander Bagerman
Date:2020/10/29 10:43:16
Recipient:
Theme:How to deploy dynamically generated flink jobs?

Hi,
I am trying to build a functionality to dynamically configure a flink job 
(Java) in my code based on some additional metadata and submit it to a flink 
running in a session cluster.
Flink version is 1.11.2
The problem I have is how to provide a packed job to the cluster. When I am 
trying the following code
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
... configuring job workflow here...
env.execute(jobName);
I am getting ClassNotFoundException stating that code for my mapping functions 
did not make it to the cluster. Which makes sense.
What would be the right way to deploy dynamically configured flink jobs which 
are not packaged as a jar file but rather generated ad-hoc?
Thanks



Re: Unify error handler and late window record output for SQL api

2020-10-29 Thread Yun Gao
Hi Yi,

  Sorry, I might not be an expert on SQL. As a whole, since SQL is a 
high-level API, users have less control over their jobs:
  1. Unfortunately, we do not have an API to catch all errors. I think 
even with DataStream, we also do not provide an API to catch runtime 
exceptions such as those related to the network. Could you also explain a bit 
about why this functionality is wanted? 
  2. The SQL API currently also does not provide an API to side-output late 
records. Since standard SQL does not provide the corresponding grammar, it 
would be a bit complex to provide the corresponding functionality. 

Best,
 Yun


 --Original Mail --
Sender:Yi Tang 
Send Date:Thu Oct 29 15:08:30 2020
Recipients:Flink ML 
Subject:Unify error handler and late window record output for SQL api

Hi,
I'm looking for a way to handle potential errors in a job submitted with the SQL API, 
but unfortunately found nothing.
Handling errors manually with the SQL API is hard, I think. Is there a way to handle 
all errors and send them to a side output to avoid task failure?

Also, one can put late window records into a side output with the streaming API; it 
looks like there's no equivalent in the SQL API.

Thanks.

Re: Native kubernetes setup failed to start job

2020-10-29 Thread Yun Gao
Hi Liangde,

   I'm pulling in Yang Wang, who is the expert on Flink on K8s.  

Best,
 Yun

 --Original Mail --
Sender:Chen Liangde 
Send Date:Fri Oct 30 05:30:40 2020
Recipients:Flink ML 
Subject:Native kubernetes setup failed to start job

I created a flink cluster in kubernetes following this guide: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
The job manager was running. When a job was submitted to the job manager, it 
spawned a task manager pod, but the task manager failed to connect to the job 
manager. And in the job manager web ui I can't find the task manager.
This error is suspicious: 
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
 Adjusted frame length exceeds 10485760: 352518404 - discarded
2020-10-29 13:22:51,069 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*().
2020-10-29 13:22:51,176 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with java.io.IOException: Connection reset by peer
2020-10-29 13:22:51,176 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 352518404 - discarded
2020-10-29 13:22:51,180 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2020-10-29 13:22:51,183 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*.
2020-10-29 13:23:01,203 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with java.io.IOException: Connection reset by peer

Re: Re: Re: How to deploy dynamically generated flink jobs?

2020-10-29 Thread Yun Gao
Hi Alexander, 

   Sorry, I might not fully understand the issue: do you mean the "flink" 
jar is the same jar as the Spring app fat jar, or are they different jars? 
As a whole, the parameter value we need for jarFiles is the absolute 
path of the jar file, so we need some logic to decide the path to the jar 
files. For example, if the "flink" jar containing the UDFs is the same as the 
Spring app fat jar containing the execute call, we might use a method like [1] to 
find the containing jar; otherwise we might need some mapping from the job 
name to its Flink jar. 
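
For illustration, a minimal sketch (not a Flink API) of locating the jar that contains 
a given class, similar in spirit to the Hadoop ClassUtil helper in [1]. Note that this 
simple approach may not yield a usable filesystem path for classes nested inside a 
Spring Boot fat jar.

```
// Sketch only: find the jar (or classes directory) that contains a given class.
import java.net.URISyntaxException;

public final class JarLocator {
    private JarLocator() {}

    public static String findContainingJar(Class<?> clazz) {
        try {
            // getCodeSource() may be null for JDK classes; fine for user classes in a jar.
            return clazz.getProtectionDomain()
                        .getCodeSource()
                        .getLocation()
                        .toURI()
                        .getPath();
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Could not locate the jar for " + clazz, e);
        }
    }
}
```

The returned path could then be passed as one of the jarFiles arguments of 
createRemoteEnvironment.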

Best,
 Yun

[1] 
https://github.com/apache/hadoop/blob/8ee6bc2518bfdf7ad257cc1cf3c73f4208c49fc0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ClassUtil.java#L38

 --Original Mail --
Sender:Alexander Bagerman 
Send Date:Fri Oct 30 04:49:59 2020
Recipients:Yun Gao 
CC:Flink ML 
Subject:Re: Re: How to deploy dynamically generated flink jobs?

Thanks, Yun. Makes sense. How would you reference a jar file from inside another 
jar for such an invocation? 
In my case I would have an interactive application - a Spring Boot web app - 
where the job would be configured and 
StreamExecutionEnvironment.execute(jobName) is called. 
The Spring app is a runnable fat jar with my "flink" jar packaged along with other 
jars. How would I specify the location of the jar so that 
StreamExecutionEnvironment can find it?
Thanks,
Alex

Re: Duplicate operators generated by plan

2020-12-03 Thread Yun Gao
Hi Rex,

Could you also attach an example of the SQL / Table program? And one possible 
issue to confirm: do the operators with the same names also have the 
same inputs?

Best,
Yun

 --Original Mail --
Sender:Rex Fenley 
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user 
Subject:Duplicate operators generated by plan

Hello,

I'm running into an issue where my execution plan is creating the same exact 
join operator multiple times simply because the subsequent operator filters on 
a different boolean value. This is a massive duplication of storage and work. 
The filter operators which follow only filter out a small set of elements 
per set, too.

eg. of two separate operators that are equal

 Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, 
organization_id, user_id, roles, id_splatted, org_user_is_admin, 
org_user_is_teacher, org_user_is_student, org_user_is_parent], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, 
user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, 
org_user_is_student, org_user_is_parent]

 Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, 
organization_id, user_id, roles, id_splatted, org_user_is_admin, 
org_user_is_teacher, org_user_is_student, org_user_is_parent], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, 
user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, 
org_user_is_student, org_user_is_parent]) 

Which are entirely the same datasets being processed.

The first one points to 
 GroupAggregate(groupBy=[user_id], select=[user_id, 
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS 
admin_organization_ids]) 

The second one points to
 GroupAggregate(groupBy=[user_id], select=[user_id, 
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS 
teacher_organization_ids]) 

And these are both intersecting sets of data though slightly different. I don't 
see why that would make the 1 join from before split into 2 though. There's 
even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to 
not duplicate operators where it doesn't need to?

Thanks!

-- 

Rex Fenley | Software Engineer - Mobile and Backend


Re: How to parse list values in csv file

2020-12-03 Thread Yun Gao

Hi,

The CSV format only supports the types listed in [1], thus for other types 
some kind of workaround is needed, like first reading the field as a string and 
parsing it again later in the program. 
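
For example (just a sketch with a made-up column layout and delimiter, assuming a 
StreamExecutionEnvironment env and the usual imports), the list-like column could 
be declared as a string and split afterwards:

TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
    Types.STRING,  // id
    Types.STRING   // e.g. "a;b;c", turned into a List<String> in a following map()
};
RowCsvInputFormat format = new RowCsvInputFormat(new Path("/path/to/input.csv"), fieldTypes);
DataStream<Row> rows = env.readFile(format, "/path/to/input.csv");
DataStream<List<String>> lists = rows
    .map(row -> Arrays.asList(((String) row.getField(1)).split(";")))
    .returns(Types.LIST(Types.STRING));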

Best,
Yun



[1] 
https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287



 --Original Mail --
Sender:narasimha 
Send Date:Fri Dec 4 00:45:53 2020
Recipients:user 
Subject:How to parse list values in csv file

Hi,

Getting the below error when trying to read a csv file; one of the fields is a 
list type.

Can someone help with fixing the issue?

jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type 
'java.util.List' is not supported for the CSV input format.
jobmanager_1   | at 
org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

-- 
A.Narasimha Swamy


Re: Re: Duplicate operators generated by plan

2020-12-06 Thread Yun Gao
Hi Rex,

   I tried a similar example [1] but could not reproduce the issue. Which version 
of Flink are you using now?

Best,
 Yun


[1] The example code:
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setRestartStrategy(RestartStrategies.noRestart());
bsEnv.setParallelism(1);

EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);

DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, String>>() {

   @Override
   public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {
  sourceContext.collect(new Tuple2<>(0, "test"));
   }

   @Override
   public void cancel() {

   }
});

Table table = bsTableEnv.fromDataStream(
  source, $("id"), $("name"));
Table table2 = table.select(call("abs", $("id")), $("name"))
  .as("new_id", "new_name");

bsTableEnv.createTemporaryView("view", table2);
Table handled = bsTableEnv.sqlQuery("select new_id, FIRST_VALUE(new_name) as 
new_name from view group by new_id");

Table ret = table.join(handled)
  .where($("id").isEqual($("new_id")))
  .select($("id"), $("name"), $("new_name"));
System.out.println(ret.explain());

DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, Row.class);
row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
   @Override
   public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {

   }
});

System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
--
Sender:Rex Fenley
Date:2020/12/04 14:18:21
Recipient:Yun Gao
Cc:user; Brad Davis
Theme:Re: Duplicate operators generated by plan

cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley  wrote:

Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed is 
as follows. The orgUsersTable is later filtered into one table and aggregated 
and another table and aggregated. The planner seems to duplicate orgUsersTable 
into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
 this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
 OrgUsersRoleSplatPrefix,
 this.tableEnv
 )

// helper function
 def splatRoles(
 table: Table,
 columnPrefix: String,
 tableEnv: TableEnvironment
 ): Table = {
 // Flink does not have a contains function so we have to splat out our role 
array's contents
 // and join it to the originating table.
 val func = new SplatRolesFunc()
 val splatted = table
 .map(func($"roles", $"id"))
 .as(
 "id_splatted",
 s"${columnPrefix}_is_admin",
 s"${columnPrefix}_is_teacher",
 s"${columnPrefix}_is_student",
 s"${columnPrefix}_is_parent"
 )
 // FIRST_VALUE is only available in SQL - so this is SQL.
 // Rationale: We have to group by after a map to preserve the pk inference, 
otherwise flink will
 // toss it out and all future joins will not have a unique key.
 tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
 val grouped = tableEnv.sqlQuery(s"""
 SELECT
 id_splatted,
 FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
 FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
 FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
 FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
 FROM ${columnPrefix}_splatted
 GROUP BY id_splatted
 """)
 return table
 .join(grouped, $"id" === $"id_splatted")
 .dropColumns($"id_splatted")
 .renameColumns($"roles".as(s"${columnPrefix}_roles"))
 }

@FunctionHint(
 output = new DataTypeHint(
 "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student 
BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
 )
)
class SplatRolesFunc extends ScalarFunction {
 def eval(roles: Array[String], id: java.lang.Long): Row = {
 val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
 val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
 val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
 val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
 return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
 }
 override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
 Types.ROW(
 Types.LONG,
 Types.BOOLEAN,
 Types.BOOLEAN,
 Types.BOOLEAN,
 Types.BOOLEAN
 )
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao  wrote:

Hi Rex,

Could  you also attach one example for these sql / table ? And one possible 
issue to confirm is that does the operators with the same names also hav

Re: How to parse list values in csv file

2020-12-06 Thread Yun Gao
Glad to hear that you solved this issue!


Best,
 Yun
--
Sender:narasimha
Date:2020/12/06 21:35:33
Recipient:Yun Gao
Cc:user
Theme:Re: How to parse list values in csv file

Thanks for your email.

I translated the csv to JSON, read it as a plain text file, and then processed it 
into objects. It solved my use case.


On Fri, Dec 4, 2020 at 12:24 PM Yun Gao  wrote:


Hi,

The CSV format only supports the types listed in [1], thus for other types 
some kind of workaround is needed, like first reading the field as a string and 
parsing it again later in the program. 

Best,
Yun



[1] 
https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287



 --Original Mail --
Sender:narasimha 
Send Date:Fri Dec 4 00:45:53 2020
Recipients:user 
Subject:How to parse list values in csv file

Hi,

Getting the below error when trying to read a csv file; one of the fields is a 
list type.

Can someone help with fixing the issue?

jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type 
'java.util.List' is not supported for the CSV input format.
jobmanager_1   | at 
org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

-- 
A.Narasimha Swamy


-- 
A.Narasimha Swamy



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-14 Thread Yun Gao

Hi all,

I would like to resume this discussion on supporting checkpoints after 
tasks have finished :) Based on the previous discussion, we have now implemented 
a PoC [1] to try out the idea. During the PoC we also ran into some possible 
issues:

1. To include EndOfPartition into consideration for barrier alignment at 
the TM side, we now tend to decouple the logic for EndOfPartition from the 
normal alignment behavior to avoid complex interference (which otherwise seems 
hard to keep track of). We could do so by inserting suitable barriers for input 
channels that have received but not yet processed EndOfPartition. For example, 
if a task with four inputs has received barrier 2 from two input channels, but 
the other two inputs did not receive barrier 2 before EndOfPartition because 
the preceding tasks have finished, we could then insert barrier 2 for the last 
two channels so that we can still complete checkpoint 2.
2. As we have discussed, if a task finishes while we are triggering the 
tasks, it would cause a checkpoint failure and we should re-trigger its 
descendants. But if possible we think we might skip this issue in the first 
version to reduce the implementation complexity, since it should not affect 
correctness. We could consider supporting it in the following versions.

3. We would have to add a field isFinished to OperatorState so that we 
do not re-run finished sources after failover. However, this would require a 
new version of the checkpoint meta. Currently Flink has an abstract 
MetaV2V3SerializerBase and has V2 and V3 extend it to share some 
implementation. To add V4, which only differs from V3 in one field, the 
current PoC wants to introduce a new MetaV3V4SerializerBase extending 
MetaV2V3SerializerBase to share implementation between V3 and V4. This might 
look a little complex and we might need a general mechanism to extend the 
checkpoint meta format. 

4. With the change, StreamTask would have two types of subclasses according 
to how triggerCheckpoint is implemented: one is the source tasks that perform 
checkpoints immediately, and the other is the non-source tasks that would 
notify the CheckpointBarrierHandler in some way. However, since we have 
multiple source tasks (legacy and new source) and multiple non-source tasks 
(one-input, two-input, multiple-input), this would lead to cases where multiple 
subclasses share the same implementation and cause code repetition. Currently 
the PoC introduces a new level of abstraction, namely SourceStreamTasks and 
NonSourceStreamTasks, but what makes it more complicated is that 
StreamingIterationHead extends OneInputStreamTask but it needs to perform 
checkpoints as source tasks do.

Glad to hear your opinions!

Best,
 Yun

[1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starts from 
commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Yun Gao
 Hi Aljoscha,

Very thanks for the feedbacks! For the remaining issues:

  > 1. You mean we would insert "artificial" barriers for barrier 2 in case 
we receive  EndOfPartition while other inputs have already received barrier 2? 
I think that makes sense, yes.

  Yes, exactly, I would like to  insert "artificial" barriers for in case 
we receive  EndOfPartition while other inputs have already received barrier 2, 
and also for the similar cases that some input channels received EndOfPartition 
during checkpoint 2 is ongoing and when the task receive directly checkpoint 
triggering after all the precedent tasks are finished but not received their 
EndOfPartition yet.

 > 3. This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

I re-checked the code and now I think composition would be better to avoid 
complex inheritance hierarchy by exposing the changed part 
`(de)serializeOperatorState` out, and I'll update the PoC to change this part. 
Very thanks for the suggestions!

   > 4. Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. Are you 
saying we should fix that or would the new work introduce even 
more duplicate code?

  Currently since we would never trigger non-source tasks, thus the 
triggerCheckpoint logic is now implemented in the base StreamTask class and 
only be used by the source tasks. However, after the change the non-source 
tasks would also get triggered with a different behavior, we might not be able 
to continue using this pattern.

Best,
Yun


--
From:Aljoscha Krettek 
Send Time:2020 Dec. 15 (Tue.) 18:11
To:dev 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>  1. To include EndOfPartition into consideration for barrier alignment at 
> the TM side, we now tend to decouple the logic for EndOfPartition with the 
> normal alignment behaviors to avoid the complex interference (which seems to 
> be a bit not trackable). We could do so by inserting suitable barriers for 
> input channels received but not processed EndOfPartition. For example, if a 
> task with four inputs has received barrier 2 from two input channels, but the 
> other two inputs do not received barrier 2  before EndOfPartition due to the 
> precedent tasks are finished, we could then insert barrier 2 for the last two 
> channels so that we could still finish the checkpoint 2.

You mean we would insert "artificial" barriers for barrier 2 in case we 
receive  EndOfPartition while other inputs have already received barrier 
2? I think that makes sense, yes.

>  2. As we have discussed, if a tasks finished during we triggering the 
> tasks, it would cause checkpoint failure and we should re-trigger its 
> descendants. But if possible we think we might skip this issue at the first 
> version to reduce the implementation complexity since it should not affect 
> the correctness. We could considering support it in the following versions.

I think this should be completely fine.

>  3. We would have to add a field isFinished  to OperatorState so that we 
> could not re-run finished sources after failover. However, this would require 
> a new version of checkpoint meta. Currently Flink have an abstract 
> MetaV2V3SerializerBase and have V2 and V3 extends it to share some 
> implementation. To add V4 which is only different from V3 for one field, the 
> current PoC want to introduce a new MetaV3V4SerializerBase extends 
> MetaV2V3SerializerBase to share implementation between V3 and V4. This might 
> looks a little complex and we might need a general mechanism to extend 
> checkpoint meta format.

This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

>  4. With the change StreamTask would have two types of subclasses 
> according to how to implement triggerCheckpoint, one is source tasks that 
> perform checkpoints immediately and another is the non-source tasks that 
> would notify CheckpointBarrierHandler in some way. However, since we have 
> multiple source tasks (legacy and new source) and multiple non-source tasks 
> (one-input, two-input, multiple-input), it would cause the cases that 
> multiple subclasses share the same implementation and  cause code repetition. 
> Currently the PoC introduces a new level of abstraction, namely 
> SourceStreamTasks and NonSourceStreamTasks, but what makes it more 
> complicated is that StreamingIterationHead extends OneInputStreamTask but it 
> need to perform checkpoint as source tasks.

Don

Re: See lag end-to-end

2020-12-20 Thread Yun Gao
Hi Rex,

   I think Latency Marker is what you need [1].
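
For example (a minimal sketch), latency tracking can be enabled per job via the 
execution config, or globally via metrics.latency.interval in flink-conf.yaml:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// emit latency markers from the sources every second; they show up as latency metrics per operator
env.getConfig().setLatencyTrackingInterval(1000);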


Best,
 Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking

--
Sender:Rex Fenley
Date:2020/12/21 04:57:59
Recipient:user
Cc:Brad Davis
Theme:See lag end-to-end

Hello,

Is there some proxy to seeing the relative time it takes for records to make it 
through an entire job plan? Maybe checkpoint alignment time would be a proxy 
for this? Is there metrics for that or something else that would provide signal 
here?

Thanks!

-- 

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com


Re: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward,

For the second issue, have you also set the state backend type? I'm asking 
because, except for the default heap state backend, the other state backends 
should throw an exception if state.checkpoints.dir is not set. Since the heap 
state backend stores all the snapshots in the JM's memory, they could not be 
recovered after a JM failover, which makes it unsuitable for production usage. 
Therefore, if used in a production environment it might be better to switch to 
a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there should be no large changes after 
1.9.2. There may be different causes of checkpoint timeouts; one possible cause 
is back-pressure: some operator cannot process its records in time, which 
blocks the checkpoints. I think you might check the back-pressure [1] first, 
and if there is indeed back-pressure, then you might try unaligned checkpoints 
or resolve the back-pressure by increasing the parallelism of the slow 
operators. 
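
For reference, a minimal flink-conf.yaml sketch of that setup (the checkpoint 
path is a placeholder):

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink/checkpoints
execution.checkpointing.unaligned: true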

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html
 



 --Original Mail --
Sender:Colletta, Edward 
Send Date:Mon Dec 21 17:50:15 2020
Recipients:user@flink.apache.org 
Subject:checkpointing seems to be throttled.

Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.

  
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.
We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.   This does not seem to be 
related to EFS limits/throttling .  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?
Also I noticed we did not set state.checkpoints.dir, I assume it is using 
high-availability.storageDir.  Is that correct?
For now we plan on setting 
execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true
and also explicitly set state.checkpoints.dir


Re: [Help Required:]-Unable to submit job from REST API

2020-12-21 Thread Yun Gao

Hi Puneet,

   From the doc, submitting a job via the REST API should send a POST request 
to /jars/:jarid/run [1]. The response "Not Found" likely means the REST API 
server does not recognize the requested path.
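
So the run request should look roughly like this (host, port and jar id are 
placeholders; the jar id is the file name returned by the upload):

POST http://<jobmanager>:8081/jars/<jarid>/run?entry-class=com.orchestrator.flowExecution.GraphExecutor&program-args=<url-encoded args>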

Best,
 Yun





 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#jars-jarid-run
--
Sender:Puneet Kinra
Date:2020/12/21 19:47:31
Recipient:user
Theme:[Help Required:]-Unable to submit job from REST API

Hi All

Unable to submit job from REST API (Flink-Monitoring API),

Steps followed:

1) Load the jar using load api.
2) can see the jar in the /tmp/flink-web folder.
3) Try to run the jar using the following.

Request

http://host-ip/45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar/run?programArgs=/users/puneet/app/orchestrator/PropertiesStream_back.json&entryClass=com.orchestrator.flowExecution.GraphExecutor
 

Response:

{
"errors": [
"Not found."
]
}

-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
e-mail :puneet.ki...@customercentria.com




Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
 Hi Nick,

Are you using EXACTLY_ONCE semantics? If so, the sink would use transactions 
and only commit the transaction on checkpoint completion to ensure end-to-end 
exactly-once. A detailed description can be found in [1].
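
For illustration (only a sketch; the topic, properties and serialization are 
placeholders), the semantic is chosen when the producer is created, and with 
EXACTLY_ONCE the records only become visible to read_committed consumers after 
the checkpoint commits the transaction:

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
    "output-topic",
    (KafkaSerializationSchema<String>) (element, timestamp) ->
        new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)),
    producerProps,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // AT_LEAST_ONCE avoids the commit delay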


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick



Re: RE: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward,

   Are you setting the FsStateBackend via code or flink-conf.yaml? If via code, 
it requires a path parameter and that path serves as the checkpoint directory 
(state.checkpoints.dir). If via flink-conf.yaml, I tried on 1.12 by setting 
state.backend: filesystem in the config file and enabling checkpointing, and it 
indeed threw an exception saying

  org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
  at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
  at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
  at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
  at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
  at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
  at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:237)
  at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
  at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
  at java.util.Optional.map(Optional.java:215)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
  at CheckpointTest.main(CheckpointTest.java:26)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
  ... 11 more
   For the timeout, if there is no backpressure, I think it might be helpful 
to look at the time decomposition of the checkpoint in the checkpoint history 
page of the Web UI to see which phase takes too long.
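
For completeness, a minimal sketch of the code-based variant, where the path 
passed to FsStateBackend serves as the checkpoint directory (the HDFS path is a 
placeholder):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60_000); // checkpoint every minute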


Best,
 Yun



 --Original Mail --
Sender:Colletta, Edward 
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao , user@flink.apache.org 

Subject:RE: checkpointing seems to be throttled.

Thanks for the quick response.
We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any. 
From: Yun Gao  
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.
This email is from an external source -exercise caution regarding links and 
attachments.
Hi Edward,

For the second issue, have you also set the statebackend type? I'm asking 
so because except for the default heap statebackend, other statebackends should 
throws exception if the state.checkpoint.dir is not set. Since heap 
statebackend stores all the snapshots in the JM's memory, it could not be 
recovered after JM failover, which makes it not suitable for production usage. 
Therefore, if used in production env the

Re: Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
Hi nick,

   Sorry, I initially thought that the data was also written into Kafka with 
Flink. So it can be ensured that there is no delay on the write side, right? 
Does the delay on the read side still exist?

Best,
 Yun




 --Original Mail --
Sender:nick toker 
Send Date:Tue Dec 22 01:43:50 2020
Recipients:Yun Gao 
CC:user 
Subject:Re: checkpoint delay consume message

hi

I am confused.

The delay is in the source when reading messages, not in the sink.

nick

On Mon, Dec 21, 2020 at 18:12, Yun Gao <yungao...@aliyun.com> wrote:

 Hi Nick,

Are you using EXACTLY_ONCE semantics? If so, the sink would use transactions 
and only commit the transaction on checkpoint completion to ensure end-to-end 
exactly-once. A detailed description can be found in [1].


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick



Re: StreamingFileSink closed file exception

2020-12-24 Thread Yun Gao
Hi Billy,
StreamingFileSink does not expect the Encoder to close the stream passed 
into the encode method. However, ObjectMapper closes it at the end of its write 
method. Thus I think you could disable the close action for the ObjectMapper, or 
change the encode implementation to 

objectMapper.writeValue(new CloseShieldOutputStream(stream), element);

to avoid the stream actually getting closed.
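
Another option (just a sketch) is to serialize to bytes yourself so that nothing 
ever closes the stream handed in by the sink (CloseShieldOutputStream above comes 
from commons-io):

public class AndroidDataEncoder implements Encoder<AndroidData> {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        stream.write(objectMapper.writeValueAsBytes(element));
        stream.write('\n'); // one JSON document per line
    }
}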
 --Original Mail --
Sender:Billy Bain 
Send Date:Thu Dec 24 22:32:06 2020
Recipients:User 
Subject:StreamingFileSink closed file exception

I am new to Flink and am trying to process a file and write it out formatted as 
JSON. 

This is a much simplified version. 

public class AndroidReader {
public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
DataStreamSource<String> lines = env.readTextFile("file:///path/to/file/input.json");

SingleOutputStreamOperator<AndroidData> android = lines.map(new AndroidDataMapper());
StreamingFileSink<AndroidData> sink = StreamingFileSink.forRowFormat(new Path("file:///path/to/file/output"), new AndroidDataEncoder())

.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix("json").build())
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
android.addSink(sink);
env.execute("Android");
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
public class AndroidData {
public AndroidData() {
}
private String packageName;
public String getPackageName() {
return packageName;
}
public void setPackageName(String packageName) {
this.packageName = packageName;
}
}
public class AndroidDataMapper implements MapFunction<String, AndroidData> {

private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public AndroidData map(String value) throws Exception {
return objectMapper.readValue(value, AndroidData.class);
}
}
AndroidDataEncoder class:
public class AndroidDataEncoder implements Encoder<AndroidData> {

private static final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void encode(AndroidData element, OutputStream stream) throws 
IOException {
objectMapper.writeValue(stream, element);
}
}

The issue is that I get an ClosedChannelException. I see the folder get 
created, but then no files are written to it. 
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
at 
org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnEvent(DefaultRollingPolicy.java:76)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

Any help would be appreciated. Thanks!

-- 
Wayne D. Young
aka Billy Bob Bain
billybobb...@gmail.com


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-24 Thread Yun Gao
   Hi all,

 I tested the previous PoC with the current tests and I found some new 
issues that might cause divergence, and sorry for there might also be some 
reversal for some previous problems:

 1. Which operators should wait for one more checkpoint before close ?

One motivation for this FLIP is to ensure the 2PC sink commits the last 
part of data before closed, which makes the sink operator need to wait for one 
more checkpoint like onEndOfInput() -> waitForCheckpoint() -> 
notifyCheckpointComplete() -> close(). This lead to the issue which operators 
should wait for checkpoint? Possible options are 
 a. Make all the operators (or UDF) implemented 
notifyCheckpointCompleted method wait for one more checkpoint. One exception is 
that since we can only snapshot one or all tasks for a legacy source operator 
to avoid data repetition[1], we could not support legacy operators and its 
chained operators to wait for checkpoints since there will be deadlock if part 
of the tasks are finished, this would finally be solved after legacy source are 
deprecated. The PoC used this option for now.
b. Make operators (or UDF) implemented a special marker 
interface to wait for one more checkpoint.  


   2. Do we need to solve the case that tasks finished before triggered ?

  Previously I think we could postpone it, however, during testing I found 
that it might cause some problems since by default checkpoint failure would 
cause job failover, and the job would also need wait for another interval to 
trigger the next checkpoint. To pass the tests, I updated the PoC to include 
this part, and we may have a double think on if we need to include it or use 
some other options.

3. How to extend a new format for checkpoint meta ?

Sorry previously I gave a wrong estimation, after I extract a sub-component 
for (de)serialize operator state, I found the problem just goes to the new 
OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have 
different fields, thus they use different process when (de)serialize, which is 
a bit different from the case that we have a fixed steps and each step has 
different logic. Thus we might either
 a. Use base classes for each two version.
 b. Or have a unified framework contains all the possible fields across all 
version, and use empty field serializer to skip some fields in each version.

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks


--
From:Yun Gao 
Send Time:2020 Dec. 16 (Wed.) 11:07
To:Aljoscha Krettek ; dev ; user 

Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

 Hi Aljoscha,

Very thanks for the feedbacks! For the remaining issues:

  > 1. You mean we would insert "artificial" barriers for barrier 2 in case 
we receive  EndOfPartition while other inputs have already received barrier 2? 
I think that makes sense, yes.

  Yes, exactly, I would like to  insert "artificial" barriers for in case 
we receive  EndOfPartition while other inputs have already received barrier 2, 
and also for the similar cases that some input channels received EndOfPartition 
during checkpoint 2 is ongoing and when the task receive directly checkpoint 
triggering after all the precedent tasks are finished but not received their 
EndOfPartition yet.

 > 3. This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

I re-checked the code and now I think composition would be better to avoid 
complex inheritance hierarchy by exposing the changed part 
`(de)serializeOperatorState` out, and I'll update the PoC to change this part. 
Very thanks for the suggestions!

   > 4. Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. Are you 
saying we should fix that or would the new work introduce even 
more duplicate code?

  Currently since we would never trigger non-source tasks, thus the 
triggerCheckpoint logic is now implemented in the base StreamTask class and 
only be used by the source tasks. However, after the change the non-source 
tasks would also get triggered with a different behavior, we might not be able 
to continue using this pattern.

Best,
Yun


--
From:Aljoscha Krettek 
Send Time:2020 Dec. 15 (Tue.) 18:11
To:dev 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>  1. To include EndOfPartition into consideration for 

Re: Tumbling Time Window

2021-01-03 Thread Yun Gao
Hi Navneeth

For me, I think you may start with the window function; an example of a 
custom window function can be found in [1]. From the description it should be a 
standard tumbling window; if implemented with a custom process function, it 
would end up having similar functionality to the current window operator. 
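
A rough sketch of the DSL variant (the event/result types, key selector and the 
buildResult helper are placeholders):

stream
    .keyBy(event -> event.getUserId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
    .process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> elements,
                            Collector<Result> out) throws Exception {
            // all events buffered for this key during the 2s window arrive here at once
            out.collect(buildResult(key, elements));
        }
    });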


Best,
 Yun


 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#processwindowfunction

--
Sender:Navneeth Krishnan
Date:2021/01/04 08:05:17
Recipient:user
Theme:Tumbling Time Window

Hello All,

First of all Happy New Year!! Thanks for the excellent community support.

I have a job which requires a 2 seconds tumbling time window per key, For each 
user we wait for 2 seconds to collect enough data and proceed to
 further processing. My question is should I use the regular DSL windowing or 
write a custom process function which does the windowing. I have heard that the 
DSL window has more overhead versus the custom window function.

What do you guys suggest and can someone provide an example of custom window 
function per key. Also given the window time is very less (2 secs) would there 
be more overhead in firing so many timers for each key?

Thanks!

Regards,
Navneeth 


Re: Facing issues on kafka while running a job that was built with 1.11.2-scala-2.11 version onto flink version 1.11.2-scala-2.12

2021-01-03 Thread Yun Gao
Hi Narasimha,

 Since the Kafka connector itself is implemented purely in Java, I guess that 
with high probability it is not an issue of the Scala version. I think you may 
first check the Kafka cluster's status.

Best,
 Yun

--
Sender:narasimha
Date:2020/12/30 13:43:28
Recipient:user
Theme:Facing issues on kafka while running a job that was built with 
1.11.2-scala-2.11 version onto flink version 1.11.2-scala-2.12

Hi, 

Facing issues on kafka while running a job that was built with 
1.11.2-scala-2.11 version onto flink version 1.11.2-scala-2.12. 
kafka-connector with 1.11.2-scala-2.11 is getting packaged with the job. 

Kafka cluster was all good when writing to topics, but when someone reads 
intermittently the cluster becomes unstable with unresponsive brokers. 
Do these differences in scala binary versions cause any issues to Kafka?

Flink version 1.11.2-scala-2-12
Kafka version - 2.1.0

-- 
A.Narasimha Swamy



Re: Is chk-$id/_metadata created regardless of enabling externalized checkpoints?

2021-01-04 Thread Yun Gao
Hi Dongwon,

   Happy new year! The meta file is stored on HDFS even if externalized 
checkpoints are not enabled. If externalized checkpoints are not enabled, Flink 
deletes all the checkpoints on exit; if they are enabled, the checkpoints are 
kept on cancellation or failure, according to the retention settings. Thus for 
the second question, I think the answer is yes.
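
If you want to control the retention explicitly, it can be set in 
flink-conf.yaml, for example:

execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION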

Best,
 Yun


 --Original Mail --
Sender:Dongwon Kim 
Send Date:Mon Jan 4 19:16:39 2021
Recipients:user 
Subject:Is chk-$id/_metadata created regardless of enabling externalized 
checkpoints?

Hi,

First of all, happy new year!
It can be a very basic question but I have something to clarify in my head.

my flink-conf.yaml is as follows (note that I didn't specify the value of 
"execution-checkpointing-externalized-checkpoint-retention [1]"):
#...
execution.checkpointing.interval: 20min
execution.checkpointing.min-pause: 1min

state.backend: rocksdb
state.backend.incremental: true

state.checkpoints.dir: hdfs:///flink-jobs/ckpts
state.checkpoints.num-retained: 10

state.savepoints.dir: hdfs:///flink-jobs/svpts
#...

And the checkpoint configuration is shown as follows in Web UI (note that 
"Persist Checkpoints Externally" is "Disabled" in the final row):


According to [2],
externalized checkpoints: You can configure periodic checkpoints to be 
persisted externally. Externalized checkpoints write their meta data out to 
persistent storage and are not automatically cleaned up when the job fails. 
This way, you will have a checkpoint around to resume from if your job fails. 
There are more details in the deployment notes on externalized checkpoints.
So I've thought the metadata of a checkpoint is only on JobManager's memory and 
not stored on HDFS unless 
"execution-checkpointing-externalized-checkpoint-retention" is set.

However, even without setting the value, every checkpoint already contains its 
own metadata:
[user@devflink conf]$ hdfs dfs -ls 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/*
Found 1 items
-rw-r--r--  3 user hdfs  163281 2021-01-04 14:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-945/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  163281 2021-01-04 14:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-946/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  163157 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-947/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  156684 2021-01-04 15:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-948/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  147280 2021-01-04 15:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-949/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  147280 2021-01-04 16:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-950/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  162937 2021-01-04 16:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-951/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  175089 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-952/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  173289 2021-01-04 17:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-953/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  153951 2021-01-04 17:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-954/_metadata
Found 21 items
-rw-r--r--  3 user hdfs 78748 2021-01-04 14:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/05d76f4e-3d9c-420c-8b87-077fc9880d9a
-rw-r--r--  3 user hdfs 23905 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0b9d9323-9f10-4fc2-8fcc-a9326448b07c
-rw-r--r--  3 user hdfs 81082 2021-01-04 16:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0f6779d0-3a2e-4a94-be9b-d9d6710a7ea0
-rw-r--r--  3 user hdfs 23905 2021-01-04 16:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/107b3b74-634a-462c-bf40-1d4886117aa9
-rw-r--r--  3 user hdfs 78748 2021-01-04 14:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/18a538c6-d40e-48c0-a965-d65be407a124
-rw-r--r--  3 user hdfs 83550 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/24ed9c4a-0b8e-45d4-95b8-64547cb9c541
-rw-r--r--  3 user hdfs 23905 2021-01-04 17:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/35ee9665-7c1f-4407-beb5-fbb312d84907
-rw-r--r--  3 user hdfs 47997 2021-01-04 11:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/36363172-c401-4d60-a970-cfb2b3cbf058
-rw-r--r--  3 user hdfs 81082 2021-01-04 15:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/43aecc8c-145f-43ba-81a8-b0ce2c3498f4
-rw-r--r--  3 user hdfs 79898 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/5743f278-fc50-4c4a-b14e-89bfdb2139fa
-rw-r--r--  3 user hdfs 23905 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/67e16688-c48c-409b-acac-e7091a84d548
-rw-r--r--  3 user hdfs 23905 2021-01

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Yun Gao
 Hi Avrid, 

 Very thanks for the feedbacks!

  For the second issue, sorry, I think I might not have made it very clear. 
I was initially thinking of the case where, for example, for a job with the 
graph A -> B -> C, when we compute which tasks to trigger, A is still running, 
so we trigger A to start the checkpoint. However, before the trigger message 
reaches A, A finishes and the trigger message fails because the task is not 
found. In this case, if we do not handle it, the checkpoint would fail due to 
timeout. However, by default a failed checkpoint causes a job failure, and we 
would also need to wait for another checkpoint interval for the next 
checkpoint. One solution would be to check all the pending checkpoints and 
trigger B instead when the JM is notified that A is finished.

   For the third issue, it should work if we store a special value for some 
field in OperatorState or OperatorSubtaskState; for example, we might store a 
special subtaskState map inside the OperatorState to mark it as finished, since 
a finished operator should always have an empty state. Very thanks for the 
advice! I'll try this method. 

Best,
 Yun



--
From:Arvid Heise 
Send Time:2021 Jan. 5 (Tue.) 17:16
To:Yun Gao 
Cc:Aljoscha Krettek ; dev ; user 

Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately. My 
gut feeling says that this is something we should only address for new sinks 
where we decouple the semantics of commits and checkpoints anyways. @Aljoscha 
Krettek any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source partition 
that is finished before the first checkpoint. Then, we would need to store the 
finished state of the subtask somehow. So I'm assuming, we still need to 
trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was 
assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we 
can store the flag inside managed or raw state without changing the format?



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao  wrote:

   Hi all,

 I tested the previous PoC with the current tests and I found some new 
issues that might cause divergence, and sorry for there might also be some 
reversal for some previous problems:

 1. Which operators should wait for one more checkpoint before close ?

One motivation for this FLIP is to ensure the 2PC sink commits the last 
part of data before closed, which makes the sink operator need to wait for one 
more checkpoint like onEndOfInput() -> waitForCheckpoint() -> 
notifyCheckpointComplete() -> close(). This lead to the issue which operators 
should wait for checkpoint? Possible options are 
 a. Make all the operators (or UDF) implemented 
notifyCheckpointCompleted method wait for one more checkpoint. One exception is 
that since we can only snapshot one or all tasks for a legacy source operator 
to avoid data repetition[1], we could not support legacy operators and its 
chained operators to wait for checkpoints since there will be deadlock if part 
of the tasks are finished, this would finally be solved after legacy source are 
deprecated. The PoC used this option for now.
b. Make operators (or UDF) implemented a special marker 
interface to wait for one more checkpoint.  


   2. Do we need to solve the case that tasks finished before triggered ?

  Previously I think we could postpone it, however, during testing I found 
that it might cause some problems since by default checkpoint failure would 
cause job failover, and the job would also need wait for another interval to 
trigger the next checkpoint. To pass the tests, I updated the PoC to include 
this part, and we may have a double think on if we need to include it or use 
some other options.

3. How to extend a new format for checkpoint meta ?

Sorry previously I gave a wrong estimation, after I extract a sub-component 
for (de)serialize operator state, I found the problem just goes to the new 
OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have 
different fields, thus they use different process when (de)serialize, which is 
a bit different from the case that we have a fixed steps and each step has 
different logic. Thus we might either
 a. Use base classes for each two version.
 b. Or have a unified framework contains all the possible fields across all 
version, and use empty field serializer to skip some fields in each version.

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&

Re: StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

2021-01-06 Thread Yun Gao
Hi Mahendra,  

Sorry for the late reply. I noticed that in your code you implement a 
bucket assigner that switches to a new bucket every minute; could that be 
related to the problems you are seeing? Since different buckets use 
different directories and files, new files are created and used whenever 
the bucket switches.
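
If per-minute buckets are not actually required, a coarser time-based assigner 
keeps the files open longer; for example (a sketch that slots into the existing 
builder chain, the date format is arbitrary):

// hourly buckets instead of per-minute ones
.withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>("yyyy-MM-dd--HH"))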


Best,
 Yun


 --Original Mail --
Sender:Mahendra Hegde 
Send Date:Tue Dec 29 20:23:33 2020
Recipients:user@flink.apache.org 
Subject:StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

Hello,

I am trying to use StreamingFileSink.forBulkFormat() for writing avro to S3.
I have used ‘CheckpointRollingPolicy’ as DefaultRolling cannot be used with 
bulk formats.

But when I use this I am facing 2 issues:
1. The 'shouldRollOnEvent' method is called on each record addition, but 
.getSize() always gives the size of one message instead of the current part 
file size.
2. Files are rolled every 1 minute even though my checkpoint interval is bigger 
(3 mins); I don't find any way to override this 1-minute default rolling.

Any suggestion would be appreciated.


Code:

val avroOcfFilesink : StreamingFileSink[GenericRecord] =  
StreamingFileSink.forBulkFormat(new Path(avroOutputPath),
  new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
override def createWriter(out: OutputStream): 
DataFileWriter[GenericRecord] = {
  val schema: Schema = new 
Schema.Parser().parse(faultCodeOCFRecordSchema)
  val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
  val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
  dataFileWriter.setCodec(CodecFactory.snappyCodec)
  dataFileWriter.create(schema, out)
  dataFileWriter
}
  }))
  .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
override def getBucketId(in: GenericRecord, context: Context): String = 
{
  val bucketIdPrefix = 
configurationParameters.getRequired("s3.bucket.id.prefix")
  val currentProcessingTimeUTC = System.currentTimeMillis()
bucketIdPrefix + 
TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)

}
override def getSerializer: SimpleVersionedSerializer[String] = { 
SimpleVersionedStringSerializer.INSTANCE }
  }).withBucketCheckInterval(12)
 .withRollingPolicy(
   new CheckpointRollingPolicy[GenericRecord, String] {
override def shouldRollOnEvent(partFileState: PartFileInfo[String], 
element: GenericRecord): Boolean = {
  log.info("## PartFileState.getSize:"+partFileState.getSize+", 
Creation"+partFileState.getCreationTime+",  
Lastupdate:"+partFileState.getLastUpdateTime)
  false
}
override def shouldRollOnProcessingTime(partFileState: 
PartFileInfo[String], currentTime: Long): Boolean = {
  val result : Boolean =  (currentTime - partFileState.getCreationTime) 
>= 1
  log.info(" currentTime:"+currentTime+" , 
partFileState.getCreationTime"+partFileState.getCreationTime+", 
Diff:"+(currentTime - partFileState.getCreationTime)+", result:"+result)
  false
}
  }
).build()


Thanks
MH

Re: Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-07 Thread Yun Gao
Hi Billy,

I checked the provided example and found it should be a problem of the 
ContinuousFileReader, and I created an issue for it [1]. To temporarily work 
around the issue, I think you may disable the chaining of the 
ContinuousFileReaderOperator with the following operators:

   android.disableChaining().sinkTo(sink);
Best,
 Yun

[1] https://issues.apache.org/jira/browse/FLINK-20888




 --Original Mail --
Sender:Billy Bain 
Send Date:Thu Jan 7 04:02:34 2021
Recipients:Arvid Heise 
CC:user , Billy Bain 
Subject:Re: Implementing a TarInputFormat based on FileInputFormat

Hi Arvid, 

Thanks for the response. I have created a sample application with input data 
and uploaded it to google drive. The sample data is in the archive... thus the 
large size. (27 mb)

https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing

To run it:
flink run  -Dexecution.runtime-mode=BATCH -c com.billybobbain.AndroidTarReader 
/path/to/flink-tar/build/libs/flink-tar-0.1.jar --input_path 
/path/to/flink-tar/playstore-00.tar.gz --output_path /path/to/output/

The main class:
public class AndroidTarReader {
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
String inputPath = parameter.get("input_path");
String outputPath = parameter.get("output_path");
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<AndroidData> android = env.readFile(new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
final FileSink<AndroidData> sink = FileSink
.forRowFormat(new Path(outputPath), new AndroidDataEncoder())

.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix(".json").build())
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(1024 * 1024)
.build())
.build();
android.sinkTo(sink);
env.execute("zMarket Android");
}
}

On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise  wrote:

Hi Billy,

the exception is happening on the output side. Input side looks fine. Could you 
maybe post more information about the sink?

On Mon, Dec 28, 2020 at 8:11 PM Billy Bain  wrote:

I am trying to implement a class that will work similar to AvroFileFormat. 

This tar archive has a very specific format. It has only one file inside and 
that file is line delimited JSON. 

I get this exception, but all the data is written to the temporary files. I 
have checked that my code isn't closing the stream, which was my prior issue. 

Caused by: java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
at 
org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
at 
org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
at 
org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
at 
org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/00.run(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:836)

public class TarInputFormat<T> extends FileInputFormat<T> implements ResultTypeQueryable<T> {

private static final Logger logger = 
LoggerFactory.getLogger(TarInputFormat.class);
   

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Yun Gao
Hi Roman,

   Very thanks for the feedbacks! I'll try to answer the issues inline:

> 1. Option 1 is said to be not preferable because it wastes resources and adds 
> complexity (new event). 
> However, the resources would be wasted for a relatively short time until the 
> job finishes completely. 
> And compared to other options, complexity seems much lower. Or are 
> differences in task completion times so huge and so common?

There might be mixed jobs with both bounded and unbounded sources; in this case, 
the resources for the finished part of the job could not be released.

And Option 1 also complicates the semantics of EndOfPartition: if we hold the 
tasks but still need to notify the following tasks that all records have been 
sent, we would have to introduce some kind of pre-EndOfPartition message, which 
is similar to the current EndOfPartition but does not cause the channels to be 
released.

> 2. I think it would be helpful to describe how is rescaling handled in 
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage the state at the operator level, so the rescaling 
process would be the same as for a normal checkpoint. For example, suppose one 
operator resides in a task with parallelism 4 and 2 of the subtasks are finished; 
the state of the operator is then composed of the state of the 2 remaining subtask 
instances. If we rescale to 5 after failover, the state of those 2 remaining 
subtasks would be re-distributed to the 5 new subtasks after failover.

If all 4 subtasks are finished before failover, the operator would be marked as 
finished. After failover the operator would still be marked as finished, all the 
subtask instances of this operator would skip methods like open(), endOfInput() 
and close(), and the operator would be excluded when taking checkpoints after 
failover.


> 3. Option 3 assumes that the state of a finished task is not used. That's 
> true for operator state, but what about channel state (captured by unaligned 
> checkpoint)?
> I think it still has to be sent downstream which invalidates this Option.

For unaligned checkpoints, if a subtask is marked as finished in one checkpoint, 
its descendant tasks would wait until all records from the finished tasks have 
been received before taking the checkpoint. In this case we would not have result 
partition state, only channel state for the downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C, and suppose A has 
reported FINISHED in one checkpoint; the CheckpointCoordinator would then choose B 
as the new "source" to trigger the checkpoint via RPC. When task B receives the 
checkpoint trigger, it knows that all its predecessor tasks are finished, so it 
waits until all InputChannels have received EndOfPartition from the network 
(namely inputChannel.onBuffer() is called with EndOfPartition) and then takes the 
snapshot of the input channels, as normal unaligned checkpoints do on the 
InputChannel side. This way we can ensure that finished tasks always have an 
empty state.

I'll also polish the FLIP to make it clearer.

Best,
 Yun



 --Original Mail --
Sender:Khachatryan Roman 
Send Date:Thu Jan 7 21:55:52 2021
Recipients:Arvid Heise 
CC:dev , user 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion (and sorry for probably duplicated 
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and adds 
complexity (new event). 
However, the resources would be wasted for a relatively short time until the 
job finishes completely. 
And compared to other options, complexity seems much lower. Or are differences 
in task completion times so huge and so common?

2. I think it would be helpful to describe how is rescaling handled in Options 
2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true 
for operator state, but what about channel state (captured by unaligned 
checkpoint)? I think it still has to be sent downstream which invalidates this 
Option.

Regards,
Roman

On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise  wrote:

We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down, Async I/O would wait for all 
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points)

I think we are mixing two different things here that may require different 
solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only f

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
  Hi Roman,

  Many thanks for the feedback!
> Probably it would be simpler to just decline the RPC-triggered 
checkpoint 
> if not all inputs of this task are finished (with 
CHECKPOINT_DECLINED_TASK_NOT_READY).

> But I wonder how significantly this waiting for EoP from every input 
will delay performing the first checkpoint 
> by B after becoming a new source. This may in turn impact 
exactly-once sinks and incremental checkpoints.
> Maybe a better option would be to postpone JM notification from 
source until it's EoP is consumed?

   I also agree that there would indeed be cases where the checkpoint gets slower, 
since it could not skip the data in the result partition of the finished upstream task:
   a) For aligned checkpoints this would not happen, since the downstream tasks 
always need to process the buffers in order.
   b) With unaligned checkpoints enabled, the slowdown might happen if the 
downstream task processes very slowly.

   But since only the result partition of the finished upstream task needs to wait 
to be processed, the rest of the execution graph could still perform the unaligned 
checkpoint normally. I think the average delay caused would be much lower than with 
completely aligned checkpoints, but there would still be extremely bad cases where 
the delay is long.

   Declining the RPC-triggered checkpoint would indeed simplify the implementation, 
but since a failed checkpoint currently causes job failover by default, we have 
some concerns about directly declining the checkpoint.
   As for postponing the JM notification: since the JM currently cannot know 
whether a task has received all the EndOfPartitions from its upstream tasks, we 
might need to introduce a new RPC for notifying this state, and since the triggering 
is not atomic, we might also run into synchronization issues between JM and TM, 
which would introduce some complexity.
   Thus another possible option might be to let the upstream task wait until all 
the pending buffers in the result partition have been flushed before it finishes. 
We could do this waiting only for the pipelined result partitions so it won't 
affect batch jobs. With this waiting, the unaligned checkpoint could still trigger 
the upstream task and skip the buffers in the result partition. Since the result 
partition state would be kept within the operator state of the last operator, after 
failover we would find that the last operator has a non-empty state and we would 
restart the tasks containing this operator to resend the snapshotted buffers. Of 
course this would also introduce some complexity, and since the probability of a 
long delay would be lower than in the completely aligned case, do you think it 
would be ok for us to view it as an optimization and postpone it to future versions?

 Best,
 Yun


--
From:Khachatryan Roman 
Send Time:2021 Jan. 11 (Mon.) 05:46
To:Yun Gao 
Cc:Arvid Heise ; dev ; user 

Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks a lot for your answers Yun,

> In detail, support we have a job with the graph A -> B -> C, support in one 
> checkpoint A has reported FINISHED, CheckpointCoordinator would 
> choose B as the new "source" to trigger checkpoint via RPC. For task B, if it 
> received checkpoint trigger, it would know that all its precedant tasks
> are finished, then it would wait till all the InputChannel received 
> EndOfPartition from the network (namely inputChannel.onBuffer() is called 
> with 
> EndOfPartition) and then taking snapshot for the input channels, as the 
> normal unaligned checkpoints does for the InputChannel side. Then
> we would be able to ensure the finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint if 
not all inputs of this task are finished (with 
CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will delay 
performing the first checkpoint by B after becoming a new source. This may in 
turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source until 
it's EoP is consumed?

Regards,
Roman



Re: Re: Use Flink to process request with list of queries and aggregate

2021-01-11 Thread Yun Gao
Hi Li,

From my view, it would not be easy to use a countWindow if you have a different 
number of records for each key (namely each user in this case). I think you may 
need to use the low-level KeyedProcessFunction [1] to keep some state yourself. 
For example, each request might also carry the total number of requests of its 
user, and in the KeyedProcessFunction you could record in state the number of 
requests received so far and the total number of requests of this user. Whenever 
enough requests have been received for a user, you know that the message has been 
completely processed, and the state of this user can then be cleaned up.
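As a minimal sketch of this idea (the Request POJO with getUserId() and 
getTotalRequests() is hypothetical, and the simple counting stands in for your 
real aggregation; the stream is assumed to be keyed by user id, e.g. 
requests.keyBy(Request::getUserId)):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RequestCounter extends KeyedProcessFunction<String, Request, String> {

    // Number of requests of the current user seen so far.
    private transient ValueState<Integer> receivedCount;

    @Override
    public void open(Configuration parameters) {
        receivedCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("receivedCount", Types.INT));
    }

    @Override
    public void processElement(Request request, Context ctx, Collector<String> out) throws Exception {
        int count = receivedCount.value() == null ? 1 : receivedCount.value() + 1;
        if (count >= request.getTotalRequests()) {
            // All queries of this user's request have been seen: emit and clean up the state.
            out.collect("user " + request.getUserId() + " finished with " + count + " queries");
            receivedCount.clear();
        } else {
            receivedCount.update(count);
        }
    }
}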


Best,
 Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#the-keyedprocessfunction

--
Sender:Li Wang
Date:2021/01/11 07:10:27
Recipient:
Subject:Re: Use Flink to process request with list of queries and aggregate

Can I get any suggestion? Thanks a lot.

- Li



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Yun Gao
Hi Roman, 

 Many thanks for the feedback and suggestions!

> I think UC will be the common case with multiple sources each with 
DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of 
it's source subtask finishes.

Yes, waiting for EoP would be required for each input channel if we do not 
specially block the finished upstream task.

   > Yes, but checkpoint completion notification will not be sent until all 
the EOPs are processed.
  The downstream task that gets triggered indeed must wait to receive EoPs from 
all the input channels. I initially compared it with the completely aligned case; 
the remaining execution graph after the triggered task could still take normal 
unaligned checkpoints (e.g., if A -> B -> C -> D, A is finished and B gets 
triggered, then B -> C -> D could still take normal unaligned checkpoints). But 
this still does not limit the possible maximum delay.

> Not all declines cause job failure, particularly 
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
Sorry, I mistook the logic here; CHECKPOINT_DECLINED_TASK_NOT_READY indeed does 
not cause a failure. But since after a failed checkpoint we would have to wait for 
the checkpoint interval until the next checkpoint, I also agree that the following 
option, where we try to complete each checkpoint, would be the better one.

>> Thus another possible option might be let the upstream task to wait till all 
>> the pending buffers in the result partition has been flushed before get to 
>> finish.
> This is what I meant by "postpone JM notification from source". Just blocking 
> the task thread wouldn't add much complexity, though I'm not sure if it would 
> cause any problems.

>> do you think it would be ok for us to view it as an optimization and 
>> postpone it to future versions ? 
> I think that's a good idea.

 Also, sorry that I misunderstood the proposal here; currently I do not see 
explicit problems with waiting for the flush of the pipelined result partition. 
Glad that we have the same viewpoint on this issue. :)

 Best,
  Yun



--
From:Khachatryan Roman 
Send Time:2021 Jan. 11 (Mon.) 19:14
To:Yun Gao 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

> b)  With unaligned checkpoint enabled, the slower cases might happen if the 
> downstream task processes very slowly. 
I think UC will be the common case with multiple sources each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of it's 
source subtask finishes.

> But since only the result partition part of the finished upstream need wait 
> to be processed, the other part of 
> the execution graph could  still perform the unaligned checkpoint normally
Yes, but checkpoint completion notification will not be sent until all the EOPs 
are processed.

> Declining the RPC-trigger checkpoint would indeed simplify the 
> implementation, but since currently by default the
> failed checkpoint would cause job failover, thus we might have some concerns 
> in directly decline the checkpoint.
Not all declines cause job failure, particularly 
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.

> Thus another possible option might be let the upstream task to wait till all 
> the pending buffers in the result partition has been flushed before get to 
> finish.
This is what I meant by "postpone JM notification from source". Just blocking 
the task thread wouldn't add much complexity, though I'm not sure if it would 
cause any problems.

> do you think it would be ok for us to view it as an optimization and postpone 
> it to future versions ? 
I think that's a good idea.

Regards,
Roman



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-13 Thread Yun Gao
  Hi all,
I updated the FLIP [1] to reflect the major points discussed in the ML thread:

 1) For "new" root tasks that finish before they receive the trigger message, we 
previously proposed to let the JM re-compute and re-trigger the descendant tasks, 
but after the discussion we realized that it might cause overhead on the JobMaster 
in cascade-finish and large-parallelism cases. Another option might be to let the 
StreamTask do one synchronization with the CheckpointCoordinator before it 
finishes, to become aware of the missed pending checkpoints; since the 
EndOfPartitions are not emitted at that point yet, it could still broadcast 
barriers to its descendant tasks. I updated the details in this section [2] of the 
FLIP.

2) For the barrier alignment, we now change to inserting faked barriers into the 
input channels to avoid interference with the checkpoint alignment algorithms. One 
remaining issue is that in unaligned checkpoint mode we could not snapshot an 
upstream task's result partition if it has already finished. One option to address 
this is to make the upstream tasks wait for their buffers to get flushed before 
exiting, and we would include this in future versions. I updated this part in this 
section [3] of the FLIP.

3) Some operators, like the Sink Committer, need to wait for one complete 
checkpoint before exiting. To support operators that need to wait for some 
finalization condition, like the Sink Committer and Async I/O, we could introduce 
a new interface to mark this kind of operator and let the runtime wait until the 
operator has reached its condition. I updated this part in this section [4] of the 
FLIP.

Could you have another look at the FLIP and the pending issues? Any feedback is 
warmly welcomed and appreciated. Many thanks!

Best,
 Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish


--
From:Yun Gao 
Send Time:2021 Jan. 12 (Tue.) 10:30
To:Khachatryan Roman 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Roman, 

 Very thanks for the feedbacks and suggestions!

> I think UC will be the common case with multiple sources each with 
DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of 
it's source subtask finishes.

Yes, waiting for EoP would be required for each input channel if we do 
not blocking the upstream
finished task specially. 

   > Yes, but checkpoint completion notification will not be sent until all 
the EOPs are processed.
  The downstream tasked get triggered indeed must wait for received EoPs 
from all the input channels,
I initially compared it with the completely aligned cases and now the remaining 
execution graph after the
trigger task could still taking normal unaligned checkpoint (like if A -> B -> 
C -> D, A get finished and B get 
triggered, then B -> C -> D could still taking normal unaligned checkpoint). 
But still it could not limit the 
possible max delay.

> Not all declines cause job failure, particularly 
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
Sorry for mistaken the logic here and CHECKPOINT_DECLINED_TASK_NOT_READY 
indeed do not cause failure.
But since after a failed checkpoint we would have to wait for the checkpoint 
interval for the next checkpoint, I also
agree the following option would be a better one that we try to complete each 
checkpoint.

>> Thus another possible option might be let the upstream task to wait till all 
>> the pending buffers in the result partition has been flushed before get to 
>> finish.
> This is what I meant by "postpone JM notification from source". Just blocking 
> the task thread wouldn't add much complexity, though I'm not sure if it would 
> cause any problems.

>> do you think it would be ok for us to view it as an optimization and 
>> postpone it to future versions ? 
> I think that's a good idea.

 And also very sorry for here I should wrongly understand the proposals, and 
currently 
I also do not see explicit problems for waiting for the flush of pipeline 
result partition. 
Glad that we have the same viewpoints on  this issue. :) 

 Best,
  Yun



--
From:Khachatryan Roman 
Send Time:2021 Jan. 11 (Mon.) 19:14
To:Yun Gao 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks F

Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread Yun Gao
Hi Sagar,

  I think the problem is that legacy sources implemented by extending 
SourceFunction are all defined as CONTINUOUS_UNBOUNDED when used with 
env.addSource(). Although there is a hacky way to add legacy sources as a BOUNDED 
source [1], you may first want to try the new version of KafkaSource [2]. The new 
version of KafkaSource is implemented with the new Source API [3], which provides 
unified support for streaming and batch mode.
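As a rough sketch, building a bounded Kafka source with the new API looks roughly 
like the following (the broker address, topic and group id are placeholders, and 
the exact builder methods shown here are from releases where the new KafkaSource 
had stabilized, so they may differ slightly in 1.12):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")              // placeholder address
        .setTopics("input-topic")                        // placeholder topic
        .setGroupId("batch-job")                         // placeholder group id
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Stop at the offsets that are the latest at job start, making the source BOUNDED.
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<String> input =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka");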

Best,
 Yun




[1] 
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2]  
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface




 --Original Mail --
Sender:Ardhani Narasimha 
Send Date:Thu Jan 14 15:11:35 2021
Recipients:sagar 
CC:Flink User Mail List 
Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

Hi Team,

I am getting the following error while running the DataStream API in batch mode 
with a Kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with 
the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, 
please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I want to work with four to five different streams in batch 
mode, as the data sources are bounded.

I can't find any clear example of how to do this with a Kafka source in Flink 1.12.

I don't want to use a JDBC source as the underlying database table may change. 
Please give me some example of how to achieve the above use case.

Also, for any large bounded source, are there alternatives to achieve this?



-- 
---Regards---

  Sagar Bandal


Re: StreamingFileSink with ParquetAvroWriters

2021-01-13 Thread Yun Gao
Hi Jan,

Could you try adding this dependency?


<dependency>
   <groupId>org.apache.parquet</groupId>
   <artifactId>parquet-avro</artifactId>
   <version>1.11.1</version>
</dependency>



Best,
 Yun


 --Original Mail --
Sender:Jan Oelschlegel 
Send Date:Thu Jan 14 00:49:30 2021
Recipients:user@flink.apache.org 
Subject:StreamingFileSink with ParquetAvroWriters

Hi,
i’m using Flink (v.1.11.2) and would like to use the StreamingFileSink for 
writing into HDFS in Parquet format.

As it says in the documentation I have added the dependencies:


<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-parquet_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

And this is my file sink definition:

val sink: StreamingFileSink[Event] = StreamingFileSink
  .forBulkFormat(
new Path("hdfs://namenode.local:8020/user/datastream/"),
ParquetAvroWriters.forReflectRecord(classOf[Event])
  )
  .build()


If I execute this in cluster I get the following error:

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
at 
org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)


Looks like there are some dependencies missing. How can I fix this?


Jan O.

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread Yun Gao
Hi Sagar,

  I rechecked and found that the new Kafka source is not formally published yet, 
so a stable approach may be to try adding the FlinkKafkaConsumer as a BOUNDED 
source first. Sorry for the inconvenience.

Best,
 Yun

--
Sender:Yun Gao
Date:2021/01/14 15:26:54
Recipient:Ardhani Narasimha; 
sagar
Cc:Flink User Mail List
Subject:Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

Hi Sagar,

  I think the problem is that the legacy source implemented by extending 
SourceFunction are all defined as CONTINOUS_UNBOUNDED when use env.addSource(). 
Although there is hacky way to add the legacy sources as BOUNDED source [1], I 
think you may first have a try of new version of KafkaSource [2] ? The new 
version of KafkaSource is implemented with the new Source API [3], which 
provides unfied support for the streaming and batch mode.

Best,
 Yun




[1] 
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2]  
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface




 --Original Mail --
Sender:Ardhani Narasimha 
Send Date:Thu Jan 14 15:11:35 2021
Recipients:sagar 
CC:Flink User Mail List 
Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

Hi Team,

I am getting the following error while running DataStream API in with batch 
mode with kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with 
the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, 
please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I wanted to work with four to five different stream in 
batch mode as data source is bounded

I don't find any clear example of how to do it with kafka souce with Flink 1.12

I don't want to use JDBC source as underlying database table may change. please 
give me some example on how to achieve the above use case.

Also for any large bounded source are there any alternatives to achieve this?



-- 
---Regards---

  Sagar Bandal



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-14 Thread Yun Gao
  Hi all,

We had some offline discussion together with @Arvid, @Roman and @Aljoscha, and I'd 
like to post some points we discussed:

1) For the problem that a "new" root task coincidentally finishes before getting 
triggered successfully, we have listed two options in FLIP-147 [1]. For the first 
version, we now tend to go with the first option, i.e., the JM would re-compute 
and re-trigger new sources when it realizes that some tasks were not triggered 
successfully. This option avoids the complexity of adding a new RPC and 
duplicating task states, and in the average case it would not cause too much 
overhead.

2) As for how to support operators like the Sink Committer that need to wait for 
one complete checkpoint before exiting: this is more an issue of how to use 
checkpoints after tasks finished than of how to achieve checkpoints after tasks 
finished, so we would like to not include this part in the current discussion. We 
will discuss and solve this issue separately after FLIP-147 is done.

Best,
 Yun


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
--
From:Yun Gao 
Send Time:2021 Jan. 13 (Wed.) 16:09
To:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

  Hi all,
I updated the FLIP[1] to reflect the major discussed points in the ML thread:

 1) For the "new" root tasks finished before it received trigger message, 
previously we proposed 
to let JM re-compute and re-trigger the descendant tasks, but after the 
discussion we realized that 
it might cause overhead to JobMaster on cascade finish and large parallelism 
cases. Another option
might be let the StreamTask do one synchronization with the 
CheckpointCoordinator before get finished 
to be aware of the missed pending checkpoints, since at then EndOfPartitions 
are not emitted yet, it
could still broadcast barriers to its descendant tasks. I updated the details 
in this section[2] in the
FLIP.

2) For the barrier alignment, now we change to insert faked barriers in the 
input channels to avoid
interference with checkpoint alignment algorithms. One remaining issue is that 
for unaligned checkpoint
mode we could not snapshot the upstream tasks' result partition if it have been 
finished. One option
to address this issue is to make the upstream tasks to wait for buffers get 
flushed before exit, and 
we would include this in the future versions. I updated this part in this 
section[3] in the FLIP.

3) Some operators like Sink Committer need to wait for one complete checkpoint 
before exit. To support
the operators that need to wait for some finalization condition like the Sink 
committer and Async I/O, we 
could introduce a new interface to mark this kind of operators, and let the 
runtime to wait till the operators
reached its condition. I updated this part in this section[4] in the FLIP.

Could you have another look of the FLIP and the pending issues ? Any feedbacks 
are warmly welcomed 
and appreciated. Very thanks!

Best,
 Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish


--
From:Yun Gao 
Send Time:2021 Jan. 12 (Tue.) 10:30
To:Khachatryan Roman 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Roman, 

 Very thanks for the feedbacks and suggestions!

> I think UC will be the common case with multiple sources each with 
DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of 
it's source subtask finishes.

Yes, waiting for EoP would be required for each input channel if we do 
not blocking the upstream
finished task specially. 

   > Yes, but checkpoint completion notification will not be sent until all 
the EOPs are processed.
  The downstream tasked get triggered indeed must wait for received EoPs 
from all the input channels,
I initially compared it with the completely aligned cases and now the remaining 
execution graph after the
trigger task could still taking normal unaligned checkpoint (like if A -> B -> 
C -> D, A get finished and B get 
triggered, then B -> C -> D could still taking normal unaligned checkpoint). 
But still it coul

Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Thread Yun Gao
   Hi,
   Many thanks to Jingsong for bringing up this discussion! Enhancing the 
FileSystem connector in Table should largely improve usability.

   I have the same question as Piotr. From my side, I think it would be better to 
be able to reuse the existing StreamingFileSink. We have begun enhancing the 
supported file formats (e.g., ORC, Avro...), and reusing StreamingFileSink should 
avoid repeating that work in the Table library. Besides, the bucket concept also 
seems to match the semantics of partitions.

   For the notification of adding partitions, I wonder whether the watermark 
mechanism is enough, since a bucket/partition might span multiple subtasks. It 
depends on the level of notification: if we want to notify per bucket on each 
subtask, using watermarks to notify each subtask should be ok, but if we want to 
notify for the whole bucket/partition, we might also need some coordination 
between subtasks.


 Best, 
  Yun




--
From:Piotr Nowojski 
Send Time:2020 Mar. 13 (Fri.) 18:03
To:dev 
Cc:user ; user-zh 
Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in Table

Hi,

Which actual sinks/sources are you planning to use in this feature? Is it about 
exposing StreamingFileSink in the Table API? Or do you want to implement new 
Sinks/Sources?

Piotrek

> On 13 Mar 2020, at 10:04, jinhai wang  wrote:
> 
> Thanks for FLIP-115. It is really useful feature for platform developers who 
> manage hundreds of Flink to Hive jobs in production.
> I think we need add 'connector.sink.username' for UserGroupInformation when 
> data is written to HDFS
> 
> 
>  On 2020/3/13 at 3:33 PM, "Jingsong Li" wrote:
> 
>Hi everyone,
> 
>I'd like to start a discussion about FLIP-115 Filesystem connector in Table
>[1].
>This FLIP will bring:
>- Introduce Filesystem table factory in table, support
>csv/parquet/orc/json/avro formats.
>- Introduce streaming filesystem/hive sink in table
> 
>CC to user mail list, if you have any unmet needs, please feel free to
>reply~
> 
>Look forward to hearing from you.
> 
>[1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> 
>Best,
>Jingsong Lee
> 
> 
> 



Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-16 Thread Yun Gao
Hi Kaan,

   For the first issue, I think the two implementations would behave differently 
and the first should be slower, but which one to use should depend on whether your 
algorithm can compute incrementally with only the changed edges. However, as far 
as I know most graph algorithms do not satisfy this property, so I think you might 
have to use the first one.

For the second issue, you might use graph.getVertices() and graph.getEdges() to 
get the underlying vertex and edge DataSets of the graph. You could then do 
arbitrary operations with the two DataSets, like joining the vertex DataSet with 
the second edge list, and finally create a new Graph from the updated vertices and 
edges with Graph.fromDataSet(updatedVertices, edges, env).
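As a rough sketch of that second approach, assuming Long vertex ids, Double vertex 
and edge values, and a hypothetical rule that resets the value of every vertex 
touched by the new edges to 0.0:

// graph: the Gelly graph after the first ScatterGatherIteration
// secondEdges: DataSet<Edge<Long, Double>> holding the newly added edges

DataSet<Tuple1<Long>> touchedIds = secondEdges
        .flatMap(new FlatMapFunction<Edge<Long, Double>, Tuple1<Long>>() {
            @Override
            public void flatMap(Edge<Long, Double> e, Collector<Tuple1<Long>> out) {
                out.collect(Tuple1.of(e.getSource()));
                out.collect(Tuple1.of(e.getTarget()));
            }
        })
        .distinct();

// Reset the value of every touched vertex; untouched vertices keep their computed value.
DataSet<Vertex<Long, Double>> updatedVertices = graph.getVertices()
        .leftOuterJoin(touchedIds)
        .where(0).equalTo(0)
        .with(new JoinFunction<Vertex<Long, Double>, Tuple1<Long>, Vertex<Long, Double>>() {
            @Override
            public Vertex<Long, Double> join(Vertex<Long, Double> v, Tuple1<Long> touched) {
                return touched == null ? v : new Vertex<>(v.getId(), 0.0);
            }
        });

DataSet<Edge<Long, Double>> updatedEdges = graph.getEdges().union(secondEdges);
Graph<Long, Double, Double> updatedGraph = Graph.fromDataSet(updatedVertices, updatedEdges, env);
// Then run the second ScatterGatherIteration on updatedGraph.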

Best,
 Yun


--
From:Kaan Sancak 
Send Time:2020 Apr. 16 (Thu.) 17:17
To:Till Rohrmann 
Cc:Tzu-Li (Gordon) Tai ; user 
Subject:Re: Question about Writing Incremental Graph Algorithms using Apache 
Flink Gelly

If the vertex type is POJO what happens during the union of the graph? Is there 
a persistent approach, or can we define a function handle such occasions?

Would there be a performance difference between two cases:

1)
 Graph graph = … // From edges list

 graph = graph.runScatterGatherIteration();

 Graph secondGraph = … // From second edge list

 graph = graph.union(secondGraph).runScatterGatherIteration()

2)
  
Graph graph = … // From edges list

 graph = graph.runScatterGatherIteration();

 graph.addEdges(second_edge_list)

 graph = graph.runScatterGatherIteration();


Before starting the second scatter-gather, I want to set/reset some fields of 
the vertex value of the vertices that are effected by edge additions/deletions 
(or union). It would be good to have a callback function that touches the 
end-points of the edges that are added/deleted.

Best
Kaan




On Apr 15, 2020, at 11:07 AM, Till Rohrmann  wrote:
Hi Kaan,

I think what you are proposing is something like this:

Graph graph = ... // get first batch

Graph graphAfterFirstSG = 
graph.runScatterGatherIteration();

Graph secondBatch = ... // get second batch

// Adjust the result of SG iteration with secondBatch

Graph updatedGraph = 
graphAfterFirstSG.union/difference(secondBatch));

updatedGraph.runScatterGatherIteration();

Then I believe this should work.

Cheers,
Till
On Wed, Apr 15, 2020 at 1:14 AM Kaan Sancak  wrote:
Thanks for the useful information! It seems like a good and fun idea to 
experiment. I will definitely give it a try.

I have a very close upcoming deadline and I have already implemented the 
Scatter-Gather iteration algorithm.

I have another question on whether we can chain Scatter-Gather or 
Vertex-Centric iterations.
Let’s say that we have an initial batch/dataset, we run a Scatter-Gather and 
obtain graph.
Using another batch we added/deleted vertices to the graph we obtained. 
Now we run another Scatter-Gather on the modified graph.

This is no streaming but a naive way to simulate batch updates that are 
happening concurrently.
Do you think it is a feasible way to do this way? 

Best
Kaan

On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai  wrote:
Hi,

As you mentioned, Gelly Graph's are backed by Flink DataSets, and therefore
work primarily on static graphs. I don't think it'll be possible to
implement incremental algorithms described in your SO question.

Have you tried looking at Stateful Functions, a recent new API added to
Flink?
It supports arbitrary messaging between functions, which may allow you to
build what you have in mind.
Take a look at Seth's an Igal's comments here [1], where there seems to be a
similar incremental graph-processing use case for sessionization.

Cheers,
Gordon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




Re: Updating Closure Variables

2020-04-27 Thread Yun Gao
 Hi Senthil,
 I think you are right that you cannot update closure variables directly and 
expect them to show up at the workers.

 If the variable values are read from S3 files, I think you currently need to 
define an explicit source that reads the latest value of the file. Whether to use 
a broadcast stream depends on how you want to access the set of strings: if you 
want to broadcast the same strings to all tasks, then a broadcast stream is the 
solution; if you want to distribute the set of strings in some other way, you 
could also use the more generic connected streams, like 
streamA.connect(streamB.keyBy(...)).process(...) [1].
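As a minimal sketch of the broadcast variant (the Event type, its getKey() field, 
the events DataStream<Event> standing for your existing main stream, and the 
S3FilterSetSource that re-reads the S3 file, say every 24 hours, are all 
hypothetical):

// Control stream: emits the complete Set<String> each time the S3 file is re-read.
DataStream<Set<String>> controlStream = env.addSource(new S3FilterSetSource());

final MapStateDescriptor<String, Set<String>> filterDescriptor = new MapStateDescriptor<>(
        "filterSet",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<Set<String>>() {}));

BroadcastStream<Set<String>> broadcastFilters = controlStream.broadcast(filterDescriptor);

DataStream<Event> filtered = events
        .connect(broadcastFilters)
        .process(new BroadcastProcessFunction<Event, Set<String>, Event>() {

            @Override
            public void processElement(Event value, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
                Set<String> filters = ctx.getBroadcastState(filterDescriptor).get("current");
                // Keep the element only if the latest filter set contains its key.
                if (filters != null && filters.contains(value.getKey())) {
                    out.collect(value);
                }
            }

            @Override
            public void processBroadcastElement(Set<String> newFilters, Context ctx, Collector<Event> out) throws Exception {
                // Replace the previously broadcast set with the newly read one.
                ctx.getBroadcastState(filterDescriptor).put("current", newFilters);
            }
        });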

Best,
 Yun

 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/#datastream-transformations



--
From:Senthil Kumar 
Send Time:2020 Apr. 27 (Mon.) 21:51
To:user@flink.apache.org 
Subject:Updating Closure Variables

Hello Flink Community!

We have a flink streaming application with a particular use case where a 
closure variable Set is used in a filter function.

Currently, the variable is set at startup time.

It’s populated from an S3 location, where several files exist (we consume the 
one with the last updated timestamp).

Is it possible to periodically update (say once every 24 hours) this closure 
variable?

My initial research indicates that we cannot update closure variables and 
expect them to show up at the workers.

There seems to be something called BrodcastStream in Flink. 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Is that the right approach? I would like some kind of a confirmation before I 
go deeper into it.

cheers
Kumar



Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Yun Gao
Hi Peter,

Sorry for missing the question and responding late. I'm currently working together 
with Jingsong on supporting "global committing" (like writing a _SUCCESS file or 
adding partitions to the Hive store) after buckets terminate. In 1.11 we may first 
support watermark/time-related buckets in the Table/SQL API, and we are also 
thinking of supporting "global committing" for arbitrary bucket assigner policies 
for StreamingFileSink users. The current rough idea is to let users specify when a 
bucket is terminated on a single task; the OperatorCoordinator [1] of the sink 
would then aggregate the information about this bucket from all subtasks and do 
the global committing once the bucket has finished on all the subtasks. This is 
still under discussion, and any thoughts or requirements on this issue are warmly 
welcome.

Best,
 Yun


[1] OperatorCoordinator is introduced in FLIP-27: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface.
 It is a component that resides in the JobManager and can communicate with all the 
subtasks of the corresponding operator, so it can be used to aggregate status from 
the subtasks.


 --Original Mail --
Sender:Robert Metzger 
Send Date:Tue May 12 15:36:26 2020
Recipients:Jingsong Li 
CC:Peter Groesbeck , user 
Subject:Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter,

I filed a ticket for this feature request: 
https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your 
thoughts / requirements to the ticket)

Best,
Robert


On Wed, May 6, 2020 at 3:41 AM Jingsong Li  wrote:

Hi Peter,

The troublesome part is how to know the "end" of a bucket in a streaming job.
In 1.11, we are trying to implement a watermark-related bucket ending 
mechanism[1] in Table/SQL.

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee
On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck  
wrote:

I am replacing an M/R job with a Streaming job using the StreamingFileSink and 
there is a requirement to generate an empty _SUCCESS file like the old Hadoop 
job. I have to implement a similar Batch job to read from backup files in case 
of outages or downtime.

The Batch job question was answered here and appears to be still relevant 
although if someone could confirm for me that would be great.
https://stackoverflow.com/a/39413810

The question of the Streaming job came up back in 2018 here:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3cff74eed5-602f-4eaa-9bc1-6cdf56611...@gmail.com%3E

But the solution to use or extend the BucketingSink class seems out of date now 
that BucketingSink has been deprecated.

Is there a way to implement a similar solution for StreamingFileSink?

I'm currently on 1.8.1 although I hope to update to 1.10 in the near future.

Thank you,
Peter

-- 
Best, Jingsong Lee

Re: changing the output file names in StreamingFileSink from part-00 to something else

2020-05-12 Thread Yun Gao
Hi Dhurandar:

Currently StreamingFileSink can change the prefix and suffix of the part file 
names [1]; the files could then be named something like <prefix>-0-0<suffix>. 
Could this solve your problem?
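For example (the prefix and suffix values here are just placeholders):

OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("my-data")   // placeholder prefix instead of "part"
        .withPartSuffix(".json")     // placeholder suffix
        .build();

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withOutputFileConfig(fileConfig)
        .build();

With this configuration the finished part files are named like my-data-0-0.json 
instead of part-0-0.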


 Best,
  Yun




[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#part-file-configuration



--
Sender:dhurandar S
Date:2020-05-13 05:13:04
Recipients:user; 
Subject:changing the output file names in StreamingFileSink from part-00 to something else

We want to change the name of the files being generated as the output of our 
StreamingFileSink. When files are generated they are named part-00*; is there a 
way that we can change the name?

In Hadoop, we can change RecordWriters and MultipleOutputs. May I please get some 
help in this regard? This is causing blockers for us and will force us to move to 
an MR job.

-- 
Thank you and regards,
Dhurandar




Re: Performance issue when writing to HDFS

2020-05-21 Thread Yun Gao
Hi Kong,

 Sorry, I'm not an expert on Hadoop, but from the logs and some searching it seems 
more likely to be a problem on the HDFS side [1], like a long GC pause in a DataNode.

 Also, I found a similar issue in the mailing-list history [2], and the conclusion 
should be similar.

 Best,
 Yun


   [1] 
https://community.cloudera.com/t5/Support-Questions/Solution-for-quot-slow-readprocessor-quot-warnings/td-p/122046
   [2] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/quot-Slow-ReadProcessor-quot-warnings-when-using-BucketSink-td9427.html



 --Original Mail --
Sender:Mu Kong 
Send Date:Fri May 22 11:16:32 2020
Recipients:user 
Subject:Performance issue when writing to HDFS

Hi all,

I have a Flink application consuming from Kafka and writing the data to HDFS, 
bucketed by event time with BucketingSink.
Sometimes the traffic gets high, and the Prometheus metrics show that the writing 
is not stable.

(getting from flink_taskmanager_job_task_operator_numRecordsOutPerSecond)

The output data on HDFS is also getting delayed. (The records for a certain 
hour bucket are written to HDFS 50 minutes after that hour)

I looked into the logs and found warnings regarding the DataNode ack, which might 
be related:

DFSClient exception:2020-05-21 10:43:10,432 INFO  
org.apache.hadoop.hdfs.DFSClient  - Exception in 
createBlockOutputStream
java.io.IOException: Got error, status message , ack with firstBadLink as :1004
at 
org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1478)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1380)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:558)

 Slow ReadProcessor read fields warning:2020-05-21 10:42:30,509 WARN  
org.apache.hadoop.hdfs.DFSClient  - Slow 
ReadProcessor read fields took 30230ms (threshold=3ms); ack: seqno: 126 
reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 372753456 
flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[:1004,DS-833b175e-9848-453d-a222-abf5c05d643e,DISK], 
DatanodeInfoWithStorage[:1004,DS-f998208a-df7b-4c63-9dde-26453ba69559,DISK], 
DatanodeInfoWithStorage[:1004,DS-4baa6ba6-3951-46f7-a843-62a13e3a62f7,DISK]]


We haven't done any tuning for the Flink job regarding writing to HDFS. Is 
there any config or optimization we can try to avoid delay and these warnings?

Thanks in advance!!

Best regards,
Mu

Re: onTimer method in CoProcessFunction in flink

2020-05-23 Thread Yun Gao
Hi Jaswin,

 If I understand right, you could add the logic in the onTimer callback. In this 
callback, ctx.output(outputTag, value) can be used to emit data to a specific side 
output. Besides, you would need an additional state to store the elements that 
should be output in the onTimer callback. A similar example is [1].
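As a minimal sketch of that pattern (the OutputTag name, the pendingCart 
ValueState, the key selectors and the Kafka producer for the unmatched messages 
are all hypothetical, just to show where each piece goes):

// Defined where both the function and the main program can see it, e.g. a static field.
static final OutputTag<CartMessage> UNMATCHED_CART_TAG = new OutputTag<CartMessage>("unmatched-cart") {};

// Inside the KeyedCoProcessFunction: pendingCart is a hypothetical ValueState<CartMessage>
// holding the element that is still waiting for its counterpart.
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
    CartMessage pending = pendingCart.value();
    if (pending != null) {
        // Emit the unmatched element to the side output instead of the main output.
        ctx.output(UNMATCHED_CART_TAG, pending);
        pendingCart.clear();
    }
}

// In the main program: read the side output and stream it to Kafka.
SingleOutputStreamOperator<ResultMessage> matched = cartStream
        .connect(pgStream)
        .keyBy(cartKeySelector, pgKeySelector)
        .process(new CartPGCoprocessFunction());
matched.getSideOutput(UNMATCHED_CART_TAG)
        .addSink(unmatchedTopicProducer);  // hypothetical FlinkKafkaProducer for the unmatched topic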

Best,
 Yun
​ 

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example




 --Original Mail --
Sender:Jaswin Shah 
Send Date:Fri May 22 23:00:43 2020
Recipients:user@flink.apache.org , Arvid Heise , Yun Tang 
Subject:onTimer method in CoProcessFunction in flink

How can I identify the type of element for which onTimer is called in Flink?
I want to store the objects for which onTimer is called in side outputs and then 
stream the side-output data out to a Kafka topic. I don't understand how to stream 
out the side-output data, i.e., where I should write that processing logic. Below 
is the code snippet I have so far.


/**
 * CoProcessFuntion to process cart and pg messages connected using connect 
operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM 
jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

private static final Logger logger = 
LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
 * Map state for cart messages, orderId+mid is key and cartMessage is value.
 */
private static MapState<String, CartMessage> cartState = null;

/**
 * Map state for pg messages, orderId+mid is key and pgMessage is value.
 */
private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;


/**
 * Intializations for cart and pg mapStates
 *
 * @param config
 */
@Override
public void open(Configuration config) {
MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);

MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
Constants.PG_DATA,
TypeInformation.of(String.class),
TypeInformation.of(PaymentNotifyRequestWrapper.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}

/**
 *
 * @return
 */

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector out) throws Exception {

}

/**
 * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
 * @param cartMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement1(CartMessage cartMessage, Context context, 
Collector collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);

String searchKey = cartMessage.createJoinStringCondition();
PaymentNotifyRequestWrapper paymentNotifyObject = 
pgState.get(searchKey);
if(Objects.nonNull(paymentNotifyObject)) {
generateResultMessage(cartMessage,paymentNotifyObject,collector);
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}

/**
 * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
 * @param pgMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context 
context, Collector collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);
String searchKey = pgMessage.createJoinStringCondition();
CartMessage cartMessage = cartState.get(searchKey);
if(Objects.nonNull(cartMessage)) {
generateResultMessage(cartMessage,pgMessage,collector);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}

/**
 * Create ResultMessage from cart and pg messages.
 *
 * @param cartMessage
 * @param pgMessage
 * @return
 */
private void generateResultMessage(CartMessage cartMessage, 
PaymentNotifyRequestWrapper pgMessage,Collector collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;

//Logic should be in cart: check
for (

Re: Re: onTimer method in CoProcessFunction in flink

2020-05-23 Thread Yun Gao
Hi Jaswin,
The event-time timers and processing-time timers in Flink are fully decoupled: an 
event-time timer is triggered by the watermarks received, while a processing-time 
timer is triggered by the physical clock. You may think of them as two separate 
timelines with no guarantee on their relative speed. Therefore, computing a 
deadline from an event time and registering it as a processing-time timer gives a 
nondeterministic result; it depends on the gap between event time and processing 
time.
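As a minimal sketch of the two registrations (the 30-minute offset is only an 
illustrative value, and the event-time variant assumes timestamps/watermarks are 
assigned upstream):

// Fires once the watermark passes this event-time instant.
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 30 * 60 * 1000L);

// Fires based on the TaskManager's wall clock, independent of watermarks.
ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L);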

Best,
 Yun



 --Original Mail --
Sender:Jaswin Shah 
Send Date:Sat May 23 22:08:57 2020
Recipients:user@flink.apache.org , Arvid Heise , Yun Tang 
Subject:Re: onTimer method in CoProcessFunction in flink

Hi Yun,
Actually, this problem is solved now. I have been stuck on another problem with 
timeout callbacks. Here, I am receiving the callbacks too early, and the event-time 
registrations were somehow failing; maybe they needed some special handling. I 
need to know whether this callback registration is wrong or something else is wrong.
Do we need some special handling when using event-time semantics?
Thanks,
Jaswin

From: Jaswin Shah 
Sent: 22 May 2020 20:30
To: user@flink.apache.org ; Arvid Heise 
; Yun Tang 
Subject: onTimer method in CoProcessFunction in flink
How can I identify the type of element for which onTime is called in flink?
I want to store the objects for which onTimer is called to sideOutputs and then 
streamout the sideoutput data to kafka topic. I am not understanding how to 
stream out the sideoutput data like where should I write that processing logic. 
Below is the code snippet I have done so far


/**
 * CoProcessFuntion to process cart and pg messages connected using connect 
operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM 
jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction {

private static final Logger logger = 
LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
 * Map state for cart messages, orderId+mid is key and cartMessage is value.
 */
private static MapState cartState = null;

/**
 * Map state for pg messages, orderId+mid is key and pgMessage is value.
 */
private static MapState pgState = null;


/**
 * Intializations for cart and pg mapStates
 *
 * @param config
 */
@Override
public void open(Configuration config) {
MapStateDescriptor cartStateDescriptor = new 
MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);

MapStateDescriptor 
pgStateDescriptor = new MapStateDescriptor<>(
Constants.PG_DATA,
TypeInformation.of(String.class),
TypeInformation.of(PaymentNotifyRequestWrapper.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}

/**
 *
 * @return
 */

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector out) throws Exception {

}

/**
 * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
 * @param cartMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement1(CartMessage cartMessage, Context context, 
Collector collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);

String searchKey = cartMessage.createJoinStringCondition();
PaymentNotifyRequestWrapper paymentNotifyObject = 
pgState.get(searchKey);
if(Objects.nonNull(paymentNotifyObject)) {
generateResultMessage(cartMessage,paymentNotifyObject,collector);
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}

/**
 * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
 * @param pgMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context 
context, Collector collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);
String searchKey = pgMessage.createJoinStringCondition();
CartMessage cartMessage = cartState.get(searchKey);
if(Objects.nonNull(cartMessage)) {
generateResultMessage(cartMessage,pgMessage,collector);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}
}

Re: Re: Flink Window with multiple trigger condition

2020-05-24 Thread Yun Gao
Hi,

   First, sorry that I'm not an expert on windows, so please correct me if I'm 
wrong, but from my side it seems the assigner might also be a problem in 
addition to the trigger: currently Flink window assigners are all based on 
time (processing time or event time), and it might be hard to implement an 
event-driven window assigner that starts to assign elements to a window only after 
receiving some elements. 
  What comes to my mind is that a possible alternative is to use the 
low-level KeyedProcessFunction directly: you may register a timer 30 minutes 
later when receiving the "search" event and write the time of the search event into 
the state. Then the following events will be saved into the state since 
the flag is set. After receiving the "start" event or when the timer fires, 
you could load all the events from the state, do the aggregation and cancel 
the timer if it was triggered by the "start" event. A simpler case is [1]; it 
does not consider stopping the aggregation when a special event is received, but it seems 
that the logic could be added to the case.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
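
As a rough illustration of the approach above, here is a minimal Java sketch (the Event type, its getType() accessor and the 30-minute constant are assumptions for illustration, not from the original job):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SearchSessionFunction extends KeyedProcessFunction<String, Event, List<Event>> {

    private ListState<Event> buffered; // events seen since the "search" event
    private ValueState<Long> timerTs;  // timestamp of the registered timer

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", Event.class));
        timerTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timerTs", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<List<Event>> out) throws Exception {
        if ("search".equals(event.getType()) && timerTs.value() == null) {
            // open the "window": register a timer 30 minutes from now
            long ts = ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(ts);
            timerTs.update(ts);
        }
        if (timerTs.value() != null) {
            buffered.add(event);
            if ("start".equals(event.getType())) {
                // the "start" event closes the window early
                ctx.timerService().deleteProcessingTimeTimer(timerTs.value());
                emitAndClear(out);
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Event>> out) throws Exception {
        emitAndClear(out);
    }

    private void emitAndClear(Collector<List<Event>> out) throws Exception {
        List<Event> events = new ArrayList<>();
        buffered.get().forEach(events::add);
        out.collect(events); // or do the actual aggregation here instead of emitting the raw list
        buffered.clear();
        timerTs.clear();
    }
}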

Best,
 Yun




 --Original Mail --
Sender:aj 
Send Date:Sun May 24 01:10:55 2020
Recipients:Tzu-Li (Gordon) Tai 
CC:user 
Subject:Re: Flink Window with multiple trigger condition


I am still not able to get much from reading the material. Please help with some 
basic code to start building this window and trigger. 

Another option I am thinking of is to just use a RichFlatMapFunction and use 
keyed state to build this logic. Is that the correct approach? 



On Fri, May 22, 2020 at 4:52 PM aj  wrote:


I was also thinking of having a processing-time window, but that will not work for 
me. I want to start the window when the user's "search" event arrives, so for 
each user the window will start from the search event. 
 The tumbling window has a fixed start and end time, so that will not be suitable in 
my case. 




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai  
wrote:
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and return
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed, or
the start event was received.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





Re: Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread Yun Gao
yless")) {
currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
SearchSummaryCalculation(record, currentTuple);
}
sessionSummary.put(search_hex9, currentTuple);
}
timeState.update(currentTimeState);
}





On Sun, May 24, 2020 at 10:57 PM Yun Gao  wrote:

Hi,

   First, sorry that I'm not an expert on windows, so please correct me if I'm 
wrong, but from my side it seems the assigner might also be a problem in 
addition to the trigger: currently Flink window assigners are all based on 
time (processing time or event time), and it might be hard to implement an 
event-driven window assigner that starts to assign elements to a window only after 
receiving some elements. 
  What comes to my mind is that a possible alternative is to use the 
low-level KeyedProcessFunction directly: you may register a timer 30 minutes 
later when receiving the "search" event and write the time of the search event into 
the state. Then the following events will be saved into the state since 
the flag is set. After receiving the "start" event or when the timer fires, 
you could load all the events from the state, do the aggregation and cancel 
the timer if it was triggered by the "start" event. A simpler case is [1]; it 
does not consider stopping the aggregation when a special event is received, but it seems 
that the logic could be added to the case.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example

Best,
 Yun




 --Original Mail --
Sender:aj 
Send Date:Sun May 24 01:10:55 2020
Recipients:Tzu-Li (Gordon) Tai 
CC:user 
Subject:Re: Flink Window with multiple trigger condition


I am still not able to get much from reading the material. Please help with some 
basic code to start building this window and trigger. 

Another option I am thinking of is to just use a RichFlatMapFunction and use 
keyed state to build this logic. Is that the correct approach? 



On Fri, May 22, 2020 at 4:52 PM aj  wrote:


I was also thinking of having a processing-time window, but that will not work for 
me. I want to start the window when the user's "search" event arrives, so for 
each user the window will start from the search event. 
 The tumbling window has a fixed start and end time, so that will not be suitable in 
my case. 




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai  
wrote:
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and return
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed, or
the start event was received.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





Re: Question on stream joins

2020-05-28 Thread Yun Gao
Hi Sudan,

   As far as I know, both join and coGroup require keys (namely partitioning), 
thus for the non-keyed scenario you may have to use the low-level connect operator 
to achieve it. In my opinion it should be something like:

  leftSource.connect(rightSource)
   .process(new TagCoprocessFunction()) // In this function,  tag the left 
source with "0" and the right source with "1"
  .window(xx) 
  .process(new XX()) // In this function, you could get all the left and 
right elements in this window, and you could distinguish them with the tag 
added in the previous step.

It should be pointed out that without key (partitioning) the parallelism of the 
window operator will have to be 1.
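
As a minimal sketch of that non-keyed variant (a fragment to drop into a job; both sides are assumed to be DataStream<String>, Flink's Either type is used for the tagging, and windowAll is used since the stream is not keyed):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;

DataStream<Either<String, String>> tagged = leftSource
        .connect(rightSource)
        .process(new CoProcessFunction<String, String, Either<String, String>>() {
            @Override
            public void processElement1(String value, Context ctx, Collector<Either<String, String>> out) {
                out.collect(Either.Left(value));  // tag left elements
            }
            @Override
            public void processElement2(String value, Context ctx, Collector<Either<String, String>> out) {
                out.collect(Either.Right(value)); // tag right elements
            }
        });

tagged.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
      .process(new ProcessAllWindowFunction<Either<String, String>, Long, TimeWindow>() {
          @Override
          public void process(Context ctx, Iterable<Either<String, String>> elements, Collector<Long> out) {
              long left = 0, right = 0;
              for (Either<String, String> e : elements) {
                  if (e.isLeft()) { left++; } else { right++; }
              }
              out.collect(left - right); // difference of the counts, as in use case 1
          }
      });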


For the keyed scenario, you may use the high-level join/coGroup operators to 
achieve that. Join could be seen as a special case of coGroup: in coGroup 
you can access all the left and right elements directly, while in the join 
function the framework iterates over the elements for you and you only 
specify the logic for each (left, right) pair. 
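
And a minimal sketch of the keyed case with coGroup (the Left/Right element types, the getKey() accessors and the window size are placeholders, not from the original mail):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

leftSource.coGroup(rightSource)
        .where(left -> left.getKey())     // key selector for the left stream
        .equalTo(right -> right.getKey()) // key selector for the right stream
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .apply(new CoGroupFunction<Left, Right, Long>() {
            @Override
            public void coGroup(Iterable<Left> lefts, Iterable<Right> rights, Collector<Long> out) {
                long l = 0, r = 0;
                for (Left ignored : lefts) { l++; }
                for (Right ignored : rights) { r++; }
                out.collect(l - r); // per-key, per-window difference of counts
            }
        });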

Best,
 Yun


 --Original Mail --
Sender:Sudan S 
Send Date:Fri May 29 01:40:59 2020
Recipients:User-Flink 
Subject:Question on stream joins

Hi ,

I have two usecases

1. I have two streams, `leftSource` and `rightSource`, which I want to join 
without partitioning over a window, find the difference between the counts of elements 
of leftSource and rightSource, and emit the result of the difference. Which is the 
appropriate join function I can use?

join/cogroup/connect.

2. I want to replicate the same behaviour over a keyed source. Basically 
leftSource and rightSource are joined by a partition key.

Please let me know which is the appropriate join operator for the use case.
"The information contained in this e-mail and any accompanying documents may 
contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if this 
message has been addressed to you in error, please immediately alert the sender 
by replying to this e-mail and then delete this message, including any 
attachments. Any dissemination, distribution or other use of the contents of 
this message by anyone other than the intended recipient is strictly 
prohibited. All messages sent to and from this e-mail address may be monitored 
as permitted by applicable law and regulations to ensure compliance with our 
internal policies and to protect our business."

Re: Re: Question on stream joins

2020-05-29 Thread Yun Gao
Hi Sudan,

   The first process is used to tag the elements from the left and right 
streams, so that they can then be merged into the same stream and 
assigned to the same window. The following window(xxx).process(new 
WindowProcessFunction) defines the window operator that processes the windowed 
elements, thus the second process defines the window processing logic. Without the 
tagging we would not be able to assign the elements from both the left and right 
streams to the same window.

Best,
 Yun



 --Original Mail --
Sender:Sudan S 
Send Date:Fri May 29 14:39:31 2020
Recipients:Yun Gao 
CC:User-Flink 
Subject:Re: Question on stream joins

Thanks Yun. Was thinking a similar way.  I had one more question.

leftSource.connect(rightSource)
   .process(new TagCoprocessFunction()) // In this function,  tag the left 
source with "0" and the right source with "1"
  .window(xx) 
  .process(new XX()) 

In this case, when will the window be applied, since the window operator comes after 
process(new TagCoprocessFunction())?

On Fri, May 29, 2020 at 11:35 AM Yun Gao  wrote:

Hi Sudan,

   As far as I know, both join and coGroup require keys (namely partitioning), 
thus for the non-keyed scenario you may have to use the low-level connect operator 
to achieve it. In my opinion it should be something like:

  leftSource.connect(rightSource)
   .process(new TagCoprocessFunction()) // In this function,  tag the left 
source with "0" and the right source with "1"
  .window(xx) 
  .process(new XX()) // In this function, you could get all the left and 
right elements in this window, and you could distinguish them with the tag 
added in the previous step.

It should be pointed out that without key (partitioning) the parallelism of the 
window operator will have to be 1.


For the keyed scenario, you may use the high-level join/coGroup operators to 
achieve that. Join could be seen as a special case of coGroup: in coGroup 
you can access all the left and right elements directly, while in the join 
function the framework iterates over the elements for you and you only 
specify the logic for each (left, right) pair. 

Best,
 Yun


 --Original Mail --
Sender:Sudan S 
Send Date:Fri May 29 01:40:59 2020
Recipients:User-Flink 
Subject:Question on stream joins

Hi ,

I have two usecases

1. I have two streams, `leftSource` and `rightSource`, which I want to join 
without partitioning over a window, find the difference between the counts of elements 
of leftSource and rightSource, and emit the result of the difference. Which is the 
appropriate join function I can use?

join/cogroup/connect.

2. I want to replicate the same behaviour over a keyed source. Basically 
leftSource and rightSource are joined by a partition key.

Please let me know which is the appropriate join operator for the use case.
"The information contained in this e-mail and any accompanying documents may 
contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if this 
message has been addressed to you in error, please immediately alert the sender 
by replying to this e-mail and then delete this message, including any 
attachments. Any dissemination, distribution or other use of the contents of 
this message by anyone other than the intended recipient is strictly 
prohibited. All messages sent to and from this e-mail address may be monitored 
as permitted by applicable law and regulations to ensure compliance with our 
internal policies and to protect our business."
"The information contained in this e-mail and any accompanying documents may 
contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if this 
message has been addressed to you in error, please immediately alert the sender 
by replying to this e-mail and then delete this message, including any 
attachments. Any dissemination, distribution or other use of the contents of 
this message by anyone other than the intended recipient is strictly 
prohibited. All messages sent to and from this e-mail address may be monitored 
as permitted by applicable law and regulations to ensure compliance with our 
internal policies and to protect our business."

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-05-31 Thread Yun Gao
Hi Yu,

I think when the serializer returns null, the following operator should still 
receive a null record. A possible approach is that the following operator could 
count the number of null records received and publish the value via a metric to a 
monitoring system such as Prometheus, where you should be able to configure 
alert conditions.

If null causes problems, a special indicator instance such as NULL_TBASE may be 
created instead, and the operator should be able to count the number of NULL_TBASE 
records received.
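
A minimal sketch of that counting idea in a downstream operator (the metric name is arbitrary, and the null check would become a comparison against the sentinel if something like NULL_TBASE is used instead):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CorruptRecordCounter<T> extends RichMapFunction<T, T> {

    private transient Counter corrupted;

    @Override
    public void open(Configuration parameters) {
        // exposed through the configured metrics reporter (e.g. Prometheus)
        corrupted = getRuntimeContext().getMetricGroup().counter("corruptedRecords");
    }

    @Override
    public T map(T value) {
        if (value == null) {
            corrupted.inc();
        }
        return value;
    }
}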

Best,
 Yun



 --Original Mail --
Sender:Yu Yang 
Send Date:Mon Jun 1 06:37:35 2020
Recipients:user 
Subject:best practice for handling corrupted records / exceptions in custom 
DefaultKryoSerializer?

Hi all, 

To deal with corrupted messages that can leak into the data source once in a 
while, we implement a custom DefaultKryoSerializer class as below that catches 
exceptions. The custom serializer returns null in read(...) method when it 
encounters exception in reading. With this implementation, the serializer may 
silently drop records.  One concern is that it may drop too many records before 
we notice and take actions. What is the best practice to handle this?  

The serializer processes one record at a time. Will reading a corrupted record 
make the serializer fail to process the next valid record?

public class CustomTBaseSerializer extends TBaseSerializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomTBaseSerializer.class);

    @Override
    public void write(Kryo kryo, Output output, TBase tBase) {
        try {
            super.write(kryo, output, tBase);
        } catch (Throwable t) {
            LOG.error("Failed to write due to unexpected Throwable", t);
        }
    }

    @Override
    public TBase read(Kryo kryo, Input input, Class tBaseClass) {
        try {
            return super.read(kryo, input, tBaseClass);
        } catch (Throwable t) {
            LOG.error("Failed to read from input due to unexpected Throwable", t);
            return null;
        }
    }
}

Thank you!

Regards, 
-Yu

Re: Reading files from multiple subdirectories

2020-06-11 Thread Yun Gao
Hi Lorenzo,

Judging from a previous thread [1] and the source code, I think you may set 
inputFormat.setNestedFileEnumeration(true) to also scan the nested files.
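
For example, something along these lines (a sketch based on the snippet from the original mail; the path, the Avro record type and the poll interval are placeholders):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

AvroInputFormat<GenericRecord> inputFormat =
        new AvroInputFormat<>(new Path("s3://bucket/base-path"), GenericRecord.class);
inputFormat.setNestedFileEnumeration(true); // also scan files in nested sub-directories

DataStream<GenericRecord> stream = env.readFile(
        inputFormat, "s3://bucket/base-path", FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000);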

Best,
Yun

[1] 
https://lists.apache.org/thread.html/86a23b4c44d92c3adeb9ff4a708365fe4099796fb32deb6319e0e17f%40%3Cuser.flink.apache.org%3E



--
Sender:Lorenzo Nicora
Date:2020/06/11 21:31:20
Recipient:user
Theme:Reading files from multiple subdirectories

Hi,

related to the same case I am discussing in another thread, but not related to 
AVRO this time :) 

I need to ingest files a S3 Sink Kafka Connector periodically adds to an S3 
bucket.
Files are bucketed by date time as it often happens.

Is there any way, using Flink only, to monitor a base-path and detect new files 
in any subdirectories? 
Or I need to use something external to move new files in a single directory?

I am currently using 
env.readFile(inputFormat, path, PROCESS_CONTINUOUSLY, 6)
with AvroInputFormat, but it seems it can only monitor a single directory


Cheers
Lorenzo 


Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

2020-06-12 Thread Yun Gao
Hi Felipe,

   I tested the basic RideCleansingExercise [1] job that uses the TaxiRide type 
locally and it seems to be able to start up normally.

   Could you also share the code you are currently executing and the full stack trace of 
the exception?

Best,
 Yun

 [1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
 --Original Mail --
Sender:Felipe Gutierrez 
Send Date:Fri Jun 12 23:11:28 2020
Recipients:user 
Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to 
the TaxiRide training example?
Hi,

I am using the flink training exercise TaxiRide [1] to execute a
stream count of events. On the cluster and on my local machine I am
receiving the message that joda.Time cannot be serialized "class
org.joda.time.LocalDateTime is not a valid POJO type". However it is
starting the job on the cluster, but not in my local machine. So I
searched in the internet and it is requested to register the jodaTime
class on the environment[2]. I did like this:

env.getConfig().registerTypeWithKryoSerializer(DateTime.class,
AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalDate.class,
AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalTime.class,
AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);

and I added the joda and avro dependency on the pom.xml:


<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${project.version}</version>
</dependency>

I also tested using addDefaultKryoSerializer but I got the same error.
For some reason, it is still not working. Does anyone have some hint
of what could be happening?

Thanks! Felipe
[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Shared state between two process functions

2020-06-14 Thread Yun Gao
Hi Jaswin,
   Currently state belongs to a single operator, thus it should not be 
possible to share state between different operators. Could you also share the 
original problem you want to solve by sharing state?

Best,
 Yun



 --Original Mail --
Sender:Jaswin Shah 
Send Date:Sun Jun 14 18:57:54 2020
Recipients:user@flink.apache.org 
Subject:Shared state between two process functions

Hi,

Is it possible to create the shared state(MapState) between two different 
keyedProcessFunction? If it's possible, how can we do that in flink?

Thanks,
Jaswin

Re: adding s3 object metadata while using StreamFileSink

2020-06-21 Thread Yun Gao
Hi Dhurandar,

With my understanding, I think what you need is to get notified when a file has been 
written successfully (committed) on the S3 file system. However, currently there 
is no public API for such a listener, and there is an issue tracking it [1].

With the current version, one possible method that comes to mind is to use 
reflection to access some internal state of StreamingFileSink to get the 
committed files. As a whole, you may need to implement a customized 
StreamingFileSink and override the notifyCheckpointComplete method, where the 
new S3 files get committed and become visible:

class CustomizedStreamingFileSink extends StreamingFileSink {

public void notifyCheckpointComplete(long checkpointId) throws Exception {
// 1. First use reflection to get the list of files will be committed 
in this call.
// The list of files should be get via StreamingFile -> ( 
StreamingFileSink Helper if 1.11 is used ) -> Buckets -> activeBuckets (there 
will be multiple Buckets) -> (for each Bucket) 
pendingFileRecoverablesPerCheckpoint
// Then we could get the iterator of pending files to commit in this 
time via pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)[2]
  // Then you could get the S3 object names via (PendingFileRecover if 1.11 is 
used) -> CommitRecoverable (Will must be S3Recoverable ) -> objectName.

super.notifyCheckpointComplete(checkpointId); // Get files committed 
normally.

// 3. Then here could start writing meta info for S3 objects recorded 
in step 1. 
}
}

A single file may get committed multiple times, therefore the action that writes the 
meta info must also be able to handle repeated writes.

Another possible method would be to use a separate source operator that periodically 
scans the S3 file system to detect newly added files and modify their meta 
data. There is an existing source function, 
ContinuousFileMonitoringFunction [3], for this kind of work, and I think it might be 
modified or reused for scanning the files. 

Best,
  Yun 


[1] https://issues.apache.org/jira/browse/FLINK-17900
[2] 
https://github.com/apache/flink/blob/a5527e3b2ff4abea2ff8fa05cb755561549be06a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L268
[3] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java


--
Sender:dhurandar S
Date:2020/06/20 03:19:38
Recipient:user; Flink Dev
Theme:adding s3 object metadata while using StreamFileSink

We are creating files in S3 and we want to update the S3 object metadata with 
some security-related information for governance purposes.

Right now Apache Flink totally abstracts how and when S3 object gets created in 
the system. 

Is there a way that we can pass the S3 object metadata and update it for the 
object created.

If not, 

How can we know when Apache Flink has created an S3 file. Deterministically.
Since once its created in S3 we can write Java code after that to add those 
metadata information?

-- 
Thank you and regards,
Dhurandar




Re: Problems with type erasure

2020-06-22 Thread Yun Gao
Hi Vincenzo:

Could you also attach the code before line 72, namely how `delays` is 
defined? The exception says the return type of "Custom Source" could not 
be determined, and I think it refers to `delays`; the exception is 
thrown when an operator is called on `delays` and Flink tries to create a new 
transformation based on the type information of `delays`.
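
If that is the case, one possible sketch is to give the type hint directly on the source before any other transformation (the Tuple2<String, Integer> element type and the DelaySource class are only guesses for illustration):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Tuple2<String, Integer>> delays = env
        .addSource(new DelaySource())                    // the custom source producing the delays
        .returns(Types.TUPLE(Types.STRING, Types.INT));  // explicit type information for its output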

Best,
 Yun


 --Original Mail --
Sender:Vincenzo Pronestì 
Send Date:Mon Jun 22 19:02:05 2020
Recipients:flink-user 
Subject:Problems with type erasure

Hi there,
I need to execute the following code:
 72: KeyedStream, String> keyedDelays = delays
 73:     .flatMap(new Query1FlatMap())
 74:     .keyBy(item -> item.f0);

but I keep getting this error message:

The program finished with the following exception:
The return type of function 'Custom Source' could not be determined automatically, due to 
type erasure. You can give type information hints by using the returns(...) method on the 
result of the transformation call, or by letting your function implement the 
'ResultTypeQueryable' interface.
    org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)
    org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)
    org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)
    org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)

I've read this guide 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html 
(there's an example with Tuple2> which is the same I need) and I think I have two options:

1 - Implement ResultTypeQueryable> in my Query1FlatMap class. I did this by adding:

@Override
public TypeInformation> getProducedType() {
    return TypeInformation.of(new TypeHint>(){});
}

2 - Use the returns method right after the flatMap(new Query1FlatMap()), like this:

TypeInformation> tInfo = TypeInformation.of(new TypeHint>(){});
KeyedStream, String> keyedDelays = delays
    .flatMap(new Query1FlatMap())
    .returns(tInfo)
    .keyBy(item -> item.f0);

Actually I've also tried with:

TypeHint> tHint = new TypeHint>(){};
KeyedStream, String> keyedDelays = delays
    .flatMap(new Query1FlatMap())
    .returns(tHint)
    .keyBy(item -> item.f0);

The problem is none of all these things works and the error message is always the same as 
above. Does any of you know how I can fix this?

Also I'm having the same issue with another code where the keyed stream has two Tuple2 
(i.e. Tuple2, Integer>, Tuple>). Would the solution work even in this last case? Or, do I 
need to change something because of the double Tuple2?

Thank you for your attention.

Best regards,
Vincenzo 

Re: TypeInformation not found

2020-06-22 Thread Yun Gao
Hi yu,

Have you added "import org.apache.flink.api.scala._"? It seems it should be OK 
if the import has been added to the program:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Test {
  def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
solutionInput.print()
env.execute()
  }
}


Best,
Yun





 --Original Mail --
Sender:王宇 
Send Date:Tue Jun 23 09:42:47 2020
Recipients:User 
Subject:No Subject

Hi, all
 some errors occurred when I run Flink in MiniCluster, 
flink-version: 1.11, scala-version: 2.12.0.

Error:(33, 41) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))
Error:(33, 41) not enough arguments for method fromElements: (implicit 
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, 
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val solutionInput = env.fromElements((1, "1"))
Error:(34, 40) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val worksetInput = env.fromElements((2, "2"))
Error:(34, 40) not enough arguments for method fromElements: (implicit 
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, 
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val worksetInput = env.fromElements((2, "2"))
Error:(47, 41) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))

have tried 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#type-information-in-the-scala-api

thanks

Re: Re: TypeInformation not found

2020-06-24 Thread Yun Gao
Hi Yu,

   I tried WordCount and the attached test, and they run normally 
in my IDEA. Could you check the imported project, or reimport the 
project if there are still problems? 

Best,
Yun
 --Original Mail --
Sender:Yu Wang 
Send Date:Tue Jun 23 15:51:27 2020
Recipients:Yun Gao 
CC:User 
Subject:Re: TypeInformation not found

Thanks Yun Gao! I have added "import org.apache.flink.api.scala._"; I am just trying to run 
WordCount in IDEA.



On Tue, Jun 23, 2020 at 11:16 AM Yun Gao  wrote:

Hi yu,

Have you added "import org.apache.flink.api.scala._"? It seems it should be OK 
if the import has been added to the program:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Test {
  def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
solutionInput.print()
env.execute()
  }
}


Best,
Yun





 --Original Mail --
Sender:王宇 
Send Date:Tue Jun 23 09:42:47 2020
Recipients:User 
Subject:No Subject

Hi, all
 some errors occurred when I run Flink in MiniCluster, 
flink-version: 1.11, scala-version: 2.12.0.

Error:(33, 41) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))
Error:(33, 41) not enough arguments for method fromElements: (implicit 
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, 
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val solutionInput = env.fromElements((1, "1"))
Error:(34, 40) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val worksetInput = env.fromElements((2, "2"))
Error:(34, 40) not enough arguments for method fromElements: (implicit 
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, 
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val worksetInput = env.fromElements((2, "2"))
Error:(47, 41) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))

have tried 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#type-information-in-the-scala-api

thanks

Re: ElasticSearch_Sink

2020-07-15 Thread Yun Gao
Hi Dinesh,

   As far as I know, to implement the two-phase commit protocol for an external 
system, the external system is required to provide some kind of 
transaction that can stay open across sessions. With such a transaction mechanism 
we could first start a new transaction and write the data, then 
pre-commit the transaction when checkpointing, and commit the transaction on the 
checkpoint-complete notification. After a failover, we would be able to recover 
the transactions and abort them (if not pre-committed) or commit them (if 
pre-committed) again. As an example, for JDBC we may have to use XA transactions 
instead of normal JDBC transactions, since a JDBC transaction will always be 
aborted on failover, even if we have pre-committed it.

  If such a transaction mechanism is not provided by the external system, we 
may have to use a secondary system (like WAL logs or a JDBC table) to first cache 
the data and only write the data to the final system on commit. Note that since 
a transaction might be committed multiple times, the final system would still 
need to deduplicate the records, or rely on some kind of transaction mechanism 
that is always aborted on failover.
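
For reference, a rough skeleton of what has to be filled in when extending Flink's TwoPhaseCommitSinkFunction; the Document and EsTransaction types and all method bodies are placeholders, and the approach only works if Elasticsearch (or a secondary system) can provide the kind of cross-session transaction described above:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class TwoPhaseCommitEsSink extends TwoPhaseCommitSinkFunction<Document, EsTransaction, Void> {

    public TwoPhaseCommitEsSink() {
        super(new KryoSerializer<>(EsTransaction.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected EsTransaction beginTransaction() {
        return new EsTransaction();      // open or resume an external transaction
    }

    @Override
    protected void invoke(EsTransaction txn, Document value, Context context) {
        txn.add(value);                  // write within the open transaction
    }

    @Override
    protected void preCommit(EsTransaction txn) {
        txn.flush();                     // called when a checkpoint is taken
    }

    @Override
    protected void commit(EsTransaction txn) {
        txn.commit();                    // called on checkpoint-complete; may be retried after failover
    }

    @Override
    protected void abort(EsTransaction txn) {
        txn.rollback();                  // called for transactions that were never pre-committed
    }
}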

Best,
Yun

--
Sender:C DINESH
Date:2020/07/16 11:01:02
Recipient:user
Theme:ElasticSearch_Sink

Hello All,

Can we implement the two-phase commit protocol for the Elasticsearch sink? Will there be 
any limitations?

Thanks in advance.

Warm regards,
Dinesh. 


Re: hybrid state backends

2021-02-07 Thread Yun Gao

Hi Marco,

Sorry, the state backend is currently a global configuration and cannot 
be configured differently for different operators.

One possible alternative for this requirement might be to set RocksDB 
as the default state backend, and for those operators that want to keep state 
in memory, implement a new operator by extending AbstractStreamOperator, 
overriding the snapshotState() method, and using raw state to snapshot the 
in-memory data. However, this option would touch some non-user-level APIs of Flink.

Best,
 Yun
--
Sender:Marco Villalobos
Date:2021/02/05 19:09:37
Recipient:user
Theme:hybrid state backends

Is it possible to use different statebackends for different operators? There 
are certain situations where I want the state to reside completely in memory, 
and other situations where I want it stored in rocksdb. 


Re: UUID in part files

2021-02-07 Thread Yun Gao
Hi Dan

SQL adds the uuid by default for the case where users want to execute
multiple bounded SQL statements and append to the same directory (Hive table), thus
a uuid is attached to avoid overriding the previous output.

The DataStream API could be viewed as providing the low-level API and
thus it does not add the uuid automatically. And as you have pointed out,
by using OutputFileConfig users can also implement this functionality themselves.
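
For completeness, a small sketch of adding a uuid-based part prefix on the DataStream side (the output path and encoder are placeholders):

import java.util.UUID;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("part-" + UUID.randomUUID())
        .withPartSuffix(".txt")
        .build();

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withOutputFileConfig(fileConfig)
        .build();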

Best,
 Yun


 --Original Mail --
Sender:Dan Hill 
Send Date:Mon Feb 8 07:40:36 2021
Recipients:user 
Subject:UUID in part files

Hi.

Context
I'm migrating my Flink SQL job to DataStream.  When switching to 
StreamingFileSink, I noticed that the part files now do not have a uuid in 
them.  "part-0-0" vs "part-{uuid string}-0-0".  This is easy to add with 
OutputFileConfig.

Question
Is there a reason why the base OutputFileConfig doesn't add the uuid 
automatically?  Is this just a legacy issue?  Or do most people not have the 
uuid in the file outputs?


Re: Re: flink kryo exception

2021-02-07 Thread Yun Gao
Hi yidan,

One more thing to confirm: are you creating the savepoint and stopping the job 
together with the

 bin/flink cancel -s [:targetDirectory] :jobId
command?

Best,
 Yun



 --Original Mail --
Sender:赵一旦 
Send Date:Sun Feb 7 16:13:57 2021
Recipients:Till Rohrmann 
CC:Robert Metzger , user 
Subject:Re: flink kryo exception

It may also have something to do with my job's first tasks. The second task 
has two inputs: one is the Kafka source stream (A), the other is a self-defined 
MySQL source used as a broadcast stream (B).
In A: I have a 'WatermarkReAssigner', a self-defined operator which adds an 
offset to its input watermark and then forwards it downstream.
In B: The parallelism is 30, but in my rich function's implementation only 
subtask-0 does the MySQL query and sends out records; the other subtasks do nothing. 
All subtasks send max_watermark - 86400_000 as the watermark.
Since both of the first tasks have some self-defined source or implementation, I do 
not know whether the problem has something to do with it.
On Sun, Feb 7, 2021 at 4:05 PM 赵一旦  wrote:

The first problem is critical, since the savepoint does not work.
For the second problem, I changed the solution and removed the 'Map'-based 
implementation before the data is sent to the second task, and in this 
case the savepoint works. The only problem is that I should stop the job, 
remember the savepoint path, and then restart the job with that savepoint path. What 
happens now is: I stop the job, then the job fails and restarts automatically with the 
generated savepoint. So I do not need to restart the job anymore, since what 
it does automatically is what I want to do.

I have a feeling that it may also be related to the data, so I am not sure 
that I can provide an example that reproduces the problem.
On Sat, Feb 6, 2021 at 12:13 AM Till Rohrmann  wrote:

Could you provide us with a minimal working example which reproduces the 
problem for you? This would be super helpful in figuring out the problem you 
are experiencing. Thanks a lot for your help.

Cheers,
Till
On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:

Yeah, and if it were different, why would my job run normally? The problem only 
occurs when I stop it. 
On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger  wrote:

Are you 100% sure that the jar files in the classpath (/lib folder) are exactly 
the same on all machines? (It can happen quite easily in a distributed 
standalone setup that some files are different)


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:

Flink1.12.0; only using aligned checkpoint; Standalone Cluster; 



On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger  wrote:

Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can 
lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying 
Flink, which state backend are you using, what kind of job (I guess DataStream 
API))

Somehow the process receiving the data is unable to deserialize it, most likely 
because they are configured differently (different classpath, dependency 
versions etc.)
On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:

I do not think this is a code-related problem anymore; maybe it is a bug?
On Fri, Feb 5, 2021 at 4:30 PM 赵一旦  wrote:

Hi all, I find that the failure always occurs in the second task, after the 
source task. So I did something in the first chained task: I transformed the 
'Map'-based class object into another normal class object, and the problem 
disappeared.

Based on the new solution, I also tried to stop and restore the job with a savepoint 
(all successful).

But I also met another problem. This problem also occurs while I stop the job, 
and also occurs in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
at 
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
at org.apache.flink.types.StringValue.readString(StringValue.java:783)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:1

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-07 Thread Yun Gao
Hi Jan,

From my view, in Flink a window should be seen as a "high-level" operation for some kind
of aggregation, and if it cannot satisfy the requirements, we could at least turn to
the "low-level" API, namely KeyedProcessFunction [1].

In this case, we could use a ValueState to store the current count for each key and increment
the value on each element. We could also register a timer for each key on receiving the first 
element for that key, and in the onTimer callback we could send the current state value, reset
the value to 0 and register another timer for this key 30s later.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction


 --Original Mail --
Sender:Jan Brusch 
Send Date:Sat Feb 6 23:44:00 2021
Recipients:user 
Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi,
I was recently working on a problem where we wanted to implement a 
simple count on a sliding window, e.g. "how many messages of a certain 
type were emitted by a certain type of sensor in the last n minutes". 
Which sounds simple enough in theory:

messageStream
 .keyBy(//EmitterType + MessageType)
 .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), 
Time.seconds(30)))
 .map(_ => 1)
 .reduce((x,y) => x + y)
 .addSink(...)

But there is a tricky edge case: The downstream systems will never know 
when the count for a certain key goes back to 0, which is important for 
our use case. The technical reason being that flink doesn't open a 
window if there are no entries, i.e. a window with count 0 doesn't exist 
in flink.

We came up with the following solution for the time being:

messageStream
 .keyBy(//EmitterType + MessageType)
 .window(GlobalWindows.create())
 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
 .evictor(// CustomEvictor: Evict all messages older than n minutes 
BEFORE processing the window)
 .process(// CustomCounter: Count all Messages in Window State);
 .addSink(...)

In the case of zero messages in the last n minutes, all messages will be 
evicted from the window and the process-function will get triggered one 
last time on the now empty window, so we can produce a count of 0.

I have two problems, though, with this solution:
1) It is computationally inefficient for a simple count, as custom 
process functions will always keep all messages in state. And, on every 
trigger all elements will have to be touched twice: To compare the 
timestamp and to count.
2) It does seem like a very roundabout solution to a simple problem.

So, I was wondering if there was a more efficient or "flink-like" 
approach to this. Sorry for the long writeup, but I would love to hear 
your takes.


Best regards
Jan

-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501

Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread Yun Gao
Hi,

Have you also included the kafka-connector related jar in the classpath?

Best,
Yun
 --Original Mail --
Sender:joris.vanagtmaal 
Send Date:Tue Feb 9 03:16:52 2021
Recipients:User-Flink 
Subject:Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector
Traceback (most recent call last):
  File "streaming-dms.py", line 309, in 
anomalies()
  File "streaming-dms.py", line 142, in anomalies
t_env.sql_query(query).insert_into("ark_sink")
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 748, in sql_query
j_table = self._j_tenv.sqlQuery(query)
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 162, in deco
raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
 at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.base/java.lang.Thread.run(Thread.java:834)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Yun Gao

Hi,

I also think there should be different ways to achieve the target. For the 
first option listed previously, the pseudo-code looks roughly like:

class MyFunction<KEY, T> extends KeyedProcessFunction<KEY, T, Tuple2<KEY, Integer>> {
    ValueState<Integer> count;

    void open() {
        count = ... // Create the value state
    }

    void processElement(T t, Context ctx, Collector<Tuple2<KEY, Integer>> collector) {
        Integer current = count.value();
        if (current == null) {
            // Register the timer for the first time (e.g. 30s of processing time from now)
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 30_000);
            current = 0;
        }

        count.update(current + 1); // update the count
    }

    void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<KEY, Integer>> collector) {
        collector.collect(new Tuple2<>(ctx.getCurrentKey(), count.value()));
        // register the following timer
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 30_000);
    }
}

1. In Flink the state and timers are all bound to a key implicitly, thus I 
think they should not need to be bound manually.
2. The outdated state could be cleared via count.clear() if it has been 0 
for a long time. There are different ways to measure that interval, like registering 
another timer and clearing it when elements are received, or updating the counter to -1, 
-2, ... to mark how many timers have passed.


Best,
 Yun





 --Original Mail --
Sender:Khachatryan Roman 
Send Date:Tue Feb 9 02:35:20 2021
Recipients:Jan Brusch 
CC:Yun Gao , user 
Subject:Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Hi,

Probably another solution would be to register a timer (using 
KeyedProcessFunction) once we see an element after keyBy. The timer will fire 
in windowIntervalMs. Upon firing, it will emit a dummy element which will be 
ignored (or subtracted) in the end.
Upon receiving each new element, the function will shift the timer accordingly.

Regards,
Roman

On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch  wrote:

Hi Yun,
thanks for your reply.
I do agree with your point about standard windows being for high level 
operations and the lower-level apis offering a rich toolset for most advanced 
use cases.
I have tried to solve my problem with keyedProcessFunctions also but was not 
able to get it to work for two reasons:
1) I was not able to set up a combination of ValueState, Timers and Triggers 
that emulated a sliding window with a rising and falling count (including 0) 
well enough.
2) Memory leak: states / windows should be cleared after a certain time of 
being at count 0 in order to prevent an ever-growing number of ValueStates (that 
are not needed anymore).

Can you maybe please elaborate in pseudocode how you would envision your 
solution?

Best regards
Jan
On 08.02.21 05:31, Yun Gao wrote:

Hi Jan,

From my view, in Flink a window should be seen as a "high-level" operation for some kind
of aggregation, and if it cannot satisfy the requirements, we could at least turn to
the "low-level" API, namely KeyedProcessFunction [1].

In this case, we could use a ValueState to store the current count for each key and increment
the value on each element. We could also register a timer for each key on receiving the first 
element for that key, and in the onTimer callback we could send the current state value, reset
the value to 0 and register another timer for this key 30s later.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction


 --Original Mail --
Sender:Jan Brusch 
Send Date:Sat Feb 6 23:44:00 2021
Recipients:user 
Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem 
Hi,
I was recently working on a problem where we wanted to implement a 
simple count on a sliding window, e.g. "how many messages of a certain 
type were emitted by a certain type of sensor in the last n minutes". 
 Which sounds simple enough in theory:

 messageStream
  .keyBy(//EmitterType + MessageType)
 .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), 
 Time.seconds(30)))
  .map(_ => 1)
  .reduce((x,y) => x + y)
  .addSink(...)

But there is a tricky edge case: The downstream systems will never know 
when the count for a certain key goes back to 0, which is important for 
our use case. The technical reason being that flink doesn't open a 
window if there are no entries, i.e. a window with count 0 doesn't exist 
 in flink.

 We came up with the following solution for the time being:

 messageStream
  .keyBy(//EmitterType + MessageType)
  .window(GlobalWindows.create())
 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
 .evictor(// CustomEvictor: Evict all messages older than n minutes 
 BEFORE processing the window)
 .process(// CustomCounter: Count all Messages in Window State);
  .addSink(...)

In the case of zero messages in the last n minutes, all messages will be 
evicted from the wi

Re: Any plans to make Flink configurable with pure data?

2021-02-09 Thread Yun Gao
Hi Pilgrim,

Currently the Table API indeed cannot use low-level APIs like timers. Would a 
mixture of SQL & DataStream
satisfy the requirements? A job might be created via multiple SQL queries and 
connected via DataStream
operations.
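
A rough Java sketch of such a mixture (table and field names are placeholders, the timer logic is only indicated rather than a full timeout implementation, and the exact bridge methods differ slightly across Flink versions):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql("CREATE TABLE readings (deviceId STRING, v DOUBLE) WITH ('connector' = 'datagen')");
Table filtered = tEnv.sqlQuery("SELECT deviceId, v FROM readings WHERE v > 0");

// drop down to the DataStream API where timers (e.g. timeouts) are available
DataStream<Row> rows = tEnv.toAppendStream(filtered, Row.class);
DataStream<Row> withTimers = rows
        .keyBy(new KeySelector<Row, String>() {
            @Override
            public String getKey(Row r) {
                return (String) r.getField(0);
            }
        })
        .process(new KeyedProcessFunction<String, Row, Row>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Row> out) {
                // register a processing-time timeout 60s from now
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + 60_000);
                out.collect(value);
            }
            @Override
            public void onTimer(long ts, OnTimerContext ctx, Collector<Row> out) {
                // timeout handling (e.g. emitting a "silent device" marker) would go here
            }
        });

// and the result can be registered again for further SQL processing
tEnv.createTemporaryView("withTimers", withTimers);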

Best,
 Yun

--
Sender:Pilgrim Beart
Date:2021/02/09 02:22:46
Recipient:
Theme:Any plans to make Flink configurable with pure data?

To a naive Flink newcomer (me) it's a little surprising that there is no pure 
"data" mechanism for specifying a Flink pipeline, only "code" interfaces. With 
the DataStream interface I can use Java, Scala or Python to set up a pipeline 
and then execute it - but that doesn't really seem to need a programming model, 
it seems like configuration, which could be done with data? OK, one does need 
occasionally to specify some custom code, e.g. a ProcessFunction, but for any 
given use-case, a relatively static library of such functions would seem fine.

My use case is that I have lots of customers, and I'm doing a similar job for 
each of them, so I'd prefer to have a library of common code (e.g. 
ProcessFunctions), and then specify each customer's specific requirements in a 
single config file.  To do that in Java, I'd have to do metaprogramming (to 
build various pieces of Java out of that config file).

Flink SQL seems to be the closest solution, but doesn't appear to support 
fundamental Flink concepts such as timers (?). Is there a plan to evolve Flink 
SQL to support timers? Timeouts is my specific need.

Thanks,
-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot 





Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-09 Thread Yun Gao

Hi,

Could you try adding the jar via the Python configuration explicitly? You 
might refer to [1].

Best,
 Yun

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program
 
--
Sender:joris.vanagtmaal
Date:2021/02/09 15:50:27
Recipient:
Theme:Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

My JAR files are included in the same folder from which I run the Python code:

flink-connector-kafka_2.11-1.13-SNAPSHOT.JAR
flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.JAR
kafka-clients-2.7.0.JAR



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Debugging long Flink checkpoint durations

2021-03-02 Thread Yun Gao
Hi Dan,

I think you could see the details of the checkpoints via the checkpoint UI [1]. 
Also, if you see in the
pending checkpoints that some tasks have not taken their snapshot yet, you might have a look 
at whether those tasks
are backpressuring the previous tasks [2].
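
Also, if part of the problem is only being able to see the last 10 checkpoints in the UI, the history size should be configurable in flink-conf.yaml (please double-check the exact key for your version, I am writing it from memory):

web.checkpoints.history: 100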

Best,
Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
--
Sender:Dan Hill
Date:2021/03/02 04:34:56
Recipient:user
Theme:Debugging long Flink checkpoint durations

Hi.  Are there good ways to debug long Flink checkpoint durations?

I'm running a backfill job that processes ~10 days of data, and then checkpointing starts 
failing.  Since I only see the last 10 checkpoints in the 
jobmaster UI, I don't see when it started.

I looked through the text logs and didn't see much.

I assume:
1) I have something misconfigured that is causing old state is sticking around.
2) I don't have enough resources.



Re: Re: Checkpoint Error

2021-03-07 Thread Yun Gao
Hi Navneeth,

It seems from the stack trace that the exception is caused by underlying EFS 
problems. Have you checked
whether there are errors reported for EFS, or whether the same EFS might be mounted 
multiple times and someone else
has deleted the directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below 
error. Not sure what's causing it, any pointers would help. We have 10 TM's 
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: java.io.IOException: Stale file handle
    at java.io.FileOutputStream.close0(Native Method)
    at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
    at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
    at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
    at java.io.FileOutputStream.close(FileOutputStream.java:354)
    at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
    ... 12 more

Thanks

Re: java options to generate heap dump in EMR not working

2021-03-07 Thread Yun Gao
Hi,

I tried with a standalone session (sorry, I do not have a YARN cluster at hand) and 
the Flink cluster could start up normally with this option. Could you check the 
NodeManager log to see the detailed reason why the container does not get launched? 
Also, have you checked whether there is a spelling error or an unexpected special 
whitespace character in the configuration?

For the case of configuring `env.java.opts`, it seems the JobManager also could not 
be launched with this configuration.

Best,
Yun
 --Original Mail --
Sender:bat man 
Send Date:Sat Mar 6 16:03:06 2021
Recipients:user 
Subject:java options to generate heap dump in EMR not working

Hi,

I am trying to generate a heap dump to debug a GC overhead OOM. For that I added 
the below Java options in flink-conf.yaml; however, after adding this, YARN is not 
able to launch the containers. The job logs show that it keeps requesting containers 
from YARN, gets them, and then releases them again; the same cycle continues. If I 
remove the option from flink-conf.yaml, the containers are launched and the job 
starts processing.

env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/dump.hprof"

If I try this, then the YARN client does not come up -

env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/dump.hprof"

Am I doing anything wrong here?

PS: I am using EMR. 

Thanks,
Hemant

Re: How do I call an algorithm written in C++ in Flink?

2021-03-08 Thread Yun Gao
Hi Suxi,

Do you mean you want to call the algorithm written in C++? If so, I think you could
do it the same way as you wrap it in the SpringBoot project, via JNI. You do not
need to add a new operator; you could use the existing Flink APIs, load your library
in open(), and call the algorithm in the subsequent processing method.
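For example, a minimal sketch (the class, library and JNI wrapper names here are 
only placeholders, not an existing API) of loading the .so once per task in open() 
and invoking it per record:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class NativeAlgoMapper extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) {
        // Load the shared library once per task; the .so must be deployed on every TaskManager host.
        System.loadLibrary("companyalgo"); // hypothetical library name
    }

    @Override
    public String map(String input) {
        // NativeAlgoJni is assumed to be the same JNI wrapper class used in the SpringBoot project.
        return NativeAlgoJni.process(input);
    }
}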

Best,
Yun



 --Original Mail --
Sender:苏喜 张 <15138217...@163.com>
Send Date:Mon Mar 8 14:12:02 2021
Recipients:user@flink.apache.org 
Subject:How do I call an algorithm written in C++ in Flink?

The company has provided an algorithm written in C++, which has been packaged 
into a .so file. I have built a SpringBoot project which uses JNI to operate the 
algorithm written in C++. Could you please tell me how to call it in Flink? 
Do I need to define operators, or chains of operators?
 

Re: Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Yun Gao
Hi Rainie,

From the code it seems the current problem does not use time-related functionality 
like windows/timers? If so, the problem would be independent of the time type used.

Also, it is not likely due to rebalance(), since the network layer checks the 
sequence numbers. If there were missing records, there would be a failover.

Since the current logic does not seem to rely on much complex functionality, would 
it be possible that there is some inconsistency between the Flink implementation 
and the Presto one?

Best,
Yun


--
Sender:Rainie Li
Date:2021/03/08 17:14:30
Recipient:Smile
Cc:user
Theme:Re: Flink application has slightly data loss using Processing Time

Thanks for the quick response, Smile.
I don't use window operators or flatmap.
Here is the core logic of my filter; it only iterates over the filters list. Will 
rebalance() cause it? 

Thanks again.
Best regards
Rainie
SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
    eventStream
        .rebalance()
        .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
          public void processElement(
              T element,
              ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
              Collector<SplitterIntermediateRecord<T>> collector) {
            for (StreamFilter filter : filters) {
              if (filter.match(element)) {
                SubstreamConfig substreamConfig = filter.getSubstreamConfig();
                SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
                    substreamConfig.getKafkaCluster(),
                    substreamConfig.getKafkaTopic(),
                    substreamConfig.getCutoverKafkaTopic(),
                    substreamConfig.getCutoverTimestampInMs(),
                    element);
                collector.collect(result);
              }
            }
          }
        })
        .name("Process-" + eventClass.getSimpleName());
On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:
Hi Rainie, 

 Could you please provide more information about your processing logic?
 Do you use window operators?
 If there's no time-based operator in your logic, late arrival data won't be
 dropped by default and there might be something wrong with your flat map or
 filter operator. Otherwise, you can use sideOutputLateData() to get the late
 data of the window and have a look at them. See [1] for more information
 about sideOutputLateData().

 [1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
 

 Regards,
 Smile



 --
 Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
Hi Dan,

Have you used a too large upperBound or lowerBound?

If not, could you also check the watermark strategy?
The interval join operator depends on the event-time
timer for cleanup, and the event-time timer is
triggered via watermarks.
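As a rough sketch (the types A/B and key selectors are only placeholders), both the 
bounds and the watermark strategy decide when the join state can be cleaned up:

streamA.keyBy(A::getKey)
    .intervalJoin(streamB.keyBy(B::getKey))
    .between(Time.minutes(-5), Time.minutes(5)) // keep the bounds as tight as the use case allows
    .process(new ProcessJoinFunction<A, B, String>() {
        @Override
        public void processElement(A left, B right, Context ctx, Collector<String> out) {
            out.collect(left + " <-> " + right);
        }
    });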

Best,
Yun



 --Original Mail --
Sender:Dan Hill 
Send Date:Mon Mar 8 14:59:48 2021
Recipients:user 
Subject:Gradually increasing checkpoint size

Hi!

I'm running a backfill Flink stream job over older data.  It has multiple 
interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd 
expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess is 
that my checkpoint has a bunch of useless state in it.

- Dan

Re: Re: Re: Checkpoint Error

2021-03-08 Thread Yun Gao
Hi Navneeth,

Is the attached exception the root cause of the checkpoint failure?
Namely, is it also reported in the JobManager log?

Also, have you enabled concurrent checkpoints? 

Best,
 Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Mon Mar 8 13:10:46 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Checkpoint Error

Hi Yun,

Thanks for the response. I checked the mounts and only the JMs and TMs are 
mounted with this EFS. Not sure how to debug this.

Thanks
On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

Hi Navneeth,

It seems from the stack that the exception is caused by the underlying EFS 
problems ? Have you checked
if there are errors reported for EFS, or if there might be duplicate mounting 
for the same EFS and others
have ever deleted the directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below 
error. Not sure what's causing it, any pointers would help. We have 10 TM's 
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: java.io.IOException: Stale file handle
    at java.io.FileOutputStream.close0(Native Method)
    at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
    at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
    at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
    at java.io.FileOutputStream.close(FileOutputStream.java:354)
    at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
    ... 12 more

Thanks

Re: questions about broadcasts

2021-03-08 Thread Yun Gao
Hi Marco,

(a) It is possible for an operator to receive two different kinds of broadcasts:

DataStream<Integer> ints = ...
DataStream<String> strs = ...
ints.broadcast().connect(strs.broadcast())
    .process(new CoProcessFunction(){...});

(b) A traditional Flink operator could not accept three different inputs. There is 
a new MultipleInputOperator that can accept an arbitrary number of inputs [1]. 
However, it is currently not exposed directly to end users, and you would need to 
work with some low-level APIs to use it. An alternative might be to use a tag to 
union two of the input streams (or any two of the three inputs) and then use the 
(keyed)CoProcessFunction above. Also note that broadcast is only a partitioner, and 
it is treated no differently from other partitioners by downstream operators.

Best,
Yun



[1] 
https://github.com/apache/flink/blob/51524de8fd337aafd30952873b36216c5a3c43bc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java#L261


--
Sender:Marco Villalobos
Date:2021/03/06 09:47:53
Recipient:user
Theme:questions about broadcasts

Is it possible for an operator to receive two different kinds of broadcasts?  

Is it possible for an operator to receive two different types of streams and a 
broadcast? For example, I know there is a KeyedCoProcessFunction, but is there 
a version of that which can also receive broadcasts? 


Re: New settings are not honored unless checkpoint is cleared.

2021-03-08 Thread Yun Gao
Hi Yordan,

What are the settings that are changed during the tests?

Best,
Yun


--
From:Yordan Pavlov 
Send Time:2021 Mar. 5 (Fri.) 23:36
To:user 
Subject:New settings are not honored unless checkpoint is cleared.

Hello there,
I am running Flink 1.11.3 on Kubernetes deployment. If I change a
setting and re-deploy my Flink setup, the new setting is correctly
applied in the config file but is not being honored by Flink. In other
words, I can ssh into the pod and check the config file - it has the
new setting as I would expect. However the web interface for the job
keeps showing the old configuration and Flink as a whole keeps running
with the old setting. The way to have the new setting considered is to
clear the checkpoint for the job stored in Zookeeper. Then I recover
the job using:

--fromSavepoint path_to_savepoint_or_checkpoint

My presumption is that the job configuration is stored in Zookeeper
along with other Flink data. Could someone shed some light on what I
am observing?

Thank you!

Re: Re: How to check checkpointing mode

2021-03-08 Thread Yun Gao
Hi Alexey,

Sorry, I also do not see problems in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of 
see#checkpointCfg#checkpointingMode?
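For example (just a quick sketch), logging it right before execute() should also 
reveal the effective mode:

LOGGER.info("checkpointing mode: {}", see.getCheckpointConfig().getCheckpointingMode());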

Best,
Yun


 --Original Mail --
Sender:Alexey Trenikhun 
Send Date:Tue Mar 9 07:25:31 2021
Recipients:Flink User Mail List , Yun Gao 

Subject:Re: How to check checkpointing mode

Hi Yun,
Thank you for looking. The job creation is quite big; I've truncated helper methods 
dealing with command line parameters etc. Below are the two major methods:

@Override

public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());


  if (!allParameters.isEmpty()) {
// We don't expect any parameters, but Flink 1.12 adds JVM options to job 
args, since we add
// -- after jobs argument, this unnecessary for us arguments will be 
treated as positional
// parameters, which we ignore but log warning
LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
see.execute(name);
return null;
  } catch (InterruptedException e) {
LOGGER.error("Stream Processor was interrupted", e);
Thread.currentThread().interrupt();
throw e;
  } catch (Exception e) {
LOGGER.error("Stream Processor is terminated due to exception", e);
throw e;
  }
}
private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws 
IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
  .getExecutionEnvironment()
  .enableCheckpointing(checkpointInterval.toMillis(),
  CheckpointingMode.AT_LEAST_ONCE)
  .setMaxParallelism(1024)
  .setParallelism(parallelism);
  if (externalizedCheckpoints) {
see.getCheckpointConfig()

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
  .putAll(kafkaCommonOptions)
  .putAll(kafkaProducerOptions)
  .varFiles(valueFiles)
  .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
  .semantic(Semantic.AT_LEAST_ONCE)
  .config(producerProperties)
  .build();

  final AutoTopic autoTopic = AutoTopic.builder()
  .config(producerProperties)
  .partitions(autoCreateTopicsPartitions)
  .replicationFactor(autoCreateTopicsReplicationFactor)
  .doNotCreateTopics(ImmutableSet.of(
  gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
  ))
  .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 
Time.minutes(1)));
  // since Flink 1.12 default stream characteristic is event time,
  // so we don't need to set streamTimeCharacteristic, furthermore whole 
TimeCharacteristic enum
  // is deprecated.
  // If needed explicitly using processing-time windows and timers works in 
event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
  new TStateCleanupOnTimeout.Factory(
  maxCallDuration,
  postmortemCallDuration,
  globalAppConfig.timerGranularity()
  );

  @Nullable final SingleOutputStreamOperator cfgXform;
  @Nullable final DataStream cfgSource = addSources(see,
  SourceTopic.GCA_CFG,
  new CfgJsonDeserializationSchema(),
  (event, timestamp) -> event.getBatchId(),
  it -> !it.getHeartbeat());

  if (cfgSource != null) {
cfgXform = cfgSource
.keyBy(PbCfgDatum::getCcId)
.process(new CfgTransform())
.uid("xform-cfg")
.name("XForm Config");

if (!isNullOrEmpty(gspCfg)) {
  cfgXform.addSink(producerFactory.create(gspCfg,
  autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg
  .uid("uid-" + gspCfg)
  .name(gspCfg);
} else {
  cfgXform.addSink(new DiscardingSink<>())
  .uid("uid-gsp-cfg-null")
  .name("gsp-cfg-null");
}
  } else {
cfgXform = null;
  }

  final DataStream voiceCallThreadSource = addSources(see,
  SourceTopic.VOICE_CALL_THREAD,
  callThreadFormat == KafkaTopicFormat.JSON
  ? new TJsonDeserializationSchema()
  : new CallEventDeserializationSchema(),
  (event, timestamp) ->
  Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
  ? timestamp - deviationMillis
  : Instants.toMillis(event.getTimestamp()),
  event -> event.getType() != EventType.EVENT_UNKNOWN);

  final SingleOutputStreamOperator tcmDataStream1 = 
voiceCallThreadSource
  .keyBy(CallEventKey::new)
  .process

Re: Re: Gradually increasing checkpoint size

2021-03-08 Thread Yun Gao
Hi Dan,

Regarding the original checkpoint size problem, could you also check which tasks' 
state is increasing from the checkpoint UI? For example, the attached operator has 
an `alreadyOutputted` value state, which seems to keep growing if there are always 
new keys.

Best,
Yun



 --Original Mail --
Sender:Dan Hill 
Send Date:Tue Mar 9 00:59:24 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Gradually increasing checkpoint size

Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500 rows.  
I also have my own operator that only outputs the first value.

public class OnlyFirstUser<T> extends RichFlatMapFunction<T, T> {

    private transient ValueState<Boolean> alreadyOutputted;

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        if (!alreadyOutputted.value()) {
            alreadyOutputted.update(true);
            out.collect(value);
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
            new ValueStateDescriptor<>(
                "alreadyOutputted", // the state name
                TypeInformation.of(new TypeHint<Boolean>() {}), // type information
                false); // default value of the state, if nothing was set
        alreadyOutputted = getRuntimeContext().getState(descriptor);
    }
}

All of my inputs have this watermark strategy.  In the Flink UI, early in the 
job run, I see "Low Watermarks" on each node and they increase.  After some 
checkpoint failures, low watermarks stop appearing in the UI.


.assignTimestampsAndWatermarks(
 
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));


Thanks Yun!


On Mon, Mar 8, 2021 at 7:27 AM Yun Gao  wrote:

Hi Dan,

Have you use a too large upperBound or lowerBound?

If not, could you also check the watermark strategy ?
The interval join operator depends on the event-time
timer for cleanup, and the event-time timer would be
triggered via watermark. 

Best,
Yun



 --Original Mail --
Sender:Dan Hill 
Send Date:Mon Mar 8 14:59:48 2021
Recipients:user 
Subject:Gradually increasing checkpoint size

Hi!

I'm running a backfill Flink stream job over older data.  It has multiple 
interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd 
expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess is 
that my checkpoint has a bunch of useless state in it.

- Dan

Re: Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Yun Gao
Hi Kai,

Yes, you are basically right. One minor point is that the start time is
taken as the time the checkpoint was initiated on the JM side.

Best,
 Yun



 --Original Mail --
Sender:Kai Fu 
Send Date:Mon Apr 5 09:31:58 2021
Recipients:user 
Subject:Re: Meaning of checkpointStartDelayNanos

I found its meaning in the code. It means the delay of the checkpoint action: the 
time from when the checkpoint is initiated at the source until the checkpoint 
barrier arrives at the current operator.


On Mon, Apr 5, 2021 at 9:21 AM Kai Fu  wrote:

Hi team,

I'm a little confused by the meaning of checkpointStartDelayNanos. I do not 
understand what time it exactly means, but it seems to be quite an important 
indicator for checkpoints/backpressure. The explanation of it on the metrics page 
does not help too much. Can someone help explain it more clearly?

-- 
Best regards,- Kai

-- 
Best regards,- Kai

Re: Questions about checkpointAlignmentTime in unaligned checkpoint

2021-04-04 Thread Yun Gao
Hi Kai,

Under unaligned checkpoint settings, there is still an alignment process. Although
the task can snapshot the state of its operators on receiving the first barrier and
emit barriers to the following tasks, it still needs to wait until all the barriers
have been received before finalizing the checkpoint. During this process, it needs
to snapshot the buffers that are skipped by the barrier, and the final snapshot is
composed of both the operator snapshots and the snapshots of the skipped buffers.

Therefore, the checkpointAlignmentTime metric still exists.

Best,
Yun



 --Original Mail --
Sender:Kai Fu 
Send Date:Mon Apr 5 09:18:39 2021
Recipients:user 
Subject:Questions about checkpointAlignmentTime in unaligned checkpoint

Hi team,

I'm observing that the metrics reporter still emits the checkpointAlignmentTime 
metric in the unaligned checkpoint setting, as shown in the figure below. Is it 
meaningful in an unaligned checkpoint, since I suppose the alignment operation 
only happens in aligned checkpoints?



-- 
Best regards,- Kai

Re: Union of more then two streams

2021-04-04 Thread Yun Gao
Hi, 

With a.connect(b).coprocess(xx).connect(c).coprocess(xx), two operators would be
created: the first operator would union a and b and output the enriched data,
and then .connect(c).coprocess(xx) would pass through the already enriched data
and enrich the records from c. Since the two operators could not get chained, the
performance would likely be affected.

Another method is to first label each input with a tag, e.g., ("a", a record),
("b", b record), ... and then use

a.union(b).union(c).union(d).process(xx)

Then in the process operator, different logic could be chosen according to the tag.

If adding a tag is hard, then it might be necessary to use the new multiple-inputs
operator, which would require working with Flink's low-level API, thus I would
recommend the tag + union method first.
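A minimal sketch of the tag + union idea (the stream names, payload types and the 
body of the process function are placeholders):

DataStream<Tuple2<String, Object>> taggedA = codebookA
        .map(r -> Tuple2.of("a", (Object) r))
        .returns(Types.TUPLE(Types.STRING, Types.GENERIC(Object.class)));
DataStream<Tuple2<String, Object>> taggedB = codebookB
        .map(r -> Tuple2.of("b", (Object) r))
        .returns(Types.TUPLE(Types.STRING, Types.GENERIC(Object.class)));

taggedA.union(taggedB)
        .process(new ProcessFunction<Tuple2<String, Object>, Void>() {
            @Override
            public void processElement(Tuple2<String, Object> value, Context ctx, Collector<Void> out) {
                if ("a".equals(value.f0)) {
                    // handle a record from codebook a
                } else if ("b".equals(value.f0)) {
                    // handle a record from codebook b
                }
            }
        });

In the actual job the unioned, tagged codebooks would then be keyed by the 
per-codebook foreign key and connected with the main stream.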

Best, 
Yun
 --Original Mail --
Sender:B.B. 
Send Date:Fri Apr 2 16:41:16 2021
Recipients:flink_user 
Subject:Union of more then two streams

Hi,

I have an architecture question regarding the union of more than two streams in 
Apache Flink.

We have three (and sometimes more) streams that are some kind of code book with 
which we have to enrich the main stream.
Code book streams are compacted Kafka topics. Code books are something that 
doesn't change very often, e.g. currency. The main stream is a fast event stream.

Idea is to make a union of all code books and then join it with main stream and 
store the enrichment data as managed, keyed state (so when compact events from 
kafka expire I have the codebooks saved in state).

The problem is that the foreign key of every code book (used for the enrichment) is 
different. E.g. codebook_1 has foreign key codebook_fk1, codebook_2 has foreign key 
codebook_fk2, ... that connects with the main stream.
This means I cannot use keyBy with a CoProcessFunction.

Is this doable with union, or should I cascade a series of connected streams with 
the main stream, e.g. mainstream.connect(codebook_1) -> 
mainstreamWithCodebook1.connect(codebook_2) -> 
mainstreamWithCodebook1AndCodebook2.connect(codebook_3) -> ...?
I read somewhere that this latter approach is not memory friendly.

Thx.

BB.

Re: Official flink java client

2021-04-22 Thread Yun Gao
Hi gaurav,

Logically the Flink client is embedded inside the StreamExecutionEnvironment, and 
users could use the StreamExecutionEnvironment to execute their jobs. Could you 
share more about why you want to use the client directly? 

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official Flink client in Java that's available? I came across 
RestClusterClient, but I am not sure if it's official. I can create my own 
client, but I just wanted to check if there is anything official already available 
that I can leverage. 

Thanks,
Gaurav



Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
Hi Falvio,

Thanks a lot for the explanation. Maybe another option is to have a look at 
the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs and 
query job status, and it might be able to help.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 15:25:55 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Official flink java client

I also interface with Flink clusters using REST in order to avoid many annoying 
problems (due to dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if you 
want to.
It is available at [1] and it adds some missing methods to the default Flink 
version (I also had to copy that class and modify the visibility of some fields 
in order to enable the extension).
Officially the Flink RestClusterClient is meant for internal use only, but it 
actually works very well. 

Best,
Flavio

[1] 
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

Hi gaurav,

Logicall Flink client is bear inside the StreamExecutionEnvironment, and users 
could use the 
StreamExecutionEnvironment to execute their jobs. Could you share more about 
why you 
want to directly use the client? 

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official flink client in java that's available? I came across 
RestClusterClient, but I am not sure if its official. I can create my own 
client, but just wanted to check if there is anything official available 
already that I can leverage. 

Thanks,
Gaurav



Re: Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
Hi Flavio,

Got it. From my view, I think RestClusterClient might not be viewed as a public API
and might change between versions, thus one might need to be careful when
upgrading.

Best,
Yun



 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 16:10:05 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Re: Official flink java client

Obviously I could write a Java client from scratch that interfaces with the 
provided REST API, but why would I if I can reuse something that already exists?
Usually I interface with REST APIs using auto-generated clients (if the APIs are 
exposed via Swagger or OpenAPI).
If that's not an option, writing a REST client from scratch is something I try 
to avoid as much as I can.

Best,
Flavio
On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:

Hi Falvio,

Very thanks for the explanation, may be another option is to have a look at 
the http rest API[1] ? Flink provides official http api to submit jar jobs and 
query 
job status, and they might be able to help.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 15:25:55 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Official flink java client

I also interface to Flink clusters using REST in order to avoid many annoying 
problems (due to dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if you 
want to.
It is available at [1] and it add some missing methods to the default Flink 
version (I also had to copy that class and modify the visibility of some field 
in order to enable the extension).
Officially the Flink RestClusterClient is meant to be used for internal use 
only but it actually work very well. 

Best,
Flavio

[1] 
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

Hi gaurav,

Logicall Flink client is bear inside the StreamExecutionEnvironment, and users 
could use the 
StreamExecutionEnvironment to execute their jobs. Could you share more about 
why you 
want to directly use the client? 

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official flink client in java that's available? I came across 
RestClusterClient, but I am not sure if its official. I can create my own 
client, but just wanted to check if there is anything official available 
already that I can leverage. 

Thanks,
Gaurav



Re: pojo warning when using auto generated protobuf class

2021-04-24 Thread Yun Gao
Hi Prashant,

I think the warning is given when calling 

return TypeInformation.of(Trace.APITrace::class.java)

Currently Flink does not have native support 
for protobuf types yet [1], thus it would use a
generic serializer created by Kryo. 

This should not affect the correctness of the program,
only its performance. One possible solution might be
to register a custom serializer for protobuf classes with the
Kryo serializer framework, like the example in [2].
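For instance, a short sketch following the example in [2], assuming the 
com.twitter:chill-protobuf dependency is on the classpath:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// register the protobuf-generated class with a Kryo serializer suited for protobuf messages
env.getConfig().registerTypeWithKryoSerializer(Trace.APITrace.class, ProtobufSerializer.class);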

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-11333
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html


 --Original Mail --
Sender:Prashant Deva 
Send Date:Sat Apr 24 11:00:17 2021
Recipients:User 
Subject:pojo warning when using auto generated protobuf class

I am seeing this warning message when trying to use a custom protobuf de/serializer 
with a Kafka source and an auto-generated Java protobuf class:

18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - 
Class class com.xx.APITrace cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance.

Here is my serializer. What am I doing wrong?

class ApiTraceSchema : DeserializationSchema<Trace.APITrace>, SerializationSchema<Trace.APITrace> {

    override fun getProducedType(): TypeInformation<Trace.APITrace> {
        return TypeInformation.of(Trace.APITrace::class.java)
    }

    override fun deserialize(message: ByteArray): Trace.APITrace {
        return Trace.APITrace.parseFrom(message)
    }

    override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
        return false
    }

    override fun serialize(element: Trace.APITrace): ByteArray {
        return element.toByteArray()
    }
}


Re: Too man y checkpoint folders kept for externalized retention.

2021-04-25 Thread Yun Gao
Hi John,

Logically the maximum number of retained checkpoints is configured
by state.checkpoints.num-retained [1]. Have you configured 
this option?
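For example, in flink-conf.yaml (the value 3 below is just an illustration; the 
default is 1):

state.checkpoints.num-retained: 3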


Best,
Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained



--
Sender:John Smith
Date:2021/04/24 01:41:41
Recipient:user
Theme:Too man y checkpoint folders kept for externalized retention.

Hi, running 1.10.0.

Just curious, is this specific to externalized retention or checkpointing in 
general?

I see my checkpoint folder containing thousands of chk-x folders.

If using default checkpointing or NONE externalized checkpointing, does the count 
of chk- folders grow indefinitely until the job is killed, or does it retain up 
to a certain amount?

Thanks 


Re: Re: pojo warning when using auto generated protobuf class

2021-04-26 Thread Yun Gao
Hi Prashant,

Flink should always give warnings as long as the deduced result 
is a GenericType, no matter whether it uses the default Kryo serializer or
the registered one; thus if you have registered the type, you may 
simply ignore the warnings. To make sure it works, you may 
find the TM where the source tasks reside and use jmap to 
see whether the ProtobufSerializer is created or not.

Best,
Yun




 --Original Mail --
Sender:Prashant Deva 
Send Date:Sun Apr 25 01:18:41 2021
Recipients:Yun Gao 
CC:User 
Subject:Re: pojo warning when using auto generated protobuf class

So I did register the type with Kryo and the ProtobufSerializer. However, I am 
still seeing the warnings. Is this a bug in Flink?

env.config.registerTypeWithKryoSerializer(Trace.APITrace::class.java, 
ProtobufSerializer::class.java)

val stream: DataStreamSource<Trace.APITrace> =
    env.addSource(FlinkKafkaConsumer(Config.TOPIC_SPANS, ApiTraceSchema(), props))

Sent via Superhuman

On Sat, Apr 24, 2021 at 8:48 AM, Yun Gao  wrote:

Hi Prashant,

I think the warn is given when calling 

return TypeInformation.of(Trace.APITrace::class.java)

Currently flink does not have the native support 
for the protobuf types yet[1], thus it would use a
generic serializer created by kryo. 

This should not affect the rightness of the program
and should only affect its performance. One possible
solution might be register custom serializer into the kryo 
serializer framework for protobuf classes, like the example in [2].

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-11333
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html


 --Original Mail --
Sender:Prashant Deva 
Send Date:Sat Apr 24 11:00:17 2021
Recipients:User 
Subject:pojo warning when using auto generated protobuf class

I am seeing this warning msg when trying to use a custom protobuf de/serializer 
with kafka source with auto generated java protobuf class:

18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - 
Class class com.xx.APITrace cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance.

here is my serializer. What am i doing wrong?

class ApiTraceSchema: DeserializationSchema, 
SerializationSchema {

override fun getProducedType(): TypeInformation {
return TypeInformation.of(Trace.APITrace::class.java)
}

override fun deserialize(message: ByteArray): Trace.APITrace {
return Trace.APITrace.parseFrom(message)
}

override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
return false
}

override fun serialize(element: Trace.APITrace): ByteArray {
return element.toByteArray()
}
}


Re: Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

2021-05-05 Thread Yun Gao
Hi Ragini,

How did you submit your job? The exception here is mostly caused by
the `flink-clients` module not being included in the classpath on the client side.
If the job is submitted via the Flink CLI, namely `flink run -c  xx.jar`,
it should be included by default; if the job is submitted programmatically, then
flink-clients must be included in the dependencies.
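For example, a dependency along these lines (the Scala suffix and version must 
match your setup) in the pom:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.3</version>
</dependency>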

Best,
Yun


--
Sender:Ragini Manjaiah
Date:2021/05/04 17:25:30
Recipient:user
Theme:Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory 
found to execute the application.

Hi Team,
I am trying to submit a Flink job of version 1.11.3. The actual application was 
developed with Flink 1.8.1.
Since the Hadoop cluster is Apache 3.2.0, I downloaded Flink 1.11.3 
(flink-1.11.3-bin-scala_2.11.tgz) and tried to submit the job.
While submitting I am facing the below exception. I have set the HADOOP 
parameters:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=`hadoop classpath`

Are there any changes I need to make to the pom file to overcome this?


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: No ExecutorFactory found to execute the application.
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
 at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute 
the application.
 at 
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1809)
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
 at org.sapphire.appspayload.StreamingJob.main(StreamingJob.java:214)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 


Re: Define rowtime on intermediate table field

2021-05-05 Thread Yun Gao
Hi Sumeet,

I think you could first convert the table back to a DataStream [1],
then define the timestamp and watermark with 
`assignTimestampsAndWatermarks(...)`, 
and then convert it back to a table [2].
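A rough sketch of that round trip, assuming a StreamTableEnvironment `tableEnv`, an 
intermediate table `parsed` whose first column already carries the event time as 
epoch milliseconds, and a second payload column "col_b" ($ is 
org.apache.flink.table.api.Expressions.$):

DataStream<Row> withTimestamps = tableEnv
        .toAppendStream(parsed, Row.class)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<Row>)
                                        (row, prev) -> ((Number) row.getField(0)).longValue()));

// re-register the stream as a table, exposing the stream timestamp as a rowtime attribute
Table withRowtime = tableEnv.fromDataStream(withTimestamps,
        $("event_time"), $("col_b"), $("rowtime").rowtime());

The resulting `rowtime` column can then be used for windowing.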

Best,
Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/#convert-a-table-into-a-datastream
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#during-datastream-to-table-conversion



 --Original Mail --
Sender:Sumeet Malhotra 
Send Date:Tue May 4 16:32:10 2021
Recipients:user 
Subject:Define rowtime on intermediate table field

Hi,

My use case involves reading raw data records from Kafka and processing them. 
The records are coming from a database, where a periodic job reads new rows, 
packages them into a single JSON object (as described below) and writes the 
entire record to Kafka.

{
'id': 'some_id',
'key_a': 'value_a',
'key_b': 'value_b',
'result': {
'columns': [
'col_a',
'col_b',
'col_c',
'col_d'
],
'rows': [
['2021-05-04T05:23:13.953610Z', '655361', '8013', '0'],
['2021-05-04T05:23:13.953610Z', '655362', '4000', '456'],
['2021-05-04T05:23:13.953610Z', '655363', '2', '562'],
...
...
]
}
}

As can be seen, the row time is actually embedded in the `result` object.

What I'm doing at the moment is to run this data through a user defined table 
function, which parses the `result` object as a string, and emits multiple rows 
that include the timestamp field. This is working fine.

In the next step, I would want to perform windowing on this transformed data. 
That requires defining the event time attribute along with the watermark. As I 
understand, this can be done either during the initial table DDL definition or 
during conversion to a datastream.

Since I extract the timestamp value only after reading from Kafka, how can I 
define an event time attribute on the intermediate table that's basically a 
result of the user defined table function?

The only solution I can think of at the moment, is to write the intermediate 
table back to Kafka, and then create a new consumer that reads from Kafka, 
where I can define the event time attribute as part of its DDL. This most 
likely won't be good performance-wise. Is there any other way I can 
define the event time on the results of my user-defined table function?

Thanks in advance,
Sumeet



Re: Re: Handling "Global" Updating State

2021-05-16 Thread Yun Gao
Hi Rion, 

I think FLIP-150[1] should be able to solve this scenario.

Since FLIP-150 is still under discussion, for now a temporary method that comes 
to mind might be:
1. Write a first job to read from Kafka and update the broadcast state of some 
operator. The job would keep the source alive after all the data are emitted 
(like sleeping forever), and when all the data are processed, stop the job with 
a savepoint.
2. Use the savepoint to start the original job. The operator that requires the 
broadcast state could set the same uid and the same state name as the 
corresponding operator in the first job, so it could acquire the state content 
on startup.

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source


 --Original Mail --
Sender:Rion Williams 
Send Date:Mon May 17 07:00:03 2021
Recipients:user 
Subject:Re: Handling "Global" Updating State
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the 
bill for this scenario and keeping the downstream operators up-to-date as 
messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In 
my case, I need to have loaded all of my “lookup” topic into state before 
processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety 
into some collection like a hashmap (prior to executing the Flink pipeline to 
ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so 
I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known 
approach to handling this?

Thanks,

Rion

On May 14, 2021, at 9:51 AM, Rion Williams  wrote:



Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. 
The gist of it is that I have a job that listens to a series of events from a 
Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on 
some configurations that are currently being stored within another Kafka topic. 
I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:

When Flink starts up, use a regular Kafka Consumer and read all of the 
configuration data from that topic in its entirety.
Store the messages from that topic in some type of thread-safe collection 
statically accessible by the operators downstream.
Expose the thread-safe collection within the operators to actually perform the 
filtering.
This doesn't seem right though. I was reading about BroadcastState which seems 
like it might fit the bill (e.g. keep those mappings in Broadcast state so that 
all of the downstream operations would have access to them, which I'd imagine 
would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a 
series of mappings that I want to keep relatively up to date in a Kafka topic, 
and I'm consuming from another Kafka topic that will need those mappings to 
filter against.

I'd be happy to share some of the approaches I currently have or elaborate a 
bit more if that isn't super clear.

Thanks much,

Rion



Re: Re: Handling "Global" Updating State

2021-05-18 Thread Yun Gao
Hi Rion,

Sorry for the late reply. Another, simpler method might indeed be to have the 
operator read the data from Kafka directly in initializeState() to initialize 
its state.
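A rough sketch of that approach (the Event type, broker address, topic name and 
filter condition are placeholders, and kafka-clients 2.x is assumed for 
poll(Duration)):

public class ConfigFilter extends RichFlatMapFunction<Event, Event>
        implements CheckpointedFunction {

    private transient Map<String, String> lookup;

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // Each parallel subtask loads the whole compacted lookup topic before any record is processed.
        lookup = new HashMap<>();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        try (KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            List<TopicPartition> parts = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("lookup-topic")) { // placeholder topic
                parts.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(parts);
            consumer.seekToBeginning(parts);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(parts);
            boolean caughtUp = false;
            while (!caughtUp) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(200))) {
                    lookup.put(r.key(), r.value());
                }
                caughtUp = parts.stream().allMatch(tp -> consumer.position(tp) >= endOffsets.get(tp));
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) {
        // nothing to persist: the map is rebuilt from Kafka on every restore
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) {
        if (lookup.containsKey(event.getKey())) { // placeholder filter condition
            out.collect(event);
        }
    }
}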

Best,
Yun



 --Original Mail --
Sender:Rion Williams 
Send Date:Mon May 17 19:53:35 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Handling "Global" Updating State
Hi Yun,

That’s very helpful and good to know that the problem/use-case has been thought 
about. Since my need is probably shorter-term than later, I’ll likely need to 
explore a workaround.

Do you know of an approach that might not require the use of check pointing and 
restarting? I was looking into exploring initializeState within my 
broadcast-side stream to get it current and then simply listening to the Kafka 
topic as records come in. I’d imagine this would work, but that may be a bit of 
a naive approach.

Thanks!

Rion 

On May 17, 2021, at 1:36 AM, Yun Gao  wrote:



Hi Rion, 

I think FLIP-150[1] should be able to solve this scenario.

Since FLIP-150 is still under discussion, for now a temporary method come 
to me might be
1. Write a first job to read the kafka and update the broadcast state of some 
operator. The job
would keep the source alive after all the data are emit (like sleep forever), 
and when all the data 
are processed, then stop the job with savepoint. 
2. Use the savepoint to start the original job. For the operator required the 
broadcast state, it could
set the same uid and same state name with the corresponding operator in the 
first job, so it could
acqure the state content on startup.

Yun,
Best

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source


 --Original Mail --
Sender:Rion Williams 
Send Date:Mon May 17 07:00:03 2021
Recipients:user 
Subject:Re: Handling "Global" Updating State
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the 
bill for this scenario and keeping the downstream operators up-to-date as 
messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In 
my case, I need to have loaded all of my “lookup” topic into state before 
processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety 
into some collection like a hashmap (prior to executing the Flink pipeline to 
ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so 
I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known 
approach to handling this?

Thanks,

Rion

On May 14, 2021, at 9:51 AM, Rion Williams  wrote:



Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. 
The gist of it is that I have a job that listens to a series of events from a 
Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on 
some configurations that are currently being stored within another Kafka topic. 
I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:

When Flink starts up, use a regular Kafka Consumer and read all of the 
configuration data from that topic in its entirety.
Store the messages from that topic in some type of thread-safe collection 
statically accessible by the operators downstream.
Expose the thread-safe collection within the operators to actually perform the 
filtering.
This doesn't seem right though. I was reading about BroadcastState which seems 
like it might fit the bill (e.g. keep those mappings in Broadcast state so that 
all of the downstream operations would have access to them, which I'd imagine 
would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a 
series of mappings that I want to keep relatively up to date in a Kafka topic, 
and I'm consuming from another Kafka topic that will need those mappings to 
filter against.

I'd be happy to share some of the approaches I currently have or elaborate a 
bit more if that isn't super clear.

Thanks much,

Rion


