Collecting operators real output cardinalities as json files

2020-05-23 Thread Francesco Ventura
Hi everybody, 

I would like to collect the statistics and the real output cardinalities of 
many job executions as JSON files. I know that a REST interface exists 
that can be used, but I was looking for something simpler. In practice, I would 
like to take the information shown in the WebUI at runtime for a job and 
store it as a file. I am using env.getExecutionPlan() to get the execution 
plan of a job with the estimated cardinalities for each operator. However, it 
includes only estimated cardinalities and it can be used only before calling 
env.execute(). 

Is there a similar way to extract the real output cardinalities of each 
pipeline after the execution? 
Is there a place where the Flink cluster stores the history of the information 
about executed jobs?
Is developing a REST client to extract such information the only possible way? 

I would also like to avoid adding counters to the job source code, since I am 
monitoring the runtime execution and should avoid anything that could 
interfere.

Maybe it is a trivial problem, but I had a quick look around and could not find 
the solution.

Thank you very much,

Francesco
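
A REST client for this does not have to be large. The following is a minimal sketch, assuming Java 11+ and a JobManager REST endpoint reachable at a base URL such as http://localhost:8081 (an assumed address). It stores the raw response of the job-details endpoint /jobs/<jobId>, which lists each vertex together with its I/O metrics, as a JSON file. The class, method, and file names are illustrative only.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

class JobDetailsDump {

    // Builds the URI of the job-details endpoint: <restBase>/jobs/<jobId>.
    static URI jobDetailsUri(String restBase, String jobId) {
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return URI.create(base + "/jobs/" + jobId);
    }

    // Downloads the job details and writes the raw JSON to <outDir>/<jobId>.json.
    static Path dump(String restBase, String jobId, Path outDir)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(jobDetailsUri(restBase, jobId)).GET().build();
        Path target = outDir.resolve(jobId + ".json");
        HttpResponse<Path> response =
                client.send(request, HttpResponse.BodyHandlers.ofFile(target));
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage: JobDetailsDump <restBase> <jobId>");
            return;
        }
        System.out.println("Saved " + dump(args[0], args[1], Path.of(".")));
    }
}
```

Because the job itself is not modified, this does not interfere with the execution; how long a finished job remains queryable depends on the cluster setup.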

Re: Question about My Flink Application

2020-05-23 Thread Alexander Fedulov
Returning the discussion to the mailing list (it accidentally went to a
side channel because of a direct reply).
What I was referring to is the event-time processing semantics, which are
based on the watermark mechanism [1].
If you are using it, the event time at your KeyedBroadcastProcessFunction
will be determined as the minimum of the maximum watermarks observed
across all of the input channels. In order not to stall the processing of
the events of the main data flow by the control channel (broadcast stream),
you could set its watermark to the maximum possible value, as shown in
this example [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
[2]
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/main/java/com/ververica/field/dynamicrules/sources/RulesSource.java#L80
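
The effect of the minimum rule can be illustrated without Flink. This is a simplified model in plain Java, not Flink's implementation: each input channel contributes its latest watermark and the operator's event-time clock is the smallest of them. The class and method names are made up for the illustration.

```java
import java.util.Arrays;

class WatermarkCombiner {

    // The operator's event time is the minimum of the latest watermarks of all inputs.
    static long combinedWatermark(long... channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long mainStream = 1_590_000_000_000L;  // latest watermark seen on the main data stream
        long idleBroadcast = Long.MIN_VALUE;   // broadcast stream that never emitted a watermark
        long maxedBroadcast = Long.MAX_VALUE;  // broadcast stream with its watermark set to the maximum

        // Without the workaround the combined watermark never advances.
        System.out.println(combinedWatermark(mainStream, idleBroadcast));
        // With the workaround the main stream alone drives event time.
        System.out.println(combinedWatermark(mainStream, maxedBroadcast));
    }
}
```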

--

Alexander Fedulov | Solutions Architect



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference



On Sat, May 23, 2020 at 1:05 AM Sara Arshad 
wrote:

> - It was based on something I read about the broadcast.
> Besides, as I mentioned before, the restart happens when it's triggering
> checkpoints.
> - When I send the streams it processes it perfectly fine between restarts.
> - Yes, I am using ProcessingTimeService in the cache source to make it
> get data every 300 seconds.
> Do you have any view on whether it should be doable with a stream of a
> million messages, in case I improve my implementation?
>
> Best regards,
> Sara
>
> On Fri, May 22, 2020 at 6:22 PM Alexander Fedulov 
> wrote:
>
>> OK, with such data sizes this should definitely be doable with a
>> broadcast channel.
>> "The problem was that the broadcast puts a lot of pressure on
>> checkpointing." - is this the evaluation of the AWS support? Do you have
>> any details as to why this is considered to be the case?
>> "Even before I start to send the Kinesis stream it stuck." - so do you
>> actually see any data output or nothing is happening and 20 minutes later
>> the job crashes?
>> Are you using event time processing semantics in your pipeline?
>>
>> --
>>
>> Alexander Fedulov | Solutions Architect
>>
>>
>> On Fri, May 22, 2020 at 4:34 PM Sara Arshad 
>> wrote:
>>
>>> Hi Alexander,
>>>
>>> It's not that much data. I have only 2 records in my dynamodb right now
>>> (later it can be around 100 records. it's not that much) and I update
>>> the whole data every 300 seconds.
>>> Even before I start to send the Kinesis stream it stuck.
>>> Yes, I can see the checkpoint size is around 150k. But in some cases
>>> when I sent Kinesis Stream of 80 messages it's around 190k.
>>> The maximum checkpoint duration is 670.
>>>
>>> Regards,
>>>
>>>
>>> On Fri, 22 May 2020, 4:15 pm Alexander Fedulov, 
>>> wrote:
>>>
 Hi Sara,

 what is the volume of data that is coming in through the broadcast
 channel every 30 seconds? Do you only insert modified rules entries or all
 of them on each update?
 Do you have access to metrics? Specifically, the size of the
 checkpoints and time distribution of different checkpoint phases are of
 interest.

 Best,

 --

 Alexander Fedulov | Solutions Architect

 


 On Fri, May 22, 2020 at 3:57 PM Sara Arshad 
 wrote:

> The problem was that the broadcast puts a lot of pressure on
> checkpointing.
> I have to find another solution.
> If you have any other solution please let me know.
>
> Regards,
> Sara
>
> On Wed, 20 May 2020, 5:55 pm Sara Arshad, 
> wrote:
>
>> That was the broadcast stream. Which is supposed to behave like a
>> cache.
>> Then I connect that one to the kinesis stream like the below code.
>> Also, I have two Sink functions to send the results to another
>> dynamoDb table or cloud watch based on the output type.
>> Does this make sense or do you have another idea?
>>
>> DataStreamSource ruleDataStreamSource = env.addSource(new 
>> DynamoDBSource()).setParallelism(1);
>>
>> MapStateDescriptor ruleStateDescriptor = new 
>> MapStateDescriptor<>(
>> "RulesBroadcastState",
>> BasicTypeInfo.STRING_TYPE_INFO,
>> TypeInformation.of(new TypeHint() {
>> }));
>>
>> BroadcastStream ruleBroadcastStream = ruleDataStreamSource
>> .broadcast(ruleStateDescriptor);
>>
>> SingleOutputStreamOperator process = 
>> env.addSource(ObjectFactory.getKinesisConsumer())
>> .keyBy(new KeySelector>() {
>>
>> @Override
>> pu

Timeout Callbacks issue -Flink

2020-05-23 Thread Jaswin Shah
Hi,
I am running a Flink job with the following functionality:

  1.  I consume stream1 and stream2 from two Kafka topics and assign 
watermarks to the events of both streams by extracting the timestamps from the 
events.
  2.  Then I connect the two streams and call a KeyedCoProcessFunction on the 
connected stream.
  3.  The processElement1 and processElement2 methods receive the events of 
streams 1 and 2 and do the join logic, as shown in the code snippet below.
  4.  I have a shared MapState for the two streams.
  5.  When an event arrives in a processElement method, I register a callback 
time for that message so that, if the corresponding matching message has not 
arrived from the other stream, I send the message to a side output when the 
callback method, i.e. onTimer, is invoked.

Something is going wrong in the callback time registrations: for many messages 
of stream2 the callback arrives earlier than the registered callback timeout.
Also, the events from stream 2 are based on GMT +5:30, as I can see in the 
time value in the event message; for stream1 it is the normal TZ only. I am not 
strong at analysing time formats, though, so my analysis on this side could be 
wrong.

Below are the code snippets I have implemented for the KeyedCoProcessFunction, 
the timestamp calculation and the watermark registration.

/**
 * CoProcessFunction to process cart and pg messages connected using the connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

private static final Logger logger = 
LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
 * Map state for cart messages, orderId+mid is key and cartMessage is value.
 */
private static MapState<String, CartPG> cartPgState = null;

/**
 * Intializations for cart and pg mapStates
 *
 * @param config
 */
@Override
public void open(Configuration config) {

MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new 
MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartPG.class)
);
cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
}

/**
 *
 * @return
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<ResultMessage> out) throws Exception {
logger.info("On timer called key is {}",ctx.getCurrentKey());
String searchKey = ctx.getCurrentKey();
CartPG  cartPg = cartPgState.get(searchKey);
if(Objects.nonNull(cartPg)) {
ctx.output(CartPGSideOutput.getOutputTag(), 
cartPgState.get(ctx.getCurrentKey()));
cartPgState.remove(searchKey);
}
}

/**
 * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
 * @param cartMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement1(CartMessage cartMessage, Context context, 
Collector<ResultMessage> collector) throws Exception {
Long cartEventTimeStamp = context.timestamp();
logger.info("cart time : {} ",cartEventTimeStamp);
context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ 
ConfigurationsManager.getMaxWaitTimeForPGMessage());

String searchKey = cartMessage.createJoinStringCondition();

CartPG cartPG = cartPgState.get(searchKey);

if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
cartPgState.remove(searchKey);
} else {
cartPG = new CartPG();
cartPG.setCartMessage(cartMessage);
cartPgState.put(searchKey,cartPG);
}
}

/**
 * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
 * @param pgMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context 
context, Collector<ResultMessage> collector) throws Exception {

Long pgEventTimeStamp = context.timestamp();
logger.info("pg time : {} ",pgEventTimeStamp);

context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

String searchKey = pgMessage.createJoinStringCondition();
CartPG cartPG = cartPgState.get(searchKey);


Re: Timeout Callbacks issue -Flink

2020-05-23 Thread Jaswin Shah
++
Here, I am registering the callback time for an event as a processing-time 
timer, while calculating the time value as the event's time + the expiry 
timeout value.

Can this mixed use of time domains be the issue here?
Also, do we need any special handling if we use event-time semantics for 
registering callback timeouts?

Thanks,
Jaswin

From: Jaswin Shah 
Sent: 23 May 2020 17:18
To: user@flink.apache.org ; Arvid Heise 
; Yun Tang 
Subject: Timeout Callbacks issue -Flink


Stateful-fun-Basic-Hello

2020-05-23 Thread C DINESH
Hi Team,

I am writing my first stateful fun basic hello application. I am getting
the following Exception.

$ ./bin/flink run -c
org.apache.flink.statefun.flink.core.StatefulFunctionsJob
./stateful-sun-hello-java-1.0-SNAPSHOT-jar-with-dependencies.jar





 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Invalid configuration:
classloader.parent-first-patterns.additional; Must contain all of
org.apache.flink.statefun, org.apache.kafka, com.google.protobuf

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by:
org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException:
Invalid configuration: classloader.parent-first-patterns.additional; Must
contain all of org.apache.flink.statefun, org.apache.kafka,
com.google.protobuf

at
org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator.validateParentFirstClassloaderPatterns(StatefulFunctionsConfigValidator.java:55)

at
org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator.validate(StatefulFunctionsConfigValidator.java:44)

at
org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.<init>(StatefulFunctionsConfig.java:143)

at
org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.fromEnvironment(StatefulFunctionsConfig.java:105)

This is my POM file I hope I have added all the dependencies. Please
suggest me what to do.


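<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>stateful-sun-hello-java</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-sdk</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-flink-distribution</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>statefun-kafka-io</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <defaultGoal>clean generate-sources compile install</defaultGoal>

        <plugins>
            <plugin>
                <groupId>com.github.os72</groupId>
                <artifactId>protoc-jar-maven-plugin</artifactId>
                <version>3.6.0.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <includeMavenTypes>direct</includeMavenTypes>
                            <inputDirectories>
                                <include>src/main/protobuf</include>
                            </inputDirectories>
                            <outputTargets>
                                <outputTarget>
                                    <type>java</type>
                                    <outputDirectory>src/main/java</outputDirectory>
                                </outputTarget>
                                <outputTarget>
                                    <type>grpc-java</type>
                                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.15.0</pluginArtifact>
                                    <outputDirectory>src/main/java</outputDirectory>
                                </outputTarget>
                            </outputTargets>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>org.apache.flink.statefun.flink.core.StatefulFunctionsJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>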

Thanks,
Dinesh
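
The exception names the setting it checks: classloader.parent-first-patterns.additional must list all three packages. A sketch of the corresponding entry follows. The key and the package names are taken from the error message; that the entry belongs in conf/flink-conf.yaml of the Flink distribution used to submit the job is an assumption about this setup.

```yaml
# conf/flink-conf.yaml (assumed location)
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
```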


Re: onTimer method in CoProcessFunction in flink

2020-05-23 Thread Yun Gao
Hi Jaswin,

 If I understand correctly, you could add the logic in the onTimer 
callback. In this callback, OnTimerContext.output(outputTag, value) can be used 
to emit data to a specific side output. Besides, you will need additional state 
to store the elements to be emitted in the onTimer callback. A similar example 
might be [1].

Best,
 Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example




 -- Original Message --
From: Jaswin Shah 
Sent: Fri May 22 23:00:43 2020
To: user@flink.apache.org , Arvid Heise 
, Yun Tang 
Subject: onTimer method in CoProcessFunction in flink

How can I identify the type of element for which onTimer is called in Flink?
I want to send the objects for which onTimer is called to side outputs and then 
stream the side-output data out to a Kafka topic. I do not understand where I 
should write the logic that streams out the side-output data. 
Below is the code snippet I have written so far


/**
 * CoProcessFuntion to process cart and pg messages connected using connect 
operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM 
jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

private static final Logger logger = 
LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
 * Map state for cart messages, orderId+mid is key and cartMessage is value.
 */
private static MapState<String, CartMessage> cartState = null;

/**
 * Map state for pg messages, orderId+mid is key and pgMessage is value.
 */
private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;


/**
 * Intializations for cart and pg mapStates
 *
 * @param config
 */
@Override
public void open(Configuration config) {
MapStateDescriptor<String, CartMessage> cartStateDescriptor = new 
MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);

MapStateDescriptor<String, PaymentNotifyRequestWrapper> 
pgStateDescriptor = new MapStateDescriptor<>(
Constants.PG_DATA,
TypeInformation.of(String.class),
TypeInformation.of(PaymentNotifyRequestWrapper.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}

/**
 *
 * @return
 */

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<ResultMessage> out) throws Exception {

}

/**
 * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
 * @param cartMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement1(CartMessage cartMessage, Context context, 
Collector<ResultMessage> collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);

String searchKey = cartMessage.createJoinStringCondition();
PaymentNotifyRequestWrapper paymentNotifyObject = 
pgState.get(searchKey);
if(Objects.nonNull(paymentNotifyObject)) {
generateResultMessage(cartMessage,paymentNotifyObject,collector);
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}

/**
 * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
 * @param pgMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context 
context, Collector<ResultMessage> collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);
String searchKey = pgMessage.createJoinStringCondition();
CartMessage cartMessage = cartState.get(searchKey);
if(Objects.nonNull(cartMessage)) {
generateResultMessage(cartMessage,pgMessage,collector);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}

/**
 * Create ResultMessage from cart and pg messages.
 *
 * @param cartMessage
 * @param pgMessage
 * @return
 */
private void generateResultMessage(CartMessage cartMessage, 
PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;

//Logic should be in cart: check
for (

Re: Apache Flink - Error on creating savepoints using REST interface

2020-05-23 Thread Chesnay Schepler

You also have to set the boolean cancel-job parameter.

On 22/05/2020 22:47, M Singh wrote:

Hi:

I am using Flink 1.6.2 and trying to create a savepoint using the 
following curl command using the following references 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html) 
and 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html 



curl -v -H "Content-Type: application/json" -XPOST 
http:///jobs//savepoints -d 
'{"target-directory":"s3://mys3bucket/savepointdirectory/testMay22-sp1/"}'

But I am getting the following error:
{"errors":["Request did not match expected format 
SavepointTriggerRequestBody."]}

Can you please let me know what I could be missing ?
Thanks
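
For reference, a small sketch of a request body that carries both fields. It assumes the field names target-directory (from the curl command above) and cancel-job (from the reply); the class and method names are illustrative and not part of Flink.

```java
class SavepointRequestBody {

    // Escapes backslashes and double quotes so the value is safe inside a JSON string.
    static String jsonEscape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the trigger body with both fields: target-directory and the boolean cancel-job.
    static String build(String targetDirectory, boolean cancelJob) {
        return "{\"target-directory\":\"" + jsonEscape(targetDirectory)
                + "\",\"cancel-job\":" + cancelJob + "}";
    }

    public static void main(String[] args) {
        // Prints a body that can be passed to curl via -d '...'.
        System.out.println(build("s3://mys3bucket/savepointdirectory/testMay22-sp1/", false));
    }
}
```

Setting cancel-job to false takes the savepoint and leaves the job running.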





Re: Re: onTimer method in CoProcessFunction in flink

2020-05-23 Thread Yun Gao
Hi Jaswin,
I think the event-time timers and processing-time timers in Flink are 
fully decoupled: event-time timers are triggered by the watermarks received, 
and processing-time timers are triggered by the physical clock. You can think 
of them as two separate timelines with no guarantee on their relative speed. 
Therefore, the result of computing a deadline from event time and registering 
it as a processing-time timer is nondeterministic; it depends on the gap 
between event time and processing time.
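
The gap can be made concrete with a simplified model in plain Java. It is not Flink code: it only assumes that a processing-time timer fires once the wall clock reaches the registered timestamp, so a timestamp already in the past fires right away. The timeout and timestamps are made-up values.

```java
class TimerDomainMismatch {

    // Delay in ms before a processing-time timer registered for `deadline` fires,
    // given the current wall-clock time. Deadlines in the past fire right away.
    static long delayUntilFire(long deadline, long wallClockNow) {
        return Math.max(0L, deadline - wallClockNow);
    }

    public static void main(String[] args) {
        long timeout = 60_000L;                 // hypothetical 1-minute wait for the matching message
        long now = 1_590_235_200_000L;          // wall clock when the event is processed
        long lateEvent = now - 300_000L;        // event timestamp 5 minutes behind the wall clock
        long shiftedEvent = now + 19_800_000L;  // event timestamp shifted by +5:30 hours

        // Deadline derived from event time, registered on the processing-time clock:
        System.out.println(delayUntilFire(lateEvent + timeout, now));     // prints 0
        System.out.println(delayUntilFire(shiftedEvent + timeout, now));  // prints 19860000
        // Deadline derived from the wall clock itself:
        System.out.println(delayUntilFire(now + timeout, now));           // prints 60000
    }
}
```

Keeping both values in one time domain avoids the mismatch: either registerEventTimeTimer(context.timestamp() + timeout), or registerProcessingTimeTimer(context.timerService().currentProcessingTime() + timeout).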

Best,
 Yun



 -- Original Message --
From: Jaswin Shah 
Sent: Sat May 23 22:08:57 2020
To: user@flink.apache.org , Arvid Heise 
, Yun Tang 
Subject: Re: onTimer method in CoProcessFunction in flink

Hi Yun,
Actually, this problem is solved now. I am stuck on another problem with 
timeout callbacks. I am receiving the callbacks too early, and the event-time 
registrations were somehow failing; maybe they need some special 
handling. I need to know whether this callback registration is wrong or whether 
something else is wrong.
Do we need some special handling when using event-time semantics?
Thanks,
Jaswin

From: Jaswin Shah 
Sent: 22 May 2020 20:30
To: user@flink.apache.org ; Arvid Heise 
; Yun Tang 
Subject: onTimer method in CoProcessFunction in flink

Re: How to control the banlence by using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

2020-05-23 Thread C DINESH
Hi Jary,

What do you mean by "step balance"? Could you please provide a concrete example?

On Fri, May 22, 2020 at 3:46 PM Jary Zhen  wrote:

> Hello everyone,
>
>First,a brief pipeline introduction:
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   consume multi kafka topic
>   -> union them
>   -> assignTimestampsAndWatermarks
>   -> keyby
>   -> window()  and so on …
> It's a very normal way to use Flink to process data like this in a production
> environment.
> But if I want to test the pipeline above, I need to use the API
> FlinkKafkaConsumer.setStartFromTimestamp(timestamp) to consume 'past' data.
> So my question is how to control the 'step' balance, as one topic produces
> 3 records per second while another topic produces 3 per second.
>
> I don't know if I have described this clearly, so if anything is unclear please let me know.
>
> Tks
>
>


stateful-fun2.0 checkpointing

2020-05-23 Thread C DINESH
Hi Team,

1. How can we enable checkpointing in Stateful Functions 2.0?
2. How can we set the parallelism?

Thanks,
Dinesh.


Re: Flink Window with multiple trigger condition

2020-05-23 Thread aj
I am still not able to get far after reading the material. Please help with
some basic code to get started building this window and trigger.

Another option I am considering is to just use a RichFlatMapFunction with
keyed state to build this logic. Is that the correct approach?
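
As a starting point, the decision logic of the custom trigger suggested in the quoted reply below can be sketched in plain Java. This is not a Flink Trigger implementation: the Result enum is a stand-in for Flink's TriggerResult, the two methods only mirror the roles of Trigger#onElement and Trigger#onProcessingTime, and the "start" event name follows the reply.

```java
class StartEventTriggerSketch {

    // Stand-in for Flink's TriggerResult, limited to the two values used here.
    enum Result { CONTINUE, FIRE_AND_PURGE }

    // Role of Trigger#onElement: fire and clear the window when the "start" event arrives.
    static Result onElement(String eventType) {
        return "start".equals(eventType) ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    // Role of Trigger#onProcessingTime: fire once the window's end time has been reached.
    static Result onProcessingTime(long time, long windowEnd) {
        return time >= windowEnd ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    public static void main(String[] args) {
        long windowEnd = 30 * 60 * 1000L; // 30-minute processing-time window, in ms
        System.out.println(onElement("click"));
        System.out.println(onElement("start"));
        System.out.println(onProcessingTime(windowEnd - 1, windowEnd));
        System.out.println(onProcessingTime(windowEnd, windowEnd));
    }
}
```

The same two conditions carry over to a keyed-state function: there the "window" is per-key state plus a timer registered when the first event arrives.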



On Fri, May 22, 2020 at 4:52 PM aj  wrote:

>
>
> I was also thinking to have a processing time window but that will not
> work for me. I want to start the window when the user  "*search*" event
> arrives. So for each user window will start from the *search* event.
>  The Tumbling window has fixed start end time so that will not be suitable
> in my case.
>
>
>
>
> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> To achieve what you have in mind, I think what you have to do is to use a
>> processing time window of 30 mins, and have a custom trigger that matches
>> the "start" event in the `onElement` method and return
>> TriggerResult.FIRE_AND_PURGE.
>>
>> That way, the window fires either when the processing time has passed, or
>> the start event was received.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> 
>
>
> 
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Query Rest API from IDE during runtime

2020-05-23 Thread C DINESH
Hi Annemarie,

You need to use an HTTP client to connect to the job manager.



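  import org.apache.http.HttpResponse;
  import org.apache.http.client.methods.HttpGet;
  import org.apache.http.impl.client.CloseableHttpClient;
  import org.apache.http.impl.client.HttpClients;
  import org.apache.http.util.EntityUtils;

  // Create an HttpClient object (Apache HttpClient 4.x)
  CloseableHttpClient httpclient = HttpClients.createDefault();

  // Create an HttpGet object (plain http unless SSL is enabled for the REST endpoint)
  HttpGet httpget = new HttpGet("http://${jobmanager:port}/jobs");

  // Execute the GET request and read the JSON body
  HttpResponse httpresponse = httpclient.execute(httpget);
  String json = EntityUtils.toString(httpresponse.getEntity());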


From the response body you will get the IDs and statuses of all jobs, including the running ones.
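
The same request can also be made with the HTTP client built into Java 11+, without an extra dependency. A minimal sketch, assuming the /jobs response has the shape {"jobs":[{"id":"...","status":"..."}]} with "id" before "status" in each entry. The class and method names are illustrative, and a JSON library would be more robust than the regular expression used here.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RunningJobIds {

    // Matches one entry of the "jobs" array, e.g. {"id":"<hex>","status":"RUNNING"}.
    private static final Pattern JOB = Pattern.compile(
            "\\{\\s*\"id\"\\s*:\\s*\"([0-9a-fA-F]+)\"\\s*,\\s*\"status\"\\s*:\\s*\"([A-Z_]+)\"\\s*\\}");

    // Returns the IDs of all jobs whose status is RUNNING.
    static List<String> runningJobIds(String jobsJson) {
        List<String> ids = new ArrayList<>();
        Matcher m = JOB.matcher(jobsJson);
        while (m.find()) {
            if ("RUNNING".equals(m.group(2))) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    // Fetches <restBase>/jobs and returns the response body as a string.
    static String fetchJobs(String restBase) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(restBase + "/jobs")).GET().build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: RunningJobIds <restBase>");
            return;
        }
        System.out.println(runningJobIds(fetchJobs(args[0])));
    }
}
```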


On Fri, May 22, 2020 at 9:22 PM Annemarie Burger <
annemarie.bur...@campus.tu-berlin.de> wrote:

> Hi,
>
> I want to query Flink's REST API in my IDE during runtime in order to get
> the jobID of the job that is currently running. Is there any way to do
> this?
> I found the RestClient class, but can't seem to figure out how to exactly
> make this work. Any help much appreciated.
>
> Best,
> Annemarie
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink 1.8.3 Kubernetes POD OOM

2020-05-23 Thread Josson Paul
Hi Andrey,
  We don't use RocksDB. As I said in the original email, I am using the
file-based state backend. Even though our cluster runs on Kubernetes, our Flink
cluster uses Flink's standalone resource manager. We have not yet integrated
our Flink with Kubernetes.

Thanks,
Josson

On Fri, May 22, 2020 at 3:37 AM Andrey Zagrebin 
wrote:

> Hi Josson,
>
> Do you use a state backend? Is it RocksDB?
>
> Best,
> Andrey
>
> On Fri, May 22, 2020 at 12:58 PM Fabian Hueske  wrote:
>
>> Hi Josson,
>>
>> I don't have much experience setting memory bounds in Kubernetes myself,
>> but my colleague Andrey (in CC) reworked Flink's memory configuration for
>> the last release to ease the configuration in container envs.
>> He might be able to help.
>>
>> Best, Fabian
>>
>> Am Do., 21. Mai 2020 um 18:43 Uhr schrieb Josson Paul <
>> jossonp...@gmail.com>:
>>
>>> Cluster type: Standalone cluster
>>> Job Type: Streaming
>>> JVM memory: 26.2 GB
>>> POD memory: 33 GB
>>> CPU: 10 Cores
>>> GC: G1GC
>>> Flink Version: 1.8.3
>>> State back end: File based
>>> NETWORK_BUFFERS_MEMORY_FRACTION : 0.02f of the Heap
>>> We are not accessing direct memory from the application. Only Flink uses
>>> direct memory.
>>>
>>> We notice that in Flink 1.8.3, over a period of 30 minutes, the pod is
>>> killed with OOM. The JVM heap is within its limit.
>>> We read from Kafka and have windows in the application. Our sink is
>>> either Kafka or Elasticsearch.
>>> *The same application/job was working perfectly in Flink 1.4.1 with the
>>> same input rate and output rate.*
>>> No back pressure.
>>> *I have attached a few Grafana charts as PDF.*
>>> Any idea why the off-heap memory / memory outside the JVM keeps going up
>>> and eventually reaches the limit?
>>>
>>>  Java Heap (reserved=26845184KB, committed=26845184KB)
>>> (mmap: reserved=26845184KB, committed=26845184KB)
>>>
>>> - Class (reserved=1241866KB, committed=219686KB)
>>> (classes #36599)
>>> (malloc=4874KB #74568)
>>> (mmap: reserved=1236992KB, committed=214812KB)
>>>
>>> - Thread (reserved=394394KB, committed=394394KB)
>>> (thread #383)
>>> (stack: reserved=392696KB, committed=392696KB)
>>> (malloc=1250KB #1920)
>>> (arena=448KB #764)
>>>
>>> - Code (reserved=272178KB, committed=137954KB)
>>> (malloc=22578KB #33442)
>>> (mmap: reserved=249600KB, committed=115376KB)
>>>
>>> - GC (reserved=1365088KB, committed=1365088KB)
>>> (malloc=336112KB #1130298)
>>> (mmap: reserved=1028976KB, committed=1028976KB)
>>>
>>>
>>>
>>> --
>>> Thanks
>>> Josson
>>>
>>

-- 
Thanks
Josson
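
As a rough cross-check of the Native Memory Tracking excerpt quoted above, the
committed sizes of the five categories shown can simply be added up. This is
only a sketch: the class and method names are made up for illustration, the
figures are copied from the quoted output, and any category not included in
the excerpt is not part of the sum, so the result is a lower bound on the
process footprint.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

final class NmtCommittedSum {
    /** Committed sizes in KB, copied from the NMT excerpt quoted in this thread. */
    static Map<String, Long> quotedExcerpt() {
        Map<String, Long> committedKb = new LinkedHashMap<>();
        committedKb.put("Java Heap", 26845184L);
        committedKb.put("Class", 219686L);
        committedKb.put("Thread", 394394L);
        committedKb.put("Code", 137954L);
        committedKb.put("GC", 1365088L);
        return committedKb;
    }

    /** Sums the committed sizes of all categories, in KB. */
    static long totalCommittedKb(Map<String, Long> committedKb) {
        long total = 0;
        for (long kb : committedKb.values()) {
            total += kb;
        }
        return total;
    }

    public static void main(String[] args) {
        long totalKb = totalCommittedKb(quotedExcerpt());
        // Convert KB to GiB (1 GiB = 1024 * 1024 KB) for easier comparison.
        System.out.printf(Locale.ROOT, "committed total: %d KB (%.2f GiB)%n",
                totalKb, totalKb / (1024.0 * 1024.0));
    }
}
```

With the quoted figures this comes to 28962306 KB, roughly 27.6 GiB, against
the 33 GB pod memory mentioned in the report.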


Re: Flink 1.8.3 Kubernetes POD OOM

2020-05-23 Thread Josson Paul
Hi Andrey,
  To clarify my email above: I am using heap-based state and not RocksDB.

Thanks,
Josson

On Sat, May 23, 2020, 17:37 Josson Paul  wrote:

> Hi Andrey,
>   We don't use RocksDB. As I said in the original email, I am using the
> file-based state backend. Even though our cluster runs on Kubernetes, our
> Flink cluster uses Flink's standalone resource manager. We have not yet
> integrated Flink with Kubernetes.
>
> Thanks,
> Josson
>
> On Fri, May 22, 2020 at 3:37 AM Andrey Zagrebin 
> wrote:
>
>> Hi Josson,
>>
>> Do you use a state backend? Is it RocksDB?
>>
>> Best,
>> Andrey
>>
>> On Fri, May 22, 2020 at 12:58 PM Fabian Hueske  wrote:
>>
>>> Hi Josson,
>>>
>>> I don't have much experience setting memory bounds in Kubernetes myself,
>>> but my colleague Andrey (in CC) reworked Flink's memory configuration for
>>> the last release to ease the configuration in container envs.
>>> He might be able to help.
>>>
>>> Best, Fabian
>>>
>>> Am Do., 21. Mai 2020 um 18:43 Uhr schrieb Josson Paul <
>>> jossonp...@gmail.com>:
>>>
>>>> Cluster type: Standalone cluster
>>>> Job Type: Streaming
>>>> JVM memory: 26.2 GB
>>>> POD memory: 33 GB
>>>> CPU: 10 Cores
>>>> GC: G1GC
>>>> Flink Version: 1.8.3
>>>> State back end: File based
>>>> NETWORK_BUFFERS_MEMORY_FRACTION : 0.02f of the Heap
>>>> We are not accessing direct memory from the application. Only Flink uses
>>>> direct memory.
>>>>
>>>> We notice that in Flink 1.8.3, over a period of 30 minutes, the pod is
>>>> killed with OOM. The JVM heap is within its limit.
>>>> We read from Kafka and have windows in the application. Our sink is
>>>> either Kafka or Elasticsearch.
>>>> *The same application/job was working perfectly in Flink 1.4.1 with the
>>>> same input rate and output rate.*
>>>> No back pressure.
>>>> *I have attached a few Grafana charts as PDF.*
>>>> Any idea why the off-heap memory / memory outside the JVM keeps going up
>>>> and eventually reaches the limit?
>>>>
>>>>  Java Heap (reserved=26845184KB, committed=26845184KB)
>>>> (mmap: reserved=26845184KB, committed=26845184KB)
>>>>
>>>> - Class (reserved=1241866KB, committed=219686KB)
>>>> (classes #36599)
>>>> (malloc=4874KB #74568)
>>>> (mmap: reserved=1236992KB, committed=214812KB)
>>>>
>>>> - Thread (reserved=394394KB, committed=394394KB)
>>>> (thread #383)
>>>> (stack: reserved=392696KB, committed=392696KB)
>>>> (malloc=1250KB #1920)
>>>> (arena=448KB #764)
>>>>
>>>> - Code (reserved=272178KB, committed=137954KB)
>>>> (malloc=22578KB #33442)
>>>> (mmap: reserved=249600KB, committed=115376KB)
>>>>
>>>> - GC (reserved=1365088KB, committed=1365088KB)
>>>> (malloc=336112KB #1130298)
>>>> (mmap: reserved=1028976KB, committed=1028976KB)
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks
>>>> Josson
>>>>
>>>
>
> --
> Thanks
> Josson
>