[jira] [Created] (KAFKA-6712) Throw a specific exception with wrong topic name for interactive queries

2018-03-25 Thread Francesco Guardiani (JIRA)
Francesco Guardiani created KAFKA-6712:
--

 Summary: Throw a specific exception with wrong topic name for 
interactive queries
 Key: KAFKA-6712
 URL: https://issues.apache.org/jira/browse/KAFKA-6712
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Francesco Guardiani


When you use the interactive query state stores with a wrong topic name and 
you call the store() method, the client should throw an exception that explains 
that the specified topic name is wrong. Currently it throws an 
InvalidStateStoreException ("the state store may have migrated to another 
instance"), which is too generic and is also used when the stream thread is not 
ready yet.
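
For illustration, a minimal sketch of the lookup that currently surfaces only the 
generic message; the store name is hypothetical and `streams` is assumed to be a 
started KafkaStreams instance:
{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class WrongStoreNameExample {

  // "no-such-store" does not exist in the topology; today the client cannot
  // tell the caller whether the name is wrong or the store is simply not
  // queryable yet.
  static ReadOnlyKeyValueStore<String, Long> lookup(KafkaStreams streams) {
    try {
      return streams.store("no-such-store",
                           QueryableStoreTypes.<String, Long>keyValueStore());
    } catch (InvalidStateStoreException e) {
      // Same exception and message in both cases:
      // "the state store may have migrated to another instance"
      throw e;
    }
  }
}
{code}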



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6713) Provide an easy way to replace store with a custom one on High-Level Streams DSL

2018-03-25 Thread Cemalettin Koç (JIRA)
Cemalettin Koç created KAFKA-6713:
-

 Summary: Provide an easy way to replace store with a custom one on 
High-Level Streams DSL
 Key: KAFKA-6713
 URL: https://issues.apache.org/jira/browse/KAFKA-6713
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.0.1
Reporter: Cemalettin Koç


I am trying to use a GlobalKTable with a custom store implementation. In my 
store I would like to keep my `Category` entities and query them by their name 
as well. My custom store has capabilities beyond `get`, such as get by `name`. 
I also want to retrieve all entries in a hierarchical way, in a lazy fashion. I 
have other use cases as well.

 

In order to accomplish my task I had to implement a custom 
`KeyValueBytesStoreSupplier`, a `BytesTypeConverter`, and a delegating store 
like the following:

 
{code:java}
public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {

  private BytesTypeConverter<K, V> converter;

  private KeyValueStore<K, V> delegated;

  public DelegatingByteStore(KeyValueStore<K, V> delegated,
                             BytesTypeConverter<K, V> converter) {
    this.converter = converter;
    this.delegated = delegated;
  }

  @Override
  public void put(Bytes key, byte[] value) {
    // Translate the Bytes/byte[] pair into the typed key/value and delegate.
    delegated.put(converter.outerKey(key),
                  converter.outerValue(value));
  }

  @Override
  public byte[] putIfAbsent(Bytes key, byte[] value) {
    V v = delegated.putIfAbsent(converter.outerKey(key),
                                converter.outerValue(value));
    return v == null ? null : value;
  }
  ..

{code}
 

 Type Converter:
{code:java}
public interface TypeConverter<IK, IV, K, V> {

  IK innerKey(final K key);

  IV innerValue(final V value);

  List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);

  List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);

  V outerValue(final IV value);

  KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);

  KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);

  K outerKey(final IK ik);
}
{code}
 

This is unfortunately too cumbersome and hard to maintain.  
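
For context, a rough sketch of the wiring this currently requires on the DSL 
side, assuming a hypothetical `KeyValueBytesStoreSupplier` that returns the 
`DelegatingByteStore` above; the topic name and serde parameters are 
illustrative:
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;

public class CategoryTableWiring {

  // storeSupplier is assumed to create the DelegatingByteStore above; V would
  // be the Category entity type from the description.
  static <V> GlobalKTable<Long, V> build(StreamsBuilder builder,
                                         KeyValueBytesStoreSupplier storeSupplier,
                                         Serde<V> valueSerde) {
    return builder.globalTable(
        "categories",                            // hypothetical topic name
        Materialized.<Long, V>as(storeSupplier)  // plugs the custom bytes store into the DSL
            .withKeySerde(Serdes.Long())
            .withValueSerde(valueSerde));
  }
}
{code}

Even with this wiring, the DSL still talks to the store through the 
Bytes/byte[] interface, which is why the converter layer above is needed.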

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [jira] [Created] (KAFKA-6709) broker failed to handle request due to OOM

2018-03-25 Thread zhenya Sun
Why don't you increase your machine memory?
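
Besides adding memory, a possible mitigation is to bound the fetch sizes that 
drive the down-conversion buffers in the trace quoted below, where each 
partition is fetched with max_bytes=60728640 (about 58 MB). A minimal sketch, 
assuming the connector's consumer properties can be overridden; the values are 
illustrative:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchSizeTuning {

  // The broker materializes down-converted records on the heap
  // (ByteBuffer.allocate in the quoted stack trace), so smaller fetch limits
  // mean smaller allocations per request. Values below are examples only.
  static Properties boundedFetchConfig() {
    Properties props = new Properties();
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 1 MB per partition
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "10485760");          // 10 MB per request
    return props;
  }
}
{code}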




zhenya Sun
Email: toke...@126.com

Signature customized by NetEase Mail Master

On 03/24/2018 17:27, Zou Tao (JIRA) wrote:
Zou Tao created KAFKA-6709:
--

Summary: broker failed to handle request due to OOM
Key: KAFKA-6709
URL: https://issues.apache.org/jira/browse/KAFKA-6709
Project: Kafka
 Issue Type: Bug
 Components: core
 Affects Versions: 1.0.1
 Reporter: Zou Tao


I have updated to release 1.0.1.

I set up a cluster with four brokers.
You can find the server.properties in the attachment.
There are about 150 topics and about 4000 partitions in total; the 
ReplicationFactor is 2.
Connectors are used to write/read data to/from the brokers.
The connector version is 0.10.1.
The average message size is 500 B, and there are around 6 messages per second.
One of the brokers keeps reporting OOM and can't handle requests like:

[2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request 
{replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[{partition=16,fetch_offset=51198,max_bytes=60728640},{partition=12,fetch_offset=50984,max_bytes=60728640}]}]}
 (kafka.server.KafkaApis)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at 
org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
at 
org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523)
at scala.Option.map(Option.scala:146)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513)
at scala.Option.flatMap(Option.scala:171)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
at 
kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041)
at 
kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
at 
kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593)
at 
kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:592)
at 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
at 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
at kafka.server.KafkaApis.handle(KafkaApis.scala:99)

and then lots of ISR shrinking (this broker is 1001):

[2018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 
broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition s1mme.data-72 broker=1001] Shrinking 
ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition gnup.sink.status.storage.topic-17 
broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,287] INFO [Partition 
probessgsniups.sink.offset.storage.topic-4 broker=1001] Shrinking ISR from 
1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:4