Great questions. Here are some of the answers.

"Exit condition `if Not rows:\n break` is not clear to me. Why should it work?"

The exit condition is reached when the postgres_query fetches nothing: `if not rows:` then breaks out of the while loop, and the cursor is closed afterwards in the finally block.
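In other words, the paging loop is meant to work like this (simplified from the script below; fetch_data_batch is the existing helper that runs the LIMIT/OFFSET query, and index_batch here is only a placeholder for the convert-to-dicts / solr.add / solr.commit steps):

    offset = 0
    while True:
        rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
        if not rows:          # empty result: the OFFSET has moved past the last matching row
            break             # leave the while loop; the finally block then closes the cursor
        index_batch(rows)     # placeholder for the dict conversion, solr.add() and solr.commit()
        offset += batch_size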
" Also, how many distinct ids you have behind update_at >= '2023-07-29 00:00:00.0000'? " it is about 15000 rows with distinct ids. To me the log file itself seems to show all the PRE_UPDATE add(, id=<each unique id>) for every batch of rows fetched from postgres at 1000 rows at a time. Could you please tell me what to look from the log that the Indexing is committing 1000 documents at a time after executing the code *solr.commit()?* And do you see anything tells why it stops after 5000? On Fri, Dec 15, 2023 at 2:43 AM Mikhail Khludnev <m...@apache.org> wrote: > NB: it's not easy to build a robust ETL from scratch (btw, have you asked > Copilot or chat gpt for it? ). > I spot a few oddities in the code, but they are not critical. > From log I see (fwiw, you still have DEBUG log enabled) that 1000 recs were > added in 17 or something secs. It makes some sense. But then, it turns out > 5000 recs returned on *:*. How could it be? > Exit condition `if Not rows:\n break` is not clear to me. Why should > it work? > What happens to the main paging loop? How does it stop? Does it? > Note, looping via LIMIT OFFSET is counterproductive. It's better to select > once, and tweak the driver to pull page-by-page lazily. > Also, how many distinct ids you have behind update_at >= '2023-07-29 > 00:00:00.0000'? > > > On Fri, Dec 15, 2023 at 7:56 AM Ishan Chattopadhyaya < > ichattopadhy...@gmail.com> wrote: > > > If you're able to do multithreaded indexing, it will go much faster. > > > > On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, < > > sippingonesandze...@gmail.com> > > wrote: > > > > > Hi, > > > > > > I have written a custom python program to Index which may provide a > > > better control than DIH. > > > > > > But, it is still doing at most 5000 documentation. I have enable debug > > > logging to show after updating the log4j2.xml I am doing a count of > > > documents after each batch of indexing. > > > > > > I would really appreciate you help to fix the Indexing only 5000 > > documents. > > > > > > The solr.log is in enclosed zip. 
> > >
> > > try:
> > >     conn = psycopg2.connect(**postgres_connection)
> > >     cursor = conn.cursor()
> > >
> > >     # Replace "your_table" and "your_columns" with your actual table and columns
> > >     postgres_query = """
> > >         SELECT
> > >             ROW_NUMBER() over (order by update_at)
> > >             , id, name, update_at
> > >         FROM city
> > >         WHERE update_at >= %s
> > >         LIMIT %s OFFSET %s
> > >     """
> > >     StartDT = '2023-07-29 00:00:00.0000'
> > >
> > >     batch_size = 1000  # Set your desired batch size
> > >     offset = 0
> > >
> > >     while True:
> > >         rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
> > >
> > >         # convert tuples to dicts, which is what solr wants
> > >         if rows:
> > >             print(f"Type of row: {type(rows[0])}")
> > >             print(f"Example row content: {rows[0]}")
> > >             # Get column names from cursor.description
> > >             column_names = [desc[0] for desc in cursor.description]
> > >             rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
> > >
> > >             print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
> > >
> > >         if not rows:
> > >             break  # No more data
> > >
> > >         # Connect to Solr
> > >         solr = Solr(solr_url, always_commit=True)
> > >
> > >         # Index data into Solr
> > >         docs = [
> > >             {
> > >                 # Assuming each row is a dictionary where keys are field names
> > >                 "id": str(row_dict["id"]),
> > >                 "name": row_dict["name"],
> > >                 "update_at": row_dict["update_at"].isoformat() if "update_at" in rows_as_dicts else None
> > >                 # Add more fields as needed
> > >             }
> > >             for row_dict in rows_as_dicts
> > >         ]
> > >
> > >         # Log the content of each document
> > >         for doc in docs:
> > >             logging.debug(f"Indexing document: {doc}")
> > >
> > >         # Index data into Solr and get the response
> > >         response = solr.add(docs)
> > >
> > >         solr_core = 'p4a'
> > >
> > >         # Construct the Solr select query URL
> > >         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
> > >
> > >         # Request the select query response
> > >         select_response = requests.get(select_query_url)
> > >
> > >         try:
> > >             # Try to parse the response as JSON
> > >             select_data = json.loads(select_response.text)
> > >
> > >             # Extract the number of documents from the response
> > >             num_docs = select_data.get("response", {}).get("numFound", -1)
> > >
> > >             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
> > >
> > >         except json.decoder.JSONDecodeError as e:
> > >             print(f"Error decoding JSON response: {e}")
> > >             print(f"Response content: {select_response.text}")
> > >
> > >         # Log the outcome of the indexing operation
> > >         logging.info("Committing batch to Solr")
> > >
> > >         # Commit the changes to Solr
> > >         solr.commit()
> > >         time.sleep(1)
> > >         offset += batch_size
> > >
> > > finally:
> > >     # Close PostgreSQL connection
> > >     cursor.close()
> > >     conn.close()
> > >
> > > Example of content from Postgres:
> > > Type of row: <class 'tuple'>
> > > Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3', datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
> > > Type of row after rows_as_dicts: <class 'dict'>
> > > Total number of documents in the core 'p4a': 5000
> > >
> > >     SELECT
> > >         ROW_NUMBER() over (order by update_at)
> > >         , id, name, update_at
> > >     FROM city
> > >     WHERE update_at >= '2023-07-29 00:00:00.0000'
> > >     LIMIT 1000 *OFFSET 13000*
> > >
> > > Type of row: <class 'tuple'>
> > > Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73', datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
> > > Type of row after rows_as_dicts: <class 'dict'>
> > > Total number of documents in the core 'p4a': 5000
> > >
> > >     SELECT
> > >         ROW_NUMBER() over (order by update_at)
> > >         , id, name, update_at
> > >     FROM city
> > >     WHERE update_at >= '2023-07-29 00:00:00.0000'
> > >     LIMIT 1000 *OFFSET 14000*
> > >
> > > Type of row: <class 'tuple'>
> > > Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3', datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
> > > Type of row after rows_as_dicts: <class 'dict'>
> > > Total number of documents in the core 'p4a': 5000
> > >
> > >     SELECT
> > >         ROW_NUMBER() over (order by update_at)
> > >         , id, name, update_at
> > >     FROM city
> > >     WHERE update_at >= '2023-07-29 00:00:00.0000'
> > >     LIMIT 1000 OFFSET 15000
> > >
>
> --
> Sincerely yours
> Mikhail Khludnev
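P.S. On Mikhail's note that looping via LIMIT OFFSET is counterproductive: below is a rough, untested sketch of the "select once, pull lazily" alternative using a psycopg2 server-side (named) cursor. It reuses the postgres_connection and solr_url settings from my script and assumes the Solr class comes from pysolr (as the add/commit calls suggest); it is only an illustration of the approach, not a drop-in fix for the 5000-document problem.

    import psycopg2
    from pysolr import Solr

    BATCH_SIZE = 1000
    StartDT = '2023-07-29 00:00:00.0000'

    conn = psycopg2.connect(**postgres_connection)
    solr = Solr(solr_url)  # commit explicitly after each batch below

    try:
        # A named cursor keeps the result set on the PostgreSQL server and
        # streams it to the client, so the query is executed only once.
        with conn.cursor(name='city_export') as cursor:
            cursor.execute(
                "SELECT id, name, update_at FROM city "
                "WHERE update_at >= %s ORDER BY update_at",
                (StartDT,),
            )
            column_names = None
            while True:
                rows = cursor.fetchmany(BATCH_SIZE)
                if not rows:
                    break  # the server-side cursor is exhausted
                if column_names is None:
                    # description is populated once rows have been fetched
                    column_names = [desc[0] for desc in cursor.description]
                docs = [
                    {
                        "id": str(d["id"]),
                        "name": d["name"],
                        "update_at": d["update_at"].isoformat() if d["update_at"] else None,
                    }
                    for d in (dict(zip(column_names, row)) for row in rows)
                ]
                solr.add(docs)
                solr.commit()
    finally:
        conn.close()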