Single task backpressure problem with Credit-based Flow Control

2020-05-24 Thread Weihua Hu
Hi, all

I ran into a weird single Task BackPressure problem.

JobInfo:
DAG: Source (1000) -> Map (2000) -> Sink (1000), which are linked via rescale. 
Flink version: 1.9.0

There is no related info in the jobmanager/taskmanager logs.

Through Metrics, I see that Map (242)'s outPoolUsage is full, but its 
downstream Sink (121)'s inPoolUsage is 0.

After dumping the memory and analyzing it, I found:
Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
This is not consistent with my understanding of the Flink network transmission 
mechanism.

Can someone help me? Thanks a lot.


Best
Weihua Hu



Re: Flink 1.8.3 Kubernetes POD OOM

2020-05-24 Thread Andrey Zagrebin
Hi Josson,

Thanks for the details. Sorry, I overlooked that you indeed mentioned the file 
backend.

Looking into Flink memory model [1], I do not notice any problems related to 
the types of memory consumption we model in Flink.
Direct memory consumption by the network stack corresponds to your configured 
fraction (0.02f). The JVM heap cannot cause the container OOM because it is bounded by the configured heap size.
I do not know any other types of memory consumption in Flink 1.8.

Nonetheless, there is no way to control all types of memory consumption, 
especially native memory allocated either by user code or by the JVM (if you do not 
use RocksDB, Flink barely uses native memory explicitly).
The examples (not exhaustive):
- native libraries in user code or its dependencies which use off-heap, e.g. 
malloc (detecting this would require some OS process dump)
- JVM metaspace, threads/GC overhead etc (we do not limit any of this in 1.8 by 
JVM args)

Recently, we discovered some class loading leaks (JVM metaspace), e.g. [2] or 
[3].
Since 1.10, Flink limits JVM metaspace and direct memory, so you would get a 
concrete OOM exception before the container dies.
Maybe the Kafka or Elasticsearch connector clients got updated with 1.8 and caused 
some leaks.
I cc’ed Gordon and Piotr whether they have an idea.

I suggest trying to decrease the POD memory, noting the consumed memory of the various 
types at the moment the container dies 
(as I suppose you already did), and then increasing the POD memory several times until you 
see which type of memory consumption always grows until the OOM 
while the other types hopefully stabilise at some level.
Then you could take a dump of that ever-growing type of memory consumption to 
analyse whether there is a memory leak.

Best,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#total-memory
 

[2] https://issues.apache.org/jira/browse/FLINK-16142 

[3] https://issues.apache.org/jira/browse/FLINK-11205 


> On 24 May 2020, at 06:18, Josson Paul  wrote:
> 
> Hi Andrey,
>   To clarify the above email: I am using heap-based state, not RocksDB. 
> 
> Thanks,
> Josson
> 
> On Sat, May 23, 2020, 17:37 Josson Paul  > wrote:
> Hi Andrey,
>   We don't use RocksDB. As I said in the original email, I am using the file-based 
> backend. Even though our cluster is on Kubernetes, our Flink cluster uses Flink's 
> standalone resource manager. We have not yet integrated our Flink with 
> Kubernetes.
> 
> Thanks,
> Josson
> 
> On Fri, May 22, 2020 at 3:37 AM Andrey Zagrebin  > wrote:
> Hi Josson,
> 
> Do you use state backend? is it RocksDB?
> 
> Best,
> Andrey
> 
> On Fri, May 22, 2020 at 12:58 PM Fabian Hueske  > wrote:
> Hi Josson,
> 
> I don't have much experience setting memory bounds in Kubernetes myself, but 
> my colleague Andrey (in CC) reworked Flink's memory configuration for the 
> last release to ease the configuration in container envs.
> He might be able to help.
> 
> Best, Fabian
> 
> On Thu., 21 May 2020 at 18:43, Josson Paul wrote:
> Cluster type: Standalone cluster
> Job Type: Streaming
> JVM memory: 26.2 GB
> POD memory: 33 GB
> CPU: 10 Cores
> GC: G1GC
> Flink Version: 1.8.3
> State back end: File based
> NETWORK_BUFFERS_MEMORY_FRACTION : 0.02f of the Heap
> We are not accessing direct memory from the application. Only Flink uses direct 
> memory.
> 
> We notice that in Flink 1.8.3, over a period of 30 minutes, the POD is killed 
> with OOM. The JVM heap is within its limit. 
> We read from Kafka and have windows in the application. Our sink is either 
> Kafka or Elasticsearch. 
> The same application/job was working perfectly in Flink 1.4.1 with the same 
> input rate and output rate.
> No back pressure.
> I have attached a few Grafana charts as PDF.
> Any idea why the off-heap / outside-JVM memory is going up and 
> eventually reaching the limit?
> 
> - Java Heap (reserved=26845184KB, committed=26845184KB)
> (mmap: reserved=26845184KB, committed=26845184KB) 
> 
> - Class (reserved=1241866KB, committed=219686KB)
> (classes #36599)
> (malloc=4874KB #74568) 
> (mmap: reserved=1236992KB, committed=214812KB) 
> 
> - Thread (reserved=394394KB, committed=394394KB)
> (thread #383)
> (stack: reserved=392696KB, committed=392696KB)
> (malloc=1250KB #1920) 
> (arena=448KB #764)
> 
> - Code (reserved=272178KB, committed=137954KB)
> (malloc=22578KB #33442) 
> (mmap: reserved=249600KB, committed=115376KB) 
> 
> - GC (reserved=1365088KB, committed=1365088KB)
> (malloc=336112KB #1130298) 
> (mmap: reserved=1028976KB, committed=1028976KB)
> 
> 
> 
> -- 
> Thanks
> Josson
> 
> 
> -- 
> Thanks
> Josson



Re: Does Flink use EMRFS?

2020-05-24 Thread Rafi Aroch
Hi Peter,

I've dealt with the cross-account delegation issues in the past (with no
relation to Flink) and got into the same ownership problems (accounts can't
access data, account A 'loses' access to its own data).

My 2-cents are that:

   - The account that produces the data (A) should be the ONLY OWNER of
   that data.
   - The policy to access the data should be managed in ONE place only, the
   producing account (A).
   - If you wish to expose access to your data to other accounts (B, C, D),
   the best approach would be to:
   - In account A - Create a policy that defines the access you wish to
   expose. For example, read access to a specific bucket & path:

{
>   "Version": "2012-10-17",
>   "Statement": [
> {
>   "Effect": "Allow",
>   "Action": [
> "s3:GetObject",
> "s3:ListBucket"
>   ],
>   "Resource": [
> "arn:aws:s3:::bucket-name",
> "arn:aws:s3:::bucket-name/*"
>   ]
> }
>   ]
> }
>
>
   - In account A - Create a role and define which accounts you allow to
  AssumeRole (this lets you control whether ALL or only specific users of the other
  account should access the data):

{
  "Version": "2012-10-17",
  "Statement": [
{
  "Sid": "",
  "Effect": "Allow",
  "Principal": {
"AWS": [
  "arn:aws:iam::account-B:root",
  "arn:aws:iam::account-C:root",
  "arn:aws:iam::account-D:root"
]
  },
  "Action": "sts:AssumeRole"
}
  ]
}


   - In account A - attach the policy to the role.
  - In other accounts - THEY control which users have access to the
  data by allowing AssumeRole permissions to the role above from account A.
  This could be unrestricted (by *) or restricted to a specific role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam:::role/external-access-role"
    }
  ]
}


Now when a user assumes that external-access-role role, they will be
granted the specified access without playing around with ownership
configurations.
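
For completeness, a minimal sketch of what the AssumeRole call looks like from code
(AWS SDK for Java v1; the role ARN and session name below are hypothetical placeholders):

import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.Credentials;

public class AssumeRoleSketch {
    public static void main(String[] args) {
        AWSSecurityTokenService sts = AWSSecurityTokenServiceClientBuilder.defaultClient();

        // Ask STS for temporary credentials of the cross-account role (placeholder ARN).
        AssumeRoleRequest request = new AssumeRoleRequest()
                .withRoleArn("arn:aws:iam::ACCOUNT_A_ID:role/external-access-role")
                .withRoleSessionName("cross-account-read");

        Credentials creds = sts.assumeRole(request).getCredentials();

        // These temporary credentials can then be used to build an S3 client that
        // reads account A's bucket directly, without any object-ownership tricks.
        System.out.println("Temporary access key: " + creds.getAccessKeyId());
    }
}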

Hope this helps,
Rafi


On Fri, May 22, 2020 at 11:39 PM Peter Groesbeck 
wrote:

> Hi,
>
> I'm using Flink StreamingFileSink running in one AWS account (A) to
> another (B). I'm also leveraging a SecurityConfiguration in the CFN to
> assume a role in account B so that when I write there the files are owned
> by account B which then in turn allows account B to delegate to other AWS
> accounts (C and D). The reason these files must be owned by the other
> account is because AWS doesn't support cross account delegation:
>
> https://docs.aws.amazon.com/AmazonS3/latest/dev/example-walkthroughs-managing-access-example4.html
>
> SecurityConfiguration:
>   Type: AWS::EMR::SecurityConfiguration
>   Properties:
> Name: String
> SecurityConfiguration:
>   AuthorizationConfiguration:
> EmrFsConfiguration:
>   RoleMappings:
> - Role: arn:aws:iam:::role/EMR_EC2_DefaultRole
>   IdentifierType: Prefix
>   Identifiers:
> - s3://my-bucket/prefix/
> - Role: arn:aws:iam:::role/EMR_DefaultRole
>   IdentifierType: Prefix
>   Identifiers:
> - s3://my-bucket/prefix/
>
>
> I've referenced this in my Cluster block as well:
>
> ReleaseLabel: !Ref ReleaseLabel
> SecurityConfiguration: !Ref SecurityConfiguration
> ScaleDownBehavior: TERMINATE_AT_TASK_COMPLETION
>
> For some reason the files are still owned by account A. It seems like
> Flink is using the old Hadoop FS implementation instead of EMRFS which
> should (I believe) grant the proper ownership so that bucket permissions
> can apply to the written objects and in turn delegate read permissions to
> accounts C, D etc.
>
> Any help would be greatly appreciated.
>
> Thanks,
> Peter
>


Re: Single task backpressure problem with Credit-based Flow Control

2020-05-24 Thread Zhijiang
Hi Weihua,

From the info below, this matches the expected behavior of credit-based flow control. 

I guess one of the Sink parallel instances causes the backpressure, so you see 
that there are no available credits on the Sink side and
the outPoolUsage of the Map is almost 100%. This reflects the credit-based 
state in the case of backpressure.

If you want to analyze the root cause of the backpressure, you can trace the task 
stack of the respective Sink instance to find which operation is costly,
 then you can increase the parallelism or improve the UDF (if it is the bottleneck) 
to have a try. In addition, I am not sure why you chose rescale to shuffle 
data among the operators. The default
forward mode gives really good performance by default if you use the same 
parallelism for all of them.

Best,
Zhijiang
--
From:Weihua Hu 
Send Time: Sunday, May 24, 2020 18:32
To:user 
Subject: Single task backpressure problem with Credit-based Flow Control

Hi, all

I ran into a weird single Task BackPressure problem.

JobInfo:
DAG: Source (1000) -> Map (2000) -> Sink (1000), which are linked via rescale. 
Flink version: 1.9.0
There is no related info in the jobmanager/taskmanager logs.

Through Metrics, I see that Map (242)'s outPoolUsage is full, but its 
downstream Sink (121)'s inPoolUsage is 0.

After dumping the memory and analyzing it, I found:
Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
This is not consistent with my understanding of the Flink network transmission 
mechanism.

Can someone help me? Thanks a lot.


Best
Weihua Hu



Re: Re: Flink Window with multiple trigger condition

2020-05-24 Thread Yun Gao
Hi,

   First, sorry that I'm not an expert on windows, and please correct me if I'm 
wrong, but from my side, it seems the assigner might also be a problem in 
addition to the trigger: currently Flink window assigners are all based on 
time (processing time or event time), and it might be hard to implement an 
event-driven window assigner that starts to assign elements to a window only after 
some elements have been received. 
  What comes to my mind is that a possible alternative is to use the 
low-level KeyedProcessFunction directly: you may register a timer 30 minutes 
later when the "search" event is received and write the time of the search event into 
the state. The following events will then be saved to the state since 
the flag is set. After the "start" event is received or the timer fires, 
you could load all the events from the state, do the aggregation, and cancel 
the timer if the aggregation was triggered by the "start" event. A simpler case is [1]; it 
does not consider stopping the aggregation when a special event is received, but it seems 
that the logic could be added to that example.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
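
A rough, untested sketch of this alternative is below. The Event POJO and its getType()
accessor are hypothetical placeholders for your actual record type; adapt the names as needed.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SearchSessionFunction extends KeyedProcessFunction<String, Event, List<Event>> {

    private transient ValueState<Long> timerState;      // timestamp of the pending timer, if any
    private transient ListState<Event> bufferedEvents;  // events collected since the "search" event

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Long.class));
        bufferedEvents = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<List<Event>> out) throws Exception {
        if (timerState.value() == null && "search".equals(event.getType())) {
            // "search" starts the per-user window: register a timer 30 minutes from now.
            long timer = ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
        if (timerState.value() != null) {
            bufferedEvents.add(event);
            if ("start".equals(event.getType())) {
                // The special event ends the window early: cancel the pending timer and emit.
                ctx.timerService().deleteProcessingTimeTimer(timerState.value());
                emitAndClear(out);
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Event>> out) throws Exception {
        // 30 minutes elapsed without a "start" event: emit whatever was buffered.
        emitAndClear(out);
    }

    private void emitAndClear(Collector<List<Event>> out) throws Exception {
        List<Event> events = new ArrayList<>();
        for (Event e : bufferedEvents.get()) {
            events.add(e);
        }
        out.collect(events);
        bufferedEvents.clear();
        timerState.clear();
    }
}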

Best,
 Yun




 --Original Mail --
Sender:aj 
Send Date:Sun May 24 01:10:55 2020
Recipients:Tzu-Li (Gordon) Tai 
CC:user 
Subject:Re: Flink Window with multiple trigger condition


I am still not able to get much further after reading the material. Please help with some 
basic code to start building this window and trigger. 

Another option I am thinking of is to just use a RichFlatMapFunction and use the 
keyed state to build this logic. Is that the correct approach? 



On Fri, May 22, 2020 at 4:52 PM aj  wrote:


I was also thinking of having a processing-time window, but that will not work for 
me. I want to start the window when the user's "search" event arrives, so for 
each user the window will start from the search event. 
 The tumbling window has a fixed start and end time, so that will not be suitable in 
my case. 




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai  
wrote:
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and returns
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed, or
the start event was received.
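
Roughly, such a custom trigger could look like the following untested sketch
(the Event type and its getType() accessor are placeholders for your actual event class):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class StartOrTimeoutTrigger extends Trigger<Event, TimeWindow> {

    @Override
    public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if ("start".equals(element.getType())) {
            // Fire immediately and purge the window contents when the "start" event arrives.
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Otherwise make sure the window still fires when its processing time ends.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}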

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





Multiple Sinks for a Single Source

2020-05-24 Thread Prasanna kumar
Hi,

There is a single source of events in my system.

I need to process and send the events to multiple destinations/sinks at the
same time [Kafka topic, S3, Kinesis and POST to an HTTP endpoint].

I am able to send to one sink.

By adding more sinks to the same source stream, could we achieve this? Are
there any shortcomings?
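
Something like this minimal sketch is what I have in mind (the PrintSinkFunctions are
just placeholders for the real Kafka / S3 / Kinesis / HTTP sinks, and fromElements
stands in for the real source):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class MultiSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Single source of events.
        DataStream<String> events = env.fromElements("event-1", "event-2");

        // Attaching several sinks to the SAME stream makes every record go to each sink.
        events.addSink(new PrintSinkFunction<>());  // e.g. FlinkKafkaProducer
        events.addSink(new PrintSinkFunction<>());  // e.g. StreamingFileSink to S3
        events.addSink(new PrintSinkFunction<>());  // e.g. FlinkKinesisProducer or a custom HTTP sink

        env.execute("multi-sink sketch");
    }
}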

Please let me know if anyone here has successfully implemented this.

Thanks,
Prasanna.


Re: kerberos integration with flink

2020-05-24 Thread Yangze Guo
Yes, you can use kinit. But AFAIK, if you deploy Flink on Kubernetes
or Mesos, Flink will not ship the ticket cache. If you deploy Flink on
Yarn, Flink will acquire delegation tokens with your ticket cache and
set tokens for job manager and task executor. As the document said,
the main drawback is that the cluster is necessarily short-lived since
the generated delegation tokens will expire (typically within a week).

Best,
Yangze Guo

On Sat, May 23, 2020 at 1:23 AM Nick Bendtner  wrote:
>
> Hi Guo,
> Even for HDFS I don't really need to set "security.kerberos.login.contexts". 
> As long as there is the right ticket in the ticket cache before starting the 
> Flink cluster, it seems to work fine. I think even [4] from your references 
> seems to do the same thing. I have defined my own ticket cache specifically for 
> the Flink cluster by setting this environment variable. Before starting the 
> cluster I create a ticket using kinit.
> This is how I make Flink read this cache:
> export KRB5CCNAME=/home/was/Jaas/krb5cc . I think Flink also tries to find 
> the location of the ticket cache using this variable [1].
> Do you see any problems in setting up the Hadoop security module this way? And 
> thanks a lot for your help.
>
> [1] 
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
>
> Best,
> Nick
>
>
>
> On Thu, May 21, 2020 at 9:54 PM Yangze Guo  wrote:
>>
>> Hi, Nick,
>>
>> From my understanding, if you configure the
>> "security.kerberos.login.keytab", Flink will add the
>> AppConfigurationEntry of this keytab to all the apps defined in
>> "security.kerberos.login.contexts". If you define
>> "java.security.auth.login.config" at the same time, Flink will also
>> keep the configuration in it. For more details, see [1][2].
>>
>> If you want to use this keytab to interact with HDFS, HBase and Yarn,
>> you need to set "security.kerberos.login.contexts". See [3][4].
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
>> [2] 
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-security-module
>> [4] 
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, May 21, 2020 at 11:06 PM Nick Bendtner  wrote:
>> >
>> > Hi guys,
>> > Is there any difference in providing kerberos config to the flink jvm 
>> > using this method in the flink configuration?
>> >
>> > env.java.opts:  -Dconfig.resource=qa.conf 
>> > -Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/ 
>> > -Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
>> >  -Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf
>> >
>> > Is there any difference in doing it this way vs providing it from 
>> > security.kerberos.login.keytab .
>> >
>> > Best,
>> >
>> > Nick.


Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-24 Thread Yangze Guo
Glad to see that!

However, I was told that it is not the right approach to directly
extend `AbstractUdfStreamOperator` in DataStream API. This would be
fixed at some point (maybe Flink 2.0). The JIRA link is [1].

[1] https://issues.apache.org/jira/browse/FLINK-17862

Best,
Yangze Guo

On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez
 wrote:
>
> thanks. it worked!
>
> I add the following method at the
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext
> class:
>
> public Environment getTaskEnvironment() { return this.taskEnvironment; }
>
> Then I am getting the IP using:
>
> ConfigOption<String> restAddressOption = ConfigOptions
>     .key("rest.address")
>     .stringType()
>     .noDefaultValue();
> String restAddress =
> this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption);
>
> Thanks!
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, May 22, 2020 at 3:54 AM Yangze Guo  wrote:
> >
> > Hi, Felipe
> >
> > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you
> > could indeed get all the configurations(including what you defined in
> > flink-conf.yaml) through
> > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()".
> > However, I guess it is not the right behavior and might be fixed in
> > future versions.
> >
> > Best,
> > Yangze Guo
> >
> >
> >
> > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez
> >  wrote:
> > >
> > > Hi all,
> > >
> > > I would like to have the IP of the JobManager, not the Task Executors.
> > > I explain why.
> > >
> > > I have an operator (my own operator that extends
> > > AbstractUdfStreamOperator) that sends and receives messages from a
> > > global controller. So, regardless of which TaskManager these operator
> > > instances are deployed on, they need to send and receive messages from my
> > > controller. Currently, I am doing this using an MQTT broker (this is my
> > > first approach and I don't know if there is a better way to do it,
> > > maybe there is...)
> > >
> > > The first thing that I do is to start my controller using the
> > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
> > > it to the JobManager host. I am getting the IP of the JobManager by
> > > adding this method on the
> > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
> > > class:
> > >public String getRpcServiceAddress() {
> > > return this.rpcService.getAddress();
> > > }
> > > That is working. Although I am not sure if it is the best approach.
> > >
> > > The second thing that I am doing is to make each operator instance
> > > publish and subscribe to this controller. To do this they need the
> > > JobManager IP. I could get the TaskManager IPs from the
> > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing
> > > the JobManager IP as a parameter to the operator at the moment. I
> > > suppose that it is easy to get the JobManager IP inside the
> > > AbstractUdfStreamOperator or simply add some method somewhere to get
> > > this value. However, I don't know where.
> > >
> > > Thanks,
> > > Felipe
> > >
> > > --
> > > -- Felipe Gutierrez
> > > -- skype: felipe.o.gutierrez
> > > -- https://felipeogutierrez.blogspot.com
> > >
> > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo  wrote:
> > > >
> > > > Hi, Felipe
> > > >
> > > > Do you mean to get the Host and Port of the task executor where your
> > > > operator is indeed running on?
> > > >
> > > > If that is the case, IIUC, two possible components that contain this
> > > > information are RuntimeContext and the Configuration param of
> > > > RichFunction#open. After reading the relevant code path, it seems you
> > > > could not get it at the moment.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> > > >  wrote:
> > > > >
> > > > > Hi Felippe,
> > > > >
> > > > > could you clarify in some more details what you are trying to achieve?
> > > > >
> > > > > Best regards,
> > > > >
> > > > > --
> > > > >
> > > > > Alexander Fedulov | Solutions Architect
> > > > >
> > > > > +49 1514 6265796
> > > > >
> > > > >
> > > > >
> > > > > Follow us @VervericaData
> > > > >
> > > > > --
> > > > >
> > > > > Join Flink Forward - The Apache Flink Conference
> > > > >
> > > > > Stream Processing | Event Driven | Real Time
> > > > >
> > > > > --
> > > > >
> > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > > > >
> > > > > --
> > > > >
> > > > > Ververica GmbH
> > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, 
> > > > > Ji (Tony) Cheng
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez 
> > > > >  wrote:
> > > > >>
> > > > >> Hi all,
> > > > >>
> > > >

Re: java.lang.NoSuchMethodError while writing to Kafka from Flink

2020-05-24 Thread Guowei Ma
Hi
1. You could check whether 'org.apache.flink.api.java.clean' is in
your classpath first.
2. Did you follow the doc [1] to deploy your local cluster and run some
existing examples such as WordCount?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html
Best,
Guowei


Re: Flink Dashboard UI Tasks hard limit

2020-05-24 Thread Xintong Song
>
> Increasing network memory buffers (fraction, min, max) seems to increase
> tasks slightly.

That's weird. I don't think the number of network memory buffers has
anything to do with the number of tasks.

Let me try to clarify a few things.

Please be aware that, how many tasks a Flink job has, and how many slots a
Flink cluster has, are two different things.
- The number of tasks are decided by your job's parallelism and topology.
E.g., if your job graph has 3 vertices A, B and C, with parallelism 2, 3,
4 respectively, then you would have 9 (2+3+4) tasks in total.
- The number of slots are decided by number of TMs and slots-per-TM.
- For streaming jobs, you have to make sure the number of slots is enough
for executing all your tasks. The number of slots needed for executing your
job is by default the max parallelism of your job graph vertices. Take the
above example, you would need 4 slots, because it's the max among all the
vertices' parallelisms (2, 3, 4).

In your case, the screenshot shows that your job has 9621 tasks in total
(not around 18000; the dark box shows total tasks while the green box shows
running tasks), and 600 slots are in use (658 - 58), suggesting that the max
parallelism of your job graph vertices is 600.

If you want to increase the number of tasks, you should increase your job
parallelism. There are several ways to do that.

   - In your job code (assuming you are using the DataStream API), as sketched
   in the code after this list:
      - Use `StreamExecutionEnvironment#setParallelism()` to set
      parallelism for all operators.
      - Use `SingleOutputStreamOperator#setParallelism()` to set
      parallelism for a specific operator. (Only supported for subclasses of
      `SingleOutputStreamOperator`.)
   - When submitting your job, use `-p <parallelism>` as an argument for
   the `flink run` command, to set parallelism for all operators.
   - Set `parallelism.default` in your `flink-conf.yaml`, to set a default
   parallelism for your jobs. This will be used for jobs that have not set
   parallelism with any of the above methods.
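
For reference, a minimal untested sketch of the first option (the parallelism values
and the fromElements source are arbitrary examples, not recommendations):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism for all operators of this job.
        env.setParallelism(4);

        DataStream<String> events = env.fromElements("a", "b", "c");

        events
            // Per-operator parallelism overrides the environment default.
            .map(String::toUpperCase).setParallelism(8)
            .print().setParallelism(4);

        env.execute("parallelism sketch");
    }
}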


Thank you~

Xintong Song



On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan 
wrote:

> Hi Xintong,
> Thx for your reply.  Increasing network memory buffers (fraction, min,
> max) seems to increase tasks slightly.
>
> Streaming job
> Standalone
>
> Vijay
>
> On Fri, May 22, 2020 at 2:49 AM Xintong Song 
> wrote:
>
>> Hi Vijay,
>>
>> I don't think your problem is related to the number of open files. The
>> parallelism of your job is decided before the job actually tries to open the files.
>> And if the OS limit on open files were reached, you would see a job
>> execution failure, instead of a successful execution with a lower parallelism.
>>
>> Could you share some more information about your use case?
>>
>>- What kind of job are your executing? Is it a streaming or batch
>>processing job?
>>- Which Flink deployment do you use? Standalone? Yarn?
>>- It would be helpful if you can share the Flink logs.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> I have increased the number of slots available but the job is not using
>>> all the slots; it runs into this approximate 18000-task limit. Looking
>>> into the source code, it seems to be opening a file -
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>> So, do I have to tune the ulimit or something similar at the Ubuntu O/S
>>> level to increase the number of tasks available? What I am confused about is
>>> that the ulimit is per machine, but the ExecutionGraph is across many machines.
>>> Please pardon my ignorance here. Does the number of tasks equate to the number of
>>> open files? I am using 15 slots per TaskManager on AWS m5.4xlarge, which has
>>> 16 vCPUs.
>>>
>>> TIA.
>>>
>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan 
>>> wrote:
>>>
 Hi,

 The Flink Dashboard UI seems to show a hard limit of around 18000 in the Tasks
 column on a Ubuntu Linux box.
 I kept increasing the number of slots per task manager to 15 and number
 of slots increased to 705 but the slots to tasks
 stayed at around 18000. Below 18000 tasks, the Flink Job is able to
 start up.
 Even though I increased the number of slots, it still works when 312
 slots are being used.

 taskmanager.numberOfTaskSlots: 15

 What knob can I tune to increase the number of Tasks ?

 Pls find attached the Flink Dashboard UI.

 TIA,




Re: Apache Flink - Question about application restart

2020-05-24 Thread Zhu Zhu
Hi M,

Regarding your questions:
1. yes. The id is fixed once the job graph is generated.
2. yes

Regarding yarn mode:
1. the job id stays the same because the job graph is generated once
at the client side and persisted in DFS for reuse
2. yes if high availability is enabled

Thanks,
Zhu Zhu

M Singh wrote on Sat, May 23, 2020 at 4:06 AM:

> Hi Flink Folks:
>
> If I have a Flink Application with 10 restarts, if it fails and restarts,
> then:
>
> 1. Does the job have the same id ?
> 2. Does the automatically restarting application, pickup from the last
> checkpoint ? I am assuming it does but just want to confirm.
>
> Also, if it is running on AWS EMR, I believe EMR/Yarn is configured to
> restart the job 3 times (after it has exhausted its restart policy). If
> that is the case:
> 1. Does the job get a new id ? I believe it does, but just want to confirm.
> 2. Does the Yarn restart honor the last checkpoint ?  I believe, it does
> not, but is there a way to make it restart from the last checkpoint of the
> failed job (after it has exhausted its restart policy) ?
>
> Thanks
>
>
>


Re: java.lang.NoSuchMethodError while writing to Kafka from Flink

2020-05-24 Thread tison
Could you try to download the binary dist from the Flink download page and
re-execute the job? It seems like something is wrong with flink-dist.jar.

BTW, please post user questions only to the user mailing list (not dev).

Best,
tison.


Guowei Ma wrote on Mon, May 25, 2020 at 10:49 AM:

> Hi
> 1. You could check whether 'org.apache.flink.api.java.clean' is in
> your classpath first.
> 2. Did you follow the doc [1] to deploy your local cluster and run some
> existing examples such as WordCount?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html
> Best,
> Guowei
>


Pojo List and Map Data Type in UDFs

2020-05-24 Thread lec ssmi
Hi:
  I received a JSON string serialized from a Java POJO from Kafka, and I want to
use a UDTF to restore it to a table with a structure similar to the POJO.
  Some member variables of the POJO use the List type or Map type
whose generic type is also a POJO.
  The sample code is as below:

   public class Car {
>   private String name;
>   private Integer id;
>   private List<Driver> drivers;
>   setters and getters ...
>
>   }


  I want to convert it into a table with three columns: id,
name, drivers, where the type of the drivers column is Array. Each driver element
can be taken out by index.
  Does the current API support this complex type of POJO?
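
Something like the following rough sketch is what I have in mind (JSON parsing elided,
with hard-coded placeholder values; I am not sure this is the supported way):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class ParseCar extends TableFunction<Row> {

    public void eval(String json) {
        // Parse the JSON with your library of choice; these values are placeholders.
        Row driver = Row.of("some-driver", 7);
        collect(Row.of("some-car", 42, new Row[] { driver }));
    }

    @Override
    public TypeInformation<Row> getResultType() {
        // name STRING, id INT, drivers ARRAY<ROW<name STRING, id INT>>
        return Types.ROW(
                Types.STRING,
                Types.INT,
                Types.OBJECT_ARRAY(Types.ROW(Types.STRING, Types.INT)));
    }
}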

 Thanks.


Inconsistent checkpoint API response

2020-05-24 Thread Vijay Bhaskar
Hi
I am using Flink retained checkpoints, along with the
 jobs/:jobid/checkpoints API for retrieving the latest retained checkpoint.
The full response of the Flink Checkpoints API is included at the end of this mail.

My job's restart attempts are set to 5.
In the checkpoint API response, the checkpoint file names of both the
"restored" and "completed" values under the "latest" key show the following behavior:
1) Suppose the job failed 3 times and recovered on the 4th attempt; then both
values are the same.
2) Suppose the job failed 4 times and recovered on the 5th attempt; then both
values are the same.
3) Suppose the job failed 5 times and recovered on the 6th attempt; then both
values are the same.
4) Suppose the job failed all 6 times and was marked failed; then
both values are also the same.
5) Suppose the job failed for the 6th time after recovering from 5 attempts
and making a few checkpoints; then the two values are different.

In cases (1), (2), (3) and (4) I never had any issue.
Only in case (5) did I have a severe issue in production, because the checkpoint
referenced by the "restored" field does not exist.

Please suggest what could be done here.



{
   "counts":{
  "restored":6,
  "total":3,
  "in_progress":0,
  "completed":3,
  "failed":0
   },
   "summary":{
  "state_size":{
 "min":4879,
 "max":4879,
 "avg":4879
  },
  "end_to_end_duration":{
 "min":25,
 "max":130,
 "avg":87
  },
  "alignment_buffered":{
 "min":0,
 "max":0,
 "avg":0
  }
   },
   "latest":{
  "completed":{
 "@class":"completed",
 "id":7094,
 "status":"COMPLETED",
 "is_savepoint":false,
 "trigger_timestamp":1590382502772,
 "latest_ack_timestamp":1590382502902,
 "state_size":4879,
 "end_to_end_duration":130,
 "alignment_buffered":0,
 "num_subtasks":2,
 "num_acknowledged_subtasks":2,
 "tasks":{

 },

 
"external_path":"file:/var/lib/persist/flink/checkpoints/29ae7600aa4f7d53a0dc1a0a7b257c85/chk-7094",
 "discarded":false
  },
  "savepoint":null,
  "failed":null,
  "restored":{
 "id":7093,
 "restore_timestamp":1590382478448,
 "is_savepoint":false,

 
"external_path":"file:/var/lib/persist/flink/checkpoints/29ae7600aa4f7d53a0dc1a0a7b257c85/chk-7093"
  }
   },
   "history":[
  {
 "@class":"completed",
 "id":7094,
 "status":"COMPLETED",
 "is_savepoint":false,
 "trigger_timestamp":1590382502772,
 "latest_ack_timestamp":1590382502902,
 "state_size":4879,
 "end_to_end_duration":130,
 "alignment_buffered":0,
 "num_subtasks":2,
 "num_acknowledged_subtasks":2,
 "tasks":{

 },

 
"external_path":"file:/var/lib/persist/flink/checkpoints/29ae7600aa4f7d53a0dc1a0a7b257c85/chk-7094",
 "discarded":false
  },
  {
 "@class":"completed",
 "id":7093,
 "status":"COMPLETED",
 "is_savepoint":false,
 "trigger_timestamp":1590382310195,
 "latest_ack_timestamp":1590382310220,
 "state_size":4879,
 "end_to_end_duration":25,
 "alignment_buffered":0,
 "num_subtasks":2,
 "num_acknowledged_subtasks":2,
 "tasks":{

 },

 
"external_path":"file:/var/lib/persist/flink/checkpoints/29ae7600aa4f7d53a0dc1a0a7b257c85/chk-7093",
 "discarded":false
  },
  {
 "@class":"completed",
 "id":7092,
 "status":"COMPLETED",
 "is_savepoint":false,
 "trigger_timestamp":1590382190195,
 "latest_ack_timestamp":1590382190303,
 "state_size":4879,
 "end_to_end_duration":108,
 "alignment_buffered":0,
 "num_subtasks":2,
 "num_acknowledged_subtasks":2,
 "tasks":{

 },

 
"external_path":"file:/var/lib/persist/flink/checkpoints/29ae7600aa4f7d53a0dc1a0a7b257c85/chk-7092",
 "discarded":true
  }
   ]
}