I think you might have mixed some test code with the operator. "List<String>
getOutputs()" comes from "TestMapBundleFunction" and is only used for
validation.
For real usage, you need to write whatever records you want to emit to
the "collector" that is passed in to "finishBundle".
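Below is a minimal, framework-free sketch of that pattern. It uses a simplified stand-in for Blink's `MapBundleFunction`: `Collector` is a local interface that mirrors only the `collect` method of `org.apache.flink.util.Collector`. The names `BundleSketch`, `CountBundleFunction`, `ListCollector` and `run` are hypothetical, and for brevity each input string serves as its own key. The point is that `finishBundle` returns nothing. Each combined record is pushed to the collector, and in the real operator the collector is what forwards records downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BundleSketch {

    // Local stand-in for org.apache.flink.util.Collector; only collect() is mirrored here.
    public interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical bundle function that counts occurrences per key within one bundle.
    public static class CountBundleFunction {

        // Combine a new input with the value already buffered for its key (null if none).
        public Integer addInput(Integer value, String input) {
            return value == null ? 1 : value + 1;
        }

        // Nothing is returned: every combined entry is pushed to the collector,
        // which is what forwards records to the next operator.
        public void finishBundle(Map<String, Integer> buffer, Collector<String> out) {
            for (Map.Entry<String, Integer> entry : buffer.entrySet()) {
                out.collect(entry.getKey() + "=" + entry.getValue());
            }
        }
    }

    // Collector that only remembers what was emitted, so it can be checked afterwards.
    public static class ListCollector implements Collector<String> {
        public final List<String> outputs = new ArrayList<>();

        @Override
        public void collect(String record) {
            outputs.add(record);
        }
    }

    // Buffer all inputs into one bundle, finish it, and return what reached the collector.
    public static List<String> run(List<String> inputs) {
        CountBundleFunction function = new CountBundleFunction();
        Map<String, Integer> buffer = new HashMap<>();
        for (String input : inputs) {
            buffer.put(input, function.addInput(buffer.get(input), input));
        }
        ListCollector collector = new ListCollector();
        function.finishBundle(buffer, collector);
        return collector.outputs;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "a", "a")));
    }
}
```

In a test, a capturing collector like `ListCollector` plays roughly the role that `getOutputs()` plays in `TestMapBundleFunction`. It exists only to inspect what was emitted and is not part of the operator's output path.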

Best,
Kurt


On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
[email protected]> wrote:

> Hi Kurt,
>
> How do you make the finishBundle
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
> method return the combined tuples? I saw that there is a method
> "List<String> getOutputs()" which is never called.
>
> I did an implementation
> <https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
> based on the example that you suggested. The MapBundleFunctionImpl
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
> class has a finishBundle method that iterates over all the combined tuples
> and returns them. However, my application does not receive any more tuples
> after the transform method
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>.
>
> Thanks,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <[email protected]> wrote:
>
>> I think you can simply copy the source code into your project if the Maven
>> dependency cannot be used.
>>
>> Best,
>> Kurt
>>
>>
>> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
>> [email protected]> wrote:
>>
>>> Hi again Kurt,
>>>
>>> Could you please help me with the pom.xml file? I have included
>>> all Table ecosystem dependencies and flink-table-runtime-blink as well.
>>> However, the class org.apache.flink.table.runtime.context.ExecutionContext
>>> is still not found. I guess I am missing some dependency, but I do not know
>>> which one. This is my pom.xml file:
>>>
>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>> <modelVersion>4.0.0</modelVersion>
>>>
>>> <groupId>org.sense.flink</groupId>
>>> <artifactId>explore-flink</artifactId>
>>> <version>0.0.1-SNAPSHOT</version>
>>> <packaging>jar</packaging>
>>>
>>> <name>explore-flink</name>
>>> <url>http://maven.apache.org</url>
>>>
>>> <properties>
>>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>> <jdk.version>1.8</jdk.version>
>>> <scala.binary.version>2.11</scala.binary.version>
>>> <!-- <flink.version>1.8.0</flink.version> -->
>>> <flink.version>1.9-SNAPSHOT</flink.version>
>>> <junit.version>4.12</junit.version>
>>> </properties>
>>>
>>> <dependencies>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-java</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-metrics-dropwizard</artifactId>
>>> <version>${flink.version}</version>
>>> <scope>provided</scope>
>>> </dependency>
>>>
>>> <!-- Table ecosystem -->
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-table-runtime-blink</artifactId>
>>> <version>${flink.version}</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.fusesource.mqtt-client</groupId>
>>> <artifactId>mqtt-client</artifactId>
>>> <version>1.15</version>
>>> <!-- <scope>provided</scope> -->
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.slf4j</groupId>
>>> <artifactId>slf4j-api</artifactId>
>>> <version>1.7.26</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.slf4j</groupId>
>>> <artifactId>slf4j-log4j12</artifactId>
>>> <version>1.7.26</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>junit</groupId>
>>> <artifactId>junit</artifactId>
>>> <version>${junit.version}</version>
>>> </dependency>
>>> </dependencies>
>>> <build>
>>> <finalName>explore-flink</finalName>
>>> <plugins>
>>> <!-- download source code in Eclipse, best practice -->
>>> <plugin>
>>> <groupId>org.apache.maven.plugins</groupId>
>>> <artifactId>maven-eclipse-plugin</artifactId>
>>> <version>2.10</version>
>>> <configuration>
>>> <downloadSources>true</downloadSources>
>>> <downloadJavadocs>false</downloadJavadocs>
>>> </configuration>
>>> </plugin>
>>>
>>> <!-- Set a compiler level -->
>>> <plugin>
>>> <groupId>org.apache.maven.plugins</groupId>
>>> <artifactId>maven-compiler-plugin</artifactId>
>>> <version>3.8.0</version>
>>> <configuration>
>>> <source>${jdk.version}</source>
>>> <target>${jdk.version}</target>
>>> </configuration>
>>> </plugin>
>>>
>>> <!-- Maven Shade Plugin -->
>>> <plugin>
>>> <groupId>org.apache.maven.plugins</groupId>
>>> <artifactId>maven-shade-plugin</artifactId>
>>> <version>3.2.0</version>
>>> <!-- Run shade goal on package phase -->
>>> <executions>
>>> <execution>
>>> <phase>package</phase>
>>> <goals>
>>> <goal>shade</goal>
>>> </goals>
>>> <configuration>
>>> <artifactSet>
>>> <excludes>
>>> <exclude>org.apache.flink:*</exclude>
>>> <!-- Also exclude very big transitive dependencies of Flink. WARNING:
>>> You have to remove these excludes if your code relies on other versions
>>> of these dependencies. -->
>>> <exclude>org.slf4j:*</exclude>
>>> <exclude>log4j:*</exclude>
>>> <exclude>com.typesafe:config:*</exclude>
>>> <exclude>junit:junit:*</exclude>
>>> <exclude>com.codahale.metrics:*</exclude>
>>> </excludes>
>>> </artifactSet>
>>> <filters>
>>> <filter>
>>> <artifact>org.apache.flink:*</artifact>
>>> <excludes>
>>> <!-- exclude shaded google but include shaded curator -->
>>> <exclude>org/apache/flink/shaded/com/**</exclude>
>>> <exclude>web-docs/**</exclude>
>>> </excludes>
>>> </filter>
>>> <filter>
>>> <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>>> this might cause SecurityExceptions when using the JAR. -->
>>> <artifact>*:*</artifact>
>>> <excludes>
>>> <exclude>META-INF/*.SF</exclude>
>>> <exclude>META-INF/*.DSA</exclude>
>>> <exclude>META-INF/*.RSA</exclude>
>>> </excludes>
>>> </filter>
>>> <filter>
>>> <artifact>*:*</artifact>
>>> <includes>
>>> <include>org/apache/calcite/**</include>
>>> <include>org/apache/flink/calcite/shaded/**</include>
>>> <include>org/apache/flink/table/**</include>
>>> <include>org.codehaus.commons.compiler.properties</include>
>>> <include>org/codehaus/janino/**</include>
>>> <include>org/codehaus/commons/**</include>
>>> </includes>
>>> </filter>
>>> </filters>
>>> <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>>> the following lines. This will add a Main-Class entry to the manifest
>>> file -->
>>> <transformers>
>>> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>> <mainClass>org.sense.flink.App</mainClass>
>>> </transformer>
>>> </transformers>
>>> <createDependencyReducedPom>false</createDependencyReducedPom>
>>> </configuration>
>>> </execution>
>>> </executions>
>>> </plugin>
>>> </plugins>
>>> </build>
>>> </project>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>>> [email protected]> wrote:
>>>
>>>> Oh, yes, I just saw that. I will use 1.9 then. Thanks!
>>>>
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <[email protected]> wrote:
>>>>
>>>>> That's because none of the Blink code is shipped with 1.8.0; it is
>>>>> currently only available in 1.9-SNAPSHOT.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Which artifacts do I have to import with Maven in order to use the
>>>>>> Blink API?
>>>>>>
>>>>>> I am using Flink 1.8.0 and I am trying to import Blink code to use
>>>>>> its ExecutionContext
>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>>> I want to do this in order to implement my own operator, the way it is
>>>>>> done here
>>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>>> I guess that if I import flink-table, everything should come inside the
>>>>>> same jar, as it is done here
>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says that it
>>>>>> is a missing artifact.
>>>>>>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-planner_2.11</artifactId>
>>>>>> <version>1.8.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>>> <version>1.8.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>> <version>1.8.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-common</artifactId>
>>>>>> <version>1.8.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table</artifactId>
>>>>>> <version>1.8.0</version>
>>>>>> <type>pom</type>
>>>>>> <scope>provided</scope>
>>>>>> </dependency>
>>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>>>> <version>1.8.0</version>
>>>>>> </dependency>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Cool, thanks Kurt!
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> You can check out the bundle operator used in Blink, which does
>>>>>>>> something similar to what you mentioned:
>>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
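To make the bundle idea concrete, here is a framework-free sketch of what such an operator does, assuming a simple count-based trigger. Each element is folded into an in-memory map under its grouping key. Once `maxCount` elements have arrived, the whole map is handed downstream and cleared. `CountBundleOperator`, `maxCount` and the method names are hypothetical and are not Blink's API. A real Flink operator would also have to flush before checkpoints and when the input ends; here that step is represented only by the final `flush()` call in `main`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class CountBundleOperator<K, V, IN> {

    private final int maxCount;                     // flush after this many elements (hypothetical trigger)
    private final Function<IN, K> keySelector;      // extracts the grouping key
    private final BiFunction<V, IN, V> addInput;    // folds an input into the buffered value (null if none)
    private final Consumer<Map<K, V>> finishBundle; // receives each complete bundle
    private final Map<K, V> bundle = new HashMap<>();
    private int count = 0;

    public CountBundleOperator(int maxCount, Function<IN, K> keySelector,
                               BiFunction<V, IN, V> addInput, Consumer<Map<K, V>> finishBundle) {
        this.maxCount = maxCount;
        this.keySelector = keySelector;
        this.addInput = addInput;
        this.finishBundle = finishBundle;
    }

    // Called once per element, playing the role of processElement in a OneInputStreamOperator.
    public void processElement(IN input) {
        K key = keySelector.apply(input);
        bundle.put(key, addInput.apply(bundle.get(key), input));
        count++;
        if (count >= maxCount) {
            flush();
        }
    }

    // Hand a copy of the buffered bundle downstream, then start a new, empty one.
    public void flush() {
        if (!bundle.isEmpty()) {
            finishBundle.accept(new HashMap<>(bundle));
            bundle.clear();
        }
        count = 0;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> flushed = new ArrayList<>();
        CountBundleOperator<String, Integer, String> op = new CountBundleOperator<>(
                3, s -> s, (v, s) -> v == null ? 1 : v + 1, flushed::add);
        for (String word : new String[] {"a", "a", "b", "a", "c"}) {
            op.processElement(word);
        }
        op.flush(); // flush the partial last bundle, as an end-of-input hook would
        System.out.println(flushed);
    }
}
```

A count trigger bounds how many elements sit in memory before they are emitted. The trade-off is latency: records wait until the bundle fills or a flush is forced.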
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I was trying to implement a better way to handle data skew using
>>>>>>>>> Flink, and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they
>>>>>>>>> used a OneInputStreamOperator [2]. With it, they could implement the
>>>>>>>>> equivalent of Hadoop's "combiner" (executing part of the reduce work
>>>>>>>>> in the map phase, before shuffling).
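You can check the combiner idea with a small, framework-free computation. Assume two hypothetical "mappers" that each see part of a skewed stream. Each mapper pre-aggregates its own slice, so the reducer receives one partial count per key per mapper instead of one record per event. `CombinerSketch` and its methods are illustrative names. The approach only works because counting is associative and commutative: merging partial counts yields the same totals as counting the raw events.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerSketch {

    // Map-side "combiner": partially count the keys seen by one mapper.
    public static Map<String, Integer> combine(List<String> slice) {
        Map<String, Integer> partial = new HashMap<>();
        for (String key : slice) {
            partial.merge(key, 1, Integer::sum);
        }
        return partial;
    }

    // Reduce side: merge the partial counts coming from all mappers after the shuffle.
    public static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Integer::sum));
        }
        return total;
    }

    // Number of records that would cross the network once the combiner has run.
    public static int shuffledRecords(List<Map<String, Integer>> partials) {
        int n = 0;
        for (Map<String, Integer> partial : partials) {
            n += partial.size();
        }
        return n;
    }

    public static void main(String[] args) {
        // "hot" is the skewed key: most events carry it.
        List<String> mapper1 = Arrays.asList("hot", "hot", "hot", "cold");
        List<String> mapper2 = Arrays.asList("hot", "hot", "warm");
        List<Map<String, Integer>> partials = Arrays.asList(combine(mapper1), combine(mapper2));
        System.out.println(reduce(partials));          // totals per key
        System.out.println(shuffledRecords(partials)); // records shuffled with the combiner
    }
}
```

With the slices in `main`, the hot key reaches its reducer as two partial counts rather than five separate events. In total, four records are shuffled instead of seven.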
>>>>>>>>>
>>>>>>>>> I need some help here. Which Flink source-code operators could I look
>>>>>>>>> at to implement my own operator that deals with data skew? Or does
>>>>>>>>> anyone have an example of a similar use case?
>>>>>>>>>
>>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Felipe
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
