Hi!

I am currently working with Flink's Table API (Flink version 1.17, Java 11). I 
am pulling streaming data from a Kafka topic, processing it, and writing the 
processed data to Azure Blob Storage. I am using the Filesystem SQL 
connector (following this page: 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/filesystem/#full-example).

I set up the Azure Blob Storage path and credentials following this: 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/filesystems/azure/.
This is a snippet of my code:


String accountName = "<account-name>";
String accountKey = "<account-key>";

EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        .build();

TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig().set("fs.azure.account.key." + accountName + ".blob.core.windows.net", accountKey);
tEnv.getConfig().set("parallelism.default", "1");
tEnv.getConfig().set("table.exec.source.idle-timeout", "5000 ms");

tEnv.executeSql("CREATE TABLE clickStream (\n" +
                "    window_start TIMESTAMP(3),\n" +
                "    window_end TIMESTAMP(3),\n" +
                "    user_id INT,\n" +
                "    num_clicks BIGINT,\n" +
                "    num_products BIGINT,\n" +
                "    total_amount_spent DOUBLE,\n" +
                "    made_checkout INT\n" +
                ") PARTITIONED BY (window_start) WITH (\n" +
                "    'connector' = 'filesystem',\n" +
                "    'path' = 'wasb://<container_name>@<storage_account_name>.blob.core.windows.net/stream-data/',\n" +
                "    'format' = 'json'\n" +
                ");");

tEnv.executeSql("INSERT INTO clickStream ... //data to insert");
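For reference, this is the path shape I believe the wasb:// scheme expects, sketched as a tiny standalone helper (the container/account/directory names below are placeholders of my own, not my real values):

```java
// Hypothetical helper illustrating the expected wasb path shape:
//   wasb://<container>@<account>.blob.core.windows.net/<dir>/
// Note in particular that nothing (e.g. no "$") goes between "@" and the account name.
public class WasbPath {
    static String buildWasbPath(String container, String account, String dir) {
        return String.format("wasb://%s@%s.blob.core.windows.net/%s/", container, account, dir);
    }

    public static void main(String[] args) {
        // Placeholder names, for illustration only
        System.out.println(buildWasbPath("mycontainer", "myaccount", "stream-data"));
        // prints wasb://mycontainer@myaccount.blob.core.windows.net/stream-data/
    }
}
```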

When I run the program, no exceptions are thrown, but no files are being 
created in Blob Storage. I tried writing the data to a local folder and to an 
Azure SQL database, and both work, so I am guessing there is some problem with 
connecting to Blob Storage. I have enabled public network access and container 
access, so if you have any ideas about what could be causing the problem, I 
would really appreciate the help.
Also, here is a list of the relevant dependencies I have (all Flink 
dependencies are version 1.17.0): flink-table-api-java-bridge, 
flink-table-planner_2.12, flink-clients, flink-json, json, 
flink-connector-files, hadoop-common (3.3.6), flink-azure-fs-hadoop, 
hadoop-client (3.3.6), flink-hadoop-fs, flink-hadoop-compatibility_2.12.

Thank you,
Dora
