1. I don't know much about flink, but what I see here is that you need to
save a `ResolvedCatalogTable`, which I see has `CatalogTable`, so it is
used to record the metadata information of the table.
**In
org.apache.flink.connector.pulsar.table.catalog.impl.PulsarCatalogSupport#createTable**
```java
    @Override
    public void createTable(ObjectPath tablePath, ResolvedCatalogTable
table)
            throws PulsarAdminException {
        // only allow creating table in explict database, the topic is used
to save table
        // information
        if (!isExplicitDatabase(tablePath.getDatabaseName())) {
            throw new CatalogException(
                    String.format(
                            "Can't create explict table under pulsar
tenant/namespace: %s because it's a native database",
                            tablePath.getDatabaseName()));
        }

        String mappedTopic = findExplicitTablePlaceholderTopic(tablePath);
        pulsarAdminTool.createTopic(mappedTopic, 1);

        // use pulsar schema to store explicit table information
        try {
            SchemaInfo schemaInfo =
TableSchemaHelper.generateSchemaInfo(table.toProperties());
            pulsarAdminTool.uploadSchema(mappedTopic, schemaInfo);
        } catch (Exception e) {
            // delete topic if table info cannot be persisted
            try {
                pulsarAdminTool.deleteTopic(mappedTopic);
            } catch (PulsarAdminException ex) {
                // do nothing
            }
            e.printStackTrace();
            throw new CatalogException("Can't store table metadata");
        }
    }
```

2. In `TableSchemaHelper.generateSchemaInfo(table.toProperties());`:
Why must SchemaType.BYTE be used? Is it OK to use SchemaType.JSON?
```java
    public static SchemaInfo generateSchemaInfo(Map<String, String>
properties)
            throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        // json
        String json = mapper.writeValueAsString(properties);
        return SchemaInfoImpl.builder()
                .name("flink_table_schema")
                //.type(SchemaType.BYTES)
                //.schema(mapper.writeValueAsBytes(properties))
                // SchemaType.JSON
                .type(SchemaType.JSON)
                .schema(json.getBytes())
                .build();
    }
```

3. Sorry, I'm a newbie. Can an experienced developer help with this? Thanks!


Thanks,
sinan


Enrico Olivelli <eolive...@gmail.com> 于2023年4月14日周五 22:50写道:

> Il giorno ven 14 apr 2023 alle ore 16:48 Christophe Bornet
> <bornet.ch...@gmail.com> ha scritto:
> >
> > The change was not reverted for 3.0.0-RC so in the current state the
> > Flink Pulsar catalog won't work with Pulsar 3.0.
> > To give more details, Flink Pulsar stores Flink Table metadata in the
> > metadata of BYTES schema topics (those topics don't have messages,
> > only the schema metadata of these topics is used).
>
> I think that we should not introduce this breaking change, it will be
> a big pain for the users in the Flink ecosystem
>
> Enrico
>
> >
> > Le mer. 29 mars 2023 à 09:54, Yufan Sheng <syh...@gmail.com> a écrit :
> > >
> > > Hi SiNan,
> > >
> > > In the flink world, we don't always rely on the schema information
> > > provided by Pulsar or other connector systems. Flink application has
> > > its own (de)serialization schema logic, which treats the messages only
> > > in a binary format like a byte array.
> > >
> > > In flink-connector-pulsar, we only use the schema when the users want
> > > to do some evolution check. Otherwise, we will only send messages in
> > > BYTES schema.
> > >
> > > On Tue, Mar 28, 2023 at 10:06 AM SiNan Liu <liusinan1...@gmail.com>
> wrote:
> > > >
> > > > Hi yufan.
> > > > Can you describe a bit the usage scenario of byte schema in
> > > > flink-connector-pulsa?
> > > >
> > > >
> > > > Thanks,
> > > > sinan
> > > >
> > > > Yufan Sheng <syh...@gmail.com> 于 2023年3月28日周二 上午9:53写道:
> > > >
> > > > > As the flink-connector-pulsar developer, I don't want to disable
> the
> > > > > BYTES schema upload. In my opinion, using BYTES schema means the
> users
> > > > > want to bypass the schema check and handle the schema validation by
> > > > > themselves.
> > > > >
> > > > > On Tue, Mar 28, 2023 at 8:58 AM SiNan Liu <liusinan1...@gmail.com>
> wrote:
> > > > > >
> > > > > > Hi, everyone.
> > > > > > When a user uploads bytes schema. We can warn the user and skip
> uploading
> > > > > > bytes schema.
> > > > > > Also check to see if the topic has a schema other than bytes.
> > > > > > 1. If yes, warn the user that it is not necessary to upload
> bytes schema.
> > > > > > You can subscribe to a topic using bytes schema.
> > > > > > 2. If there is no schema, warn the user that the topic does not
> have a
> > > > > > schema. The default is bytes schema, and there is no need to
> upload it.
> > > > > > Rather than simply throwing an exception rejecting the upload
> bytes
> > > > > schema.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > sinan
> > > > > >
> > > > > >
> > > > > > Christophe Bornet <bornet.ch...@gmail.com> 于 2023年3月28日周二
> 上午1:15写道:
> > > > > >
> > > > > > > This change broke the Flink SQL Pulsar connector:
> > > > > > > https://github.com/streamnative/flink/issues/270
> > > > > > > So I propose to revert it.
> > > > > > >
> > > > > > > Le ven. 9 déc. 2022 à 11:57, labuladong <
> labulad...@foxmail.com> a
> > > > > écrit :
> > > > > > > >
> > > > > > > > Hi pulsar community,
> > > > > > > >
> > > > > > > >
> > > > > > > > I'd like to discuss the behavior of schema uploading, for
> more
> > > > > context
> > > > > > > see https://github.com/apache/pulsar/issues/18825
> > > > > > > >
> > > > > > > >
> > > > > > > > I think that forbidding users to upload `BYTES` schema is a
> > > > > recommended
> > > > > > > way to solve this issue. But this may change the existing
> behavior, so
> > > > > do
> > > > > > > you have any suggestion about this issue?
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Donglai
> > > > > > >
> > > > >
>

Reply via email to