I think you can start from this (using flink table-api), I hope it could be helpful:
PS:maybe someone could write a blog post on how to do this with Scala since it's a frequent question on the mailing list... :) public static void main(String[] args) throws Exception { String path = "file:/tmp/myFile.csv"; String rowDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER; String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER; String[] fieldNames = "Column 1,Column 2,Column 3,Column4".split(fieldDelim); Character quoteCharacter = '"'; boolean ignoreFirstLine = Boolean.TRUE; String ignoreComments = null; boolean lenient = Boolean.FALSE; TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; ExecutionEnvironment env = ExecutionEnvironment. getExecutionEnvironment(); CsvTableSource csvTableSource = new CsvTableSource(path, fieldNames,fieldTypes,fieldDelim, rowDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient); DataSet<Row> csvDataSet = csvTableSource.getDataSet(env); JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("org.postgresql.Driver") .setDBUrl("jdbc:postgresql:// localhost/test?user=xxx&password=xxx") .setQuery("insert into %s (id, title, author, price, qty) values (?,?,?,?,?)") .finish(); csvDataSet .output(jdbcOutputFormat); } Best, Flavio On Tue, Oct 4, 2016 at 2:18 PM, ram kumar <ramkumar09...@gmail.com> wrote: > Hi Guys, > > We are in the process of creating POC, > I am looking for the sample project - Flink scala or java which can load > data from database to database or > CSV to relational database(any). > > > CSV ----------> SQLSERVER ----------> AWS Redshift > > could you please some one help me on that.. > > Cheers > Ram >