There is no reason for it; the operator and the function don't rely on any of the logic that AbstractUdfStreamOperator supplies.
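To make the distinction concrete, here is a toy, plain-Java sketch; the class and method names below are illustrative stand-ins, not Flink's actual API. The "Udf" variant's main added value is owning a user function and forwarding the operator lifecycle to it, so an operator whose function needs none of that can extend the plain base class directly.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the two base-class styles; names are illustrative, not Flink's real API.
public class LifecycleSketch {

    /** Stand-in for a rich user function whose lifecycle must be driven by the operator. */
    interface UserFunction {
        void open();
        void close();
    }

    /** Stand-in for AbstractStreamOperator: lifecycle hooks only, no wrapped user function. */
    static class PlainOperator {
        void open() {}
        void close() {}
    }

    /**
     * Stand-in for AbstractUdfStreamOperator: it owns a user function, and its whole
     * added value is forwarding open()/close() (and, in Flink, state snapshots) to it.
     * A bundle-style operator drives its function through its own narrow interface,
     * so this forwarding logic buys it nothing.
     */
    static class UdfOperator extends PlainOperator {
        final UserFunction userFunction;

        UdfOperator(UserFunction userFunction) {
            this.userFunction = userFunction;
        }

        @Override
        void open() {
            userFunction.open();
        }

        @Override
        void close() {
            userFunction.close();
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        UdfOperator op = new UdfOperator(new UserFunction() {
            public void open() { calls.add("open"); }
            public void close() { calls.add("close"); }
        });
        op.open();
        op.close();
        System.out.println(calls); // the forwarded lifecycle calls
    }
}
```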
Best,
Kurt

On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Thanks for the tip! I guess now it is working as it should
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L54>.

Just one last question: why did you decide to use "AbstractStreamOperator" instead of "AbstractUdfStreamOperator"? I am asking because I was also basing my solution (besides looking at yours) on the implementation of the "StreamFlatMap
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java#L29>" class.

Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <ykt...@gmail.com> wrote:

I think you might have mixed some test code into the operator. "List<String> getOutputs()" comes from "TestMapBundleFunction" and is only used for validation. For real usage, you need to write whatever records you want to emit to the "collector" that is passed in during "finishBundle".

Best,
Kurt

On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Hi Kurt,

How do you make the finishBundle
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
method return the combined tuples? I saw that there is a method "List<String> getOutputs()" which is never called.

I did an implementation
<https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
based on the example that you suggested.
The MapBundleFunctionImpl
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
class has a finishBundle method which iterates over all the combined tuples and returns them. However, my application does not continue to receive tuples after the transform method
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>.

Thanks,
Felipe

On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <ykt...@gmail.com> wrote:

I think you can simply copy the source code into your project if the Maven dependency cannot be used.

Best,
Kurt

On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Hi again Kurt,

could you please help me with the pom.xml file? I have included all the Table ecosystem dependencies as well as flink-table-runtime-blink. However, the class org.apache.flink.table.runtime.context.ExecutionContext is still not found. I guess I am missing some dependency, but I do not know which one. This is my pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.sense.flink</groupId>
  <artifactId>explore-flink</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>explore-flink</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <jdk.version>1.8</jdk.version>
    <scala.binary.version>2.11</scala.binary.version>
    <!-- <flink.version>1.8.0</flink.version> -->
    <flink.version>1.9-SNAPSHOT</flink.version>
    <junit.version>4.12</junit.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- Table ecosystem -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.fusesource.mqtt-client</groupId>
      <artifactId>mqtt-client</artifactId>
      <version>1.15</version>
      <!-- <scope>provided</scope> -->
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.26</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.26</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>explore-flink</finalName>
    <plugins>
      <!-- download source code in Eclipse, best practice -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.10</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <downloadJavadocs>false</downloadJavadocs>
        </configuration>
      </plugin>

      <!-- Set a compiler level -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>

      <!-- Maven Shade Plugin -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.0</version>
        <!-- Run shade goal on package phase -->
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:*</exclude>
                  <!-- Also exclude very big transitive dependencies of Flink.
                       WARNING: You have to remove these excludes if your code
                       relies on other versions of these dependencies. -->
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                  <exclude>com.typesafe:config:*</exclude>
                  <exclude>junit:junit:*</exclude>
                  <exclude>com.codahale.metrics:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>org.apache.flink:*</artifact>
                  <excludes>
                    <!-- exclude shaded google but include shaded curator -->
                    <exclude>org/apache/flink/shaded/com/**</exclude>
                    <exclude>web-docs/**</exclude>
                  </excludes>
                </filter>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder. Otherwise,
                       this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
                <filter>
                  <artifact>*:*</artifact>
                  <includes>
                    <include>org/apache/calcite/**</include>
                    <include>org/apache/flink/calcite/shaded/**</include>
                    <include>org/apache/flink/table/**</include>
                    <include>org.codehaus.commons.compiler.properties</include>
                    <include>org/codehaus/janino/**</include>
                    <include>org/codehaus/commons/**</include>
                  </includes>
                </filter>
              </filters>
              <!-- If you want to use ./bin/flink run <quickstart jar>, uncomment the
                   following lines. This will add a Main-Class entry to the manifest file. -->
              <transformers>
                <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.sense.flink.App</mainClass>
                </transformer>
              </transformers>
              <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Thanks

On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

oh, yes. I just saw. I will use 1.9 then.
thanks

On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt...@gmail.com> wrote:

It's because the Blink code is not shipped with 1.8.0; it is currently only available in 1.9-SNAPSHOT.

Best,
Kurt

On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Hi,

which artifacts do I have to import in Maven in order to use the Blink API?

I am using Flink 1.8.0 and I am trying to import Blink code to use its ExecutionContext
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
I want to do this in order to implement my own operator like it is implemented here
<https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
I guess if I import flink-table, everything should come inside the same jar, as is done here
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
However, I cannot import "flink-table-runtime-blink"; Eclipse says that it is a missing artifact.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.8.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.8.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.8.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.8.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table</artifactId>
  <version>1.8.0</version>
  <type>pom</type>
  <scope>provided</scope>
</dependency>
<dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-runtime-blink</artifactId>
  <version>1.8.0</version>
</dependency>

On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Cool, thanks Kurt!
On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt...@gmail.com> wrote:

Hi,

You can check out the bundle operator used in Blink, which performs something similar to what you mentioned:
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java

Best,
Kurt

On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Hi,

I was trying to implement a better way to handle data skew in Flink and I found this talk from #FlinkForward SF 2017, "Cliff Resnick & Seth Wiesman - From Zero to Streaming <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they used OneInputStreamOperator [2]. Through it, they could implement the equivalent of Hadoop's "combiner" (executing part of the reduce work in the map phase, before shuffling).

I need some help here. Which Flink source-code operators can I look at to implement my own operator that deals with data skew? Or does someone have an example of a similar use case?

[1] https://youtu.be/mSLesPzWplA?t=835
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html

Thanks!
Felipe
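For reference, the combiner-style pre-aggregation that this thread converges on (buffer elements per key inside the operator, then flush the combined values in finishBundle to the collector that is passed in, rather than exposing a getOutputs() list) can be sketched in plain Java as follows. MapBundleSketch, addInput, and the Consumer-based collector are simplified stand-ins, not the actual signatures of Flink's MapBundleFunction or BundleOperator:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Minimal, self-contained sketch of the "bundle" pre-aggregation idea:
// combine values per key locally, emit the combined results on flush.
public class MapBundleSketch {
    private final Map<String, Integer> bundle = new HashMap<>();
    private final int maxBundleSize;

    public MapBundleSketch(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    /** Pre-aggregate one element; returns true when the bundle should be flushed. */
    public boolean addInput(String key, int value) {
        bundle.merge(key, value, Integer::sum);
        return bundle.size() >= maxBundleSize;
    }

    /**
     * Emit every combined (key, sum) pair to the collector and reset the bundle.
     * This mirrors Kurt's advice: write the records to the collector passed in
     * during finishBundle instead of returning them.
     */
    public void finishBundle(Consumer<Map.Entry<String, Integer>> collector) {
        bundle.entrySet().forEach(collector);
        bundle.clear();
    }

    public static void main(String[] args) {
        MapBundleSketch op = new MapBundleSketch(100);
        op.addInput("a", 1);
        op.addInput("a", 2);
        op.addInput("b", 5);
        // prints one combined line per key (a=3 and b=5; iteration order not guaranteed)
        op.finishBundle(e -> System.out.println(e.getKey() + "=" + e.getValue()));
    }
}
```

In the real operator, the flush would also be triggered by a checkpoint or timer so no buffered data is lost; this sketch only triggers on bundle size.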