[jira] [Created] (FLINK-36622) Remove the dependency of StateBenchmark on RocksDBKeyedStateBackend APIs.

2024-10-29 Thread Han Yin (Jira)
Han Yin created FLINK-36622:
---

 Summary: Remove the dependency of StateBenchmark on 
RocksDBKeyedStateBackend APIs.
 Key: FLINK-36622
 URL: https://issues.apache.org/jira/browse/FLINK-36622
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 2.0-preview
Reporter: Han Yin
 Fix For: 2.0.0


Currently, flink-benchmarks relies on non-public APIs in Flink. For example, in 
StateBackendBenchmarkUtils.java, the function compactState takes 
RocksDBKeyedStateBackend as its first argument.

This requires an explicit type conversion in flink-benchmark (from 
KeyedStateBackend to RocksDBKeyedStateBackend). Moreover, it means that 
whenever the signature of RocksDBKeyedStateBackend changes, flink-benchmark 
has to be modified correspondingly.

Therefore, we should avoid exposing non-public APIs in 
StateBackendBenchmarkUtils.
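
One possible direction (a rough sketch only; the method shape and the 
compactRocksDBState helper below are illustrative, not the actual Flink API) 
would be to let the benchmark utility accept the public KeyedStateBackend 
interface and keep the RocksDB-specific narrowing internal, so that 
flink-benchmarks never needs the cast:

{code:java}
// Sketch inside StateBackendBenchmarkUtils: accept the public interface and
// keep the RocksDB-specific handling as an implementation detail.
public static void compactState(
        KeyedStateBackend<?> backend, StateDescriptor<?, ?> descriptor) throws Exception {
    if (backend instanceof RocksDBKeyedStateBackend) {
        // Hypothetical private helper containing today's RocksDB-only logic.
        compactRocksDBState((RocksDBKeyedStateBackend<?>) backend, descriptor);
    }
    // Other backends: nothing to compact.
}
{code}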



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36621) Build failure: StatefulSink not found

2024-10-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-36621:
--

 Summary: Build failure: StatefulSink not found
 Key: FLINK-36621
 URL: https://issues.apache.org/jira/browse/FLINK-36621
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 2.0-preview
Reporter: Piotr Nowojski


Locally, building Flink in IntelliJ fails for me due to:

{code:java}
flink-apache/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:72:42
java: cannot access org.apache.flink.api.connector.sink2.StatefulSink
  class file for org.apache.flink.api.connector.sink2.StatefulSink not found
{code}
flink-examples depends on flink-connector-kafka in version 3.0.0-17, which in 
turn still refers to the StatefulSink:

{code:java}
public class KafkaSink implements StatefulSink, 
TwoPhaseCommittingSink (...)
{code}

which has been deleted in FLINK-36245. I think Maven builds might only be 
working by luck, due to differences in how IntelliJ and Maven interpret pom 
files and resolve dependencies.

CC [~kunni] [~renqs] [~Leonard]




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36625) Add helper classes for Lineage integration in connectors

2024-10-29 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-36625:
-

 Summary: Add helper classes for Lineage integration in connectors
 Key: FLINK-36625
 URL: https://issues.apache.org/jira/browse/FLINK-36625
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Zhenqiu Huang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Micro batching with flink

2024-10-29 Thread Anil Dasari
Hello team,
I apologize for reaching out on the dev mailing list. I'm working on 
implementing micro-batching with near real-time processing.
I've seen similar questions in the Flink Slack channel and user mailing list, 
but there hasn't been much discussion or feedback. Here are the options I've 
explored:
1. Windowing: This approach looked promising, but the flushing mechanism 
requires record-level information checks, as window data isn't accessible 
throughout the pipeline.
2. Window + Trigger: This method buffers events until the trigger interval is 
reached, which affects real-time processing; events are only processed when the 
trigger occurs.
3. Processing Time: The processing time is specific to each file writer, 
resulting in inconsistencies across different task managers.
4. Watermark: There’s no global watermark; it's specific to each source task, 
and the initial watermark information (before the first watermark event) isn't 
epoch-based.
I'm looking to write data grouped by time (micro-batch time). What’s the best 
approach to achieve micro-batching in Flink?
Let me know if you have any questions.
Thanks.



[jira] [Created] (FLINK-36627) Failure to process a CSV file in Flink due to a character encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.

2024-10-29 Thread Hector Miuler Malpica Gallegos (Jira)
Hector Miuler Malpica Gallegos created FLINK-36627:
--

 Summary: Failure to process a CSV file in Flink due to a character 
encoding mismatch: the file is in ISO-8859 and the application expects UTF-8.
 Key: FLINK-36627
 URL: https://issues.apache.org/jira/browse/FLINK-36627
 Project: Flink
  Issue Type: Bug
Reporter: Hector Miuler Malpica Gallegos


I get an error when reading a CSV file with charset ISO-8859; the error is the following:



{code:java}
Caused by: java.io.CharConversionException: Invalid UTF-8 middle byte 0x41 (at char #1247, byte #1246): check content encoding, does not look like UTF-8
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportInvalidOther(UTF8Reader.java:520)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.reportDeferredInvalid(UTF8Reader.java:531)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.UTF8Reader.read(UTF8Reader.java:177)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.loadMore(CsvDecoder.java:458)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder._nextUnquotedString(CsvDecoder.java:782)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.nextString(CsvDecoder.java:732)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:963)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:763)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:321)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:283)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.next(MappingIterator.java:199)
    ... 11 more
{code}



My code is the following:

{code}
val env = StreamExecutionEnvironment.createLocalEnvironment()
val csvFormat = CsvReaderFormat.forPojo(Empresa::class.java)
val csvSource = FileSource
    .forRecordStreamFormat(csvFormat, Path("/miuler/PadronRUC_202410.csv"))
    .build()
val empresaStreamSource = env.fromSource(csvSource, WatermarkStrategy.noWatermarks(), "CSV Source")
empresaStreamSource.print()
env.execute("Load CSV")
{code}


my dependencies:



{code}
val kotlinVersion = "1.20.0"
// FLINK
dependencies {
    implementation("org.apache.flink:flink-shaded-jackson:2.15.3-19.0")
    implementation("org.apache.flink:flink-core:$kotlinVersion")
    implementation("org.apache.flink:flink-runtime:$kotlinVersion")
    implementation("org.apache.flink:flink-runtime-web:$kotlinVersion")
    implementation("org.apache.flink:flink-clients:$kotlinVersion")
    implementation("org.apache.flink:flink-streaming-java:$kotlinVersion")
    implementation("org.apache.flink:flink-csv:$kotlinVersion")
    implementation("org.apache.flink:flink-connector-base:$kotlinVersion")
    implementation("org.apache.flink:flink-connector-files:$kotlinVersion")
}
{code}
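
A minimal workaround sketch (an assumption-laden example, not part of the 
original report): since the shaded Jackson CSV reader expects UTF-8, the file 
can be transcoded from ISO-8859-1 to UTF-8 with plain JDK APIs before Flink 
reads it. The paths are simply the ones from the example above; for very large 
files a streaming copy would be preferable to reading everything into memory.

{code:java}
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TranscodeCsvToUtf8 {
    public static void main(String[] args) throws Exception {
        Path source = Paths.get("/miuler/PadronRUC_202410.csv");
        Path target = Paths.get("/miuler/PadronRUC_202410_utf8.csv");
        // Decode the bytes as ISO-8859-1 and rewrite them as UTF-8 so the
        // UTF-8-based CSV reader no longer hits invalid byte sequences.
        byte[] raw = Files.readAllBytes(source);
        String content = new String(raw, StandardCharsets.ISO_8859_1);
        Files.write(target, content.getBytes(StandardCharsets.UTF_8));
    }
}
{code}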
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


kafka-connector random WakeupException after enabling watermark alignment

2024-10-29 Thread Alberto Lago
Hello,
In Flink 1.19, on AWS Managed Flink

flink-connector-kafka:3.3.0-1.19

After I enable watermark alignment on the KafkaSource, it starts throwing an
uncaught WakeupException.
It happens:
* On every checkpoint, unless I disable offset committing:
   setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
* Randomly

The stack trace is the same for both cases.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected
exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.WakeupException
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more

Watermark alignment stops working after recovery.

Checking the code, I see that

long consumerPosition = consumer.position(tp);

at KafkaPartitionSplitReader.java:127 is the only call to
consumer.position in the whole file that is not wrapped in retryOnWakeup
(there are a few such calls in the file).

I tested wrapping it and was able to make my app work without any exception.
I could make a PR; I am waiting for an ASF Self-Service account.
But I don't really understand what the race condition is here, and I am not
able to reproduce it in tests.
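
For reference, the change I tested is roughly the following (a sketch only; it
reuses the existing retryOnWakeup helper that the other consumer.position calls
in KafkaPartitionSplitReader already go through, assuming its usual
(supplier, description) shape):

// In KafkaPartitionSplitReader#fetch(), replacing the unwrapped call:
long consumerPosition =
        retryOnWakeup(
                () -> consumer.position(tp),
                "getting the consumer position for watermark alignment");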

Hints and help would be appreciated,

Thanks


[jira] [Created] (FLINK-36624) Log JobID in SourceCoordinator

2024-10-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-36624:
--

 Summary: Log JobID in SourceCoordinator
 Key: FLINK-36624
 URL: https://issues.apache.org/jira/browse/FLINK-36624
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Piotr Nowojski


Currently, log entries from the SourceCoordinator are not tagged with the JobID, 
even though this could quite easily be done.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Elasticsearch v3.1 release

2024-10-29 Thread Ahmed Hamdy
Thanks Danny for the input.
I figured out that most of the steps require committer permissions, so I will
reach out offline for the next steps. Thanks in advance for the assistance as well.

Best Regards
Ahmed Hamdy


On Mon, 14 Oct 2024 at 17:23, Danny Cranmer  wrote:

> Hey Ahmed,
>
> +1 for the release and you as release manager with PMC assistance.
>
> Thanks for opening this discussion, we never finished releasing the
> Elasticsearch connector for Flink 1.19 [1], so we do not have an official
> version for Flink 1.19/1.20 which is a problem. I am happy to assist you as
> suggested, however I do not believe you will be able to do much without
> committer permissions. But let's roll with this and see how it goes.
>
> Thanks,
> Danny
>
> [1] https://issues.apache.org/jira/browse/FLINK-35131
>
> On Mon, Oct 14, 2024 at 10:22 AM Ahmed Hamdy  wrote:
>
> > Hi all,
> > I want to kick off the discussion for elasticsearch connector release
> v3.1.
> > Our latest release v3.0.1 was more than a year ago[1]. Since then the
> > connector has developed an important set of features like SinkV2
> > compatibility update[2] and most importantly Elasticsearch 8 support[3]
> > which is of high demand. I believe it might be time to release V3.1.
> > I am happy to drive the release with an assist from a PMC for
> > privileged actions or even assist if a PMC is willing to lead.
> >
> > Let me know your thoughts.
> >
> > 1-https://lists.apache.org/thread/374078blmqgfvtt41pbbzr2df37k2nc0
> > 2-https://issues.apache.org/jira/browse/FLINK-34113
> > 3-https://issues.apache.org/jira/browse/FLINK-26088
> >
> >
> > Best Regards
> > Ahmed Hamdy
> >
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.10.0 released

2024-10-29 Thread Őrhidi Mátyás
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.10.0

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/2024/10/25/apache-flink-kubernetes-operator-1.10.0-release-announcement

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354833

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Matyas Orhidi


Re: [DISCUSS] Release HBase connector with partial JDK support

2024-10-29 Thread Márton Balassi
+1 for Ferenc as the release manager and the content.

On Mon, Oct 28, 2024 at 5:03 PM Ferenc Csaky 
wrote:

> Hi,
>
> Based on this discussion I would like to suggest moving on with
> the originally planned release of the HBase connector 4.0, which
> will support 1.18 and 1.19.
>
> I volunteer to be the release manager.
>
> Thanks,
> Ferenc
>
>
>
> On Wednesday, October 23rd, 2024 at 13:19, Ferenc Csaky
>  wrote:
>
> >
> >
> > Hi Marton, Yanquan,
> >
> > Thank you for your responses! Regarding the points brought up for
> > discussion:
> >
> > 1. Supporting 1.20 definitely makes sense, but since there is quite
> > a big gap to work down here now, I am not sure it should be done in
> > 1 step. As far as I understand, the externalized connector dev model
> > [1] does not explicitly forbid that, but AFAIK there were external
> > connector releases that supported 3 different Flink minor versions.
> >
> > In this case, I think it would technically be possible, but IMO
> > supporting 3 Flink versions adds more maintenance complexity. So
> > what I would suggest is to release 4.0 with Flink 1.18 and 1.19
> > support, and after that there can be a 4.1 that supports 1.19 and
> > 1.20. 4.0 will then only get patch releases, probably minimizing
> > Flink-version-specific problems.
> >
> > 2. Flink 1.17 had no JDK17 support, so those Hadoop related
> > problems should not play a role if something needs to be released
> > that supports 1.17. But once connector 4.0 is released, the 3.x
> > versions will not get any new releases (not even patch releases),
> > because 1.17 is already out of support.
> >
> > Best,
> > Ferenc
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
> >
> > On Wednesday, 23 October 2024 at 05:19, Yanquan Lv decq12y...@gmail.com
> wrote:
> >
> > > Hi Feri,
> > > Thank you for bringing up this discussion.
> > > I agree to release a version to bump the newer version of Flink with
> partial JDK versions support. I have two points to be discussed.
> > > 1. I have heard many inquiries about supporting higher versions of
> Flink in Slack, Chinese communities, etc., and a large part of them hope to
> use it on Flink 1.20. Should we consider explicitly supporting Flink 1.20 in
> version 4.0? Otherwise users will have to wait for a relatively long
> release cycle.
> > > 2. Currently supporting Flink 1.17 is difficult, but are there any
> plans to support it in the future? Do we need to wait for Hadoop related
> repositories to release specific versions?
> > >
> > > > On Oct 22, 2024, at 19:44, Ferenc Csaky ferenc.cs...@pm.me.INVALID wrote:
> > > >
> > > > Hello devs,
> > > >
> > > > I would like to start a discussion regarding a new HBase connector
> release. Currently, the
> > > > externalized HBase connector has only 1 release: 3.0.0 that supports
> Flink 1.16 and 1.17.
> > > >
> > > > By stating this, it is obvious that the connector has already been
> outdated for quite a while. There
> > > > is a long-lasting ticket [1] to release a newer HBase version, which
> also contains a major version
> > > > bump as HBase 1.x support is removed, but covering JDK17 with the
> current Hadoop related
> > > > dependency mix is impossible, because there are parts that do not
> play well with it when you
> > > > try to compile with JDK17+, and there are no runtime tests either.
> > > >
> > > > Solving that properly will require bumping the HBase, Hadoop, and
> Zookeeper versions as well,
> > > > but that will require more digging and some refactoring, at least on
> the test side.
> > > >
> > > > To cut some corners and move forward, I think at this point it would
> make sense to release
> > > > version 4.0 that supports Flink 1.18 and 1.19, but only on top of
> JDK8 and JDK11, just to close the
> > > > current gap a bit. I am thinking about including the limitations in
> the java compat docs [2] to
> > > > highlight this to users.
> > > >
> > > > WDYT?
> > > >
> > > > Best,
> > > > Ferenc
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-35136
> > > > [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/java_compatibility/
>


[jira] [Created] (FLINK-36623) Improve logging in DefaultStateTransitionManager

2024-10-29 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-36623:
-

 Summary: Improve logging in DefaultStateTransitionManager
 Key: FLINK-36623
 URL: https://issues.apache.org/jira/browse/FLINK-36623
 Project: Flink
  Issue Type: Improvement
Reporter: Roman Khachatryan
Assignee: Zdenek Tison
 Fix For: 1.20.1


When the job transitions from one state to another, e.g. restarts when new 
slots become available, this is not visible in the logs unless the log level 
is set to DEBUG.

Therefore, it'd make sense to:
 # Change the log level from DEBUG to INFO
 # Log the job ID when such a transition happens



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36626) Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+

2024-10-29 Thread Eduardo Breijo (Jira)
Eduardo Breijo created FLINK-36626:
--

 Summary: Flink SQL JOINs behavior change from Flink 1.15 to Flink 
1.18+
 Key: FLINK-36626
 URL: https://issues.apache.org/jira/browse/FLINK-36626
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.20.0, 1.18.1
 Environment: AWS Managed Apache Flink 
Reporter: Eduardo Breijo
 Attachments: Flink-SQL-query.txt

There is a behavior change I found when migrating from Flink 1.15 to Flink 
1.18+ with regard to Flink SQL joins that I haven't been able to pinpoint, and 
it causes the query to output different results.

Flink SQL Query:

{code:sql}
WITH assets_setpoint AS (
    SELECT
      asset_id,
      TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
      TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
      LAST_VALUE(`value`) AS `value`
    FROM asset_readings
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
    WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
    SELECT asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
    FROM asset_readings
    -- Metrics temporal lookup inner join
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
    -- Assets to ignore for this computed metric definition temporal lookup left join
    LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
    WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
{code}

Results:
 * On Flink 1.15 the difference (assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`) between assets_supply_air_temp and 
assets_setpoint is computed correctly for every value of 
assets_supply_air_temp (note that this subquery does not perform any 
window-based grouping, so it is just raw data)
 * On Flink 1.18+ this difference always results in 0

 

I have tried updating the query using different formats, but I have not found a 
workaround, and I don't know why this is happening. Attached you will find a 
file with the different SQL formats I have tried, with no luck.

Any help would be appreciated

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36620) Add support for the flink-home parameter to be set in both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats

2024-10-29 Thread zjjiang (Jira)
zjjiang created FLINK-36620:
---

 Summary: Add support for the flink-home parameter to be set in 
both “--flink-home $FLINK_HOME” and “--flink-home=$FLINK_HOME” formats
 Key: FLINK-36620
 URL: https://issues.apache.org/jira/browse/FLINK-36620
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1, cdc-3.2.0, cdc-3.1.0
Reporter: zjjiang
 Fix For: cdc-3.3.0


Currently, most of Flink CDC's command line arguments can be set in either the 
"--$KEY $VALUE" or the "--$KEY=$VALUE" format, e.g. --jar. The exception is 
--flink-home, which only supports the space-separated form. Users who use the 
"--flink-home=$FLINK_HOME" format on the command line (trying to be consistent 
with the other "="-separated arguments) will not be able to set the Flink home 
correctly.

In particular, when there is an environment variable $FLINK_HOME and you want 
to override it by setting --flink-home=/path/to/new/flink/home, you will find 
that it does not work.

We would like to support the flink-home parameter in both --flink-home 
$FLINK_HOME and --flink-home=$FLINK_HOME formats, so that users can avoid 
formatting differences and runtime exceptions when using command line arguments.
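
A minimal sketch of the idea (the class and method names are illustrative, not 
the actual Flink CDC CLI code): normalize the "--flink-home=$FLINK_HOME" form 
into the two-token "--flink-home $FLINK_HOME" form before the existing argument 
parsing runs, so both spellings end up on the same code path.

{code:java}
import java.util.ArrayList;
import java.util.List;

public final class FlinkHomeArgNormalizer {

    private static final String FLINK_HOME_OPTION = "--flink-home";

    // Turns ["--flink-home=/opt/flink", ...] into ["--flink-home", "/opt/flink", ...]
    // and leaves every other argument untouched.
    public static String[] normalize(String[] args) {
        List<String> normalized = new ArrayList<>();
        for (String arg : args) {
            if (arg.startsWith(FLINK_HOME_OPTION + "=")) {
                normalized.add(FLINK_HOME_OPTION);
                normalized.add(arg.substring(FLINK_HOME_OPTION.length() + 1));
            } else {
                normalized.add(arg);
            }
        }
        return normalized.toArray(new String[0]);
    }
}
{code}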



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] Optimize the processing logic of TopNFunction to avoid the expiration of downstream operator states when state TTL is set

2024-10-29 Thread 李阳
Hello devs,

I would like to initiate a discussion about the Flink TopNFunction. In our
experience, we encountered the following issue while implementing the
Top3Function.

During this process, if no incoming record has a timestamp smaller than the
RowData currently held by the Top3Function, the operator will not emit any
messages related to this product. As a result, some state in the join
operator is never updated, and when that state's TTL expires, it is
cleared. From the user's perspective, even though data about a particular
product is continuously flowing in, the join may fail to find a matching
record.

The issue arises because the TopNFunction takes no action when it encounters
an input RowData that falls outside the specified top N. This situation can
occur in both retract streams and append-only streams.

For comparison, an aggregation function with a state retention time configured
still emits a result even if its output is unchanged, and that emission resets
the state retention timer in the downstream operator.
Proposed Solutions

To address this issue, we propose a couple of solutions:

   1. Emit the changed dataset on each incoming record: if we can confirm that
      the join key of the downstream join operator and the partition key of the
      TopN operator are consistent, we can emit only the modified RowData.

   2. Full emission on each incoming record: if the keys are inconsistent, we
      could emit the full dataset each time a new record is received. However,
      this approach would place a heavier burden on the system.

These two methods may prevent the downstream operator's state from expiring.
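
To make option 1 above a bit more concrete, here is a simplified
DataStream-level sketch (this is not the actual TopNFunction code; the record
type, the key, and the "top" comparison are placeholder assumptions). The point
is only that the retained top record is re-emitted even when an incoming record
does not change it, so the downstream operator's state retention timer keeps
being reset:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: keeps a single "top" value per key and emits on every input.
public class RefreshingTop1Function extends KeyedProcessFunction<String, Long, Long> {

    private transient ValueState<Long> top;

    @Override
    public void open(Configuration parameters) {
        top = getRuntimeContext().getState(new ValueStateDescriptor<>("top", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = top.value();
        if (current == null || value > current) {
            // The top record changed: store and emit the new value.
            top.update(value);
            out.collect(value);
        } else {
            // The top record did not change: re-emit it anyway so that the
            // downstream operator's state retention timer is reset.
            out.collect(current);
        }
    }
}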

WDYT?

Best,

Yang Li

https://docs.google.com/document/d/1OQXE6Pf6pVLyX1cVmlwvZ1GnX5rvRpIyVttiEhy8BRs/edit?tab=t.edj7xjbz07e4


Re: [DISCUSS] FLIP-480: Support to deploy script in application mode

2024-10-29 Thread Ron Liu
I also have some questions:

1. Are all SQL commands, such as DDL, DML, and SELECT, supported?
2. How is the JobID determined, and how are the JobID & ClusterId returned from
the application cluster?
3. How is the JAR specified by the user dynamically downloaded when submitting
the SQL script, and is it possible to specify a local jar?

Best,
Ron

On Wed, Oct 30, 2024 at 10:57, Ron Liu  wrote:

> Hi, Shengkai
>
> Thanks for initiating this FLIP; supporting application mode for SQL
> Gateway is great work. The FLIP design looks good to me.
>
>
> I've read FLIP-316, which mentions supporting deploying SQL jobs to
> application clusters for interactive or non-interactive gateway mode.
> But I noticed that you say this FLIP focuses on supporting deploying SQL
> scripts to the application cluster; does it mean that it only supports
> non-interactive gateway mode?
>
>
> Best,
> Ron
>
> On Tue, Oct 29, 2024 at 14:46, Shengkai Fang  wrote:
>
>> Hi, HongShun. Thanks a lot for your response!
>>
>> > I wonder what is the scope of this FLIP, only aim for k8s, not including
>> yarn?
>>
>> This FLIP also works for the yarn-application mode. But the yarn
>> deployment
>> doesn't support shipping the artifacts to the remote side. Please
>> correct me if I'm wrong.
>>
>> > When talking about "custom", you mean these also will have some builtin
>> implementations? If it exists, how to get their location in dfs based on
>> SQL? Depending on some configuration or just convention over
>> configuration.
>>
>> I think the builtin artifacts are catalogs/connectors/udf that are located
>> at the $FLINK_HOME/lib directory.
>>
>> > Is the FLIP-316 still in need later?
>>
>> Yes. I think FLIP-316 is a great idea to use json plan to run the SQL Job
>> and it brings great convenience to users to submit job in application mode
>> in interactive mode.
>>
>> Best,
>> Shengkai
>>
>>
>>
>>
>> On Tue, Oct 29, 2024 at 14:25, Shengkai Fang  wrote:
>>
>> > Hi, Feng.
>> >
>> > Thanks for your response.
>> >
>> > > Will FLIP-316 merge into Flink 2.0 too ?
>> >
>> > I don't have time to finish the FLIP-316. So it depends on whether
>> anyone
>> > else can help to continue the discussion.
>> >
>> > > Will SqlDriver use the same one?
>> >
>> > Yes. We should reuse the same driver. I think the driver is the
>> entrypoint
>> > for the SQL script.
>> >
>> >
>> > > The details SQL-client deploy SQL File to Cluster may not be very
>> clear ?
>> >
>> > I have pushed a PoC branch about the change. Please take a look at
>> > https://github.com/fsk119/flink/tree/application-mode (I don't test it
>> > yet). At the mean time, I add a new method in the SqlGatewayService to
>> > describe the change.
>> >
>> > Best,
>> > Shengkai
>> >
>> >
>> >
>> > On Fri, Oct 25, 2024 at 21:15, Feng Jin  wrote:
>> >
>> >> Hi, Shengkai
>> >>
>> >> Thank you for initiating this FLIP, I understand that supporting
>> >> application mode for SQL gateway is very important. There are two small
>> >> issues.
>> >>
>> >> > FLIP-480 is different from FLIP-316
>> >>
>> >>
>> >>1. Will FLIP-316 merge into Flink 2.0 too ?
>> >>
>> >>
>> >>2. Will SqlDriver use the same one?
>> >>
>> >>
>> >> The details SQL-client deploy SQL File to Cluster may not be very
>> clear ?
>> >>
>> >> I guess that some modifications need to be made to the client here,
>> >> when deploying scripts in application mode, we need to call the newly
>> >> added
>> >> interface of the gateway service.
>> >>
>> >>
>> >> Best,
>> >> Feng
>> >>
>> >>
>> >> On Thu, Oct 24, 2024 at 4:27 PM Shengkai Fang 
>> wrote:
>> >>
>> >> > Hi, everyone.
>> >> >
>> >> > I'd like to initiate a discussion about FLIP-480: Support to deploy
>> >> script
>> >> > in application mode[1].
>> >> >
>> >> > FLIP-480 supports to solve the problem that table program can not
>> run in
>> >> > application mode. Comparing to FLIP-316[2], FLIP-480 tries to compile
>> >> the
>> >> > script in the JM side, which is free from the limitation of the JSON
>> >> > plan(JSON plan only serialize the identifier for temporary object) .
>> >> >
>> >> > For more details, please refer to the FLIP[1]. Welcome any feedback
>> and
>> >> > suggestions for improvement.
>> >> >
>> >> > Best,
>> >> > Shengkai
>> >> >
>> >> > [1]
>> >> >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-480%3A+Support+to+deploy+SQL+script+in+application+mode
>> >> > [2]
>> >> >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Support+application+mode+for+SQL+Gateway?src=contextnavpagetreemode
>> >> >
>> >>
>> >
>>
>


[DISCUSS][FLINK-36547] Add option to retain `RowKind` sematics for cdc formats

2024-10-29 Thread Yubin Li
Hi everyone,

As the official docs state, `RowKind` semantics are changed during encoding:
-U -> -D, +U -> +I. In fact, we also have a demand to keep them consistent in
many scenarios, such as those that require different processing of -U/-D
and +U/+I.

I have taken advantage of the difference between UPDATE_BEFORE and
UPDATE_AFTER to implement the feature, and it runs well in our production use.
Please see the details in the Jira [1].

Looking forward to your feedback :)

[1] https://issues.apache.org/jira/browse/FLINK-36547

Best,
Yubin


Re: [DISCUSS] FLIP-480: Support to deploy script in application mode

2024-10-29 Thread Ron Liu
Hi, Shengkai

Thanks for initiating this FLIP; supporting application mode for SQL
Gateway is great work. The FLIP design looks good to me.


I've read FLIP-316, which mentions supporting deploying SQL jobs to
application clusters for interactive or non-interactive gateway mode.
But I noticed that you say this FLIP focuses on supporting deploying SQL
scripts to the application cluster; does it mean that it only supports
non-interactive gateway mode?


Best,
Ron

On Tue, Oct 29, 2024 at 14:46, Shengkai Fang  wrote:

> Hi, HongShun. Thanks a lot for your response!
>
> > I wonder what is the scope of this FLIP, only aim for k8s, not including
> yarn?
>
> This FLIP also works for the yarn-application mode. But the yarn deployment
> doesn't support shipping the artifacts to the remote side. Please
> correct me if I'm wrong.
>
> > When talking about "custom", you mean these also will have some builtin
> implementations? If it exists, how to get their location in dfs based on
> SQL? Depending on some configuration or just convention over configuration.
>
> I think the builtin artifacts are catalogs/connectors/udf that are located
> at the $FLINK_HOME/lib directory.
>
> > Is the FLIP-316 still in need later?
>
> Yes. I think FLIP-316 is a great idea to use json plan to run the SQL Job
> and it brings great convenience to users to submit job in application mode
> in interactive mode.
>
> Best,
> Shengkai
>
>
>
>
> On Tue, Oct 29, 2024 at 14:25, Shengkai Fang  wrote:
>
> > Hi, Feng.
> >
> > Thanks for your response.
> >
> > > Will FLIP-316 merge into Flink 2.0 too ?
> >
> > I don't have time to finish the FLIP-316. So it depends on whether anyone
> > else can help to continue the discussion.
> >
> > > Will SqlDriver use the same one?
> >
> > Yes. We should reuse the same driver. I think the driver is the
> entrypoint
> > for the SQL script.
> >
> >
> > > The details SQL-client deploy SQL File to Cluster may not be very
> clear ?
> >
> > I have pushed a PoC branch about the change. Please take a look at
> > https://github.com/fsk119/flink/tree/application-mode (I don't test it
> > yet). At the mean time, I add a new method in the SqlGatewayService to
> > describe the change.
> >
> > Best,
> > Shengkai
> >
> >
> >
> > On Fri, Oct 25, 2024 at 21:15, Feng Jin  wrote:
> >
> >> Hi, Shengkai
> >>
> >> Thank you for initiating this FLIP, I understand that supporting
> >> application mode for SQL gateway is very important. There are two small
> >> issues.
> >>
> >> > FLIP-480 is different from FLIP-316
> >>
> >>
> >>1. Will FLIP-316 merge into Flink 2.0 too ?
> >>
> >>
> >>2. Will SqlDriver use the same one?
> >>
> >>
> >> The details SQL-client deploy SQL File to Cluster may not be very clear
> ?
> >>
> >> I guess that some modifications need to be made to the client here,
> >> when deploying scripts in application mode, we need to call the newly
> >> added
> >> interface of the gateway service.
> >>
> >>
> >> Best,
> >> Feng
> >>
> >>
> >> On Thu, Oct 24, 2024 at 4:27 PM Shengkai Fang 
> wrote:
> >>
> >> > Hi, everyone.
> >> >
> >> > I'd like to initiate a discussion about FLIP-480: Support to deploy
> >> script
> >> > in application mode[1].
> >> >
> >> > FLIP-480 supports to solve the problem that table program can not run
> in
> >> > application mode. Comparing to FLIP-316[2], FLIP-480 tries to compile
> >> the
> >> > script in the JM side, which is free from the limitation of the JSON
> >> > plan(JSON plan only serialize the identifier for temporary object) .
> >> >
> >> > For more details, please refer to the FLIP[1]. Welcome any feedback
> and
> >> > suggestions for improvement.
> >> >
> >> > Best,
> >> > Shengkai
> >> >
> >> > [1]
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-480%3A+Support+to+deploy+SQL+script+in+application+mode
> >> > [2]
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Support+application+mode+for+SQL+Gateway?src=contextnavpagetreemode
> >> >
> >>
> >
>


[jira] [Created] (FLINK-36628) OpenTelemetryTestBase.eventuallyConsumeJson failed on AZP

2024-10-29 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-36628:
--

 Summary: OpenTelemetryTestBase.eventuallyConsumeJson failed on AZP
 Key: FLINK-36628
 URL: https://issues.apache.org/jira/browse/FLINK-36628
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 2.0.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-480: Support to deploy script in application mode

2024-10-29 Thread Feng Jin
Hi Shengkai,

Thank you for the timely updates and replies:


   1. I have been studying your PoC code. Introducing OperationExecutor in
   SqlRunner is a great idea, which can maximize compatibility with SqlClient.

   In particular, some SQL statements cannot be executed directly in the Table
   Environment, such as those related to Materialized Tables.


   2. I also have a minor question about the return value of
   /sessions/${session-id}/scripts.

Here we directly return the ClusterID as its toString value. But if only the
ClusterID is available, it may not be very convenient to connect to this
application later on.
It would be better to have:
"kubernetes.cluster-id": "my-first-application-cluster"
or:
"yarn.application.id": "application-xxx".
Regarding this, I am currently working on FLIP-479 [1], which will support
converting the ClusterID into a Map of options. What do you think about it?


[1].
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=327977476




Best,
Feng


On Tue, Oct 29, 2024 at 2:46 PM Shengkai Fang  wrote:

> Hi, HongShun. Thanks a lot for your response!
>
> > I wonder what is the scope of this FLIP, only aim for k8s, not including
> yarn?
>
> This FLIP also works for the yarn-application mode. But the yarn deployment
> doesn't support shipping the artifacts to the remote side. Please
> correct me if I'm wrong.
>
> > When talking about "custom", you mean these also will have some builtin
> implementations? If it exists, how to get their location in dfs based on
> SQL? Depending on some configuration or just convention over configuration.
>
> I think the builtin artifacts are catalogs/connectors/udf that are located
> at the $FLINK_HOME/lib directory.
>
> > Is the FLIP-316 still in need later?
>
> Yes. I think FLIP-316 is a great idea to use json plan to run the SQL Job
> and it brings great convenience to users to submit job in application mode
> in interactive mode.
>
> Best,
> Shengkai
>
>
>
>
> On Tue, Oct 29, 2024 at 14:25, Shengkai Fang  wrote:
>
> > Hi, Feng.
> >
> > Thanks for your response.
> >
> > > Will FLIP-316 merge into Flink 2.0 too ?
> >
> > I don't have time to finish the FLIP-316. So it depends on whether anyone
> > else can help to continue the discussion.
> >
> > > Will SqlDriver use the same one?
> >
> > Yes. We should reuse the same driver. I think the driver is the
> entrypoint
> > for the SQL script.
> >
> >
> > > The details SQL-client deploy SQL File to Cluster may not be very
> clear ?
> >
> > I have pushed a PoC branch about the change. Please take a look at
> > https://github.com/fsk119/flink/tree/application-mode (I don't test it
> > yet). At the mean time, I add a new method in the SqlGatewayService to
> > describe the change.
> >
> > Best,
> > Shengkai
> >
> >
> >
> > On Fri, Oct 25, 2024 at 21:15, Feng Jin  wrote:
> >
> >> Hi, Shengkai
> >>
> >> Thank you for initiating this FLIP, I understand that supporting
> >> application mode for SQL gateway is very important. There are two small
> >> issues.
> >>
> >> > FLIP-480 is different from FLIP-316
> >>
> >>
> >>1. Will FLIP-316 merge into Flink 2.0 too ?
> >>
> >>
> >>2. Will SqlDriver use the same one?
> >>
> >>
> >> The details SQL-client deploy SQL File to Cluster may not be very clear
> ?
> >>
> >> I guess that some modifications need to be made to the client here,
> >> when deploying scripts in application mode, we need to call the newly
> >> added
> >> interface of the gateway service.
> >>
> >>
> >> Best,
> >> Feng
> >>
> >>
> >> On Thu, Oct 24, 2024 at 4:27 PM Shengkai Fang 
> wrote:
> >>
> >> > Hi, everyone.
> >> >
> >> > I'd like to initiate a discussion about FLIP-480: Support to deploy
> >> script
> >> > in application mode[1].
> >> >
> >> > FLIP-480 supports to solve the problem that table program can not run
> in
> >> > application mode. Comparing to FLIP-316[2], FLIP-480 tries to compile
> >> the
> >> > script in the JM side, which is free from the limitation of the JSON
> >> > plan(JSON plan only serialize the identifier for temporary object) .
> >> >
> >> > For more details, please refer to the FLIP[1]. Welcome any feedback
> and
> >> > suggestions for improvement.
> >> >
> >> > Best,
> >> > Shengkai
> >> >
> >> > [1]
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-480%3A+Support+to+deploy+SQL+script+in+application+mode
> >> > [2]
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Support+application+mode+for+SQL+Gateway?src=contextnavpagetreemode
> >> >
> >>
> >
>


Re: Micro batching with flink

2024-10-29 Thread Venkatakrishnan Sowrirajan
Can you share more details on what you mean by micro-batching? Can you
explain with an example so I can understand it better?

Thanks
Venkat

On Tue, Oct 29, 2024, 1:22 PM Anil Dasari 
wrote:

> Hello team,
> I apologize for reaching out on the dev mailing list. I'm working on
> implementing micro-batching with near real-time processing.
> I've seen similar questions in the Flink Slack channel and user mailing
> list, but there hasn't been much discussion or feedback. Here are the
> options I've explored:
> 1. Windowing: This approach looked promising, but the flushing mechanism
> requires record-level information checks, as window data isn't accessible
> throughout the pipeline.
> 2. Window + Trigger: This method buffers events until the trigger interval
> is reached, which affects real-time processing; events are only processed
> when the trigger occurs.
> 3. Processing Time: The processing time is specific to each file writer,
> resulting in inconsistencies across different task managers.
> 4. Watermark: There’s no global watermark; it's specific to each source
> task, and the initial watermark information (before the first watermark
> event) isn't epoch-based.
> I'm looking to write data grouped by time (micro-batch time). What’s the
> best approach to achieve micro-batching in Flink?
> Let me know if you have any questions. thanks.
> Thanks.
>
>


Re: Micro batching with flink

2024-10-29 Thread Anil Dasari
Hi Venkat,

Thanks for the reply.

Micro-batching is a data processing technique where small batches of data are
collected and processed together at regular intervals. However, I'm aiming to
avoid traditional micro-batch processing by tagging records within a time
window as a batch, allowing for near-real-time data processing. I'm currently
exploring Flink for the following use case:

1. Group data by a time window and write it to S3 under the appropriate prefix.
2. Generate metrics for the micro-batch and, if needed, store them in S3.
3. Send metrics to an external system to notify that Step 1 has been completed.

If any part of the process fails, the entire micro-batch step should be rolled
back. I am planning to implement a two-phase commit sink for Steps 2 and 3.
The primary challenge is tagging the record set with an epoch time across all
tasks within a window, so that it can be used in the sink for creating
committable splits, similar to the processing time in the Flink file sink.
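
To illustrate what I mean by tagging (just a rough sketch with simplified
types; it still has the per-task processing-time skew mentioned in my first
mail):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch only: tag every record with a micro-batch "epoch"
// derived from processing time, so downstream operators and the sink can
// group by the same batch key. Names and types here are hypothetical.
public class MicroBatchTagger<T> extends ProcessFunction<T, Tuple2<Long, T>> {

    private final long batchIntervalMillis;

    public MicroBatchTagger(long batchIntervalMillis) {
        this.batchIntervalMillis = batchIntervalMillis;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<Tuple2<Long, T>> out) {
        long now = ctx.timerService().currentProcessingTime();
        // Align the timestamp to the start of its micro-batch interval,
        // e.g. 12:03:17 with a 1-minute interval becomes 12:03:00.
        long batchTime = now - (now % batchIntervalMillis);
        out.collect(Tuple2.of(batchTime, value));
    }
}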
Thanks

On Tuesday, October 29, 2024 at 09:40:40 PM PDT, Venkatakrishnan
Sowrirajan  wrote:
 
 Can you share more details on what do you mean by micro-batching? Can you
explain with an example to understand it better?

Thanks
Venkat

On Tue, Oct 29, 2024, 1:22 PM Anil Dasari 
wrote:

> Hello team,
> I apologize for reaching out on the dev mailing list. I'm working on
> implementing micro-batching with near real-time processing.
> I've seen similar questions in the Flink Slack channel and user mailing
> list, but there hasn't been much discussion or feedback. Here are the
> options I've explored:
> 1. Windowing: This approach looked promising, but the flushing mechanism
> requires record-level information checks, as window data isn't accessible
> throughout the pipeline.
> 2. Window + Trigger: This method buffers events until the trigger interval
> is reached, which affects real-time processing; events are only processed
> when the trigger occurs.
> 3. Processing Time: The processing time is specific to each file writer,
> resulting in inconsistencies across different task managers.
> 4. Watermark: There’s no global watermark; it's specific to each source
> task, and the initial watermark information (before the first watermark
> event) isn't epoch-based.
> I'm looking to write data grouped by time (micro-batch time). What’s the
> best approach to achieve micro-batching in Flink?
> Let me know if you have any questions. thanks.
> Thanks.
>
>