[ANNOUNCE] Apache Flink 1.11.1 released

2020-07-21 Thread Dian Fu
The Apache Flink community is very happy to announce the release of Apache Flink 1.11.1, which is the first bugfix release for the Apache Flink 1.11 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

Re: Handle idle kafka source in Flink 1.9

2020-07-21 Thread bat man
Hi Team, Can someone share their experience handling this? Thanks. On Tue, Jul 21, 2020 at 11:30 AM bat man wrote: > Hello, > > I have a pipeline which consumes data from a Kafka source. Since the > partitions are partitioned by device_id, in case a group of devices is down > some partitions

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
Hi David, thanks for the confirmation, good to know that. Best, Congxian David Magalhães wrote on Tue, Jul 21, 2020 at 11:42 PM: > Hi Congxian, the leftover files were on the local disk of the TaskManager. > But looking better into the issue, I think the issue was the "logs". The > sink, in this case, was

Re: MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay, I'm not entirely sure of the semantics between ThreadPoolSize and MaxConnections since they are all KPL configurations (this specific question would probably be better directed to AWS), but my guess would be that the number of concurrent requests to the KPL backend is capped by MaxConnec

Re: FlinkKinesisProducer blocking ?

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay, ThreadPoolSize is per Kinesis producer, of which there is one for each parallel subtask. If you are constantly hitting the 1MB per second per shard quota, then the records will be buffered by the FlinkKinesisProducer. During this process, backpressure is not applied if you have not conf
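Gordon's point about buffering can be illustrated with a toy model (purely illustrative; this is not the actual KPL or FlinkKinesisProducer implementation): records beyond the per-second byte budget stay in the producer's buffer instead of being sent, so a sustained overrun means a growing backlog.

```python
# Toy model of per-shard throttling: records beyond the per-second byte
# budget stay buffered rather than being sent. Illustrative only; not
# the actual KPL/FlinkKinesisProducer logic.

SHARD_QUOTA_BYTES_PER_SEC = 1_000_000  # Kinesis: 1 MB per second per shard

def drain(buffer, quota=SHARD_QUOTA_BYTES_PER_SEC):
    """Send records (sizes in bytes) until the per-second byte quota is
    exhausted. Returns (sent, still_buffered)."""
    sent, remaining, used = [], list(buffer), 0
    while remaining and used + remaining[0] <= quota:
        rec = remaining.pop(0)
        used += rec
        sent.append(rec)
    return sent, remaining

# Five 300 KB records arrive within one second: only three fit under the
# 1 MB/s quota, the other two remain buffered (a persistent backlog is
# what eventually surfaces as backpressure or checkpoint delays).
sent, buffered = drain([300_000] * 5)
```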

Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Jark Wu
Hi Kelly, As a simple workaround, You can remove the watermark definition in `KafkaStream`, in this way, the stream-stream join will not complain "Rowtime attributes" exception. Best, Jark On Wed, 22 Jul 2020 at 03:13, Kelly Smith wrote: > Thanks Leonard and Danny, > > > > This makes a lot of
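Jark's workaround amounts to omitting the WATERMARK clause from the DDL, so the table carries no rowtime attribute and the regular (stream-stream) join no longer complains. A sketch, with hypothetical table and column names not taken from the thread:

```sql
-- No WATERMARK clause: the table has no rowtime attribute,
-- so a regular stream-stream join is allowed.
CREATE TABLE KafkaStream (
  id STRING,
  ts TIMESTAMP(3)
  -- WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- omitted on purpose
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```

The trade-off is that without a rowtime attribute the table can no longer participate in event-time operations such as windowed or interval joins.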

Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Jingsong Li
Hi Kelly, There are issues for tracking: - Filesystem support single file reading: https://issues.apache.org/jira/browse/FLINK-17398 - Filesystem support LookupJoin: https://issues.apache.org/jira/browse/FLINK-17397 Best, Jingsong On Wed, Jul 22, 2020 at 3:13 AM Kelly Smith wrote: > Thanks Leo

Strange stack when job encounter back pressure

2020-07-21 Thread aitozi
Hi, I notice the job encounters a strange case: the upstream operator is under backpressure, all tasks in the back pressure sample show HIGH, but when I jstack the downstream task, I only see the stack below: java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Me

Re: Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay, Your assumption is correct that the discovery interval does not affect the interval of fetching records. As a side note, you can actually disable shard discovery by setting the value to -1. The FlinkKinesisConsumer would then only call ListShards once at job startup. Cheers, Gordon O
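The -1 convention Gordon describes can be sketched as follows (a hypothetical simplification of the consumer's discovery schedule, not the actual connector code):

```python
DISABLE_SHARD_DISCOVERY = -1  # sentinel value mentioned in the thread

def discovery_calls(runtime_secs, interval_millis):
    """How many ListShards calls happen over a job's lifetime:
    one at startup, plus one per interval if discovery is enabled."""
    if interval_millis == DISABLE_SHARD_DISCOVERY:
        return 1  # only the initial ListShards at job startup
    return 1 + runtime_secs * 1000 // interval_millis

# With discovery disabled, a 1-hour job calls ListShards exactly once;
# with a 10-second interval it would call it 361 times. Disabling it
# trades resharding awareness for fewer Kinesis API calls.
```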

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
You cannot do that in Flink yet; Flink partition columns must be mapped to columns from the table schema which you can select from. The syntax is a little different from Hive’s => create table table_name ( id int, dtDontQuery string, name string ) partitioned
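For comparison, the equivalent Flink-style DDL (a sketch with made-up names and connector options) keeps the partition column inside the schema rather than declaring it separately as Hive does:

```sql
-- Flink: the partition column `dt` must appear in the table schema
CREATE TABLE table_name (
  id INT,
  name STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/out',
  'format' = 'json'
);
```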

Re: Parquet batch table sink in Flink 1.11

2020-07-21 Thread Jingsong Li
In table/SQL, I think we don't need a source/sink for `AvroParquetOutputFormat`, because the data structure is always Row or RowData; it should not be an Avro object. Best, Jingsong On Tue, Jul 21, 2020 at 8:09 PM Flavio Pompermaier wrote: > This is what I actually do but I was hoping to be able t

MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-21 Thread Vijay Balakrishnan
Hi, Trying to tune the KPL and FlinkKinesisProducer for a Kinesis Data Stream (KDS). Getting the following errors: 1. Throttling at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) org.apache.flink.streaming.runtime.tasks.OperatorCha

Re: Flink SQL - Join Lookup Table

2020-07-21 Thread Kelly Smith
Thanks Leonard and Danny, This makes a lot of sense. My hope here is to only use SQL without any specialized Java/Scala code, so it seems it may not be possible to use either of these methods yet. I’ll open an issue for the LookupTableSource implementation, and look into the workaround you sug

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread David Magalhães
Hi Congxian, the leftover files were on the local disk of the TaskManager. But looking better into the issue, I think the issue was the "logs". The sink, in this case, was writing one line into the logger (I was writing 8 GB in total), and that makes more sense. So nothing wrong with the Flink/Save

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
Thanks Jark for the update. However, getting back to the original question, can I use a nested column directly for CREATE TABLE PARTITIONED BY like below without declaring an additional column? CREATE TABLE output > PARTITIONED BY (`location.transId`) > WITH ( > 'connector' = 'filesystem', >

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Jark Wu
Hi Dongwon, I think this is a bug in the Filesystem connector which doesn't exclude the computed columns when building the TableSource. I created an issue [1] to track this problem. Best, Jark [1]: https://issues.apache.org/jira/browse/FLINK-18665 On Tue, 21 Jul 2020 at 17:31, Dongwon Kim wrot

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-21 Thread Till Rohrmann
Thanks a lot for being our release managers Zhijiang and Piotr, and thanks to everyone who helped make this release possible. Cheers, Till On Wed, Jul 8, 2020 at 10:59 AM godfrey he wrote: > Congratulations! > > Thanks Zhijiang and Piotr for the great work, and thanks everyone for > their cont

Re: Parquet batch table sink in Flink 1.11

2020-07-21 Thread Flavio Pompermaier
This is what I actually do, but I was hoping to be able to get rid of the HadoopOutputFormat and be able to use a more comfortable Source/Sink implementation. On Tue, Jul 21, 2020 at 12:38 PM Jingsong Li wrote: > Hi Flavio, > > AvroOutputFormat only supports writing Avro files. > I think you can

Re: FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Paul Lam
Thanks for your help anyway, Jingsong & Rui. I read the JIRA description, and I’m +1 to check the lazy initialization first. It looks like the file creation is skipped or it doesn’t block the writing, and I’ve seen a bucket was writing to a file that was not supposed to exist, e.g. its parent dir w

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread Congxian Qiu
Hi David, Sorry for the late reply, it seems I missed your previous email. I'm not sure I fully understand here: are the leftover files on the S3 filesystem or on the local disk of the TaskManager? Currently, the savepoint data will directly write to the output stream of the underlying file (here an S3 file), yo

Re: Unsubscribe

2020-07-21 Thread Yangze Guo
Hi Harshvardhan, You need to send an email to user-unsubscr...@flink.apache.org to unsubscribe. Best, Yangze Guo On Tue, Jul 21, 2020 at 7:12 PM Harshvardhan Agrawal wrote: > > -- > Regards, > Harshvardhan

Unsubscribe

2020-07-21 Thread Harshvardhan Agrawal
-- Regards, Harshvardhan

Re: FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Jingsong Li
Hi, Sorry for this. This workaround only works in Hive 2+. We can only wait for 1.11.2. Best, Jingsong On Tue, Jul 21, 2020 at 6:15 PM Rui Li wrote: > Hi Paul, > > I believe Jingsong meant try using native writer, for which the option key > is `table.exec.hive.fallback-mapred-writer` and is b

Re: Beam Word Count on Flink/Kubernetes: issue with task manager not getting picked up

2020-07-21 Thread Yang Wang
Hi Avijit Saha, I think you could use 'kubectl describe pod flink-task-manager-5cc79c5795-7mnqh' to get more information. Usually, it is caused by not enough resources in your K8s cluster. Best, Yang Avijit Saha wrote on Tue, Jul 14, 2020 at 7:12 AM: > Hi, > > I have a docker image of the Beam WordCount examp

Re: Key group is not in KeyGroupRange

2020-07-21 Thread Ori Popowski
I should have mentioned, I've opened a bug for it https://issues.apache.org/jira/browse/FLINK-18637. So the discussion moved there. On Tue, Jul 14, 2020 at 2:03 PM Ori Popowski wrote: > I'm getting this error when creating a savepoint. I've read in > https://issues.apache.org/jira/browse/FLINK-1

Re: Parquet batch table sink in Flink 1.11

2020-07-21 Thread Jingsong Li
Hi Flavio, AvroOutputFormat only supports writing Avro files. I think you can use `AvroParquetOutputFormat` as a hadoop output format, and wrap it through Flink `HadoopOutputFormat`. Best, Jingsong On Fri, Jul 17, 2020 at 11:59 PM Flavio Pompermaier wrote: > Hi to all, > is there a way to writ

Re: FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Rui Li
Hi Paul, I believe Jingsong meant try using native writer, for which the option key is `table.exec.hive.fallback-mapred-writer` and is by default set to true. You can set it to false like this: tableEnv.getConfig().getConfiguration().set( HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false)

Re: FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Paul Lam
Hi JingSong, Thanks for your advice! But IIUC, it seems that `table.exec.hive.fallback-mapred-reader` is false by default? Moreover, explicitly setting this option might cause a serialization issue. Wonder if I’m setting it in the right way? ``` tableEnv.getConfig().getConfiguration().setStr

Re: FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Paul Lam
Hi Rui, I reproduced the error with a minimum case, the SQL is similar to `insert into hive_table_x select simple_string from kafka_table_b`. I’m pretty sure it’s not related to the table schema. And I removed all the optional properties in the Hive table DDL, the error still happened. Best,

Re: Simple MDC logs don't show up

2020-07-21 Thread Fabian Hueske
Hi, When running your code in the IDE, everything runs in the same local JVM. When you run the job on Kubernetes, the situation is very different. Your code runs in multiple JVM processes distributed in a cluster. Flink provides a metrics collection system that you should use to collect metrics f

Re: Flink rest api cancel job

2020-07-21 Thread Fabian Hueske
Hi White, Can you describe your problem in more detail? * What is your Flink version? * How do you deploy the job (application / session cluster), (Kubernetes, Docker, YARN, ...) * What kind of job are you running (DataStream, Table/SQL, DataSet)? Best, Fabian Am Mo., 20. Juli 2020 um 08:42 Uhr

RE: Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

2020-07-21 Thread B.Zhou
Hi Fabian, Thanks for the reply. I also created a JIRA: https://issues.apache.org/jira/browse/FLINK-18641 yesterday. I think we can extend our discussion there. Best Regards, Brian From: Fabian Hueske Sent: Tuesday, July 21, 2020 17:35 To: Zhou, Brian Cc: user; Arvid Heise; Piotr Nowojski Sub

Re: Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

2020-07-21 Thread Fabian Hueske
Hi Brian, AFAIK, Arvid and Piotr (both in CC) have been working on the threading model of the checkpoint coordinator. Maybe they can help with this question. Best, Fabian Am Mo., 20. Juli 2020 um 03:36 Uhr schrieb : > Anyone can help us on this issue? > > > > Best Regards, > > Brian > > > > *Fr

Re: Custom metrics output

2020-07-21 Thread Fabian Hueske
Hi Joris, I don't think that the approach of "add methods in operator class code that can be called from the main Flink program" will work. The most efficient approach would be implementing a ProcessFunction that counts in 1-min time buckets (using event-time semantics) and updates the metrics. I
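Fabian's suggestion, counting per 1-minute event-time bucket, reduces to logic like the following (a plain-Python sketch of the bucketing only, not an actual Flink ProcessFunction; in Flink the counts would be kept in keyed state and flushed to the metrics system when the bucket closes):

```python
from collections import defaultdict

def bucket_counts(events):
    """Count events per 1-minute event-time bucket.
    `events` is an iterable of (event_time_millis, payload) pairs."""
    counts = defaultdict(int)
    for ts_millis, _payload in events:
        bucket_start = ts_millis - ts_millis % 60_000  # floor to the minute
        counts[bucket_start] += 1
    return dict(counts)

# Three events fall in the first minute and one in the second:
events = [(1_000, "a"), (59_999, "b"), (30_000, "c"), (61_000, "d")]
# bucket_counts(events) -> {0: 3, 60000: 1}
```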

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
Hi Danny, > Which version did you use? I use Flink 1.11.0. > what SQL context throws the error ? I think the declaration itself is not a problem. The exception occurs when I tried to execute the following, which I didn't show you in the previous email: > tEnv.sqlQuery("SELECT type, location FRO

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
Hi, I execute the sql below """ |create table navi ( | a STRING, | location ROW |) with ( | 'connector' = 'filesystem', | 'path' = 'east-out', | 'format' = 'json' |) |""".stripMargin tableEnv.executeSql(sql0) val sql = """ |CREATE TABLE output ( | `partition`

Re: FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Jingsong Li
Hi Paul, If your orc table has no complex(list,map,row) types, you can try to set `table.exec.hive.fallback-mapred-writer` to false in TableConfig. And Hive sink will use ORC native writer, it is a work-around way. About this error, I think this is a bug for Hive 1.1 ORC. I will try to re-produce

How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Dongwon Kim
Hi, I want to create subdirectories named after values of a nested column, location.transId. This is my first attempt: > CREATE TABLE output > PARTITIONED BY (`location.transId`) > WITH ( > 'connector' = 'filesystem', > 'path' = 'east-out', > 'format' = 'json' > ) LIKE navi (EXCLUDING ALL)
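One commonly suggested shape for this is to hoist the nested field into a top-level computed column and partition on that. This is a sketch only: whether a computed column is accepted as a partition key depends on the connector and Flink version, and the rest of the thread reports a related filesystem-connector bug, so treat it as illustrative:

```sql
-- Hypothetical workaround: expose location.transId as a top-level
-- computed column and use that as the partition column.
CREATE TABLE output (
  type STRING,
  location ROW<transId STRING>,
  transId AS location.transId
) PARTITIONED BY (transId) WITH (
  'connector' = 'filesystem',
  'path' = 'east-out',
  'format' = 'json'
);
```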

Re: How to get flink JobId in runtime

2020-07-21 Thread Yangze Guo
Hi Si-li, Just a reminder that it is not the right way to get JobId because the `StreamTask` is actually an internal class. For more discussion about it, please refer to [1] and [2]. You could get JobId through this way at the moment. Please keep in mind that it is not a stable contract. [1] http

Re: DynamoDB sink

2020-07-21 Thread Robert Metzger
Hi Lorenzo, I'm not aware of any well-maintained DynamoDB Sink for Flink. I created a JIRA ticket to track requests for it earlier this year: https://issues.apache.org/jira/browse/FLINK-16504 On Fri, Jul 17, 2020 at 5:40 PM Lorenzo Nicora wrote: > Hi > > I was wondering whether there is any re

Re: How to get flink JobId in runtime

2020-07-21 Thread Si-li Liu
I figured out another way: wrapping my function in a custom StreamOperator that extends AbstractUdfStreamOperator; then I can use this.getContainingTask.getEnvironment.getJobId Congxian Qiu wrote on Tue, Jul 21, 2020 at 11:49 AM: > Hi Sili > > I'm not sure if there are other ways to get this value properly.

Re: Key group is not in KeyGroupRange

2020-07-21 Thread Robert Metzger
Looks like this thread is already being resolved in https://issues.apache.org/jira/browse/FLINK-18637 On Tue, Jul 21, 2020 at 10:26 AM Robert Metzger wrote: > Hi Ori, > thanks a lot for your email. Which version of Flink are you using? > > On Tue, Jul 14, 2020 at 1:03 PM Ori Popowski wrote: > >

Re: FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Rui Li
Hey Paul, Could you please share more about your job, e.g. the schema of your Hive table, whether it's partitioned, and the table properties you've set? On Tue, Jul 21, 2020 at 4:02 PM Paul Lam wrote: > Hi, > > I'm doing a POC on Hive connectors and find that when writing orc format > Hive tabl

Re: Map type param escaping :

2020-07-21 Thread Robert Metzger
Cool! Thanks for sharing the solution! On Tue, Jul 14, 2020 at 11:39 PM Bohinski, Kevin wrote: > Figured it out, pulled StructuredOptionsSplitter into a debugger and was > able to get it working with: > > -Dkubernetes.jobmanager.annotations="\"KEY:\"\"V:A:L:U:E\"\"\"" > > > > Best > > kevin >

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-21 Thread David Magalhães
Hi Till, I'm using the s3:// scheme, but I'm not sure what the default was, s3a or s3p. > then the state backend should try to directly write to the target file system That was the behaviour that I saw the second time I've run this with more slots. Does the savepoint write directly to S3 via stre

Re: Saving file to the ftp server

2020-07-21 Thread Robert Metzger
Hi Paweł, I believe this is a bug. I don't think many people use Flink to write to an FTP server, that's why this hasn't been addressed yet. There's probably something off with the semantics of distributed vs non-distributed file systems. I guess the easiest way to resolve this is by running your F

FileNotFoundException when writing Hive ORC tables

2020-07-21 Thread Paul Lam
Hi, I'm doing a POC on Hive connectors and find that when writing ORC-format Hive tables, the job failed with FileNotFoundException right after ingesting data (full stacktrace at the bottom of the mail). The error can be reliably reproduced in my environment, which is Hadoop 2.6.5 (CDH 5.6.0),

Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-21 Thread Till Rohrmann
Two quick comments: With unaligned checkpoints which are released with Flink 1.11.0, the problem of slow checkpoints under backpressure has been resolved/mitigated to a good extent. Moreover, the community wants to work on event time alignment for sources in the next release. This should prevent th