Re: [DISCUSS] FLIP-440: User-defined SQL operators / ProcessTableFunction (PTF)
Hi Timo,

After thinking about it more today, I have some additional questions / topics:

9. For the existing window TVFs, GROUP BYs over the window start and end are optimized to return only one result per group. With the proposal as is, would it be possible to implement a replacement/copy of the temporal windows which is optimized in a similar way? (The statement "The optimizer optimizes around PTFs." makes me think that this would not be possible.)

10. How hard is it to have support for state TTL with the initial implementation? From the list of future work, that seems to be the first / most important one that I'd find missing.

Cheers,

Jim

On Tue, Oct 1, 2024 at 5:02 PM Jim Hughes wrote:

> Hi Timo,
>
> Thanks for the FLIP! Also, thanks for saying that this has been in your
> head for a few years; there is a ton here.
>
> 1. For the pass-through semantics, if the partition columns are already
> listed in the pass-through columns, they are not duplicated, right?
> 2. A number of places mention that Calcite doesn't support XYZ. Do we
> have tickets for that work?
> 3. I like the scoping section. Relative to "More than two tables is out
> of scope for this FLIP.", will we need to change any of the interfaces in
> a major way to support multiple output tables in the future?
> 4. The section "Empty Semantics" under Scoping is a bit terse and I'll
> admit that I don't understand it. Could you say more in the FLIP there?
> 5. For pass-through semantics, will adding PASS THROUGH later be easy to
> do? Or is there some reason to avoid it completely?
> 6. nit: Under `TimedLastValue`, there is a line which is copied from
> above: "The on_time descriptor takes two arguments in this case to forward
> the time attributes of each side."
> 7. Will upsert mode be possible later? Can you say more about why upsert
> is not supported? (I can guess, but it seems like a brief discussion in
> the FLIP would be useful.)
> 8. In terms of the two bullets at the end of the migration plan, I am for
> both changing the order of the SESSION window columns and changing the
> name from TIMECOL to on_time (or supporting both names?). Is there any
> downside to doing so?
>
> Thanks,
>
> Jim
>
> On Mon, Sep 23, 2024 at 6:38 PM Timo Walther wrote:
>
>> Hi everyone,
>>
>> I'm super excited to start a discussion about FLIP-440: User-defined SQL
>> operators / ProcessTableFunction (PTF) [1].
>>
>> This FLIP has been in my head for many years and Flink 2.0 is a good
>> time to open up Flink SQL to a new category of use cases. Following the
>> principle "Make simple things easy, and complex ones possible", I would
>> like to propose a new UDF type "ProcessTableFunction" that gives users
>> access to Flink's primitives for advanced stream processing. This should
>> unblock people when hitting shortcomings in Flink SQL and expand the
>> scope of SQL from analytical to more event-driven applications.
>>
>> This proposal is by no means a full replacement of DataStream API.
>> DataStream API will always provide the full power of Flink, whereas PTFs
>> provide at least a necessary toolbox to cover ~80% of all use cases
>> without leaving the SQL ecosystem. The SQL ecosystem is a great
>> foundation with a well-defined type system, catalog integration, CDC
>> support, and built-in functions/operators. PTFs complete it by offering
>> a standards-compliant extension point.
>>
>> Looking forward to your feedback.
>>
>> Thanks,
>> Timo
>>
>> [1] https://cwiki.apache.org/confluence/x/pQnPEQ
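[Editor's note] To make the discussion above more concrete, here is a minimal sketch of what a ProcessTableFunction with set semantics might look like in the direction FLIP-440 proposes. The class name, the eval() signature, and the annotations are assumptions for illustration, not the finalized API.

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Hypothetical PTF that emits the latest "name" value seen per partition key.
// State access (the FLIP proposes exposing Flink's state primitives here)
// is omitted to keep the sketch short.
public class LastValue extends ProcessTableFunction<String> {

    public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
        // Assumption: getFieldAs("name") reads the column of the incoming row.
        collect(input.getFieldAs("name"));
    }
}

In SQL, such a function would presumably be invoked via the polymorphic table function syntax the FLIP builds on, e.g. SELECT * FROM LastValue(input => TABLE t PARTITION BY id).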
[jira] [Created] (FLINK-36430) Enhancing Flink History Server File Storage and Retrieval with RocksDB
Xiaowen Sun created FLINK-36430:
-----------------------------------

             Summary: Enhancing Flink History Server File Storage and Retrieval with RocksDB
                 Key: FLINK-36430
                 URL: https://issues.apache.org/jira/browse/FLINK-36430
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / State Backends
    Affects Versions: 1.20.0
            Reporter: Xiaowen Sun

Currently, when a Flink job finishes, it writes an archive as a single file that maps paths to JSON files. Flink History Server (FHS) job archives are pulled to the local file system where the FHS is running, and this process creates a local directory that expands based on the contents of the single archive file. Because of how the FHS stores the files, a large number of directories are created in the local file system. This system can become inefficient and slow as the volume of job archives increases, creating bottlenecks in job data navigation and retrieval.

To illustrate the problem of inode usage, consider a scenario with 5000 subtasks. Each subtask creates its own directory, and within each subtask directory there are additional directories that might store only a single file. This structure rapidly increases the number of inodes consumed.

Integrating RocksDB, a high-performance embedded key-value database, aims to resolve these issues by offering faster data access and better scalability. This integration is expected to significantly enhance the operational efficiency of the FHS by allowing faster data retrieval and enabling a larger cache on local Kubernetes deployments, thus overcoming inode limitations.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
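[Editor's note] The sketch below illustrates the general storage model the ticket describes: archive paths become keys and the JSON payloads become values in a single embedded RocksDB instance, instead of being expanded into many small files and directories. It uses the standard RocksDB Java API (org.rocksdb); the key layout and the integration point inside the History Server are assumptions for illustration only.

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.nio.charset.StandardCharsets;

// Illustration only: keep "path -> JSON" entries from a job archive in one
// RocksDB store rather than materializing them as a directory tree.
public class ArchiveKvStoreSketch {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/fhs-archive-cache")) {

            // One key per archived path, e.g. a subtask's JSON blob (key layout assumed).
            String path = "/jobs/<jobId>/vertices/<vertexId>/subtasks/0";
            String json = "{\"status\":\"FINISHED\"}";
            db.put(path.getBytes(StandardCharsets.UTF_8), json.getBytes(StandardCharsets.UTF_8));

            // Retrieval is a single point lookup; no directory traversal, no extra inodes.
            byte[] value = db.get(path.getBytes(StandardCharsets.UTF_8));
            System.out.println(new String(value, StandardCharsets.UTF_8));
        }
    }
}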
[jira] [Created] (FLINK-36429) Enhancing Flink History Server File Storage and Retrieval with RocksDB
Xiaowen Sun created FLINK-36429:
-----------------------------------

             Summary: Enhancing Flink History Server File Storage and Retrieval with RocksDB
                 Key: FLINK-36429
                 URL: https://issues.apache.org/jira/browse/FLINK-36429
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / State Backends
    Affects Versions: 1.20.0
            Reporter: Xiaowen Sun

Currently, when a Flink job finishes, it writes an archive as a single file that maps paths to JSON files. Flink History Server (FHS) job archives are pulled to the local file system where the FHS is running, and this process creates a local directory that expands based on the contents of the single archive file. Because of how the FHS stores the files, a large number of directories are created in the local file system. This system can become inefficient and slow as the volume of job archives increases, creating bottlenecks in job data navigation and retrieval.

To illustrate the problem of inode usage, consider a scenario with 5000 subtasks. Each subtask creates its own directory, and within each subtask directory there are additional directories that might store only a single file. This structure rapidly increases the number of inodes consumed.

Integrating RocksDB, a high-performance embedded key-value database, aims to resolve these issues by offering faster data access and better scalability. This integration is expected to significantly enhance the operational efficiency of the FHS by allowing faster data retrieval and enabling a larger cache on local Kubernetes deployments, thus overcoming inode limitations.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-36431) Enhancing Flink History Server File Storage and Retrieval with RocksDB
Xiaowen Sun created FLINK-36431:
-----------------------------------

             Summary: Enhancing Flink History Server File Storage and Retrieval with RocksDB
                 Key: FLINK-36431
                 URL: https://issues.apache.org/jira/browse/FLINK-36431
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / State Backends
    Affects Versions: 1.20.0
            Reporter: Xiaowen Sun

Currently, when a Flink job finishes, it writes an archive as a single file that maps paths to JSON files. Flink History Server (FHS) job archives are pulled to the local file system where the FHS is running, and this process creates a local directory that expands based on the contents of the single archive file. Because of how the FHS stores the files, a large number of directories are created in the local file system. This system can become inefficient and slow as the volume of job archives increases, creating bottlenecks in job data navigation and retrieval.

To illustrate the problem of inode usage, consider a scenario with 5000 subtasks. Each subtask creates its own directory, and within each subtask directory there are additional directories that might store only a single file. This structure rapidly increases the number of inodes consumed.

Integrating RocksDB, a high-performance embedded key-value database, aims to resolve these issues by offering faster data access and better scalability. This integration is expected to significantly enhance the operational efficiency of the FHS by allowing faster data retrieval and enabling a larger cache on local Kubernetes deployments, thus overcoming inode limitations.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-36428) DynamoDb Table API Sink fails when null value in the RowData
maoxingda created FLINK-36428:
---------------------------------

             Summary: DynamoDb Table API Sink fails when null value in the RowData
                 Key: FLINK-36428
                 URL: https://issues.apache.org/jira/browse/FLINK-36428
             Project: Flink
          Issue Type: Bug
          Components: Connectors / AWS
    Affects Versions: 1.18.1
            Reporter: maoxingda
             Fix For: 1.19.1

The DynamoDb Table API sink fails when there are null values in the RowData. Reproduction:

package com.meican;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDynamodbSinkApp {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source with one row containing a NULL "name" column.
        tableEnv.executeSql("create temporary view source as " +
                "select '1' as id, 'name1' as name, 18 as age union all " +
                "select '2' as id, 'name2' as name, 19 as age union all " +
                "select '3' as id, cast(null as string) as name, 20 as age"
        );

        // DynamoDB sink with 'ignore-nulls' enabled.
        tableEnv.executeSql("create table sink" +
                "(" +
                "    id string," +
                "    name string," +
                "    age int" +
                ") partitioned by ( id ) " +
                "with" +
                "(" +
                "    'connector' = 'dynamodb'," +
                "    'aws.region' = 'cn-northwest-1'," +
                "    'table-name' = 'bi-oltp-mydata'," +
                "    'ignore-nulls' = 'true'" +
                ")"
        );

        tableEnv.executeSql("insert into sink select * from source");
    }
}

java.lang.NullPointerException: null
	at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39) ~[flink-table-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27) ~[flink-table-runtime-1.18.0.jar:1.18.0]
	at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
	at software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54) ~[dynamodb-enhanced-2.20.144.jar:?]
	at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518) ~[dynamodb-enhanced-2.20.144.jar:?]
	at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
	at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) ~[?:?]
	at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516) ~[dynamodb-enhanced-2.20.144.jar:?]
	at software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67) ~[dynamodb-enhanced-2.20.144.jar:?]
	at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
	at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
	at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328) ~[flink-connector-files-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-streaming-java-1.18.0.jar:1.18.0]
	at org.apache.flink.ru
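[Editor's note] The stack trace shows StringStringConverter.toExternal being called on a null field. One plausible direction for a fix is sketched below: check RowData.isNullAt before converting each field, so null columns are skipped (or mapped to a DynamoDB NUL attribute) instead of dereferenced. This is only an illustration; the field names, the string-only handling, and the decision to skip null columns are assumptions, not the connector's actual code.

import org.apache.flink.table.data.RowData;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.util.HashMap;
import java.util.Map;

// Sketch of a null-safe conversion loop (string columns only, for brevity).
public final class NullSafeConversionSketch {

    public static Map<String, AttributeValue> toItem(RowData row, String[] fieldNames) {
        Map<String, AttributeValue> item = new HashMap<>();
        for (int pos = 0; pos < fieldNames.length; pos++) {
            // Guard against the NPE from the report: never convert a null field.
            if (row.isNullAt(pos)) {
                continue; // alternatively: item.put(fieldNames[pos], AttributeValue.fromNul(true));
            }
            // Simplified: treat every non-null field as a string attribute.
            item.put(fieldNames[pos], AttributeValue.fromS(row.getString(pos).toString()));
        }
        return item;
    }
}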