Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-13 Thread Yun Gao
  Hi all,
I updated the FLIP[1] to reflect the major points discussed in the ML thread:

 1) For "new" root tasks that finish before receiving the trigger message, we previously 
proposed to let the JM re-compute and re-trigger the descendant tasks, but after the 
discussion we realized that this might put overhead on the JobMaster in cases of cascading 
finishes and large parallelism. Another option might be to let the StreamTask do one 
synchronization with the CheckpointCoordinator before it finishes, so that it becomes aware 
of the missed pending checkpoints; since at that point the EndOfPartition events have not 
been emitted yet, it could still broadcast barriers to its descendant tasks. I updated the 
details in this section[2] of the FLIP.

 2) For the barrier alignment, we now changed to inserting faked barriers into the input 
channels to avoid interfering with the checkpoint alignment algorithms. One remaining issue 
is that in unaligned checkpoint mode we cannot snapshot an upstream task's result partition 
if the task has already finished. One option to address this issue is to make the upstream 
tasks wait for their buffers to be flushed before exiting, and we would include this in 
future versions. I updated this part in this section[3] of the FLIP.

 3) Some operators, like the Sink Committer, need to wait for one complete checkpoint before 
exiting. To support operators that need to wait for some finalization condition, like the 
Sink Committer and Async I/O, we could introduce a new interface to mark this kind of 
operator and let the runtime wait until the operator has reached its condition. I updated 
this part in this section[4] of the FLIP.

Could you have another look at the FLIP and the pending issues? Any feedback is warmly 
welcomed and appreciated. Many thanks!

Best,
 Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish


--
From:Yun Gao 
Send Time:2021 Jan. 12 (Tue.) 10:30
To:Khachatryan Roman 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Roman, 

 Many thanks for the feedback and suggestions!

> I think UC will be the common case with multiple sources each with 
DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of 
> its source subtasks finishes.

Yes, waiting for EoP would be required for each input channel if we do 
not specially block the finished upstream task. 

   > Yes, but checkpoint completion notification will not be sent until all 
the EOPs are processed.
  The downstream tasks that get triggered indeed must wait until they have received EoPs 
from all the input channels. I initially compared it with the completely aligned case, and 
the remaining execution graph after the triggered task could still take a normal unaligned 
checkpoint (e.g., if A -> B -> C -> D, A finishes and B gets triggered, then B -> C -> D 
could still take a normal unaligned checkpoint). But this still could not bound the 
possible max delay.

> Not all declines cause job failure, particularly 
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
Sorry, I mistook the logic here; CHECKPOINT_DECLINED_TASK_NOT_READY indeed does not cause a 
failure. But since after a failed checkpoint we would have to wait for the checkpoint 
interval until the next checkpoint, I also agree that the following option, where we try to 
complete each checkpoint, would be the better one.

>> Thus another possible option might be let the upstream task to wait till all 
>> the pending buffers in the result partition has been flushed before get to 
>> finish.
> This is what I meant by "postpone JM notification from source". Just blocking 
> the task thread wouldn't add much complexity, though I'm not sure if it would 
> cause any problems.

>> do you think it would be ok for us to view it as an optimization and 
>> postpone it to future versions ? 
> I think that's a good idea.

 And also, I am very sorry, I must have wrongly understood the proposals here, and currently 
I also do not see explicit problems with waiting for the flush of the pipelined result 
partition. 
Glad that we have the same viewpoint on this issue. :) 

 Best,
  Yun



--
From:Khachatryan Roman 
Send Time:2021 Jan. 11 (Mon.) 19:14
To:Yun Gao 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks F

Re: Metrics for average time taken by flatMap function

2021-01-13 Thread Manish G
This approach has an issue. Even for periods when there is no
activity, the latest gauge value is still used for calculations, and this
generates graphs which are not a correct representation of the situation.

On Tue, Jan 12, 2021 at 7:01 PM Manish G 
wrote:

> Prometheus provides avg_over_time for a range vector. That seems to be
> better suited for this usecase.
>
> On Tue, Jan 12, 2021 at 6:53 PM Chesnay Schepler 
> wrote:
>
>> The cumulative time probably isn't that useful to detect changes in the
>> behavior of the application.
>>
>> On 1/12/2021 12:30 PM, Chesnay Schepler wrote:
>>
>> I mean the difference itself, not cumulative.
>>
>> On 1/12/2021 12:08 PM, Manish G wrote:
>>
>> Can you elaborate on the second approach?
>> Currently I am exposing the difference itself. Or do you mean the
>> cumulative difference? I.e., I maintain a member variable, say timeSoFar, and
>> update it with the time consumed by each method call and then expose it.
>> Something like this:
>>
>> timeSoFar += timeConsumedByCurrentInvocation
>> this.simpleGaug.setValue( timeSoFar );
>>
>> On Tue, Jan 12, 2021 at 4:24 PM Chesnay Schepler 
>> wrote:
>>
>>> That approach will generally not work for jobs that run for a long time,
>>> because it will be nigh impossible for anomalies to affect the average. You
>>> want to look into exponential moving averages.
>>> Alternatively, just expose the diff as an absolute value and calculate
>>> the average in prometheus.
>>>
>>> On 1/12/2021 11:50 AM, Manish G wrote:
>>>
>>> OK, got it.
>>> So I would need to accumulate the time value over the calls as well as
>>> number of times it is called...and then calculate average(accumulated time/
>>> number of times called) and then set calculated value into gauge as above.
>>>
>>> On Tue, Jan 12, 2021 at 4:12 PM Chesnay Schepler 
>>> wrote:
>>>
 A gauge just returns a value, and Flink exposes it as is. As such you
 need to calculate the average over time yourself, taking 2 time
 measurements (before and after the processing of each).

 On 1/12/2021 11:31 AM, Manish G wrote:

 startTime is set at start of function:

 long startTime = System.currentTimeMillis();


 On Tue, Jan 12, 2021 at 3:59 PM Manish G 
 wrote:

> My code is:
>
> public class SimpleGauge<T> implements Gauge<T> {
>
> private T mValue;
>
> @Override
> public T getValue() {
> return mValue;
> }
>
> public void setValue(T value){
> mValue = value;
> }
> }
>
> And in flatmap function:
>
> float endTime = (System.currentTimeMillis() - startTime) / 1000F;
> this.simplegauge.setValue(endTime);
>
>
> So does it mean that when Flink calls my getValue function it accumulates the 
> value, rather than taking it as a snapshot?
>
>
> On Tue, Jan 12, 2021 at 3:53 PM Chesnay Schepler 
> wrote:
>
>> Sure, that might work. Be aware though that time measurements are,
>> compared to the logic within a function, usually rather expensive and
>> may impact performance.
>>
>> On 1/12/2021 10:57 AM, Manish G wrote:
>> > Hi All,
>> >
>> > I have implemented a flatmap function and I want to collect metrics
>> > for average time for this function which I plan to monitor via
>> prometheus.
>> >
>> > What would be good approach for it? I have added a gauge to the
>> > method(extending Gauge interface from flink API). Would it work for
>> my
>> > needs?
>> >
>> >
>>
>>

>>>
>>
>>


Re: Re: Idea import Flink source code

2021-01-13 Thread Matthias Pohl
Don't forget to use the reply-all button when replying to threads on the
mailing lists. :-)

Have you tried building the project via command line using `mvn -DskipTests
-Dfast install` to pull all dependencies?
And just to verify: you didn't change the code, did you? We're talking
about the vanilla Flink source code...?

Matthias

On Wed, Jan 13, 2021 at 9:18 AM penguin.  wrote:

> Hi,
> Thank you for the reminder.
>  It seems that there is something wrong with putting the picture in the
> text.
>
> ▼Sync: at 2021/1/13 12:05 with 18 errors
>
> ▼Resolve dependencies 4 errors
>  Cannot resolve net.minidev:json-smart:2.3
>  Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0
>  Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1
>  Cannot resolve com.nimbusds:lang-tag:1.5
> ▼Resolve plugins 14 errors
>  Cannot resolve plugin org.codehaus.mojo:build-helper-maven-plugin: 
> 
>
> Best,
> penguin
>
>
>
>
> On 2021-01-13 15:24:22, "Matthias Pohl"  wrote:
>
> Hi,
> you might want to move these kinds of questions into the
> user@flink.apache.org which is the mailing list for community support
> questions [1].
> Coming back to your question: Is it just me or is the image not
> accessible? Could you provide a textual description of your problem?
>
> Best,
> Matthias
>
>
> [1] https://flink.apache.org/community.html#mailing-lists
>
> On Wed, Jan 13, 2021 at 6:18 AM penguin.  wrote:
>
>> Hello,
>> When importing the Flink source code into idea, the following error
>> occurred.
>> And several mirrors were configured in the setting file of maven, which
>> did not solve the problem
>>
>>
>>
>
>
>


Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-13 Thread Aljoscha Krettek

On 2021/01/13 07:58, vinay.raic...@t-systems.com wrote:

Not sure about your proposal regarding Point 3:
*	firstly how is it ensured that the stream is closed? If I understand 
the doc correctly the stream will be established starting with the 
latest timestamp (hmm... is it not a standard behaviour?) and will 
never finish (UNBOUNDED),


On the first question of standard behaviour: the default is to start 
from the group offsets that are available in Kafka. This uses the 
configured consumer group. I think it's better to be explicit, though, 
and specify sth like `EARLIEST` or `LATEST`, etc.


And yes, the stream will start but never stop with this version of the 
Kafka connector. Only when you use the new `KafkaSource` can you also 
specify an end timestamp that will make the Kafka source shut down 
eventually.
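
For reference, a minimal sketch of being explicit about the start position with the current 
FlinkKafkaConsumer (which, as noted above, will not stop on its own); topic, servers, group 
id and the timestamp are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public final class ExplicitStartSketch {
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        props.setProperty("group.id", "my-consumer-group");      // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Be explicit about the start position instead of relying on committed group offsets.
        consumer.setStartFromEarliest();
        // or, to start from a point in time in the past:
        // consumer.setStartFromTimestamp(1610000000000L);
        return consumer;
    }
}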


*	secondly it is still not clear how to get the latest event  at a 
given time point in the past?


You are referring to getting a single record, correct? I don't think 
this is possible with Flink. All you can do is get a stream from Kafka 
that is potentially bounded by a start timestamp and/or end timestamp.


Best,
Aljoscha


Re:Re: Re: Idea import Flink source code

2021-01-13 Thread penguin.
Hi,
I click the reply button every time... Does this mean that only the replied 
person can see the email?




If Maven fails to download plugins or dependencies,  is mvn -clean install 
-DskipTests a must?

I'll try first.


penguin












On 2021-01-13 16:35:10, "Matthias Pohl"  wrote:

Don't forget to use the reply-all button when replying to threads on the 
mailing lists. :-)


Have you tried building the project via command line using `mvn -DskipTests 
-Dfast install` to pull all dependencies?
And just to verify: you didn't change the code, did you? We're talking about 
the vanilla Flink source code...?


Matthias


On Wed, Jan 13, 2021 at 9:18 AM penguin.  wrote:

Hi,
Thank you for the reminder.
 It seems that there is something wrong with putting the picture in the text.
▼Sync: at 2021/1/13 12:05 with 18 errors

▼Resolve dependencies 4 errors
 Cannot resolve net.minidev:json-smart:2.3
 Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0
 Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1
 Cannot resolve com.nimbusds:lang-tag:1.5
▼Resolve plugins 14 errors
 Cannot resolve plugin org.codehaus.mojo:build-helper-maven-plugin: 

Best,
penguin










On 2021-01-13 15:24:22, "Matthias Pohl"  wrote:

Hi,
you might want to move these kinds of questions into the user@flink.apache.org 
which is the mailing list for community support questions [1].
Coming back to your question: Is it just me or is the image not accessible? 
Could you provide a textual description of your problem?


Best,
Matthias




[1] https://flink.apache.org/community.html#mailing-lists


On Wed, Jan 13, 2021 at 6:18 AM penguin.  wrote:

Hello,
When importing the Flink source code into idea, the following error occurred.
And several mirrors were configured in the setting file of maven, which did not 
solve the problem








 

Re: Dead code in ES Sink

2021-01-13 Thread Aljoscha Krettek

On 2021/01/12 15:04, Rex Fenley wrote:

[2]
https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java#L131

Should [2] be removed?


The link seems to not work anymore but I'm guessing you're referring to 
`CONNECTION_MAX_RETRY_TIMEOUT_OPTION`. This is used in the 
`*DynamicSinkFactory` classes, such as [1]. These can be used when 
defining Table API/SQL sources using DDL or the programmatic API. The 
actual field is never used but it will be used to check the allowed 
options when verifying what users specify via "string" options.


[1] 
https://github.com/apache/flink/blob/ee653778689023ddfdf007d5bde1daad8fbbc081/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java#L98


Re: Re: Re: Idea import Flink source code

2021-01-13 Thread Matthias Pohl
The mvn command helps to identify whether your issue is related to Maven
and/or missing dependencies or whether it's an Intellij problem. Usually,
running `mvn clean install -DskipTests -Dfast` isn't required to import the
Flink project into Intellij.

Best,
Matthias

PS: reply adds only the immediate responder to the recipient lists (as
happened in your first reply) vs reply-all would also automatically add the
ML email address(es) (and other thread participants) to the CC list.

On Wed, Jan 13, 2021 at 9:49 AM penguin.  wrote:

> Hi,
> I click the reply button every time... Does this mean that only the
> replied person can see the email?
>
>
> If Maven fails to download plugins or dependencies,  is mvn -clean
> install -DskipTests a must?
> I'll try first.
>
> penguin
>
>
>
>
> On 2021-01-13 16:35:10, "Matthias Pohl"  wrote:
>
> Don't forget to use the reply-all button when replying to threads on the
> mailing lists. :-)
>
> Have you tried building the project via command line using `mvn
> -DskipTests -Dfast install` to pull all dependencies?
> And just to verify: you didn't change the code, did you? We're talking
> about the vanilla Flink source code...?
>
> Matthias
>
> On Wed, Jan 13, 2021 at 9:18 AM penguin.  wrote:
>
>> Hi,
>> Thank you for the reminder.
>>  It seems that there is something wrong with putting the picture in the
>> text.
>>
>> ▼Sync: at 2021/1/13 12:05 with 18 errors
>>
>> ▼Resolve dependencies 4 errors
>>  Cannot resolve net.minidev:json-smart:2.3
>>  Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0
>>  Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1
>>  Cannot resolve com.nimbusds:lang-tag:1.5
>> ▼Resolve plugins 14 errors
>>  Cannot resolve plugin org.codehaus.mojo:build-helper-maven-plugin: 
>> 
>>
>> Best,
>> penguin
>>
>>
>>
>>
>> On 2021-01-13 15:24:22, "Matthias Pohl"  wrote:
>>
>> Hi,
>> you might want to move these kinds of questions into the
>> user@flink.apache.org which is the mailing list for community support
>> questions [1].
>> Coming back to your question: Is it just me or is the image not
>> accessible? Could you provide a textual description of your problem?
>>
>> Best,
>> Matthias
>>
>>
>> [1] https://flink.apache.org/community.html#mailing-lists
>>
>> On Wed, Jan 13, 2021 at 6:18 AM penguin.  wrote:
>>
>>> Hello,
>>> When importing the Flink source code into idea, the following error
>>> occurred.
>>> And several mirrors were configured in the setting file of maven, which
>>> did not solve the problem
>>>
>>>
>>>


Re: Metrics for average time taken by flatMap function

2021-01-13 Thread Chesnay Schepler
If you want the gauge to only represent recent activity then you will 
need to use a timer of sorts to reset the gauge after N time (something 
larger than the reporter interval) unless it was changed in the meantime 
(e.g., by also recording a timestamp within SimpleGauge)
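
For illustration, a minimal sketch of that idea (the class name is a placeholder, not a 
Flink API): the gauge remembers when it was last updated and falls back to a default once 
the latest sample is older than some maximum age, so idle periods stop reporting a stale 
value.

import org.apache.flink.metrics.Gauge;

public class FreshValueGauge implements Gauge<Float> {

    private final long maxAgeMillis;   // should be larger than the reporter interval
    private volatile float lastValue;
    private volatile long lastUpdateMillis;

    public FreshValueGauge(long maxAgeMillis) {
        this.maxAgeMillis = maxAgeMillis;
    }

    public void setValue(float value) {
        this.lastValue = value;
        this.lastUpdateMillis = System.currentTimeMillis();
    }

    @Override
    public Float getValue() {
        // Report 0 when no update happened recently instead of repeating the stale value.
        boolean stale = System.currentTimeMillis() - lastUpdateMillis > maxAgeMillis;
        return stale ? 0f : lastValue;
    }
}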


On 1/13/2021 9:33 AM, Manish G wrote:
This approach has an issue. Even for periods when there is no 
activity, the latest gauge value is still used for calculations, and 
this generates graphs which are not a correct representation of the 
situation.


On Tue, Jan 12, 2021 at 7:01 PM Manish G  wrote:


Prometheus provides avg_over_time for a range vector. That seems
to be better suited for this usecase.

On Tue, Jan 12, 2021 at 6:53 PM Chesnay Schepler  wrote:

The cumulative time probably isn't that useful to detect
changes in the behavior of the application.

On 1/12/2021 12:30 PM, Chesnay Schepler wrote:

I mean the difference itself, not cumulative.

On 1/12/2021 12:08 PM, Manish G wrote:

Can you elaborate on the second approach?
Currently I am exposing the difference itself. Or do you
mean the cumulative difference? I.e., I maintain a member
variable, say timeSoFar, and update it with the time consumed by
each method call and then expose it. Something like this:

timeSoFar += timeConsumedByCurrentInvocation
this.simpleGaug.setValue( timeSoFar );

On Tue, Jan 12, 2021 at 4:24 PM Chesnay Schepler  wrote:

That approach will generally not work for jobs that run
for a long time, because it will be nigh impossible for
anomalies to affect the average. You want to look into
exponential moving averages.
Alternatively, just expose the diff as an absolute value
and calculate the average in prometheus.

On 1/12/2021 11:50 AM, Manish G wrote:

OK, got it.
So I would need to accumulate the time value over the
calls as well as number of times it is called...and
then calculate average(accumulated time/ number of
times called) and then set calculated value into gauge
as above.

On Tue, Jan 12, 2021 at 4:12 PM Chesnay Schepler  wrote:

A gauge just returns a value, and Flink exposes it
as is. As such you need to calculate the average
over time yourself, taking 2 time measurements
(before and after the processing of each).

On 1/12/2021 11:31 AM, Manish G wrote:

startTime is set at start of function:

long startTime = System.currentTimeMillis();

On Tue, Jan 12, 2021 at 3:59 PM Manish G  wrote:

My code is:

public class SimpleGauge<T> implements Gauge<T> {

    private T mValue;

    @Override
    public T getValue() {
        return mValue;
    }

    public void setValue(T value) {
        mValue = value;
    }
}

And in flatmap function:

float endTime = (System.currentTimeMillis() - startTime) / 1000F;
this.simplegauge.setValue(endTime);

So does it mean that when Flink calls my getValue
function it accumulates the value, rather than
taking it as a snapshot?


On Tue, Jan 12, 2021 at 3:53 PM Chesnay Schepler  wrote:

Sure, that might work. Be aware though
that time measurements are,
compared to the logic within a function,
usually rather expensive and
may impact performance.

On 1/12/2021 10:57 AM, Manish G wrote:
> Hi All,
>
> I have implemented a flatmap function
and I want to collect metrics
> for average time for this function which
I plan to monitor via prometheus.
>
> What would be good approach for it? I
have added a gauge to the
> method(extending Gauge interface from
flink API). Would it work for my
> needs?
>
>













Re: Flink 1.12 Kryo Serialization Error

2021-01-13 Thread Timo Walther

Hi Yuval,

could you share a reproducible example with us?

I see you are using SQL / Table API with a RAW type. I could imagine 
that the KryoSerializer is configured differently when serializing and 
when deserializing. This might be due to `ExecutionConfig` not shipped 
(or copied) through the stack correctly.


Even though an error in the stack should be visible immediately and not 
after 30 seconds, I still would also investigate an error in this direction.


Regards,
Timo


On 13.01.21 09:47, Piotr Nowojski wrote:

Hi Yuval,

I have a couple of questions:

1. I see that you are using Flink 1.12.0, is that correct?
2. Have you tried running your application with a different Flink 
version? If you are using 1.12.0, could you check Flink 1.11.3, or vice 
versa?
3. What's the configuration that you are using? For example, have you 
enabled unaligned checkpoints or some other feature?
4. Is the problem still there if you replace Kryo with something else 
(Java's serialisation?)?
5. Could it be a problem with dependency convergence? Like maybe there 
are different versions of Flink jars present during runtime?
6. Lastly, would it be possible for you to prepare a minimal example 
that could reproduce the problem?


Piotrek

On Tue, Jan 12, 2021 at 17:19, Yuval Itzchakov wrote:


Hi Chesnay,
Turns out it didn't actually work, there were one or two
successful runs but the problem still persists (it's a bit non
deterministic, and doesn't always reproduce when parallelism is set
to 1).

I turned off all Kryo custom serialization and am only using Flink
provided one's ATM, the problem still persists.
There seems to be an issue with how Flink serializes these raw types
over the wire, but I still can't put my finger as to what the
problem is.

What I can see is that Flink tries to consume a HybridMemorySegment
which contains one of these custom raw types I have and because of
malformed content it receives a negative length for the byte array:

image.png

The content seems to be prepended with a bunch of NULL values which
throw off the length calculation:

image.png

But I still don't have the entire chain of execution wrapped
mentally in my head, trying to figure it out.

An additional error I'm receiving, even when removing the
problematic JSON field and switching it out for a String:

java.lang.IllegalStateException: When there are multiple buffers, an
unfinished bufferConsumer can not be at the head of the buffers queue.
at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
~[flink-core-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.

Re: Flink streaming sql是否支持两层group by聚合

2021-01-13 Thread Joshua Fan
Hi Jark and Benchao

I have learned from your previous email  on how to do pv/uv in flink sql.
One is to make a MMdd grouping, the other is to make a day window.
Thank you all.

I have a question about the result output. For MMdd grouping, every
minute the database would get a record, and many records would be in the
database as time goes on, but there would be only a few records in the
database according to the day window.

for example, the pv would be 12:00,100   12:01,200  12:02,300   12:03,400
according to the MMdd grouping solution, for the day window solution,
there would be only one record as  12:00,100 |12:01,200|12:02,300|12:03,400.

I wonder, for the day window solution, is it possible to have the same
result output as the MMdd solution? Because the day window solution does
not have to worry about state retention.

Thanks.

Yours sincerely

Josh

On Sat, Apr 18, 2020 at 9:38 PM Jark Wu  wrote:

> Hi,
>
> I will use English because we are also sending to user@ ML.
>
> This behavior is as expected, not a bug. Benchao gave a good explanation
> about the reason. I will give some further explanation.
> In Flink SQL, we will split an update operation (such as uv from 100 ->
> 101) into two separate messages, one is -[key, 100], the other is +[key,
> 101].
> Once these two messages arrive the downstream aggregation, it will also
> send two result messages (assuming the previous SUM(uv) is 500),
> one is [key, 400], the other is [key, 501].
>
> But this problem is mostly addressed since 1.9, if you enable the
> mini-batch optimization [1]. Because mini-batch optimization will try its best
> to accumulate the separate + and - messages in a single mini-batch processing step.
> You can upgrade and have a try.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
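
For reference, a minimal sketch of enabling the mini-batch options mentioned above on a 
blink-planner TableEnvironment (the latency and size values are illustrative, not 
recommendations):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public final class MiniBatchSettings {
    public static void enableMiniBatch(TableEnvironment tEnv) {
        Configuration conf = tEnv.getConfig().getConfiguration();
        // Buffer input records briefly so that the +/- retraction pairs emitted by the
        // first-level aggregation are mostly folded away before reaching the second level.
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        conf.setString("table.exec.mini-batch.size", "5000");
    }
}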
>
>
>
> On Sat, 18 Apr 2020 at 12:26, Benchao Li  wrote:
>
>> With the current design this should not count as a bug; it is by design.
>> The main issue is that there are two levels of agg, and the retraction from the first-level agg causes the second-level agg to recompute and re-emit its result.
>>
>> dixingxing85 wrote on Sat, Apr 18, 2020 at 11:38 AM:
>>
>>> Thanks, Benchao.
>>> The expected result of this job is a single result per day, and that result should keep growing, for example:
>>> 20200417,86
>>> 20200417,90
>>> 20200417,130
>>> 20200417,131
>>>
>>> It should not fluctuate up and down, with the number going from larger to smaller; the consumers of the result certainly cannot accept results like this:
>>> 20200417,90
>>> 20200417,86
>>> 20200417,130
>>> 20200417,86
>>> 20200417,131
>>>
>>> My question is whether the retract stream produced by the inner group by can affect the sink; I logged this on the sink side.
>>> If Flink supports this kind of two-level group by, shouldn't this case where the result becomes smaller be considered a bug?
>>>
>>> Sent from my iPhone
>>>
>>> On Apr 18, 2020, at 10:08, Benchao Li  wrote:
>>>
>>> 
>>>
>>> Hi,
>>>
>>> This is supported.
>>> What you are seeing is because group by produces retract results, i.e. it first sends -[old] and then +[new].
>>> With two levels it becomes:
>>> first level -[old], second level -[cur], +[old]
>>> first level +[new], second level -[old], +[new]
>>>
>>> dixingxin...@163.com  wrote on Sat, Apr 18, 2020 at 2:11 AM:
>>>

 Hi all:

 We have a streaming SQL query whose result is incorrect: the data received by the sink fluctuates between
 larger and smaller values. *We would like to confirm whether this is a bug, or whether Flink does not yet
 support this kind of SQL.*
 The concrete scenario is: first group by the two dimensions A and B to compute UV, then group by A to sum up
 the UV of dimension B. The corresponding SQL is as follows: (A -> dt,  B -> pvareaid)

 SELECT dt, SUM(a.uv) AS uv
 FROM (
SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
FROM streaming_log_event
WHERE action IN ('action1')
   AND pvareaid NOT IN ('pv1', 'pv2')
   AND pvareaid IS NOT NULL
GROUP BY dt, pvareaid
 ) a
 GROUP BY dt;

 The log entries corresponding to the data received by the sink are:

 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(false,0,86,20200417)
 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(true,0,130,20200417)
 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(false,0,130,20200417)
 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(true,0,86,20200417)
 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(false,0,86,20200417)
 2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
 (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
 data(true,0,131,20200417)


 We are using 1.7.2, and the parallelism of the test job is 1.
 Here is the corresponding issue: https://issues.apache.org/jira/browse/FLINK-17228


 --
 dixingxin...@163.com


>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>


Bugs in Streaming job stopping? Weird graceful stop/restart for disjoint job graph

2021-01-13 Thread Theo Diefenthal
Hi there, 

I'm currently analyzing a weird behavior of one of our jobs running on YARN 
with Flink 1.11.2. I have a kind of special situation here in that regard that 
I submit a single streaming job with a disjoint job graph, i.e. that job 
contains two graphs of the same kind but totally independent of each other (one 
having an ETL pipeline for source1, another for source2). It's just for 
historical reasons that makes deployment a bit easier. 

I had the job running nicely until I wanted to stop it with a savepoint as 
usual like so: 
flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS 
--yarnapplicationId=${FLINK_YARN_APPID} ${ID} 

After a minute, I receiveid a timeout exception [1]. 

Now the interesting part/possible bugs are the following 3: 
1. The stop was triggered at 21/01/12 06:15:10. I see that almost all tasks of 
the entire job switched from RUNNING to FINISHED within seconds, but two tasks 
hit something that looks like a race condition on shutdown. They threw an 
IllegalStateException in BufferBuilder.append where an assertion makes sure 
that the buffer is not yet finished. [2] 
2. That failure led to RESTARTING the tasks of that job. So the failure 
occurred 5 seconds after I triggered the stop of the job. And 2 seconds later, I 
see that the pipeline switched its state to RUNNING again. No wonder that the 
"stop" eventually failed with a timeout, as the job didn't think about shutting 
down anymore. 
3. BUT the major issue for me here is: the entire pipeline of source1 was 
restarted, but the pipeline of source2 was still FINISHED. As Flink did quite 
some work on Batch/Streaming unification and region failover/restart in the 
last versions, my guess is that, as I am in the special case of a disjoint graph 
here, only the tasks in the connected graph where the error occurred restarted 
properly, and the other graph was left in the FINISHED state, even though I am 
dealing with a streaming job here. 

The problem is that the job was left in kind of a broken state: from just 
looking at the YARN / Flink UI it seemed to still be running and the stop had no 
effect, but in reality it had shut down a huge part of the job. My workaround of 
course is as follows: 
1. If a "graceful stop" won't succeed, in future I will trigger a hard kill 
"yarn application -kill" afterwards because I can't be certain in what state 
the job is after a failed attempt to stop. 
2. I will enforce stronger isolation in my jobs so that I always have connected 
graphs as jobs. In my case: I will deploy two independent jobs for the two ETL 
pipelines and hope that this problem won't arise again (At least, have the 
entire job either FINISHED or RUNNING). 

But I'm curious what you think: are those 3 bugs, or is some of this kind of 
expected behaviour? Should I open bug ticket(s) for those? 

Best regards 
Theo 



[1] Timeout from flink stop: 
org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"f23290bf5fb0ecd49a4455e4a65f2eb6". 
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495) 
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864) 
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487) 
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931) 
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:422) 
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) 
Caused by: java.util.concurrent.TimeoutException 
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) 
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) 
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493) 
... 9 more 
[2] Exception in graceful shutdown: 
2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task 
Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> 
map_json_source1 -> Sink: write_to_kafka_source1) (3/18) 
(bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED. 
java.util.concurrent.ExecutionException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator 
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) 
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
 
at 
org.apache.flink.streaming.run

RE: Flink to get historical data from kafka between timespan t1 & t2

2021-01-13 Thread VINAY.RAICHUR
Ok. Attached is the PPT of what I am attempting to achieve w.r.t. time. 

Hope I am all set to achieve the three bullets mentioned in the attached slide to 
create reports with the KafkaSource and KafkaBuilder approach. 

If you have any additional tips to share, please do so after going through the 
attached slide (an example for the live dashboards use case). 

Kind regards
Vinay

-Original Message-
From: Aljoscha Krettek  
Sent: 13 January 2021 14:06
To: user@flink.apache.org
Cc: RAICHUR, VINAY 
Subject: Re: Flink to get historical data from kafka between timespan t1 & t2

On 2021/01/13 07:58, vinay.raic...@t-systems.com wrote:
>Not sure about your proposal regarding Point 3:
>*  firstly how is it ensured that the stream is closed? If I understand 
>the doc correctly the stream will be established starting with the 
>latest timestamp (hmm... is it not a standard behaviour?) and will 
>never finish (UNBOUNDED),

On the first question of standard behaviour: the default is to start from the 
group offsets that are available in Kafka. This uses the configured consumer 
group. I think it's better to be explicit, though, and specify sth like 
`EARLIEST` or `LATEST`, etc.

And yes, the stream will start but never stop with this version of the Kafka 
connector. Only when you use the new `KafkaSource` can you also specify an end 
timestamp that will make the Kafka source shut down eventually.

>*  secondly it is still not clear how to get the latest event  at a 
>given time point in the past?

You are referring to getting a single record, correct? I don't think this is 
possible with Flink. All you can do is get a stream from Kafka that is 
potentially bounded by a start timestamp and/or end timestamp.

Best,
Aljoscha


Positioning_Use_Cases_TrackingData_Past_Now.pptx
Description: Positioning_Use_Cases_TrackingData_Past_Now.pptx


Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-13 Thread Aljoscha Krettek

On 2021/01/13 12:07, vinay.raic...@t-systems.com wrote:

Ok. Attached is the PPT of what am attempting to achieve w.r.t. time

Hope I am all set to achieve the three bullets mentioned in attached 
slide to create reports with KafkaSource and KafkaBuilder approach.


If you have any additional tips to share please do so after going 
through the slide attached (example for live dashboards use case)


I think that should work with `KafkaSource`, yes. You just need to set 
the correct start timestamps and end timestamps, respectively. I believe 
that's all there is to it, off the top of my head I can't think of any 
additional tips.
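
For anyone following along, a minimal sketch of bounding the new `KafkaSource` (Flink 1.12) 
by a start and an end timestamp; broker, topic, group id and the timestamps are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        long t1 = 1610000000000L;   // start of the report time span (placeholder)
        long t2 = 1610100000000L;   // end of the report time span (placeholder)

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")             // placeholder
                .setTopics("tracking-data")                     // placeholder
                .setGroupId("report-builder")                   // placeholder
                .setStartingOffsets(OffsetsInitializer.timestamp(t1))
                .setBounded(OffsetsInitializer.timestamp(t2))   // source shuts down once t2 is reached
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> events =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-between-t1-and-t2");
        events.print();
        env.execute();
    }
}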


Best,
Aljoscha


Re: Flink app logs to Elastic Search

2021-01-13 Thread Aljoscha Krettek

On 2021/01/11 01:29, bat man wrote:

Yes, no entries to the elastic search. No indices were created in elastic.
Jar is getting picked up which I can see from yarn logs. Pre-defined text
based logging is also available.


Hmm, I can't imagine much that could go wrong. Maybe there is some 
interference from other configuration files. Could you try and make sure 
that you only have the configuration and logging system in the classpath 
that you want to use?


Best,
Aljoscha


Re: Snowflake access through JDBC

2021-01-13 Thread Abhishek Rai
Thanks Roman, I ended up switching to the DataStream API and using
JdbcInputFormat like you suggested and that worked out fine.  Thanks!
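
For anyone landing on this thread later, a rough sketch of the JdbcInputFormat approach 
(connection details, query and column types are placeholders):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class SnowflakeReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Column types must match the SELECT list of the query below.
        RowTypeInfo rowTypeInfo = new RowTypeInfo(Types.INT, Types.STRING);

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("net.snowflake.client.jdbc.SnowflakeDriver")
                .setDBUrl("jdbc:snowflake://account.snowflakecomputing.com/?db=TEST")  // placeholder
                .setUsername("user")                                                   // placeholder
                .setPassword("password")                                               // placeholder
                .setQuery("SELECT id, name FROM my_table")                             // placeholder
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        DataStream<Row> rows = env.createInput(inputFormat);
        rows.print();
        env.execute();
    }
}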

On Fri, Dec 18, 2020 at 10:21 AM Khachatryan Roman
 wrote:
>
> Hello,
>
> Unfortunately, this driver is not currently supported by the Table API [1].
> You can implement a dialect for it [2] and construct JdbcTableSource [3] 
> manually.
>
> Alternatively, you can switch to the DataStream API and use JdbcInputFormat 
> [4] which doesn't require dialect.
>
> I'm also pulling in Jingson Li and Jark Wu as they might know better.
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.html
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.html
> [4] 
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcInputFormat.html
>
> Regards,
> Roman
>
>
> On Fri, Dec 18, 2020 at 4:55 PM Abhishek Rai  wrote:
>>
>> Hello,
>>
>> I'm trying to create a `StreamTableSource` for Snowflake using
>> `JdbcTableSourceSinkFactory.createStreamTableSource` (in package
>> org.apache.flink.connector.jdbc.table) but it fails with the following
>> error message due to `JdbcDialects` not having a dialect for
>> Snowflake.
>>
>> My goal is to fully read a Snowflake table through Flink.
>>
>> Is there any way to work around this?
>>
>> ```
>> java.lang.IllegalStateException: Cannot handle such jdbc url:
>> jdbc:snowflake://abc123.us-east-1.snowflakecomputing.com/?db=TEST
>>   at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>   at 
>> org.apache.flink.table.descriptors.JdbcValidator.validateCommonProperties(JdbcValidator.java:79)
>>   at 
>> org.apache.flink.table.descriptors.JdbcValidator.validate(JdbcValidator.java:64)
>>   at 
>> org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory.getValidatedProperties(JdbcTableSourceSinkFactory.java:173)
>>   at 
>> org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory.createStreamTableSource(JdbcTableSourceSinkFactory.java:138)
>> ```
>>
>> Thanks,
>> Abhishek


Publishing a table to Kafka

2021-01-13 Thread Abhishek Rai
Hello,

I'm using Flink 1.11.2 where I have a SQL backed `Table` which I'm
trying to write to Kafka.  I'm using `KafkaTableSourceSinkFactory`
which ends up instantiating a table sink of type `KafkaTableSink`.
Since this sink is an `AppendStreamTableSink`, I cannot write to it
using a generic table which might produce update/delete records.

What are my options to still write to Kafka?  I don't mind introducing
another boolean/etc field in the Kafka output records containing the
row kind or similar info.

The first workaround that I tried is to convert the table to a
DataStream, but that ran into a second issue as indicated below.  I'm
happy to consider other alternatives, potentially which can be
achieved at Table API level.

When converting to DataStream API, the `table.getSchema().toRowType()`
call below (`TableSchema.toRowType()`) may fail when the underlying
`DataType` is not convertible to a `TypeInformation`, e.g. I get the
following error:

```
TableException: Unsupported conversion from data type 'INTERVAL
SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type
information. Only data types that originated from type information
fully support a reverse conversion.
```

Table to DataStream conversion and write to Kafka --
```
var rowType = table.getSchema().toRowType();
var kafkaRecordType = insertFieldAtIndex(
(RowTypeInfo)rowType, 0, "__row_kind", Types.BOOLEAN);
var outputStream =
tableEnvironment.toRetractStream(table, rowType).map(
new PrependChangeTypeToRowMapper(), kafkaRecordType);
var serializationSchema = JsonRowSerializationSchema.builder()
  .withTypeInfo(kafkaRecordType)
  .build();
var sink = new FlinkKafkaProducer<>(
kafkaTopic, serializationSchema, kafkaProperties);
outputStream.addSink(sink).name(sinkName);
```

Thanks,
Abhishek


Insufficient number of network buffers - rerun with min amount of required network memory

2021-01-13 Thread abelm
Hello!

I have a program which creates and runs a local Flink 1.12 environment. I
understand that, based on factors such as parallelism and the presence of
any process functions, the "Insufficient number of network buffers"
exception might pop up.

My plan is to catch this exception inside the main program and restart the
job with a new config in which 'taskmanager.memory.network.min' and
'taskmanager.memory.network.max' are set to an equal value, greater than
that in the previous run, which would allow the pipeline to run
successfully.

My question is whether there exists a way to find out the exact min value of
bytes which should be put as the value for both the previously mentioned min
and max keys from the error message (specifically, the part stating:
"required [...], but only [...] available. The total number of network
buffers is currently set to [...] of [...] bytes each").

Should it not be possible to use the info from the error message in a
formula to determine this value, I'd probably just end up restarting the job
as many times as necessary instead of at most once and double the value of
'taskmanager.memory.network.min' and 'taskmanager.memory.network.max' for
every consecutive retry.

Apologies if this has been answered before. I have seen a fair few questions
regarding this error before, but none of them seemed to ask about this
specifically.

Thanks in advance for any help!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Histogram has count data but not sum

2021-01-13 Thread Manish G
Hi All,

I have added a histogram to my code following the example here.
But I observe that on the Prometheus board I get only the count metric, not the sum;
that metric itself is missing.
I have used classes:

com.codahale.metrics.Histogram
org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper

What do I miss here?
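
For context, a minimal sketch of the registration pattern from the Flink Dropwizard metrics 
documentation that such code typically follows (the function, reservoir size and metric name 
are placeholders):

import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.Collector;

public class TimedFlatMap extends RichFlatMapFunction<String, String> {

    private transient Histogram histogram;

    @Override
    public void open(Configuration parameters) {
        com.codahale.metrics.Histogram dropwizardHistogram =
                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
        this.histogram = getRuntimeContext()
                .getMetricGroup()
                .histogram("flatMapDurationMs", new DropwizardHistogramWrapper(dropwizardHistogram));
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        long start = System.currentTimeMillis();
        // ... the actual flatMap logic goes here ...
        histogram.update(System.currentTimeMillis() - start);
        out.collect(value);
    }
}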


Re: Histogram has count data but not sum

2021-01-13 Thread Chesnay Schepler

What exactly do you mean with "count metrics" and "sum"?

Given a Histogram H, you should see:
- one time series named H_count
- one time series named H, with various labels for the different 
quantiles.


What do you see in Prometheus, and what do you consider to be missing?

On 1/13/2021 4:10 PM, Manish G wrote:

Hi All,

I have added a histogram to my code following the example here.
But I observe that on the Prometheus board I get only the count metric, not 
the sum; that metric itself is missing.

I have used classes:
com.codahale.metrics.Histogram
org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
What do I miss here?





Re: Histogram has count data but not sum

2021-01-13 Thread Manish G
So I am following this link too, to calculate the
average request duration. As per this page, histogram metrics have _count and
_sum data. Though it is a Prometheus histogram, I expected the Flink histogram
would provide me the same.

On Wed, Jan 13, 2021 at 8:50 PM Chesnay Schepler  wrote:

> What exactly do you mean with "count metrics" and "sum"?
>
> Given a Histogram H, you should see:
> - one time series named H_count
> - one time series named H, with various labels for the different
> quantiles.
>
> What do you see in Prometheus, and what do you consider to be missing?
>
> On 1/13/2021 4:10 PM, Manish G wrote:
>
> Hi All,
>
> I have added a histogram to my code following the example here.
> But I observe that on the Prometheus board I get only the count metric, not the sum;
> that metric itself is missing.
> I have used classes:
>
> com.codahale.metrics.Histogram
> org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
>
> What do I miss here?
>
>
>


Re: Dead code in ES Sink

2021-01-13 Thread Rex Fenley
>The actual field is never used but it will be used to check the allowed
options when verifying what users specify via "string" options.

Are you saying that this option does get passed along to Elasticsearch
still or that it's just arbitrarily validated? According to [1] it's been
deprecated in ES 6 and removed in ES 7.

[1] https://github.com/elastic/elasticsearch/pull/38085

On Wed, Jan 13, 2021 at 12:50 AM Aljoscha Krettek 
wrote:

> On 2021/01/12 15:04, Rex Fenley wrote:
> >[2]
> >
> https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java#L131
> >
> >Should [2] be removed?
>
> The link seems to not work anymore but I'm guessing you're referring to
> `CONNECTION_MAX_RETRY_TIMEOUT_OPTION`. This is used in the
> `*DynamicSinkFactory` classes, such as [1]. These can be used when
> defining Table API/SQL sources using DDL or the programmatic API. The
> actual field is never used but it will be used to check the allowed
> options when verifying what users specify via "string" options.
>
> [1]
>
> https://github.com/apache/flink/blob/ee653778689023ddfdf007d5bde1daad8fbbc081/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java#L98
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Histogram has count data but not sum

2021-01-13 Thread Chesnay Schepler
Flink Histograms are exposed as Summaries, since the 
quantiles are computed on the client-side (i.e., the Flink side).


Flink Histograms cannot provide _sum data because we simply do not 
compute it, IIRC because Dropwizard Histograms also don't compute a sum 
and they are the only implementations to exist.


On 1/13/2021 4:24 PM, Manish G wrote:
So I am following this link too, to calculate the 
average request duration. As per this page, histogram metrics have 
_count and _sum data. Though it is a Prometheus histogram, I expected the Flink 
histogram would provide me the same.


On Wed, Jan 13, 2021 at 8:50 PM Chesnay Schepler  wrote:


What exactly do you mean with "count metrics" and "sum"?

Given a Histogram H, you should see:
- one time series named H_count
- one time series named H, with various labels for the
different quantiles.

What do you see in Prometheus, and what do you consider to be missing?

On 1/13/2021 4:10 PM, Manish G wrote:

Hi All,

I have added a histogram to my code following the example here.
But I observe that on the Prometheus board I get only the count metric,
not the sum; that metric itself is missing.
I have used classes:
com.codahale.metrics.Histogram
org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
What do I miss here?







StreamingFileSink with ParquetAvroWriters

2021-01-13 Thread Jan Oelschlegel
Hi,

I'm using Flink (v1.11.2) and would like to use the StreamingFileSink for 
writing into HDFS in Parquet format.

As it says in the documentation I have added the dependencies:


<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-parquet_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

And this is my file sink definition:


val sink: StreamingFileSink[Event] = StreamingFileSink
  .forBulkFormat(
new Path("hdfs://namenode.local:8020/user/datastream/"),
ParquetAvroWriters.forReflectRecord(classOf[Event])
  )
  .build()


If I execute this in cluster I get the following error:

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
at 
org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)


Looks like there are some dependencies missing. How can I fix this?


Jan O.
NOTICE: This is a confidential message and intended only for the addressee. It is not 
permitted to copy this message or make it accessible to third parties. If you have received 
this message in error, please notify me by e-mail or at the telephone number given above.


Re: state reset(lost) on TM recovery

2021-01-13 Thread Alexey Trenikhun
That is it! The Protobuf compiler generates hashCode functions which are not 
stable across JVM restarts ([1]), which explains the observed behavior. It is clear 
that a stable hashCode is mandatory for KeyedProcessFunction keys, but is it also a 
requirement for MapState keys? It looks like the RocksDB backend serializes the key 
first, so it is not affected by the weirdness of the Protobuf hashCode, but what about 
the filesystem backend?

[1] - https://groups.google.com/g/protobuf/c/MCk1moyWgIk


From: Chesnay Schepler 
Sent: Tuesday, January 12, 2021 2:20 AM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: state reset(lost) on TM recovery

Are the hashes of these object equal as well?

On 1/12/2021 3:59 AM, Alexey Trenikhun wrote:
Hello,

Yes, I'm aware, and I used elements with the same key and logged getCurrentKey() 
to ensure that the key is the same, but you are right in that it is scope 
related: the key is a protobuf object and I specify a custom TypeInformation in 
keyBy(). Today I changed the code to use a Tuple2-derived class instead of 
protobuf and it started to work, but why it is not working with protobuf and the 
custom type information is unclear; I checked serialize/deserialize - it returns an 
equal object, and furthermore it works until the TM restarts. Are there any special 
requirements for the TypeSerializer and TypeInformation of key types?


@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  final int serializedSize = t.getSerializedSize();
  dataOutputView.writeInt(serializedSize);
  final byte[] data = new byte[serializedSize];
  t.writeTo(CodedOutputStream.newInstance(data));
  dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
  final int serializedSize = dataInputView.readInt();
  final com.google.protobuf.Parser parser = 
Unchecked.cast(prototype.getParserForType());
  final byte[] data = new byte[serializedSize];
  dataInputView.read(data);
  return parser.parseFrom(CodedInputStream.newInstance(data));
}



From: Chesnay Schepler 
Sent: Monday, January 11, 2021 4:36 PM
To: Alexey Trenikhun ; Flink User Mail 
List 
Subject: Re: state reset(lost) on TM recovery

Just do double-check, are you aware that ValueState within a Keyed*Function is 
scoped to the key of the input element(s)? I.e., any stored value is only 
accessible if an element with the same key is processed?

On 1/10/2021 7:18 PM, Alexey Trenikhun wrote:
Hello,

I'm using Flink 1.11.3, the state backend is RocksDB. I have a streaming job which 
reads from Kafka, transforms the data and outputs into Kafka; one of the processing 
nodes is a KeyedCoProcessFunction with ValueState:

  1.  generated some input data, I see in log that state.update() is called and 
subsequent state.value() return not null
  2.  wait for checkpoint
  3.  restart taskmanager
  4.  state.value() returns null

I've tried to change backend from rocksdb to filesystem - same result, after 
taskmanager restart state.value() returns null

Any ideas, what could cause resetting state to null?

Thanks,
Alexey





error accessing S3 bucket 1.12

2021-01-13 Thread Billy Bain
I'm trying to use readTextFile() to access files in S3. I have verified the
s3 key and secret are clean and the s3 path is similar to
com.somepath/somefile. (the names changed to protect the guilty)

Any idea what I'm missing?

2021-01-13 12:12:43,836 DEBUG
org.apache.flink.streaming.api.functions.source.
ContinuousFileMonitoringFunction [] - Opened
ContinuousFileMonitoringFunction (taskIdx= 0) for path:
s3://com.somepath/somefile
2021-01-13 12:12:43,843 DEBUG org.apache.flink.fs.s3.common.
AbstractS3FileSystemFactory [] - Creating S3 file system backed by Hadoop
s3a file system
2021-01-13 12:12:43,844 DEBUG org.apache.flink.fs.s3.common.
AbstractS3FileSystemFactory [] - Loading Hadoop configuration for Hadoop
s3a file system
2021-01-13 12:12:43,926 DEBUG org.apache.flink.fs.s3hadoop.common.
HadoopConfigLoader [] - Adding Flink config entry for s3.access-key as
fs.s3a.access-key to Hadoop config
2021-01-13 12:12:43,926 DEBUG org.apache.flink.fs.s3hadoop.common.
HadoopConfigLoader [] - Adding Flink config entry for s3.secret-key as
fs.s3a.secret-key to Hadoop config
2021-01-13 12:12:43,944 DEBUG org.apache.flink.streaming.runtime.tasks.
StreamTask [] - Invoking Split Reader: Custom File Source -> (Timestamps/
Watermarks, Map -> Filter -> Sink: Unnamed) (1/1)#0
2021-01-13 12:12:43,944 DEBUG org.apache.flink.streaming.api.operators.
BackendRestorerProcedure [] - Creating operator state backend for
TimestampsAndWatermarksOperator_1cf40e099136da16c66c61032de62905_(1/1) with
empty state.
2021-01-13 12:12:43,946 DEBUG org.apache.flink.streaming.api.operators.
BackendRestorerProcedure [] - Creating operator state backend for
StreamSink_d91236bbbed306c2379eac4982246f1f_(1/1) with empty state.
2021-01-13 12:12:43,955 DEBUG org.apache.hadoop.conf.Configuration [] -
Reloading 1 existing configurations
2021-01-13 12:12:43,961 DEBUG org.apache.flink.fs.s3hadoop.
S3FileSystemFactory [] - Using scheme s3://com.somepath/somefile for s3a
file system backing the S3 File System
2021-01-13 12:12:43,965 DEBUG
org.apache.flink.streaming.api.functions.source.
ContinuousFileMonitoringFunction [] - Closed File Monitoring Source for
path: s3://com.somepath/somefile.
2021-01-13 12:12:43,967 WARN org.apache.flink.runtime.taskmanager.Task [] -
Source: Custom File Source (1/1)#0 (1d75ae07abbd65f296c55a61a400c59f)
switched from RUNNING to FAILED.
java.io.IOException: null uri host. This can be caused by unencoded / in
the password string
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(
AbstractS3FileSystemFactory.java:163)
~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293
a7d95166eee9a9b2329b71764cf67:?]
at org.apache.flink.core.fs.PluginFileSystemFactory.create(
PluginFileSystemFactory.java:61) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem
.java:468) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.functions.source.
ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:
196) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:100) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.lang.NullPointerException: null uri host. This can be
caused by unencoded / in the password string
at java.util.Objects.requireNonNull(Objects.java:246) ~[?:?]
at org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(
S3xLoginHelper.java:69) ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-
293a7d95166eee9a9b2329b71764cf67:?]
at org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293
a7d95166eee9a9b2329b71764cf67:?]
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:
234) ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293
a7d95166eee9a9b2329b71764cf67:?]
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(
AbstractS3FileSystemFactory.java:126)
~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293
a7d95166eee9a9b2329b71764cf67:?]
... 7 more



-- 
Wayne D. Young
aka Billy Bob Bain
billybobb...@gmail.com
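
For reference (not from the original report), a minimal sketch of the setup being
described, assuming the s3.access-key / s3.secret-key entries seen in the log are
supplied through the Flink configuration (e.g. flink-conf.yaml) and the URI itself
stays free of credentials; the bucket and file names are the placeholders from the
message above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3ReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Credentials are configured via s3.access-key / s3.secret-key, not embedded
    // in the URI; "com.somepath" / "somefile" are the placeholder names above.
    env.readTextFile("s3://com.somepath/somefile")
       .print();

    env.execute("s3-read-sketch");
  }
}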


Uncaught exception in FatalExitExceptionHandler causing JM crash while canceling job

2021-01-13 Thread Kelly Smith
Hi folks,

I recently upgraded to Flink 1.12.0 and I'm hitting an issue where my JM is 
crashing while cancelling a job. This causes Kubernetes readiness probes to 
fail and the JM to be restarted, after which it gets into a bad state while trying to 
recover itself using ZK + a checkpoint which no longer exists.

This is the only information being logged before the process exits:


 method: uncaughtException
   msg: FATAL: Thread 'cluster-io-thread-4' produced an uncaught exception. 
Stopping the process...
   pod: dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
   stack: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407 
rejected from 
java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated, pool size 
= 0, active threads = 0, queued tasks = 0, completed tasks = 25977] at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) 
at 
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
 at 
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
 at 
org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
 at 
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)


https://github.com/apache/flink/blob/release-1.12.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L58


I’m not sure how to debug this further, but it seems like an internal Flink bug?

More info:


  *   Checkpoints are stored in S3 and I’m using the S3 connector
  *   Identical code has been running on Flink 1.11.x for months with no issues


Thanks,
Kelly


Flink Elasticsearch Async

2021-01-13 Thread Rex Fenley
Hello,

Looking at this documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-interval
it says

> The interval to flush buffered actions. Can be set to '0' to disable it.
Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can
be set to '0' with the flush interval set allowing for complete async
processing of buffered actions.

Does this imply that if either max size or max actions is set to a non-zero value,
then bulk indexing is not asynchronous?

We've been investigating socket timeouts for some time now and are looking
for a way to fix them. If indexing is synchronous, that may be a contributing
factor.

Thanks

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com
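
For context (not from either message), a sketch of how these bulk-flush options
appear on a Table API Elasticsearch sink. The table name, columns, hosts and index
are hypothetical, and the values shown are simply the documented defaults; whether
'0' or '-1' is actually accepted is exactly what the next message questions.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsBulkFlushSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // Hypothetical sink table; the three sink.bulk-flush.* options are the ones
    // described in the documentation excerpt above (values are the documented defaults).
    tEnv.executeSql(
        "CREATE TABLE es_sink ("
            + "  user_id STRING,"
            + "  cnt BIGINT"
            + ") WITH ("
            + "  'connector' = 'elasticsearch-7',"
            + "  'hosts' = 'http://localhost:9200',"
            + "  'index' = 'users',"
            + "  'sink.bulk-flush.max-actions' = '1000',"
            + "  'sink.bulk-flush.max-size' = '2mb',"
            + "  'sink.bulk-flush.interval' = '1s'"
            + ")");
  }
}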



Elasticsearch config maxes can not be disabled

2021-01-13 Thread Rex Fenley
Hello,

It doesn't seem like we can disable max actions and max size for
the Elasticsearch connector.

Docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
  *   sink.bulk-flush.max-actions (optional, default 1000, Integer): Maximum number of
buffered actions per bulk request. Can be set to '0' to disable it.
  *   sink.bulk-flush.max-size (optional, default 2mb, MemorySize): Maximum size in
memory of buffered actions per bulk request. Must be in MB granularity. Can be set to
'0' to disable it.
Reality:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Max number of buffered actions must be larger than 0.

The ES code looks like -1 is actually the value for disabling it, but when I use
-1:
Caused by: java.lang.IllegalArgumentException: Could not parse value '-1'
for key 'sink.bulk-flush.max-size'.

How can I disable these two settings?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



Re: state reset(lost) on TM recovery

2021-01-13 Thread Chesnay Schepler

The FsStateBackend makes heavy use of hashcodes, so it must be stable.
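
Not part of the original reply, but to make the stable-hashCode requirement concrete:
below is a minimal sketch of a key wrapper whose equals()/hashCode() are computed from
the serialized protobuf bytes rather than from the generated hashCode, so the hash
stays the same across JVM restarts. The class name and its use are illustrative
assumptions, not something proposed in this thread.

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import java.io.Serializable;
import java.util.Arrays;

/** Illustrative wrapper: makes a protobuf message usable as a key with a stable hashCode. */
public final class StableProtoKey<T extends Message> implements Serializable {

  private final byte[] bytes;

  public StableProtoKey(T message) {
    // Assumption: serialization of a given message's field values is deterministic
    // for the schema in use (worth verifying, e.g. if map fields are involved).
    this.bytes = message.toByteArray();
  }

  public T get(Parser<T> parser) throws InvalidProtocolBufferException {
    return parser.parseFrom(bytes);
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof StableProtoKey
        && Arrays.equals(bytes, ((StableProtoKey<?>) o).bytes);
  }

  @Override
  public int hashCode() {
    // A pure function of the byte contents, so it is stable across JVM restarts,
    // unlike the protobuf-generated hashCode discussed in this thread.
    return Arrays.hashCode(bytes);
  }
}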

On 1/13/2021 7:13 PM, Alexey Trenikhun wrote:
That is it! The protobuf compiler generates hashCode functions which 
are not stable across JVM restarts ([1]); this explains the observed 
behavior. It is clear that a stable hashCode is mandatory for 
KeyedProcessFunctions, but is it also a requirement for MapState keys? 
It looks like the RocksDB backend serializes the key first, so it is not affected 
by the weirdness of the protobuf hashCode, but what about the filesystem backend?


[1] - https://groups.google.com/g/protobuf/c/MCk1moyWgIk


From: Chesnay Schepler 
Sent: Tuesday, January 12, 2021 2:20 AM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: state reset(lost) on TM recovery
Are the hashes of these objects equal as well?

On 1/12/2021 3:59 AM, Alexey Trenikhun wrote:

Hello,

Yes, I'm aware, and I used elements with the same key and logged 
getCurrentKey() to ensure that the key is the same, but you are right 
that it is scope related: the key is a protobuf object and I 
specify a custom TypeInformation in keyBy(). Today I changed the code to 
use a Tuple2-derived class instead of protobuf and it started to work, 
but why it does not work with protobuf and custom type information 
is unclear; I checked serialize/deserialize - it returns an equal object - 
and it works until the TM restarts. Are there any special requirements 
for TypeSerializer and TypeInformation for key types?


@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  final int serializedSize = t.getSerializedSize();
  dataOutputView.writeInt(serializedSize);
  final byte[] data = new byte[serializedSize];
  t.writeTo(CodedOutputStream.newInstance(data));
  dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
  final int serializedSize = dataInputView.readInt();
  final com.google.protobuf.Parser parser = Unchecked.cast(prototype.getParserForType());
  final byte[] data = new byte[serializedSize];
  dataInputView.read(data);
  return parser.parseFrom(CodedInputStream.newInstance(data));
}



From: Chesnay Schepler 
Sent: Monday, January 11, 2021 4:36 PM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: state reset(lost) on TM recovery
Just to double-check, are you aware that ValueState within a 
Keyed*Function is scoped to the key of the input element(s)? I.e., 
any stored value is only accessible if an element with the same key 
is processed?


On 1/10/2021 7:18 PM, Alexey Trenikhun wrote:

Hello,

I'm using Flink 1.11.3, the state backend is rocksdb. I have a streaming 
job which reads from Kafka, transforms data and outputs into Kafka; 
one of the processing nodes is a KeyedCoProcessFunction with ValueState:


 1. generated some input data; I see in the log that state.update() is
called and a subsequent state.value() returns not null
 2. wait for checkpoint
 3. restart taskmanager
 4. state.value() returns null

I've tried to change backend from rocksdb to filesystem - same 
result, after taskmanager restart state.value() returns null


Any ideas, what could cause resetting state to null?

Thanks,
Alexey









Re: state reset(lost) on TM recovery

2021-01-13 Thread Alexey Trenikhun
Ok, thanks.


From: Chesnay Schepler 
Sent: Wednesday, January 13, 2021 11:46:15 AM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: state reset(lost) on TM recovery

The FsStateBackend makes heavy use of hashcodes, so it must be stable.

On 1/13/2021 7:13 PM, Alexey Trenikhun wrote:
That is it! The protobuf compiler generates hashCode functions which are not 
stable across JVM restarts ([1]); this explains the observed behavior. It is clear 
that a stable hashCode is mandatory for KeyedProcessFunctions, but is it also a 
requirement for MapState keys? It looks like the RocksDB backend serializes the key first, 
so it is not affected by the weirdness of the protobuf hashCode, but what about 
the filesystem backend?

[1] - https://groups.google.com/g/protobuf/c/MCk1moyWgIk


From: Chesnay Schepler 
Sent: Tuesday, January 12, 2021 2:20 AM
To: Alexey Trenikhun ; Flink User Mail 
List 
Subject: Re: state reset(lost) on TM recovery

Are the hashes of these objects equal as well?

On 1/12/2021 3:59 AM, Alexey Trenikhun wrote:
Hello,

Yes, I'm aware, and I used elements with the same key and logged getCurrentKey() 
to ensure that the key is the same, but you are right that it is scope 
related: the key is a protobuf object and I specify a custom TypeInformation in 
keyBy(). Today I changed the code to use a Tuple2-derived class instead of 
protobuf and it started to work, but why it does not work with protobuf and 
custom type information is unclear; I checked serialize/deserialize - it returns an 
equal object - and it works until the TM restarts. Are there any special requirements 
for TypeSerializer and TypeInformation for key types?


@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  final int serializedSize = t.getSerializedSize();
  dataOutputView.writeInt(serializedSize);
  final byte[] data = new byte[serializedSize];
  t.writeTo(CodedOutputStream.newInstance(data));
  dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
  final int serializedSize = dataInputView.readInt();
  final com.google.protobuf.Parser parser = 
Unchecked.cast(prototype.getParserForType());
  final byte[] data = new byte[serializedSize];
  dataInputView.read(data);
  return parser.parseFrom(CodedInputStream.newInstance(data));
}



From: Chesnay Schepler 
Sent: Monday, January 11, 2021 4:36 PM
To: Alexey Trenikhun ; Flink User Mail 
List 
Subject: Re: state reset(lost) on TM recovery

Just to double-check, are you aware that ValueState within a Keyed*Function is 
scoped to the key of the input element(s)? I.e., any stored value is only 
accessible if an element with the same key is processed?

On 1/10/2021 7:18 PM, Alexey Trenikhun wrote:
Hello,

I'm using Flink 1.11.3, the state backend is rocksdb. I have a streaming job which 
reads from Kafka, transforms data and outputs into Kafka; one of the processing 
nodes is a KeyedCoProcessFunction with ValueState:

  1.  generated some input data; I see in the log that state.update() is called and 
a subsequent state.value() returns not null
  2.  wait for checkpoint
  3.  restart taskmanager
  4.  state.value() returns null

I've tried to change backend from rocksdb to filesystem - same result, after 
taskmanager restart state.value() returns null

Any ideas, what could cause resetting state to null?

Thanks,
Alexey






Interpretting checkpoint data size

2021-01-13 Thread Rex Fenley
Hello,

I have incremental checkpoints turned on and there seems to be no relation
at all between how often the job checkpoints and how much data it checkpoints. Whether
checkpoints are set to every 1 min or every 5 seconds, they're still around
5 GB in size and checkpoint times are still in minutes. I would expect that
if the system only runs for 5s then it would have significantly less data
to checkpoint than if it runs for 1 min.

Would someone mind clarifying the meaning of checkpoint data size when
incremental checkpoints are turned on? Possibly I'm misinterpreting it.

Thank you!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com
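
Not an answer from the thread, but for reference, a sketch of the configuration being
discussed, assuming the RocksDB backend with the incremental flag enabled (the
checkpoint path and interval below are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 60 seconds (or every 5 seconds, as in the comparison above).
    env.enableCheckpointing(60_000L);

    // The second argument enables incremental checkpoints: each checkpoint uploads the
    // RocksDB files created since the previous one rather than a full copy of the state.
    env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));

    // ... build the pipeline and call env.execute(...) ...
  }
}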



Re: Interpretting checkpoint data size

2021-01-13 Thread Rex Fenley
One thing that did just come to mind: possibly every time I submit
a job from a previous checkpoint with different settings, it has to slowly
re-checkpoint all the previous data. That would mean there is some warm-up
time before things reach a steady state. Is this possible?

On Wed, Jan 13, 2021 at 6:09 PM Rex Fenley  wrote:

> Hello,
>
> I have incremental checkpoints turned on and there seems to be no relation
> at all between how often the job checkpoints and how much data it checkpoints. Whether
> checkpoints are set to every 1 min or every 5 seconds, they're still around
> 5 GB in size and checkpoint times are still in minutes. I would expect that
> if the system only runs for 5s then it would have significantly less data
> to checkpoint than if it runs for 1 min.
>
> Would someone mind clarifying the meaning of checkpoint data size when
> incremental checkpoints are turned on? Possibly I'm misinterpreting it.
>
> Thank you!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



Flink[Python] questions

2021-01-13 Thread Dc Zhao (BLOOMBERG/ 120 PARK)
Hi Flink Community:
We are using PyFlink to develop a POC for our project. We encountered some 
questions while using Flink.

We are using Flink version 1.2, Python 3.7, and the DataStream API.

1. Do you have examples of providing a customized Python class as a `result 
type`? Based on our documentation research, we found that only built-in types 
are supported in Python. Also, what is the payload size limitation inside 
Flink, and is there a recommendation for that?

2. Do you have examples of submitting DataStream API code to the cluster with 
`flink run --python`? We tried to do that, but the process hangs on a `socket read from 
the java gateway`; due to the lack of logs, we are not sure what is 
missing while submitting the job.


Regards
Dc


<< {CH} {TS} Anything that can possibly go wrong, it does. >>

Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread sagar
Hi Team,

I am getting the following error while running the DataStream API in batch
mode with a Kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I want to work with four to five different streams in
batch mode, as the data source is bounded.

I can't find any clear example of how to do it with a Kafka source with Flink
1.12.

I don't want to use a JDBC source, as the underlying database table may change.
Please give me an example of how to achieve the above use case.

Also, for any large bounded source, are there any alternatives to
achieve this?



-- 
---Regards---

  Sagar Bandal

This is confidential mail, all rights are reserved. If you are not the intended
recipient, please ignore this email.


Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread Ardhani Narasimha
Interesting use case.

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

> Hi Team,
>
> I am getting the following error while running DataStream API in with
> batch mode with kafka source.
> I am using FlinkKafkaConsumer to consume the data.
>
> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
> ~[flink-core-1.12.0.jar:1.12.0]
>
> In my batch program I wanted to work with four to five different stream in
> batch mode as data source is bounded
>
> I don't find any clear example of how to do it with kafka souce with Flink
> 1.12
>
> I don't want to use JDBC source as underlying database table may change.
> please give me some example on how to achieve the above use case.
>
> Also for any large bounded source are there any alternatives to
> achieve this?
>
>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail, all rights are reserved. If you are not the intended
> recipient, please ignore this email.
>

-- 
---
**IMPORTANT**: The contents of this email and any attachments are 
confidential and protected by applicable laws. If you have received this 
email by mistake, please (i) notify the sender immediately; (ii) delete it 
from your database; and (iii) do not disclose the contents to anyone or 
make copies thereof. Razorpay accepts no liability caused due to any 
inadvertent/ unintentional data transmitted through this email.
---


Re: Statement Sets

2021-01-13 Thread Jark Wu
No. The Kafka reader will be shared, which means the Kafka data is only read
once.
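
To illustrate (not from Jark's reply), a minimal sketch of a statement set where two
INSERTs read from the same source table; the table names are hypothetical and assumed
to be defined elsewhere:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // `events`, `sink_a` and `sink_b` are hypothetical tables (e.g. a Kafka source
    // and two sinks) assumed to be registered elsewhere.
    StatementSet set = tEnv.createStatementSet();
    set.addInsertSql("INSERT INTO sink_a SELECT * FROM events WHERE kind = 'a'");
    set.addInsertSql("INSERT INTO sink_b SELECT * FROM events WHERE kind = 'b'");

    // Both INSERTs are planned and submitted as one job, which is what allows the
    // Kafka reader for `events` to be shared instead of duplicated.
    set.execute();
  }
}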

On Tue, 12 Jan 2021 at 03:04, Aeden Jameson  wrote:

> When using statement sets, if two select queries use the same table
> (e.g. Kafka Topic), does each query get its own copy of data?
>
> Thank you,
> Aeden
>


Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

2021-01-13 Thread Jark Wu
Hi Dan,

Sorry for the late reply.

I guess you applied a "deduplication with keeping last row" before the
interval join?
That will produce an updating stream, and the interval join only supports
append-only input.
You can try to apply "deduplication with keeping *first* row" before the
interval join.
That should produce an append-only stream, and the interval join can consume
from it.

Best,
Jark
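
For reference (not part of Jark's reply), a sketch of the "keep first row"
deduplication he describes, applied before the interval join; the table and column
names loosely follow the error message above and are otherwise hypothetical, and
`ts` is assumed to be a time attribute:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DedupFirstRowSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // `events` is a hypothetical source table; log_user_id / ts follow the column
    // names in the error message above, and ts is assumed to be a time attribute.
    Table deduped = tEnv.sqlQuery(
        "SELECT * FROM ("
            + "  SELECT *, ROW_NUMBER() OVER ("
            + "    PARTITION BY log_user_id ORDER BY ts ASC) AS row_num"
            + "  FROM events"
            + ") WHERE row_num = 1");

    // Keeping the first row per key yields an append-only stream, which the
    // interval join can consume.
    tEnv.createTemporaryView("deduped", deduped);
  }
}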



On Tue, 5 Jan 2021 at 20:07, Arvid Heise  wrote:

> Hi Dan,
>
> Which Flink version are you using? I know that there has been quite a bit
> of optimization of deduplication in 1.12, which would reduce the required
> state tremendously.
> I'm pulling in Jark who knows more.
>
> On Thu, Dec 31, 2020 at 6:54 AM Dan Hill  wrote:
>
>> Hi!
>>
>> I'm using Flink SQL to do an interval join.  Rows in one of the tables
>> are not unique.  I'm fine using either the first or last row.  When I try
>> to deduplicate
>> 
>>  and
>> then interval join, I get the following error.
>>
>> IntervalJoin doesn't support consuming update and delete changes which is
>> produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[log_user_id], orderBy=[ts
>> ASC], select=[platform_id, user_id, log_user_id, client_log_ts,
>> event_api_ts, ts])
>>
>> Is there a way to combine these in this order?  I could do the
>> deduplication afterwards but this will result in more state.
>>
>> - Dan
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread Yun Gao
Hi Sagar,

  I think the problem is that legacy sources implemented by extending 
SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added via env.addSource(). 
Although there is a hacky way to add the legacy sources as a BOUNDED source [1], I 
think you may first have a try of the new version of KafkaSource [2]? The new 
version of KafkaSource is implemented with the new Source API [3], which 
provides unified support for the streaming and batch modes.

Best,
 Yun




[1] 
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2]  
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
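
To complement the links above (not from Yun's mail), a hedged sketch of a bounded
KafkaSource in batch runtime mode; the broker address, topic and group id are
placeholders, and the exact builder method names (e.g. the deserializer setter) follow
the current KafkaSource builder and may differ slightly between versions:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")                  // placeholder address
        .setTopics("input-topic")                           // placeholder topic
        .setGroupId("my-batch-job")                         // placeholder group id
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Declares the source BOUNDED: it reads up to the offsets that are the
        // latest at the time the job starts, then finishes.
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka")
       .print();

    env.execute("bounded-kafka-sketch");
  }
}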




 --Original Mail --
Sender:Ardhani Narasimha 
Send Date:Thu Jan 14 15:11:35 2021
Recipients:sagar 
CC:Flink User Mail List 
Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

Hi Team,

I am getting the following error while running DataStream API in with batch 
mode with kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with 
the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, 
please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I wanted to work with four to five different stream in 
batch mode as data source is bounded

I don't find any clear example of how to do it with kafka souce with Flink 1.12

I don't want to use JDBC source as underlying database table may change. please 
give me some example on how to achieve the above use case.

Also for any large bounded source are there any alternatives to achieve this?



-- 
---Regards---

  Sagar Bandal

This is confidential mail, all rights are reserved. If you are not the intended 
recipient, please ignore this email.
---
IMPORTANT: The contents of this email and any attachments are confidential and 
protected by applicable laws. If you have received this email by mistake, 
please (i) notify the sender immediately; (ii) delete it from your database; 
and (iii) do not disclose the contents to anyone or make copies thereof. 
Razorpay accepts no liability caused due to any inadvertent/ unintentional data 
transmitted through this email.
---

Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread sagar
Hi Ardhani,

So whenever I want to run this Flink job, I will call the Java API to put
the data into the four different Kafka topics (what data to put into Kafka
will be coded into those APIs), and then once that is complete, I want to run
the Flink job on the available data in Kafka and perform business
operations on all the available data.

I am not sure whether Kafka as a data source will be best for this use case,
but somehow I don't want to expose my Flink job to database tables
directly.



Thanks & Regards,
Sagar


On Thu, Jan 14, 2021 at 12:41 PM Ardhani Narasimha <
ardhani.narasi...@razorpay.com> wrote:

> Interesting use case.
>
> Can you please elaborate more on this.
> On what criteria do you want to batch? Time? Count? Or Size?
>
> On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:
>
>> Hi Team,
>>
>> I am getting the following error while running DataStream API in with
>> batch mode with kafka source.
>> I am using FlinkKafkaConsumer to consume the data.
>>
>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>> ~[flink-core-1.12.0.jar:1.12.0]
>>
>> In my batch program I wanted to work with four to five different stream
>> in batch mode as data source is bounded
>>
>> I don't find any clear example of how to do it with kafka souce with
>> Flink 1.12
>>
>> I don't want to use JDBC source as underlying database table may change.
>> please give me some example on how to achieve the above use case.
>>
>> Also for any large bounded source are there any alternatives to
>> achieve this?
>>
>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail, all rights are reserved. If you are not
>> the intended recipient, please ignore this email.
>>
>
>
> ---
> *IMPORTANT*: The contents of this email and any attachments are
> confidential and protected by applicable laws. If you have received this
> email by mistake, please (i) notify the sender immediately; (ii) delete it
> from your database; and (iii) do not disclose the contents to anyone or
> make copies thereof. Razorpay accepts no liability caused due to any
> inadvertent/ unintentional data transmitted through this email.
>
> ---
>


-- 
---Regards---

  Sagar Bandal

This is confidential mail, all rights are reserved. If you are not the intended
recipient, please ignore this email.


Re: StreamingFileSink with ParquetAvroWriters

2021-01-13 Thread Yun Gao
Hi Jan,

Could you try adding this dependency?


<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.11.1</version>
</dependency>

 



Best,
 Yun


 --Original Mail --
Sender:Jan Oelschlegel 
Send Date:Thu Jan 14 00:49:30 2021
Recipients:user@flink.apache.org 
Subject:StreamingFileSink with ParquetAvroWriters

Hi,
I'm using Flink (v1.11.2) and would like to use the StreamingFileSink for 
writing into HDFS in Parquet format.

As the documentation says, I have added the dependencies:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>


And this is my file sink definition:

val sink: StreamingFileSink[Event] = StreamingFileSink
  .forBulkFormat(
new Path("hdfs://namenode.local:8020/user/datastream/"),
ParquetAvroWriters.forReflectRecord(classOf[Event])
  )
  .build()


If I execute this in cluster I get the following error:

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
at 
org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)


Looks like there are some dependencies missing. How can I fix this?


Jan O.

NOTICE: This is a confidential message, intended only for the addressee. Copying this 
message or making it accessible to third parties is not permitted. If you have received 
this message in error, please notify me by e-mail or at the telephone number given above.