Ishan, you are right.  Doing multithreaded indexing is going much faster.
I found out when the remote machine became unresponsive very quickly; it
crashed.  lol.
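
For anyone else reading the thread, here is a minimal sketch of one way to
run the indexing across a bounded thread pool.  It assumes pysolr, the
fetch_data_batch() helper from the script further down, and the same
solr_url / postgres_connection / postgres_query placeholders; the worker
count is only an example, and keeping the pool small is what keeps the
machine from being overwhelmed.

from concurrent.futures import ThreadPoolExecutor

import psycopg2
from pysolr import Solr

BATCH_SIZE = 1000
MAX_WORKERS = 4          # keep this small so the machine is not overwhelmed
START_DT = '2023-07-29 00:00:00.0000'
TOTAL_ROWS = 150000      # roughly the number of rows to fetch

def index_batch(offset):
    """Fetch one page from Postgres and send it to Solr; return the row count."""
    conn = psycopg2.connect(**postgres_connection)   # placeholder settings dict
    try:
        cursor = conn.cursor()
        rows = fetch_data_batch(cursor, postgres_query, START_DT, BATCH_SIZE, offset)
        if not rows:
            return 0
        column_names = [desc[0] for desc in cursor.description]
        docs = [
            {
                "id":        str(row_dict["id"]),
                "name":      row_dict["name"],
                "update_at": row_dict["update_at"].isoformat() if row_dict.get("update_at") else None,
            }
            for row_dict in (dict(zip(column_names, row)) for row in rows)
        ]
        # Each worker uses its own client; the commit is deferred to the end.
        Solr(solr_url, always_commit=False).add(docs)
        return len(rows)
    finally:
        conn.close()

# A bounded pool throttles the load on both Postgres and Solr.
offsets = range(0, TOTAL_ROWS, BATCH_SIZE)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    indexed = sum(pool.map(index_batch, offsets))

Solr(solr_url, always_commit=False).commit()
print(f"Indexed {indexed} documents")

Deferring the single commit to the end and dropping the per-batch sleep is
optional, but it avoids issuing a Solr commit for every 1000 documents.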

On Fri, Dec 15, 2023 at 4:35 AM Vince McMahon <sippingonesandze...@gmail.com>
wrote:

> Ishan,
>
> How do you multi-thread?
>
> Secondly, could you please tell me what to look for in the log to confirm
> that the indexing is committing 1000 documents at a time after executing
> solr.commit()?  Do you see anything that tells why it stops after 5000
> rows, while there are about 150000 rows fetched?  How do I have it continue
> past 5000?
>
>
> On Thu, Dec 14, 2023 at 11:54 PM Ishan Chattopadhyaya <
> ichattopadhy...@gmail.com> wrote:
>
>> If you're able to do multithreaded indexing, it will go much faster.
>>
>> On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <
>> sippingonesandze...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > I have written a custom Python program to index, which may provide
>> > better control than DIH.
>> >
>> > But it is still indexing at most 5000 documents.  I have enabled debug
>> > logging by updating log4j2.xml, and I am doing a count of documents
>> > after each batch of indexing.
>> >
>> > I would really appreciate your help fixing the indexing stopping at
>> > 5000 documents.
>> >
>> > The solr.log is in the enclosed zip.
>> >
>> > import json
>> > import logging
>> > import time
>> >
>> > import psycopg2
>> > import requests
>> > from pysolr import Solr
>> >
>> > # postgres_connection, solr_url and fetch_data_batch are defined elsewhere
>> > # in the script (a sketch of fetch_data_batch is further below).
>> > try:
>> >     conn = psycopg2.connect(**postgres_connection)
>> >     cursor = conn.cursor()
>> >
>> >     # Replace "city" and the column list with your actual table and columns
>> >     postgres_query = """
>> >                             SELECT
>> >                                     ROW_NUMBER() over (order by update_at)
>> >                                     , id, name, update_at
>> >                             FROM city
>> >                             WHERE update_at >= %s
>> >                             LIMIT %s OFFSET %s
>> >                     """
>> >     StartDT = '2023-07-29 00:00:00.0000'
>> >
>> >     batch_size = 1000  # Set your desired batch size
>> >     offset = 0
>> >
>> >     while True:
>> >         rows = fetch_data_batch(cursor, postgres_query, StartDT,
>> >                                 batch_size, offset)
>> >
>> >         if not rows:
>> >             break  # No more data
>> >
>> >         # Convert tuples to dicts, which is what Solr wants
>> >         print(f"Type of row: {type(rows[0])}")
>> >         print(f"Example row content: {rows[0]}")
>> >
>> >         # Get column names from cursor.description
>> >         column_names  = [desc[0] for desc in cursor.description]
>> >         rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
>> >
>> >         print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
>> >
>> >         # Connect to Solr
>> >         solr = Solr(solr_url, always_commit=True)
>> >
>> >         # Build Solr documents; each row is a dictionary keyed by column name
>> >         docs = [
>> >             {
>> >                 "id":        str(row_dict["id"]),
>> >                 "name":      row_dict["name"],
>> >                 "update_at": row_dict["update_at"].isoformat() if row_dict.get("update_at") else None
>> >                 # Add more fields as needed
>> >             }
>> >             for row_dict in rows_as_dicts
>> >         ]
>> >
>> >         # Log the content of each document
>> >         for doc in docs:
>> >             logging.debug(f"Indexing document: {doc}")
>> >
>> >         # Index data into Solr and get the response
>> >         response = solr.add(docs)
>> >
>> >         solr_core = 'p4a'
>> >
>> >         # Construct the Solr select query URL
>> >         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
>> >
>> >         # Request the select query response
>> >         select_response = requests.get(select_query_url)
>> >
>> >         try:
>> >             # Try to parse the response as JSON
>> >             select_data = json.loads(select_response.text)
>> >
>> >             # Extract the number of documents from the response
>> >             num_docs = select_data.get("response", {}).get("numFound", -1)
>> >
>> >             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
>> >
>> >         except json.decoder.JSONDecodeError as e:
>> >             print(f"Error decoding JSON response: {e}")
>> >             print(f"Response content: {select_response.text}")
>> >
>> >         # Log the outcome of the indexing operation
>> >         logging.info("Committing batch to Solr")
>> >
>> >         # Commit the changes to Solr
>> >         solr.commit()
>> >         time.sleep(1)
>> >         offset += batch_size
>> >
>> > finally:
>> >     # Close PostgreSQL connection
>> >     cursor.close()
>> >     conn.close()
>> >
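>> > fetch_data_batch is not shown above; a minimal sketch of what it is
>> > assumed to do (run the parameterized query and return one page of rows
>> > as tuples):
>> >
>> > def fetch_data_batch(cursor, query, start_dt, batch_size, offset):
>> >     # Assumed implementation: bind (StartDT, LIMIT, OFFSET) and return
>> >     # every row of this page.
>> >     cursor.execute(query, (start_dt, batch_size, offset))
>> >     return cursor.fetchall()
>> >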
>> > Example of content from Postgres:
>> > Type of row: <class 'tuple'>
>> > Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3', datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
>> > Type of row after rows_as_dicts: <class 'dict'>
>> > Total number of documents in the core 'p4a': 5000
>> >
>> >                             SELECT
>> >                                     ROW_NUMBER() over (order by update_at)
>> >                                     , id, name, update_at
>> >                             FROM city
>> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
>> >                             LIMIT 1000 OFFSET 13000
>> >
>> > Type of row: <class 'tuple'>
>> > Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73', datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
>> > Type of row after rows_as_dicts: <class 'dict'>
>> > Total number of documents in the core 'p4a': 5000
>> >
>> >                             SELECT
>> >                                     ROW_NUMBER() over (order by update_at)
>> >                                     , id, name, update_at
>> >                             FROM city
>> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
>> >                             LIMIT 1000 OFFSET 14000
>> >
>> > Type of row: <class 'tuple'>
>> > Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3', datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
>> > Type of row after rows_as_dicts: <class 'dict'>
>> > Total number of documents in the core 'p4a': 5000
>> >
>> >                             SELECT
>> >                                     ROW_NUMBER() over (order by update_at)
>> >                                     , id, name, update_at
>> >                             FROM city
>> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
>> >                             LIMIT 1000 OFFSET 15000
>> >
>> >
>> >
>>
>
