NB: it's not easy to build a robust ETL from scratch (btw, have you asked
Copilot or ChatGPT for it?).
I spot a few oddities in the code, but they are not critical.
From the log I see (fwiw, you still have DEBUG logging enabled) that 1000
records were added in roughly 17 seconds. That makes some sense. But then
*:* returns only 5000 records. How could that be?
The exit condition `if not rows: break` is not clear to me. Why should it
work?
What happens to the main paging loop? How does it stop? Does it at all?
Note that paging via LIMIT OFFSET is counterproductive: every batch re-runs
the query and skips over all the rows already read. It's better to select
once and let the driver pull the result page by page lazily (see the sketch
below).
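For example, a rough, untested sketch of what I mean: it reuses
postgres_connection and solr_url from your script, assumes psycopg2 plus
pysolr as you already use, and the cursor name is just an arbitrary label.

import psycopg2
from pysolr import Solr

conn = psycopg2.connect(**postgres_connection)   # same connection params as in your script
cursor = conn.cursor(name="city_stream")         # named => server-side cursor
try:
    # The query runs once on the server; rows are streamed to the client
    # on demand, so there is no LIMIT/OFFSET and no re-run per batch.
    cursor.execute(
        """
        SELECT id, name, update_at
        FROM city
        WHERE update_at >= %s
        ORDER BY update_at
        """,
        ('2023-07-29 00:00:00.0000',),
    )

    solr = Solr(solr_url, always_commit=False)   # commit once at the end instead of per batch
    while True:
        rows = cursor.fetchmany(1000)            # pulls the next page lazily
        if not rows:
            break
        solr.add([
            {"id": str(r[0]), "name": r[1], "update_at": r[2].isoformat()}
            for r in rows
        ])
    solr.commit()
finally:
    cursor.close()
    conn.close()

This way the WHERE/ORDER BY is evaluated once and fetchmany() just pulls the
next slice of the same result set.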
Also, how many distinct ids do you have behind update_at >= '2023-07-29
00:00:00.0000'?
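E.g. something like: select count(distinct id) from city where update_at >=
'2023-07-29 00:00:00.0000'. If that comes back as about 5000, the numbers add
up: assuming id is the uniqueKey of your core, documents sharing the same id
overwrite each other, so numFound won't grow past the number of distinct ids.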


On Fri, Dec 15, 2023 at 7:56 AM Ishan Chattopadhyaya <ichattopadhy...@gmail.com> wrote:

> If you're able to do multithreaded indexing, it will go much faster.
>
> On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <sippingonesandze...@gmail.com> wrote:
>
> > Hi,
> >
> > I have written a custom Python program to index, which may provide
> > better control than DIH.
> >
> > But it is still indexing at most 5000 documents. I have enabled debug
> > logging (after updating log4j2.xml), and I am doing a count of
> > documents after each batch of indexing.
> >
> > I would really appreciate your help in fixing the indexing stopping at
> > 5000 documents.
> >
> > The solr.log is in the enclosed zip.
> >
> > try:
> >     conn = psycopg2.connect(**postgres_connection)
> >     cursor = conn.cursor()
> >
> >     # Replace "your_table" and "your_columns" with your actual table and columns
> >     postgres_query = """
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= %s
> >                             LIMIT %s OFFSET %s
> >                     """
> >     StartDT = '2023-07-29 00:00:00.0000'
> >
> >     batch_size = 1000  # Set your desired batch size
> >     offset = 0
> >
> >     while True:
> >         rows = fetch_data_batch(cursor, postgres_query, StartDT, batch_size, offset)
> >
> >         # convert tuples to dicts, which is what solr wants
> >         if rows:
> >             print(f"Type of row: {type(rows[0])}")
> >             print(f"Example row content: {rows[0]}")
> >             # Get column names from cursor.description
> >             column_names        = [desc[0] for desc in cursor.description]
> >             rows_as_dicts       = [dict(zip(column_names, row)) for row in rows]
> >
> >             print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
> >
> >         if not rows:
> >             break  # No more data
> >
> >         # Connect to Solr
> >         solr = Solr(solr_url, always_commit=True)
> >
> >         # Index data into Solr
> >         docs = [
> >             {
> >                 # Assuming each row is a dictionary where keys are field names
> >                     "id":       str(row_dict["id"]),
> >                     "name":         row_dict["name"],
> >                     "update_at":    row_dict["update_at"].isoformat() if "update_at" in rows_as_dicts else None
> >                     # Add more fields as needed
> >             }
> >             for row_dict in rows_as_dicts
> >         ]
> >
> >
> >         # Log the content of each document
> >         for doc in docs:
> >             logging.debug(f"Indexing document: {doc}")
> >
> >         # Index data into Solr and get the response
> >         response = solr.add(docs)
> >
> >         solr_core='p4a'
> >
> >         # Construct the Solr select query URL
> >         select_query_url = f"http://localhost:8983/solr/{solr_core}/select?q=*:*&rows=0&wt=json"
> >
> >         # Request the select query response
> >         select_response = requests.get(select_query_url)
> >
> >         try:
> >             # Try to parse the response as JSON
> >             select_data = json.loads(select_response.text)
> >
> >             # Extract the number of documents from the response
> >             num_docs = select_data.get("response", {}).get("numFound", -1)
> >
> >             print(f"Total number of documents in the core '{solr_core}': {num_docs}")
> >
> >         except json.decoder.JSONDecodeError as e:
> >             print(f"Error decoding JSON response: {e}")
> >             print(f"Response content: {select_response.text}")
> >
> >
> >         # Log the outcome of the indexing operation
> >         logging.info("Committing batch to Solr")
> >
> >         # Commit the changes to Solr
> >         solr.commit()
> >         time.sleep(1)
> >         offset += batch_size
> >
> > finally:
> >     # Close PostgreSQL connection
> >     cursor.close()
> >     conn.close()
> >
> > Example of content from Postgres:
> > Type of row: <class 'tuple'>
> > Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3',
> > datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
> > Type of row after rows_as_dicts: <class 'dict'>
> > Total number of documents in the core 'p4a': 5000
> >
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
> >                             LIMIT 1000 OFFSET 13000
> >
> > Type of row: <class 'tuple'>
> > Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73',
> > datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
> > Type of row after rows_as_dicts: <class 'dict'>
> > Total number of documents in the core 'p4a': 5000
> >
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
> >                             LIMIT 1000 OFFSET 14000
> >
> > Type of row: <class 'tuple'>
> > Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3',
> > datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
> > Type of row after rows_as_dicts: <class 'dict'>
> > Total number of documents in the core 'p4a': 5000
> >
> >                             SELECT
> >                                     ROW_NUMBER() over (order by update_at)
> >                                     , id, name, update_at
> >                             FROM city
> >                             WHERE update_at >= '2023-07-29 00:00:00.0000'
> >                             LIMIT 1000 OFFSET 15000
> >
> >
> >
>


-- 
Sincerely yours
Mikhail Khludnev
