[Zodb-checkins] SVN: ZODB/branches/tseaver-python_picklecache-2/src/persistent/ Snapshot of work toward implementing IPersistent in pure Python.
Tres Seaver
tseaver at palladion.com
Mon Feb 14 18:27:40 EST 2011
Log message for revision 120337:
Snapshot of work toward implementing IPersistent in pure Python.
No attempt yet to do the metaclass work, including integration with the
pickle cache.
Changed:
A ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py
A ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py
-=-
Added: ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py
===================================================================
--- ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py (rev 0)
+++ ZODB/branches/tseaver-python_picklecache-2/src/persistent/pypersistent.py 2011-02-14 23:27:40 UTC (rev 120337)
@@ -0,0 +1,181 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import sys
+
+from zope.interface import implements
+
+from persistent.interfaces import IPersistent
+from persistent.interfaces import IPersistentDataManager
+
+if sys.version_info < (2, 6):
+ OID_TYPE = SERIAL_TYPE = str
+else:
+ OID_TYPE = SERIAL_TYPE = bytes
+
+_CHANGED = 0x0001
+_STICKY = 0x0002
+
+class Persistent(object):
+ __slots__ = ('__jar', '__oid', '__serial', '__flags')
+ implements(IPersistent)
+
+ def __new__(cls):
+ inst = super(Persistent, cls).__new__(cls)
+ inst.__jar = inst.__oid = inst.__serial = None
+ inst.__flags = None
+ return inst
+
+ # _p_jar: see IPersistent.
+ def _get_jar(self):
+ return self.__jar
+
+ def _set_jar(self, value):
+ if value is self.__jar:
+ return
+ if self.__jar is not None:
+ raise ValueError('Already assigned a data manager')
+ if not IPersistentDataManager.providedBy(value):
+ raise ValueError('Not a data manager: %s' % value)
+ self.__jar = value
+ _p_jar = property(_get_jar, _set_jar)
+
+ # _p_oid: see IPersistent.
+ def _get_oid(self):
+ return self.__oid
+
+ def _set_oid(self, value):
+ if value == self.__oid:
+ return
+ if value is not None:
+ if not isinstance(value, OID_TYPE):
+ raise ValueError('Invalid OID type: %s' % value)
+ if self.__oid is not None:
+ raise ValueError('Already assigned an OID')
+ self.__oid = value
+
+ _p_oid = property(_get_oid, _set_oid)
+
+ # _p_serial: see IPersistent.
+ def _get_serial(self):
+ return self.__serial
+
+ def _set_serial(self, value):
+ if value is not None:
+ if not isinstance(value, SERIAL_TYPE):
+ raise ValueError('Invalid SERIAL type: %s' % value)
+ self.__serial = value
+
+ _p_serial = property(_get_serial, _set_serial)
+
+ # _p_changed: see IPersistent.
+ def _get_changed(self):
+ if self.__flags is None: # ghost
+ return None
+ return self.__flags & _CHANGED
+
+ def _set_changed(self, value):
+ if self.__flags is None:
+ if value is not None:
+ self._p_activate()
+ self._set_changed_flag(value)
+ else:
+ if value is None: # -> ghost
+ if self.__flags & _STICKY:
+ raise ValueError('Sticky')
+ if not self.__flags & _CHANGED:
+ self._p_invalidate()
+ else:
+ self._set_changed_flag(value)
+
+ def _del_changed(self):
+ self._set_changed(None)
+
+ _p_changed = property(_get_changed, _set_changed, _del_changed)
+
+ # The '_p_sticky' property is not (yet) part of the API: for now,
+ # it exists to simplify debugging and testing assertions.
+ def _get_sticky(self):
+ if self.__flags is None:
+ return False
+ return self.__flags & _STICKY
+ def _set_sticky(self, value):
+ if self.__flags is None:
+ raise ValueError('Ghost')
+ if value:
+ self.__flags |= _STICKY
+ else:
+ self.__flags &= ~_STICKY
+ _p_sticky = property(_get_sticky, _set_sticky)
+
+ # The '_p_state' property is not (yet) part of the API: for now,
+ # it exists to simplify debugging and testing assertions.
+ def _get_state(self):
+ if self.__flags is None:
+ if self.__jar is None:
+ return 'new'
+ return 'ghost'
+ if self.__flags & _CHANGED:
+ if self.__jar is None:
+ return 'unsaved'
+ result = 'changed'
+ else:
+ result = 'saved'
+ if self.__flags & _STICKY:
+ return '%s (sticky)' % result
+ return result
+
+ _p_state = property(_get_state)
+
+ def __getstate__(self):
+ """ See IPersistent.
+ """
+ return {}
+
+ def __setstate__(self, state):
+ """ See IPersistent.
+ """
+ if state != {}:
+ raise ValueError('No state allowed on base Persistent class')
+
+ def _p_activate(self):
+ """ See IPersistent.
+ """
+ if self.__flags is None:
+ self.__flags = 0
+ if self.__jar is not None and self.__oid is not None:
+ self.__jar.setstate(self)
+
+ def _p_deactivate(self):
+ """ See IPersistent.
+ """
+
+ def _p_invalidate(self):
+ """ See IPersistent.
+ """
+ # XXX check
+ self.__flags = None
+
+ # Helper methods: not APIs
+ def _register(self):
+ if self.__jar is not None and self.__oid is not None:
+ self.__jar.register(self)
+
+ def _set_changed_flag(self, value):
+ if value:
+ before = self.__flags
+ self.__flags |= _CHANGED
+ if before != self.__flags:
+ self._register()
+ else:
+ self.__flags &= ~_CHANGED
Added: ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py
===================================================================
--- ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py (rev 0)
+++ ZODB/branches/tseaver-python_picklecache-2/src/persistent/tests/test_pypersistent.py 2011-02-14 23:27:40 UTC (rev 120337)
@@ -0,0 +1,395 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+
+class PersistentTests(unittest.TestCase):
+
+ def _getTargetClass(self):
+ from persistent.pypersistent import Persistent
+ return Persistent
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def _makeJar(self):
+ from zope.interface import implements
+ from persistent.interfaces import IPersistentDataManager
+ class _Jar(object):
+ implements(IPersistentDataManager)
+ def __init__(self):
+ self._loaded = []
+ self._registered = []
+ def setstate(self, obj):
+ self._loaded.append(obj._p_oid)
+ def register(self, obj):
+ self._registered.append(obj._p_oid)
+ return _Jar()
+
+ def _makeOneWithJar(self, *args, **kw):
+ OID = '1' * 8
+ inst = self._makeOne()
+ jar = self._makeJar()
+ inst._p_jar = jar
+ inst._p_oid = OID
+ return inst, jar, OID
+
+ def test_class_conforms_to_IPersistent(self):
+ from zope.interface.verify import verifyClass
+ from persistent.interfaces import IPersistent
+ verifyClass(IPersistent, self._getTargetClass())
+
+ def test_instance_conforms_to_IPersistent(self):
+ from zope.interface.verify import verifyObject
+ from persistent.interfaces import IPersistent
+ verifyObject(IPersistent, self._makeOne())
+
+ def test_ctor(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_jar, None)
+ self.assertEqual(inst._p_oid, None)
+ self.assertEqual(inst._p_serial, None)
+ self.assertEqual(inst._p_changed, None)
+ self.assertEqual(inst._p_sticky, False)
+
+ def test_assign_p_jar_w_invalid_jar(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_jar = object()
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_jar_w_new_jar(self):
+ inst = self._makeOne()
+ inst._p_jar = self._makeJar()
+ jar = self._makeJar()
+ def _test():
+ inst._p_jar = jar
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_jar_w_valid_jar(self):
+ jar = self._makeJar()
+ inst = self._makeOne()
+ inst._p_jar = jar
+ self.failUnless(inst._p_jar is jar)
+ inst._p_jar = jar # reassign only to same DM
+
+ def test_assign_p_oid_w_invalid_oid(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_oid = object()
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_oid_w_valid_oid(self):
+ OID = '1' * 8
+ inst = self._makeOne()
+ inst._p_oid = OID
+ self.assertEqual(inst._p_oid, OID)
+ inst._p_oid = OID # reassign only same OID
+
+ def test_assign_p_oid_w_new_oid(self):
+ OID1 = '1' * 8
+ OID2 = '2' * 8
+ inst = self._makeOne()
+ inst._p_oid = OID1
+ def _test():
+ inst._p_oid = OID2
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_serial_w_invalid_serial(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_serial = object()
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_serial_w_valid_serial(self):
+ SERIAL = '1' * 8
+ inst = self._makeOne()
+ inst._p_serial = SERIAL
+ self.assertEqual(inst._p_serial, SERIAL)
+ inst._p_serial = None
+ self.assertEqual(inst._p_serial, None)
+
+ def test_query_p_changed(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_changed, None)
+ inst._p_changed = True
+ self.assertEqual(inst._p_changed, True)
+ inst._p_changed = False
+ self.assertEqual(inst._p_changed, False)
+
+ def test_assign_p_changed_none_from_new(self):
+ inst = self._makeOne()
+ inst._p_changed = None
+ self.assertEqual(inst._p_state, 'new')
+
+ def test_assign_p_changed_true_from_new(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 'unsaved')
+
+ def test_assign_p_changed_false_from_new(self):
+ inst = self._makeOne()
+ inst._p_changed = False # activates
+ self.assertEqual(inst._p_state, 'saved')
+
+ def test_assign_p_changed_none_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_changed = None
+ # can't transition 'unsaved' -> 'new'
+ self.assertEqual(inst._p_state, 'unsaved')
+
+ def test_assign_p_changed_true_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 'unsaved')
+
+ def test_assign_p_changed_false_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_changed = False
+ self.assertEqual(inst._p_state, 'saved')
+
+ def test_assign_p_changed_none_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = None
+ self.assertEqual(inst._p_state, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_true_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 'changed')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._registered), [OID])
+
+ def test_assign_p_changed_false_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ self.assertEqual(inst._p_state, 'saved')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_none_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ inst._p_changed = None
+ self.assertEqual(inst._p_state, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_true_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 'changed')
+ self.assertEqual(list(jar._loaded), [OID])
+ self.assertEqual(list(jar._registered), [OID])
+
+ def test_assign_p_changed_false_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ inst._p_changed = False
+ self.assertEqual(inst._p_state, 'saved')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_none_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_changed = None
+ # assigning None is ignored when dirty
+ self.assertEqual(inst._p_state, 'changed')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_true_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 'changed')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_false_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ inst._p_changed = False
+ self.assertEqual(inst._p_state, 'saved')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_assign_p_changed_none_when_sticky(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ def _test():
+ inst._p_changed = None
+ self.assertRaises(ValueError, _test)
+
+ def test_delete_p_changed_from_new(self):
+ inst = self._makeOne()
+ del inst._p_changed
+ self.assertEqual(inst._p_state, 'new')
+
+ def test_delete_p_changed_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ del inst._p_changed
+ # can't transition 'unsaved' -> 'new'
+ self.assertEqual(inst._p_state, 'unsaved')
+
+ def test_delete_p_changed_from_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ del inst._p_changed
+ self.assertEqual(inst._p_state, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_delete_p_changed_from_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ jar._loaded = []
+ jar._registered = []
+ del inst._p_changed
+ self.assertEqual(inst._p_state, 'ghost')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_delete_p_changed_from_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_activate()
+ inst._p_changed = True
+ jar._loaded = []
+ jar._registered = []
+ del inst._p_changed
+ # del is ignored when dirty
+ self.assertEqual(inst._p_state, 'changed')
+ self.assertEqual(list(jar._loaded), [])
+ self.assertEqual(list(jar._registered), [])
+
+ def test_delete_p_changed_when_sticky(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ def _test():
+ del inst._p_changed
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_sticky_true_when_ghost(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_sticky = True
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_sticky_false_when_ghost(self):
+ inst = self._makeOne()
+ def _test():
+ inst._p_sticky = False
+ self.assertRaises(ValueError, _test)
+
+ def test_assign_p_sticky_true_non_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ self.failUnless(inst._p_sticky)
+
+ def test_assign_p_sticky_false_non_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = False
+ self.failIf(inst._p_sticky)
+
+ def test__p_state_new(self):
+ inst = self._makeOne()
+ self.assertEqual(inst._p_state, 'new')
+
+ def test__p_state_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 'unsaved')
+
+ def test__p_state_ghost(self):
+ inst, jar, OID = self._makeOneWithJar()
+ self.assertEqual(inst._p_state, 'ghost')
+
+ def test__p_state_changed(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ self.assertEqual(inst._p_state, 'changed')
+
+ def test__p_state_changed_sticky(self):
+ # 'sticky' is not a state, but a separate flag.
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = True
+ inst._p_sticky = True
+ self.assertEqual(inst._p_state, 'changed (sticky)')
+
+ def test__p_state_saved(self):
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ self.assertEqual(inst._p_state, 'saved')
+
+ def test__p_state_saved_sticky(self):
+ # 'sticky' is not a state, but a separate flag.
+ inst, jar, OID = self._makeOneWithJar()
+ inst._p_changed = False
+ inst._p_sticky = True
+ self.assertEqual(inst._p_state, 'saved (sticky)')
+
+ def test___getstate__(self):
+ inst = self._makeOne()
+ self.assertEqual(inst.__getstate__(), {})
+
+ def test___setstate___empty(self):
+ inst = self._makeOne()
+ inst.__setstate__({}) # doesn't raise, but doesn't change anything
+
+ def test___setstate___nonempty(self):
+ inst = self._makeOne()
+ self.assertRaises(ValueError, inst.__setstate__, {'bogus': 1})
+ self.assertEqual(inst._p_jar, None)
+ self.assertEqual(inst._p_oid, None)
+ self.assertEqual(inst._p_serial, None)
+ self.assertEqual(inst._p_changed, None)
+ self.assertEqual(inst._p_sticky, False)
+
+ def test__p_activate_from_new(self):
+ inst = self._makeOne()
+ inst._p_activate()
+ self.assertEqual(inst._p_state, 'saved')
+
+ def test__p_activate_from_saved(self):
+ inst = self._makeOne()
+ inst._p_changed = False
+ inst._p_activate() # noop from 'saved' state
+ self.assertEqual(inst._p_state, 'saved')
+
+ def test__p_activate_from_unsaved(self):
+ inst = self._makeOne()
+ inst._p_changed = True
+ inst._p_activate() # noop from 'unsaved' state
+ self.assertEqual(inst._p_state, 'unsaved')
More information about the Zodb-checkins
mailing list