diff --git a/controller/Groups.py b/controller/Groups.py new file mode 100644 index 0000000..79a28c9 --- /dev/null +++ b/controller/Groups.py @@ -0,0 +1,55 @@ +from Expose import expose +from Expire import strongly_expire +from Users import grab_user_id, Access_error +from model.Group import Group +from model.User import User +from view.Json import Json +from Validate import validate, Valid_string, Valid_bool, Valid_int, Validation_error +from Database import Valid_id, end_transaction + + +class Groups( object ): + def __init__( self, database, users ): + self.__database = database + self.__users = users + + @expose( view = Json ) + @strongly_expire + @end_transaction + @grab_user_id + @validate( + group_id = Valid_id(), + user_id = Valid_id( none_okay = True ), + ) + def load_users( self, group_id, user_id = None ): + """ + Return the users within the given group. This method is only available to an admin of the + group. + + @type group_id: unicode + @param group_id: id of group whose users to return + @type user_id: unicode or NoneType + @param user_id: id of current logged-in user (if any) + @rtype: dict + @return: { + 'admin_users': admin_user_list, + 'other_users': non_admin_user_list, + } + @raise Access_error: the current user doesn't have admin membership to the given group + @raise Validation_error: one of the arguments is invalid + """ + if not self.__users.check_group( user_id, group_id, admin = True ): + raise Access_error() + + group = self.__database.load( Group, group_id ) + + if group is None: + raise Access_error() + + admin_users = self.__database.select_many( User, group.sql_load_users( admin = True ) ) + other_users = self.__database.select_many( User, group.sql_load_users( admin = False ) ) + + return dict( + admin_users = admin_users, + other_users = other_users, + ) diff --git a/controller/Root.py b/controller/Root.py index 86797a7..7490c6a 100644 --- a/controller/Root.py +++ b/controller/Root.py @@ -5,6 +5,7 @@ from Expire import strongly_expire from Validate import validate, Valid_int, Valid_string, Valid_bool from Notebooks import Notebooks from Users import Users, grab_user_id +from Groups import Groups from Files import Files from Forums import Forums from Database import Valid_id, end_transaction @@ -47,6 +48,7 @@ class Root( object ): settings[ u"global" ].get( u"luminotes.payment_email", u"" ), settings[ u"global" ].get( u"luminotes.rate_plans", [] ), ) + self.__groups = Groups( database, self.__users ) self.__files = Files( database, self.__users ) self.__notebooks = Notebooks( database, self.__users, self.__files, settings[ u"global" ].get( u"luminotes.https_url", u"" ) ) self.__forums = Forums( database, self.__users ) @@ -391,5 +393,6 @@ class Root( object ): database = property( lambda self: self.__database ) notebooks = property( lambda self: self.__notebooks ) users = property( lambda self: self.__users ) + groups = property( lambda self: self.__groups ) files = property( lambda self: self.__files ) # forums = property( lambda self: self.__forums ) diff --git a/controller/Users.py b/controller/Users.py index b0d7432..9785fc5 100644 --- a/controller/Users.py +++ b/controller/Users.py @@ -596,6 +596,27 @@ class Users( object ): return False + def check_group( self, user_id, group_id, admin = False ): + """ + Determine whether the given user has membership to the given group. + + @type user_id: unicode + @param user_id: id of user whose membership to check + @type group_id: unicode + @param group_id: id of group to check membership in + @type admin: bool + @param admin: True if admin-level membership is being checked (defaults to False) + @rtype: bool + @return: True if the user has membership + """ + # check if the given user has access to this notebook + user = self.__database.load( User, user_id ) + + if user and self.__database.select_one( bool, user.sql_in_group( group_id, admin ) ): + return True + + return False + @expose( view = Json ) @end_transaction @validate( diff --git a/controller/test/Test_controller.py b/controller/test/Test_controller.py index 46195c9..d561931 100644 --- a/controller/test/Test_controller.py +++ b/controller/test/Test_controller.py @@ -31,6 +31,7 @@ class Truncated_StringIO( Wrapped_StringIO ): class Test_controller( object ): def __init__( self ): from model.User import User + from model.Group import Group from model.Notebook import Notebook from model.Note import Note from model.Invite import Invite @@ -198,6 +199,31 @@ class Test_controller( object ): User.sql_load_groups = lambda self: \ lambda database: sql_load_groups( self, database ) + def sql_save_group( self, group_id, admin, database ): + if self.object_id in database.user_group: + database.user_group[ self.object_id ].append( ( group_id, admin ) ) + else: + database.user_group[ self.object_id ] = [ ( group_id, admin ) ] + + User.sql_save_group = lambda self, group_id, admin = False: \ + lambda database: sql_save_group( self, group_id, admin, database ) + + def sql_in_group( self, group_id, admin, database ): + for ( user_id, group_infos ) in database.user_group.items(): + for group_info in group_infos: + ( db_group_id, db_admin ) = group_info + + if self.object_id == user_id and group_id == db_group_id: + if admin is True and db_admin is False: + return False + + return True + + return False + + User.sql_in_group = lambda self, group_id, admin = False: \ + lambda database: sql_in_group( self, group_id, admin, database ) + def sql_revoke_invite_access( notebook_id, trash_id, email_address, database ): invites = [] @@ -214,6 +240,27 @@ class Test_controller( object ): User.sql_revoke_invite_access = staticmethod( lambda notebook_id, trash_id, email_address: \ lambda database: sql_revoke_invite_access( notebook_id, trash_id, email_address, database ) ) + def sql_load_users( self, admin, database ): + users = [] + + for ( user_id, group_infos ) in database.user_group.items(): + for group_info in group_infos: + ( db_group_id, db_admin ) = group_info + + if db_group_id != self.object_id: continue + if admin is True and db_admin != True: continue + if admin is False and db_admin != False: continue + + user = database.objects.get( user_id )[ -1 ] + users.append( user ) + + users.sort( lambda a, b: cmp( a.username, b.username ) ) + + return users + + Group.sql_load_users = lambda self, admin = None: \ + lambda database: sql_load_users( self, admin, database ) + def sql_load_revisions( self, database ): note_list = database.objects.get( self.object_id ) if not note_list: return None diff --git a/controller/test/Test_groups.py b/controller/test/Test_groups.py new file mode 100644 index 0000000..e9f265a --- /dev/null +++ b/controller/test/Test_groups.py @@ -0,0 +1,76 @@ +from Test_controller import Test_controller +import Stub_urllib2 +from controller.Groups import Groups +from model.Group import Group +from model.User import User + + +class Test_groups( Test_controller ): + def setUp( self ): + Test_controller.setUp( self ) + Groups.urllib2 = Stub_urllib2 + + self.group_name = u"my group" + self.group_name2 = u"other group" + self.username = u"mulder" + self.password = u"trustno1" + self.email_address = u"out-there@example.com" + self.username2 = u"scully" + self.password2 = u"trustsome1" + self.email_address2 = u"out-there@example.com" + self.username3 = u"skinner" + self.password3 = u"trustne1" + self.email_address3 = u"somewhere@gov.gov" + + self.group = Group.create( self.database.next_id( Group ), self.group_name ) + self.database.save( self.group, commit = False ) + + self.group2 = Group.create( self.database.next_id( Group ), self.group_name ) + self.database.save( self.group2, commit = False ) + + self.user = User.create( self.database.next_id( User ), self.username, self.password, self.email_address ) + self.database.save( self.user, commit = False ) + self.database.execute( self.user.sql_save_group( self.group.object_id, admin = False ) ) + + self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 ) + self.database.save( self.user2, commit = False ) + self.database.execute( self.user2.sql_save_group( self.group.object_id, admin = True ) ) + + self.user3 = User.create( self.database.next_id( User ), self.username3, self.password3, self.email_address3 ) + self.database.save( self.user3, commit = False ) + self.database.execute( self.user3.sql_save_group( self.group.object_id, admin = False ) ) + + self.database.commit() + + def test_load_users( self ): + self.login2() + + result = self.http_post( "/groups/load_users", dict( + group_id = self.group.object_id, + ), session_id = self.session_id ) + + assert len( result[ u"admin_users" ] ) == 1 + assert result[ u"admin_users" ][ 0 ].object_id == self.user2.object_id + assert result[ u"admin_users" ][ 0 ].username == self.user2.username + + assert len( result[ u"other_users" ] ) == 2 + assert result[ u"other_users" ][ 0 ].object_id == self.user.object_id + assert result[ u"other_users" ][ 0 ].username == self.user.username + assert result[ u"other_users" ][ 1 ].object_id == self.user3.object_id + assert result[ u"other_users" ][ 1 ].username == self.user3.username + + def login( self ): + result = self.http_post( "/users/login", dict( + username = self.username, + password = self.password, + login_button = u"login", + ) ) + self.session_id = result[ u"session_id" ] + + def login2( self ): + result = self.http_post( "/users/login", dict( + username = self.username2, + password = self.password2, + login_button = u"login", + ) ) + self.session_id = result[ u"session_id" ] diff --git a/controller/test/Test_users.py b/controller/test/Test_users.py index d11692e..1a7e620 100644 --- a/controller/test/Test_users.py +++ b/controller/test/Test_users.py @@ -9,6 +9,7 @@ from Test_controller import Test_controller from Stub_smtp import Stub_smtp import Stub_urllib2 from model.User import User +from model.Group import Group from model.Notebook import Notebook from model.Note import Note from model.Password_reset import Password_reset @@ -36,6 +37,8 @@ class Test_users( Test_controller ): self.email_address2 = u"out-there@example.com" self.user = None self.user2 = None + self.group = None + self.group2 = None self.anonymous = None self.notebooks = None self.session_id = None @@ -66,15 +69,22 @@ class Test_users( Test_controller ): ) self.database.save( self.startup_note ) + self.group = Group.create( self.database.next_id( Group ), u"my group" ) + self.database.save( self.group, commit = False ) + self.group2 = Group.create( self.database.next_id( Group ), u"other group" ) + self.database.save( self.group2, commit = False ) + self.user = User.create( self.database.next_id( User ), self.username, self.password, self.email_address ) self.database.save( self.user, commit = False ) self.database.execute( self.user.sql_save_notebook( notebook_id1, read_write = True, owner = True, rank = 0 ), commit = False ) self.database.execute( self.user.sql_save_notebook( trash_id1, read_write = True, owner = True ), commit = False ) self.database.execute( self.user.sql_save_notebook( notebook_id2, read_write = True, owner = True, rank = 1 ), commit = False ) self.database.execute( self.user.sql_save_notebook( trash_id2, read_write = True, owner = True ), commit = False ) + self.database.execute( self.user.sql_save_group( self.group.object_id, admin = False ) ) self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 ) self.database.save( self.user2, commit = False ) + self.database.execute( self.user2.sql_save_group( self.group.object_id, admin = True ) ) self.anonymous = User.create( self.database.next_id( User ), u"anonymous" ) self.database.save( self.anonymous, commit = False ) @@ -570,7 +580,10 @@ class Test_users( Test_controller ): assert rate_plan[ u"name" ] == u"super" assert rate_plan[ u"storage_quota_bytes" ] == 1337 * 10 - assert result[ u"groups" ] == [] + assert result[ u"groups" ] + assert result[ u"groups" ][ 0 ].object_id == self.group.object_id + assert result[ u"groups" ][ 0 ].name == self.group.name + assert result[ u"groups" ][ 0 ].admin == False def test_current_anonymous( self ): result = cherrypy.root.users.current( self.anonymous.object_id ) @@ -719,6 +732,41 @@ class Test_users( Test_controller ): assert access is False + def test_check_group( self ): + membership = cherrypy.root.users.check_group( self.user.object_id, self.group.object_id ) + + assert membership is True + + def test_check_group_with_admin( self ): + membership = cherrypy.root.users.check_group( self.user2.object_id, self.group.object_id ) + + assert membership is True + + def test_check_group_anon( self ): + membership = cherrypy.root.users.check_group( self.anonymous.object_id, self.group.object_id ) + + assert membership is False + + def test_check_group_without_membership( self ): + membership = cherrypy.root.users.check_group( self.user.object_id, self.group2.object_id ) + + assert membership is False + + def test_check_group_without_user( self ): + membership = cherrypy.root.users.check_group( None, self.group2.object_id ) + + assert membership is False + + def test_check_group_admin( self ): + membership = cherrypy.root.users.check_group( self.user.object_id, self.group.object_id, admin = True ) + + assert membership is False + + def test_check_group_admin_with_admin( self ): + membership = cherrypy.root.users.check_group( self.user2.object_id, self.group.object_id, admin = True ) + + assert membership is True + def test_send_reset( self ): # trick send_reset() into using a fake SMTP server Stub_smtp.reset() diff --git a/model/Group.py b/model/Group.py index a136cee..83c1fc0 100644 --- a/model/Group.py +++ b/model/Group.py @@ -67,6 +67,29 @@ class Group( Persistent ): def sql_update( self ): return self.sql_create() + def sql_load_users( self, admin = None ): + """ + Return a SQL string to load a list of the users with membership to this group. + """ + if admin is True: + admin_clause = " and user_group.admin = 't'" + elif admin is False: + admin_clause = " and user_group.admin = 'f'" + else: + admin_clause = "" + + return \ + """ + select + luminotes_user_current.* + from + user_group, luminotes_user_current + where + user_group.group_id = %s and + user_group.user_id = luminotes_user_current.id%s + order by user_group.username; + """ % quote( self.object_id ) + def to_dict( self ): d = Persistent.to_dict( self ) diff --git a/model/User.py b/model/User.py index 50015ce..fa1fea1 100644 --- a/model/User.py +++ b/model/User.py @@ -242,8 +242,6 @@ class User( Persistent ): """ Return a SQL string to save the id of a group to which this user has membership. """ - if rank is None: rank = quote( None ) - return \ "insert into user_group ( user_id, group_id, admin ) values " + \ "( %s, %s, %s );" % ( quote( self.object_id ), quote( group_id ), quote( admin and 't' or 'f' ) )