Hi,

The problem in your case is that the program exits before anything is
printed. The method executeInsert submits the query, but it does not
wait for the query to finish. Therefore your main/test method returns,
bringing down the local cluster, before the sink has produced any
output (your log shows the shutdown starting while the tasks are still
in DEPLOYING). You can e.g. add

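        // Keep the TableResult so the caller can block on the submitted job.
        TableResult result = transactions.executeInsert("print_table");
        // Blocks until the insert job has finished.
        result.await();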

which will wait for the insert to finish.
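
To illustrate the mechanism without any Flink dependency, here is a
minimal sketch using only the JDK. It is an analogy, not Flink code: the
single-thread executor stands in for the local cluster, the
CompletableFuture for the submitted job, and the names AwaitSketch,
submit and run are made up for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: models "submit a job, then tear the cluster down".
class AwaitSketch {

    // Submits a "job" that writes one row to the sink after a short delay.
    // Like executeInsert, this returns immediately.
    static CompletableFuture<Void> submit(ExecutorService cluster, List<String> sink) {
        return CompletableFuture.runAsync(() -> {
            try {
                // Pretend that scheduling and deployment take a while.
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                // The "cluster" was shut down before the job produced output.
                Thread.currentThread().interrupt();
                return;
            }
            sink.add("row(1, 188)");
        }, cluster);
    }

    // Runs the job and returns whatever reached the sink.
    static List<String> run(boolean await) {
        ExecutorService cluster = Executors.newSingleThreadExecutor();
        List<String> sink = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> job = submit(cluster, sink);
            if (await) {
                job.get(); // plays the role of result.await()
            }
            // Plays the role of the test method returning.
            cluster.shutdownNow();
            cluster.awaitTermination(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("sketch failed", e);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println("without await: " + run(false));
        System.out.println("with await:    " + run(true));
    }
}
```

Without the wait the sink stays empty, because the shutdown interrupts
the job before it writes anything; with the wait the row arrives first.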

The print sink writes directly to stdout/stderr, so none of the logging
configuration applies in this case.
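
As a small illustration of that point, the following sketch shows that
output written with System.out is not affected by logger settings. It
uses java.util.logging only to stay within the JDK (your setup uses
log4j), and the class and logger names are made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.logging.Level;
import java.util.logging.Logger;

// Analogy only: a disabled logger does not silence System.out.
class StdoutVsLoggerSketch {

    // Captures everything written to System.out while logging is off.
    static String capture() {
        Logger logger = Logger.getLogger("sketch.sink");
        logger.setLevel(Level.OFF); // logging for this logger is disabled

        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer, true));
        try {
            logger.info("row via logger");        // dropped by the logger
            System.out.println("row via stdout"); // bypasses the logger
        } finally {
            System.setOut(original); // always restore the real stdout
        }
        return buffer.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(capture());
    }
}
```

Only the line written through System.out ends up in the captured
output, whatever the logger is configured to do.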

Best,

Dawid

On 29/10/2020 18:29, Ruben Laguna wrote:
> Hi,
>
> Using `mytable.execute().print()` is exactly what I wanted, thanks.
>
> But I'm still curious. I'm just running this locally, in a JUnit test
> case (not using a Flink
> cluster) just like in [flink-playgrounds SpendReportTest][1], so in
> this scenario where does the task manager (if there is a task manager)
> output go?
>
> I just added src/test/resources/log4j.properties with
>
> # Root logger option
> log4j.rootLogger=INFO, stdout
>
> # Direct log messages to stdout
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.Target=System.out
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
>
> and I still don't see anything from the print sink. I even ran it
> with the debugger and I can see that although
> PrintSink#getSinkRuntimeProvider is called, the
> RowDataPrintFunction#invoke is never called.
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.cpu.cores required for local
> execution is not set, setting it to the maximal possible value.
> 2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.task.heap.size required for
> local execution is not set, setting it to the maximal possible value.
> 2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.task.off-heap.size required
> for local execution is not set, setting it to the maximal possible
> value.
> 2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.network.min required for local
> execution is not set, setting it to its default value 64 mb.
> 2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.network.max required for local
> execution is not set, setting it to its default value 64 mb.
> 2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
> configuration option taskmanager.memory.managed.size required for
> local execution is not set, setting it to its default value 128 mb.
> 2020-10-29 18:18:32 INFO  MiniCluster:253 - Starting Flink Mini Cluster
> 2020-10-29 18:18:32 INFO  MiniCluster:262 - Starting Metrics Registry
> 2020-10-29 18:18:32 INFO  MetricRegistryImpl:122 - No metrics reporter
> configured, no metrics will be exposed/reported.
> 2020-10-29 18:18:32 INFO  MiniCluster:266 - Starting RPC Service(s)
> 2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
> local actor system
> 2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
> 2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
> started at akka://flink
> 2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
> local actor system
> 2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
> 2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
> started at akka://flink-metrics
> 2020-10-29 18:18:32 INFO  AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.metrics.dump.MetricQueryService at
> akka://flink-metrics/user/rpc/MetricQueryService .
> 2020-10-29 18:18:32 INFO  MiniCluster:432 - Starting high-availability 
> services
> 2020-10-29 18:18:32 INFO  BlobServer:143 - Created BLOB server storage
> directory 
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
> 2020-10-29 18:18:32 INFO  BlobServer:207 - Started BLOB server at
> 0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
> 2020-10-29 18:18:32 INFO  PermanentBlobCache:107 - Created BLOB cache
> storage directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
> 2020-10-29 18:18:32 INFO  TransientBlobCache:107 - Created BLOB cache
> storage directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
> 2020-10-29 18:18:32 INFO  MiniCluster:519 - Starting 1 TaskManger(s)
> 2020-10-29 18:18:32 INFO  TaskManagerRunner:412 - Starting TaskManager
> with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
> 2020-10-29 18:18:32 INFO  TaskManagerServices:411 - Temporary file
> directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total
> 233 GB, usable 25 GB (10.73% usable)
> 2020-10-29 18:18:32 INFO  FileChannelManagerImpl:97 -
> FileChannelManager uses directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
> for spill files.
> 2020-10-29 18:18:32 INFO  FileChannelManagerImpl:97 -
> FileChannelManager uses directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
> for spill files.
> 2020-10-29 18:18:32 INFO  NetworkBufferPool:139 - Allocated 64 MB for
> network buffer pool (number of memory segments: 2048, bytes per
> segment: 32768).
> 2020-10-29 18:18:32 INFO  NettyShuffleEnvironment:293 - Starting the
> network environment and its components.
> 2020-10-29 18:18:32 INFO  KvStateService:89 - Starting the kvState
> service and its components.
> 2020-10-29 18:18:32 INFO  AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.taskexecutor.TaskExecutor at
> akka://flink/user/rpc/taskmanager_0 .
> 2020-10-29 18:18:32 INFO  DefaultJobLeaderService:116 - Start job
> leader service.
> 2020-10-29 18:18:32 INFO  FileCache:107 - User file cache uses
> directory 
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
> 2020-10-29 18:18:32 INFO  DispatcherRestEndpoint:140 - Starting rest endpoint.
> 2020-10-29 18:18:32 INFO  DispatcherRestEndpoint:126 - Failed to load
> web based job submission extension. Probable reason: flink-runtime-web
> is not in the classpath.
> 2020-10-29 18:18:32 WARN  WebMonitorUtils:85 - Log file environment
> variable 'log.file' is not set.
> 2020-10-29 18:18:32 WARN  WebMonitorUtils:91 - JobManager log files
> are unavailable in the web dashboard. Log file location not found in
> environment variable 'log.file' or configuration key 'web.log.path'.
> 2020-10-29 18:18:33 INFO  DispatcherRestEndpoint:236 - Rest endpoint
> listening at localhost:50966
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
> leadership to contender http://localhost:50966
> 2020-10-29 18:18:33 INFO  DispatcherRestEndpoint:821 -
> http://localhost:50966 was granted leadership with
> leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader http://localhost:50966 ,
> session=5d1c56ab-6894-42d9-bc76-716ea59bd473
> 2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
> at akka://flink/user/rpc/resourcemanager_1 .
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
> leadership to contender LeaderContender: DefaultDispatcherRunner
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
> leadership to contender LeaderContender: StandaloneResourceManager
> 2020-10-29 18:18:33 INFO  StandaloneResourceManager:1026 -
> ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted
> leadership with fencing token a8e96f157326e3f60a2df92bb1364f97
> 2020-10-29 18:18:33 INFO  MiniCluster:372 - Flink Mini Cluster started
> successfully
> 2020-10-29 18:18:33 INFO  SlotManagerImpl:284 - Starting the SlotManager.
> 2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:102 - Start
> SessionDispatcherLeaderProcess.
> 2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:120 - Recover
> all persisted job graphs.
> 2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:128 -
> Successfully recovered 0 persisted job graphs.
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader
> akka://flink/user/rpc/resourcemanager_1 ,
> session=0a2df92b-b136-4f97-a8e9-6f157326e3f6
> 2020-10-29 18:18:33 INFO  TaskExecutor:1128 - Connecting to
> ResourceManager
> akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97).
> 2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/rpc/dispatcher_2 .
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader
> akka://flink/user/rpc/dispatcher_2 ,
> session=3ce5d1bf-2d6d-49d6-b315-717f66748938
> 2020-10-29 18:18:33 INFO  TaskExecutor:155 - Resolved ResourceManager
> address, beginning registration
> 2020-10-29 18:18:33 INFO  StandaloneResourceManager:821 - Registering
> TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193
> (akka://flink/user/rpc/taskmanager_0) at ResourceManager
> 2020-10-29 18:18:33 INFO  TaskExecutor:84 - Successful registration at
> resource manager akka://flink/user/rpc/resourcemanager_1 under
> registration id 0be76fda7dfc8204153de66b83ba5621.
> 2020-10-29 18:18:33 INFO  StandaloneDispatcher:295 - Received JobGraph
> submission 1fd25ab4d3d51009542ebda2bbadb55d
> (insert-into_default_catalog.default_database.print_table).
> 2020-10-29 18:18:33 INFO  StandaloneDispatcher:352 - Submitting job
> 1fd25ab4d3d51009542ebda2bbadb55d
> (insert-into_default_catalog.default_database.print_table).
> 2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
> for org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/rpc/jobmanager_3 .
> 2020-10-29 18:18:33 INFO  JobMaster:288 - Initializing job
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO  JobMaster:84 - Using restart back off time
> strategy NoRestartBackoffTimeStrategy for
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO  JobMaster:211 - Running initialization on
> master for job insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO  JobMaster:229 - Successfully ran
> initialization on master in 3 ms.
> 2020-10-29 18:18:33 INFO  DefaultExecutionTopology:111 - Built 1
> pipelined regions in 0 ms
> 2020-10-29 18:18:33 INFO  JobMaster:231 - No state backend has been
> configured, using default (Memory / JobManager) MemoryStateBackend
> (data in heap memory / checkpoints to JobManager) (checkpoints:
> 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
> 2020-10-29 18:18:33 INFO  JobMaster:165 - Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6
> for insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
> leadership to contender akka://flink/user/rpc/jobmanager_3
> 2020-10-29 18:18:33 INFO  JobManagerRunnerImpl:305 - JobManager runner
> for job insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session
> id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at
> akka://flink/user/rpc/jobmanager_3.
> 2020-10-29 18:18:33 INFO  JobMaster:799 - Starting execution of job
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d) under job master id
> 8b9f639f07344b3f47a72e91c0bf4b40.
> 2020-10-29 18:18:33 INFO  JobMaster:197 - Starting scheduling with
> scheduling strategy
> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> 2020-10-29 18:18:33 INFO  ExecutionGraph:1253 - Job
> insert-into_default_catalog.default_database.print_table
> (1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to
> RUNNING.
> 2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Source:
> Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
> account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
> 00:12:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
> (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
> switched from CREATED to SCHEDULED.
> 2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[account_id, amount, transaction_time]) (1/1)
> (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
> switched from CREATED to SCHEDULED.
> 2020-10-29 18:18:33 INFO  SlotPoolImpl:385 - Cannot serve slot
> request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}]
> 2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
> confirmation of leadership for leader
> akka://flink/user/rpc/jobmanager_3 ,
> session=47a72e91-c0bf-4b40-8b9f-639f07344b3f
> 2020-10-29 18:18:33 INFO  JobMaster:1031 - Connecting to
> ResourceManager
> akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97)
> 2020-10-29 18:18:33 INFO  JobMaster:155 - Resolved ResourceManager
> address, beginning registration
> 2020-10-29 18:18:33 INFO  StandaloneResourceManager:330 - Registering
> job manager 
> 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
> for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO  StandaloneResourceManager:765 - Registered
> job manager 
> 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
> for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO  JobMaster:1053 - JobManager successfully
> registered at ResourceManager, leader id:
> a8e96f157326e3f60a2df92bb1364f97.
> 2020-10-29 18:18:33 INFO  SlotPoolImpl:347 - Requesting new slot
> [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile
> ResourceProfile{UNKNOWN} with allocation id
> 45ea6506d06f73f61a36db764ce07ba7 from resource manager.
> 2020-10-29 18:18:33 INFO  StandaloneResourceManager:464 - Request slot
> with profile ResourceProfile{UNKNOWN} for job
> 1fd25ab4d3d51009542ebda2bbadb55d with allocation id
> 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO  TaskExecutor:908 - Receive slot request
> 45ea6506d06f73f61a36db764ce07ba7 for job
> 1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id
> a8e96f157326e3f60a2df92bb1364f97.
> 2020-10-29 18:18:33 INFO  TaskExecutor:976 - Allocated slot for
> 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO  DefaultJobLeaderService:172 - Add job
> 1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring.
> 2020-10-29 18:18:33 INFO  DefaultJobLeaderService:314 - Try to
> register at job manager akka://flink/user/rpc/jobmanager_3 with leader
> id 47a72e91-c0bf-4b40-8b9f-639f07344b3f.
> 2020-10-29 18:18:33 INFO  DefaultJobLeaderService:155 - Resolved
> JobManager address, beginning registration
> 2020-10-29 18:18:33 INFO  DefaultJobLeaderService:369 - Successful
> registration at job manager akka://flink/user/rpc/jobmanager_3 for job
> 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO  TaskExecutor:1379 - Establish JobManager
> connection for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO  TaskExecutor:1278 - Offer reserved slots to
> the leader of job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO  PermanentBlobCache:251 - Shutting down BLOB cache
> 2020-10-29 18:18:33 INFO  TaskExecutorLocalStateStoresManager:213 -
> Shutting down TaskExecutorLocalStateStoresManager.
> 2020-10-29 18:18:33 INFO  TransientBlobCache:251 - Shutting down BLOB cache
> 2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Source:
> Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
> account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
> 00:12:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
> (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
> switched from SCHEDULED to DEPLOYING.
> 2020-10-29 18:18:33 INFO  ExecutionGraph:725 - Deploying Source:
> Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
> account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
> 00:12:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
> Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
> CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
> (1/1) (attempt #0) with attempt id
> 1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0
> to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
> allocation id 45ea6506d06f73f61a36db764ce07ba7
> 2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[account_id, amount, transaction_time]) (1/1)
> (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
> switched from SCHEDULED to DEPLOYING.
> 2020-10-29 18:18:33 INFO  ExecutionGraph:725 - Deploying Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with
> attempt id 
> 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0
> to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
> allocation id 45ea6506d06f73f61a36db764ce07ba7
> 2020-10-29 18:18:33 INFO  TaskSlotTableImpl:361 - Activate slot
> 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO  FileChannelManagerImpl:146 -
> FileChannelManager removed spill file directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
> 2020-10-29 18:18:33 INFO  BlobServer:348 - Stopped BLOB server at 
> 0.0.0.0:50965
> 2020-10-29 18:18:33 INFO  FileChannelManagerImpl:146 -
> FileChannelManager removed spill file directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
> 2020-10-29 18:18:33 INFO  FileCache:153 - removed file cache directory
> /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
>
> Process finished with exit code 0
>
>
> [1]: 
> https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
>
> On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <dwysakow...@apache.org> 
> wrote:
>> You should be able to use the "print" sink. Remember though that the
>> "print" sink prints into the stdout/stderr of TaskManagers, not the
>> Client, where you submit the query. This is different from the
>> TableResult, which collects results in the client. BTW, for printing you
>> can use TableResult#print, which will nicely format your results.
>>
>> Best,
>>
>> Dawid
>>
>> On 29/10/2020 16:13, Ruben Laguna wrote:
>>> How can I use the Table [Print SQL connector][1]? I tried the
>>> following (batch mode) but it does not give any output:
>>>
>>>
>>> EnvironmentSettings settings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>>
>>> final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
>>>
>>> Table transactions =
>>>         tEnv.fromValues(
>>>                 DataTypes.ROW(
>>>                         DataTypes.FIELD("account_id", DataTypes.BIGINT()),
>>>                         DataTypes.FIELD("amount", DataTypes.BIGINT()),
>>>                         DataTypes.FIELD("transaction_time",
>>> DataTypes.TIMESTAMP(3))),
>>>                 Row.of(1, 188, DATE_TIME.plusMinutes(12)),
>>>                 Row.of(2, 374, DATE_TIME.plusMinutes(47)),
>>>                 Row.of(3, 112, DATE_TIME.plusMinutes(36)),
>>>                 Row.of(4, 478, DATE_TIME.plusMinutes(3)),
>>>                 Row.of(5, 208, DATE_TIME.plusMinutes(8)),
>>>                 Row.of(1, 379, DATE_TIME.plusMinutes(53)),
>>>                 Row.of(2, 351, DATE_TIME.plusMinutes(32)),
>>>                 Row.of(3, 320, DATE_TIME.plusMinutes(31)),
>>>                 Row.of(4, 259, DATE_TIME.plusMinutes(19)),
>>>                 Row.of(5, 273, DATE_TIME.plusMinutes(42)));
>>> tEnv.executeSql("CREATE TABLE print_table(account_id BIGINT, amount BIGINT, "
>>>         + "transaction_time TIMESTAMP) WITH ('connector' = 'print')");
>>>
>>> transactions.executeInsert("print_table");
>>>
>>>
>>> I can "materialize" the results manually and print them out with:
>>>
>>> for (Row row : materialize(transactions.execute())) {
>>>     System.out.println(row);
>>> }
>>>
>>> private static List<Row> materialize(TableResult results) {
>>>     try (CloseableIterator<Row> resultIterator = results.collect()) {
>>>         return StreamSupport
>>>                 .stream(Spliterators.spliteratorUnknownSize(resultIterator,
>>>                         Spliterator.ORDERED), false)
>>>                 .collect(Collectors.toList());
>>>     } catch (Exception e) {
>>>         throw new RuntimeException("Failed to materialize results", e);
>>>     }
>>> }
>>>
>>>
>>> But I would like to know why I can't just use the Print sink.
>>>
>>> I've tried with `.inBatchMode()` and with `.inStreamingMode()`, so I
>>> don't think it's that.
>>>
>>> Does anybody know of any working example involving the print connector?
>>>
>>>
>>>
>>> [1]: 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
>
