Hi,

Using `mytable.execute().print()` is exactly what I wanted, thanks.

But I'm still curious. I'm running this locally in a JUnit test case
(not on a Flink cluster), just like in
[flink-playgrounds SpendReportTest][1]. In this scenario, where does the
TaskManager output go (if there is a TaskManager at all)?

I just added src/test/resources/log4j.properties with

# Root logger option
log4j.rootLogger=INFO, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

and still I don't see anything from the print sink. I even ran it with
the debugger, and I can see that although
PrintSink#getSinkRuntimeProvider is called,
RowDataPrintFunction#invoke is never called.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.cpu.cores required for local
execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.heap.size required for
local execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.off-heap.size required
for local execution is not set, setting it to the maximal possible
value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.min required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.max required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.managed.size required for
local execution is not set, setting it to its default value 128 mb.
2020-10-29 18:18:32 INFO  MiniCluster:253 - Starting Flink Mini Cluster
2020-10-29 18:18:32 INFO  MiniCluster:262 - Starting Metrics Registry
2020-10-29 18:18:32 INFO  MetricRegistryImpl:122 - No metrics reporter
configured, no metrics will be exposed/reported.
2020-10-29 18:18:32 INFO  MiniCluster:266 - Starting RPC Service(s)
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
started at akka://flink
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
started at akka://flink-metrics
2020-10-29 18:18:32 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/rpc/MetricQueryService .
2020-10-29 18:18:32 INFO  MiniCluster:432 - Starting high-availability services
2020-10-29 18:18:32 INFO  BlobServer:143 - Created BLOB server storage
directory 
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
2020-10-29 18:18:32 INFO  BlobServer:207 - Started BLOB server at
0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
2020-10-29 18:18:32 INFO  PermanentBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
2020-10-29 18:18:32 INFO  TransientBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
2020-10-29 18:18:32 INFO  MiniCluster:519 - Starting 1 TaskManger(s)
2020-10-29 18:18:32 INFO  TaskManagerRunner:412 - Starting TaskManager
with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
2020-10-29 18:18:32 INFO  TaskManagerServices:411 - Temporary file
directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total
233 GB, usable 25 GB (10.73% usable)
2020-10-29 18:18:32 INFO  FileChannelManagerImpl:97 -
FileChannelManager uses directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
for spill files.
2020-10-29 18:18:32 INFO  FileChannelManagerImpl:97 -
FileChannelManager uses directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
for spill files.
2020-10-29 18:18:32 INFO  NetworkBufferPool:139 - Allocated 64 MB for
network buffer pool (number of memory segments: 2048, bytes per
segment: 32768).
2020-10-29 18:18:32 INFO  NettyShuffleEnvironment:293 - Starting the
network environment and its components.
2020-10-29 18:18:32 INFO  KvStateService:89 - Starting the kvState
service and its components.
2020-10-29 18:18:32 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.taskexecutor.TaskExecutor at
akka://flink/user/rpc/taskmanager_0 .
2020-10-29 18:18:32 INFO  DefaultJobLeaderService:116 - Start job
leader service.
2020-10-29 18:18:32 INFO  FileCache:107 - User file cache uses
directory 
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
2020-10-29 18:18:32 INFO  DispatcherRestEndpoint:140 - Starting rest endpoint.
2020-10-29 18:18:32 INFO  DispatcherRestEndpoint:126 - Failed to load
web based job submission extension. Probable reason: flink-runtime-web
is not in the classpath.
2020-10-29 18:18:32 WARN  WebMonitorUtils:85 - Log file environment
variable 'log.file' is not set.
2020-10-29 18:18:32 WARN  WebMonitorUtils:91 - JobManager log files
are unavailable in the web dashboard. Log file location not found in
environment variable 'log.file' or configuration key 'web.log.path'.
2020-10-29 18:18:33 INFO  DispatcherRestEndpoint:236 - Rest endpoint
listening at localhost:50966
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender http://localhost:50966
2020-10-29 18:18:33 INFO  DispatcherRestEndpoint:821 -
http://localhost:50966 was granted leadership with
leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader http://localhost:50966 ,
session=5d1c56ab-6894-42d9-bc76-716ea59bd473
2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
at akka://flink/user/rpc/resourcemanager_1 .
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender LeaderContender: DefaultDispatcherRunner
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender LeaderContender: StandaloneResourceManager
2020-10-29 18:18:33 INFO  StandaloneResourceManager:1026 -
ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted
leadership with fencing token a8e96f157326e3f60a2df92bb1364f97
2020-10-29 18:18:33 INFO  MiniCluster:372 - Flink Mini Cluster started
successfully
2020-10-29 18:18:33 INFO  SlotManagerImpl:284 - Starting the SlotManager.
2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:102 - Start
SessionDispatcherLeaderProcess.
2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:120 - Recover
all persisted job graphs.
2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:128 -
Successfully recovered 0 persisted job graphs.
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/resourcemanager_1 ,
session=0a2df92b-b136-4f97-a8e9-6f157326e3f6
2020-10-29 18:18:33 INFO  TaskExecutor:1128 - Connecting to
ResourceManager
akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97).
2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_2 .
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/dispatcher_2 ,
session=3ce5d1bf-2d6d-49d6-b315-717f66748938
2020-10-29 18:18:33 INFO  TaskExecutor:155 - Resolved ResourceManager
address, beginning registration
2020-10-29 18:18:33 INFO  StandaloneResourceManager:821 - Registering
TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193
(akka://flink/user/rpc/taskmanager_0) at ResourceManager
2020-10-29 18:18:33 INFO  TaskExecutor:84 - Successful registration at
resource manager akka://flink/user/rpc/resourcemanager_1 under
registration id 0be76fda7dfc8204153de66b83ba5621.
2020-10-29 18:18:33 INFO  StandaloneDispatcher:295 - Received JobGraph
submission 1fd25ab4d3d51009542ebda2bbadb55d
(insert-into_default_catalog.default_database.print_table).
2020-10-29 18:18:33 INFO  StandaloneDispatcher:352 - Submitting job
1fd25ab4d3d51009542ebda2bbadb55d
(insert-into_default_catalog.default_database.print_table).
2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_3 .
2020-10-29 18:18:33 INFO  JobMaster:288 - Initializing job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  JobMaster:84 - Using restart back off time
strategy NoRestartBackoffTimeStrategy for
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  JobMaster:211 - Running initialization on
master for job insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  JobMaster:229 - Successfully ran
initialization on master in 3 ms.
2020-10-29 18:18:33 INFO  DefaultExecutionTopology:111 - Built 1
pipelined regions in 0 ms
2020-10-29 18:18:33 INFO  JobMaster:231 - No state backend has been
configured, using default (Memory / JobManager) MemoryStateBackend
(data in heap memory / checkpoints to JobManager) (checkpoints:
'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2020-10-29 18:18:33 INFO  JobMaster:165 - Using failover strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6
for insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender akka://flink/user/rpc/jobmanager_3
2020-10-29 18:18:33 INFO  JobManagerRunnerImpl:305 - JobManager runner
for job insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session
id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at
akka://flink/user/rpc/jobmanager_3.
2020-10-29 18:18:33 INFO  JobMaster:799 - Starting execution of job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) under job master id
8b9f639f07344b3f47a72e91c0bf4b40.
2020-10-29 18:18:33 INFO  JobMaster:197 - Starting scheduling with
scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2020-10-29 18:18:33 INFO  ExecutionGraph:1253 - Job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to
RUNNING.
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
switched from CREATED to SCHEDULED.
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1)
(1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
switched from CREATED to SCHEDULED.
2020-10-29 18:18:33 INFO  SlotPoolImpl:385 - Cannot serve slot
request, no ResourceManager connected. Adding as pending request
[SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}]
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/jobmanager_3 ,
session=47a72e91-c0bf-4b40-8b9f-639f07344b3f
2020-10-29 18:18:33 INFO  JobMaster:1031 - Connecting to
ResourceManager
akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97)
2020-10-29 18:18:33 INFO  JobMaster:155 - Resolved ResourceManager
address, beginning registration
2020-10-29 18:18:33 INFO  StandaloneResourceManager:330 - Registering
job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  StandaloneResourceManager:765 - Registered
job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  JobMaster:1053 - JobManager successfully
registered at ResourceManager, leader id:
a8e96f157326e3f60a2df92bb1364f97.
2020-10-29 18:18:33 INFO  SlotPoolImpl:347 - Requesting new slot
[SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile
ResourceProfile{UNKNOWN} with allocation id
45ea6506d06f73f61a36db764ce07ba7 from resource manager.
2020-10-29 18:18:33 INFO  StandaloneResourceManager:464 - Request slot
with profile ResourceProfile{UNKNOWN} for job
1fd25ab4d3d51009542ebda2bbadb55d with allocation id
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO  TaskExecutor:908 - Receive slot request
45ea6506d06f73f61a36db764ce07ba7 for job
1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id
a8e96f157326e3f60a2df92bb1364f97.
2020-10-29 18:18:33 INFO  TaskExecutor:976 - Allocated slot for
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:172 - Add job
1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring.
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:314 - Try to
register at job manager akka://flink/user/rpc/jobmanager_3 with leader
id 47a72e91-c0bf-4b40-8b9f-639f07344b3f.
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:155 - Resolved
JobManager address, beginning registration
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:369 - Successful
registration at job manager akka://flink/user/rpc/jobmanager_3 for job
1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  TaskExecutor:1379 - Establish JobManager
connection for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  TaskExecutor:1278 - Offer reserved slots to
the leader of job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  PermanentBlobCache:251 - Shutting down BLOB cache
2020-10-29 18:18:33 INFO  TaskExecutorLocalStateStoresManager:213 -
Shutting down TaskExecutorLocalStateStoresManager.
2020-10-29 18:18:33 INFO  TransientBlobCache:251 - Shutting down BLOB cache
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
switched from SCHEDULED to DEPLOYING.
2020-10-29 18:18:33 INFO  ExecutionGraph:725 - Deploying Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (attempt #0) with attempt id
1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0
to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
allocation id 45ea6506d06f73f61a36db764ce07ba7
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1)
(1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
switched from SCHEDULED to DEPLOYING.
2020-10-29 18:18:33 INFO  ExecutionGraph:725 - Deploying Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with
attempt id 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0
to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
allocation id 45ea6506d06f73f61a36db764ce07ba7
2020-10-29 18:18:33 INFO  TaskSlotTableImpl:361 - Activate slot
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO  FileChannelManagerImpl:146 -
FileChannelManager removed spill file directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
2020-10-29 18:18:33 INFO  BlobServer:348 - Stopped BLOB server at 0.0.0.0:50965
2020-10-29 18:18:33 INFO  FileChannelManagerImpl:146 -
FileChannelManager removed spill file directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
2020-10-29 18:18:33 INFO  FileCache:153 - removed file cache directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52

Process finished with exit code 0
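
A note on the log above: neither the source nor the sink task gets past
DEPLOYING, and the shutdown messages (for example "Shutting down BLOB
cache") start at about the same time. One possible explanation (an
assumption, not confirmed in this thread) is that executeInsert only
submits the job and returns, so the JUnit test finishes and the mini
cluster stops before the sink has processed any rows. The sketch below
illustrates that pattern with plain JDK classes only; submitJob and the
row strings are hypothetical stand-ins, not Flink API. In Flink 1.11 the
corresponding wait would presumably go through the JobClient returned by
TableResult#getJobClient, which is not verified here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class AwaitJobSketch {

    // Hypothetical stand-in for an asynchronous job submission: the returned
    // future completes only after the "sink" has received its rows.
    static CompletableFuture<Void> submitJob(List<String> sink) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(200); // simulate scheduling and deployment delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            sink.add("row(1, 188)");
            sink.add("row(2, 374)");
        });
    }

    // Submits the job and blocks until it has finished, so the caller
    // (for example a test method) cannot return before the sink has run.
    static List<String> runAndAwait() {
        List<String> sink = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> job = submitJob(sink);
        job.join(); // without this wait, the caller may return first
        return sink;
    }

    public static void main(String[] args) {
        runAndAwait().forEach(System.out::println);
    }
}
```

If the assumption holds, dropping the join() call corresponds to the
test above: the caller returns while the job is still being deployed,
and nothing is ever printed.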


[1]: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java

On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> You should be able to use the "print" sink. Remember though that the
> "print" sink prints into the stdout/stderr of TaskManagers, not the
> Client, where you submit the query. This is different from the
> TableResult, which collects results in the client. BTW, for printing you
> can use TableResult#print, which will nicely format your results.
>
> Best,
>
> Dawid
>
> On 29/10/2020 16:13, Ruben Laguna wrote:
> > How can I use the Table [Print SQL connector][1]? I tried the
> > following (batch mode), but it does not give any output:
> >
> >
> > EnvironmentSettings settings =
> > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> > TableEnvironment tEnv = TableEnvironment.create(settings);
> >
> > final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
> >
> > Table transactions =
> >         tEnv.fromValues(
> >                 DataTypes.ROW(
> >                         DataTypes.FIELD("account_id", DataTypes.BIGINT()),
> >                         DataTypes.FIELD("amount", DataTypes.BIGINT()),
> >                         DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
> >                 Row.of(1, 188, DATE_TIME.plusMinutes(12)),
> >                 Row.of(2, 374, DATE_TIME.plusMinutes(47)),
> >                 Row.of(3, 112, DATE_TIME.plusMinutes(36)),
> >                 Row.of(4, 478, DATE_TIME.plusMinutes(3)),
> >                 Row.of(5, 208, DATE_TIME.plusMinutes(8)),
> >                 Row.of(1, 379, DATE_TIME.plusMinutes(53)),
> >                 Row.of(2, 351, DATE_TIME.plusMinutes(32)),
> >                 Row.of(3, 320, DATE_TIME.plusMinutes(31)),
> >                 Row.of(4, 259, DATE_TIME.plusMinutes(19)),
> >                 Row.of(5, 273, DATE_TIME.plusMinutes(42)));
> > tEnv.executeSql(
> >         "CREATE TABLE print_table(account_id BIGINT, amount BIGINT, "
> >                 + "transaction_time TIMESTAMP) WITH ('connector' = 'print')");
> >
> > transactions.executeInsert("print_table");
> >
> >
> > I can "materialize" the result manually and print them out with :
> >
> > for (Row row : materialize(transactions.execute())) {
> >     System.out.println(row);
> > }
> >
> > private static List<Row> materialize(TableResult results) {
> >     try (CloseableIterator<Row> resultIterator = results.collect()) {
> >         return StreamSupport
> >                 .stream(
> >                         Spliterators.spliteratorUnknownSize(
> >                                 resultIterator, Spliterator.ORDERED),
> >                         false)
> >                 .collect(Collectors.toList());
> >     } catch (Exception e) {
> >         throw new RuntimeException("Failed to materialize results", e);
> >     }
> > }
> >
> >
> > But I would like to know why I can't just use the Print sink.
> >
> > I've tried with `.inBatchMode()` and with `.inStreamingMode()`, so I
> > don't think it's that.
> >
> > Does anybody know of any working example involving the print connector?
> >
> >
> >
> > [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
>


-- 
/Rubén
