Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-29 Thread Felipe Gutierrez
because I am measuring one operator (all instances) and I want to
place its downstream operators in another machine in order to use
network channels.
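
For reference, a minimal sketch of the kind of grouping I am describing (operator classes and the Ride type are placeholders, not my actual job):

DataStream<Ride> pipeline1 = env
        .addSource(new Source01()).slotSharingGroup("pipeline-1")
        .map(new Map01()).setParallelism(4).slotSharingGroup("pipeline-1")
        .flatMap(new FlatMap01()).setParallelism(4).disableChaining().slotSharingGroup("pipeline-1");

DataStream<Ride> pipeline2 = env
        .addSource(new Source02()).slotSharingGroup("pipeline-2")
        .map(new Map02()).setParallelism(4).slotSharingGroup("pipeline-2")
        .flatMap(new FlatMap02()).setParallelism(4).disableChaining().slotSharingGroup("pipeline-2");

pipeline1.union(pipeline2)
        .keyBy(r -> r.getKey())
        .reduce(new MyReducer())
        .setParallelism(8)
        .slotSharingGroup("reducers");

As Weihua notes below, this only controls which tasks may share a slot; the scheduler still decides which TM each slot ends up on.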

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, May 29, 2020 at 4:59 AM Weihua Hu  wrote:
>
> Hi, Felipe
>
> Flink does not support running tasks on a specified TM.
> You can use slotSharingGroup to control that tasks do not share the same slot, but you cannot
> specify which TM they run on.
>
> Can you please give the reason for specifying TM?
>
>
> Best
> Weihua Hu
>
> On May 28, 2020, at 21:37, Felipe Gutierrez  wrote:
>
> For instance, if I have the following DAG with the respective parallelism
> in parentheses (I hope the DAG renders properly after all):
>
>  source01 -> map01(4) -> flatmap01(4) \
>
>  |-> keyBy -> reducer(8)
>  source02 -> map02(4) -> flatmap02(4) /
>
> And I have 4 TMs in 4 machines with 4 cores each. I would like to
> place source01, map01, and flatmap01 in TM-01, and source02, map02,
> and flatmap02 in TM-02. I am using "disableChaining()" on the flatMap
> operator to measure it. And reducer1-to-4 in TM-03 and reducer5-to-8
> in TM-04.
>
> I am using the methods "setParallelism()" and "slotSharingGroup()" to
> define it but both source01 and source02 are placed in TM-01 and map01
> is split into 2 TMs. The same with map02.
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>


Re: Need Help on Flink suitability to our usecase

2020-05-29 Thread Robert Metzger
Hey Prasanna,

(Side note: there is no need to send this email to multiple mailing lists.
The user@ list is the right one)

Let me quickly go through your questions:

Is this use case suited for Flink?


Based on the details you've provided: Yes
What you also need to consider are the hardware requirements you'll have
for processing such amounts of data. I can strongly recommend setting up a
small demo environment to measure the throughput of a smaller Flink cluster
(say 10 machines).

1) If you do not have any consistency guarantees (data loss is acceptable),
and you have good infrastructure in place to deploy and monitor such
microservices then a microservice might also be an option.
Flink is pretty well suited for heavy IO use-cases. AFAIK, Netflix has
talked at several Flink Forward conferences about similar cases (check
YouTube for recorded talks).

2) It should not be too difficult to build a small, generic framework on
top of the Flink APIs
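
To make this a bit more concrete, a very rough (purely hypothetical) shape of such a fan-out job could use side outputs to route generated events to the different sinks. TableUpdate, EventRule, rulesFor(), Target and MySqsSink are made-up placeholders, and the Kafka producer arguments are examples only:

OutputTag<String> kafkaTag = new OutputTag<String>("to-kafka") {};
OutputTag<String> sqsTag = new OutputTag<String>("to-sqs") {};

SingleOutputStreamOperator<String> routed = updates.process(
        new ProcessFunction<TableUpdate, String>() {
            @Override
            public void processElement(TableUpdate update, Context ctx, Collector<String> out) {
                // one table update can produce m events ...
                for (EventRule rule : rulesFor(update.table)) {
                    String event = rule.toEvent(update);
                    // ... each routed to one of n sinks
                    if (rule.target == Target.KAFKA) {
                        ctx.output(kafkaTag, event);
                    } else if (rule.target == Target.SQS) {
                        ctx.output(sqsTag, event);
                    } else {
                        out.collect(event);       // default / catch-all output
                    }
                }
            }
        });

routed.getSideOutput(kafkaTag)
      .addSink(new FlinkKafkaProducer<>("events-out", new SimpleStringSchema(), kafkaProps));
routed.getSideOutput(sqsTag)
      .addSink(new MySqsSink());

The subscription rules would then be the only thing that changes per table/consumer, rather than the job itself.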

3) If you are deploying Flink on a resource manager like Kubernetes or
YARN, they will take care of recovering your cluster if it goes down. Your
recovery time will mostly depend on the state size that you are
checkpointing (and the ability of your resource manager to bring up new
resources). I don't think you'll be able to recover in < 500 milliseconds,
but within a few seconds.
I don't think that the other frameworks you are looking at are going to be
much better at this.

Best,
Robert

On Tue, May 19, 2020 at 1:28 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> I have the following use case to implement in my organization.
>
> Say there is a huge relational database (1,000 tables for each of our 30k
> customers) in our monolith setup.
>
> We want to reduce the load on the DB and prevent the applications from
> hitting it for the latest events. So an extract is done from the redo logs onto
> Kafka.
>
> We need to set up a streaming platform based on the table updates that
> happen (read from Kafka); we need to form events and send them to the consumers.
>
> Each consumer may be interested in the same table but in different
> updates/columns depending on their business needs, and we then deliver the events to
> their endpoint/Kinesis/SQS/a Kafka topic.
>
> So the case here is *1* table update : *m* events : *n* sinks.
> The expected peak load is easily 100k to a million table updates per second (all
> customers put together).
> The latency expected by most customers is less than a second, mostly in
> 100-500 ms.
>
> Is this use case suited for Flink?
>
> I went through the Flink book and documentation. These are the
> questions I have:
>
> 1) If we have a situation like this (*1* table update : *m* events : *n*
> sinks), is it better to write our own microservice or is it better
> to implement it through Flink?
>   1 a)  How does checkpointing happen if we have *1* input : *n*
> output situations?
>   1 b)  There are no heavy transformations; the most we might do is
> check that the required columns are present in the DB updates and decide whether
> to create an event. So there is an alternative thought process to write a
> service in Node.js, since it is more IO and less processing.
>
> 2)  I see that we write a job, it is deployed, and Flink takes
> care of the rest in handling parallelism, latency and throughput.
>  But what I need is to write a generic framework so that we are
> able to handle any table structure; we should not end up writing one job
> driver for each case.
> There are at least 200 types of events in the existing monolith system
> which might move to this new system once built.
>
> 3)  How do we maintain Flink cluster HA? From the book, I get that
> internal task-level failures are handled gracefully in Flink. But what if
> the Flink cluster goes down, how do we make sure it's HA?
> I had earlier worked with Spark and we had issues managing it. (It was not
> much of a problem there, since the latency requirement was 15 min and we
> could make sure to ramp another cluster up within that time.)
> These are absolute real-time cases and we cannot miss even one
> message/event.
>
> There are also thoughts on whether to use Kafka Streams/Apache Storm for the
> same. [They are being investigated by a different set of folks.]
>
> Thanks,
> Prasanna.
>


Re: Flink SQL tumbling window cannot output the result set

2020-05-29 Thread Benchao Li
Hi,

In Flink, the watermark requires the time field to be at millisecond granularity. You can check whether your watermark is advancing normally; I suspect the problem is there.


steven chen  wrote on Friday, May 29, 2020 at 2:34 PM:

> The data arrives every time and is aggregated, but why are the insert results not saved into MySQL? Is it a problem with the SQL, or something else? Any help is appreciated.
> CREATE TABLE user_behavior (
>
> itemCode VARCHAR,
>
> ts BIGINT COMMENT 'timestamp',
>
> t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),
>
> proctime as PROCTIME(),
>
> WATERMARK FOR t as t - INTERVAL '5' SECOND
>
> ) WITH (
>
> 'connector.type' = 'kafka',
>
> 'connector.version' = '0.11',
>
> 'connector.topic' = 'scan-flink-topic',
>
> 'connector.properties.group.id' ='qrcode_pv_five_min',
>
> 'connector.startup-mode' = 'latest-offset',
>
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
>
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
>
> 'update-mode' = 'append',
>
> 'format.type' = 'json',
>
> 'format.derive-schema' = 'true'
> );
> CREATE TABLE pv_five_min (
> item_code VARCHAR,
> dt VARCHAR,
> dd VARCHAR,
> pv BIGINT
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
> 'connector.table' = 'qrcode_pv_five_min',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = 'root',
> 'connector.write.flush.max-rows' = '1'
> );
> INSERT INTO pv_five_min
> SELECT
> itemCode As item_code,
> DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
> DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
> COUNT(*) AS pv
> FROM user_behavior
> GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;
>
>
>
>
>
>
>


-- 

Best,
Benchao Li


Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-29 Thread Chesnay Schepler

oh I'm not using the HistoryServer; I just wrote it ;)
Are these archives all in the same location? So we're roughly looking at 
5 GB of archives then?


That could indeed "just" be a resource problem. The HistoryServer 
eagerly downloads all archives, and not on-demand.
The next step would be to move some of the archives into a separate HDFS 
directory and try again.


(Note that by configuring "historyserver.web.tmpdir" to point to a permanent
directory, subsequent (re)starts of the HistoryServer can re-use this
directory, so you only have to download things once.)
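
For reference, the relevant entries in flink-conf.yaml could look roughly like this (the paths and values below are examples only, not your actual setup):

historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
historyserver.web.tmpdir: /var/flink/history-server-cache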


On 29/05/2020 00:43, Hailu, Andreas wrote:


May I also ask what version of flink-hadoop you’re using and the 
number of jobs you’re storing the history for? As of writing we have 
roughly 101,000 application history files. I’m curious to know if 
we’re encountering some kind of resource problem.


// ah

From: Hailu, Andreas [Engineering]
Sent: Thursday, May 28, 2020 12:18 PM
To: 'Chesnay Schepler' ; user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Okay, I will look further to see if we’re mistakenly using a version 
that’s pre-2.6.0. However, I don’t see flink-shaded-hadoop in my /lib 
directory for flink-1.9.1.


flink-dist_2.11-1.9.1.jar

flink-table-blink_2.11-1.9.1.jar

flink-table_2.11-1.9.1.jar

log4j-1.2.17.jar

slf4j-log4j12-1.7.15.jar

Are the files within /lib.

// ah

From: Chesnay Schepler <ches...@apache.org>
Sent: Thursday, May 28, 2020 11:00 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:

https://issues.apache.org/jira/browse/HDFS-6999

https://issues.apache.org/jira/browse/HDFS-7005

https://issues.apache.org/jira/browse/HDFS-7145

It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and
flink-shaded-hadoop is in /lib, then you basically don't know what Hadoop
version is actually being used, which could lead to incompatibilities
and dependency clashes.

If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is
being used and runs into HDFS-7005.


On 28/05/2020 16:27, Hailu, Andreas wrote:

Just created a dump, here’s what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5
os_prio=0 tid=0x7f93a5a2c000 nid=0x5692 runnable
[0x7f934a0d3000]

java.lang.Thread.State: RUNNABLE

    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)

    at
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)

    at
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)

    at
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)

    - locked <0x0005df986960> (a sun.nio.ch.Util$2)

    - locked <0x0005df986948> (a
java.util.Collections$UnmodifiableSet)

    - locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)

    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)

    at

org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)

    at
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)

    at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)

    at

org.apache.hadoop.hdfs.RemoteBlockReader2.readNext

Re: Need Help on Flink suitability to our usecase

2020-05-29 Thread Prasanna kumar
Thanks Robert for the reply.

On Fri 29 May, 2020, 12:31 Robert Metzger,  wrote:

> Hey Prasanna,
>
> (Side note: there is no need to send this email to multiple mailing
> lists. The user@ list is the right one)
>
> Let me quickly go through your questions:
>
> Is this use case suited for Flink?
>
>
> Based on the details you've provided: Yes
> What you also need to consider are the hardware requirements you'll have
> for processing such amounts of data. I can strongly recommend setting up a
> small demo environment to measure the throughput of a smaller Flink cluster
> (say 10 machines).
>
> 1) If you do not have any consistency guarantees (data loss is
> acceptable), and you have good infrastructure in place to deploy and
> monitor such microservices then a microservice might also be an option.
> Flink is pretty well suited for heavy IO use-cases. Afaik Netflix has
> talked at several Flink Forward conferences about similar cases (check
> Youtube for recorded talks)
>
> 2) It should not be too difficult to build a small, generic framework on
> top of the Flink APIs
>
> 3) If you are deploying Flink on a resource manager like Kubernetes or
> YARN, they will take care of recovering your cluster if it goes down. Your
> recovery time will mostly depend on the state size that you are
> checkpointing (and the ability of your resource manager to bring up new
> resources). I don't think you'll be able to recover in < 500 milliseconds,
> but within a few seconds.
> I don't think that the other frameworks you are looking at are going to be
> much better at this.
>
> Best,
> Robert
>
> On Tue, May 19, 2020 at 1:28 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi,
>>
>> I have the following use case to implement in my organization.
>>
>> Say there is a huge relational database (1,000 tables for each of our 30k
>> customers) in our monolith setup.
>>
>> We want to reduce the load on the DB and prevent the applications from
>> hitting it for the latest events. So an extract is done from the redo logs onto
>> Kafka.
>>
>> We need to set up a streaming platform based on the table updates that
>> happen (read from Kafka); we need to form events and send them to the consumers.
>>
>> Each consumer may be interested in the same table but in different
>> updates/columns depending on their business needs, and we then deliver the events to
>> their endpoint/Kinesis/SQS/a Kafka topic.
>>
>> So the case here is *1* table update : *m* events : *n* sinks.
>> The expected peak load is easily 100k to a million table updates per second (all
>> customers put together).
>> The latency expected by most customers is less than a second, mostly in
>> 100-500 ms.
>>
>> Is this use case suited for Flink?
>>
>> I went through the Flink book and documentation. These are the
>> questions I have:
>>
>> 1) If we have a situation like this (*1* table update : *m* events : *n*
>> sinks), is it better to write our own microservice or is it better
>> to implement it through Flink?
>>   1 a)  How does checkpointing happen if we have *1* input : *n*
>> output situations?
>>   1 b)  There are no heavy transformations; the most we might do is
>> check that the required columns are present in the DB updates and decide whether
>> to create an event. So there is an alternative thought process to write a
>> service in Node.js, since it is more IO and less processing.
>>
>> 2)  I see that we write a job, it is deployed, and Flink takes
>> care of the rest in handling parallelism, latency and throughput.
>>  But what I need is to write a generic framework so that we are
>> able to handle any table structure; we should not end up writing one job
>> driver for each case.
>> There are at least 200 types of events in the existing monolith system
>> which might move to this new system once built.
>>
>> 3)  How do we maintain Flink cluster HA? From the book, I get that
>> internal task-level failures are handled gracefully in Flink. But what if
>> the Flink cluster goes down, how do we make sure it's HA?
>> I had earlier worked with Spark and we had issues managing it. (It was not
>> much of a problem there, since the latency requirement was 15 min and we
>> could make sure to ramp another cluster up within that time.)
>> These are absolute real-time cases and we cannot miss even one
>> message/event.
>>
>> There are also thoughts on whether to use Kafka Streams/Apache Storm for the
>> same. [They are being investigated by a different set of folks.]
>>
>> Thanks,
>> Prasanna.
>>
>


Re: [DISCUSS] Remove dependency shipping through nested jars during job submission.

2020-05-29 Thread Robert Metzger
Hi,
afaik, this feature was added because Hadoop MapReduce has it as well (
https://blog.cloudera.com/how-to-include-third-party-libraries-in-your-map-reduce-job/,
point 2.).

I don't remember having seen this anywhere in the wild. I believe it is a
good idea to simplify our codebase here.
If there are concerns, then we could at least add a big WARN log message in
Flink 1.11+ that this feature will be deprecated in the future.


On Wed, May 20, 2020 at 10:39 AM Kostas Kloudas  wrote:

> Hi all,
>
> I would like to bring the discussion in
> https://issues.apache.org/jira/browse/FLINK-17745 to the dev mailing
> list, just to hear the opinions of the community.
>
> In a nutshell, in the early days of Flink, users could submit their
> jobs as fat-jars that had a specific structure. More concretely, the
> user could put the dependencies of the submitted job in a lib/ folder
> within his/her jar and Flink would search within the user's jar for
> such a folder, and if this existed, it would extract the nested jars,
> ship them independently and add them to the classpath. Finally, it
> would also ship the fat-jar itself so that the user-code is available
> at the cluster (for details see [1]).
>
> This way of submission was NOT documented anywhere and it has the
> obvious shortcoming that the "nested" jars will be shipped twice. In
> addition, it makes the codebase a bit more difficult to maintain, as
> this constitutes another way of submitting stuff.
>
> Given the above, I would like to propose to remove this codepath. But
> given that there are users using the hidden feature, I would like to
> discuss 1) how many such users exist, 2) how difficult it is for them
> to "migrate" to a different way of submitting jobs, and 3) if the rest
> of the community agrees on removing it.
>
> I post this on both dev and user ML so that we have better coverage.
>
> Looking forward to a fruitful discussion,
> Kostas
>
> [1]
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java#L222
>


Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-29 Thread Felipe Gutierrez
Using slotSharingGroup I can do some placement. However, even though I am using
two different slotSharingGroups for the two different sources,
they are still placed in the same TM. And this starts splitting the
downstream operators across different TMs as well.

 stream01 = source01.slot1 -> map01(4).slot1 -> flatmap01(4).slot1 \
 stream02 = source02.slot2 -> map02(4).slot2 -> flatmap02(4).slot2 /
  |-> stream01.union(stream02) -> keyBy -> reducer(8).slot3

I am not sure which configuration I can adjust in the
conf/flink-conf.yaml file to make it work. Currently, my
configuration is like this below on the four TMs.

taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

Maybe it would work if I used a different numberOfTaskSlots on different TMs?

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, May 29, 2020 at 9:00 AM Felipe Gutierrez
 wrote:
>
> because I am measuring one operator (all instances) and I want to
> place its downstream operators in another machine in order to use
> network channels.
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, May 29, 2020 at 4:59 AM Weihua Hu  wrote:
> >
> > Hi, Felipe
> >
> > Flink does not support running tasks on a specified TM.
> > You can use slotSharingGroup to control that tasks do not share the same slot, but you cannot
> > specify which TM they run on.
> >
> > Can you please give the reason for specifying TM?
> >
> >
> > Best
> > Weihua Hu
> >
> > On May 28, 2020, at 21:37, Felipe Gutierrez  wrote:
> >
> > For instance, if I have the following DAG with the respective parallelism
> > in parentheses (I hope the DAG renders properly after all):
> >
> >  source01 -> map01(4) -> flatmap01(4) \
> >
> >  |-> keyBy -> reducer(8)
> >  source02 -> map02(4) -> flatmap02(4) /
> >
> > And I have 4 TMs in 4 machines with 4 cores each. I would like to
> > place source01, map01, and flatmap01 in TM-01, and source02, map02,
> > and flatmap02 in TM-02. I am using "disableChaining()" on the flatMap
> > operator to measure it. And reducer1-to-4 in TM-03 and reducer5-to-8
> > in TM-04.
> >
> > I am using the methods "setParallelism()" and "slotSharingGroup()" to
> > define it but both source01 and source02 are placed in TM-01 and map01
> > is split into 2 TMs. The same with map02.
> >
> > Thanks,
> > Felipe
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> >


Re: Flink Kafka Connector Source Parallelism

2020-05-29 Thread Robert Metzger
Hi Mason,
your understanding is correct.
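
As a side note, a minimal sketch of pinning the source parallelism to the partition count (the topic name, group id and bootstrap servers below are made up):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "click-count");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("clicks", new SimpleStringSchema(), props);

// With a single-partition topic, parallelism > 1 only produces idle subtasks,
// so the source parallelism is pinned to the partition count here.
DataStream<String> clicks = env
        .addSource(consumer)
        .setParallelism(1);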

On Thu, May 28, 2020 at 8:23 AM Chen, Mason  wrote:

> I think I may have just answered my own question. There’s only one Kafka
> partition, so the maximum parallelism is one and it doesn’t really make
> sense to add another Kafka consumer under the same group id. What threw me
> off is that there’s a 2nd subtask for the Kafka source created even
> though it’s not actually doing anything. So, it seems a general statement
> can be made that (# kafka partitions) >= (# parallelism of flink kafka
> source)…well, I guess you could have more parallelism than Kafka partitions,
> but the extra subtasks will not do anything.
>
>
>
> *From: *"Chen, Mason" 
> *Date: *Wednesday, May 27, 2020 at 11:09 PM
> *To: *"user@flink.apache.org" 
> *Subject: *Flink Kafka Connector Source Parallelism
>
>
>
> Hi all,
>
>
>
> I’m currently trying to understand Flink’s Kafka Connector and how
> parallelism affects it. So, I am running the flink playground click count
> job and the parallelism is set to 2 by default.
>
>
> However, I don’t see the 2nd subtask of the Kafka Connector sending any 
> records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
> kafka?
>
> ```
> clicks = clicks
>     .keyBy(ClickEvent::getPage)
>     .map(new BackpressureMap())
>     .name("Backpressure");
> ```
>
>
>
> `clicks` is the kafka click stream. From my reading in the operator docs,
> it seems counterintuitive to do a `rebalance()` when I am already doing a
> `keyBy()`.
>
> So, my questions:
>
> 1. How do I make use of the 2nd subtask?
>
> 2. Does the number of partitions have some sort of correspondence with the
> parallelism of the source operator? If so, is there a general statement to
> be made about parallelism across all source operators?
>
>
>
> Thanks,
>
> Mason
>


Re: Age old stop vs cancel debate

2020-05-29 Thread Robert Metzger
Hi Kumar,

The way you've implemented your custom source sounds like the right way:
having a "running" flag checked by the run() method and changing it in
cancel().
Also, it is good that you are properly handling the interrupt set by Flink
(some people ignore InterruptedExceptions, which makes it difficult
(basically impossible) for Flink to stop the job).
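
For illustration, a minimal sketch of that pattern (the S3 fetch is just a placeholder, not the actual source discussed here):

public class PollingS3Source implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String batch = fetchFromS3();              // placeholder for the real S3 read
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(batch);
            }
            try {
                Thread.sleep(60_000);                  // poll interval
            } catch (InterruptedException e) {
                // Flink interrupts the source thread on cancellation:
                // restore the flag and fall through to re-check 'running'.
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchFromS3() {
        return "...";                                  // placeholder
    }
}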

Best,
Robert


On Wed, May 27, 2020 at 7:38 PM Senthil Kumar  wrote:

> We are on flink 1.9.0
>
>
>
> I have a custom SourceFunction, where I rely on isRunning set to false via
> the cancel() function to exit out of the run loop.
>
> My run loop essentially gets the data from S3, and then simply sleeps
> (Thread.sleep) for a specified time interval.
>
>
>
> When a job gets cancelled, the SourceFunction.cancel() is called, which
> sets the isRunning to false.
>
> In addition, the Thread.sleep gets interrupted, a check Is made on the
> isRunning variable (set to false now) and the run loop is exited.
>
>
>
> We noticed that when we “stop” the flink job, the Thread.sleep does not
> get interrupted.
>
> It also appears that SourceFunction.cancel() is not getting called (which
> seems like the correct behavior for “stop”).
>
>
>
> My question: what’s the “right” way to exit the run() loop when the flink
> job receives a stop command?
>
>
>
> My understanding was that there was a Stoppable interface (which got
> removed in 1.9.0)
>
>
>
> Would appreciate any insights.
>
>
>
> Cheers
>
> Kumar
>


Re: Question on stream joins

2020-05-29 Thread Sudan S
Thanks Yun. I was thinking along similar lines. I had one more question.

leftSource.connect(rightSource)
   .process(new TagCoprocessFunction()) // In this function,  tag the
left source with "0" and the right source with "1"
  .window(xx)
  .process(new XX())

In this case, when will the window be applied, given that the window operator
comes after process(new TagCoprocessFunction())?

On Fri, May 29, 2020 at 11:35 AM Yun Gao  wrote:

> Hi Sudan,
>
> As far as I know, both join and cogroup require keys (namely
> partitioning), thus for the non-keyed scenario you may have to use the
> low-level connect operator to achieve it. In my opinion it should be
> something like
>
>   leftSource.connect(rightSource)
>     .process(new TagCoprocessFunction()) // tag the left source with "0"
>                                          // and the right source with "1"
>     .window(xx)
>     .process(new XX()) // here you get all the left and right elements in
>                        // the window and can distinguish them by the tag
>                        // added in the previous step
>
> It should be pointed out that without a key (partitioning) the parallelism
> of the window operator will have to be 1.
>
>
> For the keyed scenarios, you may use the high-level operators join/cogroup to
> achieve that. The join could be seen as a special case of cogroup: in
> cogroup, you can access all the left and right elements directly, while in
> the join function, the framework iterates over the elements for you and you
> only specify the logic for each (left, right) pair.
>
> Best,
>  Yun
>
> --Original Mail --
> *Sender:*Sudan S 
> *Send Date:*Fri May 29 01:40:59 2020
> *Recipients:*User-Flink 
> *Subject:*Question on stream joins
>
>> Hi ,
>>
>> I have two usecases
>>
>> 1. I have two streams, `leftSource` and `rightSource`, which I want
>> to join without partitioning, over a window, and find the difference between the
>> counts of elements of leftSource and rightSource and emit the result of
>> the difference. Which is the appropriate join function I can use?
>>
>> join/cogroup/connect.
>>
>> 2. I want to replicate the same behaviour over a keyed source. Basically
>> leftSource and rightSource are joined by a partition key.
>>
>> Please let me know which is the appropriate join operator for the use case.
>>
>> --
>> *"The information contained in this e-mail and any accompanying documents
>> may contain information that is confidential or otherwise protected from
>> disclosure. If you are not the intended recipient of this message, or if
>> this message has been addressed to you in error, please immediately alert
>> the sender by replying to this e-mail and then delete this message,
>> including any attachments. Any dissemination, distribution or other use of
>> the contents of this message by anyone other than the intended recipient is
>> strictly prohibited. All messages sent to and from this e-mail address may
>> be monitored as permitted by applicable law and regulations to ensure
>> compliance with our internal policies and to protect our business."*
>> --
>>
>

-- 
*"The information contained in this e-mail and any accompanying documents 
may contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if 
this message has been addressed to you in error, please immediately alert 
the sender by replying to this e-mail and then delete this message, 
including any attachments. Any dissemination, distribution or other use of 
the contents of this message by anyone other than the intended recipient is 
strictly prohibited. All messages sent to and from this e-mail address may 
be monitored as permitted by applicable law and regulations to ensure 
compliance with our internal policies and to protect our business."*


Re: Executing a controllable benchmark in Flink

2020-05-29 Thread Robert Metzger
Hi Felipe,

the file is just 80 MBs. It is probably cached in the linux page cache,
there should not be any disk IO involved.
So what you are saying is that you cannot further increase the throughput for
sleeps shorter than 2000 nanoseconds.
Have you tried running this w/o any Sleep / nano.time syscalls? These
syscalls can potentially be also expensive.
Running the source in a simple while loop should give you the theoretical
maximum.

If you really want to generate data at a high speed, I would pre-generate
some dataset on the heap (do not run any RNG, as it will waste CPU cycles)
and emit that.
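
For illustration, a minimal sketch of such a pre-generated source (assuming the TaxiRide list fits on the heap and is serializable; the names are made up):

public class InMemoryTaxiRideSource implements SourceFunction<TaxiRide> {

    private final List<TaxiRide> rides;       // parsed once up front, kept on the heap
    private final long delayInNanoSeconds;
    private volatile boolean running = true;

    public InMemoryTaxiRideSource(List<TaxiRide> rides, long delayInNanoSeconds) {
        this.rides = rides;
        this.delayInNanoSeconds = delayInNanoSeconds;
    }

    @Override
    public void run(SourceContext<TaxiRide> ctx) {
        int i = 0;
        while (running) {
            long startTime = System.nanoTime();
            ctx.collect(rides.get(i));
            i = (i + 1) % rides.size();       // replay the cached data
            while (System.nanoTime() - startTime < delayInNanoSeconds) {
                // busy wait; only nanoTime() calls, no sleep or IO in the hot loop
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}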

In general: VisualVM is your friend in understanding where you are
losing cycles.

Best,
Robert


On Thu, May 28, 2020 at 12:06 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am trying to benchmark a stream application in Flink. So, I am using
> the source Function that reads events from the NYC Taxi Rides
> (http://training.ververica.com/trainingData/nycTaxiRides.gz) and I
> control the emission with System.nanoTime(). I am not using
> Thread.sleep because Java does not guarantee the time that the thread
> will be awakened.
>
> public void busySleep() {
> final long startTime = System.nanoTime();
> while (System.nanoTime() - startTime < this.delayInNanoSeconds) ;
> }
>
> So, when I wait for 10,000 nanoseconds I will get a workload of 100K
> rec/sec. When I wait for 2,000 nanoseconds I will get a workload of
> 500K rec/sec. For 1,000 nanoseconds I will get a workload of 1M
> rec/sec. And for 500 nanoseconds a workload of 2M rec/sec.
>
> The problem that I am facing is that when I set the workload to 1M
> rec/sec it seems that it is not generating at this rate. I guess it is
> because it is consuming more time reading the TaxiRide file, or doing
> IO operations, or maybe it is some Java limitation.
> If I use some message broker it will end up adding one more middleware
> with read/write IO operations and I guess it will be worse.
> What do you recommend to do a controllable benchmark for stream processing?
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


Re: Streaming multiple csv files

2020-05-29 Thread Robert Metzger
Hi Nikola,

you could implement a custom SourceFunction that implements this in some
way: If the files are small (< 10 MB) send each file as a record, then
process it in a subsequent flatMap operation. If the files are large, split
the work across the parallel sources and read them serially in the
SourceFunction.

The other option (which I have not fully thought through is using the
readFile method with a custom FileInputFormat implementation:

DataStreamSource readFile(FileInputFormat inputFormat,
String filePath)

You basically have to overwrite the
"FileInputFormat.createInputSplits()" method to get the CSV schema,
and pass it along to the splits so that they can properly parse the
data.
This approach is a little bit more involved to understand, but the
Flink Framework will do the heavy lifting of the file system handling
/ splitting / fault tolerance stuff.
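
For illustration, a rough sketch of the "small files" variant (all names are placeholders and imports are omitted; it assumes the paths are readable from the task managers and that your existing parsers plug into parseWithHeader):

// Emits each small file as one (path, content) record, reading them serially.
public class WholeFileSource implements SourceFunction<Tuple2<String, String>> {

    private final List<String> paths;              // file paths, known up front
    private volatile boolean running = true;

    public WholeFileSource(List<String> paths) {
        this.paths = paths;
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        for (String path : paths) {
            if (!running) {
                break;
            }
            String content = new String(
                    Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
            ctx.collect(Tuple2.of(path, content));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// Picks the parser from the header line and emits Person records.
public class HeaderAwareParser implements FlatMapFunction<Tuple2<String, String>, Person> {

    @Override
    public void flatMap(Tuple2<String, String> file, Collector<Person> out) {
        String[] lines = file.f1.split("\\R");
        String header = lines[0];                  // e.g. "id, firstname, lastname"
        for (int i = 1; i < lines.length; i++) {
            if (!lines[i].trim().isEmpty()) {
                out.collect(parseWithHeader(header, lines[i]));
            }
        }
    }

    private Person parseWithHeader(String header, String row) {
        // placeholder: dispatch to your existing per-schema parsers based on the header
        return Person.fromCsv(header, row);
    }
}

Wiring would then simply be env.addSource(new WholeFileSource(paths)).flatMap(new HeaderAwareParser()).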

Best,

Robert


On Thu, May 28, 2020 at 4:52 PM Nikola Hrusov  wrote:

> Hello,
>
> I have multiple files (file1, file2, file3) each being CSV and having
> different columns and data. The column headers are finite and we know
> their format. I would like to take them and parse them based on the column
> structure. I already have the parsers
>
> e.g.:
>
> file1 has columns (id, firstname, lastname)
> file2 has columns (id, name)
> file3 has columns (id, name_1, name_2, name_3, name_4)
>
> I would like to take all those files, read them, parse them and output
> objects to a sink as Person { id, fullName }
>
> Example files would be:
>
> file1:
> --
> id, firstname, lastname
> 33, John, Smith
> 55, Labe, Soni
>
> file2:
> --
> id, name
> 5, Mitr Kompi
> 99, Squi Masw
>
> file3:
> --
> id, name_1, name_2, name_3, name_4
> 1, Peter, Hov, Risti, Pena
> 2, Rii, Koni, Ques,,
>
> Expected output of my program would be:
>
> Person { 33, John Smith }
> Person { 55, Labe Soni }
> Person { 5, Mitr Kompi }
> Person { 99, Squi Masw }
> Person { 1, Peter Hov Risti Pena }
> Person { 2, Rii Koni Ques }
>
>
>
> What I do now is:
>
> My code (very simplified) is: env.readFile().flatMap(new
> MyParser()).addSink(new MySink())
> The MyParser receives the rows 1 by 1 in string format. Which means that
> when I run with parallelism > 1 I receive data from any file and I cannot
> say this line comes from where.
>
>
>
> What I would like to do is:
>
> Be able to figure out which is the file I am reading from.
> Since I only know the file type based on the first row (columns) I need to
> either send the 1st row to MyParser() or send a tuple <1st row of file
> being read, current row of file being read>.
> Another option that I can think about is to have some keyed function based
> on the first row, but I am not sure how to achieve that by using readFile.
>
>
> Is there a way I can achieve this?
>
>
> Regards
> ,
> Nikola
>


Re: Executing a controllable benchmark in Flink

2020-05-29 Thread Felipe Gutierrez
I was losing cycles because I was reading the line from the
GZIPInputStream outside of the busy-wait loop. I changed it and now I
am getting more throughput. It is also a good idea to use VisualVM to
check if the throughput is correct and where I am losing more cycles.


while (reader.ready() && (line = reader.readLine()) != null) {
startTime = System.nanoTime();
taxiRide = TaxiRide.fromString(line);
sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));
this.dataRateListener.busySleep(startTime);
}

public void busySleep(long startTime) {
long deadLine = startTime + this.delayInNanoSeconds;
while (System.nanoTime() < deadLine) ;
}

Thanks!

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, May 29, 2020 at 12:46 PM Robert Metzger  wrote:
>
> Hi Felipe,
>
> the file is just 80 MBs. It is probably cached in the linux page cache, there 
> should not be any disk IO involved.
> So what you are saying is that you cannot further increase the throughput for
> sleeps shorter than 2000 nanoseconds.
> Have you tried running this w/o any Sleep / nano.time syscalls? These 
> syscalls can potentially be also expensive.
> Running the source in a simple while loop should give you the theoretical 
> maximum.
>
> If you really want to generate data at a high speed, I would pre-generate 
> some dataset on the heap (do not run any RNG, as it will waste CPU cycles) 
> and emit that.
>
> In general: VisualVM is your friend in understanding where you are losing
> cycles.
>
> Best,
> Robert
>
>
> On Thu, May 28, 2020 at 12:06 AM Felipe Gutierrez 
>  wrote:
>>
>> Hi,
>>
>> I am trying to benchmark a stream application in Flink. So, I am using
>> the source Function that reads events from the NYC Taxi Rides
>> (http://training.ververica.com/trainingData/nycTaxiRides.gz) and I
>> control the emission with System.nanoTime(). I am not using
>> Thread.sleep because Java does not guarantee the time that the thread
>> will be awakened.
>>
>> public void busySleep() {
>> final long startTime = System.nanoTime();
>> while (System.nanoTime() - startTime < this.delayInNanoSeconds) ;
>> }
>>
>> So, when I wait for 10,000 nanoseconds I will get a workload of 100K
>> rec/sec. When I wait for 2,000 nanoseconds I will get a workload of
>> 500K rec/sec. For 1,000 nanoseconds I will get a workload of 1M
>> rec/sec. And for 500 nanoseconds a workload of 2M rec/sec.
>>
>> The problem that I am facing is that when I set the workload for 1M
>> rec/sec it seems that it is not generating at this rate. I guess it is
>> because it is consuming more time reading the TaxiRide file, or doing
>> IO operations, Or maybe it is some Java limitation.
>> If I use some message broker it will end up adding one more middleware
>> to have read/write IO operations and I guess it will be worst.
>> What do you recommend to do a controllable benchmark for stream processing?
>>
>> Thanks,
>> Felipe
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com


Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-29 Thread Robert Metzger
Hi Arnaud,

Maybe I don't fully understand the constraints, but what about
stream.map(new GetKuduPartitionMapper).keyBy(0).addSink(KuduSink());

The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with
open() and close() where you can handle the connection with Kudu's
partitioning service.
The map will output a Tuple2 (or something nicer :) ),
then Flink shuffles your data correctly, and the sinks will process the
data correctly partitioned.
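
For illustration, a schematic of that arrangement; note that KuduPartitionLookup below is a made-up wrapper standing in for whatever you build around Kudu's KuduPartitioner, since the exact Kudu client calls are not shown here:

public class GetKuduPartitionMapper extends RichMapFunction<Row, Tuple2<Integer, Row>> {

    private transient KuduPartitionLookup lookup;   // placeholder wrapper around Kudu's KuduPartitioner

    @Override
    public void open(Configuration parameters) throws Exception {
        // open the connection to the Kudu table once per task
        lookup = new KuduPartitionLookup("kudu-master:7051", "my_table");
    }

    @Override
    public Tuple2<Integer, Row> map(Row row) throws Exception {
        return Tuple2.of(lookup.partitionFor(row), row);
    }

    @Override
    public void close() throws Exception {
        if (lookup != null) {
            lookup.close();                          // closed when the task shuts down
        }
    }
}

// Wiring, following the suggestion above:
stream.map(new GetKuduPartitionMapper())
      .keyBy(0)                                      // shuffle by the Kudu partition id
      .addSink(kuduSink);                            // the existing Kudu sink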

I hope that this is what you were looking for!

Best,
Robert

On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> I would like to upgrade the performance of my Apache Kudu Sink by using
> the new “KuduPartitioner” of Kudu API to match Flink stream partitions with
> Kudu partitions to lower the network shuffling.
>
> For that, I would like to implement something like
>
> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
> KuduSink(…)));
>
> With KuduFlinkPartitioner an implementation of
> org.apache.flink.api.common.functions.Partitioner
> that internally makes use of the KuduPartitioner client tool of Kudu’s API.
>
>
>
> However, for that KuduPartitioner to work, it needs to open – and close at
> the end – a connection to the Kudu table – obviously something that can’t
> be done for each line. But there is no “AbstractRichPartitioner” with
> open() and close() methods that I can use for that (the way I use it in the
> sink, for instance).
>
>
>
> What is the best way to implement this ?
>
> I thought of ThreadLocals that would be initialized during the first call
> to *int* partition(K key, *int* numPartitions);  but I won’t be able to
> close() things nicely as I won’t be notified on job termination.
>
>
>
> I thought of putting those static ThreadLocals inside a “Identity Mapper”
> that would be called just prior the partition with something like :
>
> stream.map(richIdentiyConnectionManagerMapper).partitionCustom(new
> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…)));
>
> with kudu connections initialized in the mapper open(), closed in the
> mapper close(), and used  in the partitioner partition().
>
> However It looks like an ugly hack breaking every coding principle, but as
> long as the threads are reused between the mapper and the partitioner I
> think that it should work.
>
>
>
> Is there a better way to do this ?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
> --
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>


Re: Dropping messages based on timestamp.

2020-05-29 Thread Robert Metzger
Hi Joe,

my gut feeling is that a flatMap() is what you are looking for.

Best,
Robert

On Thu, May 28, 2020 at 7:21 PM Joe Malt  wrote:

> Hi,
>
> I'm working on a custom TimestampAssigner which will do different things
> depending on the value of the extracted timestamp. One of the actions I
> want to take is to drop messages entirely if their timestamp meets certain
> criteria.
>
> Of course there's no direct way to do this in the TimestampAssigner, but
> I'd like to keep this logic as close to the TimestampAssigner as possible
> since this is going to be a pluggable component used in a bunch of
> different Flink apps.
>
> What would be the best way to implement this?
>
> Thanks,
> Joe
>
>
>


Re: Flink Iterator Functions

2020-05-29 Thread Robert Metzger
Hi Roderick,

Luckily there are no silly questions, just silly answers (so I have the
harder job here ;) )

It seems that you are trying to read data from an Arango Database, right?
What is important to understand is that the "flink job" that you are
implementing in your main() method gets executed in the JVM submitting your
job to the Flink cluster. There, we are just constructing a graph of
dataflow operations, which will then be distributed to the cluster.
As part of this process, we are serializing all the user code from the
client, and sending it over to the cluster where it gets executed in a
distributed fashion.
Further reading:
https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
I assume you are seeing "null" references because the objects you are
trying to send to the cluster are not serializable (but stored in a
transient field); or Spring is doing some Dependency Injection magic that
does not work in the remote Flink environment.

tl;dr: What I would recommend is implement a custom SourceFunction that
reads from ArangoDB. The RichParallelSourceFunction will allow you to read
with parallelism > 1, and it has some lifecycle methods for opening and
closing the connection to Arango.
For the configuration passing, I would pass it a _serializable_ object
through the constructor of your custom source.
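
For illustration, a rough sketch of such a source (ArangoClient, Node and the query call are placeholders, not the real Arango driver API; the config map should be a serializable implementation such as HashMap):

public class ArangoNodeSource extends RichParallelSourceFunction<Node> {

    private final Map<String, String> config;    // serializable config, built in the driver
    private transient ArangoClient client;       // placeholder for the real Arango driver
    private volatile boolean running = true;

    public ArangoNodeSource(Map<String, String> config) {
        this.config = config;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // runs on the task manager, after the source has been deserialized
        client = ArangoClient.connect(config.get("arango.host"), config.get("arango.db"));
    }

    @Override
    public void run(SourceContext<Node> ctx) throws Exception {
        Iterator<Node> it = client.queryNodes();  // placeholder query
        while (running && it.hasNext()) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(it.next());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}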

Best,
Robert



On Thu, May 28, 2020 at 6:40 PM Roderick Vincent 
wrote:

> Hi,
>
> I am brand new to Apache Flink, so please excuse any silly questions. I
> have an Iterator function defined as below and I am adding it as a source to a
> Flink stream. But when I try to pass configuration information to it (via
> a Spring env), what I notice is that one of the threads calls hasNext() on
> a different object, and the passed information is null. Something is
> constructing it, but what is strange is that if I add a default constructor,
> I do not see it being called by this thread with the null data, so I am
> wondering what is going on. Any ideas? How do we pass configuration
> information to these functions? Any help would be appreciated.
>
> Thanks,
> Rick
>
> @Public
> public class NodeSource extends
> FromIteratorFunction> {
>
>
> private static final long serialVersionUID = 1L;
>
> public NodeSource(ArangoDBSource iterator) {
> super(iterator);
> }
>
> }
>


Re: Re: Question on stream joins

2020-05-29 Thread Yun Gao
Hi Sudan,

   The first process is used to tag the elements from the left and right
streams, so that they can be merged into the same stream and then
assigned to the same window. The following window(xxx).process(new
WindowProcessFunction) defines the window operator that processes the windowed
elements, so the second process defines the window processing logic. Without the
tagging we would not be able to assign the elements from both the left and right
stream to the same window.
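
To make the two steps concrete, here is a rough sketch for the count-difference use case, assuming both inputs are of some placeholder type Event and that event-time timestamps/watermarks have already been assigned (and keeping in mind that the windowAll operator runs with parallelism 1):

// left and right are DataStream<Event>; Event is a placeholder type.
DataStream<Tuple2<Integer, Event>> tagged = left
        .connect(right)
        .process(new CoProcessFunction<Event, Event, Tuple2<Integer, Event>>() {
            @Override
            public void processElement1(Event value, Context ctx, Collector<Tuple2<Integer, Event>> out) {
                out.collect(Tuple2.of(0, value));   // tag the left stream with 0
            }

            @Override
            public void processElement2(Event value, Context ctx, Collector<Tuple2<Integer, Event>> out) {
                out.collect(Tuple2.of(1, value));   // tag the right stream with 1
            }
        });

tagged
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
        .process(new ProcessAllWindowFunction<Tuple2<Integer, Event>, Long, TimeWindow>() {
            @Override
            public void process(Context ctx, Iterable<Tuple2<Integer, Event>> elements, Collector<Long> out) {
                long leftCount = 0;
                long rightCount = 0;
                for (Tuple2<Integer, Event> e : elements) {
                    if (e.f0 == 0) {
                        leftCount++;
                    } else {
                        rightCount++;
                    }
                }
                out.collect(leftCount - rightCount);  // difference of counts per window
            }
        });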

Best,
 Yun



 --Original Mail --
Sender:Sudan S 
Send Date:Fri May 29 14:39:31 2020
Recipients:Yun Gao 
CC:User-Flink 
Subject:Re: Question on stream joins

Thanks Yun. Was thinking a similar way.  I had one more question.

leftSource.connect(rightSource)
   .process(new TagCoprocessFunction()) // In this function,  tag the left 
source with "0" and the right source with "1"
  .window(xx) 
  .process(new XX()) 

In this when will the window be applied ? since the window operator is after 
process(new TagCoprocessFunction()).

On Fri, May 29, 2020 at 11:35 AM Yun Gao  wrote:

Hi Sudan,

   As far as I know, both join and cogroup requires keys (namely partitioning), 
thus for the non-keyed scenario, you may have to use low-level connect operator 
to achieve it. In my opinion it should be something like

  leftSource.connect(rightSource)
   .process(new TagCoprocessFunction()) // In this function,  tag the left 
source with "0" and the right source with "1"
​  .window(xx) 
​  .process(new XX()) // In this function, you could get all the left and 
right elements in this window, and you could distinguish them with the tag 
added in the previous step.

It should be pointed out that without key (partitioning) the paralellism of the 
window operator will have to be 1.


For the keyed scenarios, You may use high-level operators join/cogroup to 
achieve that. The join could be seen as a special example as cogroup that in 
cogroup, you could access all the left and right elements directly, and in join 
function, the framework will iterate the elements for you and you can only 
specify the logic for each (left, right) pair. 

Best,
 Yun


 --Original Mail --
Sender:Sudan S 
Send Date:Fri May 29 01:40:59 2020
Recipients:User-Flink 
Subject:Question on stream joins

Hi ,

I have two usecases

1. I have two streams which `leftSource` and `rightSource` which i want to join 
without partitioning over a window and find the difference of count of elements 
of leftSource and rightSource and emit the result of difference. Which is the 
appropriate join function ican use ?

join/cogroup/connect.

2. I want to replicate the same behaviour over a keyed source. Basically 
leftSource and rightSource are joined by a partition key.

Plz let me know which is the appropriate join operator for the usecase
"The information contained in this e-mail and any accompanying documents may 
contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if this 
message has been addressed to you in error, please immediately alert the sender 
by replying to this e-mail and then delete this message, including any 
attachments. Any dissemination, distribution or other use of the contents of 
this message by anyone other than the intended recipient is strictly 
prohibited. All messages sent to and from this e-mail address may be monitored 
as permitted by applicable law and regulations to ensure compliance with our 
internal policies and to protect our business."
"The information contained in this e-mail and any accompanying documents may 
contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if this 
message has been addressed to you in error, please immediately alert the sender 
by replying to this e-mail and then delete this message, including any 
attachments. Any dissemination, distribution or other use of the contents of 
this message by anyone other than the intended recipient is strictly 
prohibited. All messages sent to and from this e-mail address may be monitored 
as permitted by applicable law and regulations to ensure compliance with our 
internal policies and to protect our business."

Re: Flink Iterator Functions

2020-05-29 Thread Arvid Heise
Hi Roderick,

adding to Robert's response: The easiest way is to get all needed
information injected only in the driver from which you manually pass the
config in a serializable form to your iterator. Configs could be for
example a Java Map using serializable elements, such as Strings.

If you need non-serializable objects, it is common practice to initialize
them in Rich(ParallelSource)Function#open and pass all information needed
to construct them in the constructor of the RichFunction and store them in
serializable fields.


On Fri, May 29, 2020 at 1:26 PM Robert Metzger  wrote:

> Hi Roderick,
>
> Luckily there are no silly questions, just silly answers (so I have the
> harder job here ;) )
>
> It seems that you are trying to read data from an Arango Database, right?
> What is important to understand is that the "flink job" that you are
> implementing in your main() method gets executed in the JVM submitting your
> job to the Flink cluster. There, we are just constructing a graph of
> dataflow operations, which will then be distributed to the cluster.
> As part of this process, we are serializing all the user code from the
> client, and sending it over to the cluster where it gets executed in a
> distributed fashion.
> Further reading:
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
> I assume you are seeing "null" references because the objects you are
> trying to send to the cluster are not serializable (but stored in a
> transient field); or Spring is doing some Dependency Injection magic that
> does not work in the remote Flink environment.
>
> tl;dr: What I would recommend is implement a custom SourceFunction that
> reads from ArangoDB. The RichParallelSourceFunction will allow you to read
> with parallelism > 1, and it has some lifecycle methods for opening and
> closing the connection to Arango.
> For the configuration passing, I would pass it a _serializable_ object
> through the constructor of your custom source.
>
> Best,
> Robert
>
>
>
> On Thu, May 28, 2020 at 6:40 PM Roderick Vincent 
> wrote:
>
>> Hi,
>>
>> I am brand new to Apache Flink so please excuse any silly questions.  I
>> have an Iterator function defined as below and adding it as a source to a
>> Flink stream.  But when I try to pass configuration information to it (via
>> a Spring env), what I notice is that one of the threads calls hasNext() and
>> it is not the same object and the passed information is null.  Something is
>> constructing it, but what is strange is that if I add a default constructor
>> I do not see this being called by this thread with the null data so I am
>> wondering what is going on.  Any ideas?  How do we pass configuration
>> information to these functions?  Any help would be appreciated.
>>
>> Thanks,
>> Rick
>>
>> @Public
>> public class NodeSource extends
>> FromIteratorFunction> {
>>
>>
>> private static final long serialVersionUID = 1L;
>>
>> public NodeSource(ArangoDBSource iterator) {
>> super(iterator);
>> }
>>
>> }
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Dropping messages based on timestamp.

2020-05-29 Thread Arvid Heise
Although flatMap() is a valid choice, it would be more idiomatic to use
filter(). I'd apply that even before running TimestampAssigner, except when
extracting the timestamp is rather complicated. But if it's a simple field,
then it feels better to first filter bad data, and then apply any kind of
logic.
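
For illustration, a small sketch of filtering before the timestamp assigner (Event, getEventTime() and the one-hour drift threshold are made-up placeholders):

DataStream<Event> cleaned = events
        .filter(e -> e.getEventTime() > 0
                && e.getEventTime() <= System.currentTimeMillis() + 3_600_000L); // drop timestamps > 1h in the future

DataStream<Event> withTimestamps = cleaned
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Event e) {
                        return e.getEventTime();
                    }
                });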

On Fri, May 29, 2020 at 1:13 PM Robert Metzger  wrote:

> Hi Joe,
>
> my gut feeling is that a flatMap() is what you are looking for.
>
> Best,
> Robert
>
> On Thu, May 28, 2020 at 7:21 PM Joe Malt  wrote:
>
>> Hi,
>>
>> I'm working on a custom TimestampAssigner which will do different things
>> depending on the value of the extracted timestamp. One of the actions I
>> want to take is to drop messages entirely if their timestamp meets certain
>> criteria.
>>
>> Of course there's no direct way to do this in the TimestampAssigner, but
>> I'd like to keep this logic as close to the TimestampAssigner as possible
>> since this is going to be a pluggable component used in a bunch of
>> different Flink apps.
>>
>> What would be the best way to implement this?
>>
>> Thanks,
>> Joe
>>
>>
>>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-29 Thread Hailu, Andreas
Yes, these are all in the same directory, and we're at 67G right now. I'll try 
with incrementally smaller directories and let you know what I find.

// ah

From: Chesnay Schepler 
Sent: Friday, May 29, 2020 3:11 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

oh I'm not using the HistoryServer; I just wrote it ;)
Are these archives all in the same location? So we're roughly looking at 5 GB 
of archives then?

That could indeed "just" be a resource problem. The HistoryServer eagerly 
downloads all archives, and not on-demand.
The next step would be to move some of the archives into a separate HDFS 
directory and try again.

(Note that by configuring "historyserver.web.tmpdir" to point to a permanent
directory, subsequent (re)starts of the HistoryServer can re-use this
directory, so you only have to download things once.)

On 29/05/2020 00:43, Hailu, Andreas wrote:
May I also ask what version of flink-hadoop you're using and the number of jobs 
you're storing the history for? As of writing we have roughly 101,000 
application history files. I'm curious to know if we're encountering some kind 
of resource problem.

// ah

From: Hailu, Andreas [Engineering]
Sent: Thursday, May 28, 2020 12:18 PM
To: 'Chesnay Schepler' ; 
user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Okay, I will look further to see if we're mistakenly using a version that's 
pre-2.6.0. However, I don't see flink-shaded-hadoop in my /lib directory for 
flink-1.9.1.

flink-dist_2.11-1.9.1.jar
flink-table-blink_2.11-1.9.1.jar
flink-table_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar

Are the files within /lib.

// ah

From: Chesnay Schepler <ches...@apache.org>
Sent: Thursday, May 28, 2020 11:00 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:
https://issues.apache.org/jira/browse/HDFS-6999
https://issues.apache.org/jira/browse/HDFS-7005
https://issues.apache.org/jira/browse/HDFS-7145

It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and flink-shaded-hadoop in 
/lib then you basically don't know what Hadoop version is actually being used,
which could lead to incompatibilities and dependency clashes.
If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is being used 
and runs into HDFS-7005.

On 28/05/2020 16:27, Hailu, Andreas wrote:
Just created a dump, here's what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 os_prio=0 
tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0005df986960> (a sun.nio.ch.Util$2)
- locked <0x0005df986948> (a java.util.Collections$UnmodifiableSet)
- locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead

RE: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-29 Thread LINZ, Arnaud
Hello,

Yes, that would definitely do the trick, with an extra mapper after keyBy to
remove the tuple so that it stays seamless. It’s less hacky than what I was
thinking of, thanks!
However, is there any plan in a future release to have rich partitioners ? That 
would avoid adding  overhead and “intermediate” technical info in the stream 
payload.
Best,
Arnaud

From: Robert Metzger 
Sent: Friday, May 29, 2020 13:10
To: LINZ, Arnaud 
Cc: user 
Subject: Re: Best way to "emulate" a rich Partitioner with open() and close()
methods ?

Hi Arnaud,

Maybe I don't fully understand the constraints, but what about
stream.map(new GetKuduPartitionMapper).keyBy(0).addSink(KuduSink());

The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with 
open() and close() where you can handle the connection with Kudu's partitioning 
service.
The map will output a Tuple2 (or something nicer :) ), then 
Flink shuffles your data correctly, and the sinks will process the data 
correctly partitioned.
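
A rough sketch of such a mapper (untested; the class layout, the generic record type
and the abstract partition-lookup hook are my assumptions, and the Kudu-specific
lookup itself is left open):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduTable;

// Opens the Kudu connection once per parallel instance and emits
// (kuduPartitionIndex, record) so that keyBy(0) groups records per Kudu partition.
public abstract class GetKuduPartitionMapper<T> extends RichMapFunction<T, Tuple2<Integer, T>> {

    private final String kuduMasters;
    private final String tableName;

    private transient KuduClient client;
    private transient KuduTable table;

    protected GetKuduPartitionMapper(String kuduMasters, String tableName) {
        this.kuduMasters = kuduMasters;
        this.tableName = tableName;
    }

    /** Derive the Kudu partition index for a record, e.g. via Kudu's KuduPartitioner. */
    protected abstract int partitionFor(T record, KuduTable table) throws Exception;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new KuduClient.KuduClientBuilder(kuduMasters).build();
        table = client.openTable(tableName);
    }

    @Override
    public Tuple2<Integer, T> map(T record) throws Exception {
        return Tuple2.of(partitionFor(record, table), record);
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}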

I hope that this is what you were looking for!

Best,
Robert

On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud 
mailto:al...@bouyguestelecom.fr>> wrote:

Hello,



I would like to upgrade the performance of my Apache Kudu Sink by using the new 
“KuduPartitioner” of Kudu API to match Flink stream partitions with Kudu 
partitions to lower the network shuffling.

For that, I would like to implement something like

stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));

With KuduFlinkPartitioner an implementation of 
org.apache.flink.api.common.functions.Partitioner that internally makes use of 
the KuduPartitioner client tool of Kudu's API.



However, for that KuduPartitioner to work, it needs to open – and close at the end 
– a connection to the Kudu table – obviously something that can't be done for 
each line. But there is no "AbstractRichPartitioner" with open() and close() 
methods that I can use for that (the way I use it in the sink for instance).



What is the best way to implement this?

I thought of ThreadLocals that would be initialized during the first call to 
int partition(K key, int numPartitions);  but I won’t be able to close() things 
nicely as I won’t be notified on job termination.



I thought of putting those static ThreadLocals inside an "Identity Mapper" that 
would be called just prior to the partitioning, with something like:

stream.map(richIdentityConnectionManagerMapper).partitionCustom(new 
KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));

with Kudu connections initialized in the mapper open(), closed in the mapper 
close(), and used in the partitioner partition().

However, it looks like an ugly hack breaking every coding principle, but as long 
as the threads are reused between the mapper and the partitioner I think that 
it should work.



Is there a better way to do this?



Best regards,

Arnaud









Re: How to create schema for flexible json data in Flink SQL

2020-05-29 Thread Guodong Wang
Benchao,

Thank you for your detailed explanation.

Schema inference can solve my problem partially. For example, starting from
some point in time, all the JSON events afterward will contain a new field; I
think schema inference will help in this case.
But if I need to handle all the JSON events with different schemas in one
table (this is case 2), I agree with you: schema inference does not
help either.
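
Just for reference, this is roughly the explicit schema I have to maintain today
for the sample JSON quoted further down in this thread (a sketch using the Table
API type system; every new nested key means touching this ROW type, which is the
fragile part):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

TableSchema schema = TableSchema.builder()
        .field("top_level_key1", DataTypes.STRING())
        .field("nested_object", DataTypes.ROW(
                DataTypes.FIELD("nested_key1", DataTypes.STRING()),
                DataTypes.FIELD("nested_key2", DataTypes.INT()),
                DataTypes.FIELD("nested_key3", DataTypes.ARRAY(DataTypes.STRING()))))
        .build();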



Guodong


On Fri, May 29, 2020 at 11:02 AM Benchao Li  wrote:

> Hi Guodong,
>
> After an offline discussion with Leonard. I think you get the right
> meaning of schema inference.
> But there are two problems here:
> 1. schema of the data is fixed, schema inference can save your effort to
> write the schema explicitly.
> 2. schema of the data is dynamic, in this case schema inference cannot
> help, because SQL is a somewhat static language, which needs to know all the
> data types at compile time.
>
> Maybe I've misunderstood your question at the very beginning. I thought
> your case is #2. If your case is #1, then schema inference is a good
> choice.
>
> Guodong Wang  于2020年5月28日周四 下午11:39写道:
>
>> Yes. Setting the value type as raw is one possible approach. And I would
>> like to vote for schema inference as well.
>>
>> Correct me if I am wrong, IMO schema inference means I can provide a
>> method in the table source to infer the data schema based on the runtime
>> computation, just like some Calcite adapters do. Right?
>> For SQL table registration, I think that requiring the table source to
>> provide a static schema might be too strict. Letting the planner infer the
>> table schema would be more flexible.
>>
>> Thank you for your suggestions.
>>
>> Guodong
>>
>>
>> On Thu, May 28, 2020 at 11:11 PM Benchao Li  wrote:
>>
>>> Hi Guodong,
>>>
>>> Does the RAW type meet your requirements? For example, you can specify a
>>> map type, where the value for the map is the raw JsonNode
>>> parsed by Jackson.
>>> This is not supported yet, however IMO this could be supported.
>>>
>>> Guodong Wang  于2020年5月28日周四 下午9:43写道:
>>>
 Benchao,

 Thank you for your quick reply.

 As you mentioned, for the current scenario, approach 2 should work for me.
 But it is a little bit annoying that I have to modify the schema to add new
 field types when the upstream app changes the JSON format or adds new fields.
 Otherwise, my users cannot refer to the field in their SQL.

 Per the description in the JIRA, I think after implementing this, all the
 JSON values will be converted to strings.
 I am wondering if Flink SQL can/will support a flexible schema in the
 future, for example, registering the table without defining a specific schema
 for each field, and letting the user define a generic map or array for one
 field whose values can be any object. Then, the type conversion cost might be
 saved.

 Guodong


 On Thu, May 28, 2020 at 7:43 PM Benchao Li  wrote:

> Hi Guodong,
>
> I think you almost get the answer,
> 1. map type: it's not working in the current implementation. For example,
> with a map type, if the value is a non-string JSON object, then
> `JsonNode.asText()` may not work as you wish.
> 2. list all the fields you care about. IMO, this can fit your scenario. And you
> can set format.fail-on-missing-field = false, to allow missing
> fields to be set to null.
>
> For 1, I think maybe we can support it in the future, and I've created
> jira[1] to track this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18002
>
> Guodong Wang  于2020年5月28日周四 下午6:32写道:
>
>> Hi !
>>
>> I want to use Flink SQL to process some json events. It is quite
>> challenging to define a schema for the Flink SQL table.
>>
>> My data source's format is some json like this
>> {
>> "top_level_key1": "some value",
>> "nested_object": {
>> "nested_key1": "abc",
>> "nested_key2": 123,
>> "nested_key3": ["element1", "element2", "element3"]
>> }
>> }
>>
>> The big challenges for me in defining a schema for the data source are:
>> 1. the keys in nested_object are flexible; there might be 3 unique
>> keys or more. If I enumerate all the keys in the schema, I
>> think my code is fragile: how do I handle an event which contains more
>> nested_keys in nested_object?
>> 2. I know the Table API supports the Map type, but I am not sure if I can put
>> a generic object as the value of the map, because the values in
>> nested_object
>> are of different types: some of them are ints, some of them are strings or
>> arrays.
>>
>> So. how to expose this kind of json data as table in Flink SQL
>> without enumerating all the nested_keys?
>>
>> Thanks.
>>
>> Guodong
>>
>
>
> --
>
> Best,
> Benchao Li
>

>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>


Re: Tumbling windows - increasing checkpoint size over time

2020-05-29 Thread Wissman, Matt
Till,

I'll have to calculate the theoretical upper bound for our window state. Our 
data distribution and rate have a predictable pattern, but the data rate pattern 
didn't match the checkpoint size growth.

[screenshot: checkpoint size over time]

Here is a screenshot of the checkpoint size for the pipeline. The yellow 
section is when we had the checkpoint interval at 2 secs – the size seems to 
grow linearly and indefinitely. The blue, red and orange lines are in line with 
what I’d expect in terms of checkpoint size (100KB-2 MB).

The incoming stream data for the whole time period is consistent (follows the 
same pattern).

Changing the checkpoint interval seemed to fix the problem of the large and 
growing checkpoint size but I’m not sure why.

Thanks!

-Matt

From: Till Rohrmann 
Date: Thursday, May 28, 2020 at 10:48 AM
To: "Wissman, Matt" 
Cc: Guowei Ma , "user@flink.apache.org" 

Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

when using tumbling windows, then the checkpoint size is not only dependent on 
the number of keys (which is equivalent to the number of open windows) but also 
on how many events arrive for each open window because the windows store every 
window event in its state. Hence, it can be the case that you see different 
checkpoint sizes depending on the actual data distribution which can change 
over time. Have you checked whether the data distribution and rate is constant 
over time?

What is the expected number of keys, size of events and number of events per 
key per second? Based on this information one could try to estimate an upper 
state size bound.
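
If the per-window event count turns out to be the driver, one common way to keep
the window state small is to aggregate incrementally instead of buffering every
event until the window fires. A rough sketch (assuming a count-style aggregate and
a hypothetical MyEvent type; your Aggregator may of course need the full window
contents):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

dataStream
    .keyBy(idKeySelector())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
    .aggregate(new AggregateFunction<MyEvent, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(MyEvent value, Long accumulator) { return accumulator + 1; }
        @Override public Long getResult(Long accumulator) { return accumulator; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    });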

Cheers,
Till

On Wed, May 27, 2020 at 8:19 PM Wissman, Matt 
mailto:matt.wiss...@here.com>> wrote:

Hello Till & Guowei,



Thanks for the replies! Here is a snippet of the window function:



  SingleOutputStreamOperator aggregatedStream = dataStream

.keyBy(idKeySelector())

.window(TumblingProcessingTimeWindows.of(seconds(15)))

.apply(new Aggregator())

.name("Aggregator")

.setParallelism(3);



Checkpoint interval: 2 secs when the checkpoint size grew from 100 KB to 100 MB 
(we've since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: nothing is set in terms of watermarks – do they apply to processing 
time?

The set of keys processed in the stream is stable over time

The checkpoint size actually looks pretty stable now that the interval was 
increased. Is it possible that the short checkpoint interval prevented 
compaction?

Thanks!

-Matt


From: Till Rohrmann mailto:trohrm...@apache.org>>
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma mailto:guowei@gmail.com>>
Cc: "Wissman, Matt" mailto:matt.wiss...@here.com>>, 
"user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Tumbling windows - increasing checkpoint size over time


Hi Matt,

could you give us a bit more information about the windows you are using? They 
are tumbling windows. What's the size of the windows? Do you allow lateness of 
events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does 
this is also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma 
mailto:guowei@gmail.com>> wrote:
Hi, Matt
The total size of the state of the window operator is related to the
number of windows. For example, if you use keyBy + tumbling window, there
would be as many windows as there are keys.
Hope this helps.
Best,
Guowei

Wissman, Matt mailto:matt.wiss...@here.com>> 
于2020年5月27日周三 上午3:35写道:
>
> Hello Flink Community,
>
>
>
> I’m running a Flink pipeline that uses a tumbling window and incremental 
> checkpoint with RocksDB backed by s3. The number of objects in the window is 
> stable but over time the checkpoint size grows seemingly unbounded. Within the 
> first few hours after bringing the Flink pipeline up, the checkpoint size is 
> around 100K but after a week of operation it grows to around 100MB. The 
> pipeline isn’t using any other Flink state besides the state that the window 
> uses. I think this has something to do with RocksDB’s compaction but 
> shouldn’t the tumbling window state expire and be purged from the checkpoint?
>
>
>
> Flink Version 1.7.1
>
>
>
> Thanks!
>
>
>
> -Matt


Getting Window information from coGroup functin

2020-05-29 Thread Sudan S
Hi,

I have a use case where I want to join two streams. I am using coGroup for
this:

KeyBuilder leftKey = new
KeyBuilder(jobConfiguration.getConnectStream().getLeftKey());
KeyBuilder rightKey = new
KeyBuilder(jobConfiguration.getConnectStream().getRightKey());
leftSource.coGroup(rightSource).where(leftKey).equalTo(rightKey)
  .window(...)
  .apply()
  .addSink(...);


For the apply method I'm using RichCoGroupFunction. I am not able to find
access to the Window object, similar to what

ProcessWindowFunction provides. I would be interested in extracting the start
time, end time and key of the window.

Please suggest if there are any alternatives.



Re: Flink Iterator Functions

2020-05-29 Thread Roderick Vincent
Thank you both for your answers and yes, that does explain what's going
on.  I will have to refactor this code.
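
For the archives, this is roughly the shape I'm planning to refactor towards based
on both suggestions (a sketch; the Arango-specific calls are left as comments
because that client isn't serializable, and the exact client API isn't the point
here):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class NodeSource extends RichParallelSourceFunction<String> {

    private static final long serialVersionUID = 1L;

    // Only serializable data travels from the client to the cluster.
    private final Map<String, String> config;

    private volatile boolean running = true;

    public NodeSource(Map<String, String> config) {
        this.config = new HashMap<>(config);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Build the (non-serializable) ArangoDB client here from `config`
        // (host, credentials, collection name, ...).
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // Query Arango and emit each result: ctx.collect(record);
            Thread.sleep(1_000L); // placeholder pacing for the sketch
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Release the ArangoDB client here.
    }
}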

Thanks again for your help!
Rick

On Fri, May 29, 2020 at 2:29 PM Arvid Heise  wrote:

> Hi Roderick,
>
> adding to Robert's response: the easiest way is to have all needed
> information injected only in the driver, from which you manually pass the
> config in a serializable form to your iterator. The config could be, for
> example, a Java Map using serializable elements, such as Strings.
>
> If you need non-serializable objects, it is common practice to initialize
> them in Rich(ParallelSource)Function#open and pass all information needed
> to construct them in the constructor of the RichFunction and store them in
> serializable fields.
>
>
> On Fri, May 29, 2020 at 1:26 PM Robert Metzger 
> wrote:
>
>> Hi Roderick,
>>
>> Luckily there are no silly questions, just silly answers (so I have the
>> harder job here ;) )
>>
>> It seems that you are trying to read data from an Arango Database, right?
>> What is important to understand is that the "flink job" that you are
>> implementing in your main() method gets executed in the JVM submitting your
>> job to the Flink cluster. There, we are just constructing a graph of
>> dataflow operations, which will then be distributed to the cluster.
>> As part of this process, we are serializing all the user code from the
>> client, and sending it over to the cluster where it gets executed in a
>> distributed fashion.
>> Further reading:
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
>> I assume you are seeing "null" references because the objects you are
>> trying to send to the cluster are not serializable (but stored in a
>> transient field); or Spring is doing some Dependency Injection magic that
>> does not work in the remote Flink environment.
>>
>> tl;dr: What I would recommend is implement a custom SourceFunction that
>> reads from ArangoDB. The RichParallelSourceFunction will allow you to read
>> with parallelism > 1, and it has some lifecycle methods for opening and
>> closing the connection to Arango.
>> For the configuration passing, I would pass it a _serializable_ object
>> through the constructor of your custom source.
>>
>> Best,
>> Robert
>>
>>
>>
>> On Thu, May 28, 2020 at 6:40 PM Roderick Vincent 
>> wrote:
>>
>>> Hi,
>>>
>>> I am brand new to Apache Flink so please excuse any silly questions.  I
>>> have an Iterator function defined as below and adding it as a source to a
>>> Flink stream.  But when I try to pass configuration information to it (via
>>> a Spring env), what I notice is that one of the threads calls hasNext() and
>>> it is not the same object and the passed information is null.  Something is
>>> constructing it, but what is strange is that if I add a default constructor
>>> I do not see this being called by this thread with the null data so I am
>>> wondering what is going on.  Any ideas?  How do we pass configuration
>>> information to these functions?  Any help would be appreciated.
>>>
>>> Thanks,
>>> Rick
>>>
>>> @Public
>>> public class NodeSource extends
>>> FromIteratorFunction> {
>>>
>>>
>>> private static final long serialVersionUID = 1L;
>>>
>>> public NodeSource(ArangoDBSource iterator) {
>>> super(iterator);
>>> }
>>>
>>> }
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: kerberos integration with flink

2020-05-29 Thread Nick Bendtner
Hi Guo,
Thanks again for your inputs. If I periodically renew the Kerberos
cache using an external process (kinit) on all Flink nodes in standalone
mode, will the cluster still be short-lived, or will the new ticket in the
cache be used so that the cluster can live until the new expiry?

Best,
Nick.

On Sun, May 24, 2020 at 9:15 PM Yangze Guo  wrote:

> Yes, you can use kinit. But AFAIK, if you deploy Flink on Kubernetes
> or Mesos, Flink will not ship the ticket cache. If you deploy Flink on
> Yarn, Flink will acquire delegation tokens with your ticket cache and
> set tokens for job manager and task executor. As the document said,
> the main drawback is that the cluster is necessarily short-lived since
> the generated delegation tokens will expire (typically within a week).
>
> Best,
> Yangze Guo
>
> On Sat, May 23, 2020 at 1:23 AM Nick Bendtner  wrote:
> >
> > Hi Guo,
> > Even for HDFS I don't really need to set
> "security.kerberos.login.contexts" . As long as there is the right ticket
> in the ticket cache before starting the flink cluster it seems to work
> fine. I think even [4] from your reference seems to do the same thing. I
> have defined own ticket cache specifically for flink cluster by setting
> this environment variable. Before starting the cluster I create a ticket by
> using kinit.
> > This is how I make flink read this cache.
> > export KRB5CCNAME=/home/was/Jaas/krb5cc . I think even flink tries to
> find the location of ticket cache using this variable [1].
> > Do you see any problems in setting up hadoop security module this way ?
> And thanks a lot for your help.
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
> >
> > Best,
> > Nick
> >
> >
> >
> > On Thu, May 21, 2020 at 9:54 PM Yangze Guo  wrote:
> >>
> >> Hi, Nick,
> >>
> >> From my understanding, if you configure the
> >> "security.kerberos.login.keytab", Flink will add the
> >> AppConfigurationEntry of this keytab to all the apps defined in
> >> "security.kerberos.login.contexts". If you define
> >> "java.security.auth.login.config" at the same time, Flink will also
> >> keep the configuration in it. For more details, see [1][2].
> >>
> >> If you want to use this keytab to interact with HDFS, HBase and Yarn,
> >> you need to set "security.kerberos.login.contexts". See [3][4].
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
> >> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-security-module
> >> [4]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Thu, May 21, 2020 at 11:06 PM Nick Bendtner 
> wrote:
> >> >
> >> > Hi guys,
> >> > Is there any difference in providing kerberos config to the flink jvm
> using this method in the flink configuration?
> >> >
> >> > env.java.opts:  -Dconfig.resource=qa.conf
> -Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/
> -Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
> -Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf
> >> >
> >> > Is there any difference in doing it this way vs providing it from
> security.kerberos.login.keytab .
> >> >
> >> > Best,
> >> >
> >> > Nick.
>


Re: Flink Dashboard UI Tasks hard limit

2020-05-29 Thread Vijay Balakrishnan
Thx, Xintong for the detailed explanation of memory fraction. I increased
the mem fraction now.

As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.
PartitionConnectionException: Connection for partition
e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not
reachable.
at org.apache.flink.runtime.io.network.partition.consumer.
RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.requestPartitions(SingleInputGate.java:237)
at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.setup(SingleInputGate.java:215)
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(
InputGateWithMetrics.java:65)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(
Task.java:866)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting
to remote task manager + '/10.9.239.218:45544' has failed. This might
indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.
PartitionRequestClientFactory$ConnectingChannel.waitForChannel(
PartitionRequestClientFactory.java:197)
at org.apache.flink.runtime.io.network.netty.
PartitionRequestClientFactory$ConnectingChannel.access$000(
PartitionRequestClientFactory.java:134)
at org.apache.flink.runtime.io.network.netty.
PartitionRequestClientFactory.createPartitionRequestClient(
PartitionRequestClientFactory.java:70)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager
.createPartitionRequestClient(NettyConnectionManager.java:68)
at org.apache.flink.runtime.io.network.partition.consumer.
RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.
RemoteTransportException: Connecting to remote task manager + '/
10.9.239.218:45544' has failed. This might indicate that the remote task
manager has been lost.
at org.apache.flink.runtime.io.network.netty.
PartitionRequestClientFactory$ConnectingChannel.operationComplete(
PartitionRequestClientFactory.java:220)
at org.apache.flink.runtime.io.network.netty.
PartitionRequestClientFactory$ConnectingChannel.operationComplete(
PartitionRequestClientFactory.java:134)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
DefaultPromise.notifyListener0(DefaultPromise.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
DefaultPromise.notifyListeners(DefaultPromise.java:424)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
DefaultPromise.tryFailure(DefaultPromise.java:121)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.
AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(
AbstractNioChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.
AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:
343)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKey(NioEventLoop.java:644)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKeysOptimized(NioEventLoop.java:591)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKeys(NioEventLoop.java:508)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(
NioEventLoop.java:470)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.
239.218:45544
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714
)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.
NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.
AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:
340)
... 6 more
Caused by: java.net.ConnectException: Connection timed out
... 10 more


On Wed, May 27, 2020 at 7:14 PM Xintong Song  wrote:

> Ah, I guess I had misunderstood what you meant.
>
> Below 18000 tasks, the Flink Job is able to start up.
>> Even though I increased the number of slots, it still works when 312
>> slots are being used.
>>
> When you say "it still works", 

Auto adjusting watermarks?

2020-05-29 Thread Theo Diefenthal
Hi there, 

Currently I have a job pipeline reading data from > 10 different kinds of 
sources, each having different out-of-orderness characteristics. I am 
currently working on adjusting the watermarks for each source "properly". I 
work with BoundedOutOfOrdernessTimestampExtractor and, as usual, I want the 
maxOutOfOrderness as low as possible while still keeping as many elements as 
possible on time, as late arrivals trigger rather expensive computations. 

Now I thought, what I probably want is something like "I want to have about 
99.9% of my elements within the allowed lateness". Of course, I don't know the 
future events out-of-orderness, but I can predict it from the past, e.g. via a 
histogram with a 99.9% percentile, and adjust the maxOutOfOrdernesss 
dynamically. 

As Flink only provides rather simple timestamp assigners out of the box but 
allows me to create my own with arbitrary complexity, I was wondering whether 
somebody of you has already done something like that, whether that's a viable 
approach, and whether I'm on a good track here? 

Best regards 
Theo 


Re: Age old stop vs cancel debate

2020-05-29 Thread Senthil Kumar
Hi Robert,

Would appreciate more insights please.

What we are noticing: When the flink job is issued a stop command, the 
Thread.sleep is not receiving the InterruptedException

It certainly receives the exception when the flink job is issued a cancel 
command.

In both cases (cancel and stop) the cancel() method is getting called (to set 
the isRunning variable to false)

However, given that the thread does not get interrupted in stop, we are not in 
a position to check the isRunning variable.


For now, we are doing a Thread.sleep every 5 minutes (instead of the normal 
interval, which is in hours).
Sleeping for 5 minutes gives us a chance to check the isRunning variable.

Another approach would be to save the current thread (Thread.currentThread()) 
before doing the Thread.sleep(),
and manually call Thread.interrupt() on it from the cancel function.
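
A minimal sketch of that second approach (class and field names made up; the S3
fetch itself is elided):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class S3PollingSource implements SourceFunction<String> {

    private static final long serialVersionUID = 1L;

    private final long pollIntervalMillis;
    private volatile boolean isRunning = true;
    private transient volatile Thread runThread;

    public S3PollingSource(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        runThread = Thread.currentThread();
        while (isRunning) {
            // Fetch from S3 and ctx.collect(...) here.
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                // Interrupted by cancel(); the loop condition ends the run loop.
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        Thread t = runThread;
        if (t != null) {
            t.interrupt();
        }
    }
}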

What is your recommendation?

Cheers
Kumar


From: Robert Metzger 
Date: Friday, May 29, 2020 at 4:38 AM
To: Senthil Kumar 
Cc: "user@flink.apache.org" 
Subject: Re: Age old stop vs cancel debate

Hi Kumar,

The way you've implemented your custom source sounds like the right way: 
having a "running" flag checked by the run() method and changing it in cancel().
Also, it is good that you are properly handling the interrupt set by Flink 
(some people ignore InterruptedExceptions, which makes it difficult (basically 
impossible) for Flink to stop the job).

Best,
Robert


On Wed, May 27, 2020 at 7:38 PM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
We are on flink 1.9.0

I have a custom SourceFunction, where I rely on isRunning set to false via the 
cancel() function to exit out of the run loop.
My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.

When a job gets cancelled, the SourceFunction.cancel() is called, which sets 
the isRunning to false.
In addition, the Thread.sleep gets interrupted, a check is made on the 
isRunning variable (now set to false), and the run loop is exited.

We noticed that when we “stop” the flink job, the Thread.sleep does not get 
interrupted.
It also appears that SourceFunction.cancel() is not getting called (which seems 
like the correct behavior for "stop").

My question: what’s the “right” way to exit the run() loop when the flink job 
receives a stop command?

My understanding was that there was a Stoppable interface (which got removed in 
1.9.0)

Would appreciate any insights.

Cheers
Kumar


Sorting Bounded Streams

2020-05-29 Thread Satyam Shekhar
Hello,

I am using Flink as the streaming execution engine for building a
low-latency alerting application. The use case also requires ad-hoc
querying on batch data, which I also plan to serve using Flink to avoid the
complexity of maintaining two separate engines.

My current understanding is that Order By operator in Blink planner (on
DataStream) requires time attribute as the primary sort column. This is
quite limiting for ad-hoc querying. It seems I can use the DataSet API to
obtain a globally sorted output on an arbitrary column but that will force
me to use the older Flink planner.

Specifically, I am looking for guidance from the community on the following
questions -

   1. Is it possible to obtain a globally sorted output on DataStreams on
   an arbitrary sort column?
   2. What are the tradeoffs in using DataSet vs DataStream in performance,
   long term support, etc?
   3. Is there any other way to address this issue?

Regards,
Satyam


Re: Inconsistent checkpoint durations vs state size

2020-05-29 Thread Congxian Qiu
Hi
From the given picture:
1. there were some failed checkpoints (but not because of timeout); could you
please check why these checkpoints failed?
2. the checkpoint data size is the delta size for the current checkpoint [1],
assuming you are using incremental checkpoints
3. in fig 1 the checkpoint size is ~3 GB, but in fig 2 the delta size can grow
to ~15 GB; my gut feeling is that the state update/insert ratio of your
program is very high, so that in one checkpoint you generate too many
SST files
4. from fig 2 it seems you configured
execution.checkpointing.max-concurrent-checkpoints [2] to be bigger than 1;
could you please set it to 1 and have a try? (see the snippet after the
references)

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html#history-tab

[2]
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#execution-checkpointing-max-concurrent-checkpoints
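
For point 4, in code this is something like (a sketch, assuming checkpointing is
configured programmatically; the interval is a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // checkpoint interval in ms, adjust to your job
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
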
Best,
Congxian


Slotterback, Chris  于2020年5月30日周六 上午7:43写道:

> Hi there,
>
>
>
> We are trying to upgrade a flink app from using FsStateBackend to
> RocksDBStateBackend to reduce overhead memory requirements. When enabling
> rocks, we are seeing a drop in used heap memory as it increments to disk,
> but checkpoint durations have become inconsistent. Our data source has a
> stable rate of reports arriving in parallel across partitions. The state
> size doesn’t seem to correlate with the checkpoint duration from what I can
> see in metrics. we have tried tmpfs and swap on SSDs with high iops, but
> can’t get a good handle on what’s causing smaller state to take longer to
> checkpoint. Our checkpoint location is hdfs, and works well in our
> non-rocks cluster.
>
>
>
> Is ~100x checkpoint duration expected when going from fs to rocks state
> backend, and is checkpoint duration supposed to vary this much with a
> consistent data source normally?
>
>
>
> Chris
>


Re: Auto adjusting watermarks?

2020-05-29 Thread Congxian Qiu
Hi

Could storing histogram data in a custom
`BoundedOutOfOrdernessTimestampExtractor`
and adjusting the `maxOutOfOrderness` according to that histogram data work for
your case? (Be careful: such histogram data would not be snapshotted when
checkpointing.)
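
A rough, untested sketch of that idea (since BoundedOutOfOrdernessTimestampExtractor
fixes maxOutOfOrderness in its constructor, this implements
AssignerWithPeriodicWatermarks directly; the names and the simple ring-buffer
"histogram" are my own):

import java.util.Arrays;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public abstract class PercentileOutOfOrdernessAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final double percentile;   // e.g. 0.999
    private final long[] samples;      // ring buffer of observed out-of-orderness values (ms)
    private long writeCount = 0;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    protected PercentileOutOfOrdernessAssigner(double percentile, int sampleSize) {
        this.percentile = percentile;
        this.samples = new long[sampleSize];
    }

    /** Extract the event timestamp, as in BoundedOutOfOrdernessTimestampExtractor. */
    protected abstract long extractEventTimestamp(T element);

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        long ts = extractEventTimestamp(element);
        if (ts >= currentMaxTimestamp) {
            currentMaxTimestamp = ts;
        } else {
            // Record how far behind the current maximum this element arrived.
            samples[(int) (writeCount++ % samples.length)] = currentMaxTimestamp - ts;
        }
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(currentMaxTimestamp - currentMaxOutOfOrderness() - 1);
    }

    // The configured percentile of the recorded out-of-orderness samples.
    private long currentMaxOutOfOrderness() {
        int n = (int) Math.min(writeCount, samples.length);
        if (n == 0) {
            return 0L;
        }
        long[] sorted = Arrays.copyOf(samples, n);
        Arrays.sort(sorted);
        int idx = Math.min(n - 1, (int) Math.floor(percentile * n));
        return sorted[idx];
    }
}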

Best,
Congxian


Theo Diefenthal  于2020年5月30日周六 上午4:35写道:

> Hi there,
>
> Currently I have a job pipeline reading data from > 10 different kind of
> sources with each having different out-of-orderness characteristics. I am
> currently working on adjusting the watermarks for each source "properly". I
> work with BoundedOutOfOrdernessTimestampExtractor and, as usual, I want the
> maxOutOfOrderness as low as possible while still keeping as much elements
> as possible in time as late arrivals trigger rather expensive computations.
>
> Now I thought, what I probably want is something like "I want to have
> about 99.9% of my elements within the allowed lateness". Of course, I don't
> know the future events out-of-orderness, but I can predict it from the
> past, e.g. via a histogram with a 99.9% percentile, and adjust the
> maxOutOfOrdernesss dynamically.
>
> As Flink provides rather simplified Timestamp Assigner only but allows me
> to create my own ones with arbitrary complexity, I was wondering if
> somebody of you already did something like that, if that's a viable
> approach and I'm on a good track here?
>
> Best regards
> Theo
>


Re: Custom trigger to trigger for late events

2020-05-29 Thread Congxian Qiu
Hi Poornapragna

I'll try to answer your questions:
1. you don't need to delete the timer manually (it will be deleted after it
fires), but you can delete timers manually if you want.
2. AFAIK, triggers would not be snapshotted, but the timers will be
snapshotted.
3. deleting a timer that was not registered would not be problematic [1]
4. from my understanding, the triggers you can use out of the box can't achieve
your purpose (a rough sketch of a custom one follows after the references).
5. the window will be triggered when the watermark exceeds the end of the
window, but currently we do not snapshot the watermark when checkpointing [2],
so if you're using event time, maybe it will be triggered a little late
(depending on the watermark generation logic you wrote).

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
[2] https://issues.apache.org/jira/browse/FLINK-5601
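
For point 4, here is a rough and untested sketch of such a custom trigger (based on
EventTimeTrigger; batching the late firings onto event-time interval boundaries is
my interpretation of your description):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class BatchedLateEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private final long lateFireIntervalMs; // e.g. 10_000 for "every 10 seconds"

    public BatchedLateEventTimeTrigger(long lateFireIntervalMs) {
        this.lateFireIntervalMs = lateFireIntervalMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        long watermark = ctx.getCurrentWatermark();
        if (window.maxTimestamp() <= watermark) {
            // Late element: do not fire immediately, fire at the next interval boundary.
            long nextBoundary = watermark - (watermark % lateFireIntervalMs) + lateFireIntervalMs;
            ctx.registerEventTimeTimer(nextBoundary);
        } else {
            // On-time element: behave like EventTimeTrigger.
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Timers are scoped to this key/window, so a firing event-time timer is
        // either the end-of-window timer or a late-firing boundary.
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // To also delete pending late-firing timers here, their timestamps would have
        // to be tracked in partitioned state; deleting an unregistered timer is
        // harmless (see answer 3 above).
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}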

Best,
Congxian


Poornapragna Ts  于2020年5月29日周五 上午12:38写道:

> Hi,
>
> I have a simple requirement where i want to have 10 second window with
> allow late events upto 1 hour.
>
> Existing TumblingEventTimeWindows with EventTimeTrigger will work for this.
>
> But the EventTimeTrigger, triggers for every incoming event after
> watermark has passed windows max time. I don't want this behaviour. Even
> for late events, I want to fire for every 10 seconds.
>
> For this, I thought of writing custom trigger, which will be similar to
> EventTimeTrigger, but instead of firing on every late event, it will
> register timer in onElement method for upcoming 10th second.
>
> With this setup, I have some questions.
>
> 1) When we register timers to context, is it compulsory to delete them on
> clear() call?
>
> 2) Will these triggers be stored in fault tolerance state? So that
> deleting is must.
>
> 3) Will it be problematic, if I call delete trigger for unregistered time(
> i.e., if I call delete for time T1 for which I had not registered before.)
>
> 4) Without implementing custom trigger, can it be achieved?
>
> 5) Lets say, late event came at 255 second so I will register a timer to
> trigger at 260(next 10th second). If a failure happens before that time,
> then restarting from the checkpoint, Will it trigger when watermark reaches
> 260? That means will the trigger be recovered when we restart from
> failure.
>
> Thanks,
> Poornapragna T S
>


Re: Sorting Bounded Streams

2020-05-29 Thread Benchao Li
Hi Satyam,

Are you using blink planner in streaming mode? AFAIK, blink planner in
batch mode can sort on arbitrary columns.
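
For example, something like this (a sketch; the table and column names are
placeholders, and the bounded source still has to be registered):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Register your bounded input as a table named "events" first (elided).
Table sorted = tEnv.sqlQuery("SELECT * FROM events ORDER BY some_column DESC");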

Satyam Shekhar  于2020年5月30日周六 上午6:19写道:

> Hello,
>
> I am using Flink as the streaming execution engine for building a
> low-latency alerting application. The use case also requires ad-hoc
> querying on batch data, which I also plan to serve using Flink to avoid the
> complexity of maintaining two separate engines.
>
> My current understanding is that Order By operator in Blink planner (on
> DataStream) requires time attribute as the primary sort column. This is
> quite limiting for ad-hoc querying. It seems I can use the DataSet API to
> obtain a globally sorted output on an arbitrary column but that will force
> me to use the older Flink planner.
>
> Specifically, I am looking for guidance from the community on the
> following questions -
>
>1. Is it possible to obtain a globally sorted output on DataStreams on
>an arbitrary sort column?
>2. What are the tradeoffs in using DataSet vs DataStream in
>performance, long term support, etc?
>3. Is there any other way to address this issue?
>
> Regards,
> Satyam
>


-- 

Best,
Benchao Li