Hi,

When I moved on to the timePrediction exercise (http://training.data-artisans.com/exercises/timePrediction.html), I realized that the format of nycTaxiRides.gz is fine after all. The problem is in TaxiRide.toString(): it serializes the columns in the wrong order, so the data persisted in Kafka has the wrong format. I therefore changed TaxiRide.toString() to the following:
    public String toString() {
        StringBuilder sb = new StringBuilder();
        // Column order: rideId, START/END, startTime, endTime,
        // startLon, startLat, endLon, endLat, passengerCnt, taxiId, driverId
        sb.append(rideId).append(",");
        sb.append(isStart ? "START" : "END").append(",");
        sb.append(startTime.toString(timeFormatter)).append(",");
        sb.append(endTime.toString(timeFormatter)).append(",");
        sb.append(startLon).append(",");
        sb.append(startLat).append(",");
        sb.append(endLon).append(",");
        sb.append(endLat).append(",");
        sb.append(passengerCnt).append(",");
        sb.append(taxiId).append(",");
        sb.append(driverId);
        return sb.toString();
    }

This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

2018-03-23 9:59 GMT+08:00 James Yu <cyu...@gmail.com>:
> I just figured out that the data format in nycTaxiRides.gz doesn't match the
> way TaxiRide.java interprets the lines fed into it.
> Then I checked the training exercises on GitHub and found that TaxiRide.java
> (https://github.com/dataArtisans/flink-training-exercises/tree/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes)
> was recently updated (about 11 days ago).
> After making some changes to TaxiRide.java, the example works like a charm.
>
> I got nycTaxiRides.gz by running this command in a console:
> wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
>
> Following are the changes I made to TaxiRide.java locally (basically just
> the indices into the tokens array):
>
>     try {
>         ride.rideId = Long.parseLong(tokens[0]);
>
>         switch (tokens[3]) {
>             case "START":
>                 ride.isStart = true;
>                 ride.startTime = DateTime.parse(tokens[4], timeFormatter);
>                 ride.endTime = DateTime.parse(tokens[5], timeFormatter);
>                 break;
>             case "END":
>                 ride.isStart = false;
>                 ride.endTime = DateTime.parse(tokens[4], timeFormatter);
>                 ride.startTime = DateTime.parse(tokens[5], timeFormatter);
>                 break;
>             default:
>                 throw new RuntimeException("Invalid record: " + line);
>         }
>
>         ride.startLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
>         ride.startLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
>         ride.endLon = tokens[8].length() > 0 ? Float.parseFloat(tokens[8]) : 0.0f;
>         ride.endLat = tokens[9].length() > 0 ? Float.parseFloat(tokens[9]) : 0.0f;
>         ride.passengerCnt = Short.parseShort(tokens[10]);
>         ride.taxiId = Long.parseLong(tokens[1]);
>         ride.driverId = Long.parseLong(tokens[2]);
>
>     } catch (NumberFormatException nfe) {
>         throw new RuntimeException("Invalid record: " + line, nfe);
>     }
>
> 2018-03-23 8:06 GMT+08:00 James Yu <cyu...@gmail.com>:
>
>> Hi,
>>
>> I can't get the PopularPlacesFromKafka example to run; it fails with the
>> following exception, and I wonder what might cause this "Invalid record" error.
>>
>> When running within IntelliJ IDEA:
>>
>> 07:52:23.960 [Source: Custom Source -> Map (7/8)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map (7/8) (930e95aac65cbda39d9f1eaa41891253) switched from RUNNING to FAILED.
>> java.lang.RuntimeException: Invalid record: 4010,2013003778,2013003775,START,2013-01-01 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
>>     at com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119) ~[flink-training-exercises-0.15.1.jar:na]
>>     at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37) ~[flink-training-exercises-0.15.1.jar:na]
>>     at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28) ~[flink-training-exercises-0.15.1.jar:na]
>>     at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42) ~[flink-training-exercises-0.15.1.jar:na]
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139) ~[flink-training-exercises-0.15.1.jar:na]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652) ~[flink-training-exercises-0.15.1.jar:na]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[flink-runtime_2.11-1.4.2.jar:1.4.2]
>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
>>
>> When deployed to and run on a local cluster:
>>
>> 2018-03-23 07:27:23.130 [Source: Custom Source -> Map (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map (1/1) (db21c7604b94968097d4be7b8558ac08) switched from RUNNING to FAILED.
>> java.lang.RuntimeException: Invalid record: 2264,2013002216,2013002213,START,2013-01-01 00:09:00,1970-01-01 00:00:00,-74.00402,40.742107,-73.98032,40.73522,1
>>     at com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
>>     at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
>>     at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
>>     at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
>>
>> I copied PopularPlacesFromKafka.java from
>> https://raw.githubusercontent.com/dataArtisans/flink-training-exercises/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/connectors/PopularPlacesFromKafka.java
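The root cause in this thread is that toString(), which produces the records written to Kafka, and fromString(), which parses them back, disagree on the column order. Below is a minimal sketch of one way to guard against that. It assumes a hypothetical RideRecord class standing in for TaxiRide (it is not the training project's class) and uses java.time instead of Joda-Time so it runs with only the JDK. Both methods follow one assumed column layout, fromString() checks the column count, and main() round-trips a record and shows that a line in the layout seen in Kafka (taxiId and driverId before START) is rejected.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/**
 * Hypothetical stand-in for TaxiRide, not the training project's class.
 * Assumed column layout, shared by toString() and fromString():
 * rideId,START|END,startTime,endTime,startLon,startLat,endLon,endLat,
 * passengerCnt,taxiId,driverId
 */
public class RideRecord {

    static final int NUM_COLUMNS = 11;
    static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    long rideId;
    boolean isStart;
    LocalDateTime startTime;
    LocalDateTime endTime;
    float startLon, startLat, endLon, endLat;
    short passengerCnt;
    long taxiId;
    long driverId;

    /** Writes the fields in the assumed column order. */
    @Override
    public String toString() {
        return String.join(",",
                Long.toString(rideId),
                isStart ? "START" : "END",
                startTime.format(TIME_FORMAT),
                endTime.format(TIME_FORMAT),
                Float.toString(startLon),
                Float.toString(startLat),
                Float.toString(endLon),
                Float.toString(endLat),
                Short.toString(passengerCnt),
                Long.toString(taxiId),
                Long.toString(driverId));
    }

    /** Reads a line written by toString(), column by column in the same order. */
    public static RideRecord fromString(String line) {
        // limit -1 keeps trailing empty columns, so the count check is reliable
        String[] tokens = line.split(",", -1);
        if (tokens.length != NUM_COLUMNS) {
            throw new IllegalArgumentException("Invalid record: " + line);
        }
        RideRecord ride = new RideRecord();
        switch (tokens[1]) {
            case "START": ride.isStart = true; break;
            case "END": ride.isStart = false; break;
            default: throw new IllegalArgumentException("Invalid record: " + line);
        }
        try {
            ride.rideId = Long.parseLong(tokens[0]);
            ride.startTime = LocalDateTime.parse(tokens[2], TIME_FORMAT);
            ride.endTime = LocalDateTime.parse(tokens[3], TIME_FORMAT);
            ride.startLon = parseFloatOrZero(tokens[4]);
            ride.startLat = parseFloatOrZero(tokens[5]);
            ride.endLon = parseFloatOrZero(tokens[6]);
            ride.endLat = parseFloatOrZero(tokens[7]);
            ride.passengerCnt = Short.parseShort(tokens[8]);
            ride.taxiId = Long.parseLong(tokens[9]);
            ride.driverId = Long.parseLong(tokens[10]);
        } catch (NumberFormatException | DateTimeParseException e) {
            throw new IllegalArgumentException("Invalid record: " + line, e);
        }
        return ride;
    }

    // Empty coordinates become 0.0f, mirroring the handling quoted in this thread
    private static float parseFloatOrZero(String s) {
        return s.isEmpty() ? 0.0f : Float.parseFloat(s);
    }

    public static void main(String[] args) {
        // A START record in the assumed layout, with values taken from the log above
        String line = "4010,START,2013-01-01 00:13:00,1970-01-01 00:00:00,"
                + "-74.00074,40.7359,-73.98559,40.739063,1,2013003778,2013003775";
        RideRecord ride = fromString(line);

        // Round trip: parsing toString() output must reproduce the same fields
        RideRecord again = fromString(ride.toString());
        System.out.println("round trip ok: " + again.toString().equals(ride.toString()));

        // The layout found in Kafka (taxiId, driverId before START) is rejected
        String kafkaLayout = "4010,2013003778,2013003775,START,2013-01-01 00:13:00,"
                + "1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1";
        try {
            fromString(kafkaLayout);
            System.out.println("Kafka layout accepted (unexpected)");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A round-trip check like the one in main() could be turned into a unit test, so that a change to either method that breaks the shared layout fails before any data is written to Kafka.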