New controller.Groups class and a new controller.Users.check_group() method.
Also a new model.Group.sql_load_users() method. Lots of unit tests.
This commit is contained in:
parent
173adffa82
commit
79e2c45533
|
@ -0,0 +1,55 @@
|
|||
from Expose import expose
|
||||
from Expire import strongly_expire
|
||||
from Users import grab_user_id, Access_error
|
||||
from model.Group import Group
|
||||
from model.User import User
|
||||
from view.Json import Json
|
||||
from Validate import validate, Valid_string, Valid_bool, Valid_int, Validation_error
|
||||
from Database import Valid_id, end_transaction
|
||||
|
||||
|
||||
class Groups( object ):
|
||||
def __init__( self, database, users ):
|
||||
self.__database = database
|
||||
self.__users = users
|
||||
|
||||
@expose( view = Json )
|
||||
@strongly_expire
|
||||
@end_transaction
|
||||
@grab_user_id
|
||||
@validate(
|
||||
group_id = Valid_id(),
|
||||
user_id = Valid_id( none_okay = True ),
|
||||
)
|
||||
def load_users( self, group_id, user_id = None ):
|
||||
"""
|
||||
Return the users within the given group. This method is only available to an admin of the
|
||||
group.
|
||||
|
||||
@type group_id: unicode
|
||||
@param group_id: id of group whose users to return
|
||||
@type user_id: unicode or NoneType
|
||||
@param user_id: id of current logged-in user (if any)
|
||||
@rtype: dict
|
||||
@return: {
|
||||
'admin_users': admin_user_list,
|
||||
'other_users': non_admin_user_list,
|
||||
}
|
||||
@raise Access_error: the current user doesn't have admin membership to the given group
|
||||
@raise Validation_error: one of the arguments is invalid
|
||||
"""
|
||||
if not self.__users.check_group( user_id, group_id, admin = True ):
|
||||
raise Access_error()
|
||||
|
||||
group = self.__database.load( Group, group_id )
|
||||
|
||||
if group is None:
|
||||
raise Access_error()
|
||||
|
||||
admin_users = self.__database.select_many( User, group.sql_load_users( admin = True ) )
|
||||
other_users = self.__database.select_many( User, group.sql_load_users( admin = False ) )
|
||||
|
||||
return dict(
|
||||
admin_users = admin_users,
|
||||
other_users = other_users,
|
||||
)
|
|
@ -5,6 +5,7 @@ from Expire import strongly_expire
|
|||
from Validate import validate, Valid_int, Valid_string, Valid_bool
|
||||
from Notebooks import Notebooks
|
||||
from Users import Users, grab_user_id
|
||||
from Groups import Groups
|
||||
from Files import Files
|
||||
from Forums import Forums
|
||||
from Database import Valid_id, end_transaction
|
||||
|
@ -47,6 +48,7 @@ class Root( object ):
|
|||
settings[ u"global" ].get( u"luminotes.payment_email", u"" ),
|
||||
settings[ u"global" ].get( u"luminotes.rate_plans", [] ),
|
||||
)
|
||||
self.__groups = Groups( database, self.__users )
|
||||
self.__files = Files( database, self.__users )
|
||||
self.__notebooks = Notebooks( database, self.__users, self.__files, settings[ u"global" ].get( u"luminotes.https_url", u"" ) )
|
||||
self.__forums = Forums( database, self.__users )
|
||||
|
@ -391,5 +393,6 @@ class Root( object ):
|
|||
database = property( lambda self: self.__database )
|
||||
notebooks = property( lambda self: self.__notebooks )
|
||||
users = property( lambda self: self.__users )
|
||||
groups = property( lambda self: self.__groups )
|
||||
files = property( lambda self: self.__files )
|
||||
# forums = property( lambda self: self.__forums )
|
||||
|
|
|
@ -596,6 +596,27 @@ class Users( object ):
|
|||
|
||||
return False
|
||||
|
||||
def check_group( self, user_id, group_id, admin = False ):
|
||||
"""
|
||||
Determine whether the given user has membership to the given group.
|
||||
|
||||
@type user_id: unicode
|
||||
@param user_id: id of user whose membership to check
|
||||
@type group_id: unicode
|
||||
@param group_id: id of group to check membership in
|
||||
@type admin: bool
|
||||
@param admin: True if admin-level membership is being checked (defaults to False)
|
||||
@rtype: bool
|
||||
@return: True if the user has membership
|
||||
"""
|
||||
# check if the given user has access to this notebook
|
||||
user = self.__database.load( User, user_id )
|
||||
|
||||
if user and self.__database.select_one( bool, user.sql_in_group( group_id, admin ) ):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@expose( view = Json )
|
||||
@end_transaction
|
||||
@validate(
|
||||
|
|
|
@ -31,6 +31,7 @@ class Truncated_StringIO( Wrapped_StringIO ):
|
|||
class Test_controller( object ):
|
||||
def __init__( self ):
|
||||
from model.User import User
|
||||
from model.Group import Group
|
||||
from model.Notebook import Notebook
|
||||
from model.Note import Note
|
||||
from model.Invite import Invite
|
||||
|
@ -198,6 +199,31 @@ class Test_controller( object ):
|
|||
User.sql_load_groups = lambda self: \
|
||||
lambda database: sql_load_groups( self, database )
|
||||
|
||||
def sql_save_group( self, group_id, admin, database ):
|
||||
if self.object_id in database.user_group:
|
||||
database.user_group[ self.object_id ].append( ( group_id, admin ) )
|
||||
else:
|
||||
database.user_group[ self.object_id ] = [ ( group_id, admin ) ]
|
||||
|
||||
User.sql_save_group = lambda self, group_id, admin = False: \
|
||||
lambda database: sql_save_group( self, group_id, admin, database )
|
||||
|
||||
def sql_in_group( self, group_id, admin, database ):
|
||||
for ( user_id, group_infos ) in database.user_group.items():
|
||||
for group_info in group_infos:
|
||||
( db_group_id, db_admin ) = group_info
|
||||
|
||||
if self.object_id == user_id and group_id == db_group_id:
|
||||
if admin is True and db_admin is False:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
User.sql_in_group = lambda self, group_id, admin = False: \
|
||||
lambda database: sql_in_group( self, group_id, admin, database )
|
||||
|
||||
def sql_revoke_invite_access( notebook_id, trash_id, email_address, database ):
|
||||
invites = []
|
||||
|
||||
|
@ -214,6 +240,27 @@ class Test_controller( object ):
|
|||
User.sql_revoke_invite_access = staticmethod( lambda notebook_id, trash_id, email_address: \
|
||||
lambda database: sql_revoke_invite_access( notebook_id, trash_id, email_address, database ) )
|
||||
|
||||
def sql_load_users( self, admin, database ):
|
||||
users = []
|
||||
|
||||
for ( user_id, group_infos ) in database.user_group.items():
|
||||
for group_info in group_infos:
|
||||
( db_group_id, db_admin ) = group_info
|
||||
|
||||
if db_group_id != self.object_id: continue
|
||||
if admin is True and db_admin != True: continue
|
||||
if admin is False and db_admin != False: continue
|
||||
|
||||
user = database.objects.get( user_id )[ -1 ]
|
||||
users.append( user )
|
||||
|
||||
users.sort( lambda a, b: cmp( a.username, b.username ) )
|
||||
|
||||
return users
|
||||
|
||||
Group.sql_load_users = lambda self, admin = None: \
|
||||
lambda database: sql_load_users( self, admin, database )
|
||||
|
||||
def sql_load_revisions( self, database ):
|
||||
note_list = database.objects.get( self.object_id )
|
||||
if not note_list: return None
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
from Test_controller import Test_controller
|
||||
import Stub_urllib2
|
||||
from controller.Groups import Groups
|
||||
from model.Group import Group
|
||||
from model.User import User
|
||||
|
||||
|
||||
class Test_groups( Test_controller ):
|
||||
def setUp( self ):
|
||||
Test_controller.setUp( self )
|
||||
Groups.urllib2 = Stub_urllib2
|
||||
|
||||
self.group_name = u"my group"
|
||||
self.group_name2 = u"other group"
|
||||
self.username = u"mulder"
|
||||
self.password = u"trustno1"
|
||||
self.email_address = u"out-there@example.com"
|
||||
self.username2 = u"scully"
|
||||
self.password2 = u"trustsome1"
|
||||
self.email_address2 = u"out-there@example.com"
|
||||
self.username3 = u"skinner"
|
||||
self.password3 = u"trustne1"
|
||||
self.email_address3 = u"somewhere@gov.gov"
|
||||
|
||||
self.group = Group.create( self.database.next_id( Group ), self.group_name )
|
||||
self.database.save( self.group, commit = False )
|
||||
|
||||
self.group2 = Group.create( self.database.next_id( Group ), self.group_name )
|
||||
self.database.save( self.group2, commit = False )
|
||||
|
||||
self.user = User.create( self.database.next_id( User ), self.username, self.password, self.email_address )
|
||||
self.database.save( self.user, commit = False )
|
||||
self.database.execute( self.user.sql_save_group( self.group.object_id, admin = False ) )
|
||||
|
||||
self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 )
|
||||
self.database.save( self.user2, commit = False )
|
||||
self.database.execute( self.user2.sql_save_group( self.group.object_id, admin = True ) )
|
||||
|
||||
self.user3 = User.create( self.database.next_id( User ), self.username3, self.password3, self.email_address3 )
|
||||
self.database.save( self.user3, commit = False )
|
||||
self.database.execute( self.user3.sql_save_group( self.group.object_id, admin = False ) )
|
||||
|
||||
self.database.commit()
|
||||
|
||||
def test_load_users( self ):
|
||||
self.login2()
|
||||
|
||||
result = self.http_post( "/groups/load_users", dict(
|
||||
group_id = self.group.object_id,
|
||||
), session_id = self.session_id )
|
||||
|
||||
assert len( result[ u"admin_users" ] ) == 1
|
||||
assert result[ u"admin_users" ][ 0 ].object_id == self.user2.object_id
|
||||
assert result[ u"admin_users" ][ 0 ].username == self.user2.username
|
||||
|
||||
assert len( result[ u"other_users" ] ) == 2
|
||||
assert result[ u"other_users" ][ 0 ].object_id == self.user.object_id
|
||||
assert result[ u"other_users" ][ 0 ].username == self.user.username
|
||||
assert result[ u"other_users" ][ 1 ].object_id == self.user3.object_id
|
||||
assert result[ u"other_users" ][ 1 ].username == self.user3.username
|
||||
|
||||
def login( self ):
|
||||
result = self.http_post( "/users/login", dict(
|
||||
username = self.username,
|
||||
password = self.password,
|
||||
login_button = u"login",
|
||||
) )
|
||||
self.session_id = result[ u"session_id" ]
|
||||
|
||||
def login2( self ):
|
||||
result = self.http_post( "/users/login", dict(
|
||||
username = self.username2,
|
||||
password = self.password2,
|
||||
login_button = u"login",
|
||||
) )
|
||||
self.session_id = result[ u"session_id" ]
|
|
@ -9,6 +9,7 @@ from Test_controller import Test_controller
|
|||
from Stub_smtp import Stub_smtp
|
||||
import Stub_urllib2
|
||||
from model.User import User
|
||||
from model.Group import Group
|
||||
from model.Notebook import Notebook
|
||||
from model.Note import Note
|
||||
from model.Password_reset import Password_reset
|
||||
|
@ -36,6 +37,8 @@ class Test_users( Test_controller ):
|
|||
self.email_address2 = u"out-there@example.com"
|
||||
self.user = None
|
||||
self.user2 = None
|
||||
self.group = None
|
||||
self.group2 = None
|
||||
self.anonymous = None
|
||||
self.notebooks = None
|
||||
self.session_id = None
|
||||
|
@ -66,15 +69,22 @@ class Test_users( Test_controller ):
|
|||
)
|
||||
self.database.save( self.startup_note )
|
||||
|
||||
self.group = Group.create( self.database.next_id( Group ), u"my group" )
|
||||
self.database.save( self.group, commit = False )
|
||||
self.group2 = Group.create( self.database.next_id( Group ), u"other group" )
|
||||
self.database.save( self.group2, commit = False )
|
||||
|
||||
self.user = User.create( self.database.next_id( User ), self.username, self.password, self.email_address )
|
||||
self.database.save( self.user, commit = False )
|
||||
self.database.execute( self.user.sql_save_notebook( notebook_id1, read_write = True, owner = True, rank = 0 ), commit = False )
|
||||
self.database.execute( self.user.sql_save_notebook( trash_id1, read_write = True, owner = True ), commit = False )
|
||||
self.database.execute( self.user.sql_save_notebook( notebook_id2, read_write = True, owner = True, rank = 1 ), commit = False )
|
||||
self.database.execute( self.user.sql_save_notebook( trash_id2, read_write = True, owner = True ), commit = False )
|
||||
self.database.execute( self.user.sql_save_group( self.group.object_id, admin = False ) )
|
||||
|
||||
self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 )
|
||||
self.database.save( self.user2, commit = False )
|
||||
self.database.execute( self.user2.sql_save_group( self.group.object_id, admin = True ) )
|
||||
|
||||
self.anonymous = User.create( self.database.next_id( User ), u"anonymous" )
|
||||
self.database.save( self.anonymous, commit = False )
|
||||
|
@ -570,7 +580,10 @@ class Test_users( Test_controller ):
|
|||
assert rate_plan[ u"name" ] == u"super"
|
||||
assert rate_plan[ u"storage_quota_bytes" ] == 1337 * 10
|
||||
|
||||
assert result[ u"groups" ] == []
|
||||
assert result[ u"groups" ]
|
||||
assert result[ u"groups" ][ 0 ].object_id == self.group.object_id
|
||||
assert result[ u"groups" ][ 0 ].name == self.group.name
|
||||
assert result[ u"groups" ][ 0 ].admin == False
|
||||
|
||||
def test_current_anonymous( self ):
|
||||
result = cherrypy.root.users.current( self.anonymous.object_id )
|
||||
|
@ -719,6 +732,41 @@ class Test_users( Test_controller ):
|
|||
|
||||
assert access is False
|
||||
|
||||
def test_check_group( self ):
|
||||
membership = cherrypy.root.users.check_group( self.user.object_id, self.group.object_id )
|
||||
|
||||
assert membership is True
|
||||
|
||||
def test_check_group_with_admin( self ):
|
||||
membership = cherrypy.root.users.check_group( self.user2.object_id, self.group.object_id )
|
||||
|
||||
assert membership is True
|
||||
|
||||
def test_check_group_anon( self ):
|
||||
membership = cherrypy.root.users.check_group( self.anonymous.object_id, self.group.object_id )
|
||||
|
||||
assert membership is False
|
||||
|
||||
def test_check_group_without_membership( self ):
|
||||
membership = cherrypy.root.users.check_group( self.user.object_id, self.group2.object_id )
|
||||
|
||||
assert membership is False
|
||||
|
||||
def test_check_group_without_user( self ):
|
||||
membership = cherrypy.root.users.check_group( None, self.group2.object_id )
|
||||
|
||||
assert membership is False
|
||||
|
||||
def test_check_group_admin( self ):
|
||||
membership = cherrypy.root.users.check_group( self.user.object_id, self.group.object_id, admin = True )
|
||||
|
||||
assert membership is False
|
||||
|
||||
def test_check_group_admin_with_admin( self ):
|
||||
membership = cherrypy.root.users.check_group( self.user2.object_id, self.group.object_id, admin = True )
|
||||
|
||||
assert membership is True
|
||||
|
||||
def test_send_reset( self ):
|
||||
# trick send_reset() into using a fake SMTP server
|
||||
Stub_smtp.reset()
|
||||
|
|
|
@ -67,6 +67,29 @@ class Group( Persistent ):
|
|||
def sql_update( self ):
|
||||
return self.sql_create()
|
||||
|
||||
def sql_load_users( self, admin = None ):
|
||||
"""
|
||||
Return a SQL string to load a list of the users with membership to this group.
|
||||
"""
|
||||
if admin is True:
|
||||
admin_clause = " and user_group.admin = 't'"
|
||||
elif admin is False:
|
||||
admin_clause = " and user_group.admin = 'f'"
|
||||
else:
|
||||
admin_clause = ""
|
||||
|
||||
return \
|
||||
"""
|
||||
select
|
||||
luminotes_user_current.*
|
||||
from
|
||||
user_group, luminotes_user_current
|
||||
where
|
||||
user_group.group_id = %s and
|
||||
user_group.user_id = luminotes_user_current.id%s
|
||||
order by user_group.username;
|
||||
""" % quote( self.object_id )
|
||||
|
||||
def to_dict( self ):
|
||||
d = Persistent.to_dict( self )
|
||||
|
||||
|
|
|
@ -242,8 +242,6 @@ class User( Persistent ):
|
|||
"""
|
||||
Return a SQL string to save the id of a group to which this user has membership.
|
||||
"""
|
||||
if rank is None: rank = quote( None )
|
||||
|
||||
return \
|
||||
"insert into user_group ( user_id, group_id, admin ) values " + \
|
||||
"( %s, %s, %s );" % ( quote( self.object_id ), quote( group_id ), quote( admin and 't' or 'f' ) )
|
||||
|
|
Reference in New Issue