Re: [VOTE] FLIP-399: Flink Connector Doris
+1 (non-binding)

Bests,
Samrat

On Fri, 12 Apr 2024 at 1:07 PM, Hang Ruan wrote:

> +1 (non-binding)
>
> Best,
> Hang
>
> Martijn Visser wrote on Fri, Apr 12, 2024 at 05:39:
>
> > +1 (binding)
> >
> > On Wed, Apr 10, 2024 at 4:34 AM Jing Ge wrote:
> >
> > > +1 (binding)
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Apr 9, 2024 at 8:54 PM Feng Jin wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > Best,
> > > > Feng
> > > >
> > > > On Tue, Apr 9, 2024 at 5:56 PM gongzhongqiang <gongzhongqi...@apache.org> wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Best,
> > > > >
> > > > > Zhongqiang Gong
> > > > >
> > > > > wudi <676366...@qq.com.invalid> wrote on Tue, Apr 9, 2024 at 10:48:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I would like to start a vote about FLIP-399 [1]. The FLIP is about
> > > > > > contributing the Flink Doris Connector [2] to the Flink community.
> > > > > > Discussion thread [3].
> > > > > >
> > > > > > The vote will be open for at least 72 hours unless there is an
> > > > > > objection or insufficient votes.
> > > > > >
> > > > > > Thanks,
> > > > > > Di.Wu
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
> > > > > > [2] https://github.com/apache/doris-flink-connector
> > > > > > [3] https://lists.apache.org/thread/p3z4wsw3ftdyfs9p2wd7bbr2gfyl3xnh
Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format
Hi David,

Thank you for the suggestion. IIUC, you are proposing using an explicit
schema string instead of the schema ID. This makes sense, as it would make
the behavior consistent with Avro, although a bit more verbose from a
config standpoint. If we go the schema-string route, the user would have to
ensure that the input schema string corresponds to an existing schema ID.
This, however, might end up registering a new ID (based on
https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493).
How about adding both options (explicit schema string / schema ID)? If a
schema string is specified we register a new schema ID; if the user
specifies an explicit schema ID we just use it directly.

Thanks,
Anupam

On Wed, Apr 10, 2024 at 2:27 PM David Radley wrote:

> Hi,
> I notice in the draft PR that there is a schema id in the format config. I
> was wondering why? In the Confluent Avro and existing Debezium formats,
> there is no schema id in the config, but there is the ability to specify a
> complete schema. In the Protobuf format there is no schema id.
>
> I assume the schema id would be used during serialization in the case where
> there is already an existing registered schema and you have its id. I see in
> the docs
> https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
> there is a serialization example where 2 schemas are registered.
>
> I would suggest aiming to copy what the Confluent DeSer libraries do
> rather than having a schema id hard-coded in the config.
>
> WDYT?
> Kind regards, David.
>
> From: Kevin Lam
> Date: Tuesday, 26 March 2024 at 20:06
> To: dev@flink.apache.org
> Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> Confluent Format
> Thanks Anupam! Looking forward to it.
>
> On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal wrote:
>
> > Hi Kevin,
> >
> > Thanks, these are some great points.
> > Just to clarify, I do agree that the subject should be an option (like in
> > the case of RegistryAvroFormatFactory).
> > We could fall back to the subject and auto-register schemas if a schema id
> > is not provided explicitly.
> > In general, I think it would be good to be more explicit about the schemas
> > used (
> > https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > ).
> > This would also help prevent us from overriding the ids in incompatible
> > ways.
> >
> > Under the current implementation of FlinkToProtoSchemaConverter we might
> > end up overwriting the field ids.
> > If we are able to locate a prior schema, the approach you outlined makes a
> > lot of sense.
> > Let me explore this a bit further and get back (in terms of feasibility).
> >
> > Thanks again!
> > - Anupam
> >
> > On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam wrote:
> >
> > > Hi Anupam,
> > >
> > > Thanks again for your work on contributing this feature back.
> > >
> > > Sounds good re: the refactoring/re-organizing.
> > >
> > > Regarding the schema id, in my opinion this should NOT be a configuration
> > > option on the format. We should be able to deterministically map the
> > > Flink type to the ProtoSchema and register that with the Schema Registry.
> > >
> > > I think it can make sense to provide the `subject` as a parameter, so
> > > that the serialization format can look up existing schemas.
> > >
> > > This would be used for something related I mentioned above:
> > >
> > > > Another topic I had is Protobuf's field ids. Ideally in Flink it would
> > > > be nice if we are idiomatic about not renumbering them in incompatible
> > > > ways, similar to what's discussed on the Schema Registry issue here:
> > > > https://github.com/confluentinc/schema-registry/issues/2551
> > >
> > > When we construct the ProtobufSchema from the Flink LogicalType, we
> > > shouldn't renumber the field ids in an incompatible way, so one approach
> > > would be to use the subject to look up the most recent version, and use
> > > that to evolve the field ids correctly.
> > >
> > > On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal
> > > <anupam.aggar...@gmail.com> wrote:
> > >
> > > > Hi Kevin,
> > > >
> > > > Thanks for starting the discussion on this.
> > > > I will be working on contributing this feature back (this was developed
> > > > by Dawid Wysakowicz and others at Confluent).
> > > >
> > > > I have opened a (very initial) draft PR here:
> > > > https://github.com/apache/flink/pull/24482 with our current
> > > > implementation.
> > > > Thanks for the feedback on the PR. I haven't gotten around to
> > > > re-organizing/refactoring the classes yet, but it would be in line
> > > > with some of your comments.
> > > >
> > > > On the overall approach there are some
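To make the proposal concrete, here is a minimal sketch of the resolution order Anupam describes: an explicit schema id is used as-is, while an explicit schema string is registered under the subject (which may mint a new id). The helper name and option plumbing are hypothetical and not part of the draft PR; only the Confluent client calls are real.

{code:java}
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

/** Hypothetical helper illustrating the "both options" proposal above. */
public final class WriterSchemaResolution {

    static ParsedSchema resolveWriterSchema(
            SchemaRegistryClient client,
            String subject,              // e.g. "my-topic-value"
            Integer explicitSchemaId,    // hypothetical 'schema-id' format option
            String explicitSchemaString) // hypothetical 'schema' format option
            throws Exception {
        if (explicitSchemaId != null) {
            // Use the pre-registered schema directly; nothing new is registered.
            return client.getSchemaById(explicitSchemaId);
        }
        if (explicitSchemaString != null) {
            // Registering by string returns the existing id if an identical schema
            // already exists under the subject, and otherwise creates a new version.
            int id = client.register(subject, new ProtobufSchema(explicitSchemaString));
            return client.getSchemaById(id);
        }
        throw new IllegalArgumentException(
                "Either an explicit schema id or a schema string must be configured");
    }
}
{code}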
[jira] [Created] (FLINK-35098) Incorrect results for queries like "10 >= y" on tables using Filesystem connector and Orc format
Andrey Gaskov created FLINK-35098:
-------------------------------------

             Summary: Incorrect results for queries like "10 >= y" on tables using Filesystem connector and Orc format
                 Key: FLINK-35098
                 URL: https://issues.apache.org/jira/browse/FLINK-35098
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ORC, Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.16.3, 1.15.4, 1.14.6, 1.13.6, 1.12.7
            Reporter: Andrey Gaskov


When working with ORC files, there is an issue with the evaluation of SQL queries containing expressions with a literal as the first operand. Specifically, the query *10 >= y* does not always return the correct result.

This test added to OrcFileSystemITCase.java fails on the second check:

{code:java}
@TestTemplate
void testOrcFilterPushDownLiteralFirst()
        throws ExecutionException, InterruptedException {
    super.tableEnv()
            .executeSql("insert into orcLimitTable values('a', 10, 10)")
            .await();
    List<Row> expected = Collections.singletonList(Row.of(10));
    check("select y from orcLimitTable where y <= 10", expected);
    check("select y from orcLimitTable where 10 >= y", expected);
}

Results do not match for query:
  select y from orcLimitTable where 10 >= y

== Correct Result - 1 ==    == Actual Result - 0 ==
!+I[10]
{code}

The checks are equivalent and should evaluate to the same result, but the second query doesn't return the record with y=10.

The table is defined as:

{code:java}
create table orcLimitTable (
    x string,
    y int,
    a int
) with (
    'connector' = 'filesystem',
    'path' = '/tmp/junit4374176500101507155/junit7109291529844202275/',
    'format' = 'orc')
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
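The symptom is consistent with the pushed-down predicate keeping the original comparison operator while the operands are reordered: if the conversion keeps GE when the literal comes first, *10 >= y* is evaluated as *y >= 10* against the ORC statistics. Below is a minimal, self-contained sketch (hypothetical names, not Flink's actual OrcFilters code) of the operator mirroring such a conversion needs:

{code:java}
/** Sketch: normalizing "literal OP field" into "field OP' literal". */
public final class LiteralFirstNormalization {

    enum Op { LT, LE, GT, GE, EQ }

    /** "literal OP field" is equivalent to "field mirror(OP) literal". */
    static Op mirror(Op op) {
        switch (op) {
            case LT: return Op.GT; // 10 <  y  <=>  y >  10
            case LE: return Op.GE; // 10 <= y  <=>  y >= 10
            case GT: return Op.LT; // 10 >  y  <=>  y <  10
            case GE: return Op.LE; // 10 >= y  <=>  y <= 10
            default: return op;    // equality is symmetric
        }
    }

    public static void main(String[] args) {
        // The failing query: "10 >= y" must push down as "y <= 10".
        // Keeping GE after swapping operands yields "y >= 10", which wrongly
        // filters out rows and matches the symptom reported above.
        System.out.println(mirror(Op.GE)); // prints LE
    }
}
{code}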
Re: Question around Flink's AdaptiveBatchScheduler
Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism":

- The source vertex is unique and does not have any upstream vertices.
- Downstream vertices read shuffled data partitioned by key, which is not
  the case for the source vertex.
- Limiting source parallelism by the downstream vertices' max parallelism is
  incorrect.

If we say that for "semantic consistency" the source vertex parallelism has
to be bound by the overall job's max parallelism, it can lead to the
following issues:

- High filter selectivity with huge amounts of data to read: setting a high
  "jobmanager.adaptive-batch-scheduler.max-parallelism" just so that the
  source parallelism can be set higher can lead to small blocks and
  sub-optimal performance.
- Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism"
  requires careful tuning of network buffer configurations, which is
  unnecessary where it is not otherwise required, just so that the source
  parallelism can be set high.

Regards
Venkata krishnan

On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee wrote:

> Hello Venkata krishnan,
>
> I think the term "semantic inconsistency" defined by
> jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining
> a uniform upper limit on parallelism across all vertices within a job. As
> the source vertices are part of the global execution graph, they should
> also respect this rule to ensure consistent application of parallelism
> constraints.
>
> Best,
> Junrui
>
> Venkatakrishnan Sowrirajan wrote on Fri, Apr 12, 2024 at 02:10:
>
> > Gentle bump on this question. cc @Becket Qin as well.
> >
> > Regards
> > Venkata krishnan
> >
> > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan
> > <vsowr...@asu.edu> wrote:
> >
> > > Thanks for the response Lijie and Junrui. Sorry for the late reply. A
> > > few follow-up questions.
> > >
> > > > Source can actually ignore this limit because it has no upstream,
> > > > but this will lead to semantic inconsistency.
> > >
> > > Lijie, can you please elaborate on the above comment further? What do
> > > you mean when you say it will lead to "semantic inconsistency"?
> > >
> > > > Secondly, we first need to limit the max parallelism of (downstream)
> > > > vertex, and then we can decide how many subpartitions (upstream
> > > > vertex) should produce. The limit should be effective, otherwise
> > > > some downstream tasks will have no data to process.
> > >
> > > This makes sense in the context of any vertices other than the source
> > > vertex. As you mentioned above ("Source can actually ignore this limit
> > > because it has no upstream"), I therefore feel
> > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need
> > > not be upper-bounded by
> > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee wrote:
> > >
> > > > Hi Venkat,
> > > >
> > > > As Lijie mentioned, in Flink, the parallelism is required to be less
> > > > than or equal to the maximum parallelism. The config options
> > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism and
> > > > jobmanager.adaptive-batch-scheduler.max-parallelism will be set as
> > > > the source's parallelism and max-parallelism, respectively.
> > > > Therefore, the check-failed situation you encountered is in line
> > > > with expectations.
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > Lijie Wang wrote on Thu, Feb 29, 2024 at 17:35:
> > > >
> > > > > Hi Venkat,
> > > > >
> > > > > >> default-source-parallelism config should be independent from
> > > > > >> the max-parallelism
> > > > >
> > > > > Actually, it's not.
> > > > >
> > > > > Firstly, it's obvious that the parallelism should be less than or
> > > > > equal to the max parallelism (both literally and in execution).
> > > > > The "jobmanager.adaptive-batch-scheduler.max-parallelism" will be
> > > > > used as the max parallelism for a vertex if you don't set max
> > > > > parallelism for it individually (just like the source in your
> > > > > case).
> > > > >
> > > > > Secondly, we first need to limit the max parallelism of the
> > > > > (downstream) vertex, and then we can decide how many subpartitions
> > > > > the (upstream) vertex should produce. The limit should be
> > > > > effective, otherwise some downstream tasks will have no data to
> > > > > process. Source can actually ignore this limit because it has no
> > > > > upstream, but this will lead to semantic inconsistency.
> > > > >
> > > > > Best,
> > > > > Lijie
> > > > >
> > > > > Venkatakrishnan Sowrirajan wrote on Thu, Feb 29, 2024 at 05:49:
> > > > >
> > > > > > Hi Flink devs,
> > > > > >
> > > > > > With Flink's AdaptiveBatchScheduler
> > > > > > <
> > > > > > https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scalin
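For reference, a hedged sketch of the two options at the center of this thread, using the keys quoted above (values illustrative; the constraint described in the comments is the current behavior per Junrui's and Lijie's replies):

{code:java}
import org.apache.flink.configuration.Configuration;

/** Illustration only: the interplay of the two scheduler options debated above. */
public class AdaptiveBatchSchedulerOptions {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Upper bound applied uniformly to every vertex of the job, sources included.
        conf.setString("jobmanager.adaptive-batch-scheduler.max-parallelism", "128");
        // Initial source parallelism; under current behavior it must not exceed the
        // bound above or the scheduler's check fails -- exactly the constraint that
        // Venkata argues should not apply to source vertices.
        conf.setString("jobmanager.adaptive-batch-scheduler.default-source-parallelism", "512");
    }
}
{code}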