I think you can start from this (using the Flink Table API); I hope it's
helpful:

PS: maybe someone could write a blog post on how to do this with Scala, since
it's a frequent question on the mailing list... :)

// Imports for Flink 1.1.x; package names for Row and CsvTableSource
// differ in later versions.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.CsvTableSource;

public static void main(String[] args) throws Exception {
    String path = "file:/tmp/myFile.csv";
    String rowDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER;
    String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER; // ","
    String[] fieldNames = "Column 1,Column 2,Column 3,Column4".split(fieldDelim);

    Character quoteCharacter = '"';
    boolean ignoreFirstLine = true;
    String ignoreComments = null;
    boolean lenient = false;

    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
    };

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    CsvTableSource csvTableSource = new CsvTableSource(
        path, fieldNames, fieldTypes, fieldDelim, rowDelim,
        quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
    DataSet<Row> csvDataSet = csvTableSource.getDataSet(env);

    JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://localhost/test?user=xxx&password=xxx")
        // Note: the number of ?-placeholders (and target columns) must match
        // the arity and types of the Rows you emit.
        .setQuery("insert into %s (id, title, author, price, qty) values (?,?,?,?,?)")
        .finish();
    csvDataSet.output(jdbcOutputFormat);

    // Without this call the batch job is never actually executed.
    env.execute();
}
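As an aside, since the quoteCharacter option sometimes confuses people: it tells the CSV reader that a field delimiter appearing inside quotes is part of the data, not a field boundary. A minimal standalone sketch of that rule (plain Java, a hypothetical helper just for illustration, not Flink's actual parser):

```java
import java.util.ArrayList;
import java.util.List;

public class QuoteDemo {
    // Split one CSV line on delim, treating delim as plain data while
    // inside a pair of quote characters. Illustrative only: no support
    // for escaped quotes or multi-line records.
    static List<String> splitCsv(String line, char delim, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == quote) {
                inQuotes = !inQuotes;          // toggle quoted state, drop the quote itself
            } else if (c == delim && !inQuotes) {
                fields.add(cur.toString());    // delimiter outside quotes ends a field
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());            // last field
        return fields;
    }

    public static void main(String[] args) {
        // The comma inside the quoted title does not split the field.
        System.out.println(splitCsv("1,\"title, with comma\",author", ',', '"'));
        // prints: [1, title, with comma, author]
    }
}
```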

Best,
Flavio

On Tue, Oct 4, 2016 at 2:18 PM, ram kumar <ramkumar09...@gmail.com> wrote:

> Hi Guys,
>
> We are in the process of creating a POC.
> I am looking for a sample project - Flink Scala or Java - which can load
> data from database to database, or from
> CSV to a relational database (any).
>
>
> CSV ----------> SQLSERVER  ----------> AWS Redshift
>
> could someone please help me with that?
>
> Cheers
> Ram
>
