Re: [DISCUSS] FLIP-440: User-defined SQL operators / ProcessTableFunction (PTF)

2024-10-06 Thread Jim Hughes
Hi Timo,

After thinking about it more today, I have some additional questions /
topics:

9.  For the existing window TVFs, GROUP BYs over the window start and end
are optimized to return only one result per group (a sketch of the kind of
query I mean follows below).  With the proposal as is, would it be possible
to implement a replacement/copy of the temporal windows that is optimized in
a similar way?  (The statement "The optimizer optimizes around PTFs." makes
me think that this would not be possible.)
10. How hard is it to support state TTL in the initial implementation?
From the list of future work, that seems to be the first / most important
item that I'd find missing.
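
For reference, question 9 refers to the standard window TVF aggregation over
window_start/window_end. A minimal sketch (the table name Bid, the columns
bidtime/price, and the tumble size are illustrative, not taken from the FLIP):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WindowTvfGroupByExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Assumes a table Bid(bidtime TIMESTAMP(3), price DECIMAL(10, 2)) with a
        // watermark on bidtime has already been registered; names are illustrative.
        // The GROUP BY on window_start, window_end emits one result per window.
        tableEnv.executeSql(
                "SELECT window_start, window_end, SUM(price) AS total_price " +
                "FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) " +
                "GROUP BY window_start, window_end").print();
    }
}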

Cheers,

Jim

On Tue, Oct 1, 2024 at 5:02 PM Jim Hughes  wrote:

> Hi Timo,
>
> Thanks for the FLIP!  Also, thanks for saying that this has been in your
> head for a few years; there is a ton here.
>
> 1. For the pass-through semantics, if the partition columns are already
> listed in the pass-through columns, they are not duplicated, right?
> 2. A number of places mention that Calcite doesn't support XYZ.  Do we
> have tickets for that work?
> 3. I like the scoping section.  Relative to "More than two tables is out
> of scope for this FLIP.", will we need to change any of the interfaces in a
> major way to support multiple output tables in the future?
> 4. The section "Empty Semantics" under Scoping is a bit terse and I'll
> admit that I don't understand it.  Could you say more in the FLIP there?
> 5. For Pass Through Semantics, will adding PASS THROUGH later be easy to
> do?  Or is there some reason to avoid it completely?
> 6. nit: Under `TimedLastValue`, there is a line which is copied from the
> above "The on_time descriptor takes two arguments in this case to forward
> the time attributes of each side."
> 7. Will upsert mode be possible later?  Can you say more about why upsert
> is not supported?  (I can guess, but it seems like a brief discussion in
> the FLIP would be useful.)
> 8. In terms of the two bullets at the end of the migration plan, I am for
> both changing the order of the SESSION window columns and changing the name
> from TIMECOL to on_time (or supporting both names?).  Is there any downside
> to doing so?
>
> Thanks,
>
> Jim
>
> On Mon, Sep 23, 2024 at 6:38 PM Timo Walther  wrote:
>
>> Hi everyone,
>>
>> I'm super excited to start a discussion about FLIP-440: User-defined SQL
>> operators / ProcessTableFunction (PTF) [1].
>>
>> This FLIP has been in my head for many years and Flink 2.0 is a good
>> time to open up Flink SQL to a new category of use cases. Following the
>> principle "Make simple things easy, and complex ones possible", I would
>> like to propose a new UDF type "ProcessTableFunction" that gives users
>> access to Flink's primitives for advanced stream processing. This should
>> unblock people when hitting shortcomings in Flink SQL and expand the
>> scope of SQL from analytical to more event-driven applications.
>>
>> This proposal is by no means a full replacement of the DataStream API.
>> The DataStream API will always provide the full power of Flink, whereas
>> PTFs provide at least the necessary toolbox to cover ~80% of all use cases
>> without leaving the SQL ecosystem. The SQL ecosystem is a great
>> foundation with a well-defined type system, catalog integration, CDC
>> support, and built-in functions/operators. PTFs complete it by offering
>> a standards-compliant extension point.
>>
>> Looking forward to your feedback.
>>
>> Thanks,
>> Timo
>>
>> [1] https://cwiki.apache.org/confluence/x/pQnPEQ
>>
>


[jira] [Created] (FLINK-36430) Enhancing Flink History Server File Storage and Retrieval with RocksDB

2024-10-06 Thread Xiaowen Sun (Jira)
Xiaowen Sun created FLINK-36430:
---

 Summary: Enhancing Flink History Server File Storage and Retrieval with RocksDB
 Key: FLINK-36430
 URL: https://issues.apache.org/jira/browse/FLINK-36430
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.20.0
Reporter: Xiaowen Sun


Currently, when a Flink job finishes, it writes an archive as a single file
that maps paths to JSON content. Flink History Server (FHS) job archives are
pulled to the local file system of the machine the FHS runs on, and this
process expands the single archive file into a local directory tree.

Because of how the FHS stores these files, a large number of directories is
created in the local file system. This can become inefficient and slow as the
volume of job archives grows, creating bottlenecks in job data navigation and
retrieval.

To illustrate the inode usage problem, consider a scenario with 5000 subtasks.
Each subtask creates its own directory, and within each subtask directory
there are additional directories that might hold only a single file. This
structure rapidly increases the number of inodes consumed.

Integrating RocksDB, a high-performance embedded key-value database, aims to
resolve these issues by offering faster data access and better scalability.
This integration is expected to significantly improve the operational
efficiency of the FHS by allowing faster data retrieval and enabling a larger
cache on local Kubernetes deployments, thus overcoming inode limitations.
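
The ticket contains no code, but as a rough illustration of the proposed
layout (one RocksDB key-value pair per archived path instead of one file per
path), here is a minimal sketch using the standard RocksDB Java API. The key
pattern and JSON payload below are assumptions for illustration, not the
actual FHS archive format:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.nio.charset.StandardCharsets;

public class ArchiveKvStoreSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/fhs-archive-cache")) {
            // One key-value pair per archived path instead of one file per path,
            // so thousands of subtask entries do not consume thousands of inodes.
            byte[] key = "jobs/<jobId>/vertices/<vertexId>/subtasks/0"
                    .getBytes(StandardCharsets.UTF_8);
            byte[] value = "{\"subtask\": 0, \"status\": \"FINISHED\"}"
                    .getBytes(StandardCharsets.UTF_8);
            db.put(key, value);

            // Retrieval is a single point lookup rather than a file-system walk.
            byte[] json = db.get(key);
            System.out.println(new String(json, StandardCharsets.UTF_8));
        }
    }
}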



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36428) DynamoDb Table API Sink fails when null value in the RowData

2024-10-06 Thread maoxingda (Jira)
maoxingda created FLINK-36428:
-

 Summary: DynamoDb Table API Sink fails when null value in the RowData
 Key: FLINK-36428
 URL: https://issues.apache.org/jira/browse/FLINK-36428
 Project: Flink
  Issue Type: Bug
  Components: Connectors / AWS
Affects Versions: 1.18.1
Reporter: maoxingda
 Fix For: 1.19.1


DynamoDb Table API Sink fails when there are null values in the RowData.

 
package com.meican;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDynamodbSinkApp {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source with one row containing a null string column.
        tableEnv.executeSql("create temporary view source as " +
                "select '1' as id, 'name1' as name, 18 as age union all " +
                "select '2' as id, 'name2' as name, 19 as age union all " +
                "select '3' as id, cast(null as string) as name, 20 as age");

        // DynamoDB sink with 'ignore-nulls' enabled.
        tableEnv.executeSql("create table sink (" +
                " id string," +
                " name string," +
                " age int" +
                ") partitioned by ( id ) " +
                "with (" +
                " 'connector' = 'dynamodb'," +
                " 'aws.region' = 'cn-northwest-1'," +
                " 'table-name' = 'bi-oltp-mydata'," +
                " 'ignore-nulls' = 'true'" +
                ")");

        tableEnv.executeSql("insert into sink select * from source");
    }
}
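
A possible workaround (my own assumption, not something the report suggests)
is to coalesce nullable columns before they reach the sink, reusing the
tableEnv and the source/sink tables from the reproduction above:

        // Hypothetical workaround: replace null strings with an empty string so the
        // DynamoDB converter never receives a null value for the name column.
        tableEnv.executeSql("insert into sink " +
                "select id, coalesce(name, '') as name, age from source");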
 
java.lang.NullPointerException: null
    at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39) ~[flink-table-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27) ~[flink-table-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54) ~[dynamodb-enhanced-2.20.144.jar:?]
    at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518) ~[dynamodb-enhanced-2.20.144.jar:?]
    at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
    at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) ~[?:?]
    at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516) ~[dynamodb-enhanced-2.20.144.jar:?]
    at software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67) ~[dynamodb-enhanced-2.20.144.jar:?]
    at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328) ~[flink-connector-files-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.ru