Re: Beam supports Flink Async IO operator

2020-07-08 Thread Maximilian Michels
Flink's AsyncIO operator is useful for processing IO-bound operations, e.g. sending network requests. Like Luke mentioned, it is not available in Beam. -Max On 07.07.20 22:11, Luke Cwik wrote: Beam is a layer that sits on top of execution engines like Flink and provides its own programming model…

Re: Beam supports Flink Async IO operator

2020-07-08 Thread Maximilian Michels
Just to clarify: We could make the AsyncIO operator also available in Beam, but the operator has to be represented by a concept in Beam. Otherwise, there is no way to know when to produce it as part of the translation. On 08.07.20 11:53, Maximilian Michels wrote: Flink's AsyncIO operator is useful…

Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
Hi! I'm using Apache Beam Java (2.19.0) with Dataflow. I created a class and annotated it with DefaultCoder: @DefaultCoder(AvroCoder.class) public class ProductCatalog { When I try to submit it to the cluster I get an error: Caused by: java.io.NotSerializableException: ...common.ProductCatalog If I…

Re: Conditional branching during pipeline execution time

2020-07-08 Thread Praveen K Viswanathan
No worries, the error was due to the DoFn element not being defined as an instance of SchemaCoder. On Tue, Jul 7, 2020 at 10:46 PM Praveen K Viswanathan < harish.prav...@gmail.com> wrote: > Thanks Luke. I changed the pipeline structure as shown below. I could see > that this flow would work for my orig…

TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Hi, I want to test a pipeline whose input is a PCollection of TableRows. I've created a test, and when I run it I get an error: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow Is it a known issue? Thank you i…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Here is a code example: List ss = Arrays.asList(session1, session2); PCollection sessions = p.apply(Create.of(ss)); PCollection res = Job.runJob(sessions, "20200614", false, new ProductCatalog()); p.run(); On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich wrote: > Hi, > I want to test pipeline an…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
Kirill - Can you tell us more about what Job.runJob is doing? I would not expect the Beam SDK itself to do any casting to TableRow, so is there a line in your code where you're explicitly casting to TableRow? There may be a point where you need to explicitly set the coder on a PCollection to deserialize…

Recommended way to generate a TableRow from Json without using TableRowJsonCoder context OUTER?

2020-07-08 Thread Kaymak, Tobias
As a workaround I am currently using the following code to generate a TableRow object from a Java Protobuf class, as I am facing a problem with Beam schemas ( https://www.mail-archive.com/user@beam.apache.org/msg05799.html). It relies on the TableRowJsonCoder: String json = JsonFormat.prin…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Hi Jeff, It's a simple pipeline that takes a PCollection of TableRow which is selected from the Google Analytics export to BigQuery. So each TableRow follows this schema: https://support.google.com/analytics/answer/3437719?hl=en I have part of the code doing casting to TableRow like this: Boolean isMobile…

Re: Recommended way to generate a TableRow from Json without using TableRowJsonCoder context OUTER?

2020-07-08 Thread Lars Almgren Schwartz
Hey, Don't know if it's the official way, but we have written our own proto to BigQuery converter which works pretty well. public static TableRow convertEventToTableRow(TableRow tableRow, Message event) { Map<Descriptors.FieldDescriptor, Object> fields = event.getAllFields(); for (Descriptors.FieldDescriptor field : fields.keySet()) {…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
The test runner intentionally does some ugly things in order to expose problems which might otherwise be missed. In particular, I believe the test runner enforces coding between each transform and scrambles the order of elements, whereas production pipelines will often have many transforms fused together…

Re: Beam supports Flink Async IO operator

2020-07-08 Thread Eleanore Jin
Thanks Luke and Max for the information. We have a use case where, inside a DoFn, we need to call external services to trigger some other flows. The calls to the other services are REST-based sync calls, and they take 150+ milliseconds to return. We are using Flink as the runner and I came…
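[Editor's note] Since Beam does not expose Flink's AsyncIO operator, a common workaround for IO-bound calls like the ones described above is to fan blocking requests out onto a thread pool inside the DoFn and wait for the batch to complete. A minimal plain-Java sketch of that pattern, where `callService` and its latency are hypothetical stand-ins for the real REST call:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelCalls {
    // Simulated blocking REST call (~150 ms in the real service).
    static String callService(String id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ok:" + id;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<String> ids = List.of("a", "b", "c", "d");
        // Issue all calls concurrently, then join them in order once the batch is in flight.
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> callService(id), pool))
                .collect(Collectors.toList());
        List<String> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        System.out.println(results); // [ok:a, ok:b, ok:c, ok:d]
        pool.shutdown();
    }
}
```

With four 50 ms calls on an 8-thread pool, the batch finishes in roughly one call's latency instead of four; inside a real DoFn the futures would typically be collected in `@ProcessElement`/`@FinishBundle` before emitting output.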

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
So it's a correct implementation of TableRow that encode(decode(a)) != a? On Wed, 8 Jul 2020 at 19:03, Jeff Klukas wrote: > The test runner intentionally does some ugly things in order to expose > problems which might otherwise be missed. In particular, I believe the test > runner enforces coding…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich wrote: > So it's correct implementation of TableRow that encode(decode(a)) != a? > A TableRow can contain fields of any map implementation. It makes sense to me that once a TableRow object is serialized and deserialized, that the coder must make a…
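[Editor's note] The distinction Jeff describes can be seen with plain JDK maps: two maps with the same entries compare `equals()` even when their concrete classes differ, which is why a decoded row's nested values can equal the originals yet fail a cast to a specific class. A small illustration (class names are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapEquality {
    public static void main(String[] args) {
        // The "original" nested value, built with one Map implementation...
        Map<String, Object> original = new TreeMap<>();
        original.put("hits", 3);

        // ...comes back from a coder round-trip as a different implementation.
        Map<String, Object> decoded = new LinkedHashMap<>(original);

        System.out.println(original.equals(decoded));               // true: same entries
        System.out.println(original.getClass() == decoded.getClass()); // false: different class
    }
}
```

So `decode(encode(a))` can be value-equal to `a` while any code that casts the result to `a`'s concrete class throws ClassCastException.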

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Rui Wang
Tried some code search in the Beam repo but I didn't find the exact line of code that throws your exception. However, I believe that for Java classes used in primitives (ParDo, CombineFn) and coders, it's very likely you need to make them serializable (i.e. implement Serializable). -Rui On Wed, Jul…

Re: Recommended way to generate a TableRow from Json without using TableRowJsonCoder context OUTER?

2020-07-08 Thread Luke Cwik
The deprecated method is not going to be removed anytime soon, so I wouldn't worry about it being removed. If you really want to use non-deprecated methods, then note that the TableRowJsonCoder uses the StringUtf8Coder to parse strings, so it is looking for a nested encoding using the StringUtf8Coder encoding…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
So from what I understand, it works like this by design and it's not possible to test my code with the current coder implementation. Is that correct? On Wed, 8 Jul 2020 at 21:41, Jeff Klukas wrote: > On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich > wrote: > >> So it's correct implementation o…

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Luke Cwik
Can you provide the full stacktrace? On Wed, Jul 8, 2020 at 12:33 PM Rui Wang wrote: > Tried some code search in Beam repo but I didn't find the exact line > of code that throws your exception. > > However, I believe for Java Classes you used in primitives (ParDo, > CombineFn) and coders, it's v…

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
Hello Luke, I will try to get the "22 more" soon, not sure how to do it though. 23:11:11.339 [ERROR] [system.err] Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics@444f44c5, mainOutputT…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich wrote: > So from what I understand, it works like this by design and it's not > possible to test my code with the current coder implementation. Is that > correct? > I would argue that this test failure is indicating an area of potential failure in…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Interesting. All my code does is the following: public static void main(String[] args) { PCollection bqResult = p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql()); PCollection result = runJob(bqResult, boolean and string params); // store results } and private static…

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Luke Cwik
The problem is that CreateSessionMetrics has an instance of com.ikea.search.ab.common.ProductCatalog$Product as part of its serialization closure. Does it have a member variable of that type, or refer to one that is part of its parent's class, or ...? Should it be marked transient? Marking it as s…
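[Editor's note] The pattern Luke describes can be illustrated with plain Java serialization: a field whose type is not Serializable breaks serialization of the enclosing class unless it is marked transient (and rebuilt after deserialization). A minimal sketch with hypothetical class names standing in for CreateSessionMetrics and ProductCatalog$Product:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
    // Stand-in for a non-serializable helper like ProductCatalog$Product.
    static class Product { }

    // Stand-in for the DoFn. Without `transient` on `product`, serializing
    // Holder throws java.io.NotSerializableException for Product.
    static class Holder implements Serializable {
        transient Product product = new Product(); // skipped by serialization; rebuild lazily
        String name = "metrics";
    }

    public static void main(String[] args) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(new Holder()); // succeeds because `product` is transient
        System.out.println("serialized OK");
    }
}
```

A transient field is null after deserialization, so a real DoFn would typically re-create it in `@Setup` (or lazily on first use) on each worker.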

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
Cool! Thanks a lot for your help, Luke. On Wed, 8 Jul 2020 at 23:45, Luke Cwik wrote: > The problem is that CreateSessionMetrics has an instance of > com.ikea.search.ab.common.ProductCatalog$Product as part of its > serialization closure. Does it have a member variable of that type, or > refer t…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
Does the stack trace tell you where specifically in the code the cast is happening? I'm guessing there may be assumptions inside your CreateSessionMetrics class if that's where you're manipulating the TableRow objects. On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich wrote: > Interesting. All my…

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
I changed the code a little bit to not use lambdas. Session(TableRow bigQueryRow, ProductCatalog productCatalog) { List h = ((List) bigQueryRow.get("hits")); List hits = new ArrayList<>(); for (TableRow tableRow : h) { <-- breaks here hits.add(new Hit(tableRo…
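[Editor's note] Assuming the decoded row's nested values come back as plain LinkedHashMaps, as the ClassCastException earlier in the thread suggests, one fix is to iterate the hits as `Map<String, Object>` rather than casting each element to TableRow. A plain-Java sketch with a hypothetical `hitTypes` helper:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HitsFix {
    // After a coder round-trip, nested values are generic maps;
    // treat them as Map<String, Object>, not TableRow.
    static List<String> hitTypes(Map<String, Object> row) {
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> hits = (List<Map<String, Object>>) row.get("hits");
        List<String> types = new ArrayList<>();
        for (Map<String, Object> hit : hits) {
            types.add((String) hit.get("type")); // no cast to a concrete row class
        }
        return types;
    }

    public static void main(String[] args) {
        Map<String, Object> hit = new LinkedHashMap<>();
        hit.put("type", "PAGE");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("hits", List.of(hit));
        System.out.println(hitTypes(row)); // [PAGE]
    }
}
```

Since TableRow itself implements the Map interface, code written against `Map<String, Object>` works both on freshly built rows and on decoded ones.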

Unable to read value from state/Unable to fetch data due to token mismatch for key

2020-07-08 Thread Mohil Khare
Hello, I am using the Beam Java SDK 2.19.0 (with enableStreamingEngine set to true) and very heavily use the stateful Beam processing model. However, sometimes I see the following exception while reading a value from state for a key (please note: here my key is a POJO whose fields create a kind of co…

Not able to create a checkpoint path

2020-07-08 Thread akul sharma
I am executing a word count streaming job using a Kafka producer as an unbounded input source and Kafka consumers to consume the output. Up to this point everything worked fine. But when I tried checkpointing by enabling the setCheckpointingInterval parameter of the FlinkPipelineOptions interface, I am unable to see the c…

Re: Unable to read value from state/Unable to fetch data due to token mismatch for key

2020-07-08 Thread Reuven Lax
This error should be benign. It often means that ownership of the work item was moved to a different worker (possibly caused by autoscaling or another source of work rebalancing), so the in-progress work item on that worker failed. However, the work item will be processed successfully on the new worke…