malliaridis commented on code in PR #2479: URL: https://github.com/apache/solr/pull/2479#discussion_r1834874458
########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import 
org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) Review Comment: Any `.required(false)` is obsolete and could be removed. 
########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import 
org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. 
(default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. (default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } Review Comment: This is block no 2 ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import 
org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. 
(default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. (default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + public PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { + String zkHost = SolrCLI.getZkHost(cli); + + echoIfVerbose("Connecting to ZooKeeper at " + zkHost); + 
solrClientCache.getCloudSolrClient(zkHost); + solrClientCache.setBasicAuthCredentials(cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); + + TupleStream stream; + PushBackStream pushBackStream; + + StreamExpression streamExpression = StreamExpressionParser.parse(expr); + StreamFactory streamFactory = new StreamFactory(); + + // stdin is ONLY available in the local mode, not in the remote mode as it + // requires access to System.in + streamFactory.withFunctionName("stdin", StandardInStream.class); + + // LocalCatStream extends CatStream and disables the Solr cluster specific + // logic about where to read data from. + streamFactory.withFunctionName("cat", LocalCatStream.class); + + streamFactory.withDefaultZkHost(zkHost); + + Lang.register(streamFactory); + + stream = StreamTool.constructStream(streamFactory, streamExpression); + + pushBackStream = new PushBackStream(stream); + + // Now we can run the stream and return the results. + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + // Output the headers + pushBackStream.setStreamContext(streamContext); + + return pushBackStream; + } + + public PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + + String solrUrl = SolrCLI.normalizeSolrUrl(cli); + if (!cli.hasOption("name")) { Review Comment: Use option object instead for consistency. ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import 
org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. 
(default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + public PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { Review Comment: I believe these methods could all be private if the tests do not directly access them. 
########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import 
org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. 
(default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. (default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + public PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { + String zkHost = SolrCLI.getZkHost(cli); + + echoIfVerbose("Connecting to ZooKeeper at " + zkHost); + 
solrClientCache.getCloudSolrClient(zkHost); + solrClientCache.setBasicAuthCredentials(cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); Review Comment: What about user-managed mode? Is it still supported, or is only cloud mode supported for now? If I am not mistaken, `getConnectionOptions` includes a solr-url as well. ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. 
*/ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. 
(default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } Review Comment: There is a lot happening in the `runImpl`, maybe extracting the two blocks into separate private methods would keep a better overview? (this is block no 1) ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; 
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. 
(default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. (default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + public PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { + String zkHost = SolrCLI.getZkHost(cli); + + echoIfVerbose("Connecting to ZooKeeper at " + zkHost); + 
solrClientCache.getCloudSolrClient(zkHost); + solrClientCache.setBasicAuthCredentials(cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); + + TupleStream stream; + PushBackStream pushBackStream; + + StreamExpression streamExpression = StreamExpressionParser.parse(expr); + StreamFactory streamFactory = new StreamFactory(); + + // stdin is ONLY available in the local mode, not in the remote mode as it + // requires access to System.in + streamFactory.withFunctionName("stdin", StandardInStream.class); + + // LocalCatStream extends CatStream and disables the Solr cluster specific + // logic about where to read data from. + streamFactory.withFunctionName("cat", LocalCatStream.class); + + streamFactory.withDefaultZkHost(zkHost); + + Lang.register(streamFactory); + + stream = StreamTool.constructStream(streamFactory, streamExpression); + + pushBackStream = new PushBackStream(stream); + + // Now we can run the stream and return the results. + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + // Output the headers + pushBackStream.setStreamContext(streamContext); + + return pushBackStream; + } + + public PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + + String solrUrl = SolrCLI.normalizeSolrUrl(cli); + if (!cli.hasOption("name")) { + throw new IllegalStateException( + "You must provide --name COLLECTION with --worker solr parameter."); + } + String collection = cli.getOptionValue("name"); Review Comment: Use option object instead for consistency. ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; 
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. 
(default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. (default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) Review Comment: If this loads an expression from a file, would it not be better to use ".expr" instead of "expr"? ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import 
org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. 
(default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() Review Comment: I think you could simplify it by directly returning the value of `opts` here. Prevents any "surprises" further down this method. ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. 
*/ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. 
(default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + public PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { + String zkHost = SolrCLI.getZkHost(cli); + + echoIfVerbose("Connecting to ZooKeeper at " + zkHost); + 
solrClientCache.getCloudSolrClient(zkHost); + solrClientCache.setBasicAuthCredentials(cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); + + TupleStream stream; + PushBackStream pushBackStream; + + StreamExpression streamExpression = StreamExpressionParser.parse(expr); + StreamFactory streamFactory = new StreamFactory(); + + // stdin is ONLY available in the local mode, not in the remote mode as it + // requires access to System.in + streamFactory.withFunctionName("stdin", StandardInStream.class); + + // LocalCatStream extends CatStream and disables the Solr cluster specific + // logic about where to read data from. + streamFactory.withFunctionName("cat", LocalCatStream.class); + + streamFactory.withDefaultZkHost(zkHost); + + Lang.register(streamFactory); + + stream = StreamTool.constructStream(streamFactory, streamExpression); + + pushBackStream = new PushBackStream(stream); + + // Now we can run the stream and return the results. + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + // Output the headers + pushBackStream.setStreamContext(streamContext); + + return pushBackStream; + } + + public PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + + String solrUrl = SolrCLI.normalizeSolrUrl(cli); + if (!cli.hasOption("name")) { + throw new IllegalStateException( + "You must provide --name COLLECTION with --worker solr parameter."); + } + String collection = cli.getOptionValue("name"); + + if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { + throw new IllegalStateException( + "The stdin() expression is only usable with --worker local set up."); + } + + final SolrStream solrStream = + new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)); + + String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); + if (credentials != null) { + String username = credentials.split(":")[0]; + String password = credentials.split(":")[1]; + 
solrStream.setCredentials(username, password); + } + return new PushBackStream(solrStream); + } + + public static ModifiableSolrParams params(String... params) { + if (params.length % 2 != 0) throw new RuntimeException("Params length should be even"); + ModifiableSolrParams msp = new ModifiableSolrParams(); + for (int i = 0; i < params.length; i += 2) { + msp.add(params[i], params[i + 1]); + } + return msp; + } + + public static class StandardInStream extends TupleStream implements Expressible { + + private BufferedReader reader; + private InputStream inputStream = System.in; + private boolean doClose = false; + + public StandardInStream() {} + + public StandardInStream(StreamExpression expression, StreamFactory factory) + throws IOException {} + + @Override + public List<TupleStream> children() { + return null; + } + + public void setInputStream(InputStream inputStream) { + this.inputStream = inputStream; + this.doClose = true; + } + + @Override + public void open() { + reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + } + + @Override + public void close() throws IOException { + if (doClose) { + inputStream.close(); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public Tuple read() throws IOException { + String line = reader.readLine(); + HashMap map = new HashMap(); + Tuple tuple = new Tuple(map); + if (line != null) { + tuple.put("line", line); + tuple.put("file", "cat"); + } else { + tuple.put("EOF", "true"); + } + return tuple; + } + + @Override + public void setStreamContext(StreamContext context) {} + + @Override + public StreamExpression toExpression(StreamFactory factory) { + return null; + } + + @Override + public Explanation toExplanation(StreamFactory factory) { + return null; + } + + @Override + public StreamComparator getStreamSort() { + return null; + } + } + + public static String[] getOutputFields(CommandLine cli) { + if (cli.hasOption("fields")) { + + String fl = 
cli.getOptionValue("fields"); Review Comment: Use option object instead for consistency. ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import 
org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. 
(defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. (default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); Review Comment: Are multiple spaces as delimiter fine / expected? ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import 
org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. (default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. 
(default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + public PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { + String zkHost = SolrCLI.getZkHost(cli); + + echoIfVerbose("Connecting to ZooKeeper at " + zkHost); + 
solrClientCache.getCloudSolrClient(zkHost); + solrClientCache.setBasicAuthCredentials(cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); + + TupleStream stream; + PushBackStream pushBackStream; + + StreamExpression streamExpression = StreamExpressionParser.parse(expr); + StreamFactory streamFactory = new StreamFactory(); + + // stdin is ONLY available in the local mode, not in the remote mode as it + // requires access to System.in + streamFactory.withFunctionName("stdin", StandardInStream.class); + + // LocalCatStream extends CatStream and disables the Solr cluster specific + // logic about where to read data from. + streamFactory.withFunctionName("cat", LocalCatStream.class); + + streamFactory.withDefaultZkHost(zkHost); + + Lang.register(streamFactory); + + stream = StreamTool.constructStream(streamFactory, streamExpression); + + pushBackStream = new PushBackStream(stream); + + // Now we can run the stream and return the results. + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + // Output the headers + pushBackStream.setStreamContext(streamContext); + + return pushBackStream; + } + + public PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + + String solrUrl = SolrCLI.normalizeSolrUrl(cli); + if (!cli.hasOption("name")) { + throw new IllegalStateException( + "You must provide --name COLLECTION with --worker solr parameter."); + } + String collection = cli.getOptionValue("name"); + + if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { + throw new IllegalStateException( + "The stdin() expression is only usable with --worker local set up."); + } + + final SolrStream solrStream = + new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)); Review Comment: Don't like string concats for URLs, because we get into "v1 / v2" dilemma, but I am not sure if there is another way for this? 
########## solr/benchmark/.factorypath: ########## @@ -0,0 +1,6 @@ +<factorypath> + <factorypathentry kind="EXTJAR" id="/Users/epugh/.gradle/caches/modules-2/files-2.1/org.apache.commons/commons-math3/3.6.1/e4ba98f1d4b3c80ec46392f25e094a6a2e58fcbf/commons-math3-3.6.1.jar" enabled="true" runInBatchMode="false"/> + <factorypathentry kind="EXTJAR" id="/Users/epugh/.gradle/caches/modules-2/files-2.1/org.openjdk.jmh/jmh-generator-annprocess/1.37/da93888682df163144edf9b13d2b78e54166063a/jmh-generator-annprocess-1.37.jar" enabled="true" runInBatchMode="false"/> + <factorypathentry kind="EXTJAR" id="/Users/epugh/.gradle/caches/modules-2/files-2.1/net.sf.jopt-simple/jopt-simple/5.0.4/4fdac2fbe92dfad86aa6e9301736f6b4342a3f5c/jopt-simple-5.0.4.jar" enabled="true" runInBatchMode="false"/> + <factorypathentry kind="EXTJAR" id="/Users/epugh/.gradle/caches/modules-2/files-2.1/org.openjdk.jmh/jmh-core/1.37/896f27e49105b35ea1964319c83d12082e7a79ef/jmh-core-1.37.jar" enabled="true" runInBatchMode="false"/> +</factorypath> Review Comment: I believe you didn't mean to push this file? ########## solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java: ########## @@ -224,4 +224,9 @@ public StreamComparator getStreamSort() { public int getCost() { return 0; } + + @SuppressWarnings({"rawtypes"}) + public Map getLetParams() { + return this.letParams; + } Review Comment: This method has no usage, is it needed? And if so, should we not simply add the types to `Map<String, Object>` rather than suppressing them? ########## solr/core/src/java/org/apache/solr/cli/StreamTool.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; 
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("CONTEXT") + .required(false) + .desc( + "Execution context is either 'local' (i.e CLI process) or 'solr'. Default is 'solr'") + .build(); + + private static final Option COLLECTION_OPTION = Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the collection to execute on if workers are 'solr'. Required for 'solr' worker.") + .build(); + + private static final Option FIELDS_OPTION = Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .required(false) + .desc( + "The fields in the tuples to output. (defaults to fields in the first tuple of result set).") + .build(); + + private static final Option HEADER_OPTION = + Option.builder() + .longOpt("header") + .required(false) + .desc("Whether or not to include a header line. (default=false)") + .build(); + private static final Option DELIMITER_OPTION = Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The output delimiter. 
(default=tab).") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .required(false) + .desc("The delimiter multi-valued fields. (default=|)") + .build(); + + @Override + public Options getOptions() { + Options opts = + super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + + return opts; + } + + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION,"solr"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION,"|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith("expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + public PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { + String zkHost = SolrCLI.getZkHost(cli); + + echoIfVerbose("Connecting to ZooKeeper at " + zkHost); + 
solrClientCache.getCloudSolrClient(zkHost); + solrClientCache.setBasicAuthCredentials(cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); + + TupleStream stream; + PushBackStream pushBackStream; + + StreamExpression streamExpression = StreamExpressionParser.parse(expr); + StreamFactory streamFactory = new StreamFactory(); + + // stdin is ONLY available in the local mode, not in the remote mode as it + // requires access to System.in + streamFactory.withFunctionName("stdin", StandardInStream.class); + + // LocalCatStream extends CatStream and disables the Solr cluster specific + // logic about where to read data from. + streamFactory.withFunctionName("cat", LocalCatStream.class); + + streamFactory.withDefaultZkHost(zkHost); + + Lang.register(streamFactory); + + stream = StreamTool.constructStream(streamFactory, streamExpression); + + pushBackStream = new PushBackStream(stream); + + // Now we can run the stream and return the results. + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + // Output the headers + pushBackStream.setStreamContext(streamContext); + + return pushBackStream; + } + + public PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + + String solrUrl = SolrCLI.normalizeSolrUrl(cli); + if (!cli.hasOption("name")) { + throw new IllegalStateException( + "You must provide --name COLLECTION with --worker solr parameter."); + } + String collection = cli.getOptionValue("name"); + + if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { + throw new IllegalStateException( + "The stdin() expression is only usable with --worker local set up."); + } + + final SolrStream solrStream = + new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)); + + String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); + if (credentials != null) { + String username = credentials.split(":")[0]; + String password = credentials.split(":")[1]; + 
solrStream.setCredentials(username, password); + } + return new PushBackStream(solrStream); + } + + public static ModifiableSolrParams params(String... params) { + if (params.length % 2 != 0) throw new RuntimeException("Params length should be even"); + ModifiableSolrParams msp = new ModifiableSolrParams(); + for (int i = 0; i < params.length; i += 2) { + msp.add(params[i], params[i + 1]); + } + return msp; + } + + public static class StandardInStream extends TupleStream implements Expressible { + + private BufferedReader reader; + private InputStream inputStream = System.in; + private boolean doClose = false; + + public StandardInStream() {} + + public StandardInStream(StreamExpression expression, StreamFactory factory) + throws IOException {} + + @Override + public List<TupleStream> children() { + return null; + } + + public void setInputStream(InputStream inputStream) { + this.inputStream = inputStream; + this.doClose = true; + } + + @Override + public void open() { + reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + } + + @Override + public void close() throws IOException { + if (doClose) { + inputStream.close(); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public Tuple read() throws IOException { + String line = reader.readLine(); + HashMap map = new HashMap(); + Tuple tuple = new Tuple(map); + if (line != null) { + tuple.put("line", line); + tuple.put("file", "cat"); + } else { + tuple.put("EOF", "true"); + } + return tuple; + } + + @Override + public void setStreamContext(StreamContext context) {} + + @Override + public StreamExpression toExpression(StreamFactory factory) { + return null; + } + + @Override + public Explanation toExplanation(StreamFactory factory) { + return null; + } + + @Override + public StreamComparator getStreamSort() { + return null; + } + } + + public static String[] getOutputFields(CommandLine cli) { + if (cli.hasOption("fields")) { Review Comment: Use option object 
instead for consistency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For additional commands, e-mail: issues-h...@solr.apache.org