Re: Flink 1.5.2 - excessive amount of container requests, Received new/Returning excess container "flood"

2018-10-06 Thread Gary Yao
Hi Borys,

To debug how many containers Flink is requesting, you can look for the log
statement below [1]:

Requesting new TaskExecutor container with resources [...]

If you need help debugging, can you attach the full JM logs (preferably on
DEBUG level)? Would it be possible for you to test against 1.5.3 and 1.5.4?
However, I am not aware of any related issues that were fixed for 1.5.3 or
1.5.4. What is the Hadoop distribution that you are using?

Best,
Gary

[1]
https://github.com/apache/flink/blob/release-1.5.2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java#L454
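As a quick first check (a sketch; "jobmanager.log" is a placeholder for wherever your JM logs land, and the sample lines below are fabricated for illustration), comparing the count of requested versus excess-returned containers in the JM log shows whether the allocation is out of balance:

```shell
# Minimal sketch: compare requested vs. excess-returned containers in a JM log.
# "jobmanager.log" is a placeholder; point it at your (aggregated) YARN JM log.
printf '%s\n' \
  'INFO  org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:6144, vCores:1>' \
  'INFO  org.apache.flink.yarn.YarnResourceManager - Returning excess container container_e96_1538374332137_0793_01_594295.' \
  > jobmanager.log
requested=$(grep -c 'Requesting new TaskExecutor container' jobmanager.log)
returned=$(grep -c 'Returning excess container' jobmanager.log)
echo "requested=$requested returned=$returned"
# prints: requested=1 returned=1
```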

On Wed, Oct 3, 2018 at 11:36 AM Borys Gogulski 
wrote:

> Hey,
>
>
>
> We’re running Flink 1.5.2 (I know there’s 1.5.4 and 1.6.1) on YARN for
> some jobs we’re processing. It’s a “long running” container to which we’re
> submitting jobs – all jobs submitted to that container have a parallelism
> of 32 (to be precise: in this job there are 8 subtasks with parallelism 32
> and one subtask with parallelism 1), and we run at most 8 of them. TMs are
> configured with a single slot and 6 GB of RAM each.
> In the beginning, when using Flink 1.5.0 and 1.5.1 with the “on-demand”
> resource policy, we noticed that more containers than required were
> spawned, but with Flink 1.5.2 it “stabilized” – obviously some containers
> were kept for some time after a job finished (and no additional job was
> submitted to take those resources), but the overhead wasn’t big, so we
> were “all good”.
> And here’s the plot twist.
> For a couple of days now we’ve been witnessing situations in which
> spawning one job makes Flink request a couple of hundred TMs.
> Additionally, in the JM’s logs we can find dozens of lines like:
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594295 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594295.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594300 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594300.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594303 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594303.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594304 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594304.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594334 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594334.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594337 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594337.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594152 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594152.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594410 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,187 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594410.
>
> The only change made last week seems to be the addition of 5 new nodes to
> the YARN cluster.
> Any ideas why it’s requesting so many containers? Any ideas why there’s
> this “Received/Returning” flood? Right now one job was started an

Re: Utilising EMR's master node

2018-10-06 Thread Gary Yao
Hi Averell,

It is up to the YARN scheduler on which hosts the containers are started.

What Flink version are you using? I assume you are using 1.4 or earlier
because you are specifying a fixed number of TMs. If you launch Flink with
-yn 2, you should see only 2 TMs in total (not 4). Are you starting two
clusters?

Beginning with Flink 1.5, -yn is obsolete because resources are acquired
dynamically, and it is not well-defined in what order TM slots are exhausted
[1].

Best,
Gary

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td23364.html

On Wed, Sep 26, 2018 at 9:25 AM Averell  wrote:

> Thank you Gary.
> Regarding your previous suggestion to change the configuration for the
> number of vcores on the EMR master node, I tried it and found one
> funny/bad behaviour, as follows:
>  * hardware configuration: master node: 4 vcores + 8GB RAM; 2x executors
> with 16 vcores + 32GB RAM each.
>  * Flink launch parameters: -yn 2 -ys 16 -ytm 4g...
> 4 TMs were created, with 2 of them used (0 free slots) and the other two
> unused (16 free slots). The bad thing is that most of the time the 2 free
> TMs are on the same machine, and the two occupied ones are on the other
> machine.
> If I don't change the Hadoop configurations, 4 TMs are still created, but
> the occupied ones are always on two different servers.
>
> I'm not sure whether that's EMR's issue, or YARN's or Flink's.
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Utilising EMR's master node

2018-10-06 Thread Averell
Hi Gary,

Thanks for the information. I didn't know that -yn is obsolete :( I am using
Flink 1.6.
Not sure whether it's a bug when -yn is set explicitly, but I started only
one cluster.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


JDBCAppendTableSink Metric

2018-10-06 Thread Anil
I need to monitor the latency and other similar metrics for the MySQL sink
that Flink writes to. I'm using JDBCAppendTableSink. Is there any way I can
check these? Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink 1.5.2 - excessive amount of container requests, Received new/Returning excess container "flood"

2018-10-06 Thread Till Rohrmann
Hi Borys,

If possible, the complete JM logs (DEBUG log level) would be helpful for
further debugging the problem. Have there been any recovery operations lately?

Cheers,
Till

On Sat, Oct 6, 2018 at 11:15 AM Gary Yao  wrote:

> Hi Borys,
>
> [quoted text trimmed]

error in using kafka in flink

2018-10-06 Thread marzieh ghasemi
Hello

I downloaded Kafka and followed these instructions step by step:

cd kafka_2.11-2

# start zookeeper server
./bin/zookeeper-server-start.sh ./config/zookeeper.properties

# start broker
./bin/kafka-server-start.sh ./config/server.properties

# create topic “test”
 ./bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181
--partitions 1 --replication-f

# consume from the topic using the console consumer
./bin/kafka-console-consumer.sh --topic test --zookeeper localhost:2181

# produce something into the topic (write something and hit enter)
./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

Also, I added "flink-connector-kafka" and "kafka-client" dependencies to
"pom.xml".
But while running the "Monitoring the Wikipedia Edit Stream" example I got
this error: "cannot resolve symbol FlinkKafkaProducer08". I searched a lot
but couldn't find a solution.

Would you please help me?

Thank you in advance.
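A likely cause (an assumption, not confirmed in this thread): FlinkKafkaProducer08 lives in the version-specific Kafka 0.8 connector artifact rather than in a generic "flink-connector-kafka" module. A sketch of the Maven dependency, where the _2.11 Scala suffix and the ${flink.version} placeholder must match the project's own build:

```xml
<!-- Sketch (assumption): FlinkKafkaProducer08 is provided by the Kafka 0.8
     connector artifact. Adjust the Scala suffix and version to your build. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```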


Re: Unable to start session cluster using Docker

2018-10-06 Thread Vinay Patil
Thank you Till, I am able to start the session-cluster now.

Regards,
Vinay Patil


On Fri, Oct 5, 2018 at 8:15 PM Till Rohrmann  wrote:

> Hi Vinay,
>
> are you referring to flink-contrib/docker-flink/docker-compose.yml? We
> recently fixed the command line parsing with Flink 1.5.4 and 1.6.1. Due to
> this, the removal of the second command line parameter intended to be
> introduced with 1.5.0 and 1.6.0 (see
> https://issues.apache.org/jira/browse/FLINK-8696) became visible. The
> docker-compose.yml file has not yet been updated. I will do this right away
> and push the changes to the 1.5, 1.6 and master branches. Sorry for the
> inconvenience. As a local fix for you, please go to
> flink-contrib/docker-flink/docker-entrypoint.sh:33 and remove the cluster
> parameter from this line.
>
> Cheers,
> Till
>
> On Thu, Oct 4, 2018 at 8:30 PM Vinay Patil 
> wrote:
>
>> Hi,
>>
>> I have used the docker-compose file for creating the cluster as shown in
>> the documentation. The web ui is started successfully, however, the task
>> managers are unable to join.
>>
>> Job Manager container logs:
>>
>> 2018-10-04 18:13:13,907 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Rest
>> endpoint listening at cluster:8081
>>
>> 2018-10-04 18:13:13,907 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint-
>> http://cluster:8081 was granted leadership with
>> leaderSessionID=----
>>
>> 2018-10-04 18:13:13,907 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Web
>> frontend listening at http://cluster:8081
>>
>> 2018-10-04 18:13:14,012 INFO
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> ResourceManager akka.tcp://flink@cluster:6123/user/resourcemanager was
>> granted leadership with fencing token 
>>
>> 2018-10-04 18:13:14,013 INFO
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
>> Starting the SlotManager.
>>
>> 2018-10-04 18:13:14,026 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher
>> akka.tcp://flink@cluster:6123/user/dispatcher was granted leadership
>> with fencing token ----
>>
>> Not sure why it says the web frontend is listening at cluster:8081 when
>> the job manager RPC address is set to jobmanager.
>>
>> Task Manager Container Logs:
>>
>> 2018-10-04 18:19:18,818 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
>> to ResourceManager akka.tcp://flink@jobmanager
>> :6123/user/resourcemanager().
>>
>> 2018-10-04 18:19:18,818 INFO
>> org.apache.flink.runtime.filecache.FileCache  - User file
>> cache uses directory
>> /tmp/flink-dist-cache-1bd95c51-3031-42ab-b782-14a0023921e5
>>
>> 2018-10-04 18:19:28,850 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
>> resolve ResourceManager address 
>> akka.tcp://flink@jobmanager:6123/user/resourcemanager,
>> retrying in 1 ms: Ask timed out on
>> [ActorSelection[Anchor(akka.tcp://flink@jobmanager:6123/),
>> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
>> of type "akka.actor.Identify".
>>
>>
>> I have even tried to set JOB_MANAGER_RPC_ADDRESS=cluster in the
>> docker-compose file, but it does not work.
>> Even "cluster" and "jobmanager" point to localhost in the /etc/hosts file.
>>
>> Can you please let me know what the issue is here?
>>
>> Regards,
>> Vinay Patil
>>
>


Re: JDBCAppendTableSink Metric

2018-10-06 Thread Hequn Cheng
Hi Anil,

It seems that there are no dedicated metrics for the JDBC sink.
For latency, Flink allows tracking the latency of records traveling through
the system [1].
For other metrics, I think you can make use of the system metrics. For
example, the metric numRecordsInPerSecond [2] may be helpful for you.
If these metrics cannot meet your requirements, you can report metrics
yourself: Flink exposes a metric system that allows gathering and exposing
metrics to external systems [3].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
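The latency tracking described in [1] is driven by a flink-conf.yaml option; a minimal sketch (the interval value here is arbitrary, and latency markers add runtime overhead, so coarse intervals are safer in production):

```yaml
# Sketch: enable latency markers; interval is in milliseconds (value arbitrary).
metrics.latency.interval: 30000
```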

On Sat, Oct 6, 2018 at 7:29 PM Anil  wrote:

> I need to monitor the latency and other similar metric for the Mysql Sink
> that Flink writes to. I'm using  JDBCAppendTableSink. Any way how I can
> check these. Thanks!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


RE: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

2018-10-06 Thread Samir Tusharbhai Chauhan
Hi Till,

Thanks for identifying the issue. My cluster is up and running now.

I have few queries. Can you have to anwer that?


  1.  Do I need to set the below properties in my cluster?

jobmanager.rpc.address

rest.address

rest.bind-address

jobmanager.web.address

  2.  Is there anything I should take care of while setting it up?
  3.  How do I know which job manager is active?
  4.  How do I secure my cluster?

Samir Chauhan

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Friday, October 05, 2018 11:09 PM
To: Samir Tusharbhai Chauhan 
Cc: user 
Subject: Re: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

Hi Samir,

could you share the logs of the two JMs and the log where you saw the 
FencingTokenException with us?

It looks to me as if the TM had an outdated fencing token (an outdated leader 
session id) with which it contacted the ResourceManager. This can happen and 
the TM should try to reconnect to the RM once it learns about the new leader 
session id via ZooKeeper. You could, for example, check in ZooKeeper that it
contains the valid leader information.

Cheers,
Till

On Fri, Oct 5, 2018 at 9:58 AM Samir Tusharbhai Chauhan 
mailto:samir.tusharbhai.chau...@prudential.com.sg>>
 wrote:

Hi,



I am having an issue setting up a cluster for Flink. I have 2 nodes for the
Job Manager and 2 nodes for the Task Manager.



My configuration file looks like this.



jobmanager.rpc.port: 6123

jobmanager.heap.size: 2048m

taskmanager.heap.size: 2048m

taskmanager.numberOfTaskSlots: 64

parallelism.default: 1

rest.port: 8081

high-availability.jobmanager.port: 50010

high-availability: zookeeper

high-availability.storageDir: file:///sharedflink/state_dir/ha/

high-availability.zookeeper.quorum: host1:2181,host2:2181,host3:2181

high-availability.zookeeper.path.root: /flink

high-availability.cluster-id: /flick_ns



state.backend: rocksdb

state.checkpoints.dir: file:///sharedflink/state_dir/backend

state.savepoints.dir: file:///sharedflink/state_dir/savepoint

state.backend.incremental: false

state.backend.rocksdb.timer-service.factory: rocksdb

state.backend.local-recovery: false



But when I start services, I get this error message.



java.util.concurrent.CompletionException:

org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token

mismatch: Ignoring message

RemoteFencedMessage(b00185a18ea3da17ebe39ac411a84f3a,

RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, 
HardwareDescription, Time))) because the fencing token 
b00185a18ea3da17ebe39ac411a84f3a did not match the expected fencing token 
bce1729df0a2ab8a7ea0426ba9994482.

at

java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)





But when I run JM and TM in single box, it is working fine.



Please help resolve this issue ASAP as I am running out of options and time.



-Samir Chauhan




There's a reason we support Fair Dealing. YOU.


This email and any files transmitted with it or attached to it (the [Email]) 
may contain confidential, proprietary or legally privileged information and is 
intended solely for the use of the individual or entity to whom it is 
addressed. If you are not the intended recipient of the Email, you must not, 
directly or indirectly, copy, use, print, distribute, disclose to any other 
party or take any action in reliance on any part of the Email. Please notify 
the system manager or sender of the error and delete all copies of the Email 
immediately.

No statement in the Email should be construed as investment advice being given 
within or outside Singapore. Prudential Assurance Company Singapore (Pte) 
Limited (PACS) and each of its related entities shall not be responsible for 
any losses, claims, penalties, costs or damages arising from or in connection 
with the use of the Email or the information therein, in whole or in part. You 
are solely responsible for conducting any virus checks prior to opening, 
accessing or disseminating the Email.

PACS (Company Registration No. 199002477Z) is a company incorporated under the 
laws of Singapore and has its registered office at 30 Cecil Street, #30-01, 
Prudential Tower, Singapore 049712.

PACS is an indirect wholly owned subsidiary of Prudential plc of the United 
Kingdom. PACS and Prudential plc are not affiliated in any manner with 
Prudential Financial, Inc., a company whose principal place of business is in 
the United States of America.


Re: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

2018-10-06 Thread Till Rohrmann
Hi Samir,

1. In your setup (not running on top of Yarn or Mesos) you need to set the
jobmanager.rpc.address such that the JM process knows where to bind to. The
other components use ZooKeeper to find out the addresses. The other
properties should not be needed.
3. You can take a look at the ZooKeeper leader latch node. Alternatively,
you can take a look at the address to which you are redirected when
accessing the web UI.
4.
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html

Cheers,
Till

On Sat, Oct 6, 2018 at 5:57 PM Samir Tusharbhai Chauhan <
samir.tusharbhai.chau...@prudential.com.sg> wrote:

> Hi Till,
>
>
>
> [quoted text trimmed]

Re: Streaming to Parquet Files in HDFS

2018-10-06 Thread Averell
Hi Kostas,

Please ignore my previous email about the issue with security. It seems I
had mixed versions of shaded and unshaded jars.

However, I'm now facing another issue with writing parquet files: only the
first part is closed; all subsequent parts are kept in the in-progress state
forever. My settings are a checkpoint every 3 minutes and sink parallelism
set to 1 (my attempts to set it to 4 or 30 showed no difference). The
BucketID assigner uses the event timestamp.
I only get this issue when running Flink on a YARN cluster, whether writing
to file:/// or to S3. When I ran it on my laptop, I got one part for every
single checkpoint.
The TM log says something like "*BucketState ... has pending files for
checkpoints: {2 }*"

Could you please help with how I can further debug this?

Here below is the TM log:

2018-10-06 14:39:01.197 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(153765630,meter0219838,R1.S1.LT1.P25).
2018-10-06 14:39:01.984 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-0" for bucket id=dt=2018-09-22.
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=2 (max part counter=1).
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:40:18.254 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {2 }
2018-10-06 14:40:44.069 [Async calls on Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
received completion notification for checkpoint with id=2.
2018-10-06 14:40:46.691 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(153765630,meter0207081,R1.S1.LT1.P25).
2018-10-06 14:40:46.765 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-1" for bucket id=dt=2018-09-22.
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=3 (max part counter=2).
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.