[ https://issues.apache.org/jira/browse/KAFKA-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manikumar resolved KAFKA-5799. ------------------------------ Resolution: Auto Closed Closing Apache Storm - Kafka Spout related query. If this still issue, please contact storm mailing list. > New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme > ---------------------------------------------------- > > Key: KAFKA-5799 > URL: https://issues.apache.org/jira/browse/KAFKA-5799 > Project: Kafka > Issue Type: New Feature > Affects Versions: 0.11.0.0 > Environment: apache-storm 1.1.0 > Reporter: Juhong NamGung > Priority: Minor > Attachments: 1.JPG, 2.JPG, bakvs.JPG > > > I try to integrate Kafka with Apache Strom. > I want to get data from Kafka, using KafkaSpout in Apache Storm. > To get data from Kafka using KafkaSpout, SpoutConfig-scheme must be setting. > (Scheme is an interface that dictates how the ByteBuffer consumed from Kafka > gets transformed into a storm tuple) > I want to get both key and value in Kafka, so I used to KafkaSpoutConfig > ‘KeyValueSchemeAsMultiScheme’. > KeyValueSchemeAsMultiScheme’s Constructor is as follows. > [^2.JPG] > But, as you can see in the picture, implementing classes of Interface > KeyValueScheme are only StringKeyValueScheme. > [^1.JPG] > Using StringKeyValueShceme causes problems when importing Integer data from > Kafka. Because StringKeyValueScheme deserialize Bytebuffer to String. > So I implement ByteArrayKeyValueScheme that deserialize ByteBuffer to > ByteArray. > ByteArrayKeyValueScheme imports data as BtyeArray. > If you use ByteArrayKeyValueScheme, you can import data regardless of data > type from Kafka without error. > (But, you should convert data type ByteArray to data type that you want(e.g. > String, Integer...)) > [^bakvs.JPG] > {code:java} > // Some comments here > import java.nio.ByteBuffer; > import java.util.List; > import org.apache.storm.kafka.KeyValueScheme; > import org.apache.storm.spout.RawScheme; > import org.apache.storm.tuple.Values; > import com.google.common.collect.ImmutableMap; > public class ByteArrayKeyValueScheme extends RawScheme implements > KeyValueScheme { > @Override > public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer > value) { > // TODO Auto-generated method stub > if (key == null) { > return deserialize(value); > } > Object keytuple = deserialize(key).get(0); > Object valuetuple = deserialize(value).get(0); > return new Values(ImmutableMap.of(keytuple, valuetuple)); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)