[jira] [Created] (FLINK-36622) Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs.
Han Yin created FLINK-36622: --- Summary: Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs. Key: FLINK-36622 URL: https://issues.apache.org/jira/browse/FLINK-36622 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 2.0-preview Reporter: Han Yin Fix For: 2.0.0

Currently, flink-benchmarks relies on non-public APIs in Flink. For example, in StateBackendBenchmarkUtils.java, the function compactState takes RocksDBKeyedStateBackend as its first argument. This requires an explicit type conversion in flink-benchmarks (from KeyedStateBackend to RocksDBKeyedStateBackend). Moreover, it means that whenever the signature of RocksDBKeyedStateBackend changes, flink-benchmarks must be modified correspondingly. Therefore, we should avoid exposing non-public APIs in StateBackendBenchmarkUtils.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
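For illustration, a minimal sketch of the kind of change being proposed (the parameter list and package names are assumptions based on Flink 1.x and are not the actual StateBackendBenchmarkUtils signature): the benchmark utility accepts the public KeyedStateBackend interface and narrows to the RocksDB implementation internally, so flink-benchmarks never references internal classes directly.

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackend;

public final class StateBackendBenchmarkUtilsSketch {

    // Hypothetical replacement for compactState: the public interface is accepted,
    // and the RocksDB-specific work happens behind an instanceof check.
    public static <K> void compactState(KeyedStateBackend<K> backend) throws Exception {
        if (backend instanceof RocksDBKeyedStateBackend) {
            RocksDBKeyedStateBackend<K> rocksDBBackend = (RocksDBKeyedStateBackend<K>) backend;
            // ... trigger a manual compaction on the underlying RocksDB instance ...
        }
        // Other backends (e.g. the heap backend) have nothing to compact, so this is a no-op.
    }
}
{code}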
[jira] [Created] (FLINK-36621) Build failure: StatefulSink not found
Piotr Nowojski created FLINK-36621: -- Summary: Build failure: StatefulSink not found Key: FLINK-36621 URL: https://issues.apache.org/jira/browse/FLINK-36621 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 2.0-preview Reporter: Piotr Nowojski

Locally, building Flink in IntelliJ fails for me due to:
{code:java}
flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
  class file for org.apache.flink.api.connector.sink2.StatefulSink not found
{code}
flink-examples depends on flink-connector-kafka in version 3.0.0-17, which in turn still refers to the StatefulSink:
{code:java}
public class KafkaSink implements StatefulSink, TwoPhaseCommittingSink (...)
{code}
which has been deleted in FLINK-36245. I think Maven builds might be working due to some luck and differences in how IntelliJ and Maven interpret pom files and handle dependencies. CC [~kunni] [~renqs] [~Leonard]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36625) Add helper classes for Lineage integration in connectors
Zhenqiu Huang created FLINK-36625: - Summary: Add helper classes for Lineage integration in connectors Key: FLINK-36625 URL: https://issues.apache.org/jira/browse/FLINK-36625 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Zhenqiu Huang -- This message was sent by Atlassian Jira (v8.20.10#820010)
Micro batching with flink
Hello team,

I apologize for reaching out on the dev mailing list. I'm working on implementing micro-batching with near real-time processing. I've seen similar questions in the Flink Slack channel and user mailing list, but there hasn't been much discussion or feedback. Here are the options I've explored:

1. Windowing: This approach looked promising, but the flushing mechanism requires record-level information checks, as window data isn't accessible throughout the pipeline.
2. Window + Trigger: This method buffers events until the trigger interval is reached, which affects real-time processing; events are only processed when the trigger occurs.
3. Processing Time: The processing time is specific to each file writer, resulting in inconsistencies across different task managers.
4. Watermark: There's no global watermark; it's specific to each source task, and the initial watermark information (before the first watermark event) isn't epoch-based.

I'm looking to write data grouped by time (micro-batch time). What's the best approach to achieve micro-batching in Flink?

Let me know if you have any questions. Thanks.
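For reference, a minimal sketch of the window-based style described in option 2 above (an illustration only, not a recommendation from this thread: the one-minute interval, the Tuple2 records, and the inline source are placeholders, and the Duration-based window assigner overload assumes Flink 1.19+; older versions would use Time.minutes(1)):

import java.time.Duration;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

public class WindowedMicroBatchSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source of (key, value) pairs; in practice this would be Kafka, files, etc.
        DataStream<Tuple2<String, Long>> events =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L), Tuple2.of("a", 3L));

        // Records are buffered until the one-minute processing-time window fires,
        // which is exactly the latency trade-off mentioned in option 2 above.
        events.keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
                .sum(1)
                .print();

        env.execute("micro-batch window sketch");
    }
}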
[jira] [Created] (FLINK-36627) Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.
Hector Miuler Malpica Gallegos created FLINK-36627: -- Summary: Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8. Key: FLINK-36627 URL: https://issues.apache.org/jira/browse/FLINK-36627 Project: Flink Issue Type: Bug Reporter: Hector Miuler Malpica Gallegos

I have an error reading a CSV file with charset ISO-8859; the error is the following:

{code}
Caused by: java.io.CharConversionException: Invalid UTF-8 middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not look like UTF-8
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder._nextUnquotedString(CsvDecoder.java:782)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.nextString(CsvDecoder.java:732)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:963)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:763)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:321)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:283)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.next(MappingIterator.java:199)
	... 11 more
{code}

My code is the following:

{code}
val env = StreamExecutionEnvironment.createLocalEnvironment()
val csvFormat = CsvReaderFormat.forPojo(Empresa::class.java)
val csvSource = FileSource
    .forRecordStreamFormat(csvFormat, Path("/miuler/PadronRUC_202410.csv"))
    .build()
val empresaStreamSource = env.fromSource(csvSource, WatermarkStrategy.noWatermarks(), "CSV Source")
empresaStreamSource.print()
env.execute("Load CSV")
{code}

My dependencies:

{code}
val kotlinVersion = "1.20.0"
// FLINK
dependencies {
    implementation("org.apache.flink:flink-shaded-jackson:2.15.3-19.0")
    implementation("org.apache.flink:flink-core:$kotlinVersion")
    implementation("org.apache.flink:flink-runtime:$kotlinVersion")
    implementation("org.apache.flink:flink-runtime-web:$kotlinVersion")
    implementation("org.apache.flink:flink-clients:$kotlinVersion")
    implementation("org.apache.flink:flink-streaming-java:$kotlinVersion")
    implementation("org.apache.flink:flink-csv:$kotlinVersion")
    implementation("org.apache.flink:flink-connector-base:$kotlinVersion")
    implementation("org.apache.flink:flink-connector-files:$kotlinVersion")
}
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
kafka-connector random WakeupException after enabling watermark alignment
Hello,

In Flink 1.19 (AWS Managed Flink, flink-connector-kafka:3.3.0-1.19), after I enable watermark alignment on the KafkaSource, it starts throwing an uncaught WakeupException. It happens:
* on every checkpoint, unless I disable offset committing: setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
* randomly

The stack trace is the same for both cases.

java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more

Watermark alignment stops working after recovery.

Checking the code, I see that "long consumerPosition = consumer.position(tp);" at KafkaPartitionSplitReader.java:127 is the only call to consumer.position in the whole file that is not wrapped in retryOnWakeup (there are a few such calls there). I tested wrapping it and I am able to make my app work without any exception. I could make a PR; I am waiting for an ASF Self-Service account. But I don't really understand the race condition here and I am not able to reproduce it in tests. Hints and help would be appreciated.

Thanks
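For reference, a minimal sketch of the change being described (inside KafkaPartitionSplitReader.fetch(); the exact retryOnWakeup signature is assumed from the description above and may differ in the actual source):

    // Current call at KafkaPartitionSplitReader.java:127, not guarded against wakeups:
    long consumerPosition = consumer.position(tp);

    // Proposed: wrap it like the other consumer calls in the same file:
    long consumerPosition =
            retryOnWakeup(
                    () -> consumer.position(tp),
                    "getting the consumer position for watermark alignment");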
[jira] [Created] (FLINK-36624) Log JobID in SourceCoordinator
Piotr Nowojski created FLINK-36624: -- Summary: Log JobID in SourceCoordinator Key: FLINK-36624 URL: https://issues.apache.org/jira/browse/FLINK-36624 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Piotr Nowojski Currently log entries from the SourceCoordinator are not tagged with the JobID, which could be quite easily done. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Elasticsearch v3.1 release
Thanks Danny for the input. I figured out that most of the steps require committer permissions, so I will reach out offline for the next steps; thanks in advance for the assistance as well.

Best Regards
Ahmed Hamdy

On Mon, 14 Oct 2024 at 17:23, Danny Cranmer wrote: > Hey Ahmed, > > +1 for the release and you as release manager with PMC assistance. > > Thanks for opening this discussion, we never finished releasing the > Elasticsearch connector for Flink 1.19 [1], so we do not have an official > version for Flink 1.19/1.20 which is a problem. I am happy to assist you as > suggested, however I do not believe you will be able to do much without > committer permissions. But let's roll with this and see how it goes. > > Thanks, > Danny > > [1] https://issues.apache.org/jira/browse/FLINK-35131 > > On Mon, Oct 14, 2024 at 10:22 AM Ahmed Hamdy wrote: > > > Hi all, > > I want to kick off the discussion for elasticsearch connector release > v3.1. > > Our latest release v3.0.1 was more than a year ago[1]. Since then the > > connector has developed an important set of features like SinkV2 > > compatibility update[2] and most importantly Elasticsearch 8 support[3] > > which is of high demand. I believe it might be time to release V3.1. > > I am happy to drive the release with an assist from a PMC for > > privileged actions or even assist if a PMC is willing to lead. > > > > Let me know your thoughts. > > > > 1-https://lists.apache.org/thread/374078blmqgfvtt41pbbzr2df37k2nc0 > > 2-https://issues.apache.org/jira/browse/FLINK-34113 > > 3-https://issues.apache.org/jira/browse/FLINK-26088 > > > > > > Best Regards > > Ahmed Hamdy > > >
[ANNOUNCE] Apache Flink Kubernetes Operator 1.10.0 released
The Apache Flink community is very happy to announce the release of Apache Flink Kubernetes Operator 1.10.0 The Flink Kubernetes Operator allows users to manage their Apache Flink applications and their lifecycle through native k8s tooling like kubectl. Please check out the release blog post for an overview of the release: https://flink.apache.org/2024/10/25/apache-flink-kubernetes-operator-1.10.0-release-announcement The release is available for download at: https://flink.apache.org/downloads.html Maven artifacts for Flink Kubernetes Operator can be found at: https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator Official Docker image for Flink Kubernetes Operator applications can be found at: https://hub.docker.com/r/apache/flink-kubernetes-operator The full release notes are available in Jira: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354833 We would like to thank all contributors of the Apache Flink community who made this release possible! Regards, Matyas Orhidi
Re: [DISCUSS] Release HBase connector with partial JDK support
+1 for Ferenc as the release manager and the content. On Mon, Oct 28, 2024 at 5:03 PM Ferenc Csaky wrote: > Hi, > > Based on this discussion I would like to suggest to move on with > the originally planned release with the HBase connector 4.0, that > will support 1.18, and 1.19. > > I volunteer to be the release manager. > > Thanks, > Ferenc > > > > On Wednesday, October 23rd, 2024 at 13:19, Ferenc Csaky > wrote: > > > > > > > Hi Marton, Yanquan, > > > > Thank you for your responses! Regarding the points brought up to > > discuss: > > > > 1. Supporting 1.20 definitely makes sense, but since there is quite > > a big gap to work down here now, I am not sure it should be done in > > 1 step. As my understanding, the externalized connector dev model > > [1] do not explicitly forbid that, but AFAIK there were external > > connector release that supported 3 different Flink minor versions. > > > > In this case, I think technically would be possible, but IMO > > supporting 3 Flink verisons adds more complexity to maintain. So > > what I would suggest to release 4.0 with Flink 1.18 and 1.19 > > support, and after that there can be a 4.1 that supports 1.19 and > > 1.20. 4.0 will only have patch support, probably minimizing Flink > > version specific problems. > > > > 2. Flink 1.17 had no JDK17 support, so those Hadoop related > > problems should not play a role if something needs to be released > > that supports 1.17. But if connector 4.0 is released, 3.x versions > > will not get any new releases (even not patch), cause 1.17 is out > > of support already. > > > > Best, > > Ferenc > > > > [1] > https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development > > > > On Wednesday, 23 October 2024 at 05:19, Yanquan Lv decq12y...@gmail.com > wrote: > > > > > Hi Feri, > > > Thank you for bringing up this discussion. > > > I agree to release a version to bump the newer version of Flink with > partial JDK versions support. I have two points to be discussed. > > > 1. I have heard many inquiries about supporting higher versions of > Flink in Slack, Chinese communities, etc., and a large part of them hope to > use it on Flink1.20. Should we consider explicitly supporting Flink1.20 on > version 4.0, otherwise users will have to wait for a relatively long > release cycle. > > > 2. Currently supporting Flink1.17 is difficult, but are there any > plans to support it in the future? Do we need to wait for Hadoop related > repositories to release specific versions. > > > > > > > 2024年10月22日 19:44,Ferenc Csaky ferenc.cs...@pm.me.INVALID 写道: > > > > > > > > Hello devs, > > > > > > > > I would like to start a discussion regarding a new HBase connector > release. Currently, the > > > > externalized HBase connector has only 1 release: 3.0.0 that supports > Flink 1.16 and 1.17. > > > > > > > > By stating this, it is obvious that the connector is already > outdated for quite a while. There > > > > is a long-lasting ticket [1] to release a newer HBase version, which > also contains a major version > > > > bump as HBase 1.x support is removed, but covering JDK17 with the > current Hadoop related > > > > dependency mix is impossible, because there are parts that do not > play well with it when you > > > > try to compile with JDK17+, and there are no runtime tests as well. > > > > > > > > Solving that properly will require to bump the HBase, Hadoop, and > Zookeeper versions as well, > > > > but that will require more digging and some refactoring, at least on > the test side. 
> > > > > > > > To cut some corners and move forward I think at this point it would > make sense to release > > > > version 4.0 that supports Flink 1.18 and 1.19 but only on top of > JDK8 and JDK11 just to close the > > > > current gap a bit. I am thinking about including the limitations in > the java compat docs [2] to > > > > highlight users. > > > > > > > > WDYT? > > > > > > > > Best, > > > > Ferenc > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-35136 > > > > [2] > https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/java_compatibility/ >
[jira] [Created] (FLINK-36623) Improve logging in DefaultStateTransitionManager
Roman Khachatryan created FLINK-36623: - Summary: Improve logging in DefaultStateTransitionManager Key: FLINK-36623 URL: https://issues.apache.org/jira/browse/FLINK-36623 Project: Flink Issue Type: Improvement Reporter: Roman Khachatryan Assignee: Zdenek Tison Fix For: 1.20.1

When the job transitions from one state to another (e.g., restarts when new slots are available), this is not visible in the logs unless the log level is DEBUG. Therefore, it would make sense to:
1. Change the log level from DEBUG to INFO
2. Log the job ID when such a transition happens

-- This message was sent by Atlassian Jira (v8.20.10#820010)
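A minimal sketch of the suggested change (the log messages and variable names are illustrative, not the actual DefaultStateTransitionManager code):

{code:java}
// Before: the transition is only visible when debug logging is enabled
LOG.debug("Transitioning from {} to {}.", currentPhase, targetPhase);

// After: info level and tagged with the job ID
LOG.info("Job {} is transitioning from {} to {}.", jobId, currentPhase, targetPhase);
{code}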
[jira] [Created] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+
Eduardo Breijo created FLINK-36626: -- Summary: Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+ Key: FLINK-36626 URL: https://issues.apache.org/jira/browse/FLINK-36626 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.20.0, 1.18.1 Environment: AWS Managed Apache Flink Reporter: Eduardo Breijo Attachments: Flink-SQL-query.txt

There is a behavior change I found when migrating from Flink 1.15 to Flink 1.18+ regarding Flink SQL joins that I haven't been able to pinpoint, and it causes the query to output different results.

Flink SQL query:

{code:sql}
WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
  SELECT asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
  -- Filter assets not present in the asset to ignore for this computed metric definition table
  AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
{code}

Results:
* On Flink 1.15, the difference (assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`) between assets_supply_air_temp and assets_setpoint is computed correctly for every value of assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data).
* On Flink 1.18+ this difference always results in 0.

I have tried updating the query using different formats but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL formats I have tried with no luck. Any help would be appreciated.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
zjjiang created FLINK-36620: --- Summary: Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats Key: FLINK-36620 URL: https://issues.apache.org/jira/browse/FLINK-36620 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.1.1, cdc-3.2.0, cdc-3.1.0 Reporter: zjjiang Fix For: cdc-3.3.0

Currently, most of Flink CDC's command-line arguments are supported in both the "--$KEY $VALUE" and "--$KEY=$VALUE" formats (e.g. --jar), except for --flink-home, which only supports the space-separated form. Users who use the "--flink-home=$FLINK_HOME" format on the command line (trying to be consistent with the other =-separated arguments) will not be able to set the Flink home correctly. In particular, when the environment variable $FLINK_HOME is set and you want to override it by passing --flink-home=/path/to/new/flink/home, you will find that it does not work. We would like to support the flink-home parameter in both the "--flink-home $FLINK_HOME" and "--flink-home=$FLINK_HOME" formats, so that users can avoid formatting differences and runtime exceptions when using command-line arguments.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
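A minimal sketch of the requested parsing behavior (a hypothetical helper, not the actual Flink CDC CLI code):

{code:java}
// Accept both "--flink-home /path/to/flink" and "--flink-home=/path/to/flink",
// falling back to the FLINK_HOME environment variable when neither is given.
public final class FlinkHomeArgSketch {

    static String resolveFlinkHome(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if ("--flink-home".equals(args[i]) && i + 1 < args.length) {
                return args[i + 1];
            }
            if (args[i].startsWith("--flink-home=")) {
                return args[i].substring("--flink-home=".length());
            }
        }
        return System.getenv("FLINK_HOME");
    }
}
{code}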
[DISCUSS] Optimize the processing logic of TopNFunction to avoid the expiration of downstream operator states when state TTL is set
Hello devs,

I would like to initiate a discussion about the Flink TopNFunction. In our experience, we encountered the following issue while implementing the Top3Function. During processing, if no input has a timestamp smaller than the RowData currently held by the Top3Function, the operator will not emit any messages related to this product. As a result, some state in the join operator remains unupdated, and when the state expires, certain states are cleared. From the user's perspective, even though data about a particular product is continuously flowing in, the join may fail to find a matching record.

The issue arises because the TopNFunction processes data in such a way that it takes no action when the incoming RowData does not rank within the specified N. This situation can occur in both retract streams and append-only streams. By contrast, in the aggregation function, if a state retention time is set, a result is still emitted even when the function output remains unchanged, which resets the state retention timer in the downstream operator.

Proposed solutions

To address this issue, we propose a couple of solutions:
1. Emit the changed dataset on each incoming record: if we can confirm that the join key of the downstream join operator and the partition key of the TopN operator are consistent, we can emit only the modified RowData.
2. Full emission on each incoming record: if the keys are inconsistent, we could emit the full dataset each time a new record is received. However, this approach would place a heavier burden on the system.

These two methods may prevent the downstream operator's state from expiring. WDYT?

Best,
Yang Li

https://docs.google.com/document/d/1OQXE6Pf6pVLyX1cVmlwvZ1GnX5rvRpIyVttiEhy8BRs/edit?tab=t.edj7xjbz07e4
Re: [DISCUSS] FLIP-480: Support to deploy script in application mode
I also have some questions: 1. Whether all SQL commands such as DDL & DML & SELECT are supported. 2. How to determine JobID and return JobID & ClusterId from the application cluster 3. How to dynamically download the JAR specified by the user when submitting the sql script, and whether it is possible to specify a local jar? Best, Ron Ron Liu 于2024年10月30日周三 10:57写道: > Hi, Shengkai > > Thanks for initializing this FLIP, supports application mode for SQL > Gateway is a great job. The FLIP design looks good to me. > > > I've read the FLIP-316 which mentions supporting deploying SQL job to > application clusters for interactive or non-interactive gateway mode. > But I noticed that you say this FLIP focuses on supporting deploy sql > scripts to the application cluster, does it mean that it only supports > non-interactive gateway mode? > > > Best, > Ron > > Shengkai Fang 于2024年10月29日周二 14:46写道: > >> Hi, HongShun. Thanks a lot for your response! >> >> > I wonder what is the scope of this FLIP, only aim for k8s, not including >> yarn? >> >> This FLIP also works for the yarn-application mode. But the yarn >> deployment >> doesn't support to ship the artifacts into the remote side. Please >> correct me if I'm wrong. >> >> > When talking about "custom", you mean these also will have some builtin >> implementations? If it exists, how to get their location in dfs based on >> SQL? Depending on some configuration or just convention over >> configuration. >> >> I think the builtin artfacts are catalogs/connectors/udf that are located >> at the $FLINK_HOME/lib directory. >> >> > Is the FLIP-316 still in need later? >> >> Yes. I think FLIP-316 is a great idea to use json plan to run the SQL Job >> and it brings great convenience to users to submit job in application mode >> in interactive mode. >> >> Best, >> Shengkai >> >> >> >> >> Shengkai Fang 于2024年10月29日周二 14:25写道: >> >> > Hi, Feng. >> > >> > Thanks for your response. >> > >> > > Will FLIP-316 merge into Flink 2.0 too ? >> > >> > I don't have time to finish the FLIP-316. So it depends on whether >> anyone >> > else can help to continue the discussion. >> > >> > > Will SqlDriver use the same one? >> > >> > Yes. We should reuse the same driver. I think the driver is the >> entrypoint >> > for the SQL script. >> > >> > >> > > The details SQL-client deploy SQL File to Cluster may not be very >> clear ? >> > >> > I have pushed a PoC branch about the change. Please take a look at >> > https://github.com/fsk119/flink/tree/application-mode (I don't test it >> > yet). At the mean time, I add a new method in the SqlGatewayService to >> > describe the change. >> > >> > Best, >> > Shengkai >> > >> > >> > >> > Feng Jin 于2024年10月25日周五 21:15写道: >> > >> >> Hi, Shenkai >> >> >> >> Thank you for initiating this FLIP, I understand that supporting >> >> application mode for SQL gateway is very important. There are two small >> >> issues. >> >> >> >> > FLIP-480 is different from FLIP-316 >> >> >> >> >> >>1. Will FLIP-316 merge into Flink 2.0 too ? >> >> >> >> >> >>2. Will SqlDriver use the same one? >> >> >> >> >> >> The details SQL-client deploy SQL File to Cluster may not be very >> clear ? >> >> >> >> I guess that some modifications need to be made to the client here, >> >> when deploying scripts in application mode, we need to call the newly >> >> added >> >> interface of the gateway service. >> >> >> >> >> >> Best, >> >> Feng >> >> >> >> >> >> On Thu, Oct 24, 2024 at 4:27 PM Shengkai Fang >> wrote: >> >> >> >> > Hi, everyone. 
>> >> > >> >> > I'd like to initiate a discussion about FLIP-480: Support to deploy >> >> script >> >> > in application mode[1]. >> >> > >> >> > FLIP-480 supports to solve the problem that table program can not >> run in >> >> > application mode. Comparing to FLIP-316[2], FLIP-480 tries to compile >> >> the >> >> > script in the JM side, which is free from the limitation of the JSON >> >> > plan(JSON plan only serialize the identifier for temporary object) . >> >> > >> >> > For more details, please refer to the FLIP[1]. Welcome any feedback >> and >> >> > suggestions for improvement. >> >> > >> >> > Best, >> >> > Shengkai >> >> > >> >> > [1] >> >> > >> >> > >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-480%3A+Support+to+deploy+SQL+script+in+application+mode >> >> > [2] >> >> > >> >> > >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Support+application+mode+for+SQL+Gateway?src=contextnavpagetreemode >> >> > >> >> >> > >> >
[DISCUSS][FLINK-36547] Add option to retain `RowKind` sematics for cdc formats
Hi everyone,

As the official docs state, `RowKind` semantics are changed after encoding: -U -> -D, +U -> +I. In fact, we also have a need to keep them consistent in many scenarios, such as those that require different handling of -U/-D and +U/+I. I have taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to implement this feature, and it runs well in our business. Please see the details in the Jira [1]. Looking forward to your feedback :)

[1] https://issues.apache.org/jira/browse/FLINK-36547

Best,
Yubin
Re: [DISCUSS] FLIP-480: Support to deploy script in application mode
Hi, Shengkai Thanks for initializing this FLIP, supports application mode for SQL Gateway is a great job. The FLIP design looks good to me. I've read the FLIP-316 which mentions supporting deploying SQL job to application clusters for interactive or non-interactive gateway mode. But I noticed that you say this FLIP focuses on supporting deploy sql scripts to the application cluster, does it mean that it only supports non-interactive gateway mode? Best, Ron Shengkai Fang 于2024年10月29日周二 14:46写道: > Hi, HongShun. Thanks a lot for your response! > > > I wonder what is the scope of this FLIP, only aim for k8s, not including > yarn? > > This FLIP also works for the yarn-application mode. But the yarn deployment > doesn't support to ship the artifacts into the remote side. Please > correct me if I'm wrong. > > > When talking about "custom", you mean these also will have some builtin > implementations? If it exists, how to get their location in dfs based on > SQL? Depending on some configuration or just convention over configuration. > > I think the builtin artfacts are catalogs/connectors/udf that are located > at the $FLINK_HOME/lib directory. > > > Is the FLIP-316 still in need later? > > Yes. I think FLIP-316 is a great idea to use json plan to run the SQL Job > and it brings great convenience to users to submit job in application mode > in interactive mode. > > Best, > Shengkai > > > > > Shengkai Fang 于2024年10月29日周二 14:25写道: > > > Hi, Feng. > > > > Thanks for your response. > > > > > Will FLIP-316 merge into Flink 2.0 too ? > > > > I don't have time to finish the FLIP-316. So it depends on whether anyone > > else can help to continue the discussion. > > > > > Will SqlDriver use the same one? > > > > Yes. We should reuse the same driver. I think the driver is the > entrypoint > > for the SQL script. > > > > > > > The details SQL-client deploy SQL File to Cluster may not be very > clear ? > > > > I have pushed a PoC branch about the change. Please take a look at > > https://github.com/fsk119/flink/tree/application-mode (I don't test it > > yet). At the mean time, I add a new method in the SqlGatewayService to > > describe the change. > > > > Best, > > Shengkai > > > > > > > > Feng Jin 于2024年10月25日周五 21:15写道: > > > >> Hi, Shenkai > >> > >> Thank you for initiating this FLIP, I understand that supporting > >> application mode for SQL gateway is very important. There are two small > >> issues. > >> > >> > FLIP-480 is different from FLIP-316 > >> > >> > >>1. Will FLIP-316 merge into Flink 2.0 too ? > >> > >> > >>2. Will SqlDriver use the same one? > >> > >> > >> The details SQL-client deploy SQL File to Cluster may not be very clear > ? > >> > >> I guess that some modifications need to be made to the client here, > >> when deploying scripts in application mode, we need to call the newly > >> added > >> interface of the gateway service. > >> > >> > >> Best, > >> Feng > >> > >> > >> On Thu, Oct 24, 2024 at 4:27 PM Shengkai Fang > wrote: > >> > >> > Hi, everyone. > >> > > >> > I'd like to initiate a discussion about FLIP-480: Support to deploy > >> script > >> > in application mode[1]. > >> > > >> > FLIP-480 supports to solve the problem that table program can not run > in > >> > application mode. Comparing to FLIP-316[2], FLIP-480 tries to compile > >> the > >> > script in the JM side, which is free from the limitation of the JSON > >> > plan(JSON plan only serialize the identifier for temporary object) . > >> > > >> > For more details, please refer to the FLIP[1]. 
Welcome any feedback > and > >> > suggestions for improvement. > >> > > >> > Best, > >> > Shengkai > >> > > >> > [1] > >> > > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-480%3A+Support+to+deploy+SQL+script+in+application+mode > >> > [2] > >> > > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Support+application+mode+for+SQL+Gateway?src=contextnavpagetreemode > >> > > >> > > >
[jira] [Created] (FLINK-36628) OpenTelemetryTestBase.eventuallyConsumeJson failed on AZP
Weijie Guo created FLINK-36628: -- Summary: OpenTelemetryTestBase.eventuallyConsumeJson failed on AZP Key: FLINK-36628 URL: https://issues.apache.org/jira/browse/FLINK-36628 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 2.0.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-480: Support to deploy script in application mode
Hi Shengkai, Thank you for the timely updates and replies: 1. I have been studying your POC code, introducing OperationExecutor in SqlRunner is a great idea, which can maximize compatibility with SqlClient. Especially some SQL statements cannot be executed directly in the Table Environment, such as those related to Materialized Tables statements. 2. I also have a minor question about the return value of /sessions/${session-id}/scripts. Here we directly return ClusterID as toString. But if only clusterID is available, it may not be very convenient to connect to this application later on. It would be better to have: "kubernetes.cluster-id": "my-first-application-cluster" or: "yarn.application.id": "application-xxx" . About this, I am currently working on FLIP-479.[1] Which will support converting ClusterID into Map Options. What do you think about it ? [1]. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=327977476 Best, Feng On Tue, Oct 29, 2024 at 2:46 PM Shengkai Fang wrote: > Hi, HongShun. Thanks a lot for your response! > > > I wonder what is the scope of this FLIP, only aim for k8s, not including > yarn? > > This FLIP also works for the yarn-application mode. But the yarn deployment > doesn't support to ship the artifacts into the remote side. Please > correct me if I'm wrong. > > > When talking about "custom", you mean these also will have some builtin > implementations? If it exists, how to get their location in dfs based on > SQL? Depending on some configuration or just convention over configuration. > > I think the builtin artfacts are catalogs/connectors/udf that are located > at the $FLINK_HOME/lib directory. > > > Is the FLIP-316 still in need later? > > Yes. I think FLIP-316 is a great idea to use json plan to run the SQL Job > and it brings great convenience to users to submit job in application mode > in interactive mode. > > Best, > Shengkai > > > > > Shengkai Fang 于2024年10月29日周二 14:25写道: > > > Hi, Feng. > > > > Thanks for your response. > > > > > Will FLIP-316 merge into Flink 2.0 too ? > > > > I don't have time to finish the FLIP-316. So it depends on whether anyone > > else can help to continue the discussion. > > > > > Will SqlDriver use the same one? > > > > Yes. We should reuse the same driver. I think the driver is the > entrypoint > > for the SQL script. > > > > > > > The details SQL-client deploy SQL File to Cluster may not be very > clear ? > > > > I have pushed a PoC branch about the change. Please take a look at > > https://github.com/fsk119/flink/tree/application-mode (I don't test it > > yet). At the mean time, I add a new method in the SqlGatewayService to > > describe the change. > > > > Best, > > Shengkai > > > > > > > > Feng Jin 于2024年10月25日周五 21:15写道: > > > >> Hi, Shenkai > >> > >> Thank you for initiating this FLIP, I understand that supporting > >> application mode for SQL gateway is very important. There are two small > >> issues. > >> > >> > FLIP-480 is different from FLIP-316 > >> > >> > >>1. Will FLIP-316 merge into Flink 2.0 too ? > >> > >> > >>2. Will SqlDriver use the same one? > >> > >> > >> The details SQL-client deploy SQL File to Cluster may not be very clear > ? > >> > >> I guess that some modifications need to be made to the client here, > >> when deploying scripts in application mode, we need to call the newly > >> added > >> interface of the gateway service. > >> > >> > >> Best, > >> Feng > >> > >> > >> On Thu, Oct 24, 2024 at 4:27 PM Shengkai Fang > wrote: > >> > >> > Hi, everyone. 
> >> > > >> > I'd like to initiate a discussion about FLIP-480: Support to deploy > >> script > >> > in application mode[1]. > >> > > >> > FLIP-480 supports to solve the problem that table program can not run > in > >> > application mode. Comparing to FLIP-316[2], FLIP-480 tries to compile > >> the > >> > script in the JM side, which is free from the limitation of the JSON > >> > plan(JSON plan only serialize the identifier for temporary object) . > >> > > >> > For more details, please refer to the FLIP[1]. Welcome any feedback > and > >> > suggestions for improvement. > >> > > >> > Best, > >> > Shengkai > >> > > >> > [1] > >> > > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-480%3A+Support+to+deploy+SQL+script+in+application+mode > >> > [2] > >> > > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Support+application+mode+for+SQL+Gateway?src=contextnavpagetreemode > >> > > >> > > >
Re: Micro batching with flink
Can you share more details on what you mean by micro-batching? Can you explain with an example so I can understand it better?

Thanks
Venkat

On Tue, Oct 29, 2024, 1:22 PM Anil Dasari wrote: > Hello team, > I apologize for reaching out on the dev mailing list. I'm working on > implementing micro-batching with near real-time processing. > I've seen similar questions in the Flink Slack channel and user mailing > list, but there hasn't been much discussion or feedback. Here are the > options I've explored: > 1. Windowing: This approach looked promising, but the flushing mechanism > requires record-level information checks, as window data isn't accessible > throughout the pipeline. > 2. Window + Trigger: This method buffers events until the trigger interval > is reached, which affects real-time processing; events are only processed > when the trigger occurs. > 3. Processing Time: The processing time is specific to each file writer, > resulting in inconsistencies across different task managers. > 4. Watermark: There’s no global watermark; it's specific to each source > task, and the initial watermark information (before the first watermark > event) isn't epoch-based. > I'm looking to write data grouped by time (micro-batch time). What’s the > best approach to achieve micro-batching in Flink? > Let me know if you have any questions. thanks. > Thanks. > >
Re: Micro batching with flink
Hi Venkat,

Thanks for the reply. Microbatching is a data processing technique where small batches of data are collected and processed together at regular intervals. However, I'm aiming to avoid traditional micro-batch processing by tagging records within a time window as a batch, allowing for near-real-time data processing. I'm currently exploring Flink for the following use case:

1. Group data by a time window and write it to S3 under the appropriate prefix.
2. Generate metrics for the microbatch and, if needed, store them in S3.
3. Send metrics to an external system to notify that Step 1 has been completed.

If any part of the process fails, the entire microbatch step should be rolled back. I am planning to implement a two-phase-commit sink for Steps 2 and 3. The primary challenge is tagging the record set with an epoch time across all tasks within a window, so that it can be used in the sink to create committable splits, similar to the processing time in the Flink file sink.

Thanks

On Tuesday, October 29, 2024 at 09:40:40 PM PDT, Venkatakrishnan Sowrirajan wrote: Can you share more details on what do you mean by micro-batching? Can you explain with an example to understand it better? Thanks Venkat On Tue, Oct 29, 2024, 1:22 PM Anil Dasari wrote: > Hello team, > I apologize for reaching out on the dev mailing list. I'm working on > implementing micro-batching with near real-time processing. > I've seen similar questions in the Flink Slack channel and user mailing > list, but there hasn't been much discussion or feedback. Here are the > options I've explored: > 1. Windowing: This approach looked promising, but the flushing mechanism > requires record-level information checks, as window data isn't accessible > throughout the pipeline. > 2. Window + Trigger: This method buffers events until the trigger interval > is reached, which affects real-time processing; events are only processed > when the trigger occurs. > 3. Processing Time: The processing time is specific to each file writer, > resulting in inconsistencies across different task managers. > 4. Watermark: There’s no global watermark; it's specific to each source > task, and the initial watermark information (before the first watermark > event) isn't epoch-based. > I'm looking to write data grouped by time (micro-batch time). What’s the > best approach to achieve micro-batching in Flink? > Let me know if you have any questions. thanks. > Thanks. > >
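For what it's worth, a minimal sketch of the "tag records with a micro-batch time" idea described above (illustrative only; it uses per-subtask processing time, so it carries exactly the cross-task consistency caveat mentioned earlier in the thread, and the interval and types are placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Tags each record with a batch id = processing time truncated to the batch interval,
// so a downstream (e.g. two-phase-commit) sink can group its commits per micro-batch.
public class MicroBatchTagger<T> implements MapFunction<T, Tuple2<Long, T>> {

    private final long batchIntervalMillis;

    public MicroBatchTagger(long batchIntervalMillis) {
        this.batchIntervalMillis = batchIntervalMillis;
    }

    @Override
    public Tuple2<Long, T> map(T value) {
        long batchId = (System.currentTimeMillis() / batchIntervalMillis) * batchIntervalMillis;
        return Tuple2.of(batchId, value);
    }
}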