potluck_server.sync

Simple synchronization & caching for Flask apps.

sync.py

Implements simple/inefficient shared state through the filesystem. For read-only access w/ caching to reduce disk overhead, provides load_or_get_cached: a thread-safe way of loading from a file (if multiple WSGI processes are used, each will have its own cache). For read/write access to control application global state, access_state_file is provided: a process-safe method (using a Manager that gets started automatically) to read or write file contents. By default, both functions are JSON-in, JSON-out, but raw strings can be requested instead.

  1"""
  2Simple synchronization & caching for Flask apps.
  3
  4sync.py
  5
  6Implements simple/inefficient shared state through the filesystem. For
  7read-only access w/ caching to reduce disk overhead, provides
  8load_or_get_cached: a thread-safe way of loading from a file (if multiple
  9WSGI processes are used, each will have its own cache). For read/write
 10access to control application global state, access_state_file is
 11provided: a process-safe method (using a Manager that gets started
 12automatically) to read or write file contents. By default, both functions
 13are JSON-in, JSON-out, but raw strings can be requested instead.
 14"""
 15
# Version string for this module.
__version__ = "0.9.1"
 17
 18from flask import json
 19import os, sys, copy, time
 20import threading
 21import multiprocessing, multiprocessing.connection, multiprocessing.managers
 22
# Python 2/3 dual compatibility: give the exception types this module
# catches a single name each, so the rest of the code needn't branch.
if sys.version_info[0] < 3:
    # Python 2 only: change the interpreter's default str/unicode
    # conversion encoding to UTF-8.
    reload(sys) # noqa F821
    sys.setdefaultencoding('utf-8')
    # Python 2 has no ConnectionRefusedError builtin; alias it to
    # socket.error so `except ConnectionRefusedError` works in init_sync.
    import socket
    ConnectionRefusedError = socket.error
    IOError_or_FileNotFoundError = IOError
    OSError_or_FileNotFoundError = OSError
    AuthenticationError = multiprocessing.AuthenticationError
else:
    # Python 3: missing files raise FileNotFoundError, and
    # ConnectionRefusedError is already a builtin.
    IOError_or_FileNotFoundError = FileNotFoundError
    OSError_or_FileNotFoundError = FileNotFoundError
    AuthenticationError = multiprocessing.context.AuthenticationError
 36
# Python 2.7.5 multiprocessing bug workaround
# Bug thread of same behavior:
# https://bugzilla.redhat.com/show_bug.cgi?id=1717330
# Python bugfix:
# https://github.com/python/cpython/commit/e8a57b98ec8f2b161d4ad68ecc1433c9e3caad57
# Source for this workaround: https://github.com/oamg/leapp/pull/533/files

# Implements:
# https://github.com/python/cpython/commit/e8a57b98ec8f2b161d4ad68ecc1433c9e3caad57
#
# Detection of fix: os imported to compare pids, before the fix os has not
# been imported
import multiprocessing.util
if getattr(multiprocessing.util, 'os', None):
    # we're on a version that has the fix; no need to apply it
    pass
else:
    # we'd better apply the fix
    class FixedFinalize(multiprocessing.util.Finalize):
        # Finalize subclass that records the pid of the process that
        # created it and turns into a no-op when invoked from any other
        # process (presumably a forked child inheriting the finalizer).
        def __init__(self, *args, **kwargs):
            super(FixedFinalize, self).__init__(*args, **kwargs)
            self._pid = os.getpid()

        def __call__(self, *args, **kwargs):
            # Skip finalization outside the creating process.
            if self._pid != os.getpid():
                return None
            return super(FixedFinalize, self).__call__(*args, **kwargs)

    # Replace the module attribute; only code that looks up
    # multiprocessing.util.Finalize after this point sees the fix.
    setattr(multiprocessing.util, 'Finalize', FixedFinalize)
 66
 67
 68#----------------------#
 69# Core caching routine #
 70#----------------------#
 71
 72class AbortGeneration:
 73    """
 74    Class to signal that generation of a cached item should not proceed.
 75    Holds a default value to return instead.
 76    """
 77    def __init__(self, replacement):
 78        self.replacement = replacement
 79
 80
 81class NotInCache:
 82    """
 83    Placeholder for recognizing that a value is not in the cache (when
 84    e.g., None might be a valid cache value).
 85    """
 86    pass
 87
 88
 89def _gen_or_get_cached(lock, cache, cache_key, check_dirty, result_generator):
 90    """
 91    Common functionality that uses a reentrant lock and a cache
 92    dictionary to return a cached value if the cached value is fresh. The
 93    value from the cache is deep-copied before being returned, so that
 94    any modifications to the returned value shouldn't alter the cache.
 95    Parameters are:
 96
 97        lock: Specifies the lock to use. Should be a threading.RLock.
 98        cache: The cache dictionary.
 99        cache_key: String key for this cache item.
100        check_dirty: Function which will be given the cache_key and a
101            timestamp and must return True if the cached value (created
102            at that instant) is dirty (needs to be updated) and False
103            otherwise. May also return an AbortGeneration instance with a
104            default value inside to be returned directly. If there is no
105            cached value, check_dirty will be given a timestamp of None.
106        result_generator: Function to call to build a new result if the
107            cached value is stale. This new result will be cached. It
108            will be given the cache_key as a parameter.
109    """
110    with lock:
111        # We need to read the file contents and return them.
112        cache_ts, cached = cache.get(cache_key, (None, NotInCache))
113        safe_cached = copy.deepcopy(cached)
114
115    # Get mtime of file, or find out it's missing. No point in
116    # caching the missing value, because the mtime check the
117    # next time will hit the same branch.
118    is_dirty = check_dirty(cache_key, cache_ts)
119    if isinstance(is_dirty, AbortGeneration):
120        # check_fresh calls for an abort: return replacement value
121        return is_dirty.replacement
122    elif not is_dirty and cached != NotInCache:
123        # Cache is fresh: return cached value
124        return safe_cached
125    else:
126        # Cache is stale
127
128        # Get timestamp before we even start generating value:
129        ts = time.time()
130
131        # Generate reuslt:
132        result = result_generator(cache_key)
133
134        # Safely store new result value + timestamp in cache:
135        with lock:
136            cache[cache_key] = (ts, result)
137            # Don't allow outside code to mess with internals of
138            # cached value (JSON results could be mutable):
139            safe_result = copy.deepcopy(result)
140
141        # Return fresh deep copy of cached value:
142        return safe_result
143
144
def set_new_cached_value(lock, cache, cache_key, value):
    """
    Replaces the cache entry for cache_key with a private deep copy of
    value, stamped with the time at which this call takes the lock.
    """
    with lock:
        stamp = time.time()
        entry = (stamp, copy.deepcopy(value))
        cache[cache_key] = entry
155
156
def cache_key_for(target, view):
    """
    Combines a target (normally a filename) with a view's class name into
    a single "target::ViewName" cache key.
    """
    return "::".join([target, view.__name__])
162
163
def cache_key_filename(cache_key):
    """
    Recovers the filename part of a cache key by dropping the trailing
    view name. Raises ValueError if the string contains no '::'.
    """
    if '::' not in cache_key:
        raise ValueError("Value '{}' is not a cache key!".format(cache_key))
    parts = cache_key.split('::')
    del parts[-1]  # the last segment is the view name
    return '::'.join(parts)
171
172
173#----------------------------------#
174# View objects for managing caches #
175#----------------------------------#
176
class View:
    """
    Abstract View class to organize decoding/encoding of views. Each View
    must define encode and decode static methods which are each other's
    inverse. The class name is used as part of the cache key, so two
    views sharing a __name__ will share cache entries.
    """
    @staticmethod
    def encode(obj):
        """
        The encode function of a View must return a string (to be written
        to a file).
        """
        raise NotImplementedError("Don't use the base View class.")

    @staticmethod
    def decode(string):
        """
        The decode function of a View must accept a string, and if given
        a string produced by encode, should return an equivalent object.
        """
        raise NotImplementedError("Don't use the base View class.")
198
199
class AsIs(View):
    """
    Pass-through view: strings travel to and from files unchanged.
    """
    @staticmethod
    def encode(obj):
        """Hands back its argument untouched."""
        return obj

    @staticmethod
    def decode(string):
        """Hands back its argument untouched."""
        return string
213
214
class AsJSON(View):
    """
    JSON view: objects are serialized to JSON text for storage and parsed
    back when read. A None input to decode comes back as None.
    """
    @staticmethod
    def encode(obj):
        """Serializes obj to a JSON string."""
        return json.dumps(obj)

    @staticmethod
    def decode(string):
        """
        Parses a JSON string into an object; None maps straight to None.
        """
        return None if string is None else json.loads(string)
234
235
def build_view(name, encoder, decoder, pass_none=True):
    """
    Function for building a view given a name, an encoding function, and
    a decoding function. Unless pass_none is given as False, the decoder
    will be skipped if the decode argument is None and the None will pass
    through, in which case the decoder will *always* get a string as an
    argument.

    encoder and decoder may be any callables, including
    functools.partial objects or callable instances that have no
    __name__ attribute. Their repr is used in the generated docstring in
    that case.

    The returned class's __name__ is `name`, which becomes part of cache
    keys, so avoid reusing a name for different views.
    """
    class SyntheticView(View):
        """
        View class created using build_view.
        """
        @staticmethod
        def encode(obj):
            return encoder(obj)

        @staticmethod
        def decode(string):
            if pass_none and string is None:
                return None
            return decoder(string)

    def describe(fn):
        # partial objects / callable instances lack __name__; fall back
        # to repr instead of raising AttributeError.
        return getattr(fn, '__name__', repr(fn))

    SyntheticView.__name__ = name
    SyntheticView.__doc__ = (
        "View that uses '{}' for encoding and '{}' for decoding."
    ).format(describe(encoder), describe(decoder))
    SyntheticView.encode.__doc__ = getattr(encoder, '__doc__', None)
    SyntheticView.decode.__doc__ = getattr(decoder, '__doc__', None)

    return SyntheticView
266
267
268#--------------------------#
269# Process-local singletons #
270#--------------------------#
271
def build_or_get_singleton(cache_key, builder):
    """
    Caches the result of the given builder function under the given cache
    key. After the first successful build, the cached value is reused
    instead of calling builder again. The cache is lock-protected, but
    each singleton is unique to the process that created it.

    builder is called with cache_key as its only argument. Callers
    receive a deep copy of the cached result, so it must be
    deep-copyable. Two threads racing on the very first call may both
    run builder.
    """
    return _gen_or_get_cached(
        _CACHE_LOCK,
        _CACHE,
        cache_key,
        # check_dirty must return True when the entry needs (re)building,
        # which for a singleton is only when nothing is cached yet.
        lambda key, ts: ts is None,
        builder
    )
286
287
288#--------------------------------------#
289# Process-local read-only file caching #
290#--------------------------------------#
291
def build_file_freshness_checker(missing=Exception):
    """
    Builds a freshness checker that checks the mtime of a filename, but
    if that file doesn't exist, it returns AbortGeneration holding a deep
    copy of the given missing value. If missing is left as the default of
    Exception, the exception is allowed to bubble out instead.
    """
    def check_file_is_changed(cache_key, ts):
        """
        Checks whether a file has been modified more recently than the given
        timestamp (or whether there is no timestamp at all).
        """
        filename = cache_key_filename(cache_key)
        try:
            mtime = os.path.getmtime(filename)
        except OSError_or_FileNotFoundError:
            # Identity check: `missing` may be any value, including ones
            # with unusual __eq__ (e.g., numpy arrays).
            if missing is Exception:
                raise
            # Copy so a caller mutating the result can't alter the
            # default they (or others) passed in.
            return AbortGeneration(copy.deepcopy(missing))

        # File is changed if the mtime is after the given cache
        # timestamp, or if the timestamp is None
        return ts is None or mtime >= ts

    return check_file_is_changed
318
319
def build_file_reader(view=AsJSON):
    """
    Returns a reader function suitable as a cache result generator: given
    a cache key, it reads the file that key names and decodes the
    contents with `view`.
    """
    def read_file(cache_key):
        """
        Decodes the contents of the file named by cache_key with the
        view. IOError_or_FileNotFoundError (FileNotFoundError on Python 3)
        yields None silently. Any other exception, including one raised
        by the view's decode, is reported on stderr and also yields None.
        """
        path = cache_key_filename(cache_key)
        try:
            with open(path, 'r') as handle:
                text = handle.read()
            return view.decode(text)
        except IOError_or_FileNotFoundError:
            return None
        except Exception as e:
            sys.stderr.write(
                "[sync module] Exception viewing file:\n" + str(e) + '\n'
            )
            return None

    return read_file
344
345
# Note: by using a threading.RLock and a global variable here, we are not
# process-safe, which is fine, because this is just a cache: each process
# in a multi-process environment can safely maintain its own cache which
# will waste a bit of memory but not lead to corruption. As a corollary,
# load_or_get_cached should be treated as read-only, and
# read_file/write_file should be used to access a file which needs to have
# consistent state viewable by potentially multiple processes.

# TODO: We should probably have some kind of upper limit on the cache
# size, and maintain staleness so we can get rid of stale items...
_CACHE_LOCK = threading.RLock() # Make this reentrant just in case...
# Maps cache_key -> (timestamp, value); used by both load_or_get_cached
# and build_or_get_singleton.
_CACHE = {}
358
359
def load_or_get_cached(
    filename,
    view=AsJSON,
    missing=Exception
):
    """
    Returns the given view's decoding of a file's contents, re-reading
    the file only when its mtime shows it changed since the cached copy
    was made. A view of None means AsIs, i.e., return the raw string.
    The default AsJSON view parses the contents as JSON.

    If the file is missing, the exception from the mtime check bubbles
    out. If `missing` is given as something other than Exception, that
    value is returned instead and is not cached.

    The view's __name__ is part of the cache key, so avoid giving two
    different views the same name. Cache hits return a deep copy, so
    mutating the result won't change what is cached. The cache is
    per-process and lock-protected for threads.
    """
    chosen_view = AsIs if view is None else view
    return _gen_or_get_cached(
        _CACHE_LOCK,
        _CACHE,
        cache_key_for(filename, chosen_view),
        build_file_freshness_checker(missing),
        build_file_reader(chosen_view)
    )
398
399
400#-------------------------------------#
401# Cross-process file access & caching #
402#-------------------------------------#
403
# A per-process lock and cache for read_file/write_file
# (maps cache_key -> (timestamp, value))
_ACCESS_LOCK = threading.RLock()
_ACCESS_CACHE = {}

# The _FileAccessor proxy to use for file access (set by init_sync):
_ACCESS = None
410
411
def read_file(
    filename,
    view=AsJSON,
    missing=Exception,
    cache=True
):
    """
    Reads from a file, operating via proxy through a manager so that
    multiple threads/processes' requests will happen sequentially to
    avoid simultaneous reads/writes to any file. Returns the result of
    the view's decode function applied to the file contents.

    If the manager-side read fails, the view is given None; AsIs and
    AsJSON pass None through. Errors other than the file not existing
    are printed to stderr by the manager-side reader. Exceptions raised
    by the view's decode function propagate to the caller.

    This model is pretty draconian and sets up file access as competitive
    across all files, so only use it for files that may be written to by
    the server; read-only files can use load_or_get_cached instead. This
    function does set up a local in-process cache and check file mtimes,
    to avoid the entire proxy call if it can (only if cache is True).
    init_sync must have been called first in this process.

    The arguments behave as follows:

        filename: The full path to the file to be accessed.
        view: A View object which can encode and decode strings. AsJSON
            is the default, which converts strings in files to objects
            using JSON. The view's decode function is applied to file
            contents before they are cached. Using two different views
            with the same __name__ will cause incorrect cached values to
            be returned. None means AsIs (raw strings).
        missing: A value to be returned (but not cached or written to
            disk) if the file does not exist. If it is left as Exception,
            the missing file raises an OSError (or the subclass raised
            by os.path.getmtime when caching).
        cache: True by default, results will be cached, and the
            file-access-via-proxy chain of events will be skipped
            entirely if the cache value is newer than the file on disk.
            Set to False for large and/or constantly changing files where
            you expect this to be a waste of time or space. The cache is
            per-process but shared between threads.
    """

    # Figure out our view:
    if view is None:
        view = AsIs

    # Define our reader function which incorporates our view:
    def read_file_contents(cache_key):
        """
        File reading function that accesses file contents through the
        _ACCESS proxy and decodes them with the view. Exceptions raised
        by the view's decode function are not caught here.
        """
        target = cache_key_filename(cache_key)
        return view.decode(_ACCESS.read_file(target))

    if cache: # If we're caching, do that:
        return _gen_or_get_cached(
            _ACCESS_LOCK,
            _ACCESS_CACHE,
            cache_key_for(filename, view),
            build_file_freshness_checker(missing),
            read_file_contents
        )

    # Otherwise, check for existence ourselves and then read via proxy.
    if not os.path.exists(filename):
        # Identity check: `missing` may be any value, including ones with
        # unusual __eq__.
        if missing is Exception:
            raise OSError("File not found: '{}'".format(filename))
        return missing
    return read_file_contents(cache_key_for(filename, view))
489
490
def write_file(filename, content, view=AsJSON, cache=True):
    """
    Encodes content with the view's encode function and writes the
    result to filename through the _ACCESS manager proxy. A view of None
    means AsIs, i.e., content is written as-is and should be a string.

    When cache is True, the un-encoded content (deep-copied) is stored in
    this process's _ACCESS_CACHE under the (filename, view) key. A
    following read_file with the same view can then skip the proxy round
    trip.
    """
    active_view = AsIs if view is None else view

    _ACCESS.write_file(filename, active_view.encode(content))

    if not cache:
        return

    set_new_cached_value(
        _ACCESS_LOCK,
        _ACCESS_CACHE,
        cache_key_for(filename, active_view),
        content
    )
515
516
517class _FileAccessor:
518    """
519    A class for accessing files. Meant to be used via Proxy, with a
520    single Manager having the real instance so that all access is
521    sequential and we don't have conflicting writes or partial reads.
522    Instead of instantiating this class, use FileSystemManager.get (see
523    below).
524    """
525    def __init__(self):
526        # This... really isn't necessary, surely. But just in case the
527        # Manager uses multiple threads...
528        self.lock = threading.RLock()
529
530    def read_file(self, filename):
531        """
532        Reads file contents as a string, returning None if the file
533        doesn't exist or there's some other error (e.g., encoding issue).
534        If the file doesn't exist, no error is printed, but otherwise the
535        error messages is printed to stderr.
536        """
537        with self.lock:
538            try:
539                with open(filename, 'r') as fin:
540                    return fin.read()
541            except IOError_or_FileNotFoundError:
542                return None
543            except Exception as e:
544                sys.stderr.write("Exception reading file:\n" + str(e) + '\n')
545                return None
546
547    def write_file(self, filename, content):
548        """
549        Writes the given content into the given file, replacing any
550        previous content.
551        TODO: What happens if there's an exception here? Does the manager
552        process crash entirely?
553        """
554        with self.lock:
555            with open(filename, 'w') as fin:
556                fin.write(content)
557
558
559#--------------------#
560# Manager management #
561#--------------------#
562
class FileSystemManager(multiprocessing.managers.BaseManager):
    """
    Custom manager class for managing read/write file system access from
    multiple threads and processes. Go use a database instead!
    """
    pass


# BaseManager creates a new referent for every call of a registered
# typeid, and its server handles each client connection in its own
# thread. Registering _FileAccessor itself would therefore give every
# process its own accessor with its own lock, so cross-process accesses
# would not be serialized. Instead, every get() call is handed the same
# accessor instance living in the manager's server process.
_SHARED_ACCESSOR_LOCK = threading.Lock()
_SHARED_ACCESSOR = None


def _get_shared_file_accessor():
    """
    Returns this process's single _FileAccessor, creating it on first use.
    Registered as FileSystemManager.get, so it runs in the manager's
    server process.
    """
    global _SHARED_ACCESSOR
    with _SHARED_ACCESSOR_LOCK:
        if _SHARED_ACCESSOR is None:
            _SHARED_ACCESSOR = _FileAccessor()
        return _SHARED_ACCESSOR


FileSystemManager.register("get", _get_shared_file_accessor)
572
573
# The manager connection for this process (set by init_sync):
_MANAGER = None
576
577
def init_sync(port=51723, key=None):
    """
    init_sync should be called once per process, ideally early in the
    life of the process, like right after importing the sync module.
    Calling read_file or write_file before init_sync will fail. A file
    named 'syncauth' should exist unless a key is given (should be a
    byte-string). If 'syncauth' doesn't exist, it will be created. Don't
    rely too heavily on the default port, since any two apps using sync
    with the default port on the same machine will collide.

    On success, the module-level _MANAGER and _ACCESS are set. If starting
    the manager raises RuntimeError (see below), this returns early and
    _ACCESS is left as it was.

    Raises:
        IOError/FileNotFoundError: if 'syncauth' can't be read or created.
        ValueError: if the manager listening on `port` rejects our
            authkey, or if _MANAGER.get() returns None.
    """
    global _MANAGER, _ACCESS
    # Get synchronization key:
    if key is None:
        try:
            if os.path.exists('syncauth'):
                with open('syncauth', 'rb') as fin:
                    key = fin.read()
            else:
                print("Creating new sync secret file 'syncauth'.")
                key = os.urandom(16)
                with open('syncauth', 'wb') as fout:
                    fout.write(key)
        except Exception:
            raise IOError_or_FileNotFoundError(
                "Unable to access 'syncauth' file.\n"
              + "  Create it and put some random bytes in there, or pass"
              + " key= to init_sync."
            )

    _MANAGER = FileSystemManager(address=('localhost', port), authkey=key)
    # Attempt to connect; if that fails, attempt to start a new manager
    # if that fails (due to simultaneous starts and one wins the port
    # that's not us) attempt to connect again. Repeat until we connect.
    while True:
        print("Attempting to connect to MP manager...")

        # Reduce multiprocessing's connection timeout value since we're
        # pretty sure we'll fail first and succeed later. The patch is
        # undone in `finally`, so it never outlives this attempt, whether
        # we connect, fail, or raise.
        old_init_timeout = multiprocessing.connection._init_timeout
        multiprocessing.connection._init_timeout = lambda t=2.0: (
            time.time() + t
        )
        try:
            # NOTE: This call may just never return if we accidentally
            # connect to e.g., an HTTP server.
            _MANAGER.connect() # Check your port settings!
            print("...connected successfully.")
            break # only way out of loop is to connect successfully
        except ConnectionRefusedError: # Nobody listening on that port
            pass
        except AuthenticationError:
            raise ValueError(
                "Your authkey is not correct. Make sure you're not"
              + " sharing the port you chose!"
            )
        finally:
            # Restore old timeout function
            multiprocessing.connection._init_timeout = old_init_timeout
        print("...failed to connect...")

        # We didn't get to connect? Let's try to start a manager:
        try:
            print("...starting manager process...")
            _MANAGER.start()
            print("...finished starting manager...")
        except EOFError: # Error generated if port is in use
            pass
        except RuntimeError:
            # Error generated as of 3.8 when attempting to start the
            # manager during the post-fork module reloading phase of a
            # manager sub-process.
            return

        # Sleep for quite a while to try to avoid calling _MANAGER.start
        # multiple times before the first call can bind the port.
        time.sleep(0.2)

        # And now we'll return to the top of the loop and attempt to
        # connect again.

    # At this point, _MANAGER is a connected manager for this process, so
    # we're done.

    # Set up _ACCESS as a proxy object:
    _ACCESS = _MANAGER.get()

    if _ACCESS is None:
        raise ValueError("_MANAGER.get() returned None")
class AbortGeneration:
class AbortGeneration:
    """
    Marker returned by a freshness checker to say "don't (re)generate
    this cache entry"; it carries the value to hand back instead.
    """
    def __init__(self, replacement):
        # Value returned to the caller in place of a generated result.
        self.replacement = replacement

Class to signal that generation of a cached item should not proceed. Holds a default value to return instead.

AbortGeneration(replacement)
78    def __init__(self, replacement):
79        self.replacement = replacement
class NotInCache:
class NotInCache:
    """
    Sentinel class used as the "no entry" marker when looking a key up
    in a cache dictionary, because None (or any other value) may be a
    perfectly legitimate cached value.
    """

Placeholder for recognizing that a value is not in the cache (when e.g., None might be a valid cache value).

NotInCache()
def set_new_cached_value(lock, cache, cache_key, value):
def set_new_cached_value(lock, cache, cache_key, value):
    """
    Replaces the cache entry for cache_key with a private deep copy of
    value, stamped with the time at which this call takes the lock.
    """
    with lock:
        stamp = time.time()
        entry = (stamp, copy.deepcopy(value))
        cache[cache_key] = entry

Directly overrides a cached value, inserting a new value timestamped when this function is called.

def cache_key_for(target, view):
def cache_key_for(target, view):
    """
    Combines a target (normally a filename) with a view's class name into
    a single "target::ViewName" cache key.
    """
    return "::".join([target, view.__name__])

Builds a hybrid cache key value with a certain target and view.

def cache_key_filename(cache_key):
def cache_key_filename(cache_key):
    """
    Recovers the filename part of a cache key by dropping the trailing
    view name. Raises ValueError if the string contains no '::'.
    """
    if '::' not in cache_key:
        raise ValueError("Value '{}' is not a cache key!".format(cache_key))
    parts = cache_key.split('::')
    del parts[-1]  # the last segment is the view name
    return '::'.join(parts)

Returns just the filename given a cache key.

class View:
class View:
    """
    Abstract View class to organize decoding/encoding of views. Each View
    must define encode and decode static methods which are each other's
    inverse. The class name is used as part of the cache key, so two
    views sharing a __name__ will share cache entries.
    """
    @staticmethod
    def encode(obj):
        """
        The encode function of a View must return a string (to be written
        to a file).
        """
        raise NotImplementedError("Don't use the base View class.")

    @staticmethod
    def decode(string):
        """
        The decode function of a View must accept a string, and if given
        a string produced by encode, should return an equivalent object.
        """
        raise NotImplementedError("Don't use the base View class.")

Abstract View class to organize decoding/encoding of views. Each View must define encode and decode static methods which are each other's inverse. The class name is used as part of the cache key.

View()
@staticmethod
def encode(obj):
184    @staticmethod
185    def encode(obj):
186        """
187        The encode function of a View must return a string (to be written
188        to a file).
189        """
190        raise NotImplementedError("Don't use the base View class.")

The encode function of a View must return a string (to be written to a file).

@staticmethod
def decode(string):
192    @staticmethod
193    def decode(string):
194        """
195        The encode function of a View must accept a string, and if given
196        a string produced by encode, should return an equivalent object.
197        """
198        raise NotImplementedError("Don't use the base View class.")

The decode function of a View must accept a string, and if given a string produced by encode, should return an equivalent object.

class AsIs(View):
class AsIs(View):
    """
    Pass-through view: strings travel to and from files unchanged.
    """
    @staticmethod
    def encode(obj):
        """Hands back its argument untouched."""
        return obj

    @staticmethod
    def decode(string):
        """Hands back its argument untouched."""
        return string

A pass-through view that returns strings unaltered.

AsIs()
@staticmethod
def encode(obj):
205    @staticmethod
206    def encode(obj):
207        """Returns the object it is given unaltered."""
208        return obj

Returns the object it is given unaltered.

@staticmethod
def decode(string):
210    @staticmethod
211    def decode(string):
212        """Returns the string it is given unaltered."""
213        return string

Returns the string it is given unaltered.

class AsJSON(View):
class AsJSON(View):
    """
    JSON view: objects are serialized to JSON text for storage and parsed
    back when read. A None input to decode comes back as None.
    """
    @staticmethod
    def encode(obj):
        """Serializes obj to a JSON string."""
        return json.dumps(obj)

    @staticmethod
    def decode(string):
        """
        Parses a JSON string into an object; None maps straight to None.
        """
        return None if string is None else json.loads(string)

A view that converts objects to JSON for file storage and back on access. It passes through None.

AsJSON()
@staticmethod
def encode(obj):
221    @staticmethod
222    def encode(obj):
223        """Returns the JSON encoding of the object."""
224        return json.dumps(obj)

Returns the JSON encoding of the object.

@staticmethod
def decode(string):
226    @staticmethod
227    def decode(string):
228        """
229        Returns a JSON object parsed from the string.
230        Returns None if it gets None.
231        """
232        if string is None:
233            return None
234        return json.loads(string)

Returns a JSON object parsed from the string. Returns None if it gets None.

def build_view(name, encoder, decoder, pass_none=True):
237def build_view(name, encoder, decoder, pass_none=True):
238    """
239    Function for building a view given a name, an encoding function, and
240    a decoding function. Unless pass_none is given as False, the decoder
241    will be skipped if the decode argument is None and the None will pass
242    through, in which case the decoder will *always* get a string as an
243    argument.
244    """
245    class SyntheticView(View):
246        """
247        View class created using build_view.
248        """
249        @staticmethod
250        def encode(obj):
251            return encoder(obj)
252
253        @staticmethod
254        def decode(string):
255            if pass_none and string is None:
256                return None
257            return decoder(string)
258
259    SyntheticView.__name__ = name
260    SyntheticView.__doc__ = (
261        "View that uses '{}' for encoding and '{}' for decoding."
262    ).format(encoder.__name__, decoder.__name__)
263    SyntheticView.encode.__doc__ = encoder.__doc__
264    SyntheticView.decode.__doc__ = decoder.__doc__
265
266    return SyntheticView

Function for building a view given a name, an encoding function, and a decoding function. Unless pass_none is given as False, a None argument to decode skips the decoder and is returned unchanged, so the decoder will always get a string as an argument.

def build_or_get_singleton(cache_key, builder):
273def build_or_get_singleton(cache_key, builder):
274    """
275    Caches the result of the given builder function under the given cache
276    key, always returning the cached result once it's been created.
277    This is thread-safe, but each singleton is unique to the process that
278    created it.
279    """
280    return _gen_or_get_cached(
281        _CACHE_LOCK,
282        _CACHE,
283        cache_key,
284        lambda key, ts: ts is not None,
285        builder
286    )

Caches the result of the given builder function under the given cache key, always returning the cached result once it's been created. This is thread-safe, but each singleton is unique to the process that created it.

def build_file_freshness_checker(missing=<class 'Exception'>):
293def build_file_freshness_checker(missing=Exception):
294    """
295    Builds a freshness checker that checks the mtime of a filename, but
296    if that file doesn't exist, it returns AbortGeneration with the given
297    missing value (unless missing is left as the default of Exception, in
298    which case it lets the exception bubble out).
299    """
300    def check_file_is_changed(cache_key, ts):
301        """
302        Checks whether a file has been modified more recently than the given
303        timestamp.
304        """
305        filename = cache_key_filename(cache_key)
306        try:
307            mtime = os.path.getmtime(filename)
308        except OSError_or_FileNotFoundError:
309            if missing == Exception:
310                raise
311            else:
312                return AbortGeneration(missing)
313
314        # File is changed if the mtime is after the given cache
315        # timestamp, or if the timestamp is None
316        return ts is None or mtime >= ts
317
318    return check_file_is_changed

Builds a freshness checker that checks the mtime of a filename, but if that file doesn't exist, it returns AbortGeneration with the given missing value (unless missing is left as the default of Exception, in which case it lets the exception bubble out).

def build_file_reader(view=<class 'potluck_server.sync.AsJSON'>):
321def build_file_reader(view=AsJSON):
322    """
323    Builds a file reader function which returns the result of the given
324    view on the file contents.
325    """
326    def read_file(cache_key):
327        """
328        Reads a file and returns the result of calling a view's decode
329        function on the file contents. Returns None if there's an error,
330        and prints the error unless it's a FileNotFoundError.
331        """
332        filename = cache_key_filename(cache_key)
333        try:
334            with open(filename, 'r') as fin:
335                return view.decode(fin.read())
336        except IOError_or_FileNotFoundError:
337            return None
338        except Exception as e:
339            sys.stderr.write(
340                "[sync module] Exception viewing file:\n" + str(e) + '\n'
341            )
342            return None
343
344    return read_file

Builds a file reader function which returns the result of the given view's decode function on the file contents, or None if the file cannot be read.

def load_or_get_cached( filename, view=<class 'potluck_server.sync.AsJSON'>, missing=<class 'Exception'>):
361def load_or_get_cached(
362    filename,
363    view=AsJSON,
364    missing=Exception
365):
366    """
367    Loads the contents of the given file and returns the result of the
368    given view function on it (or the contents as a string if the view is
369    None). It first checks the modification time and returns a cached
370    copy of the view result if one exists. Uses a lock for thread safety.
371    The default view parses file contents as JSON and returns the
372    resulting object. If the file is missing, an exception would normally
373    be raised, but if the missing value is provided as something other
374    than Exception, a deep copy of that value will be returned instead.
375    The __name__ of the view class will be used to compute a cache key
376    for that view; avoid view name collisions.
377
378    Note: On a cache hit, a deep copy of the cached value is returned, so
379    modifying that value should not affect what is stored in the cache.
380    """
381
382    # Figure out our view object (& cache key):
383    if view is None:
384        view = AsIs
385
386    cache_key = cache_key_for(filename, view)
387
388    # Build functions for checking freshness and reading the file
389    check_mtime = build_file_freshness_checker(missing)
390    read_file = build_file_reader(view)
391
392    return _gen_or_get_cached(
393        _CACHE_LOCK,
394        _CACHE,
395        cache_key,
396        check_mtime,
397        read_file
398    )

Loads the contents of the given file and returns the result of the given view's decode function on it (or the contents as a string if the view is None). It first checks the modification time and returns a cached copy of the view result if one exists. Uses a lock for thread safety. The default view parses file contents as JSON and returns the resulting object. If the file is missing, an exception would normally be raised, but if the missing value is provided as something other than Exception, a deep copy of that value will be returned instead. The __name__ of the view class will be used to compute a cache key for that view; avoid view name collisions.

Note: On a cache hit, a deep copy of the cached value is returned, so modifying that value should not affect what is stored in the cache.

def read_file( filename, view=<class 'potluck_server.sync.AsJSON'>, missing=<class 'Exception'>, cache=True):
413def read_file(
414    filename,
415    view=AsJSON,
416    missing=Exception,
417    cache=True
418):
419    """
420    Reads from a file, operating via proxy through a manager so that
421    multiple threads/processes' requests will happen sequentially to
422    avoid simultaneous reads/writes to any file. Returns the string
423    contents of the file, or None if the file does not exist or there's
424    some other error in accessing the file. If there's an error other
425    than the file not existing, an error message will be printed to
426    stderr.
427
428    This model is pretty draconian and sets up file access as competitive
429    across all files, so only use it for files that may be written to by
430    the server; read-only files can use load_or_get_cached instead. This
431    function does set up a local in-process cache and check file mtimes,
432    to avoid the entire proxy call if it can (only if cache is True).
433
434    The arguments behave as follows:
435
436        filename: The full path to the file to be accessed.
437        view: A View object which can encode and decode strings. AsJSON
438            is the default, which converts strings in files to objects
439            using JSON. The view's decode function is applied to file
440            contents before they are cached. Using two different views
441            with the same __name__ will cause incorrect cached values to
442            be returned.
443        missing: A value to be returned (but not cached or written to
444            disk) if the file does not exist. Leave it as Exception and
445            whatever exception causes os.path.getmtime to fail will be
446            allowed to bubble out.
447        cache: True by default, results will be cached, and the
448            file-access-via-proxy chain of events will be skipped
449            entirely if the cache value is newer than the file on disk.
450            Set to False for large and/or constantly changing files where
451            you expect this to be a waste of time or space. The cache is
452            per-process but shared between threads.
453    """
454
455    # Figure out our view function (& cache key just in case):
456    if view is None:
457        view = AsIs
458
459    # Define our reader function which incorporates our view:
460    def read_file_contents(cache_key):
461        """
462        File reading function that accesses file contents through an
463        _ACCESS proxy. Applies a view function. Returns None if the view
464        function encounters an error (and prints that error to stderr).
465        """
466        filename = cache_key_filename(cache_key)
467        return view.decode(_ACCESS.read_file(filename))
468
469    if cache: # If we're caching, do that:
470        cache_key = cache_key_for(filename, view)
471        check_mtime = build_file_freshness_checker(missing)
472
473        return _gen_or_get_cached(
474            _ACCESS_LOCK,
475            _ACCESS_CACHE,
476            cache_key,
477            check_mtime,
478            read_file_contents
479        )
480
481    else: # Otherwise, just call our reader function
482        # But first check whether the file exists...
483        if not os.path.exists(filename):
484            if missing == Exception:
485                raise OSError("File not found: '{}'".format(filename))
486            else:
487                return missing
488        else:
489            return read_file_contents(cache_key_for(filename, view))

Reads from a file, operating via proxy through a manager so that multiple threads/processes' requests will happen sequentially to avoid simultaneous reads/writes to any file. Returns the file contents after applying the view's decode function (the raw string contents if view is None). If the file does not exist, the result is governed by the missing argument (see below). If there's some other error in accessing the file, None is returned and an error message will be printed to stderr.

This model is pretty draconian: it serializes access across all files, so only use it for files that may be written to by the server; read-only files can use load_or_get_cached instead. This function does set up a local in-process cache and check file mtimes, to avoid the entire proxy call if it can (only if cache is True).

The arguments behave as follows:

filename: The full path to the file to be accessed.
view: A View object which can encode and decode strings. AsJSON
    is the default, which converts strings in files to objects
    using JSON. The view's decode function is applied to file
    contents before they are cached. Using two different views
    with the same __name__ will cause incorrect cached values to
    be returned.
missing: A value to be returned (but not cached or written to
    disk) if the file does not exist. Leave it as Exception and
    whatever exception causes os.path.getmtime to fail will be
    allowed to bubble out (when cache is False, an OSError is
    raised instead).
cache: True by default, results will be cached, and the
    file-access-via-proxy chain of events will be skipped
    entirely if the cache value is newer than the file on disk.
    Set to False for large and/or constantly changing files where
    you expect this to be a waste of time or space. The cache is
    per-process but shared between threads.

def write_file( filename, content, view=<class 'potluck_server.sync.AsJSON'>, cache=True):
492def write_file(filename, content, view=AsJSON, cache=True):
493    """
494    Writes the given content to the given file, encoding the content
495    using the encode function of the given view. If cache is True,
496    updates the _ACCESS_CACHE with the new file contents to avoid
497    unnecessary round trips to disk. The original content, not the
498    encoded string, will be cached. The view may be given as None to
499    write the content directly to the file as a string.
500    """
501    if view is None:
502        view = AsIs
503
504    file_content = view.encode(content)
505
506    _ACCESS.write_file(filename, file_content)
507
508    if cache:
509        cache_key = cache_key_for(filename, view)
510        set_new_cached_value(
511            _ACCESS_LOCK,
512            _ACCESS_CACHE,
513            cache_key,
514            content
515        )

Writes the given content to the given file, encoding the content using the encode function of the given view. If cache is True, updates the _ACCESS_CACHE with the new file contents to avoid unnecessary round trips to disk. The original content, not the encoded string, will be cached. The view may be given as None to write the content directly to the file as a string.

class FileSystemManager(multiprocessing.managers.BaseManager):
564class FileSystemManager(multiprocessing.managers.BaseManager):
565    """
566    Custom manager class for managing read/write file system access from
567    multiple threads. Go use a database instead!
568    """
569    pass

Custom manager class for managing read/write file system access from multiple threads and processes. Go use a database instead!

def get(self, /, *args, **kwds):
722            def temp(self, /, *args, **kwds):
723                util.debug('requesting creation of a shared %r object', typeid)
724                token, exp = self._create(typeid, *args, **kwds)
725                proxy = proxytype(
726                    token, self._serializer, manager=self,
727                    authkey=self._authkey, exposed=exp
728                    )
729                conn = self._Client(token.address, authkey=self._authkey)
730                dispatch(conn, None, 'decref', (token.id,))
731                return proxy
Inherited Members
multiprocessing.managers.BaseManager
BaseManager
get_server
connect
start
join
register
def init_sync(port=51723, key=None):
579def init_sync(port=51723, key=None):
580    """
581    init_sync should be called once per process, ideally early in the
582    life of the process, like right after importing the sync module.
583    Calling access_file before init_sync will fail. A file named
584    'syncauth' should exist unless a key is given (should be a
585    byte-string). If 'syncauth' doesn't exist, it will be created. Don't
586    rely too heavily on the default port, since any two apps using sync
587    with the default port on the same machine will collide.
588    """
589    global _MANAGER, _ACCESS
590    # Get synchronization key:
591    if key is None:
592        try:
593            if os.path.exists('syncauth'):
594                with open('syncauth', 'rb') as fin:
595                    key = fin.read()
596            else:
597                print("Creating new sync secret file 'syncauth'.")
598                key = os.urandom(16)
599                with open('syncauth', 'wb') as fout:
600                    fout.write(key)
601        except Exception:
602            raise IOError_or_FileNotFoundError(
603                "Unable to access 'syncauth' file.\n"
604              + "  Create it and put some random bytes in there, or pass"
605              + " key= to init_sync."
606            )
607
608    _MANAGER = FileSystemManager(address=('localhost', port), authkey=key)
609    # Attempt to connect; if that fails, attempt to start a new manager
610    # if that fails (due to simultaneous starts and one wins the port
611    # that's not us) attempt to connect again. Repeat until we connect.
612    while True:
613        print("Attempting to connect to MP manager...")
614
615        # Reduce multiprocessing's connection timeout value since we're
616        # pretty sure we'll fail first and succeed later
617        old_init_timeout = multiprocessing.connection._init_timeout
618        multiprocessing.connection._init_timeout = lambda t=2.0: (
619            time.time() + t
620        )
621        try:
622            # NOTE: This call may just never return if we accidentally
623            # connect to e.g., an HTTP server.
624            _MANAGER.connect() # Check your port settings!
625            print("...connected successfully.")
626            break # only way out of loop is to connect successfully
627        except ConnectionRefusedError: # Nobody listening on that port
628            pass
629        except AuthenticationError:
630            raise ValueError(
631                "Your authkey is not correct. Make sure you're not"
632              + " sharing the port you chose!"
633            )
634        print("...failed to connect...")
635
636        # Restore old timeout function
637        multiprocessing.connection._init_timeout = old_init_timeout
638
639        # We didn't get to connect? Let's try to start a manager:
640        try:
641            print("...starting manager process...")
642            _MANAGER.start()
643            print("...finished starting manager...")
644        except EOFError: # Error generated if port is in use
645            pass
646        except RuntimeError:
647            # Error generated as of 3.8 when attempting to start the
648            # manager during the post-fork module reloading phase of a
649            # manager sub-process.
650            return
651
652        # Sleep for quite a while to try to avoid calling _MANAGER.start
653        # multiple times before the first call can bind the port.
654        time.sleep(0.2)
655
656        # And now we'll return to the top of the loop and attempt to
657        # connect again.
658
659    # At this point, _MANAGER is a connected manager for this process, so
660    # we're done.
661
662    # Set up _ACCESS as a proxy object:
663    _ACCESS = _MANAGER.get()
664
665    if _ACCESS is None:
666        raise ValueError("_MANAGER.get() returned None")

init_sync should be called once per process, ideally early in the life of the process, like right after importing the sync module. Calling read_file or write_file before init_sync will fail, since they rely on the manager proxy that init_sync sets up. Unless a key (a byte-string) is given, the key is read from a file named 'syncauth' in the current working directory; if that file doesn't exist, it will be created containing 16 random bytes. Don't rely too heavily on the default port, since any two apps using sync with the default port on the same machine will collide.