1. I don't know much about Flink, but as far as I can tell, the catalog needs to persist a `ResolvedCatalogTable` (which contains a `CatalogTable`), i.e. the table's metadata. This happens in **org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable**:

```java
@Override
public void createTable(ObjectPath tablePath, ResolvedCatalogTable table)
        throws PulsarAdminException {
    // only allow creating tables in an explicit database; the topic is used to save table
    // information
    if (!isExplicitDatabase(tablePath.getDatabaseName())) {
        throw new CatalogException(
                String.format(
                        "Can't create explicit table under pulsar tenant/namespace: %s because it's a native database",
                        tablePath.getDatabaseName()));
    }

    String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
    pulsarAdminTool.createTopic(mappedTopic, 1);

    // use pulsar schema to store explicit table information
    try {
        SchemaInfo schemaInfo = TableSchemaHelper.generateSchemaInfo(table.toProperties());
        pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
    } catch (Exception e) {
        // delete topic if table info cannot be persisted
        try {
            pulsarAdminTool.deleteTopic(mappedTopic);
        } catch (PulsarAdminException ex) {
            // do nothing
        }
        e.printStackTrace();
        throw new CatalogException("Can't store table metadata");
    }
}
```

2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties())`, why must `SchemaType.BYTES` be used? Would `SchemaType.JSON` work instead, as in this modified version?

```java
public static SchemaInfo generateSchemaInfo(Map<String, String> properties)
        throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    // serialize the table properties to a JSON string
    String json = mapper.writeValueAsString(properties);
    return SchemaInfoImpl.builder()
            .name("flink_table_schema")
            // BYTES variant:
            // .type(SchemaType.BYTES)
            // .schema(mapper.writeValueAsBytes(properties))
            // JSON variant:
            .type(SchemaType.JSON)
            .schema(json.getBytes(StandardCharsets.UTF_8))
            .build();
}
```

3. Sorry, I'm new to this. Could someone more experienced help with these questions? Thanks!

Thanks,
sinan

On Fri, Apr 14, 2023 at 22:50, Enrico Olivelli <eolive...@gmail.com> wrote:
> On Fri, Apr 14, 2023 at 16:48, Christophe Bornet
> <bornet.ch...@gmail.com> wrote:
> >
> > The change was not reverted for 3.0.0-RC, so in the current state the
> > Flink Pulsar catalog won't work with Pulsar 3.0.
> > To give more details, Flink Pulsar stores Flink Table metadata in the
> > metadata of BYTES schema topics (those topics don't have messages;
> > only the schema metadata of these topics is used).
>
> I think that we should not introduce this breaking change; it will be
> a big pain for users in the Flink ecosystem.
>
> Enrico
>
> >
> > On Wed, Mar 29, 2023 at 09:54, Yufan Sheng <syh...@gmail.com> wrote:
> > >
> > > Hi SiNan,
> > >
> > > In the Flink world, we don't always rely on the schema information
> > > provided by Pulsar or other connector systems. A Flink application has
> > > its own (de)serialization schema logic, which treats messages only
> > > in a binary format, like a byte array.
> > >
> > > In flink-connector-pulsar, we only use the schema when users want
> > > to do an evolution check. Otherwise, we only send messages with the
> > > BYTES schema.
> > >
> > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > >
> > > > Hi yufan.
> > > > Can you describe a bit the usage scenario of the bytes schema in
> > > > flink-connector-pulsar?
> > > >
> > > > Thanks,
> > > > sinan
> > > >
> > > > On Tue, Mar 28, 2023 at 9:53 AM, Yufan Sheng <syh...@gmail.com> wrote:
> > > > >
> > > > > As the flink-connector-pulsar developer, I don't want to disable the
> > > > > BYTES schema upload. In my opinion, using the BYTES schema means users
> > > > > want to bypass the schema check and handle schema validation
> > > > > themselves.
> > > > >
> > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, everyone.
> > > > > > When a user uploads a bytes schema, we can warn the user and skip
> > > > > > uploading it. We should also check whether the topic has a schema
> > > > > > other than bytes:
> > > > > > 1. If yes, warn the user that uploading a bytes schema is unnecessary;
> > > > > > they can still subscribe to the topic using the bytes schema.
> > > > > > 2. If there is no schema, warn the user that the topic does not have a
> > > > > > schema. The default is the bytes schema, so there is no need to upload it.
> > > > > > This is instead of simply throwing an exception that rejects the
> > > > > > bytes schema upload.
> > > > > >
> > > > > > Thanks,
> > > > > > sinan
> > > > > >
> > > > > > On Tue, Mar 28, 2023 at 1:15 AM, Christophe Bornet <bornet.ch...@gmail.com> wrote:
> > > > > > >
> > > > > > > This change broke the Flink SQL Pulsar connector:
> > > > > > > https://github.com/streamnative/flink/issues/270
> > > > > > > So I propose to revert it.
> > > > > > >
> > > > > > > On Fri, Dec 9, 2022 at 11:57, labuladong <labulad...@foxmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi pulsar community,
> > > > > > > >
> > > > > > > > I'd like to discuss the behavior of schema uploading; for more
> > > > > > > > context, see https://github.com/apache/pulsar/issues/18825
> > > > > > > >
> > > > > > > > I think that forbidding users from uploading a `BYTES` schema is the
> > > > > > > > recommended way to solve this issue. But this may change the existing
> > > > > > > > behavior, so do you have any suggestions about this issue?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Donglai
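To make the "warn and skip" alternative from the Mar 28 proposal concrete, here is a minimal, stdlib-only sketch of the decision it describes. Everything in it is hypothetical: `BytesUploadPolicy`, `SchemaKind`, `Decision` and `decide` are illustrative names, not Pulsar broker APIs, and the broker's logging is reduced to `System.err`.

```java
import java.util.Optional;

// Hypothetical sketch of the "warn and skip" policy for BYTES schema uploads
// proposed in this thread. None of these names are real Pulsar APIs.
final class BytesUploadPolicy {

    // Hypothetical subset of schema types, enough to express the proposal.
    enum SchemaKind { BYTES, STRING, JSON, AVRO }

    // Possible outcomes of an upload request under the proposed policy.
    enum Decision {
        UPLOAD,                      // not a BYTES upload: handled as usual
        SKIP_TOPIC_HAS_OTHER_SCHEMA, // case 1: topic already has a non-BYTES schema
        SKIP_DEFAULTS_TO_BYTES       // case 2: no schema (or already BYTES); BYTES is the default
    }

    private BytesUploadPolicy() {}

    // Decides what to do with an upload of `requested`, given the topic's
    // current schema type (empty if the topic has no schema yet).
    static Decision decide(SchemaKind requested, Optional<SchemaKind> existing) {
        if (requested != SchemaKind.BYTES) {
            return Decision.UPLOAD;
        }
        if (existing.isPresent() && existing.get() != SchemaKind.BYTES) {
            System.err.println("WARN: topic already has a " + existing.get()
                    + " schema; uploading BYTES is unnecessary, consumers can still subscribe with BYTES");
            return Decision.SKIP_TOPIC_HAS_OTHER_SCHEMA;
        }
        System.err.println("WARN: BYTES is already the effective schema for this topic; skipping upload");
        return Decision.SKIP_DEFAULTS_TO_BYTES;
    }
}
```

Note that in both skip cases this sketch discards the uploaded schema data. As Christophe and Yufan explain in the thread, that data is where the Flink Pulsar catalog keeps its table metadata, so this policy would make the `createTable` path silently lose the table definition instead of failing loudly.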