Ishan,

How do you do the multi-threading?

Secondly, could you please tell me what to look for in the log to confirm
that the indexing is committing 1000 documents at a time after
*solr.commit()* is executed?  Do you see anything that tells why it stops
after 5000 rows, while there are about 150000 rows fetched?  How can I have
it continue past 5000?
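
For what it's worth, is something along these lines what you mean by
multithreaded indexing?  This is only a rough, untested sketch using
concurrent.futures and pysolr; fetch_data_batch, postgres_query, StartDT,
postgres_connection and the batch size are the same ones as in my script
below, and 150000 is just the approximate total row count.

from concurrent.futures import ThreadPoolExecutor

import psycopg2
from pysolr import Solr

SOLR_URL = "http://localhost:8983/solr/p4a"
batch_size = 1000

def index_batch(offset):
    # Each worker gets its own connection and cursor (psycopg2 cursors are
    # not safe to share across threads) and indexes a single batch.
    conn = psycopg2.connect(**postgres_connection)
    try:
        cursor = conn.cursor()
        rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
        if not rows:
            return 0
        column_names = [desc[0] for desc in cursor.description]
        docs = [
            {
                "id": str(d["id"]),
                "name": d["name"],
                "update_at": d["update_at"].isoformat() if d.get("update_at") else None,
            }
            for d in (dict(zip(column_names, row)) for row in rows)
        ]
        Solr(SOLR_URL, always_commit=False).add(docs)
        return len(docs)
    finally:
        conn.close()

# Roughly 150000 rows total, 1000 per batch, a handful of worker threads.
offsets = range(0, 150000, batch_size)
with ThreadPoolExecutor(max_workers=4) as pool:
    indexed = sum(pool.map(index_batch, offsets))

# Single commit at the end instead of one per batch.
Solr(SOLR_URL, always_commit=False).commit()
print(f"Indexed {indexed} documents")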


On Thu, Dec 14, 2023 at 11:54 PM Ishan Chattopadhyaya <
ichattopadhy...@gmail.com> wrote:

> If you're able to do multithreaded indexing, it will go much faster.
>
> On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <
> sippingonesandze...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I have written a custom Python program to index, which may provide
> > better control than DIH.
> >
> > But it is still indexing at most 5000 documents.  I have enabled debug
> > logging by updating log4j2.xml, and I am doing a count of documents
> > after each batch of indexing.
> >
> > I would really appreciate your help in fixing the indexing stopping at
> > 5000 documents.
> >
> > The solr.log is in the enclosed zip.
> >
> > import json
> > import logging
> > import time
> >
> > import psycopg2
> > import requests
> > from pysolr import Solr
> >
> > try:
> >     conn = psycopg2.connect(**postgres_connection)
> >     cursor = conn.cursor()
> >
> >     # Replace "your_table" and "your_columns" with your actual table and columns
> >     postgres_query = """
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= %s
> >                             LIMIT %s OFFSET %s
> >                     """
> >     StartDT = '2023-07-29 00:00:00.0000'
> >
> >     batch_size = 1000  # Set your desired batch size
> >     offset = 0
> >
> >     while True:
> >         rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
> >
> >         if not rows:
> >             break  # No more data
> >
> >         # Convert tuples to dicts, which is what Solr wants
> >         print(f"Type of row: {type(rows[0])}")
> >         print(f"Example row content: {rows[0]}")
> >
> >         # Get column names from cursor.description
> >         column_names  = [desc[0] for desc in cursor.description]
> >         rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
> >
> >         print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
> >
> >         # Connect to Solr
> >         solr = Solr(solr_url, always_commit=True)
> >
> >         # Build the documents to index into Solr
> >         docs = [
> >             {
> >                 # Assuming each row is a dictionary where keys are field names
> >                 "id":        str(row_dict["id"]),
> >                 "name":      row_dict["name"],
> >                 "update_at": row_dict["update_at"].isoformat()
> >                              if row_dict.get("update_at") else None
> >                 # Add more fields as needed
> >             }
> >             for row_dict in rows_as_dicts
> >         ]
> >
> >         # Log the content of each document
> >         for doc in docs:
> >             logging.debug(f"Indexing document: {doc}")
> >
> >         # Index data into Solr and get the response
> >         response = solr.add(docs)
> >
> >         solr_core = 'p4a'
> >
> >         # Construct the Solr select query URL
> >         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
> >
> >         # Request the select query response
> >         select_response = requests.get(select_query_url)
> >
> >         try:
> >             # Try to parse the response as JSON
> >             select_data = json.loads(select_response.text)
> >
> >             # Extract the number of documents from the response
> >             num_docs = select_data.get("response", {}).get("numFound", -1)
> >
> >             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
> >
> >         except json.decoder.JSONDecodeError as e:
> >             print(f"Error decoding JSON response: {e}")
> >             print(f"Response content: {select_response.text}")
> >
> >         # Log the outcome of the indexing operation
> >         logging.info("Committing batch to Solr")
> >
> >         # Commit the changes to Solr
> >         solr.commit()
> >
> >         time.sleep(1)
> >         offset += batch_size
> >
> > finally:
> >     # Close PostgreSQL connection
> >     cursor.close()
> >     conn.close()
> >
> > Example of content from Postgres:
> > Type of row: <class 'tuple'>
> > Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3', datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
> > Type of row after rows_as_dicts: <class 'dict'>
> > Total number of documents in the core 'p4a': 5000
> >
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
> >                             LIMIT 1000 *OFFSET 13000*
> >
> > Type of row: <class 'tuple'>
> > Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73', datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
> > Type of row after rows_as_dicts: <class 'dict'>
> > Total number of documents in the core 'p4a': 5000
> >
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
> >                             LIMIT 1000 *OFFSET 14000*
> >
> > Type of row: <class 'tuple'>
> > Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3', datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
> > Type of row after rows_as_dicts: <class 'dict'>
> > Total number of documents in the core 'p4a': 5000
> >
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
> >                             LIMIT 1000 OFFSET 15000
> >
> >
> >
>
