Thanks, I´ll do some additional digging. Not sure if it´s relevant, but for the record numerical offset based resetting works like a charm.
2018-06-19 23:42 GMT+02:00 Emmett Butler <emm...@parsely.com>: > Thanks! I ask because I think it's possible that only having a single log > segment (as your partition does) hamstrings the functionality of > reset_offsets(). I haven't verified this experimentally, but I think it's > possible. Maybe someone on the Kafka users group has insight. > > On Tue, Jun 19, 2018 at 1:55 AM, Moe <volland.mor...@gmail.com> wrote: > >> Thanks for getting back to me so speedily. To be frank, I do not know how >> log segments differ from logs. >> >> *# This is a view into my logs folder:* >> >> >> *# And this is the view into that folder for my offset_retrieval_test.001 >> partition (the topic in question):* >> >> >> Please let me know if this isn't what you were asking for! The topic has >> been auto created so all settings are default (7 days retention if my >> memory serves me right, but I'm sure you know that better than me). >> >> I appreciate the time and effort, >> Moritz >> >> 2018-06-18 18:27 GMT+02:00 Emmett Butler <emm...@parsely.com>: >> >>> How many log segments does this partition have, and what are the >>> retention settings for the topic? You can find the log segments by looking >>> in the directory pointed to by the log.dirs server configuration. >>> >>> On Mon, Jun 18, 2018 at 7:43 AM, Moe <volland.mor...@gmail.com> wrote: >>> >>>> *# Sorry, I should have mentioned that the timestamp I use is one I >>>> read from the server gui* >>>> >>>> >>>> Am Montag, 18. Juni 2018 16:39:50 UTC+2 schrieb Moe: >>>>> >>>>> *Hi,* >>>>> >>>>> >>>>> *I'm struggling to implement a time(stamp) based reset_offset and am >>>>> hoping someone here can show me the light. In essence when I call >>>>> reset_offset with a timestamp I get * >>>>> >>>>> *"OffsetOutOfRangeError" * >>>>> >>>>> >>>>> *# Setup:* >>>>> - Kafka: >>>>> Kafka==1.1.0 (also tried with 1.0.0) running in Docker >>>>> (Wurstmeister/Kafka image) >>>>> >>>>> - Consumer/Producer: >>>>> Ubuntu17.04 >>>>> pykafka2.7.0 >>>>> python3.6 >>>>> >>>>> >>>>> *# Kafka Connection* >>>>> client = KafkaClient(hosts='10.25.42.127:9092', >>>>> broker_version="1.0.0") >>>>> topic = client.topics['offset_retrieval_test.001'.encode()] >>>>> >>>>> *# Producer* >>>>> delivery_reports=False >>>>> auto_start=True >>>>> linger_ms=10.0 >>>>> >>>>> ksp = topic.get_producer(delivery_reports=delivery_reports,linger_ >>>>> ms=linger_ms,auto_start=auto_start) >>>>> >>>>> for i in range(100): >>>>> >>>>> dict_obj = { >>>>> 'time': time.time(), >>>>> 'value': i >>>>> } >>>>> >>>>> data = json.dumps(dict_obj) >>>>> >>>>> ksp.produce(data.encode()) >>>>> time.sleep(0.25) >>>>> >>>>> *# This results in data being written to my Kafka Server as follows:* >>>>> >>>>> >>>>> *# But when I call reset offset such as:* >>>>> offset_ts = int(time.mktime(time.strptime('2018-06-18 15:33:22', >>>>> '%Y-%m-%d %H:%M:%S'))*1000) >>>>> consumer.reset_offsets([(topic.partitions[0],offset_ts)]) >>>>> >>>>> while True: >>>>> message = consumer.consume() >>>>> dict_obj = json.loads(message.value) >>>>> >>>>> print(dict_obj) >>>>> >>>>> *# I get the below error and a reset to 0* >>>>> >>>>> >>>>> *# Potentially related; printing the message timestamp (as below) >>>>> results in 0 being printed* >>>>> >>>>> while True: >>>>> message = consumer.consume() >>>>> dict_obj = json.loads(message.value) >>>>> >>>>> print(message.timestamp) >>>>> >>>>> >>>>> *Thanks a million,* >>>>> Moritz >>>>> >>>>> >>>>> -- >>>> You received this message because you are subscribed to the Google >>>> Groups "pykafka-user" group. >>>> To unsubscribe from this group and stop receiving emails from it, send >>>> an email to pykafka-user+unsubscr...@googlegroups.com. >>>> To post to this group, send email to pykafka-u...@googlegroups.com. >>>> Visit this group at https://groups.google.com/group/pykafka-user. >>>> To view this discussion on the web visit https://groups.google.com/d/ms >>>> gid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40goog >>>> legroups.com >>>> <https://groups.google.com/d/msgid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40googlegroups.com?utm_medium=email&utm_source=footer> >>>> . >>>> >>>> For more options, visit https://groups.google.com/d/optout. >>>> >>> >>> >>> >>> -- >>> Emmett Butler | Senior Software Engineer >>> >>> <http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature> >>> >>> -- >>> You received this message because you are subscribed to the Google >>> Groups "pykafka-user" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to pykafka-user+unsubscr...@googlegroups.com. >>> To post to this group, send email to pykafka-u...@googlegroups.com. >>> Visit this group at https://groups.google.com/group/pykafka-user. >>> To view this discussion on the web visit https://groups.google.com/d/ms >>> gid/pykafka-user/CAER3s9S5uQBO-R6bkkvfFWkRggZxa8zh07umd7Hr39 >>> %3DEit0yDA%40mail.gmail.com >>> <https://groups.google.com/d/msgid/pykafka-user/CAER3s9S5uQBO-R6bkkvfFWkRggZxa8zh07umd7Hr39%3DEit0yDA%40mail.gmail.com?utm_medium=email&utm_source=footer> >>> . >>> >>> For more options, visit https://groups.google.com/d/optout. >>> >> >> > > > -- > Emmett Butler | Senior Software Engineer > > <http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature> >