Re: [Flink CDC] What's the difference between Pipeline connectors and Flink Source connectors?

2024-12-03 Thread Shengkai Fang
I accidentally sent an unfinished email... YAML is much easier for users than SQL. Many external systems can use the YAML spec to build a data pipeline platform easily. Best, Shengkai Shengkai Fang wrote on Tue, Dec 3, 2024 at 14:53: > As far as I know, Flink pipeline connector

Re: [Flink CDC] What's the difference between Pipeline connectors and Flink Source connectors?

2024-12-02 Thread Shengkai Fang
As far as I know, the Flink pipeline connector has the following benefits: 1. User-friendly: * Schema inference: you don't need to write the schema in the YAML file; the framework converts the data types for users. * YAML is much easier for users than SQL. Many external systems can use y

Re: Cdc on HA postgresql

2024-11-25 Thread Shengkai Fang
Are you using the latest version? Its documentation says it now supports incremental reading[1]. > As we see it the current version of Flink CDC can’t handle the failover If you configure checkpointing, I think Flink CDC can recover from the checkpoint when something goes wrong. > only support

Re: Trying to connect to HBase from FlinkSql

2024-11-12 Thread Shengkai Fang
Hi. You should put the HBase jar into the ${FLINK_HOME}/lib directory. Best, Shengkai Guillermo wrote on Tue, Nov 12, 2024 at 19:28: > I'm using Flink 1.20 because I need some features from this version, I > have seen that the HBase connector is not available in this version, > I don't know if this i

Re: Re:FlinkSQL Hints - Broadcast table.

2024-11-04 Thread Shengkai Fang
I think you may need to dump the upsert-kafka data into storage that accepts CDC data, e.g. Paimon or Hudi, and then look up the data in that data lake storage. Note, however, that Flink SQL doesn't support event-time lookup joins. Best, Shengkai

Re: Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-03 Thread Shengkai Fang
+1 for a FLIP to clarify the idea. Please choose carefully which type of state you use here. The doc[1] says that broadcast state doesn't support the RocksDB backend. Best, Shengkai [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#impo

Re: Can I dedup over an upsert topic?

2023-03-05 Thread Shengkai Fang
Hello. Thanks for sharing this with us. I think it's not easy to support Deduplicate in streaming mode. For example, in the keep-first-row case, we need to remember all records for the lifetime of the job, because the first row may be deleted at some point. One workaround is to use window ded
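The point about keep-first-row can be illustrated with a toy model in plain Java. This is only a sketch and not Flink code: the class `KeepFirstRowSketch` and its methods are hypothetical names, and rows are simplified to strings. It shows why every live row per key has to be retained once the input can carry deletes: when the current first row is deleted, the next-oldest row must take its place.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model of keep-first-row deduplication over a changelog (hypothetical, not Flink code). */
public class KeepFirstRowSketch {
    // All live rows per key, in arrival order. Keeping only the first row would not be
    // enough, because a later delete of that row must promote the next one.
    private final Map<String, List<String>> rowsByKey = new HashMap<>();

    /** Handles an insert message for the given key. */
    public void insert(String key, String row) {
        rowsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    /** Handles a delete message: removes the first matching live row for the key. */
    public void delete(String key, String row) {
        List<String> rows = rowsByKey.get(key);
        if (rows != null) {
            rows.remove(row);
            if (rows.isEmpty()) {
                rowsByKey.remove(key);
            }
        }
    }

    /** Returns the row that deduplication would currently emit for the key, if any. */
    public Optional<String> firstRow(String key) {
        List<String> rows = rowsByKey.get(key);
        return rows == null ? Optional.empty() : Optional.of(rows.get(0));
    }

    public static void main(String[] args) {
        KeepFirstRowSketch dedup = new KeepFirstRowSketch();
        dedup.insert("user-1", "row-a");
        dedup.insert("user-1", "row-b");
        System.out.println(dedup.firstRow("user-1")); // first row is row-a
        dedup.delete("user-1", "row-a");
        System.out.println(dedup.firstRow("user-1")); // row-b is promoted
    }
}
```

The state in this model grows with the number of live rows, which is the cost the message describes.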

Re: Flink SQL on Docker

2022-10-24 Thread Shengkai Fang
Hi. Could you share the query you use in the tests so that we can reproduce this problem offline? It would also help if you could provide more information: - the Flink version you use - the logs of the sql-client and the JM - a memory dump, to detect what is using the memory Best, Shengkai Trist

Re: Utilizing Kafka headers in Flink Kafka connector

2022-10-13 Thread Shengkai Fang
Hi. You can use the SQL API to read or write the headers of the Kafka record[1] if you are using Flink SQL. Best, Shengkai [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata Yaroslav Tkachenko wrote on Thu, Oct 13, 2022 at 02:21: > Hi, > > You can implemen

Re: Build failing when Flink version upgrade from 1.11.6 to 1.15.0

2022-10-13 Thread Shengkai Fang
Hi. I read the trace and found nothing related to Flink... Could you also give us some code snippets from the blocking test? Best, Shengkai Pappula, Prasanna via user wrote on Fri, Oct 14, 2022 at 00:06: > > > I have upgraded the flink version from 1.11.6 to 1.15.0. Build is failing. > It hangs

Re: INSERT INTO will work faster in Flink than in regular database?

2022-09-18 Thread Shengkai Fang
Hi. I think you can write a UDF[1] to process some fields and then insert into the sink. Best, Shengkai [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ wrote on Thu, Sep 15, 2022 at 22:10: > What's the most effective way (performance) to update big no of rows? > Sure

Re: Does Table API connector, csv, has some option to ignore some columns

2022-07-10 Thread Shengkai Fang
Hi. In Flink SQL, you can select just the columns you want in the query. For example, you can use ``` SELECT col_a, col_b FROM some_table; ``` Best, Shengkai wrote on Sat, Jul 9, 2022 at 01:48: > Does Table API connector, CSV, has some option to ignore some columns in > source file? > For instance read on

Re: Re:How can I convert a SQL String to a ResolvedExpression?

2022-06-22 Thread Shengkai Fang
Hi. I think you can use Expressions#callSql to convert the String to an Expression. Then you can use ExpressionResolver to resolve the converted Expression. Best, Shengkai Qing Lim wrote on Wed, Jun 22, 2022 at 23:58: > Hi Xuyang, > > > > Thanks for the pointer, however it does not seems to achieve what I want

Re: New KafkaSource API: Change in default behavior regarding starting offset

2022-06-20 Thread Shengkai Fang
Hi. Please use English on the user mailing list. If you want to unsubscribe from the mailing list, you can send an email to user-unsubscr...@flink.apache.org. Best, Shengkai liangzai wrote on Sun, Jun 19, 2022 at 10:36: > How do I unsubscribe from this mailing list? > > > Replied Message > From bastien dine > Date 06/15/2022 17:50 > To Marti

Re: Flink, JSON, and JSONSchemas

2022-06-16 Thread Shengkai Fang
Hi. > *1. Is there some easy way to use deserialized JSON in DataStream without case classes or POJOs?* Could you explain what you expected? Do you mean you just want to register a DataType that can bridge the received bytes to POJO objects? I am not sure whether the current RAW type[1]

Re: context.timestamp null in keyedprocess function

2022-06-15 Thread Shengkai Fang
Hi. Could you share more information with us, e.g. the exception stack? Did you set the assigner for all the sources? I think you can modify the KeyedProcessFunction to print the messages whose timestamp is null. Best, Shengkai bat man wrote on Wed, Jun 15, 2022 at 14:57: > Has anyone experienced this or has any clue? >

Re: Flink 15: InvalidProgramException: Table program cannot be compiled. This is a bug

2022-06-09 Thread Shengkai Fang
Hi. I opened a ticket about upgrading the version[1]. Maybe it is worth a try. Best, Shengkai [1] https://issues.apache.org/jira/browse/FLINK-27995 Benenson, Michael wrote on Fri, Jun 10, 2022 at 04:51: > Hi, David > > > > Hard to tell for sure, but yes, [1] could also indicate some problems with > Janino. >

Re: Flink config driven tool ?

2022-06-07 Thread Shengkai Fang
Hi. I am not sure whether Flink SQL satisfies your requirement or not. You can just write the SQL in a file and use the SQL Client to submit it to your cluster. We have a quick start in Flink CDC that you can try[1]. Best, Shengkai [1] https://ververica.github.io/flink-cdc-connector

Re: Not able to see std output in console/.out files with table API

2022-06-05 Thread Shengkai Fang
Hi. TableResult.print() only prints the result to the client console. How do you redirect the output to the .out file? Can you get the output without redirection? Best, Shengkai Xuyang wrote on Thu, Jun 2, 2022 at 21:17: > Could you find that the input amount of the node `sink` is > being accumulated in

Re: [External] Re: Exception when running Java UDF with Blink table planner

2022-05-31 Thread Shengkai Fang
Hi, Tom. I can't reproduce the exception on master. I am not sure whether the problem has been fixed or I am missing something. The only difference is that my test UDF extends ScalarFunction rather than DPScalarFunction and I use String[] as the input type. ``` public static class ListToString extends Sc

Re: Custom restart strategy

2022-05-29 Thread Shengkai Fang
Given very few pipelines experience failures and > they are far in-between, I am looking for a push based model vs polling. > > Thanks > AK > > On Thu, May 26, 2022 at 7:21 PM Shengkai Fang wrote: > >> Hi. >> >> I think you can use REST OPEN API to fetch

Re: Exception when running Java UDF with Blink table planner

2022-05-26 Thread Shengkai Fang
Hi. Could you also tell us which Flink version you are using, the schema of the source table and some test data? With this information, we can debug in our local environment. Best, Shengkai Tom Thornton wrote on Fri, May 27, 2022 at 06:47: > We are migrating from the legacy table planner to the Blink table planne

Re: Custom restart strategy

2022-05-26 Thread Shengkai Fang
Hi. I think you can use the REST Open API to periodically fetch the job status from the JM and detect whether something has happened. Currently the REST Open API also supports fetching the exception list for a specified job[2]. Best, Shengkai [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops
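A minimal polling sketch of this idea, using only the JDK HTTP client. It assumes the JobManager REST endpoint serves `GET /jobs/<jobId>` with a top-level `"state"` field and `GET /jobs/<jobId>/exceptions`; the class name `JobStatusPoller`, the 10-second interval, and the regex-based extraction (instead of a real JSON parser) are illustrative choices, not part of the original message.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Polls a Flink JobManager REST endpoint for the job state (illustrative sketch). */
public class JobStatusPoller {
    // Takes the first "state" field in the response; a JSON library would be more robust.
    private static final Pattern STATE = Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");

    public static URI jobUri(String baseUrl, String jobId) {
        return URI.create(baseUrl + "/jobs/" + jobId);
    }

    public static URI exceptionsUri(String baseUrl, String jobId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/exceptions");
    }

    public static Optional<String> extractState(String json) {
        Matcher m = STATE.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    private static String get(HttpClient client, URI uri) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: JobStatusPoller <rest-base-url> <job-id>");
            return;
        }
        HttpClient client = HttpClient.newHttpClient();
        while (true) {
            String state = extractState(get(client, jobUri(args[0], args[1]))).orElse("UNKNOWN");
            System.out.println("job state: " + state);
            if (state.equals("FAILED") || state.equals("RESTARTING")) {
                // Fetch the exception list only when the job looks unhealthy.
                System.out.println(get(client, exceptionsUri(args[0], args[1])));
            }
            if (state.equals("FAILED") || state.equals("FINISHED") || state.equals("CANCELED")) {
                break; // globally terminal states: stop polling
            }
            Thread.sleep(10_000);
        }
    }
}
```

Run it with the REST base URL and job ID as arguments, for example `http://localhost:8081` and the ID shown in the web UI. This remains a polling model; it does not provide the push-based notification asked about elsewhere in the thread.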

Re: LinkedMap ClassCastException issue

2022-05-25 Thread Shengkai Fang
Hi. Could you tell us which Flink version you are using? What's the version of commons-collections:commons-collections:jar when you compile the SQL, and what's the version in the cluster? It's possible that you compile the SQL with one version and submit it with a different one. I am not sure how you submit your flink

Re: length value for some classes extending LogicalType.

2022-05-25 Thread Shengkai Fang
Hi. It will also influence how Flink serializes/deserializes the RowData. For example, Flink builds the TimestampDataSerializer with the precision specified in the type. You can see that it extracts only the expected part to serialize[1]. But for the char/varchar types, the serializer will not truncate the str

Re: accuracy validation of streaming pipeline

2022-05-24 Thread Shengkai Fang
Hi, all. From my understanding, validating the accuracy of the sync pipeline requires snapshotting the source and the sink at the same points in time. It is just like having a checkpoint that contains all the data at some time for both the sink and the source. Then we can compare the content in the checkpoint and find the differ

Re: Application mode deployment through API call

2022-05-24 Thread Shengkai Fang
the CLIFrontend is not as robust as the REST API, or you >> will end up having to rebuild a very similar Rest API. For the meta space >> issue, have you tried adding shared libraries to the flink lib folder? >> >> On Mon, May 23, 2022 at 23:31 Shengkai Fang wrote: >>

Re: accuracy validation of streaming pipeline

2022-05-23 Thread Shengkai Fang
It's a good question. Let me ping @Leonard to share more thoughts. Best, Shengkai vtygoss wrote on Fri, May 20, 2022 at 16:04: > Hi community! > > > I'm working on migrating from full-data-pipeline(with spark) to > incremental-data-pipeline(with flink cdc), and i met a problem about > accuracy validation bet

Re: Job Logs - Yarn Application Mode

2022-05-23 Thread Shengkai Fang
If you can find the JM in the YARN web UI, I think you can also find the link there to access the Flink web UI of that JM. Best, Shengkai

Re: Window aggregation fails after upgrading to Flink 1.15

2022-05-23 Thread Shengkai Fang
Glad to see you found the root cause. I think we can shade the Janino dependency if it affects usage. WDYT, godfrey? Best, Shengkai Pouria Pirzadeh wrote on Sat, May 21, 2022 at 00:59: > Thanks for help; I digged into it and the issue turned out to be the > version of Janino: > flink-table has pinned

Re: Application mode -yarn dependency error

2022-05-23 Thread Shengkai Fang
Hi. I think you should send this mail to the user mailing list or Stack Overflow, which are for usage questions and help. The dev mailing list focuses on the design of Flink itself. Could you share more details about your problem, including - which version you use. - how you use Flink, including your cod

Re: Json Deserialize in DataStream API with array length not fixed

2022-05-23 Thread Shengkai Fang
Hi. In the SQL, you can just specify the `array_coordinates` type ARRAY[1]. For example, ``` CREATE TABLE source( `array_coordinates` ARRAY> ) WITH ( 'format' = 'json' ) ``` [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/ Zain Haider Nemati

Re: Application mode deployment through API call

2022-05-23 Thread Shengkai Fang
Hi, all. > is there any plan in the Flink community to provide an easier way of deploying Flink with application mode on YARN Yes. Jark has already opened a ticket about how to use the sql client to submit the SQL in application mode[1]. What's more, in FLIP-222 we are able to manage the jobs in

Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Shengkai Fang
ss the JM launched by YARN, users need to access YARN web ui to find > the YARN application by applicationId and then click 'application master > url' of that application to be redirected to Flink web ui. > > Best, > Biao Geng > > Shengkai Fang wrote on Fri, May 20, 2022 at 10:5

Re: Does kafka key is supported in kafka sink table

2022-05-19 Thread Shengkai Fang
Hi. Yes. Flink supports writing values into the key part of the Kafka record. You just need to specify which columns belong to the key in the WITH block, e.g. ``` CREATE TABLE kafka_sink ( ... ) WITH ( 'key.fields' = 'id' ); ``` [1] https://nightlies.apache.org/flink/flink-docs-master/docs/conne

Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Shengkai Fang
Hi. I am not familiar with the YARN application mode. Since the JobManager is only started when the job is submitted, how can users know the address of the JM? Do we need to look in the YARN UI to find the submitted job by its JobID? Best, Shengkai Weihua Hu wrote on Fri, May 20, 2022 at 10:23: > Hi, > You

Re: How to KafkaConsume from Particular Partition in Flink(version 1.14.4)

2022-05-19 Thread Shengkai Fang
Hi, If you use the SQL API, you can declare the partition as a metadata column in the DDL[1] and filter out the records that you don't need. ``` CREATE TABLE KafkaSource ( ... `partition` INT METADATA VIRTUAL ) WITH ( ... ); SELECT * FROM KafkaSource WHERE `partition` = 1; ``` Best, Shengkai [1] https://nightlies.apache.o

Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

2022-04-21 Thread Shengkai Fang
Hi, The watermark of the join operator is the minimum of the watermarks of its input streams. ``` JoinOperator.watermark = min(left.watermark, right.watermark); ``` I think that's enough for most cases. Could you share more details about the logic in the UDF getEventTimeInNS? I think the better s
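The min rule above can be tried out with a small plain-Java model. This is a sketch, not Flink's operator code: `JoinWatermarkSketch` is a hypothetical class that tracks the latest watermark seen on each input and reports the smaller of the two, so the output watermark only advances once both sides have advanced.

```java
/** Toy model of a two-input operator's watermark (hypothetical, not Flink code). */
public class JoinWatermarkSketch {
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    /** Records a watermark from the left input and returns the combined watermark. */
    public long onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark); // watermarks never go backwards
        return current();
    }

    /** Records a watermark from the right input and returns the combined watermark. */
    public long onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
        return current();
    }

    /** The operator's watermark is the minimum over both inputs. */
    public long current() {
        return Math.min(leftWatermark, rightWatermark);
    }

    public static void main(String[] args) {
        JoinWatermarkSketch join = new JoinWatermarkSketch();
        join.onLeftWatermark(100L);
        System.out.println(join.onRightWatermark(40L));  // held back by the right input
        System.out.println(join.onRightWatermark(120L)); // now limited by the left input
    }
}
```

In this model the slower input always determines the result, which is why a joined table does not need a watermark definition of its own in most cases.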

Re: XXX doesn't exist in the parameters of the SQL statement

2022-04-18 Thread Shengkai Fang
Hi, John. Could you share the exception stack with us, along with the schema of the `dummy` table in your database? Best, Shengkai John Tipper wrote on Sun, Apr 17, 2022 at 21:15: > Hi all, > > I'm having some issues with getting a Flink SQL application to work, where > I get an exception and I'm not sure why it's oc

Re: Flink SQL support array transform function

2021-08-03 Thread Shengkai Fang
Hi, Caizhi. Do you think we should support this? Maybe we can open a Jira ticket for this, or align with Spark and support more useful built-in functions. Caizhi Weng wrote on Tue, Aug 3, 2021 at 3:42 PM: > Hi! > > Currently there is no such built-in function in Flink SQL. You can try to > write your own user-

Re: Using Hive UDFs

2021-04-27 Thread Shengkai Fang
Hi. The order of the modules may influence how the function is loaded. [1] https://issues.apache.org/jira/browse/FLINK-22383 Youngwoo Kim (김영우) wrote on Wed, Apr 28, 2021 at 10:50 AM: > Hi, > > I've configured Hive metastore to use HiveCatalog in streaming > application. So far, most of the features are working

Re: Read Hive table in Stream Mode use distinct cause heap OOM

2021-04-25 Thread Shengkai Fang
Hi, could you tell me which version you use? I just want to check whether there are any problems. Best, Shengkai 张颖 wrote on Sun, Apr 25, 2021 at 5:23 PM: > hi,I met an appearance like this: > > this is my sql: > SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat > FROM app.app_rankin

Re: Watermarks in Event Time Temporal Join

2021-04-25 Thread Shengkai Fang
Hi, maverick. The watermark is used to determine whether a message is late or early. If we only use the watermark on the versioned table side, we have no means to determine whether an event in the main stream is ready to be emitted. Best, Shengkai maverick wrote on Mon, Apr 26, 2021 at 2:31 AM: > Hi, > I'm curious why Even

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-08 Thread Shengkai Fang
Sorry for the typo... I mean it will not take too much time. Best, Shengkai Shengkai Fang wrote on Tue, Mar 9, 2021 at 10:25 AM: > Hi, Yuval. > > I have opened a ticket about this[1]. But I don't think we have any > solution to solve. > > Do you have time to help us to solve this? I

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-08 Thread Shengkai Fang
Shenkai, > That does explain what I'm seeing. > > Jark / Shenkai - Is there any workaround to get Flink to work with push > watermarks and predicate pushdown until this is resolved? > > On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang wrote: > >> Hi, Yuval, Jark, Timo.

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-07 Thread Shengkai Fang
Hi, Yuval, Jark, Timo. Currently the watermark push down happens in the logical rewrite phase but the filter push down happens in the local phase, which means the planner will first check the Filter push down and then check the watermark push down. I think we need a rule to transpose between the

Re: Kafka SQL Connector: dropping events if more partitions than source tasks

2021-02-26 Thread Shengkai Fang
e and thus the > watermark is not increased. In any case I have observed how with a larger > number of source tasks no results are produced. > > > > Best, > > Jan > > *From:* Shengkai Fang > *Sent:* Friday, February 26, 2021 15:32 > *To:* Jan Oelschlegel >

Re: Kafka SQL Connector: dropping events if more partitions than source tasks

2021-02-26 Thread Shengkai Fang
Hi, Jan. Could you tell us which Flink version you use? As far as I know, the Kafka SQL connector has implemented `SupportsWatermarkPushDown` since Flink 1.12. `SupportsWatermarkPushDown` pushes the watermark generator into the source, which then emits the minimum watermark among all the partitions. For mo
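The per-partition minimum described here can be modelled in a few lines of plain Java. This is a simplified sketch, not the connector's implementation: `PartitionWatermarkSketch` is a hypothetical class, and it assumes each partition's watermark is simply the largest event time seen so far (no out-of-orderness bound, no idleness handling).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a source that emits the minimum watermark over its partitions (not Flink code). */
public class PartitionWatermarkSketch {
    private final Map<Integer, Long> watermarkByPartition = new HashMap<>();

    /** Registers the partitions assigned to this source task; none has produced data yet. */
    public PartitionWatermarkSketch(int... partitions) {
        for (int partition : partitions) {
            watermarkByPartition.put(partition, Long.MIN_VALUE);
        }
    }

    /** Advances one partition's watermark and returns the watermark the source would emit. */
    public long onEvent(int partition, long eventTime) {
        watermarkByPartition.merge(partition, eventTime, Long::max);
        return current();
    }

    /** The emitted watermark is the minimum over all assigned partitions. */
    public long current() {
        return watermarkByPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PartitionWatermarkSketch source = new PartitionWatermarkSketch(0, 1);
        source.onEvent(0, 1_000L);
        // Partition 1 has produced nothing yet, so the emitted watermark has not advanced.
        System.out.println(source.current() == Long.MIN_VALUE);
        System.out.println(source.onEvent(1, 500L)); // both partitions have data now
    }
}
```

In this model a partition that never receives data keeps the emitted watermark at its initial value, which is one way a watermark can fail to increase.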

Re: Pushing Down Filters

2021-01-15 Thread Shengkai Fang
Hi Satyam, Currently, the community is using the new table source/sink API, and `FilterableTableSource` and `ProjectableTableSource` have been deprecated. `SupportsProjectionPushDown` and `SupportsFilterPushDown` are the new interfaces for pushing down the `projection` and `filter`. You

Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-06 Thread Shengkai Fang
A checkpoint is a snapshot of the job that lets us resume the job if it is killed unexpectedly. State is a different thing: it holds the intermediate results of the computation. I don't think the checkpoint can replace state. 大森林 wrote on Wed, Oct 7, 2020 at 12:26 PM: > Could you tell me: > > why we need k

Re: what's the example for datastream data generator?

2020-10-06 Thread Shengkai Fang
Hi, I think you can take a look at *org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan#createSourceTransformation*, which shows how to get transformations from a source function and a stream execution environment. In the DataStream API, we also have a DataGen that is *org.