potluck_server.sync
Simple synchronization & caching for Flask apps.
sync.py
Implements simple/inefficient shared state through the filesystem. For read-only access w/ caching to reduce disk overhead, provides load_or_get_cached: a thread-safe way of loading from a file (if multiple WSGI processes are used, each will have its own cache). For read/write access to control application global state, access_state_file is provided: a process-safe method (using a Manager that gets started automatically) to read or write file contents. By default, both functions are JSON-in, JSON-out, but raw strings can be requested instead.
"""
Simple synchronization & caching for Flask apps.

sync.py

Implements simple/inefficient shared state through the filesystem. For
read-only access w/ caching to reduce disk overhead, provides
load_or_get_cached: a thread-safe way of loading from a file (if multiple
WSGI processes are used, each will have its own cache). For read/write
access to control application global state, read_file and write_file are
provided: a process-safe method (using a Manager that gets started
automatically by init_sync) to read or write file contents. By default,
these functions are JSON-in, JSON-out, but raw strings can be requested
instead.
"""

__version__ = "0.9.1"

from flask import json
import os, sys, copy, time
import threading
import multiprocessing, multiprocessing.connection, multiprocessing.managers

# Python 2/3 dual compatibility
if sys.version_info[0] < 3:
    reload(sys) # noqa F821
    sys.setdefaultencoding('utf-8')
    import socket
    ConnectionRefusedError = socket.error
    IOError_or_FileNotFoundError = IOError
    OSError_or_FileNotFoundError = OSError
    AuthenticationError = multiprocessing.AuthenticationError
else:
    IOError_or_FileNotFoundError = FileNotFoundError
    OSError_or_FileNotFoundError = FileNotFoundError
    AuthenticationError = multiprocessing.context.AuthenticationError

# Python 2.7.5 multiprocessing bug workaround
# Bug thread of same behavior:
# https://bugzilla.redhat.com/show_bug.cgi?id=1717330
# Python bugfix:
# https://github.com/python/cpython/commit/e8a57b98ec8f2b161d4ad68ecc1433c9e3caad57
# Source for this workaround: https://github.com/oamg/leapp/pull/533/files

# Implements:
# https://github.com/python/cpython/commit/e8a57b98ec8f2b161d4ad68ecc1433c9e3caad57
#
# Detection of fix: os imported to compare pids, before the fix os has not
# been imported
import multiprocessing.util
if getattr(multiprocessing.util, 'os', None):
    # we're on a version that has the fix; no need to apply it
    pass
else:
    # we'd better apply the fix
    class FixedFinalize(multiprocessing.util.Finalize):
        """
        Finalize variant that remembers the pid it was created in and
        does nothing when called from a different process.
        """
        def __init__(self, *args, **kwargs):
            super(FixedFinalize, self).__init__(*args, **kwargs)
            self._pid = os.getpid()

        def __call__(self, *args, **kwargs):
            if self._pid != os.getpid():
                return None
            return super(FixedFinalize, self).__call__(*args, **kwargs)

    setattr(multiprocessing.util, 'Finalize', FixedFinalize)


#----------------------#
# Core caching routine #
#----------------------#

class AbortGeneration:
    """
    Class to signal that generation of a cached item should not proceed.
    Holds a default value to return instead.
    """
    def __init__(self, replacement):
        self.replacement = replacement


class NotInCache:
    """
    Placeholder for recognizing that a value is not in the cache (when
    e.g., None might be a valid cache value).
    """
    pass


def _gen_or_get_cached(lock, cache, cache_key, check_dirty, result_generator):
    """
    Common functionality that uses a reentrant lock and a cache
    dictionary to return a cached value if the cached value is fresh. The
    value from the cache is deep-copied before being returned, so that
    any modifications to the returned value shouldn't alter the cache.
    Parameters are:

        lock: Specifies the lock to use. Should be a threading.RLock,
            since it is held while check_dirty and result_generator
            run, and those may need to re-acquire it.
        cache: The cache dictionary, mapping cache keys to
            (timestamp, value) pairs.
        cache_key: String key for this cache item.
        check_dirty: Function which will be given the cache_key and a
            timestamp and must return True if the cached value (created
            at that instant) is dirty (needs to be updated) and False
            otherwise. May also return an AbortGeneration instance with a
            default value inside to be returned directly (that value is
            returned as-is and is not cached). If there is no cached
            value, check_dirty will be given a timestamp of None.
        result_generator: Function to call to build a new result if the
            cached value is stale or absent. This new result will be
            cached. It will be given the cache_key as a parameter.

    Returns the replacement value, a deep copy of the cached value, or a
    deep copy of the freshly generated value.
    """
    with lock:
        cache_ts, cached = cache.get(cache_key, (None, NotInCache))

        is_dirty = check_dirty(cache_key, cache_ts)
        if isinstance(is_dirty, AbortGeneration):
            # check_dirty calls for an abort: return replacement value
            return is_dirty.replacement

        # Identity test: NotInCache is a sentinel, and cached values may
        # define their own (possibly non-boolean) equality.
        if not is_dirty and cached is not NotInCache:
            # Cache is fresh: return a copy so callers can't alter the
            # cached value.
            return copy.deepcopy(cached)

        # Cache is stale (or empty).

        # Get timestamp before we even start generating value, so that a
        # write which lands during generation still looks newer than the
        # cache entry:
        ts = time.time()

        # Generate result:
        result = result_generator(cache_key)

        # Store new result value + timestamp in cache:
        cache[cache_key] = (ts, result)

        # Don't allow outside code to mess with internals of cached value
        # (JSON results could be mutable):
        return copy.deepcopy(result)


def set_new_cached_value(lock, cache, cache_key, value):
    """
    Directly overrides a cached value, inserting a deep copy of the new
    value timestamped when this function is called.
    """
    with lock:
        # Create timestamp first
        ts = time.time()
        # Store new value in cache:
        cache[cache_key] = (ts, copy.deepcopy(value))


def cache_key_for(target, view):
    """
    Builds a hybrid cache key value with a certain target and view: the
    target, then '::', then the view's __name__.
    """
    return target + "::" + view.__name__


def cache_key_filename(cache_key):
    """
    Returns just the filename given a cache key (everything before the
    last '::'). Raises a ValueError if the key contains no '::'.
    """
    # Split on the LAST separator only, so that a filename which itself
    # contains or ends with colons is recovered intact.
    filename, separator, _ = cache_key.rpartition('::')
    if not separator:
        raise ValueError("Value '{}' is not a cache key!".format(cache_key))
    return filename


#----------------------------------#
# View objects for managing caches #
#----------------------------------#

class View:
    """
    Abstract View class to organize decoding/encoding of views. Each View
    must define encode and decode static methods which are each other's
    inverse. The class name is used as part of the cache key.
    """
    @staticmethod
    def encode(obj):
        """
        The encode function of a View must return a string (to be written
        to a file).
        """
        raise NotImplementedError("Don't use the base View class.")

    @staticmethod
    def decode(string):
        """
        The decode function of a View must accept a string, and if given
        a string produced by encode, should return an equivalent object.
        """
        raise NotImplementedError("Don't use the base View class.")


class AsIs(View):
    """
    A pass-through view that returns strings unaltered.
    """
    @staticmethod
    def encode(obj):
        """Returns the object it is given unaltered."""
        return obj

    @staticmethod
    def decode(string):
        """Returns the string it is given unaltered."""
        return string


class AsJSON(View):
    """
    A view that converts objects to JSON for file storage and back on
    access. Its decode function passes through None.
    """
    @staticmethod
    def encode(obj):
        """Returns the JSON encoding of the object."""
        return json.dumps(obj)

    @staticmethod
    def decode(string):
        """
        Returns a JSON object parsed from the string.
        Returns None if it gets None.
        """
        if string is None:
            return None
        return json.loads(string)


def build_view(name, encoder, decoder, pass_none=True):
    """
    Function for building a view given a name, an encoding function, and
    a decoding function. Unless pass_none is given as False, the decoder
    will be skipped if the decode argument is None and the None will pass
    through, in which case the decoder will *always* get a string as an
    argument.
    """
    class SyntheticView(View):
        """
        View class created using build_view.
        """
        @staticmethod
        def encode(obj):
            return encoder(obj)

        @staticmethod
        def decode(string):
            if pass_none and string is None:
                return None
            return decoder(string)

    def describe(func):
        # functools.partial objects and callable instances have no
        # __name__; fall back to their repr instead of crashing.
        return getattr(func, '__name__', None) or repr(func)

    SyntheticView.__name__ = name
    SyntheticView.__doc__ = (
        "View that uses '{}' for encoding and '{}' for decoding."
    ).format(describe(encoder), describe(decoder))
    SyntheticView.encode.__doc__ = getattr(encoder, '__doc__', None)
    SyntheticView.decode.__doc__ = getattr(decoder, '__doc__', None)

    return SyntheticView


#--------------------------#
# Process-local singletons #
#--------------------------#

def build_or_get_singleton(cache_key, builder):
    """
    Caches the result of the given builder function under the given cache
    key, always returning the cached result (a deep copy of it, as with
    every value served from the cache) once it's been created. The
    builder is given the cache key as its only argument. This is
    thread-safe, but each singleton is unique to the process that created
    it.
    """
    return _gen_or_get_cached(
        _CACHE_LOCK,
        _CACHE,
        cache_key,
        # The entry is "dirty" only while it has never been built (no
        # timestamp yet); once built it stays fresh forever.
        lambda key, ts: ts is None,
        builder
    )


#--------------------------------------#
# Process-local read-only file caching #
#--------------------------------------#

def build_file_freshness_checker(missing=Exception):
    """
    Builds a freshness checker that checks the mtime of a filename, but
    if that file doesn't exist, it returns AbortGeneration with the given
    missing value (unless missing is left as the default of Exception, in
    which case it lets the exception bubble out).
    """
    def check_file_is_changed(cache_key, ts):
        """
        Checks whether a file has been modified at or after the given
        timestamp (a timestamp of None always counts as changed).
        """
        filename = cache_key_filename(cache_key)
        try:
            mtime = os.path.getmtime(filename)
        except OSError_or_FileNotFoundError:
            # Exception is a sentinel default here, so test identity;
            # the missing value may be any object, including ones with
            # unusual equality behavior.
            if missing is Exception:
                raise
            else:
                return AbortGeneration(missing)

        # File is changed if the mtime is at or after the given cache
        # timestamp, or if the timestamp is None
        return ts is None or mtime >= ts

    return check_file_is_changed


def build_file_reader(view=AsJSON):
    """
    Builds a file reader function which returns the result of the given
    view on the file contents.
    """
    def read_file(cache_key):
        """
        Reads a file and returns the result of calling a view's decode
        function on the file contents. Returns None if there's an error,
        and prints the error unless it's a FileNotFoundError.
        """
        filename = cache_key_filename(cache_key)
        try:
            with open(filename, 'r') as fin:
                return view.decode(fin.read())
        except IOError_or_FileNotFoundError:
            return None
        except Exception as e:
            sys.stderr.write(
                "[sync module] Exception viewing file:\n" + str(e) + '\n'
            )
            return None

    return read_file


# Note: by using a threading.RLock and a global variable here, we are not
# process-safe, which is fine, because this is just a cache: each process
# in a multi-process environment can safely maintain its own cache which
# will waste a bit of memory but not lead to corruption. As a corollary,
# load_or_get_cached should be treated as read-only, and
# read_file/write_file should be used to access a file which needs to
# have consistent state viewable by potentially multiple processes.

# TODO: We should probably have some kind of upper limit on the cache
# size, and maintain staleness so we can get rid of stale items...
_CACHE_LOCK = threading.RLock() # Make this reentrant just in case...
_CACHE = {}


def load_or_get_cached(
    filename,
    view=AsJSON,
    missing=Exception
):
    """
    Loads the contents of the given file and returns the result of the
    given view function on it (or the contents as a string if the view is
    None). It first checks the modification time and returns a cached
    copy of the view result if one exists and is still fresh. Uses a lock
    for thread safety. The default view parses file contents as JSON and
    returns the resulting object. If the file is missing, an exception
    would normally be raised, but if the missing value is provided as
    something other than Exception, that value will be returned instead
    (as given, and without being cached). The __name__ of the view class
    will be used to compute a cache key for that view; avoid view name
    collisions.

    Note: On a cache hit, a deep copy of the cached value is returned, so
    modifying that value should not affect what is stored in the cache.
    """

    # Figure out our view object (& cache key):
    if view is None:
        view = AsIs

    cache_key = cache_key_for(filename, view)

    # Build functions for checking freshness and reading the file
    check_mtime = build_file_freshness_checker(missing)
    read_file = build_file_reader(view)

    return _gen_or_get_cached(
        _CACHE_LOCK,
        _CACHE,
        cache_key,
        check_mtime,
        read_file
    )


#-------------------------------------#
# Cross-process file access & caching #
#-------------------------------------#

# A lock and cache for read_file/write_file
_ACCESS_LOCK = threading.RLock()
_ACCESS_CACHE = {}

# The _FileAccessor proxy to use for file access (set up by init_sync):
_ACCESS = None


def read_file(
    filename,
    view=AsJSON,
    missing=Exception,
    cache=True
):
    """
    Reads from a file, operating via proxy through a manager so that
    multiple threads/processes' requests will happen sequentially to
    avoid simultaneous reads/writes to any file. Returns the result of
    the view's decode function on the file contents; the proxy hands the
    view None instead of a string if it could not read the file (in
    which case it prints an error message to stderr unless the file
    simply did not exist). init_sync must have been called first.

    This model is pretty draconian and sets up file access as competitive
    across all files, so only use it for files that may be written to by
    the server; read-only files can use load_or_get_cached instead. This
    function does set up a local in-process cache and check file mtimes,
    to avoid the entire proxy call if it can (only if cache is True).

    The arguments behave as follows:

        filename: The full path to the file to be accessed.
        view: A View object which can encode and decode strings. AsJSON
            is the default, which converts strings in files to objects
            using JSON. None means AsIs (raw strings). The view's decode
            function is applied to file contents before they are cached.
            Using two different views with the same __name__ will cause
            incorrect cached values to be returned.
        missing: A value to be returned (but not cached or written to
            disk) if the file does not exist. Leave it as Exception and
            an exception will be raised instead: whatever exception
            causes os.path.getmtime to fail when caching, or an OSError
            when cache is False.
        cache: True by default, results will be cached, and the
            file-access-via-proxy chain of events will be skipped
            entirely if the cache value is newer than the file on disk.
            Set to False for large and/or constantly changing files where
            you expect this to be a waste of time or space. The cache is
            per-process but shared between threads.
    """

    # Figure out our view function (& cache key just in case):
    if view is None:
        view = AsIs

    # Define our reader function which incorporates our view:
    def read_file_contents(cache_key):
        """
        File reading function that accesses file contents through an
        _ACCESS proxy and applies the view's decode function to whatever
        the proxy returns (a string, or None if the read failed).
        """
        filename = cache_key_filename(cache_key)
        return view.decode(_ACCESS.read_file(filename))

    if cache: # If we're caching, do that:
        cache_key = cache_key_for(filename, view)
        check_mtime = build_file_freshness_checker(missing)

        return _gen_or_get_cached(
            _ACCESS_LOCK,
            _ACCESS_CACHE,
            cache_key,
            check_mtime,
            read_file_contents
        )

    else: # Otherwise, just call our reader function
        # But first check whether the file exists...
        if not os.path.exists(filename):
            # Exception is a sentinel default: compare by identity.
            if missing is Exception:
                raise OSError("File not found: '{}'".format(filename))
            else:
                return missing
        else:
            return read_file_contents(cache_key_for(filename, view))


def write_file(filename, content, view=AsJSON, cache=True):
    """
    Writes the given content to the given file, encoding the content
    using the encode function of the given view. If cache is True,
    updates the _ACCESS_CACHE with the new file contents to avoid
    unnecessary round trips to disk. The original content, not the
    encoded string, will be cached. The view may be given as None to
    write the content directly to the file as a string. init_sync must
    have been called first.
    """
    if view is None:
        view = AsIs

    file_content = view.encode(content)

    _ACCESS.write_file(filename, file_content)

    if cache:
        cache_key = cache_key_for(filename, view)
        set_new_cached_value(
            _ACCESS_LOCK,
            _ACCESS_CACHE,
            cache_key,
            content
        )


class _FileAccessor:
    """
    A class for accessing files. Meant to be used via Proxy, with a
    single Manager having the real instance so that all access is
    sequential and we don't have conflicting writes or partial reads.
    Instead of instantiating this class, use FileSystemManager.get (see
    below).
    """
    def __init__(self):
        # This... really isn't necessary, surely. But just in case the
        # Manager uses multiple threads...
        self.lock = threading.RLock()

    def read_file(self, filename):
        """
        Reads file contents as a string, returning None if the file
        doesn't exist or there's some other error (e.g., encoding issue).
        If the file doesn't exist, no error is printed, but otherwise the
        error message is printed to stderr.
        """
        with self.lock:
            try:
                with open(filename, 'r') as fin:
                    return fin.read()
            except IOError_or_FileNotFoundError:
                return None
            except Exception as e:
                sys.stderr.write("Exception reading file:\n" + str(e) + '\n')
                return None

    def write_file(self, filename, content):
        """
        Writes the given content into the given file, replacing any
        previous content.
        TODO: What happens if there's an exception here? Does the manager
        process crash entirely?
        """
        with self.lock:
            with open(filename, 'w') as fout:
                fout.write(content)


#--------------------#
# Manager management #
#--------------------#

class FileSystemManager(multiprocessing.managers.BaseManager):
    """
    Custom manager class for managing read/write file system access from
    multiple threads. Go use a database instead!
    """
    pass


FileSystemManager.register("get", _FileAccessor)


# The manager connection for this process:
_MANAGER = None


def init_sync(port=51723, key=None):
    """
    init_sync should be called once per process, ideally early in the
    life of the process, like right after importing the sync module.
    Calling read_file or write_file before init_sync will fail. A file
    named 'syncauth' should exist unless a key is given (should be a
    byte-string). If 'syncauth' doesn't exist, it will be created. Don't
    rely too heavily on the default port, since any two apps using sync
    with the default port on the same machine will collide.

    Raises an IOError (FileNotFoundError on Python 3) if the 'syncauth'
    file cannot be read or created, and a ValueError if the manager
    listening on the port rejects our key.
    """
    global _MANAGER, _ACCESS
    # Get synchronization key:
    if key is None:
        try:
            if os.path.exists('syncauth'):
                with open('syncauth', 'rb') as fin:
                    key = fin.read()
            else:
                print("Creating new sync secret file 'syncauth'.")
                key = os.urandom(16)
                with open('syncauth', 'wb') as fout:
                    fout.write(key)
        except Exception:
            raise IOError_or_FileNotFoundError(
                "Unable to access 'syncauth' file.\n"
              + "  Create it and put some random bytes in there, or pass"
              + " key= to init_sync."
            )

    _MANAGER = FileSystemManager(address=('localhost', port), authkey=key)
    # Attempt to connect; if that fails, attempt to start a new manager
    # if that fails (due to simultaneous starts and one wins the port
    # that's not us) attempt to connect again. Repeat until we connect.
    while True:
        print("Attempting to connect to MP manager...")

        # Reduce multiprocessing's connection timeout value since we're
        # pretty sure we'll fail first and succeed later
        old_init_timeout = multiprocessing.connection._init_timeout
        multiprocessing.connection._init_timeout = lambda t=2.0: (
            time.time() + t
        )
        try:
            # NOTE: This call may just never return if we accidentally
            # connect to e.g., an HTTP server.
            _MANAGER.connect() # Check your port settings!
            print("...connected successfully.")
            break # only way out of loop is to connect successfully
        except ConnectionRefusedError: # Nobody listening on that port
            pass
        except AuthenticationError:
            raise ValueError(
                "Your authkey is not correct. Make sure you're not"
              + " sharing the port you chose!"
            )
        finally:
            # Restore old timeout function no matter how the attempt
            # ended (success, refusal, or authentication failure), so
            # the shortened timeout never leaks into the rest of the
            # process.
            multiprocessing.connection._init_timeout = old_init_timeout

        print("...failed to connect...")

        # We didn't get to connect? Let's try to start a manager:
        try:
            print("...starting manager process...")
            _MANAGER.start()
            print("...finished starting manager...")
        except EOFError: # Error generated if port is in use
            pass
        except RuntimeError:
            # Error generated as of 3.8 when attempting to start the
            # manager during the post-fork module reloading phase of a
            # manager sub-process.
            return

        # Sleep for quite a while to try to avoid calling _MANAGER.start
        # multiple times before the first call can bind the port.
        time.sleep(0.2)

        # And now we'll return to the top of the loop and attempt to
        # connect again.

    # At this point, _MANAGER is a connected manager for this process, so
    # we're done.

    # Set up _ACCESS as a proxy object:
    _ACCESS = _MANAGER.get()

    if _ACCESS is None:
        raise ValueError("_MANAGER.get() returned None")
class AbortGeneration:
    """
    Marker object a dirtiness check can hand back to say "do not build
    a value for this cache entry". It carries the stand-in value that
    should be given to the caller instead.
    """
    def __init__(self, replacement):
        # The value to return in place of a generated result.
        self.replacement = replacement
Class to signal that generation of a cached item should not proceed. Holds a default value to return instead.
class NotInCache:
    """
    Sentinel class meaning "there is no entry for this key". A dedicated
    marker is needed because ordinary values such as None may
    legitimately be stored in the cache.
    """
Placeholder for recognizing that a value is not in the cache (when e.g., None might be a valid cache value).
def set_new_cached_value(lock, cache, cache_key, value):
    """
    Force a cache entry to a new value. While holding the lock, the
    entry for cache_key is replaced by a (timestamp, value-copy) pair:
    the timestamp is the moment of this call and the stored value is a
    deep copy, so later changes to the caller's object do not leak into
    the cache.
    """
    with lock:
        # Take the timestamp before copying the value.
        stamp = time.time()
        snapshot = copy.deepcopy(value)
        cache[cache_key] = (stamp, snapshot)
Directly overrides a cached value, inserting a new value timestamped when this function is called.
def cache_key_for(target, view):
    """
    Compose the cache key for a (target, view) pair: the target string,
    a '::' separator, and the view's __name__.
    """
    return "::".join((target, view.__name__))
Builds a hybrid cache key value with a certain target and view.
def cache_key_filename(cache_key):
    """
    Returns just the filename given a cache key, i.e. everything before
    the last '::' separator.

    Raises a ValueError if the value contains no '::' at all.
    """
    # Split on the LAST separator only. Splitting on every '::' from the
    # left and re-joining mangles filenames that end in a colon (e.g.
    # the key 'x:::AsJSON' for file 'x:' would come back as 'x').
    filename, separator, _ = cache_key.rpartition('::')
    if not separator:
        raise ValueError("Value '{}' is not a cache key!".format(cache_key))
    return filename
Returns just the filename given a cache key.
class View:
    """
    Base class describing how file contents are turned into objects and
    back. A concrete View supplies two static methods, encode and
    decode, which must undo each other. The name of the class becomes
    part of the cache key. The base class itself is not usable: both of
    its methods raise NotImplementedError.
    """
    @staticmethod
    def encode(obj):
        """
        Turn an object into the string that will be written to a file.
        Subclasses must override this.
        """
        raise NotImplementedError("Don't use the base View class.")

    @staticmethod
    def decode(string):
        """
        Turn a string (as produced by encode) back into an equivalent
        object. Subclasses must override this.
        """
        raise NotImplementedError("Don't use the base View class.")
Abstract View class to organize decoding/encoding of views. Each View must define encode and decode static methods which are each other's inverse. The class name is used as part of the cache key.
184 @staticmethod 185 def encode(obj): 186 """ 187 The encode function of a View must return a string (to be written 188 to a file). 189 """ 190 raise NotImplementedError("Don't use the base View class.")
The encode function of a View must return a string (to be written to a file).
192 @staticmethod 193 def decode(string): 194 """ 195 The encode function of a View must accept a string, and if given 196 a string produced by encode, should return an equivalent object. 197 """ 198 raise NotImplementedError("Don't use the base View class.")
The decode function of a View must accept a string, and if given a string produced by encode, should return an equivalent object.
class AsIs(View):
    """
    Identity view: nothing is converted in either direction, so file
    contents are handled as plain strings.
    """
    @staticmethod
    def encode(obj):
        """Hands back exactly the object it received."""
        return obj

    @staticmethod
    def decode(string):
        """Hands back exactly the string it received."""
        return string
A pass-through view that returns strings unaltered.
class AsJSON(View):
    """
    View that stores objects as JSON text: encode serializes with
    json.dumps and decode parses with json.loads. A None handed to
    decode is returned untouched.
    """
    @staticmethod
    def encode(obj):
        """Serializes the object to a JSON string."""
        encoded = json.dumps(obj)
        return encoded

    @staticmethod
    def decode(string):
        """
        Parses the string as JSON and returns the resulting object; a
        None argument comes straight back as None.
        """
        return None if string is None else json.loads(string)
A view that converts objects to JSON for file storage and back on access. It passes through None.
221 @staticmethod 222 def encode(obj): 223 """Returns the JSON encoding of the object.""" 224 return json.dumps(obj)
Returns the JSON encoding of the object.
226 @staticmethod 227 def decode(string): 228 """ 229 Returns a JSON object parsed from the string. 230 Returns None if it gets None. 231 """ 232 if string is None: 233 return None 234 return json.loads(string)
Returns a JSON object parsed from the string. Returns None if it gets None.
def build_view(name, encoder, decoder, pass_none=True):
    """
    Function for building a view given a name, an encoding function, and
    a decoding function. Unless pass_none is given as False, the decoder
    will be skipped if the decode argument is None and the None will pass
    through, in which case the decoder will *always* get a string as an
    argument.

    The encoder and decoder may be any callables, including ones that
    have no __name__ (e.g., functools.partial objects). Returns the new
    View subclass, whose __name__ is the given name (and therefore the
    name used in cache keys for that view).
    """
    class SyntheticView(View):
        """
        View class created using build_view.
        """
        @staticmethod
        def encode(obj):
            return encoder(obj)

        @staticmethod
        def decode(string):
            if pass_none and string is None:
                return None
            return decoder(string)

    def describe(func):
        # functools.partial objects and callable instances have no
        # __name__; fall back to their repr instead of crashing with an
        # AttributeError while merely building a docstring.
        return getattr(func, '__name__', None) or repr(func)

    SyntheticView.__name__ = name
    SyntheticView.__doc__ = (
        "View that uses '{}' for encoding and '{}' for decoding."
    ).format(describe(encoder), describe(decoder))
    SyntheticView.encode.__doc__ = getattr(encoder, '__doc__', None)
    SyntheticView.decode.__doc__ = getattr(decoder, '__doc__', None)

    return SyntheticView
Function for building a view given a name, an encoding function, and a decoding function. Unless pass_none is given as False, the decoder will be skipped if the decode argument is None and the None will pass through, in which case the decoder will always get a string as an argument.
def build_or_get_singleton(cache_key, builder):
    """
    Caches the result of the given builder function under the given cache
    key, always returning the cached result once it's been created. The
    builder is called with the cache key as its only argument. This is
    thread-safe, but each singleton is unique to the process that created
    it.
    """
    return _gen_or_get_cached(
        _CACHE_LOCK,
        _CACHE,
        cache_key,
        # The dirtiness check must report "dirty" only while nothing has
        # been built yet (timestamp is None). The previous check,
        # `ts is not None`, was inverted: it declared every existing
        # entry dirty, so the builder was re-run on every single call.
        lambda key, ts: ts is None,
        builder
    )
Caches the result of the given builder function under the given cache key, always returning the cached result once it's been created. This is thread-safe, but each singleton is unique to the process that created it.
def build_file_freshness_checker(missing=Exception):
    """
    Builds a freshness checker that checks the mtime of a filename, but
    if that file doesn't exist, it returns AbortGeneration with the given
    missing value (unless missing is left as the default of Exception, in
    which case it lets the exception bubble out).
    """
    def check_file_is_changed(cache_key, ts):
        """
        Checks whether the file named by the cache key has been modified
        at or after the given timestamp. A timestamp of None always
        counts as changed.
        """
        filename = cache_key_filename(cache_key)
        try:
            mtime = os.path.getmtime(filename)
        except OSError_or_FileNotFoundError:
            # Exception is only a sentinel default here, so test for
            # identity: the caller's missing value can be any object,
            # and objects with custom equality (e.g., array types) must
            # not be compared with ==.
            if missing is Exception:
                raise
            return AbortGeneration(missing)

        # File is changed if the mtime is at or after the given cache
        # timestamp, or if the timestamp is None
        return ts is None or mtime >= ts

    return check_file_is_changed
Builds a freshness checker that checks the mtime of a filename, but if that file doesn't exist, it returns AbortGeneration with the given missing value (unless missing is left as the default of Exception, in which case it lets the exception bubble out).
def build_file_reader(view=AsJSON):
    """
    Creates a reader function bound to the given view. The reader takes
    a cache key, opens the file that key names, and returns the view's
    decoding of the file's text.
    """
    def read_file(cache_key):
        """
        Opens the file behind the cache key and decodes its contents
        with the view. Any failure yields None; failures other than the
        file-not-found/IO case are also reported on stderr.
        """
        path = cache_key_filename(cache_key)
        try:
            with open(path, 'r') as handle:
                raw_text = handle.read()
                return view.decode(raw_text)
        except IOError_or_FileNotFoundError:
            # Missing file: stay quiet.
            return None
        except Exception as e:
            message = "[sync module] Exception viewing file:\n" + str(e) + '\n'
            sys.stderr.write(message)
            return None

    return read_file
Builds a file reader function which returns the result of the given view on the file contents.
def load_or_get_cached(
    filename,
    view=AsJSON,
    missing=Exception
):
    """
    Returns the given view's decoding of a file's contents, going
    through the process-local cache (guarded by a lock for thread
    safety). The cached entry is reused as long as the file's
    modification time does not indicate a change; otherwise the file is
    read again and the cache entry is replaced.

    filename: Path of the file to load.
    view: The View used to decode the file's text; AsJSON by default.
        Passing None selects AsIs, i.e. the raw string. The view's
        __name__ is part of the cache key, so avoid name collisions
        between views.
    missing: What to return when the file does not exist. If left as
        Exception, the underlying error is raised instead. Any other
        value is handed back as given and is not cached.

    Note: values served from the cache are deep copies, so modifying a
    returned value should not affect what is stored in the cache.
    """
    chosen_view = AsIs if view is None else view

    return _gen_or_get_cached(
        _CACHE_LOCK,
        _CACHE,
        cache_key_for(filename, chosen_view),
        build_file_freshness_checker(missing),
        build_file_reader(chosen_view)
    )
Loads the contents of the given file and returns the result of the given view function on it (or the contents as a string if the view is None). It first checks the modification time and returns a cached copy of the view result if one exists. Uses a lock for thread safety. The default view parses file contents as JSON and returns the resulting object. If the file is missing, an exception would normally be raised, but if the missing value is provided as something other than Exception, a deep copy of that value will be returned instead. The __name__ of the view class will be used to compute a cache key for that view; avoid view name collisions.
Note: On a cache hit, a deep copy of the cached value is returned, so modifying that value should not affect what is stored in the cache.
def read_file(
    filename,
    view=AsJSON,
    missing=Exception,
    cache=True
):
    """
    Reads from a file, operating via proxy through a manager so that
    multiple threads/processes' requests will happen sequentially to
    avoid simultaneous reads/writes to any file. Returns the result of
    applying the view's decode function to what the proxy hands back for
    the file (so with the default AsJSON view the result is a decoded
    object, not a raw string). If the file does not exist, the `missing`
    value is returned, or an exception is raised when `missing` is left
    as Exception.

    NOTE(review): the proxy-side reader is not visible here; it is
    presumably what reports non-missing-file access errors on stderr --
    confirm against its definition.

    This model is pretty draconian and sets up file access as competitive
    across all files, so only use it for files that may be written to by
    the server; read-only files can use load_or_get_cached instead. This
    function does set up a local in-process cache and check file mtimes,
    to avoid the entire proxy call if it can (only if cache is True).

    The arguments behave as follows:

        filename: The full path to the file to be accessed.
        view: A View object which can encode and decode strings. AsJSON
            is the default, which converts strings in files to objects
            using JSON. The view's decode function is applied to file
            contents before they are cached. Using two different views
            with the same __name__ will cause incorrect cached values to
            be returned. None is treated as the AsIs view.
        missing: A value to be returned (but not cached or written to
            disk) if the file does not exist. Leave it as Exception and
            whatever exception causes os.path.getmtime to fail will be
            allowed to bubble out. With cache=False the same exception
            type (OSError_or_FileNotFoundError) is raised directly.
        cache: True by default, results will be cached, and the
            file-access-via-proxy chain of events will be skipped
            entirely if the cache value is newer than the file on disk.
            Set to False for large and/or constantly changing files where
            you expect this to be a waste of time or space. The cache is
            per-process but shared between threads.

    Raises:
        OSError_or_FileNotFoundError: when cache is False, the file does
            not exist, and missing is Exception. (On Python 3 this is
            FileNotFoundError, a subclass of OSError, so callers that
            catch OSError keep working.)
    """

    # Figure out our view object (None means "no decoding"):
    if view is None:
        view = AsIs

    # Define our reader function which incorporates our view:
    def read_file_contents(cache_key):
        """
        File reading function that accesses file contents through the
        _ACCESS proxy and applies the view's decode function to them.
        Nothing here catches exceptions: errors from the proxy call or
        from the view's decode function propagate to the caller.
        """
        return view.decode(_ACCESS.read_file(cache_key_filename(cache_key)))

    if cache:  # If we're caching, go through the per-process cache:
        return _gen_or_get_cached(
            _ACCESS_LOCK,
            _ACCESS_CACHE,
            cache_key_for(filename, view),
            build_file_freshness_checker(missing),
            read_file_contents
        )

    # Not caching: check for the file ourselves, then read via the proxy.
    if not os.path.exists(filename):
        # Exception is used as a sentinel here, so compare by identity;
        # `==` would invoke arbitrary __eq__ on the caller's value.
        if missing is Exception:
            raise OSError_or_FileNotFoundError(
                "File not found: '{}'".format(filename)
            )
        return missing

    return read_file_contents(cache_key_for(filename, view))
Reads from a file, operating via proxy through a manager so that multiple threads/processes' requests will happen sequentially to avoid simultaneous reads/writes to any file. Returns the result of applying the view's decode function to the file's contents (with the default AsJSON view, that is the decoded object rather than a raw string). If the file does not exist, the missing value is returned, or an exception is raised when missing is left as Exception. If there's an error other than the file not existing, an error message will be printed to stderr.
This model is pretty draconian and sets up file access as competitive across all files, so only use it for files that may be written to by the server; read-only files can use load_or_get_cached instead. This function does set up a local in-process cache and check file mtimes, to avoid the entire proxy call if it can (only if cache is True).
The arguments behave as follows:
filename: The full path to the file to be accessed.
view: A View object which can encode and decode strings. AsJSON
is the default, which converts strings in files to objects
using JSON. The view's decode function is applied to file
contents before they are cached. Using two different views
with the same __name__ will cause incorrect cached values to
be returned.
missing: A value to be returned (but not cached or written to
disk) if the file does not exist. Leave it as Exception and
whatever exception causes os.path.getmtime to fail will be
allowed to bubble out.
cache: True by default, results will be cached, and the
file-access-via-proxy chain of events will be skipped
entirely if the cache value is newer than the file on disk.
Set to False for large and/or constantly changing files where
you expect this to be a waste of time or space. The cache is
per-process but shared between threads.
def write_file(filename, content, view=AsJSON, cache=True):
    """
    Encodes the given content with the view's encode function and writes
    the resulting string to the named file through the _ACCESS proxy.
    Passing None as the view writes the content to the file directly as
    a string.

    When cache is True, _ACCESS_CACHE is also updated for this file so
    that a later read can skip the round trip to disk. What gets cached
    is the original content object, not its encoded string form.
    """
    chosen_view = AsIs if view is None else view

    # Encode before touching the proxy, then write the encoded text.
    encoded = chosen_view.encode(content)
    _ACCESS.write_file(filename, encoded)

    if not cache:
        return

    key = cache_key_for(filename, chosen_view)
    set_new_cached_value(
        _ACCESS_LOCK,
        _ACCESS_CACHE,
        key,
        content
    )
Writes the given content to the given file, encoding the content using the encode function of the given view. If cache is True, updates the _ACCESS_CACHE with the new file contents to avoid unnecessary round trips to disk. The original content, not the encoded string, will be cached. The view may be given as None to write the content directly to the file as a string.
class FileSystemManager(multiprocessing.managers.BaseManager):
    """
    A BaseManager subclass that adds nothing of its own. It serves as the
    dedicated manager type for coordinating read/write file system access
    from multiple threads. Go use a database instead!
    """
Custom manager class for managing read/write file system access from multiple threads. Go use a database instead!
def temp(self, /, *args, **kwds):
    """
    Requests creation of a shared object and returns a proxy for it.

    All positional and keyword arguments are forwarded to self._create.

    NOTE(review): `typeid`, `proxytype`, `util` and `dispatch` are not
    defined inside this function; they are presumably supplied by an
    enclosing scope -- confirm against the surrounding source.
    """
    util.debug('requesting creation of a shared %r object', typeid)
    # _create hands back a token for the new object plus a second value
    # that is passed on to the proxy as `exposed`.
    token, exp = self._create(typeid, *args, **kwds)
    proxy = proxytype(
        token, self._serializer, manager=self,
        authkey=self._authkey, exposed=exp
    )
    # Open a separate client connection to the token's address and send a
    # 'decref' for the token's id, after the proxy has been built.
    # NOTE(review): presumably this releases a reference taken during
    # creation now that the proxy exists -- confirm before reordering.
    conn = self._Client(token.address, authkey=self._authkey)
    dispatch(conn, None, 'decref', (token.id,))
    return proxy
Inherited Members
- multiprocessing.managers.BaseManager
- BaseManager
- get_server
- connect
- start
- join
- register
def init_sync(port=51723, key=None):
    """
    init_sync should be called once per process, ideally early in the
    life of the process, like right after importing the sync module.
    It sets the module-level _MANAGER and _ACCESS; read_file and
    write_file go through the _ACCESS proxy that is set up here.

    A file named 'syncauth' should exist unless a key is given (should be
    a byte-string). If 'syncauth' doesn't exist, it will be created and
    filled with 16 random bytes. Don't rely too heavily on the default
    port, since any two apps using sync with the default port on the same
    machine will collide.

    The function loops until it manages to connect to a manager on
    ('localhost', port): each failed connection attempt is followed by an
    attempt to start a manager, a short sleep, and another connection
    attempt.

    Arguments:

        port: Port on localhost where the manager listens.
        key: Authentication key (byte-string), or None to load/create the
            'syncauth' file in the current directory.

    Returns None. If starting the manager raises RuntimeError, the
    function returns early without assigning _ACCESS.

    Raises:
        IOError_or_FileNotFoundError: the 'syncauth' file could not be
            read or created.
        ValueError: the manager rejected our authkey, or _MANAGER.get()
            returned None.
    """
    global _MANAGER, _ACCESS
    # Get synchronization key:
    if key is None:
        try:
            if os.path.exists('syncauth'):
                with open('syncauth', 'rb') as fin:
                    key = fin.read()
            else:
                print("Creating new sync secret file 'syncauth'.")
                key = os.urandom(16)
                with open('syncauth', 'wb') as fout:
                    fout.write(key)
        except Exception:
            raise IOError_or_FileNotFoundError(
                "Unable to access 'syncauth' file.\n"
              + " Create it and put some random bytes in there, or pass"
              + " key= to init_sync."
            )

    _MANAGER = FileSystemManager(address=('localhost', port), authkey=key)
    # Attempt to connect; if that fails, attempt to start a new manager
    # if that fails (due to simultaneous starts and one wins the port
    # that's not us) attempt to connect again. Repeat until we connect.
    while True:
        print("Attempting to connect to MP manager...")

        # Reduce multiprocessing's connection timeout value since we're
        # pretty sure we'll fail first and succeed later
        old_init_timeout = multiprocessing.connection._init_timeout
        multiprocessing.connection._init_timeout = lambda t=2.0: (
            time.time() + t
        )
        try:
            # NOTE: This call may just never return if we accidentally
            # connect to e.g., an HTTP server.
            _MANAGER.connect()  # Check your port settings!
            print("...connected successfully.")
            break  # only way out of loop is to connect successfully
        except ConnectionRefusedError:  # Nobody listening on that port
            pass
        except AuthenticationError:
            raise ValueError(
                "Your authkey is not correct. Make sure you're not"
              + " sharing the port you chose!"
            )
        finally:
            # Restore old timeout function on EVERY exit from the try
            # (success/break, refused connection, or error), so that the
            # shortened timeout never stays patched into the
            # multiprocessing module for the rest of the process.
            multiprocessing.connection._init_timeout = old_init_timeout

        print("...failed to connect...")

        # We didn't get to connect? Let's try to start a manager:
        try:
            print("...starting manager process...")
            _MANAGER.start()
            print("...finished starting manager...")
        except EOFError:  # Error generated if port is in use
            pass
        except RuntimeError:
            # Error generated as of 3.8 when attempting to start the
            # manager during the post-fork module reloading phase of a
            # manager sub-process.
            return

        # Sleep for quite a while to try to avoid calling _MANAGER.start
        # multiple times before the first call can bind the port.
        time.sleep(0.2)

        # And now we'll return to the top of the loop and attempt to
        # connect again.

    # At this point, _MANAGER is a connected manager for this process, so
    # we're done.

    # Set up _ACCESS as a proxy object:
    _ACCESS = _MANAGER.get()

    if _ACCESS is None:
        raise ValueError("_MANAGER.get() returned None")
init_sync should be called once per process, ideally early in the life of the process, like right after importing the sync module. Calling access_file before init_sync will fail. A file named 'syncauth' should exist unless a key is given (should be a byte-string). If 'syncauth' doesn't exist, it will be created. Don't rely too heavily on the default port, since any two apps using sync with the default port on the same machine will collide.