Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-15 Thread meneldor
…'properties.group.id' = 'test_group' ) INSERT INTO sink_table SELECT DISTINCT id, account, upd_ts FROM stats_topic t, ( SELECT id, MAX(upd_ts) AS maxTs FROM stats_topic GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE) ) s WHERE t.id = s.id AND t.upd_ts = s.maxTs …
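
A minimal PyFlink sketch of the deduplication pattern quoted above: per id and per 20-minute tumble window, keep the row carrying the latest upd_ts. Table and column names come from the thread; the environment setup is an assumption, and stats_topic/sink_table are assumed to be declared already.

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # Streaming Table environment (the blink planner is the 1.12 default).
    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(environment_settings=settings)

    # Join each row back against the per-window maximum of upd_ts so that
    # only the latest version of every id inside the window survives.
    t_env.execute_sql("""
        INSERT INTO sink_table
        SELECT DISTINCT t.id, t.account, t.upd_ts
        FROM stats_topic t,
             (SELECT id, MAX(upd_ts) AS maxTs
              FROM stats_topic
              GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)) s
        WHERE t.id = s.id AND t.upd_ts = s.maxTs
    """)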

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-11 Thread meneldor
…case where you don't want > to use it, so maybe it helps to first explain your use case. Is stream > processing a good fit for you in the first place? > > On Tue, Feb 9, 2021 at 10:37 AM meneldor wrote: > >> Unfortunately using row_ts doesn't help. Setting the kafka …

Re: S3 parquet files as Sink in the Table SQL API

2021-02-11 Thread meneldor
…https://flink.apache.org/downloads.html#additional-components > > On Wed, Feb 10, 2021 at 1:24 PM meneldor wrote: > >> Hello, >> I am using PyFlink and I want to write records from the Table SQL API as >> parquet files on AWS S3. I followed the documentation but it seems…
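
The linked "Additional Components" page is where the parquet format bundle lives; one way to wire it into a PyFlink job is the pipeline.jars option. A sketch, assuming a local copy of the jar (the path and version below are placeholders, not from the thread):

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(environment_settings=settings)

    # Point pipeline.jars at the flink-sql-parquet bundle downloaded from
    # the "Additional Components" section; match it to your Flink version.
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///opt/flink/lib/flink-sql-parquet_2.11-1.12.1.jar")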

S3 parquet files as Sink in the Table SQL API

2021-02-10 Thread meneldor
Hello, I am using PyFlink and I want to write records from the Table SQL API as parquet files on AWS S3. I followed the documentation but it seems that I'm missing some dependencies and/or configuration. Here is the SQL: > CREATE TABLE sink_table( > `id` VARCHAR, > `type` VARCHAR, > …
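
The DDL the message quotes is a filesystem sink in parquet format; a hedged completion (the bucket path and rolling policy are assumptions). Note that in streaming mode this sink only finalizes parquet files when checkpointing is enabled.

    # Assumes t_env is a StreamTableEnvironment as in the sketches above.
    t_env.execute_sql("""
        CREATE TABLE sink_table (
            `id`   VARCHAR,
            `type` VARCHAR
        ) WITH (
            'connector' = 'filesystem',
            'path' = 's3a://my-bucket/output/',
            'format' = 'parquet',
            'sink.rolling-policy.rollover-interval' = '15 min'
        )
    """)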

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-09 Thread meneldor
…into either a kafka topic or S3 files (CSV/parquet)? Thanks! On Mon, Feb 8, 2021 at 7:13 PM meneldor wrote: > Thanks for the quick reply, Timo. I'll test with the row_ts and compaction > mode suggestions. However, I've read somewhere in the archives that the > append-only stream is only po…

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread meneldor
…FLINK-19857 > > <https://issues.apache.org/jira/browse/FLINK-19857> > > > > Regards, > > Roman > > > > > > On Mon, Feb 8, 2021 at 9:14 AM meneldor > <mailto:menel...@gmail.com>> wrote: > > > > Any help please? I…

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread meneldor
Any help please? Is there a way to use the "Last row" from a deduplication in an append-only stream, or to tell upsert-kafka not to produce *null* records in the sink? Thank you. On Thu, Feb 4, 2021 at 1:22 PM meneldor wrote: > Hello, > Flink 1.12.1 (PyFlink) > I am deduplicating…
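
For context, the "Last row" the message refers to is Flink's SQL deduplication pattern; ordered descending on the event time it yields an updating stream, which is exactly why a plain append-only kafka sink rejects it. A sketch against the thread's table:

    # "Keep last row per id" deduplication; because later rows retract
    # earlier ones, the result is a changelog stream, not append-only.
    t_env.execute_sql("""
        SELECT id, account, upd_ts
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY id
                                      ORDER BY row_ts DESC) AS rn
            FROM stats_topic
        )
        WHERE rn = 1
    """)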

Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-04 Thread meneldor
Hello, Flink 1.12.1 (PyFlink). I am deduplicating CDC records coming from Maxwell in a kafka topic. Here is the SQL: CREATE TABLE stats_topic( > `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>, > `ts` BIGINT, > `xid` BIGINT, > row_ts AS TO_TIMESTAMP(…
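
A hedged reconstruction of the truncated DDL: the watermark line, topic name, and connector options complete the fragment by assumption ('properties.group.id' appears in the first message of the thread).

    t_env.execute_sql("""
        CREATE TABLE stats_topic (
            `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
            `ts`   BIGINT,
            `xid`  BIGINT,
            -- Derive an event-time attribute from the epoch seconds.
            row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(`data`.`upd_ts`)),
            WATERMARK FOR row_ts AS row_ts - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'stats_topic',
            'properties.bootstrap.servers' = '...',
            'properties.group.id' = 'test_group',
            'format' = 'json'
        )
    """)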

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-19 Thread meneldor
Thank you Xingbo! Do you plan to implement CoProcess functions too? Right now I can't find a convenient method to connect and merge two streams. Regards On Tue, Jan 19, 2021 at 4:16 AM Xingbo Huang wrote: > Hi meneldor, > > 1. Yes. Although release 1.12.1 has not been officially rel…
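
Until CoProcess landed in the PyFlink DataStream API, one workaround (an assumption of this note, not Xingbo's reply) was to map both streams onto a shared Row layout, union() them, and merge per key in a keyed process function:

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types

    # Shared layout: (id, source tag, payload). stream_a and stream_b are
    # assumed to be existing DataStreams of (id, payload) tuples.
    common_type = Types.ROW_NAMED(
        ['id', 'source', 'payload'],
        [Types.STRING(), Types.STRING(), Types.STRING()])

    left = stream_a.map(lambda t: Row(t[0], 'a', t[1]), output_type=common_type)
    right = stream_b.map(lambda t: Row(t[0], 'b', t[1]), output_type=common_type)

    # One keyed stream carrying both inputs; a KeyedProcessFunction can now
    # buffer one side in state and emit when the other side arrives.
    merged = left.union(right).key_by(lambda r: r[0])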

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread meneldor
…'OnTimerContext')? Can I access the value in the ctx, as in process_element(), for example? Thank you! On Mon, Jan 18, 2021 at 4:18 PM Xingbo Huang wrote: > Hi Shuiqiang, meneldor, > > 1. In fact, there is a problem with using Python `Named Row` as the return > value of user-defin…
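
The element itself is not available inside on_timer(); the usual pattern (a sketch, not the code from this thread) is to stash the latest value in keyed state during process_element() and read it back when the timer fires:

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream.functions import KeyedProcessFunction
    from pyflink.datastream.state import ValueStateDescriptor

    class LatestValueFunction(KeyedProcessFunction):

        def open(self, runtime_context):
            # One value slot per key, holding the last seen payload.
            self.latest = runtime_context.get_state(
                ValueStateDescriptor('latest', Types.STRING()))

        def process_element(self, value, ctx):
            self.latest.update(value[1])
            ctx.timer_service().register_event_time_timer(ctx.timestamp() + 60000)

        def on_timer(self, timestamp, ctx):
            # ctx exposes the key and timestamp; the element comes from state.
            yield Row(ctx.get_current_key(), self.latest.value(), timestamp)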

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread meneldor
…*output_type_info* as I did now. Regards On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen wrote: > Hi meneldor, > > Actually, the return type of on_timer() must be the same as that of > process_element(). It seems that the yield value of process_element() is > missing the `timestamp` field, and the `output_type_info` has four field > names but five field types. Could you align them? > > Best, > Shuiqiang >
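
The alignment Shuiqiang asks for, sketched (field names here are assumptions; the point is that names and types must pair one to one, and every yield must match that shape):

    from pyflink.common.typeinfo import Types

    # Four names, four types -- and both process_element() and on_timer()
    # must yield Rows with exactly these four fields.
    output_type_info = Types.ROW_NAMED(
        ['id', 'data', 'timestamp', 'op'],
        [Types.STRING(), Types.STRING(), Types.LONG(), Types.STRING()])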

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread meneldor
type": "update", "data": {"id": "id2", "tp": "B", "device_ts": 1610546861, "account": "279"}, "old": {}} 1) If I change the output type to STRING() and return a str from *process_element* ever

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-15 Thread meneldor
…org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown Source) > Regards On Fri, …

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
…shell script from the docs (which uses pip). Regards On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen wrote: > Hi meneldor, > > The main cause of the error is that there is a bug in > `ctx.timer_service().current_watermark()`. At the beginning of the stream, > when the first…

Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
Hello, What is the correct way to use Python dicts as the ROW type in PyFlink? I'm trying this: output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], [Types.STRING(), Types.STRING(), Types.LONG()]) class MyProcessFunction(KeyedProcessFunction): def…
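
A hedged completion of the quoted fragment: declare the named-Row output type and yield pyflink Row objects (not plain dicts) from the process function; the body of MyProcessFunction is an assumption.

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream.functions import KeyedProcessFunction

    output_type_info = Types.ROW_NAMED(
        ['id', 'data', 'timestamp'],
        [Types.STRING(), Types.STRING(), Types.LONG()])

    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx):
            # Yield a Row that matches output_type_info field for field.
            yield Row(id=value[0], data=value[1], timestamp=ctx.timestamp())

    # Wired up as:
    #   ds.key_by(lambda v: v[0]).process(MyProcessFunction(),
    #                                     output_type=output_type_info)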

PyFlink Table API and connecting streams with SQL

2021-01-12 Thread meneldor
Hello, I'm a beginner in Flink and, after trying to solve my problems for several days, I decided to ask on the list. My goal is to connect two kafka topics which have a common ID field, then produce the enriched object to a third topic based on a Tumble Window, because the result has to be applied in…
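
One shape the enrichment can take (a sketch; table names, columns, and the one-minute bound are assumptions): an interval join on the shared id, which, unlike a regular join, preserves the event-time attribute needed for downstream windowing.

    # Assumes topic_a, topic_b, and enriched_topic are declared as kafka
    # tables with a watermarked row_ts attribute, as elsewhere in this list.
    t_env.execute_sql("""
        INSERT INTO enriched_topic
        SELECT a.id, a.payload, b.extra, a.row_ts
        FROM topic_a AS a, topic_b AS b
        WHERE a.id = b.id
          AND b.row_ts BETWEEN a.row_ts - INTERVAL '1' MINUTE
                           AND a.row_ts + INTERVAL '1' MINUTE
    """)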