tfiala created this revision.
tfiala added a reviewer: clayborg.
tfiala added a subscriber: lldb-commits.
This change does the following:

- Adds a mechanism for adding arbitrary event content to all test events
  from the command line.
- Adds a 0-based worker_index key to all inferior test events. This allows
  ResultsFormatters to know which worker queue is/was performing the work
  for which the event is raised. We can use this in interesting ways; more
  to come. In the parallel test runner, all methods of running tests
  (multiprocessing, threading, pool variants, and serial) wire this in
  properly.
- --results-file now supports "stdout" or "stderr". If either is given, the
  results formatter writes to the stdout/stderr of the dotest.py process.
- Adds a test_results.DumpFormatter option that prints the pretty-printed
  (pprint module) form of the test events to the results file. Calling
  'dotest.py --results-file=stdout --results-formatter test_results.DumpFormatter'
  is a great way to see the stream of test events as they are generated.

http://reviews.llvm.org/D12983

Files:
  test/dosep.py
  test/dotest.py
  test/dotest_args.py
  test/test_results.py
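As a concrete illustration (not part of the patch; the field values below are invented, and only the event, event_time, test_class, test_name, and worker_index keys are ones this change actually produces), an inferior invocation plus one event as DumpFormatter would pprint it might look like:

    python dotest.py --results-file=stdout \
        --results-formatter test_results.DumpFormatter \
        --event-add-entries worker_index=3

    {'event': 'test_result',
     'event_time': 1442854823.91,
     'test_class': 'ExampleTestCase',
     'test_name': 'test_example',
     'worker_index': '3'}

Note that KEY=VAL entries parsed from --event-add-entries arrive as strings, so worker_index shows up as '3' rather than 3.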
Index: test/test_results.py
===================================================================
--- test/test_results.py
+++ test/test_results.py
@@ -12,6 +12,7 @@
 import cPickle
 import inspect
 import os
+import pprint
 import re
 import sys
 import threading
@@ -22,6 +23,9 @@
 
 class EventBuilder(object):
     """Helper class to build test result event dictionaries."""
+
+    BASE_DICTIONARY = None
+
     @staticmethod
     def _get_test_name_info(test):
         """Returns (test-class-name, test-method-name) from a test case instance.
@@ -46,12 +50,19 @@
         @return event dictionary with common event fields set.
         """
         test_class_name, test_name = EventBuilder._get_test_name_info(test)
-        return {
+
+        if EventBuilder.BASE_DICTIONARY is not None:
+            # Start with a copy of the "always include" entries.
+            result = dict(EventBuilder.BASE_DICTIONARY)
+        else:
+            result = {}
+        result.update({
             "event": event_type,
             "test_class": test_class_name,
             "test_name": test_name,
             "event_time": time.time()
-        }
+        })
+        return result
 
     @staticmethod
     def _error_tuple_class(error_tuple):
@@ -122,9 +133,9 @@
         event = EventBuilder._event_dictionary_test_result(test, status)
         event["issue_class"] = EventBuilder._error_tuple_class(error_tuple)
         event["issue_message"] = EventBuilder._error_tuple_message(error_tuple)
-        tb = EventBuilder._error_tuple_traceback(error_tuple)
-        if tb is not None:
-            event["issue_backtrace"] = traceback.format_tb(tb)
+        backtrace = EventBuilder._error_tuple_traceback(error_tuple)
+        if backtrace is not None:
+            event["issue_backtrace"] = traceback.format_tb(backtrace)
         return event
 
     @staticmethod
@@ -246,8 +257,38 @@
         event["issue_phase"] = "cleanup"
         return event
 
+    @staticmethod
+    def add_entries_to_all_events(entries_dict):
+        """Specifies a dictionary of entries to add to all test events.
+
+        This provides a mechanism for, say, a parallel test runner to
+        indicate to each inferior dotest.py that it should add a
+        worker index to each.
+
+        Calling this method replaces all previous entries added
+        by a prior call to this.
+
+        Event build methods will overwrite any entries that collide.
+        Thus, the passed-in dictionary is the base, which gets merged
+        over by event building when keys collide.
+
+        @param entries_dict a dictionary containing key and value
+        pairs that should be merged into all events created by the
+        event generator.  May be None to clear out any extra entries.
+        """
+        EventBuilder.BASE_DICTIONARY = dict(entries_dict)
+
+    @staticmethod
+    def base_event():
+        """@return the base event dictionary that all events should contain."""
+        if EventBuilder.BASE_DICTIONARY is not None:
+            return dict(EventBuilder.BASE_DICTIONARY)
+        else:
+            return None
+
+
 class ResultsFormatter(object):
+
     """Provides interface to formatting test results out to a file-like object.
 
     This class allows the LLDB test framework's raw test-related
@@ -420,6 +461,9 @@
 
     @staticmethod
     def _build_illegal_xml_regex():
+        """Constructs a regex to match all illegal xml characters.
+
+        Expects to be used against a unicode string."""
         # Construct the range pairs of invalid unicode characters.
         illegal_chars_u = [
             (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
@@ -453,6 +497,15 @@
         return xml.sax.saxutils.quoteattr(text)
 
     def _replace_invalid_xml(self, str_or_unicode):
+        """Replaces invalid XML characters with a '?'.
+
+        @param str_or_unicode a string to replace invalid XML
+        characters within.  Can be unicode or not.  If not unicode,
+        assumes it is a byte string in utf-8 encoding.
+
+        @returns a utf-8-encoded byte string with invalid
+        XML replaced with '?'.
+        """
         # Get the content into unicode
         if isinstance(str_or_unicode, str):
             unicode_content = str_or_unicode.decode('utf-8')
@@ -472,6 +525,11 @@
             XunitFormatter.RM_FAILURE,
             XunitFormatter.RM_PASSTHRU]
         parser.add_argument(
+            "--assert-on-unknown-events",
+            action="store_true",
+            help=('cause unknown test events to generate '
+                  'a python assert.  Default is to ignore.'))
+        parser.add_argument(
             "--xpass", action="store", choices=results_mapping_choices,
             default=XunitFormatter.RM_FAILURE,
             help=('specify mapping from unexpected success to jUnit/xUnit '
@@ -533,8 +591,10 @@
         elif event_type == "test_result":
             self._process_test_result(test_event)
         else:
-            sys.stderr.write("unknown event type {} from {}\n".format(
-                event_type, test_event))
+            # This is an unknown event.
+            if self.options.assert_on_unknown_events:
+                raise Exception("unknown event type {} from {}\n".format(
+                    event_type, test_event))
 
     def _handle_success(self, test_event):
         """Handles a test success.
@@ -817,27 +877,40 @@
     def begin_session(self):
         super(RawPickledFormatter, self).begin_session()
-        self.process_event({
+        event = EventBuilder.base_event()
+        if event is None:
+            event = {}
+        event.update({
             "event": "session_begin",
             "event_time": time.time(),
             "pid": self.pid
         })
+        self.process_event(event)
 
     def process_event(self, test_event):
         super(RawPickledFormatter, self).process_event(test_event)
-        # Add pid to the event for tracking.
-        # test_event["pid"] = self.pid
-
         # Send it as {serialized_length_of_serialized_bytes}#{serialized_bytes}
         pickled_message = cPickle.dumps(test_event)
         self.out_file.send(
             "{}#{}".format(len(pickled_message), pickled_message))
 
     def end_session(self):
-        self.process_event({
+        event = EventBuilder.base_event()
+        if event is None:
+            event = {}
+        event.update({
             "event": "session_end",
             "event_time": time.time(),
             "pid": self.pid
         })
+        self.process_event(event)
         super(RawPickledFormatter, self).end_session()
+
+
+class DumpFormatter(ResultsFormatter):
+    """Formats events to the file as their raw python dictionary format."""
+
+    def process_event(self, test_event):
+        super(DumpFormatter, self).process_event(test_event)
+        self.out_file.write("\n" + pprint.pformat(test_event) + "\n")
Index: test/dotest_args.py
===================================================================
--- test/dotest_args.py
+++ test/dotest_args.py
@@ -165,6 +165,11 @@
         help=('Specify comma-separated options to pass to the formatter. '
               'Use --results-formatter-options="--option1[,--option2[,...]]" '
               'syntax.  Note the "=" is critical, and don\'t use whitespace.'))
+    group.add_argument(
+        '--event-add-entries',
+        action='store',
+        help=('Specify comma-separated KEY=VAL entries to add key and value '
+              'pairs to all test events generated by this test run.'))
 
     # Remove the reference to our helper function
     del X
Index: test/dotest.py
===================================================================
--- test/dotest.py
+++ test/dotest.py
@@ -823,6 +823,19 @@
         lldb_platform_url = args.lldb_platform_url
     if args.lldb_platform_working_dir:
         lldb_platform_working_dir = args.lldb_platform_working_dir
+
+    if args.event_add_entries and len(args.event_add_entries) > 0:
+        entries = {}
+        # Parse out key=val pairs, separated by comma
+        for keyval in args.event_add_entries.split(","):
+            key_val_entry = keyval.split("=")
+            if len(key_val_entry) == 2:
+                entries[key_val_entry[0]] = key_val_entry[1]
+        # Tell the event builder to create all events with these
+        # key/val pairs in them.
+        if len(entries) > 0:
+            test_results.EventBuilder.add_entries_to_all_events(entries)
+
     # Gather all the dirs passed on the command line.
     if len(args.args) > 0:
         testdirs = map(os.path.abspath, args.args)
@@ -947,8 +960,15 @@
     if results_filename:
         # Open the results file for writing.
-        results_file_object = open(results_filename, "w")
-        cleanup_func = results_file_object.close
+        if results_filename == 'stdout':
+            results_file_object = sys.stdout
+            cleanup_func = None
+        elif results_filename == 'stderr':
+            results_file_object = sys.stderr
+            cleanup_func = None
+        else:
+            results_file_object = open(results_filename, "w")
+            cleanup_func = results_file_object.close
         default_formatter_name = "test_results.XunitFormatter"
     elif results_port:
         # Connect to the specified localhost port.
@@ -995,7 +1015,8 @@
             results_formatter_object.end_session()
 
             # And now close out the output file-like object.
-            cleanup_func()
+            if cleanup_func is not None:
+                cleanup_func()
 
         atexit.register(shutdown_formatter)
Index: test/dosep.py
===================================================================
--- test/dosep.py
+++ test/dosep.py
@@ -80,7 +80,12 @@
 RUNNER_PROCESS_ASYNC_MAP = None
 RESULTS_LISTENER_CHANNEL = None
 
-def setup_global_variables(lock, counter, total, name_len, options):
+"""Contains an optional function pointer that can return the worker index
+   for the given thread/process calling it.  Returns a 0-based index."""
+GET_WORKER_INDEX = None
+
+def setup_global_variables(
+        lock, counter, total, name_len, options, worker_index_map):
     global output_lock, test_counter, total_tests, test_name_len
     global dotest_options
     output_lock = lock
@@ -89,7 +94,23 @@
     test_name_len = name_len
     dotest_options = options
 
+    if worker_index_map is not None:
+        # We'll use the output lock for this to avoid sharing another lock.
+        # This won't be used much.
+        index_lock = lock
+
+        def get_worker_index_use_pid():
+            """Returns a 0-based, process-unique index for the worker."""
+            pid = os.getpid()
+            with index_lock:
+                if pid not in worker_index_map:
+                    worker_index_map[pid] = len(worker_index_map)
+                return worker_index_map[pid]
+
+        global GET_WORKER_INDEX
+        GET_WORKER_INDEX = get_worker_index_use_pid
+
 
 def report_test_failure(name, command, output):
     global output_lock
     with output_lock:
@@ -150,31 +171,6 @@
     return passes, failures, unexpected_successes
 
-def inferior_session_interceptor(forwarding_func, event):
-    """Intercepts session begin/end events, passing through everything else.
-
-    @param forwarding_func a callable object to pass along the event if it
-    is not one that gets intercepted.
-
-    @param event the test result event received.
-    """
-
-    if event is not None and isinstance(event, dict):
-        if "event" in event:
-            if event["event"] == "session_begin":
-                # Swallow it.  Could report on inferior here if we
-                # cared.
-                return
-            elif event["event"] == "session_end":
-                # Swallow it.  Could report on inferior here if we
-                # cared.  More usefully, we can verify that the
-                # inferior went down hard if we don't receive this.
-                return
-
-    # Pass it along.
-    forwarding_func(event)
-
-
 def call_with_timeout(command, timeout, name, inferior_pid_events):
     """Run command with a timeout if possible.
 
     -s QUIT will create a coredump if they are enabled on your system
@@ -183,6 +179,10 @@
     if timeout_command and timeout != "0":
         command = [timeout_command, '-s', 'QUIT', timeout] + command
 
+    if GET_WORKER_INDEX is not None:
+        worker_index = GET_WORKER_INDEX()
+        command.extend([
+            "--event-add-entries", "worker_index={}".format(worker_index)])
     # Specifying a value for close_fds is unsupported on Windows when using
     # subprocess.PIPE
     if os.name != "nt":
@@ -263,7 +263,8 @@
 def process_dir_worker_multiprocessing(
         a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
-        a_dotest_options, job_queue, result_queue, inferior_pid_events):
+        a_dotest_options, job_queue, result_queue, inferior_pid_events,
+        worker_index_map):
     """Worker thread main loop when in multiprocessing mode.
 
     Takes one directory specification at a time and works on it."""
@@ -273,7 +274,7 @@
     # Setup the global state for the worker process.
     setup_global_variables(
         a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
-        a_dotest_options)
+        a_dotest_options, worker_index_map)
 
     # Keep grabbing entries from the queue until done.
     while not job_queue.empty():
@@ -441,14 +442,36 @@
     # rest of the flat module.
     global output_lock
     output_lock = multiprocessing.RLock()
+
     initialize_global_vars_common(num_threads, test_work_items)
 
 
 def initialize_global_vars_threading(num_threads, test_work_items):
+    """Initializes global variables used in threading mode.
+
+    @param num_threads specifies the number of workers used.
+
+    @param test_work_items specifies all the work items
+    that will be processed.
+    """
     # Initialize the global state we'll use to communicate with the
     # rest of the flat module.
     global output_lock
     output_lock = threading.RLock()
+
+    index_lock = threading.RLock()
+    index_map = {}
+
+    def get_worker_index_threading():
+        """Returns a 0-based, thread-unique index for the worker thread."""
+        thread_id = threading.current_thread().ident
+        with index_lock:
+            if thread_id not in index_map:
+                index_map[thread_id] = len(index_map)
+            return index_map[thread_id]
+
+
+    global GET_WORKER_INDEX
+    GET_WORKER_INDEX = get_worker_index_threading
+
     initialize_global_vars_common(num_threads, test_work_items)
@@ -630,6 +653,10 @@
     # hold 2 * (num inferior dotest.py processes started) entries.
     inferior_pid_events = multiprocessing.Queue(4096)
 
+    # Worker dictionary allows each worker to figure out its worker index.
+    manager = multiprocessing.Manager()
+    worker_index_map = manager.dict()
+
     # Create workers.  We don't use multiprocessing.Pool due to
     # challenges with handling ^C keyboard interrupts.
     workers = []
@@ -643,7 +670,8 @@
                 dotest_options,
                 job_queue,
                 result_queue,
-                inferior_pid_events))
+                inferior_pid_events,
+                worker_index_map))
         worker.start()
         workers.append(worker)
@@ -717,11 +745,14 @@
     # Initialize our global state.
     initialize_global_vars_multiprocessing(num_threads, test_work_items)
 
+    manager = multiprocessing.Manager()
+    worker_index_map = manager.dict()
+
     pool = multiprocessing.Pool(
         num_threads,
         initializer=setup_global_variables,
         initargs=(output_lock, test_counter, total_tests, test_name_len,
-                  dotest_options))
+                  dotest_options, worker_index_map))
 
     # Start the map operation (async mode).
     map_future = pool.map_async(
@@ -819,6 +850,10 @@
     # Initialize our global state.
     initialize_global_vars_multiprocessing(1, test_work_items)
 
+    # We're always worker index 0
+    global GET_WORKER_INDEX
+    GET_WORKER_INDEX = lambda: 0
+
     # Run the listener and related channel maps in a separate thread.
     # global RUNNER_PROCESS_ASYNC_MAP
     global RESULTS_LISTENER_CHANNEL
@@ -861,8 +896,7 @@
     # listener channel and tell the inferior to send results to the
     # port on which we'll be listening.
     if RESULTS_FORMATTER is not None:
-        forwarding_func = lambda event: inferior_session_interceptor(
-            RESULTS_FORMATTER.process_event, event)
+        forwarding_func = RESULTS_FORMATTER.process_event
         RESULTS_LISTENER_CHANNEL = (
             dotest_channels.UnpicklingForwardingListenerChannel(
                 RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
@@ -1088,6 +1122,11 @@
     if dotest_options.results_formatter_options is not None:
         _remove_option(dotest_argv, "--results-formatter-options", 2)
 
+    # Remove test runner name if present.
+    if dotest_options.test_runner_name is not None:
+        _remove_option(dotest_argv, "--test-runner-name", 2)
+
+
 def main(print_details_on_success, num_threads, test_subdir,
          test_runner_name, results_formatter):
     """Run dotest.py in inferior mode in parallel.
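For readers skimming the EventBuilder changes above, here is a minimal standalone sketch (not part of the patch; the event type name "test_start" is illustrative) of the merge semantics that add_entries_to_all_events documents: the registered base entries are copied in first, and event-building keys win on collision.

    import time

    # Reduced model of EventBuilder.BASE_DICTIONARY and its helpers.
    BASE_DICTIONARY = None


    def add_entries_to_all_events(entries_dict):
        # Replace any previously registered base entries.
        global BASE_DICTIONARY
        BASE_DICTIONARY = dict(entries_dict)


    def build_common_event(event_type):
        # Start from a copy of the base entries, if any were registered...
        result = dict(BASE_DICTIONARY) if BASE_DICTIONARY is not None else {}
        # ...then let the event-specific fields overwrite colliding keys.
        result.update({"event": event_type, "event_time": time.time()})
        return result


    add_entries_to_all_events({"worker_index": "3", "event": "stale"})
    # Prints an event whose "event" key is "test_start", not "stale";
    # the "worker_index" entry survives the merge untouched.
    print(build_common_event("test_start"))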
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits