If you're able to do multithreaded indexing, it will go much faster.
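Roughly something like this, for example -- just a sketch using pysolr and
concurrent.futures, where the DSN, the core URL, the 20000-row cap and the
worker count are placeholders rather than anything taken from your script
(the query also gets a stable ORDER BY so the OFFSET pages don't overlap):

import concurrent.futures

import psycopg2
from pysolr import Solr

# Placeholders -- adjust to your own connection details and data volume.
SOLR_URL = "http://localhost:8983/solr/p4a"
PG_DSN = "dbname=yourdb user=youruser password=secret host=localhost"
BATCH_SIZE = 1000
START_DT = "2023-07-29 00:00:00"

QUERY = """
    SELECT id, name, update_at
    FROM city
    WHERE update_at >= %s
    ORDER BY update_at, id
    LIMIT %s OFFSET %s
"""

def index_batch(offset):
    # Each worker gets its own DB cursor and Solr client and indexes one page.
    with psycopg2.connect(PG_DSN) as conn:
        with conn.cursor() as cur:
            cur.execute(QUERY, (START_DT, BATCH_SIZE, offset))
            rows = cur.fetchall()
    if not rows:
        return 0
    docs = [
        {"id": str(r[0]), "name": r[1], "update_at": r[2].isoformat()}
        for r in rows
    ]
    Solr(SOLR_URL).add(docs)  # no per-batch commit
    return len(docs)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    total = sum(pool.map(index_batch, range(0, 20000, BATCH_SIZE)))

Solr(SOLR_URL).commit()  # one commit at the end
print(f"Indexed {total} docs")

Committing once at the end, instead of with always_commit=True on every
batch, also tends to help indexing speed quite a bit.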

On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <sippingonesandze...@gmail.com>
wrote:

> Hi,
>
> I have written a custom Python program to index, which may give better
> control than DIH.
>
> But it is still indexing at most 5000 documents. I have enabled debug
> logging by updating log4j2.xml, and I am counting the documents after
> each batch of indexing.
>
> I would really appreciate your help in figuring out why only 5000
> documents get indexed.
>
> The solr.log is in the enclosed zip.
>
> import json
> import logging
> import time
>
> import psycopg2
> import requests
> from pysolr import Solr
>
> try:
>     conn = psycopg2.connect(**postgres_connection)
>     cursor = conn.cursor()
>
>     # Replace "your_table" and "your_columns" with your actual table and columns
>     postgres_query = """
>                             SELECT
>                                     ROW_NUMBER() over (order by update_at)
>                                     , id, name, update_at
>                             FROM city
>                             WHERE update_at >= %s
>                             LIMIT %s OFFSET %s
>                     """
>     StartDT = '2023-07-29 00:00:00.0000'
>
>     batch_size = 1000  # Set your desired batch size
>     offset = 0
>
>     while True:
>         rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
>
>         # convert tuples to dicts, which is what Solr wants
>         if rows:
>             print(f"Type of row: {type(rows[0])}")
>             print(f"Example row content: {rows[0]}")
>             # Get column names from cursor.description
>             column_names        = [desc[0] for desc in cursor.description]
>             rows_as_dicts       = [dict(zip(column_names, row)) for row in rows]
>
>             print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
>
>         if not rows:
>             break  # No more data
>
>         # Connect to Solr
>         solr = Solr(solr_url, always_commit=True)
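>         # Note: with always_commit=True pysolr commits on every add() call,
>         # so the explicit solr.commit() below issues a second commit per batch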
>
>         # Index data into Solr
>         docs = [
>             {
>                 # Assuming each row is a dictionary where keys are field names
>                     "id":           str(row_dict["id"]),
>                     "name":         row_dict["name"],
>                     "update_at":    row_dict["update_at"].isoformat() if "update_at" in row_dict else None
>                     # Add more fields as needed
>             }
>             for row_dict in rows_as_dicts
>         ]
>
>
>         # Log the content of each document
>         for doc in docs:
>             logging.debug(f"Indexing document: {doc}")
>
>         # Index data into Solr and get the response
>         response = solr.add(docs)
>
>         solr_core = 'p4a'
>
>         # Construct the Solr select query URL
>         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
>
>         # Request the select query response
>         select_response = requests.get(select_query_url)
>
>         try:
>             # Try to parse the response as JSON
>             select_data = json.loads(select_response.text)
>
>             # Extract the number of documents from the response
>             num_docs = select_data.get("response", {}).get("numFound", -1)
>
>             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
>
>         except json.decoder.JSONDecodeError as e:
>             print(f"Error decoding JSON response: {e}")
>             print(f"Response content: {select_response.text}")
>
>
>         # Log the outcome of the indexing operation
>         logging.info("Committing batch to Solr")
>
>         # Commit the changes to Solr
>         solr.commit()
>         time.sleep(1)
>         offset += batch_size
>
> finally:
>     # Close PostgreSQL connection
>     cursor.close()
>     conn.close()
>
> Example of content from Postgres:
> Type of row: <class 'tuple'>
> Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3',
> datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
>                             SELECT
>                                     ROW_NUMBER() over (order by update_at)
>                                     , id, name, update_at
>                             FROM city
>                             WHERE update_at >= '2023-07-29 00:00:00.0000'
>                             LIMIT 1000 OFFSET 13000
>
> Type of row: <class 'tuple'>
> Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73',
> datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
>                             SELECT
>                                     ROW_NUMBER() over (order by update_at)
>                                     , id, name, update_at
>                             FROM city
>                             WHERE update_at >= '2023-07-29 00:00:00.0000'
>                             LIMIT 1000 OFFSET 14000
>
> Type of row: <class 'tuple'>
> Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3',
> datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
>                             SELECT
>                                     ROW_NUMBER() over (order by update_at)
>                                     , id, name, update_at
>                             FROM city
>                             WHERE update_at >= '2023-07-29 00:00:00.0000'
>                             LIMIT 1000 OFFSET 15000
>
>
>
