This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ddb365  Duplicate Connection: Added logic to query if a connection id 
exists before creating one (#18161)
3ddb365 is described below

commit 3ddb36578c5020408f89f5532b21dc0c38e739fb
Author: Kanthi <[email protected]>
AuthorDate: Sat Oct 9 10:06:11 2021 -0400

    Duplicate Connection: Added logic to query if a connection id exists before 
creating one (#18161)
---
 airflow/www/views.py                     | 65 +++++++++++++++++++++-----------
 tests/www/views/test_views_connection.py | 32 ++++++++++++++++
 2 files changed, 75 insertions(+), 22 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2dca5fd..35cc590 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3413,34 +3413,55 @@ class ConnectionModelView(AirflowModelView):
         for selected_conn in connections:
             new_conn_id = selected_conn.conn_id
             match = re.search(r"_copy(\d+)$", selected_conn.conn_id)
+
+            base_conn_id = selected_conn.conn_id
             if match:
-                conn_id_prefix = selected_conn.conn_id[: match.start()]
-                new_conn_id = f"{conn_id_prefix}_copy{int(match.group(1)) + 1}"
-            else:
-                new_conn_id += '_copy1'
-
-            dup_conn = Connection(
-                new_conn_id,
-                selected_conn.conn_type,
-                selected_conn.description,
-                selected_conn.host,
-                selected_conn.login,
-                selected_conn.password,
-                selected_conn.schema,
-                selected_conn.port,
-                selected_conn.extra,
-            )
+                base_conn_id = base_conn_id.split('_copy')[0]
+
+            potential_connection_ids = [f"{base_conn_id}_copy{i}" for i in 
range(1, 11)]
 
+            query = 
session.query(Connection.conn_id).filter(Connection.conn_id.in_(potential_connection_ids))
+
+            found_conn_id_set = {conn_id for conn_id, in query}
+
+            possible_conn_id_iter = (
+                connection_id
+                for connection_id in potential_connection_ids
+                if connection_id not in found_conn_id_set
+            )
             try:
-                session.add(dup_conn)
-                session.commit()
-                flash(f"Connection {new_conn_id} added successfully.", 
"success")
-            except IntegrityError:
+                new_conn_id = next(possible_conn_id_iter)
+            except StopIteration:
                 flash(
-                    f"Connection {new_conn_id} can't be added. Integrity 
error, probably unique constraint.",
+                    f"Connection {new_conn_id} can't be added because it 
already exists, "
+                    f"Please rename the existing connections",
                     "warning",
                 )
-                session.rollback()
+            else:
+
+                dup_conn = Connection(
+                    new_conn_id,
+                    selected_conn.conn_type,
+                    selected_conn.description,
+                    selected_conn.host,
+                    selected_conn.login,
+                    selected_conn.password,
+                    selected_conn.schema,
+                    selected_conn.port,
+                    selected_conn.extra,
+                )
+
+                try:
+                    session.add(dup_conn)
+                    session.commit()
+                    flash(f"Connection {new_conn_id} added successfully.", 
"success")
+                except IntegrityError:
+                    flash(
+                        f"Connection {new_conn_id} can't be added. Integrity 
error, "
+                        f"probably unique constraint.",
+                        "warning",
+                    )
+                    session.rollback()
 
         self.update_redirect()
         return redirect(self.get_redirect())
diff --git a/tests/www/views/test_views_connection.py 
b/tests/www/views/test_views_connection.py
index 249bf2a..e729856 100644
--- a/tests/www/views/test_views_connection.py
+++ b/tests/www/views/test_views_connection.py
@@ -148,3 +148,35 @@ def test_duplicate_connection(admin_client):
     response = {conn[0] for conn in session.query(Connection.conn_id).all()}
     assert resp.status_code == 200
     assert expected_result == response
+
+
+def test_duplicate_connection_error(admin_client):
+    """Test Duplicate multiple connection with suffix
+    when there are already 10 copies, no new copy
+    should be created"""
+
+    connection_ids = [f'test_duplicate_postgres_connection_copy{i}' for i in 
range(1, 11)]
+    connections = [
+        Connection(
+            conn_id=connection_id,
+            conn_type='FTP',
+            description='Postgres',
+            host='localhost',
+            schema='airflow',
+            port=3306,
+        )
+        for connection_id in connection_ids
+    ]
+
+    with create_session() as session:
+        session.query(Connection).delete()
+        session.add_all(connections)
+
+    data = {"action": "mulduplicate", "rowid": [connections[0].id]}
+    resp = admin_client.post('/connection/action_post', data=data, 
follow_redirects=True)
+
+    expected_result = {f'test_duplicate_postgres_connection_copy{i}' for i in 
range(1, 11)}
+
+    assert resp.status_code == 200
+    response = {conn[0] for conn in session.query(Connection.conn_id).all()}
+    assert expected_result == response

Reply via email to