witten
/
luminotes
Archived
1
0
Fork 0

Began conversion of all controller unit tests to use a real SQLite database. Still incomplete.

This commit is contained in:
Dan Helfman 2008-08-19 14:22:40 -07:00
parent d0d0946788
commit 95577bff55
6 changed files with 146 additions and 803 deletions

View File

@ -659,7 +659,7 @@ class Users( object ):
@rtype: int
@return: total bytes used for storage
"""
return sum( self.__database.select_one( tuple, user.sql_calculate_storage() ), 0 )
return sum( self.__database.select_one( tuple, user.sql_calculate_storage( self.__database.backend ) ), 0 )
def calculate_group_storage( self, user ):
"""
@ -674,8 +674,7 @@ class Users( object ):
def update_storage( self, user_id, commit = True ):
"""
If there is a storage quota for the user's rate plan, calculate and record total storage
utilization for the given user. If there isn't a storage quota, then bail.
Calculate and record total storage utilization for the given user.
@type user_id: unicode
@param user_id: id of user for which to calculate storage utilization
@ -689,10 +688,6 @@ class Users( object ):
if user is None:
return None
plan = self.__rate_plans[ user.rate_plan ]
if not plan[ "storage_quota_bytes" ]:
return user
user.storage_bytes = self.calculate_storage( user )
self.__database.save( user, commit )
@ -1212,7 +1207,7 @@ class Users( object ):
result = self.current( anonymous.object_id )
result[ "notebook" ] = main_notebook
result[ "startup_notes" ] = self.__database.select_many( Note, main_notebook.sql_load_startup_notes() )
result[ "total_notes_count" ] = self.__database.select_one( Note, main_notebook.sql_count_notes(), use_cache = True )
result[ "total_notes_count" ] = self.__database.select_one( int, main_notebook.sql_count_notes(), use_cache = True )
result[ "note_read_write" ] = False
result[ "notes" ] = [ Note.create(
object_id = u"redeem_invite",

View File

@ -7,3 +7,7 @@ class Stub_cache( object ):
def set( self, key, value ):
self.__objects[ key ] = value
def delete( self, key ):
if key in self.__objects:
del( self.__objects[ key ] )

View File

@ -1,85 +0,0 @@
from copy import copy
from model.User import User
class Stub_database( object ):
def __init__( self, connection = None ):
# map of object id to list of saved objects (presumably in increasing order of revisions)
self.objects = {}
self.user_notebook = {} # map of user_id to ( notebook_id, read_write, owner )
self.user_group = {} # map of user_id to ( group_id, admin )
self.last_saved_obj = None
self.last_saved_user = None
self.__next_id = 0
def save( self, obj, commit = False ):
self.last_saved_obj = obj
if isinstance( obj, User ):
self.last_saved_user = obj
if obj.object_id in self.objects:
self.objects[ obj.object_id ].append( copy( obj ) )
else:
self.objects[ obj.object_id ] = [ copy( obj ) ]
def load( self, Object_type, object_id, revision = None ):
obj_list = self.objects.get( object_id )
if not obj_list:
return None
# if a particular revision wasn't requested, just return the most recently saved object
# matching the given object_id
if revision is None:
if not isinstance( obj_list[ -1 ], Object_type ):
return None
return copy( obj_list[ -1 ] )
# a particular revision was requested, so pick it out of the objects matching the given id
matching_objs = [ obj for obj in obj_list if str( obj.revision ) == str( revision ) ]
if len( matching_objs ) > 0:
if not isinstance( matching_objs[ -1 ], Object_type ):
return None
return copy( matching_objs[ -1 ] )
return None
def select_one( self, Object_type, sql_command, use_cache = False ):
if callable( sql_command ):
result = sql_command( self )
if isinstance( result, list ):
if len( result ) == 0: return None
return result[ 0 ]
return result
raise NotImplementedError( sql_command )
def select_many( self, Object_type, sql_command ):
if callable( sql_command ):
result = sql_command( self )
if isinstance( result, list ):
return result
return [ result ]
raise NotImplementedError( sql_command )
def execute( self, sql_command, commit = False ):
if callable( sql_command ):
return sql_command( self )
raise NotImplementedError( sql_command )
def uncache_command( self, sql_command ):
pass
def next_id( self, Object_type, commit = True ):
self.__next_id += 1
return unicode( self.__next_id )
def commit( self ):
pass
def rollback( self ):
pass
def close( self ):
pass

View File

@ -1,6 +1,8 @@
import smtplib
import cherrypy
from Stub_database import Stub_database
from pysqlite2 import dbapi2 as sqlite
from controller.Database import Database, Connection_wrapper
from Stub_cache import Stub_cache
from Stub_view import Stub_view
from Stub_smtp import Stub_smtp
from config import Common
@ -40,475 +42,6 @@ class Test_controller( object ):
from model.User_revision import User_revision
from model.File import File
# Since Stub_database isn't a real database and doesn't know SQL, replace some of the
# SQL-returning methods in User, Note, and Notebook to return functions that manipulate data in
# Stub_database directly instead. This is all a little fragile, but it's better than relying on
# the presence of a real database for unit tests.
def sql_save_notebook( self, notebook_id, read_write, owner, rank, database ):
if self.object_id in database.user_notebook:
database.user_notebook[ self.object_id ].append( ( notebook_id, read_write, owner, rank ) )
else:
database.user_notebook[ self.object_id ] = [ ( notebook_id, read_write, owner, rank ) ]
User.sql_save_notebook = lambda self, notebook_id, read_write = False, owner = False, rank = None: \
lambda database: sql_save_notebook( self, notebook_id, read_write, owner, rank, database )
def sql_remove_notebook( self, notebook_id, database ):
if self.object_id in database.user_notebook:
for notebook_info in database.user_notebook[ self.object_id ]:
if notebook_info[ 0 ] == notebook_id:
database.user_notebook[ self.object_id ].remove( notebook_info )
User.sql_remove_notebook = lambda self, notebook_id: \
lambda database: sql_remove_notebook( self, notebook_id, database )
def sql_load_notebooks( self, parents_only, undeleted_only, read_write, database ):
notebooks = []
notebook_infos = database.user_notebook.get( self.object_id )
if not notebook_infos: return []
for notebook_info in notebook_infos:
( notebook_id, notebook_read_write, owner, rank ) = notebook_info
notebook = database.objects.get( notebook_id )[ -1 ]
notebook.read_write = notebook_read_write
notebook.owner = owner
notebook.rank = rank
if parents_only and notebook.trash_id is None:
continue
if undeleted_only and notebook.deleted is True:
continue
if read_write and notebook_read_write is False:
continue
notebooks.append( notebook )
notebooks.sort( lambda a, b: a.rank is None and 1 or cmp( a.rank, b.rank ) )
return notebooks
User.sql_load_notebooks = lambda self, parents_only = False, undeleted_only = False, read_write = False: \
lambda database: sql_load_notebooks( self, parents_only, undeleted_only, read_write, database )
def sql_load_by_username( username, database ):
users = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, User ) and obj.username == username:
users.append( obj )
return users
User.sql_load_by_username = staticmethod( lambda username: \
lambda database: sql_load_by_username( username, database ) )
def sql_load_by_email_address( email_address, database ):
users = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, User ) and obj.email_address == email_address:
users.append( obj )
return users
User.sql_load_by_email_address = staticmethod( lambda email_address: \
lambda database: sql_load_by_email_address( email_address, database ) )
def sql_calculate_storage( self, database ):
return ( 17, 3, 4, 22 ) # rather than actually calculating anything, return arbitrary numbers
User.sql_calculate_storage = lambda self: \
lambda database: sql_calculate_storage( self, database )
def sql_calculate_group_storage( self, database ):
return ( 0, 0 )
User.sql_calculate_group_storage = lambda self: \
lambda database: sql_calculate_group_storage( self, database )
def sql_has_access( self, notebook_id, read_write, owner, database ):
for ( user_id, notebook_infos ) in database.user_notebook.items():
for notebook_info in notebook_infos:
( db_notebook_id, db_read_write, db_owner, rank ) = notebook_info
if self.object_id == user_id and notebook_id == db_notebook_id:
if read_write is True and db_read_write is False:
return False
if owner is True and db_owner is False:
return False
return True
return False
User.sql_has_access = lambda self, notebook_id, read_write = False, owner = False: \
lambda database: sql_has_access( self, notebook_id, read_write, owner, database )
def sql_update_access( self, notebook_id, read_write, owner, database ):
for ( user_id, notebook_infos ) in database.user_notebook.items():
for notebook_info in notebook_infos:
( db_notebook_id, db_read_write, db_owner, rank ) = notebook_info
if self.object_id == user_id and notebook_id == db_notebook_id:
notebook_infos_copy = list( notebook_infos )
notebook_infos_copy.remove( notebook_info )
notebook_infos_copy.append( ( notebook_id, read_write, owner, rank ) )
database.user_notebook[ user_id ] = notebook_infos_copy
User.sql_update_access = lambda self, notebook_id, read_write = False, owner = False: \
lambda database: sql_update_access( self, notebook_id, read_write, owner, database )
def sql_update_notebook_rank( self, notebook_id, rank, database ):
max_rank = -1
for ( user_id, notebook_infos ) in database.user_notebook.items():
for notebook_info in notebook_infos:
( db_notebook_id, db_read_write, db_owner, db_rank ) = notebook_info
if self.object_id == user_id and notebook_id == db_notebook_id:
notebook_infos_copy = list( notebook_infos )
notebook_infos_copy.remove( notebook_info )
notebook_infos_copy.append( ( db_notebook_id, db_read_write, db_owner, rank ) )
database.user_notebook[ user_id ] = notebook_infos_copy
User.sql_update_notebook_rank = lambda self, notebook_id, rank: \
lambda database: sql_update_notebook_rank( self, notebook_id, rank, database )
def sql_highest_notebook_rank( self, database ):
max_rank = -1
for ( user_id, notebook_infos ) in database.user_notebook.items():
for notebook_info in notebook_infos:
( db_notebook_id, db_read_write, db_owner, db_rank ) = notebook_info
if self.object_id == user_id and db_rank > max_rank:
max_rank = db_rank
return max_rank
User.sql_highest_notebook_rank = lambda self: \
lambda database: sql_highest_notebook_rank( self, database )
def sql_load_groups( self, database ):
groups = []
group_infos = database.user_group.get( self.object_id )
if not group_infos: return []
for group_info in group_infos:
( group_id, admin ) = group_info
group = database.objects.get( group_id )[ -1 ]
group.admin = admin
groups.append( group )
groups.sort( lambda a, b: cmp( a.name, b.name ) )
return groups
User.sql_load_groups = lambda self: \
lambda database: sql_load_groups( self, database )
def sql_save_group( self, group_id, admin, database ):
if self.object_id in database.user_group:
database.user_group[ self.object_id ].append( ( group_id, admin ) )
else:
database.user_group[ self.object_id ] = [ ( group_id, admin ) ]
User.sql_save_group = lambda self, group_id, admin = False: \
lambda database: sql_save_group( self, group_id, admin, database )
def sql_remove_group( self, group_id, database ):
for ( user_id, group_infos ) in database.user_group.items():
for group_info in group_infos:
( db_group_id, db_admin ) = group_info
if self.object_id == user_id and group_id == db_group_id:
database.user_group[ user_id ].remove( group_info )
User.sql_remove_group = lambda self, group_id: \
lambda database: sql_remove_group( self, group_id, database )
def sql_in_group( self, group_id, admin, database ):
for ( user_id, group_infos ) in database.user_group.items():
for group_info in group_infos:
( db_group_id, db_admin ) = group_info
if self.object_id == user_id and group_id == db_group_id:
if admin is True and db_admin is False:
return False
return True
return False
User.sql_in_group = lambda self, group_id, admin = False: \
lambda database: sql_in_group( self, group_id, admin, database )
def sql_update_group_admin( self, group_id, admin, database ):
for ( user_id, group_infos ) in database.user_group.items():
for group_info in group_infos:
( db_group_id, db_admin ) = group_info
if self.object_id == user_id and group_id == db_group_id:
group_infos.remove( group_info )
group_infos.append( ( db_group_id, admin ) )
User.sql_update_group_admin = lambda self, group_id, admin = False: \
lambda database: sql_update_group_admin( self, group_id, admin, database )
def sql_revoke_invite_access( notebook_id, trash_id, email_address, database ):
invites = []
for ( user_id, notebook_infos ) in database.user_notebook.items():
for notebook_info in list( notebook_infos ):
( db_notebook_id, read_write, owner, rank ) = notebook_info
if db_notebook_id not in ( notebook_id, trash_id ): continue
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Invite ) and obj.notebook_id == notebook_id and \
obj.email_address == email_address:
database.user_notebook[ user_id ].remove( notebook_info )
User.sql_revoke_invite_access = staticmethod( lambda notebook_id, trash_id, email_address: \
lambda database: sql_revoke_invite_access( notebook_id, trash_id, email_address, database ) )
def sql_load_users( self, admin, database ):
users = []
for ( user_id, group_infos ) in database.user_group.items():
for group_info in group_infos:
( db_group_id, db_admin ) = group_info
if db_group_id != self.object_id: continue
if admin is True and db_admin != True: continue
if admin is False and db_admin != False: continue
user = database.objects.get( user_id )[ -1 ]
users.append( user )
users.sort( lambda a, b: cmp( a.username, b.username ) )
return users
Group.sql_load_users = lambda self, admin = None: \
lambda database: sql_load_users( self, admin, database )
def sql_load_revisions( self, database ):
note_list = database.objects.get( self.object_id )
if not note_list: return None
revisions = []
for note in note_list:
user_list = database.objects.get( note.user_id )
user_id = None
username = None
if user_list:
user_id = user_list[ -1 ].object_id
username = user_list[ -1 ].username
revisions.append( User_revision( note.revision, user_id, username ) )
return revisions
Note.sql_load_revisions = lambda self: \
lambda database: sql_load_revisions( self, database )
def sql_load_notes( self, start, count, database ):
notes = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Note ) and obj.notebook_id == self.object_id:
notes.append( obj )
notes.sort( lambda a, b: -cmp( a.revision, b.revision ) )
if count is None:
return notes[ start : ]
else:
return notes[ start : start + count ]
Notebook.sql_load_notes = lambda self, start = 0, count = None: \
lambda database: sql_load_notes( self, start, count, database )
def sql_load_startup_notes( self, database ):
notes = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Note ) and obj.notebook_id == self.object_id and obj.startup:
notes.append( obj )
return notes
Notebook.sql_load_startup_notes = lambda self: \
lambda database: sql_load_startup_notes( self, database )
def sql_load_recent_notes( self, database, start, count ):
notes = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Note ) and obj.notebook_id == self.object_id:
obj = copy( obj )
obj._Note__creation = database.objects[ object_id ][ 0 ].revision
notes.append( obj )
notes.sort( lambda a, b: -cmp( a.creation, b.creation ) )
notes = notes[ start : start + count ]
return notes
Notebook.sql_load_recent_notes = lambda self, start = 0, count = 10: \
lambda database: sql_load_recent_notes( self, database, start, count )
def sql_load_note_by_title( self, title, database ):
notes = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Note ) and obj.notebook_id == self.object_id and obj.title.lower() == title.lower():
notes.append( obj )
return notes
Notebook.sql_load_note_by_title = lambda self, title: \
lambda database: sql_load_note_by_title( self, title, database )
def sql_search_notes( user_id, first_notebook_id, search_text, database ):
first_notes = []
other_notes = []
search_text = search_text.lower()
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if not isinstance( obj, Note ):
continue
if user_id in database.user_notebook:
for notebook_info in database.user_notebook[ user_id ]:
if notebook_info[ 0 ] != obj.notebook_id:
continue
if obj.deleted_from_id == None and \
search_text in obj.contents.lower():
if obj.notebook_id == first_notebook_id:
first_notes.append( obj )
else:
other_notes.append( obj )
return first_notes + other_notes
Notebook.sql_search_notes = staticmethod( lambda user_id, first_notebook_id, search_text: \
lambda database: sql_search_notes( user_id, first_notebook_id, search_text, database ) )
def sql_search_titles( notebook_id, search_text, database ):
notes = []
search_text = search_text.lower()
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if not isinstance( obj, Note ):
continue
if obj.deleted_from_id == None and \
obj.notebook_id == notebook_id and \
search_text in obj.title:
obj.summary = obj.title
notes.append( obj )
return notes
Notebook.sql_search_titles = staticmethod( lambda notebook_id, search_text: \
lambda database: sql_search_titles( notebook_id, search_text, database ) )
def sql_highest_note_rank( self, database ):
max_rank = -1
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Note ) and obj.notebook_id == self.object_id and obj.rank > max_rank:
max_rank = obj.rank
return max_rank
Notebook.sql_highest_note_rank = lambda self: \
lambda database: sql_highest_note_rank( self, database )
def sql_count_notes( self, database ):
count = 0
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Note ) and obj.notebook_id == self.object_id:
count += 1
return count
Notebook.sql_count_notes = lambda self: \
lambda database: sql_count_notes( self, database )
def sql_load_similar( self, database ):
invites = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Invite ) and obj.notebook_id == self.notebook_id and \
obj.email_address == self.email_address and \
obj.object_id != self.object_id:
invites.append( obj )
return invites
Invite.sql_load_similar = lambda self: \
lambda database: sql_load_similar( self, database )
def sql_load_notebook_invites( notebook_id, database ):
invites = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Invite ) and obj.notebook_id == notebook_id and \
obj.email_address not in [ i.email_address for i in invites ]:
invites.append( obj )
return invites
Invite.sql_load_notebook_invites = staticmethod( lambda notebook_id:
lambda database: sql_load_notebook_invites( notebook_id, database ) )
def sql_revoke_invites( self, database ):
invites = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Invite ) and obj.notebook_id == self.notebook_id and \
obj.email_address == self.email_address:
del( database.objects[ object_id ] )
Invite.sql_revoke_invites = lambda self: \
lambda database: sql_revoke_invites( self, database )
def sql_load_note_files( note_id, database ):
files = []
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, File ) and obj.note_id == note_id:
files.append( obj )
return files
File.sql_load_note_files = staticmethod( lambda note_id:
lambda database: sql_load_note_files( note_id, database ) )
def sql_delete( self, database ):
del( database.objects[ self.object_id ] )
File.sql_delete = lambda self: \
lambda database: sql_delete( self, database )
def setUp( self ):
# trick tested methods into using a fake SMTP server
Stub_smtp.reset()
@ -516,7 +49,12 @@ class Test_controller( object ):
from controller.Root import Root
cherrypy.lowercase_api = True
self.database = Stub_database()
self.database = Database(
Connection_wrapper( sqlite.connect( ":memory:", detect_types = sqlite.PARSE_DECLTYPES, check_same_thread = False ) ),
cache = Stub_cache(),
)
self.database.execute_script( file( "model/schema.sqlite" ).read(), commit = True )
self.settings = {
u"global": {
u"server.environment": "production",
@ -578,6 +116,7 @@ class Test_controller( object ):
controller.Expose.view_override = Stub_view
def tearDown( self ):
self.database.close()
cherrypy.server.stop()
def http_get( self, http_path, headers = None, session_id = None, pretend_https = False ):

View File

@ -908,6 +908,7 @@ class Test_files( Test_controller ):
def test_upload( self, filename = None ):
self.login()
orig_storage_bytes = self.user.storage_bytes
result = self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
@ -937,7 +938,6 @@ class Test_files( Test_controller ):
assert Upload_file.open_file( self.file_id ).read() == self.file_data
# assert that storage bytes increased
orig_storage_bytes = self.user.storage_bytes
user = self.database.load( User, self.user.object_id )
assert user.storage_bytes > orig_storage_bytes
@ -1279,6 +1279,7 @@ class Test_files( Test_controller ):
def test_stats( self ):
self.login()
orig_storage_bytes = self.user.storage_bytes
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
@ -1300,7 +1301,6 @@ class Test_files( Test_controller ):
assert result[ u"filename" ] == self.filename
assert result[ u"size_bytes" ] == len( self.file_data )
orig_storage_bytes = self.user.storage_bytes
user = self.database.load( User, self.user.object_id )
assert result[ u"storage_bytes" ] == user.storage_bytes
assert user.storage_bytes > orig_storage_bytes
@ -1376,6 +1376,8 @@ class Test_files( Test_controller ):
session_id = self.session_id,
)
orig_storage_bytes = self.user.storage_bytes
result = self.http_post(
"/files/delete",
dict(
@ -1388,7 +1390,6 @@ class Test_files( Test_controller ):
assert db_file is None
assert not Upload_file.exists( self.file_id )
orig_storage_bytes = self.user.storage_bytes
user = self.database.load( User, self.user.object_id )
assert result[ u"storage_bytes" ] == user.storage_bytes
assert user.storage_bytes != orig_storage_bytes
@ -2070,7 +2071,7 @@ class Test_files( Test_controller ):
def test_purge_unused( self ):
self.login()
self.http_upload(
result = self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
@ -2082,6 +2083,9 @@ class Test_files( Test_controller ):
session_id = self.session_id,
)
db_file = self.database.load( File, self.file_id )
assert db_file
# the file is not linked to from the note's contents, so this should delete it
cherrypy.root.files.purge_unused( self.note )

View File

@ -87,7 +87,7 @@ class Test_users( Test_controller ):
self.anonymous = User.create( self.database.next_id( User ), u"anonymous" )
self.database.save( self.anonymous, commit = False )
self.database.execute( self.anonymous.sql_save_notebook( self.anon_notebook.object_id ), commit = False )
self.database.execute( self.anonymous.sql_save_notebook( self.anon_notebook.object_id, read_write = False, owner = False ), commit = False )
self.database.commit()
@ -149,6 +149,9 @@ class Test_users( Test_controller ):
assert u"error" in result
def __get_recent_user( self ):
return self.database.select_one( User, "select * from luminotes_user order by revision desc limit 1;" );
def test_current_after_signup( self ):
result = self.http_post( "/users/signup", dict(
username = self.new_username,
@ -161,7 +164,7 @@ class Test_users( Test_controller ):
new_notebook_id = result[ u"redirect" ].split( u"/notebooks/" )[ -1 ]
user = self.database.last_saved_obj
user = self.__get_recent_user()
assert isinstance( user, User )
result = cherrypy.root.users.current( user.object_id )
@ -171,6 +174,15 @@ class Test_users( Test_controller ):
notebooks = result[ u"notebooks" ]
notebook = notebooks[ 0 ]
assert notebook.object_id == notebooks[ 1 ].trash_id
assert notebook.revision
assert notebook.name == u"trash"
assert notebook.trash_id == None
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == None
notebook = notebooks[ 1 ]
assert notebook.object_id == new_notebook_id
assert notebook.revision
assert notebook.name == u"my notebook"
@ -179,15 +191,6 @@ class Test_users( Test_controller ):
assert notebook.owner == True
assert notebook.rank == 0
notebook = notebooks[ 1 ]
assert notebook.object_id == notebooks[ 0 ].trash_id
assert notebook.revision
assert notebook.name == u"trash"
assert notebook.trash_id == None
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == None
notebook = notebooks[ 2 ]
assert notebook.object_id == self.anon_notebook.object_id
assert notebook.revision == self.anon_notebook.revision
@ -237,7 +240,7 @@ class Test_users( Test_controller ):
invite_notebook_id = result[ u"redirect" ].split( u"/notebooks/" )[ -1 ]
assert invite_notebook_id == self.notebooks[ 0 ].object_id
user = self.database.last_saved_user
user = self.__get_recent_user()
assert isinstance( user, User )
result = cherrypy.root.users.current( user.object_id )
@ -298,7 +301,7 @@ class Test_users( Test_controller ):
assert result[ u"redirect" ] == u"/users/subscribe?rate_plan=2&yearly=False"
user = self.database.last_saved_obj
user = self.__get_recent_user()
assert isinstance( user, User )
result = cherrypy.root.users.current( user.object_id )
@ -308,6 +311,15 @@ class Test_users( Test_controller ):
notebooks = result[ u"notebooks" ]
notebook = notebooks[ 0 ]
assert notebook.object_id == notebooks[ 1 ].trash_id
assert notebook.revision
assert notebook.name == u"trash"
assert notebook.trash_id == None
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == None
notebook = notebooks[ 1 ]
assert notebook.object_id
assert notebook.revision
assert notebook.name == u"my notebook"
@ -316,15 +328,6 @@ class Test_users( Test_controller ):
assert notebook.owner == True
assert notebook.rank == 0
notebook = notebooks[ 1 ]
assert notebook.object_id == notebooks[ 0 ].trash_id
assert notebook.revision
assert notebook.name == u"trash"
assert notebook.trash_id == None
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == None
notebook = notebooks[ 2 ]
assert notebook.object_id == self.anon_notebook.object_id
assert notebook.revision == self.anon_notebook.revision
@ -619,7 +622,7 @@ class Test_users( Test_controller ):
new_notebook_id = result[ u"redirect" ].split( u"/notebooks/" )[ -1 ]
user = self.database.last_saved_obj
user = self.__get_recent_user()
assert isinstance( user, User )
result = cherrypy.root.users.current( user.object_id )
@ -630,6 +633,15 @@ class Test_users( Test_controller ):
notebooks = result[ u"notebooks" ]
assert len( notebooks ) == 3
notebook = notebooks[ 0 ]
assert notebook.object_id == notebooks[ 1 ].trash_id
assert notebook.revision
assert notebook.name == u"trash"
assert notebook.trash_id == None
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == None
notebook = notebooks[ 1 ]
assert notebook.object_id == new_notebook_id
assert notebook.revision
assert notebook.name == u"my notebook"
@ -638,15 +650,6 @@ class Test_users( Test_controller ):
assert notebook.owner == True
assert notebook.rank == 0
notebook = notebooks[ 1 ]
assert notebook.object_id == notebooks[ 0 ].trash_id
assert notebook.revision
assert notebook.name == u"trash"
assert notebook.trash_id == None
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == None
notebook = notebooks[ 2 ]
assert notebook.object_id == self.anon_notebook.object_id
assert notebook.revision == self.anon_notebook.revision
@ -671,7 +674,7 @@ class Test_users( Test_controller ):
new_notebook_id = result[ u"redirect" ].split( u"/notebooks/" )[ -1 ]
user = self.database.last_saved_obj
user = self.__get_recent_user()
assert isinstance( user, User )
result = cherrypy.root.users.current( user.object_id )
@ -733,26 +736,26 @@ class Test_users( Test_controller ):
assert result[ u"user" ].object_id == self.user.object_id
assert result[ u"user" ].username == self.user.username
assert len( result[ u"notebooks" ] ) == 5
assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == u"trash"
assert result[ u"notebooks" ][ 0 ].read_write == True
assert result[ u"notebooks" ][ 0 ].owner == True
assert result[ u"notebooks" ][ 0 ].rank == 0
assert result[ u"notebooks" ][ 0 ].rank == None
assert result[ u"notebooks" ][ 1 ].object_id
assert result[ u"notebooks" ][ 1 ].name == u"trash"
assert result[ u"notebooks" ][ 1 ].read_write == True
assert result[ u"notebooks" ][ 1 ].owner == True
assert result[ u"notebooks" ][ 1 ].rank == None
assert result[ u"notebooks" ][ 2 ].object_id == self.notebooks[ 1 ].object_id
assert result[ u"notebooks" ][ 2 ].name == self.notebooks[ 1 ].name
assert result[ u"notebooks" ][ 2 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 2 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 2 ].read_write == True
assert result[ u"notebooks" ][ 2 ].owner == True
assert result[ u"notebooks" ][ 2 ].rank == 1
assert result[ u"notebooks" ][ 3 ].object_id
assert result[ u"notebooks" ][ 3 ].name == u"trash"
assert result[ u"notebooks" ][ 2 ].rank == 0
assert result[ u"notebooks" ][ 3 ].object_id == self.notebooks[ 1 ].object_id
assert result[ u"notebooks" ][ 3 ].name == self.notebooks[ 1 ].name
assert result[ u"notebooks" ][ 3 ].read_write == True
assert result[ u"notebooks" ][ 3 ].owner == True
assert result[ u"notebooks" ][ 3 ].rank == None
assert result[ u"notebooks" ][ 3 ].rank == 1
assert result[ u"notebooks" ][ 4 ].object_id == self.anon_notebook.object_id
assert result[ u"notebooks" ][ 4 ].name == self.anon_notebook.name
assert result[ u"notebooks" ][ 4 ].read_write == False
@ -878,17 +881,17 @@ class Test_users( Test_controller ):
assert self.user.revision == original_revision
def test_update_storage_without_quota( self ):
original_revision = self.user.revision
self.settings[ u"global" ][ u"luminotes.rate_plans" ][ 0 ][ u"storage_bytes" ] = None
previous_revision = self.user.revision
cherrypy.root.users.update_storage( self.user.object_id )
expected_size = cherrypy.root.users.calculate_storage( self.user )
user = self.database.load( User, self.user.object_id )
assert self.user.storage_bytes == 0
assert self.user.group_storage_bytes == 0
assert self.user.revision == original_revision
assert user.storage_bytes == expected_size
assert user.group_storage_bytes == 0
assert user.revision > previous_revision
def test_check_access( self ):
access = cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].object_id )
@ -1417,10 +1420,7 @@ class Test_users( Test_controller ):
assert invite_id
# assert that the invite has the read_write / owner flags set appropriately
invites = self.database.objects.get( invite_id )
assert invites
assert len( invites ) == 1
invite = invites[ -1 ]
invite = self.database.load( Invite, invite_id )
assert invite
assert invite.read_write is False
assert invite.owner is False
@ -1470,10 +1470,7 @@ class Test_users( Test_controller ):
assert invite_id
# assert that the invite has the read_write / owner flags set appropriately
invites = self.database.objects.get( invite_id )
assert invites
assert len( invites ) == 1
invite = invites[ -1 ]
invite = self.database.load( Invite, invite_id )
assert invite
assert invite.read_write is False
assert invite.owner is False
@ -1515,10 +1512,7 @@ class Test_users( Test_controller ):
assert invite_id
# assert that the invite has the read_write / owner flags set appropriately
invites = self.database.objects.get( invite_id )
assert invites
assert len( invites ) == 1
invite = invites[ -1 ]
invite = self.database.load( Invite, invite_id )
assert invite
assert invite.read_write is True
assert invite.owner is False
@ -1560,10 +1554,7 @@ class Test_users( Test_controller ):
assert invite_id
# assert that the invite has the read_write / owner flags set appropriately
invites = self.database.objects.get( invite_id )
assert invites
assert len( invites ) == 1
invite = invites[ -1 ]
invite = self.database.load( Invite, invite_id )
assert invite
assert invite.read_write is True
assert invite.owner is True
@ -1681,29 +1672,27 @@ class Test_users( Test_controller ):
), session_id = self.session_id )
invites = result[ u"invites" ]
assert len( invites ) == 1
assert len( invites ) == 2
invite = invites[ 0 ]
assert invite
assert invite.read_write is True
assert invite.owner is True
invite = invites[ 1 ]
assert invite
assert invite.read_write is True
assert invite.owner is True
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id2 = matches.group( 2 )
assert invite_id2
# assert that both invites have the read_write / owner flags set to True now
invite1_list = self.database.objects.get( invite_id1 )
assert invite1_list
assert len( invite1_list ) >= 2
invite1 = invite1_list[ -1 ]
invite1 = self.database.load( Invite, invite_id1 )
assert invite1
assert invite1.read_write is True
assert invite1.owner is True
invite2_list = self.database.objects.get( invite_id2 )
assert invite2_list
assert len( invite2_list ) >= 1
invite2 = invite2_list[ -1 ]
invite2 = self.database.load( Invite, invite_id2 )
assert invite2
assert invite2.read_write is True
assert invite2.owner is True
@ -1758,30 +1747,29 @@ class Test_users( Test_controller ):
invite_button = u"send invites",
), session_id = self.session_id )
print result
invites = result[ u"invites" ]
assert len( invites ) == 1
assert len( invites ) == 2
invite = invites[ 0 ]
assert invite
assert invite.read_write is False
assert invite.owner is False
invite = invites[ 1 ]
assert invite
assert invite.read_write is False
assert invite.owner is False
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id2 = matches.group( 2 )
assert invite_id2
# assert that both invites have the read_write / owner flags set to False now
invite1_list = self.database.objects.get( invite_id1 )
assert invite1_list
assert len( invite1_list ) >= 2
invite1 = invite1_list[ -1 ]
invite1 = self.database.load( Invite, invite_id1 )
assert invite1
assert invite1.read_write is False
assert invite1.owner is False
invite2_list = self.database.objects.get( invite_id2 )
assert invite2_list
assert len( invite2_list ) >= 1
invite2 = invite2_list[ -1 ]
invite2 = self.database.load( Invite, invite_id2 )
assert invite2
assert invite2.read_write is False
assert invite2.owner is False
@ -1934,10 +1922,9 @@ class Test_users( Test_controller ):
assert "access" in result[ u"error" ]
def test_send_invites_without_any_access( self ):
self.login()
self.login2()
self.database.user_notebook = {}
self.user.rate_plan = 1
self.user2.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
@ -1957,8 +1944,7 @@ class Test_users( Test_controller ):
def test_send_invites_without_read_write_access( self ):
self.login()
self.database.user_notebook = {}
self.database.execute( self.user.sql_save_notebook( self.notebooks[ 0 ].object_id, read_write = False, owner = True ) )
self.database.execute( self.user.sql_update_access( self.notebooks[ 0 ].object_id, read_write = False, owner = True ) )
self.user.rate_plan = 1
self.database.save( self.user )
@ -1979,8 +1965,7 @@ class Test_users( Test_controller ):
def test_send_invites_without_owner_access( self ):
self.login()
self.database.user_notebook = {}
self.database.execute( self.user.sql_save_notebook( self.notebooks[ 0 ].object_id, read_write = True, owner = False ) )
self.database.execute( self.user.sql_update_access( self.notebooks[ 0 ].object_id, read_write = True, owner = False ) )
self.user.rate_plan = 1
self.database.save( self.user )
@ -2236,7 +2221,7 @@ class Test_users( Test_controller ):
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
assert len( result[ u"invites" ] ) == 1
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id = matches.group( 2 )
@ -2457,6 +2442,8 @@ class Test_users( Test_controller ):
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id = matches.group( 2 )
self.login2()
self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id,
@ -2469,94 +2456,10 @@ class Test_users( Test_controller ):
assert result[ u"error" ]
assert u"already" in result[ u"error" ]
def test_redeem_invite_already_redeemed( self ):
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id = matches.group( 2 )
del( self.database.objects[ self.notebooks[ 0 ].object_id ] )
result = self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id,
) )
assert result[ u"error" ]
assert u"unknown" in result[ u"error" ]
def test_redeem_invite_missing_anonymous_user( self ):
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id = matches.group( 2 )
del( self.database.objects[ self.anonymous.object_id ] )
result = self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id,
) )
assert result[ u"error" ]
def test_redeem_invite_missing_invite_notebook( self ):
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id = matches.group( 2 )
del( self.database.objects[ self.notebooks[ 0 ].object_id ] )
result = self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id,
) )
assert result[ u"error" ]
def test_convert_invite_to_access( self ):
# start the invitee out with access to one notebook
self.database.execute( self.user2.sql_save_notebook( self.notebooks[ 1 ].object_id, read_write = True, owner = False, rank = 7 ), commit = False )
self.database.commit()
self.login()
@ -2595,7 +2498,6 @@ class Test_users( Test_controller ):
) )
assert access is True
self.user.sql_load_notebooks()
notebooks = self.database.select_many( Notebook, self.user2.sql_load_notebooks() )
new_notebook = [ notebook for notebook in notebooks if notebook.object_id == invite.notebook_id ][ 0 ]
assert new_notebook.rank == 8 # one higher than the other notebook this user has access to
@ -3065,46 +2967,26 @@ class Test_users( Test_controller ):
self.__assert_has_admin_group()
def __load_admin_groups( self ):
groups = self.database.select_many( Group, self.user.sql_load_groups() )
return [ group for group in groups if group.admin ]
def __assert_has_admin_group( self, exactly_one = False ):
user_group_infos = self.database.user_group.get( self.user.object_id )
found_admin_count = 0
group_id = None
groups = self.__load_admin_groups()
# look through the user's groups and try to find at least one admin group
for user_group_info in user_group_infos:
assert user_group_info
assert len( user_group_info ) == 2
( group_id, admin ) = user_group_info
assert group_id
if admin is True:
found_admin_count += 1
assert found_admin_count > 0
assert len( groups ) > 0
if exactly_one is True:
assert found_admin_count == 1
assert len( groups ) == 1
# load the group itself and make sure it looks kosher
group = self.database.load( Group, group_id )
assert group
assert group.name == u"my group"
assert group.admin is True
assert groups[ 0 ]
assert groups[ 0 ].name == u"my group"
assert groups[ 0 ].admin is True
def __assert_no_admin_group( self ):
user_group_infos = self.database.user_group.get( self.user.object_id )
found_admin = False
group_id = None
groups = self.__load_admin_groups()
# look through the user's groups and make sure there are no admin groups
for user_group_info in user_group_infos:
assert user_group_info
assert len( user_group_info ) == 2
( group_id, admin ) = user_group_info
assert group_id
if admin is True:
found_admin = True
break
assert found_admin is False
assert len( groups ) == 0
def test_paypal_notify_signup_yearly( self ):
data = dict( self.SUBSCRIPTION_DATA )
@ -3698,10 +3580,10 @@ class Test_users( Test_controller ):
group_id = self.database.next_id( Group, commit = False )
group = Group.create( group_id, name = u"my group", admin = True )
self.database.save( group, commit = False )
self.database.user_group[ self.user.object_id ].append( ( group_id, True ) )
self.database.execute( self.user.sql_save_group( group_id, admin = True ) )
if add_non_admin_user is True:
self.database.user_group [ self.user2.object_id ].append( ( group_id, False ) )
self.database.execute( self.user2.sql_save_group( group_id, admin = False ) )
self.database.commit()
@ -4064,11 +3946,12 @@ class Test_users( Test_controller ):
assert result[ u"user" ].username == self.user.username
assert len( result[ u"notebooks" ] ) == 5
assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 0 ].read_write == True
assert result[ u"notebooks" ][ 0 ].owner == True
assert result[ u"notebooks" ][ 0 ].rank == 0
notebook = [ notebook for notebook in result[ u"notebooks" ] if notebook.object_id == self.notebooks[ 0 ].object_id ][ 0 ]
assert notebook.object_id == self.notebooks[ 0 ].object_id
assert notebook.name == self.notebooks[ 0 ].name
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == 0
assert result[ u"login_url" ] == None
assert result[ u"logout_url" ] == self.settings[ u"global" ][ u"luminotes.https_url" ] + u"/users/logout"
@ -4104,11 +3987,12 @@ class Test_users( Test_controller ):
assert result[ u"user" ].username == self.user.username
assert result.get( u"conversion" ) == None
assert len( result[ u"notebooks" ] ) == 5
assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 0 ].read_write == True
assert result[ u"notebooks" ][ 0 ].owner == True
assert result[ u"notebooks" ][ 0 ].rank == 0
notebook = [ notebook for notebook in result[ u"notebooks" ] if notebook.object_id == self.notebooks[ 0 ].object_id ][ 0 ]
assert notebook.object_id == self.notebooks[ 0 ].object_id
assert notebook.name == self.notebooks[ 0 ].name
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == 0
assert result[ u"login_url" ] == None
assert result[ u"logout_url" ] == self.settings[ u"global" ][ u"luminotes.https_url" ] + u"/users/logout"
@ -4142,12 +4026,12 @@ class Test_users( Test_controller ):
assert result[ u"user" ].username == self.user.username
assert result.get( u"conversion" ) == None
assert len( result[ u"notebooks" ] ) == 5
assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 0 ].read_write == True
assert result[ u"notebooks" ][ 0 ].owner == True
assert result[ u"notebooks" ][ 0 ].rank == 0
notebook = [ notebook for notebook in result[ u"notebooks" ] if notebook.object_id == self.notebooks[ 0 ].object_id ][ 0 ]
assert notebook.object_id == self.notebooks[ 0 ].object_id
assert notebook.name == self.notebooks[ 0 ].name
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == 0
assert result[ u"login_url" ] == None
assert result[ u"logout_url" ] == self.settings[ u"global" ][ u"luminotes.https_url" ] + u"/users/logout"
@ -4182,11 +4066,12 @@ class Test_users( Test_controller ):
assert result[ u"user" ].username == self.user.username
assert result.get( u"conversion" ) == None
assert len( result[ u"notebooks" ] ) == 5
assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 0 ].read_write == True
assert result[ u"notebooks" ][ 0 ].owner == True
assert result[ u"notebooks" ][ 0 ].rank == 0
notebook = [ notebook for notebook in result[ u"notebooks" ] if notebook.object_id == self.notebooks[ 0 ].object_id ][ 0 ]
assert notebook.object_id == self.notebooks[ 0 ].object_id
assert notebook.name == self.notebooks[ 0 ].name
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == 0
assert result[ u"login_url" ] == None
assert result[ u"logout_url" ] == self.settings[ u"global" ][ u"luminotes.https_url" ] + u"/users/logout"
@ -4219,11 +4104,12 @@ class Test_users( Test_controller ):
assert result[ u"user" ].username == self.user.username
assert result.get( u"conversion" ) == None
assert len( result[ u"notebooks" ] ) == 5
assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 0 ].read_write == True
assert result[ u"notebooks" ][ 0 ].owner == True
assert result[ u"notebooks" ][ 0 ].rank == 0
notebook = [ notebook for notebook in result[ u"notebooks" ] if notebook.object_id == self.notebooks[ 0 ].object_id ][ 0 ]
assert notebook.object_id == self.notebooks[ 0 ].object_id
assert notebook.name == self.notebooks[ 0 ].name
assert notebook.read_write == True
assert notebook.owner == True
assert notebook.rank == 0
assert result[ u"login_url" ] == None
assert result[ u"logout_url" ] == self.settings[ u"global" ][ u"luminotes.https_url" ] + u"/users/logout"