because I am measuring one operator (all instances) and I want to
place its downstream operators on another machine in order to use
network channels.
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
On Fri, May 29, 2020 at 4:59 AM Weihua Hu wrote:
Hey Prasanna,
(Side note: there is no need to send this email to multiple mailing lists.
The user@ list is the right one)
Let me quickly go through your questions:
Is this use case suited for Flink?
Based on the details you've provided: Yes
What you also need to consider are the hardware requ
Hi,
In Flink, watermarks require the time field to be in milliseconds. Please check
whether your watermark is working correctly; I suspect that is where the problem is.
steven chen wrote on Fri, May 29, 2020 at 2:34 PM:
> The data arrives and is aggregated every time, but why are the insert results
> not saved into MySQL? Is it a problem with the SQL, or something else? Could
> someone please advise?
> CREATE TABLE user_behavior (
>
> itemCode VARCHAR,
>
> ts BIGINT COMMENT 'timestamp',
>
> t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'
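For illustration, a minimal sketch of a DDL along these lines, with the millisecond timestamp converted to a TIMESTAMP column and an explicit watermark declared on it; the format string, the watermark interval, and the connector options are assumptions, not the original table definition:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UserBehaviorDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // ts is expected in epoch milliseconds: divide by 1000 for FROM_UNIXTIME
        // (which takes seconds) and declare the watermark on the derived column.
        tEnv.executeSql(
                "CREATE TABLE user_behavior (" +
                "  itemCode VARCHAR," +
                "  ts BIGINT COMMENT 'timestamp'," +
                "  t AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
                "  WATERMARK FOR t AS t - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +                    // assumed connector options
                "  'topic' = 'user_behavior'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'" +
                ")");
    }
}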
oh I'm not using the HistoryServer; I just wrote it ;)
Are these archives all in the same location? So we're roughly looking at
5 GB of archives then?
That could indeed "just" be a resource problem. The HistoryServer
downloads all archives eagerly, not on demand.
The next step would be to
Thanks Robert for the reply.
On Fri 29 May, 2020, 12:31 Robert Metzger, wrote:
> Hey Prasanna,
>
> (Side note: there is no need to send this email to multiple mailing
> lists. The user@ list is the right one)
>
> Let me quickly go through your questions:
>
> Is this use case suited for Flink?
>
Hi,
afaik, this feature was added because Hadoop MapReduce has it as well (
https://blog.cloudera.com/how-to-include-third-party-libraries-in-your-map-reduce-job/,
point 2.).
I don't remember having seen this anywhere in the wild. I believe it is a
good idea to simplify our codebase here.
If there
Using slotSharingGroup I can influence placement. However, I am using
two different slotSharingGroups for the two different sources, and even
though the sources are placed in the same TM, the downstream operators
start being split across different TMs as well.
stream01 = source01.slot1 -> map01(4).slot1 -> flat
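For context, a minimal sketch of this kind of pipeline with explicit slot sharing groups; the source and host names are placeholders, not the original job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep the source and its map in one slot sharing group ...
        DataStream<String> stream01 = env
                .socketTextStream("host01", 9000)                 // placeholder source
                .slotSharingGroup("sources")
                .map(s -> s.trim()).returns(String.class)
                .setParallelism(4)
                .slotSharingGroup("sources");

        // ... and force the downstream flatMap into a different group, i.e. a
        // different slot (and, with one slot per TM, a different machine).
        stream01
                .flatMap((String s, Collector<String> out) -> out.collect(s))
                .returns(String.class)
                .slotSharingGroup("downstream")
                .print();

        env.execute("slot sharing sketch");
    }
}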
Hi Mason,
your understanding is correct.
On Thu, May 28, 2020 at 8:23 AM Chen, Mason wrote:
> I think I may have just answered my own question. There’s only one Kafka
> partition, so the maximum parallelism is one and it doesn’t really make
sense to make another Kafka consumer under the same g
Hi Kumar,
The way you've implemented your custom source sounds like the right way:
Having a "running" flag checked by the run() method and changing it in
cancel().
Also, it is good that you are properly handling the interrupt set by Flink
(some people ignore InterruptedExceptions, which makes it d
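For readers following along, a minimal sketch of such a cancellable source, assuming a hypothetical poll() against the external system:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = poll();                     // hypothetical fetch from the external system
            if (record != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Flink interrupts the source thread on cancellation: restore the
                // interrupt flag and leave the loop instead of swallowing it.
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String poll() {
        return "record";                                // placeholder for the real polling logic
    }
}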
Thanks Yun. I was thinking along similar lines. I had one more question.
leftSource.connect(rightSource)
.process(new TagCoprocessFunction()) // In this function, tag the
left source with "0" and the right source with "1"
.window(xx)
.process(new XX())
In this when will the window be
Hi Felipe,
the file is just 80 MB. It is probably cached in the Linux page cache;
there should not be any disk IO involved.
So what you are saying is that you cannot further increase the throughput for
sleeps shorter than 2000 nanoseconds.
Have you tried running this w/o any sleep / nano.time syscall
Hi Nikola,
you could implement a custom SourceFunction that implements this in some
way: If the files are small (< 10 MB) send each file as a record, then
process it in a subsequent flatMap operation. If the files are large, split
the work across the parallel sources and read them serially in the
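A minimal sketch of that idea for the small-file case, assuming a hypothetical input directory and splitting the files across the parallel source instances:

import java.io.File;
import java.nio.file.Files;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class SmallFileSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        File[] files = new File("/data/input").listFiles();   // assumed input directory
        if (files == null) {
            return;
        }
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        for (int i = 0; i < files.length && running; i++) {
            if (i % parallelism == subtask) {                  // split the work across the parallel sources
                // One record per (small) file; a downstream flatMap can parse it.
                ctx.collect(new String(Files.readAllBytes(files[i].toPath())));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}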
I was losing something because I was reading the line of the
GZIPInputStream outside of the busy while loop. I changed it and now I
am having more throughput. It is also a good idea to use VisualVM to
check if the throughput is correct and where I am losing more cycles.
while (reader.ready()
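For illustration, a minimal sketch of that loop, with the readLine() moved inside the busy while loop (the file name is a placeholder):

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

public class GzipBusyLoop {
    public static void main(String[] args) throws IOException {
        long emitted = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream("input.csv.gz"))))) {   // placeholder file name
            String line;
            // Read the next line inside the busy loop so every iteration does the
            // per-record work, instead of reading once before the loop.
            while (reader.ready() && (line = reader.readLine()) != null) {
                emitted++;
                // emit / process 'line' here
            }
        }
        System.out.println("lines emitted: " + emitted);
    }
}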
Hi Arnaud,
Maybe I don't fully understand the constraints, but what about
stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());
The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with
open() and close() where you can handle the connection with Kudu's
partitioning
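A minimal sketch of that mapper; the KuduPartitionLookup helper is hypothetical and only stands in for the real Kudu client calls:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class GetKuduPartitionMapper extends RichMapFunction<String, Tuple2<Integer, String>> {

    // Hypothetical wrapper around the Kudu client; created in open(), released in close().
    private transient KuduPartitionLookup lookup;

    @Override
    public void open(Configuration parameters) {
        lookup = new KuduPartitionLookup("kudu-master:7051", "my_table");  // assumed master/table
    }

    @Override
    public Tuple2<Integer, String> map(String record) {
        // Emit (targetPartition, record) so a subsequent keyBy(0) groups by partition.
        return Tuple2.of(lookup.partitionFor(record), record);
    }

    @Override
    public void close() {
        if (lookup != null) {
            lookup.close();
        }
    }

    /** Stand-in for the real Kudu partition lookup; here it simply hashes the record. */
    public static class KuduPartitionLookup {
        KuduPartitionLookup(String masterAddress, String table) { /* connect to Kudu here */ }
        int partitionFor(String record) { return (record.hashCode() & Integer.MAX_VALUE) % 4; }
        void close() { /* release the Kudu connection */ }
    }
}

The pipeline then reads stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(...), so each record is routed by its target Kudu partition before hitting the sink.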
Hi Joe,
my gut feeling is that a flatMap() is what you are looking for.
Best,
Robert
On Thu, May 28, 2020 at 7:21 PM Joe Malt wrote:
> Hi,
>
> I'm working on a custom TimestampAssigner which will do different things
> depending on the value of the extracted timestamp. One of the actions I
> wa
Hi Roderick,
Luckily there are no silly questions, just silly answers (so I have the
harder job here ;) )
It seems that you are trying to read data from an Arango Database, right?
What is important to understand is that the "flink job" that you are
implementing in your main() method gets executed
Hi Sudan,
The first process is used to tag the elements from the left and right
streams, so that they can be merged into the same stream and then
assigned to the same window. The next window(xxx).process(new
WindowProcessFunction) then defines the window operator to process t
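A minimal sketch of that pattern, assuming String inputs, event-time timestamps and watermarks already assigned, and a hypothetical join key (the payload itself):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class TagAndWindowSketch {

    /** Tags left elements with 0 and right elements with 1 so both sides share one stream. */
    public static class TagCoprocessFunction
            extends CoProcessFunction<String, String, Tuple2<Integer, String>> {

        @Override
        public void processElement1(String left, Context ctx, Collector<Tuple2<Integer, String>> out) {
            out.collect(Tuple2.of(0, left));
        }

        @Override
        public void processElement2(String right, Context ctx, Collector<Tuple2<Integer, String>> out) {
            out.collect(Tuple2.of(1, right));
        }
    }

    public static DataStream<String> join(DataStream<String> leftSource, DataStream<String> rightSource) {
        return leftSource
                .connect(rightSource)
                .process(new TagCoprocessFunction())
                .keyBy(t -> t.f1, Types.STRING)                        // assumed join key: the payload
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))  // assumed window size
                .process(new ProcessWindowFunction<Tuple2<Integer, String>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<Integer, String>> elements,
                                        Collector<String> out) {
                        int left = 0;
                        int right = 0;
                        for (Tuple2<Integer, String> e : elements) {
                            if (e.f0 == 0) {
                                left++;
                            } else {
                                right++;
                            }
                        }
                        out.collect(key + ": " + left + " left / " + right + " right");
                    }
                });
    }
}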
Hi Roderick,
adding to Robert's response: The easiest way is to get all needed
information injected only in the driver from which you manually pass the
config in a serializable form to your iterator. The config could be, for
example, a Java Map with serializable elements, such as Strings.
If you need
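A minimal sketch of that approach, with a hypothetical "prefix" entry standing in for the real configuration values:

import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;

public class ConfiguredMapper extends RichMapFunction<String, String> {

    // A serializable config (e.g. a HashMap of Strings) shipped with the function.
    private final Map<String, String> config;

    public ConfiguredMapper(Map<String, String> config) {
        this.config = config;
    }

    @Override
    public String map(String value) {
        return config.getOrDefault("prefix", "") + value;   // "prefix" is a hypothetical config key
    }
}

// In the driver (main method), build the map there and pass it in:
//   Map<String, String> config = new HashMap<>();
//   config.put("prefix", "db-");
//   stream.map(new ConfiguredMapper(config));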
Although flatMap() is a valid choice, it would be more idiomatic to use
filter(). I'd apply it even before running the TimestampAssigner, except when
extracting the timestamp is rather complicated. But if it's a simple field,
then it feels better to first filter bad data, and then apply any kind of
l
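A minimal sketch of that ordering, assuming a hypothetical Event type with a millisecond timestamp field and a 10-second out-of-orderness bound:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FilterThenAssign {

    /** Hypothetical event type with an epoch-millisecond timestamp field. */
    public static class Event {
        public long timestampMillis;
        public String payload;
    }

    public static DataStream<Event> prepare(DataStream<Event> events) {
        return events
                // Drop records with unusable timestamps before assigning watermarks.
                .filter(e -> e.timestampMillis > 0)
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Event e) {
                                return e.timestampMillis;
                            }
                        });
    }
}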
Yes, these are all in the same directory, and we're at 67G right now. I'll try
with incrementally smaller directories and let you know what I find.
// ah
From: Chesnay Schepler
Sent: Friday, May 29, 2020 3:11 AM
To: Hailu, Andreas [Engineering] ;
user@flink.apache.org
Subject: Re: History Serv
Hello,
Yes, that would definitely do the trick, with an extra mapper after keyBy to
remove the tuple so that it stays seamless. It's less hacky than what I was
thinking of, thanks!
However, is there any plan in a future release to have rich partitioners? That
would avoid adding overhead and “
Benchao,
Thank you for your detailed explanation.
Schema inference can solve my problem partially. For example, starting from
some point in time, all the JSON afterwards will contain a new field. I think for
this case, schema inference will help.
But if I need to handle all the JSON events with different s
Till,
I’ll have to calculate the theoretical upper bound for our window state. Our
data distribution and rate have a predictable pattern, but the data rate pattern
didn't match the checkpoint size growth.
[image: screenshot of checkpoint sizes]
Here is a screenshot of the checkpoint size for the pi
Hi,
I have a use case where I want to join two streams. I am using coGroup for
this
KeyBuilder leftKey = new KeyBuilder(jobConfiguration.getConnectStream().getLeftKey());
KeyBuilder rightKey = new KeyBuilder(jobConfiguration.getConnectStream().getRightKey());
leftSource.coGroup(rightSource).where(
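For reference, a minimal sketch of a windowed coGroup along those lines, with String payloads and identity key selectors standing in for the KeyBuilder instances:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupJoinSketch {

    public static DataStream<String> join(DataStream<String> leftSource, DataStream<String> rightSource) {
        return leftSource
                .coGroup(rightSource)
                .where(left -> left, Types.STRING)      // stands in for new KeyBuilder(leftKey)
                .equalTo(right -> right, Types.STRING)  // stands in for new KeyBuilder(rightKey)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))  // assumed window size
                .apply(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(Iterable<String> lefts, Iterable<String> rights, Collector<String> out) {
                        // Emit the combinations of the two sides per key and window.
                        for (String l : lefts) {
                            for (String r : rights) {
                                out.collect(l + "|" + r);
                            }
                        }
                    }
                });
    }
}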
Thank you both for your answers and yes, that does explain what's going
on. I will have to refactor this code.
Thanks again for your help!
Rick
On Fri, May 29, 2020 at 2:29 PM Arvid Heise wrote:
> Hi Roderick,
>
> adding to Robert's response: The easiest way is to get all needed
> information
Hi Guo,
Thanks again for your inputs. If I periodically renew the Kerberos
cache using an external process (kinit) on all Flink nodes in standalone
mode, will the cluster still be short-lived, or will the new ticket in the
cache be used so that the cluster can live until the end of the new expiry?
Best,
Thanks, Xintong, for the detailed explanation of the memory fraction. I have
increased the memory fraction now.
As I increase the defaultParallelism, I keep getting this error:
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
Connection for partition e312b2db4d1d0c65224664f62
Hi there,
Currently I have a job pipeline reading data from more than 10 different kinds
of sources, each having different out-of-orderness characteristics. I am
currently working on adjusting the watermarks for each source "properly". I
work with BoundedOutOfOrdernessTimestampExtractor and, as usu
Hi Robert,
Would appreciate more insights please.
What we are noticing: When the flink job is issued a stop command, the
Thread.sleep is not receiving the InterruptedException.
It certainly receives the exception when the flink job is issued a cancel
command.
In both cases (cancel and stop) th
Hello,
I am using Flink as the streaming execution engine for building a
low-latency alerting application. The use case also requires ad-hoc
querying on batch data, which I also plan to serve using Flink to avoid the
complexity of maintaining two separate engines.
My current understanding is that
Hi
From the given picture,
1. There were some failed checkpoints (but not because of a timeout); could you
please check why these checkpoints failed?
2. The checkpoint data size is the delta size for the current checkpoint[1],
assuming you are using incremental checkpoints.
3. In fig1 the checkpoint size is ~3
Hi
Could storing histogram data in a custom
`BoundedOutOfOrdernessTimestampExtractor` and adjusting the
`maxOutOfOrderness` according to that histogram work for your case?
(Be careful: such histogram data would not be snapshotted when
checkpointing.)
Best,
Congxian
Theo Diefenthal wrote on Sat, May 30, 2020
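A minimal sketch of that idea, implemented directly as an AssignerWithPeriodicWatermarks over (timestamp, payload) tuples; the "histogram" here is reduced to tracking the maximum observed lateness, and, as noted above, this state is not checkpointed:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class AdaptiveLatenessAssigner implements AssignerWithPeriodicWatermarks<Tuple2<Long, String>> {

    private long currentMaxTimestamp = Long.MIN_VALUE;
    // Simplest possible "histogram": just the maximum lateness seen so far.
    private long observedMaxLateness = 0;

    @Override
    public long extractTimestamp(Tuple2<Long, String> event, long previousElementTimestamp) {
        long ts = event.f0;               // f0 is assumed to be the event time in milliseconds
        if (ts < currentMaxTimestamp) {
            observedMaxLateness = Math.max(observedMaxLateness, currentMaxTimestamp - ts);
        } else {
            currentMaxTimestamp = ts;
        }
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        // Derive the bound from the observed lateness instead of a fixed constant.
        return new Watermark(currentMaxTimestamp - observedMaxLateness - 1);
    }
}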
Hi Poornapragna
I'll try to answer your questions:
1. You don't need to delete the timer manually (it will be deleted after it
fires), but you can delete timers manually if you want.
2. AFAIK, triggers are not snapshotted, but the timers are snapshotted.
3. Deleting a timer that was not regi
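For illustration, a minimal sketch of timer handling in a KeyedProcessFunction, with a hypothetical one-minute delay:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerSketch extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) {
        long fireAt = value.f1 + 60_000;                     // assumed: fire one minute after the event time
        ctx.timerService().registerEventTimeTimer(fireAt);

        // Deleting is optional; a timer is removed automatically once it has fired.
        // ctx.timerService().deleteEventTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}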
Hi Satyam,
Are you using the blink planner in streaming mode? AFAIK, the blink planner in
batch mode can sort on arbitrary columns.
Satyam Shekhar wrote on Sat, May 30, 2020 at 6:19 AM:
> Hello,
>
> I am using Flink as the streaming execution engine for building a
> low-latency alerting application. The use case also
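For reference, a minimal sketch of setting up the blink planner in batch mode (the table and column in the query are hypothetical):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlinkBatchSortSketch {
    public static void main(String[] args) {
        // Blink planner in batch mode, where ORDER BY on arbitrary columns is supported.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register tables, then sort on any column, e.g.:
        // tEnv.executeSql("SELECT * FROM alerts ORDER BY severity");  // table and column are hypothetical
    }
}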