Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-09 Thread meneldor
Unfortunately, using row_ts doesn't help. Setting the Kafka
topic cleanup.policy to compact is not a very good idea, as it increases
CPU and memory usage and might lead to other problems.
So for now I'll just ignore the null records. Is there anyone who is
successfully deduplicating CDC records into either a Kafka topic or S3
files (CSV/Parquet)?

Thanks!

On Mon, Feb 8, 2021 at 7:13 PM meneldor  wrote:

> Thanks for the quick reply, Timo. I'll test the row_ts and compaction
> mode suggestions. However, I've read somewhere in the archives that an
> append-only stream is only possible if I extract "the first" record from
> the ranking, which in my case is the oldest record.
>
> Regards
>
> On Mon, Feb 8, 2021, 18:56 Timo Walther  wrote:
>
>> Hi,
>>
>> could the problem be that you are mixing OVER and TUMBLE windows with
>> each other? The TUMBLE is correctly defined over the time attribute `row_ts`,
>> but the OVER window is defined using a regular column `upd_ts`. This
>> might be the reason why the query is not append-only but updating.
>>
>> Maybe you can split the problem into sub queries and share the plan with
>> us using .explain()?
>>
>> The nulls in upsert-kafka should be gone once you enable compaction mode
>> in Kafka.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 08.02.21 10:53, Khachatryan Roman wrote:
>> > Hi,
>> >
>> > AFAIK this should be supported in 1.12 via FLINK-19857 [1]
>> > I'm pulling in Timo and Jark who might know better.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-19857
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor wrote:
>> >
>> > Any help please? Is there a way to use the "Last row" from a
>> > deduplication in an append-only stream or tell upsert-kafka to not
>> > produce *null* records in the sink?
>> >
>> > Thank you
>> >
>> > On Thu, Feb 4, 2021 at 1:22 PM meneldor wrote:
>> >
>> > Hello,
>> > Flink 1.12.1(pyflink)
>> > I am deduplicating CDC records coming from Maxwell in a Kafka
>> > topic. Here is the SQL:
>> >
>> >     CREATE TABLE stats_topic (
>> >       `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>> >       `ts` BIGINT,
>> >       `xid` BIGINT,
>> >       row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>> >       WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>> >     ) WITH (
>> >       'connector' = 'kafka',
>> >       'format' = 'json',
>> >       'topic' = 'stats_topic',
>> >       'properties.bootstrap.servers' = 'localhost:9092',
>> >       'properties.group.id' = 'test_group'
>> >     )
>> >
>> >     CREATE TABLE sink_table (
>> >       `id` BIGINT,
>> >       `account` INT,
>> >       `upd_ts` BIGINT
>> >     ) WITH (
>> >       'connector' = 'kafka',
>> >       'format' = 'json',
>> >       'topic' = 'sink_topic',
>> >       'properties.bootstrap.servers' = 'localhost:9092',
>> >       'properties.group.id' = 'test_group'
>> >     )
>> >
>> >     INSERT INTO sink_table
>> >     SELECT
>> >       id,
>> >       account,
>> >       upd_ts
>> >     FROM (
>> >       SELECT
>> >         id,
>> >         account,
>> >         upd_ts,
>> >         ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>> >       FROM stats_topic
>> >       GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> >     )
>> >     WHERE rownum = 1
>> >
>> >
>> > As there are a lot of CDC records for a single ID, I'm using
>> > ROW_NUMBER() and producing them at a 20-minute interval to the
>> > sink_topic. The problem is that Flink doesn't allow me to use it
>> > in combination with the Kafka connector:
>> >
>> >     pyflink.util.exceptions.TableException: Table sink
>> >     'default_catalog.default_database.sink_table' doesn't
>> >     support consuming update and delete changes which is
>> >     produced by node Rank(strategy=[UndefinedStrategy],
>> >     rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>> >     partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
>> >
>> > If I use the *upsert-kafka* connector everything is fine but the
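
One possible append-only workaround, not something proposed in the thread itself, sketched in Java under assumptions (the thread uses PyFlink): replace the ROW_NUMBER() ranking with a group-window aggregate on the time attribute, since a window aggregate emits only final rows per window. This assumes `account` is constant per `id` within a window and that "latest" can be reduced to MAX(upd_ts); the tables are the ones defined above.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    // ... CREATE TABLE stats_topic / sink_table exactly as in the thread ...

    // A group-window aggregate emits one final row per (id, account, window)
    // when the window closes, so the result stream is append-only and works
    // with the plain 'kafka' sink.
    tEnv.executeSql(
        "INSERT INTO sink_table " +
        "SELECT id, account, MAX(upd_ts) AS upd_ts " +
        "FROM stats_topic " +
        "GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)");

If more columns must come from the latest record than MAX can express, LAST_VALUE-style aggregates (where supported) or the upsert-kafka sink with topic compaction remain the alternatives discussed above.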

Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-09 Thread joris.vanagtmaal
My JAR files are included in the same folder where I run the Python code:

flink-connector-kafka_2.11-1.13-SNAPSHOT.JAR
flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.JAR
kafka-clients-2.7.0.JAR





Re: Any plans to make Flink configurable with pure data?

2021-02-09 Thread Yun Gao
Hi Pilgrim,

Currently the Table API indeed cannot use low-level APIs like timers. Would a
mixture of SQL & DataStream satisfy the requirements? A job might be created
via multiple SQL statements and connected via DataStream operations.

Best,
 Yun
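
A hedged sketch (in Java) of the mixture described above: declare the source and filtering in SQL, then drop down to a KeyedProcessFunction for the timer-based timeout logic SQL does not expose. The table name, fields, connector, and the 60-second timeout are illustrative assumptions, not from the mail.

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    public class SqlPlusTimers {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Source and filtering declared in SQL ('datagen' keeps the sketch self-contained).
            tEnv.executeSql(
                "CREATE TABLE events (deviceId STRING, severity INT) " +
                "WITH ('connector' = 'datagen')");
            Table alerts = tEnv.sqlQuery("SELECT deviceId FROM events WHERE severity > 3");

            // Timeout detection needs timers, so continue in the DataStream API.
            tEnv.toAppendStream(alerts, Row.class)
                .keyBy(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row r) { return (String) r.getField(0); }
                })
                .process(new KeyedProcessFunction<String, Row, String>() {
                    @Override
                    public void processElement(Row row, Context ctx, Collector<String> out) {
                        // arm a 60s processing-time timeout for this key
                        long now = ctx.timerService().currentProcessingTime();
                        ctx.timerService().registerProcessingTimeTimer(now + 60_000);
                    }
                    @Override
                    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) {
                        out.collect("timeout for device " + ctx.getCurrentKey());
                    }
                })
                .print();

            env.execute("sql-plus-timers");
        }
    }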

--
Sender:Pilgrim Beart
Date:2021/02/09 02:22:46
Recipient:
Theme:Any plans to make Flink configurable with pure data?

To a naive Flink newcomer (me) it's a little surprising that there is no pure 
"data" mechanism for specifying a Flink pipeline, only "code" interfaces. With 
the DataStream interface I can use Java, Scala or Python to set up a pipeline 
and then execute it - but that doesn't really seem to need a programming model, 
it seems like configuration, which could be done with data? OK, one does need 
occasionally to specify some custom code, e.g. a ProcessFunction, but for any 
given use-case, a relatively static library of such functions would seem fine.

My use case is that I have lots of customers, and I'm doing a similar job for 
each of them, so I'd prefer to have a library of common code (e.g. 
ProcessFunctions), and then specify each customer's specific requirements in a 
single config file.  To do that in Java, I'd have to do metaprogramming (to 
build various pieces of Java out of that config file).

Flink SQL seems to be the closest solution, but doesn't appear to support 
fundamental Flink concepts such as timers (?). Is there a plan to evolve Flink 
SQL to support timers? Timeouts is my specific need.

Thanks,
-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot 





Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-09 Thread Yun Gao

Hi,

Could you try adding the jars via the Python configuration explicitly? You
might refer to [1].

Best,
 Yun

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program
 
--
Sender:joris.vanagtmaal
Date:2021/02/09 15:50:27
Recipient:
Theme:Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

My JAR files are included in the same folder where I run the Python code:

flink-connector-kafka_2.11-1.13-SNAPSHOT.JAR
flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.JAR
kafka-clients-2.7.0.JAR





State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Sandeep khanzode
Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to connect() 
two streams to basically share the state of one stream in another. 

This is what I do:
private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
        new MapStateDescriptor<>("abc-saved-state",
            Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}

This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not all 
the possible keys that were created before and saved. The next time I get a hit 
for another key, I will only see that key and not the rest of the previous 
keys. Is this by design, or am I missing something?

(b) Can I somehow access this state beyond the class that holds the state? I.e., 
can I access the state in some other class? If not, can I use the State Processor API 
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html) 
to do this? Is that the correct way to access the running state of one stream 
elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more details 
if required.

Thanks,
Sandeep Ramesh Khanzode

Re: Native Kubernetes annotation parsing problem

2021-02-09 Thread Yang Wang
> is there a way I can load my own ConfigMap and specify it via the
> dynamic argument?


The Flink client will automatically ship the "flink-conf.yaml" and
"log4j-console.properties" under the FLINK_HOME/conf directory on the client
side.
After the application is submitted successfully, you can find a ConfigMap
named "flink-config-".

If you want to mount your own ConfigMaps, that is not supported yet. We
will get it done via the pod template[1]. It is still in progress and
has a PR[2] waiting for review.

[1]. https://issues.apache.org/jira/browse/FLINK-15656
[2]. https://github.com/apache/flink/pull/14629

Best,
Yang

Kevin Kwon wrote on Tue, Feb 9, 2021 at 8:55 PM:

> thank you Yang
>
> but is there a way I can load my own ConfigMap and specify it via the
> dynamic argument?
>
> it seems that *kubernetes.hadoop.conf.config-map.name* exists but it's only
> for custom Hadoop config
>
> I'd want to encapsulate all my custom config (flink-conf.yaml,
> log4j-console.yaml, etc.) and feed it into the native K8s CLI
>
> On Tue, Feb 9, 2021 at 3:54 AM Yang Wang  wrote:
>
>> If you are setting the config options in flink-conf.yaml, then you could
>> directly add the following example.
>> *kubernetes.jobmanager.annotations: iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'*
>>
>> However, if you are using dynamic properties in the CLI, then the
>> configuration value should also be wrapped in double quotes, because we
>> need to escape the value with single quotes.
>>
>> *-Dkubernetes.jobmanager.annotations="iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'"*
>>
>> It seems that IAM is not a common feature in Kubernetes. But from the AWS
>> documentation[1], I think it could be specified via a service account[2].
>> Hope this helps.
>>
>> [1].
>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#rbac
>>
>> Best,
>> Yang
>>
Kevin Kwon wrote on Mon, Feb 8, 2021 at 11:58 PM:
>>
>>> I think it is a more generic question of how I inject IAM roles into
>>> Native Kubernetes pods
>>>
>>> I'm using Kubeiam and it seems the namespace annotation doesn't work
>>>
>>> On Mon, Feb 8, 2021 at 2:30 PM Kevin Kwon  wrote:
>>>
 Hi team, I'm using Native Kubernetes annotation config


 *kubernetes.jobmanager.annotations*

 and I'm facing some problem with parsing.

 I use annotation


 *iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'*

 but it seems no matter what I do, the colon is getting parsed as the
 key/value separator. Can anyone help?

>>>


Custom source with multiple, differently typed outputs

2021-02-09 Thread Roman Karlstetter
Hi everyone,

I want to connect to a proprietary data stream, which sends different types
of messages (maybe interpreted as a table), intertwined in the stream.
Every type of message (or table) can have a different schema, but for each
type this schema is known when connecting (i.e., at runtime) and does not
change.

I'm new to Flink, so I have a few (stupid?) questions about this use case.
I have created a custom SourceFunction which produces Rows read from this
data stream. Then I use side outputs to split up this stream into multiple
DataStream[Row]. Is this the right way to do it?
What's the best way to add custom TypeInformation[Row] to each of those
streams, so that I can easily map this to a table which can be accessed via
the Table API? Or would I rather directly implement a ScanTableSource (I
played with this, the SourceFunction approach was easier though)? I believe
that Row is the best way to create this kind of schema at runtime, or is
there a better alternative?

Kind regards
Roman
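
A hedged sketch (in Java) of the side-output approach described above: one stream of Rows split by message type, with explicit Row TypeInformation attached via the OutputTag so each side output keeps a named schema and can be registered as a table. The schemas, tag names, and dispatch test are illustrative assumptions.

    import java.util.Arrays;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class TypedSideOutputs {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Schema for message type A, known when connecting; the OutputTag carries it.
            TypeInformation<Row> typeA =
                Types.ROW_NAMED(new String[]{"id", "name"}, Types.LONG, Types.STRING);
            OutputTag<Row> tagA = new OutputTag<>("type-a", typeA);

            // Stand-in for the proprietary SourceFunction from the mail.
            DataStream<Row> raw = env.fromCollection(
                Arrays.asList(Row.of(1L, "a"), Row.of(2L, "b")),
                Types.ROW(Types.LONG, Types.STRING));

            SingleOutputStreamOperator<Row> rest = raw.process(new ProcessFunction<Row, Row>() {
                @Override
                public void processElement(Row row, Context ctx, Collector<Row> out) {
                    if (row.getArity() == 2) {   // dispatch on message type (assumption)
                        ctx.output(tagA, row);
                    } else {
                        out.collect(row);        // remaining message types
                    }
                }
            });

            // The side output inherits typeA, so the view gets named, typed columns.
            DataStream<Row> streamA = rest.getSideOutput(tagA);
            tEnv.createTemporaryView("tableA", streamA);

            streamA.print();
            env.execute("typed-side-outputs");
        }
    }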


Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-09 Thread joris.vanagtmaal
hi Yun,

thanks for the help!

if I direct the Kafka connector in the DDL to a local Kafka cluster, it
works fine, so I assume access to the JAR files should not be the issue.

This is how i referred to the JAR files from Python:
t_env.get_config().get_configuration().set_string("pipeline.jars",
"file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")

All the best, 
Joris





What is the difference between RuntimeContext state and ProcessWindowFunction.Context state when using a ProcessWindowFunction?

2021-02-09 Thread Marco Villalobos
Hi,

I am having a difficult time distinguishing between
RuntimeContext state and global state when using a ProcessWindowFunction.

A ProcessWindowFunction has access to three different kinds of state:
1. RuntimeContext state.
2. ProcessWindowFunction.Context global state
3. ProcessWindowFunction.Context window state.

It's clear to me that the window state belongs to a window, but the lines
seem a bit blurred between RuntimeContext state and
ProcessWindowFunction.Context global state.

Can somebody please elaborate on the differences, and when I should use
global state vs runtime context state?

Thank you.
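
A hedged sketch (in Java) of where the three handles live. As far as I understand, RuntimeContext state and Context.globalState() resolve to the same per-key scope (keyed state shared across all windows of that key), while Context.windowState() is additionally scoped to the individual window; the state names below are illustrative.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class StateScopes extends ProcessWindowFunction<Long, String, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx, Iterable<Long> elements,
                            Collector<String> out) throws Exception {
            // 1. RuntimeContext state: keyed, shared across all windows of this key.
            ValueState<Long> viaRuntime = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("via-runtime", Long.class));
            // 2. Global state: also keyed and window-independent, i.e. the same scope as (1).
            ValueState<Long> viaGlobal = ctx.globalState()
                .getState(new ValueStateDescriptor<>("via-global", Long.class));
            // 3. Window state: keyed AND namespaced by this window; it disappears when the
            //    window is purged, so it is mainly useful with allowed lateness or custom
            //    triggers that fire the same window several times.
            ValueState<Long> viaWindow = ctx.windowState()
                .getState(new ValueStateDescriptor<>("via-window", Long.class));
            out.collect(key + ": " + viaRuntime.value() + "/" + viaGlobal.value()
                + "/" + viaWindow.value());
        }
    }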


Join two streams from Kafka

2021-02-09 Thread Shamit
Hello Flink Users,

I am a newbie and have a question on joining two streams (stream1 and stream2)
from Kafka topics based on some key.

In my use case I need to join with stream2 data which might be a year old or
more.

Now, if data arrives on stream1 today and I need to join it with stream2 based
on some key, please let me know how I can do this efficiently.

stream2 might have lots of records (in the millions).

Please help.

Regards,
Shamit Jain
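
One common pattern for this kind of unbounded-lateness join, sketched here under assumptions (Java; all type and key names are illustrative, and with millions of entries a RocksDB state backend is advisable): key both streams by the join key and hold the latest stream2 record per key in keyed state, so even a year-old record is available when a stream1 record arrives.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class EnrichJoin extends KeyedCoProcessFunction<String, String, String, String> {
        private transient ValueState<String> dimension;  // latest stream2 record for this key

        @Override
        public void open(Configuration parameters) {
            dimension = getRuntimeContext().getState(
                new ValueStateDescriptor<>("stream2-latest", String.class));
        }

        @Override
        public void processElement1(String fact, Context ctx, Collector<String> out)
                throws Exception {
            // stream1 side: probe the stored stream2 record (null if never seen)
            out.collect(fact + " joined with " + dimension.value());
        }

        @Override
        public void processElement2(String dim, Context ctx, Collector<String> out)
                throws Exception {
            dimension.update(dim);  // stream2 side: keep (only) the latest record per key
        }
    }

    // usage: stream1.connect(stream2).keyBy(k -> k, k -> k).process(new EnrichJoin())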





Re: "upsert-kafka" connector not working with Avro confluent schema registry

2021-02-09 Thread Shamit
Hello Flink Users,

Requesting your help, please. I am facing an issue with "KafkaAvroDeserializer"
when using the "upsert-kafka" connector.

Regards,
Shamit Jain





ClassLoader leak when using s3a upload through DataSet.output

2021-02-09 Thread Vishal Santoshi
Hello folks,
We see threads from
https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
outliving a batch job that writes Parquet files to S3, causing a ClassLoader
leak. Is this a known issue? Logically, a close on the TransferManager
should close the ExecutorService (and thus the threads).

The code is fairly straightforward:

val job = new Job()
val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
  new AvroParquetOutputFormat(),
  job)
AvroParquetOutputFormat.setSchema(job, schema)
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
ParquetOutputFormat.setEnableDictionary(job, true) // do we need this?

and then an output call on the DataSet.

This is using:

scalaVersion := "2.12.12"
flinkVersion = "1.11.2"
hadoopVersion = "2.8.3"



Regards

Vishal


Re: State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Kezhu Wang
(a) It is by design. For keyed state, you can only access state for that
key, not others. If you want one value per key, ValueState is a better fit
than MapState.
(b) The state-processor-api aims to access/create/modify/upgrade offline
savepoints, not running state. Queryable state may meet your requirement,
but it has not been actively developed for a while, from my observation,
and is still in beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
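
A minimal sketch of the queryable state route, under assumptions (Java; host, port, job id, and state names are illustrative, and the cluster needs the flink-queryable-state-runtime jar for this to work):

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.queryablestate.client.QueryableStateClient;

    public class QueryRunningState {
        public static void main(String[] args) throws Exception {
            // Job side (elsewhere): mark the descriptor queryable when creating the state.
            ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("my-state", Long.class);
            desc.setQueryable("my-query-name");

            // Client side: query a running job's state for one key.
            QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);
            CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"),  // the running job's id
                "my-query-name", "some-key", BasicTypeInfo.STRING_TYPE_INFO, desc);
            System.out.println(future.join().value());
            client.shutdownAndWait();
        }
    }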


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
        new MapStateDescriptor<>("abc-saved-state",
            Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. The next time I get
a hit for another key, I will only see that key and not the rest of the
previous keys. Is this by design, or am I missing something?

(b) Can I somehow access this state beyond the class that holds the state?
I.e., can I access the state in some other class? If not, can I use the State
Processor API
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
to do this? Is that the correct way to access the running state of one stream
elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode


Re: State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Sandeep khanzode
Hello,

Thanks a lot for the response. I will try to check Queryable-state for this 
purpose. 

Actually, my use case is that I want to share the state of one stream with two 
other streams. Right now, I can think of connecting this stream independently 
with each of the two other streams and managing the state twice, effectively 
duplicating it.

I was trying to check whether there are options where I can share this state 
with both streams but save it only once.


> On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:
> 
> (a) It is by design. For keyed state, you can only access state for that key, 
> not others. If you want one value per key, ValueState is a better fit than 
> MapState.
> (b) The state-processor-api aims to access/create/modify/upgrade offline 
> savepoints, not running state. Queryable state may meet your requirement, 
> but it has not been actively developed for a while, from my observation, and 
> is still in beta. 
> 
> Queryable state: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>  
> 
> 
> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hello,
>> 
>> I am creating a class that extends RichCoFlatMapFunction. I need to 
>> connect() two streams to basically share the state of one stream in another. 
>> 
>> This is what I do:
>> private transient MapState<KeyClass, ValueClass> state;
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>         new MapStateDescriptor<>("abc-saved-state",
>>             Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
>>     state = getRuntimeContext().getMapState(stateDescriptor);
>> }
>> 
>> This works correctly.
>> 
>> 
>> I have two questions:
>> (a) Whenever I debug, I can only see the current key in the MapState, not 
>> all the possible keys that were created before and saved. The next time I get 
>> a hit for another key, I will only see that key and not the rest of the 
>> previous keys. Is this by design, or am I missing something?
>> 
>> (b) Can I somehow access this state beyond the class that holds the state? 
>> I.e., can I access the state in some other class? If not, can I use the State 
>> Processor API 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html) 
>> to do this? Is that the correct way to access the running state of one 
>> stream elsewhere in the program without corrupting it?
>> 
>> 
>> Your response will be greatly appreciated. I will be happy to add more 
>> details if required.
>> 
>> Thanks,
>> Sandeep Ramesh Khanzode



Re: State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Kezhu Wang
Flink has broadcast state to broadcast one stream to others, in case you are
not aware of it. It does duplicate the state, though.

1. Broadcast state:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Best,
Kezhu Wang
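
A hedged sketch of that pattern (Java; all names and types are illustrative assumptions). Note that each consuming operator keeps its own copy of the broadcast state, which is the duplication mentioned above:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class BroadcastShare {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> control = env.fromElements("k1=v1", "k2=v2");  // the shared stream
            DataStream<String> streamA = env.fromElements("k1", "k2");
            DataStream<String> streamB = env.fromElements("k2");

            MapStateDescriptor<String, String> desc =
                new MapStateDescriptor<>("shared-state", Types.STRING, Types.STRING);
            BroadcastStream<String> shared = control.broadcast(desc);

            BroadcastProcessFunction<String, String, String> lookup =
                new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String key, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        out.collect(key + " -> " + ctx.getBroadcastState(desc).get(key));
                    }
                    @Override
                    public void processBroadcastElement(String kv, Context ctx,
                                                        Collector<String> out) throws Exception {
                        String[] parts = kv.split("=");
                        ctx.getBroadcastState(desc).put(parts[0], parts[1]);
                    }
                };

            // The same broadcast stream feeds both consumers; each keeps its own state copy.
            streamA.connect(shared).process(lookup).print();
            streamB.connect(shared).process(lookup).print();
            env.execute("broadcast-share");
        }
    }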

On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

Thanks a lot for the response. I will try to check Queryable-state for this
purpose.

Actually, my use case is that I want to share the state of one stream with
two other streams. Right now, I can think of connecting this stream
independently with each of the two other streams and managing the state
twice, effectively duplicating it.

I was trying to check whether there are options where I can share this
state with both streams but save it only once.


On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:

(a) It is by design. For keyed state, you can only access state for that
key, not others. If you want one value per key, ValueState is a better fit
than MapState.
(b) The state-processor-api aims to access/create/modify/upgrade offline
savepoints, not running state. Queryable state may meet your requirement,
but it has not been actively developed for a while, from my observation,
and is still in beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
        new MapStateDescriptor<>("abc-saved-state",
            Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. The next time I get
a hit for another key, I will only see that key and not the rest of the
previous keys. Is this by design, or am I missing something?

(b) Can I somehow access this state beyond the class that holds the state?
I.e., can I access the state in some other class? If not, can I use the State
Processor API
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
to do this? Is that the correct way to access the running state of one stream
elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode