223 lines
7.9 KiB
Python
223 lines
7.9 KiB
Python
from pytz import utc
|
|
from threading import Thread
|
|
from pysqlite2 import dbapi2 as sqlite
|
|
from datetime import datetime, timedelta
|
|
from Stub_object import Stub_object
|
|
from Stub_cache import Stub_cache
|
|
from model.Persistent import Persistent
|
|
from controller.Database import Database, Connection_wrapper
|
|
|
|
|
|
class Test_database( object ):
    """
    Tests for Database, run against an in-memory SQLite connection and a Stub_cache.

    setUp() creates a new in-memory database containing only the table produced by
    Stub_object.sql_create_table(), and tearDown() closes the Database again.
    """

    def setUp( self ):
        """
        Build the in-memory SQLite connection, the stub cache, and the Database under test.
        """
        # make an in-memory sqlite database to use during testing. check_same_thread is disabled
        # because test_synchronize() uses this single connection from several threads
        raw_connection = sqlite.connect(
            ":memory:",
            detect_types = sqlite.PARSE_DECLTYPES,
            check_same_thread = False
        )
        self.connection = Connection_wrapper( raw_connection )
        self.cache = Stub_cache()

        cursor = self.connection.cursor()
        cursor.execute( Stub_object.sql_create_table() )

        self.database = Database( self.connection, self.cache )

    def tearDown( self ):
        """
        Close the Database created by setUp().
        """
        self.database.close()

    def _assert_same_object( self, loaded, object_id, revision, value ):
        """
        Assert that a loaded object carries the expected id, revision, and value.

        The loaded revision is given the utc tzinfo before comparing, exactly as each test did
        inline before this helper existed.

        @param loaded: object returned by the Database
        @param object_id: expected object id
        @param revision: expected revision
        @param value: expected value
        """
        assert loaded.object_id == object_id
        assert loaded.revision.replace( tzinfo = utc ) == revision
        assert loaded.value == value

    def test_save_and_load( self ):
        """
        An object passed to save() can be loaded back with the same id, revision, and value.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        original_revision = basic_obj.revision

        self.database.save( basic_obj )
        obj = self.database.load( Stub_object, basic_obj.object_id )

        self._assert_same_object( obj, basic_obj.object_id, original_revision, basic_obj.value )

    def test_save_and_load_without_commit( self ):
        """
        save( commit = False ) followed by a rollback leaves nothing to load.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )

        self.database.save( basic_obj, commit = False )
        self.connection.rollback() # if commit wasn't called, this should back out the save
        obj = self.database.load( Stub_object, basic_obj.object_id )

        assert obj is None

    def test_save_and_load_with_explicit_commit( self ):
        """
        save( commit = False ) followed by commit() survives a later rollback.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        original_revision = basic_obj.revision

        self.database.save( basic_obj, commit = False )
        self.database.commit()
        self.connection.rollback() # should have no effect because of the call to commit
        obj = self.database.load( Stub_object, basic_obj.object_id )

        self._assert_same_object( obj, basic_obj.object_id, original_revision, basic_obj.value )

    def test_select_one( self ):
        """
        select_one() with Stub_object.sql_load() returns the object that was saved.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        original_revision = basic_obj.revision

        self.database.save( basic_obj )
        obj = self.database.select_one( Stub_object, Stub_object.sql_load( basic_obj.object_id ) )

        self._assert_same_object( obj, basic_obj.object_id, original_revision, basic_obj.value )

    def test_select_one_tuple( self ):
        """
        select_one() with tuple as the type returns a two-element result holding 1 and 2.
        """
        obj = self.database.select_one( tuple, Stub_object.sql_tuple() )

        assert len( obj ) == 2
        assert obj[ 0 ] == 1
        assert obj[ 1 ] == 2

    def test_select_many( self ):
        """
        select_many() with Stub_object.sql_load_em_all() returns both saved objects, in save order.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        original_revision = basic_obj.revision
        basic_obj2 = Stub_object( object_id = "6", value = 2 )
        original_revision2 = basic_obj2.revision

        self.database.save( basic_obj )
        self.database.save( basic_obj2 )
        objs = self.database.select_many( Stub_object, Stub_object.sql_load_em_all() )

        assert len( objs ) == 2
        self._assert_same_object( objs[ 0 ], basic_obj.object_id, original_revision, basic_obj.value )
        self._assert_same_object( objs[ 1 ], basic_obj2.object_id, original_revision2, basic_obj2.value )

    def test_select_many_tuples( self ):
        """
        select_many() with tuple as the type returns one two-element result holding 1 and 2.
        """
        objs = self.database.select_many( tuple, Stub_object.sql_tuple() )

        assert len( objs ) == 1
        assert len( objs[ 0 ] ) == 2
        assert objs[ 0 ][ 0 ] == 1
        assert objs[ 0 ][ 1 ] == 2

    def test_select_many_with_no_matches( self ):
        """
        select_many() on an empty table returns an empty result.
        """
        objs = self.database.select_many( Stub_object, Stub_object.sql_load_em_all() )

        assert len( objs ) == 0

    def test_save_and_load_revision( self ):
        """
        After saving two revisions, load() returns the latest one by default and the original
        one when its revision is requested explicitly.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        original_revision = basic_obj.revision

        self.database.save( basic_obj )
        basic_obj.value = 2

        self.database.save( basic_obj )
        obj = self.database.load( Stub_object, basic_obj.object_id )

        self._assert_same_object( obj, basic_obj.object_id, basic_obj.revision, basic_obj.value )

        revised = self.database.load( Stub_object, basic_obj.object_id, revision = original_revision )

        self._assert_same_object( revised, basic_obj.object_id, original_revision, 1 )

    def test_execute( self ):
        """
        An object inserted with execute( sql_create() ) can be loaded back unchanged.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        original_revision = basic_obj.revision

        self.database.execute( basic_obj.sql_create() )
        obj = self.database.load( Stub_object, basic_obj.object_id )

        self._assert_same_object( obj, basic_obj.object_id, original_revision, basic_obj.value )

    def test_execute_without_commit( self ):
        """
        execute( commit = False ) followed by a rollback leaves nothing to load.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )

        self.database.execute( basic_obj.sql_create(), commit = False )
        self.connection.rollback() # if commit wasn't called, this should back out the insert
        obj = self.database.load( Stub_object, basic_obj.object_id )

        assert obj is None

    def test_execute_with_explicit_commit( self ):
        """
        execute( commit = False ) followed by commit() survives a later rollback.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        original_revision = basic_obj.revision

        self.database.execute( basic_obj.sql_create(), commit = False )
        self.database.commit()
        # without this rollback the load below would succeed even if commit() did nothing, so
        # the test would not prove anything. should have no effect because of the call to commit
        self.connection.rollback()
        obj = self.database.load( Stub_object, basic_obj.object_id )

        self._assert_same_object( obj, basic_obj.object_id, original_revision, basic_obj.value )

    def test_load_unknown( self ):
        """
        load() returns None for an object id that was never saved.
        """
        basic_obj = Stub_object( object_id = "5", value = 1 )
        obj = self.database.load( Stub_object, basic_obj.object_id )

        assert obj is None

    def test_next_id( self ):
        """
        Successive next_id() calls return distinct, truthy ids that can each be loaded.
        """
        prev_ids = []

        for _ in range( 3 ):
            next_id = self.database.next_id( Stub_object )

            assert next_id
            assert next_id not in prev_ids
            assert self.database.load( Stub_object, next_id )
            prev_ids.append( next_id )

    def test_next_id_without_commit( self ):
        """
        next_id( commit = False ) followed by a rollback leaves nothing to load for that id.
        """
        next_id = self.database.next_id( Stub_object, commit = False )
        self.connection.rollback() # if commit wasn't called, this should back out the new id

        assert self.database.load( Stub_object, next_id ) is None

    def test_next_id_with_explit_commit( self ):
        """
        next_id( commit = False ) followed by commit() survives a later rollback.

        The method name keeps its original spelling ("explit") so that the collected test id
        does not change.
        """
        next_id = self.database.next_id( Stub_object, commit = False )
        self.database.commit()
        # without this rollback the load below would succeed even if commit() did nothing.
        # should have no effect because of the call to commit
        self.connection.rollback()

        assert next_id
        assert self.database.load( Stub_object, next_id )

    def test_synchronize( self ):
        """
        Two threads can create and load objects through the same Database at the same time.

        An exception raised inside a worker thread (including a failed assert) does not
        propagate to the thread that started it. Each worker therefore records its traceback,
        and the test fails after both threads are joined if anything was recorded.
        """
        import traceback

        failures = []

        def make_objects():
            try:
                for _ in range( 50 ):
                    object_id = self.database.next_id( Stub_object )
                    basic_obj = Stub_object( object_id, value = 1 )
                    original_revision = basic_obj.revision

                    self.database.execute( basic_obj.sql_create() )
                    obj = self.database.load( Stub_object, basic_obj.object_id )

                    assert obj.object_id == basic_obj.object_id
                    delta = abs( obj.revision.replace( tzinfo = utc ) - original_revision )
                    assert delta <= timedelta( seconds = 0.000001 )
                    assert obj.value == basic_obj.value

                    # the returned id is not used; the call is kept for the additional
                    # concurrent database work it performs
                    self.database.next_id( Stub_object )
            except Exception:
                failures.append( traceback.format_exc() )

        # if synchronization (locking) is working properly, then these two threads should be able
        # to run simultaneously without error. without locking, SQLite is expected to raise an
        # error in at least one of them
        thread1 = Thread( target = make_objects )
        thread2 = Thread( target = make_objects )
        thread1.start()
        thread2.start()

        thread1.join()
        thread2.join()

        assert not failures, "\n".join( failures )

    def test_backend( self ):
        """
        A Database built on a SQLite connection reports Persistent.SQLITE_BACKEND as its backend.
        """
        assert self.database.backend == Persistent.SQLITE_BACKEND
|