Thanks for the tip! I think it is working as it should now: <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L54>
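The fix described in Kurt's quoted reply (emit the combined records through the collector handed to finishBundle instead of returning them) can be sketched in plain Java. This is a minimal, self-contained sketch, not Blink's API: `RecordCollector` and `SimpleMapBundleFunction` are hypothetical stand-ins that only mirror the shape of Flink's `Collector` and Blink's `MapBundleFunction`, and the per-key counting is just an example aggregation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.flink.util.Collector; only collect() is modeled.
interface RecordCollector<T> {
    void collect(T record);
}

// Hypothetical, simplified analogue of Blink's MapBundleFunction<K, V, IN, OUT>.
abstract class SimpleMapBundleFunction<K, V, IN, OUT> {
    // Folds one input into the current partial value of its key (value is null for a new key).
    abstract V addInput(V value, IN input);

    // Emits the combined results through 'out' instead of returning them.
    abstract void finishBundle(Map<K, V> buffer, RecordCollector<OUT> out);
}

// Example: counts inputs per key and emits "key=count" when the bundle finishes.
class CountBundleFunction extends SimpleMapBundleFunction<String, Long, String, String> {
    @Override
    Long addInput(Long value, String input) {
        return value == null ? 1L : value + 1L;
    }

    @Override
    void finishBundle(Map<String, Long> buffer, RecordCollector<String> out) {
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            out.collect(entry.getKey() + "=" + entry.getValue());
        }
    }
}

public class BundleFunctionDemo {
    public static void main(String[] args) {
        CountBundleFunction fn = new CountBundleFunction();
        Map<String, Long> buffer = new HashMap<>();
        for (String key : new String[] {"a", "b", "a"}) {
            // In this toy example the input itself is the key.
            buffer.put(key, fn.addInput(buffer.get(key), key));
        }
        List<String> emitted = new ArrayList<>();
        fn.finishBundle(buffer, emitted::add); // records arrive via the collector
        System.out.println(emitted);
    }
}
```

The point of the sketch is that `finishBundle` returns nothing: whatever should flow downstream is handed to `out.collect(...)`, so a helper like `getOutputs()` is only useful in tests.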
Just one last question: why did you decide to use "AbstractStreamOperator" instead of "AbstractUdfStreamOperator"? I am asking because I also based my solution on the "StreamFlatMap" <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java#L29> class implementation (in addition to looking at yours).

Best,
Felipe

--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com


On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <[email protected]> wrote:

> I think you might have mixed some test code into the operator. "List<String>
> getOutputs()" comes from "TestMapBundleFunction" and is only used for
> validation.
> For real usage, you need to write whatever records you want to emit to
> the "collector" that is passed in to "finishBundle".
>
> Best,
> Kurt
>
>
> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
> [email protected]> wrote:
>
>> Hi Kurt,
>>
>> How do you make the finishBundle
>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
>> method return the combined tuples? I saw that there is a method
>> "List<String> getOutputs()" which is never called.
>>
>> I did an implementation
>> <https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
>> based on the example that you suggested. The MapBundleFunctionImpl
>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
>> class has the method finishBundle, which iterates over all the combined
>> tuples and returns them.
>> However, my application does not continue to receive tuples
>> after the transform method
>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>.
>>
>> Thanks,
>> Felipe
>>
>>
>> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <[email protected]> wrote:
>>
>>> I think you can simply copy the source code into your project if the Maven
>>> dependency cannot be used.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
>>> [email protected]> wrote:
>>>
>>>> Hi again Kurt,
>>>>
>>>> Could you please help me with the pom.xml file? I have included
>>>> all the Table ecosystem dependencies and flink-table-runtime-blink as well.
>>>> However, the class org.apache.flink.table.runtime.context.ExecutionContext
>>>> is still not found. I guess I am missing some dependency, but I do not know
>>>> which one. This is my pom.xml file:
>>>>
>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>   <modelVersion>4.0.0</modelVersion>
>>>>
>>>>   <groupId>org.sense.flink</groupId>
>>>>   <artifactId>explore-flink</artifactId>
>>>>   <version>0.0.1-SNAPSHOT</version>
>>>>   <packaging>jar</packaging>
>>>>
>>>>   <name>explore-flink</name>
>>>>   <url>http://maven.apache.org</url>
>>>>
>>>>   <properties>
>>>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>     <jdk.version>1.8</jdk.version>
>>>>     <scala.binary.version>2.11</scala.binary.version>
>>>>     <!-- <flink.version>1.8.0</flink.version> -->
>>>>     <flink.version>1.9-SNAPSHOT</flink.version>
>>>>     <junit.version>4.12</junit.version>
>>>>   </properties>
>>>>
>>>>   <dependencies>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-java</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-metrics-dropwizard</artifactId>
>>>>       <version>${flink.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>
>>>>     <!-- Table ecosystem -->
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-runtime-blink</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.fusesource.mqtt-client</groupId>
>>>>       <artifactId>mqtt-client</artifactId>
>>>>       <version>1.15</version>
>>>>       <!-- <scope>provided</scope> -->
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.slf4j</groupId>
>>>>       <artifactId>slf4j-api</artifactId>
>>>>       <version>1.7.26</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.slf4j</groupId>
>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>       <version>1.7.26</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>junit</groupId>
>>>>       <artifactId>junit</artifactId>
>>>>       <version>${junit.version}</version>
>>>>     </dependency>
>>>>   </dependencies>
>>>>   <build>
>>>>     <finalName>explore-flink</finalName>
>>>>     <plugins>
>>>>       <!-- download source code in Eclipse, best practice -->
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-eclipse-plugin</artifactId>
>>>>         <version>2.10</version>
>>>>         <configuration>
>>>>           <downloadSources>true</downloadSources>
>>>>           <downloadJavadocs>false</downloadJavadocs>
>>>>         </configuration>
>>>>       </plugin>
>>>>
>>>>       <!-- Set a compiler level -->
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-compiler-plugin</artifactId>
>>>>         <version>3.8.0</version>
>>>>         <configuration>
>>>>           <source>${jdk.version}</source>
>>>>           <target>${jdk.version}</target>
>>>>         </configuration>
>>>>       </plugin>
>>>>
>>>>       <!-- Maven Shade Plugin -->
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>         <version>3.2.0</version>
>>>>         <!-- Run shade goal on package phase -->
>>>>         <executions>
>>>>           <execution>
>>>>             <phase>package</phase>
>>>>             <goals>
>>>>               <goal>shade</goal>
>>>>             </goals>
>>>>             <configuration>
>>>>               <artifactSet>
>>>>                 <excludes>
>>>>                   <exclude>org.apache.flink:*</exclude>
>>>>                   <!-- Also exclude very big transitive dependencies of Flink
>>>>                     WARNING: You have to remove these excludes if your code relies
>>>>                     on other versions of these dependencies. -->
>>>>                   <exclude>org.slf4j:*</exclude>
>>>>                   <exclude>log4j:*</exclude>
>>>>                   <exclude>com.typesafe:config:*</exclude>
>>>>                   <exclude>junit:junit:*</exclude>
>>>>                   <exclude>com.codahale.metrics:*</exclude>
>>>>                 </excludes>
>>>>               </artifactSet>
>>>>               <filters>
>>>>                 <filter>
>>>>                   <artifact>org.apache.flink:*</artifact>
>>>>                   <excludes>
>>>>                     <!-- exclude shaded google but include shaded curator -->
>>>>                     <exclude>org/apache/flink/shaded/com/**</exclude>
>>>>                     <exclude>web-docs/**</exclude>
>>>>                   </excludes>
>>>>                 </filter>
>>>>                 <filter>
>>>>                   <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>>>>                     this might cause SecurityExceptions when using the JAR.
>>>>                     -->
>>>>                   <artifact>*:*</artifact>
>>>>                   <excludes>
>>>>                     <exclude>META-INF/*.SF</exclude>
>>>>                     <exclude>META-INF/*.DSA</exclude>
>>>>                     <exclude>META-INF/*.RSA</exclude>
>>>>                   </excludes>
>>>>                 </filter>
>>>>                 <filter>
>>>>                   <artifact>*:*</artifact>
>>>>                   <includes>
>>>>                     <include>org/apache/calcite/**</include>
>>>>                     <include>org/apache/flink/calcite/shaded/**</include>
>>>>                     <include>org/apache/flink/table/**</include>
>>>>                     <include>org.codehaus.commons.compiler.properties</include>
>>>>                     <include>org/codehaus/janino/**</include>
>>>>                     <include>org/codehaus/commons/**</include>
>>>>                   </includes>
>>>>                 </filter>
>>>>               </filters>
>>>>               <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>>>>                 the following lines. This will add a Main-Class entry to the
>>>>                 manifest file -->
>>>>               <transformers>
>>>>                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>                   <mainClass>org.sense.flink.App</mainClass>
>>>>                 </transformer>
>>>>               </transformers>
>>>>               <createDependencyReducedPom>false</createDependencyReducedPom>
>>>>             </configuration>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>     </plugins>
>>>>   </build>
>>>> </project>
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>>>> [email protected]> wrote:
>>>>
>>>>> Oh, yes, I just saw that. I will use 1.9 then. Thanks!
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <[email protected]> wrote:
>>>>>
>>>>>> It's because none of the Blink code is shipped with 1.8.0; it is currently
>>>>>> only available in 1.9-SNAPSHOT.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Which artifacts do I have to import with Maven in order to
>>>>>>> use the Blink API?
>>>>>>>
>>>>>>> I am using Flink 1.8.0 and I am trying to import the Blink code to use
>>>>>>> its ExecutionContext
>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>>>> I want to do this in order to implement my own operator like the one
>>>>>>> implemented here
>>>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>>>> I guess if I import flink-table, everything should come inside the same
>>>>>>> jar, as is done here
>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says that
>>>>>>> it is a missing artifact.
>>>>>>>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-planner_2.11</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-common</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>>   <type>pom</type>
>>>>>>>   <scope>provided</scope>
>>>>>>> </dependency>
>>>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-runtime-blink</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Cool, thanks Kurt!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> You can check out the bundle operator used in Blink, which does
>>>>>>>>> something similar to what you mentioned:
>>>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Kurt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I was trying to implement a better way to handle data skew using
>>>>>>>>>> Flink, and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming" [1], which says that
>>>>>>>>>> they used a OneInputStreamOperator [2]. With it, they could implement
>>>>>>>>>> the equivalent of the Hadoop "combiner" (executing part of the reduce
>>>>>>>>>> work in the map phase, before shuffling).
>>>>>>>>>>
>>>>>>>>>> I need some help here. Which Flink operators in the source code could
>>>>>>>>>> I look at to implement my own operator that deals with data skew? Or
>>>>>>>>>> does someone have an example of a similar use case?
>>>>>>>>>>
>>>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Felipe
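The combiner idea discussed in this thread (pre-aggregate per key before the shuffle and flush the partial results in bundles, roughly what the linked Blink BundleOperator is pointed to for) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink or Blink code: `LocalCombiner`, its element-count flush trigger `maxCount`, and the `Consumer` used as the downstream output are all hypothetical. In a real operator, the open bundle would also have to be flushed (or kept in state) on close and before checkpoints; that part is omitted here.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

// Hypothetical local pre-aggregator ("combiner") that would sit before a keyBy/shuffle.
class LocalCombiner<K, V> {
    private final int maxCount;                          // assumed element-count flush trigger
    private final BinaryOperator<V> combine;             // merges two partial values of one key
    private final Consumer<Map.Entry<K, V>> downstream;  // stand-in for the operator's output
    private final Map<K, V> bundle = new HashMap<>();
    private int count = 0;

    LocalCombiner(int maxCount, BinaryOperator<V> combine, Consumer<Map.Entry<K, V>> downstream) {
        this.maxCount = maxCount;
        this.combine = combine;
        this.downstream = downstream;
    }

    // Folds one element into its key's partial value; flushes once maxCount elements arrived.
    void process(K key, V value) {
        bundle.merge(key, value, combine);
        count++;
        if (count >= maxCount) {
            flush();
        }
    }

    // Emits one partial result per buffered key, then starts a new bundle.
    void flush() {
        for (Map.Entry<K, V> e : bundle.entrySet()) {
            downstream.accept(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        bundle.clear();
        count = 0;
    }
}

public class LocalCombinerDemo {
    public static void main(String[] args) {
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        LocalCombiner<String, Long> combiner = new LocalCombiner<>(4, Long::sum, emitted::add);
        for (String key : new String[] {"hot", "hot", "cold", "hot", "hot", "hot"}) {
            combiner.process(key, 1L);
        }
        combiner.flush(); // flush the last, partially filled bundle
        // Six inputs become three partial records: hot=3 and cold=1, then hot=2.
        System.out.println(emitted);
    }
}
```

With a hot key, the downstream keyed operator receives one partial record per key per bundle instead of one record per input, which is what relieves the skewed partition; the trade-off is up to one bundle of extra latency, and the merge function must be associative for the partial results to combine correctly.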
