Hi Kurt,

How do you make the finishBundle
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
method return the combined tuples? I saw that there is a method
"List<String> getOutputs()", but it is never called.
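Just to make the question concrete, this is roughly how I understood the
contract, assuming the finishBundle(Map<K, V> buffer, Collector<OUT> out)
signature from the link above. The class name and the String/Integer/Tuple2
types below are only placeholders, not my real code:

import java.util.Map;

import javax.annotation.Nullable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.runtime.bundle.MapBundleFunction;
import org.apache.flink.util.Collector;

// Sketch only: key/value types (String key, Integer count) and the class name
// are placeholders for illustration.
public class MyCombinerSketch
        extends MapBundleFunction<String, Integer, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public Integer addInput(@Nullable Integer value, Tuple2<String, Integer> input) {
        // merge the incoming tuple into the value buffered for its key
        return value == null ? input.f1 : value + input.f1;
    }

    @Override
    public void finishBundle(Map<String, Integer> buffer, Collector<Tuple2<String, Integer>> out) {
        // emit every combined entry downstream through the Collector
        for (Map.Entry<String, Integer> entry : buffer.entrySet()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}

Is emitting through the Collector in finishBundle the intended way to forward
the combined tuples, or should getOutputs() be used somewhere?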
I did an implementation
<https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
based on the example that you suggested. The MapBundleFunctionImpl
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
class has a finishBundle method which iterates over all the combined tuples
and emits them. However, my application does not receive any tuples
downstream of the transform method
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>.

Thanks,
Felipe

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>*


On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <ykt...@gmail.com> wrote:

> I think you can simply copy the source code to your project if the Maven
> dependency cannot be used.
>
> Best,
> Kurt
>
>
> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi again Kurt,
>>
>> could you please help me with the pom.xml file? I have included all Table
>> ecosystem dependencies and flink-table-runtime-blink as well. However,
>> the class org.apache.flink.table.runtime.context.ExecutionContext is still
>> not found. I guess I am missing some dependency, but I do not know which.
>> This is my pom.xml file:
>>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>   http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>   <modelVersion>4.0.0</modelVersion>
>>
>>   <groupId>org.sense.flink</groupId>
>>   <artifactId>explore-flink</artifactId>
>>   <version>0.0.1-SNAPSHOT</version>
>>   <packaging>jar</packaging>
>>
>>   <name>explore-flink</name>
>>   <url>http://maven.apache.org</url>
>>
>>   <properties>
>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>     <jdk.version>1.8</jdk.version>
>>     <scala.binary.version>2.11</scala.binary.version>
>>     <!-- <flink.version>1.8.0</flink.version> -->
>>     <flink.version>1.9-SNAPSHOT</flink.version>
>>     <junit.version>4.12</junit.version>
>>   </properties>
>>
>>   <dependencies>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-java</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-metrics-dropwizard</artifactId>
>>       <version>${flink.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>
>>     <!-- Table ecosystem -->
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-runtime-blink</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.fusesource.mqtt-client</groupId>
>>       <artifactId>mqtt-client</artifactId>
>>       <version>1.15</version>
>>       <!-- <scope>provided</scope> -->
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.slf4j</groupId>
>>       <artifactId>slf4j-api</artifactId>
>>       <version>1.7.26</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.slf4j</groupId>
>>       <artifactId>slf4j-log4j12</artifactId>
>>       <version>1.7.26</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>junit</groupId>
>>       <artifactId>junit</artifactId>
>>       <version>${junit.version}</version>
>>     </dependency>
>>   </dependencies>
>>   <build>
>>     <finalName>explore-flink</finalName>
>>     <plugins>
>>       <!-- download source code in Eclipse, best practice -->
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-eclipse-plugin</artifactId>
>>         <version>2.10</version>
>>         <configuration>
>>           <downloadSources>true</downloadSources>
>>           <downloadJavadocs>false</downloadJavadocs>
>>         </configuration>
>>       </plugin>
>>
>>       <!-- Set a compiler level -->
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-compiler-plugin</artifactId>
>>         <version>3.8.0</version>
>>         <configuration>
>>           <source>${jdk.version}</source>
>>           <target>${jdk.version}</target>
>>         </configuration>
>>       </plugin>
>>
>>       <!-- Maven Shade Plugin -->
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-shade-plugin</artifactId>
>>         <version>3.2.0</version>
>>         <!-- Run shade goal on package phase -->
>>         <executions>
>>           <execution>
>>             <phase>package</phase>
>>             <goals>
>>               <goal>shade</goal>
>>             </goals>
>>             <configuration>
>>               <artifactSet>
>>                 <excludes>
>>                   <exclude>org.apache.flink:*</exclude>
>>                   <!-- Also exclude very big transitive dependencies of Flink.
>>                        WARNING: You have to remove these excludes if your code
>>                        relies on other versions of these dependencies. -->
>>                   <exclude>org.slf4j:*</exclude>
>>                   <exclude>log4j:*</exclude>
>>                   <exclude>com.typesafe:config:*</exclude>
>>                   <exclude>junit:junit:*</exclude>
>>                   <exclude>com.codahale.metrics:*</exclude>
>>                 </excludes>
>>               </artifactSet>
>>               <filters>
>>                 <filter>
>>                   <artifact>org.apache.flink:*</artifact>
>>                   <excludes>
>>                     <!-- exclude shaded google but include shaded curator -->
>>                     <exclude>org/apache/flink/shaded/com/**</exclude>
>>                     <exclude>web-docs/**</exclude>
>>                   </excludes>
>>                 </filter>
>>                 <filter>
>>                   <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>>                        this might cause SecurityExceptions when using the JAR.
>>                        -->
>>                   <artifact>*:*</artifact>
>>                   <excludes>
>>                     <exclude>META-INF/*.SF</exclude>
>>                     <exclude>META-INF/*.DSA</exclude>
>>                     <exclude>META-INF/*.RSA</exclude>
>>                   </excludes>
>>                 </filter>
>>                 <filter>
>>                   <artifact>*:*</artifact>
>>                   <includes>
>>                     <include>org/apache/calcite/**</include>
>>                     <include>org/apache/flink/calcite/shaded/**</include>
>>                     <include>org/apache/flink/table/**</include>
>>                     <include>org.codehaus.commons.compiler.properties</include>
>>                     <include>org/codehaus/janino/**</include>
>>                     <include>org/codehaus/commons/**</include>
>>                   </includes>
>>                 </filter>
>>               </filters>
>>               <!-- If you want to use ./bin/flink run <quickstart jar>, uncomment
>>                    the following lines. This will add a Main-Class entry to the
>>                    manifest file. -->
>>               <transformers>
>>                 <transformer
>>                     implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>                   <mainClass>org.sense.flink.App</mainClass>
>>                 </transformer>
>>               </transformers>
>>               <createDependencyReducedPom>false</createDependencyReducedPom>
>>             </configuration>
>>           </execution>
>>         </executions>
>>       </plugin>
>>     </plugins>
>>   </build>
>> </project>
>>
>>
>> Thanks
>>
>> *--*
>> *-- Felipe Gutierrez*
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Oh, yes, I just saw. I will use 1.9 then. Thanks.
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> That is because the Blink code is not shipped with 1.8.0; it is
>>>> currently only available in 1.9-SNAPSHOT.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> which artifacts do I have to import in Maven in order to use the
>>>>> Blink API?
>>>>>
>>>>> I am using Flink 1.8.0 and I am trying to import the Blink code to use its
>>>>> ExecutionContext
>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>> I want to do this in order to implement my own operator like it is
>>>>> implemented here
>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>> I guess if I import flink-table, everything should come inside the same jar,
>>>>> as it is done here
>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>> However, I cannot import "flink-table-runtime-blink"; Eclipse says that it
>>>>> is a missing artifact.
>>>>>
>>>>> <dependency>
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-table-planner_2.11</artifactId>
>>>>>   <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>>   <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>   <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-table-common</artifactId>
>>>>>   <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-table</artifactId>
>>>>>   <version>1.8.0</version>
>>>>>   <type>pom</type>
>>>>>   <scope>provided</scope>
>>>>> </dependency>
>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-table-runtime-blink</artifactId>
>>>>>   <version>1.8.0</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> *--*
>>>>> *-- Felipe Gutierrez*
>>>>> *-- skype: felipe.o.gutierrez*
>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Cool, thanks Kurt!
>>>>>>
>>>>>> *-*
>>>>>> *- Felipe Gutierrez*
>>>>>> *- skype: felipe.o.gutierrez*
>>>>>> *- **https://felipeogutierrez.blogspot.com
>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> You can check out the bundle operator used in Blink, which does
>>>>>>> something similar to what you mentioned:
>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was trying to implement a better way to handle data skew with
>>>>>>>> Flink and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they
>>>>>>>> used OneInputStreamOperator [2]. Through it, they could implement
>>>>>>>> Hadoop's "combiner" (execute part of the reduce work in the map
>>>>>>>> phase, before shuffling).
>>>>>>>>
>>>>>>>> I need some help here. Which operators in the Flink source code can
>>>>>>>> I look at to implement my own operator that deals with data skew?
>>>>>>>> Or does anyone have an example of a similar use case?
>>>>>>>>
>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Felipe
>>>>>>>>
>>>>>>>> *--*
>>>>>>>> *-- Felipe Gutierrez*
>>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>
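PS: to make the original question quoted just above a bit more concrete, the
kind of map-side pre-aggregation ("combiner") I have in mind is sketched below.
This is only an illustration with placeholder names and types: it ignores
checkpointing and flushing on close(), and it is not the code from my
repository.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Buffers partial counts per key and flushes them downstream once the buffer
// grows beyond a threshold, similar in spirit to a Hadoop combiner.
public class PreAggregateSketch
        extends AbstractStreamOperator<Tuple2<String, Integer>>
        implements OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private static final int MAX_BUFFERED_KEYS = 1000;
    private transient Map<String, Integer> buffer;

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Integer>> element) {
        Tuple2<String, Integer> value = element.getValue();
        // pre-aggregate locally, keyed by the tuple's first field
        buffer.merge(value.f0, value.f1, Integer::sum);
        if (buffer.size() >= MAX_BUFFERED_KEYS) {
            flush();
        }
    }

    // A real operator would also flush on checkpoints and in close().
    private void flush() {
        for (Map.Entry<String, Integer> entry : buffer.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
        }
        buffer.clear();
    }
}

I would then attach it to the pipeline with something like
stream.transform("combiner", outputTypeInfo, new PreAggregateSketch()).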