Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Ivan Yang
Yes, increase taskmanager.network.memory.fraction in your case. Also, reducing the parallelism will reduce the number of network buffers required for your job. I never used 1.4.x, so I don't know about it. Ivan
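The settings discussed in this thread can be sketched as a flink-conf.yaml fragment (values are illustrative only, not recommendations; the keys are the pre-1.10 network memory options mentioned in the messages):

```yaml
# Fraction of TaskManager memory reserved for network buffers (default 0.1)
taskmanager.network.memory.fraction: 0.2
# Lower and upper bounds for the network buffer pool
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
```

In Flink 1.10 and later these options were renamed under `taskmanager.memory.network.*`.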

Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Rahul Patwari
Thanks for your reply, Ivan. I think taskmanager.network.memory.max is 1 GB by default. In my case, the network buffer memory is 13112 * 32768 bytes = around 400 MB, which is 10% of the TM memory, since taskmanager.network.memory.fraction defaults to 0.1. Do you mean to increase taskmanager.network.memory.fraction…
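The arithmetic in the message above can be checked directly (a quick sketch; the buffer count and 32 KiB segment size come from the message, the 4 GB TM size from elsewhere in the thread):

```python
buffers = 13112           # network buffers reported by the job
segment_size = 32768      # default memory segment size in bytes (32 KiB)
tm_memory = 4 * 1024**3   # 4 GiB total TaskManager memory

network_bytes = buffers * segment_size
print(network_bytes / 1024**2)    # ≈ 409.75 MiB, i.e. "around 400 MB"
print(network_bytes / tm_memory)  # ≈ 0.1, matching the default fraction
```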

Error with Flink-Gelly, lastJobExecutionResult is null for ExecutionEnvironment

2020-07-31 Thread Xia Rui
Hello, everyone. I am trying to use Flink-Gelly. The version of Flink I used is 1.11 (I also tried 1.12, and it does not work either). I am following the instructions at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/gelly/. First, I build my Flink code with: (1) git clone…

Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Ivan Yang
Hi Rahul, Try to increase taskmanager.network.memory.max to 1GB, basically double what you have now. However, you only have 4GB RAM for the entire TM, and it seems out of proportion to have a 1GB network buffer with 4GB total RAM. Reducing the amount of shuffling will require fewer network buffers. But if you…

allowNonRestoredState: metadata file in checkpoint dir missing

2020-07-31 Thread Deshpande, Omkar
Hello, When deleting an operator, we run our application with --allowNonRestoredState=true, as described in the documentation. When running with this…

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Hequn Cheng
Hi Jincheng, Thanks a lot for raising the discussion. +1 for the FLIP. I think this will bring big benefits for PyFlink users. Currently, the Python Table API document is hidden deep under the Table API & SQL tab, which makes it quite hard to find. Also, the PyFlink documentation is mixed with Java…

sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Rahul Patwari
Hi, We are observing the "Insufficient number of Network Buffers" issue sporadically after Flink was upgraded from 1.4.2 to 1.8.2. The state of the tasks with this issue transitioned from DEPLOYING to FAILED. Whenever this issue occurs, the job manager restarts. Sometimes, the issue goes away after the restart…

Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Vijayendra Yadav
Thank you, Niels. Would you have something for a Scala object/class? Say, for example, if I want to implement a unit test (not an integration test) for the code below or similar: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streami…

Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Niels Basjes
Does this test in one of my own projects do what you are looking for? https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107 On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav

Kafka source, committing and retries

2020-07-31 Thread Jack Phelan
Scenario === A partition that Flink is reading: [ 1 - 2 - 3 - 4 - 5 - 6 - 7 | 8 - 9 - 10 - 11 | 12 - 13 ] = [ committed | in flight | unread ]. Kafka basically breaks off pieces of the end of the queue and shoves them downstream for processing? So suppose whil…
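The offset ranges in the sketch above can be modeled with plain integers (a toy illustration, not Flink's actual consumer code; the function name is made up). With checkpointing enabled, Flink's Kafka source commits offsets back to Kafka only when a checkpoint completes:

```python
# Toy model of one Kafka partition's offsets, matching the sketch:
committed = 7                   # offsets 1..7 already committed
in_flight = list(range(8, 12))  # 8..11 fetched, not yet checkpointed
unread = list(range(12, 14))    # 12..13 still on the broker

def on_checkpoint_complete(committed, in_flight):
    """On checkpoint completion, commit up to the last in-flight offset."""
    return max(in_flight) if in_flight else committed

committed = on_checkpoint_complete(committed, in_flight)
print(committed)  # 11: the in-flight range is now committed
```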

[Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Vijayendra Yadav
Hi Team, Looking for some help and reference code/material to implement unit tests for possible scenarios in Flink streaming code that should assert specific cases. Regards, Vijay
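Beyond Flink's own test harnesses, one common pattern is to keep the transformation logic in a plain function so it can be unit-tested without starting a pipeline (a generic sketch, not from this thread; `parse_event` is a made-up example function):

```python
# The map logic lives in a plain function, independent of any Flink
# operator, so a unit test can assert on it directly.
def parse_event(line: str) -> tuple:
    user, value = line.split(",")
    return (user, int(value))

def test_parse_event():
    assert parse_event("alice,3") == ("alice", 3)
    assert parse_event("bob,0") == ("bob", 0)

test_parse_event()
```

The same function can then be wrapped in a map operator in the actual job, while integration tests exercise the wiring separately.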

Behavior for flink job running on K8S failed after restart strategy exhausted

2020-07-31 Thread Eleanore Jin
Hi Experts, I have a flink cluster (per job mode) running on kubernetes. The job is configured with the restart strategy restart-strategy.fixed-delay.attempts: 3 and restart-strategy.fixed-delay.delay: 10 s. So after 3 retries, the job will be marked as FAILED, hence the pods are not running. However…
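For reference, the restart strategy from the message, laid out as it would appear in flink-conf.yaml:

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```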

Is there a way to get file "metadata" as part of stream?

2020-07-31 Thread John Smith
Hi, so reading a CSV file using env.readFile() with RowCsvInputFormat. Is there a way to get the filename as part of the row stream? The file contains a unique identifier to tag the rows with.

Re: How to stream CSV from S3?

2020-07-31 Thread John Smith
Hi Yes it works :) For the Java guys... final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String path = "file:///foo/bar"; TypeInformation[] fieldTypes = new TypeInformation[]{ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO…

JDBCOutputFormat dependency loading error

2020-07-31 Thread Flavio Pompermaier
Hi to all, I'm trying to run my DataSet job on Flink 1.11.0, and I'm connecting to MariaDB in my code. I've put mariadb-java-client-2.6.0.jar in the lib directory, and in the pom.xml I set that dependency as provided. The code runs successfully from the IDE, but when I try to run the code on t…

Re: Support for Event time clock specific to each stream in parallel streams

2020-07-31 Thread David Anderson
It sounds like you would like to have something like event-time-based windowing, but with independent watermarking for every key. An approach that can work, though it is somewhat cumbersome, is to not use watermarks or windows, but instead put all of the logic in a KeyedProcessFunction (or RichFlatMap…
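A toy sketch of the per-key bookkeeping described above (plain Python, not the Flink API; it only illustrates keeping an independent event-time clock for each key, with a made-up out-of-orderness bound):

```python
from collections import defaultdict

# Per-key "watermark": the max event time seen for that key, minus an
# allowed out-of-orderness. An event is late only relative to its own key.
ALLOWED_LATENESS = 5

max_ts = defaultdict(int)

def process(key, event_ts):
    max_ts[key] = max(max_ts[key], event_ts)
    watermark = max_ts[key] - ALLOWED_LATENESS
    return "late" if event_ts < watermark else "on-time"

print(process("a", 100))  # on-time
print(process("b", 10))   # on-time: key "b" keeps its own clock
print(process("a", 90))   # late, but only relative to key "a"
```

In a real KeyedProcessFunction, `max_ts` would live in keyed state and timers would drive any windowing logic.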

Re: Re: How to retain the column'name when convert a Table to DataStream

2020-07-31 Thread Jark Wu
Hi, For now, you can explicitly set the RowTypeInfo to retain the field names. This works in the master branch: val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType) // t1 stream schema: Row(a: Integer, b: Integer) println(s"t1 stream schema: ${t1Stream.getType()}") tEnv.reg…

Support for Event time clock specific to each stream in parallel streams

2020-07-31 Thread Sush Bankapura
Hi, We have a single Flink job that works on data from multiple data sources. These data sources are not aligned in time and also have intermittent connectivity lasting for days, due to which data will arrive late. We attempted to use event time and watermarks with parallel streams using ke…

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-07-31 Thread Vikash Dat
Thanks for the reply. I am currently using 1.10, but I also saw it happen in 1.10.1 when experimenting. I have not tried 1.11, since EMR only has up to 1.10 at the moment. Are there any known workarounds? On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren wrote: > Hi Vikash, > > It's a bug about classloa…

Re: Customization of execution environment

2020-07-31 Thread Aljoscha Krettek
I agree! My long-term goal is that a Configuration is the basis of truth and that the programmatic setter methods and everything else just modify the underlying configuration. We have made big steps in at least allowing configuration of most (if not all) StreamExecutionEnvironment and TableEnviron…

Re:Re: How to retain the column'name when convert a Table to DataStream

2020-07-31 Thread izual
I created a JIRA issue here: https://issues.apache.org/jira/browse/FLINK-18782 And thanks for your advice on avoiding the "top-level projection/rename" ^_^ At 2020-07-30 16:58:45, "Dawid Wysakowicz" wrote: Hi, I am afraid you are facing an issue that was not checked for/was not considered. I t…

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Marta Paes Moreira
Hi, Jincheng! Thanks for creating this detailed FLIP, it will make a big difference in the experience of Python developers using Flink. I'm interested in contributing to this work, so I'll reach out to you offline! Also, thanks for sharing some information on the adoption of PyFlink, it's great t…

Re: Colocating Compute

2020-07-31 Thread Dawid Wysakowicz
Hi Satyam, It should be fine to have an unbounded InputFormat. The important thing is not to produce more splits than there are parallel instances of your source. In createInputSplits(int minNumSplits), generate only minNumSplits, so that all splits can be assigned immediately. Unfortunately you…

EMR Saving CheckPoint into to S3 and 403 Access Denied

2020-07-31 Thread mars
Hi, I am running Flink jobs on EMR (5.30.1) and trying to save the checkpoint info to S3. I have the following in the flink-conf.yaml file, and when I try to submit jobs to the Flink cluster, the JobManager is failing as it is unable to save the checkpoint info to S3. s3.access-key: <> s3.s…
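A sketch of the relevant flink-conf.yaml entries for S3 checkpointing (the bucket path is a placeholder; on EMR it is often preferable to rely on the instance profile's IAM role rather than static keys, which also sidesteps 403 errors from mismatched credentials):

```yaml
state.checkpoints.dir: s3://my-bucket/flink/checkpoints   # placeholder bucket
s3.access-key: <access-key>
s3.secret-key: <secret-key>
```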