I think the primary point is that unless there is a strict need, we shouldn't introduce breaking changes to the implementation. Why did we choose to forbid users from uploading a Bytes schema? Did we consider making a call to upload a Bytes schema a no-op?
Thanks, Michael On Fri, Apr 14, 2023 at 10:46 AM SiNan Liu <liusinan1...@gmail.com> wrote: > > 1. I don't know much about flink, but what I see here is that you need to > save a `ResolvedCatalogTable`, which I see has `CatalogTable`, so it is > used to record the metadata information of the table. > **In > org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable** > ```java > @Override > public void createTable(ObjectPath tablePath, ResolvedCatalogTable > table) > throws PulsarAdminException { > // only allow creating table in explict database, the topic is used > to save table > // information > if (!isExplicitDatabase(tablePath.getDatabaseName())) { > throw new CatalogException( > String.format( > "Can't create explict table under pulsar > tenant/namespace: %s because it's a native database", > tablePath.getDatabaseName())); > } > > String mappedTopic = findExplicitTablePlaceholderTopic(tablePath); > pulsarAdminTool.createTopic(mappedTopic, 1); > > // use pulsar schema to store explicit table information > try { > SchemaInfo schemaInfo = > TableSchemaHelper.generateSchemaInfo(table.toProperties()); > pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo); > } catch (Exception e) { > // delete topic if table info cannot be persisted > try { > pulsarAdminTool.deleteTopic(mappedTopic); > } catch (PulsarAdminException ex) { > // do nothing > } > e.printStackTrace(); > throw new CatalogException("Can't store table metadata"); > } > } > ``` > > 2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`: > Why must SchemaType.BYTE be used? Is it OK to use SchemaType.JSON? > ```java > public static SchemaInfo generateSchemaInfo(Map<String, String> > properties) > throws JsonProcessingException { > ObjectMapper mapper = new ObjectMapper(); > // json > String json = mapper.writeValueAsString(properties); > return SchemaInfoImpl.builder() > .name("flink_table_schema") > //.type(SchemaType.BYTES) > //.schema(mapper.writeValueAsBytes(properties)) > // SchemaType.JSON > .type(SchemaType.JSON) > .schema(json.getBytes()) > .build(); > } > ``` > > 3. Sorry, I'm a newbie. Can an experienced developer help with this? Thanks! > > > Thanks, > sinan > > > Enrico Olivelli <eolive...@gmail.com> 于2023年4月14日周五 22:50写道: > > > Il giorno ven 14 apr 2023 alle ore 16:48 Christophe Bornet > > <bornet.ch...@gmail.com> ha scritto: > > > > > > The change was not reverted for 3.0.0-RC so in the current state the > > > Flink Pulsar catalog won't work with Pulsar 3.0. > > > To give more details, Flink Pulsar stores Flink Table metadata in the > > > metadata of BYTES schema topics (those topics don't have messages, > > > only the schema metadata of these topics is used). > > > > I think that we should not introduce this breaking change, it will be > > a big pain for the users in the Flink ecosystem > > > > Enrico > > > > > > > > Le mer. 29 mars 2023 à 09:54, Yufan Sheng <syh...@gmail.com> a écrit : > > > > > > > > Hi SiNan, > > > > > > > > In the flink world, we don't always rely on the schema information > > > > provided by Pulsar or other connector systems. Flink application has > > > > its own (de)serialization schema logic, which treats the messages only > > > > in a binary format like a byte array. > > > > > > > > In flink-connector-pulsar, we only use the schema when the users want > > > > to do some evolution check. Otherwise, we will only send messages in > > > > BYTES schema. > > > > > > > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com> > > wrote: > > > > > > > > > > Hi yufan. > > > > > Can you describe a bit the usage scenario of byte schema in > > > > > flink-connector-pulsa? > > > > > > > > > > > > > > > Thanks, > > > > > sinan > > > > > > > > > > Yufan Sheng <syh...@gmail.com> 于 2023年3月28日周二 上午9:53写道: > > > > > > > > > > > As the flink-connector-pulsar developer, I don't want to disable > > the > > > > > > BYTES schema upload. In my opinion, using BYTES schema means the > > users > > > > > > want to bypass the schema check and handle the schema validation by > > > > > > themselves. > > > > > > > > > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com> > > wrote: > > > > > > > > > > > > > > Hi, everyone. > > > > > > > When a user uploads bytes schema. We can warn the user and skip > > uploading > > > > > > > bytes schema. > > > > > > > Also check to see if the topic has a schema other than bytes. > > > > > > > 1. If yes, warn the user that it is not necessary to upload > > bytes schema. > > > > > > > You can subscribe to a topic using bytes schema. > > > > > > > 2. If there is no schema, warn the user that the topic does not > > have a > > > > > > > schema. The default is bytes schema, and there is no need to > > upload it. > > > > > > > Rather than simply throwing an exception rejecting the upload > > bytes > > > > > > schema. > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > sinan > > > > > > > > > > > > > > > > > > > > > Christophe Bornet <bornet.ch...@gmail.com> 于 2023年3月28日周二 > > 上午1:15写道: > > > > > > > > > > > > > > > This change broke the Flink SQL Pulsar connector: > > > > > > > > https://github.com/streamnative/flink/issues/270 > > > > > > > > So I propose to revert it. > > > > > > > > > > > > > > > > Le ven. 9 déc. 2022 à 11:57, labuladong < > > labulad...@foxmail.com> a > > > > > > écrit : > > > > > > > > > > > > > > > > > > Hi pulsar community, > > > > > > > > > > > > > > > > > > > > > > > > > > > I'd like to discuss the behavior of schema uploading, for > > more > > > > > > context > > > > > > > > see https://github.com/apache/pulsar/issues/18825 > > > > > > > > > > > > > > > > > > > > > > > > > > > I think that forbidding users to upload `BYTES` schema is a > > > > > > recommended > > > > > > > > way to solve this issue. But this may change the existing > > behavior, so > > > > > > do > > > > > > > > you have any suggestion about this issue? > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Donglai > > > > > > > > > > > > > > > >