Re: Debugging long Flink checkpoint durations

2021-03-02 Thread Yun Gao
Hi Dan,

I think you could see the details of the checkpoints via the checkpoint UI [1].
Also, if you see in the pending checkpoints that some tasks do not take a snapshot,
you might have a look at whether this task is backpressuring the previous tasks [2].

Best,
Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
--
Sender:Dan Hill
Date:2021/03/02 04:34:56
Recipient:user
Theme:Debugging long Flink checkpoint durations

Hi.  Are there good ways to debug long Flink checkpoint durations?

I'm running a backfill job that processes ~10 days of data and then checkpointing
starts failing.  Since I only see the last 10 checkpoints in the jobmaster UI, I
don't see when the failures start.

I looked through the text logs and didn't see much.

I assume:
1) I have something misconfigured that is causing old state to stick around.
2) I don't have enough resources.
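
A side note on the "last 10 checkpoints" limitation: assuming a default configuration, the number of checkpoint entries kept in the web UI history is governed by the web.checkpoints.history option (default 10), so raising it should keep the first failing checkpoint visible. A minimal sketch for a local run with the web UI (for a cluster the key would go into flink-conf.yaml instead; the value 100 is arbitrary):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointHistoryExample {
    public static void main(String[] args) throws Exception {
        // Keep more checkpoint entries in the web UI so the first failure stays visible.
        // Requires flink-runtime-web on the classpath for the local web UI.
        Configuration conf = new Configuration();
        conf.setInteger("web.checkpoints.history", 100);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // ... build and execute the backfill job as usual ...
    }
}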



Savepoint documentation

2021-03-02 Thread Farouk
Hi

Is this chapter outdated as of Flink 1.11?

https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#can-i-move-the-savepoint-files-on-stable-storage

*Can I move the Savepoint files on stable storage?*

*The quick answer to this question is currently “no” because the meta data
file references the files on stable storage as absolute paths for technical
reasons. The longer answer is: if you MUST move the files for some reason
there are two potential approaches as workaround. First, simpler but
potentially more dangerous, you can use an editor to find the old path in
the meta data file and replace them with the new path. Second, you can use
the class SavepointV2Serializer as starting point to programmatically read,
manipulate, and rewrite the meta data file with the new paths.*



Thanks
Farouk


Re: Flink application kept restarting

2021-03-02 Thread Rainie Li
Thanks for checking, Matthias.

I have another flink job which failed last weekend with the same buffer
pool destroyed error. This job is also running version 1.9.
Here is the error I found in the task manager log. Any suggestions on what
the root cause is and how to fix it?

2021-02-28 00:54:45,943 WARN
 org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
while canceling task.
java.lang.RuntimeException: Buffer pool is destroyed.
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
--
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.io.network.partition.ProducerFailedException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandle

How to get operator uid from a sql

2021-03-02 Thread XU Qinghui
Hello folks

I'm trying to use the Flink state processor API to read the state of
operators from a checkpoint. But currently the operator lookup in the API
relies on the operator `uid` (e.g. ExistingSavepoint.readKeyedState(uid,
readerFunction)).
But when it comes to a SQL job, where should I look for the operator
uid? I tried both the hex string of the hash and the operator name, but
neither works.

Could you point me in the right direction?

BR,


Flink KafkaProducer flushing on savepoints

2021-03-02 Thread Witzany, Tomas
Hi,
I have a question about the at-least-once guarantees for Kafka producers when
checkpointing is disabled. In our data pipeline we have a Flink job on an
unbounded stream for which we originally had checkpoints turned on. Furthermore, this
job is cancelled with a savepoint once a day to do some data pre- and
post-processing for the next day; afterwards the job is restarted from the
savepoint.

The issue we have is that we want to turn off checkpointing, since it does not
give us much value and only creates extra IO. When this is done, this message
shows up:
"Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling
flushing."
This prompted us to investigate, and it seems that if you have checkpointing
disabled, there are no at-least-once guarantees.


What about if you have no checkpointing, but you take savepoints that you
restore from yourself? Savepoints are the same thing as checkpoints in the
code. The Flink producer makes it impossible to turn on flushing while
checkpointing is disabled. I can see why this is the case, as there is some extra
synchronization overhead related to the flushing flag being on. Is there a way
to have checkpointing disabled and still have at-least-once guarantees on savepoints?

The only thing I can think of is to have checkpoints enabled with a very high
period so that they are (almost) never triggered. But this is a hack.

Tomas Witzany
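
For reference, a minimal sketch of the workaround described above (explicitly a hack, not a recommendation): leave checkpointing formally enabled so the producer's flush-on-checkpoint behaviour is not disabled, but with an interval so long that periodic checkpoints are effectively never triggered; savepoints, as noted above, go through the same snapshot path in the code.

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RareCheckpointingHack {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is technically enabled, so the Kafka producer does not turn
        // off flushing, but a one-year interval means periodic checkpoints will
        // effectively never fire. The interval value is arbitrary.
        env.enableCheckpointing(TimeUnit.DAYS.toMillis(365));

        // ... sources, transformations and the Kafka producer sink go here ...

        env.execute("Job with effectively disabled periodic checkpoints");
    }
}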


Re: Savepoint documentation

2021-03-02 Thread David Anderson
You are correct in thinking that the documentation wasn't updated. If you
look at the master docs [1] you will see that they now say

Can I move the Savepoint files on stable storage? #


The quick answer to this question is currently “yes”. Since Flink 1.11.0,
savepoints are self-contained and relocatable. You can move the file and
restore from any location.


If you want more detail than the quick answer, see [2].

Best,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#can-i-move-the-savepoint-files-on-stable-storage
[2] https://issues.apache.org/jira/browse/FLINK-19381

On Tue, Mar 2, 2021 at 10:33 AM Farouk  wrote:

> Hi
>
> Is this chapter outdated as of Flink 1.11?
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#can-i-move-the-savepoint-files-on-stable-storage
>
> *Can I move the Savepoint files on stable storage?*
>
> *The quick answer to this question is currently “no” because the meta data
> file references the files on stable storage as absolute paths for technical
> reasons. The longer answer is: if you MUST move the files for some reason
> there are two potential approaches as workaround. First, simpler but
> potentially more dangerous, you can use an editor to find the old path in
> the meta data file and replace them with the new path. Second, you can use
> the class SavepointV2Serializer as starting point to programmatically read,
> manipulate, and rewrite the meta data file with the new paths.*
>
>
> 
>
> Thanks
> Farouk
>


Re: Best way to handle BIGINT to TIMESTAMP conversions

2021-03-02 Thread Sebastián Magrí
Thanks a lot Jark,

On Mon, 1 Mar 2021 at 02:38, Jark Wu  wrote:

> Hi Sebastián,
>
> You can use `TO_TIMESTAMP(FROM_UNIXTIME(e))` to get a timestamp value.
> The BIGINT should be in seconds.  Please note to declare the computed
> column
>  in DDL schema and declare a watermark strategy on this computed field to
> make
>  the field to be a rowtime attribute. Because streaming over window
> requires to
>  order by a time attribute.
>
> Best,
> Jark
>
> On Sun, 21 Feb 2021 at 07:32, Sebastián Magrí 
> wrote:
>
>> I have a table with two BIGINT fields for start and end of an event as
>> UNIX time in milliseconds. I want to be able to have a resulting column
>> with the delta in milliseconds and group by that difference. Also, I want
>> to be able to have aggregations with window functions based upon the `end`
>> field.
>>
>> The table definition looks like this:
>> |CREATE TABLE sessions (
>> |  `ats`   STRING,
>> |  `e` BIGINT,
>> |  `s` BIGINT,
>> |  `proc_time` AS PROCTIME(),
>> |  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
>> |)
>>
>> Then I have a few views like this:
>>
>> CREATE VIEW second_sessions AS
>>   SELECT * FROM sessions
>>   WHERE `e` - `s` = 1000
>>
>> And some windows using these views like this:
>>
>>   WINDOW w3m AS (
>> PARTITION BY `t`
>> ORDER BY `proc_time`
>> RANGE BETWEEN INTERVAL '3' MINUTE PRECEDING AND CURRENT ROW
>>   )
>>
>> I'd like to use the `e` field for windowing instead of `proc_time`. But I
>> keep running into errors with the `TO_TIMESTAMP(BIGINT)` function now
>> missing or with unsupported timestamp arithmetics.
>>
>> What is the best practice for a case such as this?
>>
>> Best Regards,
>> --
>> Sebastián Ramírez Magrí
>>
>

-- 
Sebastián Ramírez Magrí
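
To make Jark's suggestion above concrete for the millisecond fields, a hedged sketch of the DDL with a computed rowtime column and watermark (assuming `e` holds epoch milliseconds, hence the division by 1000; the column name `e_ts`, the 5-second watermark delay, and the omitted connector options are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SessionsRowtimeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
            "CREATE TABLE sessions (" +
            "  `ats` STRING," +
            "  `e`   BIGINT," +
            "  `s`   BIGINT," +
            // computed column: epoch milliseconds -> seconds -> TIMESTAMP(3)
            "  `e_ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`e` / 1000))," +
            // the watermark declaration turns e_ts into a rowtime attribute,
            // which streaming OVER windows can then ORDER BY
            "  WATERMARK FOR `e_ts` AS `e_ts` - INTERVAL '5' SECOND," +
            "  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED" +
            ")");  // connector options (WITH ...) omitted, as in the original post
    }
}

With a rowtime attribute like `e_ts` in place, the `w3m` window above could ORDER BY `e_ts` instead of `proc_time`.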


[Flink-SQL] FLOORing OR CEILing a DATE or TIMESTAMP to WEEK uses Thursdays as week start

2021-03-02 Thread Sebastián Magrí
While using a simple query such as this

SELECT
   `ts`,
   FLOOR(`ts` TO WEEK) as `week_start`,
   CEIL(`ts` TO WEEK) as `week_end`
FROM some_table

I get some weird results like these:

2021-03-01T00:00|2021-02-25T00:00|2021-03-04T00:00

Which is obviously wrong, since March 1st is a Monday while February 25th and
March 4th are both Thursdays.

I've tried different combinations of timezone configurations and with both
timestamps and dates, with the same results.

Is there anything obviously wrong in that query? Is there any configuration
to keep in mind for the start of week day?

-- 
Sebastián Ramírez Magrí


Re: Best way to handle BIGINT to TIMESTAMP conversions

2021-03-02 Thread Yik San Chan
I think you can also do CAST((e / 1000) AS TIMESTAMP)

On Tue, Mar 2, 2021 at 7:27 PM Sebastián Magrí  wrote:

> Thanks a lot Jark,
>
> On Mon, 1 Mar 2021 at 02:38, Jark Wu  wrote:
>
>> Hi Sebastián,
>>
>> You can use `TO_TIMESTAMP(FROM_UNIXTIME(e))` to get a timestamp value.
>> The BIGINT should be in seconds.  Please note to declare the computed
>> column
>>  in DDL schema and declare a watermark strategy on this computed field to
>> make
>>  the field to be a rowtime attribute. Because streaming over window
>> requires to
>>  order by a time attribute.
>>
>> Best,
>> Jark
>>
>> On Sun, 21 Feb 2021 at 07:32, Sebastián Magrí 
>> wrote:
>>
>>> I have a table with two BIGINT fields for start and end of an event as
>>> UNIX time in milliseconds. I want to be able to have a resulting column
>>> with the delta in milliseconds and group by that difference. Also, I want
>>> to be able to have aggregations with window functions based upon the `end`
>>> field.
>>>
>>> The table definition looks like this:
>>> |CREATE TABLE sessions (
>>> |  `ats`   STRING,
>>> |  `e` BIGINT,
>>> |  `s` BIGINT,
>>> |  `proc_time` AS PROCTIME(),
>>> |  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
>>> |)
>>>
>>> Then I have a few views like this:
>>>
>>> CREATE VIEW second_sessions AS
>>>   SELECT * FROM sessions
>>>   WHERE `e` - `s` = 1000
>>>
>>> And some windows using these views like this:
>>>
>>>   WINDOW w3m AS (
>>> PARTITION BY `t`
>>> ORDER BY `proc_time`
>>> RANGE BETWEEN INTERVAL '3' MINUTE PRECEDING AND CURRENT ROW
>>>   )
>>>
>>> I'd like to use the `e` field for windowing instead of `proc_time`. But
>>> I keep running into errors with the `TO_TIMESTAMP(BIGINT)` function now
>>> missing or with unsupported timestamp arithmetics.
>>>
>>> What is the best practice for a case such as this?
>>>
>>> Best Regards,
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>
>
> --
> Sebastián Ramírez Magrí
>


Re: [Flink-SQL] FLOORing OR CEILing a DATE or TIMESTAMP to WEEK uses Thursdays as week start

2021-03-02 Thread Timo Walther

Hi Sebastián,

it might be the case that some time functions are not correct due to the
underlying refactoring of data structures. I will loop in Leonard (in CC),
who is currently working on improving this situation as part of FLIP-162 [1].


@Leonard: Is this wrong behavior on your list?

Regards,
Timo


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior





On 02.03.21 12:26, Sebastián Magrí wrote:

While using a simple query such as this

SELECT
    `ts`,
    FLOOR(`ts` TO WEEK) as `week_start`,
    CEIL(`ts` TO WEEK) as `week_end`
FROM some_table

I get some weird results like these:

2021-03-01T00:00|    2021-02-25T00:00|    2021-03-04T00:00

Which is obviously wrong, since March 1st is a Monday while February 25th and
March 4th are both Thursdays.


I've tried different combinations of timezone configurations and with 
both timestamps and dates, with the same results.


Is there anything obviously wrong in that query? Is there any 
configuration to keep in mind for the start of week day?


--
Sebastián Ramírez Magrí




Running Apache Flink on Android

2021-03-02 Thread Alexander Borgschulze
I was trying to run Apache Flink within an Android App. I just want to run a 
minimum working example, like this:


@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    runFlinkExample();
}

private void runFlinkExample() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
    stream.print();
    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
These are my two .gradle files:


build.gradle (Module)

 
plugins {
id 'com.android.application'
}

android {
compileSdkVersion 30
buildToolsVersion "30.0.3"

defaultConfig {
applicationId "com.example.flink"
minSdkVersion 26
targetSdkVersion 30
versionCode 1
versionName "1.0"

testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
}

buildTypes {
release {
minifyEnabled false
proguardFiles 
getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
}
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}

packagingOptions {
exclude 'META-INF/DEPENDENCIES'
exclude 'reference.conf'
}
}

dependencies {

implementation 'androidx.appcompat:appcompat:1.2.0'
implementation 'com.google.android.material:material:1.3.0'
implementation 'androidx.constraintlayout:constraintlayout:2.0.4'
testImplementation 'junit:junit:4.+'
androidTestImplementation 'androidx.test.ext:junit:1.1.2'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.3.0'

// Flink
implementation 'org.apache.flink:flink-streaming-java_2.12:1.12.1'
implementation 'org.apache.flink:flink-clients_2.12:1.12.1'
} 
build.gradle (Project)

// Top-level build file where you can add configuration options common to all 
sub-projects/modules.
buildscript {
repositories {
google()
jcenter()
}
dependencies {
classpath "com.android.tools.build:gradle:4.1.2"

// NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files
}
}

allprojects {
repositories {
google()
jcenter()
}
}

task clean(type: Delete) {
delete rootProject.buildDir
}
 
The first problem is that I get the following error:

Caused by: java.lang.ClassNotFoundException: Didn't find class 
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" on 
path: DexPathList[[zip file "/data/app/~~DbT_CZ7AhLED2xZgLBk 

 
In cases where this error doesn't appear, I get Akka actor errors, because I
must exclude 'reference.conf', otherwise the code wouldn't compile. However,
this leads to more exceptions, e.g. a missing akka-version.
So my general question is: Is it possible to run Flink within an Android app?
Or is this not possible (or not recommended)? Perhaps someone knows how to modify my
gradle files (or something else) to run my example. Or perhaps someone has already
successfully used Flink on Android.




Re: Savepoint documentation

2021-03-02 Thread XU Qinghui
Out of curiosity, does it mean that a savepoint created by Flink 1.11 cannot
be restored by a job running with Flink 1.10 or older versions (so a
downgrade is impossible)?

Le mar. 2 mars 2021 à 12:25, David Anderson  a écrit :

> You are correct in thinking that the documentation wasn't updated. If you
> look at the master docs [1] you will see that they now say
>
> Can I move the Savepoint files on stable storage? #
> 
>
> The quick answer to this question is currently “yes”. Since Flink 1.11.0,
> savepoints are self-contained and relocatable. You can move the file and
> restore from any location.
>
>
> If you want more detail than the quick answer, see [2].
>
> Best,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#can-i-move-the-savepoint-files-on-stable-storage
> [2] https://issues.apache.org/jira/browse/FLINK-19381
>
> On Tue, Mar 2, 2021 at 10:33 AM Farouk  wrote:
>
>> Hi
>>
>> Is this chapter outdated as of Flink 1.11?
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#can-i-move-the-savepoint-files-on-stable-storage
>>
>> *Can I move the Savepoint files on stable storage?*
>>
>> *The quick answer to this question is currently “no” because the meta
>> data file references the files on stable storage as absolute paths for
>> technical reasons. The longer answer is: if you MUST move the files for
>> some reason there are two potential approaches as workaround. First,
>> simpler but potentially more dangerous, you can use an editor to find the
>> old path in the meta data file and replace them with the new path. Second,
>> you can use the class SavepointV2Serializer as starting point to
>> programmatically read, manipulate, and rewrite the meta data file with the
>> new paths.*
>>
>>
>> 
>>
>> Thanks
>> Farouk
>>
>


Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-02 Thread Fuyao Li
Hi Flink Community,

I need some help with the JDBC sink in the DataStream API. I can produce some records
and sink them to the database correctly. However, if I wait for 5 minutes between
insertions, I run into a broken pipeline issue. After that, the Flink
application restarts, recovers from the checkpoint, and executes the failed SQL
query. I tried hard to search for resources to understand why such a broken pipeline
happens, but I still can’t understand it.

The interesting thing is that if the idle time is around 3 minutes, everything
seems to be fine.

It seems to be a timeout-related issue, but I just don’t know what I should do
to fix it. I have shared the sink code. Could anyone share some ideas?
Thank you so much!
My environment settings:
Flink version: 1.12.1
Scala version: 2.11
Java version: 1.11
Flink System parallelism: 1
JDBC Driver: Oracle ojdbc10
Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can
regard this as a cloud-based Oracle database)

The code for the sink:
boDataStream
.addSink(
JdbcSink.sink(
"INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
(preparedStatement, testInvoiceBo) -> {
  try {
  Gson gson = new GsonBuilder()
  .excludeFieldsWithoutExposeAnnotation()
  .create();
  String invoiceId = testInvoiceBo.getINVOICE_ID();
  String json = gson.toJson(testInvoiceBo);
  log.info("insertion information: {}", json);
  preparedStatement.setString(1, invoiceId);
  preparedStatement.setString(2, json);
  } catch (JsonIOException e) {
  log.error("Failed to parse JSON", e);
  }
},
new JdbcExecutionOptions.Builder()
.withBatchIntervalMs(0)
.withBatchSize(1)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(DB_URL)
.withDriverName("oracle.jdbc.driver.OracleDriver")
.withUsername("admin")
.withPassword("password")
.build()))
.name("adwSink")
.uid("adwSink")
.setParallelism(1);

The JDBC broken pipeline log:




Re: Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-02 Thread Fuyao Li
Sorry for the incomplete email.

Error log of the broken pipeline; the failed SQL will be executed after automatic
checkpoint recovery. Please share some ideas on this issue. Really appreciate
it. Thanks!

09:20:02,868 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 3
java.sql.SQLRecoverableException: Closed Connection
at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
at 
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
at 
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Thread.java:834)
09:20:02,869 WARN  
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing 
records to JDBC failed.
java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLRecoverableException: Closed Connection
at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
at 
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
at 
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
  

Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-02 Thread Kevin Lam
Hello everyone,

I have some questions about the Python API that hopefully folks in the
Apache Flink community can help with.

A little background, I’m interested in using the Python Datastream API
because of stakeholders who don’t have a background in Scala/Java, and
would prefer Python if possible. Our team is open to maintaining Scala
constructs on our end, however we are looking to expose Flink for stateful
streaming via a Python API to end-users.

Questions:

1/ The docs mention that custom Sources and Sinks cannot be defined in
Python, but must be written in Java/Scala [1]. What is the recommended
approach for interoperating between custom sinks/sources written in Scala,
with the Python API? If nothing is currently supported, is it on the road
map?

2/ Also, I’ve noted that the Python DataStream API has several connectors
[2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
there a way for users to build their own connectors? What would this
process entail?

Ideally, we’d like to be able to define custom sources/sinks in Scala and
use them in our Python API Flink Applications. For example, defining a
BigTable sink in Scala for use in the Python API:


[3]

Where MyBigTableSink is just somehow importing a Scala defined sink.

More generally, we’re interested in learning more about Scala/Python
interoperability in Flink, and how we can expose the power of Flink’s Scala
APIs to Python. Open to any suggestions, strategies, etc.

Looking forward to any thoughts!


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks

[2]
https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py

[3] Plaintext paste of code in screenshot, in case of attachment issues:
```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import MyBigTableSink

def example():
env = StreamExecutionEnvironment.get_execution_environment()
...
ds.add_sink(MyBigTableSink, ...)
env.execute("Application with Custom Sink")

if __name__ == '__main__':
example()
```


Re: [Flink-SQL] FLOORing OR CEILing a DATE or TIMESTAMP to WEEK uses Thursdays as week start

2021-03-02 Thread Jaffe, Julian
Calcite does not follow ISO-8601. Instead, until very recently Calcite weeks 
started on Thursdays[1].

(As an aside, Calcite somewhat abuses the WEEK time unit - converting a date to 
a week returns an integer representing the week of the year the date falls in 
while FLOORing or CEILing a timestamp to week returns a timestamp. This can 
cause integration issues with other systems if you're unaware)

Julian


[1] https://issues.apache.org/jira/browse/CALCITE-3412
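
For anyone who needs Monday-aligned weeks despite this, one possible workaround is the following untested sketch, which leans on the Thursday anchoring described above: shift the timestamp forward by 3 days before flooring, then shift the result back (a TableEnvironment and a table `some_table` with a TIMESTAMP column `ts` are assumed):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class MondayWeekExample {
    public static Table mondayWeeks(TableEnvironment tEnv) {
        // 1970-01-01, the epoch, was a Thursday, which is presumably where the
        // Thursday anchoring comes from. Shifting by 3 days maps Mondays onto that
        // anchor, e.g. 2021-03-01 -> 2021-03-04 -> FLOOR -> 2021-03-04 -> 2021-03-01.
        return tEnv.sqlQuery(
            "SELECT `ts`, " +
            "  FLOOR((`ts` + INTERVAL '3' DAY) TO WEEK) - INTERVAL '3' DAY AS `week_start`, " +
            "  CEIL((`ts` + INTERVAL '3' DAY) TO WEEK) - INTERVAL '3' DAY AS `week_end` " +
            "FROM some_table");
    }
}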

On 3/2/21, 4:12 AM, "Timo Walther"  wrote:

Hi Sebastián,

it might be the case that some time functions are not correct due to the
underlying refactoring of data structures. I will loop in Leonard (in CC),
who is currently working on improving this situation as part of FLIP-162 [1].

@Leonard: Is this wrong behavior on your list?

Regards,
Timo


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior
 




On 02.03.21 12:26, Sebastián Magrí wrote:
> While using a simple query such as this
> 
> SELECT
> `ts`,
> FLOOR(`ts` TO WEEK) as `week_start`,
> CEIL(`ts` TO WEEK) as `week_end`
> FROM some_table
> 
> I get some weird results like these:
> 
> 2021-03-01T00:00|2021-02-25T00:00|2021-03-04T00:00
> 
> Which is obviously wrong, since March 1st is a Monday while February 25th and
> March 4th are both Thursdays.
> 
> I've tried different combinations of timezone configurations and with 
> both timestamps and dates, with the same results.
> 
> Is there anything obviously wrong in that query? Is there any 
> configuration to keep in mind for the start of week day?
> 
> -- 
> Sebastián Ramírez Magrí




Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-03-02 Thread Roman Khachatryan
Hi Jan,

Thanks for sharing your solution.
You probably also want to remove previously created timer(s) in
processElement; so that you don't end up with a timer per element.
For that, you can store the previous time (in function state).

Regards,
Roman
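
To make that concrete, a rough sketch of the bookkeeping Roman describes, for cases where a single pending timer per key is sufficient (the state name prevTimer is illustrative; the surrounding class is the SensorRecordCounter from the quoted code below):

// Additional state inside the KeyedProcessFunction:
private ValueState<Long> prevTimerState;

// In open(), next to the existing sensorCount state:
prevTimerState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("prevTimer", Long.class));

// In processElement(), instead of registering a timer unconditionally:
Long prevTimer = prevTimerState.value();
if (prevTimer != null) {
    ctx.timerService().deleteEventTimeTimer(prevTimer);  // drop the stale timer
}
long newTimer = ctx.timestamp() + windowSizeMs;
ctx.timerService().registerEventTimeTimer(newTimer);
prevTimerState.update(newTimer);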


On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch 
wrote:

> Hi everybody,
>
> I just wanted to say thanks again for all your input and share the
> (surprisingly simple) solution that we came up with in the meantime:
>
> class SensorRecordCounter extends KeyedProcessFunction<String, SensorRecord, SensorCount> {
>
> private ValueState<SensorCount> state;
> private long windowSizeMs = 6L;
>
>  @Override
>   public void open(Configuration parameters) throws Exception {
> state = getRuntimeContext().getState(new
> ValueStateDescriptor<>("sensorCount", SensorCount.class));
>   }
>
>
> @Override
> public void processElement(SensorRecord sensorRecord, Context ctx,
> Collector<SensorCount> out) throws Exception {
> SensorCount count = state.value();
> if (count == null) {
> count = new SensorCount();
> count.setSensorID(sensorRecord.getSensorID());
> count.setCount(0);
> }
> count.increase();
> state.update(count);
> out.collect(count);
>
> ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
> windowSizeMs);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx,
> Collector<SensorCount> out) throws Exception {
> SensorCount count = state.value();
> count.decrease();
> state.update(count);
> out.collect(count);
>
> if (count.getCount() <= 0) {
> state.clear();
> }
> }
>
> }
>
>
> Best regards and a nice weekend
>
> Jan
>
>
> On 09.02.21 08:28, Arvid Heise wrote:
>
> Hi Jan,
>
> Another solution is to insert Heartbeat-events at the source for each
> sensor. The solution is very similar to how to advance watermarks when
> there are no elements in the respective source partition.
>
> However, it's only easy to implement if you have your own source and know
> all sensors on application start. It might also be possible to implement if
> you use a new Source interface.
>
> On Tue, Feb 9, 2021 at 7:20 AM Yun Gao  wrote:
>
>>
>> Hi,
>>
>> I also think there should be different ways to achieve the target. For
>> the first option listed previously,
>> the pseudo-code roughly like
>>
>> class MyFunciton extends KeyedProcessFunction {
>> ValueState count;
>>
>> void open() {
>>count = ... // Create the value state
>>}
>>
>> ​void processElement(T t, Context context, Collector collector) {
>> ​Integer current = count.get();
>> if (current == null) {
>>   context.timeService().registerTimer(30); //
>> Register timer for the first time
>>   current = 0;
>> }
>>
>> count.update(current + 1); // update the count
>> }
>>
>> void onTimer(...) {
>>  collector.collect(new Tuple2<>(getCurrentKey(), count.get());
>>   context.timeService().registerTimer(30);  // register the
>> following timer
>> }
>> }
>>
>> 1. For flink the state and timer are all bound to a key implicitly, thus
>> I think they should
>> not need to be bound manually.
>> 2. To clear the outdated state, it could be cleared via count.clear(); if
>> it has been 0
>> for a long time. There are different ways to count the interval, like
>> register another timer
>> and clear the timer when received the elements or update the counter to
>> -1, -2... to mark
>> how much timer it has passed.
>>
>>
>> Best,
>>  Yun
>>
>>
>>
>>
>> --Original Mail --
>> *Sender:*Khachatryan Roman 
>> *Send Date:*Tue Feb 9 02:35:20 2021
>> *Recipients:*Jan Brusch 
>> *CC:*Yun Gao , user 
>> *Subject:*Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>
>>> Hi,
>>>
>>> Probably another solution would be to register a timer
>>> (using KeyedProcessFunction) once we see an element after keyBy. The timer
>>> will fire in windowIntervalMs. Upon firing, it will emit a dummy element
>>> which will be ignored (or subtracted) in the end.
>>> Upon receiving each new element, the function will shift the timer
>>> accordingly.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch 
>>> wrote:
>>>
 Hi Yun,

 thanks for your reply.

 I do agree with your point about standard windows being for high level
 operations and the lower-level apis offering a rich toolset for most
 advanced use cases.

 I have tried to solve my problem with keyedProcessFunctions also but
 was not able to get it to work for two reasons:

 1) I was not able to set up a combination of ValueState, Timers and
 Triggers that emulated a sliding window with a rising and falling count
 (including 0) good enough.

 2) Memory Leak: States / Windows should be cleared after a certain time
>>>

Flink + Hive + Compaction + Parquet?

2021-03-02 Thread Theo Diefenthal
Hi there, 

Currently, I have a Flink 1.11 job which writes parquet files via the 
StreamingFileSink to HDFS (simply using DataStream API). I commit like every 3 
minutes and thus have many small files in HDFS. Downstream, the generated table 
is consumed by Spark jobs and Impala queries. HDFS doesn't like too
many small files, and writing parquet quickly while still producing large files is a
rather common problem; solutions were suggested recently on the mailing
list [1] and in Flink Forward talks [2]. Cloudera also posted two possible 
scenarios in their blog posts [3], [4]. Mostly, it comes down to asynchronously 
compact the many small files into larger ones, at best non blocking and in an 
occasionally running batch job. 

I am now about to implement something like what is suggested in the Cloudera blog [4],
but from parquet to parquet. To me, it seems not straightforward but
rather involved, especially as my data is partitioned by event time and I need
the compaction to be non-blocking (my users query Impala and expect near-real-time
performance for each query). When starting the work on that, I noticed that
Hive already has a compaction mechanism included and the Flink community has been
investing a lot in Hive integration in the latest releases. Some of my
questions are not directly related to Flink, but I guess many of you also have
experience with Hive, and writing from Flink to Hive is rather common nowadays. 

I read online that Spark should integrate nicely with Hive tables, i.e. instead 
of querying HDFS files, querying a hive table has the same performance [5]. We 
also all know that Impala integrates nicely with Hive so that overall, I can 
expect writing to Hive internal tables instead of HDFS parquet doesn't have any 
disadvantages for me. 

My questions: 
1. Can I use Flink to "streaming write" to Hive? 
2. For compaction, I need "transactional tables" and according to the hive 
docs, transactional tables must be fully managed by hive (i.e., they are not 
external). Does Flink support writing to those out of the box? (I only have 
Hive 2 available) 
3. Does Flink use the "Hive Streaming Data Ingest" APIs? 
4. Do you see any downsides in writing to hive compared to writing to parquet 
directly? (Especially in my usecase only having impala and spark consumers) 
5. Not Flink related: Have you ever experienced performance issues when using 
hive transactional tables over writing parquet directly? I guess there must be 
a reason why "transactional" is off by default in Hive? I won't use any 
features except for compaction, i.e. there are only streaming inserts, no 
updates, no deletes. (Delete only after given retention and always delete 
entire partitions) 


Best regards 
Theo 

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-data-to-parquet-td38029.html
[2] https://www.youtube.com/watch?v=eOQ2073iWt4
[3] https://blog.cloudera.com/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
[4] https://blog.cloudera.com/transparent-hierarchical-storage-management-with-apache-kudu-and-impala/
[5] https://stackoverflow.com/questions/51190646/spark-dataset-on-hive-vs-parquet-file
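
Regarding question 1 above, for what it's worth: Flink 1.11+ documents streaming writes into Hive tables via the HiveCatalog and the Hive/filesystem sink's partition-commit options. Below is an untested sketch based on that documentation; the catalog name, hive-conf directory, table and column names are placeholders, and it does not address questions 2-5 (transactional tables, the Hive streaming ingest API, compaction).

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveStreamingSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder catalog setup: catalog name, default database and hive-conf dir.
        tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/etc/hive/conf"));
        tEnv.useCatalog("myhive");

        // Hive-dialect DDL with the partition-commit options from the Flink Hive docs.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
            "CREATE TABLE hive_sink (event_id STRING, payload STRING) " +
            "PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
            "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'," +
            "  'sink.partition-commit.trigger' = 'partition-time'," +
            "  'sink.partition-commit.delay' = '1 h'," +
            "  'sink.partition-commit.policy.kind' = 'metastore,success-file')");

        // Back to the default dialect for the continuous INSERT from a streaming source table.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
            "INSERT INTO hive_sink " +
            "SELECT event_id, payload, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') " +
            "FROM source_table");
    }
}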


Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-03-02 Thread Jan Brusch

Hi Roman,

thanks for your reply.

Don't timers remove themselves after firing?

Apart from that, the idea is indeed to have one timer per element, so
that we count one up whenever the element comes in and count one down
exactly one window size later. So we emulate a sliding window without the
"hops" at certain intervals. Instead, we always have a real-time running
count of elements in the last window interval. But yes, the price for that
is to have one timer per element. Which is manageable for our use case
(large window size, a LOT of sensors but relatively few elements per
sensor). In fact, for our use case this solution is much more efficient
than a sliding window.



Best regards

Jan


On 02.03.21 20:40, Roman Khachatryan wrote:

Hi Jan,

Thanks for sharing your solution.
You probably also want to remove previously created timer(s) in 
processElement; so that you don't end up with a timer per element.

For that, you can store the previous time (in function state).

Regards,
Roman


On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch > wrote:


Hi everybody,

I just wanted to say thanks again for all your input and share the
(surprisingly simple) solution that we came up with in the meantime:

class SensorRecordCounter extends KeyedProcessFunction<String, SensorRecord, SensorCount> {

private ValueState<SensorCount> state;
private long windowSizeMs = 6L;

 @Override
  public void open(Configuration parameters) throws Exception {
    state = getRuntimeContext().getState(new
ValueStateDescriptor<>("sensorCount", SensorCount.class));
  }


@Override
public void processElement(SensorRecord sensorRecord, Context ctx,
Collector<SensorCount> out) throws Exception {
    SensorCount count = state.value();
    if (count == null) {
    count = new SensorCount();
    count.setSensorID(sensorRecord.getSensorID());
    count.setCount(0);
    }
    count.increase();
    state.update(count);
    out.collect(count);

ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
windowSizeMs);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<SensorCount> out) throws Exception {
    SensorCount count = state.value();
    count.decrease();
    state.update(count);
    out.collect(count);

    if (count.getCount() <= 0) {
    state.clear();
    }
}

}


Best regards and a nice weekend

Jan


On 09.02.21 08:28, Arvid Heise wrote:

Hi Jan,

Another solution is to insert Heartbeat-events at the source for
each sensor. The solution is very similar to how to advance
watermarks when there are no elements in the respective source
partition.

However, it's only easy to implement if you have your own source
and know all sensors on application start. It might also be
possible to implement if you use a new Source interface.

On Tue, Feb 9, 2021 at 7:20 AM Yun Gao mailto:yungao...@aliyun.com>> wrote:


Hi,

I also think there should be different ways to achieve the
target. For the first option listed previously,
the pseudo-code roughly like

class MyFunciton extends KeyedProcessFunction {
    ValueState count;

    void open() {
       count = ... // Create the value state
   }

​void processElement(T t, Context context, Collector
collector) {
        ​Integer current = count.get();
            if (current == null) {
context.timeService().registerTimer(30); // Register timer
for the first time
                      current = 0;
            }

            count.update(current + 1); // update the count
    }

    void onTimer(...) {
         collector.collect(new Tuple2<>(getCurrentKey(),
count.get());
context.timeService().registerTimer(30);  // register the
following timer
    }
}

1. For flink the state and timer are all bound to a key
implicitly, thus I think they should
not need to be bound manually.
2. To clear the outdated state, it could be cleared via
count.clear(); if it has been 0
for a long time. There are different ways to count the
interval, like register another timer
and clear the timer when received the elements or update the
counter to -1, -2... to mark
how much timer it has passed.


Best,
 Yun




--Original Mail --
*Sender:*Khachatryan Roman mailto:khachatryan.ro...@gmail.com>>
*Send Date:*Tue Feb 9 02:35:20 2021
*Recipients:*Jan Brusch mailto:jan.bru...@neuland-bfi.de>>
*CC:*Yun Gao mailto:yungao...@aliyun.com>>, user
mailto:user@flink.apache.org>>
*Subj

Re: Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-02 Thread XU Qinghui
It sounds like the JDBC driver's connection is closed somehow, and probably
has nothing to do with Flink itself.
Maybe you could check whether there are some settings on the DB that close
the connection after some inactivity; otherwise it could be that your network
drops the idle TCP connection after some time (you can try to use TCP
keepalive in this case).

BR,


Le mar. 2 mars 2021 à 19:38, Fuyao Li  a écrit :

> Sorry for the uncompleted email.
>
>
>
> Error log of broken pipeline, the failed SQL will be executed after
> checkpoint automatic recovery. Please share some ideas on this issue.
> Really appreciate it. Thanks!
>
>
>
> 09:20:02,868 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC
> executeBatch error, retry times = 3
>
> java.sql.SQLRecoverableException: Closed Connection
>
> at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
>
> at
> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
>
> at
> oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
>
> at
> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
>
> at
> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
>
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
>
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
>
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
>
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
>
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
>
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
> at java.base/java.lang.Thread.run(Thread.java:834)
>
> 09:20:02,869 WARN
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  -
> Writing records to JDBC failed.
>
> java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
>
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
>
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
>
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
>
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
> at java.base/java.lang.Thread.run(Thread.java:834)
>
> Caused by: java.sql.SQLRecoverableException: Closed Connection
>
> at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
>
> at
> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
>
> at
> oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
>
>   

Re: Debugging long Flink checkpoint durations

2021-03-02 Thread Dan Hill
Thanks!  Yes, I've looked at these.  My job is facing backpressure
starting at an early join step.  I'm unclear whether the backfill just needs
more time or whether I need more resources.

On Tue, Mar 2, 2021 at 12:50 AM Yun Gao  wrote:

> Hi Dan,
>
> I think you could see the detail of the checkpoints via the checkpoint
> UI[1]. Also, if you see in the
> pending checkpoints some tasks do not take snapshot,  you might have a
> look whether this task
> is backpressuring the previous tasks [2].
>
> Best,
> Yun
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>
> --
> Sender:Dan Hill
> Date:2021/03/02 04:34:56
> Recipient:user
> Theme:Debugging long Flink checkpoint durations
>
> Hi.  Are there good ways to debug long Flink checkpoint durations?
>
> I'm running a backfill job that runs ~10 days of data and then starts
> checkpointing failing.  Since I only see the last 10 checkpoints in the
> jobmaster UI, I don't see when it starts.
>
> I looked through the text logs and didn't see much.
>
> I assume:
> 1) I have something misconfigured that is causing old state is sticking
> around.
> 2) I don't have enough resources.
>
>
>


Re: java Flink local test failure (Could not create actor system)

2021-03-02 Thread Vijayendra Yadav
Hi Smile,

Thanks for your clarification, it helped.

Thanks,
Vijay

> On Feb 28, 2021, at 7:06 PM, Smile  wrote:
> 
> Hi Vijay,
> 
> Since version 1.7 Flink builds with Scala version 2.11 (default) and 2.12.
> Flink has APIs, libraries, and runtime modules written in Scala. Users of
> the Scala API and libraries may have to match the Scala version of Flink
> with the Scala version of their projects (because Scala is not strictly
> backward compatible). See [1] for more information.
> 
> If using maven, artifactId of Flink components usually end with scala
> version, such as flink-streaming-java_2.11 means it was built against Scala
> 2.11.
> 
> [1].
> https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html#scala-versions
> 
> Regards,
> Smile
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink 1.12 Compatibility with hbase-shaded-client 2.1 in application jar

2021-03-02 Thread Debraj Manna
Hi

I am trying to deploy an application on Flink 1.12 that has
hbase-shaded-client 2.1.0 as a dependency, in application mode.
On deploying the application I am seeing the below ClassCastException:

*org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
cannot be cast to
org.apache.hadoop.hbase.shaded.com.google.protobuf.Message*

I have done export HADOOP_CLASSPATH=`hadoop classpath` as mentioned in
the Hadoop documentation. I did not add any Hadoop / HBase jars to the
flink/lib folder.

ubuntu@vrni-platform://tmp/debraj-flink/flink/lib$ ls
flink-csv-1.12.1.jarflink-json-1.12.1.jar
 flink-table_2.12-1.12.1.jarlog4j-1.2.17.jar
slf4j-log4j12-1.7.25.jar
flink-dist_2.12-1.12.1.jar  flink-shaded-zookeeper-3.4.14.jar
 flink-table-blink_2.12-1.12.1.jar  log4j-to-slf4j-2.11.1.jar
 vrni-flink-datadog-0.001-SNAPSHOT.jar

Can anyone suggest what could be going wrong here?

The full exception trace is like below

2021-03-02 18:10:45,819 ERROR
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
- Fatal error occurred in ResourceManager.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not start the ResourceManager
akka.tcp://flink@localhost:41477/user/rpc/resourcemanager_0
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:233)
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:607)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:181)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
at akka.actor.ActorCell.invoke(ActorCell.scala:583)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Cannot initialize resource provider.
at 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:124)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:245)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:229)
... 22 more
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not start resource manager client.
at 
org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:181)
at 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:81)
at 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:122)
... 24 more
Caused by: java.lang.ClassCastException:
org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
cannot be cast to
org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy16.registerApplicationMaster(Unknown Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:107)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

Flink Zookeeper leader change v 1.9.X

2021-03-02 Thread Varun Chakravarthy Senthilnathan
Hi,

We are using Flink version 1.9.1 and, in a long-running environment, we
encountered the specific issue mentioned in:
https://issues.apache.org/jira/browse/FLINK-14091
While we are working on upgrading our version:

  1.  Why does ZooKeeper go for a leader change? As far as we checked, there
was no scaling in our cluster at all. The load was very minimal. Is there any
reason for the ZooKeeper leader change to happen?
  2.  Is there a way to replicate the ZooKeeper leader change manually to
verify whether the version upgrade helped us?

Regards,
Varun.



Flink, local development, finish processing a stream of Kafka data

2021-03-02 Thread Dan Hill
Hi.

For local development and tests, I want to flush the events in my system to
make sure I'm processing everything.  My watermark does not progress far enough to
finish all of the data.

What's the best practice for local development or tests?

If I use idle sources for 1 Kafka partition, this appears broken.  I'm
guessing there is logic to prevent removing an idle partition if it's the
only partition.  Is there a version of this I can enable for local
development that supports 1 partition?

I see this tech talk.  Are there other talks to watch?
https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be

Do I need to write my own watermark generator?  Or change my test data to
have a way of generating watermarks?

I've tried a few variants of the following source code.  The watermark
doesn't progress in the operator right after creating the source.

SingleOutputStreamOperator viewInput = env.addSource(...)
.uid("source-view")
.assignTimestampsAndWatermarks(

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
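
As one option for the custom-generator question: a generator that advances the watermark on every record keeps event time progressing even for small test inputs (a hedged sketch against the 1.12 WatermarkGenerator interface; the record type View and its timestamp getter are placeholders):

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Test-oriented generator: every record immediately advances the watermark to its
// own timestamp, so downstream event-time logic progresses as soon as later data arrives.
public class PerRecordWatermarks<T> implements WatermarkGenerator<T> {

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // watermarks are emitted per event in onEvent; nothing to do here
    }
}

// Usage sketch (View and getEventTimeMillis are placeholders):
//
// WatermarkStrategy<View> strategy = WatermarkStrategy
//         .<View>forGenerator(ctx -> new PerRecordWatermarks<>())
//         .withTimestampAssigner((view, previousTimestamp) -> view.getEventTimeMillis());
//
// SingleOutputStreamOperator<View> viewInput = env.addSource(...)
//         .uid("source-view")
//         .assignTimestampsAndWatermarks(strategy);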