import re from nose.tools import raises from pysqlite2 import dbapi2 as sqlite from Stub_object import Stub_object from Stub_cache import Stub_cache from model.Persistent import Persistent from controller.Database import Database, Connection_wrapper from controller.Schema_upgrader import Schema_upgrader class Test_schema_upgrader( object ): def setUp( self ): # make an in-memory sqlite database to use during testing self.connection = Connection_wrapper( sqlite.connect( ":memory:", detect_types = sqlite.PARSE_DECLTYPES, check_same_thread = False ) ) self.cache = Stub_cache() cursor = self.connection.cursor() cursor.execute( Stub_object.sql_create_table() ) self.fake_files = {} # map of fake filename (full path) to fake file contents self.database = Database( self.connection, self.cache ) self.upgrader = Schema_upgrader( self.database, glob = self.glob, read_file = self.read_file ) def tearDown( self ): self.database.close() def glob( self, glob_pattern ): """ A fake glob function that doesn't use the filesystem. """ re_pattern = re.compile( glob_pattern.replace( "*", "[^/]*" ) ) return [ filename for filename in self.fake_files.keys() if re_pattern.search( filename ) ] def read_file( self, filename ): """ A fake read file function that doesn't use the filesystem. """ contents = self.fake_files.get( filename ) if not contents: raise IOError() return contents def test_upgrade_schema( self, to_version = None ): if not to_version: to_version = u"5.7.11" self.fake_files = { u"model/delta/5.6.7.sqlite": u"create table new_table ( foo text ); insert into new_table values ( 'hi' );", u"model/delta/5.6.8.sqlite": u"insert into new_table values ( 'bye' );", u"model/delta/5.6.10.sqlite": u"alter table new_table add column bar text;", u"model/delta/5.7.11.sqlite": u"insert into new_table values ( 'whee', 'stuff' );", u"model/delta/5.7.18.sqlite": u"insert into new_table values ( 'more', 'things' );", } self.upgrader.upgrade_schema( to_version ) result = self.database.select_many( tuple, u"select * from new_table;" ) if to_version == u"5.7.11": assert result == [ ( u"hi", None ), ( u"bye", None ), ( "whee", "stuff" ) ] else: assert result == [ ( u"hi", None ), ( u"bye", None ), ( "whee", "stuff" ), ( "more", "things" ) ] result = self.database.select_many( tuple, u"select * from schema_version;" ) if to_version == u"5.7.11": assert result == [ ( 5, 7, 11 ) ] else: assert result == [ ( 5, 7, 18 ) ] def test_upgrade_schema_with_schema_version_table( self ): self.database.execute( u"create table schema_version ( major numeric, minor numeric, \"release\" numeric );" ) self.database.execute( u"insert into schema_version values ( 0, 0, 0 );" ) self.test_upgrade_schema() def test_upgrade_schema_with_schema_version_table_and_starting_version( self ): self.database.execute( u"create table schema_version ( major numeric, minor numeric, \"release\" numeric );" ) self.database.execute( u"insert into schema_version values ( 5, 6, 6 );" ) self.fake_files[ u"model/delta/5.6.1.sqlite" ] = u"this is not valid sql and should not be executed anyway;" self.fake_files[ u"model/delta/5.6.6.sqlite" ] = u"also invalid;" self.test_upgrade_schema() def test_upgrade_schema_with_schema_version_table_and_target_version_without_schema( self ): self.database.execute( u"create table schema_version ( major numeric, minor numeric, \"release\" numeric );" ) self.database.execute( u"insert into schema_version values ( 0, 0, 0 );" ) self.test_upgrade_schema( to_version = u"5.7.20" ) def test_upgrade_schema_with_schema_version_table_and_starting_version_and_target_version_without_schema( self ): self.database.execute( u"create table schema_version ( major numeric, minor numeric, \"release\" numeric );" ) self.database.execute( u"insert into schema_version values ( 5, 6, 6 );" ) self.test_upgrade_schema( to_version = u"5.7.20" ) def test_upgrade_schema_with_future_ending_version( self ): self.fake_files = { u"model/delta/5.6.7.sqlite": u"create table new_table ( foo text ); insert into new_table values ( 'hi' );", u"model/delta/5.6.8.sqlite": u"insert into new_table values ( 'bye' );", u"model/delta/5.6.10.sqlite": u"alter table new_table add column bar text;", u"model/delta/5.7.11.sqlite": u"insert into new_table values ( 'whee', 'stuff' );", u"model/delta/5.7.18.sqlite": u"insert into new_table values ( 'more', 'and more' );", } self.upgrader.upgrade_schema( u"5.8.55" ) result = self.database.select_many( tuple, u"select * from new_table;" ) assert result == [ ( u"hi", None ), ( u"bye", None ), ( "whee", "stuff" ), ( "more", "and more" ) ] result = self.database.select_many( tuple, u"select * from schema_version;" ) assert result == [ ( 5, 7, 18 ) ] def test_upgrade_schema_twice( self ): self.test_upgrade_schema() # the second upgrade should have no effect, because at this point it's already upgraded self.test_upgrade_schema() def test_upgrade_schema_with_filename_with_invalid_version( self ): # the filename, not composed of all-integer parts, should be skipped self.fake_files[ u"model/delta/5.6.9b.sqlite" ] = u"this is not valid sql and should not be executed anyway;" self.test_upgrade_schema() def test_upgrade_schema_default_to_start_version_of_1_5_4( self ): # test that if no schema_version table exists, then the starting version is assumed to be 1.5.4 self.fake_files = { u"model/delta/1.5.3.sqlite": u"invalid sql;", u"model/delta/1.5.4.sqlite": u"should not be invoked;", u"model/delta/1.5.5.sqlite": u"create table new_table ( foo text ); insert into new_table values ( 'hi' );", u"model/delta/1.5.6.sqlite": u"insert into new_table values ( 'bye' );", } self.upgrader.upgrade_schema( u"1.5.6" ) result = self.database.select_many( tuple, u"select * from new_table;" ) assert result == [ ( u"hi", ), ( u"bye", ), ] result = self.database.select_many( tuple, u"select * from schema_version;" ) assert result == [ ( 1, 5, 6 ) ] def test_apply_schema_delta( self ): self.database.execute( u"create table schema_version ( major numeric, minor numeric, \"release\" numeric );" ) self.database.execute( u"insert into schema_version values ( 0, 0, 0 );" ) self.fake_files = { u"model/delta/5.6.5.sqlite": u"insert into new_table values ( 'should not show up' );", u"model/delta/5.6.7.sqlite": u"create table new_table ( foo text ); insert into new_table values ( 'hi' );", u"model/delta/5.7.18.sqlite": u"insert into new_table values ( 'should not be present' );", } self.upgrader.apply_schema_delta( ( 5, 6, 7 ), u"model/delta/5.6.7.sqlite" ) result = self.database.select_many( unicode, u"select * from new_table;" ); assert result == [ u"hi" ] result = self.database.select_many( tuple, u"select * from schema_version;" ); assert result == [ ( 5, 6, 7 ) ] @raises( IOError ) def test_apply_schema_delta_with_unknown_file( self ): self.upgrader.apply_schema_delta( ( 5, 6, 7 ), u"model/delta/5.6.7.sqlite" ) def test_version_string_to_tuple( self ): version = self.upgrader.version_string_to_tuple( "2.5.13" ) assert len( version ) == 3 assert version[ 0 ] == 2 assert version[ 1 ] == 5 assert version[ 2 ] == 13 def test_version_string_to_tuple_with_extension( self ): version = self.upgrader.version_string_to_tuple( "2.5.13.sqlite" ) assert len( version ) == 3 assert version[ 0 ] == 2 assert version[ 1 ] == 5 assert version[ 2 ] == 13 @raises( ValueError ) def test_version_string_to_tuple_with_too_many_parts( self ): version = self.upgrader.version_string_to_tuple( "3.14.159.26.5" ) @raises( ValueError ) def test_version_string_to_tuple_with_too_few_parts( self ): version = self.upgrader.version_string_to_tuple( "3.14" ) @raises( ValueError ) def test_version_string_to_tuple_with_non_integer_part( self ): version = self.upgrader.version_string_to_tuple( "2.5b.13" ) @raises( ValueError ) def test_version_string_to_tuple_with_empty_part( self ): version = self.upgrader.version_string_to_tuple( "2..13" )