I mean no particular reason.

Best,
Kurt

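For readers following the thread: the advice quoted further down, that records should be written to the collector passed into finishBundle rather than returned or parked in a list, can be sketched in plain Java. This is only an illustration under assumptions. SketchCollector, CountBundleSketch and CountBundleSketchDemo are hypothetical stand-ins written for this note, not the Flink or Blink classes, and the method shapes are simplified.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the role a collector plays; not the Flink interface.
interface SketchCollector<T> {
    void collect(T record);
}

// Hypothetical bundle function that sums counts per key.
class CountBundleSketch {
    // Merge one input into the value buffered for its key (null if the key is new).
    Integer addInput(Integer buffered, int input) {
        return buffered == null ? input : buffered + input;
    }

    // Emit one combined record per key through the collector; nothing is returned.
    void finishBundle(Map<String, Integer> buffer, SketchCollector<String> out) {
        for (Map.Entry<String, Integer> entry : buffer.entrySet()) {
            out.collect(entry.getKey() + "=" + entry.getValue());
        }
    }
}

class CountBundleSketchDemo {
    // Buffers three inputs, finishes the bundle, and returns what reached "downstream".
    static List<String> run() {
        CountBundleSketch fn = new CountBundleSketch();
        Map<String, Integer> buffer = new LinkedHashMap<>();
        String[] keys = {"a", "b", "a"};
        for (String key : keys) {
            buffer.put(key, fn.addInput(buffer.get(key), 1));
        }
        List<String> downstream = new ArrayList<>();
        fn.finishBundle(buffer, downstream::add);
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The only path by which results leave the function here is out.collect(...). A getOutputs()-style accessor would be a test convenience on top of that, not a replacement for it.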
On Wed, Apr 17, 2019 at 7:44 PM Kurt Young <ykt...@gmail.com> wrote:

> There is no reason for it; the operator and function don't rely on the
> logic that AbstractUdfStreamOperator supplies.
>
> Best,
> Kurt
>
>
> On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Thanks for the tip! I guess now it is working as it should be
>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L54>
>> .
>>
>> Just one last question. Why did you decide to use
>> "AbstractStreamOperator" instead of "AbstractUdfStreamOperator"? I am
>> asking because I also based my solution (I looked at yours as well)
>> on the "StreamFlatMap
>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java#L29>"
>> class implementation.
>>
>> Best,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Wed, Apr 17, 2019 at 4:13 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> I think you might have mixed some test code with the operator.
>>> "List<String> getOutputs()" is from "TestMapBundleFunction" and is only
>>> used for validation.
>>> For real usage, you need to write whatever records you want to emit
>>> to the "collector" that is passed in during "finishBundle".
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi Kurt,
>>>>
>>>> How do you make the finishBundle
>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
>>>> method return the combined tuples? I saw that there is a method
>>>> "List<String> getOutputs()" which is never called.
>>>>
>>>> I did an implementation
>>>> <https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
>>>> based on the example that you suggested. The MapBundleFunctionImpl
>>>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
>>>> class has the method finishBundle, which iterates over all the combined
>>>> tuples and returns them. However, my application does not continue to
>>>> receive tuples after the transform method
>>>> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>
>>>> .
>>>>
>>>> Thanks,
>>>> Felipe
>>>>
>>>> *--*
>>>> *-- Felipe Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>>
>>>> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> I think you can simply copy the source code into your project if the
>>>>> Maven dependency cannot be used.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Hi again Kurt,
>>>>>>
>>>>>> could you please help me with the pom.xml file? I have included
>>>>>> all Table ecosystem dependencies and flink-table-runtime-blink as well.
>>>>>> However, the class org.apache.flink.table.runtime.context.ExecutionContext
>>>>>> is still not found. I guess I am missing some dependency, but I do not
>>>>>> know which. This is my pom.xml file.
>>>>>>
>>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>>>>>   http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>>>   <modelVersion>4.0.0</modelVersion>
>>>>>>
>>>>>>   <groupId>org.sense.flink</groupId>
>>>>>>   <artifactId>explore-flink</artifactId>
>>>>>>   <version>0.0.1-SNAPSHOT</version>
>>>>>>   <packaging>jar</packaging>
>>>>>>
>>>>>>   <name>explore-flink</name>
>>>>>>   <url>http://maven.apache.org</url>
>>>>>>
>>>>>>   <properties>
>>>>>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>>>     <jdk.version>1.8</jdk.version>
>>>>>>     <scala.binary.version>2.11</scala.binary.version>
>>>>>>     <!-- <flink.version>1.8.0</flink.version> -->
>>>>>>     <flink.version>1.9-SNAPSHOT</flink.version>
>>>>>>     <junit.version>4.12</junit.version>
>>>>>>   </properties>
>>>>>>
>>>>>>   <dependencies>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-java</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-metrics-dropwizard</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>       <scope>provided</scope>
>>>>>>     </dependency>
>>>>>>
>>>>>>     <!-- Table ecosystem -->
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>       <artifactId>flink-table-runtime-blink</artifactId>
>>>>>>       <version>${flink.version}</version>
>>>>>>     </dependency>
>>>>>>
>>>>>>     <dependency>
>>>>>>       <groupId>org.fusesource.mqtt-client</groupId>
>>>>>>       <artifactId>mqtt-client</artifactId>
>>>>>>       <version>1.15</version>
>>>>>>       <!-- <scope>provided</scope> -->
>>>>>>     </dependency>
>>>>>>
>>>>>>     <dependency>
>>>>>>       <groupId>org.slf4j</groupId>
>>>>>>       <artifactId>slf4j-api</artifactId>
>>>>>>       <version>1.7.26</version>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>       <groupId>org.slf4j</groupId>
>>>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>>>       <version>1.7.26</version>
>>>>>>     </dependency>
>>>>>>
>>>>>>     <dependency>
>>>>>>       <groupId>junit</groupId>
>>>>>>       <artifactId>junit</artifactId>
>>>>>>       <version>${junit.version}</version>
>>>>>>     </dependency>
>>>>>>   </dependencies>
>>>>>>   <build>
>>>>>>     <finalName>explore-flink</finalName>
>>>>>>     <plugins>
>>>>>>       <!-- download source code in Eclipse, best practice -->
>>>>>>       <plugin>
>>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>>         <artifactId>maven-eclipse-plugin</artifactId>
>>>>>>         <version>2.10</version>
>>>>>>         <configuration>
>>>>>>           <downloadSources>true</downloadSources>
>>>>>>           <downloadJavadocs>false</downloadJavadocs>
>>>>>>         </configuration>
>>>>>>       </plugin>
>>>>>>
>>>>>>       <!-- Set a compiler level
>>>>>>       -->
>>>>>>       <plugin>
>>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>>         <artifactId>maven-compiler-plugin</artifactId>
>>>>>>         <version>3.8.0</version>
>>>>>>         <configuration>
>>>>>>           <source>${jdk.version}</source>
>>>>>>           <target>${jdk.version}</target>
>>>>>>         </configuration>
>>>>>>       </plugin>
>>>>>>
>>>>>>       <!-- Maven Shade Plugin -->
>>>>>>       <plugin>
>>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>>>         <version>3.2.0</version>
>>>>>>         <!-- Run shade goal on package phase -->
>>>>>>         <executions>
>>>>>>           <execution>
>>>>>>             <phase>package</phase>
>>>>>>             <goals>
>>>>>>               <goal>shade</goal>
>>>>>>             </goals>
>>>>>>             <configuration>
>>>>>>               <artifactSet>
>>>>>>                 <excludes>
>>>>>>                   <exclude>org.apache.flink:*</exclude>
>>>>>>                   <!-- Also exclude very big transitive dependencies of Flink WARNING:
>>>>>>                   You have to remove these excludes if your code relies on other
>>>>>>                   versions of these dependencies. -->
>>>>>>                   <exclude>org.slf4j:*</exclude>
>>>>>>                   <exclude>log4j:*</exclude>
>>>>>>                   <exclude>com.typesafe:config:*</exclude>
>>>>>>                   <exclude>junit:junit:*</exclude>
>>>>>>                   <exclude>com.codahale.metrics:*</exclude>
>>>>>>                 </excludes>
>>>>>>               </artifactSet>
>>>>>>               <filters>
>>>>>>                 <filter>
>>>>>>                   <artifact>org.apache.flink:*</artifact>
>>>>>>                   <excludes>
>>>>>>                     <!-- exclude shaded google but include shaded curator -->
>>>>>>                     <exclude>org/apache/flink/shaded/com/**</exclude>
>>>>>>                     <exclude>web-docs/**</exclude>
>>>>>>                   </excludes>
>>>>>>                 </filter>
>>>>>>                 <filter>
>>>>>>                   <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>>>>>>                   this might cause SecurityExceptions when using the JAR.
>>>>>>                   -->
>>>>>>                   <artifact>*:*</artifact>
>>>>>>                   <excludes>
>>>>>>                     <exclude>META-INF/*.SF</exclude>
>>>>>>                     <exclude>META-INF/*.DSA</exclude>
>>>>>>                     <exclude>META-INF/*.RSA</exclude>
>>>>>>                   </excludes>
>>>>>>                 </filter>
>>>>>>                 <filter>
>>>>>>                   <artifact>*:*</artifact>
>>>>>>                   <includes>
>>>>>>                     <include>org/apache/calcite/**</include>
>>>>>>                     <include>org/apache/flink/calcite/shaded/**</include>
>>>>>>                     <include>org/apache/flink/table/**</include>
>>>>>>                     <include>org.codehaus.commons.compiler.properties</include>
>>>>>>                     <include>org/codehaus/janino/**</include>
>>>>>>                     <include>org/codehaus/commons/**</include>
>>>>>>                   </includes>
>>>>>>                 </filter>
>>>>>>               </filters>
>>>>>>               <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>>>>>>               the following lines. This will add a Main-Class entry to the manifest
>>>>>>               file -->
>>>>>>               <transformers>
>>>>>>                 <transformer
>>>>>>                   implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>>>                   <mainClass>org.sense.flink.App</mainClass>
>>>>>>                 </transformer>
>>>>>>               </transformers>
>>>>>>               <createDependencyReducedPom>false</createDependencyReducedPom>
>>>>>>             </configuration>
>>>>>>           </execution>
>>>>>>         </executions>
>>>>>>       </plugin>
>>>>>>     </plugins>
>>>>>>   </build>
>>>>>> </project>
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>> *--*
>>>>>> *-- Felipe Gutierrez*
>>>>>>
>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>
>>>>>>> oh, yes. I just saw. I will use 1.9 then.
>>>>>>> thanks
>>>>>>>
>>>>>>> *--*
>>>>>>> *-- Felipe Gutierrez*
>>>>>>>
>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It's because the Blink code is not shipped with 1.8.0; it is
>>>>>>>> currently only available in 1.9-SNAPSHOT.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> what are the artifacts that I have to import in Maven in order to
>>>>>>>>> use the Blink API?
>>>>>>>>>
>>>>>>>>> I am using Flink 1.8.0 and I am trying to import Blink code to use
>>>>>>>>> its ExecutionContext
>>>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>>>>>> I want to do this in order to implement my own operator like it is
>>>>>>>>> implemented here
>>>>>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>>>>>> I guess if I import flink-table everything should come inside the
>>>>>>>>> same jar, as it is done here
>>>>>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>>>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says
>>>>>>>>> that it is a missing artifact.
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-table-planner_2.11</artifactId>
>>>>>>>>>   <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>>>>>>   <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>>>>>   <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-table-common</artifactId>
>>>>>>>>>   <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-table</artifactId>
>>>>>>>>>   <version>1.8.0</version>
>>>>>>>>>   <type>pom</type>
>>>>>>>>>   <scope>provided</scope>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-table-runtime-blink</artifactId>
>>>>>>>>>   <version>1.8.0</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *--*
>>>>>>>>> *-- Felipe Gutierrez*
>>>>>>>>>
>>>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Cool, thanks Kurt!
>>>>>>>>>> *-*
>>>>>>>>>> *- Felipe Gutierrez*
>>>>>>>>>>
>>>>>>>>>> *- skype: felipe.o.gutierrez*
>>>>>>>>>> *-* *https://felipeogutierrez.blogspot.com
>>>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> You can check out the bundle operator, which is used in Blink to
>>>>>>>>>>> do something similar to what you mentioned:
>>>>>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Kurt
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying to implement a better way to handle data skew
>>>>>>>>>>>> using Flink and I found this talk from #FlinkForward SF 2017:
>>>>>>>>>>>> "Cliff Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1] which says that they
>>>>>>>>>>>> used OneInputStreamOperator [2]. Through it, they could implement
>>>>>>>>>>>> the equivalent of the "combiner" in Hadoop (execute part of the
>>>>>>>>>>>> reduce tasks in the Map phase, before shuffling).
>>>>>>>>>>>>
>>>>>>>>>>>> I need some help here. What are some of the Flink source-code
>>>>>>>>>>>> operators that I can look at to implement my own operator that
>>>>>>>>>>>> deals with data skew? Or maybe, does anyone have an example of a
>>>>>>>>>>>> use case similar to this?
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Felipe
>>>>>>>>>>>>
>>>>>>>>>>>> *--*
>>>>>>>>>>>> *-- Felipe Gutierrez*
>>>>>>>>>>>>
>>>>>>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>>>>>>
>>>>>>>>>>>
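
The combiner idea raised in the original question (do part of the reduce work before the shuffle) can be sketched without any Flink dependency. This is a minimal sketch, assuming a count-based trigger. LocalCombinerSketch, LocalCombinerSketchDemo and the maxBundleSize parameter are hypothetical names chosen for illustration. A real operator would presumably also have to flush before checkpoints and at end of input, which is left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical pre-aggregator (not a Flink class): keeps partial sums per key
// and forwards them downstream once maxBundleSize inputs have been buffered.
class LocalCombinerSketch {
    private final int maxBundleSize;
    private final BiConsumer<String, Long> downstream;
    private final Map<String, Long> partialSums = new HashMap<>();
    private int bufferedInputs = 0;

    LocalCombinerSketch(int maxBundleSize, BiConsumer<String, Long> downstream) {
        this.maxBundleSize = maxBundleSize;
        this.downstream = downstream;
    }

    // Combine the input with the partial sum of its key; flush when the bundle is full.
    void processElement(String key, long value) {
        partialSums.merge(key, value, Long::sum);
        bufferedInputs++;
        if (bufferedInputs >= maxBundleSize) {
            flush();
        }
    }

    // Emit one record per buffered key, then start a new bundle.
    void flush() {
        partialSums.forEach(downstream);
        partialSums.clear();
        bufferedInputs = 0;
    }
}

class LocalCombinerSketchDemo {
    // Feeds six records with a skewed key distribution and returns what was emitted.
    static List<String> run() {
        List<String> emitted = new ArrayList<>();
        LocalCombinerSketch combiner =
            new LocalCombinerSketch(3, (key, sum) -> emitted.add(key + "=" + sum));
        String[] keys = {"hot", "hot", "hot", "hot", "cold", "hot"};
        for (String key : keys) {
            combiner.processElement(key, 1L);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this run six input records become three emitted records, and the first bundle collapses three "hot" records into a single one. The more skewed the keys, the more a bundle shrinks before anything is sent over the network.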