You do not need to run anything beyond start-cluster.sh for a simple test. It
starts one jobmanager (which is also what hosts the UI you see) and one
taskmanager:

[image: image.png]
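
If in doubt whether the taskmanager really registered, the REST API on the same
port as the UI will tell you. A minimal sketch (assuming the default REST port
8081 and Java 11+ - plain Java so it can live next to your job code):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CheckTaskManagers {
        public static void main(String[] args) throws Exception {
            // Ask the jobmanager's REST API which taskmanagers have registered.
            HttpRequest request = HttpRequest
                    .newBuilder(URI.create("http://localhost:8081/taskmanagers"))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // An empty "taskmanagers" array in the JSON means nothing has registered yet.
            System.out.println(response.body());
        }
    }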


What does the GitHub CLI have to do with the Flink startup?




On Fri, Jul 8, 2022 at 2:25 PM <pod...@gmx.com> wrote:

> Yep, sorry, my fault.
>
> I see now that it is not enough to just start the cluster; the taskmanager has
> to be started as well.
> This is how I start it (GitHub CLI):
>
> export FLINK_HOME=/C/Flink/flink-1.15.0
> $FLINK_HOME/bin/start-cluster.sh
>
> Flink is running (I can reach it going to http://localhost:8081)
>
> I know that now I should start the jobmanager, so I execute:
> $FLINK_HOME/bin/jobmanager.sh start
>
> I get message:
>
> [INFO] 1 instance(s) of standalonesession are already running on
> MNKH-WjKRv8Leb8.
> Starting standalonesession daemon on host MNKH-WjKRv8Leb8.
>
> So I think it's OK.
>
> Now I should start taskmanager:
>
> $FLINK_HOME/bin/taskmanager.sh start
> Starting taskexecutor daemon on host MNKH-WjKRv8Leb8
>
> But I see in the logs that this process fails:
>
> ...
> 2022-07-08 14:07:32,403 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils        [] - Actor system started at akka.tcp://flink@localhost:63030
> 2022-07-08 14:07:32,427 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Terminating TaskManagerRunner with exit code 1.
> org.apache.flink.util.FlinkException: Failed to start the TaskManagerRunner.
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:483) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:525) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:525) [flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:505) [flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:463) [flink-dist-1.15.0.jar:1.15.0]
> Caused by: java.io.IOException: Could not create the working directory C:\Users\MIKE~1\AppData\Local\Temp\tm_localhost:63030-ddbffb.
>     at org.apache.flink.runtime.entrypoint.WorkingDirectory.createDirectory(WorkingDirectory.java:58) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.entrypoint.WorkingDirectory.<init>(WorkingDirectory.java:39) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.entrypoint.WorkingDirectory.create(WorkingDirectory.java:88) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils.lambda$createTaskManagerWorkingDirectory$0(ClusterEntrypointUtils.java:152) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.entrypoint.DeterminismEnvelope.map(DeterminismEnvelope.java:49) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils.createTaskManagerWorkingDirectory(ClusterEntrypointUtils.java:150) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:210) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288) ~[flink-dist-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481) ~[flink-dist-1.15.0.jar:1.15.0]
>     ... 5 more
>
> I'm trying to find the reason.
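>
> One thing I want to check is whether the JVM can create a directory with that
> exact name at all - the ':' coming from the host:port part looks suspicious on
> Windows. A rough probe, just an assumption on my side:
>
>     import java.io.IOException;
>     import java.nio.file.Files;
>     import java.nio.file.Path;
>     import java.nio.file.Paths;
>
>     public class DirProbe {
>         public static void main(String[] args) throws IOException {
>             // Mimic the name Flink tried: it contains ':' (tm_localhost:63030-...).
>             // Windows does not allow ':' in file names, so this is expected to
>             // fail there, either in Paths.get or in createDirectories.
>             Path dir = Paths.get(System.getProperty("java.io.tmpdir"),
>                     "tm_localhost:63030-probe");
>             Files.createDirectories(dir);
>             System.out.println("Created " + dir);
>         }
>     }
>
> If that turns out to be the problem, overriding taskmanager.resource-id in
> flink-conf.yaml with a value that contains no ':' might be a workaround (again
> an assumption, I have not verified it).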
>
>
> *Sent:* Friday, July 08, 2022 at 12:21 PM
> *From:* "Alexander Fedulov" <alexan...@ververica.com>
> *To:* pod...@gmx.com
> *Cc:* "user" <user@flink.apache.org>
> *Subject:* Re: Is Flink able to read a CSV file or just like in Blink
> this function does not work?
> Hi Mike,
>
> please do not forget to include the mailing list address / reply to all;
> otherwise this becomes a private conversation.
> How do you start your cluster?
>
> Best,
> Alex
>
>
> On Fri, Jul 8, 2022 at 10:45 AM <pod...@gmx.com> wrote:
>
>> Thanks for support.
>> As you suggested, I did run it in IDE:
>>
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by
>> org.apache.flink.api.java.ClosureCleaner
>> (file:/C:/Flink/flink-1.15.0/lib/flink-dist-1.15.0.jar) to field
>> java.lang.String.value
>> WARNING: Please consider reporting this to the maintainers of
>> org.apache.flink.api.java.ClosureCleaner
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>> reflective access operations
>> WARNING: All illegal access operations will be denied in a future release
>> +----------------------+
>> |           some_value |
>> +----------------------+
>> |                    3 |
>> +----------------------+
>> 1 row in set
>>
>> So it works (the path is 'file:///c:/Temp/test4.txt' - when the path was wrong I
>> got an error... unfortunately Flink does not display any error in such a case).
>>
>> You say 'Check in the UI that after you start your cluster you have
>> TaskManagers registered successfully'.
>> If I go to the 'Task Managers' menu (
>> http://localhost:8081/#/task-manager) I do not see any - the list is empty.
>>
>> No idea what should be there or how to create one.
>>
>> *Sent:* Thursday, July 07, 2022 at 4:27 PM
>> *From:* "Alexander Fedulov" <alexan...@ververica.com>
>> *To:* pod...@gmx.com
>> *Subject:* Re: Is Flink able to read a CSV file or just like in Blink
>> this function does not work?
>> Did you try running in an IDE?
>> Check in the UI that after you start your cluster you have TaskManagers
>> registered successfully and they have enough slots to fulfill your job
>> parallelism requirements.
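>>
>> If it turns out the cluster has only a single free slot, capping the job's
>> parallelism for this test should be enough. A minimal sketch, assuming the
>> Table API from your program (the option table.exec.resource.default-parallelism
>> is what I would try first; not verified against your setup):
>>
>>     // Keep the whole test job at parallelism 1 so a single task slot suffices.
>>     tEnv.getConfig().getConfiguration()
>>         .setString("table.exec.resource.default-parallelism", "1");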
>>
>> Best,
>> Alexander Fedulov
>>
>> On Thu, Jul 7, 2022 at 2:53 PM <pod...@gmx.com> wrote:
>>
>>>
>>> Thank you very much Alex for your help.
>>> My environment is Windows and the GitHub CLI.
>>>
>>> I tried different directory syntaxes:
>>> 'file:///c:/temp/test4.txt'
>>> 'file:///c://temp//test4.txt'
>>> '//c:/temp/test4.txt'
>>> 'c:/temp/test4.txt'
>>> 'c:\\temp\\test4.txt'
>>> ...
>>> dozens of others.
>>> Nothing helps.
>>>
>>> I run this job with the command:
>>> $ $FLINK_HOME/bin/flink run c:/Temp/flinkTest2.jar
>>> WARNING: An illegal reflective access operation has occurred
>>> WARNING: Illegal reflective access by
>>> org.apache.flink.api.java.ClosureCleaner
>>> (file:/C:/Flink/flink-1.15.0/lib/flink-dist-1.15.0.jar) to field
>>> java.lang.String.value
>>> WARNING: Please consider reporting this to the maintainers of
>>> org.apache.flink.api.java.ClosureCleaner
>>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>>> reflective access operations
>>> WARNING: All illegal access operations will be denied in a future release
>>> Job has been submitted with JobID a9eaf86337ca14d0d80e6d6ef70dab6e
>>>
>>>
>>>
>>> But maybe the problem is somewhere else - even if I put a non-existing path
>>> there is no error; the job is submitted to Flink, has status RUNNING and it
>>> fails after 5 minutes or so.
>>>
>>> Source: Custom File source: FAILED
>>> CsvTableSource(read fields: ) -> SourceConversion[1] ->
>>> HashAggregate[2]:  CANCELED
>>> HashAggregate[4] -> ConstraintEnforcer[5] -> Sink: Collect table sink:
>>> CANCELED
>>>
>>> In the console I see:
>>> NoResourceAvailableException: Slot request bulk is not fulfillable!
>>> Could not allocate the required slot within slot request timeout
>>>
>>>
>>>
>>> *Sent:* Thursday, July 07, 2022 at 1:10 AM
>>> *From:* "Alexander Fedulov" <alexan...@ververica.com>
>>> *To:* pod...@gmx.com
>>> *Cc:* "user" <user@flink.apache.org>
>>> *Subject:* Re: Is Flink able to read a CSV file or just like in Blink
>>> this function does not work?
>>> Hi Mike,
>>>
>>> I do not see any issues with your code. With a sample CSV file like this:
>>> a,1.0
>>> b,2.0
>>> c,3.0
>>>
>>> it produces the expected result
>>>
>>> +----------------------+
>>> |           some_value |
>>> +----------------------+
>>> |                    3 |
>>> +----------------------+
>>> 1 row in set
>>>
>>> Process finished with exit code 0
>>>
>>> --
>>>
>>> I do not use Windows, so I am not 100% sure what the correct way to
>>> specify the file path there is; however, some observations:
>>> - You are using forward slash notation - I assume you are running in
>>> WSL, Cygwin or something like that? Notice that in that notation an
>>> additional forward slash is missing: it should be 'file:///foo/bar'
>>> (see the sketch after these notes).
>>> - Omitting the scheme works with the local filesystem by default. On
>>> *nix systems 'connector.path' = '/home/alex/tmp/test.csv' works fine.
>>> Maybe you could just try standard Windows backslash notation instead.
>>> - That said, when a non-existent file is specified, the job fails
>>> immediately, so I would actually expect that behavior if the issue was
>>> indeed with the file path.
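>>>
>>> Putting those together, it might also be worth trying the newer filesystem
>>> connector options with an absolute Windows path. A rough sketch, untested on
>>> Windows and dependent on your Flink version:
>>>
>>>     tEnv.executeSql(
>>>         "CREATE TABLE test_table (column_name1 STRING, column_name2 DOUBLE) WITH ("
>>>             + " 'connector' = 'filesystem',"
>>>             + " 'path' = 'file:///C:/temp/test4.txt',"
>>>             + " 'format' = 'csv')");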
>>> Which version of Flink are you running?
>>>
>>> Best,
>>> Alexander Fedulov
>>>
>>>
>>> On Wed, Jul 6, 2022 at 10:39 PM <pod...@gmx.com> wrote:
>>>
>>>> If I'm reading the Flink manual correctly (and this is not simple - no
>>>> examples), this code should read a CSV file:
>>>>
>>>>
>>>> package flinkTest2;
>>>>
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>
>>>> public class flinkTest2 {
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         EnvironmentSettings settings = EnvironmentSettings
>>>>             .newInstance()
>>>>             //.inStreamingMode()
>>>>             .inBatchMode()
>>>>             .build();
>>>>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>>>>
>>>>         tEnv.executeSql("CREATE TABLE test_table (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file://C/temp/test4.txt', 'format.type' = 'csv')");
>>>>         tEnv.sqlQuery("SELECT COUNT(*) AS some_value FROM test_table")
>>>>             .execute()
>>>>             .print();
>>>>     }
>>>> }
>>>>
>>>> The job just keeps running and running and running...
>>>> I think Flink is not able to open the file. It does not matter what is here:
>>>> 'connector.path' = 'file://C/temp/test4.txt' - Flink does not display any
>>>> error at all. 'connector.path' = 'file:blah/blah/blah' would be accepted by
>>>> Flink just the same.
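>>>>
>>>> A small thing I can still do is rule out the path itself with plain Java,
>>>> outside of Flink (just a sanity probe):
>>>>
>>>>     // Should print true if the file is visible to a local JVM at this path.
>>>>     System.out.println(java.nio.file.Files.exists(
>>>>             java.nio.file.Paths.get("C:/temp/test4.txt")));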
>>>>
>>>> Anyone could help me with that?
>>>> Thanks
>>>> Mike
>>>>
>>>>
>>>
