Re: PostgreSQL JDBC connection drops after inserting some records

2020-01-28 Thread Arvid Heise
Hi Soheil, what is your actual question? Did the application eventually finish, or does it keep restarting? In general, communication with external systems may fail from time to time. Only if the failure persists would we explore it. If it is very rare, a restart should already help. Best, Arvid On Thu

Re: How to debug a job stuck in a deployment/run loop?

2020-01-28 Thread Arvid Heise
Hi Jason, could you describe your topology? Are you writing to Kafka? Are you using exactly once? Are you seeing any warning? If so, one thing that immediately comes to my mind is transaction.max.timeout.ms. If the value in flink (by default 1h) is higher than what the Kafka brokers support, it ma
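A minimal sketch of the two settings involved (the values below are examples, not recommendations): the broker-side cap `transaction.max.timeout.ms` must be at least as large as the producer-side `transaction.timeout.ms` that Flink's exactly-once Kafka producer uses (1h by default).

```properties
# Kafka broker, server.properties -- raise the broker-side cap (default is 15 min):
transaction.max.timeout.ms=3600000

# ...or instead lower the timeout in the properties passed to the FlinkKafkaProducer:
transaction.timeout.ms=900000
```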

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-28 Thread Arvid Heise
Hi Aaron, I encountered a similar issue when running on EMR. On the slaves, there are some lingering Hadoop versions that are older than 2.7 (it was 2.6 if I remember correctly), which bleed into the classpath of Flink. Flink inspects the Hadoop version to determine whether certain capabilities like file tru

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-28 Thread Arvid Heise
Sorry, I meant to respond to Senthi. Thank you Aaron for providing help. Also, one more thing that may be confusing the first time you use plugins: you need to put each plugin in its own folder. We improved the documentation in the upcoming 1.10 release: flink-dist ├── conf ├── lib ... └── plugins └
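A sketch of the expected layout (the paths and the jar name are examples, not from the original thread): each plugin jar goes into its own subfolder below `plugins/`, rather than directly into `plugins/` or `lib/`.

```shell
# Hypothetical Flink home; adjust to your installation.
FLINK_HOME=/tmp/flink-dist
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
# The plugin jar then goes into that folder, e.g.:
#   cp opt/flink-s3-fs-hadoop-1.9.0.jar "$FLINK_HOME/plugins/s3-fs-hadoop/"
ls "$FLINK_HOME/plugins"
```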

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-28 Thread Arvid Heise
Hi Mark, if you add `fs.s3a.fast.upload.buffer: true` to your Flink configuration, it should add that to the respective Hadoop configuration when creating the file system. Note, I haven't tried it but all keys with the prefixes "s3.", "s3a.", "fs.s3a." should be forwarded. -- Arvid On Mon, Jan 2
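As an untested sketch, the corresponding entries in flink-conf.yaml might look as follows. Note that, per the Hadoop S3A documentation, `fs.s3a.fast.upload` is the boolean switch, while `fs.s3a.fast.upload.buffer` selects the buffer mechanism (`disk`, `array`, or `bytebuffer`):

```yaml
# flink-conf.yaml -- keys with the s3./s3a./fs.s3a. prefixes are forwarded
# to the Hadoop configuration of the S3A file system:
fs.s3a.fast.upload: true
fs.s3a.fast.upload.buffer: disk   # buffer multipart uploads on disk, not on the heap
```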

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread Arvid Heise
Hi Kant, just wanted to mention the obvious. If you add a ProcessFunction right after the join, you could maintain a user state with the same result. That will of course blow up the data volume by a factor of 2, but may still be better than writing to an external system. On Mon, Jan 27, 2020 at 6
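A minimal sketch of that idea (untested; the class and state names are made up for illustration): a KeyedProcessFunction placed right after the join that mirrors the latest joined row into keyed state.

```java
// Untested sketch: cache the latest join result per key in Flink state
// instead of writing it to an external system.
public class CacheJoinResult extends KeyedProcessFunction<String, Row, Row> {

    private transient ValueState<Row> latest;

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-join-result", Row.class));
    }

    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
        latest.update(value);  // duplicates the join state, as noted above
        out.collect(value);
    }
}
```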

Re: PostgreSQL JDBC connection drops after inserting some records

2020-01-28 Thread Fabian Hueske
Hi, The exception is thrown by Postgres. I'd start investigating there what the problem is. Maybe you need to tweak your Postgres configuration, but it might also be that the Flink connector needs to be configured differently. If the necessary config option is missing, it would be good to add it. H

Re: batch job OOM

2020-01-28 Thread Arvid Heise
Hi Fanbin, you could use the RC1 of Flink that was created yesterday and use the apache repo https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/ . Alternatively, if you build Flink locally with `mvn install`, then you could use mavenLocal() in

Re: Is there anything strictly special about sink functions?

2020-01-28 Thread Arvid Heise
As Konstantin said, you need to use a sink, but you could use `org.apache.flink.streaming.api.functions.sink.DiscardingSink`. There is nothing inherently wrong with outputting things through a UDF. You need to solve the same challenges as in a SinkFunction: you need to implement your own state man
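A sketch of that pattern (the UDF name is hypothetical): perform the side-effecting output in a regular operator and terminate the stream with a DiscardingSink, so the topology still ends in a sink.

```java
// Untested sketch: the map function performs the actual output as a side effect;
// DiscardingSink merely terminates the topology.
stream
    .map(new MySideEffectingFunction())
    .addSink(new DiscardingSink<>());
```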

Re: Blocking KeyedCoProcessFunction.processElement1

2020-01-28 Thread Arvid Heise
Hi Alexey, we cannot perform a checkpoint on a UDF that is still being called as we would not be able to have a consistent snapshot. You could potentially have changed the state, so if we replay the event during recovery, you may get inexact results. For example consider a simple counter, where yo

Re: SideOutput Exception: "Output Tag must not be null"

2020-01-28 Thread Arvid Heise
Hi Izual, it seems the code example is not complete. I'm assuming backupOutputTag is actually a field of your application class. If you look at the examples, you will notice that backupOutputTag should be defined within the method that defines your topology, not on the wrapping object.
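A sketch of the corrected placement (untested; all names are illustrative): the tag is created inside the method that builds the topology, as an anonymous subclass so the type information is retained.

```java
// Untested sketch: define the OutputTag locally, not as a field of the outer class.
final OutputTag<String> backupOutputTag = new OutputTag<String>("backup") {};

SingleOutputStreamOperator<String> mainStream = input
        .process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                ctx.output(backupOutputTag, value);  // route to the side output
                out.collect(value);                  // main output
            }
        });

DataStream<String> backupStream = mainStream.getSideOutput(backupOutputTag);
```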

Re: Flink RocksDB logs filling up disk space

2020-01-28 Thread Ahmad Hassan
Hi Yun, Thank you for pointing that out. In our production landscapes with live customers, we have a 10-second checkpoint interval and 7MB average checkpoint size. We do incremental checkpoints. If we keep the checkpoint interval longer (e.g. 1 minute), then the Kafka consumer lag starts increasin

Re: Blocking KeyedCoProcessFunction.processElement1

2020-01-28 Thread Taher Koitawala
Would the AsyncIO operator not be an option for you to connect to the RDBMS? On Tue, Jan 28, 2020, 12:45 PM Alexey Trenikhun wrote: > Thank you Yun Tang. > My implementation potentially could block for significant amount of time, > because I wanted to do RDBMS maintenance (create partitions for new data
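As an untested sketch of what that could look like (the AsyncFunction implementation and the tuning values are hypothetical):

```java
// Untested sketch: off-load the blocking RDBMS call so that checkpointing
// is not held up by a long-running processElement call.
DataStream<Result> results = AsyncDataStream.unorderedWait(
        input,
        new MyJdbcAsyncFunction(),  // hypothetical AsyncFunction wrapping the JDBC work
        30, TimeUnit.SECONDS,       // timeout per asynchronous request
        100);                       // max number of in-flight requests
```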

Re: Flink and Presto integration

2020-01-28 Thread Piotr Nowojski
Hi, Yes, Presto (in presto-hive connector) is just using hive Metastore to get the table definitions/meta data. If you connect to the same hive Metastore with Flink, both systems should be able to see the same tables. Piotrek > On 28 Jan 2020, at 04:34, Jingsong Li wrote: > > Hi Flavio, > >

Re: REST rescale with Flink on YARN

2020-01-28 Thread Gary Yao
Hi, You can use `yarn application -status <applicationId>` to find the host and port that the server is listening on (AM host & RPC port). If you need to access that information programmatically, take a look at the YarnClient [1]. Best, Gary [1] https://hadoop.apache.org/docs/r2.8.5/api/org/apache/hadoop/
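For illustration (the application id is made up, and the field labels may differ slightly between Hadoop versions), the relevant lines of the application report look roughly like this:

```shell
yarn application -status application_1580000000000_0001
# Look for the AM host and RPC port in the printed report, e.g.:
#   AM Host : node-17.example.com
#   RPC Port : 41235
```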

Re: FileStreamingSink is using the same counter for different files

2020-01-28 Thread Kostas Kloudas
Hi Pawel, You are correct that the write method invocation is guaranteed to be thread safe for the same sub operator instance. But I am not sure if having a unique counter per subtask across buckets would add much to the user experience of the sink. I think that in both cases, the interpretation o

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread kant kodali
Hi Arvid, I am trying to understand your statement. I am new to Flink, so excuse me if I don't know something I should have known. A ProcessFunction just processes the records, right? If so, how is it better than writing to an external system? At the end of the day I want to be able to query it (doesn't

Re: Flink and Presto integration

2020-01-28 Thread Flavio Pompermaier
Hive metastore is the de facto standard for Hadoop but in my use case I have to query other databases (like MySQL, Oracle and SQL Server). So Presto would be a good choice (apart from the fact that you need to restart it when you add a new catalog..), and I'd like to have an easy translation of the

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread Arvid Heise
Yes, the default is writing to an external system. Especially if you want SQL, then there is currently no other way around it. The drawbacks of writing to external systems are: additional maintenance of another system and higher latency. On Tue, Jan 28, 2020 at 11:49 AM kant kodali wrote: > Hi

Re: Flink ParquetAvroWriters Sink

2020-01-28 Thread aj
I am able to resolve this issue by setting classloader.resolve-order as parent-first. On Wed, Jan 22, 2020, 23:13 aj wrote: > Hi Arvid, > > I have implemented the code with envelope schema as you suggested but now > I am facing issues with the consumer . I have written code like this: > > FlinkK
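For reference, that setting goes into flink-conf.yaml (the default is `child-first`):

```yaml
# flink-conf.yaml
classloader.resolve-order: parent-first
```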

Re: Flink RocksDB logs filling up disk space

2020-01-28 Thread Yun Tang
Hi Ahmad, We mainly recommend our users to set the checkpoint interval to three minutes. If you don't rely on keyed state for persistence, you could also disable checkpointing and let the Kafka client commit offsets automatically [1], which might be the most lightweight solution. [1] https://ci.

Re: Flink RocksDB logs filling up disk space

2020-01-28 Thread Ahmad Hassan
Hello Yun, With no checkpointing it is an even bigger problem: if we rely on Flink's auto-commit and it fails to commit once due to an outage or a Kafka rebalance, it never retries again, which means a full outage on live systems. For sure we need checkpointing for other reasons too, i.

Reinterpreting a pre-partitioned data stream as keyed stream

2020-01-28 Thread KristoffSC
Hi all, we have a use case where the order of received events matters and should be kept across the pipeline. Our pipeline would be parallelized. We can key the stream just after the Source operator, but in order to keep the ordering across subsequent operators, we would still have to keep the stream keyed. Obviou
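This sounds like a case for `DataStreamUtils.reinterpretAsKeyedStream`. A rough, untested sketch (the types and key selector are illustrative, and the operation is only safe if the partitioning really is unchanged):

```java
// Untested sketch: re-declare an already-partitioned stream as keyed without
// a second network shuffle.
KeyedStream<Event, String> keyed = input.keyBy(e -> e.getKey());
DataStream<Event> mapped = keyed.map(new EnrichFunction());  // must preserve the partitioning
KeyedStream<Event, String> rekeyed =
        DataStreamUtils.reinterpretAsKeyedStream(mapped, e -> e.getKey());
```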

Flink distribution housekeeping for YARN sessions

2020-01-28 Thread Theo Diefenthal
Hi there, Today I realized that we currently have a lot of Flink distribution jar files that are never cleaned up, and I would like to know what to do about this, i.e. how to housekeep them properly. In the job-submitting HDFS home directory, I find a subdirectory called `.flink` with hundreds of subfolders

Does flink support retries on checkpoint write failures

2020-01-28 Thread Richard Deurwaarder
Hi all, We've got a Flink job running on 1.8.0 which writes its state (rocksdb) to Google Cloud Storage[1]. We've noticed that jobs with a large amount of state (500gb range) are becoming *very* unstable. In the order of restarting once an hour or even more. The reason for this instability is tha
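Not a retry as such, but on 1.8 one can at least keep the job running when a single checkpoint write fails (untested sketch; later releases replace this flag with `setTolerableCheckpointFailureNumber`):

```java
// Untested sketch: do not fail the whole job on a declined/failed checkpoint.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
```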

is streaming outer join sending unnecessary traffic?

2020-01-28 Thread kant kodali
Hi All, I am doing a streaming outer join from four topics in Kafka; let's call them sample1, sample2, sample3, sample4. Each of these test topics has just one column, which is of tuple string. My query is this: SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL OUTER JOIN sa

Re: is streaming outer join sending unnecessary traffic?

2020-01-28 Thread kant kodali
Sorry, fixed some typos. I am doing a streaming outer join from four topics in Kafka; let's call them sample1, sample2, sample3, sample4. Each of these test topics has just one column, which is of tuple string. My query is this: SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 F

Re: REST rescale with Flink on YARN

2020-01-28 Thread Yang Wang
Gary is right. You could also access the YARN RM REST API to get the original AM address. http://hadoop.apache.org/docs/r2.8.5/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Application_API Best, Yang Gary Yao 于2020年1月28日周二 下午6:17写道: > Hi, > > You can use > > yarn applicati

Flink+YARN HDFS replication factor

2020-01-28 Thread Piper Piper
Hello, When using Flink+YARN (with HDFS) and having a long running Flink session (mode) cluster with a Flink client submitting jobs, the HDFS could have a replication factor greater than 1 (example 3). So, I would like to know when and how any of the data (like event-data or batch-data) or code (

Apache Flink Job fails repeatedly due to RemoteTransportException

2020-01-28 Thread M Singh
Hi Folks: We have a streaming Flink application (using v 1.6.2) and it dies within 12 hours.  We have configured the number of restarts, which is 10 at the moment. Sometimes the job runs for some time and then within a very short time has a number of restarts and finally fails.  In other instances, the