Hi Alexey,
You don't have to set the streaming mode. The Flink Runner will
automatically choose to use streaming mode when it discovers
UnboundedSources like Kafka. I'm wondering why that didn't work in
your case. I just ran your example and it chose streaming mode and
didn't return an error durin
Thanks
With if (translator == null || !applyCanTranslate(transform, node,
translator)) everything works as expected.
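
For anyone following the thread, here is a minimal, self-contained sketch of the three guard conditions discussed in it. It is not the Flink Runner code: TranslatorGuardSketch, Translator, FixedTranslator, Guard, and the simplified two-argument helpers are hypothetical stand-ins I made up so that the behaviour with a null translator can be reproduced in isolation.

```java
class TranslatorGuardSketch {

    /** Hypothetical stand-in for a runner's per-transform translator. */
    interface Translator {
        boolean canTranslate(String transform);
        String translate(String transform);
    }

    /** Trivial translator whose canTranslate answer is fixed at construction. */
    static final class FixedTranslator implements Translator {
        private final boolean can;
        FixedTranslator(boolean can) { this.can = can; }
        @Override public boolean canTranslate(String transform) { return can; }
        @Override public String translate(String transform) { return "translated " + transform; }
    }

    /** The three conditions that appear in the thread. */
    enum Guard { ORIGINAL, NOT_NULL_AND, NULL_OR }

    // Simplified stand-in for applyCanTranslate: dereferences the translator,
    // so it throws a NullPointerException when the translator is null.
    static boolean applyCanTranslate(String transform, Translator translator) {
        return translator.canTranslate(transform);
    }

    // Simplified stand-in for applyStreamingTransform: also needs a non-null translator.
    static String applyStreamingTransform(String transform, Translator translator) {
        return translator.translate(transform);
    }

    static String visit(Guard guard, String transform, Translator translator) {
        boolean reject;
        switch (guard) {
            case ORIGINAL:
                // translator == null && ...: the right-hand side runs only when
                // the translator IS null, which is exactly when it cannot run.
                reject = translator == null && applyCanTranslate(transform, translator);
                break;
            case NOT_NULL_AND:
                // translator != null && !...: a null translator skips the guard
                // and reaches applyStreamingTransform below.
                reject = translator != null && !applyCanTranslate(transform, translator);
                break;
            default:
                // translator == null || !...: short-circuits on null, so the
                // helper is called only with a non-null translator.
                reject = translator == null || !applyCanTranslate(transform, translator);
        }
        if (reject) {
            throw new UnsupportedOperationException("cannot translate " + transform);
        }
        return applyStreamingTransform(transform, translator);
    }

    /** Returns the translation result, or the simple name of the exception thrown. */
    static String outcome(Guard guard, Translator translator) {
        try {
            return visit(guard, "Read", translator);
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        for (Guard guard : Guard.values()) {
            System.out.println(guard + " with null translator: " + outcome(guard, null));
        }
    }
}
```

In this sketch only the NULL_OR variant turns a missing translator into an UnsupportedOperationException; the other two end in a NullPointerException, either in the guard itself or on the line after it.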
Regards,
Alexey Diomin
2016-08-31 14:12 GMT+04:00 Aljoscha Krettek:
> Ah I see, an unbounded source, such as the Kafka source does not work in
> batch mode (which streamStreamin
Ah I see, an unbounded source, such as the Kafka source, does not work in
batch mode (which setStreaming(false) enables). The code should work in
streaming mode if you apply some window that is compatible with the
side-input window to the main input.
I think the code in streaming still works bec
Program to reproduce:
https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e
1) options.setStreaming(false); - we get an NPE, and I can't understand how
the code is supposed to work
2) options.setStreaming(true); - the pipeline compiles (it still has an
error, but that comes from my incorrect use of windows)
2016-
Hi
If we change the code to translator != null, then the next line (
applyStreamingTransform(transform, node, translator); ) will cause an NPE.
This is the main reason I don't understand the code:
x = null;
if (x == null && f1_null_value_forbid(x)) { ..}
f2_null_value_forbid(x);
change (x == null) => (x !=nul
Hi,
I think this is more suited for the Beam dev list. Nevertheless, I think
this is a coding error and the condition should be
if (translator != null && !applyCanTranslate(transform, node, translator))
With what program did you encounter an NPE? It seems to me that this should
rarely happen, at l
Hi
Sorry if I picked the wrong mailing list.
After BEAM-102 was resolved in FlinkStreamingPipelineTranslator, we have this
code in visitPrimitiveTransform:
if (translator == null && applyCanTranslate(transform, node, translator)) {
LOG.info(node.getTransform().getClass().toString());
throw new Unsuppo