Re: Official flink java client

2021-04-23 Thread Flavio Pompermaier
I also interface with Flink clusters over REST in order to avoid many
annoying problems (dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if
you want to.
It is available at [1] and it adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some
fields in order to enable the extension).
Officially the Flink RestClusterClient is meant for internal use only, but it
actually works very well.

Best,
Flavio

[1]
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
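
For readers who want to see what using the stock client looks like, below is a
minimal sketch of querying job status and triggering a savepoint with
RestClusterClient (assuming a Flink 1.12-era flink-clients dependency; the
host/port, cluster id, job id and savepoint path are placeholders, and, as noted
above, the class is not a stable public API, so method signatures may differ
between releases):

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;

import java.util.Collection;

public class RestClientSketch {

    public static void main(String[] args) throws Exception {
        // Point the client at the JobManager's REST endpoint.
        Configuration config = new Configuration();
        config.setString(RestOptions.ADDRESS, "localhost");
        config.setInteger(RestOptions.PORT, 8081);

        RestClusterClient<String> client =
                new RestClusterClient<>(config, "standalone-cluster-id");
        try {
            // List all jobs known to the cluster and print their status.
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            for (JobStatusMessage job : jobs) {
                System.out.println(job.getJobId() + " -> " + job.getJobState());
            }

            // Query a single (placeholder) job id and trigger a savepoint for it.
            JobID jobId = JobID.fromHexString("00000000000000000000000000000000");
            JobStatus status = client.getJobStatus(jobId).get();
            String savepointPath =
                    client.triggerSavepoint(jobId, "s3://my-bucket/savepoints/").get();
            System.out.println(status + ", savepoint at " + savepointPath);
        } finally {
            client.close();
        }
    }
}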

On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

> Hi gaurav,
>
> Logically the Flink client is embedded inside the StreamExecutionEnvironment,
> and users can use the StreamExecutionEnvironment to execute their jobs.
> Could you share more about why you want to use the client directly?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*gaurav kulkarni 
> *Send Date:*Fri Apr 23 10:14:08 2021
> *Recipients:*User 
> *Subject:*Official flink java client
>
>> Hi,
>>
>> Is there any official flink client in java that's available? I came
>> across RestClusterClient, but I am not sure if it's official.
>> I can create my own client, but just wanted to check if there is anything
>> official available already that I can leverage.
>>
>> Thanks,
>> Gaurav
>>
>> Run already deployed job on Flink Cluster using RestClusterClient
>>
>> I am trying to run already deployed job on Flink Cluster using Rest
>> request.I had success using a simple rest ...
>>
>> 
>>
>>
>>


Re: MemoryStateBackend Issue

2021-04-23 Thread Matthias Pohl
One additional question: How did you stop and restart the job? The behavior
you're expecting should work with stop-with-savepoint. Cancelling the job
and then just restarting it wouldn't work. The latter approach would lead
to a new job being created.

Best,
Matthias

On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl 
wrote:

> Hi Milind,
> I bet someone else might have a faster answer. But could you provide the
> logs and config to get a better understanding of what your issue is?
> In general, the state is maintained even in cases where a TaskManager
> fails.
>
> Best,
> Matthias
>
> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya  wrote:
>
>> Hi
>>
>> I see MemoryStateBackend being used in TM Log
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
>> has been configured, using default (Memory / JobManager)
>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>> maxStateSize: 5242880)
>>
>>
>>
>> I am logging checkpointed value which is just message count
>>
>> Snapshot the state 500
>> Snapshot the state 1000
>>
>>
>> When I restart the job i.e. new TM but the job manager is same I see
>>
>> Snapshot the state 500
>>
>> In the JM logs I see following entries
>>
>> Triggering checkpoint 1
>> Triggering checkpoint 2
>>
>> After restarting job hence new TM
>>
>> Triggering checkpoint 1
>>
>> As per my understanding JM should hold the checkpointed
>> 
>> state across TM ? Am I correct?
>>
>> I have not configured anything special and using default. Do I need to
>> add any setting to make it work ?
>> I want to maintain message count across the TMs.
>>
>

-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica 

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


Re: Debezium CDC | OOM

2021-04-23 Thread Matthias Pohl
Got it. Thanks for clarifying.

On Fri, Apr 23, 2021 at 6:36 AM Ayush Chauhan 
wrote:

> Hi Matthias,
>
> I am using RocksDB as a state backend. I think the iceberg sink is not
> able to propagate back pressure to the source which is resulting in OOM for
> my CDC pipeline.
> Please refer to this - https://github.com/apache/iceberg/issues/2504
>
>
>
> On Thu, Apr 22, 2021 at 8:44 PM Matthias Pohl 
> wrote:
>
>> Hi Ayush,
>> Which state backend have you configured [1]? Have you considered trying
>> out RocksDB [2]? RocksDB might help with persisting at least keyed state.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>>
>> On Thu, Apr 22, 2021 at 7:52 AM Ayush Chauhan 
>> wrote:
>>
>>> Hi,
>>> I am using flink cdc to stream CDC changes into an iceberg table. When I
>>> first run the flink job for a topic which has all the data for a table, it
>>> gets out of heap memory as flink tries to load all the data during my 15min
>>> checkpointing interval. Right now, the only solution I have is to pass *-ytm
>>> 8192 -yjm 2048m* for a table with 10M rows and then reduce it after
>>> flink has consumed all the data. Is there a way to tell the flink cdc code to
>>> trigger a checkpoint or throttle the consumption speed (I think backpressure
>>> should have handled this)?
>>>
>>> --
>>>  Ayush Chauhan
>>>  Software Engineer | Data Platform
>>>  [image: mobile-icon]  +91 9990747111
>>>
>>>
>>> This email is intended only for the person or the entity to whom it is
>>> addressed. If you are not the intended recipient, please delete this email
>>> and contact the sender.
>>>
>>
>
> --
>  Ayush Chauhan
>  Software Engineer | Data Platform
>  [image: mobile-icon]  +91 9990747111
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>
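
As an aside on the original question above: the checkpoint interval itself is
one of the knobs involved here. A minimal sketch of tightening it (assuming
Flink 1.12 DataStream APIs; the values are purely illustrative, not a
recommendation):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every minute instead of every 15 minutes, so less data
        // accumulates between checkpoints.
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}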


Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
Hi Flavio,

Many thanks for the explanation. Maybe another option is to have a look at
the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs
and query job status, and it might be able to help.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 15:25:55 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Official flink java client

I also interface with Flink clusters over REST in order to avoid many annoying
problems (dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if you
want to.
It is available at [1] and it adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some fields
in order to enable the extension).
Officially the Flink RestClusterClient is meant for internal use only, but it
actually works very well.

Best,
Flavio

[1] 
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

Hi gaurav,

Logically the Flink client is embedded inside the StreamExecutionEnvironment, and users
can use the StreamExecutionEnvironment to execute their jobs. Could you share more about
why you want to use the client directly?

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official flink client in java that's available? I came across 
RestClusterClient, but I am not sure if it's official. I can create my own
client, but just wanted to check if there is anything official available 
already that I can leverage. 

Thanks,
Gaurav












Run already deployed job on Flink Cluster using 
RestClusterClient
I am trying to run already deployed job on Flink Cluster using Rest request.I 
had success using a simple rest ...




Re: Re: Official flink java client

2021-04-23 Thread Flavio Pompermaier
Obviously I could rewrite a java client from scratch that interfaces with
the provided REST API, but why do that if I can reuse something that already exists?
Usually I interface with REST APIs using auto-generated clients (if the APIs are
exposed via Swagger or OpenAPI).
If that's not an option, writing a REST client from scratch is something I
try to avoid as much as I can.

Best,
Flavio

On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:

> Hi Flavio,
>
> Many thanks for the explanation. Maybe another option is to have a look at
> the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs
> and query job status, and it might be able to help.
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
>
> --Original Mail --
> *Sender:*Flavio Pompermaier 
> *Send Date:*Fri Apr 23 15:25:55 2021
> *Recipients:*Yun Gao 
> *CC:*gaurav kulkarni , User <
> user@flink.apache.org>
> *Subject:*Re: Official flink java client
>
>> I also interface with Flink clusters over REST in order to avoid many
>> annoying problems (dependency conflicts, classpath or env variables).
>> I use an extended version of the RestClusterClient that you can reuse if
>> you want to.
>> It is available at [1] and it adds some missing methods to the default
>> Flink version (I also had to copy that class and modify the visibility of
>> some fields in order to enable the extension).
>> Officially the Flink RestClusterClient is meant for internal use only, but
>> it actually works very well.
>>
>> Best,
>> Flavio
>>
>> [1]
>> https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
>>
>> On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:
>>
>>> Hi gaurav,
>>>
>>> Logically the Flink client is embedded inside the StreamExecutionEnvironment,
>>> and users can use the StreamExecutionEnvironment to execute their jobs.
>>> Could you share more about why you want to use the client directly?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*gaurav kulkarni 
>>> *Send Date:*Fri Apr 23 10:14:08 2021
>>> *Recipients:*User 
>>> *Subject:*Official flink java client
>>>
 Hi,

 Is there any official flink client in java that's available? I came
 across RestClusterClient, but I am not sure if it's official.
 I can create my own client, but just wanted to check if there is anything
 official available already that I can leverage.

 Thanks,
 Gaurav

 Run already deployed job on Flink Cluster using RestClusterClient

 I am trying to run already deployed job on Flink Cluster using Rest
 request.I had success using a simple rest ...

 





Re: event-time window cannot become earlier than the current watermark by merging

2021-04-23 Thread Matthias Pohl
After having talked to David about this issue offline, I decided to create
a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
mailing list, Vishal. Hopefully, the community has the chance to look into
it.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22425

On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl 
wrote:

> To me, it sounds strange. I would have expected it to work with
> `allowedLateness` and `sideOutput` being defined. I pull in David to have a
> look at it. Maybe, he has some more insights. I haven't worked that much
> with lateness, yet.
>
> Matthias
>
> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>>  << Added the Filter upfront as below, the pipe has no issues. Also
>> metrics show that no data is being pushed through the sideoutput and that
>> data is not pulled from the simulated sideout (below)
>>
>> >> Added the Filter upfront as below, the pipe has no issues. Also
>> metrics show that no data is being pushed through the sideoutput and that
>> data is *now* pulled from the simulated sideout, essentially the
>> Process Function with a reverse predicate to the Filter Process Function.
>>
>>
>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> And when I added the filter the Exception was not thrown. So the
>>> sequence of events
>>>
>>> * Increased lateness from 12 ( that was what it was initially running
>>> with )  to 24 hours
>>> * the pipe ran as desired before it blew up with the Exception
>>> * masked the issue by increasing the lateness to 48 hours.
>>> * It blew up again but now after the added lateness, so essentially the
>>> same issue but added lateness let the pipe run for another few hours.
>>> * Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the sideoutput and that
>>> data is not pulled from the simulated sideout (below)
>>>
>>>
>>> public class LateEventFilter<KEY, VALUE> extends
>>>     ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   long allowedLateness;
>>>
>>>   public LateEventFilter(long allowedLateness) {
>>>     this.allowedLateness = allowedLateness;
>>>   }
>>>
>>>   @Override
>>>   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>     if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>       out.collect(value);
>>>     }
>>>   }
>>> }
>>>
>>>
>>> public class LateEventSideOutput<KEY, VALUE> extends
>>>     ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   long allowedLateness;
>>>
>>>   public LateEventSideOutput(long allowedLateness) {
>>>     this.allowedLateness = allowedLateness;
>>>   }
>>>
>>>   @Override
>>>   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>     if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>>       out.collect(value);
>>>     }
>>>   }
>>> }
>>>
>>>
>>>
>>>  I am using RocksDB as a backend if that helps.
>>>
>>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Yes sir. The allowedLateNess and side output always existed.

 On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
 wrote:

> You're saying that you used `allowedLateness`/`sideOutputLateData` as
> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
> being added to your pipeline when running into the
> UnsupportedOperationException issue previously?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> As in this is essentially doing what lateness *should* have done
>> And I think that is a bug. My code now is . Please look at
>> the allowedLateness on the session window.
>>
>> SingleOutputStreamOperator>
>> filteredKeyedValue = keyedValue
>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>> "late_filter").uid("late_filter");
>> SingleOutputStreamOperator>
>> lateKeyedValue = keyedValue
>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).
>> name("late_data").uid("late_data");
>> SingleOutputStreamOperator>
>> aggregate = filteredKeyedValue
>> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
>> keyBy(value -> value.getKey())
>> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
>> lateOutputTag)
>> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>> .aggregate(new SortAggregate(),
>> new SessionIdProcessWindowFunction(this.gapInMinutes,
>> thi

Approaches for external state for Flink

2021-04-23 Thread Oğuzhan Mangır
I'm trying to design a stream flow that checks *de-duplicate* events and
sends them to the Kafka topic.

Basically, flow looks like that;

kafka (multiple topics) =>  flink (checking de-duplication and event
enrichment) => kafka (single topic)

For de-duplication, I'm thinking of using Cassandra as an external state
store. The details of my job;

I have an event payload with a *uuid* field. If an event with the same uuid
arrives again, it should be discarded. In my case, I am reading from two kafka
topics. The first topic has a lot of fields, but the other topic
just has a *uuid* field, thus I have to enrich the data using the same uuid
for the events coming from the second topic.

Stream1: Messages reading from the first topic. Read state from Cassandra
using the *uuid*. If a state exists, ignore this event and *do not* emit to
the Kafka. If state does not exist, save  this event to the Cassandra, then
emit this event to the Kafka.

Stream2: Messages reading from the second topic. Read state from Cassandra
using the *uuid*. If state exists, check a column that represents this
event came from topic2. If the value of this column is false, enrich the
event using state and update the Cassandra column as true. If true, ignore
this event because this event is a duplicate.

def checkDeDuplication(event): Option[Event] = {
  val state = readFromCassandra(state)
  if (state exist) None //ignore this event
  else {
saveEventToCassandra(event)
Some(event)
  }
}

def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
  val state = readFromCassandra(state)
  if (state does not exist) None //ignore this event
  else {
if (state.flag == true) None // ignore this event
else {
   updateFlagAsTrueInCassandra(event)
   Some(event)
}
  }
}


val stream1 = readKafkaTopic1().flatMap(checkDeDuplication())
val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
stream1.union(stream2).addSink(kafkaSink)

1- Is that a good approach?

2- Is Cassandra the right choice here? Note, the state size is very large
and I have to feed the state from batch flow firstly. Thus I can not use
the internal state like rocksdb.

3- Can i improve this logic?

4- Could there be any bottleneck in that flow? I am thinking of using asyncMap
functions for the state read/write operations.
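
Since asyncMap functions for the state read/write are mentioned in point 4, here
is a minimal sketch of Flink's Async I/O operator doing that kind of lookup
(assuming Flink 1.12 APIs; the in-memory set is only a stand-in for the external
store, e.g. Cassandra, and the names are illustrative, not the poster's design):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Async de-duplication on a stream of uuid strings: emit the uuid only the
// first time it is seen. A real job would issue an asynchronous query against
// the external store here instead of touching an in-memory set.
public class AsyncDedupFunction extends RichAsyncFunction<String, String> {

    private transient Set<String> seenUuids; // stand-in for the external store

    @Override
    public void open(Configuration parameters) {
        seenUuids = ConcurrentHashMap.newKeySet();
    }

    @Override
    public void asyncInvoke(String uuid, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> !seenUuids.add(uuid)) // true if it was already present
                .thenAccept(duplicate -> resultFuture.complete(
                        duplicate ? Collections.emptyList() : Collections.singleton(uuid)));
    }
}

// Wiring it into a pipeline, e.g.:
//   DataStream<String> deduped = AsyncDataStream.unorderedWait(
//       uuidStream, new AsyncDedupFunction(), 1, TimeUnit.SECONDS, 100);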


Re: Approaches for external state for Flink

2021-04-23 Thread Raghavendar T S
Hi Oğuzhan

Take a look at bloom filter. You might get better ideas.

Links:
https://en.wikipedia.org/wiki/Bloom_filter
https://stackoverflow.com/questions/4282375/what-is-the-advantage-to-using-bloom-filters
https://redislabs.com/modules/redis-bloom/

Thank you
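
To make the bloom filter suggestion concrete, a minimal in-process sketch with
Guava's BloomFilter (the Redis module linked above would be the shared, external
variant; the expected insertions and false-positive rate are illustrative):

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;

public class BloomFilterSketch {
    public static void main(String[] args) {
        // Sized for ~100 million uuids with a 1% false-positive probability.
        BloomFilter<CharSequence> seen = BloomFilter.create(
                Funnels.stringFunnel(StandardCharsets.UTF_8), 100_000_000L, 0.01);

        String uuid = "some-event-uuid";
        if (!seen.mightContain(uuid)) {
            seen.put(uuid);
            // Definitely not seen before -> safe to emit the event.
        } else {
            // Possibly seen before (false positives happen) -> confirm with a real lookup.
        }
    }
}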

On Fri, Apr 23, 2021 at 3:52 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> I'm trying to design a stream flow that checks *de-duplicate* events and
> sends them to the Kafka topic.
>
> Basically, flow looks like that;
>
> kafka (multiple topics) =>  flink (checking de-duplication and event
> enrichment) => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external state
> store. The details of my job;
>
> I have an event payload with a *uuid* field. If an event with the same uuid
> arrives again, it should be discarded. In my case, I am reading from two kafka
> topics. The first topic has a lot of fields, but the other topic
> just has a *uuid* field, thus I have to enrich the data using the same uuid
> for the events coming from the second topic.
>
> Stream1: Messages reading from the first topic. Read state from Cassandra
> using the *uuid*. If a state exists, ignore this event and *do not* emit
> to the Kafka. If state does not exist, save  this event to the Cassandra,
> then emit this event to the Kafka.
>
> Stream2: Messages reading from the second topic. Read state from Cassandra
> using the *uuid*. If state exists, check a column that represents this
> event came from topic2. If the value of this column is false, enrich the
> event using state and update the Cassandra column as true. If true, ignore
> this event because this event is a duplicate.
>
> def checkDeDuplication(event): Option[Event] = {
>   val state = readFromCassandra(state)
>   if (state exist) None //ignore this event
>   else {
> saveEventToCassandra(event)
> Some(event)
>   }
> }
>
> def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
>   val state = readFromCassandra(state)
>   if (state does not exist) None //ignore this event
>   else {
> if (state.flag == true) None // ignore this event
> else {
>updateFlagAsTrueInCassandra(event)
>Some(event)
> }
>   }
> }
>
>
> val stream1 = readKafkaTopic1().flatMap(checkDeDuplication())
> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
> stream1.union(stream2).addSink(kafkaSink)
>
> 1- Is that a good approach?
>
> 2- Is Cassandra the right choice here? Note, the state size is very large
> and I have to feed the state from batch flow firstly. Thus I can not use
> the internal state like rocksdb.
>
> 3- Can i improve this logic?
>
> 4- Could there be any bottleneck in that flow? I am thinking of using asyncMap
> functions for the state read/write operations.
>


-- 
Raghavendar T S
www.teknosrc.com


Re: Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
Hi Flavio,

Got it. From my view, I think RestClusterClient might not be viewed as a public
API and might change between versions, thus you might need to be careful when
upgrading.

Best,
Yun



 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 16:10:05 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Re: Official flink java client

Obviously I could rewrite a java client from scratch that interfaces with the
provided REST API, but why do that if I can reuse something that already exists?
Usually I interface with REST APIs using auto-generated clients (if the APIs are
exposed via Swagger or OpenAPI).
If that's not an option, writing a REST client from scratch is something I try
to avoid as much as I can.

Best,
Flavio
On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:

Hi Flavio,

Many thanks for the explanation. Maybe another option is to have a look at
the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs
and query job status, and it might be able to help.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 15:25:55 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Official flink java client

I also interface with Flink clusters over REST in order to avoid many annoying
problems (dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if you
want to.
It is available at [1] and it adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some fields
in order to enable the extension).
Officially the Flink RestClusterClient is meant for internal use only, but it
actually works very well.

Best,
Flavio

[1] 
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

Hi gaurav,

Logically the Flink client is embedded inside the StreamExecutionEnvironment, and users
can use the StreamExecutionEnvironment to execute their jobs. Could you share more about
why you want to use the client directly?

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official flink client in java that's available? I came across 
RestClusterClient, but I am not sure if it's official. I can create my own
client, but just wanted to check if there is anything official available 
already that I can leverage. 

Thanks,
Gaurav












Run already deployed job on Flink Cluster using 
RestClusterClient
I am trying to run already deployed job on Flink Cluster using Rest request.I 
had success using a simple rest ...




Re: event-time window cannot become earlier than the current watermark by merging

2021-04-23 Thread Vishal Santoshi
Great, thanks for the update. The upfront filter does work and has worked for the
last 24 hours, and there is no reason why it should not.

Again I have to note that there is no mail group that has been this
reactive to issues, so thank you again.



On Fri, Apr 23, 2021 at 4:34 AM Matthias Pohl 
wrote:

> After having talked to David about this issue offline, I decided to create
> a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
> mailing list, Vishal. Hopefully, the community has the chance to look into
> it.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22425
>
> On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl 
> wrote:
>
>> To me, it sounds strange. I would have expected it to work with
>> `allowedLateness` and `sideOutput` being defined. I pull in David to have a
>> look at it. Maybe, he has some more insights. I haven't worked that much
>> with lateness, yet.
>>
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>>  << Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the sideoutput and that
>>> data is not pulled from the simulated sideout (below)
>>>
>>> >> Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the sideoutput and that
>>> data is *now* pulled from the simulated sideout, essentially the
>>> Process Function with a reverse predicate to the Filter Process Function.
>>>
>>>
>>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 And when I added the filter the Exception was not thrown. So the
 sequence of events

 * Increased lateness from 12 ( that was what it was initially running
 with )  to 24 hours
 * the pipe ran as desired before it blew up with the Exception
 * masked the issue by increasing the lateness to 48 hours.
 * It blew up again but now after the added lateness, so essentially the
 same issue but added lateness let the pipe run for another few hours.
 * Added the Filter upfront as below, the pipe has no issues. Also
 metrics show that no data is being pushed through the sideoutput and that
 data is not pulled from the simulated sideout (below)


 public class LateEventFilter<KEY, VALUE> extends
     ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
   private static final long serialVersionUID = 1L;

   long allowedLateness;

   public LateEventFilter(long allowedLateness) {
     this.allowedLateness = allowedLateness;
   }

   @Override
   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
     if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
       out.collect(value);
     }
   }
 }


 public class LateEventSideOutput<KEY, VALUE> extends
     ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
   private static final long serialVersionUID = 1L;

   long allowedLateness;

   public LateEventSideOutput(long allowedLateness) {
     this.allowedLateness = allowedLateness;
   }

   @Override
   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
     if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
       out.collect(value);
     }
   }
 }



  I am using RocksDB as a backend if that helps.

 On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Yes sir. The allowedLateNess and side output always existed.
>
> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
> wrote:
>
>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>> being added to your pipeline when running into the
>> UnsupportedOperationException issue previously?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> As in this is essentially doing what lateness *should* have done
>>> And I think that is a bug. My code now is . Please look at
>>> the allowedLateness on the session window.
>>>
>>> SingleOutputStreamOperator>
>>> filteredKeyedValue = keyedValue
>>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>>> "late_filter").uid("late_filter");
>>> SingleOutputStreamOperator>
>>> lateKeyedValue = keyedValue
>>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).
>>> name("late_data").uid("late_data");
>>> SingleOutputStreamOperator>
>>> aggregate = filteredKeyedValue
>>>

Re: Re: Re: Official flink java client

2021-04-23 Thread Flavio Pompermaier
Yes, that's a known risk. Indeed, it would be awesome if the REST API were
also published in some format that allows automatic client generation
(like Swagger or OpenAPI). Releasing an official client could be another
option otherwise... I think that it's very annoying to write a client from
scratch.
I'll continue to use RestClusterClient as long as it works.

On Fri, Apr 23, 2021 at 2:48 PM Yun Gao  wrote:

> Hi Flavio,
>
> Got it. From my view, I think RestClusterClient might not be viewed as a
> public API and might change between versions, thus you might need to be
> careful when upgrading.
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Flavio Pompermaier 
> *Send Date:*Fri Apr 23 16:10:05 2021
> *Recipients:*Yun Gao 
> *CC:*gaurav kulkarni , User <
> user@flink.apache.org>
> *Subject:*Re: Re: Official flink java client
>
>> Obviously I could rewrite a java client from scratch that interfaces with
>> the provided REST API, but why do that if I can reuse something that already exists?
>> Usually I interface with REST APIs using auto-generated clients (if the APIs
>> are exposed via Swagger or OpenAPI).
>> If that's not an option, writing a REST client from scratch is something
>> I try to avoid as much as I can.
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:
>>
>>> Hi Flavio,
>>>
>>> Many thanks for the explanation. Maybe another option is to have a look at
>>> the HTTP REST API [1]? Flink provides an official HTTP API to submit jar
>>> jobs and query job status, and it might be able to help.
>>>
>>> Best,
>>> Yun
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
>>>
>>> --Original Mail --
>>> *Sender:*Flavio Pompermaier 
>>> *Send Date:*Fri Apr 23 15:25:55 2021
>>> *Recipients:*Yun Gao 
>>> *CC:*gaurav kulkarni , User <
>>> user@flink.apache.org>
>>> *Subject:*Re: Official flink java client
>>>
 I also interface with Flink clusters over REST in order to avoid many
 annoying problems (dependency conflicts, classpath or env variables).
 I use an extended version of the RestClusterClient that you can reuse
 if you want to.
 It is available at [1] and it adds some missing methods to the default
 Flink version (I also had to copy that class and modify the visibility of
 some fields in order to enable the extension).
 Officially the Flink RestClusterClient is meant for internal use only, but
 it actually works very well.

 Best,
 Flavio

 [1]
 https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java

 On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

> Hi gaurav,
>
> Logically the Flink client is embedded inside the StreamExecutionEnvironment,
> and users can use the StreamExecutionEnvironment to execute their jobs.
> Could you share more about why you want to use the client directly?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*gaurav kulkarni 
> *Send Date:*Fri Apr 23 10:14:08 2021
> *Recipients:*User 
> *Subject:*Official flink java client
>
>> Hi,
>>
>> Is there any official flink client in java that's available? I came
>> across RestClusterClient, but I am not sure if it's official.
>> I can create my own client, but just wanted to check if there is anything
>> official available already that I can leverage.
>>
>> Thanks,
>> Gaurav
>>
>> Run already deployed job on Flink Cluster using RestClusterClient
>>
>> I am trying to run already deployed job on Flink Cluster using Rest
>> request.I had success using a simple rest ...
>>
>> 
>>
>>
>>


Re: Question about snapshot file

2021-04-23 Thread Abdullah bin Omar
Hi,

Thank you for your reply.

I want to read the previous snapshot (if needed) at the time of operation.
In [1], there is a portion:

DataSet<Integer> listState = savepoint.readListState(
    "my-uid",
    "list-state",
    Types.INT);


here, will the function savepoint.readListState() work to read the
previous snapshot?  If so, is the filename of a savepoint file
similar to my-uid?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Thank you




On Fri, Apr 23, 2021 at 1:11 AM Matthias Pohl 
wrote:

> What is it you're trying to achieve in general? The JavaDoc of
> MetadataV2V3SerializerBase provides a description on the format of the
> file. Theoretically, you could come up with custom code using the Flink
> sources to parse the content of the file. But maybe, there's another way to
> accomplish what you're trying to do.
>
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>
> On Thu, Apr 22, 2021 at 7:53 PM Abdullah bin Omar <
> abdullahbinoma...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a savepoint or checkpointed file from my task. However, the file
>> is binary. I want to see what the file contains.
>>
>> How is it possible to see what information the file has (or how it is
>> possible to make it human readable?)
>>
>> Thank you
>>
>> On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Abdullah,
>>> the metadata file contains handles to the operator states of the
>>> checkpoint [1]. You might want to have a look into the State Processor API
>>> [2].
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
>>> abdullahbinoma...@gmail.com> wrote:
>>>
 Hi,

 (1) what 's the snapshot metadata file (binary) contains ? is it
 possible to read the snapshot metadata file by using Flink Deserialization?

 (2) is there any function that can be used to see the previous
 states on time of operation?

 Thank you

>>>
>


Re: Question about snapshot file

2021-04-23 Thread David Anderson
Abdullah,

ReadRidesAndFaresSnapshot [1] is an example that shows how to use the State
Processor API to display the contents of a snapshot taken while running
RidesAndFaresSolution [2].

Hopefully that will help you get started.

[1]
https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
[2]
https://github.com/ververica/flink-training/blob/master/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java

Best regards,
David
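
For reference, a minimal sketch of loading a snapshot and reading the list state
from the snippet quoted below with the State Processor API (assuming Flink 1.12,
the flink-state-processor-api dependency, and that the state was registered
under the operator uid "my-uid"; the savepoint path is a placeholder):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class ReadSnapshotSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The path is the savepoint/checkpoint directory (the one containing the
        // _metadata file), not the name of an individual state file.
        ExistingSavepoint savepoint = Savepoint.load(
                env, "file:///tmp/savepoints/savepoint-xxxx", new MemoryStateBackend());

        // "my-uid" must match the uid(...) that was set on the operator in the job.
        DataSet<Integer> listState =
                savepoint.readListState("my-uid", "list-state", Types.INT);

        listState.print();
    }
}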

On Fri, Apr 23, 2021 at 3:32 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> Thank you for your reply.
>
> I want to read the previous snapshot (if needed) at the time of operation.
> In [1], there is a portion:
>
> DataSet<Integer> listState = savepoint.readListState(
>     "my-uid",
>     "list-state",
>     Types.INT);
>
>
> here, will the function savepoint.readListState() work to read the
> previous snapshot?  If so, is the filename of a savepoint file
> similar to my-uid?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Thank you
>
>
>
>
> On Fri, Apr 23, 2021 at 1:11 AM Matthias Pohl 
> wrote:
>
>> What is it you're trying to achieve in general? The JavaDoc of
>> MetadataV2V3SerializerBase provides a description on the format of the
>> file. Theoretically, you could come up with custom code using the Flink
>> sources to parse the content of the file. But maybe, there's another way to
>> accomplish what you're trying to do.
>>
>> Matthias
>>
>> [1]
>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>
>> On Thu, Apr 22, 2021 at 7:53 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a savepoint or checkpointed file from my task. However, the file
>>> is binary. I want to see what the file contains.
>>>
>>> How is it possible to see what information the file has (or how it is
>>> possible to make it human readable?)
>>>
>>> Thank you
>>>
>>> On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
>>> wrote:
>>>
 Hi Abdullah,
 the metadata file contains handles to the operator states of the
 checkpoint [1]. You might want to have a look into the State Processor API
 [2].

 Best,
 Matthias

 [1]
 https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
 [2]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

 On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
 abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> (1) what 's the snapshot metadata file (binary) contains ? is it
> possible to read the snapshot metadata file by using Flink 
> Deserialization?
>
> (2) is there any function that can be used to see the previous
> states on time of operation?
>
> Thank you
>

>>


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-23 Thread Miguel Araújo
Thanks for your replies. I agree this is a somewhat general problem.
I posted it here as I was trying to register the valid subclasses in Kryo
but I couldn't get the message to go away, i.e., everything worked
correctly but there was the complaint that GenericType serialization was
being used.

This is how I was registering these types:

env.getConfig.registerKryoType(classOf[java.lang.Integer])
env.getConfig.registerKryoType(classOf[java.lang.Double])

and this is the message I got on every event:

flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1
(1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No
fields were detected for class java.lang.Number so it cannot be used as a
POJO type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance.

In the meanwhile, I've changed my approach to reuse a protobuf type I
already had as part of my input event.

Once again, thanks for your replies because they gave me the right
perspective.



Arvid Heise  wrote on Wednesday, 21/04/2021 at 18:26:

> Hi Miguel,
>
> as Klemens said this is a rather general problem independent of Flink: How
> do you map Polymorphism in serialization?
>
> Flink doesn't have an answer on its own, as it's discouraged (A Number can
> have arbitrary many subclasses: how do you distinguish them except by
> classname? That adds a ton of overhead.). The easiest solution in your case
> is to convert ints into double.
> Or you use Kryo which dictionary encodes the classes and also limits the
> possible subclasses.
>
> On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> I guess this is more of a Java Problem than a Flink Problem. If you want
>> it quick and dirty you could implement a class such as:
>>
>> public class Value {
>>     private boolean isLongSet = false;
>>     private long longValue = 0L;
>>     private boolean isIntegerSet = false;
>>     private int intValue = 0;
>>
>>     public Value(final long value) {
>>         setLong(value);
>>     }
>>
>>     public void setLong(final long value) {
>>         longValue = value;
>>         isLongSet = true;
>>     }
>>
>>     public long getLong() {
>>         if (isLongSet) {
>>             return longValue;
>>         }
>>         throw new IllegalStateException("no long value set");
>>     }
>>
>>     // Add the same methods for int;
>>     // to satisfy POJO requirements you will also need to add a
>>     // no-argument constructor as well as getters and setters for the boolean flags
>> }
>>
>> I guess a cleaner solution would be possible using a custom Kryo
>> serializer as explained here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>
>> Regards
>>   Klemens
>>
>>
>>
>> > Am 20.04.2021 um 10:34 schrieb Miguel Araújo > >:
>> >
>> > Hi everyone,
>> >
>> > I have a ProcessFunction which needs to store different number types
>> for different keys, e.g., some keys need to store an integer while others
>> need to store a double.
>> >
>> > I tried to use java.lang.Number as the type for the ValueState, but I
>> got the expected "No fields were detected for class java.lang.Number so it
>> cannot be used as a POJO type and must be processed as GenericType."
>> >
>> > I have the feeling that this is not the right approach, but the exact
>> type to be stored is only known at runtime which makes things a bit
>> trickier. Is there a way to register these classes correctly, or Is it
>> preferable to use different ValueState's for different types?
>> >
>> > Thanks,
>> > Miguel
>>
>>


Re: Official flink java client

2021-04-23 Thread gaurav kulkarni
Thanks for the response, folks! I plan to use the client mostly for monitoring
the status of jobs, and probably to trigger savepoints too. I may extend it in the
future to submit jobs. Given RestClusterClient is not officially supported, I will
probably build something myself. I agree with Flavio, it would be great if there
were an official client available or if the client could be generated
automatically.

Thanks,
Gaurav
On Friday, April 23, 2021, 06:18:35 AM PDT, Flavio Pompermaier 
 wrote:  
 
Yes, that's a known risk. Indeed, it would be awesome if the REST API were
also published in some format that allows automatic client generation (like
Swagger or OpenAPI). Releasing an official client could be another option
otherwise... I think that it's very annoying to write a client from scratch.
I'll continue to use RestClusterClient as long as it works.
On Fri, Apr 23, 2021 at 2:48 PM Yun Gao  wrote:

Hi Flavio,
Got it. From my view, I think RestClusterClient might not be viewed as a public
API and might change between versions, thus you might need to be careful when
upgrading.
Best,
Yun


 --Original Mail --Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 16:10:05 2021Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Re: Official flink java client
Obviously I could rewrite a java client from scratch that interfaces with the
provided REST API, but why do that if I can reuse something that already exists?
Usually I interface with REST APIs using auto-generated clients (if the APIs are
exposed via Swagger or OpenAPI). If that's not an option, writing a REST client
from scratch is something I try to avoid as much as I can.
Best,
Flavio
On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:

Hi Flavio,
Many thanks for the explanation. Maybe another option is to have a look at the
HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs and
query job status, and it might be able to help.
Best,
Yun
[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
 --Original Mail --Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 15:25:55 2021Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Official flink java client
I also interface with Flink clusters over REST in order to avoid many annoying
problems (dependency conflicts, classpath or env variables). I use an
extended version of the RestClusterClient that you can reuse if you want to. It
is available at [1] and it adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some fields
in order to enable the extension). Officially the Flink RestClusterClient is
meant for internal use only, but it actually works very well.
Best,
Flavio
[1] 
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

Hi gaurav,
Logically the Flink client is embedded inside the StreamExecutionEnvironment, and users
can use the StreamExecutionEnvironment to execute their jobs. Could you share
more about why you want to use the client directly?
Best,
Yun


 --Original Mail --Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021Recipients:User 
Subject:Official flink java client
Hi,
Is there any official flink client in java that's available? I came across
RestClusterClient, but I am not sure if it's official. I can create my own
client, but just wanted to check if there is anything official available
already that I can leverage.
Thanks,
Gaurav

Run already deployed job on Flink Cluster using RestClusterClient

I am trying to run already deployed job on Flink Cluster using Rest request. I
had success using a simple rest ...


Too many checkpoint folders kept for externalized retention.

2021-04-23 Thread John Smith
Hi running 1.10.0.

Just curious, is this specific to externalized retention or to checkpointing in
general?

I see thousands of chk-x folders in my checkpoint directory.

If using the default checkpointing or NONE externalized checkpointing, does the
count of chk- folders grow indefinitely until the job is killed, or does it
retain only up to a certain amount?

Thanks
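
For what it's worth, a minimal sketch of the settings involved (assuming Flink
1.10 APIs; the number of completed checkpoints that are kept is a cluster-level
setting, state.checkpoints.num-retained, which defaults to 1 and applies whether
or not externalized retention is enabled):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetentionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        // Keep the externalized checkpoint when the job is cancelled, so it can
        // be used for a manual restart later.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // How many completed checkpoints are retained is configured in
        // flink-conf.yaml, e.g.:
        //   state.checkpoints.num-retained: 3
    }
}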


Re: MemoryStateBackend Issue

2021-04-23 Thread Milind Vaidya
Hi Matthias,

Yeah, you are right. I am canceling the job, and hence it is creating a new job
with a new job id and not respecting the previous checkpoint. I
observed the same behaviour even for the local FS backend.

Is there any way to simulate failing a job locally?

As far as config is concerned, I have not configured any backend in the
conf file and am defaulting to the MemoryStateBackend.

Thanks,
Milind
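
On simulating a failure locally: one option is to run with checkpointing and a
restart strategy enabled and deliberately throw from an operator, so the job
fails over (rather than being cancelled) and restores its state from the last
checkpoint. A minimal sketch, assuming Flink 1.10+ DataStream APIs and a netcat
socket as input; all names and values are illustrative:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FailureSimulationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1_000L);
        // On failure, restart the job (up to 3 times) instead of letting it go to FAILED.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));

        env.socketTextStream("localhost", 9999)   // feed it with: nc -lk 9999
                .keyBy(line -> "all")
                .process(new CountingFunction())
                .print();

        env.execute("failure-simulation");
    }

    // Counts messages in keyed state; typing "fail" into the socket throws and
    // triggers a failover, after which the count is restored from the last checkpoint.
    public static class CountingFunction extends KeyedProcessFunction<String, String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
            if ("fail".equals(value)) {
                throw new RuntimeException("simulated failure"); // failover, not a new job
            }
            long current = (count.value() == null ? 0L : count.value()) + 1;
            count.update(current);
            out.collect(current);
        }
    }
}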



On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl 
wrote:

> One additional question: How did you stop and restart the job? The
> behavior you're expecting should work with stop-with-savepoint. Cancelling
> the job and then just restarting it wouldn't work. The latter approach
> would lead to a new job being created.
>
> Best,
> Matthias
>
> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl 
> wrote:
>
>> Hi Milind,
>> I bet someone else might have a faster answer. But could you provide the
>> logs and config to get a better understanding of what your issue is?
>> In general, the state is maintained even in cases where a TaskManager
>> fails.
>>
>> Best,
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya  wrote:
>>
>>> Hi
>>>
>>> I see MemoryStateBackend being used in TM Log
>>>
>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
>>> has been configured, using default (Memory / JobManager)
>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>>> maxStateSize: 5242880)
>>>
>>>
>>>
>>> I am logging checkpointed value which is just message count
>>>
>>> Snapshot the state 500
>>> Snapshot the state 1000
>>>
>>>
>>> When I restart the job i.e. new TM but the job manager is same I see
>>>
>>> Snapshot the state 500
>>>
>>> In the JM logs I see following entries
>>>
>>> Triggering checkpoint 1
>>> Triggering checkpoint 2
>>>
>>> After restarting job hence new TM
>>>
>>> Triggering checkpoint 1
>>>
>>> As per my understanding JM should hold the checkpointed
>>> 
>>> state across TM ? Am I correct?
>>>
>>> I have not configured anything special and using default. Do I need to
>>> add any setting to make it work ?
>>> I want to maintain message count across the TMs.
>>>
>>
>
> --
>
> Matthias Pohl | Engineer
>
> Follow us @VervericaData Ververica 
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>


FLINK Invocation error

2021-04-23 Thread Vijayendra Yadav
Hi Team,

While restarting the Flink application from a CHECKPOINT, I am facing the following
error (intermittently), but it does not impact job submission or
functionality. Still, I am wondering what the reason and a solution could be.

*RUN Command:*

/usr/lib/flink/bin/flink run
   \
-s
*s3://bucket-app/flink/checkpoint/app/0c9be9b65962e068b6b138ed81f7ae14/chk-13229/*
  \
-c com.comp.App \
-m yarn-cluster
  \
-yjm 4096m
   \
-ytm 6144m
   \
-ynm flink-app\
-yt ${app_install_path}/conf
   \
${app_install_path}/*.jar
  \
--conffile ${app_install_path}/application.properties
  \
--app App


*ERROR Messages:*

*Job has been submitted with JobID e510e34928101ed53cb08df6d3d29f69*
13:00:35.488 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error
while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_265]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_265]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
[hadoop-common-2.10.0-amzn-0.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
~[?:1.8.0_265]
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at com.directv.dis.DisFlinkService.execute(DisFlinkService.java:73)
~[?:?]
at
com.directv.dis.DisFlinkEmrApplication.main(DisFlinkEmrApplication.java:38)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_265]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_265]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_265]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
... 11 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been exhausted.
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:302)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_265]
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
~[flink-dist_2.11-1

Writing to Avro from pyflink

2021-04-23 Thread Edward Yang
I've been trying to write to the avro format with pyflink 1.12.2 on ubuntu.
I've tested my code with an iterator writing to csv and everything works as
expected. Reading through the flink documentation, I see that I should add
jar dependencies to work with avro. I downloaded three jar files that I
believe are required for avro and added them like so:

table_env\
.get_config()\
.get_configuration()\
.set_string(
"pipeline.jars",

rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
)

I suspect I'm not loading the jar files correctly, but it's unclear what
I'm supposed to do as I'm not familiar with java. When I switch the sink
format to avro I get some unexpected errors:

Py4JJavaError: An error occurred while calling o746.executeInsert.
: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
at 
org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
at 
org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:

pojo warning when using auto generated protobuf class

2021-04-23 Thread Prashant Deva
I am seeing this warning msg when trying to use a custom protobuf
de/serializer with the kafka source and an auto-generated java protobuf class:

18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor
- Class class com.xx.APITrace cannot be used as a POJO type because not all
fields are valid POJO fields, and must be processed as GenericType. Please
read the Flink documentation on "Data Types & Serialization" for details of
the effect on performance.

Here is my serializer. What am I doing wrong?

class ApiTraceSchema : DeserializationSchema<Trace.APITrace>,
    SerializationSchema<Trace.APITrace> {

    override fun getProducedType(): TypeInformation<Trace.APITrace> {
        return TypeInformation.of(Trace.APITrace::class.java)
    }

    override fun deserialize(message: ByteArray): Trace.APITrace {
        return Trace.APITrace.parseFrom(message)
    }

    override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
        return false
    }

    override fun serialize(element: Trace.APITrace): ByteArray {
        return element.toByteArray()
    }
}
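
One thing worth noting on the question above: the INFO message itself is
expected for protobuf-generated classes, since they are not POJOs, and the
custom DeserializationSchema only covers the bytes read from Kafka. For records
flowing between operators, the Flink serialization docs suggest registering the
chill-protobuf serializer. A minimal sketch, assuming the
com.twitter:chill-protobuf dependency and the same generated Trace.APITrace
class from the message above:

import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProtobufRegistrationSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hand protobuf-generated types to the chill-protobuf serializer instead
        // of Kryo's generic fallback.
        env.getConfig().registerTypeWithKryoSerializer(Trace.APITrace.class, ProtobufSerializer.class);
    }
}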