>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
>>
>> We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>>
>> Best,
>> Yun, Jing, Martijn and Lincoln
>
> --
> Best,
> Hangxiang.
--
Best,
Benchao Li
stream join to also support
>>>>>>> Broadcast join with SQL)
>>>>>>>
>>>>>>>
>>>>>>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
>>>>>>> parsed from the query:
>>>>>>>
>>>>>>>
>>>>>>> ```sql
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
>>>>>>> options:[gpla)
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> However, the Flink optimizer ignores the hint and still represents the
>>>>>>> join as a regular `hash` join in the `Exchange` step:
>>>>>>>
>>>>>>>
>>>>>>> ```sql
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> In Flink `StreamExecExchange`, the translation happens only via the
>>>>>>> `HASH` distribution type, unlike Flink `BatchExecExchange`, where the
>>>>>>> translation can happen via multiple options (`HASH`/`BROADCAST`).
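>>>>>>>
>>>>>>> For reference, a minimal sketch of the hint syntax in question (table
>>>>>>> and column names are made up; in current releases the hint only takes
>>>>>>> effect in batch mode):
>>>>>>>
>>>>>>> ```sql
>>>>>>> SELECT /*+ BROADCAST(dim_shops) */
>>>>>>>     f.shop_id, d.shop_name
>>>>>>> FROM fact_sales AS f
>>>>>>> JOIN dim_shops AS d ON f.shop_id = d.shop_id;
>>>>>>> ```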
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Quoting this Flink mailing list discussion for the FLIP that
>>>>>>> implemented the Broadcast join hint for batch SQL:
>>>>>>>
>>>>>>>
>>>>>>> > But currently, only in batch the optimizer has different Join
>>>>>>> > strategies for Join and
>>>>>>>
>>>>>>> > there is no choice of join strategies in the stream. The join hints
>>>>>>> > listed in the current
>>>>>>>
>>>>>>> > flip should be ignored (maybe can be warned) in streaming mode. When
>>>>>>> > in the
>>>>>>>
>>>>>>> > future the stream mode has the choice of join strategies, I think
>>>>>>> > that's a good time to discuss that the join hint can affect the
>>>>>>> > streaming SQL.
>>>>>>>
>>>>>>>
>>>>>>> What do you folks think about the possibility of a Broadcast join for
>>>>>>> Streaming SQL, along with its corresponding Broadcast hint, that lets
>>>>>>> the user choose the kind of distribution they'd want with the dataset?
>>>>>>>
>>>>>>> Happy to learn more about this and hopefully implement it, if it
>>>>>>> doesn’t sound like a terrible idea.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Prabhjot
>>>>>>>
>>>>>>>
>>>>>>>
--
Best,
Benchao Li
tions.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this release:
> https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Best regards,
> Konstantin, Qingsheng, Sergey, and Jing
--
Best,
Benchao Li
b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
> [2]:https://issues.apache.org/jira/browse/FLINK-26681
> [3]:https://issues.apache.org/jira/browse/FLINK-31413
> [4]:https://issues.apache.org/jira/browse/FLINK-30064
>
>
>
> Best regards,
> Yuxia
>
--
Best,
Benchao Li
> > 'protobuf.ignore-parse-errors' = 'true'
> > )
> > ;
> [INFO] Execute statement succeed.
>
> Flink SQL> select * from simple_test;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: com.example.SimpleTest
>
> Flink SQL>
>
> Any advice greatly appreciated, thank you.
>
> [1]
> https://github.com/apache/flink/commit/5c87b69b5300e8678629aa8b769d60ec2fdbf3d1
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/
>
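The ClassNotFoundException suggests the protoc-generated class is simply
not visible to the cluster. A minimal sketch of the intended setup (column
names and Kafka options here are assumptions, not taken from the thread):

```sql
CREATE TABLE simple_test (
  uid BIGINT,    -- assumed columns; mirror your .proto fields
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'simple_test',                            -- assumed
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
);
-- The JAR containing the compiled com.example.SimpleTest class (protoc
-- output compiled with javac) must be on the cluster classpath, e.g.
-- under $FLINK_HOME/lib, before running SELECT * FROM simple_test.
```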
--
Best,
Benchao Li
to get the results.
>
>
> On Fri, Mar 19, 2021 at 5:24 PM Benchao Li wrote:
>
>> Hi Aneesha,
>>
>> The interval join operator will output the rows padded with NULL when it
>> confirms that
>> there will be no more data coming before the watermark.
>> And there i
> );
>
> CREATE VIEW abandoned_visits_with_no_orders AS
> SELECT
> av.key1
> , av.email
> , av.abandoned_pids
> , FLOOR(av.ts TO MINUTE) AS visit_timestamp
> , FLOOR(o.ts TO MINUTE) AS order_timestamp
> , o.email AS order_email
> FROM abandoned_visits av
> FULL OUTER JOIN orders o
> ON av.key1 = o.key1
> AND av.email = o.email
> AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30'
> MINUTE
> -- WHERE
> -- o.email IS NULL -- commented out so as to get something in the result
> ;
>
> *Result: *
> select * from abandoned_visits_with_no_orders;
>
> This gives a result the same as an inner join. It doesn't have rows with
> NULL order data.
> I would appreciate any help.
>
> Thanks,
> Aneesha
>
>
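One thing worth checking (an assumption based on the symptom, not stated
in the thread): the NULL-padded rows of an interval outer join are only
emitted once the watermark passes av.ts + 30 minutes, so both sources need
advancing watermarks. A minimal sketch of the relevant DDL:

```sql
CREATE TABLE abandoned_visits (
  key1 STRING,
  email STRING,
  abandoned_pids STRING,
  ts TIMESTAMP(3),
  -- without an advancing watermark the join can never conclude that no
  -- matching order will arrive, so the NULL-padded rows are withheld
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'  -- placeholder; substitute your real source
);
```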
--
Best,
Benchao Li
nk/flink-docs-stable/dev/event_timestamps_watermarks.html#how-operators-process-watermarks
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
>
--
Best,
Benchao Li
19 PM Danny Chan wrote:
>
>> Hi Dongwon ~
>>
>> Table from different catalog/db is supported, you need to specify the
>> full path of the source table:
>>
>> CREATE TABLE Orders_with_watermark (
>>     ...
>> ) WITH (
>>     ...
>> ) LIKE my_catalog.my_db.Orders;
>>
>>
>> Dongwon Kim wrote on Wed, Nov 11, 2020 at 2:53 PM:
>>
>>> Hi,
>>>
>>> Is it disallowed to refer to a table from different databases or
>>> catalogs when someone creates a table?
>>>
>>> According to [1], there's no way to refer to tables belonging to
>>> different databases or catalogs.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>
--
Best,
Benchao Li
>> Hi all,
>>
>> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
>> part of the Apache Flink Project Management Committee (PMC).
>>
>> Dian Fu has been very active on the PyFlink component, working on
>> various important features, such as the Python UDF and Pandas
>> integration. He keeps checking and voting for our releases, has
>> successfully produced two releases (1.9.3 & 1.11.1) as RM, and is
>> currently working as RM to push forward the release of Flink 1.12.
>>
>> Please join me in congratulating Dian Fu for becoming a Flink PMC
>> Member!
>>
>> Best,
>> Jincheng (on behalf of the Flink PMC)
>
> --
> Best regards!
> Rui Li
--
Best,
Benchao Li
> 05:40:00").getTime();
> DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> for(Row row : list2) {
> ctx.collect(row);
> Thread.sleep(1000);
> }
>
> }
>
> @Override
> public void cancel() {
>
> }
> });
> ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
> ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING,
> Types.SQL_TIMESTAMP)));
> bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time,
> rowtime.rowtime");
>
> Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id from
> order_info a left join pay b on a.order_id=b.order_id and b.rowtime between
> a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
>
> bsTableEnv.toAppendStream(joinTable, Row.class).process(new
> ProcessFunction<Row, Row>() {
> @Override
> public void processElement(Row value, Context ctx,
> Collector<Row> out) throws Exception {
> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> HH:mm:ss.SSS");
> System.err.println("row:" + value + ",rowtime:" +
> value.getField(3) + ",watermark:" +
> sdf.format(ctx.timerService().currentWatermark()));
> }
> });
>
> bsTableEnv.execute("job");
> }
> }
>
>
--
Best,
Benchao Li
>
> Also, if I declare the column with a time zone (... log_ymdt TIMESTAMP
> WITH TIME ZONE ...), the DDL does not work either.
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at
> line 5, column 29.
> Was expecting:
> "LOCAL" ...
>
> How can I map my date-time format to the timestamp type? I'm running
> Flink 1.11.1 and executing sql using FlinkSQL CLI.
>
> Thanks,
> Youngwoo
>
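For reference, a minimal sketch of what Flink 1.11 accepts (the parser
error above hints at it: TIMESTAMP WITH LOCAL TIME ZONE is supported, a
plain TIMESTAMP WITH TIME ZONE is not; column names are assumed):

```sql
CREATE TABLE logs (
  log_ymdt TIMESTAMP(3),
  -- if the raw field is a string, a computed column can derive it:
  -- log_ymdt AS TO_TIMESTAMP(log_ymdt_raw, 'yyyy-MM-dd HH:mm:ss'),
  WATERMARK FOR log_ymdt AS log_ymdt - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'  -- placeholder; substitute your real connector
);
```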
--
Best,
Benchao Li
er.convertCreateTable(SqlCreateTableConverter.java:76)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at userSink.TestSink.main(TestSink.java:29)
>
>
--
Best,
Benchao Li
.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346364
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made
>> this release possible!
>>
>> Cheers,
>> Piotr & Zhijiang
>>
>
>
> --
> Best regards!
> Rui Li
>
--
Best,
Benchao Li
What's the watermark interval in your
job?
Mark Zitnik wrote on Sun, Jul 5, 2020 at 7:44 PM:
> Hi Benchao
>
> The capacity is 100
> Parallelism is 8
> Rpc req is 20ms
>
> Thanks
>
>
> On Sun, 5 Jul 2020, 6:16 Benchao Li, wrote:
>
>> Hi Mark,
>>
>> Could
" +
> "'format.json-schema' = '{\"type\": \"object\",
> \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
> ")";
> System.out.println(ddlSource);
> bsEnv.sqlUpdate(ddlSource);
>
> Table table = bsEnv.sqlQuery("select * from actionTable3");
> //Table table = bsEnv.sqlQuery("select * from actionTable2,
> LATERAL TABLE(explode3(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print();// the result is null
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>
--
Best,
Benchao Li
00 req/sec
>
> Flink job flow
> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write to
> Kafka
>
> Using Akka gRPC code written in Scala
>
> Thanks
>
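A rough upper bound implied by those numbers (assuming requests are fully
pipelined and RPC latency dominates):

\[
\text{max rate} \approx \frac{\text{parallelism} \times \text{capacity}}{\text{latency}}
= \frac{8 \times 100}{0.02\ \mathrm{s}} = 40000\ \mathrm{req/s}
\]

If the observed rate is far below this, the bottleneck is likely elsewhere
(source, serialization, or the async client itself).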
--
Best,
Benchao Li
when the above update
> occurs, will the count value of the old partition be subtracted by 1, and
> then added to the new partition?
>
> Benchao Li wrote on Wed, Jul 1, 2020 at 1:11 PM:
>
>> Hi lec ssmi,
>>
>> > If the type value of a record is updated in the database, the v
> Source data can be regarded as binlog data.
> If the type value of a record is updated in the database, the values
> before and after the update will be divided into different partitions and
> handed over to different operators for calculation. Can Retraction happen
> correctly?
>
--
Best,
Benchao Li
warning. For each record I want to apply some conditions (if
> temperature > 15, humidity > ...) and then define the status of the sensor:
> whether it is in Normal status or Alert status ...
> And I am wondering if CEP Api can help me achieve that.
> Thank you guys for your time !
> Best,
> AISSA
>
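CEP fits sequential patterns across events, but for per-record threshold
checks plain SQL may already be enough. A minimal sketch with made-up
table, column, and threshold values:

```sql
SELECT
  sensor_id,
  CASE
    WHEN temperature > 15 OR humidity > 80 THEN 'ALERT'
    ELSE 'NORMAL'
  END AS status
FROM sensor_readings;
```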
--
Best,
Benchao Li
tumbling window
> closes. Instead, with input watermarks propagated as-is ALL events in the
> resulting stream end up being late in relation to the current watermark...
> Doesn't this behavior ruin the composition, as downstream operators will be
> discarding all late data?
>
> I'd greatly appreciate if someone could shed light on this design decision.
>
> Thanks,
> Sergii
>
--
Best,
Benchao Li
Congratulations Yu!
Jark Wu wrote on Wed, Jun 17, 2020 at 10:36 AM:
> Congratulations Yu! Well deserved!
>
> Best,
> Jark
>
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun wrote:
>
> > Congratulations Yu!
> >
> > Best,
> > Haibo
> >
> >
> > At 2020-06-17 09:15:02, "jincheng sun" wrote:
> > >Hi all,
> > >
> > >On b
API/SQL on top of BoundedStream. Then the DataSet/DataStream
API will be unified
as BoundedStream API. Hence the DataSet API is not the recommended approach
for the long term.
Satyam Shekhar wrote on Sat, May 30, 2020 at 3:34 PM:
> Thanks for your reply, Benchao Li.
>
> While I can use the Blink planner
the
> following questions -
>
>    1. Is it possible to obtain a globally sorted output on DataStreams on
>       an arbitrary sort column?
>    2. What are the tradeoffs in using DataSet vs DataStream in
>       performance, long term support, etc?
>    3. Is there any other way to address this issue?
>
> Regards,
> Satyam
>
--
Best,
Benchao Li
n',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = 'root',
> 'connector.write.flush.max-rows' = '1'
> );
> INSERT INTO pv_five_min
> SELECT
> itemCode As item_code,
> DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
> DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
> COUNT(*) AS pv
> FROM user_behavior
> GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;
>
--
Best,
Benchao Li
>
> Guodong
>
>
> On Thu, May 28, 2020 at 11:11 PM Benchao Li wrote:
>
>> Hi Guodong,
>>
>> Does the RAW type meet your requirements? For example, you can specify
>> map type, and the value for the map is the raw JsonNode
>> parsed from Jackson.
>>
ic schema
> for each field, to let user define a generic map or array for one field.
> but the value of map/array can be any object. Then, the type conversion
> cost might be saved.
>
> Guodong
>
>
> On Thu, May 28, 2020 at 7:43 PM Benchao Li wrote:
>
>> Hi Guodong,
generic object as the value of the map. Because the values in nested_object
> are of different types, some of them are int, some of them are string or
> array.
>
> So. how to expose this kind of json data as table in Flink SQL without
> enumerating all the nested_keys?
>
> Thanks.
>
> Guodong
>
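For illustration, a minimal sketch of the generic-map idea in DDL (all
names and connector options are made up; whether the JSON format accepts a
MAP type for an arbitrary nested object depends on the Flink version):

```sql
CREATE TABLE json_events (
  id STRING,
  -- all nested keys land in one generic map instead of being enumerated
  -- as individual columns; values arrive as strings here
  nested_object MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```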
--
Best,
Benchao Li
>
> Can these two records guarantee the order? Must the old result be deleted
> first and then the new result inserted?
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
do some more work on these connectors.
>
> I'd like to know if the community has a plan to make a lower-level
> implementation for all connectors, also for table API and SQL?
>
> Thanks
> Ray
>
> Benchao Li wrote on Thu, May 14, 2020 at 5:49 PM:
>
>> AFAIK, `FlinkKafkaConsumer
The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Regards,
>>> Yu
>>>
>>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
port rate limitation?
> How do we limit the rate when the external database connected by the sink
> operator has a throughput limitation?
> Instead of passive backpressure after reaching the limit of the external
> database, we want to limit the rate actively.
>
> Thanks
> Ray
>
--
> will it expire with the TTL time configured by TableConfig?
> Benchao Li wrote on Tue, May 12, 2020 at 20:27:
>
>> The state will be cleaned with watermark movement.
>>
>> lec ssmi wrote on Tue, May 12, 2020 at 5:55 PM:
>>
>>> Hi:
>>> If I join two streams in SQL, the time ran
ll it expire with the TTL time
> configured by TableConfig? Or both?
>
> Best
> Lec Ssmi
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Hi,
AFAIK, there is no way to do this for now.
This needs the operators running UDFs to support async IO.
lec ssmi wrote on Thu, May 7, 2020 at 3:23 PM:
> Hi:
> Is there any way to implements async IO in UDFs (scalar function,
> table function, aggregate function)?
>
--
Benchao
alent to not being able to do multiple joins in one job.
>
> Benchao Li wrote on Tue, May 5, 2020 at 9:23 PM:
>
>> You cannot select more than one time attribute; the planner will give you
>> an Exception if you do that.
>>
>>
>> lec ssmi wrote on Tue, May 5, 2020 at 8:34 PM:
>>
You cannot select more than one time attribute; the planner will give you
an Exception if you do that.
lec ssmi wrote on Tue, May 5, 2020 at 8:34 PM:
> As you said, if I select all the time attribute fields from
> both , which will be the final one?
>
> Benchao Li wrote in May 2020:
ensures that the watermark will be aligned with both of them.
>>>>
>>>> Best, Fabian
>>>>
>>>> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
>>>> shicheng31...@gmail.com>:
>>>>
>>>>> Thanks for your reply.
disappear after the join, and
> pure SQL cannot declare the time attribute field again. So, to make it
> work, I need to insert the join result into Kafka, and consume it and
> join it with another stream table in another Flink job. This seems
> troublesome.
> Any good idea?
these options.
>> > >
>> > > However, I think this FLIP violates our code style guidelines because
>> > >
>> > > 'format' = 'json',
>> > > 'format.fail-on-missing-field' = 'false'
>> > >
ing
> mechanism and restored
> So is it right to implement my own COUNT/SUM UDF?
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>
> At 2020-04-27 17:32:14, "Benchao Li" wrote:
1548753499000.193293.1548753540...@atlassian.jira%3E
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>> who made this release possible!
>>>> Also great thanks to @Jincheng for helping finalize this release.
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>
>
> --
>
> Konstantin Knauf | Head of Product
>
> +49 160 913945
>
> On Thu, Apr 23, 2020 at 2:27 PM Benchao Li wrote:
>
>> Hi Jingsong,
>>
>> Thanks for your quick response. I've CC'ed Chongchen who understands the
>> scenario much better.
>>
>>
>> Jingsong Li wrote on Thu, Apr 23, 2020 at 12:34 PM:
>>
FYI, the question has been answered in user-zh ML.
lec ssmi wrote on Thu, Apr 23, 2020 at 2:57 PM:
> Hi:
> Is there an aggregation or window operation whose result has
> retract characteristics?
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Jingsong Lee
>
> On Thu, Apr 23, 2020 at 12:15 PM Benchao Li wrote:
>
>> Hi,
>>
>> Currently the sort operator in blink planner is global, which has
>> bottleneck if we sort a lot of data.
>>
>> And I found 'table.exec.range-sort.enabled' config in
didn't plan to implement this?
2. If this is in the plan, then which version may we expect it to be ready?
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
hat to take effect?
> How does it work in case there is persistent state (like a window
> operator) involved ?
>
> Any design documents on how partition mapping works would be very helpful.
>
> Thanks
> Suraj
>
--
Benchao Li
School of Electronics Engineering and C
which the requirements side definitely cannot accept:
> 20200417,90
> 20200417,86
> 20200417,130
> 20200417,86
> 20200417,131
>
> My question is: will the retract stream produced by the inner GROUP BY
> affect the sink? I logged this on the sink side. If Flink supports this
> kind of two-level GROUP BY, shouldn't the case where the result gets
> smaller count as a bug?
>
> Sent from my iPhone
>
> On Apr 18, 2020, at 10:08, Benchao Li wrote:
>
>
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive
> data(true,0,131,20200417)
>
>
> We are using 1.7.2, and the parallelism of the test job is 1.
> Here is the corresponding issue: https://issues.apache.org/jira/browse/FLINK-17228
>
>
> --
> dixingxin...@163.com
>
>
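For context on the log line above, a sketch of how a single update from
the inner GROUP BY reaches the sink (the old value 130 is inferred from
the sequence quoted earlier):

```sql
-- A retract update arrives as a (retract, add) pair:
-- (false, 0, 130, 20200417)  -- retract the old count
-- (true,  0, 131, 20200417)  -- add the new count
-- A sink that logs or applies the two messages independently will
-- transiently observe the smaller value; only the pair as a whole
-- constitutes the update.
```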
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
found the WatermarkGenerator passed
> in WatermarkAssignerOperator is the interface WatermarkGenerator. And
> BoundedOutOfOrderWatermarkGenerator is the only implementation class of
> WatermarkGenerator. By the way, the interval is based on processing time.
>
Benchao Li wrote on Fri, Apr 17, 2020 at 4:50 PM:
> Maybe you are all right. I was more confused .
> As the cwiki said, flink could use BoundedOutOfOrderTimestamps ,
>
> but I have heard about WatermarkAssignerOperator from Blink developers.
>
> Benchao Li wrote on Fri, Apr 17, 2020 at 4:33 PM:
> There seems to be no explanation on the official website.
>
> Thanks.
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
' will work.
>
> But if i insist to use format.json-schema, the CREATE TABLE must be
> writtten as:
>
> `id` DECIMAL(38,18),
> `timestamp` DECIMAL(38,18)
>
> ------
> wangl...@geekplus.com.cn
>
>
> *From:* Benchao Li
>
: {
>"id": {"type": "integer"},
>"timestamp": {"type": "number"}
> }
> }'
> );
>
> Then select * from user_log;
>
> org.apache.flink.table.api.ValidationException: Type INT
SQL.
> I use “tableConfig.setIdleStateRetentionTime()” to control idle state. If
> I delete “tableConfig.setIdleStateRetentionTime()” in blink, the error
> disappears. How can I resolve it? Thank you.
>
> On Apr 15, 2020 at 8:11 PM, Benchao Li [via Apache Flink User Mailing List archive.]
>
e.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:296)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:287)
> at
> com.kuaishou.flink.hdp.demos.statistics.sql.GroupingSetsUV.main(GroupingSetsUV.java:79)
>
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
Lateral Table Function
> from the SQL API? Any example of DDL?
>
> Thanks
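A minimal sketch (function name and implementing class are made up; the
UDTF class must be on the classpath):

```sql
-- Register the table function, then apply it per input row:
CREATE FUNCTION explode_words AS 'com.example.udf.ExplodeWords';

SELECT s.id, t.word
FROM sentences AS s,
LATERAL TABLE(explode_words(s.sentence)) AS t(word);
```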
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
e's still enough time for community to further enhance blink
> planner for this purpose.
>
> Let me know what you think, especially if you want to report or complain
> about something. Thanks
> in advance.
>
> Best,
> Kurt
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
TO sink1 SELECT ...
> INSERT INTO sink2 SELECT ...
>
> Is there any way to make sure the order of these two insert-into SQLs.
>
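There is no ordering guarantee between two independent INSERT INTO
statements; each becomes its own pipeline. As a sketch of what later
versions offer (statement sets, available in the SQL client from roughly
Flink 1.13), the two inserts can at least be planned into one job, though
sink ordering is still not guaranteed:

```sql
BEGIN STATEMENT SET;
INSERT INTO sink1 SELECT ...;  -- shares one job with the insert below,
INSERT INTO sink2 SELECT ...;  -- but still runs concurrently with it
END;
```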
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
> I want to change the parallelism of GroupWindowAggregate, but I
> can't.
>
> Best wishes
> forideal
>
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
maybe the problem is related to the maven jar
> classpath. But not sure about that.
>
> If you can submit the job by a shade jar through cluster , could you share
> the project pom settings and sample test code ?
>
>
>
>
> At 2020-03-02 20:36:06, "Benchao Li" wr
,
> mng_name varchar,
> mng_id varchar,
> is_tg varchar,
> cust_name varchar,
> cust_type varchar,
> avg_tot_aset_y365 double,
> avg_aset_create_y
> double) WITH (
> 'connector.type' = 'jdbc',
>
' = '',
> 'connector.index' = 'realtime_product_sell_007118',
> 'connector.document-type' = '_doc',
> 'update-mode' = 'upsert',
> 'connector.key-delimiter' = '$',
> 'con
"yyyy-MM-dd";
> SimpleDateFormat sf = new SimpleDateFormat(format);
> return sf.format(date);
> } else {
> String format = "yyyy-MM-dd'T'HH:mm:00.000+0800";
> SimpleDateFormat sf = new SimpleDateFormat(f
kpointing and use udf, the job can submit
> successfully too.
>
> I dive a lot with this exception. Maybe it is related with some
> classloader issue. Hope for your suggestion.
>
>
>
>
>
> At 2020-03-01 17:54:03, "Benchao Li" wrote:
>
> Hi fulin,
>
> It see
ava:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
> org.codehaus.janino.UnitCom
erty('user.name'),
>> 'Build-Jdk': System.getProperty('java.version')
>> }
>> }
>>
>> shadowJar {
>> configurations = [project.configurations.flinkShadowJar]
>> }
>>
>>
>>
("0.11")
>> .topic("test-topic2")
>> )
>> .withFormat(new Csv())
>> .withSchema(new Schema().field("f0", DataTypes.STRING()))
>> .inAppendMode()
>>
slateToPlan$1.apply(StreamPlanner.scala:59)
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>
>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>> Test.main(Test.java:40)
>>>
>>> The error seems to be on the line
>>>
>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>
>>> and I am not sure why it is not able to serialize?
>>>
>>> Thanks!
>>>
>>
>>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
ectly although the consumers will be in the same
> consumer-group?
>
> Thanks for any help.
> regards.
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
very
> active in both dev
> and user mailing lists, helped discuss designs and answer user
> questions, and also
> helped to verify various releases.
>
> Congratulations Jingsong!
>
> Best, Kurt
> (on behalf of the Flink PMC)
>
>
>
--
Benchao Li
School of Ele
> .withKeyDeserializer(KeyDeserializer.class)
> .withValueDeserializerAndCoder(getDeserializer(encoding), new
> JsonNodeCoder<>())
> .withConsumerConfigUpdates(consumerConfig)
> .withoutMetadata();
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
5:20]
> Filesystem Size Used Avail Use% Mounted on
> /dev/mapper/vg00-rootlv00
> 2.1G 777M 1.2G 41% /
> tmpfs 2.1G 753M 1.4G 37% /dev/shm
>
> On Mon, Feb 17, 2020 at 7:13 PM Benchao Li wrote:
>
>> Hi Richard,
>>
&g
at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> Unfortunately the partition listed doe
5
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Cheers,
> Gary & Yu
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
h to drop
> > both of them, I would strongly suggest dropping at least 2.x connector
> > and update the 5.x line to a working es client module.
> >
> > What do you think? Should we drop both versions? Drop only the 2.x
> > connector? Or keep them both?
> >
> >
s good
>>>>
>>>> At time t2 Send a message "flink" to test-topic4
>>>>
>>>> (true,null,null,null,flink) // Looks good
>>>>
>>>> At time t3 Send a message "flink" to test-topic3
>>>>
rce?) to invoke and declare the Source as
> being non duplicateable.
>
> I have tried a lot of options (uid(), operation chaining restrictions,
> twiddling the transformation, forceNonParallel(), etc.), but can't find
> quite how to do that! My SourceFunction is a RichSourceFunctio
Dian Fu and welcome on board!
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> Shuo Cheng 于2020年1月16日周四 下午6:22写道:
>> >>
>> >>> Congratulations! Dian Fu
>> >>>
>> >>> > Xingbo Wei Zhong
er join in my code. I saw in the Flink
> documentation that a Flink SQL inner join will keep both sides of the
> join input in Flink's state forever.
> As a result, the HDFS files are so big. Is there any way to clear the
> SQL join state?
> Thanks for your reply.
>
--
Benchao Li
School
Sorry for the missing link.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
Benchao Li wrote on Sun, Jan 12, 2020 at 9:06 PM:
> Hi kant,
>
> In my understanding, "treat batch as a special case of streaming" j
that continuously get populated however I cannot specify the
> time constraint on the state which is why I would want to write to external
> database such that I can treat it as an infinite store and then do join as
> the data come in continuously.
>
> Thanks!
>
>
>
>
of
> features and optimizations in Blink and if they aren't merged into flink
> 1.9 I am not sure on which one to use? is there any plan towards merging it?
>
> Thanks!
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-1565
> FROM
>>>
>>> (
>>>
>>> SELECT
>>>
>>> 'ZL_005' as aggId,
>>>
>>> 'ZL_UV_PER_MINUTE' as pageId,
>>>
>>> deviceId,
>>>
>>> ts2Date(recvTime) as statkey
>>>
>>> from
>>>
>>> kafka_zl_etrack_event_stream
>>>
>>> )
>>>
>>> GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>
>>> ) as t1
>>>
>>> group by aggId, pageId, statkey
>>>
>>> Best
>>
>>
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
SQL CLI is a very important entrypoint for trying out new
>>>>>> features and
>>>>>> >> prototyping for users.
>>>>>> >> In order to give new planner more exposures, I would like to
>>>>>> suggest to set
>>>>>> >> default planner
>>>>>> >> for SQL Client to Blink planner before 1.10 release.
>>>>>> >>
>>>>>> >> The approach is just changing the default SQL CLI yaml
>>>>>> configuration[5]. In
>>>>>> >> this way, the existing
>>>>>> >> environment is still compatible and unaffected.
>>>>>> >>
>>>>>> >> Changing the default planner for the whole Table API & SQL is
>>>>>> another topic
>>>>>> >> and is out of scope of this discussion.
>>>>>> >>
>>>>>> >> What do you think?
>>>>>> >>
>>>>>> >> Best,
>>>>>> >> Jark
>>>>>> >>
>>>>>> >> [1]:
>>>>>> >>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>>>> >> [2]:
>>>>>> >>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>>>>>> >> [3]:
>>>>>> >>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>>>> >> [4]:
>>>>>> >>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>>>>>> >> [5]:
>>>>>> >>
>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>>
>>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
Hi Jary,
You need to send an email to user-subscr...@flink.apache.org
to subscribe, not user@flink.apache.org.
Jary Zhen wrote on Thu, Jan 2, 2020 at 4:53 PM:
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gma
calcite. Therefore, why does Flink import Antlr at the same time? For what?
>
> Could anyone give me some hints? Thanks in advance.
>
> Best wishes,
> Trista
>
> *Juan Pan (Trista) *
>
> Senior DBA & PPMC of Apache ShardingSphere(Incubating)
> E-mail: panj...@apa
return Types.ROW(Types.STRING, Types.STRING, Types.STRING,
> Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
> }
> }
>
>
>
> polaris...@gmail.com
>
>
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
nk planner have different behavior.
>> Could you create an issue in https://issues.apache.org/jira/browse/FLINK
>> ?
>>
>>
>>
>> On Dec 15, 2019, at 16:17, Benchao Li wrote:
>>
>> hi all,
>>
>> We are using 1.9.0 blink planner, and find fli
here is my question:
Obviously, this is a bug in the blink planner, and we should fix it. But
we have two ways to fix it:
1. make the cast behave as before, producing `null`;
2. change the blink planner to align with the old planner, producing a
`NumberFormatException`.
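A minimal illustration of the divergence (assuming the cast in question is
a string-to-numeric cast on malformed input; the exact query from the
thread is not shown here):

```sql
SELECT CAST('abc' AS INT) FROM src;
-- option 1 (current blink behavior): the row yields NULL
-- option 2 (old planner behavior): fails with NumberFormatException
```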
>> > > Regards,
> >>> > >
> >>> > > Chesnay
> >>> > >
> >>> > >
> >>> >
> >>> > --
> >>> >
> >>> > Konstantin Knauf | Solutions Architect
> >>> >
> >>> > +49 160 91394525
> >>>
> >>
> >
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
AvailabilityServices and much more. Zili
> Chen also helped the community by PR reviews, reporting Flink issues,
> answering user mails and being very active on the dev mailing list.
>
> Congratulations Zili Chen!
>
> Best, Till
> (on behalf of the Flink PMC)
>
--
Benchao
osedException:
> Channel became inactive.
> ... 37 more
> The error is consistent. It always happens after I let Flink run for a
> while (usually more than 1 month). Why am I not able to submit a job to
> Flink after a while? What happened here?
> Regards,
>
> Son
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn