Oh, Mikhail. Thanks for your questions. There are just 5000 distinct ids. That explains the ceiling: Solr keeps only one document per uniqueKey, so rows that repeat an id overwrite each other and numFound stops at 5000.
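
Something like the following should confirm it on the Postgres side (an untested sketch reusing the city table and update_at filter from the script quoted below; the connection settings are placeholders):

import psycopg2

# Placeholder settings; in practice this is the same postgres_connection dict as in the script below.
conn = psycopg2.connect(dbname="mydb", user="me", password="secret", host="localhost")
cur = conn.cursor()
cur.execute(
    """
    SELECT COUNT(*) AS total_rows, COUNT(DISTINCT id) AS distinct_ids
    FROM city
    WHERE update_at >= %s
    """,
    ('2023-07-29 00:00:00.0000',),
)
total_rows, distinct_ids = cur.fetchone()
# distinct_ids is the most Solr will ever report, because documents sharing an id overwrite each other.
print(f"total rows: {total_rows}, distinct ids: {distinct_ids}")
cur.close()
conn.close()
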
I'll check with the source side during office hours to get their data fixed, and try again. Thank you; you are a very wise man.

On Fri, Dec 15, 2023 at 4:32 AM Vince McMahon <sippingonesandze...@gmail.com> wrote:

> Great questions. Here are some of the answers.
>
> "Exit condition `if not rows:\n break` is not clear to me. Why should it work?"
> The exit condition is reached when postgres_query fetches nothing: `if not rows:` breaks out of the while loop and the cursor is closed.
>
> "Also, how many distinct ids do you have behind update_at >= '2023-07-29 00:00:00.0000'?"
> It is about 15000 rows with distinct ids.
>
> To me the log file itself seems to show a PRE_UPDATE add(, id=<each unique id>) for every batch of rows fetched from Postgres, 1000 rows at a time.
>
> Could you please tell me what to look for in the log to confirm that the indexing is committing 1000 documents at a time after solr.commit() executes? And do you see anything that tells why it stops after 5000?
>
> On Fri, Dec 15, 2023 at 2:43 AM Mikhail Khludnev <m...@apache.org> wrote:
>
>> NB: it's not easy to build a robust ETL from scratch (btw, have you asked Copilot or ChatGPT for it?).
>> I spot a few oddities in the code, but they are not critical.
>> From the log I see (fwiw, you still have DEBUG logging enabled) that 1000 recs were added in 17 or so seconds. That makes some sense. But then it turns out 5000 recs are returned on *:*. How could that be?
>> The exit condition `if not rows:\n break` is not clear to me. Why should it work?
>> What happens to the main paging loop? How does it stop? Does it?
>> Note, looping via LIMIT OFFSET is counterproductive. It's better to select once and tweak the driver to pull page by page lazily.
>> Also, how many distinct ids do you have behind update_at >= '2023-07-29 00:00:00.0000'?
>>
>> On Fri, Dec 15, 2023 at 7:56 AM Ishan Chattopadhyaya <ichattopadhy...@gmail.com> wrote:
>>
>> > If you're able to do multithreaded indexing, it will go much faster.
>> >
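
Ishan, this is roughly what I understand the multithreaded version to look like with pysolr and concurrent.futures (untested sketch; the batch splitting, worker count, and single final commit are assumptions on my part, not anything from the script below):

from concurrent.futures import ThreadPoolExecutor
import pysolr

solr_url = "http://localhost:8983/solr/p4a"  # same core as in the script below

def index_batch(docs):
    # Each worker uses its own client and defers the commit.
    pysolr.Solr(solr_url, always_commit=False).add(docs)

def index_in_parallel(all_docs, batch_size=1000, workers=4):
    batches = [all_docs[i:i + batch_size] for i in range(0, len(all_docs), batch_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(index_batch, batches))   # send batches concurrently
    pysolr.Solr(solr_url).commit()             # one commit at the end
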
>> > On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <sippingonesandze...@gmail.com> wrote:
>> >
>> > > Hi,
>> > >
>> > > I have written a custom Python program to index, which may provide better control than DIH.
>> > >
>> > > But it is still indexing at most 5000 documents. I have enabled debug logging by updating log4j2.xml, and I count the documents after each batch of indexing.
>> > >
>> > > I would really appreciate your help in fixing the indexing stopping at 5000 documents.
>> > >
>> > > The solr.log is in the enclosed zip.
>> > >
>> > > try:
>> > >     conn = psycopg2.connect(**postgres_connection)
>> > >     cursor = conn.cursor()
>> > >
>> > >     # Replace "your_table" and "your_columns" with your actual table and columns
>> > >     postgres_query = """
>> > >         SELECT
>> > >             ROW_NUMBER() over (order by update_at)
>> > >             , id, name, update_at
>> > >         FROM city
>> > >         WHERE update_at >= %s
>> > >         LIMIT %s OFFSET %s
>> > >     """
>> > >     StartDT = '2023-07-29 00:00:00.0000'
>> > >
>> > >     batch_size = 1000  # Set your desired batch size
>> > >     offset = 0
>> > >
>> > >     while True:
>> > >         rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
>> > >
>> > >         # convert tuples to dicts, which is what Solr wants
>> > >         if rows:
>> > >             print(f"Type of row: {type(rows[0])}")
>> > >             print(f"Example row content: {rows[0]}")
>> > >             # Get column names from cursor.description
>> > >             column_names = [desc[0] for desc in cursor.description]
>> > >             rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
>> > >
>> > >             print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
>> > >
>> > >         if not rows:
>> > >             break  # No more data
>> > >
>> > >         # Connect to Solr
>> > >         solr = Solr(solr_url, always_commit=True)
>> > >
>> > >         # Index data into Solr
>> > >         docs = [
>> > >             {
>> > >                 # Assuming each row is a dictionary where keys are field names
>> > >                 "id": str(row_dict["id"]),
>> > >                 "name": row_dict["name"],
>> > >                 "update_at": row_dict["update_at"].isoformat() if "update_at" in row_dict else None
>> > >                 # Add more fields as needed
>> > >             }
>> > >             for row_dict in rows_as_dicts
>> > >         ]
>> > >
>> > >         # Log the content of each document
>> > >         for doc in docs:
>> > >             logging.debug(f"Indexing document: {doc}")
>> > >
>> > >         # Index data into Solr and get the response
>> > >         response = solr.add(docs)
>> > >
>> > >         solr_core = 'p4a'
>> > >
>> > >         # Construct the Solr select query URL
>> > >         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
>> > >
>> > >         # Request the select query response
>> > >         select_response = requests.get(select_query_url)
>> > >
>> > >         try:
>> > >             # Try to parse the response as JSON
>> > >             select_data = json.loads(select_response.text)
>> > >
>> > >             # Extract the number of documents from the response
>> > >             num_docs = select_data.get("response", {}).get("numFound", -1)
>> > >
>> > >             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
>> > >
>> > >         except json.decoder.JSONDecodeError as e:
>> > >             print(f"Error decoding JSON response: {e}")
>> > >             print(f"Response content: {select_response.text}")
>> > >
>> > >         # Log the outcome of the indexing operation
>> > >         logging.info("Committing batch to Solr")
>> > >
>> > >         # Commit the changes to Solr
>> > >         solr.commit()
>> > >         time.sleep(1)
>> > >         offset += batch_size
>> > >
>> > > finally:
>> > >     # Close PostgreSQL connection
>> > >     cursor.close()
>> > >     conn.close()
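
Mikhail, on the LIMIT/OFFSET point: this is roughly how I read the select-once, fetch-lazily suggestion, using a psycopg2 server-side (named) cursor (untested sketch; the cursor name is arbitrary, and postgres_connection / StartDT are the same values as in the script above):

import psycopg2

conn = psycopg2.connect(**postgres_connection)  # same settings as above
# Passing a name makes psycopg2 use a server-side cursor, so Postgres streams
# the result set instead of re-running the query for every LIMIT/OFFSET page.
cursor = conn.cursor(name="city_stream")

cursor.execute(
    "SELECT id, name, update_at FROM city WHERE update_at >= %s ORDER BY update_at",
    (StartDT,),
)

while True:
    rows = cursor.fetchmany(1000)
    if not rows:
        break
    # hand `rows` to the same dict conversion and solr.add() code as above

cursor.close()
conn.close()
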
>> > > Example of content from Postgres:
>> > > Type of row: <class 'tuple'>
>> > > Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3', datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
>> > > Type of row after rows_as_dicts: <class 'dict'>
>> > > Total number of documents in the core 'p4a': 5000
>> > >
>> > > SELECT
>> > >     ROW_NUMBER() over (order by update_at)
>> > >     , id, name, update_at
>> > > FROM city
>> > > WHERE update_at >= '2023-07-29 00:00:00.0000'
>> > > LIMIT 1000 OFFSET 13000
>> > >
>> > > Type of row: <class 'tuple'>
>> > > Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73', datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
>> > > Type of row after rows_as_dicts: <class 'dict'>
>> > > Total number of documents in the core 'p4a': 5000
>> > >
>> > > SELECT
>> > >     ROW_NUMBER() over (order by update_at)
>> > >     , id, name, update_at
>> > > FROM city
>> > > WHERE update_at >= '2023-07-29 00:00:00.0000'
>> > > LIMIT 1000 OFFSET 14000
>> > >
>> > > Type of row: <class 'tuple'>
>> > > Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3', datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
>> > > Type of row after rows_as_dicts: <class 'dict'>
>> > > Total number of documents in the core 'p4a': 5000
>> > >
>> > > SELECT
>> > >     ROW_NUMBER() over (order by update_at)
>> > >     , id, name, update_at
>> > > FROM city
>> > > WHERE update_at >= '2023-07-29 00:00:00.0000'
>> > > LIMIT 1000 OFFSET 15000
>> >
>>
>> --
>> Sincerely yours
>> Mikhail Khludnev
>