ashb commented on a change in pull request #4174: [AIRFLOW-571] Airflow CLI:
add gunicorn_config param and refactor webserver cli function
URL: https://github.com/apache/incubator-airflow/pull/4174#discussion_r240124435
##########
File path: airflow/bin/cli.py
##########
@@ -833,136 +833,159 @@ def start_refresh(gunicorn_master_proc):
@cli_utils.action_logging
def webserver(args):
- print(settings.HEADER)
+ return WebserverCommand(args)()
- access_logfile = args.access_logfile or conf.get('webserver',
'access_logfile')
- error_logfile = args.error_logfile or conf.get('webserver',
'error_logfile')
- num_workers = args.workers or conf.get('webserver', 'workers')
- worker_timeout = (args.worker_timeout or
- conf.get('webserver', 'web_server_worker_timeout'))
- ssl_cert = args.ssl_cert or conf.get('webserver', 'web_server_ssl_cert')
- ssl_key = args.ssl_key or conf.get('webserver', 'web_server_ssl_key')
- if not ssl_cert and ssl_key:
- raise AirflowException(
- 'An SSL certificate must also be provided for use with ' + ssl_key)
- if ssl_cert and not ssl_key:
- raise AirflowException(
- 'An SSL key must also be provided for use with ' + ssl_cert)
- if args.debug:
- print(
- "Starting the web server on port {0} and host {1}.".format(
- args.port, args.hostname))
- if settings.RBAC:
- app, _ = create_app_rbac(conf, testing=conf.get('core',
'unit_test_mode'))
+class WebserverCommand(object):
+
+ server_print_info = '''\
+ Running the Gunicorn Server with:
+ Workers: {self.args.workers}
{self.args.workerclass}
+ Host: {self.args.hostname}:{self.args.port}
+ Timeout: {self.args.worker_timeout}
+ Logfiles: {self.args.access_logfile}
{self.args.error_logfile}
+
=================================================================\
+ '''
+
+ def __init__(self, args):
+ if not args.ssl_cert and args.ssl_key:
+ raise AirflowException(
+ 'An SSL certificate must also be provided for use with ' +
args.ssl_key)
+ if args.ssl_cert and not args.ssl_key:
+ raise AirflowException(
+ 'An SSL key must also be provided for use with ' +
args.ssl_cert)
+ self.args = args
+
+ def __call__(self):
+ """ main call method """
+
+ print(settings.HEADER)
+
+ if self.args.debug:
+ self._debug_run()
else:
- app = create_app(conf, testing=conf.get('core', 'unit_test_mode'))
- app.run(debug=True, use_reloader=False if app.config['TESTING'] else
True,
- port=args.port, host=args.hostname,
- ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else
None)
- else:
- os.environ['SKIP_DAGS_PARSING'] = 'True'
- app = cached_app_rbac(conf) if settings.RBAC else cached_app(conf)
- pid, stdout, stderr, log_file = setup_locations(
- "webserver", args.pid, args.stdout, args.stderr, args.log_file)
- os.environ.pop('SKIP_DAGS_PARSING')
- if args.daemon:
- handle = setup_logging(log_file)
- stdout = open(stdout, 'w+')
- stderr = open(stderr, 'w+')
-
- print(
- textwrap.dedent('''\
- Running the Gunicorn Server with:
- Workers: {num_workers} {args.workerclass}
- Host: {args.hostname}:{args.port}
- Timeout: {worker_timeout}
- Logfiles: {access_logfile} {error_logfile}
-
=================================================================\
- '''.format(**locals())))
+ os.environ['SKIP_DAGS_PARSING'] = 'True'
+ app = cached_app_rbac(conf) if settings.RBAC else cached_app(conf)
+ pid, stdout, stderr, log_file = setup_locations(
+ "webserver", self.args.pid, self.args.stdout, self.args.stderr,
+ self.args.log_file)
+ os.environ.pop('SKIP_DAGS_PARSING')
+ if self.args.daemon:
+ handle = setup_logging(log_file)
+ stdout = open(stdout, 'w+')
+ stderr = open(stderr, 'w+')
+
+ print(textwrap.dedent(self.server_print_info.format(**locals())))
+
+ run_args = self.prepare_run_args(pid)
+
+ gunicorn_master_proc = None
+
+ def kill_proc(dummy_signum, dummy_frame):
+ gunicorn_master_proc.terminate()
+ gunicorn_master_proc.wait()
+ sys.exit(0)
+
+ if self.args.daemon:
+ base, ext = os.path.splitext(pid)
+ ctx = daemon.DaemonContext(
+ pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+ files_preserve=[handle],
+ stdout=stdout,
+ stderr=stderr,
+ signal_map={
+ signal.SIGINT: kill_proc,
+ signal.SIGTERM: kill_proc
+ },
+ )
+ with ctx:
+ subprocess.Popen(run_args, close_fds=True)
+
+ # Reading pid file directly, since Popen#pid doesn't
+ # seem to return the right value with DaemonContext.
+ while True:
+ try:
+ with open(pid) as f:
+ gunicorn_master_proc_pid = int(f.read())
+ break
+ except IOError:
+ log.debug("Waiting for gunicorn's pid file to be
created.")
+ time.sleep(0.1)
+
+ gunicorn_master_proc =
psutil.Process(gunicorn_master_proc_pid)
+ self.monitor_gunicorn(gunicorn_master_proc)
+
+ stdout.close()
+ stderr.close()
+ else:
+ gunicorn_master_proc = subprocess.Popen(run_args,
close_fds=True)
+ signal.signal(signal.SIGINT, kill_proc)
+ signal.signal(signal.SIGTERM, kill_proc)
+
+ self.monitor_gunicorn(gunicorn_master_proc)
+
+ def prepare_run_args(self, pid):
+ """ combine command to run gunicorn server """
run_args = [
'gunicorn',
- '-w', str(num_workers),
- '-k', str(args.workerclass),
- '-t', str(worker_timeout),
- '-b', args.hostname + ':' + str(args.port),
+ '-w', str(self.args.workers),
+ '-k', str(self.args.workerclass),
+ '-t', str(self.args.worker_timeout),
+ '-b', self.args.hostname + ':' + str(self.args.port),
'-n', 'airflow-webserver',
'-p', str(pid),
'-c', 'python:airflow.www.gunicorn_config',
]
- if args.access_logfile:
- run_args += ['--access-logfile', str(args.access_logfile)]
+ if self.args.access_logfile:
+ run_args += ['--access-logfile', str(self.args.access_logfile)]
- if args.error_logfile:
- run_args += ['--error-logfile', str(args.error_logfile)]
+ if self.args.error_logfile:
+ run_args += ['--error-logfile', str(self.args.error_logfile)]
- if args.daemon:
+ if self.args.daemon:
run_args += ['-D']
- if ssl_cert:
- run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
+ if self.args.ssl_cert:
+ run_args += ['--certfile', self.args.ssl_cert, '--keyfile',
+ self.args.ssl_key]
+
+ if self.args.gunicorn_config:
+ if isinstance(self.args.gunicorn_config, list):
+ self.args.gunicorn_config = self.args.gunicorn_config[0]
+ for arg in self.args.gunicorn_config.split():
+ run_args.append(arg)
Review comment:
Is this used as:
```
airflow webserver '--gunicorn-opt-a --gunicorn-opt-b'
```
If so, why not just take the list directly? Make it multiple args rather than
a single one, which removes the need for lines 955-958.
```suggestion
run_args += self.args.gunicorn_args
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services