I changed the ".map(...)" and ".print()" terminal statement to : .executeAndCollect() .forEachRemaining(System.out::println);
The warnings were replaced with: WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by com.twitter.chill.java.ArraysAsListSerializer (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar) to field java.util.Arrays$ArrayList.a WARNING: Please consider reporting this to the maintainers of com.twitter.chill.java.ArraysAsListSerializer WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release On Tue, Nov 29, 2022 at 3:25 PM Curtis Jensen <curtis.jen...@gmail.com> wrote: > > Hello, > > Using Flink version 1.15.0, I recieve these warnings when trying a > small example (code below): > WARNING: An illegal reflective access operation has occurred > WARNING: Illegal reflective access by > org.apache.flink.api.java.ClosureCleaner > (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar) > to field java.lang.String.value > WARNING: Please consider reporting this to the maintainers of > org.apache.flink.api.java.ClosureCleaner > WARNING: Use --illegal-access=warn to enable warnings of further > illegal reflective access operations > WARNING: All illegal access operations will be denied in a future release > > I am undoubtedly doing something incorrectly, but felt that it may be > useful to take the advice "Please consider reporting this to the > maintainers of org.apache.flink.api.java.ClosureCleaner". > Also, any corrections to my example would be appreciated. > > Thanks, > Curtis > > > > > AvgAmount.java > > import org.apache.flink.api.common.RuntimeExecutionMode; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.datastream.KeyedStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.KeyedProcessFunction; > import org.apache.flink.util.Collector; > > public class AvgAmount { > > public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > > DataStream<ExampleData.PurchaseEvent> purchaseStream = > env.fromElements(ExampleData.PURCHASE_EVENTS); > KeyedStream keyedPurchaseStream = purchaseStream.keyBy(event -> > event.account_id); > keyedPurchaseStream.process(new PurchaseEventProcessor()) > .map(stats -> stats.toString()) > .print(); > } > > public static class PurchaseStats { > public String accountId; > public long amountSum; > public long amountCount; > > public PurchaseStats(String accountId) { > this.accountId = accountId; > } > > public void addAmount(long amount) { > amountSum += amount; > amountCount += 1; > } > > @Override > public String toString() { > return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}", > accountId, (double)amountSum/(double)amountCount); > } > } > > public static class PurchaseEventProcessor extends > KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats> > { > ValueState<PurchaseStats> seen; > > @Override > public void open(Configuration parameters) { > seen = getRuntimeContext().getState(new > ValueStateDescriptor<>("seen", PurchaseStats.class)); > } > > @Override > public void processElement(ExampleData.PurchaseEvent > purchaseEvent, KeyedProcessFunction<String, ExampleData.PurchaseEvent, > PurchaseStats>.Context context, Collector<PurchaseStats> out) throws > Exception { > PurchaseStats currentStats = seen.value(); > if (currentStats == null) { > currentStats = new PurchaseStats(purchaseEvent.account_id); > } > > currentStats.addAmount(purchaseEvent.amount); > > seen.update(currentStats); > out.collect(currentStats); > } > } > } > > ExampleData.java > > import org.apache.flink.types.Row; > import org.apache.flink.types.RowKind; > > import java.time.Instant; > > public class ExampleData { > public static final PurchaseEvent[] PURCHASE_EVENTS = > new PurchaseEvent[] { > new PurchaseEvent("1337Gamer", "192.168.0.1", 1000), > new PurchaseEvent("1337", "127.0.0.1", 1000), > new PurchaseEvent("1337", "127.0.0.2", 100), > new PurchaseEvent("1337", "127.0.0.1", 9900) > }; > > public static class PurchaseEvent { > public long timestamp; > public String account_id; > public String ip; > public long amount; > > public PurchaseEvent() { } > > public PurchaseEvent(String accountId, String ip, long amount) { > this(Instant.now().getEpochSecond(), accountId, ip, amount); > } > > public PurchaseEvent(long timestamp, String accountId, String > ip, long amount) { > this.timestamp = timestamp; > this.account_id = accountId; > this.ip = ip; > this.amount = amount; > } > } > }