Re: Inconsistent Checkpoint API response

2020-05-26 Thread Yun Tang
Hi Bhaskar

I think I have understood your scenario now, and I think this is what is 
expected in Flink.
Since you only allow your job to restart 5 times, "restored" only records the 
checkpoint used at the 5th recovery, and that checkpoint id stays there.

"Restored" is the last restored checkpoint and "completed" is the last 
completed checkpoint; they are not the same thing.
The only scenario in which they point to the same checkpoint is when Flink has 
just restored successfully and no new checkpoint has completed yet.

Best
Yun Tang
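
To make the distinction concrete, here is a minimal sketch of reading both 
fields from the jobs/:jobid/checkpoints endpoint. The field names follow the 
Flink monitoring REST API; the REST address, the use of Jackson and the class 
name are illustrative assumptions, not part of the original discussion.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URL;

public class LatestCheckpointFetcher {
    public static void main(String[] args) throws Exception {
        String restBase = "http://localhost:8081";   // assumed JobManager REST address
        String jobId = args[0];                       // job to inspect

        JsonNode stats = new ObjectMapper()
                .readTree(new URL(restBase + "/jobs/" + jobId + "/checkpoints"));

        // "latest.completed" = most recent successfully completed checkpoint,
        // "latest.restored"  = the checkpoint the job last restored FROM.
        JsonNode completed = stats.path("latest").path("completed");
        JsonNode restored = stats.path("latest").path("restored");

        System.out.println("latest completed id:   " + completed.path("id").asLong());
        System.out.println("latest completed path: " + completed.path("external_path").asText());
        System.out.println("last restored id:      " + restored.path("id").asLong());
    }
}

A monitoring tool that needs the newest retained checkpoint should therefore 
read "latest.completed" (or "latest.savepoint"), not "latest.restored".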



From: Vijay Bhaskar 
Sent: Tuesday, May 26, 2020 12:19
To: Yun Tang 
Cc: user 
Subject: Re: Inconsistent Checkpoint API response

Hi Yun
Understood the issue now:
"restored" always shows only the checkpoint that was used to restore the 
previous state.
For all attempts < 6 (in my case max attempts are 5, so 6 is the last attempt), 
Flink HA restores the state, so "restored" and "latest" show the same value.
If the last attempt == 6:
 the Flink job already has a few checkpoints;
 after that the job failed, Flink HA gave up and marked the job state as "FAILED";
 at this point the "restored" value is the one from the 5th attempt, but 
"latest" is the most recent retained checkpoint.

Shall I file a documentation improvement Jira? I want to add more 
documentation based on the above scenarios.

Regards
Bhaskar



On Tue, May 26, 2020 at 8:14 AM Yun Tang <myas...@live.com> wrote:
Hi Bhaskar

It seems I still do not fully understand your case 5. Your job failed 6 times 
and recovered from a previous checkpoint to restart again. However, you found 
that the REST API gave the wrong answer.
How do you know the "restored" field is giving a wrong checkpoint file that is 
not the latest? Have you checked the JobManager log for the message 
"Restoring job xxx from latest valid checkpoint: x@" [1] to see exactly which 
checkpoint was chosen for the restore?

I think you could give a more concrete example, e.g. which checkpoint you 
expected to be restored vs. which one actually was.

[1] 
https://github.com/apache/flink/blob/8f992e8e868b846cf7fe8de23923358fc6b50721/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1250

Best
Yun Tang

From: Vijay Bhaskar <bhaskar.eba...@gmail.com>
Sent: Monday, May 25, 2020 17:01
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: Inconsistent Checkpoint API response

Thanks Yun.
Here is the problem I am facing:

I am using the jobs/:jobID/checkpoints API to recover the failed job. We have a 
remote manager which monitors the jobs. We use the "restored" field of the API 
response to get the latest checkpoint file to use. It gives the correct 
checkpoint file in all cases except the 5th one, where the "restored" field 
gives a checkpoint file that is not the latest. When we compare it with the 
checkpoint file returned by the "completed" field, both give identical 
checkpoints in all cases except the 5th.
We can't use the Flink UI because of security reasons.

Regards
Bhaskar

On Mon, May 25, 2020 at 12:57 PM Yun Tang <myas...@live.com> wrote:
Hi Vijay

If I understand correctly, do you mean your last "restored" checkpoint is null 
via the REST API when the job failed 6 times and then recovered successfully, 
with several more successful checkpoints afterwards?

First of all, if your job has just recovered successfully, can you observe the 
"last restored" checkpoint in the web UI?
Secondly, for how long can you not see the "restored" field after a successful 
recovery?
Last but not least, I cannot see the real difference among your cases; what is 
the core difference in your case (5)?

From the implementation of Flink, it creates the checkpoint statistics without 
a restored checkpoint and assigns it once the latest savepoint/checkpoint is 
restored. [1]

[1] 
https://github.com/apache/flink/blob/50253c6b89e3c92cac23edda6556770a63643c90/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1285

Best
Yun Tang


From: Vijay Bhaskar <bhaskar.eba...@gmail.com>
Sent: Monday, May 25, 2020 14:20
To: user <user@flink.apache.org>
Subject: Inconsistent Checkpoint API response

Hi
I am using Flink retained checkpoints along with the jobs/:jobid/checkpoints 
API for retrieving the latest retained checkpoint.
Regarding the response of the Flink Checkpoints API:

My job's restart attempts are set to 5.
In the checkpoint API response, under the "latest" key, the checkpoint file 
names of both the "restored" and "completed" values behave as follows:
1) Suppose the job failed 3 times and recovered on the 4th attempt; then both 
values are the same.
2) Suppose the job failed 4 times and recovered on the 5th attempt; then both 
values are the same.
3) Suppose the job failed 5 times and recovered on the 6th attempt; then both 
values are s

Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-26 Thread Aljoscha Krettek
I think what might be happening is that you're mixing dependencies from 
the flink-sql-connector-kafka and the proper flink-connector-kafka that 
should be used with the DataStream API. Could that be the case?


Best,
Aljoscha

On 25.05.20 19:18, Piotr Nowojski wrote:

Hi,

It would be helpful if you could provide the full stack trace, and tell us which 
Flink version and which Kafka connector version you are using.

It sounds like either a dependency convergence error (mixing Kafka 
dependencies/various versions of flink-connector-kafka inside a single job/jar) 
or some shading issue. Can you check your project for such issues (with the `mvn 
dependency:tree` command [1])?

Also what’s a bit suspicious for me is the return type:


Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;


I’m not sure, but I was not aware that we are shading Kafka dependency in our 
connectors? Are you manually shading something?

Piotrek

[1] 
https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
 



On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:


public class MyKafkaSerializationSchema implements
        KafkaSerializationSchema<Tuple2<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o,
            @Nullable Long aLong) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
                o.f1.getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new
        FlinkKafkaProducer<>(
        "default", new MyKafkaSerializationSchema(),
        prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei
wangl...@geekplus.com.cn 






Re: Inconsistent Checkpoint API response

2020-05-26 Thread Vijay Bhaskar
Thanks Yun. How can I contribute better documentation for this by
opening a Jira?

Regards
Bhaskar

On Tue, May 26, 2020 at 12:32 PM Yun Tang  wrote:

> Hi Bhaskar
>
> I think I have understood your scenario now, and I think this is what is
> expected in Flink.
> Since you only allow your job to restart 5 times, "restored" only records
> the checkpoint used at the 5th recovery, and that checkpoint id stays
> there.
>
> "Restored" is the last restored checkpoint and "completed" is the last
> completed checkpoint; they are not the same thing.
> The only scenario in which they point to the same checkpoint is when Flink
> has just restored successfully and no new checkpoint has completed yet.
>
> Best
> Yun Tang
>
>
> --
> *From:* Vijay Bhaskar 
> *Sent:* Tuesday, May 26, 2020 12:19
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: Inconsistent Checkpoint API response
>
> Hi Yun
> Understood the issue now:
> "restored" always shows only the checkpoint that was used to restore the
> previous state.
> For all attempts < 6 (in my case max attempts are 5, so 6 is the last
> attempt), Flink HA restores the state, so "restored" and "latest" show the
> same value.
> If the last attempt == 6:
>  the Flink job already has a few checkpoints;
>  after that the job failed, Flink HA gave up and marked the job state as
> "FAILED";
>  at this point the "restored" value is the one from the 5th attempt, but
> "latest" is the most recent retained checkpoint.
>
> Shall I file a documentation improvement Jira? I want to add more
> documentation based on the above scenarios.
>
> Regards
> Bhaskar
>
>
>
> On Tue, May 26, 2020 at 8:14 AM Yun Tang  wrote:
>
> Hi Bhaskar
>
> It seems I still do not fully understand your case 5. Your job failed 6
> times and recovered from a previous checkpoint to restart again. However, you
> found that the REST API gave the wrong answer.
> How do you know the "restored" field is giving a wrong checkpoint
> file which is not the latest? Have you checked the JobManager log for the
> message "Restoring job xxx from latest valid checkpoint: x@"
> [1] to see exactly which checkpoint was chosen for the restore?
>
> I think you could give a more concrete example, e.g. which checkpoint you
> expected to be restored vs. which one actually was.
>
> [1]
> https://github.com/apache/flink/blob/8f992e8e868b846cf7fe8de23923358fc6b50721/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1250
>
> Best
> Yun Tang
> --
> *From:* Vijay Bhaskar 
> *Sent:* Monday, May 25, 2020 17:01
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: Inconsistent Checkpoint API response
>
> Thanks Yun.
> Here is the problem I am facing:
>
> I am using the jobs/:jobID/checkpoints API to recover the failed job. We
> have a remote manager which monitors the jobs. We use the "restored"
> field of the API response to get the latest checkpoint file to use. It
> gives the correct checkpoint file in all cases except the 5th one,
> where the "restored" field gives a checkpoint file that is
> not the latest. When we compare it with the checkpoint file returned by the
> "completed" field, both give identical checkpoints in all cases
> except the 5th.
> We can't use the Flink UI because of security reasons.
>
> Regards
> Bhaskar
>
> On Mon, May 25, 2020 at 12:57 PM Yun Tang  wrote:
>
> Hi Vijay
>
> If I understand correctly, do you mean your last "restored" checkpoint is
> null via the REST API when the job failed 6 times and then recovered
> successfully, with several more successful checkpoints afterwards?
>
> First of all, if your job has just recovered successfully, can you observe the
> "last restored" checkpoint in the web UI?
> Secondly, for how long can you not see the "restored" field after a
> successful recovery?
> Last but not least, I cannot see the real difference among your cases;
> what is the core difference in your case (5)?
>
> From the implementation of Flink, it creates the checkpoint statistics
> without a restored checkpoint and assigns it once the latest
> savepoint/checkpoint is restored. [1]
>
> [1]
> https://github.com/apache/flink/blob/50253c6b89e3c92cac23edda6556770a63643c90/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1285
>
> Best
> Yun Tang
>
> --
> *From:* Vijay Bhaskar 
> *Sent:* Monday, May 25, 2020 14:20
> *To:* user 
> *Subject:* Inconsistent Checkpoint API response
>
> Hi
> I am using Flink retained checkpoints along with the
>  jobs/:jobid/checkpoints API for retrieving the latest retained checkpoint.
> Regarding the response of the Flink Checkpoints API:
>
> My job's restart attempts are set to 5.
> In the checkpoint API response, under the "latest" key, the checkpoint file
> names of both the "restored" and "completed" values behave as follows:
> 1) Suppose the job failed 3 times and recovered on the 4th attempt; then both
> values are s

Question on Job Restart strategy

2020-05-26 Thread Vijay Bhaskar
Hi
We are using the fixed-delay restart strategy.
I have a fundamental question:
Why is the restart counter not reset to zero after a streaming job restart is
successful?
Let's say my maximum number of restarts is 5.
My streaming job tried 2 times and the 3rd attempt was successful; why is the
counter still 2 and not zero?
Traditionally in the networking world, clients retry for some time and, once
they are successful, reset the counter back to zero.

Why is this the case in Flink?

Regards
Bhaskar


Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-26 Thread Leonard Xu
Hi, wanglei

I think Aljoscha is right. Could you post your dependency list?
The flink-connector-kafka dependency is used in DataStream applications, which 
is what you should use; the flink-sql-connector-kafka dependency is used in 
Table API & SQL applications. You should only add one of them, because the two 
dependencies will conflict.

Best,
Leonard Xu

> On 26 May 2020, at 15:02, Aljoscha Krettek wrote:
> 
> I think what might be happening is that you're mixing dependencies from the 
> flink-sql-connector-kafka and the proper flink-connector-kafka that should be 
> used with the DataStream API. Could that be the case?
> 
> Best,
> Aljoscha
> 
> On 25.05.20 19:18, Piotr Nowojski wrote:
>> Hi,
>> It would be helpful if you could provide full stack trace, what Flink 
>> version and which Kafka connector version are you using?
>> It sounds like either a dependency convergence error (mixing Kafka 
>> dependencies/various versions of flink-connector-kafka inside a single 
>> job/jar) or some shading issue. Can you check your project for such issues 
>> (`mvn dependency:tree` command [1]).
>> Also what’s a bit suspicious for me is the return type:
>>> Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>> I’m not sure, but I was not aware that we are shading Kafka dependency in 
>> our connectors? Are you manually shading something?
>> Piotrek
>> [1] 
>> https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
>>  
>> 
>>> On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:
>>> 
>>> 
>>> public class MyKafkaSerializationSchema implements
>>> KafkaSerializationSchema<Tuple2<String, String>> {
>>> @Override
>>> public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String>
>>> o, @Nullable Long aLong) {
>>> ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
>>> o.f1.getBytes(StandardCharsets.UTF_8));
>>> return record;
>>> }
>>> }
>>> FlinkKafkaProducer<Tuple2<String, String>> producer = new
>>> FlinkKafkaProducer<>(
>>> "default", new MyKafkaSerializationSchema(),
>>> prop2, Semantic.EXACTLY_ONCE);
>>> 
>>> But there's an error when running:
>>> 
>>> java.lang.AbstractMethodError: 
>>> com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>>> 
>>> Any suggestion on this?
>>> 
>>> Thanks,
>>> Lei
>>> wangl...@geekplus.com.cn 
> 



Re: Testing process functions

2020-05-26 Thread Manas Kale
Thank you for the example, Alexander.
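
For reference, a minimal sketch of the test-harness approach recommended below, 
using the ProcessFunctionTestHarnesses utility added in Flink 1.10. The function 
under test and the expected values are made up for illustration.

import static org.junit.Assert.assertEquals;

import java.util.Collections;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class DoublingFunctionTest {

    // Hypothetical function under test: emits each input value doubled.
    static class DoublingFunction extends ProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            out.collect(value * 2);
        }
    }

    @Test
    public void testDoubling() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(new DoublingFunction());

        harness.processElement(21, 10L); // (value, timestamp)

        assertEquals(Collections.singletonList(42), harness.extractOutputValues());
    }
}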

On Wed, May 20, 2020 at 6:48 PM Alexander Fedulov 
wrote:

> Hi Manas,
>
> I would recommend using TestHarnesses for testing. You could also use them
> prior to 1.10. Here is an example of setting the dependencies:
>
> https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/build.gradle#L113
>
> You can see some examples of tests for a demo application here:
>
> https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/test/java/com/ververica/field/dynamicrules/RulesEvaluatorTest.java
> Hope this helps.
>
> Best regards,
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> +49 1514 6265796
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>
>
>
> On Mon, May 18, 2020 at 1:18 PM Manas Kale  wrote:
>
>> I see, I had not considered the serialization; that was the issue.
>> Thank you.
>>
>> On Mon, May 18, 2020 at 12:29 PM Chesnay Schepler 
>> wrote:
>>
>>> We don't publish sources for test classes.
>>>
>>> Have you considered that the sink will be serialized on job submission,
>>> meaning that your myTestSink instance is not the one actually used by
>>> the job? This typically means that you have to store stuff in a static field
>>> instead.
>>> Alternatively, depending on the number of elements
>>> org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might
>>> be worth a try.
>>>
>>> On 15/05/2020 12:49, Manas Kale wrote:
>>> > Hi,
>>> > How do I test process functions? I tried by implementing a sink
>>> > function that stores myProcessFunction's output in a list. After
>>> > env.execute(), I use assertions.
>>> > If I set a breakpoint in the myTestSink's invoke() method, I see that
>>> > that method is being called correctly. However, after env.execute()
>>> > returns, all data in sink functions is wiped clean.
>>> >
>>> > TestSink myTestSink = new myTestSink();
>>> > testStream.process(new myProcessFunction()).addSink(myTestSink);
>>> > env.execute("test");
>>> > assertEquals(expectedOutput, myTestSink.actual);
>>> >
>>> > What am I doing wrong?
>>> >  Also, I see that a ProcessFunctionTestHarnesses has been added in
>>> > 1.10. I wasn't able to download its sources to understand how I could
>>> > use that. Have the sources not been added to maven or is it a problem
>>> > at my end?
>>> >
>>> > Regards,
>>> > Manas
>>>
>>>
>>>


Re: How can I set the parallelism higher than the task slot number in more machines?

2020-05-26 Thread Till Rohrmann
Hi Felipe,

Flink does not create dummy operators. Unless you have configured one
operator to have a parallelism of 32, you should actually only see 16
subtasks of a given operator (given that you start your program with -p
16). Be aware, though, that if you have multiple operators which cannot
share the same slot, then Flink will need p_1 + p_2 + ... + p_n slots where
p_i is the max parallelism of every slot sharing group you have defined in
your job.

Cheers,
Till
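
To illustrate the slot-sharing point, here is a small sketch; the source, 
mapper, and sink classes, as well as the group names, are hypothetical:

// With -p 16, these two slot sharing groups cannot share slots, so the job
// needs 16 + 16 = 32 slots in total; with a single (default) group it needs 16.
DataStream<String> ingested = env
        .addSource(new MySource())
        .slotSharingGroup("ingest");

ingested
        .map(new MyMapper())
        .slotSharingGroup("processing")  // downstream operators inherit this group
        .addSink(new MySink());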

On Mon, May 25, 2020 at 7:41 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Solved! that was because I was using slotSharingGroup() in all
> operators to ensure that they stay in the same task slot. I guess
> Flink was creating dummy operators to ensure that.
>
> Thanks anyway.
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Mon, May 25, 2020 at 5:54 PM Felipe Gutierrez
>  wrote:
> >
> > Hi,
> >
> > I deployed Flink 1.10 standalone in a cluster with 4 machines 8 cores
> > each. Then I configured each machine to have 8 Task Slots and
> > parallelism default of 8.
> > taskmanager.numberOfTaskSlots: 8
> > parallelism.default: 8
> > I want to run my stream app with a parallelism of 16 for each subtask.
> > But not having more than 8 subtasks in one TaskManager because Flink
> > will make them share memory [1]. I suppose that I can deploy half of
> > the subtasks in one machine and the second half in another machine. Is
> > it correct?
> >
> > Then I deployed a program using "flink run -p 16 myApp.jar" and I was
> > monitoring it with Prometheus + Grafana and Flink created 16 subtasks
> > in two nodes. I mean that I am seeing 32 subtasks of each operator
> > that I am using. I think I was supposed to see only 16 of each
> > operator. Is there something wrong with my configuration?
> > Additional to that I also tried to use .setParallelism(16) but I got
> > the same result. 32 subtasks of the same operator.
> >
> > Thanks,
> > Felipe
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-numberoftaskslots
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
>


Re: close file on job crash

2020-05-26 Thread Laurent Exsteens
Thanks!
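
As an illustration of the advice below, here is a hedged sketch of a 
file-reading source that releases its handle in close(); the class name, path 
and reading logic are made up, and close() runs on clean shutdown, cancellation 
and internal failures alike:

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class NfsFileSource extends RichSourceFunction<String> {

    private volatile boolean running = true;
    private transient BufferedReader reader;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        reader = Files.newBufferedReader(
                Paths.get("/mnt/nas/input.csv"), StandardCharsets.UTF_8);
        String line;
        while (running && (line = reader.readLine()) != null) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(line);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (reader != null) {
            reader.close(); // release the NFS file handle on every shutdown path
        }
    }
}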

On Tue, May 26, 2020, 08:13 Piotr Nowojski  wrote:

> Hi,
>
> One clarification. `RichFunction#close` is of course called always, not
> only after internal failure. It’s called after internal failure, external
> failure or clean shutdown.
>
> `SourceFunction#cancel` is intended to inform the `SourceFunction` to
> cleanly exit it’s `#run` method/loop (note SIGINT will be issued anyway).
> In this case `#close` also will be called after source’s threads exit.
>
> Piotrek
>
> On 25 May 2020, at 21:37, Laurent Exsteens 
> wrote:
>
> Thank you, we'll try that.
>
> On Mon, May 25, 2020, 21:09 Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Cancel method is being invoked only when SourceTask is being cancelled
>> from the outside, by JobManager - for example after detecting a failure of
>> a different Task.
>>
>> > What is the proper way to handle this issue? Is there some kind of
>> closable source interface we should implement?
>>
>> Have you tried implementing
>> `org.apache.flink.api.common.functions.RichFunction#close` (extending
>> `AbstractRichFunction` and overloading `#close`)? This method should be
>> invoked when StreamTask is disposing its operators after an internal
>> failure.
>>
>> Piotrek
>>
>> On 25 May 2020, at 10:49, Laurent Exsteens 
>> wrote:
>>
>> Hello,
>>
>> we had to implement a specific source to read files in a certain way. The
>> files we read are on a NAS mounted through NFS.
>>
>> If an error occurs in a map after this specific source while the file is
>> still being read, the file is never closed, resulting in the task manager
>> keeping the file open (apparently) indefinitely and the file not being
>> allowed to be moved until the task manager releases it.
>> We then have to kill the whole task manager in order to release the file.
>>
>> I already added a closing of the file in the cancel method of the source.
>> But this does not seem to be sufficient.
>>
>> What is the proper way to handle this issue? Is there some kind of
>> closable source interface we should implement?
>>
>> Thanks in advance for your help.
>>
>> Best Regards,
>>
>> Laurent.
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>>
>> *euranova.eu *
>> *research.euranova.eu* 
>>
>> ♻ Be green, keep it on the screen
>>
>>
>>
> ♻ Be green, keep it on the screen
>
>
>

-- 
♻ Be green, keep it on the screen


Modified & rebuilt Flink source code but changes do not work

2020-05-26 Thread Qi K.
Hi folks,


Within our team, we made some simple changes to the source code of 
flink-runtime module (mostly related to log levels, like INFO -> WARN). 


Then we rebuilt the whole Flink project using `mvn clean install -DskipTests` 
command (Flink version = 1.9.3, Maven version = 3.2.5), the process finished 
without errors.


After that, we copied the generated `flink-dist_2.11-1.9.3.jar` to 
`${FLINK_HOME}/lib` directory, replacing the old one.


Finally, we submitted a simple streaming job w/ `flink run` on our YARN cluster 
(not in YARN session mode), but the changes we made to the code didn't seem to 
take effect at all. i.e. JobManager/TaskManager logs were exactly like the 
original version.


Dependency scope of the streaming job was already configured as `provided`. 
When we found the flink-dist JAR from HDFS, it truly was the newly-built one.


This is quite confusing to us, so any help is appreciated.


Many thanks

Re: Using Queryable State within 1 job + docs suggestion

2020-05-26 Thread Annemarie Burger
Hi,

I managed to work around the JobID issues by first starting the task that
queries the state, pausing it, and then using env.executeAsync().getJobID() to
get the proper jobID to use when querying the state, and passing that to the
(paused) query-state task, which can then continue.

However, the queryable state CompletableFuture objects always return empty.
Below is the relevant code. Any idea what I'm doing wrong? Any help much
appreciated.
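
For reference, a minimal sketch of the job-id part of that workaround, assuming 
a StreamExecutionEnvironment named env (the job name and variable names are 
illustrative):

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;

// Submit without blocking, then hand the JobID to the QueryableStateClient.
JobClient jobClient = env.executeAsync("queryable-state-job");
JobID jobId = jobClient.getJobID();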


The state is a MapState>.
This represents an edge list of a graph, sorted by source vertex id, and then
by target vertex id. 

// The method call to get all edges from another graph partition/thread
which have a certain srcId. 
HashMap answer = QS.getSrcVertex(partitionId,
srcId);

 // The method itself. The answer returned is always null, even when the
queried partition's state includes the srcId. 
 public HashMap getSrcVertex(Integer
partitionId, GradoopId srcVertex) throws ExecutionException,
InterruptedException {
CompletableFuture>> resultFuture =
client.getKvState(
jobID,
"queryableState",
partitionId,
new TypeHint(){},
descriptor);
AtomicReference> answer = new
AtomicReference<>();
resultFuture.thenAccept(response -> {

// These prints are never reached
try {
answer.set(response.get(srcVertex));
System.out.println(response.get(srcVertex));
} catch (Exception e) {
System.out.println("We dont have state");
}
});
return answer.get();
}

// The descriptor used
descriptor =
new MapStateDescriptor>(
"edgeList",
TypeInformation.of(new TypeHint() {
}).createSerializer(new ExecutionConfig()),
TypeInformation.of(new
TypeHint>() {
}).createSerializer(new ExecutionConfig())
);

// The client
client = new QueryableStateClient("127.0.0.1", 9067);



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Modified & rebuilt Flink source code but changes do not work

2020-05-26 Thread Congxian Qiu
Hi

If you committed the change in your local git repo, could you please check
whether the commit id in the job log (such as `Rev:28bdd33`, where 28bdd33 is
the commit id) is the same as your local commit id?

Best,
Congxian


Qi K. wrote on Tuesday, May 26, 2020 at 4:47 PM:

> Hi folks,
>
> Within our team, we made some simple changes to the source code of
> flink-runtime module (mostly related to log levels, like INFO -> WARN).
>
> Then we rebuilt the whole Flink project using `mvn clean install
> -DskipTests` command (Flink version = 1.9.3, Maven version = 3.2.5), the
> process finished without errors.
>
> After that, we copied the generated `flink-dist_2.11-1.9.3.jar` to
> `${FLINK_HOME}/lib` directory, replacing the old one.
>
> Finally, we submitted a simple streaming job w/ `flink run` on our YARN
> cluster (not in YARN session mode), but the changes we made to the code
> didn't seem to take effect at all. i.e. JobManager/TaskManager logs were
> exactly like the original version.
>
> Dependency scope of the streaming job was already configured as
> `provided`. When we found the flink-dist JAR from HDFS, it truly was the
> newly-built one.
>
> This is quite confusing to us, so any help is appreciated.
>
> Many thanks
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Multiple Sinks for a Single Soure

2020-05-26 Thread Piotr Nowojski
Hi,

You could easily filter/map/process the streams differently before writing them 
to the sinks. Building on top of my previous example, this also should work 
fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but 
any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/qux/quuz/corge are any stream processing functions that you 
wish. `foo` and `bar` would be applied once to the stream, before it’s going to 
be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be 
applied to only one of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of 
them.

Piotrek 


> On 26 May 2020, at 06:45, Prasanna kumar  
> wrote:
> 
> Piotr, 
> 
> Thanks for the reply. 
> 
> There is one other case, where some events have to be written to multiple 
> sinks and while other have to be written to just one sink. 
> 
> How could i have a common codeflow/DAG for the same ?
> 
> I do not want multiple jobs to do the same want to accomplish in a single job 
> .
> 
> Could i add Stream code "myStream.addSink(sink1)" under a conditional 
> operator such as 'if' to determine . 
> 
> But i suppose here the stream works differently compared to normal code 
> processing.
> 
> Prasanna.
> 
> 
> On Mon 25 May, 2020, 23:37 Piotr Nowojski,  > wrote:
> Hi,
> 
> To the best of my knowledge the following pattern should work just fine:
> 
> DataStream myStream = env.addSource(…).foo().bar() // for custom source, but 
> any ;
> myStream.addSink(sink1);
> myStream.addSink(sink2);
> myStream.addSink(sink3);
> 
> All of the records from `myStream` would be passed to each of the sinks.
> 
> Piotrek
> 
> > On 24 May 2020, at 19:34, Prasanna kumar  > > wrote:
> > 
> > Hi,
> > 
> > There is a single source of events for me in my system. 
> > 
> > I need to process and send the events to multiple destination/sink at the 
> > same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
> > 
> > I am able send to one sink.
> > 
> > By adding more sink stream to the source stream could we achieve it . Are 
> > there any shortcomings.  
> > 
> > Please let me know if any one here has successfully implemented one .
> > 
> > Thanks,
> > Prasanna.
> 



Re: Stateful-fun-Basic-Hello

2020-05-26 Thread Igal Shilman
Hi,
Can you verify that your jar contains the following file
META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule ?

Thanks,
Igal.

On Tue, May 26, 2020 at 11:49 AM C DINESH  wrote:

> Hi Gordon,
>
> Thanks for your response.
>
> After adding this conf to flink-yml.
>
> `classloader.parent-first-patterns.additional:
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf`
>
> It gave me one more error
>
> The main method caused an error: Invalid configuration:
> jobmanager.scheduler; Currently the only supported scheduler is 'legacy'
>
> I updated to
>
> jobmanager.scheduler : legacy
>
> in flink-conf.yaml
>
> But now I got one more error, which is self-explanatory. But actually I
> have provided ingress and egress in the module. I have attached a screenshot
> of my code. Please suggest what I should do.
>
> $ ./bin/flink run -c
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob
> /Users/dineshchiramana/learning/flink_learning/stateful-fun-hello-java/target/stateful-fun-hello-java-1.0-SNAPSHOT-jar-with-dependencies.jar
>
>
>
> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: There are no ingress defined.
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>
> Caused by: java.lang.IllegalStateException: There are no ingress defined.
>
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
>
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
>
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
>
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
> ... 8 more
>
>
>
>
> Cheers,
> Dinesh.
>
>
> On Tue, May 26, 2020 at 9:59 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> You're right, maybe the documentation needs a bit more directions there,
>> especially for people who are newer to Flink.
>>
>> 1. How to increase parallelism
>>
>> There are two ways to do this. Either set the `parallelism.default` also
>> in the flink-conf.yaml, or use the -p command line option when starting the
>> application via packaged Docker images.
>>
>> 2. How to enable checkpointing
>>
>> You would have to set execution.checkpointing.mode and
>> execution.checkpointing.interval configs, also in flink-conf.yaml.
>>
>> For example, the mode can be set to `EXACTLY_ONCE` and interval to `5sec`
>> to have exactly-once mode checkpoints at 5 second intervals.
>>
>>
>> In general, the Statefun specific configurations are listed here [1].
>> All other configurations available in Flink are also available in
>> Stateful Functions as well.
>>
>> Cheers,
>> Gordon
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/deployment-and-operations/configurations.html
>>
>> On Tue, May 26, 2020, 11:42 AM C DINESH  wrote:
>>
>>> Hi Team,
>>>
>>> I mean to say that now I understand, but flink-conf.yaml is not mentioned
>>> on the documentation page.
>>>
>>> On Mon, May 25, 2020 at 7:18 PM C DINESH 
>>> wrote:
>>>
 Thanks Gordon,

 I read the documentation several times, but I didn't understand at that
 time; flink-conf.yaml is not there.

 Can you please suggest:
 1. how to increase parallelism
 2. how to enable checkpointing for the job

 As far as I know there is no documentation regarding this, or are these
 features not there yet?

 Cheers,
 Dinesh.

>>>


Re: Multiple Sinks for a Single Soure

2020-05-26 Thread Prasanna kumar
Thanks Piotr for the Reply.

I will explain my requirement in detail.

Table Updates -> Generate Business Events -> Subscribers

*Source Side*
There are CDC of 100 tables which the framework needs to listen to.

*Event Table Mapping*

Events are associated with tables in an *m:n* fashion.

say there are tables TA, TB, TC.

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC

*Event Sink Mapping*

EA has following sinks. kafka topic SA,SA2,SAC.
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC.

The point is that the sinks are not predefined. [But I only see examples
online where the Flink code has an explicit myStream.addSink(sink2); ]

We expect around 500 types of events in our platform in another 2 years
time.

We are looking at writing a generic job for this, rather than writing
a new one for each new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski  wrote:

> Hi,
>
> You could easily filter/map/process the streams differently before writing
> them to the sinks. Building on top of my previous example, this also should
> work fine:
>
>
> DataStream myStream = env.addSource(…).foo().bar() // for custom source,
> but any ;
>
> myStream.baz().addSink(sink1);
> myStream.addSink(sink2);
> myStream.qux().quuz().corge().addSink(sink3);
>
> Where foo/bar/baz/quz/quuz/corge are any stream processing functions that
> you wish. `foo` and `bar` would be applied once to the stream, before it’s
> going to be split to different sinks, while `baz`, `qux`, `quuz` and
> `corge` would be applied to only one of the sinks AFTER splitting.
>
> In your case, it could be:
>
> myStream.filter(...).addSink(sink1);
> myStream.addSink(sink2);
> myStream.addSink(sink3);
>
> So sink2 and sink3 would get all of the records, while sink1 only a
> portion of them.
>
> Piotrek
>
>
> On 26 May 2020, at 06:45, Prasanna kumar 
> wrote:
>
> Piotr,
>
> Thanks for the reply.
>
> There is one other case, where some events have to be written to multiple
> sinks and while other have to be written to just one sink.
>
> How could i have a common codeflow/DAG for the same ?
>
> I do not want multiple jobs to do the same want to accomplish in a single
> job .
>
> Could i add Stream code "myStream.addSink(sink1)" under a conditional
> operator such as 'if' to determine .
>
> But i suppose here the stream works differently compared to normal code
> processing.
>
> Prasanna.
>
>
> On Mon 25 May, 2020, 23:37 Piotr Nowojski,  wrote:
>
>> Hi,
>>
>> To the best of my knowledge the following pattern should work just fine:
>>
>> DataStream myStream = env.addSource(…).foo().bar() // for custom source,
>> but any ;
>> myStream.addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>>
>> All of the records from `myStream` would be passed to each of the sinks.
>>
>> Piotrek
>>
>> > On 24 May 2020, at 19:34, Prasanna kumar 
>> wrote:
>> >
>> > Hi,
>> >
>> > There is a single source of events for me in my system.
>> >
>> > I need to process and send the events to multiple destination/sink at
>> the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>> >
>> > I am able send to one sink.
>> >
>> > By adding more sink stream to the source stream could we achieve it .
>> Are there any shortcomings.
>> >
>> > Please let me know if any one here has successfully implemented one .
>> >
>> > Thanks,
>> > Prasanna.
>>
>>
>


Re: Apache Flink - Question about application restart

2020-05-26 Thread M Singh
Hi Zhu Zhu:

I have another clarification - it looks like if I run the same app multiple 
times, its job id changes. So it looks like, even though the graph is the same, 
the job id does not depend on the job graph only, since different runs of the 
same app do not get the same id.
Please let me know if I've missed anything.
Thanks

On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh wrote:

Hi Zhu Zhu:
Just to clarify - from what I understand, EMR also has a default number of 
restart attempts (I think it is 3). So if EMR restarts the job, the job id is 
the same since the job graph is the same.
Thanks for the clarification.

On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang wrote:

Just to share some additional information.
When a Flink application deployed on Yarn has exhausted its restart policy, the 
whole application will fail. If you start another instance (Yarn application), 
even if high availability is configured, we cannot recover from the latest 
checkpoint because the clusterId (i.e. applicationId) has changed.

Best,
Yang

Zhu Zhu wrote on Mon, May 25, 2020 at 11:17 AM:

Hi M,
Regarding your questions:
1. yes. The id is fixed once the job graph is generated.
2. yes

Regarding yarn mode:
1. the job id stays the same because the job graph is generated once at the 
client side and persisted in DFS for reuse
2. yes, if high availability is enabled

Thanks,
Zhu Zhu

M Singh wrote on Sat, May 23, 2020 at 4:06 AM:

Hi Flink Folks:
If I have a Flink application with 10 restarts, and it fails and restarts, then:
1. Does the job have the same id?
2. Does the automatically restarting application pick up from the last 
checkpoint? I am assuming it does but just want to confirm.

Also, if it is running on AWS EMR, I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted its restart policy). If that is the 
case:
1. Does the job get a new id? I believe it does, but just want to confirm.
2. Does the Yarn restart honor the last checkpoint? I believe it does not, but 
is there a way to make it restart from the last checkpoint of the failed job 
(after it has exhausted its restart policy)?
Thanks





ClusterClientFactory selection

2020-05-26 Thread M Singh
Hi:
I wanted to find out which parameter/configuration allows the Flink CLI to pick 
up the appropriate cluster client factory (especially in YARN mode).
Thanks

Re: Question on Job Restart strategy

2020-05-26 Thread Gary Yao
Hi Bhaskar,

> Why the reset counter is not zero after streaming job restart is successful?

The short answer is that the fixed delay restart strategy is not
implemented like that (see [1] if you are using Flink 1.10 or above).
There are also other systems that behave similarly, e.g., Apache
Hadoop YARN (see yarn.resourcemanager.am.max-attempts).

If you have such a requirement, you can try to approximate it using
the failure rate restart strategy [2]. Resetting the attempt counter
to zero after a successful restart cannot be easily implemented with
the current RestartBackoffTimeStrategy interface [3]; for this to be
possible, the strategy would need to be informed if a restart was
successful. However, it is not clear what constitutes a successful
restart. For example, is it sufficient that enough TMs/slots could be
acquired to run the job? The job could still fail afterwards due to a
bug in user code. Could it be sufficient to require all tasks to
produce at least one record? I do not think so because the job could
still fail deterministically afterwards due to a particular record.

Best,
Gary

[1] 
https://github.com/apache/flink/blob/d1292b5f30508e155d0f733527532d7c671ad263/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FixedDelayRestartBackoffTimeStrategy.java#L29
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#failure-rate-restart-strategy
[3] 
https://github.com/apache/flink/blob/d1292b5f30508e155d0f733527532d7c671ad263/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java#L23
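
For completeness, a minimal sketch of configuring the failure rate restart 
strategy from [2] programmatically; the concrete numbers are illustrative:

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Fail the job only if more than 3 failures occur within any 5-minute window,
// waiting 10 seconds between restart attempts.
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                               // max failures per measurement interval
        Time.of(5, TimeUnit.MINUTES),    // measurement interval for the failure rate
        Time.of(10, TimeUnit.SECONDS))); // delay between restart attempts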


On Tue, May 26, 2020 at 9:28 AM Vijay Bhaskar  wrote:
>
> Hi
> We are using restart strategy of fixed delay.
> I have fundamental question:
> Why the reset counter is not zero after streaming job restart is successful?
> Let's say I have number of restarts max are: 5
> My streaming job tried 2 times and 3'rd attempt its successful, why counter 
> is still 2 but not zero?
> Traditionally in network world, clients will retry for some time and once 
> they are successful, they will reset the counter back to zero.
>
> Why this is the case in flink?
>
> Regards
> Bhaskar


Re: Question about My Flink Application

2020-05-26 Thread Sara Arshad
Hi Alexander,

Thank you for your reply.
I got a reply from AWS people. Seems like it's a configuration problem.
But, even if it works fine without restarting, it's not a good option for
us.
There is no one-to-one relation between cache data and keyed values.
Therefore, it has to send the whole data to every key every 5 minutes, and
we may have a very large number of keys at the same time.
So I came up with a completely different solution. Now I only keep the
cache in a shared map. Maybe it is not that good design-wise, but it
has higher performance.

Best regards,
Sara
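
For reference, the watermark idea Alexander describes in his reply below boils 
down to a one-liner inside the control stream's SourceFunction run() method; 
ctx is assumed to be that source's SourceContext:

// Emit the maximum possible watermark on the control/broadcast stream so it
// never holds back the event time of the main stream it is connected to.
ctx.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(Long.MAX_VALUE));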



On Sat, May 23, 2020 at 1:04 PM Alexander Fedulov 
wrote:

> Returning the discussion to the mailing list ( it accidentally went to a
> side channel because of a direct reply).
> What I was referring to, is the event-time processing semantic, which is
> based on the watermarks mechanism [1].
> If you are using it, the event time at your KeyedBroadcastProcessFuction
> will be determined as a minimum value of the maximum watermarks observed
> across all of the input channels. In order not to stall the processing of
> the events of the main data flow by the control channel (broadcast stream),
> you could set it's watermark to the maximum possible value, as shown in
> this example [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
> [2]
> https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/main/java/com/ververica/field/dynamicrules/sources/RulesSource.java#L80
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
>
>
> On Sat, May 23, 2020 at 1:05 AM Sara Arshad 
> wrote:
>
>> - It was based on something I read about the broadcast.
>> Besides, as I mentioned before, the restart happens when it's triggering
>> checkpoints.
>> - When I send the streams it processes it perfectly fine between restarts.
>> - Yes, I am using ProcessingTimeService in the cache source to make it
>> get data every 300 seconds.
>> Do you have any views on should it be doable with a stream of a million
>> messages, In case I improve my implementation?
>>
>> Best regards,
>> Sara
>>
>> On Fri, May 22, 2020 at 6:22 PM Alexander Fedulov <
>> alexan...@ververica.com> wrote:
>>
>>> OK, with such data sizes this should definitely be doable with a
>>> broadcast channel.
>>> "The problem was that the broadcast puts a lot of pressure on
>>> checkpointing." - is this the evaluation of the AWS support? Do you have
>>> any details as to why this is considered to be the case?
>>> "Even before I start to send the Kinesis stream it stuck." - so do you
>>> actually see any data output or nothing is happening and 20 minutes later
>>> the job crashes?
>>> Are you using event time processing semantics in your pipeline?
>>>
>>> --
>>>
>>> Alexander Fedulov | Solutions Architect
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> On Fri, May 22, 2020 at 4:34 PM Sara Arshad 
>>> wrote:
>>>
 Hi Alexander,

 It's not that much data. I have only 2 records in my dynamodb right now
 (later it can be around 100 records. it's not that much) and I update
 the whole data every 300 seconds.
 Even before I start to send the Kinesis stream it stuck.
 Yes, I can see the checkpoint size is around 150k. But in some cases
 when I sent Kinesis Stream of 80 messages it's around 190k.
 The maximum checkpoint duration is 670.

 Regards,


 On Fri, 22 May 2020, 4:15 pm Alexander Fedulov, <
 alexan...@ververica.com> wrote:

> Hi Sara,
>
> what is the volume of data that is coming in through the broadcast
> channel every 30 seconds? Do you only insert modified rules entries or all
> of them on each update?
> Do you have access to metrics? Specifically, the size of the
> checkpoints and time distribution of different checkpoint phases are of
> interest.
>
> Best,
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> On Fri, May 22, 2020 at 3:57 PM Sara Arshad 
> wrote:
>
>> The problem was that the broadcast puts a lot of pressure on
>> checkpointing.
>> I have to find another solution.
>> If you have any other solution please let me know.
>>
>> Regards,
>> Sara
>>
>> On Wed, 20 May 2020, 5:55 pm Sara Arshad, 
>> wrote:
>>
>>> That was the broadcast stream. Which is supposed to behave like a
>>> cache.
>>> Then I connect that one to the kinesis stream like the below code.
>>> Also, I have two Sink

Re: Multiple Sinks for a Single Soure

2020-05-26 Thread Piotr Nowojski
Hi,

I’m not sure if I fully understand what you mean by

> The point is that the sinks are not predefined.

You must know, before submitting the job, which sinks are going to be used in 
it. You can have some custom logic that filters out records before writing them 
to the sinks, as I proposed before, or you could use side outputs [1], which 
might be better suited to your use case.

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
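
A hedged sketch of the side-output pattern from [1], applied to this kind of 
routing; the Event type, the routing predicates and the sink variables are 
hypothetical placeholders:

final OutputTag<Event> kafkaTag = new OutputTag<Event>("kafka-subscribers") {};
final OutputTag<Event> s3Tag = new OutputTag<Event>("s3-subscribers") {};

SingleOutputStreamOperator<Event> mainStream = events
        .process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, Context ctx, Collector<Event> out) {
                // Routing decision driven by the subscriber registry (e.g. a JSON config).
                if (event.hasKafkaSubscriber()) {
                    ctx.output(kafkaTag, event);
                }
                if (event.hasS3Subscriber()) {
                    ctx.output(s3Tag, event);
                }
                out.collect(event); // main output, e.g. for a default/audit sink
            }
        });

mainStream.getSideOutput(kafkaTag).addSink(kafkaSink);
mainStream.getSideOutput(s3Tag).addSink(s3Sink);
mainStream.addSink(defaultSink);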
 


> On 26 May 2020, at 12:20, Prasanna kumar  
> wrote:
> 
> Thanks Piotr for the Reply. 
> 
> I will explain my requirement in detail. 
> 
> Table Updates -> Generate Business Events -> Subscribers 
> 
> Source Side
> There are CDC of 100 tables which the framework needs to listen to. 
> 
> Event Table Mapping
> 
> There would be Event associated with table in a m:n fashion. 
> 
> say there are tables TA, TB, TC. 
> 
> EA, EA2 and EA3 are generated from TA (based on conditions)
> EB generated from TB (based on conditions)
> EC generated from TC (no conditions.)
> 
> Say there are events EA,EB,EC generated from the tables TA, TB, TC 
> 
> Event Sink Mapping
> 
> EA has following sinks. kafka topic SA,SA2,SAC. 
> EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
> EC has only rest endpoint RC. 
> 
> The point is the sink are not predefined. [. But i only see the example 
> online where , flink code having explicit myStream.addSink(sink2);   ]
> 
> We expect around 500 types of events in our platform in another 2 years time. 
> 
> We are looking at writing a generic job for the same , rather than writing 
> one for new case.
> 
> Let me know your thoughts and flink suitability to this requirement.
> 
> Thanks
> Prasanna.
> 
> 
> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski  > wrote:
> Hi,
> 
> You could easily filter/map/process the streams differently before writing 
> them to the sinks. Building on top of my previous example, this also should 
> work fine:
> 
> 
> DataStream myStream = env.addSource(…).foo().bar() // for custom source, but 
> any ;
> 
> myStream.baz().addSink(sink1);
> myStream.addSink(sink2);
> myStream.qux().quuz().corge().addSink(sink3);
>  
> Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you 
> wish. `foo` and `bar` would be applied once to the stream, before it’s going 
> to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would 
> be applied to only one of the sinks AFTER splitting.
> 
> In your case, it could be:
> 
> myStream.filter(...).addSink(sink1);
> myStream.addSink(sink2);
> myStream.addSink(sink3);
> 
> So sink2 and sink3 would get all of the records, while sink1 only a portion 
> of them.
> 
> Piotrek 
> 
> 
>> On 26 May 2020, at 06:45, Prasanna kumar > > wrote:
>> 
>> Piotr, 
>> 
>> Thanks for the reply. 
>> 
>> There is one other case, where some events have to be written to multiple 
>> sinks and while other have to be written to just one sink. 
>> 
>> How could i have a common codeflow/DAG for the same ?
>> 
>> I do not want multiple jobs to do the same want to accomplish in a single 
>> job .
>> 
>> Could i add Stream code "myStream.addSink(sink1)" under a conditional 
>> operator such as 'if' to determine . 
>> 
>> But i suppose here the stream works differently compared to normal code 
>> processing.
>> 
>> Prasanna.
>> 
>> 
>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, > > wrote:
>> Hi,
>> 
>> To the best of my knowledge the following pattern should work just fine:
>> 
>> DataStream myStream = env.addSource(…).foo().bar() // for custom source, but 
>> any ;
>> myStream.addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>> 
>> All of the records from `myStream` would be passed to each of the sinks.
>> 
>> Piotrek
>> 
>> > On 24 May 2020, at 19:34, Prasanna kumar > > > wrote:
>> > 
>> > Hi,
>> > 
>> > There is a single source of events for me in my system. 
>> > 
>> > I need to process and send the events to multiple destination/sink at the 
>> > same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>> > 
>> > I am able send to one sink.
>> > 
>> > By adding more sink stream to the source stream could we achieve it . Are 
>> > there any shortcomings.  
>> > 
>> > Please let me know if any one here has successfully implemented one .
>> > 
>> > Thanks,
>> > Prasanna.
>> 
> 



Re: Multiple Sinks for a Single Soure

2020-05-26 Thread Prasanna kumar
Piotr,

There is an event and subscriber registry as a JSON file which has the
table-event mapping and the event-subscriber mapping, as shown below.

Based on this JSON, we need the job to go through the table updates, create
events, and for each event there is a configured way to sink them.

The sink streams have to be added based on this JSON. That's what I meant
earlier by having no predefined sinks in code.

You can see that each event has a different set of sinks.

Just checking how generic side-output streams can be?

Source -> generate events -> (find out sinks dynamically in code) -> write
to the respective sinks.

{
  "tablename": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            { "columnname": "name" },
            { "columnname": "loginenabled", "value": "Y" }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": { "topicname": "USERTOPIC" }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": { "folder": "aws://folderC2" }
            }
          ]
        }
      ]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
        {
          "eventname": "USERINSERT",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": { "topicname": "new_users" }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password": "pwd"
              }
            }
          ]
        }
      ]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": { "topicname": "USERTOPIC" }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": { "topicname": "deleterecords" }
            }
          ]
        }
      ]
    }
  ]
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski  wrote:

> Hi,
>
> I’m not sure if I fully understand what do you mean by
>
> > The point is the sink are not predefined.
>
> You must know before submitting the job, what sinks are going to be used
> in the job. You can have some custom logic, that would filter out records
> before writing them to the sinks, as I proposed before, or you could use
> side outputs [1] would be better suited to your use case?
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> On 26 May 2020, at 12:20, Prasanna kumar 
> wrote:
>
> Thanks Piotr for the Reply.
>
> I will explain my requirement in detail.
>
> Table Updates -> Generate Business Events -> Subscribers
>
> *Source Side*
> There are CDC of 100 tables which the framework needs to listen to.
>
> *Event Table Mapping*
>
> There would be Event associated with table in a *m:n* fashion.
>
> say there are tables TA, TB, TC.
>
> EA, EA2 and EA3 are generated from TA (based on conditions)
> EB generated from TB (based on conditions)
> EC generated from TC (no conditions.)
>
> Say there are events EA,EB,EC generated from the tables TA, TB, TC
>
> *Event Sink Mapping*
>
> EA has following sinks. kafka topic SA,SA2,SAC.
> EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
> EC has only rest endpoint RC.
>
> The point is the sink are not predefined. [. But i only see the example
> online where , flink code having explicit myStream.addSink(sink2);   ]
>
> We expect around 500 types of events in our platform in another 2 years
> time.
>
> We are looking at writing a generic job for the same , rather than writing
> one for new case.
>
> Let me know your thoughts and flink suitability to this requirement.
>
> Thanks
> Prasanna.
>
>
> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> You could easily filter/map/process the streams differently before
>> writing them to the sinks. Building on top of my previous example, this
>> also should work fine:
>>
>>
>> DataStream myStream = env.addSource(…).foo().bar() // for custom source,
>> but any ;
>>
>> myStream.baz().addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.qux().quuz().corge().addSink(sink3);
>>
>> Where foo/bar/baz/quz/quuz/corge are any stream processing functions that
>> you wish. `foo` and `bar` would be app

Re: RocksDB savepoint recovery performance improvements

2020-05-26 Thread Joey Pereira
Following up: I've put together the implementation,
https://github.com/apache/flink/pull/12345. It's passing tests but is
only partially complete, as it still needs some clean-up and configuration.
I still need to try running this against a production cluster to check the
performance, as well as getting some RocksDB benchmarks.
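
For context, a hedged sketch of the SstFileWriter + ingestExternalFile pattern 
discussed below, using the RocksDB Java API; paths, keys and options are 
illustrative and error handling is omitted:

import java.util.Collections;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileWriter;

public class SstIngestSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();

        String sstPath = "/tmp/restore-000001.sst"; // illustrative file path

        try (Options options = new Options().setCreateIfMissing(true);
             EnvOptions envOptions = new EnvOptions();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {

            writer.open(sstPath);
            // Keys must be added in strictly increasing order (per RocksDB's comparator).
            writer.put("key-1".getBytes(), "value-1".getBytes());
            writer.put("key-2".getBytes(), "value-2".getBytes());
            writer.finish();

            try (RocksDB db = RocksDB.open(options, "/tmp/rocksdb-ingest-demo");
                 IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
                // Bulk-load the pre-built SSTable instead of issuing individual Puts.
                db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
            }
        }
    }
}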

On Mon, May 18, 2020 at 3:46 PM Joey Pereira  wrote:

> Thanks Yun for highlighting this, it's very helpful! I'll give it a go
> with that in mind.
>
> We have already begun using checkpoints for recovery. Having these
> improvements would still be immensely helpful to reduce downtime for
> savepoint recovery.
>
> On Mon, May 18, 2020 at 3:14 PM Yun Tang  wrote:
>
>> Hi Joey
>>
>> Previously, I also looked at the mechanism to create on-disk SSTables as
>> I planned to use RocksDB's benchmark to mock the scenario in Flink. However, I
>> found the main challenge is how to ensure the keys are inserted in a
>> strictly increasing order. The key order in Java could differ from the
>> byte order in RocksDB. In your case, I think it could be much easier, as
>> RocksFullSnapshotStrategy writes data per column family per key group, which
>> should be in a strictly increasing order [1].
>>
>> FLINK-17288  could
>> mitigate the performance issue, and your solution could improve the
>> performance even further (and could integrate with the state-processor-api
>> story).
>>
>> On the other hand, as an out-of-the-box option for production in your scenario,
>> how about using checkpoints to recover, as they also support rescaling and
>> normal recovery.
>>
>> [1]
>> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>
>>
>> Best
>> Yun Tang
>> --
>> *From:* Joey Pereira 
>> *Sent:* Tuesday, May 19, 2020 2:27
>> *To:* user@flink.apache.org 
>> *Cc:* Mike Mintz ; Shahid Chohan ;
>> Aaron Levin 
>> *Subject:* RocksDB savepoint recovery performance improvements
>>
>> Hey,
>>
>> While running a Flink application with a large-state, savepoint recovery
>> has been a painful part of operating the application because recovery time
>> can be several hours. During some profiling that chohan (cc'd) had done, a
>> red flag stood out — savepoint recovery consisted mostly of RocksDB Get and
>> Put operations.
>>
>> When Flink is bootstrapping state for RocksDB instances this is not what
>> I would have expected, as RocksDB supports direct ingestion of the on-disk
>> format (SSTables):
>> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. 
>> This
>> was also recently reported on Jira:
>> https://issues.apache.org/jira/browse/FLINK-17288.
>>
>> From what I understood of the current implementation:
>>
>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and 
>> RocksDBIncrementalRestoreOperation,
>> use RocksDBWriteBatchWrapper.
>>
>> * RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This
>> will provide atomicity of batches as well as performance benefits for
>> batching, compared to individual Puts, but it will still involve RocksDB’s
>> insert paths which can involve expensive operations[0].
>>
>> Instead, by creating SSTable files and instructing RocksDB to ingest the
>> files, writes can be batched even further and avoid expensive operations in
>> RocksDB. This is commonly utilized by other systems for restoration or
>> import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There
>> are some restrictions on being able to generate SSTables, as well as
>> limitations for ingestion to be performant. Unfortunately, it’s all not
>> very well documented:
>>
>> 1. When generating an SSTable, keys need to be inserted in-order.
>>
>> 2. Ingested files should not have key-ranges that overlap with either
>> existing or other ingested files[4]. It is possible to ingest overlapping
>> SSTables, but this may incur significant overhead.
>>
>> To generate SSTables with non-overlapping key-ranges and to create them
>> with keys in-order, it would mean that the savepoints would need to be
>> ordered while processing them. I'm unsure if this is the case for how
>> Flink's savepoints are stored.
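For reference, a minimal sketch of the RocksDB Java API this proposal would build on, assuming RocksJava's SstFileWriter and ingestExternalFile; the file path and the pre-sorted key/value arrays are placeholders, and this is not the Flink implementation:

import java.util.Collections;

import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

public class SstIngestSketch {
    public static void ingest(RocksDB db, byte[][] sortedKeys, byte[][] values)
            throws RocksDBException {
        String sstPath = "/tmp/restore-000001.sst";  // placeholder path

        try (EnvOptions envOptions = new EnvOptions();
             Options options = new Options();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstPath);
            // Keys must be added in strictly increasing (byte-wise) order.
            for (int i = 0; i < sortedKeys.length; i++) {
                writer.put(sortedKeys[i], values[i]);
            }
            writer.finish();
        }

        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            // Hands the finished file to RocksDB, bypassing the memtable/WAL write path.
            db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
        }
    }
}
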
>>
>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is
>> used (eg: for incremental checkpoint or something else). I did notice it
>> is iterating over a temporary RocksDB instance and inserting into a
>> "final" instance. These writes could be optimized in a similar manner.
>> Alternatively, it could be possible to use the temporary instance's
>> SSTables, ingest them, and prune data out with RocksDB's DeleteRange.
>>
>> To get started with prototyping, I was thinking of taking a simple
>> approach of making an interface for RocksDBWriteBatchWrapper and swapping
>> the implementation for one that does SSTable genera

Webinar: Unlocking the Power of Apache Beam with Apache Flink

2020-05-26 Thread Aizhamal Nurmamat kyzy
Hi all,

Please join our webinar this Wednesday at 10am PST/5:00pm GMT/1:00pm EST
where Max Michels - PMC member for Apache Beam and Apache Flink, will
deliver a talk about leveraging Apache Beam for large-scale stream and
batch analytics with Apache Flink.

You can register via this link:
https://learn.xnextcon.com/event/eventdetails/W20052710

Here is the short description of the talk:
---
Apache Beam is a framework for writing stream and batch processing
pipelines using multiple languages such as Java, Python, SQL, or Go. Apache
Beam does not come with an execution engine of its own. Instead, it defers
the execution to its Runners which translate Beam pipelines for any
supported execution engine. Thus, users have complete control over the
language and the execution engine they use, without having to rewrite their
code.
In this talk, we will look at running Apache Beam pipelines with Apache
Flink. We will explain the concepts behind Apache Beam's portability
framework for multi-language support, and then show how to get started
running Java, Python, and SQL pipelines.

Links to the slides and recordings of this and previous webinars you can
find here: https://github.com/aijamalnk/beam-learning-month

Hope y'all are safe,
Aizhamal


Re: RocksDB savepoint recovery performance improvements

2020-05-26 Thread Steven Wu
Yun, you mentioned that checkpoint also supports rescale. I thought the
recommendation [1] is to use savepoint for rescale.

[1]
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira  wrote:

> Following up: I've put together the implementation,
> https://github.com/apache/flink/pull/12345. It's passing tests but is
> only partially complete, as it still needs some clean-up and configuration.
> I still need to try running this against a production cluster to check the
> performance, as well as getting some RocksDB benchmarks.
>
> On Mon, May 18, 2020 at 3:46 PM Joey Pereira  wrote:
>
>> Thanks Yun for highlighting this, it's very helpful! I'll give it a go
>> with that in mind.
>>
>> We have already begun using checkpoints for recovery. Having these
>> improvements would still be immensely helpful to reduce downtime for
>> savepoint recovery.
>>
>> On Mon, May 18, 2020 at 3:14 PM Yun Tang  wrote:
>>
>>> Hi Joey
>>>
>>> Previously, I also looked at the mechanism to create on-disk SSTables as
>>> I planned to use RocksDB's benchmark to mock scenarios in Flink. However, I
>>> found the main challenge is how to ensure the keys are inserted in a
>>> strictly increasing order. The key order in Java could differ from the
>>> byte order in RocksDB. In your case, I think it could be much easier as
>>> RocksFullSnapshotStrategy writes data per column family per key group, which
>>> should be in a strictly increasing order [1].
>>>
>>> FLINK-17288  could
>>> mitigate the performance issue, and your solution could improve the
>>> performance even further (and could integrate with the state-processor-api
>>> story).
>>>
>>> On the other hand, as an out-of-the-box option for production in your
>>> scenario, how about using checkpoints to recover, as they also support
>>> rescaling and normal recovery.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>>
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Joey Pereira 
>>> *Sent:* Tuesday, May 19, 2020 2:27
>>> *To:* user@flink.apache.org 
>>> *Cc:* Mike Mintz ; Shahid Chohan <
>>> cho...@stripe.com>; Aaron Levin 
>>> *Subject:* RocksDB savepoint recovery performance improvements
>>>
>>> Hey,
>>>
>>> While running a Flink application with a large-state, savepoint recovery
>>> has been a painful part of operating the application because recovery time
>>> can be several hours. During some profiling that chohan (cc'd) had done, a
>>> red flag stood out — savepoint recovery consisted mostly of RocksDB Get and
>>> Put operations.
>>>
>>> When Flink is bootstrapping state for RocksDB instances this is not what
>>> I would have expected, as RocksDB supports direct ingestion of the on-disk
>>> format (SSTables):
>>> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. 
>>> This
>>> was also recently reported on Jira:
>>> https://issues.apache.org/jira/browse/FLINK-17288.
>>>
>>> From what I understood of the current implementation:
>>>
>>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and 
>>> RocksDBIncrementalRestoreOperation,
>>> use RocksDBWriteBatchWrapper.
>>>
>>> * RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This
>>> will provide atomicity of batches as well as performance benefits for
>>> batching, compared to individual Puts, but it will still involve RocksDB’s
>>> insert paths which can involve expensive operations[0].
>>>
>>> Instead, by creating SSTable files and instructing RocksDB to ingest the
>>> files, writes can be batched even further and avoid expensive operations in
>>> RocksDB. This is commonly utilized by other systems for restoration or
>>> import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There
>>> are some restrictions on being able to generate SSTables, as well as
>>> limitations for ingestion to be performant. Unfortunately, it’s all not
>>> very well documented:
>>>
>>> 1. When generating an SSTable, keys need to be inserted in-order.
>>>
>>> 2. Ingested files should not have key-ranges that overlap with either
>>> existing or other ingested files[4]. It is possible to ingest overlapping
>>> SSTables, but this may incur significant overhead.
>>>
>>> To generate SSTables with non-overlapping key-ranges and to create them
>>> with keys in-order, it would mean that the savepoints would need to be
>>> ordered while processing them. I'm unsure if this is the case for how
>>> Flink's savepoints are stored.
>>>
>>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it
>>> is used (eg: for incremental checkpoint or something else). I did
>>> notice it is iterating over a temporary RocksDB instance and inserting into
>>> a "final” instance. These

Tumbling windows - increasing checkpoint size over time

2020-05-26 Thread Wissman, Matt
Hello Flink Community,

I’m running a Flink pipeline that uses a tumbling window and incremental 
checkpoint with RocksDB backed by S3. The number of objects in the window is 
stable but over time the checkpoint size grows seemingly unbounded. Within the 
first few hours after bringing the Flink pipeline up, the checkpoint size is 
around 100K but after a week of operation it grows to around 100MB. The 
pipeline isn’t using any other Flink state besides the state that the window 
uses. I think this has something to do with RocksDB’s compaction but shouldn’t 
the tumbling window state expire and be purged from the checkpoint?

Flink Version 1.7.1

Thanks!

-Matt


Re: Tumbling windows - increasing checkpoint size over time

2020-05-26 Thread Guowei Ma
Hi, Matt
The total size of the state of the window operator is related to the
number of windows. For example, if you use keyBy + tumbling window, there
would be as many windows as there are keys.
Hope this helps.
Best,
Guowei
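To make that concrete, a minimal sketch (not the original pipeline; the source and values are placeholders): with keyBy plus a tumbling window, the backend keeps one window-state entry per active key per window, so the state grows with the number of distinct keys.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> counts = env
                .fromElements(Tuple2.of("device-1", 1L), Tuple2.of("device-2", 1L))
                .keyBy(0)                                                    // one window state per key ...
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))   // ... per window
                .sum(1);  // ReduceFunction-backed state: one accumulated value per key
                          // per window, cleared when the window expires

        counts.print();
        env.execute("window state sketch");
    }
}
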

Wissman, Matt  于2020年5月27日周三 上午3:35写道:
>
> Hello Flink Community,
>
>
>
> I’m running a Flink pipeline that uses a tumbling window and incremental 
> checkpoint with RocksDB backed by S3. The number of objects in the window is 
> stable but over time the checkpoint size grows seemingly unbounded. Within the 
> first few hours after bringing the Flink pipeline up, the checkpoint size is 
> around 100K but after a week of operation it grows to around 100MB. The 
> pipeline isn’t using any other Flink state besides the state that the window 
> uses. I think this has something to do with RocksDB’s compaction but 
> shouldn’t the tumbling window state expire and be purged from the checkpoint?
>
>
>
> Flink Version 1.7.1
>
>
>
> Thanks!
>
>
>
> -Matt


Re: ClusterClientFactory selection

2020-05-26 Thread Yang Wang
Hi M Singh,

The Flink CLI picks up the correct ClusterClientFactory via Java SPI. You
could check YarnClusterClientFactory#isCompatibleWith for how it is
activated.
The CLI option / configuration is "-e/--executor" or execution.target (e.g.
yarn-per-job).
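For illustration, a rough sketch of that SPI lookup (simplified: the real selection lives in Flink's client code, the YARN factory is only discovered if the YARN client dependencies are on the classpath, and the error handling here is an assumption):

import java.util.ServiceLoader;

import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public class ClientFactorySelectionSketch {
    // Roughly what the CLI does: iterate the factories registered via SPI and
    // keep the one whose isCompatibleWith() accepts the configured execution.target.
    public static ClusterClientFactory<?> select(Configuration config) {
        for (ClusterClientFactory<?> factory : ServiceLoader.load(ClusterClientFactory.class)) {
            if (factory.isCompatibleWith(config)) {
                return factory;
            }
        }
        throw new IllegalStateException(
                "No ClusterClientFactory for target "
                        + config.getString(DeploymentOptions.TARGET.key(), "<none>"));
    }

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString(DeploymentOptions.TARGET.key(), "yarn-per-job");  // same as -e/--executor
        System.out.println(select(config).getClass().getName());
    }
}
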


Best,
Yang

M Singh  于2020年5月26日周二 下午6:45写道:

> Hi:
>
> I wanted to find out which parameter/configuration allows the Flink CLI to
> pick up the appropriate cluster client factory (especially in the YARN mode).
>
> Thanks
>


Re: Flink Dashboard UI Tasks hard limit

2020-05-26 Thread Xintong Song
Could you also explain how you set the parallelism when getting this
execution plan?
I'm asking because this JSON file itself only shows the resulting execution
plan. It is not clear to me what is not working as expected in your case.
E.g., you set the parallelism for an operator to 10 but the execution plan
only shows 5.

Thank you~

Xintong Song



On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan 
wrote:

> Hi Xintong,
> Thanks for the excellent clarification for tasks.
>
> I attached a sample screenshot above, but it didn't reflect the slots used and
> the task limit I was running into in that pic.
>
> I am attaching my execution plan here. Please let me know how I can
> increase the number of tasks, aka parallelism. As I increase the parallelism,
> I run into this bottleneck with the tasks.
>
> BTW - The https://flink.apache.org/visualizer/ is a great start to see
> this.
> TIA,
>
> On Sun, May 24, 2020 at 7:52 PM Xintong Song 
> wrote:
>
>> Increasing network memory buffers (fraction, min, max) seems to increase
>>> tasks slightly.
>>
>> That's weird. I don't think the number of network memory buffers has
>> anything to do with the number of tasks.
>>
>> Let me try to clarify a few things.
>>
>> Please be aware that, how many tasks a Flink job has, and how many slots
>> a Flink cluster has, are two different things.
>> - The number of tasks is decided by your job's parallelism and topology.
>> E.g., if your job graph has 3 vertices A, B and C, with parallelism 2, 3,
>> 4 respectively, then you would have 9 (2+3+4) tasks in total.
>> - The number of slots is decided by the number of TMs and slots-per-TM.
>> - For streaming jobs, you have to make sure the number of slots is enough
>> for executing all your tasks. The number of slots needed for executing your
>> job is by default the max parallelism of your job graph vertices. Take the
>> above example, you would need 4 slots, because it's the max among all the
>> vertices' parallelisms (2, 3, 4).
>>
>> In your case, the screenshot shows that your job has 9621 tasks in total
>> (not around 18000, the dark box shows total tasks while the green box shows
>> running tasks), and 600 slots are in use (658 - 58) suggesting that the max
>> parallelism of your job graph vertices is 600.
>>
>> If you want to increase the number of tasks, you should increase your job
>> parallelism. There are several ways to do that.
>>
>>- In your job codes (assuming you are using DataStream API)
>>   - Use `StreamExecutionEnvironment#setParallelism()` to set
>>   parallelism for all operators.
>>   - Use `SingleOutputStreamOperator#setParallelism()` to set
>>   parallelism for a specific operator. (Only supported for subclasses of
>>   `SingleOutputStreamOperator`.)
>>- When submitting your job, use `-p ` as an argument for
>>the `flink run` command, to set parallelism for all operators.
>>- Set `parallelism.default` in your `flink-conf.yaml`, to set a
>>    default parallelism for your jobs. This will be used for jobs that have
>>    not set parallelism with any of the above methods.
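A minimal sketch combining the first two options above (the source, the sink and the parallelism values are placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);  // default parallelism for every operator in this job

        env.socketTextStream("localhost", 9999)  // placeholder source
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .setParallelism(8)  // override for this operator only
                .print();

        env.execute("parallelism sketch");
    }
}
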
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi Xintong,
>>> Thx for your reply.  Increasing network memory buffers (fraction, min,
>>> max) seems to increase tasks slightly.
>>>
>>> Streaming job
>>> Standalone
>>>
>>> Vijay
>>>
>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song 
>>> wrote:
>>>
 Hi Vijay,

 I don't think your problem is related to the number of open files. The
 parallelism of your job is decided before Flink actually tries to open the files.
 And if the OS limit for open files is reached, you should see a job
 execution failure, instead of a successful execution with a lower parallelism.

 Could you share some more information about your use case?

- What kind of job are your executing? Is it a streaming or batch
processing job?
- Which Flink deployment do you use? Standalone? Yarn?
- It would be helpful if you can share the Flink logs.


 Thank you~

 Xintong Song



 On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan 
 wrote:

> Hi,
> I have increased the number of slots available but the Job is not
> using all the slots but runs into this approximate 18000 Tasks limit.
> Looking into the source code, it seems to be opening file -
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
> So, do I have to tune the ulimit or something similar at the Ubuntu
> O/S level to increase the number of tasks available? What I am confused about
> is that the ulimit is per machine but the ExecutionGraph is spread across many
> machines? Please pardon my ignorance here. Does the number of tasks equate to
> the number of open files? I am using 15 slots per TaskManager on AWS m5.4x

Re: RocksDB savepoint recovery performance improvements

2020-05-26 Thread Yun Tang
@Joey Pereira I think you might need to create a new 
JIRA ticket and link your PR to the new issue, as 
FLINK-17288 mainly focuses on 
bulk load options while your solution focuses on the SST generator. If your solution 
behaves better, we could tag 
FLINK-17288 as "won't do".

@Steven Wu sure, the Flink community always suggests 
using savepoints to restore, but the current checkpoint also supports it. I mentioned that 
as a quick fix for his scenario.

Best
Yun Tang

From: Steven Wu 
Sent: Wednesday, May 27, 2020 0:36
To: Joey Pereira 
Cc: user@flink.apache.org ; Yun Tang ; 
Mike Mintz ; Shahid Chohan ; Aaron 
Levin 
Subject: Re: RocksDB savepoint recovery performance improvements

Yun, you mentioned that checkpoint also supports rescale. I thought the 
recommendation [1] is to use savepoint for rescale.

[1] 
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira 
mailto:j...@stripe.com>> wrote:
Following up: I've put together the implementation, 
https://github.com/apache/flink/pull/12345. It's passing tests but is only 
partially complete, as it still needs some clean-up and configuration. I still 
need to try running this against a production cluster to check the performance, 
as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira 
mailto:j...@stripe.com>> wrote:
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with 
that in mind.

We have already begun using checkpoints for recovery. Having these improvements 
would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables as I 
planned to use RocksDB's benchmark to mock scenarios in Flink. However, I found 
the main challenge is how to ensure the keys are inserted in a strictly 
increasing order. The key order in Java could differ from the byte order in 
RocksDB. In your case, I think it could be much easier as 
RocksFullSnapshotStrategy writes data per column family per key group, which 
should be in a strictly increasing order [1].

FLINK-17288 could mitigate 
the performance issue, and your solution could improve the performance much 
more (and could integrate with the state-processor-api story).

On the other hand, as an out-of-the-box option for production in your scenario, how 
about using checkpoints to recover, as they also support rescaling and normal 
recovery.

[1] 
https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308


Best
Yun Tang

From: Joey Pereira mailto:j...@stripe.com>>
Sent: Tuesday, May 19, 2020 2:27
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Cc: Mike Mintz mailto:mikemi...@stripe.com>>; Shahid 
Chohan mailto:cho...@stripe.com>>; Aaron Levin 
mailto:aaronle...@stripe.com>>
Subject: RocksDB savepoint recovery performance improvements

Hey,

While running a Flink application with a large-state, savepoint recovery has 
been a painful part of operating the application because recovery time can be 
several hours. During some profiling that chohan (cc'd) had done, a red flag 
stood out — savepoint recovery consisted mostly of RocksDB Get and Put 
operations.

When Flink is bootstrapping state for RocksDB instances this is not what I 
would have expected, as RocksDB supports direct ingestion of the on-disk format 
(SSTables): 
https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This 
was also recently reported on Jira: 
https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and 
RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

* RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will 
provide atomicity of batches as well as performance benefits for batching, 
compared to individual Puts, but it will still involve RocksDB’s insert paths 
which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, 
writes can be batched even further and avoid expensive operations in RocksDB. 
This is commonly utilized by other systems for restoration or import processes, 
such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some 
restrictions on being able to generate SSTables, as well as limitations for 
ingestion to be performant. Unfortunately, it’s all not very

Re: In consistent Check point API response

2020-05-26 Thread Yun Tang
To be honest, from my point of view the current description should have already 
given enough explanation [1] in the "Overview Tab".
Latest Completed Checkpoint: The latest successfully completed checkpoints.
Latest Restore: There are two types of restore operations.

  *   Restore from Checkpoint: We restored from a regular periodic checkpoint.
  *   Restore from Savepoint: We restored from a savepoint.
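As a hedged illustration of how a monitoring client might read those two fields from the jobs/:jobid/checkpoints response (Jackson is used for parsing; the JobManager address and the exact field names are assumptions that should be verified against the monitoring REST API documentation for your version):

import java.net.URL;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CheckpointFieldsSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical JobManager address; the job id is passed as the first argument.
        URL url = new URL("http://jobmanager:8081/jobs/" + args[0] + "/checkpoints");
        JsonNode stats = new ObjectMapper().readTree(url);

        // "completed" is the last checkpoint that finished; "restored" is the last
        // checkpoint the job was restored from. They only match right after a restore.
        JsonNode completed = stats.path("latest").path("completed");
        JsonNode restored = stats.path("latest").path("restored");

        System.out.println("last completed: " + completed.path("external_path").asText());
        System.out.println("last restored:  " + restored.path("external_path").asText());
    }
}
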

You could still create a JIRA issue and give your ideas in that issue. If it is 
agreed to work on that ticket, you can create a PR to edit 
checkpoint_monitoring.md [2] and checkpoint_monitoring.zh.md [3] to update 
related documentation.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/checkpoint_monitoring.html#overview-tab
[2] 
https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.md
[3] 
https://github.com/apache/flink/blob/master/docs/monitoring/checkpoint_monitoring.zh.md

Best
Yun Tang

From: Vijay Bhaskar 
Sent: Tuesday, May 26, 2020 15:18
To: Yun Tang 
Cc: user 
Subject: Re: In consistent Check point API response

Thanks Yun. How can I contribute better documentation for this by opening a 
Jira on it?

Regards
Bhaskar

On Tue, May 26, 2020 at 12:32 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Bhaskar

I think I have understood your scenario now. And I think this is what expected 
in Flink.
As you only allow your job could restore 5 times, the "restore" would only 
record the checkpoint to restore at the 5th recovery, and the checkpoint id 
would always stay there.

"Restored" is for last restored checkpoint and "completed" is for last 
completed checkpoint, they are actually not the same thing.
The only scenario that they're the same in numbers is when Flink just restore 
successfully before a new checkpoint completes.

Best
Yun Tang



From: Vijay Bhaskar mailto:bhaskar.eba...@gmail.com>>
Sent: Tuesday, May 26, 2020 12:19
To: Yun Tang mailto:myas...@live.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: In consistent Check point API response

Hi Yun
Understood the issue now:
"restored" always shows only the check point that is used for restoring 
previous state
In all the attempts < 6 ( in my case max attempts are 5, 6 is the last attempt)
  Flink HA is  restoring the state, so restored and latest are same value
if the last attempt  == 6
 Flink job already has few check points
 After that job failed and Flink HA gave up and marked the job state as "FAILED"
   At this point "restored". value is the one which is in 5'th attempt but 
latest is the one which is the latest checkpoint which is retained

Shall i file any documentation improvement Jira? I want to add more 
documentation with the help of  the above scenarios.

Regards
Bhaskar



On Tue, May 26, 2020 at 8:14 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Bhaskar

It seems I still not understand your case-5 totally. Your job failed 6 times, 
and recover from previous checkpoint to restart again. However, you found the 
REST API told the wrong answer.
How do you ensure your "restored" field is giving the wrong checkpoint file 
which is not latest? Have you ever checked the log in JM to view related 
contents: "Restoring job xxx from latest valid checkpoint: x@" [1] to know 
exactly which checkpoint choose to restore?

I think you could give a more concrete example e.g. which expected/actual 
checkpoint to restore, to tell your story.

[1] 
https://github.com/apache/flink/blob/8f992e8e868b846cf7fe8de23923358fc6b50721/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1250

Best
Yun Tang

From: Vijay Bhaskar mailto:bhaskar.eba...@gmail.com>>
Sent: Monday, May 25, 2020 17:01
To: Yun Tang mailto:myas...@live.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: In consistent Check point API response

Thanks Yun.
Here is the problem i am facing:

I am using  jobs/:jobID/checkpoints  API to recover the failed job. We have the 
remote manager which monitors the jobs.  We are using "restored" field of the 
API response to get the latest check point file to use. Its giving correct 
checkpoint file for all the 4 cases except the 5'th case. Where the "restored" 
field is giving the wrong check point file which is not latest.  When we 
compare the  check point file returned by  the "completed". field, both are 
giving identical checkpoints in all 4 cases, except 5'th case
We can't use flink UI in because of security reasons

Regards
Bhaskar

On Mon, May 25, 2020 at 12:57 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Vijay

If I understand correct, do you mean your last "restored" checkpoint is null 
via REST api when the job failed 6 times and then recover successfully with 
another several successful checkpoints?

First of all, if your job just recovered successfully, can you observe the 
"last restored" checkpoint in web UI?
Secondl