Failed to cancel a job using the STOP rest API

2021-06-03 Thread Thomas Wang
Hi, Flink community, I'm trying to use the STOP rest API to cancel a job. So far, I'm seeing some inconsistent results. Sometimes, jobs could be cancelled successfully while other times, they couldn't. Either way, the POST request is accepted with a status code 202 and a "request-id". From the F
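For context, the stop call described above looks roughly like this against the Flink 1.13 REST API (job ID and savepoint directory are placeholders):

```
POST /jobs/<jobid>/stop
{"targetDirectory": "s3://my-bucket/savepoints", "drain": false}

HTTP/1.1 202 Accepted
{"request-id": "<triggerid>"}
```

The 202 only means the stop request was accepted and a savepoint was triggered; it says nothing yet about whether the stop actually succeeded.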

Re: Failed to cancel a job using the STOP rest API

2021-06-04 Thread Thomas Wang
Hi Yun, Thanks for your reply. We are not using any legacy source. For this specific job, there is only one source, which uses FlinkKafkaConsumer and which I assume has the correct cancel() method implemented. Also, could you suggest how I could use the "request-id" to get the savepoint location? T
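The "request-id" returned by the stop call doubles as the trigger ID for the savepoint-status endpoint, so the savepoint location can be polled roughly like this (IDs and paths are placeholders):

```
GET /jobs/<jobid>/savepoints/<triggerid>

HTTP/1.1 200 OK
{
  "status": {"id": "COMPLETED"},
  "operation": {"location": "s3://my-bucket/savepoints/savepoint-<...>"}
}
```

While the savepoint is still running, "status" reports IN_PROGRESS and the "operation" field is absent; on failure, "operation" carries the failure cause instead of a location.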

Re: Re: Failed to cancel a job using the STOP rest API

2021-06-05 Thread Thomas Wang
you ever found some exception or some messages in the TaskManager's log when it could not be stopped? Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid

Re: Re: Failed to cancel a job using the STOP rest API

2021-06-05 Thread Thomas Wang
One thing I noticed is that if I set drain = true, the job could be stopped correctly. Maybe that's because I'm using a Parquet file sink which is a bulk-encoded format and only writes to disk during checkpoints? Thomas On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang wrote: > Hi Yu

Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-07 Thread Thomas Wang
ns, and it seems to not work as expected to me... Could you also show us the DAG of the job? And do some operators in the source task use multiple threads to emit records? Best, Yun

Resource Planning

2021-06-15 Thread Thomas Wang
Hi, I'm trying to see if we have been given enough resources (i.e. CPU and memory) on each task node to perform a deduplication job. Currently, the job is not running very stably. What I have been observing is that after a couple of days of running, we will suddenly see backpressure happen on one arbitra

Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-15 Thread Thomas Wang
@aliyun.com) wrote: Hi Thomas, I tried but could not reproduce the exception yet. I have filed an issue for the exception first [1]. [1] https://issues.apache.org/jira/browse/FLINK-22928

How would Flink job react to change of partitions in Kafka topic?

2021-06-22 Thread Thomas Wang
Hi, I'm wondering if anyone has changed the number of partitions of a source Kafka topic. Let's say I have a Flink job read from a Kafka topic which used to have 32 partitions. If I change the number of partitions of that topic to 64, can the Flink job still guarantee the exactly-once semantics?
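One detail worth checking here: FlinkKafkaConsumer does not notice partitions added after the job starts unless partition discovery is enabled via the `flink.partition-discovery.interval-millis` property (it is off by default). A minimal sketch of the consumer properties; the broker address and interval are illustrative:

```java
import java.util.Properties;

public class KafkaPartitionDiscoveryConfig {

    // Builds consumer properties with partition discovery enabled, so a
    // FlinkKafkaConsumer periodically re-scans the topic and picks up
    // partitions created after the job was started.
    public static Properties consumerProps(String brokers, long discoveryIntervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // Property key recognized by FlinkKafkaConsumer; disabled when unset.
        props.setProperty("flink.partition-discovery.interval-millis",
                String.valueOf(discoveryIntervalMs));
        return props;
    }
}
```

These properties would then be passed to the consumer, e.g. `new FlinkKafkaConsumer<>("topic", schema, consumerProps("kafka:9092", 60_000L))`. Offsets for existing partitions are tracked in checkpointed state, so they keep their guarantees across the change; the new partitions only start being read once discovery kicks in.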

NoSuchMethodError - getColumnIndexTruncateLength after upgrading Flink from 1.11.2 to 1.12.1

2021-06-22 Thread Thomas Wang
Hi, We recently upgraded our Flink version from 1.11.2 to 1.12.1 and one of our jobs that used to run ok, now sees the following error. This error doesn't seem to be related to any user code. Can someone help me take a look? Thanks. Thomas java.lang.NoSuchMethodError: org.apache.parquet.column.

Use Flink to write a Kafka topic to s3 as parquet files

2021-06-22 Thread Thomas Wang
Hi, I'm trying to tail a Kafka topic and copy the data to s3 as parquet files. I'm using StreamingFileSink with ParquetAvroWriters. It works just fine. However, it looks like I have to generate the Avro schema and convert my POJO class to GenericRecord first (i.e. convert DataStream to DataStream)
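If the goal is simply to avoid the manual POJO-to-GenericRecord conversion, flink-parquet also provides `ParquetAvroWriters.forReflectRecord`, which derives the Avro schema from the POJO class via Avro reflection. A sketch only, assuming a hypothetical POJO `Event`, an existing `DataStream<Event> stream`, and flink-parquet plus parquet-avro on the classpath:

```java
// Sketch: write a DataStream<Event> to s3 as Parquet without hand-written
// Avro schemas; the schema is derived from the Event class by reflection.
StreamingFileSink<Event> sink = StreamingFileSink
    .forBulkFormat(
        new Path("s3://my-bucket/output"),                 // hypothetical target path
        ParquetAvroWriters.forReflectRecord(Event.class))  // reflection-based schema
    .build();
stream.addSink(sink);
```

Note that bulk-encoded formats like Parquet roll files only on checkpoints, so checkpointing must be enabled for output to appear.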

Re: How would Flink job react to change of partitions in Kafka topic?

2021-06-23 Thread Thomas Wang
Jun 22, 2021 at 1:32 PM Thomas Wang wrote: Hi, I'm wondering if anyone has changed the number of partitions of a source Kafka topic. Let's say I have a Flink job reading from a Kafka topic which used to have 32 partiti

Yarn Application Crashed?

2021-06-27 Thread Thomas Wang
Hi, I recently experienced a job crash due to the underlying Yarn application failing for some reason. Here is the only error message I saw. It seems I can no longer see any of the Flink job logs. Application application_1623861596410_0010 failed 1 times (global limit =2; local limit is =1) due t

Re: Yarn Application Crashed?

2021-06-27 Thread Thomas Wang
this job was using like 99% of the entire cluster. However, in that case, shouldn't YARN wait for containers to become available? I'm not quite sure how Flink would behave in this case. Could someone provide some insights here? Thanks. Thomas On Sun, Jun 27, 2021 at 4:24 PM Thomas Wan

Re: Yarn Application Crashed?

2021-06-29 Thread Thomas Wang
-attempts (default 1). The YARN application will fail once all attempts are exhausted. Best, Piotrek [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/yarn/#flink-on-yarn-reference pon., 28 c
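Per the Flink-on-YARN reference linked above, the number of ApplicationMaster restarts is capped by `yarn.application-attempts`; once exhausted, the whole YARN application fails as observed here. A flink-conf.yaml fragment (the value shown is illustrative):

```yaml
# flink-conf.yaml: allow the YARN ApplicationMaster to be restarted once
# before the application as a whole is declared failed (default is 1, i.e.
# no restart).
yarn.application-attempts: 2
```

YARN itself also enforces an upper bound via `yarn.resourcemanager.am.max-attempts` on the cluster side, which this setting cannot exceed.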

Re: NoSuchMethodError - getColumnIndexTruncateLength after upgrading Flink from 1.11.2 to 1.12.1

2021-06-29 Thread Thomas Wang
To give more information: parquet-avro version 1.10.0 with Flink 1.11.2 was running fine. Now with Flink 1.12.1, the error msg shows up. Thank you for help. Rommel On Tu
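That version detail points at mixed Parquet versions on the classpath: `getColumnIndexTruncateLength` was added to Parquet in the 1.11 line, which parquet-avro 1.10.0 predates. A common remedy is to align the user-supplied Parquet artifacts with the version the Flink 1.12 flink-parquet module was built against; the version below is an assumption and should be checked against the flink-parquet POM:

```xml
<!-- Align parquet-avro with the Parquet version expected by Flink 1.12
     (1.11.1 is an assumption; verify against the flink-parquet POM). -->
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.11.1</version>
</dependency>
```

Running `mvn dependency:tree` and checking for multiple `org.apache.parquet` versions is a quick way to confirm this kind of conflict.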

Exact S3 Permissions to allow a flink job to use s3 for checkpointing

2021-09-23 Thread Thomas Wang
Hi, I'm trying to figure out what exact s3 permissions does a flink job need to work appropriately when using s3 for checkpointing. Currently, I have the following IAM Policy, but it seems insufficient. Can anyone help me figure this out? Thanks. { Action = [ "s3:PutObject", "s3:GetObject", ] Eff
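Checkpointing typically does more than read and write single objects: it lists checkpoint directories and deletes old checkpoints, so `GetObject`/`PutObject` alone is usually not enough. A hedged starting point in IAM JSON form (the bucket name is a placeholder; the exact action set can vary with the s3 filesystem plugin in use):

```json
{
  "Effect": "Allow",
  "Action": [
    "s3:GetObject",
    "s3:PutObject",
    "s3:DeleteObject",
    "s3:ListBucket",
    "s3:GetBucketLocation"
  ],
  "Resource": [
    "arn:aws:s3:::my-checkpoint-bucket",
    "arn:aws:s3:::my-checkpoint-bucket/*"
  ]
}
```

Note that `ListBucket` and `GetBucketLocation` apply to the bucket ARN itself, while the object actions apply to the `/*` resource; putting all actions on only one of the two resources is a common reason a policy like the one above still fails.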

Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Thomas Wang
Hi, I'm using the official docker image: apache/flink:1.12.1-scala_2.11-java11 I'm trying to run a Flink job on an EKS cluster. The job is running under a k8s service account that is tied to an IAM role. If I'm not using s3 as RocksDB checkpoint backend, everything works just fine. However, when

Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Thomas Wang
e/flink/pull/16717 Best Ingo On Sat, Sep 25, 2021, 20:46 Thomas Wang wrote: Hi, I'm using the official docker image: apache/flink:1.12.1-scala_2.11-java11 I'm trying to run a Flink job on an EKS cluster

Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Thomas Wang
On Sat, Sep 25, 2021 at 7:25 PM Thomas Wang wrote: Thanks Ingo. Adding the following setting worked. fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider Thomas On Sat, Sep 25, 2021 at 1:12 PM Ingo Bürk wrote:

Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Thomas Wang
/... fs.s3a.aws.credentials.provider.role.sessionName: ... However, for some reason, I'm still getting the same error. Please help! Thanks. Thomas On Sat, Sep 25, 2021 at 9:36 PM Thomas Wang wrote: Ingo, It looks like I'm now seeing "Caused by: java.lang.NullPointerException: You must

DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Thomas Wang
Hi, I have a use case where I need to call DataStream.keyBy() with keys loaded from a configuration. The number of keys and their data types are variable and determined by the configuration. Once the configuration is loaded, they won't change. I'm trying to use the following key selector, but
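Since the abstract `Tuple` class cannot serve as a key type directly, one workaround is to fold the configured key fields into a single String inside the KeySelector, which keeps the key type fixed no matter how many fields the configuration names. The field extraction below is a sketch: records are modeled as a Map for illustration, whereas a real job would read POJO fields or Row positions.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CompositeKeyBuilder {

    // Joins the configured key fields of a record into one String key.
    // In a Flink job this would be called from a KeySelector<Record, String>,
    // so keyBy() always sees a String regardless of the configured field list.
    public static String buildKey(Map<String, Object> record, List<String> keyFields) {
        return keyFields.stream()
                .map(f -> String.valueOf(record.get(f))) // missing fields become "null"
                .collect(Collectors.joining("|"));       // "|" is an assumed delimiter
    }
}
```

Usage would look like `stream.keyBy(r -> CompositeKeyBuilder.buildKey(r, configuredKeys))`. The delimiter must not occur inside field values (or must be escaped), otherwise distinct field combinations could collide into the same key.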

Re: DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Thomas Wang
10, 2022 at 6:30 AM Thomas Wang wrote: Hi, I have a use case where I need to call DataStream.keyBy() with keys loaded from a configuration. The number of keys and their data types are variable and determined by the configuration. Once the con

Re: DataStream.keyBy() with keys determined at run time

2022-07-11 Thread Thomas Wang
Tuple0 to Tuple25. On Sun, Jul 10, 2022 at 5:43 PM Thomas Wang wrote: I didn't copy the exact error message, but the gist of it is that I cannot use the abstract class Tuple and should instead use Tuple1, T