[ZODB-Dev] Ape newbie
Adam Groszer
adamg at mailbox.hu
Tue May 27 13:01:07 EDT 2003
Shane,
I'm fighting on, but stuck again.
My current problem is that myClassifier.store hits _before_ .classifyObject
with event=StoreEvent, classification=None.
event._keychain=()
And in turn I get the following stack trace:
Traceback (most recent call last):
File "P:\PMSpy\test\apetest3.py", line 328, in ?
x.testStoreAndLoad()
File "P:\PMSpy\test\apetest3.py", line 283, in testStoreAndLoad
get_transaction().commit()
File "C:\Python22\lib\site-packages\ZODB\Transaction.py", line 272, in
commit
ncommitted += self._commit_objects(objects)
File "C:\Python22\lib\site-packages\ZODB\Transaction.py", line 388, in
_commit
_objects
jar.commit(o, self)
File "C:\Python22\lib\site-packages\apelib\zodb3\connection.py", line 281,
in
commit
s=dbstore(oid,serial,p,version,transaction)
File "C:\Python22\lib\site-packages\apelib\zodb3\storage.py", line 224, in
sto
re
cfr.store(event, classification)
File "P:\PMSpy\test\apetest3.py", line 81, in store
return self.gw.store(event, classification)
File "C:\Python22\lib\site-packages\apelib\sql\classification.py", line
52, in
store
key = long(event.getKey())
File "C:\Python22\lib\site-packages\apelib\core\events.py", line 68, in
getKey
return self._keychain[-1]
IndexError: tuple index out of range
Am I doing something wrong?
Adam
-------------- next part --------------
import ZODB
from Persistence import PersistentMapping
from Persistence import Persistent
from cPickle import dumps, loads
from apelib.core.classifiers import FixedClassifier
from apelib.core.interfaces import ISerializer
from apelib.core.gateways import CompositeGateway, MappingGateway
from apelib.core.mapper import Mapper
from apelib.core.schemas import RowSequenceSchema, FieldSchema
from apelib.core.serializers import CompositeSerializer, StringDataAttribute
from apelib.zodb3.serializers \
import FixedPersistentMapping, RollCall, RemainingState
from apelib.sql.sqlbase import SQLGatewayBase
from apelib.sql.dbapi import DBAPIConnector
from apelib.zodb3.db import ApeDB
from apelib.zodb3.storage import ApeStorage
from apelib.zodb3.resource import StaticResource
from apelib.zodb3.utils import copyOf
from apelib.sql.classification import SQLClassification
from apelib.sql.structure import SQLFolderItems
from apelib.sql.properties import SQLProperties
from apelib.sql.keygen import SQLKeychainGenerator
from apelib.core.interfaces import IClassifier
from apedummy import dummy
TESTKEY = 10
TEST2KEY = 11
class myClassifier:
__implements__ = IClassifier
def __init__(self):
self._klass_to_map = {}
self._default_res = None
def setGateway(self, gw):
self.gw = gw
def _toname(self, klass):
ci = '%s:%s' % (klass.__module__, klass.__name__)
return ci
def register(self, klass, mapper_name):
ci=self._toname(klass)
self._klass_to_map[ci] = mapper_name
def registerDefault(self, mapper_name):
self._default_res = mapper_name
def getResult(self, k):
res = self._klass_to_map.get(k)
if res is None:
res = self._default_res
if res is None:
raise KeyError("Class %s is not known to myClassifier %s" %
(k, repr(self)))
return res
def classifyObject(self, value, keychain):
klass = value.__class__
mt = klass.__name__
ci = self._toname(klass)
classification = {'meta_type': mt, 'class_name': ci}
return classification, self.getResult(ci)
def classifyState(self, event):
classification, serial = self.gw.load(event)
ci = classification.get('class_name')
return classification, self.getResult(ci)
def store(self, event, classification):
#if classification:
return self.gw.store(event, classification)
class myGW(SQLGatewayBase):
__implements__ = SQLGatewayBase.__implements__
schema = FieldSchema('value', 'int')
table_base_name = 'value_data'
column_defs = (
('value', 'int', 0),
)
def load(self, event):
key = long(event.getKey())
items = self.execute(event, 'read', 1, key=key)
if items:
state = items[0][0]
else:
state = ''
return state, state
def store(self, event, state):
key = long(event.getKey())
items = self.execute(event, 'read', 1, key=key)
col_name = self.column_defs[0][0]
conn = event.getConnection(self.conn_name)
kw = {'key': key, col_name: conn.asBinary(state)}
if items:
# update.
self.execute(event, 'update', **kw)
else:
# insert.
self.execute(event, 'insert', **kw)
return state
class mySerializer:
__implements__ = ISerializer
schema = FieldSchema('value', 'int')
def getSchema(self):
return self.schema
def serialize(self, object, event):
attrname = 'value'
state = getattr(object, attrname)
assert type(state) == int or type(state) == long
event.notifySerialized(attrname, state, 1)
return state
def deserialize(self, object, event, state):
attrname = 'value'
assert type(state) == int or type(state) == long
setattr(object, attrname, state)
event.notifyDeserialized(attrname, state)
class SerialTestBase:
def setUp(self):
self.conns = {}
classifier = myClassifier()
#classifier = FixedClassifier()
classifier.register(dummy, 'test_mapper')
#a
class_gw=SQLClassification()
classifier.setGateway(class_gw)
ser2 = CompositeSerializer('Persistence', 'PersistentMapping')
fixed_items_serializer = FixedPersistentMapping()
fixed_items_serializer.add('TestRoot', (TESTKEY,), ('test_mapper',))
ser2.addSerializer('fixed_items', fixed_items_serializer)
ser2.addSerializer('roll_call', RollCall())
root_mapper = Mapper(None, ser2, CompositeGateway(), classifier)
self.root_mapper = root_mapper
#a
keychain_gen = SQLKeychainGenerator()
root_mapper.setKeychainGenerator(keychain_gen)
# Create "test_mapper", which allows a "strdata" attribute.
#ser1 = CompositeSerializer('Persistence', 'PersistentMapping')
#items_serializer = SimpleItemsSerializer()
#ser1.addSerializer('items', items_serializer)
#props_serializer = StringDataAttribute('strdata')
#ser1.addSerializer('properties', props_serializer)
#ser1.addSerializer('roll_call', RollCall())
ser1 = CompositeSerializer('apedummy', 'dummy')
my_serializer = mySerializer()
ser1.addSerializer('value', my_serializer)
#props_serializer = StringDataAttribute('strdata')
#ser1.addSerializer('properties', props_serializer)
ser1.addSerializer('roll_call', RollCall())
gw1 = CompositeGateway()
#a
my_gw = myGW()
#items_gw = MappingGateway(items_serializer.getSchema())
self.my_gw = my_gw
gw1.addGateway('value', my_gw)
#a
#classifier.setGateway(gw1)
om1 = Mapper(root_mapper, ser1, gw1)
self.om1 = om1
root_mapper.addSubMapper('test_mapper', om1)
for initializer in (
my_gw,
class_gw,
keychain_gen):
root_mapper.addInitializer(initializer)
# Create "test_mapper_2", which stores a remainder.
#ser = CompositeSerializer('Persistence', 'PersistentMapping')
#items_serializer = SimpleItemsSerializer()
#ser.addSerializer('items', items_serializer)
#remainder_serializer = RemainingState()
#ser.addSerializer('remainder', remainder_serializer)
#
#gw = CompositeGateway()
#items_gw = MappingGateway(items_serializer.getSchema())
#gw.addGateway('items', items_gw)
#remainder_gw = MappingGateway(remainder_serializer.getSchema())
#gw.addGateway('remainder', remainder_gw)
#
#om = Mapper(root_mapper, ser, gw)
#root_mapper.addSubMapper('test_mapper_2', om)
root_mapper.checkConfiguration()
def tearDown(self):
pass
class ApeStorageTests (SerialTestBase):
# Tests of ApeStorage and ApeConnection.
dbapi_module = None # Name of the Database API module (required)
dbapi_params = () # Positional args for connect()
dbapi_kwparams = {} # Keyword args for connect()
def getConnector(self):
return DBAPIConnector(self.dbapi_module, self.dbapi_params,
self.dbapi_kwparams, prefix='test_temp')
def setUp(self):
SerialTestBase.setUp(self)
conn = self.getConnector()
resource = StaticResource(self.root_mapper)
self.conns = {'db': conn}
storage = ApeStorage(resource, self.conns, clear_all=1)
self.storage = storage
db = ApeDB(storage, resource)
self.db = db
def clear(self):
self.storage.initDatabases(clear_all=1)
for conn in self.conns.values():
conn.db.commit()
def tearDown(self):
get_transaction().abort()
self.clear()
self.db.close()
SerialTestBase.tearDown(self)
def testStoreAndLoad(self):
ob = PersistentMapping()
#ob.strdata = '345'
ob['a'] = 'b'
ob['c'] = 'd'
#dummy = PersistentMapping()
dum = dummy()
dum.setvalue(33)
conn1 = self.db.open()
conn2 = None
conn3 = None
try:
# Load the root and create a new object
root = conn1.root()
get_transaction().begin()
#tr=root['TestRoot']
#tr2=root['TestRoot2']
#print tr
#print tr.getvalue()
root['TestRoot'] = dum
#root['TestRoot2'] = dummy
get_transaction().commit()
ob1 = conn1.loadStub((TESTKEY,))
#self.assertEqual(ob1.strdata, ob.strdata)
#self.assertEqual(ob1.items(), ob.items())
# Verify a new object was stored and make a change
get_transaction().begin()
conn2 = self.db.open()
ob2 = conn2.loadStub((TESTKEY,))
#self.assertEqual(ob2.strdata, ob.strdata)
#self.assertEqual(ob2.items(), ob.items())
#ob2.strdata = '678'
get_transaction().commit()
# Verify the change was stored and make another change
conn3 = self.db.open()
ob3 = conn3.loadStub((TESTKEY,))
#self.assertEqual(ob3.strdata, '678')
#self.assertEqual(ob3.items(), ob.items())
#ob3.strdata = '901'
get_transaction().commit()
conn3.close()
conn3 = None
conn3 = self.db.open()
ob3 = conn3.loadStub((TESTKEY,))
#self.assertEqual(ob3.strdata, '901')
# Verify we didn't accidentally change the original object
#self.assertEqual(ob.strdata, '345')
finally:
conn1.close()
if conn2 is not None:
conn2.close()
if conn3 is not None:
conn3.close()
class MySQLTests (ApeStorageTests):
dbapi_module = 'MySQLdb'
dbapi_kwparams = {'host':'localhost', 'user':'adi', 'passwd':'r', 'db': 'ape'}
if __name__ == '__main__':
x = MySQLTests()
x.setUp()
x.testStoreAndLoad()
x.tearDown()
-------------- next part --------------
import ZODB
from Persistence import PersistentMapping
from Persistence import Persistent
class dummy(Persistent):
value=0
def __init__(self):
print 'olee'
def setvalue(self,v):
self.value = v
def getvalue(self):
return self.value
More information about the ZODB-Dev
mailing list