Treating the BYTES schema differently from other schemas (i.e. forbidding its upload) will be confusing to users.
If the BYTES schema is the default schema for a new topic, then how about saving a BYTES schema in the registry instead of saving nothing?

On Mon, Apr 17, 2023 at 8:42 PM PengHui Li <peng...@apache.org> wrote:

> I have pushed out a PR to revert this change first.
>
> https://github.com/apache/pulsar/pull/20123
>
> Please help review.
>
> Thanks,
> Penghui
>
> On Tue, Apr 18, 2023 at 11:36 AM PengHui Li <peng...@apache.org> wrote:
>
>>> Flink uses the schema to store some kv-based properties. If we can
>>> expose all the operations of the topic-level metadata store, we can
>>> truly drop the use of uploading the BYTES schema in the Flink
>>> connector.
>>
>> After 3.0.0, Pulsar will provide the ability to set properties for a topic:
>> https://github.com/apache/pulsar/pull/17238
>>
>> Thanks,
>> Penghui
>>
>> On Mon, Apr 17, 2023 at 11:44 PM Yufan Sheng <syh...@gmail.com> wrote:
>>
>>>> I support reverting the PR first and then looking for a long-term solution.
>>>
>>> Flink uses the schema to store some kv-based properties. If we can
>>> expose all the operations of the topic-level metadata store, we can
>>> truly drop the use of uploading the BYTES schema in the Flink
>>> connector.
>>>
>>> On Mon, Apr 17, 2023 at 12:12 PM PengHui Li <peng...@apache.org> wrote:
>>>
>>>> I'm sorry. I provided the wrong description of the changes from the PR.
>>>> The PR changed the server side, so it's hard to ask users to upgrade
>>>> the Flink connector whenever the Pulsar server is upgraded.
>>>>
>>>> I support reverting the PR first and then looking for a long-term solution.
>>>>
>>>> Best,
>>>> Penghui
>>>>
>>>> On Mon, Apr 17, 2023 at 10:16 AM PengHui Li <peng...@apache.org> wrote:
>>>>
>>>>>> Did we consider
>>>>>> making a call to upload a Bytes schema a no-op?
>>>>>
>>>>> It was a BUG that the PR fixed.
>>>>> You will not be able to get the uploaded schema as expected.
>>>>> Please take a look at the details from the GitHub issue.
>>>>>
>>>>> What is the challenge for the Flink connector now?
>>>>> The changes only take effect on the client side.
>>>>> So, the issue will only happen if they use a new connector.
>>>>> Upgrading the Pulsar server will not have any impact?
>>>>> Is it better to fix the Flink connector?
>>>>> IMO, the Flink connector should not use the admin API
>>>>> to upload a BYTES schema. It's a redundant operation.
>>>>> Pulsar will do nothing.
>>>>>
>>>>> What do you think about a long-term solution?
>>>>>
>>>>> Regards
>>>>> - Penghui
>>>>>
>>>>> On Sat, Apr 15, 2023 at 12:52 AM Michael Marshall <mmarsh...@apache.org> wrote:
>>>>>
>>>>>> I think the primary point is that unless there is a strict need, we
>>>>>> shouldn't introduce breaking changes to the implementation. Why did we
>>>>>> choose to forbid users from uploading a Bytes schema? Did we consider
>>>>>> making a call to upload a Bytes schema a no-op?
>>>>>>
>>>>>> Thanks,
>>>>>> Michael
>>>>>>
>>>>>> On Fri, Apr 14, 2023 at 10:46 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>>>>>>
>>>>>>> 1. I don't know much about Flink, but what I see here is that you need to
>>>>>>> save a `ResolvedCatalogTable`, which I see has `CatalogTable`, so it is
>>>>>>> used to record the metadata information of the table.
>>>>>>> **In org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable**
>>>>>>> ```java
>>>>>>> @Override
>>>>>>> public void createTable(ObjectPath tablePath, ResolvedCatalogTable table)
>>>>>>>         throws PulsarAdminException {
>>>>>>>     // only allow creating table in explicit database, the topic is used to save table
>>>>>>>     // information
>>>>>>>     if (!isExplicitDatabase(tablePath.getDatabaseName())) {
>>>>>>>         throw new CatalogException(
>>>>>>>                 String.format(
>>>>>>>                         "Can't create explicit table under pulsar tenant/namespace: %s because it's a native database",
>>>>>>>                         tablePath.getDatabaseName()));
>>>>>>>     }
>>>>>>>
>>>>>>>     String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
>>>>>>>     pulsarAdminTool.createTopic(mappedTopic, 1);
>>>>>>>
>>>>>>>     // use pulsar schema to store explicit table information
>>>>>>>     try {
>>>>>>>         SchemaInfo schemaInfo = TableSchemaHelper.generateSchemaInfo(table.toProperties());
>>>>>>>         pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
>>>>>>>     } catch (Exception e) {
>>>>>>>         // delete topic if table info cannot be persisted
>>>>>>>         try {
>>>>>>>             pulsarAdminTool.deleteTopic(mappedTopic);
>>>>>>>         } catch (PulsarAdminException ex) {
>>>>>>>             // do nothing
>>>>>>>         }
>>>>>>>         e.printStackTrace();
>>>>>>>         throw new CatalogException("Can't store table metadata");
>>>>>>>     }
>>>>>>> }
>>>>>>> ```
>>>>>>>
>>>>>>> 2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`:
>>>>>>> Why must SchemaType.BYTES be used? Is it OK to use SchemaType.JSON?
>>>>>>> ```java
>>>>>>> public static SchemaInfo generateSchemaInfo(Map<String, String> properties)
>>>>>>>         throws JsonProcessingException {
>>>>>>>     ObjectMapper mapper = new ObjectMapper();
>>>>>>>     // json
>>>>>>>     String json = mapper.writeValueAsString(properties);
>>>>>>>     return SchemaInfoImpl.builder()
>>>>>>>             .name("flink_table_schema")
>>>>>>>             //.type(SchemaType.BYTES)
>>>>>>>             //.schema(mapper.writeValueAsBytes(properties))
>>>>>>>             // SchemaType.JSON
>>>>>>>             .type(SchemaType.JSON)
>>>>>>>             .schema(json.getBytes())
>>>>>>>             .build();
>>>>>>> }
>>>>>>> ```
>>>>>>>
>>>>>>> 3. Sorry, I'm a newbie. Can an experienced developer help with this? Thanks!
>>>>>>>
>>>>>>> Thanks,
>>>>>>> sinan
>>>>>>>
>>>>>>> On Fri, Apr 14, 2023 at 22:50, Enrico Olivelli <eolive...@gmail.com> wrote:
>>>>>>>
>>>>>>>> On Fri, Apr 14, 2023 at 16:48, Christophe Bornet
>>>>>>>> <bornet.ch...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> The change was not reverted for 3.0.0-RC, so in the current state the
>>>>>>>>> Flink Pulsar catalog won't work with Pulsar 3.0.
>>>>>>>>> To give more details, Flink Pulsar stores Flink Table metadata in the
>>>>>>>>> metadata of BYTES schema topics (those topics don't have messages;
>>>>>>>>> only the schema metadata of these topics is used).
>>>>>>>>
>>>>>>>> I think that we should not introduce this breaking change. It will be
>>>>>>>> a big pain for the users in the Flink ecosystem.
>>>>>>>>
>>>>>>>> Enrico
>>>>>>>>
>>>>>>>>> On Wed, Mar 29, 2023 at 09:54, Yufan Sheng <syh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi SiNan,
>>>>>>>>>>
>>>>>>>>>> In the Flink world, we don't always rely on the schema information
>>>>>>>>>> provided by Pulsar or other connector systems. A Flink application has
>>>>>>>>>> its own (de)serialization schema logic, which treats the messages only
>>>>>>>>>> in a binary format like a byte array.
>>>>>>>>>>
>>>>>>>>>> In flink-connector-pulsar, we only use the schema when the users want
>>>>>>>>>> to do some evolution check. Otherwise, we will only send messages in
>>>>>>>>>> BYTES schema.
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi yufan.
>>>>>>>>>>> Can you describe a bit the usage scenario of the bytes schema in
>>>>>>>>>>> flink-connector-pulsar?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> sinan
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 28, 2023 at 9:53 AM Yufan Sheng <syh...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> As the flink-connector-pulsar developer, I don't want to disable the
>>>>>>>>>>>> BYTES schema upload. In my opinion, using the BYTES schema means the
>>>>>>>>>>>> users want to bypass the schema check and handle the schema
>>>>>>>>>>>> validation by themselves.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, everyone.
>>>>>>>>>>>>> When a user uploads a bytes schema, we can warn the user and skip
>>>>>>>>>>>>> the upload.
>>>>>>>>>>>>> We can also check whether the topic has a schema other than bytes:
>>>>>>>>>>>>> 1. If yes, warn the user that it is not necessary to upload a bytes
>>>>>>>>>>>>> schema. They can subscribe to the topic using the bytes schema.
>>>>>>>>>>>>> 2. If there is no schema, warn the user that the topic does not have
>>>>>>>>>>>>> a schema. The default is the bytes schema, and there is no need to
>>>>>>>>>>>>> upload it.
>>>>>>>>>>>>> This is better than simply throwing an exception that rejects the
>>>>>>>>>>>>> bytes schema upload.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> sinan
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Mar 28, 2023 at 1:15 AM Christophe Bornet <bornet.ch...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This change broke the Flink SQL Pulsar connector:
>>>>>>>>>>>>>> https://github.com/streamnative/flink/issues/270
>>>>>>>>>>>>>> So I propose to revert it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Dec 9, 2022 at 11:57, labuladong <labulad...@foxmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi pulsar community,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'd like to discuss the behavior of schema uploading. For more
>>>>>>>>>>>>>>> context, see https://github.com/apache/pulsar/issues/18825
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think that forbidding users from uploading a `BYTES` schema is
>>>>>>>>>>>>>>> the recommended way to solve this issue. But this may change the
>>>>>>>>>>>>>>> existing behavior, so do you have any suggestions about this issue?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Donglai

--
Best Regards,
Neng
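
P.S. For readers skimming the thread, the warn-and-skip alternative that SiNan proposes (and that Michael asks about as a no-op) could be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: `SchemaKind`, `Outcome`, `decide`, and `warning` are hypothetical names made up for this sketch, not Pulsar APIs, and the real broker-side compatibility checks are omitted.

```java
import java.util.Optional;

public class BytesSchemaUploadSketch {

    // Hypothetical stand-in for Pulsar's schema types; only the values needed here.
    enum SchemaKind { BYTES, JSON, AVRO }

    // Hypothetical outcomes of an upload request in this sketch.
    enum Outcome { STORED, SKIPPED_TOPIC_HAS_SCHEMA, SKIPPED_NO_SCHEMA }

    // Decide how to handle an upload; 'existing' is the schema currently on the topic, if any.
    static Outcome decide(SchemaKind requested, Optional<SchemaKind> existing) {
        if (requested != SchemaKind.BYTES) {
            // Non-BYTES uploads take the normal path (compatibility checks omitted).
            return Outcome.STORED;
        }
        boolean hasOtherSchema = existing.isPresent() && existing.get() != SchemaKind.BYTES;
        return hasOtherSchema ? Outcome.SKIPPED_TOPIC_HAS_SCHEMA : Outcome.SKIPPED_NO_SCHEMA;
    }

    // Warning text for each skipped case; empty string when the schema is stored.
    static String warning(Outcome outcome) {
        switch (outcome) {
            case SKIPPED_TOPIC_HAS_SCHEMA:
                return "Uploading a BYTES schema is unnecessary; subscribe with the BYTES schema instead.";
            case SKIPPED_NO_SCHEMA:
                return "The topic has no schema; BYTES is the default, so there is no need to upload it.";
            default:
                return "";
        }
    }

    public static void main(String[] args) {
        Outcome a = decide(SchemaKind.BYTES, Optional.of(SchemaKind.JSON));
        Outcome b = decide(SchemaKind.BYTES, Optional.empty());
        System.out.println(a + ": " + warning(a));
        System.out.println(b + ": " + warning(b));
    }
}
```

Note that a skip like this would probably not help the Flink catalog by itself: per Christophe's message, it stores table metadata in the schema of BYTES topics, so it needs the upload to actually be persisted.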