Yep, sorry, my fault.
 
I see now that it is not enough to just start the cluster; a taskmanager has to be started as well.
Here is how I start it (Github CLI):
 
export FLINK_HOME=/C/Flink/flink-1.15.0
$FLINK_HOME/bin/start-cluster.sh
 
Flink is running (I can reach it by going to http://localhost:8081)
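(Side note: the same list that the 'Task Managers' page shows is exposed by the REST API at http://localhost:8081/taskmanagers, so it can also be checked with curl or a few lines of code. Below is a minimal sketch using Java 11's HttpClient, assuming the default REST port; the class name is just for illustration:)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TaskManagerCheck {

    public static void main(String[] args) throws Exception {
        // Ask the JobManager's REST endpoint which TaskManagers have registered.
        // An empty "taskmanagers" array in the response means none are available.
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/taskmanagers"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}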
 
I know that now I should start the jobmanager, so I execute:
$FLINK_HOME/bin/jobmanager.sh start
 
I get this message:
 
[INFO] 1 instance(s) of standalonesession are already running on MNKH-WjKRv8Leb8.
Starting standalonesession daemon on host MNKH-WjKRv8Leb8.
 
So I think that part is OK.
 
Now I should start the taskmanager:
 
$FLINK_HOME/bin/taskmanager.sh start
Starting taskexecutor daemon on host MNKH-WjKRv8Leb8
 
But in the logs I see that this process fails:
 
...
2022-07-08 14:07:32,403 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils        [] - Actor system started at akka.tcp://flink@localhost:63030
2022-07-08 14:07:32,427 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Terminating TaskManagerRunner with exit code 1.
org.apache.flink.util.FlinkException: Failed to start the TaskManagerRunner.
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:483) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:525) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:525) [flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:505) [flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:463) [flink-dist-1.15.0.jar:1.15.0]
Caused by: java.io.IOException: Could not create the working directory C:\Users\MIKE~1\AppData\Local\Temp\tm_localhost:63030-ddbffb.
    at org.apache.flink.runtime.entrypoint.WorkingDirectory.createDirectory(WorkingDirectory.java:58) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.entrypoint.WorkingDirectory.<init>(WorkingDirectory.java:39) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.entrypoint.WorkingDirectory.create(WorkingDirectory.java:88) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils.lambda$createTaskManagerWorkingDirectory$0(ClusterEntrypointUtils.java:152) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.entrypoint.DeterminismEnvelope.map(DeterminismEnvelope.java:49) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils.createTaskManagerWorkingDirectory(ClusterEntrypointUtils.java:150) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:210) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481) ~[flink-dist-1.15.0.jar:1.15.0]
    ... 5 more
 
I'm trying to find the reason.
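(One detail that stands out in that path: the directory name contains a ':' taken from the host:port part, and ':' is not a legal character in a Windows file or directory name, so the create presumably fails for that reason alone. A quick local check - the paths below are only examples:)

import java.io.File;

public class ColonDirCheck {

    public static void main(String[] args) {
        // The second name mirrors the TaskManager working directory: it contains a ':'
        // from host:port. On Windows, mkdirs() should return false for it, while the
        // colon-free variant can be created (false there only means it already exists).
        File ok = new File("C:\\Temp\\tm_localhost_63030");
        File bad = new File("C:\\Temp\\tm_localhost:63030");
        System.out.println("without colon: " + ok.mkdirs());
        System.out.println("with colon:    " + bad.mkdirs());
    }
}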
 
 
Sent: Friday, July 08, 2022 at 12:21 PM
From: "Alexander Fedulov" <alexan...@ververica.com>
To: pod...@gmx.com
Cc: "user" <user@flink.apache.org>
Subject: Re: Is Flink able to read a CSV file or just like in Blink this function does not work?
Hi Mike,
 
please do not forget to include the mailing list address / reply to all; otherwise this becomes a private conversation.
How do you start your cluster?
 
Best,
Alex
 
 
On Fri, Jul 8, 2022 at 10:45 AM <pod...@gmx.com> wrote:
Thanks for support.
As you suggested, I did run it in IDE:
 
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Flink/flink-1.15.0/lib/flink-dist-1.15.0.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
+----------------------+
|           some_value |
+----------------------+
|                    3 |
+----------------------+
1 row in set
 
So it works (the path is 'file:///c:/Temp/test4.txt' - when the path was wrong I got an error... unfortunately, on the cluster Flink does not display any error in such a case)
 
You say 'Check in the UI that after you start your cluster you have TaskManagers registered successfully'.
If I go to the 'Task Managers' menu (http://localhost:8081/#/task-manager) I do not see any - the list is empty.
 
I have no idea what should be there or how to create one.
 
Sent: Thursday, July 07, 2022 at 4:27 PM
From: "Alexander Fedulov" <alexan...@ververica.com>
To: pod...@gmx.com
Subject: Re: Is Flink able to read a CSV file or just like in Blink this function does not work?
Did you try running in an IDE?
Check in the UI that after you start your cluster you have TaskManagers registered successfully and they have enough slots to fulfill your job parallelism requirements.

Best,
Alexander Fedulov
 
On Thu, Jul 7, 2022 at 2:53 PM <pod...@gmx.com> wrote:
 
Thank you very much Alex for your help.
My environment is Windows and Github CLI.
 
I tried different path syntaxes:
'file:///c:/temp/test4.txt'
'file:///c://temp//test4.txt'
'//c:/temp/test4.txt'
'c:/temp/test4.txt'
'c:\\temp\\test4.txt'
...
and dozens of others.
Nothing helps.
 
I run this job with the command:
$ $FLINK_HOME/bin/flink run c:/Temp/flinkTest2.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Flink/flink-1.15.0/lib/flink-dist-1.15.0.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID a9eaf86337ca14d0d80e6d6ef70dab6e
 
 
 
But maybe the problem is somewhere else - even if I put a non-existing path there is no error; the job is submitted to Flink, has status RUNNING, and fails after 5 minutes or so.
 
 
In the console I see:
NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
 
 
 
Sent: Thursday, July 07, 2022 at 1:10 AM
From: "Alexander Fedulov" <alexan...@ververica.com>
To: pod...@gmx.com
Cc: "user" <user@flink.apache.org>
Subject: Re: Is Flink able to read a CSV file or just like in Blink this function does not work?
Hi Mike,
 
I do not see any issues with your code. With a sample csv file like this
a,1.0
b,2.0
c,3.0
 
it produces the expected result

+----------------------+
|           some_value |
+----------------------+
|                    3 |
+----------------------+
1 row in set

Process finished with exit code 0
 
--
 
I do not use Windows, so I am not 100% sure what the correct way to specify the file path there is; however, some observations:
- You are using forward slash notation, so I assume you are running in WSL or Cygwin or something like that? Notice that in that notation an additional forward slash is missing: 'file:///foo/bar' (see the quick parsing sketch below).
- Omitting the scheme works with the local filesystem by default. On *nix systems 'connector.path' = '/home/alex/tmp/test.csv' works fine. Maybe you could just try standard Windows backslash notation instead.
- That said, when a non-existent file is specified, the job fails immediately, so I would actually expect that behavior if the issue were indeed with the file path.
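A quick way to see why the extra slash matters is to let java.net.URI parse both forms - a minimal sketch, the paths are only examples:

import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;

public class PathFormatCheck {

    public static void main(String[] args) {
        String[] candidates = {
                "file:///C:/temp/test4.txt",  // scheme + empty authority + absolute path
                "file://C/temp/test4.txt"     // missing slash: "C" is parsed as the host
        };
        for (String candidate : candidates) {
            URI uri = URI.create(candidate);
            // With only two slashes the drive letter ends up as the URI host,
            // and the path silently loses the drive.
            System.out.printf("%-30s host=%s path=%s%n", candidate, uri.getHost(), uri.getPath());
        }
        // Plain check that the file exists at all, independent of the URI syntax:
        System.out.println("exists: " + Files.exists(Paths.get("C:/temp/test4.txt")));
    }
}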
Which version of Flink are you running?
 
Best,
Alexander Fedulov
 
 
On Wed, Jul 6, 2022 at 10:39 PM <pod...@gmx.com> wrote:
If I'm reading the Flink manual correctly (and this is not simple - no examples), this code should read a CSV file:
 
 
package flinkTest2;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkTest2 {

    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE test_table (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file://C/temp/test4.txt', 'format.type' = 'csv')");

        tEnv.sqlQuery("SELECT COUNT(*) AS some_value FROM test_table")
                .execute()
                .print();
    }
}
 
The job is just running and running and running...
I think Flink is not able to open the file. It does not matter what is in 'connector.path' = 'file://C/temp/test4.txt' - Flink does not display any error or anything. 'connector.path' = 'file:blah/blah/blah' would be accepted by Flink just as well.
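(For reference, a sketch of the same program written with the Flink 1.15 option keys of the filesystem connector ('connector' = 'filesystem', 'path', 'format' = 'csv') and a three-slash file URI - the path is only an example, and flink-csv needs to be on the classpath:)

package flinkTest2;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkTest2Csv {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        // Current (non-legacy) connector options and a fully qualified Windows file URI.
        tEnv.executeSql(
                "CREATE TABLE test_table ("
                        + "  column_name1 STRING,"
                        + "  column_name2 DOUBLE"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 'file:///C:/temp/test4.txt',"
                        + "  'format' = 'csv'"
                        + ")");

        tEnv.sqlQuery("SELECT COUNT(*) AS some_value FROM test_table")
                .execute()
                .print();
    }
}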
 
Could anyone help me with that?
Thanks
Mike
 
