Hi Timo

Thank you for your advice. It is truely a typo. After I fix it, the same
exception remains.

But when I add the inAppendMode() to the StreamTableDescriptor, the
exception disappears, and it can find the proper kafka08factory.

And another exception turns out.
Caused by:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
Unexpected character ('-' (code 45)): Expected space separating root-level
values
 at [Source: [B@69e1cfbe; line: 1, column: 6]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2355)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:94)

But actually, I produced the json data to the topic, why flink can
not deserialize it? It is weird.

Yours
Joshua

On Fri, Jan 11, 2019 at 11:02 PM Timo Walther <[email protected]> wrote:

> Hi Jashua,
>
> according to the property list, you passed "connector.version=0.10" so a
> Kafka 0.8 factory will not match.
>
> Are you sure you are compiling the right thing? There seems to be a
> mismatch between your screenshot and the exception.
>
> Regards,
> Timo
>
> Am 11.01.19 um 15:43 schrieb Joshua Fan:
>
> Hi,
>
> I want to test flink sql locally by consuming kafka data in flink 1.7, but
> it turns out an exception like below.
>
> Exception in thread "main"
>>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
>>> a suitable table factory for
>>> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
>>
>> the classpath.
>>
>>
>>> Reason: No context matches.
>>
>>
>>> The following properties are requested:
>>
>> connector.properties.0.key=fetch.message.max.bytes
>>
>> connector.properties.0.value=10485760
>>
>> connector.properties.1.key=zookeeper.connect
>>
>> connector.properties.1.value=10.xxx.:2181/kafka
>>
>> connector.properties.2.key=group.id
>>
>> connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
>>
>> connector.properties.3.key=bootstrap.servers
>>
>> connector.properties.3.value=10.xxx:9092
>>
>> connector.property-version=1
>>
>> connector.startup-mode=latest-offset
>>
>> connector.topic=-flink-test
>>
>> connector.type=kafka
>>
>> connector.version=0.10
>>
>> format.derive-schema=true
>>
>> format.property-version=1
>>
>> format.type=json
>>
>> schema.0.name=rideId
>>
>> schema.0.type=VARCHAR
>>
>> schema.1.name=lon
>>
>> schema.1.type=VARCHAR
>>
>>
>>> The following factories have been considered:
>>
>> org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
>>
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>>
>> org.apache.flink.formats.json.JsonRowFormatFactory
>>
>>
>>> at
>>> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
>>
>> at
>>> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
>>
>> at
>>> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
>>
>> at
>>> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
>>
>> at
>>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
>>
>> at TableSourceFinder.main(TableSourceFinder.java:40)
>>
>>
> here is my code:
>
> public static void main(String[] args) throws Exception{
>         final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();        
> StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env);     
>    Kafka kafka = new Kafka();        Properties properties = new 
> Properties();        String zkString = "10.xxx:2181/kafka";        String 
> brokerList = "10.xxx:9092";        
> properties.setProperty("fetch.message.max.bytes", "10485760");        
> properties.setProperty("group.id", UUID.randomUUID().toString());        
> properties.setProperty("zookeeper.connect", zkString);        
> properties.setProperty("bootstrap.servers", brokerList);        
> kafka.version("0.8").topic("flink-test").properties(properties);        
> kafka.startFromLatest();        stEnv.connect(kafka).withSchema(
>                 new Schema()
>                         .field("rideId", Types.STRING())
>                         .field("lon", Types.STRING()))
>                 .withFormat(new Json().deriveSchema())
>
>                 .registerTableSource("test");        Table table = 
> stEnv.sqlQuery("select rideId from test");        DataStream ds = 
> ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv).
>                 toAppendStream(table,Types.STRING());        ds.print();      
>   env.execute("KafkaSql");
>
> }
>
>
> And here is my pom.xml
>
> <dependency>    <groupId>org.apache.flink</groupId>    
> <artifactId>flink-table_2.11</artifactId>    
> <version>${flink.version}</version></dependency><dependency>    
> <groupId>org.apache.flink</groupId>    <artifactId>flink-json</artifactId>    
> <version>${flink.version}</version>
>
> </dependency>
>
> <dependency>    <groupId>org.apache.flink</groupId>    
> <artifactId>flink-connector-kafka-0.8_2.11</artifactId>    
> <version>${flink.version}</version></dependency>
>
>
> In my opinion, I have all the lib in pom, don't know why it would fail in
> test locally.
>
> Thank you for any hints.
>
> Yours
> Joshua
>
>
>

Reply via email to