Great questions.  Here are some of the answers.

" Exit condition `if Not rows:\n break` is not clear to me. Why should it
work? "
The exit condition is reached when postgres_query fetches nothing: `if not
rows` then breaks out of the while loop, and the cursor is closed in the
finally block.

" Also, how many distinct ids you have behind update_at >= '2023-07-29
00:00:00.0000'? "
It is about 15,000 rows, all with distinct ids.

To me, the log file itself seems to show a PRE_UPDATE add(, id=<each unique
id>) entry for every batch of rows fetched from Postgres, 1000 rows at a
time.
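
To make that concrete, this is roughly how I would count those entries (a
sketch only; the regex is a guess at the exact log format, so it may need
adjusting):

import re
from collections import Counter

# Rough count of the PRE_UPDATE add(, id=...) entries in solr.log.
ids = Counter()
with open("solr.log", encoding="utf-8") as log:
    for line in log:
        m = re.search(r"PRE_UPDATE add\(, id=([^)\s]+)", line)
        if m:
            ids[m.group(1)] += 1

print(f"add entries: {sum(ids.values())}, distinct ids: {len(ids)}")
# If the distinct-id count here is much higher than Solr's numFound, the adds
# reached Solr and the gap is on the Solr side rather than in the paging loop.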

Could you please tell me what to look for in the log to confirm that the
indexing is committing 1000 documents at a time after *solr.commit()*
executes? And do you see anything that tells why it stops after 5000?
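
On the *solr.commit()* question, one thing I could do is log a sent-vs-found
comparison right after each commit. A minimal sketch (SOLR_CORE_URL,
count_docs, and check_batch are names I made up here; they are not in the
script):

import json
import logging
import requests

SOLR_CORE_URL = "http://localhost:8983/solr/p4a"  # same core URL the script queries

def count_docs(core_url: str = SOLR_CORE_URL) -> int:
    """Return numFound for q=*:* (the same select the script already issues)."""
    resp = requests.get(f"{core_url}/select", params={"q": "*:*", "rows": 0, "wt": "json"})
    return int(json.loads(resp.text)["response"]["numFound"])

def check_batch(offset: int, docs: list) -> None:
    """Call right after solr.commit(): compare rows sent so far with numFound."""
    sent = offset + len(docs)              # rows pushed to Solr in this run so far
    found = count_docs()
    logging.info("after commit: sent=%d numFound=%d", sent, found)
    # If numFound plateaus (e.g. at 5000) while 'sent' keeps growing, the adds are
    # reaching Solr but documents are being overwritten (duplicate ids) or dropped,
    # rather than the paging loop stopping early.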
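
And regarding Mikhail's suggestion below to select once and let the driver
pull rows lazily instead of re-running the query with LIMIT/OFFSET: with
psycopg2 that would look roughly like this (stream_city_rows and the
'city_stream' cursor name are made up; treat it as an illustration, not a
drop-in fix):

import psycopg2

def stream_city_rows(conn, start_dt, batch_size=1000):
    """Run the SELECT once and pull rows from the server in batches, instead of
    re-running the query with a growing OFFSET for every page."""
    # A named cursor in psycopg2 is a server-side cursor: PostgreSQL keeps the
    # result set open and hands rows over lazily as they are fetched.
    with conn.cursor(name="city_stream") as cursor:
        cursor.itersize = batch_size      # rows pulled per round trip when iterating
        cursor.execute(
            """
            SELECT id, name, update_at
            FROM city
            WHERE update_at >= %s
            ORDER BY update_at
            """,
            (start_dt,),
        )
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            yield rows                    # one batch of up to batch_size tuples

# Usage sketch:
#   conn = psycopg2.connect(**postgres_connection)
#   for rows in stream_city_rows(conn, '2023-07-29 00:00:00.0000'):
#       ...convert to dicts and solr.add(docs) as in the script...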




On Fri, Dec 15, 2023 at 2:43 AM Mikhail Khludnev <m...@apache.org> wrote:

> NB: it's not easy to build a robust ETL from scratch (btw, have you asked
> Copilot or ChatGPT for it?).
> I spot a few oddities in the code, but they are not critical.
> From the log I see (fwiw, you still have DEBUG logging enabled) that 1000 recs
> were added in 17 or so seconds. That makes some sense. But then, it turns out
> 5000 recs are returned on *:*. How could that be?
> Exit condition `if Not rows:\n break` is not clear to me. Why should
> it work?
> What happens to the main paging loop? How does it stop? Does it?
> Note, looping via LIMIT OFFSET is counterproductive. It's better to select
> once, and tweak the driver to pull page-by-page lazily.
> Also, how many distinct ids do you have behind update_at >= '2023-07-29
> 00:00:00.0000'?
>
>
> On Fri, Dec 15, 2023 at 7:56 AM Ishan Chattopadhyaya <
> ichattopadhy...@gmail.com> wrote:
>
> > If you're able to do multithreaded indexing, it will go much faster.
> >
> > On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <
> > sippingonesandze...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have written a custom Python program to index, which may provide
> > > better control than DIH.
> > >
> > > But it is still indexing at most 5000 documents. I have enabled debug
> > > logging by updating log4j2.xml, and I am doing a count of documents
> > > after each batch of indexing.
> > >
> > > I would really appreciate your help in fixing the indexing stopping at
> > > only 5000 documents.
> > >
> > > The solr.log is in the enclosed zip.
> > >
> > > try:
> > >     conn = psycopg2.connect(**postgres_connection)
> > >     cursor = conn.cursor()
> > >
> > >     # Replace "your_table" and "your_columns" with your actual table and columns
> > >     postgres_query = """
> > >             SELECT
> > >                 ROW_NUMBER() over (order by update_at)
> > >                 , id, name, update_at
> > >             FROM city
> > >             WHERE update_at >= %s
> > >             LIMIT %s OFFSET %s
> > >     """
> > >     StartDT = '2023-07-29 00:00:00.0000'
> > >
> > >     batch_size = 1000  # Set your desired batch size
> > >     offset = 0
> > >
> > >     while True:
> > >         rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
> > >
> > >         # convert tuples to dicts, which is what solr wants
> > >         if rows:
> > >             print(f"Type of row: {type(rows[0])}")
> > >             print(f"Example row content: {rows[0]}")
> > >             # Get column names from cursor.description
> > >             column_names = [desc[0] for desc in cursor.description]
> > >             rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
> > >
> > >             print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
> > >
> > >         if not rows:
> > >             break  # No more data
> > >
> > >         # Connect to Solr
> > >         solr = Solr(solr_url, always_commit=True)
> > >
> > >         # Index data into Solr
> > >         docs = [
> > >             {
> > >                 # Assuming each row is a dictionary where keys are field names
> > >                 "id": str(row_dict["id"]),
> > >                 "name": row_dict["name"],
> > >                 "update_at": row_dict["update_at"].isoformat() if "update_at" in rows_as_dicts else None
> > >                 # Add more fields as needed
> > >             }
> > >             for row_dict in rows_as_dicts
> > >         ]
> > >
> > >         # Log the content of each document
> > >         for doc in docs:
> > >             logging.debug(f"Indexing document: {doc}")
> > >
> > >         # Index data into Solr and get the response
> > >         response = solr.add(docs)
> > >
> > >         solr_core = 'p4a'
> > >
> > >         # Construct the Solr select query URL
> > >         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
> > >
> > >         # Request the select query response
> > >         select_response = requests.get(select_query_url)
> > >
> > >         try:
> > >             # Try to parse the response as JSON
> > >             select_data = json.loads(select_response.text)
> > >
> > >             # Extract the number of documents from the response
> > >             num_docs = select_data.get("response", {}).get("numFound", -1)
> > >
> > >             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
> > >
> > >         except json.decoder.JSONDecodeError as e:
> > >             print(f"Error decoding JSON response: {e}")
> > >             print(f"Response content: {select_response.text}")
> > >
> > >         # Log the outcome of the indexing operation
> > >         logging.info("Committing batch to Solr")
> > >
> > >         # Commit the changes to Solr
> > >         solr.commit()
> > >         time.sleep(1)
> > >         offset += batch_size
> > >
> > > finally:
> > >     # Close PostgreSQL connection
> > >     cursor.close()
> > >     conn.close()
> > >
> > > Example of content from Postgres:
> > > Type of row: <class 'tuple'>
> > > Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3', datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
> > > Type of row after rows_as_dicts: <class 'dict'>
> > > Total number of documents in the core 'p4a': 5000
> > >
> > >             SELECT
> > >                 ROW_NUMBER() over (order by update_at)
> > >                 , id, name, update_at
> > >             FROM city
> > >             WHERE update_at >= '2023-07-29 00:00:00.0000'
> > >             LIMIT 1000 OFFSET 13000
> > >
> > > Type of row: <class 'tuple'>
> > > Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73', datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
> > > Type of row after rows_as_dicts: <class 'dict'>
> > > Total number of documents in the core 'p4a': 5000
> > >
> > >             SELECT
> > >                 ROW_NUMBER() over (order by update_at)
> > >                 , id, name, update_at
> > >             FROM city
> > >             WHERE update_at >= '2023-07-29 00:00:00.0000'
> > >             LIMIT 1000 OFFSET 14000
> > >
> > > Type of row: <class 'tuple'>
> > > Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3', datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
> > > Type of row after rows_as_dicts: <class 'dict'>
> > > Total number of documents in the core 'p4a': 5000
> > >
> > >             SELECT
> > >                 ROW_NUMBER() over (order by update_at)
> > >                 , id, name, update_at
> > >             FROM city
> > >             WHERE update_at >= '2023-07-29 00:00:00.0000'
> > >             LIMIT 1000 OFFSET 15000
> > >
> > >
> > >
> >
>
>
> --
> Sincerely yours
> Mikhail Khludnev
>
