[Zodb-checkins] SVN: ZODB/branches/tseaver-python_picklecache-2/src/persistent/ Rename 'pypersistent' -> 'pyPersistence' to fit the pattern expected by
Tres Seaver
tseaver at palladion.com
Wed Feb 16 00:04:27 EST 2011
Log message for revision 120362:
Rename 'pypersistent' -> 'pyPersistence' to fit the pattern expected by
conditional imports.
Move the state flags into the interfaces module to avoid an import cycle
(they are part of the API, anyway).
Fix __setattr__ / __delattr__ semantics as revealed by other tests.
Changed:
U ZODB/branches/tseaver-python_picklecache-2/src/persistent/interfaces.py
A ZODB/branches/tseaver-python_picklecache-2/src/persistent/pyPersistence.py
D ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py
A ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pyPersistence.py
D ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py
-=-
Modified: ZODB/branches/tseaver-python_picklecache-2/src/persistent/interfaces.py
===================================================================
--- ZODB/branches/tseaver-python_picklecache-2/src/persistent/interfaces.py 2011-02-16 00:00:51 UTC (rev 120361)
+++ ZODB/branches/tseaver-python_picklecache-2/src/persistent/interfaces.py 2011-02-16 05:04:26 UTC (rev 120362)
@@ -17,16 +17,17 @@
from zope.interface import Interface
from zope.interface import Attribute
+# Allowed values for _p_state
try:
- from cPersistence import GHOST
- from cPersistence import UPTODATE
- from cPersistence import CHANGED
- from cPersistence import STICKY
+ from .cPersistence import GHOST
+ from .cPersistence import UPTODATE
+ from .cPersistence import CHANGED
+ from .cPersistence import STICKY
except ImportError:
- from pypersistence import GHOST
- from pypersistence import UPTODATE
- from pypersistence import CHANGED
- from pypersistence import STICKY
+ GHOST = -1
+ UPTODATE = 0
+ CHANGED = 1
+ STICKY = 2
class IPersistent(Interface):
Copied: ZODB/branches/tseaver-python_picklecache-2/src/persistent/pyPersistence.py (from rev 120361, ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py)
===================================================================
--- ZODB/branches/tseaver-python_picklecache-2/src/persistent/pyPersistence.py (rev 0)
+++ ZODB/branches/tseaver-python_picklecache-2/src/persistent/pyPersistence.py 2011-02-16 05:04:26 UTC (rev 120362)
@@ -0,0 +1,348 @@
+##############################################################################
+#
+# Copyright (c) 2011 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import datetime
+import struct
+import sys
+import time
+
+from zope.interface import implements
+
+from persistent.interfaces import IPersistent
+from persistent.interfaces import IPersistentDataManager
+from persistent.interfaces import GHOST
+from persistent.interfaces import UPTODATE
+from persistent.interfaces import CHANGED
+from persistent.interfaces import STICKY
+
+if sys.version_info < (2.6,):
+ OID_TYPE = SERIAL_TYPE = str
+else:
+ OID_TYPE = SERIAL_TYPE = bytes
+
+# Bitwise flags
+_CHANGED = 0x0001
+_STICKY = 0x0002
+
+_OGA = object.__getattribute__
+_OSA = object.__setattr__
+
+# These names can be used from a ghost without causing it to be activated.
+SPECIAL_NAMES = ('__class__',
+ '__del__',
+ '__dict__',
+ '__of__',
+ '__setstate__'
+ )
+
+_SCONV = 60.0 / (1<<16) / (1<<16)
+
+def makeTimestamp(year, month, day, hour, minute, second):
+ a = (((year - 1900) * 12 + month - 1) * 31 + day - 1)
+ a = (a * 24 + hour) * 60 + minute
+ b = int(second / _SCONV)
+ return struct.pack('>II', a, b)
+
+def parseTimestamp(octets):
+ a, b = struct.unpack('>II', octets)
+ minute = a % 60
+ hour = a // 60 % 24
+ day = a // (60 * 24) % 31 + 1
+ month = a // (60 * 24 * 31) % 12 + 1
+ year = a // (60 * 24 * 31 * 12) + 1900
+ second = b * _SCONV
+ return (year, month, day, hour, minute, second)
+
+
+class Persistent(object):
+ """ Pure Python implmentation of Persistent base class
+ """
+ __slots__ = ('__jar', '__oid', '__serial', '__flags')
+ implements(IPersistent)
+
+ def __new__(cls):
+ inst = super(Persistent, cls).__new__(cls)
+ inst.__jar = inst.__oid = inst.__serial = None
+ inst.__flags = None
+ return inst
+
+ # _p_jar: see IPersistent.
+ def _get_jar(self):
+ return self.__jar
+
+ def _set_jar(self, value):
+ if value is self.__jar:
+ return
+ if self.__jar is not None:
+ raise ValueError('Already assigned a data manager')
+ if not IPersistentDataManager.providedBy(value):
+ raise ValueError('Not a data manager: %s' % value)
+ self.__jar = value
+
+ _p_jar = property(_get_jar, _set_jar)
+
+ # _p_oid: see IPersistent.
+ def _get_oid(self):
+ return self.__oid
+
+ def _set_oid(self, value):
+ if value == self.__oid:
+ return
+ if value is not None:
+ if not isinstance(value, OID_TYPE):
+ raise ValueError('Invalid OID type: %s' % value)
+ if self.__oid is not None:
+ raise ValueError('Already assigned an OID')
+ self.__oid = value
+
+ _p_oid = property(_get_oid, _set_oid)
+
+ # _p_serial: see IPersistent.
+ def _get_serial(self):
+ return self.__serial
+
+ def _set_serial(self, value):
+ if value is not None:
+ if not isinstance(value, SERIAL_TYPE):
+ raise ValueError('Invalid SERIAL type: %s' % value)
+ self.__serial = value
+
+ _p_serial = property(_get_serial, _set_serial)
+
+ # _p_changed: see IPersistent.
+ def _get_changed(self):
+ if self.__flags is None: # ghost
+ return None
+ return self.__flags & _CHANGED
+
+ def _set_changed(self, value):
+ if self.__flags is None:
+ if value is not None:
+ self._p_activate()
+ self._p_set_changed_flag(value)
+ else:
+ if value is None: # -> ghost
+ self._p_deactivate()
+ else:
+ self._p_set_changed_flag(value)
+
+ def _del_changed(self):
+ self._p_invalidate()
+
+ _p_changed = property(_get_changed, _set_changed, _del_changed)
+
+ # _p_mtime
+ def _get_mtime(self):
+ if self.__serial is not None:
+ when = datetime.datetime(*parseTimestamp(self.__serial))
+ return time.mktime(when.timetuple())
+
+ _p_mtime = property(_get_mtime)
+
+ # _p_state
+ def _get_state(self):
+ if self.__flags is None:
+ if self.__jar is None:
+ return UPTODATE
+ return GHOST
+ if self.__flags & _CHANGED:
+ if self.__jar is None:
+ return UPTODATE
+ result = CHANGED
+ else:
+ result = UPTODATE
+ if self.__flags & _STICKY:
+ return STICKY
+ return result
+
+ _p_state = property(_get_state)
+
+ # _p_estimated_size: XXX don't want to reserve the space?
+ def _get_estimated_size(self):
+ return 0
+
+ def _set_estimated_size(self, value):
+ pass
+
+ _p_estimated_size = property(_get_estimated_size, _set_estimated_size)
+
+ # The '_p_sticky' property is not (yet) part of the API: for now,
+ # it exists to simplify debugging and testing assertions.
+ def _get_sticky(self):
+ if self.__flags is None:
+ return False
+ return self.__flags & _STICKY
+ def _set_sticky(self, value):
+ if self.__flags is None:
+ raise ValueError('Ghost')
+ if value:
+ self.__flags |= _STICKY
+ else:
+ self.__flags &= ~_STICKY
+ _p_sticky = property(_get_sticky, _set_sticky)
+
+ # The '_p_status' property is not (yet) part of the API: for now,
+ # it exists to simplify debugging and testing assertions.
+ def _get_status(self):
+ if self.__flags is None:
+ if self.__jar is None:
+ return 'new'
+ return 'ghost'
+ if self.__flags & _CHANGED:
+ if self.__jar is None:
+ return 'unsaved'
+ result = 'changed'
+ else:
+ result = 'saved'
+ if self.__flags & _STICKY:
+ return '%s (sticky)' % result
+ return result
+
+ _p_status = property(_get_status)
+
+ # Methods from IPersistent.
+ def __getattribute__(self, name):
+ """ See IPersistent.
+ """
+ if (not name.startswith('_Persistent__') and
+ not name.startswith('_p_') and
+ name not in SPECIAL_NAMES):
+ if _OGA(self, '_Persistent__flags') is None:
+ _OGA(self, '_p_activate')()
+ _OGA(self, '_p_accessed')()
+ return _OGA(self, name)
+
+ def __setattr__(self, name, value):
+ special_name = (name.startswith('_Persistent__') or
+ name.startswith('_p_'))
+ if not special_name:
+ if _OGA(self, '_Persistent__flags') is None:
+ _OGA(self, '_p_activate')()
+ _OGA(self, '_p_accessed')()
+ _OSA(self, name, value)
+ if not special_name:
+ before = _OGA(self, '_Persistent__flags')
+ after = before | _CHANGED
+ if before != after:
+ _OSA(self, '_Persistent__flags', after)
+ if (_OGA(self, '_Persistent__jar') is not None and
+ _OGA(self, '_Persistent__oid') is not None):
+ _OGA(self, '_p_register')()
+
+ def __delattr__(self, name):
+ special_name = (name.startswith('_Persistent__') or
+ name.startswith('_p_'))
+ if not special_name:
+ if _OGA(self, '_Persistent__flags') is None:
+ _OGA(self, '_p_activate')()
+ _OGA(self, '_p_accessed')()
+ object.__delattr__(self, name)
+ if not special_name:
+ before = _OGA(self, '_Persistent__flags')
+ after = before | _CHANGED
+ if before != after:
+ _OSA(self, '_Persistent__flags', after)
+ if (_OGA(self, '_Persistent__jar') is not None and
+ _OGA(self, '_Persistent__oid') is not None):
+ _OGA(self, '_p_register')()
+
+ def __getstate__(self):
+ """ See IPersistent.
+ """
+ return ()
+
+ def __setstate__(self, state):
+ """ See IPersistent.
+ """
+ if state != ():
+ raise ValueError('No state allowed on base Persistent class')
+
+ def __reduce__(self):
+ """ See IPersistent.
+ """
+ gna = getattr(self, '__getnewargs__', lambda: ())
+ return ((type(self),) + gna(), self.__getstate__())
+
+ def _p_activate(self):
+ """ See IPersistent.
+ """
+ if self.__flags is None:
+ self.__flags = 0
+ if self.__jar is not None and self.__oid is not None:
+ self.__jar.setstate(self)
+
+ def _p_deactivate(self):
+ """ See IPersistent.
+ """
+ if self.__flags is not None and not self.__flags & _CHANGED:
+ self._p_invalidate()
+
+ def _p_invalidate(self):
+ """ See IPersistent.
+ """
+ if self.__flags is not None and self.__flags & _STICKY:
+ raise ValueError('Sticky')
+ self.__flags = None
+
+ def _p_getattr(self, name):
+ """ See IPersistent.
+ """
+ if name.startswith('_p_') or name in SPECIAL_NAMES:
+ return True
+ self._p_activate()
+ self._p_accessed()
+ return False
+
+ def _p_setattr(self, name, value):
+ """ See IPersistent.
+ """
+ if name.startswith('_p_'):
+ setattr(self, name, value)
+ return True
+ self._p_activate()
+ self._p_accessed()
+ return False
+
+ def _p_delattr(self, name):
+ """ See IPersistent.
+ """
+ if name.startswith('_p_'):
+ delattr(self, name)
+ return True
+ self._p_activate()
+ self._p_accessed()
+ return False
+
+ # Helper methods: not APIs: we name them with '_p_' to bypass
+ # the __getattribute__ bit which bumps the cache.
+ def _p_register(self):
+ if self.__jar is not None and self.__oid is not None:
+ self.__jar.register(self)
+
+ def _p_set_changed_flag(self, value):
+ if value:
+ before = self.__flags
+ self.__flags |= _CHANGED
+ if before != self.__flags:
+ self._p_register()
+ else:
+ self.__flags &= ~_CHANGED
+
+ def _p_accessed(self):
+ # Notify the jar's pickle cache that we have been accessed.
+ # This relies on what has been (until now) an implementation
+ # detail, the '_cache' attribute of the jar. We made it a
+ # private API to avoid the cycle of keeping a reference to
+ # the cache on the persistent object.
+ if self.__jar is not None and self.__oid is not None:
+ self.__jar._cache.mru(self.__oid)
Deleted: ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py
===================================================================
--- ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py 2011-02-16 00:00:51 UTC (rev 120361)
+++ ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py 2011-02-16 05:04:26 UTC (rev 120362)
@@ -1,337 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2011 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-import datetime
-import struct
-import sys
-import time
-
-from zope.interface import implements
-
-from persistent.interfaces import IPersistent
-from persistent.interfaces import IPersistentDataManager
-
-if sys.version_info < (2.6,):
- OID_TYPE = SERIAL_TYPE = str
-else:
- OID_TYPE = SERIAL_TYPE = bytes
-
-# Bitwise flags
-_CHANGED = 0x0001
-_STICKY = 0x0002
-
-_OGA = object.__getattribute__
-
-# Allowed values for _p_state
-GHOST = -1
-UPTODATE = 0
-CHANGED = 1
-STICKY = 2
-
-# These names can be used from a ghost without causing it to be activated.
-SPECIAL_NAMES = ('__class__',
- '__del__',
- '__dict__',
- '__of__',
- '__setstate__'
- )
-
-_SCONV = 60.0 / (1<<16) / (1<<16)
-
-def makeTimestamp(year, month, day, hour, minute, second):
- a = (((year - 1900) * 12 + month - 1) * 31 + day - 1)
- a = (a * 24 + hour) * 60 + minute
- b = int(second / _SCONV)
- return struct.pack('>II', a, b)
-
-def parseTimestamp(octets):
- a, b = struct.unpack('>II', octets)
- minute = a % 60
- hour = a // 60 % 24
- day = a // (60 * 24) % 31 + 1
- month = a // (60 * 24 * 31) % 12 + 1
- year = a // (60 * 24 * 31 * 12) + 1900
- second = b * _SCONV
- return (year, month, day, hour, minute, second)
-
-
-class Persistent(object):
- """ Pure Python implmentation of Persistent base class
- """
- __slots__ = ('__jar', '__oid', '__serial', '__flags')
- implements(IPersistent)
-
- def __new__(cls):
- inst = super(Persistent, cls).__new__(cls)
- inst.__jar = inst.__oid = inst.__serial = None
- inst.__flags = None
- return inst
-
- # _p_jar: see IPersistent.
- def _get_jar(self):
- return self.__jar
-
- def _set_jar(self, value):
- if value is self.__jar:
- return
- if self.__jar is not None:
- raise ValueError('Already assigned a data manager')
- if not IPersistentDataManager.providedBy(value):
- raise ValueError('Not a data manager: %s' % value)
- self.__jar = value
-
- _p_jar = property(_get_jar, _set_jar)
-
- # _p_oid: see IPersistent.
- def _get_oid(self):
- return self.__oid
-
- def _set_oid(self, value):
- if value == self.__oid:
- return
- if value is not None:
- if not isinstance(value, OID_TYPE):
- raise ValueError('Invalid OID type: %s' % value)
- if self.__oid is not None:
- raise ValueError('Already assigned an OID')
- self.__oid = value
-
- _p_oid = property(_get_oid, _set_oid)
-
- # _p_serial: see IPersistent.
- def _get_serial(self):
- return self.__serial
-
- def _set_serial(self, value):
- if value is not None:
- if not isinstance(value, SERIAL_TYPE):
- raise ValueError('Invalid SERIAL type: %s' % value)
- self.__serial = value
-
- _p_serial = property(_get_serial, _set_serial)
-
- # _p_changed: see IPersistent.
- def _get_changed(self):
- if self.__flags is None: # ghost
- return None
- return self.__flags & _CHANGED
-
- def _set_changed(self, value):
- if self.__flags is None:
- if value is not None:
- self._p_activate()
- self._p_set_changed_flag(value)
- else:
- if value is None: # -> ghost
- self._p_deactivate()
- else:
- self._p_set_changed_flag(value)
-
- def _del_changed(self):
- self._p_invalidate()
-
- _p_changed = property(_get_changed, _set_changed, _del_changed)
-
- # _p_mtime
- def _get_mtime(self):
- if self.__serial is not None:
- when = datetime.datetime(*parseTimestamp(self.__serial))
- return time.mktime(when.timetuple())
-
- _p_mtime = property(_get_mtime)
-
- # _p_state
- def _get_state(self):
- if self.__flags is None:
- if self.__jar is None:
- return UPTODATE
- return GHOST
- if self.__flags & _CHANGED:
- if self.__jar is None:
- return UPTODATE
- result = CHANGED
- else:
- result = UPTODATE
- if self.__flags & _STICKY:
- return STICKY
- return result
-
- _p_state = property(_get_state)
-
- # _p_estimated_size: XXX don't want to reserve the space?
- def _get_estimated_size(self):
- return 0
-
- def _set_estimated_size(self, value):
- pass
-
- _p_estimated_size = property(_get_estimated_size, _set_estimated_size)
-
- # The '_p_sticky' property is not (yet) part of the API: for now,
- # it exists to simplify debugging and testing assertions.
- def _get_sticky(self):
- if self.__flags is None:
- return False
- return self.__flags & _STICKY
- def _set_sticky(self, value):
- if self.__flags is None:
- raise ValueError('Ghost')
- if value:
- self.__flags |= _STICKY
- else:
- self.__flags &= ~_STICKY
- _p_sticky = property(_get_sticky, _set_sticky)
-
- # The '_p_status' property is not (yet) part of the API: for now,
- # it exists to simplify debugging and testing assertions.
- def _get_status(self):
- if self.__flags is None:
- if self.__jar is None:
- return 'new'
- return 'ghost'
- if self.__flags & _CHANGED:
- if self.__jar is None:
- return 'unsaved'
- result = 'changed'
- else:
- result = 'saved'
- if self.__flags & _STICKY:
- return '%s (sticky)' % result
- return result
-
- _p_status = property(_get_status)
-
- # Methods from IPersistent.
- def __getattribute__(self, name):
- """ See IPersistent.
- """
- if (not name.startswith('_Persistent__') and
- not name.startswith('_p_') and
- name not in SPECIAL_NAMES):
- if _OGA(self, '_Persistent__flags') is None:
- _OGA(self, '_p_activate')()
- _OGA(self, '_p_accessed')()
- return _OGA(self, name)
-
- def __setattr__(self, name, value):
- if (not name.startswith('_Persistent__') and
- not name.startswith('_p_')):
- if _OGA(self, '_Persistent__flags') is None:
- _OGA(self, '_p_activate')()
- _OGA(self, '_p_accessed')()
- if (_OGA(self, '_Persistent__jar') is not None and
- _OGA(self, '_Persistent__oid') is not None):
- _OGA(self, '_p_register')()
- object.__setattr__(self, name, value)
-
- def __delattr__(self, name):
- if (not name.startswith('_Persistent__') and
- not name.startswith('_p_')):
- if _OGA(self, '_Persistent__flags') is None:
- _OGA(self, '_p_activate')()
- _OGA(self, '_p_accessed')()
- if (_OGA(self, '_Persistent__jar') is not None and
- _OGA(self, '_Persistent__oid') is not None):
- _OGA(self, '_p_register')()
- object.__delattr__(self, name)
-
- def __getstate__(self):
- """ See IPersistent.
- """
- return ()
-
- def __setstate__(self, state):
- """ See IPersistent.
- """
- if state != ():
- raise ValueError('No state allowed on base Persistent class')
-
- def __reduce__(self):
- """ See IPersistent.
- """
- gna = getattr(self, '__getnewargs__', lambda: ())
- return ((type(self),) + gna(), self.__getstate__())
-
- def _p_activate(self):
- """ See IPersistent.
- """
- if self.__flags is None:
- self.__flags = 0
- if self.__jar is not None and self.__oid is not None:
- self.__jar.setstate(self)
-
- def _p_deactivate(self):
- """ See IPersistent.
- """
- if self.__flags is not None and not self.__flags & _CHANGED:
- self._p_invalidate()
-
- def _p_invalidate(self):
- """ See IPersistent.
- """
- if self.__flags is not None and self.__flags & _STICKY:
- raise ValueError('Sticky')
- self.__flags = None
-
- def _p_getattr(self, name):
- """ See IPersistent.
- """
- if name.startswith('_p_') or name in SPECIAL_NAMES:
- return True
- self._p_activate()
- self._p_accessed()
- return False
-
- def _p_setattr(self, name, value):
- """ See IPersistent.
- """
- if name.startswith('_p_'):
- setattr(self, name, value)
- return True
- self._p_activate()
- self._p_accessed()
- return False
-
- def _p_delattr(self, name):
- """ See IPersistent.
- """
- if name.startswith('_p_'):
- delattr(self, name)
- return True
- self._p_activate()
- self._p_accessed()
- return False
-
- # Helper methods: not APIs: we name them with '_p_' to bypass
- # the __getattribute__ bit which bumps the cache.
- def _p_register(self):
- if self.__jar is not None and self.__oid is not None:
- self.__jar.register(self)
-
- def _p_set_changed_flag(self, value):
- if value:
- before = self.__flags
- self.__flags |= _CHANGED
- if before != self.__flags:
- self._p_register()
- else:
- self.__flags &= ~_CHANGED
-
- def _p_accessed(self):
- # Notify the jar's pickle cache that we have been accessed.
- # This relies on what has been (until now) an implementation
- # detail, the '_cache' attribute of the jar. We made it a
- # private API to avoid the cycle of keeping a reference to
- # the cache on the persistent object.
- if self.__jar is not None and self.__oid is not None:
- self.__jar._cache.mru(self.__oid)
Copied: ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pyPersistence.py (from rev 120361, ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py)
===================================================================
--- ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pyPersistence.py (rev 0)
+++ ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pyPersistence.py 2011-02-16 05:04:26 UTC (rev 120362)
@@ -0,0 +1,849 @@
+##############################################################################
+#
+# Copyright (c) 2011 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+
+class PersistentTests(unittest.TestCase):
+
+ def _getTargetClass(self):
+ from persistent.pyPersistence import Persistent
+ return Persistent
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def _makeJar(self):
+ from zope.interface import implements
+ from persistent.interfaces import IPersistentDataManager
+
+ class _Cache(object):
+ def __init__(self):
+ self._mru = []
+ def mru(self, oid):
+ self._mru.append(oid)
+
+ class _Jar(object):
+ implements(IPersistentDataManager)
+ def __init__(self):
+ self._loaded = []
+ self._registered = []
+ self._cache = _Cache()
+ def setstate(self, obj):
+ self._loaded.append(obj._p_oid)
+ def register(self, obj):
+ self._registered.append(obj._p_oid)
+
+ return _Jar()
+
+ def _makeOneWithJar(self, klass=None):
+ OID = '\x01' * 8
+ if klass is not None:
+ inst = klass()
+ else:
+ inst = self._makeOne()
+ jar = self._makeJar()
+ inst._p_jar = jar
+ inst._p_oid = OID
+ return inst, jar, OID
+
+ def test_class_conforms_to_IPersistent(self):
+ from zope.interface.verify import verifyClass
+ from persistent.interfaces import IPersistent
+ verifyClass(IPersistent, self._getTargetClass())
+
+ def test_instance_conforms_to_IPersistent(self):
+ from zope.interface.verify import verifyObject
+ from persistent.interfaces import IPersistent
+ verifyObject(IPersistent, self._makeOne())
+
+ def test_ctor(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_jar, None)
+ self.assertEqual(inst._p_oid, None)
+ self.assertEqual(inst._p_serial, None)
+ self.assertEqual(inst._p_changed, None)
+ self.assertEqual(inst._p_sticky, False)
+
+ def test_assign_p_jar_w_invalid_jar(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_jar = object()
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_jar_w_new_jar(self):
+ inst = self._makeOne()
+ inst._p_jar = self._makeJar()
+ jar = self._makeJar()
+ def _test():
+ inst._p_jar = jar
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_jar_w_valid_jar(self):
+ jar = self._makeJar()
+ inst = self._makeOne()
+ inst._p_jar = jar
+ self.failUnless(inst._p_jar is jar)
+ inst._p_jar = jar # reassign only to same DM
+
+ def test_assign_p_oid_w_invalid_oid(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_oid = object()
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_oid_w_valid_oid(self):
+ OID = '1' * 8
+ inst = self._makeOne()
+ inst._p_oid = OID
+ self.assertEqual(inst._p_oid, OID)
+ inst._p_oid = OID # reassign only same OID
+
+ def test_assign_p_oid_w_new_oid(self):
+ OID1 = '1' * 8
+ OID2 = '2' * 8
+ inst = self._makeOne()
+ inst._p_oid = OID1
+ def _test():
+ inst._p_oid = OID2
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_serial_w_invalid_serial(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_serial = object()
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_serial_w_valid_serial(self):
+ SERIAL = '1' * 8
+ inst = self._makeOne()
+ inst._p_serial = SERIAL
+ self.assertEqual(inst._p_serial, SERIAL)
+ inst._p_serial = None
+ self.assertEqual(inst._p_serial, None)
+
+ def test_query_p_changed(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_changed, None)
+ inst._p_changed = True
+ self.assertEqual(inst._p_changed, True)
+ inst._p_changed = False
+ self.assertEqual(inst._p_changed, False)
+
+ def test_assign_p_changed_none_from_new(self):
+ inst = self._makeOne()
+ inst._p_changed = None
+ self.assertEqual(inst._p_status, 'new')
+
+ def test_assign_p_changed_true_from_new(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ self.assertEqual(inst._p_status, 'unsaved')
+
+ def test_assign_p_changed_false_from_new(self):
+ inst = self._makeOne()
+ inst._p_changed = False # activates
+ self.assertEqual(inst._p_status, 'saved')
+
+ def test_assign_p_changed_none_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_changed = None
+ # can't transition 'unsaved' -> 'new'
+ self.assertEqual(inst._p_status, 'unsaved')
+
+ def test_assign_p_changed_true_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_changed = True
+ self.assertEqual(inst._p_status, 'unsaved')
+
+ def test_assign_p_changed_false_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_changed = False
+ self.assertEqual(inst._p_status, 'saved')
+
+ def test_assign_p_changed_none_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = None
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_true_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ self.assertEqual(inst._p_status, 'changed')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._registered), [OID])
+
+ def test_assign_p_changed_false_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ self.assertEqual(inst._p_status, 'saved')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_none_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ inst._p_changed = None
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_true_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ self.assertEqual(inst._p_status, 'changed')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._registered), [OID])
+
+ def test_assign_p_changed_false_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ inst._p_changed = False
+ self.assertEqual(inst._p_status, 'saved')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_none_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_changed = None
+ # assigning None is ignored when dirty
+ self.assertEqual(inst._p_status, 'changed')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_true_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_changed = True
+ self.assertEqual(inst._p_status, 'changed')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_false_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_changed = False
+ self.assertEqual(inst._p_status, 'saved')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_none_when_sticky(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ def _test():
+ inst._p_changed = None
+ self.assertRaises(ValueError, _test)
+
+ def test_delete_p_changed_from_new(self):
+ inst = self._makeOne()
+ del inst._p_changed
+ self.assertEqual(inst._p_status, 'new')
+
+ def test_delete_p_changed_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ del inst._p_changed
+ self.assertEqual(inst._p_status, 'new')
+
+ def test_delete_p_changed_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ del inst._p_changed
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_delete_p_changed_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ jar._registered = []
+ del inst._p_changed
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_delete_p_changed_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ del inst._p_changed
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_delete_p_changed_when_sticky(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ def _test():
+ del inst._p_changed
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_sticky_true_when_ghost(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_sticky = True
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_sticky_false_when_ghost(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_sticky = False
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_sticky_true_non_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ self.failUnless(inst._p_sticky)
+
+ def test_assign_p_sticky_false_non_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = False
+ self.failIf(inst._p_sticky)
+
+ def test__p_status_new(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_status, 'new')
+
+ def test__p_status_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ self.assertEqual(inst._p_status, 'unsaved')
+
+ def test__p_status_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ self.assertEqual(inst._p_status, 'ghost')
+
+ def test__p_status_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ self.assertEqual(inst._p_status, 'changed')
+
+ def test__p_status_changed_sticky(self):
+ # 'sticky' is not a state, but a separate flag.
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ inst._p_sticky = True
+ self.assertEqual(inst._p_status, 'changed (sticky)')
+
+ def test__p_status_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ self.assertEqual(inst._p_status, 'saved')
+
+ def test__p_status_saved_sticky(self):
+ # 'sticky' is not a state, but a separate flag.
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ self.assertEqual(inst._p_status, 'saved (sticky)')
+
+ def test__p_mtime_no_serial(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_mtime, None)
+
+ def test__p_mtime_w_serial(self):
+ import datetime
+ import time
+ from persistent.pyPersistence import makeTimestamp
+ WHEN_TUPLE = (2011, 2, 15, 13, 33, 27.5)
+ WHEN = datetime.datetime(*WHEN_TUPLE)
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_serial = makeTimestamp(*WHEN_TUPLE)
+ self.assertEqual(inst._p_mtime, time.mktime(WHEN.timetuple()))
+
+ def test__p_state_new(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_state, 0)
+
+ def test__p_state_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 0)
+
+ def test__p_state_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ self.assertEqual(inst._p_state, -1)
+
+ def test__p_state_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 1)
+
+ def test__p_state_changed_sticky(self):
+ # 'sticky' is not a state, but a separate flag.
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ inst._p_sticky = True
+ self.assertEqual(inst._p_state, 2)
+
+ def test__p_state_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ self.assertEqual(inst._p_state, 0)
+
+ def test__p_state_saved_sticky(self):
+ # 'sticky' is not a state, but a separate flag.
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ self.assertEqual(inst._p_state, 2)
+
+ def test_query_p_estimated_size(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_estimated_size, 0)
+
+ def test_assign_p_estimated_size(self):
+ # XXX at the moment, we don't store this value.
+ inst = self._makeOne()
+ inst._p_estimated_size = 123
+ self.assertEqual(inst._p_estimated_size, 0)
+
+ def test___getattribute___p__names(self):
+ NAMES = ['_p_jar',
+ '_p_oid',
+ '_p_changed',
+ '_p_serial',
+ '_p_mtime',
+ '_p_state',
+ '_p_estimated_size',
+ '_p_sticky',
+ '_p_status',
+ ]
+ inst, jar, OID = self._makeOneWithJar()
+ jar._cache._mru = []
+ for name in NAMES:
+ getattr(inst, name)
+ self.assertEqual(jar._cache._mru, [])
+
+ def test___getattribute__special_name(self):
+ from persistent.pyPersistence import SPECIAL_NAMES
+ inst, jar, OID = self._makeOneWithJar()
+ jar._cache._mru = []
+ for name in SPECIAL_NAMES:
+ getattr(inst, name, None)
+ self.assertEqual(jar._cache._mru, [])
+
+ def test___getattribute__normal_name_from_new(self):
+ class Derived(self._getTargetClass()):
+ normal = 'value'
+ inst = Derived()
+ self.assertEqual(getattr(inst, 'normal', None), 'value')
+
+ def test___getattribute__normal_name_from_unsaved(self):
+ class Derived(self._getTargetClass()):
+ normal = 'value'
+ inst = Derived()
+ inst._p_changed = True
+ self.assertEqual(getattr(inst, 'normal', None), 'value')
+
+ def test___getattribute__normal_name_from_ghost(self):
+ class Derived(self._getTargetClass()):
+ normal = 'value'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ jar._cache._mru = []
+ self.assertEqual(getattr(inst, 'normal', None), 'value')
+ self.assertEqual(jar._cache._mru, [OID])
+
+ def test___getattribute__normal_name_from_saved(self):
+ class Derived(self._getTargetClass()):
+ normal = 'value'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ inst._p_changed = False
+ jar._cache._mru = []
+ self.assertEqual(getattr(inst, 'normal', None), 'value')
+ self.assertEqual(jar._cache._mru, [OID])
+
+ def test___getattribute__normal_name_from_changed(self):
+ class Derived(self._getTargetClass()):
+ normal = 'value'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ inst._p_changed = True
+ jar._cache._mru = []
+ self.assertEqual(getattr(inst, 'normal', None), 'value')
+ self.assertEqual(jar._cache._mru, [OID])
+
+ def test___setattr___p__names(self):
+ inst, jar, OID = self._makeOneWithJar()
+ NAMES = [('_p_jar', jar),
+ ('_p_oid', OID),
+ ('_p_changed', False),
+ ('_p_serial', '\x01' * 8),
+ ('_p_estimated_size', 0),
+ ('_p_sticky', False),
+ ]
+ jar._cache._mru = []
+ for name, value in NAMES:
+ setattr(inst, name, value)
+ self.assertEqual(jar._cache._mru, [])
+
+ def test___setattr__normal_name_from_new(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ inst = Derived()
+ setattr(inst, 'normal', 'after')
+ self.assertEqual(getattr(inst, 'normal', None), 'after')
+
+ def test___setattr__normal_name_from_unsaved(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ inst = Derived()
+ inst._p_changed = True
+ setattr(inst, 'normal', 'after')
+ self.assertEqual(getattr(inst, 'normal', None), 'after')
+
+ def test___setattr__normal_name_from_ghost(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ jar._cache._mru = []
+ setattr(inst, 'normal', 'after')
+ self.assertEqual(jar._cache._mru, [OID])
+ self.assertEqual(jar._registered, [OID])
+ self.assertEqual(getattr(inst, 'normal', None), 'after')
+
+ def test___setattr__normal_name_from_saved(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ inst._p_changed = False
+ jar._cache._mru = []
+ setattr(inst, 'normal', 'after')
+ self.assertEqual(jar._cache._mru, [OID])
+ self.assertEqual(jar._registered, [OID])
+ self.assertEqual(getattr(inst, 'normal', None), 'after')
+
+ def test___setattr__normal_name_from_changed(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ inst._p_changed = True
+ jar._cache._mru = []
+ jar._registered = []
+ setattr(inst, 'normal', 'after')
+ self.assertEqual(jar._cache._mru, [OID])
+ self.assertEqual(jar._registered, [])
+ self.assertEqual(getattr(inst, 'normal', None), 'after')
+
+ def test___delattr___p__names(self):
+ inst, jar, OID = self._makeOneWithJar()
+ jar._cache._mru = []
+ jar._registered = []
+ delattr(inst, '_p_changed') #only del-able _p_ attribute.
+ self.assertEqual(jar._cache._mru, [])
+ self.assertEqual(jar._registered, [])
+
+ def test___delattr__normal_name_from_new(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ def __init__(self):
+ self.__dict__['normal'] = 'after'
+ inst = Derived()
+ delattr(inst, 'normal')
+ self.assertEqual(getattr(inst, 'normal', None), 'before')
+
+ def test___delattr__normal_name_from_unsaved(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ def __init__(self):
+ self.__dict__['normal'] = 'after'
+ inst = Derived()
+ inst._p_changed = True
+ delattr(inst, 'normal')
+ self.assertEqual(getattr(inst, 'normal', None), 'before')
+
+ def test___delattr__normal_name_from_ghost(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ def __init__(self):
+ self.__dict__['normal'] = 'after'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ jar._cache._mru = []
+ jar._registered = []
+ delattr(inst, 'normal')
+ self.assertEqual(jar._cache._mru, [OID])
+ self.assertEqual(jar._registered, [OID])
+ self.assertEqual(getattr(inst, 'normal', None), 'before')
+
+ def test___delattr__normal_name_from_saved(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ def __init__(self):
+ self.__dict__['normal'] = 'after'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ inst._p_changed = False
+ jar._cache._mru = []
+ jar._registered = []
+ delattr(inst, 'normal')
+ self.assertEqual(jar._cache._mru, [OID])
+ self.assertEqual(jar._registered, [OID])
+ self.assertEqual(getattr(inst, 'normal', None), 'before')
+
+ def test___delattr__normal_name_from_changed(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ def __init__(self):
+ self.__dict__['normal'] = 'after'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ inst._p_changed = True
+ jar._cache._mru = []
+ jar._registered = []
+ delattr(inst, 'normal')
+ self.assertEqual(jar._cache._mru, [OID])
+ self.assertEqual(jar._registered, [])
+ self.assertEqual(getattr(inst, 'normal', None), 'before')
+
+ def test___getstate__(self):
+ inst = self._makeOne()
+ self.assertEqual(inst.__getstate__(), ())
+
+ def test___setstate___empty(self):
+ inst = self._makeOne()
+ inst.__setstate__(()) # doesn't raise, but doesn't change anything
+
+ def test___setstate___nonempty(self):
+ inst = self._makeOne()
+ self.assertRaises(ValueError, inst.__setstate__, {'bogus': 1})
+ self.assertEqual(inst._p_jar, None)
+ self.assertEqual(inst._p_oid, None)
+ self.assertEqual(inst._p_serial, None)
+ self.assertEqual(inst._p_changed, None)
+ self.assertEqual(inst._p_sticky, False)
+
+ def test___reduce__(self):
+ inst = self._makeOne()
+ first, second = inst.__reduce__()
+ self.assertEqual(first, (self._getTargetClass(),))
+ self.assertEqual(second, ())
+
+ def test___reduce__w_subclass_having_getstate(self):
+ class Derived(self._getTargetClass()):
+ def __getstate__(self):
+ return {}
+ inst = Derived()
+ first, second = inst.__reduce__()
+ self.assertEqual(first, (Derived,))
+ self.assertEqual(second, {})
+
+ def test___reduce__w_subclass_having_gna_and_getstate(self):
+ class Derived(self._getTargetClass()):
+ def __getnewargs__(self):
+ return ('a', 'b')
+ def __getstate__(self):
+ return {'foo': 'bar'}
+ inst = Derived()
+ first, second = inst.__reduce__()
+ self.assertEqual(first, (Derived, 'a', 'b'))
+ self.assertEqual(second, {'foo': 'bar'})
+
+ def test__p_activate_from_new(self):
+ inst = self._makeOne()
+ inst._p_activate()
+ self.assertEqual(inst._p_status, 'saved')
+
+ def test__p_activate_from_saved(self):
+ inst = self._makeOne()
+ inst._p_changed = False
+ inst._p_activate() # noop from 'saved' state
+ self.assertEqual(inst._p_status, 'saved')
+
+ def test__p_activate_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_activate() # noop from 'unsaved' state
+ self.assertEqual(inst._p_status, 'unsaved')
+
+ def test__p_deactivate_from_new(self):
+ inst = self._makeOne()
+ inst._p_deactivate()
+ self.assertEqual(inst._p_status, 'new')
+
+ def test__p_deactivate_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_deactivate()
+ # can't transition 'unsaved' -> 'new'
+ self.assertEqual(inst._p_status, 'unsaved')
+
+ def test__p_deactivate_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_deactivate()
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test__p_deactivate_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ inst._p_deactivate()
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test__p_deactivate_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_deactivate()
+ # assigning None is ignored when dirty
+ self.assertEqual(inst._p_status, 'changed')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test__p_deactivate_when_sticky(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ self.assertRaises(ValueError, inst._p_deactivate)
+
+ def test__p_invalidate_from_new(self):
+ inst = self._makeOne()
+ inst._p_invalidate()
+ self.assertEqual(inst._p_status, 'new')
+
+ def test__p_invalidate_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_invalidate()
+ self.assertEqual(inst._p_status, 'new')
+
+ def test__p_invalidate_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_invalidate()
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test__p_invalidate_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ jar._registered = []
+ inst._p_invalidate()
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test__p_invalidate_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_invalidate()
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test__p_invalidate_when_sticky(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ self.assertRaises(ValueError, inst._p_invalidate)
+
+ def test__p_getattr_w__p__names(self):
+ NAMES = ['_p_jar',
+ '_p_oid',
+ '_p_changed',
+ '_p_serial',
+ '_p_mtime',
+ '_p_state',
+ '_p_estimated_size',
+ '_p_sticky',
+ '_p_status',
+ ]
+ inst, jar, OID = self._makeOneWithJar()
+ for name in NAMES:
+ self.failUnless(inst._p_getattr(name))
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._cache._mru), [])
+
+ def test__p_getattr_w_special_names(self):
+ from persistent.pyPersistence import SPECIAL_NAMES
+ inst, jar, OID = self._makeOneWithJar()
+ for name in SPECIAL_NAMES:
+ self.failUnless(inst._p_getattr(name))
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._cache._mru), [])
+
+ def test__p_getattr_w_normal_name(self):
+ inst, jar, OID = self._makeOneWithJar()
+ self.failIf(inst._p_getattr('normal'))
+ self.assertEqual(inst._p_status, 'saved')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._cache._mru), [OID])
+
+ def test__p_setattr_w__p__name(self):
+ inst, jar, OID = self._makeOneWithJar()
+ self.failUnless(inst._p_setattr('_p_serial', '1' * 8))
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(inst._p_serial, '1' * 8)
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._cache._mru), [])
+
+ def test__p_setattr_w_normal_name(self):
+ inst, jar, OID = self._makeOneWithJar()
+ self.failIf(inst._p_setattr('normal', 'value'))
+ # _p_setattr doesn't do the actual write for normal names
+ self.assertEqual(inst._p_status, 'saved')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._cache._mru), [OID])
+
+ def test__p_delattr_w__p__name(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ jar._loaded = []
+ self.failUnless(inst._p_delattr('_p_changed'))
+ self.assertEqual(inst._p_status, 'ghost')
+ self.assertEqual(inst._p_changed, None)
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._cache._mru), [])
+
+ def test__p_delattr_w_normal_name(self):
+ class Derived(self._getTargetClass()):
+ normal = 'before'
+ def __init__(self):
+ self.__dict__['normal'] = 'after'
+ inst, jar, OID = self._makeOneWithJar(Derived)
+ self.failIf(inst._p_delattr('normal'))
+ # _p_delattr doesn't do the actual delete for normal names
+ self.assertEqual(inst._p_status, 'saved')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._cache._mru), [OID])
Deleted: ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py
===================================================================
--- ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py 2011-02-16 00:00:51 UTC (rev 120361)
+++ ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py 2011-02-16 05:04:26 UTC (rev 120362)
@@ -1,838 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2011 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-import unittest
-
-class PersistentTests(unittest.TestCase):
-
- def _getTargetClass(self):
- from persistent.pypersistent import Persistent
- return Persistent
-
- def _makeOne(self, *args, **kw):
- return self._getTargetClass()(*args, **kw)
-
- def _makeJar(self):
- from zope.interface import implements
- from persistent.interfaces import IPersistentDataManager
-
- class _Cache(object):
- def __init__(self):
- self._mru = []
- def mru(self, oid):
- self._mru.append(oid)
-
- class _Jar(object):
- implements(IPersistentDataManager)
- def __init__(self):
- self._loaded = []
- self._registered = []
- self._cache = _Cache()
- def setstate(self, obj):
- self._loaded.append(obj._p_oid)
- def register(self, obj):
- self._registered.append(obj._p_oid)
-
- return _Jar()
-
- def _makeOneWithJar(self, klass=None):
- OID = '\x01' * 8
- if klass is not None:
- inst = klass()
- else:
- inst = self._makeOne()
- jar = self._makeJar()
- inst._p_jar = jar
- inst._p_oid = OID
- return inst, jar, OID
-
- def test_class_conforms_to_IPersistent(self):
- from zope.interface.verify import verifyClass
- from persistent.interfaces import IPersistent
- verifyClass(IPersistent, self._getTargetClass())
-
- def test_instance_conforms_to_IPersistent(self):
- from zope.interface.verify import verifyObject
- from persistent.interfaces import IPersistent
- verifyObject(IPersistent, self._makeOne())
-
- def test_ctor(self):
- inst = self._makeOne()
- self.assertEqual(inst._p_jar, None)
- self.assertEqual(inst._p_oid, None)
- self.assertEqual(inst._p_serial, None)
- self.assertEqual(inst._p_changed, None)
- self.assertEqual(inst._p_sticky, False)
-
- def test_assign_p_jar_w_invalid_jar(self):
- inst = self._makeOne()
- def _test():
- inst._p_jar = object()
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_jar_w_new_jar(self):
- inst = self._makeOne()
- inst._p_jar = self._makeJar()
- jar = self._makeJar()
- def _test():
- inst._p_jar = jar
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_jar_w_valid_jar(self):
- jar = self._makeJar()
- inst = self._makeOne()
- inst._p_jar = jar
- self.failUnless(inst._p_jar is jar)
- inst._p_jar = jar # reassign only to same DM
-
- def test_assign_p_oid_w_invalid_oid(self):
- inst = self._makeOne()
- def _test():
- inst._p_oid = object()
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_oid_w_valid_oid(self):
- OID = '1' * 8
- inst = self._makeOne()
- inst._p_oid = OID
- self.assertEqual(inst._p_oid, OID)
- inst._p_oid = OID # reassign only same OID
-
- def test_assign_p_oid_w_new_oid(self):
- OID1 = '1' * 8
- OID2 = '2' * 8
- inst = self._makeOne()
- inst._p_oid = OID1
- def _test():
- inst._p_oid = OID2
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_serial_w_invalid_serial(self):
- inst = self._makeOne()
- def _test():
- inst._p_serial = object()
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_serial_w_valid_serial(self):
- SERIAL = '1' * 8
- inst = self._makeOne()
- inst._p_serial = SERIAL
- self.assertEqual(inst._p_serial, SERIAL)
- inst._p_serial = None
- self.assertEqual(inst._p_serial, None)
-
- def test_query_p_changed(self):
- inst = self._makeOne()
- self.assertEqual(inst._p_changed, None)
- inst._p_changed = True
- self.assertEqual(inst._p_changed, True)
- inst._p_changed = False
- self.assertEqual(inst._p_changed, False)
-
- def test_assign_p_changed_none_from_new(self):
- inst = self._makeOne()
- inst._p_changed = None
- self.assertEqual(inst._p_status, 'new')
-
- def test_assign_p_changed_true_from_new(self):
- inst = self._makeOne()
- inst._p_changed = True
- self.assertEqual(inst._p_status, 'unsaved')
-
- def test_assign_p_changed_false_from_new(self):
- inst = self._makeOne()
- inst._p_changed = False # activates
- self.assertEqual(inst._p_status, 'saved')
-
- def test_assign_p_changed_none_from_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- inst._p_changed = None
- # can't transition 'unsaved' -> 'new'
- self.assertEqual(inst._p_status, 'unsaved')
-
- def test_assign_p_changed_true_from_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- inst._p_changed = True
- self.assertEqual(inst._p_status, 'unsaved')
-
- def test_assign_p_changed_false_from_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- inst._p_changed = False
- self.assertEqual(inst._p_status, 'saved')
-
- def test_assign_p_changed_none_from_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = None
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_assign_p_changed_true_from_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = True
- self.assertEqual(inst._p_status, 'changed')
- self.assertEqual(list(jar._loaded), [OID])
- self.assertEqual(list(jar._registered), [OID])
-
- def test_assign_p_changed_false_from_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- self.assertEqual(inst._p_status, 'saved')
- self.assertEqual(list(jar._loaded), [OID])
- self.assertEqual(list(jar._registered), [])
-
- def test_assign_p_changed_none_from_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- jar._loaded = []
- inst._p_changed = None
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_assign_p_changed_true_from_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- inst._p_changed = True
- self.assertEqual(inst._p_status, 'changed')
- self.assertEqual(list(jar._loaded), [OID])
- self.assertEqual(list(jar._registered), [OID])
-
- def test_assign_p_changed_false_from_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- jar._loaded = []
- inst._p_changed = False
- self.assertEqual(inst._p_status, 'saved')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_assign_p_changed_none_from_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- inst._p_changed = True
- jar._loaded = []
- jar._registered = []
- inst._p_changed = None
- # assigning None is ignored when dirty
- self.assertEqual(inst._p_status, 'changed')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_assign_p_changed_true_from_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- inst._p_changed = True
- jar._loaded = []
- jar._registered = []
- inst._p_changed = True
- self.assertEqual(inst._p_status, 'changed')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_assign_p_changed_false_from_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- inst._p_changed = True
- jar._loaded = []
- jar._registered = []
- inst._p_changed = False
- self.assertEqual(inst._p_status, 'saved')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_assign_p_changed_none_when_sticky(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = True
- def _test():
- inst._p_changed = None
- self.assertRaises(ValueError, _test)
-
- def test_delete_p_changed_from_new(self):
- inst = self._makeOne()
- del inst._p_changed
- self.assertEqual(inst._p_status, 'new')
-
- def test_delete_p_changed_from_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- del inst._p_changed
- self.assertEqual(inst._p_status, 'new')
-
- def test_delete_p_changed_from_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- del inst._p_changed
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_delete_p_changed_from_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- jar._loaded = []
- jar._registered = []
- del inst._p_changed
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_delete_p_changed_from_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- inst._p_changed = True
- jar._loaded = []
- jar._registered = []
- del inst._p_changed
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test_delete_p_changed_when_sticky(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = True
- def _test():
- del inst._p_changed
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_sticky_true_when_ghost(self):
- inst = self._makeOne()
- def _test():
- inst._p_sticky = True
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_sticky_false_when_ghost(self):
- inst = self._makeOne()
- def _test():
- inst._p_sticky = False
- self.assertRaises(ValueError, _test)
-
- def test_assign_p_sticky_true_non_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = True
- self.failUnless(inst._p_sticky)
-
- def test_assign_p_sticky_false_non_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = False
- self.failIf(inst._p_sticky)
-
- def test__p_status_new(self):
- inst = self._makeOne()
- self.assertEqual(inst._p_status, 'new')
-
- def test__p_status_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- self.assertEqual(inst._p_status, 'unsaved')
-
- def test__p_status_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- self.assertEqual(inst._p_status, 'ghost')
-
- def test__p_status_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = True
- self.assertEqual(inst._p_status, 'changed')
-
- def test__p_status_changed_sticky(self):
- # 'sticky' is not a state, but a separate flag.
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = True
- inst._p_sticky = True
- self.assertEqual(inst._p_status, 'changed (sticky)')
-
- def test__p_status_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- self.assertEqual(inst._p_status, 'saved')
-
- def test__p_status_saved_sticky(self):
- # 'sticky' is not a state, but a separate flag.
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = True
- self.assertEqual(inst._p_status, 'saved (sticky)')
-
- def test__p_mtime_no_serial(self):
- inst = self._makeOne()
- self.assertEqual(inst._p_mtime, None)
-
- def test__p_mtime_w_serial(self):
- import datetime
- import time
- from persistent.pypersistent import makeTimestamp
- WHEN_TUPLE = (2011, 2, 15, 13, 33, 27.5)
- WHEN = datetime.datetime(*WHEN_TUPLE)
- inst, jar, OID = self._makeOneWithJar()
- inst._p_serial = makeTimestamp(*WHEN_TUPLE)
- self.assertEqual(inst._p_mtime, time.mktime(WHEN.timetuple()))
-
- def test__p_state_new(self):
- inst = self._makeOne()
- self.assertEqual(inst._p_state, 0)
-
- def test__p_state_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- self.assertEqual(inst._p_state, 0)
-
- def test__p_state_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- self.assertEqual(inst._p_state, -1)
-
- def test__p_state_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = True
- self.assertEqual(inst._p_state, 1)
-
- def test__p_state_changed_sticky(self):
- # 'sticky' is not a state, but a separate flag.
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = True
- inst._p_sticky = True
- self.assertEqual(inst._p_state, 2)
-
- def test__p_state_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- self.assertEqual(inst._p_state, 0)
-
- def test__p_state_saved_sticky(self):
- # 'sticky' is not a state, but a separate flag.
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = True
- self.assertEqual(inst._p_state, 2)
-
- def test_query_p_estimated_size(self):
- inst = self._makeOne()
- self.assertEqual(inst._p_estimated_size, 0)
-
- def test_assign_p_estimated_size(self):
- # XXX at the moment, we don't store this value.
- inst = self._makeOne()
- inst._p_estimated_size = 123
- self.assertEqual(inst._p_estimated_size, 0)
-
- def test___getattribute___p__names(self):
- NAMES = ['_p_jar',
- '_p_oid',
- '_p_changed',
- '_p_serial',
- '_p_mtime',
- '_p_state',
- '_p_estimated_size',
- '_p_sticky',
- '_p_status',
- ]
- inst, jar, OID = self._makeOneWithJar()
- jar._cache._mru = []
- for name in NAMES:
- getattr(inst, name)
- self.assertEqual(jar._cache._mru, [])
-
- def test___getattribute__special_name(self):
- from persistent.pypersistent import SPECIAL_NAMES
- inst, jar, OID = self._makeOneWithJar()
- jar._cache._mru = []
- for name in SPECIAL_NAMES:
- getattr(inst, name, None)
- self.assertEqual(jar._cache._mru, [])
-
- def test___getattribute__normal_name_from_new(self):
- class Derived(self._getTargetClass()):
- normal = 'value'
- inst = Derived()
- self.assertEqual(getattr(inst, 'normal', None), 'value')
-
- def test___getattribute__normal_name_from_unsaved(self):
- class Derived(self._getTargetClass()):
- normal = 'value'
- inst = Derived()
- inst._p_changed = True
- self.assertEqual(getattr(inst, 'normal', None), 'value')
-
- def test___getattribute__normal_name_from_ghost(self):
- class Derived(self._getTargetClass()):
- normal = 'value'
- inst, jar, OID = self._makeOneWithJar(Derived)
- jar._cache._mru = []
- self.assertEqual(getattr(inst, 'normal', None), 'value')
- self.assertEqual(jar._cache._mru, [OID])
-
- def test___getattribute__normal_name_from_saved(self):
- class Derived(self._getTargetClass()):
- normal = 'value'
- inst, jar, OID = self._makeOneWithJar(Derived)
- inst._p_changed = False
- jar._cache._mru = []
- self.assertEqual(getattr(inst, 'normal', None), 'value')
- self.assertEqual(jar._cache._mru, [OID])
-
- def test___getattribute__normal_name_from_changed(self):
- class Derived(self._getTargetClass()):
- normal = 'value'
- inst, jar, OID = self._makeOneWithJar(Derived)
- inst._p_changed = True
- jar._cache._mru = []
- self.assertEqual(getattr(inst, 'normal', None), 'value')
- self.assertEqual(jar._cache._mru, [OID])
-
- def test___setattr___p__names(self):
- inst, jar, OID = self._makeOneWithJar()
- NAMES = [('_p_jar', jar),
- ('_p_oid', OID),
- ('_p_changed', False),
- ('_p_serial', '\x01' * 8),
- ('_p_estimated_size', 0),
- ('_p_sticky', False),
- ]
- jar._cache._mru = []
- for name, value in NAMES:
- setattr(inst, name, value)
- self.assertEqual(jar._cache._mru, [])
-
- def test___setattr__normal_name_from_new(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst = Derived()
- setattr(inst, 'normal', 'after')
- self.assertEqual(getattr(inst, 'normal', None), 'after')
-
- def test___setattr__normal_name_from_unsaved(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst = Derived()
- inst._p_changed = True
- setattr(inst, 'normal', 'after')
- self.assertEqual(getattr(inst, 'normal', None), 'after')
-
- def test___setattr__normal_name_from_ghost(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst, jar, OID = self._makeOneWithJar(Derived)
- jar._cache._mru = []
- setattr(inst, 'normal', 'after')
- self.assertEqual(jar._cache._mru, [OID])
- self.assertEqual(jar._registered, [OID])
- self.assertEqual(getattr(inst, 'normal', None), 'after')
-
- def test___setattr__normal_name_from_saved(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst, jar, OID = self._makeOneWithJar(Derived)
- inst._p_changed = False
- jar._cache._mru = []
- setattr(inst, 'normal', 'after')
- self.assertEqual(jar._cache._mru, [OID])
- self.assertEqual(jar._registered, [OID])
- self.assertEqual(getattr(inst, 'normal', None), 'after')
-
- def test___setattr__normal_name_from_changed(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst, jar, OID = self._makeOneWithJar(Derived)
- inst._p_changed = True
- jar._cache._mru = []
- jar._registered = []
- setattr(inst, 'normal', 'after')
- self.assertEqual(jar._cache._mru, [OID])
- self.assertEqual(jar._registered, [OID])
- self.assertEqual(getattr(inst, 'normal', None), 'after')
-
- def test___delattr___p__names(self):
- inst, jar, OID = self._makeOneWithJar()
- jar._cache._mru = []
- jar._registered = []
- delattr(inst, '_p_changed') #only del-able _p_ attribute.
- self.assertEqual(jar._cache._mru, [])
- self.assertEqual(jar._registered, [])
-
- def test___delattr__normal_name_from_new(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst = Derived()
- setattr(inst, 'normal', 'after')
- delattr(inst, 'normal')
- self.assertEqual(getattr(inst, 'normal', None), 'before')
-
- def test___delattr__normal_name_from_unsaved(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst = Derived()
- inst._p_changed = True
- setattr(inst, 'normal', 'after')
- delattr(inst, 'normal')
- self.assertEqual(getattr(inst, 'normal', None), 'before')
-
- def test___delattr__normal_name_from_ghost(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst, jar, OID = self._makeOneWithJar(Derived)
- setattr(inst, 'normal', 'after')
- jar._cache._mru = []
- jar._registered = []
- delattr(inst, 'normal')
- self.assertEqual(jar._cache._mru, [OID])
- self.assertEqual(jar._registered, [OID])
- self.assertEqual(getattr(inst, 'normal', None), 'before')
-
- def test___delattr__normal_name_from_saved(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst, jar, OID = self._makeOneWithJar(Derived)
- setattr(inst, 'normal', 'after')
- inst._p_changed = False
- jar._cache._mru = []
- jar._registered = []
- delattr(inst, 'normal')
- self.assertEqual(jar._cache._mru, [OID])
- self.assertEqual(jar._registered, [OID])
- self.assertEqual(getattr(inst, 'normal', None), 'before')
-
- def test___delattr__normal_name_from_changed(self):
- class Derived(self._getTargetClass()):
- normal = 'before'
- inst, jar, OID = self._makeOneWithJar(Derived)
- setattr(inst, 'normal', 'after')
- inst._p_changed = True
- jar._cache._mru = []
- jar._registered = []
- delattr(inst, 'normal')
- self.assertEqual(jar._cache._mru, [OID])
- self.assertEqual(jar._registered, [OID])
- self.assertEqual(getattr(inst, 'normal', None), 'before')
-
- def test___getstate__(self):
- inst = self._makeOne()
- self.assertEqual(inst.__getstate__(), ())
-
- def test___setstate___empty(self):
- inst = self._makeOne()
- inst.__setstate__(()) # doesn't raise, but doesn't change anything
-
- def test___setstate___nonempty(self):
- inst = self._makeOne()
- self.assertRaises(ValueError, inst.__setstate__, {'bogus': 1})
- self.assertEqual(inst._p_jar, None)
- self.assertEqual(inst._p_oid, None)
- self.assertEqual(inst._p_serial, None)
- self.assertEqual(inst._p_changed, None)
- self.assertEqual(inst._p_sticky, False)
-
- def test___reduce__(self):
- inst = self._makeOne()
- first, second = inst.__reduce__()
- self.assertEqual(first, (self._getTargetClass(),))
- self.assertEqual(second, ())
-
- def test___reduce__w_subclass_having_getstate(self):
- class Derived(self._getTargetClass()):
- def __getstate__(self):
- return {}
- inst = Derived()
- first, second = inst.__reduce__()
- self.assertEqual(first, (Derived,))
- self.assertEqual(second, {})
-
- def test___reduce__w_subclass_having_gna_and_getstate(self):
- class Derived(self._getTargetClass()):
- def __getnewargs__(self):
- return ('a', 'b')
- def __getstate__(self):
- return {'foo': 'bar'}
- inst = Derived()
- first, second = inst.__reduce__()
- self.assertEqual(first, (Derived, 'a', 'b'))
- self.assertEqual(second, {'foo': 'bar'})
-
- def test__p_activate_from_new(self):
- inst = self._makeOne()
- inst._p_activate()
- self.assertEqual(inst._p_status, 'saved')
-
- def test__p_activate_from_saved(self):
- inst = self._makeOne()
- inst._p_changed = False
- inst._p_activate() # noop from 'unsaved' state
- self.assertEqual(inst._p_status, 'saved')
-
- def test__p_activate_from_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- inst._p_activate() # noop from 'saved' state
- self.assertEqual(inst._p_status, 'unsaved')
-
- def test__p_deactivate_from_new(self):
- inst = self._makeOne()
- inst._p_deactivate()
- self.assertEqual(inst._p_status, 'new')
-
- def test__p_deactivate_from_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- inst._p_deactivate()
- # can't transition 'unsaved' -> 'new'
- self.assertEqual(inst._p_status, 'unsaved')
-
- def test__p_deactivate_from_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_deactivate()
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test__p_deactivate_from_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- jar._loaded = []
- inst._p_deactivate()
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test__p_deactivate_from_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- inst._p_changed = True
- jar._loaded = []
- jar._registered = []
- inst._p_deactivate()
- # assigning None is ignored when dirty
- self.assertEqual(inst._p_status, 'changed')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test__p_deactivate_when_sticky(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = True
- self.assertRaises(ValueError, inst._p_deactivate)
-
- def test__p_invalidate_from_new(self):
- inst = self._makeOne()
- inst._p_invalidate()
- self.assertEqual(inst._p_status, 'new')
-
- def test__p_invalidate_from_unsaved(self):
- inst = self._makeOne()
- inst._p_changed = True
- inst._p_invalidate()
- self.assertEqual(inst._p_status, 'new')
-
- def test__p_invalidate_from_ghost(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_invalidate()
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test__p_invalidate_from_saved(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- jar._loaded = []
- jar._registered = []
- inst._p_invalidate()
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test__p_invalidate_from_changed(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_activate()
- inst._p_changed = True
- jar._loaded = []
- jar._registered = []
- inst._p_invalidate()
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._registered), [])
-
- def test__p_invalidate_when_sticky(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = False
- inst._p_sticky = True
- self.assertRaises(ValueError, inst._p_invalidate)
-
- def test__p_getattr_w__p__names(self):
- NAMES = ['_p_jar',
- '_p_oid',
- '_p_changed',
- '_p_serial',
- '_p_mtime',
- '_p_state',
- '_p_estimated_size',
- '_p_sticky',
- '_p_status',
- ]
- inst, jar, OID = self._makeOneWithJar()
- for name in NAMES:
- self.failUnless(inst._p_getattr(name))
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._cache._mru), [])
-
- def test__p_getattr_w_special_names(self):
- from persistent.pypersistent import SPECIAL_NAMES
- inst, jar, OID = self._makeOneWithJar()
- for name in SPECIAL_NAMES:
- self.failUnless(inst._p_getattr(name))
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._cache._mru), [])
-
- def test__p_getattr_w_normal_name(self):
- inst, jar, OID = self._makeOneWithJar()
- self.failIf(inst._p_getattr('normal'))
- self.assertEqual(inst._p_status, 'saved')
- self.assertEqual(list(jar._loaded), [OID])
- self.assertEqual(list(jar._cache._mru), [OID])
-
- def test__p_setattr_w__p__name(self):
- inst, jar, OID = self._makeOneWithJar()
- self.failUnless(inst._p_setattr('_p_serial', '1' * 8))
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(inst._p_serial, '1' * 8)
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._cache._mru), [])
-
- def test__p_setattr_w_normal_name(self):
- inst, jar, OID = self._makeOneWithJar()
- self.failIf(inst._p_setattr('normal', 'value'))
- self.assertEqual(inst._p_status, 'saved')
- self.assertEqual(list(jar._loaded), [OID])
- self.assertEqual(list(jar._cache._mru), [OID])
-
- def test__p_delattr_w__p__name(self):
- inst, jar, OID = self._makeOneWithJar()
- inst._p_changed = True
- jar._loaded = []
- self.failUnless(inst._p_delattr('_p_changed'))
- self.assertEqual(inst._p_status, 'ghost')
- self.assertEqual(inst._p_changed, None)
- self.assertEqual(list(jar._loaded), [])
- self.assertEqual(list(jar._cache._mru), [])
-
- def test__p_delattr_w_normal_name(self):
- inst, jar, OID = self._makeOneWithJar()
- self.failIf(inst._p_delattr('normal'))
- self.assertEqual(inst._p_status, 'saved')
- self.assertEqual(list(jar._loaded), [OID])
- self.assertEqual(list(jar._cache._mru), [OID])
More information about the Zodb-checkins
mailing list