Re: Re: Flink SQL regular join not working as expected.

2021-06-04 Thread JING ZHANG
Hi, although the JDBC connector cannot read a changelog from the database, there are already connectors that could satisfy your demands. You could use the Maxwell [1], Canal [2], or Debezium [3] CDC tools to capture
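
For reference, once the change events are in Kafka (e.g., captured by Debezium), they can be read as a changelog source table; a minimal sketch in which the topic, broker address, and columns are illustrative:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // The 'debezium-json' format interprets Debezium change events as a
    // changelog stream, so updates to the dimension table are propagated.
    tableEnv.executeSql(
        "CREATE TABLE dim_table (" +
        "  ip STRING," +
        "  target STRING" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'pg.public.dim_table'," +   // hypothetical topic name
        "  'properties.bootstrap.servers' = 'kafka:9092'," +
        "  'format' = 'debezium-json'" +
        ")");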

Re: Elasticsearch sink connector timeout

2021-06-04 Thread Kai Fu
With some investigation in the task manager's log, the exception was raised from the RetryRejectedExecutionFailureHandler path; the related logs are shown below, not sure why that is. * 5978 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shade

Re: Question about State TTL and Interval Join

2021-06-04 Thread JING ZHANG
Hi Chris, there is no need for state TTL if the only stateful operator is the IntervalJoin. Please check the watermarks of the two input streams: does a watermark fail to advance for a long time? Best regards, JING ZHANG. McBride, Chris wrote on Sat, Jun 5, 2021 at 3:17 AM: > We currently have a flink 1.8 application de
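
If an idle input is indeed the cause, one option (in Flink 1.11+; the application in the thread is still on 1.8) is an idleness timeout so a quiet stream cannot stall the join's state cleanup. A sketch where the Event type and its getEventTime() accessor are illustrative:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<Event> withWatermarks = events.assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
            .withTimestampAssigner((event, ts) -> event.getEventTime())
            // A stream that stops emitting is marked idle, so the other
            // input's watermark can still drive the interval join cleanup.
            .withIdleness(Duration.ofMinutes(1)));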

Is it possible to use OperatorState when NOT implementing a source or sink function?

2021-06-04 Thread Marco Villalobos
Is it possible to use OperatorState when NOT implementing a source or sink function? If yes, then how?
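
Yes: operator state is not tied to sources and sinks; any function can implement CheckpointedFunction and request state from the OperatorStateStore. A minimal sketch (the counting logic is just illustrative):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    public class CountingMap extends RichMapFunction<String, String>
            implements CheckpointedFunction {

        private transient ListState<Long> checkpointedCount; // operator state
        private long count;

        @Override
        public String map(String value) {
            count++;
            return value;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
            checkpointedCount.clear();
            checkpointedCount.add(count);
        }

        @Override
        public void initializeState(FunctionInitializationContext ctx) throws Exception {
            checkpointedCount = ctx.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("count", Long.class));
            for (Long c : checkpointedCount.get()) {
                count += c; // merge redistributed state on restore
            }
        }
    }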

Re: Re: Flink SQL regular join not working as expected.

2021-06-04 Thread 1095193...@qq.com
Thanks @JING ZHANG @Yun Gao. I will use a processing-time temporal join. BTW, will Flink support reading a changelog for the JDBC source when it works as the right stream of a regular join in the future? 1095193...@qq.com From: JING ZHANG Date: 2021-06-04 18:32 To: Yun Gao CC: 1095193...@qq.com; user Subject: Re
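
The processing-time temporal join mentioned above looks roughly like this, with illustrative table and column names; proc_time is assumed to be a PROCTIME() column on the Kafka-backed table, and the JDBC table acts as a lookup table:

    // Each stream record is joined against the dimension row as looked up
    // at processing time, so later updates to the JDBC table are picked up.
    tableEnv.executeSql(
        "SELECT s.ip, s.ts, d.target " +
        "FROM kafka_stream AS s " +
        "JOIN dim_table FOR SYSTEM_TIME AS OF s.proc_time AS d " +
        "ON s.ip = d.ip");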

Elasticsearch sink connector timeout

2021-06-04 Thread Kai Fu
Hi team, we encountered an issue with ES sink connector timeouts quite frequently. As checked, the ES cluster is far from being loaded (~40% CPU utilization, no queries, the index rate is also low). We're using the ES-7 connector, with 12 data nodes and a parallelism of 32. The error log is as below, we want t
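
Not a fix for the root cause, but the connector's bulk-flush backoff settings usually soften such timeouts; a sketch against the elasticsearch7 connector, with illustrative host and values:

    import java.util.Collections;
    import java.util.List;
    import org.apache.http.HttpHost;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
    import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;

    List<HttpHost> httpHosts =
        Collections.singletonList(new HttpHost("es-host", 9200, "http"));

    ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
        httpHosts,
        (element, ctx, indexer) -> { /* build and add an IndexRequest */ });

    builder.setBulkFlushMaxActions(500);    // flush smaller bulks
    builder.setBulkFlushBackoff(true);      // retry rejected bulks ...
    builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
    builder.setBulkFlushBackoffRetries(5);
    builder.setBulkFlushBackoffDelay(1000); // ... starting at 1s
    builder.setFailureHandler(new RetryRejectedExecutionFailureHandler());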

Re: Flink exported metrics scope configuration

2021-06-04 Thread Kai Fu
Hi Mason, thank you for the advice; I tried it, it works and reduces the size a lot. On Fri, Jun 4, 2021 at 11:45 AM Mason Chen wrote: > Hi Kai, > > You can use the excluded variables config for the reporter. > > - metrics.reporter.<reporter name>.scope.variables.excludes: (optional) A > semi-colon (;) s
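
For reference, the resulting setting looks like this in flink-conf.yaml; the reporter name and the excluded variables here are illustrative:

    # Exclude high-cardinality scope variables from the reporter named "prom"
    # to shrink the exported metrics.
    metrics.reporter.prom.scope.variables.excludes: task_attempt_id;task_attempt_num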

Re: Corrupted unaligned checkpoints in Flink 1.11.1

2021-06-04 Thread Alexander Filipchik
Looked through the logs and didn't see anything fishy that indicated an exception during checkpointing. To make it clearer, here is the timeline (we use unaligned checkpoints, and the state size is around 300 GB): T1: Job1 was running. T2: Job1 was savepointed, brought down, and replaced with Job2. T3: Atte

Question about State TTL and Interval Join

2021-06-04 Thread McBride, Chris
We currently have a flink 1.8 application deployed on Kinesis Data Analytics using the RocksDB state backend. Our application is joining across 3 different Kinesis streams using an interval join. We noticed that our checkpoint sizes continue to increase over time; we eventually have OOM failures
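
For context, the join in question has roughly this shape (stream types, keys, and bounds are illustrative). The state an interval join keeps per key is bounded by the interval, but it is only cleaned up as watermarks advance:

    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    streamA.keyBy(a -> a.getKey())
        .intervalJoin(streamB.keyBy(b -> b.getKey()))
        .between(Time.minutes(-5), Time.minutes(5))
        .process(new ProcessJoinFunction<A, B, Joined>() {
            @Override
            public void processElement(A left, B right, Context ctx,
                    Collector<Joined> out) {
                out.collect(new Joined(left, right));
            }
        });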

Re: Add control mode for Flink

2021-06-04 Thread Peter Huang
I agree with Steven. This logic can be added in a dynamic config framework that can be bound into Flink operators. We probably don't need to let the Flink runtime handle it. On Fri, Jun 4, 2021 at 8:11 AM Steven Wu wrote: > I am not sure if we should solve this problem in Flink. This is more like > a dy

Re: Failed to cancel a job using the STOP REST API

2021-06-04 Thread Thomas Wang
Hi Yun, thanks for your reply. We are not using any legacy source. For this specific job, there is only one source, which is using FlinkKafkaConsumer, which I assume has the correct cancel() method implemented. Also, could you suggest how I could use the "request-id" to get the savepoint location? T
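
Regarding the request-id: the stop call returns a trigger id, and the savepoint status (including the final location) can be polled from the savepoints endpoint. Roughly, with host, port, and ids as placeholders:

    POST http://<jobmanager>:8081/jobs/<job-id>/stop
      -> {"request-id": "<trigger-id>"}

    GET http://<jobmanager>:8081/jobs/<job-id>/savepoints/<trigger-id>
      -> {"status": {"id": "COMPLETED"},
          "operation": {"location": "s3://.../savepoint-..."}}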

Re: Re: Checkpointing enabled, sending messages to Kafka throws OutOfOrderSequenceException

2021-06-04 Thread SmileSmile
Hi, after failover there is still an OutOfOrderSequenceException. Even when I disable checkpointing, the Kafka broker still returns OutOfOrderSequenceException to me. At 2021-06-04 17:52:22, "Yun Gao" wrote: Hi, have you checked whether the error occurs during normal execution, or right after failover? Best, Y

Re: Add control mode for Flink

2021-06-04 Thread Steven Wu
I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that should probably be solved by some configuration framework. Here is one post from a Google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-sprin

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
Thanks Timo 🙏 On Fri, Jun 4, 2021, 17:13 Timo Walther wrote: > Hi Yuval, > > I would recommend option 2, because especially when it comes to state you > should be in control of what is persisted. There is no guarantee that the > ExternalSerializer will not change in the future. It is only meant for > shi

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Timo Walther
Hi Yuval, I would recommend option 2, because especially when it comes to state you should be in control of what is persisted. There is no guarantee that the ExternalSerializer will not change in the future. It is only meant for shipping data as the input of the next operator. I would recommend to wr

Add control mode for Flink

2021-06-04 Thread 刘建刚
Hi everyone, Flink jobs are always long-running. While a job is running, users may want to control it without stopping it. The reasons for control can differ, such as the following: 1. Changing the data-processing logic, such as a filter condition. 2. Sending trigger events to make the progre
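
For comparison, the closest thing Flink offers today for this kind of runtime control is a broadcast control stream; a minimal sketch in which the Event type, the control encoding, and the matches() check are illustrative:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    MapStateDescriptor<String, String> controlDesc =
        new MapStateDescriptor<>("control", String.class, String.class);

    BroadcastStream<String> control = controlStream.broadcast(controlDesc);

    dataStream.connect(control)
        .process(new BroadcastProcessFunction<Event, String, Event>() {
            @Override
            public void processElement(Event value, ReadOnlyContext ctx,
                    Collector<Event> out) throws Exception {
                String filter = ctx.getBroadcastState(controlDesc).get("filter");
                if (filter == null || value.matches(filter)) { // matches() is illustrative
                    out.collect(value);
                }
            }

            @Override
            public void processBroadcastElement(String rule, Context ctx,
                    Collector<Event> out) throws Exception {
                // A new control message updates the broadcast state on every task.
                ctx.getBroadcastState(controlDesc).put("filter", rule);
            }
        });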

Re: Error with extracted type from custom partitioner key

2021-06-04 Thread Timo Walther
Hi Ken, non-POJOs are serialized with Kryo. This might not give you optimal performance. You can register a custom Kryo serializer in the ExecutionConfig to speed up the serialization. Alternatively, you can implement `ResultTypeQueryable` to provide custom type information with a custom serializ
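
The Kryo registration mentioned above is a one-liner; a sketch, where MyGroupingKeySerializer is a hypothetical com.esotericsoftware.kryo.Serializer for the key type:

    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Kryo will use the registered serializer instead of its generic fallback.
    env.getConfig().registerTypeWithKryoSerializer(
        MyGroupingKey.class, MyGroupingKeySerializer.class);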

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
Hi Timo, thank you for the response. The tables being created are in reality based on arbitrary SQL code, such that I don't know what the schema actually is in order to create the TypeInformation "by hand" and pass it on to the DataStream API. This leaves me with option 1, which leads to another question:

Error with extracted type from custom partitioner key

2021-06-04 Thread Ken Krugler
Hi all, I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, with a DataSet) to do a better job of distributing data to tasks. The classes look like: public class MyPartitioner implements Partitioner<MyGroupingKey> { ... } public class MyGroupingKey implements Comparable<MyGroupingKey> { ..
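
For reference, this is roughly how such a partitioner is wired into a DataSet job; an explicit KeySelector (rather than a lambda) gives the type extractor more to work with. MyRecord and getId() are illustrative:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.functions.KeySelector;

    DataSet<MyRecord> partitioned = records.partitionCustom(
        new MyPartitioner(),
        new KeySelector<MyRecord, MyGroupingKey>() {
            @Override
            public MyGroupingKey getKey(MyRecord r) {
                return new MyGroupingKey(r.getId());
            }
        });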

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Timo Walther
Hi Yuval, TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge between TypeInformation and DataType until TypeInformation is no longer exposed through the Table API. Beginning with Flink 1.13, the Table API is able to serialize the records to the first DataStream operator
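
The 1.13 replacements referred to here are the new conversion methods on StreamTableEnvironment; a sketch assuming env and table already exist:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Table -> DataStream: the type information is derived from the table
    // schema, no TypeConversions needed.
    DataStream<Row> rows = tableEnv.toDataStream(table);

    // DataStream -> Table for the reverse direction.
    Table back = tableEnv.fromDataStream(rows);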

Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
When upgrading to Flink 1.13, I ran into deprecation warnings on TypeConversions. The deprecation message states that this API will be deprecated soon, but does not mention the alternatives that can be used for these transformations. My use case is that I have a table that need

Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Wouter Zorgdrager
Hi Dian, all, Thanks for your suggestion. Unfortunately, it does not seem to work. I get the following exception: Caused by: java.lang.NegativeArraySizeException: -2147183315 at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.

Re: Flink SQL regular join not working as expected.

2021-06-04 Thread JING ZHANG
Hi, the JDBC source only takes a snapshot and sends all data in the snapshot downstream when it works as the right stream of a regular join; it cannot produce a changelog stream. After you update the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080', the JDBC source would not send new data

Re: Checkpointing enabled, sending messages to Kafka throws OutOfOrderSequenceException

2021-06-04 Thread Yun Gao
Hi, have you checked whether the error occurs during normal execution, or right after failover? Best, Yun -- From: SmileSmile Send Time: 2021 Jun. 4 (Fri.) 11:07 To: user Subject: open checkpoint, send message to kafka OutOfOrderSequenceExceptio
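
For reference, with checkpointing enabled the exactly-once Kafka producer writes transactionally, and the transaction timeout must cover the checkpoint interval; a sketch with illustrative broker, topic, and timeout:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    // Must be >= the checkpoint interval and <= the broker's
    // transaction.max.timeout.ms (15 minutes by default).
    props.setProperty("transaction.timeout.ms", "900000");

    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",
        (KafkaSerializationSchema<String>) (element, timestamp) ->
            new ProducerRecord<>("output-topic",
                element.getBytes(StandardCharsets.UTF_8)),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);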

Re: Failed to cancel a job using the STOP REST API

2021-06-04 Thread Yun Gao
Hi Thomas, I think you are right that the CLI is also using the same REST API under the hood, and since the response of the REST API is OK and the savepoint is triggered successfully, I reckon that it might not be due to the REST API process, and we might still first focus on the stop-with-savepoint p

Re: Flink SQL regular join not working as expected.

2021-06-04 Thread Yun Gao
Hi, I'm not an expert on Table/SQL, but it seems to me that for regular joins, Flink would not re-read the dimension table after it has read it fully for the first time. If you want to always join the records with the latest version of the dimension table, you may need to use the temporal j

Flink SQL regular join not working as expected.

2021-06-04 Thread 1095193...@qq.com
Hi, I am working on joining a Kafka stream with a Postgres dimension table. According to https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/: "Regular joins are the most generic type of join in which any new record, or changes to either side of the j