potluck.timeout
Hacks in an attempt to permit running a function with an external time limit. Only the Unix-specific SIGALRM method seems to work?
timeout.py
"""
Hacks in an attempt to permit running a function with an external time
limit. Only the Unix-specific SIGALRM method seems to work?

timeout.py
"""

import threading
import ctypes
import time
import signal

from . import logging


class TimeoutError(Exception):
    """
    Error generated when a time limit causes a function to be
    terminated.
    """
    pass


def with_threading_timeout(time_limit, function, args):
    """
    Attempts to run a function with a time_limit using a child thread and
    the timeout argument to `threading.Thread.join`. Note that running
    some kinds of code in a child thread can be disastrous (e.g., turtle
    graphics).

    Arguments:

    - `time_limit`: how many seconds to wait for the payload thread
        before giving up on it.
    - `function`: the callable to run in the child thread.
    - `args`: a sequence of positional arguments passed to `function`.

    Returns the value returned by `function(*args)`.

    Raises a `TimeoutError` if the payload is still running once
    `time_limit` seconds have elapsed; before raising, it asks the
    interpreter to raise a `TimeoutError` inside the child thread too,
    which is only a best-effort way of stopping it. Any exception
    raised by the payload itself is re-raised in the calling thread
    instead of being lost along with the child thread.
    """
    result = None
    failure = None

    class PayloadThread(threading.Thread):
        """
        A thread that runs a payload and has the ability to (attempt
        to) kill itself.
        """
        # TODO: Trying to run turtle code in a secondary thread crashes
        def run(self):
            """
            Runs the payload and smuggles out either its result or the
            exception it raised via non-local variables.
            """
            nonlocal result, failure
            try:
                result = function(*args)
            except TimeoutError as error:
                # TODO: Log more info here...?
                logging.log("Payload timed out.")
                failure = error
            except Exception as error:
                # Hand the error to the parent thread, which re-raises
                # it, so that the caller doesn't just get None back.
                failure = error

        def timeout(self):
            """
            Asks the interpreter to raise a `TimeoutError` inside this
            thread, logging a message if that request doesn't affect
            exactly one thread.
            """
            thread_id = self.ident
            if thread_id is None:
                # Thread was never started, so there's nothing to stop
                return

            set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
            # The id must be wrapped as an unsigned long: ctypes would
            # otherwise pass it as a C int and truncate it. The
            # exception must be a class (not an instance).
            affected = set_async_exc(
                ctypes.c_ulong(thread_id),
                ctypes.py_object(TimeoutError)
            )
            if affected != 1:
                if affected > 1:
                    # More than one thread state was modified: revert
                    # by clearing the pending exception.
                    set_async_exc(ctypes.c_ulong(thread_id), None)
                logging.log(
                    "Failed to raise timeout exception via ctypes!"
                )

    # Measure actual elapsed time
    start_time = time.monotonic()

    # Create our payload thread
    payload_thread = PayloadThread()
    payload_thread.start()  # starts the payload thread

    # join, but only wait so long
    payload_thread.join(timeout=time_limit)

    # If it's still alive, we'll attempt to kill it and raise an
    # exception ourselves...
    if payload_thread.is_alive():
        payload_thread.timeout()
        elapsed = time.monotonic() - start_time
        raise TimeoutError(
            f"Test took too long to run (more than {time_limit:.3f}s"
            f"; actual time elapsed was {elapsed:.3f}s)."
        )

    # The payload finished in time but raised: pass that along
    if failure is not None:
        raise failure

    # Return our result if we got here...
    return result


def with_sigalrm_timeout(time_limit, function, args):
    """
    Attempts to run a function with a time limit using signal's alarm
    functionality (which is unfortunately Unix-specific). If that
    functionality isn't available, it just runs the function without
    enforcing a timeout, but raises an exception if the function takes
    too long in the end.

    Note that this is NOT re-entrant: only one timer can be running at
    once. That's probably fine though...

    Arguments:

    - `time_limit`: the number of seconds the function may run for.
    - `function`: the callable to run.
    - `args`: a sequence of positional arguments passed to `function`.

    Returns the value returned by `function(*args)`. Raises a
    `TimeoutError` if the time limit is exceeded; exceptions raised by
    `function` propagate unchanged. Whatever handler was installed for
    SIGALRM beforehand is put back afterwards.
    """
    # Record start time
    start_time = time.monotonic()

    if not hasattr(signal, 'SIGALRM'):
        # No way to enforce time limit, but we can penalize in the end
        result = function(*args)

        # Check elapsed time and still raise an error if we went over
        elapsed = time.monotonic() - start_time
        if elapsed > time_limit:
            raise TimeoutError(
                f"Test took too long to run (more than {time_limit:.3f}s"
                f"; actual time elapsed was {elapsed:.3f}s)."
            )
        return result

    # We can enforce our time limit
    def handle_timeout(signum, frame):
        """
        Raises a TimeoutError since the signal will be generated when
        the time is up.
        """
        elapsed = time.monotonic() - start_time
        raise TimeoutError(
            f"Test took too long to run (more than {time_limit:.3f}s"
            f"; actual time elapsed was {elapsed:.3f}s)."
        )

    # Set up signal handler, remembering the old one so we can put it
    # back (None means it wasn't installed from Python, and can't be
    # passed back to signal.signal).
    old_handler = signal.signal(signal.SIGALRM, handle_timeout)
    if old_handler is None:
        old_handler = signal.SIG_DFL

    # Run our function, making sure to clean up afterwards whether
    # we're interrupted or not:
    try:
        # Set up timer (we'll try to at least log collisions)
        old_delay, _ = signal.setitimer(signal.ITIMER_REAL, time_limit)
        if old_delay != 0.0:
            logging.debug_msg("setitimer collision")

        return function(*args)
    finally:
        # Stop the timer BEFORE touching the handler: otherwise an alarm
        # arriving in between would hit the default action for SIGALRM.
        try:
            signal.setitimer(signal.ITIMER_REAL, 0)
        finally:
            signal.signal(signal.SIGALRM, old_handler)
class TimeoutError(Exception):
    """
    Signals that a function was terminated because it ran past its
    time limit.

    Note that inside this module the name refers to this class rather
    than to the built-in exception of the same name.
    """
Error generated when a time limit causes a function to be terminated.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
def with_threading_timeout(time_limit, function, args):
    """
    Attempts to run a function with a time_limit using a child thread and
    the timeout argument to `threading.Thread.join`. Note that running
    some kinds of code in a child thread can be disastrous (e.g., turtle
    graphics).

    Arguments:

    - `time_limit`: how many seconds to wait for the payload thread
        before giving up on it.
    - `function`: the callable to run in the child thread.
    - `args`: a sequence of positional arguments passed to `function`.

    Returns the value returned by `function(*args)`.

    Raises a `TimeoutError` if the payload is still running once
    `time_limit` seconds have elapsed; before raising, it asks the
    interpreter to raise a `TimeoutError` inside the child thread too,
    which is only a best-effort way of stopping it. Any exception
    raised by the payload itself is re-raised in the calling thread
    instead of being lost along with the child thread.
    """
    result = None
    failure = None

    class PayloadThread(threading.Thread):
        """
        A thread that runs a payload and has the ability to (attempt
        to) kill itself.
        """
        # TODO: Trying to run turtle code in a secondary thread crashes
        def run(self):
            """
            Runs the payload and smuggles out either its result or the
            exception it raised via non-local variables.
            """
            nonlocal result, failure
            try:
                result = function(*args)
            except TimeoutError as error:
                # TODO: Log more info here...?
                logging.log("Payload timed out.")
                failure = error
            except Exception as error:
                # Hand the error to the parent thread, which re-raises
                # it, so that the caller doesn't just get None back.
                failure = error

        def timeout(self):
            """
            Asks the interpreter to raise a `TimeoutError` inside this
            thread, logging a message if that request doesn't affect
            exactly one thread.
            """
            thread_id = self.ident
            if thread_id is None:
                # Thread was never started, so there's nothing to stop
                return

            set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
            # The id must be wrapped as an unsigned long: ctypes would
            # otherwise pass it as a C int and truncate it. The
            # exception must be a class (not an instance).
            affected = set_async_exc(
                ctypes.c_ulong(thread_id),
                ctypes.py_object(TimeoutError)
            )
            if affected != 1:
                if affected > 1:
                    # More than one thread state was modified: revert
                    # by clearing the pending exception.
                    set_async_exc(ctypes.c_ulong(thread_id), None)
                logging.log(
                    "Failed to raise timeout exception via ctypes!"
                )

    # Measure actual elapsed time
    start_time = time.monotonic()

    # Create our payload thread
    payload_thread = PayloadThread()
    payload_thread.start()  # starts the payload thread

    # join, but only wait so long
    payload_thread.join(timeout=time_limit)

    # If it's still alive, we'll attempt to kill it and raise an
    # exception ourselves...
    if payload_thread.is_alive():
        payload_thread.timeout()
        elapsed = time.monotonic() - start_time
        raise TimeoutError(
            f"Test took too long to run (more than {time_limit:.3f}s"
            f"; actual time elapsed was {elapsed:.3f}s)."
        )

    # The payload finished in time but raised: pass that along
    if failure is not None:
        raise failure

    # Return our result if we got here...
    return result
Attempts to run a function with a time_limit using a child thread and the timeout argument to `threading.Thread.join`. Note that running some kinds of code in a child thread can be disastrous (e.g., turtle graphics).
def with_sigalrm_timeout(time_limit, function, args):
    """
    Attempts to run a function with a time limit using signal's alarm
    functionality (which is unfortunately Unix-specific). If that
    functionality isn't available, it just runs the function without
    enforcing a timeout, but raises an exception if the function takes
    too long in the end.

    Note that this is NOT re-entrant: only one timer can be running at
    once. That's probably fine though...

    Arguments:

    - `time_limit`: the number of seconds the function may run for.
    - `function`: the callable to run.
    - `args`: a sequence of positional arguments passed to `function`.

    Returns the value returned by `function(*args)`. Raises a
    `TimeoutError` if the time limit is exceeded; exceptions raised by
    `function` propagate unchanged. Whatever handler was installed for
    SIGALRM beforehand is put back afterwards.
    """
    # Record start time
    start_time = time.monotonic()

    if not hasattr(signal, 'SIGALRM'):
        # No way to enforce time limit, but we can penalize in the end
        result = function(*args)

        # Check elapsed time and still raise an error if we went over
        elapsed = time.monotonic() - start_time
        if elapsed > time_limit:
            raise TimeoutError(
                f"Test took too long to run (more than {time_limit:.3f}s"
                f"; actual time elapsed was {elapsed:.3f}s)."
            )
        return result

    # We can enforce our time limit
    def handle_timeout(signum, frame):
        """
        Raises a TimeoutError since the signal will be generated when
        the time is up.
        """
        elapsed = time.monotonic() - start_time
        raise TimeoutError(
            f"Test took too long to run (more than {time_limit:.3f}s"
            f"; actual time elapsed was {elapsed:.3f}s)."
        )

    # Set up signal handler, remembering the old one so we can put it
    # back (None means it wasn't installed from Python, and can't be
    # passed back to signal.signal).
    old_handler = signal.signal(signal.SIGALRM, handle_timeout)
    if old_handler is None:
        old_handler = signal.SIG_DFL

    # Run our function, making sure to clean up afterwards whether
    # we're interrupted or not:
    try:
        # Set up timer (we'll try to at least log collisions)
        old_delay, _ = signal.setitimer(signal.ITIMER_REAL, time_limit)
        if old_delay != 0.0:
            logging.debug_msg("setitimer collision")

        return function(*args)
    finally:
        # Stop the timer BEFORE touching the handler: otherwise an alarm
        # arriving in between would hit the default action for SIGALRM.
        try:
            signal.setitimer(signal.ITIMER_REAL, 0)
        finally:
            signal.signal(signal.SIGALRM, old_handler)
Attempts to run a function with a time limit using signal's alarm functionality (which is unfortunately Unix-specific). If that functionality isn't available, it just runs the function without enforcing a timeout, but raises an exception if the function takes too long in the end.
Note that this is NOT re-entrant: only one timer can be running at once. That's probably fine though...