Hi team,
import org.apache.flink.connector.file.src.FileSourceSplit
import org.apache.flink.formats.parquet.{ParquetBuilder, ParquetBulkWriter, ParquetColumnarRowInputFormat, ParquetFileFormatFactory, ParquetWriterFactory}
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
import org.apache.flink.table.types.logical.RowType
import models.EnrichElementMapperFunction.MemberDescriptor
import models.{AccidentClaim, EnrichElement, EnrichElementMapperFunction, Members}
import org.apache.flink.api.common.serialization.{BulkWriter, Encoder, SimpleStringEncoder}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog
import org.apache.flink.core.fs
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.functions.co.{BroadcastProcessFunction, CoProcessFunction}
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.{Row, RowKind}
import org.apache.flink.util.Collector
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetWriter}
import org.apache.parquet.io.OutputFile
import org.slf4j.{Logger, LoggerFactory}
object WindowWordCount2 {

  // Registers the Postgres JDBC catalog so the Debezium-backed Kafka tables can be
  // created with LIKE ... (EXCLUDING OPTIONS).
  def register_table_source(st_env: StreamTableEnvironment): Unit = {
    Class.forName("org.postgresql.Driver")
    val name = "postgres"
    val default_database = "postgres"
    val username = "postgres"
    val password = "postgres"
    val base_url = "jdbc:postgresql://postgres:5432/"
    val catalog = new JdbcCatalog(name, default_database, username, password, base_url)
    st_env.registerCatalog("postgresql", catalog)
  }

  def toInt(s: String): Option[Int] =
    try Some(s.toInt) catch { case _: Exception => None }

  def main(args: Array[String]): Unit = {
    implicit val typeInfo = TypeInformation.of(classOf[String])

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new HashMapStateBackend)
    env.getCheckpointConfig.setCheckpointStorage(
      new FileSystemCheckpointStorage("s3://quynh-demo-flink/check_points/"))
    env.enableCheckpointing(1000)
    env.setParallelism(1)

    val st_env = StreamTableEnvironment.create(
      env,
      EnvironmentSettings.newInstance().inStreamingMode().build())

    val configuration = st_env.getConfig
    configuration.setNullCheck(false)

    println("Creating catalog")
    register_table_source(st_env)

    st_env.executeSql(
      """CREATE TABLE accident_claims WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'pg_claims.claims.accident_claims',
        |  'properties.bootstrap.servers' = 'kafka:9092',
        |  'properties.group.id' = 'test-consumer-group',
        |  'format' = 'debezium-json',
        |  'scan.startup.mode' = 'earliest-offset'
        |) LIKE postgresql.postgres.`claims.accident_claims` (EXCLUDING OPTIONS)""".stripMargin)

    st_env.executeSql(
      """CREATE TABLE members WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'pg_claims.claims.members',
        |  'properties.bootstrap.servers' = 'kafka:9092',
        |  'properties.group.id' = 'test-consumer-group',
        |  'format' = 'debezium-json',
        |  'scan.startup.mode' = 'earliest-offset'
        |) LIKE postgresql.postgres.`claims.members` (EXCLUDING OPTIONS)""".stripMargin)

    // Earlier SQL-join attempts, kept for reference:
    // val resultTable = st_env.sqlQuery(
    //   "SELECT t1.member_id, t1.claim_total_receipt, t2.id, t2.first_name, t2.last_name " +
    //   "FROM (SELECT member_id, claim_total_receipt FROM accident_claims) t1 " +
    //   "LEFT JOIN (SELECT * FROM members) t2 ON t1.member_id = t2.id")
    // val resultTable = st_env.sqlQuery("SELECT * FROM accident_claims")

    val st_accident_claims = st_env.from("accident_claims")
    val st_members = st_env.from("members")

    // Interpret the changelog tables as DataStreams again.
    val ds_accident_claims = st_env.toChangelogStream(st_accident_claims)
    val ds_members = st_env.toChangelogStream(st_members)

    val dsConvertFromChangelogMembers: DataStream[(RowKind, Members)] =
      ds_members
        .map(x => (x.getKind, new Members(
          x.getField("id").asInstanceOf[Int],
          Some(x.getField("first_name").toString),
          Some(x.getField("last_name").toString),
          Some(x.getField("address").toString),
          Some(x.getField("address_city").toString),
          Some(x.getField("address_country").toString),
          Some(x.getField("insurance_company").toString),
          Some(x.getField("insurance_number").toString),
          Some(x.getField("ts_created").toString),
          Some(x.getField("ts_updated").toString))))
        .uid("memberdatastream")
        .filter(_._1 != RowKind.UPDATE_BEFORE)

    val ds_member_broadcast =
      dsConvertFromChangelogMembers.broadcast(EnrichElementMapperFunction.MemberDescriptor)

    val dsConvertFromChangelogAccidentClaim: DataStream[(RowKind, AccidentClaim)] =
      ds_accident_claims
        .map(x => (x.getKind, new AccidentClaim(
          x.getField("claim_id").asInstanceOf[Int],
          Some(x.getField("claim_total").asInstanceOf[Double]),
          Some(x.getField("claim_total_receipt").asInstanceOf[String]),
          Some(x.getField("claim_currency").toString),
          Some(x.getField("member_id").asInstanceOf[Int]),
          Some(x.getField("accident_date").toString),
          Some(x.getField("accident_type").toString),
          Some(x.getField("accident_detail").toString),
          Some(x.getField("claim_date").toString),
          Some(x.getField("claim_status").toString))))
        .uid("accidentclaimdatastream")
        .filter(_._1 != RowKind.UPDATE_BEFORE)

    // Enrich each claim with the broadcast member state.
    val resultStream = dsConvertFromChangelogAccidentClaim
      .connect(ds_member_broadcast)
      .process(new EnrichEventLocationMapperFunction)
      .uid("connectandprocess")
    // resultStream.print()

    // Unfinished attempts at a Parquet bulk writer for (RowKind, EnrichElement), kept for reference:
    // val factory = new ParquetWriterFactory[(RowKind, EnrichElement)](new ParquetBuilder[(RowKind, EnrichElement)] {
    //   override def createWriter(out: OutputFile): ParquetWriter[(RowKind, EnrichElement)] = ???
    // })
    // val rowType = RowType.of(...)
    // val format = new ParquetColumnarRowInputFormat(...)

    val outputPath = new fs.Path("s3://quynh-demo-flink/quynh_test_5/")

    // For now the enriched tuples are written as plain strings (row format).
    val fileSink: StreamingFileSink[(RowKind, EnrichElement)] = StreamingFileSink
      .forRowFormat(outputPath, new SimpleStringEncoder[(RowKind, EnrichElement)]("UTF-8"))
      .withOutputFileConfig(
        OutputFileConfig.builder().withPartPrefix("pre").withPartSuffix(".csv").build())
      .build()

    resultStream.addSink(fileSink).uid("addsink")

    env.execute("Window Stream WordCount")
  }
}

case class EnrichEventLocationMapperFunction()
  extends BroadcastProcessFunction[(RowKind, AccidentClaim), (RowKind, Members), (RowKind, EnrichElement)] {

  override def processElement(
      value: (RowKind, AccidentClaim),
      ctx: BroadcastProcessFunction[(RowKind, AccidentClaim), (RowKind, Members), (RowKind, EnrichElement)]#ReadOnlyContext,
      out: Collector[(RowKind, EnrichElement)]): Unit = {
    val (rowKind, accidentClaim) = value
    // Look up the member in the broadcast state; note this returns null if the
    // member record has not been broadcast yet.
    val memberState = ctx.getBroadcastState(MemberDescriptor)
    val memberPk = accidentClaim.memberId.get
    val member: Members = memberState.get(memberPk)
    val enrichElement = EnrichElement(accidentClaim, member)
    out.collect((rowKind, enrichElement))
  }

  override def processBroadcastElement(
      value: (RowKind, Members),
      ctx: BroadcastProcessFunction[(RowKind, AccidentClaim), (RowKind, Members), (RowKind, EnrichElement)]#Context,
      out: Collector[(RowKind, EnrichElement)]): Unit = {
    // Keep the broadcast member state in sync with the members changelog;
    // nothing is emitted from the broadcast side.
    val memberState = ctx.getBroadcastState(MemberDescriptor)
    value match {
      case (RowKind.DELETE, member) => memberState.remove(member.id)
      case (_, member)              => memberState.put(member.id, member)
    }
  }
}