Re: Flink 1.11.1 - job manager exits with exit code 0

2020-07-28 Thread Robert Metzger
Thanks for reporting back. Glad you found the issue. This reminds me of a ticket about this topic some time ago :) https://issues.apache.org/jira/browse/FLINK-15156 On Wed, Jul 29, 2020 at 7:51 AM Alexey Trenikhun wrote: > Hi Robert, > I found the cause, it was due to bug in job itself - code af

Re: Customization of execution environment

2020-07-28 Thread Robert Metzger
Hi Flavio, I think the recommended approach is as follows (then you don't need to create two environments): final Configuration conf = new Configuration(); conf.setLong(...) env = new LocalEnvironment(conf); I agree that in theory it would be nicer if the configuration returned was editable, but
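For readers following the thread, here is a minimal, self-contained Java sketch of the approach Robert outlines. The option key and value are placeholders, and it uses the createLocalEnvironment factory rather than the constructor, so treat it as an illustration rather than the exact code:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class LocalEnvExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // placeholder key/value; set whatever option you actually need
            conf.setLong("some.long.option", 42L);
            // pass the configuration at construction time instead of trying
            // to mutate the environment's configuration afterwards
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            env.fromElements(1, 2, 3).print();
        }
    }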

Re: JobManager refusing connections when running many jobs in parallel?

2020-07-28 Thread Robert Metzger
Hi Andreas, thanks for reaching out; this should not happen. Maybe your operating system has configured low limits for the number of concurrent connections / sockets. Maybe this thread is helpful: https://stackoverflow.com/questions/923990/why-do-i-get-connection-refused-after-1024-connection

Re: Flink 1.11.1 - job manager exits with exit code 0

2020-07-28 Thread Alexey Trenikhun
Hi Robert, I found the cause: it was due to a bug in the job itself - code after streamEnv.execute(...) called System.exit(0). It was unnoticeable before 1.11, but with 1.11, I guess in Application Mode, main is called from the job manager directly, and System.exit(0) just exits the whole JVM. Thank you and
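To make the failure mode concrete, here is a hedged Java sketch of the pattern Alexey describes (the pipeline itself is a stand-in):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExitAfterExecute {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print(); // stand-in for the real job
            env.execute("example-job");
            // Harmless when main() runs in a separate client JVM, but in
            // Application Mode main() runs inside the JobManager process,
            // so this terminates the JobManager with exit code 0.
            System.exit(0);
        }
    }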

Re: Flink 1.11.1 - job manager exits with exit code 0

2020-07-28 Thread Robert Metzger
Ah yeah, after sending the email, I saw that the exit code is in the subject line :) Can you post the entire log? What I find confusing is this log statement: "Stopped BLOB server at 0.0.0.0:6124". The BLOB server is usually only stopped during shutdown. For some reason, the JobManager is in the p

Re: Flink Deployment on Kubernetes session Cluster

2020-07-28 Thread Yang Wang
Hi Vinay Patil, You are right. Flink does not provide any isolation between different jobs in the same Flink session cluster. You could use a Flink job cluster or an application cluster (from 1.11) to get better isolation, since a dedicated Flink cluster will be started for each job. Please refer to the

Re: Flink 1.11.1 - job manager exits with exit code 0

2020-07-28 Thread Robert Metzger
Hey Alexey, What is the exit code of the JobManager? Can you check if it has been killed by the OOM killer? You could also try to run the job with DEBUG log level, it might give us an additional indication why the JVM dies. What kind of job are you submitting? Is it complicated? On Sat, Jul 25, 2

Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
Yes, the PR still needs to be improved. In most cases there is more than one statement in the SQL file, so the -f option should support multiple statements. However, a related PR [1] has not been completed yet. [1] https://github.com/apache/flink/pull/8738 Best, Godfrey Jun Zhang, on Wed, Jul 29, 2020, 10:17 AM

Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread Jun Zhang
Hi Godfrey, thanks for your reply. 1. I have seen the -u parameter, but my SQL file may include not only 'insert into ... select ...' but also SET, DDL, etc. 2. I may not have noticed this issue before. I took a look at it, and I think it may have some problems. For example, he finally called t

Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
Hi Jun, currently the sql client supports a -u option, like this: ./bin/sql-client.sh embedded -u "insert_statement". There is already a JIRA [1] that aims to support an -f option. [1] https://issues.apache.org/jira/browse/FLINK-12828 Best, Godfrey Jun Zhang wrote on Wed, Jul 29, 2020, 9:22 AM: > I want to
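For a concrete (made-up) example of the -u option: ./bin/sql-client.sh embedded -u "INSERT INTO sink_table SELECT * FROM source_table" should submit that single INSERT without opening the interactive shell; the table names here are illustrative.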

[DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread Jun Zhang
I want to execute some Flink SQL batch jobs regularly, such as 'insert into ... select ...', but I can't find a suitable method so far. So, referencing Hive, I changed the source code and added a '--filename' parameter so that we can execute a SQL file, like this: /home/flink/bin/sql-client.sh embed

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Vijayendra Yadav
Thank you Ravi for the quick help. One last question: is this compression supported with Flink version 1.10.0? Regards, Vijay On Tue, Jul 28, 2020 at 1:20 PM Ravi Bhushan Ratnakar < ravibhushanratna...@gmail.com> wrote: > Hi Vijayendra, > > As far as rowFormat is concerned, it doesn't support compre

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
Hi Vijayendra, As far as rowFormat is concerned, it doesn't support compression. Regards, Ravi On Tue 28 Jul, 2020, 22:08 Vijayendra Yadav, wrote: > Hi Ravi, > > Thanks for your response. But your example is for *forBulkFormat*. How > about *forRowFormat*? > > Regards, > Vijay > > On Tue

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Vijayendra Yadav
Hi Ravi, Thanks for your response. But your example is for *forBulkFormat*. How about *forRowFormat*? Regards, Vijay On Tue, Jul 28, 2020 at 11:28 AM Ravi Bhushan Ratnakar < ravibhushanratna...@gmail.com> wrote: > Hi Vijayendra, > > You could achieve row-encoded output like this as well > >

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
Hi Vijayendra, You could achieve row-encoded output like this as well: codecName = "org.apache.hadoop.io.compress.GzipCodec" val streamingFileSink:StreamingFileSink[String] = StreamingFileSink.forBulkFormat(new Path(outputPath),CompressWriters.forExtractor(new DefaultExtractor[String]).withHadoopCo
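For reference, an untested Java reconstruction of the truncated Scala snippet above, assuming the flink-compress module (which, as far as I know, provides CompressWriters since 1.10.0) is on the classpath; outputPath is a placeholder:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.compress.CompressWriters;
    import org.apache.flink.formats.compress.extractor.DefaultExtractor;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class GzipSinkExample {
        public static StreamingFileSink<String> buildSink(String outputPath) {
            return StreamingFileSink
                .forBulkFormat(
                    new Path(outputPath),
                    // DefaultExtractor turns each String element into bytes;
                    // the named Hadoop codec compresses each part file
                    CompressWriters.forExtractor(new DefaultExtractor<String>())
                        .withHadoopCompression("org.apache.hadoop.io.compress.GzipCodec"))
                .build();
        }
    }

If the 1.10.0 availability holds, this would also answer Vijayendra's version question above.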

Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Vijayendra Yadav
Hi Team, Is there a way to enable compression in the StreamingFileSink API for row-encoded formats? val sink: StreamingFileSink[String] = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) Regards, Vijay

Re: Unable to submit high parallelism job in cluster

2020-07-28 Thread Robert Metzger
Ah, the good old cloud-11 cluster at DIMA. I used that one as well in 2014 to test Flink there :) Now regarding your question: Is it possible that "Experiments.Experiment1(Experiments.java:42)" depends on the parallelism, and it is doing a lot more work than expected because of that? On Mon, Jul

Flink Deployment on Kubernetes session Cluster

2020-07-28 Thread Vinay Patil
Hi Team, We have a session cluster running on K8s where multiple stateless jobs are running fine. We observed that once we submit a stateful job (state size per checkpoint is 1GB) to the same session cluster, other jobs are impacted because this job starts to utilise more memory and CPU and eventual

Re: Removing stream in a job having state

2020-07-28 Thread David Anderson
When you modify a job by removing a stateful operator, then during a restart, when Flink tries to restore the state, it will complain that the snapshot contains state that cannot be restored. The solution to this is to restart from the savepoint (or retained checkpoint), specifying that you want t
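The truncated sentence presumably ends with the --allowNonRestoredState flag, i.e. something like bin/flink run -s <savepointPath> --allowNonRestoredState <jobJar>, which tells Flink to skip any state in the snapshot that no longer maps to an operator in the modified job.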

Re: Flink 1.11.0 UDAF fails with "No match found for function signature fun()"

2020-07-28 Thread Dmytro Dragan
Hi Timo, I have switched to 1.11.1. Creating a function using "create function ..." fails with magic: Caused by: java.lang.IndexOutOfBoundsException: Index: 110, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at java.util.ArrayList.get(ArrayList.java:433) at c

Re: How to stream CSV from S3?

2020-07-28 Thread John Smith
Hi, is there an example of how RowCsvInputFormat is initialized? On Tue, 28 Jul 2020 at 04:00, Jingsong Li wrote: > - `env.readCsvFile` is in DataSet, just read the full amount of data once > in batch mode. > - `streamEnv.readFile(RowCsvInputFormat, filePath, > FileProcessingMode.PROCESS_CONTINU
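Not from the thread, but to sketch an answer: a minimal Java example of constructing a RowCsvInputFormat and feeding it to readFile in continuous mode. The path, schema, and scan interval are made up, and reading from S3 additionally requires a configured S3 filesystem:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.io.RowCsvInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
    import org.apache.flink.types.Row;

    public class CsvStreamExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            String path = "s3://my-bucket/csv-dir"; // placeholder
            // one TypeInformation per CSV column
            TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT};
            RowCsvInputFormat format = new RowCsvInputFormat(new Path(path), fieldTypes);
            DataStream<Row> rows = env.readFile(
                format, path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L, // rescan every minute
                Types.ROW(Types.STRING, Types.INT));
            rows.print();
            env.execute("csv-stream");
        }
    }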

Removing stream in a job having state

2020-07-28 Thread ApoorvK
Hi All, I have multiple streams in a Flink job which contain different state such as ValueState or MapState. But now I need to remove one stream having a specific (UID, NAME) from the job. If I remove it, I face an issue during restoration stating the operator does not exist. I was using BucketSink for sink

Re: PyFlink DDL UDTF join error

2020-07-28 Thread Wei Zhong
Hi Manas, It seems like a bug. As a workaround for now, you can try to replace the UDTF SQL call with code like this: t_env.register_table("tmp_view", t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, featureValue)")) This works for me. I’ll try to find out what caused
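For context: join_lateral is the Table API counterpart of a SQL LATERAL TABLE join against the same UDTF, so the workaround expresses the same correlated join while bypassing the SQL path that appears to trigger the bug.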

Re: Job Cluster on Kubernetes with PyFlink

2020-07-28 Thread Shuiqiang Chen
Hi Wojciech, After double checking, there should be a way to run PyFlink jobs on Kubernetes in a job cluster. You can give it a try: 1. The custom image has a corresponding PyFlink installed (it seems that you have already done this). 2. If you use third-party Python dependencies in the Python UDF,

Re: Is there a way to use stream API with this program?

2020-07-28 Thread David Anderson
MAX_WATERMARK is emitted by ContinuousFileReaderOperator and StreamSource when they close. I think you'll find this just works without your having to do anything to make it happen. David On Tue, Jul 28, 2020 at 8:07 AM Piotr Nowojski wrote: > MAX_WATERMARK should be emitted automatically by th

PyFlink DDL UDTF join error

2020-07-28 Thread Manas Kale
Hi, Using PyFlink DDL, I am trying to: 1. Consume a Kafka JSON stream. This has messages with aggregate data, for example: "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}" 2. Split the field "data" so that I can process its values

Re: Job Cluster on Kubernetes with PyFlink

2020-07-28 Thread Shuiqiang Chen
Hi Wojciech, Currently we are not able to deploy a job cluster for PyFlink jobs on Kubernetes, but it will be supported in release-1.12. Best, Shuiqiang

Job Cluster on Kubernetes with PyFlink

2020-07-28 Thread Wojciech Korczyński
Hello, I would like to run PyFlink jobs on Kubernetes in a job cluster. I managed to do this in session cluster mode, but deploying an independent job cluster for each job seems a better option for me. If I understand the documentation correctly [1], [2], I should create a custom Docker image which

How to retain the column name when converting a Table to a DataStream

2020-07-28 Thread izual
Hi community, I ran into some field name errors when trying to convert between Table and DataStream. Flink version: 1.9.1. First, init a DataStream and convert it to table 'source', and register a table function named 'foo': val sourceStream = env.socketTextStream("127.0.0.1", 8010) .map(line => line.toInt) tab
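Not from the thread, but as a point of comparison, a small Java sketch (1.9-era string-expression API; the field name is illustrative) of naming a column explicitly when converting a DataStream to a Table:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class FieldNameExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            DataStream<Integer> stream = env.fromElements(1, 2, 3);
            // name the single column explicitly instead of the default f0
            Table source = tableEnv.fromDataStream(stream, "num");
            source.printSchema(); // prints the schema with the declared column name
            // converting back yields Rows whose positions follow the declared names
            tableEnv.toAppendStream(source, Row.class).print();
            env.execute("field-names");
        }
    }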

Customization of execution environment

2020-07-28 Thread Flavio Pompermaier
Hi to all, while migrating to Flink 1.11 I've tried to customize the execution environment in this way: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment bte = BatchTableEnvironment.create(env); final Configuration conf = bte.getConfig().getConfiguration(); conf.setLong

Re: How to stream CSV from S3?

2020-07-28 Thread Jingsong Li
- `env.readCsvFile` is in the DataSet API; it just reads the full amount of data once in batch mode. - `streamEnv.readFile(RowCsvInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a directory and continue reading in streaming mode. On Tue, Jul 28, 2020 at 3:54 PM John

Re: How to stream CSV from S3?

2020-07-28 Thread John Smith
Also, this is where I find the docs confusing: in the "connectors" section, the file system isn't under DataStream, but env.readCsvFile seems like it can do the trick? On Tue., Jul. 28, 2020, 3:46 a.m. John Smith, wrote: > Basically I want to "monitor" a bucket on S3 and every file that gets > create

Re: How to stream CSV from S3?

2020-07-28 Thread Jingsong Li
Yes, you can try `StreamExecutionEnvironment.readFile(RowCsvInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)`. (And wrap it in a table if you want.) On Tue, Jul 28, 2020 at 3:46 PM John Smith wrote: > Basically I want to "monitor" a bucket on S3 and every file tha

Re: How to stream CSV from S3?

2020-07-28 Thread John Smith
Basically I want to "monitor" a bucket on S3 and, for every file that gets created in that bucket, read it and stream it. If I understand correctly, I can just use env.readCsvFile() and configure it to continuously read a folder path? On Tue., Jul. 28, 2020, 1:38 a.m. Jingsong Li, wrote: > Hi John, > > D

Re: AllwindowStream and RichReduceFunction

2020-07-28 Thread Flavio Pompermaier
Ok, thanks for the suggestion, but I think I'll wait for another Flink version before migrating DataSets to DataStreams... In my experience it is very helpful to have open/close on all operators. Best, Flavio On Tue, Jul 28, 2020 at 8:51 AM Aljoscha Krettek wrote: > I think that should wo