[ https://issues.apache.org/jira/browse/FLINK-8244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328689#comment-17328689 ]
Flink Jira Bot commented on FLINK-8244: --------------------------------------- This major issue is unassigned and itself and all of its Sub-Tasks have not been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized. > There are two zookeeper client created when read data from hbase in the flink > yarn session model > ------------------------------------------------------------------------------------------------- > > Key: FLINK-8244 > URL: https://issues.apache.org/jira/browse/FLINK-8244 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase > Affects Versions: 1.3.2 > Reporter: LionelZ > Priority: Major > Labels: stale-major > > I want to use the Flink HBaseTableInputFormat API to read data from HBase, and > that works fine when I run my code in local mode, but when I > run the code in Flink YARN session mode, there are some problems in the task > manager: it does not report any error message, and no data is output from the > data source. Finally, I found that the ZooKeeper client is created twice in the task > manager log, as the attachment shows. I think there is a problem in Flink. 
> thanks > {code:java} > import com.alibaba.fastjson.JSON > import com.google.gson.Gson > import com.demo.shmetro.RuleEnginerStreamingFlinkApp.logger > import com.demo.shmetro.bean.json.JsonBeanGenerator > import com.demo.shmetro.model.TrainData > import com.demo.shmetro.output.GlobalEntity > import com.demo.shmetro.rules.{BatchRuleEngineerByScannerExecutor, > RuleEngineerByRedisExecutor} > import com.demo.shmetro.source.HBaseTableInputFormat > import com.demo.shmetro.util.RedisUtil > import com.demo.shmetro.utils.{MysqlUtil} > import org.apache.flink.api.common.functions.MapFunction > import org.apache.flink.api.java.ExecutionEnvironment > import org.apache.flink.api.java.operators.{DataSource, GroupReduceOperator, > MapOperator} > import org.apache.flink.api.java.tuple.Tuple2 > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.hadoop.hbase.client.Result > import org.apache.hadoop.hbase.util.Bytes > import scala.collection.JavaConversions._ > object RuleEnginerBatchFlinkAppCopy { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > env.setParallelism(1) > env.getConfig.setGlobalJobParameters(params) > val appName = params.get("batch.name") > val updateTime = params.getLong("update.time") > val tableName = params.get("hbase.table.name") > val columnFamily = params.get("hbase.table.cf") > val columns = params.get("hbase.table.columns").split(",") > val startTime = params.get("start.time") > val endTime = params.get("end.time") > val trainNo = params.get("train.no") > val startRow = new StringBuilder > startRow.append(trainNo).append("\0").append(startTime) > val endRow = new StringBuilder > endRow.append(trainNo).append("\0").append(endTime) > val hBaseDataSource: DataSource[Tuple2[String, String]] = > env.createInput(new HBaseTableInputFormat[Tuple2[String, String]](tableName, > columnFamily, columns, null, startRow.toString(), endRow.toString()) { > private val reuse = new Tuple2[String, 
String] > override > protected def mapResultToTuple(r: Result): Tuple2[String, String] = { > logger.error("**********hbase row: " + reuse) > val key = Bytes.toString(r.getRow) > val value = getMapResult(r) > reuse.setField(key, 0) > val data = value.get(key) > reuse.setField(JSON.toJSON(data).toString, 1) > return reuse > } > }) > hBaseDataSource.collect() > } > } > {code} > {code:java} > import org.apache.commons.lang.StringUtils; > import org.apache.flink.addons.hbase.AbstractTableInputFormat; > import org.apache.flink.addons.hbase.TableInputFormat; > import org.apache.flink.api.java.tuple.Tuple; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.configuration.Configuration; > import org.apache.hadoop.hbase.Cell; > import org.apache.hadoop.hbase.CellUtil; > import org.apache.hadoop.hbase.HBaseConfiguration; > import org.apache.hadoop.hbase.HConstants; > import org.apache.hadoop.hbase.client.HTable; > import org.apache.hadoop.hbase.client.Result; > import org.apache.hadoop.hbase.client.Scan; > import org.apache.hadoop.hbase.filter.CompareFilter; > import org.apache.hadoop.hbase.filter.Filter; > import org.apache.hadoop.hbase.filter.RegexStringComparator; > import org.apache.hadoop.hbase.filter.RowFilter; > import org.apache.hadoop.hbase.util.Bytes; > import java.util.HashMap; > import java.util.List; > import java.util.Map; > public abstract class HBaseTableInputFormat<T extends Tuple> extends > AbstractTableInputFormat<T> { > private String tableName; > private String columnFamily; > private String[] columns; > private String filterName; > private String startRow; > private String stopRow; > public HBaseTableInputFormat(String tableName, String columnFamily, > String[] columns, String filterName, String startRow, String stopRow) { > this.tableName = tableName; > this.columnFamily = columnFamily; > this.columns = columns; > this.filterName = filterName; > this.startRow = startRow; > this.stopRow = stopRow; > } > protected Scan getScanner() 
{ > Scan scan = new Scan(); > if (!StringUtils.isEmpty(columnFamily)) { > scan.addFamily(columnFamily.getBytes()); > } > if (!StringUtils.isEmpty(filterName)) { > Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, > new RegexStringComparator(filterName)); > scan.setFilter(filter); > } > if (columns != null && !StringUtils.isEmpty(columnFamily)) { > for (String column : columns) { > scan.addColumn(columnFamily.getBytes(), > column.getBytes()); > } > } > if (!StringUtils.isEmpty(startRow)) { > scan.setStartRow(startRow.getBytes()); > } > if (!StringUtils.isEmpty(stopRow)) { > scan.setStopRow(stopRow.getBytes()); > } > return scan; > } > protected String getTableName() { > return tableName; > } > protected abstract T mapResultToTuple(Result r); > @Override > public void configure(Configuration parameters) { > table = createTable(); > if (table != null) { > scan = getScanner(); > } > } > private HTable createTable() { > LOG.info("Initializing HBaseConfiguration"); > org.apache.hadoop.conf.Configuration configuration = > HBaseConfiguration.create(); > > configuration.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, > 1200000); > configuration.set("hbase.zookeeper.quorum", "x.x.x.x"); > configuration.set("hbase.master.info.port", "2181"); > configuration.set("hbase.master", "172.17.1.21:60000"); > configuration.setInt("hbase.rpc.timeout", 20000); > configuration.setInt("hbase.client.operation.timeout", 30000); > try { > return new HTable(configuration, getTableName()); > } catch (Exception e) { > LOG.error("Error instantiating a new HTable instance", e); > } > return null; > } > protected T mapResultToOutType(Result r) { > return mapResultToTuple(r); > } > public Map<String, Map<String, String>> getMapResult(Result result) { > Map<String, Map<String, String>> resMap = new HashMap<>(); > Cell[] cells = result.rawCells(); > Map<String, String> cellMap = new HashMap<>(); > for (Cell cell : cells) { > 
cellMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), > Bytes.toString(CellUtil.cloneValue(cell))); > } > resMap.put(Bytes.toString(result.getRow()), cellMap); > return resMap; > } > } > {code} > {panel:title=Task manager log} > 2017-12-12 20:18:23.968 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.runtime.filecache.FileCache - User file cache uses > directory > /data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/flink-dist-cache-2bd20058-9c1a-433b-aa35-e610342aedea > 2017-12-12 20:18:23.979 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Starting TaskManager actor at > akka://flink/user/taskmanager#1035429767. > 2017-12-12 20:18:23.980 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - TaskManager data connection > information: container_1513004072194_0025_01_000002 @ ml41.mlamp.co > (dataPort=10331) > 2017-12-12 20:18:23.981 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - TaskManager has 1 task slot(s). > 2017-12-12 20:18:23.982 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Memory usage stats: [HEAP: > 426/2944/2944 MB, NON HEAP: 39/40/-1 MB (used/committed/max)] > 2017-12-12 20:18:23.988 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Trying to register at JobManager > akka.tcp://fl...@ml42.mlamp.co:6542/user/jobmanager (attempt 1, timeout: 500 > milliseconds) > 2017-12-12 20:18:24.163 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Successful registration at > JobManager (akka.tcp://fl...@ml42.mlamp.co:6542/user/jobmanager), starting > network stack and library cache. > 2017-12-12 20:18:24.169 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Determined BLOB server address to be > ml42.mlamp.co/172.17.1.19:9593. Starting BLOB cache. 
> 2017-12-12 20:18:24.172 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage > directory /tmp/blobStore-a7b202b0-477f-40e5-a69e-b04bccb28acc > 2017-12-12 20:27:04.138 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Received task DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > 2017-12-12 20:27:04.139 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff) switched from CREATED to DEPLOYING. > 2017-12-12 20:27:04.140 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak > safety net for task DataSource (at createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff) [DEPLOYING] > 2017-12-12 20:27:04.146 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task > DataSource (at createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff) [DEPLOYING]. 
> 2017-12-12 20:27:04.148 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.blob.BlobCache - Downloading > 15eb13effcf291931c0e661c53c3af04e3b63b78 from ml42.mlamp.co/172.17.1.19:9593 > 2017-12-12 20:27:04.171 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Registering task at network: > DataSource (at createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff) [DEPLOYING]. > 2017-12-12 20:27:04.178 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff) switched from DEPLOYING to RUNNING. 
> 2017-12-12 20:27:04.216 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.addons.hbase.AbstractTableInputFormat - Initializing > HBaseConfiguration > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client > environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client > environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client environment:host.name=ml41.mlamp.co > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client environment:host.name=ml41.mlamp.co > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_144 > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_144 > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle > Corporation > 2017-12-12 
20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle > Corporation > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client > environment:java.home=/usr/java/jdk1.8.0_144/jre > 2017-12-12 20:27:04.422 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client > environment:java.home=/usr/java/jdk1.8.0_144/jre > hFlinkAppCopy$$anon$1)) (1/1)] INFO org.apache.zookeeper.ZooKeeper - Client > environment:user.dir=/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/container_e25_1513004072194_0025_01_000002 > 2017-12-12 20:27:04.423 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Client > environment:user.dir=/data/yarn/nm/usercache/root/appcache/application_1513004072194_0025/container_e25_1513004072194_0025_01_000002 > 2017-12-12 20:27:04.424 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Initiating client connection, > connectString=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181 > sessionTimeout=90000 watcher=hconnection-0x4ad59d510x0, > quorum=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181, baseZNode=/hbase > 2017-12-12 20:27:04.424 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Initiating client connection, > connectString=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181 > 
sessionTimeout=90000 watcher=hconnection-0x4ad59d510x0, > quorum=172.17.1.19:2181,172.17.1.20:2181,172.17.1.23:2181, baseZNode=/hbase > 2017-12-12 20:27:04.438 [DataSource (172.17.1.20:2181)] WARN > org.apache.zookeeper.ClientCnxn - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-6441061934657919784.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-12-12 20:27:04.438 [DataSource (172.17.1.20:2181)] WARN > org.apache.zookeeper.ClientCnxn - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-6441061934657919784.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-12-12 20:27:04.440 [DataSource (172.17.1.20:2181)] INFO > org.apache.zookeeper.ClientCnxn - Opening socket connection to server > 172.17.1.20/172.17.1.20:2181 > 2017-12-12 20:27:04.440 [DataSource (172.17.1.20:2181)] INFO > org.apache.zookeeper.ClientCnxn - Opening socket connection to server > 172.17.1.20/172.17.1.20:2181 > 2017-12-12 20:27:04.442 [DataSource (172.17.1.20:2181)] INFO > org.apache.zookeeper.ClientCnxn - Socket connection established to > 172.17.1.20/172.17.1.20:2181, initiating session > 2017-12-12 20:27:04.442 [DataSource (172.17.1.20:2181)] INFO > org.apache.zookeeper.ClientCnxn - Socket connection established to > 172.17.1.20/172.17.1.20:2181, initiating session > 2017-12-12 20:27:04.456 [DataSource (172.17.1.20:2181)] INFO > org.apache.zookeeper.ClientCnxn - Session establishment complete on server > 172.17.1.20/172.17.1.20:2181, sessionid = 0x2604070bd951a5e, negotiated > timeout = 60000 > 2017-12-12 20:27:04.456 [DataSource (172.17.1.20:2181)] INFO > org.apache.zookeeper.ClientCnxn - Session 
establishment complete on server > 172.17.1.20/172.17.1.20:2181, sessionid = 0x2604070bd951a5e, negotiated > timeout = 60000 > 2017-12-12 20:27:05.024 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] WARN > org.apache.flink.metrics.MetricGroup - The operator name DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) exceeded the 80 > characters length limit and was truncated. > 2017-12-12 20:27:05.048 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.addons.hbase.AbstractTableInputFormat - opening split > (this=com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1@313183a0)[0|[ml42.mlamp.co:60020]|3330�9223370526285553387|3330�9223370526285563387] > 2017-12-12 20:27:05.337 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO urce (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Session: 0x2604070bd951a5e closed > 2017-12-12 20:27:05.439 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)-EventThread] > INFO org.apache.zookeeper.ClientCnxn - EventThread shut down > 2017-12-12 20:27:05.439 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.zookeeper.ZooKeeper - Session: 0x2604070bd951a5e closed > 2017-12-12 20:27:05.439 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)-EventThread] > INFO org.apache.zookeeper.ClientCnxn - EventThread shut down > 2017-12-12 20:27:05.453 [DataSource (at > 
createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff) switched from RUNNING to FINISHED. > 2017-12-12 20:27:05.453 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Freeing task resources for > DataSource (at createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff). > 2017-12-12 20:27:05.453 [DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams > are closed for task DataSource (at createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) (1/1) > (16703a0cb1de95710c1f204e440eecff) [FINISHED] > 2017-12-12 20:27:05.456 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Un-registering task and sending > final execution state FINISHED to JobManager for task DataSource (at > createInput(ExecutionEnvironment.java:553) > (com.demo.rules.RuleEnginerBatchFlinkAppCopy$$anon$1)) > (16703a0cb1de95710c1f204e440eecff) > 2017-12-12 20:27:05.464 [flink-akka.actor.default-dispatcher-4] INFO > org.apache.flink.yarn.YarnTaskManager - Received task DataSink (collect()) > (1/1) > 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) > (0bd94830e242beb92a8916ac27cd0b89) switched from CREATED to DEPLOYING. 
> 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak > safety net for task DataSink (collect()) (1/1) > (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING] > 2017-12-12 20:27:05.465 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task > DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING]. > 2017-12-12 20:27:05.469 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Registering task at network: > DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89) [DEPLOYING]. > 2017-12-12 20:27:05.470 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) > (0bd94830e242beb92a8916ac27cd0b89) switched from DEPLOYING to RUNNING. > 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) > (0bd94830e242beb92a8916ac27cd0b89) switched from RUNNING to FINISHED. > 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Freeing task resources for > DataSink (collect()) (1/1) (0bd94830e242beb92a8916ac27cd0b89). > 2017-12-12 20:27:05.485 [DataSink (collect()) (1/1)] INFO > org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams > are closed for task DataSink (collect()) (1/1) > (0bd94830e242beb92a8916ac27cd0b89) [FINISHED] > 2017-12-12 20:27:05.486 [flink-akka.actor.default-dispatcher-2] INFO > org.apache.flink.yarn.YarnTaskManager - Un-registering task and sending > final execution state FINISHED to JobManager for task DataSink (collect()) > (0bd94830e242beb92a8916ac27cd0b89) > {panel} -- This message was sent by Atlassian Jira (v8.3.4#803005)