[
https://issues.apache.org/jira/browse/KAFKA-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17058990#comment-17058990
]
John Roesler commented on KAFKA-9636:
-------------------------------------
Aha! You gave me a clue. The serde in your code doesn't do bounds-checking on
the array, it just returns the backing array (which has the problem I described
above).
Give this a shot:
{code:java}
val bbser = new ByteBufferSerializer
val bbdes = new ByteBufferDeserializer
implicit val jsonSerde = fromFn(
(json: Json) => {
val buffer: ByteBuffer = Pickle.intoBytes(json)
bbser.serialize(null, buffer)
},
(data: Array[Byte]) => {
val buffer = bbdes.deserialize(null, data)
Option(Unpickle[Json].fromBytes(buffer))
}
){code}
This is just a shortcut; I happen to know that the ByteBufferDe/Serializer does
do proper bounds checking.
I ran this a bunch of times without failure.
I did find one failure, which looks like there's still something wrong with the
datagen:
{code:java}
TestFailedException was thrown during property evaluation.
Message: TestOutputTopic[topic='output-topic',
keyDeserializer=StringDeserializer, valueDeserializer=anon$2, size=0] was empty
for (topics, keys, records) and topology
List((TestInputTopic[topic='input-topic-1', keySerializer=StringSerializer,
valueSerializer=anon$1], w, {
"" : -72720317283481706220422921637065353877103225292369802784
})) and
Topologies: ...{code}
Notice how there's only one input record.
> Simple join of two KTables fails
> --------------------------------
>
> Key: KAFKA-9636
> URL: https://issues.apache.org/jira/browse/KAFKA-9636
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.1
> Reporter: Paul Snively
> Assignee: John Roesler
> Priority: Major
> Attachments: merge_issue.zip
>
>
> Attempting to join two KTables yields a `Topology` that, when tested with
> `TopologyTestDriver` by adding records to the two `TestInputTopic`s, results
> in an empty `TestOutputTopic`.
> I'm attaching a very small reproduction. The code is in Scala. The project is
> therefore an "sbt" project. You can reproduce the results from your shell
> with `sbt test`. The failure output will include the `describe` of the
> `Topology` in question.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)