Question on the SQL "GROUPING SETS" and "CUBE" syntax usability

2020-03-09 Thread DONG, Weike
Hi,

From the Flink 1.10 official document (
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html),
we could see that GROUPING SETS is only supported in Batch mode.

[image: image.png]

However, we also found that in
https://issues.apache.org/jira/browse/FLINK-12192, streaming queries using
GROUPING SETS and CUBE are already listed as a fixed issue in 1.9.

Here I would like to know whether SQL support for GROUPING SETS, ROLLUP and CUBE is
ready for use in streaming queries, and whether the documentation needs to be updated.
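
For context, this is the kind of query we would like to run, sketched with the
blink planner in streaming mode (the table and column names below are just examples):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class GroupingSetsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // "orders" is assumed to be registered beforehand (e.g. via a Kafka connector).
        Table result = tEnv.sqlQuery(
                "SELECT region, product, SUM(amount) AS total " +
                "FROM orders " +
                "GROUP BY GROUPING SETS ((region, product), (region), ())");
    }
}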

Thank you

Best regards,
Weike


Re: Backpressure and 99th percentile latency

2020-03-09 Thread Felipe Gutierrez
Indeed, it is a bit tricky to understand the relation between
floatingBuffersUsage and exclusiveBuffersUsage. I am reading that
table on (https://flink.apache.org/2019/07/23/flink-network-stack-2.html)
again, but I guess I can rely on the latency metric that I implemented on my
operator (not the default latency tracking from Flink).
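
For anyone curious, such an operator-level histogram can be registered roughly like the
sketch below (simplified: the payload type, metric name and reservoir size are just
examples, and it assumes flink-metrics-dropwizard is on the classpath; the
event-creation timestamp is carried in f0 of the tuple):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

public class LatencyMeasuringMap extends RichMapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {

    private transient Histogram latencyHistogram;

    @Override
    public void open(Configuration parameters) {
        // Back the Flink Histogram with a Dropwizard sliding-window reservoir.
        com.codahale.metrics.Histogram dropwizard = new com.codahale.metrics.Histogram(
                new com.codahale.metrics.SlidingWindowReservoir(500));
        latencyHistogram = getRuntimeContext()
                .getMetricGroup()
                .histogram("eventLatencyMs", new DropwizardHistogramWrapper(dropwizard));
    }

    @Override
    public Tuple2<Long, String> map(Tuple2<Long, String> event) {
        // f0 holds the timestamp taken when the event was created/ingested upstream.
        latencyHistogram.update(System.currentTimeMillis() - event.f0);
        return event;
    }
}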

Thanks for the insight points!
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Sat, Mar 7, 2020 at 4:36 PM Zhijiang  wrote:
>
> Thanks for the feedback Felipe!
> Regarding your concern below:
>
> > Although I think it is better to use outPoolUsage and inPoolUsage according 
> > to [1]. However, in your opinion is it better (faster to see) to use 
> > inputQueueLength and
> > outputQueueLength or outPoolUsage and inPoolUsage to monitor a consequence 
> > of backpressure? I mean, is there a faster way to show that the latency 
> > increased due to
> > backpressure? Maybe if I create my own metric on my own operator or udf?
>
> The blog [1] already gives a great explanation of the network stack for users in 
> general, and I share the same view on this issue.
> In particular, I can provide some further notes for your understanding.
>
> 1. It is not easy for users to get the precise total amount of input & output 
> buffers, so from the input & outputQueueLength metrics we cannot tell whether the 
> input & output buffers are exhausted and backpressure has occurred. In contrast, we 
> can easily see that input & outputPoolUsage should both reach 100% once 
> backpressure happens.
>
> 2. The inputPoolUsage has different semantics from release 1.9 onward. Before 1.9 
> this metric only measured the usage of floating buffers. But from 1.9 it also 
> covers the usage of exclusive buffers. That means from 1.9 you might see the 
> inputPoolUsage far from 100% when backpressure happens, especially in the data 
> skew case, while the inputFloatingBufferUsage should be 100% instead.
>
> 3. The latency marker provided by the Flink framework is emitted to a random 
> channel (non-broadcast) every time because of performance concerns. So it is 
> hard to say whether it is measuring a heavily loaded channel or a lightweight 
> channel over a short while, especially in a data skew scenario.
>
> 4. In theory the latency should increase along with the trend of increasing 
> input & outputQueueLength and input & outputPoolUsage. All of them should be 
> proportional and follow the same trend in most cases.
>
> Best,
> Zhijiang
>
>
>
> --
> From:Felipe Gutierrez 
> Send Time:2020 Mar. 7 (Sat.) 18:49
> To:Arvid Heise 
> Cc:Zhijiang ; user 
> Subject:Re: Backpressure and 99th percentile latency
>
> Hi,
> I implemented my own histogram metric on my operator to measure the
> latency. The latency is following the throughput at the same pace now.
> The figures are attached.
>
> Best,
> Felipe
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Mar 6, 2020 at 9:38 AM Felipe Gutierrez
>  wrote:
> >
> > Thanks for the clarified answer @Zhijiang, I am gonna monitor
> > inputQueueLength and outputQueueLength to check some relation with
> > backpressure. Although I think it is better to use outPoolUsage and
> > inPoolUsage according to [1].
> > However, in your opinion is it better (faster to see) to use
> > inputQueueLength and outputQueueLength or outPoolUsage and inPoolUsage
> > to monitor a consequence of backpressure? I mean, is there a faster
> > way to show that the latency increased due to backpressure? Maybe if I
> > create my own metric on my own operator or udf?
> >
> > Thanks @Arvid. In the end I want to be able to hold SLAs. For me, the
> > SLA would be the minimum latency. If I understood correctly, once I
> > start to have backpressure the latency tracking metrics are
> > not a very precise indication of how much backpressure my application
> > is suffering. It just indicates that there is backpressure.
> > Which metric would you say is the more precise one to use for tuning the
> > throughput in order to avoid backpressure? Something like: if I
> > have 50,000 milliseconds of latency and the normal latency is 150
> > milliseconds, the throughput has to decrease by a factor of 50,000/150.
> >
> > Just a note: I am not changing the throughput of the sources yet. I am
> > changing the size of the window without restarting the job. But I guess
> > they have the same meaning for this question.
> >
> > [1] https://flink.apache.org/2019/07/23/flink-network-stack-2.html
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> >
> > On Fri, Mar 6, 2020 at 8:17 AM Arvid Heise  wrote:
> > >
> > > Hi Felipe,
> > >
> > > latency under backpressure has to b

How to change the flink web-ui jobServer?

2020-03-09 Thread LakeShen
Hi community,
   I am now moving my Flink job to Kubernetes, and I plan to use an Ingress
to expose the Flink web UI. The problem is that the Flink job server address isn't
correct, so I want to change the flink web-ui job server. I haven't found any
method to change it; is there a way to do that?
   Thanks for your reply.

Best wishes,
LakeShen


Re: History server UI not working

2020-03-09 Thread pwestermann
Hey Robert,

I just tried Flink 1.10 and the history server UI works for me too. Only
Flink 1.9.2 is not loading.
Since we were already looking into upgrading to 1.10, I might just do that
now.

Thanks,
Peter 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Kafka sink only support append mode?

2020-03-09 Thread wangl...@geekplus.com.cn
I  wrote a simple program reading from kafka using  sql  and sink to kafka.
But only  'update-mode' = 'append' is supported for sink table and the query 
sql must have no group statement.
Only append mode is supported for kafka sink?

Thanks,
Lei




Re: Kafka sink only support append mode?

2020-03-09 Thread Jark Wu
Hi Lei,

Yes. Currently, the Kafka sink only supports append mode. Other update modes
(e.g. upsert mode / retract mode) are on the agenda.
For now, you can customize a KafkaTableSink by implementing the
UpsertStreamTableSink interface, where you will get Tuple2<Boolean, Row>
records, and the Boolean represents an insert or delete operation. Then you can
encode the insert/delete operation into the Kafka storage format or just ignore
the delete operations.
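
As a rough illustration of handling those Tuple2<Boolean, Row> records (not a full
UpsertStreamTableSink implementation, but the same idea via toRetractStream; the
topic, broker and query below are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToKafkaSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // "orders" is a hypothetical registered table; the aggregation makes the result updating.
        Table counts = tEnv.sqlQuery("SELECT user_id, COUNT(*) AS cnt FROM orders GROUP BY user_id");

        // Each element is Tuple2<Boolean, Row>: true = insert/update, false = retraction.
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(counts, Row.class);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        retractStream
            // Encode the change flag into the payload, or filter out retractions entirely.
            .map(change -> (change.f0 ? "+U " : "-D ") + change.f1.toString())
            .returns(Types.STRING)
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

        env.execute("retract-stream-to-kafka");
    }
}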

Best,
Jark

On Mon, 9 Mar 2020 at 19:14, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> I  wrote a simple program reading from kafka using  sql  and sink to kafka.
> But only  'update-mode' = 'append' is supported for sink table and the
> query sql must have no group statement.
> Only append mode is supported for kafka sink?
>
> Thanks,
> Lei
>
>
>


How does custom object/data structures get mapped into rocksdb underneath?

2020-03-09 Thread kant kodali
Hi All,

I want to do stateful streaming and I was wondering how Custom objects get
mapped into rocksdb?

say I have the following class that represents my state

public class MyState {
private HashMap map1 ; // T can be any type
private HashMap map2; // S can be any type
}

I wonder how these two maps get mapped into RocksDB, and how does Flink
know that map1 and map2 together are part of my state rather than
individual pieces of state in isolation?

Thanks!


Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-09 Thread Robert Metzger
Hey Jiawei,

I'm sorry that you haven't received an answer yet.

So you basically have a stream of DynamoDB table updates (let's call it the CDC
stream), and you would like to maintain the inventory of the last 15 days
for each vendor.
Whenever there's an update in the inventory data (a new event arrives in
the CDC stream), you want to produce a new event with the inventory count.

If I'm not mistaken, you will need to keep all the inventory in Flink's
state to have an accurate count and to drop old records when they are
expired.
There are two options for maintaining the state:
- in memory (using the FsStateBackend)
- on disk (using the embedded RocksDBStatebackend)

I would recommend starting with the RocksDBStateBackend. It will work as
long as your state fits on your machines' hard disks (we probably won't
have an issue there :) )
If you run into performance issues, you can consider switching to a memory-based
backend (by then, you should have some knowledge about your state
size).

For tracking the events, I would recommend you to look into Flink's
windowing API:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
 / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
Or alternatively doing an implementation with ProcessFunction:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
I personally would give it a try with ProcessFunction first.
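
To make the ProcessFunction route a bit more concrete, here is a rough, simplified
sketch (the field layout is hypothetical: the input is (vendorId, inboundTimeMillis,
units) keyed by vendorId; removals and state cleanup are left out):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input:  Tuple3<vendorId, inboundTimeMillis, units> keyed by vendorId.
// Output: Tuple2<vendorId, sum of units older than 15 days>.
public class OldInventorySum
        extends KeyedProcessFunction<String, Tuple3<String, Long, Long>, Tuple2<String, Long>> {

    private static final long FIFTEEN_DAYS = 15L * 24 * 60 * 60 * 1000;

    // inboundTime -> units received at that time (removals are not handled in this sketch)
    private transient MapState<Long, Long> inventory;

    @Override
    public void open(Configuration parameters) {
        inventory = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("inventory", Types.LONG, Types.LONG));
    }

    @Override
    public void processElement(Tuple3<String, Long, Long> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        inventory.put(event.f1, event.f2);
        // Fire a timer when this record crosses the 15-day boundary, so the sum is re-emitted.
        ctx.timerService().registerProcessingTimeTimer(event.f1 + FIFTEEN_DAYS);
        emitSum(event.f0, out);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        emitSum(ctx.getCurrentKey(), out);
    }

    private void emitSum(String vendorId, Collector<Tuple2<String, Long>> out) throws Exception {
        long threshold = System.currentTimeMillis() - FIFTEEN_DAYS;
        long sum = 0;
        for (java.util.Map.Entry<Long, Long> entry : inventory.entries()) {
            if (entry.getKey() < threshold) {
                sum += entry.getValue();
            }
        }
        out.collect(Tuple2.of(vendorId, sum));
    }
}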

For reading the data from DynamoDB, there's an undocumented feature for it
in Flink. This is an example for reading from a DynamoDB stream:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
Here's also some info: https://issues.apache.org/jira/browse/FLINK-4582

For writing to DynamoDB there is currently no official sink in Flink. It
should be fairly straightforward to implement a Sink using the SinkFunction
interface of Flink.

I hope this answers your question.

Best,
Robert




On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu  wrote:

> Hi flink users,
>
> We have a problem and think flink may be a good solution for that. But I'm
> new to flink and hope can get some insights from flink community :)
>
> Here is the problem. Suppose we have a DynamoDB table which store the
> inventory data, the schema is like:
>
> * vendorId (primary key)
> * inventory name
> * inventory units
> * inbound time
> ...
>
> This DDB table keeps changing, since we have inventory coming and removal. 
> *Every
> change will trigger a DynamoDB stream. *
> We need to calculate *all the inventory units that > 15 days for a
> specific vendor* like this:
> > select vendorId, sum(inventory units)
> > from dynamodb
> > where today's time - inbound time > 15
> > group by vendorId
> We don't want to schedule a daily batch job, so we are trying to work on a
> micro-batch solution in Flink, and publish this data to another DynamoDB
> table.
>
> A draft idea is to use the total units minus <15 days units, since both of
> then have event trigger. But no detailed solutions yet.
>
> Could anyone help provide some insights here?
>
> Thanks,
> J.
>


Re: How does custom object/data structures get mapped into rocksdb underneath?

2020-03-09 Thread Aljoscha Krettek

Hi,

when working with state you have to create a state descriptor ([1]). 
This will have a TypeInformation or TypeSerializer that Flink uses to 
(de)serialize your state. This will be used to serialize your state to 
bytes; in the case of RocksDB we serialize both the key (the key of the 
record that is used to key the stream) and the value (your state object, 
MyState in your example) to bytes and store them in the RocksDB key-value 
store.
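
For illustration, a minimal sketch of such a descriptor for the MyState class from
your example, used from a keyed function (assuming MyState is a valid POJO;
otherwise Flink falls back to Kryo):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MyStatefulFunction extends RichFlatMapFunction<String, String> {

    private transient ValueState<MyState> state;

    @Override
    public void open(Configuration parameters) {
        // The TypeInformation here determines the serializer that turns MyState
        // into the bytes stored in RocksDB (keyed by the stream's key).
        ValueStateDescriptor<MyState> descriptor = new ValueStateDescriptor<>(
                "my-state", TypeInformation.of(new TypeHint<MyState>() {}));
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        MyState current = state.value(); // deserialized from RocksDB bytes for the current key
        // ... update current.map1 / current.map2 ...
        state.update(current);           // serialized back to bytes and written to RocksDB
    }
}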


Best,
Aljoscha

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html


On 09.03.20 12:27, kant kodali wrote:

Hi All,

I want to do stateful streaming and I was wondering how Custom objects get
mapped into rocksdb?

say I have the following class that represents my state

public class MyState {
 private HashMap map1 ; // T can be any type
 private HashMap map2; // S can be any type
}

I wonder how these two maps gets mapped into rocksdb? and how does Flink
know that map1 and map2 together are part of my state but not
individual ones in isolation?

Thanks!



Re: Writing a DataSet to ElasticSearch

2020-03-09 Thread Robert Metzger
Hey Niels,

For the OOM problem: Did you try RocksDB?

I don't think there's an ES OutputFormat.

I guess there's no way around implementing your own OutputFormat for ES, if
you want to use the DataSet API. It should not be too hard to implement.
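
Such an OutputFormat could look roughly like the sketch below (assuming the
Elasticsearch 7.x high-level REST client; host, index name and the JSON conversion
are placeholders, and a real version should batch writes with BulkRequest and
handle failures):

import java.io.IOException;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {

    private transient RestHighLevelClient client;

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http"))); // placeholder host
    }

    @Override
    public void writeRecord(T record) throws IOException {
        IndexRequest request = new IndexRequest("my-index") // placeholder index name
                .source(toJson(record), XContentType.JSON);
        client.index(request, RequestOptions.DEFAULT);
    }

    @Override
    public void close() throws IOException {
        client.close();
    }

    private String toJson(T record) {
        // placeholder: convert the record to a JSON document, e.g. with Jackson
        return record.toString();
    }
}

It could then be plugged in with dataSet.output(new ElasticsearchOutputFormat<>()).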


On Sun, Mar 1, 2020 at 1:42 PM Niels Basjes  wrote:

> Hi,
>
> I have a job in Flink 1.10.0 which creates data that I need to write to
> ElasticSearch.
> Because it really is a Batch (and doing it as a stream keeps giving OOM
> problems: big + unordered + groupby) I'm trying to do it as a real batch.
>
> To write a DataSet to some output (that is not a file) an OutputFormat
> implementation is needed.
>
> public DataSink output(OutputFormat outputFormat)
>
> The problem I have is that I have not been able to find a "OutputFormat"
> for ElasticSearch.
> Adding ES as a Sink to a DataStream is trivial because a Sink is provided
> out of the box.
>
> The only alternative I came up with is to write the output of my batch to
> a file and then load that (with a stream) into ES.
>
> What is the proper solution?
> Is there an OutputFormat for ES I can use that I overlooked?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>


Re: Very large _metadata file

2020-03-09 Thread Congxian Qiu
Hi

As Gordon said, the metadata will contain the ByteStreamStateHandles; when
writing out a ByteStreamStateHandle, Flink will write out the handle name,
which is a path (as you saw). A ByteStreamStateHandle will be created when the
state size is smaller than `state.backend.fs.memory-threshold` (default is
1024 bytes).

If you want to verify this, you can refer to the unit test
`CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
metadata; you will find that there are many `ByteStreamStateHandle`s, and
their names are the strings you saw in the metadata.
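
For illustration, the threshold Gordon mentioned can also be set when constructing
an FsStateBackend, as in this small sketch (the checkpoint path is a placeholder):

import java.net.URI;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MetadataThresholdSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // State handles smaller than the threshold are inlined into the checkpoint
        // _metadata file as ByteStreamStateHandles; everything larger becomes its own file.
        // The same knob is exposed as state.backend.fs.memory-threshold in flink-conf.yaml.
        env.setStateBackend(new FsStateBackend(
                new URI("hdfs:///flink/checkpoints"), // placeholder checkpoint directory
                1024));                               // fileStateSizeThreshold in bytes (the default)
    }
}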

Best,
Congxian


Jacob Sevart wrote on Fri, Mar 6, 2020 at 3:57 AM:

> Thanks, I will monitor that thread.
>
> I'm having a hard time following the serialization code, but if you know
> anything about the layout, tell me if this makes sense. What I see in the
> hex editor is, first, many HDFS paths. Then gigabytes of unreadable data.
> Then finally another HDFS path at the end.
>
> If it is putting state in there, under normal circumstances, does it make
> sense that it would be interleaved with metadata? I would expect all the
> metadata to come first, and then state.
>
> Jacob
>
>
>
> Jacob
>
> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas  wrote:
>
>> Hi Jacob,
>>
>> As I said previously I am not 100% sure what can be causing this
>> behavior, but this is a related thread here:
>>
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d-2540-253Cuser.flink.apache.org-253E&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=P3Xd0IFKJTDIG2MMeP-hOSfY4ohoCEUMQEJhvGecSlI&e=
>>
>> Which you can re-post your problem and monitor for answers.
>>
>> Cheers,
>> Kostas
>>
>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
>> >
>> > Kostas and Gordon,
>> >
>> > Thanks for the suggestions! I'm on RocksDB. We don't have that setting
>> configured so it should be at the default 1024b. This is the full "state.*"
>> section showing in the JobManager UI.
>> >
>> >
>> >
>> > Jacob
>> >
>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai 
>> wrote:
>> >>
>> >> Hi Jacob,
>> >>
>> >> Apart from what Klou already mentioned, one slightly possible reason:
>> >>
>> >> If you are using the FsStateBackend, it is also possible that your
>> state is small enough to be considered to be stored inline within the
>> metadata file.
>> >> That is governed by the "state.backend.fs.memory-threshold"
>> configuration, with a default value of 1024 bytes, or can also be
>> configured with the `fileStateSizeThreshold` argument when constructing the
>> `FsStateBackend`.
>> >> The purpose of that threshold is to ensure that the backend does not
>> create a large amount of very small files, where potentially the file
>> pointers are actually larger than the state itself.
>> >>
>> >> Cheers,
>> >> Gordon
>> >>
>> >>
>> >>
>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas 
>> wrote:
>> >>>
>> >>> Hi Jacob,
>> >>>
>> >>> Could you specify which StateBackend you are using?
>> >>>
>> >>> The reason I am asking is that, from the documentation in [1]:
>> >>>
>> >>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>> >>> state will be stored in the _metadata file. Since it is
>> >>> self-contained, you may move the file and restore from any location."
>> >>>
>> >>> I am also cc'ing Gordon who may know a bit more about state formats.
>> >>>
>> >>> I hope this helps,
>> >>> Kostas
>> >>>
>> >>> [1]
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.6_ops_state_savepoints.html&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=fw0c-Ct21HHJv4MzZRicIaltqHLQOrNvqchzNgCdwkA&e=
>> >>>
>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
>> >>> >
>> >>> > Per the documentation:
>> >>> >
>> >>> > "The meta data file of a Savepoint contains (primarily) pointers to
>> all files on stable storage that are part of the Savepoint, in form of
>> absolute paths."
>> >>> >
>> >>> > I somehow have a _metadata file that's 1.9GB. Running strings on it
>> I find 962 strings, most of which look like HDFS paths, which leaves a lot
>> of that file-size unexplained. What else is in there, and how exactly could
>> this be happening?
>> >>> >
>> >>> > We're running 1.6.
>> >>> >
>> >>> > Jacob
>> >
>> >
>> >
>> > --
>> > Jacob Sevart
>> > Software Engineer, Safety
>>
>
>
> --
> Jacob Sevart
> Software Engineer, Safety
>


Re: Batch Flink Job S3 write performance vs Spark

2020-03-09 Thread Robert Metzger
Hey,

I don't think there will be a big performance difference. Both systems have
many users writing data to S3, so this will be optimized for both.

On Tue, Feb 25, 2020 at 6:03 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> have a question did anyone compared the performance of Flink batch job
> writing to s3 vs spark writing to s3?
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: Understanding n LIST calls as part of checkpointing

2020-03-09 Thread Piyush Narang
Hi Yun,

Thanks for getting back. We’re on a fork of Flink 1.9 (basically 1.9 with some 
backported fixes from 1.10 and a couple of minor patches) - 
https://github.com/criteo-forks/flink/tree/criteo-1.9
I’ll check the jira + fix and see if there’s something that was potentially 
missed.

-- Piyush


From: Yun Tang 
Date: Sunday, March 8, 2020 at 11:05 PM
To: Piyush Narang , user 
Subject: Re: Understanding n LIST calls as part of checkpointing

Hi Piyush

Which version of Flink do you use? Since Flink 1.5, Flink does not issue any 
"List" operation on the checkpointing side, thanks to FLINK-8540 [1]. The only 
remaining "List" operation is used when reading files in the file input format. In a 
nutshell, these "List" calls should not come from Flink if you're using Flink 1.5+.


[1] 
https://issues.apache.org/jira/browse/FLINK-8540

Best
Yun Tang


From: Piyush Narang 
Sent: Saturday, March 7, 2020 6:15
To: user 
Subject: Understanding n LIST calls as part of checkpointing


Hi folks,



I was trying to debug a job which was taking 20-30s to checkpoint data to Azure 
FS (compared to typically < 5s) and as part of doing so, I noticed something 
that I was trying to figure out a bit better.

Our checkpoint path is as follows: 
my_user/featureflow/foo-datacenter/cluster_name/my_flink_job/checkpoint/chk-1234



What I noticed was that while trying to take checkpoints (incremental using 
rocksDB) we make a number of List calls to Azure:

my_user/featureflow/foo-datacenter/cluster_name/my_flink_job/checkpoint

my_user/featureflow/foo-datacenter/cluster_name/my_flink_job

my_user/featureflow/foo-datacenter/cluster_name

my_user/featureflow/foo-datacenter

my_user/featureflow

my_user



Each of these calls takes a few seconds and all of them seem to add up to make 
our checkpoint take time. The part I was hoping to understand on the Flink side 
was whether the behavior of making these List calls for each parent ‘directory’ 
/ blob all the way to the top was normal / expected?



We are exploring a couple of other angles on our end (potentially flattening 
the directory / blob structure to reduce the number of these calls, is the 
latency on the Azure side expected), but along with this I was hoping to 
understand if this behavior on the Flink side is expected / if there’s 
something which we could optimize as well.



Thanks,



-- Piyush




Re: Alink and Flink ML

2020-03-09 Thread Marta Paes Moreira
Hi, Flavio.

Indeed, Becket is the best person to answer this question, but as far as I
understand the idea is that Alink will be contributed back to Flink in the
form of a refactored Flink ML library (sitting on top of the Table API)
[1]. You can follow the progress of these efforts by tracking FLIP-39 [2].

[1] https://developpaper.com/why-is-flink-ai-worth-looking-forward-to/
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs

On Tue, Mar 3, 2020 at 2:02 PM Gary Yao  wrote:

> Hi Flavio,
>
> I am looping in Becket (cc'ed) who might be able to answer your question.
>
> Best,
> Gary
>
> On Tue, Mar 3, 2020 at 12:19 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> since Alink has been open sourced, is there any good reason to keep both
>> Flink ML and Alink?
>> From what I understood Alink already contains the best ML implementation
>> available for Flink..am I wrong?
>> Maybe it could make sense to replace the current Flink ML with that of
>> Alink..or is that impossible?
>>
>> Cheers,
>> Flavio
>>
>


Re: Flink Session Windows State TTL

2020-03-09 Thread Robert Metzger
Hey Karl,

sorry for the late reply!

Let me first quickly answer your questions:

> - are expired session windows automatically removed from state? if
>   not, what's the best way to do it?

Yes, they should get removed automatically.

> - how can we query state size?

You can monitor the state size in the Flink web ui (there's a
"Checkpointing" tab for each job)
Or through Flink's metrics system:
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#checkpointing

> - how can we query number of windows in state?

The window operator does not expose any metrics.

I also have some questions :)
- Have you considered using the RocksDB statebackend to mitigate the out of
memory issues? (see the sketch after this list)
- Why are you disabling the operator chaining?
- Did you validate that the "TimeZone.setDefault(...)" setting ends up at
the worker JVMs executing your code? (I suspect that you are only setting
the TimeZone in the JVM executing the main() method)
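
Regarding the first question, enabling it is essentially the following sketch
(the checkpoint path is a placeholder and the flink-statebackend-rocksdb
dependency is assumed):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // true = incremental checkpoints; state lives on local disk instead of the JVM heap.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    }
}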

Best,
Robert



On Tue, Mar 3, 2020 at 7:23 PM karl.pullicino <
karl.pullic...@gamesysgroup.com> wrote:

> Added  flink_oom_exception.txt
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/flink_oom_exception.txt>
>
> as originally forgot to attach it
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Alink and Flink ML

2020-03-09 Thread Flavio Pompermaier
Thanks Marta for the clarification!

On Mon, Mar 9, 2020 at 3:26 PM Marta Paes Moreira 
wrote:

> Hi, Flavio.
>
> Indeed, Becket is the best person to answer this question, but as far as I
> understand the idea is that Alink will be contributed back to Flink in the
> form of a refactored Flink ML library (sitting on top of the Table API)
> [1]. You can follow the progress of these efforts by tracking FLIP-39 [2].
>
> [1] https://developpaper.com/why-is-flink-ai-worth-looking-forward-to/
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
>
> On Tue, Mar 3, 2020 at 2:02 PM Gary Yao  wrote:
>
>> Hi Flavio,
>>
>> I am looping in Becket (cc'ed) who might be able to answer your question.
>>
>> Best,
>> Gary
>>
>> On Tue, Mar 3, 2020 at 12:19 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> since Alink has been open sourced, is there any good reason to keep both
>>> Flink ML and Alink?
>>> From what I understood Alink already contains the best ML implementation
>>> available for Flink..am I wrong?
>>> Maybe it could make sense to replace the current Flink ML with that of
>>> Alink..or is that impossible?
>>>
>>> Cheers,
>>> Flavio
>>>
>>


Re: Flink Session Windows State TTL

2020-03-09 Thread Robert Metzger
Sorry, I pressed the send button too fast.

You also attached an exception to the email, which reads as follows:

Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
Could not fulfill slot request c28970b7cd4f68383e242703bdac81ca.
Requested resource profile (ResourceProfile{cpuCores=-1.0,
heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1,
networkMemoryInMB=-1, managedMemoryInMB=-1}) is unfulfillable.

This exception does not indicate that you are running out of memory at
runtime, rather that your operator can not be scheduled anymore.
Can you see if enabling the operator chaining again solves the problem?

Also, how are you deploying Flink?


On Mon, Mar 9, 2020 at 3:30 PM Robert Metzger  wrote:

> Hey Karl,
>
> sorry for the late reply!
>
> Let me first quickly answer your questions:
>
> > - are expired session windows automatically removed from state? if
> >   not, what's the best way to do it?
>
> Yes, they should get removed automatically.
>
> > - how can we query state size?
>
> You can monitor the state size in the Flink web ui (there's a
> "Checkpointing" tab for each job)
> Or through Flink's metrics system:
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#checkpointing
>
> > - how can we query number of windows in state?
>
> The window operator does not expose any metrics.
>
> I also have some questions :)
> - Have you considered using the RocksDB statebackend to mitigate the out
> of memory issues?
> - Why are you disabling the operator chaining?
> - Did you validate that the "TimeZone.setDefault(...)" setting ends up at
> the worker JVMs executing your code? (I suspect that you are only setting
> the TimeZone in the JVM executing the main() method)
>
> Best,
> Robert
>
>
>
> On Tue, Mar 3, 2020 at 7:23 PM karl.pullicino <
> karl.pullic...@gamesysgroup.com> wrote:
>
>> Added  flink_oom_exception.txt
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/flink_oom_exception.txt>
>>
>> as originally forgot to attach it
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Flink Serialization as stable (kafka) output format?

2020-03-09 Thread Robert Metzger
Hi Theo,

> However, in most benchmarks, avro turns out to be rather slow in terms of
> CPU cycles (e.g. [1])


Avro is slower compared to what?
You should not only benchmark the CPU cycles for serializing the data. If
you are sending JSON strings across the network, you'll probably have a lot
more bytes to send across the network, making everything slower (usually
network is slower than CPU)

One of the reasons why people use Avro it supports schema evolution.

Regarding your questions:
1. For this use case, you can use the Flink data format as an internal
message format (between the star architecture jobs)
2. Generally speaking no
3. You will at least have a dependency on flink-core. And this is a
somewhat custom setup, so you might be facing breaking API changes.
4. I'm not aware of any benchmarks. The Flink serializers are mostly for
internal use (between our operators), Kryo is our fallback (to not suffer
too much from the not-invented-here syndrome), while Avro is meant for
cross-system serialization.

I have the feeling that you can move ahead with using Flink's Pojo
serializer everywhere :)
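
To make point 1 a bit more concrete, here is a sketch of what wiring Flink's
serializer into a Kafka (de)serialization schema could look like (simplified, the
class name is made up, and it does no explicit schema-evolution handling):

import java.io.IOException;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class FlinkPojoKafkaSchema<T> implements SerializationSchema<T>, DeserializationSchema<T> {

    private final Class<T> clazz;
    private transient TypeSerializer<T> serializer;

    public FlinkPojoKafkaSchema(Class<T> clazz) {
        this.clazz = clazz;
    }

    private TypeSerializer<T> serializer() {
        if (serializer == null) {
            // Uses Flink's own type extraction, i.e. the PojoSerializer for valid POJOs.
            serializer = TypeInformation.of(clazz).createSerializer(new ExecutionConfig());
        }
        return serializer;
    }

    @Override
    public byte[] serialize(T element) {
        try {
            DataOutputSerializer out = new DataOutputSerializer(64);
            serializer().serialize(element, out);
            return out.getCopyOfBuffer();
        } catch (IOException e) {
            throw new RuntimeException("Serialization failed", e);
        }
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        return serializer().deserialize(new DataInputDeserializer(message));
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}

On the producing side it could be handed to a FlinkKafkaProducer, and on the
consuming side to a FlinkKafkaConsumer.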

Best,
Robert




On Wed, Mar 4, 2020 at 1:04 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi,
>
> Without knowing too much about flink serialization, I know that Flink
> states that it serializes POJO types much faster than even the fast Kryo for
> Java. I further know that it supports schema evolution in the same way as
> avro.
>
> In our project, we have a star architecture, where one flink job produces
> results into a kafka topic and where we have multiple downstream consumers
> from that kafka topic (Mostly other flink jobs).
> For fast development cycles, we currently use JSON as output format for
> the kafka topic due to easy debugging capabilities and best migration
> possibilities. However, when scaling up, we need to switch to a more
> efficient format. Most often, Avro is mentioned in combination with a
> schema registry, as it's much more efficient than JSON, where essentially
> each message contains the schema as well. However, in most benchmarks, avro
> turns out to be rather slow in terms of CPU cycles (e.g. [1])
>
> My question(s) now:
> 1. Is it reasonable to use flink serializers as message format in Kafka?
> 2. Are there any downsides in using flinks serialization result as output
> format to kafka?
> 3. Can downstream consumers, written in Java, but not flink components,
> also easily deserialize flink serialized POJOs? Or do they have a
> dependency to at least full flink-core?
> 4. Do you have benchmarks comparing flink (de-)serialization performance
> to e.g. kryo and avro?
>
> The only thing I come up with why I wouldn't use flink serialization is
> that we wouldn't have a schema registry, but in our case, we share all our
> POJOs in a jar which is used by all components, so that is kind of a schema
> registry already and if we only make avro compatible changes, which are
> also well treated by flink, that shouldn't be any limitation compared to
> like avro+registry?
>
> Best regards
> Theo
>
> [1] https://github.com/eishay/jvm-serializers/wiki
>


RE: Flink Conf "yarn.flink-dist-jar" Question

2020-03-09 Thread Hailu, Andreas
Hi Yang,

Yes, a combination of these two would be very helpful for us. We have a single 
shaded binary which we use to run all of the jobs on our YARN cluster. If we 
could designate a single location in HDFS for that as well, we could also 
greatly benefit from FLINK-13938.

It sounds like a general public cache solution is what’s being called for?

// ah

From: Yang Wang 
Sent: Sunday, March 8, 2020 10:52 PM
To: Hailu, Andreas [Engineering] 
Cc: tison ; user@flink.apache.org
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question

Hi Hailu, tison,

I created a very similar ticket before to accelerate Flink submission on 
Yarn [1]. However,
we did not reach a consensus in the PR. Maybe it's time to revive the discussion 
and try
to find a common solution for both tickets [1][2].


[1]. 
https://issues.apache.org/jira/browse/FLINK-13938
[2]. 
https://issues.apache.org/jira/browse/FLINK-14964


Best,
Yang

Hailu, Andreas <andreas.ha...@gs.com> wrote on Sat, Mar 7, 2020 at 11:21 AM:
Hi Tison, thanks for the reply. I’ve replied to the ticket. I’ll be watching it 
as well.

// ah

From: tison mailto:wander4...@gmail.com>>
Sent: Friday, March 6, 2020 1:40 PM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>
Cc: user@flink.apache.org
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question

FLINK-13938 seems a bit different than your requirement. The one totally 
matches is 
FLINK-14964.
 I'll appreciate it if you can share you opinion on the JIRA ticket.

Best,
tison.


tison <wander4...@gmail.com> wrote on Sat, Mar 7, 2020 at 2:35 AM:
Yes your requirement is exactly taken into consideration by the community. We 
currently have an open JIRA ticket for the specific feature[1] and works for 
loosing the constraint of flink-jar schema to support DFS location should 
happen.

Best,
tison.

[1] 
https://issues.apache.org/jira/browse/FLINK-13938


Hailu, Andreas <andreas.ha...@gs.com> wrote on Sat, Mar 7, 2020 at 2:03 AM:
Hi,

We noticed that every time an application runs, it uploads the flink-dist 
artifact to the /user//.flink HDFS directory. This causes a user disk 
space quota issue as we submit thousands of apps to our cluster an hour. We had 
a similar problem with our Spark applications where it uploaded the Spark 
Assembly package for every app. Spark provides an argument to use a location in 
HDFS its for applications to leverage so they don’t need to upload them for 
every run, and that was our solution (see “spark.yarn.jar” configuration if 
interested.)

Looking at the Resource Orchestration Frameworks page, I see there might be a 
similar concept through a "yarn.flink-dist-jar" configuration option. I wanted to 
place the flink-dist package we're using in a location in HDFS and configure our 
jobs to point to it, e.g.

yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar

Am I correct in that this is what I’m looking for? I gave this a try with some 
jobs today, and based on what I’m seeing in the launch_container.sh in our YARN 
application, it still looks like it’s being uploaded:

export 
_FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar"

How can I confirm? Or is this perhaps not config I’m looking for?

Best,
Andreas




RE: Flink Conf "yarn.flink-dist-jar" Question

2020-03-09 Thread Hailu, Andreas
Also may I ask what causes these application ID directories to be left behind? 
Is it a job failure, or can they persist even if the application succeeds? I’d 
like to know so that I can implement my own cleanup in the interim to prevent 
exceeding user disk space quotas.

// ah

From: Hailu, Andreas [Engineering]
Sent: Monday, March 9, 2020 1:20 PM
To: 'Yang Wang' 
Cc: tison ; user@flink.apache.org
Subject: RE: Flink Conf "yarn.flink-dist-jar" Question

Hi Yang,

Yes, a combination of these two would be very helpful for us. We have a single 
shaded binary which we use to run all of the jobs on our YARN cluster. If we 
could designate a single location in HDFS for that as well, we could also 
greatly benefit from FLINK-13938.

It sounds like a general public cache solution is what’s being called for?

// ah

From: Yang Wang mailto:danrtsey...@gmail.com>>
Sent: Sunday, March 8, 2020 10:52 PM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>
Cc: tison mailto:wander4...@gmail.com>>; 
user@flink.apache.org
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question

Hi Hailu, tison,

I created a very similar ticket before to accelerate Flink submission on 
Yarn[1]. However,
we do not get a consensus in the PR. Maybe it's time to revive the discussion 
and try
to find a common solution for both the two tickets[1][2].


[1]. 
https://issues.apache.org/jira/browse/FLINK-13938
[2]. 
https://issues.apache.org/jira/browse/FLINK-14964


Best,
Yang

Hailu, Andreas <andreas.ha...@gs.com> wrote on Sat, Mar 7, 2020 at 11:21 AM:
Hi Tison, thanks for the reply. I’ve replied to the ticket. I’ll be watching it 
as well.

// ah

From: tison mailto:wander4...@gmail.com>>
Sent: Friday, March 6, 2020 1:40 PM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>
Cc: user@flink.apache.org
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question

FLINK-13938 seems a bit different than your requirement. The one totally 
matches is 
FLINK-14964.
 I'll appreciate it if you can share you opinion on the JIRA ticket.

Best,
tison.


tison <wander4...@gmail.com> wrote on Sat, Mar 7, 2020 at 2:35 AM:
Yes your requirement is exactly taken into consideration by the community. We 
currently have an open JIRA ticket for the specific feature[1] and works for 
loosing the constraint of flink-jar schema to support DFS location should 
happen.

Best,
tison.

[1] 
https://issues.apache.org/jira/browse/FLINK-13938


Hailu, Andreas <andreas.ha...@gs.com> wrote on Sat, Mar 7, 2020 at 2:03 AM:
Hi,

We noticed that every time an application runs, it uploads the flink-dist 
artifact to the /user//.flink HDFS directory. This causes a user disk 
space quota issue as we submit thousands of apps to our cluster an hour. We had 
a similar problem with our Spark applications where it uploaded the Spark 
Assembly package for every app. Spark provides an argument to use a location in 
HDFS its for applications to leverage so they don’t need to upload them for 
every run, and that was our solution (see “spark.yarn.jar” configuration if 
interested.)

Looking at the Resource Orchestration Frameworks page, I see there might be a 
similar concept through a "yarn.flink-dist-jar" configuration option. I wanted to 
place the flink-dist package we're using in a location in HDFS and configure our 
jobs to point to it, e.g.

yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar

Am I corr

Re: Process parquet files in batch mode with blink planner

2020-03-09 Thread Jingsong Li
Hi olivier,

Sorry for the late reply.
In the blink planner,
- only Hive parquet tables can be read now.
- If you want to support native parquet files, you can modify
`ParquetTableSource` a little bit so that it extends StreamTableSource.

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 7:50 PM  wrote:

> Hi community,
>
> For a PoC I need to process some parquet files in batch mode.
>
> I managed to implement some processing using the DataSet API. It is
> working fine.
> Now, I would like to test the SQL API and the blink planner.
>
> If I understand correctly, the ParquetTableSource is not compatible with the
> blink planner. Thus I am wondering if there is a TableSource compatible
> with the blink planner which can be used to read parquet files and if there
> are some examples available.
>
> Thanks,
>
> Olivier
>


-- 
Best, Jingsong Lee


Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-09 Thread kant kodali
Hi All,

Do I need to set assignTimestampsAndWatermarks if I set my time
characteristic to IngestionTime?

say I set my time characteristic of stream execution environment to
Ingestion time as follows

streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

do I need to call
datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?

I thought datastream.assignTimestampsAndWatermarks is mandatory only if
the time characteristic is event time. No? Did this behavior change in Flink
1.10? Because I see libraries not setting
datastream.assignTimestampsAndWatermarks when the time characteristic is
ingestion time, but they do for event time. If not, I am wondering how I can
set an AscendingTimestampExtractor in a distributed environment? Is there
any way to add a monotonically increasing long (AscendingTimestampExtractor)
without any distributed locks?
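
For clarity, this is the contrast I mean, sketched in one snippet (in practice a
job would use only one of the two settings; Tuple2.f0 stands in for the event
timestamp):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class TimeCharacteristicSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Ingestion time: timestamps and watermarks are generated at the sources,
        // so no assignTimestampsAndWatermarks(...) call is attached here.
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStream<Tuple2<Long, String>> ingestion = env.fromElements(Tuple2.of(1L, "a"));

        // Event time: an extractor/watermark assigner is attached explicitly.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<Tuple2<Long, String>> eventTime = env.fromElements(Tuple2.of(1L, "a"))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<Long, String> element) {
                        return element.f0;
                    }
                });

        ingestion.print();
        eventTime.print();
        env.execute("time-characteristic-sketch");
    }
}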

Thanks!