Hi,

We did something similar some time ago to read the content of a large JSON file. 
The code looks for a JSON array with a given name and splits it into chunks of a 
given size. It works on an input stream. See the following code:

import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.databind.*;

import java.io.IOException;
import java.io.InputStream;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class JsonSplitter implements Processor {

        private static final transient Log log = 
LogFactory.getLog(JsonSplitter.class.getName());

        // constants for property names
        private static final String ARRAY_NAME = "arrayName";
        private static final String SPLIT_SIZE = "splitSize";

        // initialize variables
        private ProducerTemplate producer;
        private String splitString;

        public void setProducer(ProducerTemplate producer) {
                this.producer = producer;
        }

        public void process(Exchange exchange) throws Exception {

                // get variables from Exchange properties
                String arrayName = (String) exchange.getProperty(ARRAY_NAME);
                int splitSize = Integer.parseInt((String) 
exchange.getProperty(SPLIT_SIZE));

                // log the values
                log.info("arrayName: " + arrayName);
                log.info("splitSize: " + splitSize);

                // get JSON parser from Exchange Body with type 
java.io.InputStream
                JsonFactory f = new MappingJsonFactory();
                JsonParser jp = f.createParser((InputStream) 
exchange.getIn().getBody());
                JsonToken current;

                // get JSON root and validate if it is a JSON object
                current = jp.nextToken();

                // throw an exception when the root isn't a JSON object
                if (current != JsonToken.START_OBJECT) {
                        throw new Exception("Input root is not a JSON object.");
                }

                // traverse the JSON object tree
                while (jp.nextToken() != JsonToken.END_OBJECT) {
                        String fieldName = jp.getCurrentName();
                        current = jp.nextToken();

                        // check if the field name matches the ARRAY_NAME
                        if (fieldName.equals(arrayName)) {

                                // check if the token matches the START_ARRAY
                                if (current == JsonToken.START_ARRAY) {

                                        // initialize loop variables
                                        int i = 1;
                                        int j = 1;
                                        splitString = "";

                                        // for each of the records in the array
                                        while (jp.nextToken() != 
JsonToken.END_ARRAY) {

                                                // read the record into a tree 
model,
                                                // this moves the parsing 
position to the end of it
                                                JsonNode node = 
jp.readValueAsTree();

                                                // concatenate the array 
objects as strings
                                                if (splitString == "") {
                                                        splitString = 
node.toString();
                                                } else {
                                                        splitString = 
splitString + "," + node.toString();
                                                }

                                                // check if the split size has 
been reached
                                                if (i % splitSize == 0) {

                                                        // wrap the splitted 
chunk into the array boundaries
                                                        splitString = "{\"" + 
arrayName + "\":[" + splitString + "]}";
                                                        log.info("splitIndex: " 
+ j);

                                                        // process the chunk
                                                        producer.send(new 
Processor() {
                                                                public void 
process(Exchange outExchange) {
                                                                        
outExchange.getIn().setBody(splitString);
                                                                }
                                                        });

                                                        // re-initialize the 
split string
                                                        splitString = "";
                                                }

                                                // increment the loop variables
                                                i++;
                                                j++;
                                        }

                                        // wrap the last splitted chunk into 
the array boundaries
                                        splitString = "{\"" + arrayName + 
"\":[" + splitString + "]}";
                                        log.info("splitIndex: " + j);

                                        // process last chunk < maxChunkSize
                                        producer.send(new Processor() {
                                                public void process(Exchange 
outExchange) {
                                                        
outExchange.getIn().setBody(splitString);
                                                }
                                        });
                                } else {

                                        // throw an exception when the 
ARRAY_NAME object isn't an array
                                        throw new Exception(ARRAY_NAME + "is 
not a JSON array.");
                                }
                        } else {

                                // log the non array field
                                log.info("Unprocessed property: " + fieldName);
                                jp.skipChildren();
                        }
                }

                // close InputStream
                jp.close();
        }
}

Best
Gerald

> Site Register <[email protected]> hat am 23.10.2020 18:34 
> geschrieben:
> 
>  
> Thank you for all the suggestions. 
> For now, I am using Gson streaming to manually read the huge JSON file and a 
> producer template to produce the messages.
> 
>     On Friday, October 23, 2020, 12:05:26 PM EDT, Claus Ibsen 
> <[email protected]> wrote:  
>  
>  Ah there is a nice jsonsurfer project that adds streaming support.
> I have created JIRA: https://issues.apache.org/jira/browse/CAMEL-15746
> 
> 
> On Fri, Oct 23, 2020 at 5:59 PM Claus Ibsen <[email protected]> wrote:
> >
> > Hi
> >
> > Just a note about splitting big xml files. Then there is camel-stax
> > and also the xtokenizer language you can use in splitter with
> > streaming.
> > We have some blogs and articles about that.
> >
> > For JSON, the jsonpath project has a ticket about adding support
> > for streaming, but the maintainers of that project are not very active,
> > so I doubt they will get around to working on that. A PR was in the works
> > but has stalled: https://github.com/json-path/JsonPath/pull/93
> >
> > But from Camel you can split anything in streaming mode by passing in
> > something that can be iterated - then it is split into chunks.
> >
> > If your json file is structured the same way - then you could write an
> > iterator that reads the content and emit chunks when you find where
> > each "array" ends.
> >
> > We may consider adding an jtokenizer (json tokenizer) to camel that
> > works a bit the same.
> >
> > On Fri, Oct 23, 2020 at 4:39 PM Romain Manni-Bucau
> > <[email protected]> wrote:
> > >
> > > Hi,
> > >
> > > From my experience - got the same issue with xml years ago now, the
> > > simplest is to do a custom component (like jsonstreaming:....) and handle
> > > the split in this component.
> > > It will likely require a jsonpointer to find an array to emit array item
> > > one by one to next processor.
> > > At the end the route could look like something like:
> > >
> > > from("jsonstreaming:/path/file.json?pointer=/items")
> > >  .to("bean:processUser");
> > >
> > > Romain Manni-Bucau
> > > @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> > > <https://rmannibucau.metawerx.net/> | Old Blog
> > > <http://rmannibucau.wordpress.com> | Github 
> > > <https://github.com/rmannibucau> |
> > > LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
> > > <https://www.packtpub.com/application-development/java-ee-8-high-performance>
> > >
> > >
> > > Le ven. 23 oct. 2020 à 16:16, Mantas Gridinas <[email protected]> a
> > > écrit :
> > >
> > > > From personal experience any activities related to file component tend 
> > > > to
> > > > try to load entire file into memory. You could perhaps fiddle around by
> > > > converting it to an input stream but then you get into an issue of 
> > > > making
> > > > sure that you don't read an entire file into memory before actually
> > > > converting it.
> > > >
> > > > I'd suggest avoiding using camel here at all for the sake of retaining 
> > > > fine
> > > > grained control over streaming process. At most you could wrap it in a
> > > > processor and be done with it. Otherwise you'll start programming with
> > > > routes, which is something you'd want to avoid.
> > > >
> > > >
> > > > On Fri, Oct 23, 2020, 17:05 Site Register <[email protected]
> > > > .invalid>
> > > > wrote:
> > > >
> > > > > Hi Camel Users,
> > > > > I have a 4G json array file need to load into database. How can I
> > > > leverage
> > > > > Camel to stream the file and split into json?
> > > > > I had tried to use "stream:file" but it was reading line by line not
> > > > split
> > > > > into json.
> > > > > I leveraged gson streaming to read and insert into the database in 
> > > > > java
> > > > > application which took about 3 minutes. However, I would like to 
> > > > > check if
> > > > > there is a way to leverage camel pipeline for the same purpose.
> > > > > Thank you,
> > > >
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > http://davsclaus.com @davsclaus
> > Camel in Action 2: https://www.manning.com/ibsen2
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2

Reply via email to