If you're able to do multithreaded indexing, it will go much faster.
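Something along these lines could be a starting point. It is only a rough sketch (not a fix for the 5000-document ceiling) that reuses fetch_data_batch, postgres_connection, postgres_query, StartDT, batch_size, and solr_url from your script; the offset upper bound, the worker count, and the single commit at the end are placeholders you would adjust:

    import psycopg2
    from concurrent.futures import ThreadPoolExecutor
    from pysolr import Solr

    def index_batch(offset):
        # Each worker opens its own Postgres connection and Solr client;
        # sharing one psycopg2 connection across threads is not safe.
        conn = psycopg2.connect(**postgres_connection)
        cursor = conn.cursor()
        solr = Solr(solr_url, always_commit=False)
        try:
            rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
            if not rows:
                return 0
            column_names = [desc[0] for desc in cursor.description]
            docs = [
                {
                    "id": str(row_dict["id"]),
                    "name": row_dict["name"],
                    "update_at": row_dict["update_at"].isoformat(),
                }
                for row_dict in (dict(zip(column_names, row)) for row in rows)
            ]
            solr.add(docs)
            return len(docs)
        finally:
            cursor.close()
            conn.close()

    # Fan the offsets out across a small thread pool, then commit once at the
    # end instead of after every batch. The 100_000 upper bound and
    # max_workers=4 are placeholders.
    offsets = range(0, 100_000, batch_size)
    with ThreadPoolExecutor(max_workers=4) as pool:
        total = sum(pool.map(index_batch, offsets))

    Solr(solr_url).commit()
    print(f"Indexed {total} documents")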
On Thu, 14 Dec 2023, 6:51 pm Vince McMahon, <sippingonesandze...@gmail.com> wrote:

> Hi,
>
> I have written a custom Python program to index, which may provide better
> control than DIH.
>
> But it is still indexing at most 5000 documents. I have enabled debug
> logging (after updating log4j2.xml), and I am doing a count of documents
> after each batch of indexing.
>
> I would really appreciate your help in fixing the indexing stopping at
> 5000 documents.
>
> The solr.log is in the enclosed zip.
>
> import json
> import logging
> import time
>
> import psycopg2
> import requests
> from pysolr import Solr
>
> try:
>     conn = psycopg2.connect(**postgres_connection)
>     cursor = conn.cursor()
>
>     # Replace "your_table" and "your_columns" with your actual table and columns
>     postgres_query = """
>         SELECT
>             ROW_NUMBER() over (order by update_at)
>             , id, name, update_at
>         FROM city
>         WHERE update_at >= %s
>         LIMIT %s OFFSET %s
>     """
>     StartDT = '2023-07-29 00:00:00.0000'
>
>     batch_size = 1000  # Set your desired batch size
>     offset = 0
>
>     while True:
>         rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
>
>         # Convert tuples to dicts, which is what Solr wants
>         if rows:
>             print(f"Type of row: {type(rows[0])}")
>             print(f"Example row content: {rows[0]}")
>             # Get column names from cursor.description
>             column_names = [desc[0] for desc in cursor.description]
>             rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
>
>             print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
>
>         if not rows:
>             break  # No more data
>
>         # Connect to Solr
>         solr = Solr(solr_url, always_commit=True)
>
>         # Index data into Solr
>         docs = [
>             {
>                 # Assuming each row is a dictionary where keys are field names
>                 "id": str(row_dict["id"]),
>                 "name": row_dict["name"],
>                 "update_at": row_dict["update_at"].isoformat() if "update_at" in row_dict else None
>                 # Add more fields as needed
>             }
>             for row_dict in rows_as_dicts
>         ]
>
>         # Log the content of each document
>         for doc in docs:
>             logging.debug(f"Indexing document: {doc}")
>
>         # Index data into Solr and get the response
>         response = solr.add(docs)
>
>         solr_core = 'p4a'
>
>         # Construct the Solr select query URL
>         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
>
>         # Request the select query response
>         select_response = requests.get(select_query_url)
>
>         try:
>             # Try to parse the response as JSON
>             select_data = json.loads(select_response.text)
>
>             # Extract the number of documents from the response
>             num_docs = select_data.get("response", {}).get("numFound", -1)
>
>             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
>
>         except json.decoder.JSONDecodeError as e:
>             print(f"Error decoding JSON response: {e}")
>             print(f"Response content: {select_response.text}")
>
>         # Log the outcome of the indexing operation
>         logging.info("Committing batch to Solr")
>
>         # Commit the changes to Solr
>         solr.commit()
>         time.sleep(1)
>         offset += batch_size
>
> finally:
>     # Close PostgreSQL connection
>     cursor.close()
>     conn.close()
>
> Example of content from Postgres:
>
> Type of row: <class 'tuple'>
> Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3', datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
> SELECT
>     ROW_NUMBER() over (order by update_at)
>     , id, name, update_at
> FROM city
> WHERE update_at >= '2023-07-29 00:00:00.0000'
> LIMIT 1000 OFFSET 13000
>
> Type of row: <class 'tuple'>
> Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73', datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
> SELECT
>     ROW_NUMBER() over (order by update_at)
>     , id, name, update_at
> FROM city
> WHERE update_at >= '2023-07-29 00:00:00.0000'
> LIMIT 1000 OFFSET 14000
>
> Type of row: <class 'tuple'>
> Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3', datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
> SELECT
>     ROW_NUMBER() over (order by update_at)
>     , id, name, update_at
> FROM city
> WHERE update_at >= '2023-07-29 00:00:00.0000'
> LIMIT 1000 OFFSET 15000