[
https://issues.apache.org/jira/browse/AIRFLOW-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690508#comment-16690508
]
ASF GitHub Bot commented on AIRFLOW-3332:
-----------------------------------------
kaxil closed pull request #4179: [AIRFLOW-3332] Add insert_all to allow
inserting rows into BigQuery table
URL: https://github.com/apache/incubator-airflow/pull/4179
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/contrib/hooks/bigquery_hook.py
b/airflow/contrib/hooks/bigquery_hook.py
index d300dbe6b7..0937299d91 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -1515,6 +1515,79 @@ def get_datasets_list(self, project_id=None):
return datasets_list
+ def insert_all(self, project_id, dataset_id, table_id,
+ rows, ignore_unknown_values=False,
+ skip_invalid_rows=False, fail_on_error=False):
+ """
+ Method to stream data into BigQuery one record at a time without
needing
+ to run a load job
+
+ .. seealso::
+ For more information, see:
+
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
+
+ :param project_id: The name of the project where we have the table
+ :type project_id: str
+ :param dataset_id: The name of the dataset where we have the table
+ :type dataset_id: str
+ :param table_id: The name of the table
+ :type table_id: str
+ :param rows: the rows to insert
+ :type rows: list
+
+        **Example of rows**:
+ rows=[{"json": {"a_key": "a_value_0"}}, {"json": {"a_key":
"a_value_1"}}]
+
+ :param ignore_unknown_values: [Optional] Accept rows that contain
values
+ that do not match the schema. The unknown values are ignored.
+ The default value is false, which treats unknown values as errors.
+ :type ignore_unknown_values: bool
+ :param skip_invalid_rows: [Optional] Insert all valid rows of a
request,
+ even if invalid rows exist. The default value is false, which
causes
+ the entire request to fail if any invalid rows exist.
+ :type skip_invalid_rows: bool
+ :param fail_on_error: [Optional] Force the task to fail if any errors
occur.
+ The default value is false, which indicates the task should not
fail
+ even if any insertion errors occur.
+ :type fail_on_error: bool
+ """
+
+ dataset_project_id = project_id if project_id else self.project_id
+
+ body = {
+ "rows": rows,
+ "ignoreUnknownValues": ignore_unknown_values,
+ "kind": "bigquery#tableDataInsertAllRequest",
+ "skipInvalidRows": skip_invalid_rows,
+ }
+
+ try:
+ self.log.info('Inserting {} row(s) into Table {}:{}.{}'.format(
+ len(rows), dataset_project_id,
+ dataset_id, table_id))
+
+ resp = self.service.tabledata().insertAll(
+ projectId=dataset_project_id, datasetId=dataset_id,
+ tableId=table_id, body=body
+ ).execute()
+
+ if 'insertErrors' not in resp:
+ self.log.info('All row(s) inserted successfully:
{}:{}.{}'.format(
+ dataset_project_id, dataset_id, table_id))
+ else:
+                error_msg = '{} insert error(s) occurred: {}:{}.{}. Details:
{}'.format(
+ len(resp['insertErrors']),
+ dataset_project_id, dataset_id, table_id,
resp['insertErrors'])
+ if fail_on_error:
+ raise AirflowException(
+ 'BigQuery job failed. Error was: {}'.format(error_msg)
+ )
+ self.log.info(error_msg)
+ except HttpError as err:
+ raise AirflowException(
+ 'BigQuery job failed. Error was: {}'.format(err.content)
+ )
+
class BigQueryCursor(BigQueryBaseCursor):
"""
diff --git a/tests/contrib/hooks/test_bigquery_hook.py
b/tests/contrib/hooks/test_bigquery_hook.py
index 8f350ff2ee..01ce1d0398 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -294,6 +294,56 @@ def test_duplication_check(self):
self.assertIsNone(_api_resource_configs_duplication_check(
"key_one", key_one, {"key_one": True}))
+ @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+ def test_insert_all_succeed(self, run_with_config):
+ project_id = 'bq-project'
+ dataset_id = 'bq_dataset'
+ table_id = 'bq_table'
+ rows = [
+ {"json": {"a_key": "a_value_0"}}
+ ]
+ body = {
+ "rows": rows,
+ "ignoreUnknownValues": False,
+ "kind": "bigquery#tableDataInsertAllRequest",
+ "skipInvalidRows": False,
+ }
+
+ mock_service = mock.Mock()
+ method = (mock_service.tabledata.return_value.insertAll)
+ method.return_value.execute.return_value = {
+ "kind": "bigquery#tableDataInsertAllResponse"
+ }
+ cursor = hook.BigQueryBaseCursor(mock_service, 'project_id')
+ cursor.insert_all(project_id, dataset_id, table_id, rows)
+ method.assert_called_with(projectId=project_id, datasetId=dataset_id,
+ tableId=table_id, body=body)
+
+ @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+ def test_insert_all_fail(self, run_with_config):
+ project_id = 'bq-project'
+ dataset_id = 'bq_dataset'
+ table_id = 'bq_table'
+ rows = [
+ {"json": {"a_key": "a_value_0"}}
+ ]
+
+ mock_service = mock.Mock()
+ method = (mock_service.tabledata.return_value.insertAll)
+ method.return_value.execute.return_value = {
+ "kind": "bigquery#tableDataInsertAllResponse",
+ "insertErrors": [
+ {
+ "index": 1,
+ "errors": []
+ }
+ ]
+ }
+ cursor = hook.BigQueryBaseCursor(mock_service, 'project_id')
+ with self.assertRaises(Exception):
+ cursor.insert_all(project_id, dataset_id, table_id,
+ rows, fail_on_error=True)
+
class TestLabelsInRunJob(unittest.TestCase):
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add BigQuery Streaming insert_all to BigQueryHook
> -------------------------------------------------
>
> Key: AIRFLOW-3332
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3332
> Project: Apache Airflow
> Issue Type: New Feature
> Reporter: Ryan Yuan
> Assignee: Ryan Yuan
> Priority: Major
> Fix For: 2.0.0
>
>
> Add a function to BigQueryHook to allow inserting one or more rows into a
> BigQuery table.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)