I mean no particular reason.

Best,
Kurt


On Wed, Apr 17, 2019 at 7:44 PM Kurt Young <ykt...@gmail.com> wrote:

> There is no reason for it; the operator and function don't rely on the
> logic that AbstractUdfStreamOperator supplies.
>
> Best,
> Kurt
>
>
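The point above can be illustrated with a small, self-contained sketch. It does not use Flink's classes: `LifecycleFunction`, `UdfForwardingOperator` and `PlainOperator` are hypothetical stand-ins. It assumes that the main thing a UDF-holding base class contributes is forwarding the operator's lifecycle calls to the user function. An operator whose function has no lifecycle hooks gains nothing from that forwarding.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical user function with lifecycle hooks (stand-in for a "rich" function).
interface LifecycleFunction {
    void open();
    void close();
}

// Sketch of what a UDF-holding operator base adds: it owns the user function
// and forwards the operator's own open/close calls to it.
abstract class UdfForwardingOperator<F extends LifecycleFunction> {
    protected final F userFunction;

    UdfForwardingOperator(F userFunction) {
        this.userFunction = userFunction;
    }

    void open() { userFunction.open(); }

    void close() { userFunction.close(); }
}

// Sketch of a plain operator base: it knows nothing about user functions.
abstract class PlainOperator {
    void open() { }

    void close() { }
}

// Records which lifecycle hooks were invoked, in order.
class RecordingFunction implements LifecycleFunction {
    final List<String> calls = new ArrayList<>();

    @Override public void open() { calls.add("open"); }

    @Override public void close() { calls.add("close"); }
}

class ForwardingOp extends UdfForwardingOperator<RecordingFunction> {
    ForwardingOp(RecordingFunction f) { super(f); }
}

// Holds a function but never forwards lifecycle calls to it.
class NonForwardingOp extends PlainOperator {
    final RecordingFunction function;

    NonForwardingOp(RecordingFunction f) { this.function = f; }
}

class OperatorBaseDemo {
    public static void main(String[] args) {
        ForwardingOp a = new ForwardingOp(new RecordingFunction());
        a.open();
        a.close();
        NonForwardingOp b = new NonForwardingOp(new RecordingFunction());
        b.open();
        b.close();
        System.out.println(a.userFunction.calls); // [open, close]
        System.out.println(b.function.calls);     // []
    }
}
```

If the user function later needs open/close handling, the forwarding variant is the one to model.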
> On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Thanks for the tip! I guess now it is working as it should be
>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L54>
>> .
>>
>> Just one last question: why did you decide to use
>> "AbstractStreamOperator" instead of "AbstractUdfStreamOperator"? I am
>> asking because I based my solution, apart from looking at yours, on the
>> "StreamFlatMap
>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java#L29>"
>> class implementation.
>>
>> Best,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> I think you might have mixed some test code with the operator.
>>> "List<String> getOutputs()" is from "TestMapBundleFunction" and is only used
>>> for validation.
>>> For real usage, you need to write whatever records you want to emit
>>> to the "collector" that is passed in to "finishBundle".
>>>
>>> Best,
>>> Kurt
>>>
>>>
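A minimal sketch of the advice above, written without Flink on the classpath. `Collector` and `SimpleMapBundleFunction` here are simplified stand-ins modeled on the shape of a map bundle function; the names and signatures are assumptions for illustration, not the Flink classes. The point it shows: `finishBundle` returns nothing and hands every result to the collector it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a collector: receives the records emitted downstream.
interface Collector<T> {
    void collect(T record);
}

// Simplified stand-in modeled on the shape of a map bundle function (not the Flink class).
abstract class SimpleMapBundleFunction<K, V, IN, OUT> {
    // Merge one input into the value already buffered for its key (null if none yet).
    abstract V addInput(V existing, IN input);

    // Called when the bundle is flushed: results go out through the collector only.
    abstract void finishBundle(Map<K, V> buffer, Collector<OUT> out);
}

// Sums integer values per key and emits one "key=sum" string per key.
class SumPerKeyFunction extends SimpleMapBundleFunction<String, Integer, Integer, String> {
    @Override
    Integer addInput(Integer existing, Integer input) {
        return existing == null ? input : existing + input;
    }

    @Override
    void finishBundle(Map<String, Integer> buffer, Collector<String> out) {
        for (Map.Entry<String, Integer> e : buffer.entrySet()) {
            out.collect(e.getKey() + "=" + e.getValue());
        }
    }
}

class FinishBundleDemo {
    // Feeds (key, value) pairs into the function, flushes once, and returns what was emitted.
    static List<String> run(String[] keys, int[] values) {
        SumPerKeyFunction fn = new SumPerKeyFunction();
        Map<String, Integer> buffer = new TreeMap<>(); // sorted keys keep the output order stable
        for (int i = 0; i < keys.length; i++) {
            buffer.put(keys[i], fn.addInput(buffer.get(keys[i]), values[i]));
        }
        List<String> emitted = new ArrayList<>();
        fn.finishBundle(buffer, emitted::add); // the list plays the role of the downstream
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(new String[] {"a", "b", "a"}, new int[] {1, 2, 3})); // [a=4, b=2]
    }
}
```

In a real operator the collector would be the one wrapping the operator's output, so anything not passed to `collect` never reaches the next operator.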
>>> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi Kurt,
>>>>
>>>> How do you make the finishBundle
>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
>>>> method return the combined tuples? I saw that there is a method
>>>> "List<String> getOutputs()" which is never called.
>>>>
>>>> I did an implementation
>>>> <https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
>>>> based on the example that you suggested. The MapBundleFunctionImpl
>>>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
>>>>  class
>>>> has the method finishBundle, which iterates over all the combined tuples and
>>>> returns them. However, my application does not continue to receive tuples
>>>> after the transform method
>>>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>
>>>> .
>>>>
>>>> Thanks,
>>>> Felipe
>>>>
>>>> *--*
>>>> *-- Felipe Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>>
>>>> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> I think you can simply copy the source code into your project if the Maven
>>>>> dependency cannot be used.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Hi again Kurt,
>>>>>>
>>>>>> could you please help me with the pom.xml file? I have included
>>>>>> all Table ecosystem dependencies and the flink-table-runtime-blink as well.
>>>>>> However, the class org.apache.flink.table.runtime.context.ExecutionContext
>>>>>> is still not found. I guess I am missing some dependency, but I do not know
>>>>>> which. This is my pom.xml file.
>>>>>>
>>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>>>>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>>> <modelVersion>4.0.0</modelVersion>
>>>>>>
>>>>>> <groupId>org.sense.flink</groupId>
>>>>>> <artifactId>explore-flink</artifactId>
>>>>>> <version>0.0.1-SNAPSHOT</version>
>>>>>> <packaging>jar</packaging>
>>>>>>
>>>>>> <name>explore-flink</name>
>>>>>> <url>http://maven.apache.org</url>
>>>>>>
>>>>>> <properties>
>>>>>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>>> <jdk.version>1.8</jdk.version>
>>>>>> <scala.binary.version>2.11</scala.binary.version>
>>>>>> <!-- <flink.version>1.8.0</flink.version> -->
>>>>>> <flink.version>1.9-SNAPSHOT</flink.version>
>>>>>> <junit.version>4.12</junit.version>
>>>>>> </properties>
>>>>>>
>>>>>> <dependencies>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-java</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-metrics-dropwizard</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> <scope>provided</scope>
>>>>>> </dependency>
>>>>>>
>>>>>> <!-- Table ecosystem -->
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>> <groupId>org.fusesource.mqtt-client</groupId>
>>>>>> <artifactId>mqtt-client</artifactId>
>>>>>> <version>1.15</version>
>>>>>> <!-- <scope>provided</scope> -->
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>> <groupId>org.slf4j</groupId>
>>>>>> <artifactId>slf4j-api</artifactId>
>>>>>> <version>1.7.26</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.slf4j</groupId>
>>>>>> <artifactId>slf4j-log4j12</artifactId>
>>>>>> <version>1.7.26</version>
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>> <groupId>junit</groupId>
>>>>>> <artifactId>junit</artifactId>
>>>>>> <version>${junit.version}</version>
>>>>>> </dependency>
>>>>>> </dependencies>
>>>>>> <build>
>>>>>> <finalName>explore-flink</finalName>
>>>>>> <plugins>
>>>>>> <!-- download source code in Eclipse, best practice -->
>>>>>> <plugin>
>>>>>> <groupId>org.apache.maven.plugins</groupId>
>>>>>> <artifactId>maven-eclipse-plugin</artifactId>
>>>>>> <version>2.10</version>
>>>>>> <configuration>
>>>>>> <downloadSources>true</downloadSources>
>>>>>> <downloadJavadocs>false</downloadJavadocs>
>>>>>> </configuration>
>>>>>> </plugin>
>>>>>>
>>>>>> <!-- Set a compiler level -->
>>>>>> <plugin>
>>>>>> <groupId>org.apache.maven.plugins</groupId>
>>>>>> <artifactId>maven-compiler-plugin</artifactId>
>>>>>> <version>3.8.0</version>
>>>>>> <configuration>
>>>>>> <source>${jdk.version}</source>
>>>>>> <target>${jdk.version}</target>
>>>>>> </configuration>
>>>>>> </plugin>
>>>>>>
>>>>>> <!-- Maven Shade Plugin -->
>>>>>> <plugin>
>>>>>> <groupId>org.apache.maven.plugins</groupId>
>>>>>> <artifactId>maven-shade-plugin</artifactId>
>>>>>> <version>3.2.0</version>
>>>>>> <!-- Run shade goal on package phase -->
>>>>>> <executions>
>>>>>> <execution>
>>>>>> <phase>package</phase>
>>>>>> <goals>
>>>>>> <goal>shade</goal>
>>>>>> </goals>
>>>>>> <configuration>
>>>>>> <artifactSet>
>>>>>> <excludes>
>>>>>> <exclude>org.apache.flink:*</exclude>
>>>>>> <!-- Also exclude very big transitive dependencies of Flink. WARNING:
>>>>>> You have to remove these excludes if your code relies on other
>>>>>> versions of these dependencies. -->
>>>>>> <exclude>org.slf4j:*</exclude>
>>>>>> <exclude>log4j:*</exclude>
>>>>>> <exclude>com.typesafe:config:*</exclude>
>>>>>> <exclude>junit:junit:*</exclude>
>>>>>> <exclude>com.codahale.metrics:*</exclude>
>>>>>> </excludes>
>>>>>> </artifactSet>
>>>>>> <filters>
>>>>>> <filter>
>>>>>> <artifact>org.apache.flink:*</artifact>
>>>>>> <excludes>
>>>>>> <!-- exclude shaded google but include shaded curator -->
>>>>>> <exclude>org/apache/flink/shaded/com/**</exclude>
>>>>>> <exclude>web-docs/**</exclude>
>>>>>> </excludes>
>>>>>> </filter>
>>>>>> <filter>
>>>>>> <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>>>>>> this might cause SecurityExceptions when using the JAR. -->
>>>>>> <artifact>*:*</artifact>
>>>>>> <excludes>
>>>>>> <exclude>META-INF/*.SF</exclude>
>>>>>> <exclude>META-INF/*.DSA</exclude>
>>>>>> <exclude>META-INF/*.RSA</exclude>
>>>>>> </excludes>
>>>>>> </filter>
>>>>>> <filter>
>>>>>> <artifact>*:*</artifact>
>>>>>> <includes>
>>>>>> <include>org/apache/calcite/**</include>
>>>>>> <include>org/apache/flink/calcite/shaded/**</include>
>>>>>> <include>org/apache/flink/table/**</include>
>>>>>> <include>org.codehaus.commons.compiler.properties</include>
>>>>>> <include>org/codehaus/janino/**</include>
>>>>>> <include>org/codehaus/commons/**</include>
>>>>>> </includes>
>>>>>> </filter>
>>>>>> </filters>
>>>>>> <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>>>>>> the following lines. This will add a Main-Class entry to the manifest
>>>>>> file -->
>>>>>> <transformers>
>>>>>> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>>> <mainClass>org.sense.flink.App</mainClass>
>>>>>> </transformer>
>>>>>> </transformers>
>>>>>> <createDependencyReducedPom>false</createDependencyReducedPom>
>>>>>> </configuration>
>>>>>> </execution>
>>>>>> </executions>
>>>>>> </plugin>
>>>>>> </plugins>
>>>>>> </build>
>>>>>> </project>
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>> *--*
>>>>>> *-- Felipe Gutierrez*
>>>>>>
>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>
>>>>>>> oh, yes. I just saw. I will use 1.9 then. thanks
>>>>>>>
>>>>>>> *--*
>>>>>>> *-- Felipe Gutierrez*
>>>>>>>
>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It's because the Blink code is not shipped with 1.8.0; it is
>>>>>>>> currently only available in 1.9-SNAPSHOT.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> what are the artifacts that I have to import in Maven in order to
>>>>>>>>> use the Blink API?
>>>>>>>>>
>>>>>>>>> I am using Flink 1.8.0 and I am trying to import blink code to use
>>>>>>>>> its ExecutionContext
>>>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>>>>>> I want to do this in order to implement my own operator like it is
>>>>>>>>> implemented here
>>>>>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>>>>>> I guess if I import flink-table, everything should come inside the same jar,
>>>>>>>>> as it is done here
>>>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>>>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says that it
>>>>>>>>> is a missing artifact.
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>>> <artifactId>flink-table-planner_2.11</artifactId>
>>>>>>>>> <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>>>>>> <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>>>>> <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>>> <artifactId>flink-table-common</artifactId>
>>>>>>>>> <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>>> <artifactId>flink-table</artifactId>
>>>>>>>>> <version>1.8.0</version>
>>>>>>>>> <type>pom</type>
>>>>>>>>> <scope>provided</scope>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>>>>>>> <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *--*
>>>>>>>>> *-- Felipe Gutierrez*
>>>>>>>>>
>>>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Cool, thanks Kurt!
>>>>>>>>>> *-*
>>>>>>>>>> *- Felipe Gutierrez*
>>>>>>>>>>
>>>>>>>>>> *- skype: felipe.o.gutierrez*
>>>>>>>>>> *- **https://felipeogutierrez.blogspot.com
>>>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> You can check out the bundle operator, which is used in Blink to
>>>>>>>>>>> do something similar to what you mentioned:
>>>>>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Kurt
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying to implement a better way to handle data skew
>>>>>>>>>>>> using Flink and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1] which says that they
>>>>>>>>>>>> used OneInputStreamOperator [2]. Through it, they could implement the
>>>>>>>>>>>> equivalent of Hadoop's "combiner" (executing part of the reduce work in
>>>>>>>>>>>> the map phase, before shuffling).
>>>>>>>>>>>>
>>>>>>>>>>>> I need some help here. What are some of the Flink source-code
>>>>>>>>>>>> operators that I can look at to implement my own operator that deals with
>>>>>>>>>>>> data skew? Or maybe, is there someone who has an example of a use case
>>>>>>>>>>>> similar to this?
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Felipe
>>>>>>>>>>>>
>>>>>>>>>>>> *--*
>>>>>>>>>>>> *-- Felipe Gutierrez*
>>>>>>>>>>>>
>>>>>>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>>>>>>
>>>>>>>>>>>
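The combiner idea from the original question (pre-aggregating per key before the shuffle so that a hot key produces fewer records) can be sketched in plain Java. `CountCombiner`, its count-based flush trigger and the `key:count` output format are hypothetical choices for illustration, not something taken from the talk or from Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-aggregator: buffers partial counts per key and flushes them
// once maxBundleSize input records have been seen (a count-based trigger).
class CountCombiner {
    private final int maxBundleSize;
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();
    private int seen = 0;

    CountCombiner(int maxBundleSize) {
        if (maxBundleSize <= 0) {
            throw new IllegalArgumentException("maxBundleSize must be positive");
        }
        this.maxBundleSize = maxBundleSize;
    }

    // Adds one record to the partial count of its key; flushes when the bundle is full.
    void processElement(String key) {
        buffer.merge(key, 1L, Long::sum);
        seen++;
        if (seen >= maxBundleSize) {
            flush();
        }
    }

    // Emits one "key:count" record per buffered key, then starts a new bundle.
    void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            emitted.add(e.getKey() + ":" + e.getValue());
        }
        buffer.clear();
        seen = 0;
    }

    List<String> emitted() {
        return emitted;
    }
}

class CountCombinerDemo {
    public static void main(String[] args) {
        CountCombiner combiner = new CountCombiner(4);
        for (String key : new String[] {"hot", "hot", "cold", "hot", "hot", "hot"}) {
            combiner.processElement(key);
        }
        combiner.flush(); // flush the partially filled last bundle
        System.out.println(combiner.emitted()); // [hot:3, cold:1, hot:2]
    }
}
```

Six input records become three partial results here. The downstream reducer still has to sum the partial counts per key, and a real operator would also need to flush on checkpoints or a timer so buffered data is not held back indefinitely.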
