witten
/
luminotes
Archived
1
0
Fork 0

New programatic schema upgrader that can upgrade a database from one version to another.

This commit is contained in:
Dan Helfman 2008-10-15 17:44:05 -07:00
parent c94443a8c3
commit 0a1d481201
2 changed files with 297 additions and 0 deletions

View File

@ -0,0 +1,138 @@
import os.path
from model.Persistent import Persistent
class Schema_upgrader:
def __init__( self, database, glob = None, read_file = None ):
"""
Create a new schema upgrader and return it.
@type database: Database
@param database: the database to use for all schema upgrades
@type glob: function or NoneType
@param glob: a custom function to use for globbing files as per glob.glob() (optional)
@type read_file: function or NoneType
@param read_file: a custom function to use for reading schema files (optional)
@rtype: Schema_upgrader
@return: newly constructed Schema_upgrader
"""
from glob import glob as real_glob
self.__database = database
self.__glob = glob or real_glob
self.__read_file = read_file or Schema_upgrader.__read_file
@staticmethod
def __read_file( filename ):
"""
Read a file and return all of its contents.
@type filename: unicode
@param filename: full path of the file to read
@rtype: unicode
@return: full contents of the file
"""
return file( filename ).read()
def upgrade_schema( self, to_version ):
"""
Upgrade the database from its current version to a given version, applying all intervening
schema delta files necessary to get there, and apply them in version order. If the given
version is unknown, this method will upgrade to the latest known schema version that is
before the given version.
@type to_version: unicode
@param to_version: the desired version to upgrade to, as a string
"""
to_version = self.version_string_to_tuple( to_version )
try:
from_version = self.__database.select_one( tuple, "select * from schema_version;" );
except:
from_version = None
if self.__database.backend == Persistent.SQLITE_BACKEND:
extension = u"sqlite"
else:
extension = u"sql"
filenames = self.__glob( u"model/delta/*.%s" % extension )
versions = []
# make a list of all available schema delta files
for filename in filenames:
base_filename = os.path.basename( filename )
try:
version = self.version_string_to_tuple( base_filename )
except ValueError:
continue
# skip those versions that won't help us upgrade
if from_version and version <= from_version:
continue
if version > to_version:
continue
versions.append( ( version, filename ) )
# sort the schema delta files by version
versions.sort( lambda a, b: cmp( a[ 0 ], b[ 0 ] ) )
# apply the schema delta files in sorted order
for ( version, filename ) in versions:
self.apply_schema_delta( version, filename )
self.__database.commit()
def apply_schema_delta( self, version, filename ):
"""
Upgrade the database from its current version to a given version, applying only the named
schema delta file to do so.
@type version: tuple
@param version: ( major, minor, release ) with each version part as an integer
@type filename: unicode
@param filename: full path to the schema delta file to apply
"""
self.__database.execute_script( self.__read_file( filename ), commit = False )
try:
self.__database.execute( "update schema_version set major = %s, minor = %s, release = %s;" % version, commit = False );
# if the table doesn't yet exist, create it
except:
self.__database.execute( "create table schema_version ( major numeric, minor numeric, release numeric );", commit = False );
self.__database.execute( "insert into schema_version values ( %s, %s, %s );" % version, commit = False );
@staticmethod
def version_string_to_tuple( version ):
"""
Given a version string with an optional file extension tacked on, convert the version to a
tuple of integers and return it.
@type version: unicode
@param version: a version string of the form "major.minor.release"
@rtype: tuple
@return: ( major, minor, release ) with each version part as an integer
@raises ValueError: invalid version or version parts cannot be converted to integers
"""
VERSION_PARTS_COUNT = 3
parts = version.split( "." )
length = len( parts )
if length == VERSION_PARTS_COUNT + 1:
( major, minor, release, extension ) = parts
elif length == VERSION_PARTS_COUNT:
( major, minor, release ) = parts
else:
raise ValueError()
try:
major = int( major )
minor = int( minor )
release = int( release )
except ( TypeError, ValueError ):
raise ValueError()
return ( major, minor, release )

View File

@ -0,0 +1,159 @@
import re
from nose.tools import raises
from pysqlite2 import dbapi2 as sqlite
from Stub_object import Stub_object
from Stub_cache import Stub_cache
from model.Persistent import Persistent
from controller.Database import Database, Connection_wrapper
from controller.Schema_upgrader import Schema_upgrader
class Test_schema_upgrader( object ):
def setUp( self ):
# make an in-memory sqlite database to use during testing
self.connection = Connection_wrapper( sqlite.connect( ":memory:", detect_types = sqlite.PARSE_DECLTYPES, check_same_thread = False ) )
self.cache = Stub_cache()
cursor = self.connection.cursor()
cursor.execute( Stub_object.sql_create_table() )
self.fake_files = {} # map of fake filename (full path) to fake file contents
self.database = Database( self.connection, self.cache )
self.upgrader = Schema_upgrader( self.database, glob = self.glob, read_file = self.read_file )
def tearDown( self ):
self.database.close()
def glob( self, glob_pattern ):
"""
A fake glob function that doesn't use the filesystem.
"""
re_pattern = re.compile( glob_pattern.replace( "*", "[^/]*" ) )
return [ filename for filename in self.fake_files.keys() if re_pattern.search( filename ) ]
def read_file( self, filename ):
"""
A fake read file function that doesn't use the filesystem.
"""
contents = self.fake_files.get( filename )
if not contents:
raise IOError()
return contents
def test_upgrade_schema( self ):
self.fake_files = {
u"model/delta/5.6.7.sqlite": u"create table new_table ( foo text ); insert into new_table values ( 'hi' );",
u"model/delta/5.6.8.sqlite": u"insert into new_table values ( 'bye' );",
u"model/delta/5.6.10.sqlite": u"alter table new_table add column bar text;",
u"model/delta/5.7.11.sqlite": u"insert into new_table values ( 'whee', 'stuff' );",
u"model/delta/5.7.18.sqlite": u"insert into new_table values ( 'should not be present', 'nope' );",
}
self.upgrader.upgrade_schema( u"5.7.11" );
result = self.database.select_many( tuple, u"select * from new_table;" );
assert result == [ ( u"hi", None ), ( u"bye", None ), ( "whee", "stuff" ) ];
result = self.database.select_many( tuple, u"select * from schema_version;" );
assert result == [ ( 5, 7, 11 ) ];
def test_upgrade_schema_with_schema_version_table( self ):
self.database.execute( u"create table schema_version ( major numeric, minor numeric, release numeric );" );
self.database.execute( u"insert into schema_version values ( 0, 0, 0 );" )
self.test_upgrade_schema();
def test_upgrade_schema_with_schema_version_table_and_specific_starting_version( self ):
self.database.execute( u"create table schema_version ( major numeric, minor numeric, release numeric );" );
self.database.execute( u"insert into schema_version values ( 5, 6, 6 );" )
self.fake_files[ u"model/delta/5.6.1.sqlite" ] = u"this is not valid sql and should not be executed anyway;";
self.fake_files[ u"model/delta/5.6.6.sqlite" ] = u"also invalid;";
self.test_upgrade_schema();
def test_upgrade_schema_with_future_ending_version( self ):
self.fake_files = {
u"model/delta/5.6.7.sqlite": u"create table new_table ( foo text ); insert into new_table values ( 'hi' );",
u"model/delta/5.6.8.sqlite": u"insert into new_table values ( 'bye' );",
u"model/delta/5.6.10.sqlite": u"alter table new_table add column bar text;",
u"model/delta/5.7.11.sqlite": u"insert into new_table values ( 'whee', 'stuff' );",
u"model/delta/5.7.18.sqlite": u"insert into new_table values ( 'more', 'and more' );",
}
self.upgrader.upgrade_schema( u"5.8.55" );
result = self.database.select_many( tuple, u"select * from new_table;" );
assert result == [ ( u"hi", None ), ( u"bye", None ), ( "whee", "stuff" ), ( "more", "and more" ) ];
result = self.database.select_many( tuple, u"select * from schema_version;" );
assert result == [ ( 5, 7, 18 ) ];
def test_upgrade_schema_twice( self ):
self.test_upgrade_schema();
# the second upgrade should have no effect, because at this point it's already upgraded
self.test_upgrade_schema();
def test_upgrade_schema_with_filename_with_invalid_version( self ):
# the filename, not composed of all-integer parts, should be skipped
self.fake_files[ u"model/delta/5.6.9b.sqlite" ] = u"this is not valid sql and should not be executed anyway;";
self.test_upgrade_schema();
def test_apply_schema_delta( self ):
self.fake_files = {
u"model/delta/5.6.5.sqlite": u"insert into new_table values ( 'should not show up' );",
u"model/delta/5.6.7.sqlite": u"create table new_table ( foo text ); insert into new_table values ( 'hi' );",
u"model/delta/5.7.18.sqlite": u"insert into new_table values ( 'should not be present' );",
}
self.upgrader.apply_schema_delta( ( 5, 6, 7 ), u"model/delta/5.6.7.sqlite" )
result = self.database.select_many( unicode, u"select * from new_table;" );
assert result == [ u"hi" ];
result = self.database.select_many( tuple, u"select * from schema_version;" );
assert result == [ ( 5, 6, 7 ) ];
def test_apply_schema_delta_with_schema_version_table( self ):
self.database.execute( u"create table schema_version ( major numeric, minor numeric, release numeric );" );
self.database.execute( u"insert into schema_version values ( 0, 0, 0 );" )
self.test_apply_schema_delta();
@raises( IOError )
def test_apply_schema_delta_with_unknown_file( self ):
self.upgrader.apply_schema_delta( ( 5, 6, 7 ), u"model/delta/5.6.7.sqlite" )
def test_version_string_to_tuple( self ):
version = self.upgrader.version_string_to_tuple( "2.5.13" )
assert len( version ) == 3
assert version[ 0 ] == 2
assert version[ 1 ] == 5
assert version[ 2 ] == 13
def test_version_string_to_tuple_with_extension( self ):
version = self.upgrader.version_string_to_tuple( "2.5.13.sqlite" )
assert len( version ) == 3
assert version[ 0 ] == 2
assert version[ 1 ] == 5
assert version[ 2 ] == 13
@raises( ValueError )
def test_version_string_to_tuple_with_too_many_parts( self ):
version = self.upgrader.version_string_to_tuple( "3.14.159.26.5" )
@raises( ValueError )
def test_version_string_to_tuple_with_too_few_parts( self ):
version = self.upgrader.version_string_to_tuple( "3.14" )
@raises( ValueError )
def test_version_string_to_tuple_with_non_integer_part( self ):
version = self.upgrader.version_string_to_tuple( "2.5b.13" )
@raises( ValueError )
def test_version_string_to_tuple_with_empty_part( self ):
version = self.upgrader.version_string_to_tuple( "2..13" )