There is no particular reason for it; the operator and function don't rely on
the logic that AbstractUdfStreamOperator supplies.
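For illustration, here is a plain-Java sketch of the map-bundle ("combiner") pattern discussed in this thread. This is not the actual Flink operator API: the class name, the size-based flush trigger, and the String-keyed aggregation are illustrative assumptions, but the flow mirrors the real operator's: pre-aggregate records per key locally, and only emit to the collector when the bundle is finished.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the map-bundle ("combiner") pattern: records are pre-aggregated
// per key in a local map and only emitted downstream once the bundle is
// finished (here, when it reaches a size threshold).
public class MapBundleSketch {

    private final Map<String, Long> bundle = new HashMap<>();
    // Stands in for the Collector the framework passes to finishBundle().
    private final List<String> emitted = new ArrayList<>();
    private final int maxBundleSize;

    public MapBundleSketch(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    // Roughly processElement(): fold the record into the local bundle.
    public void addElement(String key, long value) {
        bundle.merge(key, value, Long::sum);
        if (bundle.size() >= maxBundleSize) {
            finishBundle();
        }
    }

    // Roughly finishBundle(): emit every pre-aggregated entry to the
    // collector and reset the bundle for the next batch.
    public void finishBundle() {
        bundle.forEach((k, v) -> emitted.add(k + "=" + v));
        bundle.clear();
    }

    public List<String> getEmitted() {
        return emitted;
    }
}
```

In the real operator, finishBundle is driven by the framework (bundle triggers, checkpoints) and writes to the Collector it receives, rather than to a local list.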

Best,
Kurt


On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Thanks for the tip! I guess now it is working as it should
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L54>
> .
>
> Just one last question: why did you decide to use "AbstractStreamOperator"
> instead of "AbstractUdfStreamOperator"? I am asking because, besides looking
> at your solution, I was also basing mine on the "StreamFlatMap
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java#L29>"
> class implementation.
>
> Best,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> I think you might have mixed some test code with the operator. "List<String>
>> getOutputs()" is from "TestMapBundleFunction" and is only used for
>> validation.
>> For the real usage, you need to write whatever records you want to emit
>> to the "collector" that is passed in during "finishBundle".
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Kurt,
>>>
>>> How do you make the finishBundle
>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
>>> method return the combined tuples? I saw that there is a method
>>> "List<String> getOutputs()" which is never called.
>>>
>>> I did an implementation
>>> <https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
>>> based on the example that you suggested. The MapBundleFunctionImpl
>>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
>>> class has a finishBundle method which iterates over all the combined tuples
>>> and returns them. However, my application does not continue to receive
>>> tuples after the transform method
>>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>
>>> .
>>>
>>> Thanks,
>>> Felipe
>>>
>>>
>>>
>>> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> I think you can simply copy the source code into your project if the
>>>> maven dependency cannot be used.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi again Kurt,
>>>>>
>>>>> could you please help me with the pom.xml file? I have included
>>>>> all the Table ecosystem dependencies and flink-table-runtime-blink as
>>>>> well.
>>>>> However, the class org.apache.flink.table.runtime.context.ExecutionContext
>>>>> is still not found. I guess I am missing some dependency, but I do not
>>>>> know which. This is my pom.xml file:
>>>>>
>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0";
>>>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>>>>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>>>> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>>>>> <modelVersion>4.0.0</modelVersion>
>>>>>
>>>>> <groupId>org.sense.flink</groupId>
>>>>> <artifactId>explore-flink</artifactId>
>>>>> <version>0.0.1-SNAPSHOT</version>
>>>>> <packaging>jar</packaging>
>>>>>
>>>>> <name>explore-flink</name>
>>>>> <url>http://maven.apache.org</url>
>>>>>
>>>>> <properties>
>>>>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>> <jdk.version>1.8</jdk.version>
>>>>> <scala.binary.version>2.11</scala.binary.version>
>>>>> <!-- <flink.version>1.8.0</flink.version> -->
>>>>> <flink.version>1.9-SNAPSHOT</flink.version>
>>>>> <junit.version>4.12</junit.version>
>>>>> </properties>
>>>>>
>>>>> <dependencies>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-java</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-metrics-dropwizard</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>>
>>>>> <!-- Table ecosystem -->
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>>
>>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>>
>>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.fusesource.mqtt-client</groupId>
>>>>> <artifactId>mqtt-client</artifactId>
>>>>> <version>1.15</version>
>>>>> <!-- <scope>provided</scope> -->
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.slf4j</groupId>
>>>>> <artifactId>slf4j-api</artifactId>
>>>>> <version>1.7.26</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.slf4j</groupId>
>>>>> <artifactId>slf4j-log4j12</artifactId>
>>>>> <version>1.7.26</version>
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>> <groupId>junit</groupId>
>>>>> <artifactId>junit</artifactId>
>>>>> <version>${junit.version}</version>
>>>>> </dependency>
>>>>> </dependencies>
>>>>> <build>
>>>>> <finalName>explore-flink</finalName>
>>>>> <plugins>
>>>>> <!-- download source code in Eclipse, best practice -->
>>>>> <plugin>
>>>>> <groupId>org.apache.maven.plugins</groupId>
>>>>> <artifactId>maven-eclipse-plugin</artifactId>
>>>>> <version>2.10</version>
>>>>> <configuration>
>>>>> <downloadSources>true</downloadSources>
>>>>> <downloadJavadocs>false</downloadJavadocs>
>>>>> </configuration>
>>>>> </plugin>
>>>>>
>>>>> <!-- Set a compiler level -->
>>>>> <plugin>
>>>>> <groupId>org.apache.maven.plugins</groupId>
>>>>> <artifactId>maven-compiler-plugin</artifactId>
>>>>> <version>3.8.0</version>
>>>>> <configuration>
>>>>> <source>${jdk.version}</source>
>>>>> <target>${jdk.version}</target>
>>>>> </configuration>
>>>>> </plugin>
>>>>>
>>>>> <!-- Maven Shade Plugin -->
>>>>> <plugin>
>>>>> <groupId>org.apache.maven.plugins</groupId>
>>>>> <artifactId>maven-shade-plugin</artifactId>
>>>>> <version>3.2.0</version>
>>>>> <!-- Run shade goal on package phase -->
>>>>> <executions>
>>>>> <execution>
>>>>> <phase>package</phase>
>>>>> <goals>
>>>>> <goal>shade</goal>
>>>>> </goals>
>>>>> <configuration>
>>>>> <artifactSet>
>>>>> <excludes>
>>>>> <exclude>org.apache.flink:*</exclude>
>>>>> <!-- Also exclude very big transitive dependencies of Flink WARNING:
>>>>> You have to remove these excludes if your code relies on other
>>>>> versions of
>>>>> these dependencies. -->
>>>>> <exclude>org.slf4j:*</exclude>
>>>>> <exclude>log4j:*</exclude>
>>>>> <exclude>com.typesafe:config:*</exclude>
>>>>> <exclude>junit:junit:*</exclude>
>>>>> <exclude>com.codahale.metrics:*</exclude>
>>>>> </excludes>
>>>>> </artifactSet>
>>>>> <filters>
>>>>> <filter>
>>>>> <artifact>org.apache.flink:*</artifact>
>>>>> <excludes>
>>>>> <!-- exclude shaded google but include shaded curator -->
>>>>> <exclude>org/apache/flink/shaded/com/**</exclude>
>>>>> <exclude>web-docs/**</exclude>
>>>>> </excludes>
>>>>> </filter>
>>>>> <filter>
>>>>> <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>>>>> this might cause SecurityExceptions when using the JAR. -->
>>>>> <artifact>*:*</artifact>
>>>>> <excludes>
>>>>> <exclude>META-INF/*.SF</exclude>
>>>>> <exclude>META-INF/*.DSA</exclude>
>>>>> <exclude>META-INF/*.RSA</exclude>
>>>>> </excludes>
>>>>> </filter>
>>>>> <filter>
>>>>> <artifact>*:*</artifact>
>>>>> <includes>
>>>>> <include>org/apache/calcite/**</include>
>>>>> <include>org/apache/flink/calcite/shaded/**</include>
>>>>> <include>org/apache/flink/table/**</include>
>>>>> <include>org.codehaus.commons.compiler.properties</include>
>>>>> <include>org/codehaus/janino/**</include>
>>>>> <include>org/codehaus/commons/**</include>
>>>>> </includes>
>>>>> </filter>
>>>>> </filters>
>>>>> <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>>>>> the following lines. This will add a Main-Class entry to the manifest
>>>>> file -->
>>>>> <transformers>
>>>>> <transformer
>>>>>
>>>>> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>> <mainClass>org.sense.flink.App</mainClass>
>>>>> </transformer>
>>>>> </transformers>
>>>>> <createDependencyReducedPom>false</createDependencyReducedPom>
>>>>> </configuration>
>>>>> </execution>
>>>>> </executions>
>>>>> </plugin>
>>>>> </plugins>
>>>>> </build>
>>>>> </project>
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Oh yes, I just saw. I will use 1.9 then. Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> It's because the Blink code is not shipped with 1.8.0; it is
>>>>>>> currently only available in 1.9-SNAPSHOT.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> what are the artifacts that I have to import on maven in order to
>>>>>>>> use Blink Api?
>>>>>>>>
>>>>>>>> I am using Flink 1.8.0 and I am trying to import the Blink code to use
>>>>>>>> its ExecutionContext
>>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>>>>> I want to do this in order to implement my own operator like the one
>>>>>>>> implemented here
>>>>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>>>>> I guessed that if I import flink-table, everything should come inside
>>>>>>>> the same jar, as it is done here
>>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>>>>> However, I cannot import "flink-table-runtime-blink"; Eclipse says that
>>>>>>>> it is a missing artifact.
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>> <artifactId>flink-table-planner_2.11</artifactId>
>>>>>>>> <version>1.8.0</version>
>>>>>>>> </dependency>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>>>>> <version>1.8.0</version>
>>>>>>>> </dependency>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>>>> <version>1.8.0</version>
>>>>>>>> </dependency>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>> <artifactId>flink-table-common</artifactId>
>>>>>>>> <version>1.8.0</version>
>>>>>>>> </dependency>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>> <artifactId>flink-table</artifactId>
>>>>>>>> <version>1.8.0</version>
>>>>>>>> <type>pom</type>
>>>>>>>> <scope>provided</scope>
>>>>>>>> </dependency>
>>>>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>>>>>> <version>1.8.0</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Cool, thanks Kurt!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> You can check out the bundle operator used in Blink, which performs
>>>>>>>>>> something similar to what you mentioned:
>>>>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Kurt
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I was trying to implement a better way to handle data skew with
>>>>>>>>>>> Flink and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they
>>>>>>>>>>> used a OneInputStreamOperator [2]. Through it, they could implement
>>>>>>>>>>> the equivalent of Hadoop's "combiner" (executing part of the reduce
>>>>>>>>>>> task in the map phase, before shuffling).
>>>>>>>>>>>
>>>>>>>>>>> I need some help here. What are some of the Flink source-code
>>>>>>>>>>> operators that I can look at to implement my own operator that
>>>>>>>>>>> deals with data skew? Or maybe, does someone have an example of a
>>>>>>>>>>> similar use case?
>>>>>>>>>>>
>>>>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>>>>> [2]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Felipe
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
