Thanks, I'll do some additional digging. Not sure if it's relevant, but for
the record, numerical offset-based resetting works like a charm.

2018-06-19 23:42 GMT+02:00 Emmett Butler <emm...@parsely.com>:

> Thanks! I ask because I think it's possible that only having a single log
> segment (as your partition does) hamstrings the functionality of
> reset_offsets(). I haven't verified this experimentally, but I think it's
> possible. Maybe someone on the Kafka users group has insight.
>
> On Tue, Jun 19, 2018 at 1:55 AM, Moe <volland.mor...@gmail.com> wrote:
>
>> Thanks for getting back to me so speedily. To be frank, I do not know how
>> log segments differ from logs.
>>
>> *# This is a view into my logs folder:*
>>
>>
>> *# And this is the view into that folder for my offset_retrieval_test.001
>> partition (the topic in question):*
>>
>>
>> Please let me know if this isn't what you were asking for! The topic has
>> been auto created so all settings are default (7 days retention if my
>> memory serves me right, but I'm sure you know that better than me).
>>
>> I appreciate the time and effort,
>> Moritz
>>
>> 2018-06-18 18:27 GMT+02:00 Emmett Butler <emm...@parsely.com>:
>>
>>> How many log segments does this partition have, and what are the
>>> retention settings for the topic? You can find the log segments by looking
>>> in the directory pointed to by the log.dirs server configuration.
>>>
>>> On Mon, Jun 18, 2018 at 7:43 AM, Moe <volland.mor...@gmail.com> wrote:
>>>
>>>> *# Sorry, I should have mentioned that the timestamp I use is one I
>>>> read from the server GUI*
>>>>
>>>>
>>>> On Monday, 18 June 2018 16:39:50 UTC+2, Moe wrote:
>>>>>
>>>>> *Hi,*
>>>>>
>>>>>
>>>>> *I'm struggling to implement a timestamp-based reset_offsets() call and am
>>>>> hoping someone here can show me the light. In essence, when I call
>>>>> reset_offsets() with a timestamp I get an "OffsetOutOfRangeError".*
>>>>>
>>>>>
>>>>> *# Setup:*
>>>>> - Kafka:
>>>>> Kafka==1.1.0 (also tried with 1.0.0) running in Docker
>>>>> (Wurstmeister/Kafka image)
>>>>>
>>>>> - Consumer/Producer:
>>>>> Ubuntu 17.04
>>>>> pykafka 2.7.0
>>>>> Python 3.6
>>>>>
>>>>>
>>>>> *# Kafka Connection*
>>>>> import json
>>>>> import time
>>>>> from pykafka import KafkaClient
>>>>> client = KafkaClient(hosts='10.25.42.127:9092',
>>>>> broker_version="1.0.0")
>>>>> topic = client.topics['offset_retrieval_test.001'.encode()]
>>>>>
>>>>> *# Producer*
>>>>> delivery_reports=False
>>>>> auto_start=True
>>>>> linger_ms=10.0
>>>>>
>>>>> ksp = topic.get_producer(delivery_reports=delivery_reports,
>>>>>                          linger_ms=linger_ms, auto_start=auto_start)
>>>>>
>>>>> for i in range(100):
>>>>>
>>>>>     dict_obj = {
>>>>>         'time': time.time(),
>>>>>         'value': i
>>>>>     }
>>>>>
>>>>>     data = json.dumps(dict_obj)
>>>>>
>>>>>     ksp.produce(data.encode())
>>>>>     time.sleep(0.25)
>>>>>
>>>>> *# This results in data being written to my Kafka Server as follows:*
>>>>>
>>>>>
>>>>> *# But when I call reset offset such as:*
>>>>> offset_ts = int(time.mktime(time.strptime('2018-06-18 15:33:22',
>>>>> '%Y-%m-%d %H:%M:%S'))*1000)
>>>>> consumer.reset_offsets([(topic.partitions[0],offset_ts)])
>>>>>
>>>>> while True:
>>>>>     message = consumer.consume()
>>>>>     dict_obj = json.loads(message.value)
>>>>>
>>>>>     print(dict_obj)
>>>>>
>>>>> *# I get the below error and a reset to 0*
>>>>>
>>>>>
>>>>> *# Potentially related; printing the message timestamp (as below)
>>>>> results in 0 being printed*
>>>>>
>>>>> while True:
>>>>>     message = consumer.consume()
>>>>>     dict_obj = json.loads(message.value)
>>>>>
>>>>>     print(message.timestamp)
>>>>>
>>>>>
>>>>> *Thanks a million,*
>>>>> Moritz
>>>>>
>>>>>
>>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "pykafka-user" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to pykafka-user+unsubscr...@googlegroups.com.
>>>> To post to this group, send email to pykafka-u...@googlegroups.com.
>>>> Visit this group at https://groups.google.com/group/pykafka-user.
>>>> To view this discussion on the web visit https://groups.google.com/d/ms
>>>> gid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40goog
>>>> legroups.com
>>>> <https://groups.google.com/d/msgid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>> .
>>>>
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>>
>>> --
>>> Emmett Butler | Senior Software Engineer
>>>
>>> <http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>
>>>
>>>
>>
>>
>
>

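A side note on the timestamp conversion in the original post: time.mktime() interprets the parsed time in the local time zone of the machine running the script, so the resulting millisecond value depends on where the code runs. The thread does not establish whether this is related to the OffsetOutOfRangeError. The following is only a minimal sketch of making the time zone explicit, using the standard library. The helper name to_epoch_ms and its tz parameter are hypothetical and not part of pykafka.

```python
from datetime import datetime, timedelta, timezone


def to_epoch_ms(text, tz=timezone.utc, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a wall-clock string in an explicit time zone and return epoch milliseconds.

    Hypothetical helper for illustration; not part of pykafka.
    """
    naive = datetime.strptime(text, fmt)   # no time zone attached yet
    aware = naive.replace(tzinfo=tz)       # state the zone explicitly
    return int(aware.timestamp() * 1000)   # seconds since epoch -> milliseconds


# The same wall-clock string maps to different instants depending on the zone.
as_utc = to_epoch_ms("2018-06-18 15:33:22")
as_cest = to_epoch_ms("2018-06-18 15:33:22", timezone(timedelta(hours=2)))
print(as_utc, as_cest)
```

The resulting value could then be passed the same way as in the original post, i.e. consumer.reset_offsets([(topic.partitions[0], offset_ts)]), assuming the consumer is set up as described there.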