Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-04 Thread
> Hi Jiangang, > Thanks for your suggestion. >> The config can affect the memory usage. Will the related

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 Thread
Glad to see the suggestion. In our test, we found that changing the configs does not improve performance much for small jobs, just as in your test. I have some suggestions: - The config can affect the memory usage. Will the related memory configs be changed? - Can you share the tpcds resu

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread
Good work for Flink's batch processing! A remote shuffle service can resolve the container-lost problem and reduce the running time for batch jobs after a failover. We have investigated the component a lot and welcome Flink's native solution. We will try it and help improve it. Thanks, Liu Jiangang Yi

Re: How to execute multi SQL in one job

2021-10-25 Thread
> if (insertSqlBuffer.size > 0) {
>   insertSqlBuffer.foreach(item => {
>     println("insert sql: " + item)
>     statementSet.addInsertSql(item)
>   })
>   val explain = statementSet.explain()
>   println(explain)
>   statementSet.execute()
> }

How to execute multi SQL in one job

2021-10-25 Thread
I have multiple batch SQL commands separated by semicolons (;). The SQL commands need to be executed in order (in other cases, the SQL commands may share sources or sinks). I want to execute them in one job. When I use tableEnv.executeSql(multiSQL), it throws errors. How can I execute them in one job
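
A minimal sketch of one way to do this, assuming Flink 1.13+ (the script content and table names are hypothetical): run DDL eagerly with executeSql and buffer all INSERT statements into a single StatementSet, which then executes as one job.

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSqlRunner {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Hypothetical semicolon-separated script; datagen/print connectors keep it runnable.
        String multiSQL =
                "CREATE TABLE src (v INT) WITH ('connector'='datagen','number-of-rows'='10');"
              + "CREATE TABLE snk (v INT) WITH ('connector'='print');"
              + "INSERT INTO snk SELECT v FROM src";

        StatementSet statementSet = tableEnv.createStatementSet();
        // Naive split: fine as long as no string literal contains ';'.
        for (String sql : multiSQL.split(";")) {
            String stmt = sql.trim();
            if (stmt.isEmpty()) {
                continue;
            } else if (stmt.toUpperCase().startsWith("INSERT")) {
                statementSet.addInsertSql(stmt); // buffered; not executed yet
            } else {
                tableEnv.executeSql(stmt);       // DDL runs eagerly
            }
        }
        statementSet.execute();                  // all INSERTs go out as one job
    }
}
```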

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread
Thanks, Till. There are many reasons to reduce the heartbeat interval and timeout, but I am not sure what values are suitable. In our cases, GC time and job size can be related factors. Since most Flink jobs are pipelined and a full failover can cost some time, we should tolerate some stop-the-world

Re: Job Recovery Time on TM Lost

2021-07-12 Thread
Yes, time is the main factor when detecting the TM's liveness. The count method will check at certain intervals. Gen Luo wrote on Fri, Jul 9, 2021 at 10:37 AM: > @刘建刚 > Welcome to join the discussion, and thanks for sharing your experience. > > I have a minor question. In my experience, network fail

Re: Job Recovery Time on TM Lost

2021-07-06 Thread
It is really helpful to find the lost container quickly. In our internal Flink version, we optimize it with the task's report and the JobMaster's probe. When a task fails because of a connection problem, it reports to the JobMaster. The JobMaster will try to confirm the liveness of the unconnected TaskManager for cer

Re: [DISCUSS] Better user experience in the WindowAggregate upon Changelog (contains update message)

2021-07-01 Thread
Thanks for the discussion, JING ZHANG. I like the first proposal since it is simple and consistent with the DataStream API. It would be helpful to add more docs about the special late-record case in WindowAggregate. Also, I look forward to the more flexible emit strategies later. Jark Wu wrote on Fri, Jul 2, 2021 at 10:33 AM: > Sorry,

Re: Add control mode for flink

2021-06-11 Thread
to this problem. > Cheers, > Till > On Fri, Jun 11, 2021 at 9:51 AM Jary Zhen <[hidden email]> wrote: >> big +1 for this feature, >> 1. Reset kafka offset in certain cases.

Re: Add control mode for flink

2021-06-10 Thread
I also agree with the summarization by Xintong and Jing that control flow seems to be a common building block for many functionalities, and a dynamic configuration framework is a representative application that frequently requi

Re: Re: Add control mode for flink

2021-06-08 Thread
s case we need to broadcast an event through the iteration body to detect if there are still records residing in the iteration body). And regarding whether to implement the dynamic configuration framework, I also agree with Xintong that the consistency guarantee would be a point to co

Re: Add control mode for flink

2021-06-07 Thread
figuration frameworks is for sure one possible approach. The reason we are in favor of introducing the control flow is that: - It benefits not only this specific dynamic controlling feature, but potentially other future features as well. - AFAICS, it's non-trivial to make a

Re: Add control mode for flink

2021-06-06 Thread
https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a > On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 wrote: >> Hi everyone, >> Flink jobs are always long-running. When the job is running, users

Add control mode for flink

2021-06-04 Thread
Hi everyone, Flink jobs are always long-running. While the job is running, users may want to control it without stopping it. The reasons for control can differ, as follows: 1. Change the data processing logic, such as a filter condition. 2. Send trigger events to make the progre
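
Not the general control flow proposed in this thread, but a minimal sketch of what is already possible for case 1 with broadcast state (all names here are made up): a control stream carries new filter thresholds, and the main stream applies them without a restart.

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicFilterJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> events = env.fromElements(1, 5, 10, 50, 100);
        DataStream<Integer> thresholds = env.fromElements(20); // e.g. from a config topic

        MapStateDescriptor<String, Integer> desc =
                new MapStateDescriptor<>("threshold", Types.STRING, Types.INT);
        BroadcastStream<Integer> control = thresholds.broadcast(desc);

        events.connect(control)
                .process(new BroadcastProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, ReadOnlyContext ctx,
                                               Collector<Integer> out) throws Exception {
                        Integer t = ctx.getBroadcastState(desc).get("t");
                        // Keep the record only if it passes the current threshold.
                        if (t == null || value >= t) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Integer newThreshold, Context ctx,
                                                        Collector<Integer> out) throws Exception {
                        ctx.getBroadcastState(desc).put("t", newThreshold);
                    }
                })
                .print();

        env.execute("dynamic-filter");
    }
}
```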

Re: Enable Multiple kafka Consumer sources for job

2021-05-28 Thread
For debugging, you can just pull data from inputStream2. sudhansu069 [via Apache Flink User Mailing List archive.] <ml+s2336050n44010...@n4.nabble.com> wrote on Thu, May 27, 2021 at 11:22 PM: > Hi Team, > We are trying to build a data pipeline where we have to set up two different kafka consumers for two diff

Re: Fail to cancel perJob for that deregisterApplication is not called

2021-05-21 Thread
ssage? > WebMonitorEndpoint.closeAsync() already logs on its own whether the shutdown future was completed, meaning that it shouldn't have been necessary to add a separate log message. > If you now only see the one you added, chances are that it was added at th

Re: Could the watermark be taken into consideration as soon as the channel becomes active from idle?

2021-05-20 Thread
We met the same problem in our company. One stream always has data. The other stream is much smaller and can be idle. Once the smaller one becomes active, its data may be dropped in this case. 张静 [via Apache Flink User Mailing List archive.] <ml+s2336050n43873...@n4.nabble.com> wrote on Fri, May 21, 2021 at 1

Fail to cancel perJob for that deregisterApplication is not called

2021-03-23 Thread
I am using flink 1.10.0. My perJob cannot be cancelled. From the log I find that webMonitorEndpoint.closeAsync() is completed but deregisterApplication is not called. The related code is as follows:

public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus appli

How to visit outer service in batch for sql

2020-08-26 Thread
With the API, we can call an outer service in batches through countWindow, as in the following. We can call the outer service once every 1000 records. If we call the outer service for every record, it will be very slow for our job.

source.keyBy(new KeySelector())
      .countWindow(1000)
      .apply((WindowF
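
A fuller sketch of the same pattern, with ExternalService and its batchQuery as stand-ins for the real client: the window buffers 1000 records per key, and one request covers the whole batch.

```
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class BatchedLookup {
    public static DataStream<String> enrich(DataStream<String> source) {
        return source
                .keyBy(new KeySelector<String, Integer>() {
                    @Override
                    public Integer getKey(String value) {
                        return value.hashCode() % 128; // spread records over 128 keys
                    }
                })
                .countWindow(1000)
                .apply(new WindowFunction<String, String, Integer, GlobalWindow>() {
                    @Override
                    public void apply(Integer key, GlobalWindow window,
                                      Iterable<String> input, Collector<String> out) {
                        List<String> batch = new ArrayList<>();
                        input.forEach(batch::add);
                        // One call to the external service for the whole batch.
                        for (String enriched : ExternalService.batchQuery(batch)) {
                            out.collect(enriched);
                        }
                    }
                });
    }

    // Stand-in for the real external client.
    static class ExternalService {
        static List<String> batchQuery(List<String> batch) {
            return batch;
        }
    }
}
```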

Re: Blink window and cube

2020-04-28 Thread
issing feature. We need to do something in the optimizer to make this possible. Could you please help to create a JIRA issue for this? Best, Jark. On Tue, 28 Apr 2020 at 14:55, 刘建刚 <[hidden email]> wrote: > Hi, I find that blink planner suppo

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-27 Thread
ey, long value) {
    datas.put(key, value);
}

@Override
public String getValue(Map acc) {
    return JSON.toJSONString(acc);
}

@Override
public TypeInformation getResultType() {
    return Types.STRING;

Blink window and cube

2020-04-27 Thread
Hi, I find that the blink planner supports CUBE. CUBE can be used together with a field but not with a window. For example, the following SQL is not supported:

SELECT A, B, sum(C)
FROM person
GROUP BY cube(A, B), TUMBLE(curTimestamp, interval '1' minute)

The following error is reported. Is th

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread
ep through the code, this method should show you which array variable is being passed a null argument when the array variable is not nullable. > On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email]> wrote: > I am using Roaring64NavigableMap

Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread
I am using Roaring64NavigableMap to compute uv. It is OK with the flink planner but not OK with the blink planner. The SQL is as follows:

SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp,
       A, B, C, D, E, uv(bitmap(id)) as bmp
FROM person
GROUP BY TUMBLE(eventTi
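
For reference, a sketch of the two UDFs the query implies (the names bitmap/uv follow the SQL; the thread's actual implementation is not fully shown here, and this assumes org.roaringbitmap on the classpath and the Flink 1.9/1.10-era Table API function base classes):

```
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class UvFunctions {
    // bitmap(id): collects ids into one Roaring64NavigableMap per group.
    public static class BitmapAgg
            extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
        @Override
        public Roaring64NavigableMap createAccumulator() {
            return new Roaring64NavigableMap();
        }

        @Override
        public Roaring64NavigableMap getValue(Roaring64NavigableMap acc) {
            return acc;
        }

        public void accumulate(Roaring64NavigableMap acc, long id) {
            acc.addLong(id);
        }
    }

    // uv(bitmap): the distinct count is just the bitmap's cardinality.
    public static class Uv extends ScalarFunction {
        public long eval(Roaring64NavigableMap bitmap) {
            return bitmap.getLongCardinality();
        }
    }
}
```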

Re: Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

2020-04-15 Thread
f WindowOperator from append to retract. 刘建刚 <liujiangangp...@gmail.com> wrote on Thu, Apr 16, 2020 at 8:40 AM: > No, I do not use "fast-emit". Another group by is combined with this SQL. I use "tableConfig.setIdleStateRetentionTime()" to control idle state. If I d

Re: Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

2020-04-15 Thread
n open issue about this [1]. > [1] https://issues.apache.org/jira/browse/FLINK-16844 刘建刚 <[hidden email]> wrote on Wed, Apr 15, 2020 at 7:07 PM: > I am using two sequential windows in SQL as follows: > SELECT TUMBLE_

Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

2020-04-15 Thread
I am using two sequential windows in SQL as follows:

SELECT TUMBLE_START(rowtime, interval '1' minute) AS windowStart,
       bitmapUnion(bmp) AS bmp
FROM (SELECT TUMBLE_ROWTIME(eventTime, interval '1' minute) AS rowtime,
             bitmap(id) AS bmp
      FROM person
      GROUP BY TUMBLE(eventTi

Deploy flink-dist_2.11 fails in flink-1.10.0

2020-03-07 Thread
I am testing flink-1.10.0. When I deploy it to our repositories as a snapshot, the build fails in flink-dist_2.11. The error is as follows. Can anyone help me? [2020-03-07T10:03:19.619Z] [WARNING] Assembly file: /home/jenkins/slave/flink-1-10-0-release-2593/SRC/flink-dist/target/flink-1.

Re: Encountered error while consuming partitions

2020-02-13 Thread
> From: 张光辉 > Send Time: 2020 Feb. 12 (Wed.) 22:19 > To: Benchao Li > Cc: 刘建刚; user > Subject: Re: Encountered error while consuming partitions > Network can fail in many ways, sometimes pretty subtle (e.g. a high ratio of packet loss).

Re: Fail to deploy flink on k8s in minikube

2020-01-12 Thread
on-cluster-on-kubernetes
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
> [3] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [4] https://github.com/lyft/flinkk8soperator
> [5] https://ci.apache.org/projects/flink/flink-doc

Fail to deploy flink on k8s in minikube

2020-01-12 Thread
I fail to deploy flink on k8s following https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html. When I run the command 'kubectl create -f jobmanager-deployment.yaml', the following error is reported: [image: image.png] I am new to k8s. Our team wants to

How to get kafka record's timestamp in job

2019-12-31 Thread
In kafka010, ConsumerRecord has a field named timestamp, which is encapsulated in Kafka010Fetcher. How can I get the timestamp when I write a Flink job? Thank you very much.
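
Not stated in the thread, but worth a sketch: the Kafka 0.10 consumer already attaches each record's Kafka timestamp to the StreamRecord, so a ProcessFunction can read it without touching the fetcher (the String payload type here is just an assumption).

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ExtractKafkaTimestamp {
    // Attach each record's Kafka timestamp (epoch millis) to the payload.
    public static DataStream<String> withTimestamps(DataStream<String> kafkaSource) {
        return kafkaSource.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                Long ts = ctx.timestamp(); // may be null if no timestamp was attached
                out.collect(ts + " -> " + value);
            }
        });
    }
}
```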

Re: How to estimate the memory size of flink state

2019-11-20 Thread
On 11/20/2019 15:08, 刘建刚 <liujiangangp...@gmail.c

How to estimate the memory size of flink state

2019-11-19 Thread
We are using flink 1.6.2. For the filesystem backend, we want to monitor the state size in memory. Once the state size grows, we want to get notified and take measures such as rescaling the job; otherwise the job may fail because of memory. We have tried to get the memory usage for the jvm
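
For the JVM-side attempt mentioned at the end, a minimal sketch (note it measures the whole heap, not the state backend alone):

```
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class HeapProbe {
    public static void main(String[] args) {
        // Whole-heap view from the JVM; state is only one contributor to this.
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        System.out.printf("heap used=%d MB committed=%d MB max=%d MB%n",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }
}
```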

How to use two continuous windows with EventTime in sql

2019-10-29 Thread
For one SQL window, I can register a table with event time and use the time field in the tumble window. But if I want to take the result of the first window and process it with another window, how can I do it? Thank you.
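
A sketch of one common way to chain windows (hypothetical table and field names, assuming Flink 1.10+ Table API): TUMBLE_ROWTIME of the first window carries the event-time attribute forward as the rowtime of the second.

```
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ChainedWindows {
    public static Table define(TableEnvironment tableEnv) {
        // First window: TUMBLE_ROWTIME keeps the result a valid event-time attribute.
        Table first = tableEnv.sqlQuery(
                "SELECT TUMBLE_ROWTIME(eventTime, INTERVAL '1' MINUTE) AS rowtime, "
                        + "COUNT(*) AS cnt "
                        + "FROM person "
                        + "GROUP BY TUMBLE(eventTime, INTERVAL '1' MINUTE)");
        tableEnv.createTemporaryView("firstWindow", first);
        // Second window: groups the first window's results by that rowtime.
        return tableEnv.sqlQuery(
                "SELECT TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS windowStart, "
                        + "SUM(cnt) AS total "
                        + "FROM firstWindow "
                        + "GROUP BY TUMBLE(rowtime, INTERVAL '5' MINUTE)");
    }
}
```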

Uncertain result when using group by in stream sql

2019-09-13 Thread
I used Flink stream SQL to write a demo about "group by". The records are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and sum the second element. Every time I run the program, the result is different. It seems that the records are out of order. Sometimes a record is even los
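
A sketch reconstructing the demo (assuming the Flink 1.11+ bridge API): on an unbounded GROUP BY the result is an update stream, so it has to be consumed as a retract stream; reading it as append-only can look unordered or lossy.

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class GroupByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.createTemporaryView("t", env.fromElements(
                Tuple2.of("bj", 1), Tuple2.of("bj", 3), Tuple2.of("bj", 5)));
        Table result = tEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");

        // Each element is (true, row) for an added row, (false, row) for a retraction.
        tEnv.toRetractStream(result, Row.class).print();
        env.execute("group-by-demo");
    }
}
```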

How to implement grouping set in stream

2019-09-10 Thread
I want to implement grouping sets in streaming. I am new to Flink SQL. I want to find an example that teaches me how to define my own rule and implement the corresponding operator. Can anyone give me any suggestions?

How to calculate one day's uv every minute by SQL

2019-09-03 Thread
We want to calculate one day's uv and show the result every minute. We have implemented this in Java code:

dataStream.keyBy(dimension)
          .incrementWindow(Time.days(1), Time.minutes(1))
          .uv(userId)

The input data is big. So we use Va
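
incrementWindow and uv above are internal APIs; a rough vanilla-Flink sketch of the same shape uses a 1-day window whose trigger fires every minute. The Set accumulator is only for illustration; for big inputs a compact structure (e.g. a bitmap) is the usual substitute.

```
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class MinuteUv {
    public static DataStream<Integer> uv(DataStream<String> userIds) {
        return userIds
                .keyBy(new KeySelector<String, Integer>() {
                    @Override
                    public Integer getKey(String id) {
                        return id.hashCode() % 128; // stand-in for the dimension key
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.days(1)))
                // Emit the running result every minute instead of once a day.
                .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
                .aggregate(new AggregateFunction<String, Set<String>, Integer>() {
                    @Override public Set<String> createAccumulator() { return new HashSet<>(); }
                    @Override public Set<String> add(String id, Set<String> acc) { acc.add(id); return acc; }
                    @Override public Integer getResult(Set<String> acc) { return acc.size(); }
                    @Override public Set<String> merge(Set<String> a, Set<String> b) { a.addAll(b); return a; }
                });
    }
}
```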

How to load udf jars in flink program

2019-08-15 Thread
We are using per-job mode and load the UDF jar when the job starts. Our jar file is in another path, not Flink's lib path. In the main function, we use a classLoader to load the jar file by its path. But it reports the following error when the job starts running. If the jar file is in lib, everything
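
What the main-function loading looks like as a minimal sketch (jar path and class name are hypothetical). One caveat worth noting: classes loaded this way in the client's main function are not automatically visible to the TaskManager classloaders, which may be why the lib/ copy behaves differently.

```
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class UdfLoader {
    public static Class<?> loadUdf(String jarPath, String className) throws Exception {
        // Load the jar from an arbitrary path, delegating to the current classloader.
        URLClassLoader loader = URLClassLoader.newInstance(
                new URL[] {new File(jarPath).toURI().toURL()},
                Thread.currentThread().getContextClassLoader());
        return Class.forName(className, true, loader);
    }
}
```

For example, loadUdf("/data/udfs/my-udf.jar", "com.example.MyUdf") (both values hypothetical).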

How to convert protobuf to Row

2019-05-06 Thread
I read byte data from Kafka. I use a class ProtoSchema that implements DeserializationSchema to get the actual Java class. My question is: how can I transform the byte data to Row just with ProtoSchema? What if the data structure is nested? Thank you.
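
A sketch of the conversion, assuming com.google.protobuf is on the classpath and ProtoSchema (the asker's class, not shown) has already produced a Message: walk the descriptor and recurse into nested messages so they become nested Rows.

```
import java.util.List;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message;
import org.apache.flink.types.Row;

public class ProtoToRow {
    // Map every field of the deserialized message into a Row position.
    public static Row convert(Message message) {
        List<FieldDescriptor> fields = message.getDescriptorForType().getFields();
        Row row = new Row(fields.size());
        int i = 0;
        for (FieldDescriptor fd : fields) {
            Object value = message.getField(fd);
            if (fd.getJavaType() == FieldDescriptor.JavaType.MESSAGE && !fd.isRepeated()) {
                row.setField(i, convert((Message) value)); // nested message -> nested Row
            } else {
                row.setField(i, value); // scalars and repeated fields passed through
            }
            i++;
        }
        return row;
    }
}
```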

One source is much slower than the other when joining history data

2019-02-26 Thread
When consuming historical data in a join operator with eventTime, reading data from one source is much slower than from the other. As a result, the join operator will cache a lot of data from the faster source while waiting for the slower source. The question is: how can I make the difference of c