Ishan, you are right. Multithreaded indexing goes much faster. I found out the hard way after the remote machine became unresponsive very quickly; it crashed. lol.
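In case it helps anyone else following the thread, this is roughly the shape of what I am running now. It is only a sketch, not my exact script; SOLR_URL, MAX_WORKERS, and the index_batch()/run() helpers are simplified placeholders.

    import logging
    from concurrent.futures import ThreadPoolExecutor, as_completed

    from pysolr import Solr

    SOLR_URL = "http://localhost:8983/solr/p4a"  # placeholder core URL
    MAX_WORKERS = 4  # keep this small; too many threads is what knocked my box over

    def index_batch(docs):
        # One pysolr client per call; skip per-add commits and commit once at the end.
        solr = Solr(SOLR_URL, always_commit=False, timeout=60)
        solr.add(docs)
        return len(docs)

    def run(batches):
        # 'batches' is an iterable of lists of Solr doc dicts (e.g. 1000 docs each).
        indexed = 0
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            futures = [pool.submit(index_batch, docs) for docs in batches]
            for fut in as_completed(futures):
                indexed += fut.result()
                logging.info("Indexed %d docs so far", indexed)
        # Single explicit commit after all batches have been sent.
        Solr(SOLR_URL, timeout=60).commit()

I am still tuning the worker count; cranking it up is what made the machine unresponsive.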
On Fri, Dec 15, 2023 at 4:35 AM Vince McMahon <sippingonesandze...@gmail.com> wrote:

> Ishan,
>
> How do you multi-thread?
>
> Secondly, could you please tell me what to look for in the log to confirm
> that the indexing is committing 1000 documents at a time after the code
> executes solr.commit()? Do you see anything that tells why it stops after
> 5000 rows, while about 150000 rows are fetched? How can I have it continue
> past 5000?
>
> On Thu, Dec 14, 2023 at 11:54 PM Ishan Chattopadhyaya <ichattopadhy...@gmail.com> wrote:
>
>> If you're able to do multithreaded indexing, it will go much faster.
>>
>> On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <sippingonesandze...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > I have written a custom Python program to index, which may provide
>> > better control than DIH.
>> >
>> > But it is still indexing at most 5000 documents. I have enabled debug
>> > logging after updating log4j2.xml, and I am counting the documents
>> > after each batch of indexing.
>> >
>> > I would really appreciate your help to fix the indexing stopping at
>> > 5000 documents.
>> >
>> > The solr.log is in the enclosed zip.
>> >
>> > import json
>> > import logging
>> > import time
>> >
>> > import psycopg2
>> > import requests
>> > from pysolr import Solr
>> >
>> > # postgres_connection, solr_url and fetch_data_batch are defined
>> > # earlier in the script.
>> > try:
>> >     conn = psycopg2.connect(**postgres_connection)
>> >     cursor = conn.cursor()
>> >
>> >     # Replace "city" and the column list with your actual table and columns
>> >     postgres_query = """
>> >         SELECT
>> >             ROW_NUMBER() OVER (ORDER BY update_at),
>> >             id, name, update_at
>> >         FROM city
>> >         WHERE update_at >= %s
>> >         LIMIT %s OFFSET %s
>> >     """
>> >     StartDT = '2023-07-29 00:00:00.0000'
>> >
>> >     batch_size = 1000  # Set your desired batch size
>> >     offset = 0
>> >
>> >     while True:
>> >         rows = fetch_data_batch(cursor, postgres_query, StartDT,
>> >                                 batch_size, offset)
>> >
>> >         if not rows:
>> >             break  # No more data
>> >
>> >         print(f"Type of row: {type(rows[0])}")
>> >         print(f"Example row content: {rows[0]}")
>> >
>> >         # Convert tuples to dicts, which is what Solr wants;
>> >         # get the column names from cursor.description
>> >         column_names = [desc[0] for desc in cursor.description]
>> >         rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
>> >
>> >         print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
>> >
>> >         # Connect to Solr
>> >         solr = Solr(solr_url, always_commit=True)
>> >
>> >         # Build Solr documents from the rows
>> >         docs = [
>> >             {
>> >                 "id": str(row_dict["id"]),
>> >                 "name": row_dict["name"],
>> >                 "update_at": row_dict["update_at"].isoformat()
>> >                     if row_dict.get("update_at") else None,
>> >                 # Add more fields as needed
>> >             }
>> >             for row_dict in rows_as_dicts
>> >         ]
>> >
>> >         # Log the content of each document
>> >         for doc in docs:
>> >             logging.debug(f"Indexing document: {doc}")
>> >
>> >         # Index data into Solr and get the response
>> >         response = solr.add(docs)
>> >
>> >         solr_core = 'p4a'
>> >
>> >         # Construct the Solr select query URL to count documents
>> >         select_query_url = (
>> >             f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
>> >         )
>> >
>> >         # Request the select query response
>> >         select_response = requests.get(select_query_url)
>> >
>> >         try:
>> >             # Try to parse the response as JSON
>> >             select_data = json.loads(select_response.text)
>> >
>> >             # Extract the number of documents from the response
>> >             num_docs = select_data.get("response", {}).get("numFound", -1)
>> >
>> >             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
>> >
>> >         except json.decoder.JSONDecodeError as e:
>> >             print(f"Error decoding JSON response: {e}")
>> >             print(f"Response content: {select_response.text}")
>> >
>> >         # Log the outcome of the indexing operation
>> >         logging.info("Committing batch to Solr")
>> >
>> >         # Commit the changes to Solr
>> >         solr.commit()
>> >         time.sleep(1)
>> >         offset += batch_size
>> >
>> > finally:
>> >     # Close PostgreSQL connection
>> >     cursor.close()
>> >     conn.close()
>> >
>> > Example of content from Postgres:
>> >
>> > Type of row: <class 'tuple'>
>> > Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3',
>> > datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
>> > Type of row after rows_as_dicts: <class 'dict'>
>> > Total number of documents in the core 'p4a': 5000
>> >
>> > SELECT
>> >     ROW_NUMBER() OVER (ORDER BY update_at),
>> >     id, name, update_at
>> > FROM city
>> > WHERE update_at >= '2023-07-29 00:00:00.0000'
>> > LIMIT 1000 OFFSET 13000
>> >
>> > Type of row: <class 'tuple'>
>> > Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73',
>> > datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
>> > Type of row after rows_as_dicts: <class 'dict'>
>> > Total number of documents in the core 'p4a': 5000
>> >
>> > SELECT
>> >     ROW_NUMBER() OVER (ORDER BY update_at),
>> >     id, name, update_at
>> > FROM city
>> > WHERE update_at >= '2023-07-29 00:00:00.0000'
>> > LIMIT 1000 OFFSET 14000
>> >
>> > Type of row: <class 'tuple'>
>> > Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3',
>> > datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
>> > Type of row after rows_as_dicts: <class 'dict'>
>> > Total number of documents in the core 'p4a': 5000
>> >
>> > SELECT
>> >     ROW_NUMBER() OVER (ORDER BY update_at),
>> >     id, name, update_at
>> > FROM city
>> > WHERE update_at >= '2023-07-29 00:00:00.0000'
>> > LIMIT 1000 OFFSET 15000