Hi David,

> * Are the Paimon system tables part of the SQL standard?

No, system tables are system-specific. The way of modeling the system tables is standard-compliant, though.

> * For the HTTP connector as a lookup source, the HTTP connector table is only ever accessed as part of the lookup join, so we do not need to ever show nulls for a non-nullable field. In this case we are only showing nulls in the join output. We think that in this case it could be reasonable to use metadata columns in the short term and show nulls in the join output?

There are two things mixed in your question:
* The schema of the lookup table can contain NOT NULL constraints.
* If it's used in a lookup join, then it's effectively an outer join. So the resulting schema usually has those fields nullable anyways.

So yes, virtual columns might actually work fine for a lookup connector. I wouldn't want to generalize it to all lookup connectors (e.g. put this in the framework) because if we decide to go the system table route, it would introduce two alternative solutions to the same issue. But if you need a short-term solution for the HTTP connector, it would probably work.

> * Can I confirm your thinking on how we would set the schema of the side output [1] and for the proposed system tables?

The side-output is on a lower level (= DataStream) and would not have a schema. The user can attach an error handler that receives an enriched exception with context. The handler may or may not emit the data to the side output, encoded as a DataStream record as the user desires. Table API would then provide a specific implementation of the handler.

The schema of the system table would be up for debate in the FLIP. The schema would be static per connector and contain some system-specific fields and some connector-specific fields.

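To make the handler idea a bit more concrete, here is a rough sketch in plain Java. It is not Flink API: `ErrorHandlingSketch`, `OnError`, `RecordError`, `Result` and `process` are all made-up names for illustration, and a plain list stands in for the side output. It only shows how the three behaviors mentioned earlier in the thread (fail, side-output, ignore) could be applied to an exception that has been enriched with context.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch; none of these types exist in Flink. */
final class ErrorHandlingSketch {

    /** The three user-selectable behaviors on error. */
    enum OnError { FAIL, SIDE_OUTPUT, IGNORE }

    /** Enriched exception: the original failure plus context about the record. */
    static final class RecordError extends RuntimeException {
        final String affectedName; // e.g. the name of the affected table
        final byte[] rawRecord;    // the raw bytes that could not be processed

        RecordError(String affectedName, byte[] rawRecord, Throwable cause) {
            super(cause.getMessage(), cause);
            this.affectedName = affectedName;
            this.rawRecord = rawRecord;
        }
    }

    /** Main output plus a list that stands in for the error side output. */
    static final class Result<T> {
        final List<T> main = new ArrayList<>();
        final List<RecordError> errors = new ArrayList<>();
    }

    /** Deserializes each record and applies the chosen behavior to failures. */
    static <T> Result<T> process(
            String affectedName,
            List<byte[]> input,
            Function<byte[], T> deserializer,
            OnError onError) {
        Result<T> result = new Result<>();
        for (byte[] raw : input) {
            try {
                result.main.add(deserializer.apply(raw));
            } catch (RuntimeException e) {
                RecordError error = new RecordError(affectedName, raw, e);
                switch (onError) {
                    case FAIL:
                        throw error;              // fail the whole job
                    case SIDE_OUTPUT:
                        result.errors.add(error); // reroute, e.g. to a DLQ
                        break;
                    case IGNORE:
                        break;                    // drop the record silently
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<byte[]> input = Arrays.asList(
                "1".getBytes(StandardCharsets.UTF_8),
                "x".getBytes(StandardCharsets.UTF_8), // not a number
                "3".getBytes(StandardCharsets.UTF_8));
        Function<byte[], Integer> parseInt =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        Result<Integer> result = process("input", input, parseInt, OnError.SIDE_OUTPUT);
        System.out.println("main records: " + result.main.size());
        System.out.println("error records: " + result.errors.size());
    }
}
```

With SIDE_OUTPUT, the malformed record ends up in `errors` together with its raw bytes, while the well-formed records still reach `main`. The actual interface would of course be decided in the FLIP.
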
Here is how I'd propose the schema of the error table:

CREATE OR REPLACE TABLE errors (
  `error_timestamp` TIMESTAMP_LTZ(3) NOT NULL COMMENT 'The timestamp when the error occurred.',
  `error_code` INT NOT NULL COMMENT 'The code which uniquely identifies the error.',
  `error_reason` STRING NOT NULL COMMENT 'The reason corresponding to the error code (1:1 mapping).',
  `error_message` STRING NOT NULL COMMENT 'A human-readable message describing the error. The representation of the error message may change over time, so it should not be used for programmatic purposes.',
  `error_details` MAP<STRING, STRING> NOT NULL COMMENT 'Additional details about the error, which may be used for debugging purposes. Key-value pairs in this map are stable and can be used for tooling. New entries may be added over time.',
  `affected_type` STRING NOT NULL COMMENT 'The type of the object affected by the error (currently this is always ''table'').',
  `affected_catalog` STRING COMMENT 'The catalog name of the object affected by the error.',
  `affected_database` STRING COMMENT 'The database name of the object affected by the error.',
  `affected_name` STRING COMMENT 'The name of the object affected by the error.',
  `source_record` ROW<...> COMMENT 'The connector-specific source record that caused the error. May be null if the error is not related to deserialization.'
)

Since I later want to cover sinks and UDFs, some parts may appear too general.

Best,

Arvid

On Thu, Jun 26, 2025 at 5:23 PM David Radley <david_rad...@uk.ibm.com> wrote:

> Hi Arvid,
> Thank you very much for your response – your ideas sound good as it is a
> more natural way of handling the errors.
>
> I had some questions:
>
> * Are the Paimon system tables part of the SQL standard?
> * For the HTTP connector as a lookup source, the HTTP connector table
> is only ever accessed as part of the lookup join, so we do not need to
> ever show nulls for a non-nullable field. In this case we are only showing
> nulls in the join output.
> We think that in this case it could be reasonable
> to use metadata columns in the short term and show nulls in the join output?
> * Can I confirm your thinking on how we would set the schema of the
> side output [1] and for the proposed system tables?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
>
> Kind regards, David.
>
> From: Arvid Heise <ahe...@confluent.io.INVALID>
> Date: Thursday, 26 June 2025 at 15:07
> To: dev@flink.apache.org <dev@flink.apache.org>
> Cc: Mark Nuttall <mnutt...@uk.ibm.com>
> Subject: [EXTERNAL] Re: Error aware SQL streams
> Hi David,
>
> Flink currently does not have a compelling story when it comes to error
> handling and I'd like to change that.
>
> I'd advocate for an approach that naturally translates into dead letter
> queues as known from other stream processors such as KStreams. [1] With
> your idea of metadata columns, you are forced to forgo all NOT NULL
> constraints in the schema because if you have issues while fetching or
> deserializing a record, you cannot have meaningful values in those fields.
> Instead of that, having side-outputs will allow us to retain all the
> constraints.
>
> Indeed, the side-outputs of the DataStream API are a good building block for
> error handling. However, we don't have them directly in sources and sinks.
> My first action would be to amend that.
>
> Then, we should make side-outputs accessible to connectors, so that they
> can signal errors on the records. For example, we could extend the
> RecordEmitter of the SourceReaderBase framework to allow outputting errors.
> Together with a user option on what to do on these errors (fail,
> side-output, ignore), we would already allow DataStream users to reroute
> those errors to dead letter queues.
>
> Finally, we need to find a good SQL abstraction. Paimon has this nice
> concept of system tables [2]. We could use that to define error tables that
> can be used for error handling.
>
> EXECUTE STATEMENT SET
> BEGIN
>   INSERT INTO output SELECT * FROM input;
>   INSERT INTO input_errors SELECT * FROM input$errors;
>   INSERT INTO output_errors SELECT * FROM output$errors;
> END;
>
> During plan translation we would map input$errors to the side-output of
> input. This abstraction allows you to filter for specific errors and
> reroute them to different error logs (e.g. urgent and non-urgent errors).
>
> So to summarize, I'd aim for 3 FLIPs:
> 1. Add side-outputs to source/sink.
> 2. Add an abstraction to connectors to output errors. Provide an option for
> users to choose the behavior on error. Users may use DLQs on the DataStream API.
> 3. Add support for system tables for source/sink errors in SQL. Users may
> use DLQs on the SQL/Table API.
>
> WDYT?
>
> Arvid
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> [2] https://paimon.apache.org/docs/1.1/concepts/system-tables/
>
> On Fri, Jun 13, 2025 at 5:07 PM David Radley <david_rad...@uk.ibm.com>
> wrote:
>
> > Hi,
> > We are looking at enhancing the getindata connector to add support for a
> > metadata column that could surface errors from the connector to the SQL
> > level, so the SQL could then act on this error information. See Getindata
> > Flink HTTP connector issue [1].
> >
> > For the HTTP Connector we are thinking of:
> >
> > create table api_lookup (
> >   `customer_id` STRING NOT NULL,
> >   `customer_records_from_backend` STRING,
> >   `errorMessage` STRING METADATA FROM 'error_string',
> >   `responseCode` INT METADATA FROM 'error_code')
> > WITH (
> >   'connector' = 'rest-lookup',
> >   ...
> > )
> >
> > The subsequent lookup join would get nulls for the values in this error
> > case, but the metadata columns would contain error information that would
> > allow the stream to handle the errors.
> >
> > This sort of functionality allows an enterprise to handle errors more
> > autonomously/naturally in the flow – rather than jobs failing and logs
> > needing to be analysed.
> >
> > I was thinking that this approach could be useful for JDBC – surfacing the
> > JDBC errors.
> >
> > Also for the Kafka connector we have side outputs [2] for DataStream. I
> > wonder if the Kafka connector could surface badly formed events via
> > metadata columns – allowing for the flow itself to manage errors in SQL.
> >
> > I do wonder whether the errorMessage and responseCode could be baked into
> > Flink as the way to pass this information into the stream. For now, we
> > will implement it in the HTTP connector.
> >
> > What do people think of this idea to enhance SQL to be “error aware”?
> > Metadata columns seem a good way to do this; Flink SQL support for try
> > catch could be another approach. Has SQL try catch in Flink ever been
> > looked into?
> >
> > Kind regards, David.
> >
> > [1] https://github.com/getindata/flink-http-connector/issues/154
> >
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/datastream/side_output/
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: Building C, IBM Hursley Office, Hursley Park Road,
> > Winchester, Hampshire SO21 2JN
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: Building C, IBM Hursley Office, Hursley Park Road,
> Winchester, Hampshire SO21 2JN
>