Hi,

I just opened a PR for this. https://github.com/apache/flink/pull/983

However, I was not able to "reproduce" the serialization issues... I tested
Tuple0 (see enclosed code) in a cluster, and the program worked. Am I
missing anything?

-Matthias



On 08/03/2015 01:01 AM, Matthias J. Sax wrote:
> Thanks for the advice about Tuple0.
> 
> I personally don't see any advantage in having a "flink-tuple" project. Am
> I missing something? Furthermore, I am not sure if it is a good
> idea to have too many small projects.
> 
> 
> On 08/03/2015 12:48 AM, Stephan Ewen wrote:
>> Tuple0 would need special serialization and comparator logic. If that is
>> given, I see no reason not to support it.
>>
>> There is, BTW, a request to create a dedicated "flink-tuple" project that
>> only contains the tuple classes. Any opinions on that?
>>
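Stephan's point about comparator logic can be illustrated with a standalone sketch. With zero fields there is nothing to compare, so a consistent comparator has to treat every instance as equal and hash all instances identically; one consequence is that grouping or deduplicating on such a type collapses everything into a single key. `NoFields` and `NoFieldsComparison` are hypothetical names built on plain JDK classes, not Flink's `Tuple0` or its comparator API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical zero-field record (not Flink's Tuple0). */
final class NoFields {
    @Override
    public boolean equals(Object o) {
        // no fields to compare, so any two instances are equal
        return o instanceof NoFields;
    }

    @Override
    public int hashCode() {
        // constant hash, consistent with equals()
        return 0;
    }
}

public class NoFieldsComparison {
    /** With nothing to compare, every pair of records is a tie. */
    public static final Comparator<NoFields> COMPARATOR = (a, b) -> 0;

    /** Counts distinct records; all zero-field records collapse into one. */
    public static int distinctCount(List<NoFields> records) {
        Set<NoFields> distinct = new HashSet<>(records);
        return distinct.size();
    }

    public static void main(String[] args) {
        List<NoFields> records = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            records.add(new NoFields());
        }
        records.sort(COMPARATOR); // stable sort, so the order is unchanged
        System.out.println(distinctCount(records)); // prints 1
    }
}
```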
>> On Mon, Aug 3, 2015 at 12:45 AM, Matthias J. Sax <
>> mj...@informatik.hu-berlin.de> wrote:
>>
>>> Thanks for the explanation!
>>>
>>> As I mentioned before, Tuple0 might also be helpful for streaming. And I
>>> guess I will need it for the Storm compatibility layer, too. (I need to
>>> double-check, but Storm supports zero-attribute tuples, too.)
>>>
>>> Based on the information I collected during the discussion, I vote
>>> for keeping Tuple0 in Flink core and fixing the serialization problem.
>>> Should we have another JIRA for this, or should I extend the existing
>>> JIRA? (https://issues.apache.org/jira/browse/FLINK-2457)
>>>
>>> -Matthias
>>>
>>>
>>> On 08/03/2015 12:22 AM, Chesnay Schepler wrote:
>>>> First of all, it was a really good idea to start a discussion about this.
>>>>
>>>> So the general idea behind Tuple0 was this:
>>>>
>>>> The Python API maps Python tuples to Flink tuples. Python can have empty
>>>> tuples, so I thought "well duh, let's make a Tuple0 class!". What I did
>>>> not want to do is create some non-Tuple object to represent empty tuples;
>>>> I'd rather have them treated the same, because it's less work and
>>>> creates simpler code.
>>>>
>>>> When transferring the plan to Java, certain parameters for operations
>>>> are tuples, which can be empty as well.
>>>> This is where the Tuple0 class is really useful, because these empty
>>>> tuples go through the same logic as other tuples.
>>>> This is also why I want to keep the class, at least in the Python
>>>> project, for now.
>>>>
>>>> For the actual program execution, I need a new solution. Funny story:
>>>> while writing this reply I noticed that the Python API can't handle
>>>> Tuple0 at runtime either. ha...ha... -.-
>>>>
>>>> Guess I now know what I'm working on next.
>>>>
>>>> On 02.08.2015 21:24, Matthias J. Sax wrote:
>>>>> Can you elaborate on how and why Python uses Tuple0? If it cannot be
>>>>> serialized like regular Tuples, what is its use in Python? Right
>>>>> now it seems as if there is no special serialization code for Tuple0.
>>>>>
>>>>> I just want to understand the topic in detail.
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 08/01/2015 03:38 PM, Stephan Ewen wrote:
>>>>>> I think a Tuple0 cannot be implemented like the current tuples, at
>>>>>> least with respect to runtime serialization.
>>>>>>
>>>>>> The system assumes that it makes progress in consuming bytes when
>>>>>> deserializing values. If a Tuple0 never consumes data from the byte
>>>>>> stream, this assumption is broken. It would need at least one marker
>>>>>> byte. Then it effectively is a Tuple1<Byte> disguising itself as a
>>>>>> Tuple0.
>>>>>>
>>>>>>
>>>>>>
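The marker-byte idea above can be sketched with plain `java.io` streams. This is an illustration under assumptions, not Flink's serializer interface: `EmptyRecord`, `EmptyRecordSerializer`, `EmptyRecordDemo`, and the marker value `0` are all hypothetical. Each empty record costs exactly one byte, so a reader that loops until the input is exhausted always makes progress and recovers the record count.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for an empty tuple (not Flink's Tuple0). */
final class EmptyRecord {
    static final EmptyRecord INSTANCE = new EmptyRecord();

    private EmptyRecord() {}
}

/** Hypothetical serializer that writes one marker byte per empty record. */
final class EmptyRecordSerializer {
    static final byte MARKER = 0; // arbitrary value chosen for this sketch

    void serialize(EmptyRecord record, DataOutputStream out) throws IOException {
        out.writeByte(MARKER); // one byte per record, so readers always make progress
    }

    EmptyRecord deserialize(DataInputStream in) throws IOException {
        byte b = in.readByte(); // consumes exactly one byte
        if (b != MARKER) {
            throw new IOException("unexpected marker byte: " + b);
        }
        return EmptyRecord.INSTANCE;
    }
}

public class EmptyRecordDemo {
    /** Writes n empty records and returns the encoded bytes. */
    public static byte[] encode(int n) {
        EmptyRecordSerializer serializer = new EmptyRecordSerializer();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < n; i++) {
                serializer.serialize(EmptyRecord.INSTANCE, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads records until the input is exhausted and returns how many were found. */
    public static int countRecords(byte[] encoded) {
        EmptyRecordSerializer serializer = new EmptyRecordSerializer();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
        int count = 0;
        try {
            // Terminates because each deserialize() call consumes one byte. With a
            // zero-byte encoding, n records would yield an empty array and the
            // reader could not recover n from the input alone.
            while (in.available() > 0) {
                serializer.deserialize(in);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        byte[] encoded = encode(5);
        System.out.println(encoded.length + " bytes, " + countRecords(encoded) + " records");
    }
}
```

Running `main` prints `5 bytes, 5 records`. The single byte is exactly the overhead described above: on the wire, each record is indistinguishable from a one-field tuple holding a constant byte.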
>>>>>> On Sat, Aug 1, 2015 at 1:38 PM, Matthias J. Sax <
>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>>> I just double-checked: Scala does not have a Tuple0 type. IMHO, it would
>>>>>>> be best to remove Tuple0 for consistency. We have the Tuple types for
>>>>>>> consistency with Scala in the first place, right? Please give
>>>>>>> feedback.
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 08/01/2015 01:04 PM, Matthias J. Sax wrote:
>>>>>>>> I see.
>>>>>>>>
>>>>>>>> I think that it might be useful to have Tuple0, because in rare cases
>>>>>>>> you only want to "notify" downstream operators (talking about
>>>>>>>> streaming) that something happened, but there is no actual data to be
>>>>>>>> processed. Furthermore, if Flink cannot deal with Tuple0, it should be
>>>>>>>> removed completely for consistency IMHO.
>>>>>>>>
>>>>>>>> I will open a JIRA for it.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 07/31/2015 10:44 PM, Chesnay Schepler wrote:
>>>>>>>>> Also, I'm not sure I ever sent a Tuple0 through a program; it could
>>>>>>>>> be that the system freaks out.
>>>>>>>>>
>>>>>>>>> On 31.07.2015 22:40, Chesnay Schepler wrote:
>>>>>>>>>> There's no specific reason. It was added fairly recently by me
>>>>>>>>>> (mid-April), and you're most likely the second person to use it.
>>>>>>>>>>
>>>>>>>>>> I didn't integrate it into all our tuple-related stuff because, well,
>>>>>>>>>> I never thought anyone would actually need it, so I saved myself the
>>>>>>>>>> trouble.
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> is there any specific reason why Tuple.getTupleClass(int arity) does
>>>>>>>>>>> not support arity zero? There is a class Tuple0, but it cannot be
>>>>>>>>>>> generated by Tuple.getTupleClass(...). Is it a missing feature? (I
>>>>>>>>>>> would like to have it.)
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>
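The missing case in `Tuple.getTupleClass(int arity)` comes down to whether the arity-to-class lookup has an entry for zero. A minimal standalone sketch, assuming a table indexed by arity; the `SketchTuple*` classes and `TupleClassLookup` are hypothetical stand-ins, not Flink's classes or its actual implementation:

```java
import java.util.List;

/** Hypothetical tuple hierarchy standing in for Flink's Tuple classes. */
abstract class SketchTuple {}

final class SketchTuple0 extends SketchTuple {}

final class SketchTuple1 extends SketchTuple {}

final class SketchTuple2 extends SketchTuple {}

public class TupleClassLookup {
    // Index i holds the class for arity i; index 0 is populated, so arity zero resolves.
    private static final List<Class<? extends SketchTuple>> CLASSES =
            List.of(SketchTuple0.class, SketchTuple1.class, SketchTuple2.class);

    /** Returns the tuple class for the given arity, including zero. */
    public static Class<? extends SketchTuple> getTupleClass(int arity) {
        if (arity < 0 || arity >= CLASSES.size()) {
            throw new IllegalArgumentException("unsupported arity: " + arity);
        }
        return CLASSES.get(arity);
    }

    public static void main(String[] args) {
        System.out.println(getTupleClass(0).getSimpleName()); // prints SketchTuple0
    }
}
```

Starting the table at index 0 keeps the lookup a plain array access; a table that starts at arity 1 needs an `arity - 1` offset and silently has no slot for the empty tuple.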
>>>>>>>
>>>>
>>>
>>>
>>
> 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

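/**
 * Checks whether Tuple0 records can be shipped between parallel streaming operators.
 */
public class TestEmptyTuple {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// four parallel source instances emit Tuple0 records
		DataStream<Tuple0> input = env.addSource(new Empty()).setParallelism(4);
		// shuffle() plus the parallelism change prevents chaining, so records are
		// serialized on their way to Counter; the output type info is null because
		// Counter never emits anything
		input.shuffle().transform("verify", null, new Counter()).setParallelism(1);

		env.execute();
	}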

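	/** Parallel source that emits one Tuple0 per second until cancelled. */
	public static class Empty implements ParallelSourceFunction<Tuple0> {
		private static final long serialVersionUID = 1350902748274781033L;
		// set to false by cancel() to stop the emit loop
		private volatile boolean isRunning = true;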

		@Override
		public void run(
				org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Tuple0> ctx)
						throws Exception {
			while (this.isRunning) {
				Thread.sleep(1000);
				ctx.collect(new Tuple0());
			}

		}

		@Override
		public void cancel() {
			this.isRunning = false;
		}

	}

	/**
	 * Operator that prints a running count of received Tuple0 records.
	 * All lifecycle methods are no-ops; only processElement() does work.
	 */
	public static class Counter implements OneInputStreamOperator<Tuple0, Object> {
		private static final long serialVersionUID = 5518823010274195186L;

		// number of Tuple0 records received so far
		private int counter = 0;

		@Override
		public void setup(Output<StreamRecord<Object>> output,
				StreamingRuntimeContext runtimeContext) {
			// no-op: this operator never emits records
		}

		@Override
		public void open(Configuration config) throws Exception {
			// no-op
		}

		@Override
		public void close() throws Exception {
			// no-op
		}

		@Override
		public StreamingRuntimeContext getRuntimeContext() {
			// not needed by this test
			return null;
		}

		@Override
		public boolean isInputCopyingDisabled() {
			return false;
		}

		@Override
		public void setChainingStrategy(
				org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy strategy) {
			// no-op: the requested strategy is ignored
		}

		@Override
		public org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy getChainingStrategy() {
			// no explicit chaining strategy
			return null;
		}

		@Override
		public void processElement(StreamRecord<Tuple0> element) throws Exception {
			// print a running count of received records
			System.out.println("Tuple0: " + (++counter));
		}

		@Override
		public void processWatermark(Watermark mark) throws Exception {
			// no-op: watermarks are ignored
		}
	}

}
