Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-29 Thread Felipe Gutierrez
because I am measuring one operator (all instances) and I want to place its downstream operators in another machine in order to use network channels. -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Fri, May 29, 2020 at 4:59 AM Weihua Hu wrote: > >

Re: Need Help on Flink suitability to our usecase

2020-05-29 Thread Robert Metzger
Hey Prasanna, (Side note: there is no need to send this email to multiple mailing lists. The user@ list is the right one) Let me quickly go through your questions: Is this usecase suited for flink ? Based on the details you've provided: Yes What you also need to consider are the hardware requ

Re: Flink SQL tumbling window does not output its result set

2020-05-29 Thread Benchao Li
Hi, in Flink the watermark requires the time field to be in milliseconds. You can check whether your watermark is behaving normally; I suspect that is where the problem is. steven chen wrote on Fri, May 29, 2020 at 2:34 PM: > The data arrives every time and is aggregated, but why are the insert results not saved into MySQL? Is it an SQL problem, or something else? Hoping an expert can answer. > CREATE TABLE user_behavior ( > > itemCode VARCHAR, > > ts BIGINT COMMENT 'timestamp', > > t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-29 Thread Chesnay Schepler
oh I'm not using the HistoryServer; I just wrote it ;) Are these archives all in the same location? So we're roughly looking at 5 GB of archives then? That could indeed "just" be a resource problem. The HistoryServer eagerly downloads all archives, and not on-demand. The next step would be to

Re: Need Help on Flink suitability to our usecase

2020-05-29 Thread Prasanna kumar
Thanks Robert for the reply. On Fri 29 May, 2020, 12:31 Robert Metzger, wrote: > Hey Prasanna, > > (Side note: there is no need to send this email to multiple mailing > lists. The user@ list is the right one) > > Let me quickly go through your questions: > > Is this usecase suited for flink ? >

Re: [DISCUSS] Remove dependency shipping through nested jars during job submission.

2020-05-29 Thread Robert Metzger
Hi, afaik, this feature was added because Hadoop MapReduce has it as well ( https://blog.cloudera.com/how-to-include-third-party-libraries-in-your-map-reduce-job/, point 2.). I don't remember having seen this anywhere in the wild. I believe it is a good idea to simplify our codebase here. If there

Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-29 Thread Felipe Gutierrez
Using slotSharingGroup I can do some placement. However, I am using two different slotSharingGroups for two different sources, and even so they are placed in the same TM. And this starts splitting the downstream operators across different TMs as well. stream01 = source01.slot1 -> map01(4).slot1 -> flat

Re: Flink Kafka Connector Source Parallelism

2020-05-29 Thread Robert Metzger
Hi Mason, your understanding is correct. On Thu, May 28, 2020 at 8:23 AM Chen, Mason wrote: > I think I may have just answered my own question. There’s only one Kafka > partition, so the maximum parallelism is one and it doesn’t really make > sense to make another kafka consumer under the same g

Re: Age old stop vs cancel debate

2020-05-29 Thread Robert Metzger
Hi Kumar, The way you've implemented your custom source sounds like the right way: having a "running" flag checked by the run() method and changing it in cancel(). Also, it is good that you are properly handling the interrupt set by Flink (some people ignore InterruptedExceptions, which make it d
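The "running flag plus interrupt handling" pattern Robert describes can be sketched with a plain Java thread (class and method names here are illustrative stand-ins, not Flink's actual SourceFunction API):

```java
// Sketch of a cancellable source loop: a volatile "running" flag checked by
// run() and cleared by cancel(), with InterruptedException handled rather
// than swallowed. Plain Java stand-in for illustration only.
public class CancellableSource {
    private volatile boolean running = true;
    private int emitted = 0;

    public void run() {
        while (running) {
            emitted++;               // stand-in for emitting a record
            try {
                Thread.sleep(1);     // stand-in for waiting on input
            } catch (InterruptedException e) {
                // Flink interrupts the source thread on cancel/stop:
                // restore the interrupt status and leave the loop.
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    public void cancel() {
        running = false;
    }

    public int getEmitted() {
        return emitted;
    }

    public static void main(String[] args) throws Exception {
        CancellableSource src = new CancellableSource();
        Thread t = new Thread(src::run);
        t.start();
        Thread.sleep(100);
        src.cancel();
        t.join(2000);
        System.out.println("stopped=" + !t.isAlive() + " emitted=" + src.getEmitted());
    }
}
```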

Re: Question on stream joins

2020-05-29 Thread Sudan S
Thanks Yun. Was thinking a similar way. I had one more question. leftSource.connect(rightSource) .process(new TagCoprocessFunction()) // In this function, tag the left source with "0" and the right source with "1" .window(xx) .process(new XX()) In this when will the window be

Re: Executing a controllable benchmark in Flink

2020-05-29 Thread Robert Metzger
Hi Felipe, the file is just 80 MBs. It is probably cached in the linux page cache, there should not be any disk IO involved. So what you are saying is that you cannot further increase the throughput for sleeps shorter than 2000 nanoseconds. Have you tried running this w/o any Sleep / nano.time syscall

Re: Streaming multiple csv files

2020-05-29 Thread Robert Metzger
Hi Nikola, you could implement a custom SourceFunction that implements this in some way: If the files are small (< 10 MB) send each file as a record, then process it in a subsequent flatMap operation. If the files are large, split the work across the parallel sources and read them serially in the

Re: Executing a controllable benchmark in Flink

2020-05-29 Thread Felipe Gutierrez
I was losing something because I was reading the line of the GZIPInputStream outside of the busy while loop. I changed it and now I am having more throughput. It is also a good idea to use VisualVM to check if the throughput is correct and where I am losing more cycles. while (reader.ready()
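The fix Felipe describes, reading each line of the GZIPInputStream inside the busy loop rather than once outside it, can be sketched like this (a self-contained stand-in that gzips a few lines in memory; the `readLine() != null` loop condition is a common robust alternative to `reader.ready()`):

```java
import java.io.*;
import java.util.zip.*;

public class GzipLineReader {
    // Counts the lines of a gzipped byte array. readLine() is called inside
    // the loop, so every iteration consumes a fresh record instead of
    // reprocessing the same line read once before the loop.
    public static int countLines(byte[] gzipped) throws IOException {
        int lines = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines++;             // stand-in for emitting the record downstream
            }
        }
        return lines;
    }

    // Helper to build an in-memory gzip payload for the example.
    public static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes("UTF-8"));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(countLines(gzip("a\nb\nc\n"))); // 3
    }
}
```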

Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-29 Thread Robert Metzger
Hi Arnaud, Maybe I don't fully understand the constraints, but what about stream.map(new GetKuduPartitionMapper).keyBy(0).addSink(KuduSink()); The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with open() and close() where you can handle the connection with Kudu's partitioning
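The shape Robert suggests, a map step that computes each record's target partition followed by keying on that partition id, can be sketched with plain Java collections (the hash-based key is a placeholder; a real GetKuduPartitionMapper would look up Kudu's partitioning in open()):

```java
import java.util.*;
import java.util.stream.*;

public class PartitionKeyMapper {
    // Stand-in for map(...).keyBy(0): compute a partition id per record,
    // then group records by that id so each "sink instance" sees exactly
    // one partition's data. The hash is illustrative, not Kudu's scheme.
    static Map<Integer, List<String>> routeByPartition(List<String> records, int partitions) {
        return records.stream().collect(Collectors.groupingBy(
                r -> Math.floorMod(r.hashCode(), partitions)));
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> routed =
                routeByPartition(Arrays.asList("a", "b", "c", "d"), 2);
        // Every record lands in exactly one partition bucket.
        System.out.println(routed.values().stream().mapToInt(List::size).sum()); // 4
    }
}
```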

Re: Dropping messages based on timestamp.

2020-05-29 Thread Robert Metzger
Hi Joe, my gut feeling is that a flatMap() is what you are looking for. Best, Robert On Thu, May 28, 2020 at 7:21 PM Joe Malt wrote: > Hi, > > I'm working on a custom TimestampAssigner which will do different things > depending on the value of the extracted timestamp. One of the actions I > wa

Re: Flink Iterator Functions

2020-05-29 Thread Robert Metzger
Hi Roderick, Luckily there are no silly questions, just silly answers (so I have the harder job here ;) ) It seems that you are trying to read data from an Arango Database, right? What is important to understand is that the "flink job" that you are implementing in your main() method gets executed

Re: Re: Question on stream joins

2020-05-29 Thread Yun Gao
Hi Sudan, The first process is used to tag the elements from the left and right windows, so next they could be merged into the same stream and then they could be assigned to the same window. Then the next window(xxx).process(new WindowProcessFunction) defines the window operator to process t
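The tagging step Yun describes, marking the left input with "0" and the right with "1" so a single downstream window can still tell the sides apart after the merge, can be sketched with plain collections standing in for the two streams:

```java
import java.util.*;
import java.util.stream.*;

public class TagAndMerge {
    // Wraps each element with a side tag ("0" for left, "1" for right) and
    // merges both inputs into one list, mimicking connect + tag + union.
    static List<String[]> tagAndUnion(List<String> left, List<String> right) {
        Stream<String[]> tagged0 = left.stream().map(v -> new String[]{"0", v});
        Stream<String[]> tagged1 = right.stream().map(v -> new String[]{"1", v});
        return Stream.concat(tagged0, tagged1).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> merged = tagAndUnion(Arrays.asList("a", "b"), Arrays.asList("x"));
        for (String[] e : merged) {
            System.out.println(e[0] + ":" + e[1]); // 0:a, 0:b, 1:x
        }
    }
}
```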

Re: Flink Iterator Functions

2020-05-29 Thread Arvid Heise
Hi Roderick, adding to Robert's response: The easiest way is to get all needed information injected only in the driver from which you manually pass the config in a serializable form to your iterator. Configs could be for example a Java Map using serializable elements, such as Strings. If you need

Re: Dropping messages based on timestamp.

2020-05-29 Thread Arvid Heise
Although flatMap() is a valid choice, it would be more idiomatic to use filter(). I'd apply that even before running TimestampAssigner, except when extracting the timestamp is rather complicated. But if it's a simple field, then it feels better to first filter bad data, and then apply any kind of l
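Arvid's point, dropping out-of-range records with a filter before any timestamp extraction, can be sketched as a plain predicate over epoch-millisecond values (field and bound names are illustrative):

```java
import java.util.*;
import java.util.stream.*;

public class TimestampFilter {
    // Keeps only records whose timestamp is at or above a lower bound;
    // a filter() like this is more idiomatic than a flatMap() that
    // conditionally emits, and it runs before timestamp assignment.
    public static List<Long> keepRecent(List<Long> epochMillis, long minTs) {
        return epochMillis.stream()
                .filter(ts -> ts >= minTs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(100L, 5000L, 50L, 7000L);
        System.out.println(keepRecent(events, 1000L)); // [5000, 7000]
    }
}
```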

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-29 Thread Hailu, Andreas
Yes, these are all in the same directory, and we're at 67G right now. I'll try with incrementally smaller directories and let you know what I find. // ah From: Chesnay Schepler Sent: Friday, May 29, 2020 3:11 AM To: Hailu, Andreas [Engineering] ; user@flink.apache.org Subject: Re: History Serv

RE: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-29 Thread LINZ, Arnaud
Hello, Yes, that would definitely do the trick, with an extra mapper after keyBy to remove the tuple so that it stays seamless. It’s less hacky than what I was thinking of, thanks! However, is there any plan in a future release to have rich partitioners? That would avoid adding overhead and “

Re: How to create schema for flexible json data in Flink SQL

2020-05-29 Thread Guodong Wang
Benchao, Thank you for your detailed explanation. Schema inference can solve my problem partially. For example, starting from some point in time, all JSON events afterward will contain a new field. I think for this case, schema inference will help. But if I need to handle all the JSON events with different s

Re: Tumbling windows - increasing checkpoint size over time

2020-05-29 Thread Wissman, Matt
Till, I’ll have to calculate the theoretical upper bound for our window state. Our data distribution and rate has a predictable pattern, but the data rate pattern didn’t match the checkpoint size growth. Here is a screenshot of the checkpoint size for the pi

Getting Window information from coGroup functin

2020-05-29 Thread Sudan S
Hi, I have a use case where I want to join two streams. I am using coGroup for this KeyBuilder leftKey = new KeyBuilder(jobConfiguration.getConnectStream().getLeftKey()); KeyBuilder rightKey = new KeyBuilder(jobConfiguration.getConnectStream().getRightKey()); leftSource.coGroup(rightSource).where(

Re: Flink Iterator Functions

2020-05-29 Thread Roderick Vincent
Thank you both for your answers and yes, that does explain what's going on. I will have to refactor this code. Thanks again for your help! Rick On Fri, May 29, 2020 at 2:29 PM Arvid Heise wrote: > Hi Roderick, > > adding to Robert's response: The easiest way is to get all needed > information

Re: kerberos integration with flink

2020-05-29 Thread Nick Bendtner
Hi Guo, Thanks again for your inputs. If I periodically renew the kerberos cache using an external process (kinit) on all flink nodes in standalone mode, will the cluster still be short-lived, or will the new ticket in the cache be used so the cluster can live till the end of the new expiry? Best,

Re: Flink Dashboard UI Tasks hard limit

2020-05-29 Thread Vijay Balakrishnan
Thx, Xintong for the detailed explanation of memory fraction. I increased the mem fraction now. As I increase the defaultParallelism, I keep getting this error: org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f62

Auto adjusting watermarks?

2020-05-29 Thread Theo Diefenthal
Hi there, Currently I have a job pipeline reading data from > 10 different kinds of sources, each having different out-of-orderness characteristics. I am currently working on adjusting the watermarks for each source "properly". I work with BoundedOutOfOrdernessTimestampExtractor and, as usu

Re: Age old stop vs cancel debate

2020-05-29 Thread Senthil Kumar
Hi Robert, Would appreciate more insights please. What we are noticing: When the flink job is issued a stop command, the Thread.sleep is not receiving the InterruptedException It certainly receives the exception when the flink job is issued a cancel command. In both cases (cancel and stop) th

Sorting Bounded Streams

2020-05-29 Thread Satyam Shekhar
Hello, I am using Flink as the streaming execution engine for building a low-latency alerting application. The use case also requires ad-hoc querying on batch data, which I also plan to serve using Flink to avoid the complexity of maintaining two separate engines. My current understanding is that

Re: Inconsistent checkpoint durations vs state size

2020-05-29 Thread Congxian Qiu
Hi, From the given picture: 1. there were some checkpoints that failed (but not because of timeout), could you please check why these checkpoints would fail? 2. The checkpoint data size is the delta size for the current checkpoint[1], assuming you are using incremental checkpoints 3. In fig1 the checkpoint size is ~3

Re: Auto adjusting watermarks?

2020-05-29 Thread Congxian Qiu
Hi, Could storing histogram data in a custom `BoundedOutOfOrdernessTimestampExtractor` and adjusting the `maxOutOfOrderness` according to that histogram work for your case? (Be careful: such histogram data would not be snapshotted when checkpointing.) Best, Congxian Theo Diefenthal wrote on Sat, May 30, 2020
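Congxian's idea of deriving `maxOutOfOrderness` from observed lateness can be sketched as follows (a plain-Java stand-in, all names illustrative; as noted above, this state would not survive a checkpoint/restore unless stored elsewhere):

```java
import java.util.*;

public class OutOfOrdernessEstimator {
    // Tracks how late events arrive (highest timestamp seen so far minus the
    // event's timestamp) and estimates maxOutOfOrderness as a percentile of
    // that lateness distribution.
    private final List<Long> latenessSamples = new ArrayList<>();
    private long maxSeenTs = Long.MIN_VALUE;

    public void observe(long eventTs) {
        if (eventTs < maxSeenTs) {
            latenessSamples.add(maxSeenTs - eventTs);  // this event was late
        } else {
            maxSeenTs = eventTs;
        }
    }

    /** e.g. percentile = 0.99 for a bound covering 99% of observed lateness. */
    public long estimateMaxOutOfOrderness(double percentile) {
        if (latenessSamples.isEmpty()) return 0L;
        List<Long> sorted = new ArrayList<>(latenessSamples);
        Collections.sort(sorted);
        int idx = (int) Math.ceil(percentile * sorted.size()) - 1;
        return sorted.get(Math.max(idx, 0));
    }

    public static void main(String[] args) {
        OutOfOrdernessEstimator est = new OutOfOrdernessEstimator();
        long[] ts = {1000, 2000, 1900, 3000, 2500, 4000};
        for (long t : ts) est.observe(t);  // lateness samples: 100, 500
        System.out.println(est.estimateMaxOutOfOrderness(1.0)); // 500
    }
}
```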

Re: Custom trigger to trigger for late events

2020-05-29 Thread Congxian Qiu
Hi Poornapragna, I'll try to answer your questions: 1. you don't need to delete the timer manually (it will be deleted after firing), but you can delete timers manually if you want. 2. AFAIK, triggers would not be snapshotted, but the timers will be snapshotted 3. delete timer that was not regi

Re: Sorting Bounded Streams

2020-05-29 Thread Benchao Li
Hi Satyam, Are you using blink planner in streaming mode? AFAIK, blink planner in batch mode can sort on arbitrary columns. Satyam Shekhar 于2020年5月30日周六 上午6:19写道: > Hello, > > I am using Flink as the streaming execution engine for building a > low-latency alerting application. The use case also