Hi, I just opened a PR for this. https://github.com/apache/flink/pull/983
However, I was not able to reproduce any serialization issues. I tested
Tuple0 (see enclosed code) on a cluster, and the program worked. Am I
missing anything?

-Matthias

On 08/03/2015 01:01 AM, Matthias J. Sax wrote:
> Thanks for the advice about Tuple0.
>
> I personally don't see any advantage in having a "flink-tuple" project. Am
> I missing anything about it? Furthermore, I am not sure if it is a good
> idea to have too many too-small projects.
>
>
> On 08/03/2015 12:48 AM, Stephan Ewen wrote:
>> Tuple0 would need special serialization and comparator logic. If that is
>> given, I see no reason not to support it.
>>
>> There is, BTW, the request to create a dedicated "flink-tuple" project that
>> only contains the tuple classes. Any opinions on that?
>>
>> On Mon, Aug 3, 2015 at 12:45 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>
>>> Thanks for the explanation!
>>>
>>> As I mentioned before, Tuple0 might also be helpful for streaming. And I
>>> guess I will need it for the Storm compatibility layer, too. (I need to
>>> double check, but Storm supports zero-attribute tuples, too.)
>>>
>>> Given the information I collected during the discussion, I vote
>>> for keeping Tuple0 in Flink core and fixing the serialization problem.
>>> Should we have another JIRA for this? Or should I extend the existing
>>> JIRA? (https://issues.apache.org/jira/browse/FLINK-2457)
>>>
>>> -Matthias
>>>
>>>
>>> On 08/03/2015 12:22 AM, Chesnay Schepler wrote:
>>>> First of all, it was a really good idea to start a discussion about this.
>>>>
>>>> So the general idea behind Tuple0 was this:
>>>>
>>>> The Python API maps Python tuples to Flink tuples. Python can have empty
>>>> tuples, so I thought "well duh, let's make a Tuple0 class!". What I did
>>>> not wanna do is create some non-Tuple object to represent empty tuples;
>>>> I'd rather have them treated the same, because it's less work and
>>>> creates simpler code.
>>>>
>>>> When transferring the plan to Java, certain parameters for operations
>>>> are tuples, which can be empty as well.
>>>> This is where the Tuple0 class is really useful, because these empty
>>>> tuples go through the same logic as other tuples.
>>>> This is also why I want to keep the class, at least in the Python
>>>> project, for now.
>>>>
>>>> For the actual program execution, I need a new solution. Funny story:
>>>> while writing this reply I noticed that the Python API can't handle
>>>> Tuple0 at runtime either. ha...ha... -.-
>>>>
>>>> Guess I now know what I'm working on next.
>>>>
>>>> On 02.08.2015 21:24, Matthias J. Sax wrote:
>>>>> Can you elaborate how and why Python uses Tuple0? If it cannot be
>>>>> serialized like regular Tuples, what is its usage in Python? Right
>>>>> now it seems that there is no special serialization code for Tuple0.
>>>>>
>>>>> I just want to understand the topic in detail.
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 08/01/2015 03:38 PM, Stephan Ewen wrote:
>>>>>> I think a Tuple0 cannot be implemented like the current tuples, at least
>>>>>> with respect to runtime serialization.
>>>>>>
>>>>>> The system makes the assumption that it makes progress in consuming
>>>>>> bytes when deserializing values. If a Tuple0 never consumes data from the
>>>>>> byte stream, this assumption is broken. It would need at least one marker
>>>>>> byte.
>>>>>> Then it effectively is a Tuple1<Byte> disguising itself as a Tuple0.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Aug 1, 2015 at 1:38 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>>> I just double checked. Scala does not have a type Tuple0. IMHO, it would
>>>>>>> be best to remove Tuple0 for consistency. Having Tuple types is for
>>>>>>> consistency with Scala in the first place, right? Please give
>>>>>>> feedback.
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 08/01/2015 01:04 PM, Matthias J. Sax wrote:
>>>>>>>> I see.
>>>>>>>>
>>>>>>>> I think that it might be useful to have Tuple0, because in rare cases
>>>>>>>> you only want to "notify" a downstream operator (talking about
>>>>>>>> streaming) that something happened, but there is no actual data to be
>>>>>>>> processed. Furthermore, if Flink cannot deal with Tuple0, it should be
>>>>>>>> removed completely for consistency, IMHO.
>>>>>>>>
>>>>>>>> I will open a JIRA for it.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 07/31/2015 10:44 PM, Chesnay Schepler wrote:
>>>>>>>>> also, I'm not sure if I ever sent a Tuple0 through a program; it
>>>>>>>>> could be that the system freaks out.
>>>>>>>>>
>>>>>>>>> On 31.07.2015 22:40, Chesnay Schepler wrote:
>>>>>>>>>> there's no specific reason. it was added fairly recently by me
>>>>>>>>>> (mid-April), and you're most likely the second person to use it.
>>>>>>>>>>
>>>>>>>>>> i didn't integrate it into all our tuple-related stuff because, well, i
>>>>>>>>>> never thought anyone would actually need it, so i saved myself the
>>>>>>>>>> trouble.
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> is there any specific reason why Tuple.getTupleClass(int arity)
>>>>>>>>>>> does not support arity zero? There is a class Tuple0, but it cannot be
>>>>>>>>>>> generated by Tuple.getTupleClass(...). Is it a missing feature? (I
>>>>>>>>>>> would like to have it.)
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

public class TestEmptyTuple {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Four parallel sources emit Tuple0; shuffle() forces the records over the network
        // to a single counting operator, so they have to be serialized.
        DataStream<Tuple0> input = env.addSource(new Empty()).setParallelism(4);
        input.shuffle().transform("verify", null, new Counter()).setParallelism(1);

        env.execute();
    }

    /** Emits one empty tuple per second until cancelled. */
    public static class Empty implements ParallelSourceFunction<Tuple0> {
        private static final long serialVersionUID = 1350902748274781033L;

        private volatile boolean isRunning = true;

        @Override
        public void run(
                org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Tuple0> ctx)
                throws Exception {
            while (this.isRunning) {
                Thread.sleep(1000);
                ctx.collect(new Tuple0());
            }
        }

        @Override
        public void cancel() {
            this.isRunning = false;
        }
    }

    /** Counts and prints every Tuple0 that arrives. */
    public static class Counter implements OneInputStreamOperator<Tuple0, Object> {
        private static final long serialVersionUID = 5518823010274195186L;

        @Override
        public void setup(Output<StreamRecord<Object>> output, StreamingRuntimeContext runtimeContext) {
            // TODO Auto-generated method stub
        }

        @Override
        public void open(Configuration config) throws Exception {
            // TODO Auto-generated method stub
        }

        @Override
        public void close() throws Exception {
            // TODO Auto-generated method stub
        }

        @Override
        public StreamingRuntimeContext getRuntimeContext() {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public boolean isInputCopyingDisabled() {
            // TODO Auto-generated method stub
            return false;
        }

        @Override
        public void setChainingStrategy(
                org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy strategy) {
            // TODO Auto-generated method stub
        }

        @Override
        public org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy getChainingStrategy() {
            // TODO Auto-generated method stub
            return null;
        }

        private int counter = 0;

        @Override
        public void processElement(StreamRecord<Tuple0> element) throws Exception {
            System.out.println("Tuple0: " + (++counter));
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // TODO Auto-generated method stub
        }
    }
}
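
For reference, here is a minimal sketch of the marker-byte idea Stephan describes in the quoted thread: each empty record writes one byte, so a reader always makes progress when deserializing. It uses only java.io and does not touch Flink. The names `MarkerByteSketch`, `Empty`, `write`, `read`, `writeAll`, and `countRecords` are hypothetical and are not taken from Flink or from the PR. Whether Flink's runtime actually needs such a byte is the open question in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class MarkerByteSketch {

    /** Hypothetical stand-in for an empty tuple; this is NOT Flink's Tuple0. */
    static final class Empty {
    }

    /** Writes a single marker byte so that the record occupies space in the stream. */
    static void write(Empty value, DataOutputStream out) throws IOException {
        out.writeByte(0);
    }

    /** Consumes the marker byte; readByte() throws EOFException once the stream is exhausted. */
    static Empty read(DataInputStream in) throws IOException {
        in.readByte();
        return new Empty();
    }

    /** Serializes {@code count} empty records into a byte array. */
    static byte[] writeAll(int count) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        for (int i = 0; i < count; i++) {
            write(new Empty(), out);
        }
        out.flush();
        return buffer.toByteArray();
    }

    /** Reads records until end of stream and returns how many were found. */
    static int countRecords(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int records = 0;
        try {
            while (true) {
                read(in);
                records++;
            }
        } catch (EOFException endOfStream) {
            // Each read consumed one byte, so the loop is guaranteed to terminate here.
            return records;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = writeAll(3);
        System.out.println(bytes.length + " bytes, " + countRecords(bytes) + " records");
    }
}
```

Running `main` prints `3 bytes, 3 records`. Without the marker byte, `read` would never hit end of stream in this sketch, and the record count could not be recovered from the bytes alone.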