ashb closed pull request #4175: [AIRFLOW-3072] Assign permission get_logs_with_metadata to viewer role
URL: https://github.com/apache/incubator-airflow/pull/4175
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/www_rbac/security.py b/airflow/www_rbac/security.py
index 2405aa8f80..72691565e6 100644
--- a/airflow/www_rbac/security.py
+++ b/airflow/www_rbac/security.py
@@ -75,6 +75,7 @@
     'can_task_stats',
     'can_code',
     'can_log',
+    'can_get_logs_with_metadata',
     'can_tries',
     'can_graph',
     'can_tree',
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index d7f099d6be..f378783e90 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -786,5 +786,112 @@ def test_start_date_filter(self):
         pass
 
 
+class TestLogViewPermission(TestBase):
+    """
+    Test Airflow DAG acl
+    """
+    default_date = timezone.datetime(2018, 6, 1)
+    run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestLogViewPermission, cls).setUpClass()
+
+    def cleanup_dagruns(self):
+        DR = models.DagRun
+        dag_ids = ['example_bash_operator',
+                   'example_subdag_operator']
+        (self.session
+             .query(DR)
+             .filter(DR.dag_id.in_(dag_ids))
+             .filter(DR.run_id == self.run_id)
+             .delete(synchronize_session='fetch'))
+        self.session.commit()
+
+    def prepare_dagruns(self):
+        dagbag = models.DagBag(include_examples=True)
+        self.bash_dag = dagbag.dags['example_bash_operator']
+        self.sub_dag = dagbag.dags['example_subdag_operator']
+
+        self.bash_dagrun = self.bash_dag.create_dagrun(
+            run_id=self.run_id,
+            execution_date=self.default_date,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING)
+
+        self.sub_dagrun = self.sub_dag.create_dagrun(
+            run_id=self.run_id,
+            execution_date=self.default_date,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING)
+
+    def setUp(self):
+        super(TestLogViewPermission, self).setUp()
+        self.cleanup_dagruns()
+        self.prepare_dagruns()
+        self.logout()
+
+    def login(self, username=None, password=None):
+        role_admin = self.appbuilder.sm.find_role('Admin')
+        tester = self.appbuilder.sm.find_user(username='test_admin')
+        if not tester:
+            self.appbuilder.sm.add_user(
+                username='test_admin',
+                first_name='test_admin',
+                last_name='test_admin',
+                email='[email protected]',
+                role=role_admin,
+                password='test_admin')
+
+        role_user = self.appbuilder.sm.find_role('User')
+        test_user = self.appbuilder.sm.find_user(username='test_user')
+        if not test_user:
+            self.appbuilder.sm.add_user(
+                username='test_user',
+                first_name='test_user',
+                last_name='test_user',
+                email='[email protected]',
+                role=role_user,
+                password='test_user')
+
+        return self.client.post('/login/', data=dict(
+            username=username,
+            password=password
+        ))
+
+    def logout(self):
+        return self.client.get('/logout/')
+
+    def test_log_success_for_admin(self):
+        self.logout()
+        self.login(username='test_admin',
+                   password='test_admin')
+        url = ('log?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('Log by attempts', resp)
+        url = ('get_logs_with_metadata?task_id=runme_0&dag_id=example_bash_operator&'
+               'execution_date={}&try_number=1&metadata=null'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('"message":', resp)
+        self.check_content_in_response('"metadata":', resp)
+
+    def test_log_success_for_user(self):
+        self.logout()
+        self.login(username='test_user',
+                   password='test_user')
+        url = ('log?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('Log by attempts', resp)
+        url = ('get_logs_with_metadata?task_id=runme_0&dag_id=example_bash_operator&'
+               'execution_date={}&try_number=1&metadata=null'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('"message":', resp)
+        self.check_content_in_response('"metadata":', resp)
+
+
 if __name__ == '__main__':
     unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to