Old model no longer needed now that all databases are convered to PostgreSQL.
This commit is contained in:
parent
6786f9801a
commit
17ad869635
|
@ -1,303 +0,0 @@
|
|||
import re
|
||||
import bsddb
|
||||
import random
|
||||
import cPickle
|
||||
from cStringIO import StringIO
|
||||
from copy import copy
|
||||
from model.Persistent import Persistent
|
||||
from Async import async
|
||||
|
||||
|
||||
class Old_database( object ):
|
||||
ID_BITS = 128 # number of bits within an id
|
||||
ID_DIGITS = "0123456789abcdefghijklmnopqrstuvwxyz"
|
||||
|
||||
def __init__( self, scheduler, database_path = None ):
|
||||
"""
|
||||
Create a new database and return it.
|
||||
|
||||
@type scheduler: Scheduler
|
||||
@param scheduler: scheduler to use
|
||||
@type database_path: unicode
|
||||
@param database_path: path to the database file
|
||||
@rtype: Old_database
|
||||
@return: database at the given path
|
||||
"""
|
||||
self.__scheduler = scheduler
|
||||
self.__env = bsddb.db.DBEnv()
|
||||
self.__env.open( None, bsddb.db.DB_CREATE | bsddb.db.DB_PRIVATE | bsddb.db.DB_INIT_MPOOL )
|
||||
self.__db = bsddb.db.DB( self.__env )
|
||||
self.__db.open( database_path, "database", bsddb.db.DB_HASH, bsddb.db.DB_CREATE )
|
||||
self.__cache = {}
|
||||
|
||||
def __persistent_id( self, obj, skip = None ):
|
||||
# save the object and return its persistent id
|
||||
if obj != skip and isinstance( obj, Persistent ):
|
||||
self.__save( obj )
|
||||
return obj.object_id
|
||||
|
||||
# returning None indicates that the object should be pickled normally without using a persistent id
|
||||
return None
|
||||
|
||||
@async
|
||||
def save( self, obj, callback = None ):
|
||||
"""
|
||||
Save the given object to the database, including any objects that it references.
|
||||
|
||||
@type obj: Persistent
|
||||
@param obj: object to save
|
||||
@type callback: generator or NoneType
|
||||
@param callback: generator to wakeup when the save is complete (optional)
|
||||
"""
|
||||
self.__save( obj )
|
||||
yield callback
|
||||
|
||||
def __save( self, obj ):
|
||||
# if this object's current revision is already saved, bail
|
||||
revision_id = obj.revision_id()
|
||||
if revision_id in self.__cache:
|
||||
return
|
||||
|
||||
object_id = unicode( obj.object_id ).encode( "utf8" )
|
||||
revision_id = unicode( obj.revision_id() ).encode( "utf8" )
|
||||
secondary_id = obj.secondary_id and unicode( obj.full_secondary_id() ).encode( "utf8" ) or None
|
||||
|
||||
# update the cache with this saved object
|
||||
self.__cache[ object_id ] = obj
|
||||
self.__cache[ revision_id ] = copy( obj )
|
||||
if secondary_id:
|
||||
self.__cache[ secondary_id ] = obj
|
||||
|
||||
# set the pickler up to save persistent ids for every object except for the obj passed in, which
|
||||
# will be pickled normally
|
||||
buffer = StringIO()
|
||||
pickler = cPickle.Pickler( buffer, protocol = -1 )
|
||||
pickler.persistent_id = lambda o: self.__persistent_id( o, skip = obj )
|
||||
|
||||
# pickle the object and write it to the database under both its id key and its revision id key
|
||||
pickler.dump( obj )
|
||||
pickled = buffer.getvalue()
|
||||
self.__db.put( object_id, pickled )
|
||||
self.__db.put( revision_id, pickled )
|
||||
|
||||
# write the pickled object id (only) to the database under its secondary id
|
||||
if secondary_id:
|
||||
buffer = StringIO()
|
||||
pickler = cPickle.Pickler( buffer, protocol = -1 )
|
||||
pickler.persistent_id = lambda o: self.__persistent_id( o )
|
||||
pickler.dump( obj )
|
||||
self.__db.put( secondary_id, buffer.getvalue() )
|
||||
|
||||
self.__db.sync()
|
||||
|
||||
@async
|
||||
def load( self, object_id, callback, revision = None ):
|
||||
"""
|
||||
Load the object corresponding to the given object id from the database, and yield the provided
|
||||
callback generator with the loaded object as its argument, or None if the object_id is unknown.
|
||||
If a revision is provided, a specific revision of the object will be loaded.
|
||||
|
||||
@type object_id: unicode
|
||||
@param object_id: id of the object to load
|
||||
@type callback: generator
|
||||
@param callback: generator to send the loaded object to
|
||||
@type revision: int or NoneType
|
||||
@param revision: revision of the object to load (optional)
|
||||
"""
|
||||
obj = self.__load( object_id, revision )
|
||||
yield callback, obj
|
||||
|
||||
def __load( self, object_id, revision = None ):
|
||||
if revision is not None:
|
||||
object_id = Persistent.make_revision_id( object_id, revision )
|
||||
|
||||
object_id = unicode( object_id ).encode( "utf8" )
|
||||
|
||||
# if the object corresponding to the given id has already been loaded, simply return it without
|
||||
# loading it again
|
||||
obj = self.__cache.get( object_id )
|
||||
if obj is not None:
|
||||
return obj
|
||||
|
||||
# grab the object for the given id from the database
|
||||
buffer = StringIO()
|
||||
unpickler = cPickle.Unpickler( buffer )
|
||||
unpickler.persistent_load = self.__load
|
||||
|
||||
pickled = self.__db.get( object_id )
|
||||
if pickled is None or pickled == "":
|
||||
return None
|
||||
|
||||
buffer.write( pickled )
|
||||
buffer.flush()
|
||||
buffer.seek( 0 )
|
||||
|
||||
# unpickle the object and update the cache with this saved object
|
||||
obj = unpickler.load()
|
||||
if obj is None:
|
||||
print "error unpickling %s: %s" % ( object_id, pickled )
|
||||
return None
|
||||
self.__cache[ unicode( obj.object_id ).encode( "utf8" ) ] = obj
|
||||
self.__cache[ unicode( obj.revision_id() ).encode( "utf8" ) ] = copy( obj )
|
||||
|
||||
return obj
|
||||
|
||||
@async
|
||||
def reload( self, object_id, callback = None ):
|
||||
"""
|
||||
Load and immediately save the object corresponding to the given object id or database key. This
|
||||
is useful when the object has a __setstate__() method that performs some sort of schema
|
||||
evolution operation.
|
||||
|
||||
@type object_id: unicode
|
||||
@param object_id: id or key of the object to reload
|
||||
@type callback: generator or NoneType
|
||||
@param callback: generator to wakeup when the save is complete (optional)
|
||||
"""
|
||||
self.__reload( object_id )
|
||||
yield callback
|
||||
|
||||
def __reload( self, object_id, revision = None ):
|
||||
object_id = unicode( object_id ).encode( "utf8" )
|
||||
|
||||
# grab the object for the given id from the database
|
||||
buffer = StringIO()
|
||||
unpickler = cPickle.Unpickler( buffer )
|
||||
unpickler.persistent_load = self.__load
|
||||
|
||||
pickled = self.__db.get( object_id )
|
||||
if pickled is None or pickled == "":
|
||||
return
|
||||
|
||||
buffer.write( pickled )
|
||||
buffer.flush()
|
||||
buffer.seek( 0 )
|
||||
|
||||
# unpickle the object. this should trigger __setstate__() if the object has such a method
|
||||
obj = unpickler.load()
|
||||
if obj is None:
|
||||
print "error unpickling %s: %s" % ( object_id, pickled )
|
||||
return
|
||||
self.__cache[ object_id ] = obj
|
||||
|
||||
# set the pickler up to save persistent ids for every object except for the obj passed in, which
|
||||
# will be pickled normally
|
||||
buffer = StringIO()
|
||||
pickler = cPickle.Pickler( buffer, protocol = -1 )
|
||||
pickler.persistent_id = lambda o: self.__persistent_id( o, skip = obj )
|
||||
|
||||
# pickle the object and write it to the database under its id key
|
||||
pickler.dump( obj )
|
||||
pickled = buffer.getvalue()
|
||||
self.__db.put( object_id, pickled )
|
||||
|
||||
self.__db.sync()
|
||||
|
||||
def size( self, object_id, revision = None ):
|
||||
"""
|
||||
Load the object corresponding to the given object id from the database, and return the size of
|
||||
its pickled data in bytes. If a revision is provided, a specific revision of the object will be
|
||||
loaded.
|
||||
|
||||
@type object_id: unicode
|
||||
@param object_id: id of the object whose size should be returned
|
||||
@type revision: int or NoneType
|
||||
@param revision: revision of the object to load (optional)
|
||||
"""
|
||||
if revision is not None:
|
||||
object_id = Persistent.make_revision_id( object_id, revision )
|
||||
|
||||
object_id = unicode( object_id ).encode( "utf8" )
|
||||
|
||||
pickled = self.__db.get( object_id )
|
||||
if pickled is None or pickled == "":
|
||||
return None
|
||||
|
||||
return len( pickled )
|
||||
|
||||
@staticmethod
|
||||
def generate_id():
|
||||
int_id = random.getrandbits( Old_database.ID_BITS )
|
||||
|
||||
base = len( Old_database.ID_DIGITS )
|
||||
digits = []
|
||||
|
||||
while True:
|
||||
index = int_id % base
|
||||
digits.insert( 0, Old_database.ID_DIGITS[ index ] )
|
||||
int_id = int_id / base
|
||||
if int_id == 0:
|
||||
break
|
||||
|
||||
return "".join( digits )
|
||||
|
||||
@async
|
||||
def next_id( self, callback ):
|
||||
"""
|
||||
Generate the next available object id, and yield the provided callback generator with the
|
||||
object id as its argument.
|
||||
|
||||
@type callback: generator
|
||||
@param callback: generator to send the next available object id to
|
||||
"""
|
||||
# generate a random id, but on the off-chance that it collides with something else already in
|
||||
# the database, try again
|
||||
next_id = Old_database.generate_id()
|
||||
while self.__db.get( next_id, default = None ) is not None:
|
||||
next_id = Old_database.generate_id()
|
||||
|
||||
# save the next_id as a key in the database so that it's not handed out again to another client
|
||||
self.__db[ next_id ] = ""
|
||||
|
||||
yield callback, next_id
|
||||
|
||||
@async
|
||||
def close( self ):
|
||||
"""
|
||||
Shutdown the database.
|
||||
"""
|
||||
self.__db.close()
|
||||
self.__env.close()
|
||||
yield None
|
||||
|
||||
@async
|
||||
def clear_cache( self ):
|
||||
"""
|
||||
Clear the memory object cache.
|
||||
"""
|
||||
self.__cache.clear()
|
||||
yield None
|
||||
|
||||
scheduler = property( lambda self: self.__scheduler )
|
||||
|
||||
|
||||
class Valid_id( object ):
|
||||
"""
|
||||
Validator for an object id.
|
||||
"""
|
||||
ID_PATTERN = re.compile( "^[%s]+$" % Old_database.ID_DIGITS )
|
||||
|
||||
def __init__( self, none_okay = False ):
|
||||
self.__none_okay = none_okay
|
||||
|
||||
def __call__( self, value ):
|
||||
if self.__none_okay and value in ( None, "None", "" ): return None
|
||||
if self.ID_PATTERN.search( value ): return str( value )
|
||||
|
||||
raise ValueError()
|
||||
|
||||
|
||||
class Valid_revision( object ):
|
||||
"""
|
||||
Validator for an object id.
|
||||
"""
|
||||
REVISION_PATTERN = re.compile( "^\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d+$" )
|
||||
|
||||
def __init__( self, none_okay = False ):
|
||||
self.__none_okay = none_okay
|
||||
|
||||
def __call__( self, value ):
|
||||
if self.__none_okay and value in ( None, "None", "" ): return None
|
||||
if self.REVISION_PATTERN.search( value ): return str( value )
|
||||
|
||||
raise ValueError()
|
|
@ -1,157 +0,0 @@
|
|||
from time import time, sleep
|
||||
from threading import Thread, Semaphore
|
||||
|
||||
|
||||
class Scheduler( object ):
|
||||
SLEEP = 0 # yielded by a generator to indicate that it should be put to sleep
|
||||
|
||||
def __init__( self ):
|
||||
"""
|
||||
A scheduler for generator-based microthreads.
|
||||
"""
|
||||
self.__running = [] # list of active microthreads
|
||||
self.__sleeping = [] # list of sleeping microthreads
|
||||
self.__messages = {} # map of thread to list of its incoming messages
|
||||
self.__thread = None # currently executing microthread (if any)
|
||||
self.__done = False # whether it's time to exit
|
||||
self.__idle = Semaphore( 0 )
|
||||
self.__last_error = None # used for unit tests
|
||||
|
||||
self.add( self.__idle_thread() )
|
||||
self.__idle.acquire() # don't count the idle thread
|
||||
|
||||
# TODO: Running the scheduler from anything other than the main Python thread somehow prevents
|
||||
# tracebacks from within a generator from indicating the offending line and line number. So it
|
||||
# would be really useful for debugging purposes to start the scheduler from the main thread.
|
||||
# The reason that it's not done here is because CherryPy's blocking server must be started
|
||||
# from the main Python thread.
|
||||
self.__scheduler_thread = Thread( target = self.run )
|
||||
self.__scheduler_thread.setDaemon( True )
|
||||
self.__scheduler_thread.start()
|
||||
|
||||
def run( self ):
|
||||
"""
|
||||
Run all threads repeatedly.
|
||||
"""
|
||||
while not self.__done:
|
||||
self.__run_once()
|
||||
|
||||
def __run_once( self ):
|
||||
"""
|
||||
Run all active threads once.
|
||||
"""
|
||||
turn_start = time()
|
||||
|
||||
for thread in list( self.__running ):
|
||||
try:
|
||||
messages = self.__messages.get( thread )
|
||||
|
||||
self.__thread = thread
|
||||
try:
|
||||
if messages:
|
||||
result = thread.send( *messages.pop( 0 ) )
|
||||
else:
|
||||
result = thread.next()
|
||||
except StopIteration:
|
||||
raise
|
||||
except Exception, e:
|
||||
self.__last_error = e
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise StopIteration()
|
||||
|
||||
self__thread = None
|
||||
|
||||
if self.__done:
|
||||
return True
|
||||
|
||||
if result is None:
|
||||
continue
|
||||
|
||||
# a yielded result of SLEEP indicates to put the thread to sleep
|
||||
if result == Scheduler.SLEEP:
|
||||
self.sleep( thread )
|
||||
# any other result indicates to run the yielded thread
|
||||
elif isinstance( result, ( tuple, list ) ):
|
||||
self.add( *result )
|
||||
else:
|
||||
self.add( result )
|
||||
|
||||
except StopIteration:
|
||||
self.__idle.acquire( blocking = False )
|
||||
self.__running.remove( thread )
|
||||
self.__messages.pop( thread, None )
|
||||
|
||||
def __idle_thread( self ):
|
||||
while not self.__done:
|
||||
# if the idle thread is the only one running, block until there's another running thread
|
||||
self.__idle.acquire( blocking = True )
|
||||
self.__idle.release()
|
||||
yield None
|
||||
|
||||
IDLE_SLEEP_SECONDS = 0.01
|
||||
def wait_for( self, thread ):
|
||||
"""
|
||||
Block until the given thread exits. Intended for use in unit tests only.
|
||||
|
||||
@type thread: generator
|
||||
@param thread: thread to wait for
|
||||
"""
|
||||
while thread in self.__running or thread in self.__sleeping:
|
||||
sleep( self.IDLE_SLEEP_SECONDS )
|
||||
|
||||
if self.__last_error:
|
||||
raise self.__last_error
|
||||
|
||||
def wait_until_idle( self ):
|
||||
"""
|
||||
Block until all threads have exited. Intended for use in unit tests only.
|
||||
"""
|
||||
while len( self.__running ) > 1 or len( self.__sleeping ) > 0:
|
||||
sleep( self.IDLE_SLEEP_SECONDS )
|
||||
|
||||
def sleep( self, thread ):
|
||||
"""
|
||||
Put the given thread to sleep so that is is no longer actively running.
|
||||
|
||||
@type thread: generator
|
||||
@param thread: thread to put to sleep
|
||||
"""
|
||||
self.__idle.acquire( blocking = False )
|
||||
self.__sleeping.append( thread )
|
||||
self.__running.remove( thread )
|
||||
|
||||
def add( self, thread, *args ):
|
||||
"""
|
||||
Add the given thread to the running list for this Scheduler, and wake it up if it's asleep.
|
||||
|
||||
@type thread: generator
|
||||
@param thread: thread to add
|
||||
@type args: tuple
|
||||
@param args: arguments to send() to the given thread when it is executed
|
||||
"""
|
||||
if thread is None:
|
||||
return
|
||||
|
||||
self.__idle.release()
|
||||
|
||||
if thread in self.__sleeping:
|
||||
self.__sleeping.remove( thread )
|
||||
else:
|
||||
self.__messages[ thread ] = [ ( None, ) ]
|
||||
|
||||
self.__running.append( thread )
|
||||
|
||||
if len( args ) > 0:
|
||||
self.__messages[ thread ].append( args )
|
||||
|
||||
def shutdown( self ):
|
||||
"""
|
||||
Stop all running threads and shutdown the Scheduler.
|
||||
"""
|
||||
self.__done = True
|
||||
self.__idle.release()
|
||||
self.__scheduler_thread.join()
|
||||
|
||||
# currently executing microthread (if any)
|
||||
thread = property( lambda self: self.__thread )
|
|
@ -1,70 +0,0 @@
|
|||
import re
|
||||
from Persistent import Persistent
|
||||
from controller.Html_nuker import Html_nuker
|
||||
|
||||
|
||||
class Note( Persistent ):
|
||||
"""
|
||||
An single textual wiki note.
|
||||
"""
|
||||
TITLE_PATTERN = re.compile( u"<h3>(.*?)</h3>", flags = re.IGNORECASE )
|
||||
|
||||
def __setstate__( self, state ):
|
||||
if "_Note__deleted_from" not in state:
|
||||
state[ "_Note__deleted_from" ] = False
|
||||
|
||||
self.__dict__.update( state )
|
||||
|
||||
def __init__( self, id, contents = None ):
|
||||
"""
|
||||
Create a new note with the given id and contents.
|
||||
|
||||
@type id: unicode
|
||||
@param id: id of the note
|
||||
@type contents: unicode or NoneType
|
||||
@param contents: initial contents of the note (optional)
|
||||
@rtype: Note
|
||||
@return: newly constructed note
|
||||
"""
|
||||
Persistent.__init__( self, id )
|
||||
self.__title = None
|
||||
self.__contents = None or ""
|
||||
self.__deleted_from = None
|
||||
|
||||
self.__set_contents( contents, new_revision = False )
|
||||
|
||||
def __set_contents( self, contents, new_revision = True ):
|
||||
if new_revision:
|
||||
self.update_revision()
|
||||
self.__contents = contents
|
||||
|
||||
if contents is None:
|
||||
self.__title = None
|
||||
return
|
||||
|
||||
# parse title out of the beginning of the contents
|
||||
result = Note.TITLE_PATTERN.search( contents )
|
||||
|
||||
if result:
|
||||
self.__title = result.groups()[ 0 ]
|
||||
self.__title = Html_nuker( allow_refs = True ).nuke( self.__title )
|
||||
else:
|
||||
self.__title = None
|
||||
|
||||
def __set_deleted_from( self, deleted_from ):
|
||||
self.__deleted_from = deleted_from
|
||||
self.update_revision()
|
||||
|
||||
def to_dict( self ):
|
||||
d = Persistent.to_dict( self )
|
||||
d.update( dict(
|
||||
contents = self.__contents,
|
||||
title = self.__title,
|
||||
deleted_from = self.__deleted_from,
|
||||
) )
|
||||
|
||||
return d
|
||||
|
||||
contents = property( lambda self: self.__contents, __set_contents )
|
||||
title = property( lambda self: self.__title )
|
||||
deleted_from = property( lambda self: self.__deleted_from, __set_deleted_from )
|
|
@ -1,185 +0,0 @@
|
|||
from copy import copy
|
||||
from Note import Note
|
||||
from Persistent import Persistent
|
||||
|
||||
|
||||
class Notebook( Persistent ):
|
||||
"""
|
||||
A collection of wiki notes.
|
||||
"""
|
||||
|
||||
class UnknownNoteError( ValueError ):
|
||||
"""
|
||||
Indicates that an accessed note is not in this notebook.
|
||||
"""
|
||||
def __init__( self, note_id ):
|
||||
ValueError.__init__( self, note_id )
|
||||
|
||||
def __setstate__( self, state ):
|
||||
if "_Notebook__trash" not in state:
|
||||
state[ "_Notebook__trash" ] = None
|
||||
|
||||
self.__dict__.update( state )
|
||||
|
||||
def __init__( self, id, name, trash = None ):
|
||||
"""
|
||||
Create a new note with the given id and name.
|
||||
|
||||
@type id: unicode
|
||||
@param id: id of the notebook
|
||||
@type name: unicode
|
||||
@param name: name of this notebook
|
||||
@type trash: Notebook or NoneType
|
||||
@param trash: where deleted notes within this Notebook go to die (optional)
|
||||
@rtype: Notebook
|
||||
@return: newly constructed notebook
|
||||
"""
|
||||
Persistent.__init__( self, id )
|
||||
self.__name = name
|
||||
self.__trash = trash
|
||||
self.__notes = {} # map of note id to note
|
||||
self.__titles = {} # map of note title to note
|
||||
self.__startup_notes = [] # list of notes shown on startup
|
||||
|
||||
def add_note( self, note ):
|
||||
"""
|
||||
Add a note to this notebook.
|
||||
|
||||
@type note: Note
|
||||
@param note: note to add
|
||||
"""
|
||||
self.update_revision()
|
||||
self.__notes[ note.object_id ] = note
|
||||
self.__titles[ note.title ] = note
|
||||
|
||||
def remove_note( self, note ):
|
||||
"""
|
||||
Remove a note from this notebook.
|
||||
|
||||
@type note: Note
|
||||
@param note: note to remove
|
||||
@rtype: bool
|
||||
@return: True if the note was removed, False if the note wasn't in this notebook
|
||||
"""
|
||||
if self.__notes.pop( note.object_id, None ):
|
||||
self.update_revision()
|
||||
self.__titles.pop( note.title, None )
|
||||
if self.is_startup_note( note ):
|
||||
self.__startup_notes.remove( note )
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def lookup_note( self, note_id ):
|
||||
"""
|
||||
Return the note in this notebook with the given id.
|
||||
|
||||
@type note_id: unicode
|
||||
@param note_id: id of the note to return
|
||||
@rtype: Note or NoneType
|
||||
@return: note corresponding to the note id or None
|
||||
"""
|
||||
return self.__notes.get( note_id )
|
||||
|
||||
def lookup_note_by_title( self, title ):
|
||||
"""
|
||||
Return the note in this notebook with the given title.
|
||||
|
||||
@type title: unicode
|
||||
@param title: title of the note to return
|
||||
@rtype: Note or NoneType
|
||||
@return: note corresponding to the title or None
|
||||
"""
|
||||
return self.__titles.get( title )
|
||||
|
||||
def update_note( self, note, contents ):
|
||||
"""
|
||||
Update the given note with new contents. Bail if the note's contents are unchanged.
|
||||
|
||||
@type note: Note
|
||||
@param note: note to update
|
||||
@type contents: unicode
|
||||
@param contents: new textual contents for the note
|
||||
@raises UnknownNoteError: note to update is not in this notebook
|
||||
"""
|
||||
old_note = self.__notes.get( note.object_id )
|
||||
if old_note is None:
|
||||
raise Notebook.UnknownNoteError( note.object_id )
|
||||
|
||||
self.update_revision()
|
||||
self.__titles.pop( note.title, None )
|
||||
|
||||
note.contents = contents
|
||||
|
||||
self.__titles[ note.title ] = note
|
||||
|
||||
def add_startup_note( self, note ):
|
||||
"""
|
||||
Add the given note to be shown on startup. It must already be a note in this notebook.
|
||||
|
||||
@type note: Note
|
||||
@param note: note to be added for startup
|
||||
@rtype: bool
|
||||
@return: True if the note was added for startup
|
||||
@raises UnknownNoteError: given note is not in this notebook
|
||||
"""
|
||||
if self.__notes.get( note.object_id ) is None:
|
||||
raise Notebook.UnknownNoteError( note.object_id )
|
||||
|
||||
if not self.is_startup_note( note ):
|
||||
self.update_revision()
|
||||
self.__startup_notes.append( note )
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def remove_startup_note( self, note ):
|
||||
"""
|
||||
Remove the given note from being shown on startup.
|
||||
|
||||
@type note: Note
|
||||
@param note: note to be removed from startup
|
||||
@rtype: bool
|
||||
@return: True if the note was removed from startup
|
||||
"""
|
||||
if self.is_startup_note( note ):
|
||||
self.update_revision()
|
||||
self.__startup_notes.remove( note )
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def is_startup_note( self, note ):
|
||||
"""
|
||||
Return whether the given note is a startup note.
|
||||
|
||||
@type note: Note
|
||||
@param note: note to test for startup status
|
||||
@rtype bool
|
||||
@return: True if the note is a startup note
|
||||
"""
|
||||
return note.object_id in [ n.object_id for n in self.__startup_notes if n ]
|
||||
|
||||
def to_dict( self ):
|
||||
d = Persistent.to_dict( self )
|
||||
|
||||
# as an optimization, don't include the revisions list because it's not
|
||||
# currently used anywhere for Notebook objects
|
||||
del d[ "revisions_list" ]
|
||||
|
||||
d.update( dict(
|
||||
name = self.__name,
|
||||
trash = self.__trash,
|
||||
read_write = True,
|
||||
) )
|
||||
|
||||
return d
|
||||
|
||||
def __set_name( self, name ):
|
||||
self.__name = name
|
||||
self.update_revision()
|
||||
|
||||
name = property( lambda self: self.__name, __set_name )
|
||||
trash = property( lambda self: self.__trash )
|
||||
startup_notes = property( lambda self: [ note for note in copy( self.__startup_notes ) if note is not None ] )
|
||||
notes = property( lambda self: [ note for note in self.__notes.values() if note is not None ] )
|
|
@ -1,29 +0,0 @@
|
|||
from Persistent import Persistent
|
||||
|
||||
|
||||
class Password_reset( Persistent ):
|
||||
"""
|
||||
A request for a password reset.
|
||||
"""
|
||||
def __init__( self, id, email_address ):
|
||||
"""
|
||||
Create a password reset request with the given id.
|
||||
|
||||
@type id: unicode
|
||||
@param id: id of the password reset
|
||||
@type email_address: unicode
|
||||
@param email_address: where the reset confirmation was emailed
|
||||
@rtype: Password_reset
|
||||
@return: newly constructed password reset
|
||||
"""
|
||||
Persistent.__init__( self, id )
|
||||
self.__email_address = email_address
|
||||
self.__redeemed = False
|
||||
|
||||
def __set_redeemed( self, redeemed ):
|
||||
if redeemed != self.__redeemed:
|
||||
self.update_revision()
|
||||
self.__redeemed = redeemed
|
||||
|
||||
email_address = property( lambda self: self.__email_address )
|
||||
redeemed = property( lambda self: self.__redeemed, __set_redeemed )
|
|
@ -1,37 +0,0 @@
|
|||
from datetime import datetime
|
||||
|
||||
|
||||
class Persistent( object ):
|
||||
def __init__( self, object_id, secondary_id = None ):
|
||||
self.__object_id = object_id
|
||||
self.__secondary_id = secondary_id
|
||||
self.__revision = datetime.now()
|
||||
self.__revisions_list = [ self.__revision ]
|
||||
|
||||
def update_revision( self ):
|
||||
self.__revision = datetime.now()
|
||||
|
||||
# make a new copy of the list to prevent sharing of this list between different revisions
|
||||
self.__revisions_list = self.__revisions_list + [ self.__revision ]
|
||||
|
||||
def revision_id( self ):
|
||||
return "%s %s" % ( self.__object_id, self.__revision )
|
||||
|
||||
@staticmethod
|
||||
def make_revision_id( object_id, revision ):
|
||||
return "%s %s" % ( object_id, revision )
|
||||
|
||||
def full_secondary_id( self ):
|
||||
return "%s %s" % ( type( self ).__name__, self.secondary_id )
|
||||
|
||||
def to_dict( self ):
|
||||
return dict(
|
||||
object_id = self.__object_id,
|
||||
revision = self.__revision,
|
||||
revisions_list = self.__revisions_list,
|
||||
)
|
||||
|
||||
object_id = property( lambda self: self.__object_id )
|
||||
secondary_id = property( lambda self: self.__secondary_id )
|
||||
revision = property( lambda self: self.__revision )
|
||||
revisions_list = property( lambda self: self.__revisions_list )
|
|
@ -1,31 +0,0 @@
|
|||
from Persistent import Persistent
|
||||
|
||||
|
||||
class Read_only_notebook( Persistent ):
|
||||
"""
|
||||
A wrapper for Notebook that hides all of its destructive update functions.
|
||||
"""
|
||||
def __init__( self, id, notebook ):
|
||||
Persistent.__init__( self, id )
|
||||
self.__wrapped = notebook
|
||||
|
||||
def lookup_note( self, note_id ):
|
||||
return self.__wrapped.lookup_note( note_id )
|
||||
|
||||
def lookup_note_by_title( self, title ):
|
||||
return self.__wrapped.lookup_note_by_title( title )
|
||||
|
||||
def to_dict( self ):
|
||||
d = self.__wrapped.to_dict()
|
||||
del( d[ "trash" ] ) # don't expose the trash to read-only views of this notebook
|
||||
d.update( dict(
|
||||
object_id = self.object_id,
|
||||
read_write = False,
|
||||
) )
|
||||
|
||||
return d
|
||||
|
||||
name = property( lambda self: self.__wrapped.name )
|
||||
trash = None # read-only access doesn't give you access to the Notebook's trash
|
||||
notes = property( lambda self: self.__wrapped.notes )
|
||||
startup_notes = property( lambda self: self.__wrapped.startup_notes )
|
122
model/User.py
122
model/User.py
|
@ -1,122 +0,0 @@
|
|||
import sha
|
||||
import random
|
||||
from copy import copy
|
||||
from Persistent import Persistent
|
||||
|
||||
|
||||
class User( Persistent ):
|
||||
"""
|
||||
A user of this application.
|
||||
"""
|
||||
SALT_CHARS = [ chr( c ) for c in range( ord( "!" ), ord( "~" ) + 1 ) ]
|
||||
SALT_SIZE = 12
|
||||
|
||||
def __setstate__( self, state ):
|
||||
if "_User__storage_bytes" not in state:
|
||||
state[ "_User__storage_bytes" ] = 0
|
||||
if "_User__rate_plan" not in state:
|
||||
state[ "_User__rate_plan" ] = 0
|
||||
|
||||
self.__dict__.update( state )
|
||||
|
||||
def __init__( self, id, username, password, email_address, notebooks = None ):
|
||||
"""
|
||||
Create a new user with the given credentials and information.
|
||||
|
||||
@type id: unicode
|
||||
@param id: id of the user
|
||||
@type username: unicode
|
||||
@param username: unique user identifier for login purposes
|
||||
@type password: unicode
|
||||
@param password: secret password for login purposes
|
||||
@type email_address: unicode
|
||||
@param email_address: a hopefully valid email address
|
||||
@type notebooks: [ Notebook ]
|
||||
@param notebooks: list of notebooks (read-only and read-write) that this user has access to
|
||||
@rtype: User
|
||||
@return: newly created user
|
||||
"""
|
||||
Persistent.__init__( self, id, secondary_id = username )
|
||||
self.__salt = self.__create_salt()
|
||||
self.__password_hash = self.__hash_password( password )
|
||||
self.__email_address = email_address
|
||||
self.__notebooks = notebooks or []
|
||||
self.__storage_bytes = 0 # total storage bytes for this user's notebooks, notes, and revisions
|
||||
self.__rate_plan = 0 # each rate plan is an integer index into the array in config/Common.py
|
||||
|
||||
def __create_salt( self ):
|
||||
return "".join( [ random.choice( self.SALT_CHARS ) for i in range( self.SALT_SIZE ) ] )
|
||||
|
||||
def __hash_password( self, password ):
|
||||
if password is None or len( password ) == 0:
|
||||
return None
|
||||
|
||||
return sha.new( self.__salt + password ).hexdigest()
|
||||
|
||||
def check_password( self, password ):
|
||||
"""
|
||||
Check that the given password matches this user's password.
|
||||
|
||||
@type password: unicode
|
||||
@param password: password to check
|
||||
@rtype: bool
|
||||
@return: True if the password matches
|
||||
"""
|
||||
if self.__password_hash == None:
|
||||
return False
|
||||
|
||||
hash = self.__hash_password( password )
|
||||
if hash == self.__password_hash:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def has_access( self, notebook_id ):
|
||||
if notebook_id in [ notebook.object_id for notebook in self.__notebooks ]:
|
||||
return True
|
||||
|
||||
# a user who has read-write access to a notebook also has access to that notebook's trash
|
||||
if notebook_id in [ notebook.trash.object_id for notebook in self.__notebooks if notebook.trash ]:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def to_dict( self ):
|
||||
d = Persistent.to_dict( self )
|
||||
d.update( dict(
|
||||
username = self.username,
|
||||
storage_bytes = self.__storage_bytes,
|
||||
rate_plan = self.__rate_plan,
|
||||
) )
|
||||
|
||||
return d
|
||||
|
||||
def __set_email_address( self, email_address ):
|
||||
self.update_revision()
|
||||
self.__email_address = email_address
|
||||
|
||||
def __set_password( self, password ):
|
||||
self.update_revision()
|
||||
self.__salt = self.__create_salt()
|
||||
self.__password_hash = self.__hash_password( password )
|
||||
|
||||
def __set_notebooks( self, notebooks ):
|
||||
self.update_revision()
|
||||
self.__notebooks = notebooks
|
||||
|
||||
def __set_storage_bytes( self, storage_bytes ):
|
||||
self.update_revision()
|
||||
self.__storage_bytes = storage_bytes
|
||||
|
||||
def __set_rate_plan( self, rate_plan ):
|
||||
self.update_revision()
|
||||
self.__rate_plan = rate_plan
|
||||
|
||||
username = property( lambda self: self.secondary_id )
|
||||
email_address = property( lambda self: self.__email_address, __set_email_address )
|
||||
password = property( None, __set_password )
|
||||
storage_bytes = property( lambda self: self.__storage_bytes, __set_storage_bytes )
|
||||
rate_plan = property( lambda self: self.__rate_plan, __set_rate_plan )
|
||||
|
||||
# the notebooks (read-only and read-write) that this user has access to
|
||||
notebooks = property( lambda self: copy( self.__notebooks ), __set_notebooks )
|
|
@ -1,48 +0,0 @@
|
|||
from copy import copy
|
||||
from Persistent import Persistent
|
||||
|
||||
|
||||
class User_list( Persistent ):
|
||||
"""
|
||||
A list of users.
|
||||
"""
|
||||
def __init__( self, id, secondary_id = None ):
|
||||
"""
|
||||
Create a list of users, and give the list the provided id.
|
||||
|
||||
@type id: unicode
|
||||
@param id: id of the user list
|
||||
@type secondary_id: unicode or NoneType
|
||||
@param secondary_id: convenience id for easy access (optional)
|
||||
@rtype: User_list
|
||||
@return: newly constructed user list
|
||||
"""
|
||||
Persistent.__init__( self, id, secondary_id )
|
||||
self.__users = []
|
||||
|
||||
def add_user( self, user ):
|
||||
"""
|
||||
Add a user to this list.
|
||||
|
||||
@type user: User
|
||||
@param user: user to add
|
||||
"""
|
||||
if user.object_id not in [ u.object_id for u in self.__users ]:
|
||||
self.update_revision()
|
||||
self.__users.append( user )
|
||||
|
||||
def remove_user( self, user ):
|
||||
"""
|
||||
Remove a user from this list.
|
||||
|
||||
@type user: User
|
||||
@param user: user to remove
|
||||
"""
|
||||
if user in self.__users:
|
||||
self.update_revision()
|
||||
self.__users.remove( user )
|
||||
|
||||
def __set_users( self, users ):
|
||||
self.__users = users
|
||||
|
||||
users = property( lambda self: copy( self.__users ), __set_users )
|
|
@ -1,87 +0,0 @@
|
|||
from model.Note import Note
|
||||
|
||||
|
||||
class Test_note( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = u"17"
|
||||
self.title = u"title goes here"
|
||||
self.contents = u"<h3>%s</h3>blah" % self.title
|
||||
|
||||
self.note = Note( self.object_id, self.contents )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.note.object_id == self.object_id
|
||||
assert self.note.contents == self.contents
|
||||
assert self.note.title == self.title
|
||||
assert self.note.deleted_from == None
|
||||
|
||||
def test_create_blank( self ):
|
||||
object_id = u"22"
|
||||
blank_note = Note( object_id )
|
||||
|
||||
assert blank_note.object_id == object_id
|
||||
assert blank_note.contents == None
|
||||
assert blank_note.title == None
|
||||
assert blank_note.deleted_from == None
|
||||
|
||||
def test_set_contents( self ):
|
||||
new_title = u"new title"
|
||||
new_contents = u"<h3>%s</h3>new blah" % new_title
|
||||
previous_revision = self.note.revision
|
||||
|
||||
self.note.contents = new_contents
|
||||
|
||||
assert self.note.contents == new_contents
|
||||
assert self.note.title == new_title
|
||||
assert self.note.deleted_from == None
|
||||
assert self.note.revision > previous_revision
|
||||
|
||||
def test_set_contents_with_html_title( self ):
|
||||
new_title = u"new title"
|
||||
new_contents = u"<h3>new<br /> title</h3>new blah"
|
||||
previous_revision = self.note.revision
|
||||
|
||||
self.note.contents = new_contents
|
||||
|
||||
# html should be stripped out of the title
|
||||
assert self.note.contents == new_contents
|
||||
assert self.note.title == new_title
|
||||
assert self.note.deleted_from == None
|
||||
assert self.note.revision > previous_revision
|
||||
|
||||
def test_set_contents_with_multiple_titles( self ):
|
||||
new_title = u"new title"
|
||||
new_contents = u"<h3>new<br /> title</h3>new blah<h3>other title</h3>hmm"
|
||||
previous_revision = self.note.revision
|
||||
|
||||
self.note.contents = new_contents
|
||||
|
||||
# should only use the first title
|
||||
assert self.note.contents == new_contents
|
||||
assert self.note.title == new_title
|
||||
assert self.note.deleted_from == None
|
||||
assert self.note.revision > previous_revision
|
||||
|
||||
def test_delete( self ):
|
||||
previous_revision = self.note.revision
|
||||
self.note.deleted_from = u"55"
|
||||
|
||||
assert self.note.deleted_from == u"55"
|
||||
assert self.note.revision > previous_revision
|
||||
|
||||
def test_undelete( self ):
|
||||
previous_revision = self.note.revision
|
||||
self.note.deleted_from = None
|
||||
|
||||
assert self.note.deleted_from == None
|
||||
assert self.note.revision > previous_revision
|
||||
|
||||
def test_to_dict( self ):
|
||||
d = self.note.to_dict()
|
||||
|
||||
assert d.get( "contents" ) == self.contents
|
||||
assert d.get( "title" ) == self.title
|
||||
assert d.get( "deleted_from" ) == None
|
||||
assert d.get( "object_id" ) == self.note.object_id
|
||||
assert d.get( "revision" )
|
||||
assert d.get( "revisions_list" )
|
|
@ -1,177 +0,0 @@
|
|||
from nose.tools import raises
|
||||
from model.Notebook import Notebook
|
||||
from model.Note import Note
|
||||
|
||||
|
||||
class Test_notebook( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = "17"
|
||||
self.trash_id = "18"
|
||||
self.name = u"my notebook"
|
||||
self.trash_name = u"trash"
|
||||
|
||||
self.trash = Notebook( self.trash_id, self.trash_name )
|
||||
self.notebook = Notebook( self.object_id, self.name, self.trash )
|
||||
self.note = Note( "19", u"<h3>title</h3>blah" )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.notebook.object_id == self.object_id
|
||||
assert self.notebook.name == self.name
|
||||
assert self.notebook.trash
|
||||
assert self.notebook.trash.object_id == self.trash_id
|
||||
assert self.notebook.trash.name == self.trash_name
|
||||
|
||||
def test_set_name( self ):
|
||||
new_name = u"my new notebook"
|
||||
previous_revision = self.notebook.revision
|
||||
self.notebook.name = new_name
|
||||
|
||||
assert self.notebook.name == new_name
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
def test_add_and_lookup_note( self ):
|
||||
previous_revision = self.notebook.revision
|
||||
self.notebook.add_note( self.note )
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
note = self.notebook.lookup_note( self.note.object_id )
|
||||
assert note == self.note
|
||||
|
||||
def test_lookup_unknown_note( self ):
|
||||
note = self.notebook.lookup_note( self.note.object_id )
|
||||
assert note == None
|
||||
|
||||
def test_add_and_lookup_note_by_title( self ):
|
||||
previous_revision = self.notebook.revision
|
||||
self.notebook.add_note( self.note )
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
note = self.notebook.lookup_note_by_title( self.note.title )
|
||||
assert note == self.note
|
||||
|
||||
def test_lookup_unknown_note_by_title( self ):
|
||||
note = self.notebook.lookup_note( self.note.title )
|
||||
assert note == None
|
||||
|
||||
def test_remove_note( self ):
|
||||
previous_revision = self.notebook.revision
|
||||
self.notebook.add_note( self.note )
|
||||
result = self.notebook.remove_note( self.note )
|
||||
assert result == True
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
note = self.notebook.lookup_note( self.note.object_id )
|
||||
assert note == None
|
||||
|
||||
note = self.notebook.lookup_note_by_title( self.note.title )
|
||||
assert note == None
|
||||
|
||||
assert not note in self.notebook.startup_notes
|
||||
|
||||
def test_remove_unknown_note( self ):
|
||||
revision = self.notebook.revision
|
||||
result = self.notebook.remove_note( self.note )
|
||||
assert result == False
|
||||
assert self.notebook.revision == revision
|
||||
|
||||
note = self.notebook.lookup_note( self.note.object_id )
|
||||
assert note == None
|
||||
|
||||
def test_update_note( self ):
|
||||
self.notebook.add_note( self.note )
|
||||
old_title = self.note.title
|
||||
|
||||
new_title = u"new title"
|
||||
new_contents = u"<h3>%s</h3>new blah" % new_title
|
||||
previous_revision = self.notebook.revision
|
||||
self.notebook.update_note( self.note, new_contents )
|
||||
|
||||
assert self.note.contents == new_contents
|
||||
assert self.note.title == new_title
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
note = self.notebook.lookup_note( self.note.object_id )
|
||||
assert note == self.note
|
||||
|
||||
note = self.notebook.lookup_note_by_title( old_title )
|
||||
assert note == None
|
||||
|
||||
note = self.notebook.lookup_note_by_title( new_title )
|
||||
assert note == self.note
|
||||
|
||||
@raises( Notebook.UnknownNoteError )
|
||||
def test_update_unknown_note( self ):
|
||||
new_contents = u"<h3>new title</h3>new blah"
|
||||
self.notebook.update_note( self.note, new_contents )
|
||||
|
||||
def test_add_startup_note( self ):
|
||||
self.notebook.add_note( self.note )
|
||||
|
||||
previous_revision = self.notebook.revision
|
||||
self.notebook.add_startup_note( self.note )
|
||||
|
||||
assert self.note in self.notebook.startup_notes
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
def test_add_duplicate_startup_note( self ):
|
||||
self.notebook.add_note( self.note )
|
||||
|
||||
previous_revision = self.notebook.revision
|
||||
self.notebook.add_startup_note( self.note )
|
||||
|
||||
assert self.note in self.notebook.startup_notes
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
revision = self.notebook.revision
|
||||
self.notebook.add_startup_note( self.note )
|
||||
|
||||
assert self.notebook.startup_notes.count( self.note ) == 1
|
||||
assert self.notebook.revision == revision
|
||||
|
||||
@raises( Notebook.UnknownNoteError )
|
||||
def test_add_unknown_startup_note( self ):
|
||||
self.notebook.add_startup_note( self.note )
|
||||
|
||||
def test_remove_startup_note( self ):
|
||||
self.notebook.add_note( self.note )
|
||||
self.notebook.add_startup_note( self.note )
|
||||
|
||||
previous_revision = self.notebook.revision
|
||||
result = self.notebook.remove_startup_note( self.note )
|
||||
|
||||
assert result == True
|
||||
assert not self.note in self.notebook.startup_notes
|
||||
assert self.notebook.revision > previous_revision
|
||||
|
||||
def test_remove_unknown_startup_note( self ):
|
||||
self.notebook.add_note( self.note )
|
||||
|
||||
revision = self.notebook.revision
|
||||
result = self.notebook.remove_startup_note( self.note )
|
||||
|
||||
assert result == False
|
||||
assert not self.note in self.notebook.startup_notes
|
||||
assert self.notebook.revision == revision
|
||||
|
||||
def test_to_dict( self ):
|
||||
d = self.notebook.to_dict()
|
||||
|
||||
assert d.get( "name" ) == self.name
|
||||
assert d.get( "trash" ) == self.trash
|
||||
assert d.get( "read_write" ) == True
|
||||
assert d.get( "object_id" ) == self.notebook.object_id
|
||||
assert d.get( "revision" )
|
||||
assert d.get( "revisions_list" ) == None
|
||||
|
||||
def test_to_dict_with_startup_notes( self ):
|
||||
self.notebook.add_note( self.note )
|
||||
self.notebook.add_startup_note( self.note )
|
||||
|
||||
d = self.notebook.to_dict()
|
||||
|
||||
assert d.get( "name" ) == self.name
|
||||
assert d.get( "trash" ) == self.trash
|
||||
assert d.get( "read_write" ) == True
|
||||
assert d.get( "object_id" ) == self.notebook.object_id
|
||||
assert d.get( "revision" )
|
||||
assert d.get( "revisions_list" ) == None
|
|
@ -1,38 +0,0 @@
|
|||
from model.User import User
|
||||
from model.Password_reset import Password_reset
|
||||
|
||||
|
||||
class Test_password_reset( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = u"17"
|
||||
self.email_address = u"bob@example.com"
|
||||
|
||||
self.password_reset = Password_reset( self.object_id, self.email_address )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.password_reset.object_id == self.object_id
|
||||
assert self.password_reset.email_address == self.email_address
|
||||
assert self.password_reset.redeemed == False
|
||||
|
||||
def test_redeem( self ):
|
||||
previous_revision = self.password_reset.revision
|
||||
self.password_reset.redeemed = True
|
||||
|
||||
assert self.password_reset.redeemed == True
|
||||
assert self.password_reset.revision > previous_revision
|
||||
|
||||
def test_redeem_twice( self ):
|
||||
self.password_reset.redeemed = True
|
||||
current_revision = self.password_reset.revision
|
||||
self.password_reset.redeemed = True
|
||||
|
||||
assert self.password_reset.redeemed == True
|
||||
assert self.password_reset.revision == current_revision
|
||||
|
||||
def test_unredeem( self ):
|
||||
self.password_reset.redeemed = True
|
||||
previous_revision = self.password_reset.revision
|
||||
self.password_reset.redeemed = False
|
||||
|
||||
assert self.password_reset.redeemed == False
|
||||
assert self.password_reset.revision > previous_revision
|
|
@ -1,51 +0,0 @@
|
|||
from datetime import datetime, timedelta
|
||||
from model.Persistent import Persistent
|
||||
|
||||
|
||||
class Test_persistent( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = "17"
|
||||
self.obj = Persistent( self.object_id )
|
||||
self.delta = timedelta( seconds = 1 )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.obj.object_id == self.object_id
|
||||
assert self.obj.secondary_id == None
|
||||
assert datetime.now() - self.obj.revision < self.delta
|
||||
|
||||
def test_revision_id( self ):
|
||||
assert self.obj.revision_id() == "%s %s" % ( self.object_id, self.obj.revision )
|
||||
|
||||
def test_make_revision_id( self ):
|
||||
assert self.obj.revision_id() == Persistent.make_revision_id( self.object_id, self.obj.revision )
|
||||
|
||||
def test_update_revision( self ):
|
||||
previous_revision = self.obj.revision
|
||||
self.obj.update_revision()
|
||||
assert self.obj.revision > previous_revision
|
||||
assert datetime.now() - self.obj.revision < self.delta
|
||||
|
||||
previous_revision = self.obj.revision
|
||||
self.obj.update_revision()
|
||||
assert self.obj.revision > previous_revision
|
||||
assert datetime.now() - self.obj.revision < self.delta
|
||||
|
||||
def test_to_dict( self ):
|
||||
d = self.obj.to_dict()
|
||||
|
||||
assert d.get( "object_id" ) == self.object_id
|
||||
assert d.get( "revision" ) == self.obj.revision
|
||||
assert d.get( "revisions_list" ) == self.obj.revisions_list
|
||||
|
||||
|
||||
class Test_persistent_with_secondary_id( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = "17"
|
||||
self.secondary_id = u"foo"
|
||||
self.obj = Persistent( self.object_id, self.secondary_id )
|
||||
self.delta = timedelta( seconds = 1 )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.obj.object_id == self.object_id
|
||||
assert self.obj.secondary_id == self.secondary_id
|
||||
assert datetime.now() - self.obj.revision < self.delta
|
|
@ -1,78 +0,0 @@
|
|||
from nose.tools import raises
|
||||
from model.Notebook import Notebook
|
||||
from model.Read_only_notebook import Read_only_notebook
|
||||
from model.Note import Note
|
||||
|
||||
|
||||
class Test_read_only_notebook( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = "17"
|
||||
self.read_only_id = "22"
|
||||
self.trash_id = 18
|
||||
self.name = u"my notebook"
|
||||
self.trash_name = u"trash"
|
||||
|
||||
self.trash = Notebook( self.trash_id, self.trash_name )
|
||||
self.notebook = Notebook( self.object_id, self.name, self.trash )
|
||||
self.note = Note( "19", u"<h3>title</h3>blah" )
|
||||
self.notebook.add_note( self.note )
|
||||
self.notebook.add_startup_note( self.note )
|
||||
|
||||
self.read_only = Read_only_notebook( self.read_only_id, self.notebook )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.read_only.object_id == self.read_only_id
|
||||
assert self.read_only.name == self.name
|
||||
assert self.read_only.trash == None
|
||||
assert self.read_only.notes == [ self.note ]
|
||||
assert self.read_only.startup_notes == [ self.note ]
|
||||
|
||||
@raises( AttributeError )
|
||||
def test_set_name( self ):
|
||||
self.read_only.name = u"my new notebook"
|
||||
|
||||
@raises( AttributeError )
|
||||
def test_add_note( self ):
|
||||
self.read_only.add_note( self.note )
|
||||
|
||||
def test_lookup_note( self ):
|
||||
note = self.read_only.lookup_note( self.note.object_id )
|
||||
assert note == self.note
|
||||
|
||||
def test_lookup_unknown_note( self ):
|
||||
note = self.read_only.lookup_note( "55" )
|
||||
assert note == None
|
||||
|
||||
def test_lookup_note_by_title( self ):
|
||||
note = self.read_only.lookup_note_by_title( self.note.title )
|
||||
assert note == self.note
|
||||
|
||||
def test_lookup_unknown_note_by_title( self ):
|
||||
note = self.read_only.lookup_note( self.note.title )
|
||||
assert note == None
|
||||
|
||||
@raises( AttributeError )
|
||||
def test_remove_note( self ):
|
||||
self.read_only.remove_note( self.note )
|
||||
|
||||
@raises( AttributeError )
|
||||
def test_update_note( self ):
|
||||
new_title = u"new title"
|
||||
new_contents = u"<h3>%s</h3>new blah" % new_title
|
||||
self.read_only.update_note( self.note, new_contents )
|
||||
|
||||
@raises( AttributeError )
|
||||
def test_add_startup_note( self ):
|
||||
self.read_only.add_startup_note( self.note )
|
||||
|
||||
@raises( AttributeError )
|
||||
def test_remove_startup_note( self ):
|
||||
self.read_only.remove_startup_note( self.note )
|
||||
|
||||
def test_to_dict( self ):
|
||||
d = self.read_only.to_dict()
|
||||
|
||||
assert d.get( "object_id" ) == self.read_only_id
|
||||
assert d.get( "name" ) == self.name
|
||||
assert d.get( "trash" ) == None
|
||||
assert d.get( "read_write" ) == False
|
|
@ -1,134 +0,0 @@
|
|||
from nose.tools import raises
|
||||
from model.User import User
|
||||
from model.Notebook import Notebook
|
||||
|
||||
|
||||
class Test_user( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = u"17"
|
||||
self.username = u"bob"
|
||||
self.password = u"foobar"
|
||||
self.email_address = u"bob@example.com"
|
||||
|
||||
self.user = User( self.object_id, self.username, self.password, self.email_address )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.user.username == self.username
|
||||
assert self.user.email_address == self.email_address
|
||||
assert self.user.notebooks == []
|
||||
assert self.user.storage_bytes == 0
|
||||
assert self.user.rate_plan == 0
|
||||
|
||||
def test_check_correct_password( self ):
|
||||
assert self.user.check_password( self.password ) == True
|
||||
|
||||
def test_check_incorrect_password( self ):
|
||||
assert self.user.check_password( u"wrong" ) == False
|
||||
|
||||
def test_set_password( self ):
|
||||
previous_revision = self.user.revision
|
||||
new_password = u"newpass"
|
||||
self.user.password = new_password
|
||||
|
||||
assert self.user.check_password( self.password ) == False
|
||||
assert self.user.check_password( new_password ) == True
|
||||
assert self.user.revision > previous_revision
|
||||
|
||||
def test_set_none_password( self ):
|
||||
previous_revision = self.user.revision
|
||||
new_password = None
|
||||
self.user.password = new_password
|
||||
|
||||
assert self.user.check_password( self.password ) == False
|
||||
assert self.user.check_password( new_password ) == False
|
||||
assert self.user.revision > previous_revision
|
||||
|
||||
def test_set_notebooks( self ):
|
||||
previous_revision = self.user.revision
|
||||
notebook_id = u"33"
|
||||
notebook = Notebook( notebook_id, u"my notebook" )
|
||||
self.user.notebooks = [ notebook ]
|
||||
|
||||
assert len( self.user.notebooks ) == 1
|
||||
assert self.user.notebooks[ 0 ].object_id == notebook_id
|
||||
assert self.user.revision > previous_revision
|
||||
|
||||
def test_set_storage_bytes( self ):
|
||||
previous_revision = self.user.revision
|
||||
storage_bytes = 44
|
||||
self.user.storage_bytes = storage_bytes
|
||||
|
||||
assert self.user.storage_bytes == storage_bytes
|
||||
assert self.user.revision > previous_revision
|
||||
|
||||
def test_set_rate_plan( self ):
|
||||
previous_revision = self.user.revision
|
||||
rate_plan = 2
|
||||
self.user.rate_plan = rate_plan
|
||||
|
||||
assert self.user.rate_plan == rate_plan
|
||||
assert self.user.revision > previous_revision
|
||||
|
||||
|
||||
class Test_user_with_notebooks( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = u"17"
|
||||
self.username = u"bob"
|
||||
self.password = u"foobar"
|
||||
self.email_address = u"bob@example.com"
|
||||
trash = Notebook( u"32", u"trash" )
|
||||
|
||||
self.notebooks = [
|
||||
Notebook( u"33", u"my notebook", trash ),
|
||||
Notebook( u"34", u"my other notebook" ),
|
||||
]
|
||||
|
||||
self.user = User( self.object_id, self.username, self.password, self.email_address, self.notebooks )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.user.username == self.username
|
||||
assert self.user.email_address == self.email_address
|
||||
assert self.user.notebooks == self.notebooks
|
||||
|
||||
def test_set_existing_notebooks( self ):
|
||||
previous_revision = self.user.revision
|
||||
self.user.notebooks = [ self.notebooks[ 1 ] ]
|
||||
|
||||
assert len( self.user.notebooks ) == 1
|
||||
assert self.user.notebooks[ 0 ].object_id == self.notebooks[ 1 ].object_id
|
||||
assert self.user.revision > previous_revision
|
||||
|
||||
def test_set_new_notebooks( self ):
|
||||
previous_revision = self.user.revision
|
||||
notebook_id = u"35"
|
||||
notebook = Notebook( notebook_id, u"my new notebook" )
|
||||
self.user.notebooks = [ notebook ]
|
||||
|
||||
assert len( self.user.notebooks ) == 1
|
||||
assert self.user.notebooks[ 0 ].object_id == notebook_id
|
||||
assert self.user.revision > previous_revision
|
||||
|
||||
def test_has_access_true( self ):
|
||||
assert self.user.has_access( self.notebooks[ 0 ].object_id ) == True
|
||||
|
||||
def test_has_access_false( self ):
|
||||
notebook_id = u"35"
|
||||
notebook = Notebook( notebook_id, u"my new notebook" )
|
||||
assert self.user.has_access( notebook.object_id ) == False
|
||||
|
||||
def test_has_access_to_trash_true( self ):
|
||||
assert self.user.has_access( self.notebooks[ 0 ].trash.object_id ) == True
|
||||
|
||||
def test_has_access_to_trash_false( self ):
|
||||
notebook_id = u"35"
|
||||
trash_id = u"36"
|
||||
trash = Notebook( trash_id, u"trash" )
|
||||
notebook = Notebook( notebook_id, u"my new notebook", trash )
|
||||
assert self.user.has_access( notebook.object_id ) == False
|
||||
|
||||
def test_to_dict( self ):
|
||||
d = self.user.to_dict()
|
||||
|
||||
assert d.get( "username" ) == self.username
|
||||
assert d.get( "storage_bytes" ) == self.user.storage_bytes
|
||||
assert d.get( "rate_plan" ) == self.user.rate_plan
|
|
@ -1,57 +0,0 @@
|
|||
from model.User import User
|
||||
from model.User_list import User_list
|
||||
|
||||
|
||||
class Test_user_list( object ):
|
||||
def setUp( self ):
|
||||
self.object_id = u"17"
|
||||
self.secondary_id = u"mylist"
|
||||
|
||||
self.user_list = User_list( self.object_id, self.secondary_id )
|
||||
self.user = User( u"18", u"bob", u"pass", u"bob@example.com" )
|
||||
self.user2 = User( u"19", u"rob", u"pass2", u"rob@example.com" )
|
||||
|
||||
def test_create( self ):
|
||||
assert self.user_list.object_id == self.object_id
|
||||
assert self.user_list.secondary_id == self.secondary_id
|
||||
assert self.user_list.users == []
|
||||
|
||||
def test_add_user( self ):
|
||||
previous_revision = self.user_list.revision
|
||||
self.user_list.add_user( self.user )
|
||||
|
||||
assert self.user_list.users == [ self.user ]
|
||||
assert self.user_list.revision > previous_revision
|
||||
|
||||
def test_add_user_twice( self ):
|
||||
self.user_list.add_user( self.user )
|
||||
current_revision = self.user_list.revision
|
||||
self.user_list.add_user( self.user )
|
||||
|
||||
assert self.user_list.users == [ self.user ]
|
||||
assert self.user_list.revision == current_revision
|
||||
|
||||
def test_add_two_users( self ):
|
||||
previous_revision = self.user_list.revision
|
||||
self.user_list.add_user( self.user )
|
||||
self.user_list.add_user( self.user2 )
|
||||
|
||||
assert self.user_list.users == [ self.user, self.user2 ]
|
||||
assert self.user_list.revision > previous_revision
|
||||
|
||||
def test_remove_user( self ):
|
||||
self.user_list.add_user( self.user )
|
||||
previous_revision = self.user_list.revision
|
||||
self.user_list.remove_user( self.user )
|
||||
|
||||
assert self.user_list.users == []
|
||||
assert self.user_list.revision > previous_revision
|
||||
|
||||
def test_remove_user_twice( self ):
|
||||
self.user_list.add_user( self.user )
|
||||
self.user_list.remove_user( self.user )
|
||||
current_revision = self.user_list.revision
|
||||
self.user_list.remove_user( self.user )
|
||||
|
||||
assert self.user_list.users == []
|
||||
assert self.user_list.revision == current_revision
|
|
@ -1,4 +1,4 @@
|
|||
from model.User import User
|
||||
from new_model.User import User
|
||||
from new_model.Password_reset import Password_reset
|
||||
|
||||
|
||||
|
|
|
@ -1,167 +0,0 @@
|
|||
#!/usr/bin/python2.5
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import psycopg2 as psycopg
|
||||
from pytz import timezone, utc
|
||||
from datetime import datetime
|
||||
from controller.Old_database import Old_database
|
||||
from controller.Scheduler import Scheduler
|
||||
|
||||
|
||||
pacific = timezone( "US/Pacific" )
|
||||
|
||||
|
||||
def quote( value ):
|
||||
if value is None:
|
||||
return "null"
|
||||
|
||||
# if this is a datetime, assume it's in the Pacific timezone, and then convert it to UTC
|
||||
if isinstance( value, datetime ):
|
||||
value = value.replace( tzinfo = pacific ).astimezone( utc )
|
||||
|
||||
value = unicode( value )
|
||||
|
||||
return "'%s'" % value.replace( "'", "''" ).replace( "\\", "\\\\" )
|
||||
|
||||
|
||||
class Converter( object ):
|
||||
"""
|
||||
Converts a Luminotes database from bsddb to PostgreSQL, using the old bsddb controller.Old_database.
|
||||
This assumes that the PostgreSQL schema from model/schema.sql is already in the database.
|
||||
"""
|
||||
def __init__( self, scheduler, database ):
|
||||
self.scheduler = scheduler
|
||||
self.database = database
|
||||
|
||||
self.conn = psycopg.connect( "dbname=luminotes user=luminotes password=dev" )
|
||||
self.cursor = self.conn.cursor()
|
||||
|
||||
thread = self.convert_database()
|
||||
self.scheduler.add( thread )
|
||||
self.scheduler.wait_for( thread )
|
||||
|
||||
def convert_database( self ):
|
||||
inserts = set()
|
||||
notes = {} # map of note object id to its notebook
|
||||
startup_notes = {} # map of startup note object id to its notebook
|
||||
|
||||
for key in self.database._Old_database__db.keys():
|
||||
if not self.database._Old_database__db.get( key ):
|
||||
continue
|
||||
|
||||
self.database.load( key, self.scheduler.thread )
|
||||
value = ( yield Scheduler.SLEEP )
|
||||
|
||||
class_name = value.__class__.__name__
|
||||
|
||||
if class_name == "Notebook":
|
||||
if ( value.object_id, value.revision ) in inserts: continue
|
||||
inserts.add( ( value.object_id, value.revision ) )
|
||||
|
||||
self.cursor.execute(
|
||||
"insert into notebook " +
|
||||
"( id, revision, name, trash_id ) " +
|
||||
"values ( %s, %s, %s, %s );" %
|
||||
( quote( value.object_id ), quote( value.revision ), quote( value.name ), value.trash and quote( value.trash.object_id ) or "null" )
|
||||
)
|
||||
|
||||
for note in value.notes:
|
||||
notes[ note.object_id ] = value
|
||||
for startup_note in value.startup_notes:
|
||||
startup_notes[ startup_note.object_id ] = value
|
||||
elif class_name == "Note":
|
||||
if ( value.object_id, value.revision ) in inserts: continue
|
||||
inserts.add( ( value.object_id, value.revision ) )
|
||||
|
||||
# notebook_id, startup, and rank are all set below since they're pulled out of Notebook objects
|
||||
self.cursor.execute(
|
||||
"insert into note " +
|
||||
"( id, revision, title, contents, notebook_id, startup, deleted_from_id, rank ) " +
|
||||
"values ( %s, %s, %s, %s, %s, %s, %s, %s );" %
|
||||
( quote( value.object_id ), quote( value.revision ), quote( value.title or None ), quote( value.contents or None ), quote( None ), quote( "f" ), quote( value.deleted_from or None ), quote( None ) )
|
||||
)
|
||||
elif class_name == "User":
|
||||
if value.username is None: continue # note: this will skip all demo users
|
||||
|
||||
if ( value.object_id, value.revision ) in inserts: continue
|
||||
inserts.add( ( value.object_id, value.revision ) )
|
||||
|
||||
self.cursor.execute(
|
||||
"insert into luminotes_user " +
|
||||
"( id, revision, username, salt, password_hash, email_address, storage_bytes, rate_plan ) " +
|
||||
"values ( %s, %s, %s, %s, %s, %s, %s, %s );" %
|
||||
( quote( value.object_id ), quote( value.revision ), quote( value.username ), quote( value._User__salt ), quote( value._User__password_hash ), quote( value.email_address ), value.storage_bytes, value.rate_plan )
|
||||
)
|
||||
for notebook in value.notebooks:
|
||||
if notebook is None: continue
|
||||
|
||||
read_only = ( notebook.__class__.__name__ == "Read_only_notebook" )
|
||||
if read_only:
|
||||
notebook_id = notebook._Read_only_notebook__wrapped.object_id
|
||||
else:
|
||||
notebook_id = notebook.object_id
|
||||
|
||||
if ( value.object_id, notebook_id ) in inserts: continue
|
||||
inserts.add( ( value.object_id, notebook_id ) )
|
||||
|
||||
self.cursor.execute(
|
||||
"insert into user_notebook " +
|
||||
"( user_id, notebook_id, read_write ) " +
|
||||
"values ( %s, %s, %s );" %
|
||||
( quote( value.object_id ), quote( notebook_id ),
|
||||
quote( read_only and "f" or "t" ) )
|
||||
)
|
||||
if notebook.trash:
|
||||
self.cursor.execute(
|
||||
"insert into user_notebook " +
|
||||
"( user_id, notebook_id, read_write ) " +
|
||||
"values ( %s, %s, %s );" %
|
||||
( quote( value.object_id ), quote( notebook.trash.object_id ),
|
||||
quote( read_only and "f" or "t" ) )
|
||||
)
|
||||
elif class_name == "Read_only_notebook":
|
||||
pass
|
||||
elif class_name == "Password_reset":
|
||||
if value.redeemed == True: continue
|
||||
|
||||
if ( value.object_id, value.revision ) in inserts: continue
|
||||
inserts.add( ( value.object_id, value.revision ) )
|
||||
|
||||
self.cursor.execute(
|
||||
"insert into password_reset " +
|
||||
"( id, email_address, redeemed ) " +
|
||||
"values ( %s, %s, %s );" %
|
||||
( quote( value.object_id ), quote( value.email_address ), quote( value.redeemed and "t" or "f" ) )
|
||||
)
|
||||
elif class_name == "User_list":
|
||||
pass
|
||||
else:
|
||||
raise Exception( "Unconverted value of type %s" % class_name )
|
||||
|
||||
for ( note_id, notebook ) in notes.items():
|
||||
self.cursor.execute(
|
||||
"update note set notebook_id = %s where id = %s" % ( quote( notebook.object_id ), quote( note_id ) )
|
||||
)
|
||||
|
||||
for ( startup_note_id, notebook ) in startup_notes.items():
|
||||
startup_ids = [ note.object_id for note in notebook.startup_notes ]
|
||||
rank = startup_ids.index( startup_note_id )
|
||||
|
||||
self.cursor.execute(
|
||||
"update note set startup = 't', rank = %s where id = %s" % ( rank, quote( startup_note_id ) )
|
||||
)
|
||||
|
||||
self.conn.commit()
|
||||
yield None
|
||||
|
||||
|
||||
def main():
|
||||
scheduler = Scheduler()
|
||||
database = Old_database( scheduler, "data.db" )
|
||||
initializer = Converter( scheduler, database )
|
||||
scheduler.wait_until_idle()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -1,186 +0,0 @@
|
|||
#!/usr/bin/python2.5
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import psycopg2 as psycopg
|
||||
from controller.Old_database import Old_database
|
||||
from controller.Scheduler import Scheduler
|
||||
|
||||
|
||||
def quote( value ):
|
||||
if value is None:
|
||||
return "null"
|
||||
|
||||
value = unicode( value )
|
||||
return "'%s'" % value.replace( "'", "''" ).replace( "\\", "\\\\" )
|
||||
|
||||
|
||||
class Verifier( object ):
|
||||
"""
|
||||
Verifies a conversion of a Luminotes database from bsddb to PostgreSQL that was performed with
|
||||
convertdb.py.
|
||||
"""
|
||||
def __init__( self, scheduler, database ):
|
||||
self.scheduler = scheduler
|
||||
self.database = database
|
||||
|
||||
self.conn = psycopg.connect( "dbname=luminotes user=luminotes password=dev" )
|
||||
self.cursor = self.conn.cursor()
|
||||
|
||||
thread = self.verify_database()
|
||||
self.scheduler.add( thread )
|
||||
self.scheduler.wait_for( thread )
|
||||
|
||||
def verify_database( self ):
|
||||
inserts = set()
|
||||
|
||||
for key in self.database._Old_database__db.keys():
|
||||
if not self.database._Old_database__db.get( key ):
|
||||
continue
|
||||
|
||||
self.database.load( key, self.scheduler.thread )
|
||||
value = ( yield Scheduler.SLEEP )
|
||||
|
||||
class_name = value.__class__.__name__
|
||||
|
||||
if class_name == "Notebook":
|
||||
self.verify_notebook( value )
|
||||
elif class_name == "Note":
|
||||
self.cursor.execute(
|
||||
"select * from note where id = %s and revision = %s;" % ( quote( value.object_id ), quote( value.revision ) )
|
||||
)
|
||||
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == value.object_id
|
||||
assert row[ 1 ].replace( tzinfo = None ) == value.revision
|
||||
assert row[ 2 ] == ( value.title and value.title.encode( "utf8" ) or None )
|
||||
assert row[ 3 ] == ( value.contents and value.contents.encode( "utf8" ) or None )
|
||||
# not checking for existence of row 4 (notebook_id), because notes deleted from the trash don't have a notebook id
|
||||
assert row[ 5 ] is not None
|
||||
assert row[ 6 ] == ( value.deleted_from or None )
|
||||
if row[ 5 ] is True: # if this is a startup note, it should have a rank
|
||||
assert row[ 7 ] is not None
|
||||
elif class_name == "User":
|
||||
# skip demo users
|
||||
if value.username is None: continue
|
||||
|
||||
self.cursor.execute(
|
||||
"select * from luminotes_user where id = %s and revision = %s;" % ( quote( value.object_id ), quote( value.revision ) )
|
||||
)
|
||||
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == value.object_id
|
||||
assert row[ 1 ].replace( tzinfo = None ) == value.revision
|
||||
assert row[ 2 ] == value.username
|
||||
assert row[ 3 ] == value._User__salt
|
||||
assert row[ 4 ] == value._User__password_hash
|
||||
assert row[ 5 ] == value.email_address
|
||||
assert row[ 6 ] == value.storage_bytes
|
||||
assert row[ 7 ] == value.rate_plan
|
||||
|
||||
for notebook in value.notebooks:
|
||||
if notebook is None: continue
|
||||
|
||||
read_write = ( notebook.__class__.__name__ == "Notebook" )
|
||||
|
||||
self.cursor.execute(
|
||||
"select * from user_notebook where user_id = %s and notebook_id = %s;" % ( quote( value.object_id ), quote( notebook.object_id ) )
|
||||
)
|
||||
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == value.object_id
|
||||
assert row[ 1 ] == notebook.object_id
|
||||
assert row[ 2 ] == read_write
|
||||
|
||||
if notebook.trash:
|
||||
self.cursor.execute(
|
||||
"select * from user_notebook where user_id = %s and notebook_id = %s;" % ( quote( value.object_id ), quote( notebook.trash.object_id ) )
|
||||
)
|
||||
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == value.object_id
|
||||
assert row[ 1 ] == notebook.trash.object_id
|
||||
assert row[ 2 ] == read_write
|
||||
|
||||
self.verify_notebook( notebook )
|
||||
|
||||
elif class_name == "Read_only_notebook":
|
||||
self.verify_notebook( value._Read_only_notebook__wrapped )
|
||||
elif class_name == "Password_reset":
|
||||
# skip password resets that are already redeemed
|
||||
if value.redeemed: continue
|
||||
|
||||
self.cursor.execute(
|
||||
"select * from password_reset where id = %s;" % quote( value.object_id )
|
||||
)
|
||||
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == value.object_id
|
||||
assert row[ 1 ] == value.email_address
|
||||
assert row[ 2 ] == False
|
||||
elif class_name == "User_list":
|
||||
pass
|
||||
else:
|
||||
raise Exception( "Unverified value of type %s" % class_name )
|
||||
|
||||
self.conn.commit()
|
||||
yield None
|
||||
|
||||
def verify_notebook( self, value ):
|
||||
self.cursor.execute(
|
||||
"select * from notebook where id = %s and revision = %s;" % ( quote( value.object_id ), quote( value.revision ) )
|
||||
)
|
||||
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == value.object_id
|
||||
assert row[ 1 ].replace( tzinfo = None ) == value.revision
|
||||
assert row[ 2 ] == value.name
|
||||
if value.trash:
|
||||
assert row[ 3 ] == value.trash.object_id
|
||||
else:
|
||||
assert row[ 3 ] == None
|
||||
|
||||
startup_note_ids = [ note.object_id for note in value.startup_notes ]
|
||||
for note in value.notes:
|
||||
self.cursor.execute(
|
||||
"select * from note where id = %s and revision = %s;" % ( quote( note.object_id ), quote( value.revision ) )
|
||||
)
|
||||
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == note.object_id
|
||||
assert row[ 1 ].replace( tzinfo = None ) == note.revision
|
||||
assert row[ 2 ] == note.title
|
||||
assert row[ 3 ] == note.contents
|
||||
assert row[ 4 ] == value.object_id
|
||||
assert row[ 5 ] == ( note.object_id in startup_note_ids )
|
||||
assert row[ 6 ] == note.deleted_from
|
||||
if row[ 5 ] is True: # if this is a startup note, it should have a rank
|
||||
assert row[ 7 ] is not None
|
||||
|
||||
for note in value.startup_notes:
|
||||
self.cursor.execute(
|
||||
"select * from note where id = %s and revision = %s order by rank;" % ( quote( note.object_id ), quote( value.revision ) )
|
||||
)
|
||||
|
||||
rank = 0
|
||||
for row in self.cursor.fetchmany():
|
||||
assert row[ 0 ] == note.object_id
|
||||
assert row[ 1 ].replace( tzinfo = None ) == note.revision
|
||||
assert row[ 2 ] == note.title
|
||||
assert row[ 3 ] == note.contents
|
||||
assert row[ 4 ] == value.object_id
|
||||
assert row[ 5 ] == True
|
||||
assert row[ 6 ] == note.deleted_from
|
||||
assert row[ 7 ] == rank
|
||||
rank += 1
|
||||
|
||||
|
||||
def main():
|
||||
scheduler = Scheduler()
|
||||
database = Old_database( scheduler, "data.db" )
|
||||
initializer = Verifier( scheduler, database )
|
||||
scheduler.wait_until_idle()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue