Hi,

The problem in your case is that your program exits before anything is printed. The method executeInsert submits the query, but it does not wait for the query to finish. Therefore your main/test method returns, bringing down the local cluster, before anything is printed. You can, for example, write

    TableResult result = transactions.executeInsert("print_table");
    result.await();

which will wait for the insert to finish.

The print sink writes directly to stdout/stderr, so none of the logging configuration applies in this case.

Best,

Dawid

On 29/10/2020 18:29, Ruben Laguna wrote:
> Hi,
>
> Using `mytable.execute().print()` is exactly what I wanted, thanks.
>
> But I'm still curious. I'm just running this locally, in a junit test
> case (not using a flink
> cluster) just like in [flink-playgrounds SpendReportTest][1] so in
> this scenario where does the task manager (if there is taskmanager)
> output go?
>
> I just added src/test/resources/log4j.properties with
>
> # Root logger option
> log4j.rootLogger=INFO, stdout
>
> # Direct log messages to stdout
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.Target=System.out
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss}
> %-5p %c{1}:%L - %m%n
>
> and still I don't see anything from the print sink, and I even run it
> with the debugger and I can see that although
> PrintSink#getSinkRuntimeProvider is called, the
> RowDataPrintFunction#invoke is never called.
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.cpu.cores required for local > execution is not set, setting it to the maximal possible value. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.task.heap.size required for > local execution is not set, setting it to the maximal possible value. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.task.off-heap.size required > for local execution is not set, setting it to the maximal possible > value. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.network.min required for local > execution is not set, setting it to its default value 64 mb. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.network.max required for local > execution is not set, setting it to its default value 64 mb. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.managed.size required for > local execution is not set, setting it to its default value 128 mb. > 2020-10-29 18:18:32 INFO MiniCluster:253 - Starting Flink Mini Cluster > 2020-10-29 18:18:32 INFO MiniCluster:262 - Starting Metrics Registry > 2020-10-29 18:18:32 INFO MetricRegistryImpl:122 - No metrics reporter > configured, no metrics will be exposed/reported. 
> 2020-10-29 18:18:32 INFO MiniCluster:266 - Starting RPC Service(s) > 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start > local actor system > 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started > 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system > started at akka://flink > 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start > local actor system > 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started > 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system > started at akka://flink-metrics > 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint > for org.apache.flink.runtime.metrics.dump.MetricQueryService at > akka://flink-metrics/user/rpc/MetricQueryService . > 2020-10-29 18:18:32 INFO MiniCluster:432 - Starting high-availability > services > 2020-10-29 18:18:32 INFO BlobServer:143 - Created BLOB server storage > directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860 > 2020-10-29 18:18:32 INFO BlobServer:207 - Started BLOB server at > 0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000 > 2020-10-29 18:18:32 INFO PermanentBlobCache:107 - Created BLOB cache > storage directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07 > 2020-10-29 18:18:32 INFO TransientBlobCache:107 - Created BLOB cache > storage directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30 > 2020-10-29 18:18:32 INFO MiniCluster:519 - Starting 1 TaskManger(s) > 2020-10-29 18:18:32 INFO TaskManagerRunner:412 - Starting TaskManager > with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193 > 2020-10-29 18:18:32 INFO TaskManagerServices:411 - Temporary file > directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total > 233 GB, usable 25 GB (10.73% usable) > 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - > FileChannelManager uses directory 
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695 > for spill files. > 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - > FileChannelManager uses directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49 > for spill files. > 2020-10-29 18:18:32 INFO NetworkBufferPool:139 - Allocated 64 MB for > network buffer pool (number of memory segments: 2048, bytes per > segment: 32768). > 2020-10-29 18:18:32 INFO NettyShuffleEnvironment:293 - Starting the > network environment and its components. > 2020-10-29 18:18:32 INFO KvStateService:89 - Starting the kvState > service and its components. > 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint > for org.apache.flink.runtime.taskexecutor.TaskExecutor at > akka://flink/user/rpc/taskmanager_0 . > 2020-10-29 18:18:32 INFO DefaultJobLeaderService:116 - Start job > leader service. > 2020-10-29 18:18:32 INFO FileCache:107 - User file cache uses > directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52 > 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:140 - Starting rest endpoint. > 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:126 - Failed to load > web based job submission extension. Probable reason: flink-runtime-web > is not in the classpath. > 2020-10-29 18:18:32 WARN WebMonitorUtils:85 - Log file environment > variable 'log.file' is not set. > 2020-10-29 18:18:32 WARN WebMonitorUtils:91 - JobManager log files > are unavailable in the web dashboard. Log file location not found in > environment variable 'log.file' or configuration key 'web.log.path'. 
> 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:236 - Rest endpoint > listening at localhost:50966 > 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing > leadership to contender http://localhost:50966 > 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:821 - > http://localhost:50966 was granted leadership with > leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473 > 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received > confirmation of leadership for leader http://localhost:50966 , > session=5d1c56ab-6894-42d9-bc76-716ea59bd473 > 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint > for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager > at akka://flink/user/rpc/resourcemanager_1 . > 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing > leadership to contender LeaderContender: DefaultDispatcherRunner > 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing > leadership to contender LeaderContender: StandaloneResourceManager > 2020-10-29 18:18:33 INFO StandaloneResourceManager:1026 - > ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted > leadership with fencing token a8e96f157326e3f60a2df92bb1364f97 > 2020-10-29 18:18:33 INFO MiniCluster:372 - Flink Mini Cluster started > successfully > 2020-10-29 18:18:33 INFO SlotManagerImpl:284 - Starting the SlotManager. > 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:102 - Start > SessionDispatcherLeaderProcess. > 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:120 - Recover > all persisted job graphs. > 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:128 - > Successfully recovered 0 persisted job graphs. 
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received > confirmation of leadership for leader > akka://flink/user/rpc/resourcemanager_1 , > session=0a2df92b-b136-4f97-a8e9-6f157326e3f6 > 2020-10-29 18:18:33 INFO TaskExecutor:1128 - Connecting to > ResourceManager > akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97). > 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint > for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at > akka://flink/user/rpc/dispatcher_2 . > 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received > confirmation of leadership for leader > akka://flink/user/rpc/dispatcher_2 , > session=3ce5d1bf-2d6d-49d6-b315-717f66748938 > 2020-10-29 18:18:33 INFO TaskExecutor:155 - Resolved ResourceManager > address, beginning registration > 2020-10-29 18:18:33 INFO StandaloneResourceManager:821 - Registering > TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193 > (akka://flink/user/rpc/taskmanager_0) at ResourceManager > 2020-10-29 18:18:33 INFO TaskExecutor:84 - Successful registration at > resource manager akka://flink/user/rpc/resourcemanager_1 under > registration id 0be76fda7dfc8204153de66b83ba5621. > 2020-10-29 18:18:33 INFO StandaloneDispatcher:295 - Received JobGraph > submission 1fd25ab4d3d51009542ebda2bbadb55d > (insert-into_default_catalog.default_database.print_table). > 2020-10-29 18:18:33 INFO StandaloneDispatcher:352 - Submitting job > 1fd25ab4d3d51009542ebda2bbadb55d > (insert-into_default_catalog.default_database.print_table). > 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint > for org.apache.flink.runtime.jobmaster.JobMaster at > akka://flink/user/rpc/jobmanager_3 . > 2020-10-29 18:18:33 INFO JobMaster:288 - Initializing job > insert-into_default_catalog.default_database.print_table > (1fd25ab4d3d51009542ebda2bbadb55d). 
> 2020-10-29 18:18:33 INFO JobMaster:84 - Using restart back off time > strategy NoRestartBackoffTimeStrategy for > insert-into_default_catalog.default_database.print_table > (1fd25ab4d3d51009542ebda2bbadb55d). > 2020-10-29 18:18:33 INFO JobMaster:211 - Running initialization on > master for job insert-into_default_catalog.default_database.print_table > (1fd25ab4d3d51009542ebda2bbadb55d). > 2020-10-29 18:18:33 INFO JobMaster:229 - Successfully ran > initialization on master in 3 ms. > 2020-10-29 18:18:33 INFO DefaultExecutionTopology:111 - Built 1 > pipelined regions in 0 ms > 2020-10-29 18:18:33 INFO JobMaster:231 - No state backend has been > configured, using default (Memory / JobManager) MemoryStateBackend > (data in heap memory / checkpoints to JobManager) (checkpoints: > 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) > 2020-10-29 18:18:33 INFO JobMaster:165 - Using failover strategy > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6 > for insert-into_default_catalog.default_database.print_table > (1fd25ab4d3d51009542ebda2bbadb55d). > 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing > leadership to contender akka://flink/user/rpc/jobmanager_3 > 2020-10-29 18:18:33 INFO JobManagerRunnerImpl:305 - JobManager runner > for job insert-into_default_catalog.default_database.print_table > (1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session > id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at > akka://flink/user/rpc/jobmanager_3. > 2020-10-29 18:18:33 INFO JobMaster:799 - Starting execution of job > insert-into_default_catalog.default_database.print_table > (1fd25ab4d3d51009542ebda2bbadb55d) under job master id > 8b9f639f07344b3f47a72e91c0bf4b40. 
> 2020-10-29 18:18:33 INFO JobMaster:197 - Starting scheduling with > scheduling strategy > [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy] > 2020-10-29 18:18:33 INFO ExecutionGraph:1253 - Job > insert-into_default_catalog.default_database.print_table > (1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to > RUNNING. > 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: > Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS > account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 > 00:12:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) > (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) > switched from CREATED to SCHEDULED. 
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: > Sink(table=[default_catalog.default_database.print_table], > fields=[account_id, amount, transaction_time]) (1/1) > (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) > switched from CREATED to SCHEDULED. > 2020-10-29 18:18:33 INFO SlotPoolImpl:385 - Cannot serve slot > request, no ResourceManager connected. Adding as pending request > [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] > 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received > confirmation of leadership for leader > akka://flink/user/rpc/jobmanager_3 , > session=47a72e91-c0bf-4b40-8b9f-639f07344b3f > 2020-10-29 18:18:33 INFO JobMaster:1031 - Connecting to > ResourceManager > akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97) > 2020-10-29 18:18:33 INFO JobMaster:155 - Resolved ResourceManager > address, beginning registration > 2020-10-29 18:18:33 INFO StandaloneResourceManager:330 - Registering > job manager > 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 > for job 1fd25ab4d3d51009542ebda2bbadb55d. > 2020-10-29 18:18:33 INFO StandaloneResourceManager:765 - Registered > job manager > 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 > for job 1fd25ab4d3d51009542ebda2bbadb55d. > 2020-10-29 18:18:33 INFO JobMaster:1053 - JobManager successfully > registered at ResourceManager, leader id: > a8e96f157326e3f60a2df92bb1364f97. > 2020-10-29 18:18:33 INFO SlotPoolImpl:347 - Requesting new slot > [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile > ResourceProfile{UNKNOWN} with allocation id > 45ea6506d06f73f61a36db764ce07ba7 from resource manager. > 2020-10-29 18:18:33 INFO StandaloneResourceManager:464 - Request slot > with profile ResourceProfile{UNKNOWN} for job > 1fd25ab4d3d51009542ebda2bbadb55d with allocation id > 45ea6506d06f73f61a36db764ce07ba7. 
> 2020-10-29 18:18:33 INFO TaskExecutor:908 - Receive slot request > 45ea6506d06f73f61a36db764ce07ba7 for job > 1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id > a8e96f157326e3f60a2df92bb1364f97. > 2020-10-29 18:18:33 INFO TaskExecutor:976 - Allocated slot for > 45ea6506d06f73f61a36db764ce07ba7. > 2020-10-29 18:18:33 INFO DefaultJobLeaderService:172 - Add job > 1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring. > 2020-10-29 18:18:33 INFO DefaultJobLeaderService:314 - Try to > register at job manager akka://flink/user/rpc/jobmanager_3 with leader > id 47a72e91-c0bf-4b40-8b9f-639f07344b3f. > 2020-10-29 18:18:33 INFO DefaultJobLeaderService:155 - Resolved > JobManager address, beginning registration > 2020-10-29 18:18:33 INFO DefaultJobLeaderService:369 - Successful > registration at job manager akka://flink/user/rpc/jobmanager_3 for job > 1fd25ab4d3d51009542ebda2bbadb55d. > 2020-10-29 18:18:33 INFO TaskExecutor:1379 - Establish JobManager > connection for job 1fd25ab4d3d51009542ebda2bbadb55d. > 2020-10-29 18:18:33 INFO TaskExecutor:1278 - Offer reserved slots to > the leader of job 1fd25ab4d3d51009542ebda2bbadb55d. > 2020-10-29 18:18:33 INFO PermanentBlobCache:251 - Shutting down BLOB cache > 2020-10-29 18:18:33 INFO TaskExecutorLocalStateStoresManager:213 - > Shutting down TaskExecutorLocalStateStoresManager. 
> 2020-10-29 18:18:33 INFO TransientBlobCache:251 - Shutting down BLOB cache > 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: > Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS > account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 > 00:12:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) > (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) > switched from SCHEDULED to DEPLOYING. 
> 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Source: > Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS > account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 > 00:12:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), > Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, > CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) > (1/1) (attempt #0) with attempt id > 1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0 > to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with > allocation id 45ea6506d06f73f61a36db764ce07ba7 > 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: > Sink(table=[default_catalog.default_database.print_table], > fields=[account_id, amount, transaction_time]) (1/1) > 
(1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) > switched from SCHEDULED to DEPLOYING. > 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Sink: > Sink(table=[default_catalog.default_database.print_table], > fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with > attempt id > 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0 > to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with > allocation id 45ea6506d06f73f61a36db764ce07ba7 > 2020-10-29 18:18:33 INFO TaskSlotTableImpl:361 - Activate slot > 45ea6506d06f73f61a36db764ce07ba7. > 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - > FileChannelManager removed spill file directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695 > 2020-10-29 18:18:33 INFO BlobServer:348 - Stopped BLOB server at > 0.0.0.0:50965 > 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - > FileChannelManager removed spill file directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49 > 2020-10-29 18:18:33 INFO FileCache:153 - removed file cache directory > /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52 > > Process finished with exit code 0 > > > [1]: > https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java > > On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <dwysakow...@apache.org> > wrote: >> You should be able to use the "print" sink. Remember though that the >> "print" sink prints into the stdout/stderr of TaskManagers, not the >> Client, where you submit the query. This is different from the >> TableResult, which collects results in the client. BTW, for printing you >> can use TableResult#print, which will nicely format your results. 
>>
>> Best,
>>
>> Dawid
>>
>> On 29/10/2020 16:13, Ruben Laguna wrote:
>>> How can I use the Table [Print SQL connector][1]? I tried the
>>> following (batch mode) but it does not give any output:
>>>
>>>
>>> EnvironmentSettings settings =
>>>     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>>
>>> final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
>>>
>>> Table transactions =
>>>     tEnv.fromValues(
>>>         DataTypes.ROW(
>>>             DataTypes.FIELD("account_id", DataTypes.BIGINT()),
>>>             DataTypes.FIELD("amount", DataTypes.BIGINT()),
>>>             DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
>>>         Row.of(1, 188, DATE_TIME.plusMinutes(12)),
>>>         Row.of(2, 374, DATE_TIME.plusMinutes(47)),
>>>         Row.of(3, 112, DATE_TIME.plusMinutes(36)),
>>>         Row.of(4, 478, DATE_TIME.plusMinutes(3)),
>>>         Row.of(5, 208, DATE_TIME.plusMinutes(8)),
>>>         Row.of(1, 379, DATE_TIME.plusMinutes(53)),
>>>         Row.of(2, 351, DATE_TIME.plusMinutes(32)),
>>>         Row.of(3, 320, DATE_TIME.plusMinutes(31)),
>>>         Row.of(4, 259, DATE_TIME.plusMinutes(19)),
>>>         Row.of(5, 273, DATE_TIME.plusMinutes(42)));
>>> tEnv.executeSql("CREATE TABLE print_table(account_id BIGINT, amount BIGINT, transaction_time TIMESTAMP) WITH ('connector' = 'print')");
>>>
>>> transactions.executeInsert("print_table");
>>>
>>>
>>> I can "materialize" the result manually and print them out with:
>>>
>>> for (Row row : materialize(transactions.execute())) {
>>>     System.out.println(row);
>>> }
>>>
>>> private static List<Row> materialize(TableResult results) {
>>>     try (CloseableIterator<Row> resultIterator = results.collect()) {
>>>         return StreamSupport
>>>             .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
>>>             .collect(Collectors.toList());
>>>     } catch (Exception e) {
>>>         throw new RuntimeException("Failed to materialize results", e);
>>>     }
>>> }
>>>
>>>
>>> But I would like to know why I can't just use the Print sink.
>>>
>>> I've tried with `.inBatchMode()` and with `inStreamingMode()`, so I
>>> don't think it's that.
>>>
>>> Does anybody know of any working example involving the print connector?
>>>
>>>
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
>
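
For what it's worth, the submit-then-exit behaviour described at the top of this message can be reproduced without Flink. The sketch below is a plain-JDK analogy, not Flink code, and every name in it is made up for illustration: the hypothetical `submit` method stands in for `executeInsert`, the single-threaded executor stands in for the local mini cluster, and `join()` plays the role of `result.await()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy of the situation in this thread; none of this is Flink API.
class AwaitSketch {

    // Hypothetical stand-in for executeInsert(): starts the "job" on the
    // "cluster" and returns immediately with a handle to the running work.
    static CompletableFuture<Void> submit(ExecutorService cluster,
                                          List<String> rows,
                                          List<String> sink) {
        return CompletableFuture.runAsync(() -> {
            for (String row : rows) {
                sink.add(row); // the "print sink" receiving one row
            }
        }, cluster);
    }

    // Submits the job, waits for it to finish, then shuts the "cluster" down.
    // Returns everything the sink received.
    static List<String> runAndAwait(List<String> rows) {
        ExecutorService cluster = Executors.newSingleThreadExecutor();
        List<String> sink = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> result = submit(cluster, rows, sink);
            result.join(); // analogous to result.await(): block until the job is done
        } finally {
            // Analogous to the test method returning and the local cluster stopping.
            cluster.shutdownNow();
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(Arrays.asList("row-1", "row-2")));
    }
}
```

Dropping the `join()` call gives the situation visible in the log above: the executor is shut down while the job may still be starting, so the sink may receive nothing at all. On Flink versions whose `TableResult` does not offer `await()`, waiting on the `JobClient` obtained from `TableResult#getJobClient` should achieve the same thing, though I have not checked that against 1.11.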