Kafka IO: value of expansion_service

2020-04-21 Thread Piotr Filipiuk
Hi, is it possible to run a streaming pipeline that reads from (or writes to) Kafka using the DirectRunner? If so, what should the expansion_service point to: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90? Also, when using
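A minimal sketch of what the question is after. In Beam, expansion_service can be a "host:port" string pointing at a running Java expansion service (newer SDKs can also start a bundled one automatically when the argument is omitted). The port 8097 and the helper name below are assumptions for illustration, not part of the Beam API; in a real pipeline the dict would be splatted into apache_beam.io.kafka.ReadFromKafka(**kwargs).

```python
# Hypothetical helper: assemble the arguments ReadFromKafka expects,
# assuming a Java expansion service is already listening on localhost:8097.
def kafka_read_kwargs(bootstrap_servers, topic, expansion_port=8097):
    """Build keyword arguments for apache_beam.io.kafka.ReadFromKafka."""
    return {
        "consumer_config": {"bootstrap.servers": bootstrap_servers},
        "topics": [topic],
        # "host:port" of the cross-language expansion service (assumed port).
        "expansion_service": f"localhost:{expansion_port}",
    }

kwargs = kafka_read_kwargs("localhost:9092", "my_topic")
```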

Re: Kafka IO: value of expansion_service

2020-04-22 Thread Piotr Filipiuk
l:50450']' returned non-zero exit status 125. Since the apache/beam_java_sdk:2.21.0.dev is not publicly available, I cannot pull the image. Is it possible to get access to dev images? Alternatively, are there any instructions on how to build the beam_java_sdk locally and then use the loca
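Since the apache/beam_java_sdk:*.dev image is not published, the usual way out is to build it locally from a Beam source checkout; the Gradle task name below is the one quoted later in this thread, while the checkout path is an assumption.

```shell
# Sketch, assuming a local clone of the Beam source tree at $BEAM_SRC.
# Building the container locally produces the apache/beam_java_sdk:*.dev
# image, so the dev SDK never needs to pull it from a registry.
BEAM_SRC="$HOME/beam"                      # assumption: path to the Beam checkout
GRADLE_TASK=":sdks:java:container:docker"  # task name as quoted in the thread
echo "cd $BEAM_SRC && ./gradlew $GRADLE_TASK"
```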

Re: Kafka IO: value of expansion_service

2020-04-22 Thread Piotr Filipiuk
radlew :sdks:java:container:docker > > On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk > wrote: > >> Thanks for quick response. >> >> Since Beam 2.21.0 is not yet available via pip >> <https://pypi.org/project/apache-beam/#history>, I tried to run it from >> HEAD. A

Re: Kafka IO: value of expansion_service

2020-04-22 Thread Piotr Filipiuk
ker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver wrote: > It should just work without any other changes. If it doesn't let us know. > > On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk > wrote: > &

Re: Kafka IO: value of expansion_service

2020-04-25 Thread Piotr Filipiuk
ect(ObjectInputStream.java:423) at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71) ... 18 more I am not sure it is related to https://issues.apache.org/jira/browse/BEAM-9745. On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk wrote: > H

Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Piotr Filipiuk
I am unable to read from Kafka and getting the following warnings & errors when calling kafka.ReadFromKafka() (Python SDK): WARNING:root:severity: WARN timestamp { seconds: 1591370012 nanos: 52300 } message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be estab
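A "Connection to node -1 could not be established" warning usually means the broker address in bootstrap.servers is not reachable from wherever the consumer actually runs. A small stdlib sketch for separating basic networking problems from Kafka-level ones; the helper names are hypothetical and not part of Beam or any Kafka client library.

```python
# Debugging aid: parse each bootstrap.servers entry and try a plain TCP
# connect, to check that the broker address resolves and accepts connections
# from this environment before involving the pipeline at all.
import socket

def parse_bootstrap(servers):
    """Split 'host1:port1,host2:port2' into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs

def reachable(host, port, timeout=2.0):
    """Return True if a bare TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (not run here, since it needs a live broker):
# for host, port in parse_bootstrap("localhost:9092"):
#     print(host, port, reachable(host, port))
```

When the consumer runs inside a container, "localhost" refers to the container itself, which is why the thread suggests the broker node's actual IP address.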

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Piotr Filipiuk
nment where the Flink step is executed from ? Can you try specifying > the actual IP address of the node running the Kafka broker ? > > > > On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik wrote: > > +dev +Chamikara Jayalath > +Heejong > Lee > > > > On Fri, Jun

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-08 Thread Piotr Filipiuk
n" log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient" On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk wrote: > Thank you for the suggestions. > > Neither Kafka nor Flink run in a docker container, they all run locally. > Furthermore, the same issue happens for Direct Runner. That being said > ch

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-12 Thread Piotr Filipiuk
FlinkRunner for xlang pipelines. > > On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath > wrote: > >> To clarify, Kafka dependency was already available as an embedded >> dependency in Java SDK Harness but not sure if this worked for >> DirectRunner. startin

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-18 Thread Piotr Filipiuk
I also wanted to clarify whether Kafka IO for the Python SDK is generally available or still experimental? On Fri, Jun 12, 2020 at 2:52 PM Piotr Filipiuk wrote: > For completeness I am also attaching task manager logs. > > On Fri, Jun 12, 2020 at 2:32 PM Piotr Filipiuk > wrote

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-18 Thread Piotr Filipiuk
-language transforms. > > Thanks, > Cham > > > On Thu, Jun 18, 2020 at 11:26 AM Piotr Filipiuk > wrote: > >> I also wanted to clarify whether Kafka IO for Python SDK is general >> availability or is it still experimental? >> >> On Fri, Jun 12, 2020 at

Re: ReadFromKafka: UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler

2020-07-09 Thread Piotr Filipiuk
n. > > > > 1: https://issues.apache.org/jira/browse/BEAM-6868 > > > > On Mon, Jul 6, 2020 at 1:42 PM Piotr Filipiuk > <mailto:piotr.filip...@gmail.com>> wrote: > > > > Hi, > > > > I am trying to run a simple example that use

Processing files as they arrive with custom timestamps

2020-10-08 Thread Piotr Filipiuk
Hi, I am looking into https://beam.apache.org/documentation/patterns/file-processing/ since I would like to create a continuous pipeline that reads from files and assigns event times based on e.g. file metadata or actual data inside the file. For example: private static void run(String[] args) {
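The core of the idea is a function that maps each matched file to an event time, which downstream windowing then uses instead of processing time. A stdlib sketch of one such mapping; the "events-YYYY-MM-DD" filename pattern is an assumption for illustration only.

```python
# Sketch: derive an event timestamp from metadata embedded in the
# filename, as the message above proposes. In a pipeline, this value
# would be attached to the element (e.g. via a TimestampedValue).
from datetime import datetime, timezone

def event_time_from_filename(path):
    """Parse 'events-YYYY-MM-DD.log' style names into a UTC epoch timestamp."""
    stem = path.rsplit("/", 1)[-1]                       # drop directories
    date_part = stem[len("events-"):len("events-") + 10]  # the YYYY-MM-DD part
    dt = datetime.strptime(date_part, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return dt.timestamp()
```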

Re: Processing files as they arrive with custom timestamps

2020-10-13 Thread Piotr Filipiuk
Cwik wrote: > I'm working on a blog post[1] about splittable dofns that covers this > topic. > > The TLDR; is that FileIO.match() should allow users to control the > watermark estimator that is used and for your use case you should hold the > watermark to some computable value (e

Re: Processing files as they arrive with custom timestamps

2020-10-14 Thread Piotr Filipiuk
ectly. It is >> likely that you copied this from Beam code and it is used there because >> user implementations of UnboundedSource were incorrectly setting the >> watermark outside of the bounds and there is no way to fix them. >> >> 1: >> https://github.com/apach

Re: Processing files as they arrive with custom timestamps

2020-10-14 Thread Piotr Filipiuk
would suspect that once the watermark is set to day+1, the results of the previous day should be finalized and hence the result for a given window should be outputted. On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik wrote: > I think you should be using the largest "complete" timestamp from the

Re: Processing files as they arrive with custom timestamps

2020-10-15 Thread Piotr Filipiuk
Made it work e2e. Thank you all for the help! On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk wrote: > Got it, thank you for the clarification. > > I tried to run the pipeline locally, with the following main (see full > source code attached): > > public static void

Re: Processing files as they arrive with custom timestamps

2020-10-23 Thread Piotr Filipiuk
e(filename, dateTimeFormatter); if (index != -1) { // In the case it has a suffix i.e. it is complete, fast forward to the next day. return timestamp.plus(Duration.standardDays(1)); } return timestamp; } On Mon, Oct 19, 2020 at 9:56 AM Luke Cwik wrote: > For future reference, w
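The Java fragment quoted above can be rendered as a small standalone function: parse a date from the filename, and when the name carries a "complete" suffix, fast forward the watermark to the start of the next day so that day's window can close. The ".done" suffix and the date-prefixed filename layout are assumptions for illustration.

```python
# Python rendition of the watermark logic from the quoted Java fragment.
from datetime import datetime, timedelta, timezone

COMPLETE_SUFFIX = ".done"  # hypothetical marker for a finished file

def file_watermark(filename):
    """Return the watermark (UTC datetime) implied by one matched file."""
    name = filename.rsplit("/", 1)[-1]
    ts = datetime.strptime(name[:10], "%Y-%m-%d").replace(tzinfo=timezone.utc)
    if name.endswith(COMPLETE_SUFFIX):
        # Complete file: advance to the next day, finalizing this day's window.
        return ts + timedelta(days=1)
    return ts
```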