Thanks for the tip! I think it is now working as it should:
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L54>

Just one last question: why did you decide to use "AbstractStreamOperator"
instead of "AbstractUdfStreamOperator"? I am asking because, besides looking
at your solution, I was also basing mine on the "StreamFlatMap
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java#L29>"
class implementation.

Best,
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <[email protected]> wrote:

> I think you might have mixed some test code into the operator. "List<String>
> getOutputs()" comes from "TestMapBundleFunction" and is only used for
> validation.
> For real usage, you need to write whatever records you want to emit to
> the "collector" that is passed in to "finishBundle".
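Here is a minimal sketch of the point about the collector. It runs without Flink. `Collector` below is a stand-in for Flink's `org.apache.flink.util.Collector`. `CountBundleFunction`, `ListCollector` and `FinishBundleDemo` are hypothetical names. The `addInput(value, input)` / `finishBundle(buffer, out)` shape loosely follows the Blink `MapBundleFunction` linked in this thread, but the real signatures may differ. The key idea is that `finishBundle` returns nothing: combined records only reach downstream through `out.collect(...)`, never through a helper like `getOutputs()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.flink.util.Collector so this sketch runs without Flink.
interface Collector<T> {
    void collect(T record);

    void close();
}

// Hypothetical collector that just records what was emitted (plays the downstream output).
class ListCollector<T> implements Collector<T> {
    final List<T> emitted = new ArrayList<>();

    @Override
    public void collect(T record) {
        emitted.add(record);
    }

    @Override
    public void close() {
    }
}

// Hypothetical bundle function that counts occurrences per key.
class CountBundleFunction {

    // Returns the new accumulator for the input's key; value is null the first time a key is seen.
    Long addInput(Long value, String input) {
        return value == null ? 1L : value + 1L;
    }

    // Emits every combined entry downstream; the collector is the only output channel.
    void finishBundle(Map<String, Long> buffer, Collector<String> out) {
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            out.collect(entry.getKey() + "=" + entry.getValue());
        }
    }
}

class FinishBundleDemo {
    public static void main(String[] args) {
        CountBundleFunction fn = new CountBundleFunction();
        Map<String, Long> buffer = new LinkedHashMap<>(); // keeps first-seen key order
        for (String word : new String[] {"a", "b", "a", "a"}) {
            buffer.put(word, fn.addInput(buffer.get(word), word));
        }
        ListCollector<String> out = new ListCollector<>();
        fn.finishBundle(buffer, out);
        System.out.println(out.emitted); // prints [a=3, b=1]
    }
}
```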
>
> Best,
> Kurt
>
>
> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
> [email protected]> wrote:
>
>> Hi Kurt,
>>
>> How do you make the finishBundle
>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
>> method return the combined tuples? I saw that there is a method
>> "List<String> getOutputs()", which is never called.
>>
>> I did an implementation
>> <https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
>> based on the example that you suggested. The MapBundleFunctionImpl
>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
>> class has the method finishBundle, which iterates over all the combined
>> tuples and returns them. However, my application stops receiving tuples
>> after the transform method
>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>.
>>
>> Thanks,
>> Felipe
>>
>>
>>
>> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <[email protected]> wrote:
>>
>>> I think you can simply copy the source code into your project if the Maven
>>> dependency cannot be used.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
>>> [email protected]> wrote:
>>>
>>>> Hi again Kurt,
>>>>
>>>> Could you please help me with the pom.xml file? I have included
>>>> all the Table ecosystem dependencies and flink-table-runtime-blink as well.
>>>> However, the class org.apache.flink.table.runtime.context.ExecutionContext
>>>> is still not found. I guess I am missing some dependency, but I do not know
>>>> which one. This is my pom.xml file:
>>>>
>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>   <modelVersion>4.0.0</modelVersion>
>>>>
>>>>   <groupId>org.sense.flink</groupId>
>>>>   <artifactId>explore-flink</artifactId>
>>>>   <version>0.0.1-SNAPSHOT</version>
>>>>   <packaging>jar</packaging>
>>>>
>>>>   <name>explore-flink</name>
>>>>   <url>http://maven.apache.org</url>
>>>>
>>>>   <properties>
>>>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>     <jdk.version>1.8</jdk.version>
>>>>     <scala.binary.version>2.11</scala.binary.version>
>>>>     <!-- <flink.version>1.8.0</flink.version> -->
>>>>     <flink.version>1.9-SNAPSHOT</flink.version>
>>>>     <junit.version>4.12</junit.version>
>>>>   </properties>
>>>>
>>>>   <dependencies>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-java</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-metrics-dropwizard</artifactId>
>>>>       <version>${flink.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>
>>>>     <!-- Table ecosystem -->
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-table-runtime-blink</artifactId>
>>>>       <version>${flink.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.fusesource.mqtt-client</groupId>
>>>>       <artifactId>mqtt-client</artifactId>
>>>>       <version>1.15</version>
>>>>       <!-- <scope>provided</scope> -->
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.slf4j</groupId>
>>>>       <artifactId>slf4j-api</artifactId>
>>>>       <version>1.7.26</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.slf4j</groupId>
>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>       <version>1.7.26</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>junit</groupId>
>>>>       <artifactId>junit</artifactId>
>>>>       <version>${junit.version}</version>
>>>>     </dependency>
>>>>   </dependencies>
>>>>   <build>
>>>>     <finalName>explore-flink</finalName>
>>>>     <plugins>
>>>>       <!-- download source code in Eclipse, best practice -->
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-eclipse-plugin</artifactId>
>>>>         <version>2.10</version>
>>>>         <configuration>
>>>>           <downloadSources>true</downloadSources>
>>>>           <downloadJavadocs>false</downloadJavadocs>
>>>>         </configuration>
>>>>       </plugin>
>>>>
>>>>       <!-- Set a compiler level -->
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-compiler-plugin</artifactId>
>>>>         <version>3.8.0</version>
>>>>         <configuration>
>>>>           <source>${jdk.version}</source>
>>>>           <target>${jdk.version}</target>
>>>>         </configuration>
>>>>       </plugin>
>>>>
>>>>       <!-- Maven Shade Plugin -->
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>         <version>3.2.0</version>
>>>>         <!-- Run shade goal on package phase -->
>>>>         <executions>
>>>>           <execution>
>>>>             <phase>package</phase>
>>>>             <goals>
>>>>               <goal>shade</goal>
>>>>             </goals>
>>>>             <configuration>
>>>>               <artifactSet>
>>>>                 <excludes>
>>>>                   <exclude>org.apache.flink:*</exclude>
>>>>                   <!-- Also exclude very big transitive dependencies of Flink.
>>>>                        WARNING: You have to remove these excludes if your code
>>>>                        relies on other versions of these dependencies. -->
>>>>                   <exclude>org.slf4j:*</exclude>
>>>>                   <exclude>log4j:*</exclude>
>>>>                   <exclude>com.typesafe:config:*</exclude>
>>>>                   <exclude>junit:junit:*</exclude>
>>>>                   <exclude>com.codahale.metrics:*</exclude>
>>>>                 </excludes>
>>>>               </artifactSet>
>>>>               <filters>
>>>>                 <filter>
>>>>                   <artifact>org.apache.flink:*</artifact>
>>>>                   <excludes>
>>>>                     <!-- exclude shaded google but include shaded curator -->
>>>>                     <exclude>org/apache/flink/shaded/com/**</exclude>
>>>>                     <exclude>web-docs/**</exclude>
>>>>                   </excludes>
>>>>                 </filter>
>>>>                 <filter>
>>>>                   <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>>>>                        this might cause SecurityExceptions when using the JAR. -->
>>>>                   <artifact>*:*</artifact>
>>>>                   <excludes>
>>>>                     <exclude>META-INF/*.SF</exclude>
>>>>                     <exclude>META-INF/*.DSA</exclude>
>>>>                     <exclude>META-INF/*.RSA</exclude>
>>>>                   </excludes>
>>>>                 </filter>
>>>>                 <filter>
>>>>                   <artifact>*:*</artifact>
>>>>                   <includes>
>>>>                     <include>org/apache/calcite/**</include>
>>>>                     <include>org/apache/flink/calcite/shaded/**</include>
>>>>                     <include>org/apache/flink/table/**</include>
>>>>                     <include>org.codehaus.commons.compiler.properties</include>
>>>>                     <include>org/codehaus/janino/**</include>
>>>>                     <include>org/codehaus/commons/**</include>
>>>>                   </includes>
>>>>                 </filter>
>>>>               </filters>
>>>>               <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>>>>                    the following lines. This will add a Main-Class entry to the
>>>>                    manifest file -->
>>>>               <transformers>
>>>>                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>                   <mainClass>org.sense.flink.App</mainClass>
>>>>                 </transformer>
>>>>               </transformers>
>>>>               <createDependencyReducedPom>false</createDependencyReducedPom>
>>>>             </configuration>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>     </plugins>
>>>>   </build>
>>>> </project>
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>>>> [email protected]> wrote:
>>>>
>>>>> Oh, yes, I just saw that. I will use 1.9 then. Thanks!
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <[email protected]> wrote:
>>>>>
>>>>>> It's because none of the Blink code is shipped with 1.8.0; it is currently
>>>>>> only available in 1.9-SNAPSHOT.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> What artifacts do I have to import with Maven in order to
>>>>>>> use the Blink API?
>>>>>>>
>>>>>>> I am using Flink 1.8.0 and I am trying to import Blink code to use
>>>>>>> its ExecutionContext
>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>>>> I want to do this in order to implement my own operator, like the one
>>>>>>> implemented here
>>>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>>>> I guess if I import flink-table, everything should come inside the same
>>>>>>> jar, as is done here
>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says that
>>>>>>> it is a missing artifact.
>>>>>>>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-planner_2.11</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-common</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>>   <type>pom</type>
>>>>>>>   <scope>provided</scope>
>>>>>>> </dependency>
>>>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-table-runtime-blink</artifactId>
>>>>>>>   <version>1.8.0</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Cool, thanks Kurt!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> You can check out the bundle operator used in Blink, which does
>>>>>>>>> something similar to what you mentioned:
>>>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
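The general idea behind such a bundle operator can be sketched in plain Java. This sketch assumes a simple count-based trigger decides when to flush. Each input is folded into a per-key buffer through an `addInput`-style function. When the trigger fires, the whole buffer is emitted downstream and cleared. `BundlingOperatorSketch`, `BundlingDemo` and their parameters are hypothetical and do not reproduce the API of Blink's `BundleOperator`. A real Flink operator would also have to flush before checkpoints and on close. Here that is left to explicit `finishBundle()` calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, Flink-free model of a bundling (pre-aggregating) operator.
class BundlingOperatorSketch<K, V, IN> {
    private final Function<IN, K> keySelector;
    // Folds one input into its key's accumulator; the accumulator is null for a new key.
    private final BiFunction<V, IN, V> addInput;
    // Assumed flush policy: emit the bundle after this many inputs.
    private final int maxCount;
    private final Map<K, V> buffer = new HashMap<>();
    // Stands in for the downstream output: every flushed bundle is recorded here.
    private final List<Map<K, V>> emittedBundles = new ArrayList<>();
    private int numOfElements = 0;

    BundlingOperatorSketch(Function<IN, K> keySelector, BiFunction<V, IN, V> addInput, int maxCount) {
        this.keySelector = keySelector;
        this.addInput = addInput;
        this.maxCount = maxCount;
    }

    // Plays the role of processElement: buffer the input, flush when the trigger fires.
    void processElement(IN input) {
        K key = keySelector.apply(input);
        buffer.put(key, addInput.apply(buffer.get(key), input));
        numOfElements++;
        if (numOfElements >= maxCount) {
            finishBundle();
        }
    }

    // Emits a copy of the buffered aggregates and starts a new, empty bundle.
    void finishBundle() {
        if (!buffer.isEmpty()) {
            emittedBundles.add(new HashMap<>(buffer));
            buffer.clear();
        }
        numOfElements = 0;
    }

    List<Map<K, V>> emittedBundles() {
        return emittedBundles;
    }
}

class BundlingDemo {
    public static void main(String[] args) {
        // Count words per key, flushing every 3 inputs.
        BundlingOperatorSketch<String, Integer, String> op = new BundlingOperatorSketch<String, Integer, String>(
                w -> w, (acc, w) -> acc == null ? 1 : acc + 1, 3);
        for (String w : new String[] {"a", "a", "b", "a"}) {
            op.processElement(w);
        }
        op.finishBundle(); // e.g. end of input or just before a checkpoint
        System.out.println(op.emittedBundles().size() + " bundles: " + op.emittedBundles());
    }
}
```

Only one record per key and bundle leaves the operator, so a hot key costs one downstream record per flush instead of one per input.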
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Kurt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I was trying to implement a better way to handle data skew using
>>>>>>>>>> Flink, and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they
>>>>>>>>>> used OneInputStreamOperator [2]. With it, they could implement the
>>>>>>>>>> equivalent of a Hadoop "combiner" (executing part of the reduce work
>>>>>>>>>> in the map phase, before shuffling).
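A combiner pays off under skew because each upstream task ships one partial aggregate per distinct key instead of one record per input. The following is a hypothetical, Flink-free sketch. The `CombinerSketch` class and the sample keys are made up for illustration and are not taken from the talk. It counts how many records would cross the shuffle with and without local pre-aggregation, and checks that the final counts are unchanged.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a Hadoop-style combiner: each upstream partition
// pre-aggregates its own records before they are shuffled to the reducer.
class CombinerSketch {

    // Without a combiner, every input record crosses the network.
    static int recordsShuffledWithoutCombiner(List<List<String>> partitions) {
        int total = 0;
        for (List<String> partition : partitions) {
            total += partition.size();
        }
        return total;
    }

    // Local pre-aggregation: one partial count per distinct key in this partition.
    static Map<String, Long> combine(List<String> partition) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : partition) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // With a combiner, each partition ships only its partial counts.
    static int recordsShuffledWithCombiner(List<List<String>> partitions) {
        int total = 0;
        for (List<String> partition : partitions) {
            total += combine(partition).size();
        }
        return total;
    }

    // Reducer side: merging the partial counts yields the same final totals.
    static Map<String, Long> reduce(List<List<String>> partitions) {
        Map<String, Long> result = new HashMap<>();
        for (List<String> partition : partitions) {
            combine(partition).forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Skewed input: the key "hot" dominates both partitions.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("hot", "hot", "hot", "cold"),
                Arrays.asList("hot", "hot", "warm"));
        System.out.println(recordsShuffledWithoutCombiner(partitions)); // prints 7
        System.out.println(recordsShuffledWithCombiner(partitions));    // prints 4
        System.out.println(reduce(partitions).get("hot"));              // prints 5
    }
}
```

In a streaming job the input never ends, so a combiner also needs a rule for when to flush its partial results. That role is played by the bundle trigger in the Blink bundle operator mentioned in this thread.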
>>>>>>>>>>
>>>>>>>>>> I need some help here. Which Flink operators in the source code could
>>>>>>>>>> I look at to implement my own operator that deals with data skew? Or
>>>>>>>>>> does anyone have an example of a similar use case?
>>>>>>>>>>
>>>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Felipe
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
