Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert, You can refer to https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py for the whole example. Best, Shuiqiang. Robert Cullen wrote on Sat, Mar 13, 2021 at 4:01 AM: > Shuiqiang, Can you include the import statements? thanks. > > On

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Same error. On Fri, 12 Mar 2021 at 09:01, Chesnay Schepler wrote: > From the exception I would conclude that your core-site.xml file is not > being picked up. > > AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so > try setting HADOOP_CONF_DIR to the directory that the file

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
I validated it's still accepted by the connector but it's not in the documentation anymore. It doesn't seem to help in my case. Thanks, Sebastian From: Magri, Sebastian Sent: Friday, March 12, 2021 18:50 To: Timo Walther ; ro...@apache.org Cc: user Subject: Re

Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Robert Cullen
Shuiqiang, Can you include the import statements? thanks. On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen wrote: > Hi Robert, > > The Kafka connector has been provided in the Python DataStream API since release 1.12.0. > The documentation for it is still lacking; we will add it soon. > > The following code

Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert, The Kafka connector has been provided in the Python DataStream API since release 1.12.0. The documentation for it is still lacking; we will add it soon. The following code shows how to apply KafkaConsumers and KafkaProducer: ``` env = StreamExecutionEnvironment.get_execution_environment() env.set_
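For reference, a minimal sketch of wiring FlinkKafkaConsumer and FlinkKafkaProducer in the Python DataStream API. It assumes Flink 1.12.x, a broker at localhost:9092, and hypothetical topic names; the Kafka connector jar also has to be made available to the job, e.g. via env.add_jars():

```
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# The Kafka connector jar must be on the classpath, e.g.:
# env.add_jars("file:///path/to/flink-sql-connector-kafka_2.11-1.12.0.jar")

# Consume strings from a (hypothetical) input topic.
consumer = FlinkKafkaConsumer(
    topics='input-topic',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'})

# Write the same strings to a (hypothetical) output topic.
producer = FlinkKafkaProducer(
    topic='output-topic',
    serialization_schema=SimpleStringSchema(),
    producer_config={'bootstrap.servers': 'localhost:9092'})

env.add_source(consumer).add_sink(producer)
env.execute('kafka_example')
```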

Re: No saving data using rocksdb

2021-03-12 Thread Maminspapin
Roman, thank you for your attention. It looks like you are absolutely right. Thank you very much for helping. Before submitting a job I take the following steps: 1. ./bin/start-cluster.sh 2. ./bin/taskmanager.sh start And in my code there is this line: env.setStateBackend(new RocksDBStateBackend("file:///
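For reference, a minimal PyFlink sketch of the equivalent state backend setup (the snippet above is the Java API; the path and checkpoint interval here are hypothetical):

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import RocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
# Keep state in RocksDB and write checkpoints under a local (hypothetical) path.
env.set_state_backend(RocksDBStateBackend("file:///home/flink/checkpoints"))
env.enable_checkpointing(10000)  # checkpoint every 10 seconds
```

Note that state written this way is only picked up again if the job is resumed from a savepoint or retained checkpoint, as pointed out elsewhere in this thread.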

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
Hi Roman! Seems like that option is no longer available. Best Regards, Sebastian From: Roman Khachatryan Sent: Friday, March 12, 2021 16:59 To: Magri, Sebastian ; Timo Walther Cc: user Subject: Re: [Flink SQL] Leniency of JSON parsing Hi Sebastian, Did you t

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Yep, makes sense. On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan wrote: > > Want to confirm that the keys are GCed ( along with state ) once the > (windows close + lateness ) ? > Window state is cleared (as well as the window itself), but global > state is not (unless you use TTL). > > [1] >

Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-03-12 Thread Roman Khachatryan
Hi Alexis, This looks like a bug, I've created a Jira ticket to address it [1]. Please feel free to provide any additional information. In particular, whether you are able to reproduce it in any of the subsequent releases. [1] https://issues.apache.org/jira/browse/FLINK-21752 Regards, Roman O

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread Chesnay Schepler
From the exception I would conclude that your core-site.xml file is not being picked up. AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so try setting HADOOP_CONF_DIR to the directory that the file resides in. On 3/12/2021 5:10 PM, sri hari kali charan Tummala wrote: If

Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Robert Cullen
I’ve scoured the web looking for an example of using a Kafka source for a DataStream in Python. Can someone finish this example? env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) ds = env.from_collection( KAFKA_SOURCE ) ... -- Robert Cullen 240-475-4490

Re: Evenly Spreading Out Source Tasks

2021-03-12 Thread Aeden Jameson
Hi Matthias, Yes, all the task managers have the same hardware/memory configuration. Aeden On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl wrote: > > Hi Aeden, > just to be sure: All task managers have the same hardware/memory > configuration, haven't they? I'm not 100% sure whether this affects

Handling Bounded Sources with KafkaSource

2021-03-12 Thread Rion Williams
Hi all, I've been using the KafkaSource API as opposed to the classic consumer and things have been going well. I configured my source such that it could be used in either a streaming or bounded mode, with the bounded approach specifically aimed at improving testing (unit/integration). I've notic

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
If anyone has working Flink 1.8.1 code reading S3 in IntelliJ in a public GitHub repo, please pass it on; that would be a huge help. Thanks Sri On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > Which I already did in my pin, still it's not working. > > Tha

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Which I already did in my pin, still it's not working. Thanks Sri On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler wrote: > The concept of plugins does not exist in 1.8.1. As a result it should be > sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to > your project. > > On 3/1

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Roman Khachatryan
Hi Sebastian, Did you try setting debezium-json-map-null-key-mode to DROP [1]? I'm also pulling in Timo who might know better. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode Regards, Roman On Fri, Mar 12,
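For illustration, a hedged sketch of where that format option would be set, using a PyFlink table declaration. The connector, topic, and column names are hypothetical; only the 'debezium-json.map-null-key.mode' line is the option referred to above:

```
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Hypothetical CDC table; the knob under discussion is the
# 'debezium-json.map-null-key.mode' format option.
t_env.execute_sql("""
    CREATE TABLE objectives_cdc (
        id BIGINT,
        objectives STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'db.public.objectives',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'debezium-json',
        'debezium-json.map-null-key.mode' = 'DROP'
    )
""")
```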

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
> Want to confirm that the keys are GCed ( along with state ) once the > (windows close + lateness ) ? Window state is cleared (as well as the window itself), but global state is not (unless you use TTL). [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#sta
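For illustration, a hedged sketch of attaching a TTL to such global (keyed) state so it is eventually cleaned up. It assumes a PyFlink release that exposes StateTtlConfig (the Java API is analogous); the descriptor name and TTL value are hypothetical:

```
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

# Expire entries one hour after creation or last update, and never
# return values that have already expired.
ttl_config = StateTtlConfig \
    .new_builder(Time.seconds(3600)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

descriptor = ValueStateDescriptor("session-id", Types.STRING())
descriptor.enable_time_to_live(ttl_config)
```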

Re: DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Dawid Wysakowicz
Hi Alexis, As of now there is no such feature in the DataStream API. The Batch mode in the DataStream API is a new feature and we would be interested to hear about the use cases people want to use it for, to identify potential areas to improve. What you are suggesting generally makes sense, so I think it

DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Alexis Sarda-Espinosa
Hello, Regarding the new BATCH mode of the data stream API, I see that the documentation states that some operators will process all data for a given key before moving on to the next one. However, I don't see how Flink is supposed to know whether the input will provide all data for a given key

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread Chesnay Schepler
The concept of plugins does not exist in 1.8.1. As a result it should be sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to your project. On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote: Let's close this issue, guys; please answer my questions. I am using Flink 1.

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Sush Bankapura
Hi Roman and Till, Thank you very much for your responses. With regard to the workload variation across the jobs, let me put it like this: 1. We have some jobs which are CPU intensive (and only operator state being persisted) and there are other jobs which are not so CPU intensive, but have I

[Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
I'm trying to extract data from a Debezium CDC source, in which one of the backing tables has an open schema nested JSON field like this: "objectives": { "items": [ { "id": 1, "label": "test 1", "size": 1000.0 }, { "id":

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows will have their own states... and how the merge happens is really at the key level. On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi wrote: > I intend to augment every event

Re: No saving data using rocksdb

2021-03-12 Thread Maminspapin
Hey Roman, I use the same key every time. And I get the correct value in StateManager every time the processElement() method executes. But then I stop the job and submit it again. And the first execution of processElement() gets me null in the state store. The key wasn't changed. So, I'm confused. Thank

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Till Rohrmann
Hi Sushruth, if your jobs need significantly different configurations, then I would suggest thinking about dedicated clusters per job. That way you can configure the cluster to work best for the respective job. Of course, running multiple clusters instead of a single one comes at the cost of more

Re: Filtering lines in parquet

2021-03-12 Thread Avi Levi
Cool, thanks! On Fri, Mar 12, 2021, 13:15 Arvid Heise wrote: > Hi Avi, > > thanks for clarifying. > > It seems like it's not possible to parse Parquet in Flink without knowing > the schema. What i'd do is to parse the metadata while setting up the job > and then pass it to the input format: > >

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
I intend to augment every event in a session with a unique ID. To keep the session lean, there is a PurgingTrigger on this aggregate that fires on a count of 1. >> (except that the number of keys can grow). Want to confirm that the keys are GCed ( along with state ) once the (windows close + l

Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-12 Thread Arvid Heise
Yes, please send me the full stack trace. You could also send it to me personally if you don't want to share it on the ML. I'm especially interested in the legacy source thread that holds the lock 0x00058e8c5070 if you only want to share an excerpt. On Fri, Mar 12, 2021 at 2:29 AM ChangZhuo C

Re: Evenly Spreading Out Source Tasks

2021-03-12 Thread Matthias Pohl
Hi Aeden, just to be sure: All task managers have the same hardware/memory configuration, haven't they? I'm not 100% sure whether this affects the slot selection in the end, but it looks like this parameter also has an influence on the slot matching strategy, preferring slots with less utilization o

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Roman Khachatryan
Hi, Do I understand correctly that: 1. The workload varies across the jobs but stays the same for the same job 2. With a small number of slots per TM you are concerned about uneven resource utilization when running low- and high-intensive jobs on the same cluster simultaneously? If so, wouldn't r

Re: How to do code isolation if muiltple jobs run on the same taskmanager process?

2021-03-12 Thread Arvid Heise
Hi Lei, yes, metaspace would run out eventually if you run too much in parallel. All finished jobs will close the classloaders and free the metaspace memory. For newer setups, we recommend creating an ad-hoc cluster for each Flink application for this and several other reasons. If you are already

Re: Filtering lines in parquet

2021-03-12 Thread Arvid Heise
Hi Avi, thanks for clarifying. It seems like it's not possible to parse Parquet in Flink without knowing the schema. What I'd do is to parse the metadata while setting up the job and then pass it to the input format: ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path,

Re: User metrics outside tasks

2021-03-12 Thread Arvid Heise
Hi Bob and Alexey, I double-checked and there is currently no way to achieve what you want. The good news is that the OOM part should be addressed through FLINK-20833 [1], maybe it's even suitable for other issues. A "workaround" (I don't think it's a workaround) for your issues would actually b

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
Hi Vishal, There is no leak in the code you provided (except that the number of keys can grow). But as you figured out the state is scoped to key, not to window+key. Could you explain what you are trying to achieve and why do you need to combine sliding windows with state scoped to window+key? R

Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Are you starting the job from savepoint [1] when submitting it again? If not, it is considered as a new job and will not pick up the old state. [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#starting-a-job-from-a-savepoint Regards, Roman On Fri, Mar 12, 2021 at 1

No saving data using rocksdb

2021-03-12 Thread Maminspapin
I have the following piece of configuration in flink.yaml:

high-availability: zookeeper
high-availability.storageDir: file:///home/flink/flink-ha-data
high-avai

Re: Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
Thanks David! On Fri, Mar 12, 2021, 01:54 David Anderson wrote: > WatermarkStrategy.withIdleness works by marking idle streams as idle, so > that downstream operators will ignore those streams and allow the > watermarks to progress based only on the advancement of the watermarks of > the still a

Re: Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread David Anderson
WatermarkStrategy.withIdleness works by marking idle streams as idle, so that downstream operators will ignore those streams and allow the watermarks to progress based only on the advancement of the watermarks of the still active streams. As you suspected, this mechanism does not provide for the wa
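For illustration, a hedged PyFlink sketch of attaching idleness to a watermark strategy (the Java API is analogous). It assumes a PyFlink release that exposes WatermarkStrategy.with_idleness, and that the event timestamp is the first field of each record, in epoch milliseconds:

```
from pyflink.common import Duration
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy

class FirstFieldTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        # Assumes the event time is the first field, in epoch milliseconds.
        return value[0]

watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(5))
    .with_timestamp_assigner(FirstFieldTimestampAssigner())
    .with_idleness(Duration.of_seconds(60)))

# ds = ds.assign_timestamps_and_watermarks(watermark_strategy)
```

As noted above, this only helps while some inputs stay active; if every input is idle, the watermark still does not advance.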

Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
I haven't been able to get WatermarkStrategy.withIdleness to work. Is it broken? None of my timers trigger when I'd expect idleness to take over. On Tue, Mar 2, 2021 at 11:15 PM Dan Hill wrote: > Hi. > > For local and test development, I want to flush the events in my system > to make sure I'

Re: Gradually increasing checkpoint size

2021-03-12 Thread Dan Hill
I figured it out. I have some records with the same key and I was doing an IntervalJoin. One of the IntervalJoin implementations that I found looks like its runtime increases exponentially when there are duplicate keys. I introduced a de-duping step and it works a lot faster. On Thu, Mar 11,
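For illustration, a hedged sketch of such a de-duplication step before the interval join, written with the Python DataStream API (the thread itself concerns the Java API). It assumes a PyFlink release with keyed state support and that records are keyed by their first field:

```
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class Dedupe(KeyedProcessFunction):
    """Emits only the first record seen for each key."""

    def open(self, runtime_context: RuntimeContext):
        self.seen = runtime_context.get_state(
            ValueStateDescriptor("seen", Types.BOOLEAN()))

    def process_element(self, value, ctx):
        if self.seen.value() is None:
            self.seen.update(True)
            yield value

# deduped = ds.key_by(lambda record: record[0]).process(Dedupe())
```

The "seen" state grows with the number of distinct keys, so a state TTL may be worth adding for long-running jobs.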

Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Hi Yuri, The state that you access with getRuntimeContext().getState(...) is scoped to the key (so for every new key this state will be null). What key do you use? Regards, Roman On Fri, Mar 12, 2021 at 7:22 AM Maminspapin wrote: > > I have following piece of configuration in flink.yaml: > > Ke