witten
/
luminotes
Archived
1
0
Fork 0
This repository has been archived on 2023-12-16. You can view files and clone it, but cannot push or open issues or pull requests.
luminotes/controller/test/Test_database.py

257 lines
9.5 KiB
Python

from pytz import utc
from threading import Thread
from pysqlite2 import dbapi2 as sqlite
from datetime import datetime, timedelta
from Stub_object import Stub_object
from Stub_cache import Stub_cache
from model.Persistent import Persistent
from controller.Database import Database, Connection_wrapper
class Test_database( object ):
def setUp( self ):
# make an in-memory sqlite database to use during testing
self.connection = Connection_wrapper( sqlite.connect( ":memory:", detect_types = sqlite.PARSE_DECLTYPES, check_same_thread = False ) )
self.cache = Stub_cache()
cursor = self.connection.cursor()
cursor.execute( Stub_object.sql_create_table() )
self.database = Database( self.connection, self.cache )
def tearDown( self ):
self.database.close()
def test_save_and_load( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.save( basic_obj )
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj.object_id == basic_obj.object_id
assert obj.revision.replace( tzinfo = utc ) == original_revision
assert obj.value == basic_obj.value
def test_save_and_load_without_commit( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.save( basic_obj, commit = False )
self.connection.rollback() # if commit wasn't called, this should back out the save
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj == None
def test_save_and_load_with_explicit_commit( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.save( basic_obj, commit = False )
self.database.commit()
self.connection.rollback() # should have no effect because of the call to commit
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj.object_id == basic_obj.object_id
assert obj.revision.replace( tzinfo = utc ) == original_revision
assert obj.value == basic_obj.value
def test_select_one( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.save( basic_obj )
obj = self.database.select_one( Stub_object, Stub_object.sql_load( basic_obj.object_id ) )
assert obj.object_id == basic_obj.object_id
assert obj.revision.replace( tzinfo = utc ) == original_revision
assert obj.value == basic_obj.value
def test_select_datetime( self ):
# this revision (with .504099) happens to test for a bug caused by floating point rounding errors
original_revision = "2008-01-01 01:00:42.504099+00:00"
basic_obj = Stub_object( object_id = "5", revision = original_revision, value = 1 )
self.database.save( basic_obj )
obj = self.database.select_one( Stub_object, Stub_object.sql_load( basic_obj.object_id ) )
assert obj.object_id == basic_obj.object_id
assert str( obj.revision.replace( tzinfo = utc ) ) == original_revision
assert obj.value == basic_obj.value
def test_select_datetime_with_many_fractional_digits( self ):
original_revision = "2008-01-01 01:00:42.5032429489284+00:00"
basic_obj = Stub_object( object_id = "5", revision = original_revision, value = 1 )
self.database.save( basic_obj )
obj = self.database.select_one( Stub_object, Stub_object.sql_load( basic_obj.object_id ) )
assert obj.object_id == basic_obj.object_id
assert str( obj.revision.replace( tzinfo = utc ) ) == "2008-01-01 01:00:42.503242+00:00"
assert obj.value == basic_obj.value
def test_select_datetime_with_zero_fractional_seconds( self ):
original_revision = "2008-01-01 01:00:42.0+00:00"
basic_obj = Stub_object( object_id = "5", revision = original_revision, value = 1 )
self.database.save( basic_obj )
obj = self.database.select_one( Stub_object, Stub_object.sql_load( basic_obj.object_id ) )
assert obj.object_id == basic_obj.object_id
assert str( obj.revision.replace( tzinfo = utc ) ) == "2008-01-01 01:00:42+00:00"
assert obj.value == basic_obj.value
def test_select_one_tuple( self ):
obj = self.database.select_one( tuple, Stub_object.sql_tuple() )
assert len( obj ) == 2
assert obj[ 0 ] == 1
assert obj[ 1 ] == 2
def test_select_many( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
basic_obj2 = Stub_object( object_id = "6", value = 2 )
original_revision2 = basic_obj2.revision
self.database.save( basic_obj )
self.database.save( basic_obj2 )
objs = self.database.select_many( Stub_object, Stub_object.sql_load_em_all() )
assert len( objs ) == 2
assert objs[ 0 ].object_id == basic_obj.object_id
assert objs[ 0 ].revision.replace( tzinfo = utc ) == original_revision
assert objs[ 0 ].value == basic_obj.value
assert objs[ 1 ].object_id == basic_obj2.object_id
assert objs[ 1 ].revision.replace( tzinfo = utc ) == original_revision2
assert objs[ 1 ].value == basic_obj2.value
def test_select_many_tuples( self ):
objs = self.database.select_many( tuple, Stub_object.sql_tuple() )
assert len( objs ) == 1
assert len( objs[ 0 ] ) == 2
assert objs[ 0 ][ 0 ] == 1
assert objs[ 0 ][ 1 ] == 2
def test_select_many_with_no_matches( self ):
objs = self.database.select_many( Stub_object, Stub_object.sql_load_em_all() )
assert len( objs ) == 0
def test_save_and_load_revision( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.save( basic_obj )
basic_obj.value = 2
self.database.save( basic_obj )
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj.object_id == basic_obj.object_id
assert obj.revision.replace( tzinfo = utc ) == basic_obj.revision
assert obj.value == basic_obj.value
revised = self.database.load( Stub_object, basic_obj.object_id, revision = original_revision )
assert revised.object_id == basic_obj.object_id
assert revised.value == 1
assert revised.revision.replace( tzinfo = utc ) == original_revision
def test_execute( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.execute( basic_obj.sql_create() )
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj.object_id == basic_obj.object_id
assert obj.revision.replace( tzinfo = utc ) == original_revision
assert obj.value == basic_obj.value
def test_execute_without_commit( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.execute( basic_obj.sql_create(), commit = False )
self.connection.rollback()
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj == None
def test_execute_with_explicit_commit( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
original_revision = basic_obj.revision
self.database.execute( basic_obj.sql_create(), commit = False )
self.database.commit()
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj.object_id == basic_obj.object_id
assert obj.revision.replace( tzinfo = utc ) == original_revision
assert obj.value == basic_obj.value
def test_load_unknown( self ):
basic_obj = Stub_object( object_id = "5", value = 1 )
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj == None
def test_next_id( self ):
next_id = self.database.next_id( Stub_object )
assert next_id
assert self.database.load( Stub_object, next_id )
prev_ids = [ next_id ]
next_id = self.database.next_id( Stub_object )
assert next_id
assert next_id not in prev_ids
assert self.database.load( Stub_object, next_id )
prev_ids.append( next_id )
next_id = self.database.next_id( Stub_object )
assert next_id
assert next_id not in prev_ids
assert self.database.load( Stub_object, next_id )
def test_next_id_without_commit( self ):
next_id = self.database.next_id( Stub_object, commit = False )
self.connection.rollback()
assert self.database.load( Stub_object, next_id ) == None
def test_next_id_with_explicit_commit( self ):
next_id = self.database.next_id( Stub_object, commit = False )
self.database.commit()
assert next_id
assert self.database.load( Stub_object, next_id )
def test_synchronize( self ):
def make_objects():
for i in range( 50 ):
object_id = self.database.next_id( Stub_object )
basic_obj = Stub_object( object_id, value = 1 )
original_revision = basic_obj.revision
self.database.execute( basic_obj.sql_create() )
obj = self.database.load( Stub_object, basic_obj.object_id )
assert obj.object_id == basic_obj.object_id
delta = abs( obj.revision.replace( tzinfo = utc ) - original_revision )
assert delta <= timedelta( seconds = 0.000001 )
assert obj.value == basic_obj.value
object_id = self.database.next_id( Stub_object )
# if synchronization (locking) is working properly, then these two threads should be able to run
# simultaneously without error. without locking, SQLite will raise
thread1 = Thread( target = make_objects )
thread2 = Thread( target = make_objects )
thread1.start()
thread2.start()
thread1.join()
thread2.join()
def test_backend( self ):
assert self.database.backend == Persistent.SQLITE_BACKEND