Spaces:
				
			
			
	
			
			
		No application file
		
	
	
	
			
			
	
	
	
	
		
		
		No application file
		
	| """ | |
| Based on sacred/stdout_capturing.py in project Sacred | |
| https://github.com/IDSIA/sacred | |
| Author: Paul-Edouard Sarlin (skydes) | |
| """ | |
| from __future__ import division, print_function, unicode_literals | |
| import os | |
| import subprocess | |
| import sys | |
| from contextlib import contextmanager | |
| from threading import Timer | |
def apply_backspaces_and_linefeeds(text):
    """
    Render backspaces and carriage returns the way a terminal would.

    The text is processed line by line (split on newlines). Within a line,
    a carriage return moves the write cursor back to the start of the line
    and a backspace moves it one position to the left (never before column
    zero); any other character overwrites the cell under the cursor or
    extends the line.

    A carriage return that is the very last character of the whole text is
    kept verbatim so the result can be concatenated with the next output
    chunk.
    """
    lines = text.split("\n")
    final_line_idx = len(lines) - 1
    rendered = []

    for line_idx, line in enumerate(lines):
        cells = []
        pos = 0
        final_char_idx = len(line) - 1

        for char_idx, char in enumerate(line):
            if char == "\b":
                # Step left, but never past the start of the line.
                pos = max(0, pos - 1)
                continue

            if char == "\r":
                ends_text = (
                    line_idx == final_line_idx and char_idx == final_char_idx
                )
                if not ends_text:
                    # Ordinary carriage return: rewind to column zero.
                    pos = 0
                    continue
                # Trailing carriage return of the whole text: keep it by
                # appending it after everything rendered so far.
                pos = len(cells)

            if pos < len(cells):
                cells[pos] = char
            else:
                cells.append(char)
            pos += 1

        rendered.append("".join(cells))

    return "\n".join(rendered)
def flush():
    """Best-effort flush of Python's ``sys.stdout`` and ``sys.stderr``.

    Streams are flushed in order (stdout first). If a stream is missing,
    closed, or otherwise does not support flushing, the error is swallowed
    and the remaining work is skipped.
    """
    try:
        for stream_name in ("stdout", "stderr"):
            getattr(sys, stream_name).flush()
    except (AttributeError, ValueError, IOError):
        # Flushing is not supported in this environment; nothing to do.
        pass
| # Duplicate stdout and stderr to a file. Inspired by: | |
| # http://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/ | |
| # http://stackoverflow.com/a/651718/1388435 | |
| # http://stackoverflow.com/a/22434262/1388435 | |
@contextmanager
def capture_outputs(filename):
    """Duplicate stdout and stderr to a file on the file descriptor level.

    Intended to be used as a context manager::

        with capture_outputs(path):
            ...  # everything written to fd 1 / fd 2 is also appended to path

    While the context is active, file descriptors 1 and 2 are redirected
    into two ``tee`` subprocesses. Each ``tee`` forwards what it receives to
    the original descriptor and appends a copy to ``filename``. On exit the
    original descriptors are restored, the ``tee`` processes are waited for
    (and killed if they have not finished after one second), and the log
    file is rewritten with backspaces and carriage returns applied.

    Args:
        filename: Path of the log file; converted with ``str()`` and opened
            in append mode, so existing content is preserved.

    Note:
        The final log clean-up only runs when the ``with`` body completes
        without raising; if the body raises, descriptors are still restored
        but the log file is left as written by ``tee``.
    """
    with open(str(filename), "a+") as target:
        original_stdout_fd = 1
        original_stderr_fd = 2
        target_fd = target.fileno()

        # Save a copy of the original stdout and stderr file descriptors
        saved_stdout_fd = os.dup(original_stdout_fd)
        saved_stderr_fd = os.dup(original_stderr_fd)

        # Each tee copies its stdin to its stdout (the original descriptor)
        # and appends to /dev/stderr, which is wired to the log file here.
        tee_stdout = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=1,
        )
        tee_stderr = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=2,
        )

        # Flush pending output before the descriptors change meaning, then
        # point fd 1 / fd 2 at the stdin pipes of the tee processes.
        flush()
        os.dup2(tee_stdout.stdin.fileno(), original_stdout_fd)
        os.dup2(tee_stderr.stdin.fileno(), original_stderr_fd)

        try:
            yield
        finally:
            flush()

            # then redirect stdout back to the saved fd
            tee_stdout.stdin.close()
            tee_stderr.stdin.close()

            # restore original fds
            os.dup2(saved_stdout_fd, original_stdout_fd)
            os.dup2(saved_stderr_fd, original_stderr_fd)

            # wait for completion of the tee processes with timeout
            # implemented using a timer because timeout support is py3 only
            def kill_tees():
                tee_stdout.kill()
                tee_stderr.kill()

            tee_timer = Timer(1, kill_tees)
            try:
                tee_timer.start()
                tee_stdout.wait()
                tee_stderr.wait()
            finally:
                tee_timer.cancel()

            os.close(saved_stdout_fd)
            os.close(saved_stderr_fd)

    # Cleanup log file
    with open(str(filename), "r") as target:
        text = target.read()
    text = apply_backspaces_and_linefeeds(text)
    with open(str(filename), "w") as target:
        target.write(text)
