Re: Questions about using Flink MongoDB CDC

2024-11-17 Thread Jiabao Sun
Hi, The Flink SQL planner uses the ChangelogNormalize operator to cache all incoming data for upsert-type changelogs in order to complete the pre-image values, which results in additional state overhead. When the MongoDB version is below 6.0, the oplog does not contain pre-images of changed record
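A minimal sketch of how the extra state can be avoided on newer setups, assuming MongoDB >= 6.0 with changeStreamPreAndPostImages enabled on the collection and a Flink CDC version that supports the 'scan.full-changelog' option (both assumptions should be checked against your versions):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoFullChangelog {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
            "CREATE TABLE orders (" +
            "  _id STRING," +
            "  amount INT," +
            "  PRIMARY KEY (_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mongodb-cdc'," +
            "  'hosts' = 'localhost:27017'," +
            "  'database' = 'test_db'," +
            "  'collection' = 'orders'," +
            // with pre/post images available, the source can emit a complete
            // changelog and the planner can skip the stateful ChangelogNormalize
            "  'scan.full-changelog' = 'true'" +
            ")");
    }
}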

Re: Backporting array_agg from 1.20 to 1.18

2024-10-24 Thread Jiabao Sun
Hi Daniele, It's a good idea to implement a UDF[1] by referring to the ArrayAggFunction[2] in Flink. I don't understand what you mean by "I don't get how RowData comes into play". Could you clarify? Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/
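A minimal sketch of such a UDF, loosely modeled on ArrayAggFunction (the class name is hypothetical, and depending on the Flink version the accumulator may need a ListView or @DataTypeHint):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.AggregateFunction;

public class ArrayAggUdf extends AggregateFunction<String[], List<String>> {

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    // called by the planner once per input row
    public void accumulate(List<String> acc, String value) {
        if (value != null) {
            acc.add(value);
        }
    }

    @Override
    public String[] getValue(List<String> acc) {
        return acc.toArray(new String[0]);
    }
}

// registration, e.g.: tEnv.createTemporarySystemFunction("array_agg", ArrayAggUdf.class);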

Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
y MongoDBSource ? > > I don't see any option in the builder for the same. > > Regards > Sachin > > > > On Mon, Aug 19, 2024 at 8:00 PM Jiabao Sun wrote: > > > Sorry, in my previous reply, I mistakenly wrote Flink 2.0 instead of Flink > > CDC 2.0. > &

Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
n of flink ? > > Thanks > Sachin > > > On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun wrote: > > > Hi Sachin, > > > > Incremental snapshot reading is a new feature introduced in Flink 2.0. > > > > It has the following capabilities: > > - Source c

Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
ction faster. > > Am I understanding that correctly? > > Also I am using Flink 1.8, would it work with this version of flink ? > > Thanks > Sachin > > > On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun wrote: > > > Hi Sachin, > > > > Incremental snapsh

Re: How to set the number formatter for json convertor for mongo cdc connector

2024-08-19 Thread Jiabao Sun
Hi Sachin, It is recommended to use org.bson.Document to convert MongoDB Extended JSON into Java types, and then perform further field mapping. .deserializer(new DebeziumDeserial
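A sketch of what such a deserializer could look like (package names follow the com.ververica line of Flink CDC and may differ in newer org.apache.flink.cdc releases; 'fullDocument' is the MongoDB change-event field and is absent for deletes):

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;

public class DocumentDeserializationSchema implements DebeziumDeserializationSchema<Document> {

    @Override
    public void deserialize(SourceRecord record, Collector<Document> out) {
        Struct value = (Struct) record.value();
        String fullDocument = value.getString("fullDocument"); // Extended JSON string
        if (fullDocument != null) {
            // Document.parse understands Extended JSON ($numberLong, $date, ...)
            out.collect(Document.parse(fullDocument));
        }
    }

    @Override
    public TypeInformation<Document> getProducedType() {
        return TypeInformation.of(Document.class);
    }
}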

Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
Hi Sachin, Incremental snapshot reading is a new feature introduced in Flink CDC 2.0. It has the following capabilities: - Source can read in parallel during the snapshot phase to improve snapshot speed - Source can perform checkpoints at chunk granularity during snapshot reading Limitation: - MongoD
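For the SQL connector the feature is toggled with a single option; a sketch (option name as documented for Flink CDC, availability depends on the CDC version):

tEnv.executeSql(
    "CREATE TABLE users (" +
    "  _id STRING," +
    "  name STRING," +
    "  PRIMARY KEY (_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'mongodb-cdc'," +
    "  'hosts' = 'localhost:27017'," +
    "  'database' = 'test_db'," +
    "  'collection' = 'users'," +
    "  'scan.incremental.snapshot.enabled' = 'true'" +
    ")");

In the DataStream API, incremental snapshot reading appears to live in a separate source builder (com.ververica.cdc.connectors.mongodb.source.MongoDBSource) rather than behind a flag on the legacy builder, which would explain why no such option shows up on the builder mentioned elsewhere in this thread.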

Re: Mongo flink CDC connector not reading from the source

2024-08-17 Thread Jiabao Sun
Hi Sachin, The 'collectionList' needs to be filled with fully qualified names. For example, database: test_db collection: test_collection MongoDBSource.builder() .hosts(HOSTNAME) .scheme(SCHEME) .databaseList("test_db") .collectionList("test_db.test_collection") ... Bes

Re: Integrating flink CDC with flink

2024-08-16 Thread Jiabao Sun
connector or I can use > flink-connector-mongodb-cdc to process both existing and new data ? > > Thanks > Sachin > > > On Fri, Aug 16, 2024 at 3:46 PM Jiabao Sun wrote: > > > Hi Sachin, > > > > flink-connector-mongodb supports batch reading and writing

Re: Integrating flink CDC with flink

2024-08-16 Thread Jiabao Sun
Hi Sachin, flink-connector-mongodb supports batch reading and writing to MongoDB, similar to flink-connector-jdbc, while flink-connector-mongodb-cdc supports streaming MongoDB changes. If you need to stream MongoDB changes, you should use flink-connector-mongodb-cdc. You can refer to the fol

Re: How can I debug Assigned key must not be null error when reading from Mongodb source

2024-08-05 Thread Jiabao Sun
Hi Sachin, Could you please check if you have used the keyBy operator and ensure that the keyBy field is not null? Best, Jiabao On 2024/08/05 12:33:27 Sachin Mittal wrote: > So I have an anonymous class implementing MongoDeserializationSchema > > new MongoDeserializationSchema() { > @Overrid
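A sketch of a null-safe keyBy; MyEvent and getId are hypothetical stand-ins for the actual record type:

// drop records whose key is null before keyBy; a null key is what triggers
// the "Assigned key must not be null" error
KeyedStream<MyEvent, String> keyed = events
        .filter(e -> e.getId() != null)
        .keyBy(MyEvent::getId);

// alternatively, map null keys to a sentinel value instead of dropping them:
// events.keyBy(e -> e.getId() == null ? "UNKNOWN" : e.getId());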

RE: Unsubscribe

2024-02-08 Thread Jiabao Sun
Please send an email with any content to user-unsubscr...@flink.apache.org to unsubscribe from the user@flink.apache.org mailing list; you can refer to [1][2] for more details on managing your subscription. Best, Jiabao [1] https://fl

RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Jiabao Sun
he/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java > But this doesn't seem released yet. Can you please point me towards correct > Flink version? > > Also, any help on question 1 regarding Schema Registry? > > Regards, > Kirti Dhar > > -Origin

RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-01-31 Thread Jiabao Sun
Hi Kirti, Kafka Sink supports sending messages with headers. You should implement a HeaderProvider to extract headers from the input element. KafkaSink<String> sink = KafkaSink.<String>builder() .setBootstrapServers(brokers) .setRecordSerializer(KafkaRecordSerializationSchema.builder()
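A sketch of such a HeaderProvider (this assumes a flink-connector-kafka release that ships KafkaRecordSerializationSchemaBuilder#setHeaderProvider; as the follow-up in this thread notes, it had not been released at the time):

import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.HeaderProvider;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class MyHeaderProvider implements HeaderProvider<String> {
    @Override
    public Headers getHeaders(String element) {
        // attach a constant header; real code would derive headers from the element
        return new RecordHeaders()
                .add("source", "flink".getBytes(StandardCharsets.UTF_8));
    }
}

// wired in via the serializer builder, e.g.:
// KafkaRecordSerializationSchema.builder()
//     .setTopic("my-topic")
//     .setValueSerializationSchema(new SimpleStringSchema())
//     .setHeaderProvider(new MyHeaderProvider())
//     .build();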

RE: Re: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
> Could you please share examples on how to "*update*" data using > ElasticsearchSink? > > Thanks > > On Mon, Jan 29, 2024 at 9:07 PM Jiabao Sun wrote: > > > Hi Fidea, > > > > I found some examples in the Java documentation, and I hope they can
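A sketch of the update variant, shaped like the IndexRequest example in the next message but emitting an UpdateRequest (index name and key extraction are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.update.UpdateRequest;

public class UpdateSinkFunction implements ElasticsearchSinkFunction<Tuple2<String, String>> {
    @Override
    public void process(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element.f1);
        indexer.add(new UpdateRequest("my-index", element.f0) // id = element key
                .doc(json)
                .docAsUpsert(true)); // insert the document if it does not exist yet
    }
}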

RE: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
Hi Fidea, I found some examples in the Java documentation, and I hope they can be helpful. private static class TestElasticSearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<String, String>> { public IndexRequest createIndexRequest(Tuple2<String, String> element) { Map<String, Object> json = new HashMap<>(); json.pu

RE: Elasticsearch Sink 1.17.2 error message

2024-01-25 Thread Jiabao Sun
Hi Tauseef, We cannot directly write POJO types into Elasticsearch. You can try serializing the TopologyDTO into a JSON string with a library like Jackson before writing it. public static void main(String[] args) throws IOException { try (RestHighLevelClient client = new RestHighLevelClient(
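A sketch of the Jackson step (index name is a placeholder; TopologyDTO is the DTO from the thread):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public static IndexRequest toIndexRequest(ObjectMapper mapper, TopologyDTO dto)
        throws JsonProcessingException {
    String json = mapper.writeValueAsString(dto);   // POJO -> JSON string
    return new IndexRequest("topologies")           // index name is a placeholder
            .source(json, XContentType.JSON);       // hand Elasticsearch the raw JSON
}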

RE: 回复:RE: how to get flink accumulated sink record count

2024-01-25 Thread Jiabao Sun
ring the metric in Flink tasks. > > > > > -- Original Message ------ > From: "Jiabao Sun" Sent: Thursday, January 25, 2024, 3:11 PM > To: "user" Subject: RE: how to get flink accumulated sink record count > > > > > > I guess ge

RE: how to get flink accumulated sink record count

2024-01-24 Thread Jiabao Sun
Hi Enric, I guess the metrics[1] might be helpful for you. You can query the numRecordsOut metric via a Metrics Reporter[2] or the REST API[3]. Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/ [2] https://nightlies.apache.org/flink/flink-docs-relea
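A sketch of the REST API route with Java's built-in HTTP client (JOB_ID and VERTEX_ID are placeholders; they can be listed via the /jobs and /jobs/JOB_ID endpoints first):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class NumRecordsOutQuery {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(
                "http://localhost:8081/jobs/JOB_ID/vertices/VERTEX_ID/metrics?get=numRecordsOut"))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // e.g. [{"id":"numRecordsOut","value":"42"}]
    }
}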

RE: Re: Python flink statefun

2024-01-19 Thread Jiabao Sun
e where in Python it's handled). > > Bests, > > Alex > > Le ven. 19 janv. 2024 à 02:44, Jiabao Sun a > écrit : > > > Hi Alexandre, > > > > I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 in > > Docker Hub. > > Y

RE: Python flink statefun

2024-01-18 Thread Jiabao Sun
Hi Alexandre, I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 on Docker Hub. You can temporarily use the release-3.2 version. Hi Martijn, did we forget to push it to the Docker registry? Best, Jiabao [1] https://hub.docker.com/r/apache/flink-statefun-playground/tags On 2

RE: Flink Slow Execution

2024-01-17 Thread Jiabao Sun
Hi Dulce, MiniCluster is generally used for local testing and is limited by the resources of a single machine. When more tasks are executed, the resources needed to start the MiniCluster may not be immediately available, resulting in slower startup times. If running Flink tasks in a

Re: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
e execution. > I can't figure out whether this is a problem with the flink connector or > iceberg. > > Jiabao Sun <jiabao@xtransfer.cn> > wrote on Wednesday, January 10, 2024 at 18:15: >> Hi haifang, >> >> lower-bound and upper-bound are defined as long types, and it

RE: Unsubscribe this mailbox

2024-01-10 Thread Jiabao Sun
Please send an email with any content to user-unsubscr...@flink.apache.org to unsubscribe from the user@flink.apache.org mailing list; you can refer to [1][2] for more details on managing your subscription. Best, Jiabao [1] https://fl

RE: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
Hi haifang, lower-bound and upper-bound are defined as long types, so a timestamp value cannot be supplied directly. However, you can use WHERE t > TIMESTAMP '2022-01-01 07:00:01.333' instead, since the JDBC connector supports filter pushdown. Best, Jiabao On 2024/01/10 08:31:23 haifang luo wrote: > Hello~~ >
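A sketch of the pushdown variant (table and column names are hypothetical; the planner pushes the timestamp predicate down to the JDBC source so only matching rows are read):

tEnv.executeSql(
    "SELECT * FROM my_jdbc_table " +
    "WHERE t > TIMESTAMP '2022-01-01 07:00:01.333'").print();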

RE: Rabbitmq connector for Flink v1.18

2024-01-09 Thread Jiabao Sun
Hi Charlotta, The latest news about connector releases is in [1]. You can subscribe to the mailing list[2] or follow the JIRA issue to get the latest updates. Best, Jiabao [1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2 [2] https://flink.apache.org/what-is-flink/community/

RE: Pending records

2023-12-21 Thread Jiabao Sun
Hi rania, Does "pending records" specifically refer to the records that have been read from the source but have not been processed yet? If this is the case, FLIP-33[1] introduces some standard metrics for Source, including "pendingRecords," which can be helpful. However, not all Sources suppor

RE: Does Flink on K8s support log4j2 kafka appender? [Flink log] [Flink Kubernetes Operator]

2023-12-20 Thread Jiabao Sun
Hi Chosen, Whether the Kafka appender is supported has no relation to the flink-kubernetes-operator; it only depends on whether Log4j2 supports the Kafka appender. From the error message, it appears that the error is caused by the absence of the log4j-layout-template-json[1] plugin. For the cus

RE: Feature flag functionality on flink

2023-12-18 Thread Jiabao Sun
Hi, If it is for simplicity, you can also try writing the flag into an external system, such as Redis, ZooKeeper, or MySQL, and query the flag from that system when performing data processing. However, Broadcast State is still the approach I recommend. Perhaps we only need to encapsulate t
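A sketch of the Broadcast State pattern for feature flags (events and flagStream are hypothetical DataStreams; flag updates reach every parallel instance of the processing operator):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Boolean> flagsDescriptor =
        new MapStateDescriptor<>("feature-flags", Types.STRING, Types.BOOLEAN);

BroadcastStream<Tuple2<String, Boolean>> flagUpdates = flagStream.broadcast(flagsDescriptor);

DataStream<String> result = events
        .connect(flagUpdates)
        .process(new BroadcastProcessFunction<String, Tuple2<String, Boolean>, String>() {
            @Override
            public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
                    throws Exception {
                // read the current flag value; may be null before the first update arrives
                Boolean enabled = ctx.getBroadcastState(flagsDescriptor).get("new-pipeline");
                out.collect(Boolean.TRUE.equals(enabled) ? "new:" + event : "old:" + event);
            }

            @Override
            public void processBroadcastElement(Tuple2<String, Boolean> flag, Context ctx,
                    Collector<String> out) throws Exception {
                ctx.getBroadcastState(flagsDescriptor).put(flag.f0, flag.f1);
            }
        });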

RE: Control who can manage Flink jobs

2023-12-17 Thread Jiabao Sun
Hi, I don't have much experience with Beam. If you only need to submit Flink tasks, I would recommend StreamPark[1]. Best, Jiabao [1] https://streampark.apache.org/docs/user-guide/Team On 2023/11/30 09:21:50 Поротиков Станислав Вячеславович via user wrote: > Hello! > Is there any way to contro

RE: Socket timeout when report metrics to pushgateway

2023-12-17 Thread Jiabao Sun
Hi, The pushgateway receives metrics in push mode. When deployed on a single machine under high load, there may be performance issues. A simple solution is to set up multiple pushgateways and push the metrics to different pushgateways based on different task groups. There are other met

RE: Questions about java enum when convert DataStream to Table

2023-08-02 Thread Jiabao Sun
Hi haishui, The enum type cannot be mapped to a Flink table type directly. I think the easiest way is to convert the enum to a string first: DataStreamSource<Tuple2<String, String>> source = env.fromElements( new Tuple2<>("1", TestEnum.A.name()), new Tuple2<>("2", TestEnum.B.name()) ); Or add a map trans
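A sketch of the map-transformation alternative mentioned at the end, starting from a hypothetical DataStream<Tuple2<String, TestEnum>> (the returns(...) hint keeps the tuple type information after the lambda):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// normalize the enum field to its String name before table conversion
DataStream<Tuple2<String, String>> mapped = enumSource
        .map(t -> Tuple2.of(t.f0, t.f1.name()))
        .returns(Types.TUPLE(Types.STRING, Types.STRING));

// the mapped stream can now be converted, e.g. tableEnv.fromDataStream(mapped)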

RE: Suggestions for Open Source FLINK SQL editor

2023-07-26 Thread Jiabao Sun
Hi Rajat, I think Apache StreamPark (Incubating) or Apache Zeppelin would be a good choice. https://streampark.apache.org/ https://zeppelin.apache.org/ Best, Jiabao On 2023/07/19 16:47:43 Rajat Ahuja wrote: > Hi team, > > I have set up

RE: Re: flink configuration in flink kubernetes operator question about password

2023-07-26 Thread Jiabao Sun
Hi tian tian, I think we can use podTemplate to mount Kubernetes secrets as files or environment variables. Then we can access the secrets in our Flink program. Please refer to https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml