Hi, Using `mytable.execute().print()` is exactly what I wanted, thanks.
But I'm still curious. I'm just running this locally, in a junit test case (not using a flink cluster) just like in [flink-playgrounds SpendReportTest][1] so in this scenario where does the task manager (if there is taskmanager) output go? I just added src/test/resources/log4j.properties with # Root logger option log4j.rootLogger=INFO, stdout # Direct log messages to stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n and still I don't see anything from the print sink, and I even run it with the debugger and I can see that although PrintSink#getSinkRuntimeProvider is called , the RowDataPrintFunction#invoke is never called. SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 2020-10-29 18:18:32 INFO MiniCluster:253 - Starting Flink Mini Cluster 2020-10-29 18:18:32 INFO MiniCluster:262 - Starting Metrics Registry 2020-10-29 18:18:32 INFO MetricRegistryImpl:122 - No metrics reporter configured, no metrics will be exposed/reported. 2020-10-29 18:18:32 INFO MiniCluster:266 - Starting RPC Service(s) 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start local actor system 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system started at akka://flink 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start local actor system 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system started at akka://flink-metrics 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService . 2020-10-29 18:18:32 INFO MiniCluster:432 - Starting high-availability services 2020-10-29 18:18:32 INFO BlobServer:143 - Created BLOB server storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860 2020-10-29 18:18:32 INFO BlobServer:207 - Started BLOB server at 0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000 2020-10-29 18:18:32 INFO PermanentBlobCache:107 - Created BLOB cache storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07 2020-10-29 18:18:32 INFO TransientBlobCache:107 - Created BLOB cache storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30 2020-10-29 18:18:32 INFO MiniCluster:519 - Starting 1 TaskManger(s) 2020-10-29 18:18:32 INFO TaskManagerRunner:412 - Starting TaskManager with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193 2020-10-29 18:18:32 INFO TaskManagerServices:411 - Temporary file directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total 233 GB, usable 25 GB (10.73% usable) 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - FileChannelManager uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695 for spill files. 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - FileChannelManager uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49 for spill files. 2020-10-29 18:18:32 INFO NetworkBufferPool:139 - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768). 2020-10-29 18:18:32 INFO NettyShuffleEnvironment:293 - Starting the network environment and its components. 2020-10-29 18:18:32 INFO KvStateService:89 - Starting the kvState service and its components. 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 . 2020-10-29 18:18:32 INFO DefaultJobLeaderService:116 - Start job leader service. 2020-10-29 18:18:32 INFO FileCache:107 - User file cache uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:140 - Starting rest endpoint. 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:126 - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath. 2020-10-29 18:18:32 WARN WebMonitorUtils:85 - Log file environment variable 'log.file' is not set. 2020-10-29 18:18:32 WARN WebMonitorUtils:91 - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'. 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:236 - Rest endpoint listening at localhost:50966 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender http://localhost:50966 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:821 - http://localhost:50966 was granted leadership with leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader http://localhost:50966 , session=5d1c56ab-6894-42d9-bc76-716ea59bd473 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/rpc/resourcemanager_1 . 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender LeaderContender: DefaultDispatcherRunner 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender LeaderContender: StandaloneResourceManager 2020-10-29 18:18:33 INFO StandaloneResourceManager:1026 - ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted leadership with fencing token a8e96f157326e3f60a2df92bb1364f97 2020-10-29 18:18:33 INFO MiniCluster:372 - Flink Mini Cluster started successfully 2020-10-29 18:18:33 INFO SlotManagerImpl:284 - Starting the SlotManager. 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:102 - Start SessionDispatcherLeaderProcess. 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:120 - Recover all persisted job graphs. 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:128 - Successfully recovered 0 persisted job graphs. 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/resourcemanager_1 , session=0a2df92b-b136-4f97-a8e9-6f157326e3f6 2020-10-29 18:18:33 INFO TaskExecutor:1128 - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97). 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_2 . 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/dispatcher_2 , session=3ce5d1bf-2d6d-49d6-b315-717f66748938 2020-10-29 18:18:33 INFO TaskExecutor:155 - Resolved ResourceManager address, beginning registration 2020-10-29 18:18:33 INFO StandaloneResourceManager:821 - Registering TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193 (akka://flink/user/rpc/taskmanager_0) at ResourceManager 2020-10-29 18:18:33 INFO TaskExecutor:84 - Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 0be76fda7dfc8204153de66b83ba5621. 2020-10-29 18:18:33 INFO StandaloneDispatcher:295 - Received JobGraph submission 1fd25ab4d3d51009542ebda2bbadb55d (insert-into_default_catalog.default_database.print_table). 2020-10-29 18:18:33 INFO StandaloneDispatcher:352 - Submitting job 1fd25ab4d3d51009542ebda2bbadb55d (insert-into_default_catalog.default_database.print_table). 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 . 2020-10-29 18:18:33 INFO JobMaster:288 - Initializing job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d). 2020-10-29 18:18:33 INFO JobMaster:84 - Using restart back off time strategy NoRestartBackoffTimeStrategy for insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d). 2020-10-29 18:18:33 INFO JobMaster:211 - Running initialization on master for job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d). 2020-10-29 18:18:33 INFO JobMaster:229 - Successfully ran initialization on master in 3 ms. 2020-10-29 18:18:33 INFO DefaultExecutionTopology:111 - Built 1 pipelined regions in 0 ms 2020-10-29 18:18:33 INFO JobMaster:231 - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 2020-10-29 18:18:33 INFO JobMaster:165 - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6 for insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d). 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3 2020-10-29 18:18:33 INFO JobManagerRunnerImpl:305 - JobManager runner for job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at akka://flink/user/rpc/jobmanager_3. 2020-10-29 18:18:33 INFO JobMaster:799 - Starting execution of job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) under job master id 8b9f639f07344b3f47a72e91c0bf4b40. 2020-10-29 18:18:33 INFO JobMaster:197 - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy] 2020-10-29 18:18:33 INFO ExecutionGraph:1253 - Job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to RUNNING. 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) switched from CREATED to SCHEDULED. 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) switched from CREATED to SCHEDULED. 2020-10-29 18:18:33 INFO SlotPoolImpl:385 - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=47a72e91-c0bf-4b40-8b9f-639f07344b3f 2020-10-29 18:18:33 INFO JobMaster:1031 - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97) 2020-10-29 18:18:33 INFO JobMaster:155 - Resolved ResourceManager address, beginning registration 2020-10-29 18:18:33 INFO StandaloneResourceManager:330 - Registering job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d. 2020-10-29 18:18:33 INFO StandaloneResourceManager:765 - Registered job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d. 2020-10-29 18:18:33 INFO JobMaster:1053 - JobManager successfully registered at ResourceManager, leader id: a8e96f157326e3f60a2df92bb1364f97. 2020-10-29 18:18:33 INFO SlotPoolImpl:347 - Requesting new slot [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile ResourceProfile{UNKNOWN} with allocation id 45ea6506d06f73f61a36db764ce07ba7 from resource manager. 2020-10-29 18:18:33 INFO StandaloneResourceManager:464 - Request slot with profile ResourceProfile{UNKNOWN} for job 1fd25ab4d3d51009542ebda2bbadb55d with allocation id 45ea6506d06f73f61a36db764ce07ba7. 2020-10-29 18:18:33 INFO TaskExecutor:908 - Receive slot request 45ea6506d06f73f61a36db764ce07ba7 for job 1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id a8e96f157326e3f60a2df92bb1364f97. 2020-10-29 18:18:33 INFO TaskExecutor:976 - Allocated slot for 45ea6506d06f73f61a36db764ce07ba7. 2020-10-29 18:18:33 INFO DefaultJobLeaderService:172 - Add job 1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring. 2020-10-29 18:18:33 INFO DefaultJobLeaderService:314 - Try to register at job manager akka://flink/user/rpc/jobmanager_3 with leader id 47a72e91-c0bf-4b40-8b9f-639f07344b3f. 2020-10-29 18:18:33 INFO DefaultJobLeaderService:155 - Resolved JobManager address, beginning registration 2020-10-29 18:18:33 INFO DefaultJobLeaderService:369 - Successful registration at job manager akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d. 2020-10-29 18:18:33 INFO TaskExecutor:1379 - Establish JobManager connection for job 1fd25ab4d3d51009542ebda2bbadb55d. 2020-10-29 18:18:33 INFO TaskExecutor:1278 - Offer reserved slots to the leader of job 1fd25ab4d3d51009542ebda2bbadb55d. 2020-10-29 18:18:33 INFO PermanentBlobCache:251 - Shutting down BLOB cache 2020-10-29 18:18:33 INFO TaskExecutorLocalStateStoresManager:213 - Shutting down TaskExecutorLocalStateStoresManager. 2020-10-29 18:18:33 INFO TransientBlobCache:251 - Shutting down BLOB cache 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) switched from SCHEDULED to DEPLOYING. 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) (1/1) (attempt #0) with attempt id 1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0 to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with allocation id 45ea6506d06f73f61a36db764ce07ba7 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) switched from SCHEDULED to DEPLOYING. 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with attempt id 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0 to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with allocation id 45ea6506d06f73f61a36db764ce07ba7 2020-10-29 18:18:33 INFO TaskSlotTableImpl:361 - Activate slot 45ea6506d06f73f61a36db764ce07ba7. 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - FileChannelManager removed spill file directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695 2020-10-29 18:18:33 INFO BlobServer:348 - Stopped BLOB server at 0.0.0.0:50965 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - FileChannelManager removed spill file directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49 2020-10-29 18:18:33 INFO FileCache:153 - removed file cache directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52 Process finished with exit code 0 [1]: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote: > > You should be able to use the "print" sink. Remember though that the > "print" sink prints into the stdout/stderr of TaskManagers, not the > Client, where you submit the query. This is different from the > TableResult, which collects results in the client. BTW, for printing you > can use TableResult#print, which will nicely format your results. > > Best, > > Dawid > > On 29/10/2020 16:13, Ruben Laguna wrote: > > How can I use the Table [Print SQL connector][1]? I tried the > > following (batch mode) but it does not give any output: > > > > > > EnvironmentSettings settings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > TableEnvironment tEnv = TableEnvironment.create(settings); > > > > final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0); > > > > Table transactions = > > tEnv.fromValues( > > DataTypes.ROW( > > DataTypes.FIELD("account_id", DataTypes.BIGINT()), > > DataTypes.FIELD("amount", DataTypes.BIGINT()), > > DataTypes.FIELD("transaction_time", > > DataTypes.TIMESTAMP(3))), > > Row.of(1, 188, DATE_TIME.plusMinutes(12)), > > Row.of(2, 374, DATE_TIME.plusMinutes(47)), > > Row.of(3, 112, DATE_TIME.plusMinutes(36)), > > Row.of(4, 478, DATE_TIME.plusMinutes(3)), > > Row.of(5, 208, DATE_TIME.plusMinutes(8)), > > Row.of(1, 379, DATE_TIME.plusMinutes(53)), > > Row.of(2, 351, DATE_TIME.plusMinutes(32)), > > Row.of(3, 320, DATE_TIME.plusMinutes(31)), > > Row.of(4, 259, DATE_TIME.plusMinutes(19)), > > Row.of(5, 273, DATE_TIME.plusMinutes(42))); > > tEnv.executeSql("CREATE TABLE print_table(account_id BIGINT, amount > > BIGINT, transaction_time TIMESTAMP) WITH ('connector' = 'print')"); > > > > transactions.executeInsert("print_table"); > > > > > > I can "materialize" the result manually and print them out with : > > > > for (Row row : materialize(transactions.execute())) { > > System.out.println(row); > > } > > > > private static List<Row> materialize(TableResult results) { > > try (CloseableIterator<Row> resultIterator = results.collect()) { > > return StreamSupport > > > > .stream(Spliterators.spliteratorUnknownSize(resultIterator, > > Spliterator.ORDERED), false) > > .collect(Collectors.toList()); > > } catch (Exception e) { > > throw new RuntimeException("Failed to materialize results", e); > > } > > } > > > > > > But I would like to know why I can't just use the Print sink. > > > > I've tried with `.inBatchMode()` and with `inStreamingMode()`, so I > > don't thinks it's that. > > > > Does anybody know of any working example involving the print connector? > > > > > > > > [1]: > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html > -- /Rubén