diff --git a/controller/Users.py b/controller/Users.py index f2cba66..5983821 100644 --- a/controller/Users.py +++ b/controller/Users.py @@ -756,7 +756,7 @@ class Users( object ): invite = Invite.create( invite_id, user_id, notebook_id, email_address, read_write, owner ) self.__database.save( invite, commit = False ) - # update any unredeemed invitations for this notebook already sent to the same email address + # update any invitations for this notebook already sent to the same email address similar_invites = self.__database.select_many( Invite, invite.sql_load_similar() ) for similar in similar_invites: similar.read_write = read_write @@ -764,11 +764,14 @@ class Users( object ): self.__database.save( similar, commit = False ) # if the invite is already redeemed, then update the relevant entry in the user_notebook - # access table as well - if similar.redeemed_user_id is not None: + # access table as well. prevent the user from updating their own access + if similar.redeemed_user_id is not None and similar.redeemed_user_id != user_id: redeemed_user = self.__database.load( User, similar.redeemed_user_id ) if redeemed_user: self.__database.execute( redeemed_user.sql_update_access( notebook_id, read_write, owner ) ) + notebook = self.__database.load( Notebook, notebook_id ) + if notebook: + self.__database.execute( redeemed_user.sql_update_access( notebook.trash_id, read_write, owner ) ) # create an email message with a unique invitation link notebook_name = notebook.name.strip().replace( "\n", " " ).replace( "\r", " " ) @@ -830,10 +833,14 @@ class Users( object ): raise Access_error() invite = self.__database.load( Invite, invite_id ) - if not invite or not invite.email_address or invite.notebook_id != notebook_id: + notebook = self.__database.load( Notebook, notebook_id ) + if not notebook or not invite or not invite.email_address or invite.notebook_id != notebook_id: raise Access_error() - self.__database.execute( invite.sql_revoke_user_access(), commit = False ) + self.__database.execute( + User.sql_revoke_invite_access( notebook_id, notebook.trash_id, invite.email_address, user_id ), + commit = False, + ) self.__database.execute( invite.sql_revoke_invites(), commit = False ) self.__database.commit() diff --git a/controller/test/Test_controller.py b/controller/test/Test_controller.py index ac76dca..24d4898 100644 --- a/controller/test/Test_controller.py +++ b/controller/test/Test_controller.py @@ -121,6 +121,23 @@ class Test_controller( object ): User.sql_update_access = lambda self, notebook_id, read_write = False, owner = False: \ lambda database: sql_update_access( self, notebook_id, read_write, owner, database ) + def sql_revoke_invite_access( notebook_id, trash_id, email_address, excluded_user_id, database ): + invites = [] + + for ( user_id, notebook_infos ) in database.user_notebook.items(): + if user_id == excluded_user_id: continue + for notebook_info in list( notebook_infos ): + ( db_notebook_id, read_write, owner ) = notebook_info + if db_notebook_id not in ( notebook_id, trash_id ): continue + for ( object_id, obj_list ) in database.objects.items(): + obj = obj_list[ -1 ] + if isinstance( obj, Invite ) and obj.notebook_id == notebook_id and \ + obj.email_address == email_address: + database.user_notebook[ user_id ].remove( notebook_info ) + + User.sql_revoke_invite_access = staticmethod( lambda notebook_id, trash_id, email_address, excluded_user_id: \ + lambda database: sql_revoke_invite_access( notebook_id, trash_id, email_address, excluded_user_id, database ) ) + def sql_load_revisions( self, database ): note_list = database.objects.get( self.object_id ) if not note_list: return None @@ -258,21 +275,6 @@ class Test_controller( object ): Invite.sql_load_notebook_invites = staticmethod( lambda notebook_id: lambda database: sql_load_notebook_invites( notebook_id, database ) ) - def sql_revoke_user_access( self, database ): - invites = [] - - for ( user_id, notebook_infos ) in database.user_notebook.items(): - for ( index, ( notebook_id, read_write, owner ) ) in enumerate( notebook_infos ): - if notebook_id != self.notebook_id: continue - for ( object_id, obj_list ) in database.objects.items(): - obj = obj_list[ -1 ] - if isinstance( obj, Invite ) and obj.notebook_id == self.notebook_id and \ - obj.email_address == self.email_address: - del( database.user_notebook[ user_id ][ index ] ) - - Invite.sql_revoke_user_access = lambda self: \ - lambda database: sql_revoke_user_access( self, database ) - def sql_revoke_invites( self, database ): invites = [] diff --git a/controller/test/Test_users.py b/controller/test/Test_users.py index e757557..69debc9 100644 --- a/controller/test/Test_users.py +++ b/controller/test/Test_users.py @@ -65,7 +65,9 @@ class Test_users( Test_controller ): self.user = User.create( self.database.next_id( User ), self.username, self.password, self.email_address ) self.database.save( self.user, commit = False ) self.database.execute( self.user.sql_save_notebook( notebook_id1, read_write = True, owner = True ), commit = False ) + self.database.execute( self.user.sql_save_notebook( trash_id1, read_write = True, owner = True ), commit = False ) self.database.execute( self.user.sql_save_notebook( notebook_id2, read_write = True, owner = True ), commit = False ) + self.database.execute( self.user.sql_save_notebook( trash_id2, read_write = True, owner = True ), commit = False ) self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 ) self.database.save( self.user2, commit = False ) @@ -368,16 +370,27 @@ class Test_users( Test_controller ): assert result[ u"user" ] assert result[ u"user" ].object_id == self.user.object_id assert result[ u"user" ].username == self.user.username - assert len( result[ u"notebooks" ] ) == 3 + assert len( result[ u"notebooks" ] ) == 5 assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id + assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name assert result[ u"notebooks" ][ 0 ].read_write == True assert result[ u"notebooks" ][ 0 ].owner == True - assert result[ u"notebooks" ][ 1 ].object_id == self.notebooks[ 1 ].object_id + assert result[ u"notebooks" ][ 1 ].object_id + assert result[ u"notebooks" ][ 1 ].name == u"trash" assert result[ u"notebooks" ][ 1 ].read_write == True assert result[ u"notebooks" ][ 1 ].owner == True - assert result[ u"notebooks" ][ 2 ].object_id == self.anon_notebook.object_id - assert result[ u"notebooks" ][ 2 ].read_write == False - assert result[ u"notebooks" ][ 2 ].owner == False + assert result[ u"notebooks" ][ 2 ].object_id == self.notebooks[ 1 ].object_id + assert result[ u"notebooks" ][ 2 ].name == self.notebooks[ 1 ].name + assert result[ u"notebooks" ][ 2 ].read_write == True + assert result[ u"notebooks" ][ 2 ].owner == True + assert result[ u"notebooks" ][ 3 ].object_id + assert result[ u"notebooks" ][ 3 ].name == u"trash" + assert result[ u"notebooks" ][ 3 ].read_write == True + assert result[ u"notebooks" ][ 3 ].owner == True + assert result[ u"notebooks" ][ 4 ].object_id == self.anon_notebook.object_id + assert result[ u"notebooks" ][ 4 ].name == self.anon_notebook.name + assert result[ u"notebooks" ][ 4 ].read_write == False + assert result[ u"notebooks" ][ 4 ].owner == False assert result[ u"login_url" ] is None assert result[ u"logout_url" ] == self.settings[ u"global" ][ u"luminotes.https_url" ] + u"/" @@ -1111,81 +1124,7 @@ class Test_users( Test_controller ): access = u"viewer", invite_button = u"send invites", ), session_id = self.session_id ) - - matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) - invite_id1 = matches.group( 2 ) - assert invite_id1 - # update the user_notebook table accordingly. this normally happens when an invite is redeemed - self.database.execute( self.user.sql_save_notebook( - self.notebooks[ 0 ].object_id, - read_write = False, - owner = False, - ) ) - - # then send a similar invite to the same email address with read_write and owner set to True - result = self.http_post( "/users/send_invites", dict( - notebook_id = self.notebooks[ 0 ].object_id, - email_addresses = email_addresses, - access = u"owner", - invite_button = u"send invites", - ), session_id = self.session_id ) - - invites = result[ u"invites" ] - assert len( invites ) == 1 - invite = invites[ 0 ] - assert invite - assert invite.read_write is True - assert invite.owner is True - - matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) - invite_id2 = matches.group( 2 ) - assert invite_id2 - - # assert that both invites have the read_write / owner flags set to True now - invite1_list = self.database.objects.get( invite_id1 ) - assert invite1_list - assert len( invite1_list ) >= 2 - invite1 = invite1_list[ -1 ] - assert invite1 - assert invite1.read_write is True - assert invite1.owner is True - - invite2_list = self.database.objects.get( invite_id2 ) - assert invite2_list - assert len( invite2_list ) >= 1 - invite2 = invite2_list[ -1 ] - assert invite2 - assert invite2.read_write is True - assert invite2.owner is True - - # assert that the user_notebook table has also been updated accordingly - access = self.database.select_one( bool, self.user.sql_has_access( - self.notebooks[ 0 ].object_id, - read_write = True, - owner = True, - ) ) - assert access is True - - def test_send_invites_similar_already_redeemed( self ): - Stub_smtp.reset() - smtplib.SMTP = Stub_smtp - self.login() - - self.user.rate_plan = 1 - self.database.save( self.user ) - - email_addresses_list = [ u"foo@example.com" ] - email_addresses = email_addresses_list[ 0 ] - - # first send an invite with read_write and owner set to False - self.http_post( "/users/send_invites", dict( - notebook_id = self.notebooks[ 0 ].object_id, - email_addresses = email_addresses, - access = u"viewer", - invite_button = u"send invites", - ), session_id = self.session_id ) - matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) invite_id1 = matches.group( 2 ) assert invite_id1 @@ -1234,12 +1173,178 @@ class Test_users( Test_controller ): assert invite2.owner is True # assert that the user_notebook table has also been updated accordingly + access = self.database.select_one( bool, self.user2.sql_has_access( + self.notebooks[ 0 ].object_id, + read_write = True, + owner = True, + ) ) + assert access is True + access = self.database.select_one( bool, self.user2.sql_has_access( + self.notebooks[ 0 ].trash_id, + read_write = True, + owner = True, + ) ) + assert access is True + + def test_send_invites_similar_downgrade( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + self.login() + + self.user.rate_plan = 1 + self.database.save( self.user ) + + email_addresses_list = [ u"foo@example.com" ] + email_addresses = email_addresses_list[ 0 ] + + # first send an invite with read_write and owner set to False + self.http_post( "/users/send_invites", dict( + notebook_id = self.notebooks[ 0 ].object_id, + email_addresses = email_addresses, + access = u"collaborator", + invite_button = u"send invites", + ), session_id = self.session_id ) + + matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) + invite_id1 = matches.group( 2 ) + assert invite_id1 + + # login as another user and redeem the invite + self.login2() + result = self.http_post( "/users/redeem_invite", dict( + invite_id = invite_id1, + ), session_id = self.session_id ) + + # then send a similar invite to the same email address with read_write and owner set to False + self.login() + result = self.http_post( "/users/send_invites", dict( + notebook_id = self.notebooks[ 0 ].object_id, + email_addresses = email_addresses, + access = u"viewer", + invite_button = u"send invites", + ), session_id = self.session_id ) + + invites = result[ u"invites" ] + assert len( invites ) == 1 + invite = invites[ 0 ] + assert invite + assert invite.read_write is False + assert invite.owner is False + + matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) + invite_id2 = matches.group( 2 ) + assert invite_id2 + + # assert that both invites have the read_write / owner flags set to False now + invite1_list = self.database.objects.get( invite_id1 ) + assert invite1_list + assert len( invite1_list ) >= 2 + invite1 = invite1_list[ -1 ] + assert invite1 + assert invite1.read_write is False + assert invite1.owner is False + + invite2_list = self.database.objects.get( invite_id2 ) + assert invite2_list + assert len( invite2_list ) >= 1 + invite2 = invite2_list[ -1 ] + assert invite2 + assert invite2.read_write is False + assert invite2.owner is False + + # assert that the user_notebook table has also been updated accordingly + access = self.database.select_one( bool, self.user2.sql_has_access( + self.notebooks[ 0 ].object_id, + read_write = False, + owner = False, + ) ) + assert access is True + access = self.database.select_one( bool, self.user2.sql_has_access( + self.notebooks[ 0 ].trash_id, + read_write = False, + owner = False, + ) ) + assert access is True + + def test_send_invites_similar_downgrade_self( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + self.login() + + self.user.rate_plan = 1 + self.database.save( self.user ) + + email_addresses_list = [ u"foo@example.com" ] + email_addresses = email_addresses_list[ 0 ] + + # first send an invite with read_write and owner set to False + self.http_post( "/users/send_invites", dict( + notebook_id = self.notebooks[ 0 ].object_id, + email_addresses = email_addresses, + access = u"collaborator", + invite_button = u"send invites", + ), session_id = self.session_id ) + + matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) + invite_id1 = matches.group( 2 ) + assert invite_id1 + + # redeem the invite as the same user + result = self.http_post( "/users/redeem_invite", dict( + invite_id = invite_id1, + ), session_id = self.session_id ) + + # then send a similar invite to the same email address with read_write and owner set to False + result = self.http_post( "/users/send_invites", dict( + notebook_id = self.notebooks[ 0 ].object_id, + email_addresses = email_addresses, + access = u"viewer", + invite_button = u"send invites", + ), session_id = self.session_id ) + + # assert that both invites have the read_write / owner flags set to False now + invites = result[ u"invites" ] + assert len( invites ) == 1 + invite = invites[ 0 ] + assert invite + assert invite.read_write is False + assert invite.owner is False + + matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) + invite_id2 = matches.group( 2 ) + assert invite_id2 + + # assert that both invites have the read_write / owner flags set to False now + invite1_list = self.database.objects.get( invite_id1 ) + assert invite1_list + assert len( invite1_list ) >= 2 + invite1 = invite1_list[ -1 ] + assert invite1 + assert invite1.read_write is False + assert invite1.owner is False + + invite2_list = self.database.objects.get( invite_id2 ) + assert invite2_list + assert len( invite2_list ) >= 1 + invite2 = invite2_list[ -1 ] + assert invite2 + assert invite2.read_write is False + assert invite2.owner is False + + # since the user is trying to downgrade their own access, assert that the downgrade was + # prevented and the user still retains their original access access = self.database.select_one( bool, self.user.sql_has_access( self.notebooks[ 0 ].object_id, read_write = True, owner = True, ) ) assert access is True + access = self.database.select_one( bool, self.user.sql_has_access( + self.notebooks[ 0 ].trash_id, + read_write = True, + owner = True, + ) ) + assert access is True def test_send_invites_with_generic_from_address( self ): Stub_smtp.reset() @@ -1612,6 +1717,89 @@ class Test_users( Test_controller ): assert len( result[ u"invites" ] ) == 1 assert result[ u"invites" ][ 0 ].email_address == email_addresses_list[ 1 ] + def test_revoke_invite_redeemed( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + self.login() + + self.user.rate_plan = 1 + self.database.save( self.user ) + + email_addresses_list = [ u"foo@example.com" ] + email_addresses = email_addresses_list[ 0 ] + + result = self.http_post( "/users/send_invites", dict( + notebook_id = self.notebooks[ 0 ].object_id, + email_addresses = email_addresses, + access = u"viewer", + invite_button = u"send invites", + ), session_id = self.session_id ) + + assert len( result[ u"invites" ] ) == 1 + matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) + invite_id = matches.group( 2 ) + + self.login2() + result = self.http_post( "/users/redeem_invite", dict( + invite_id = invite_id, + ), session_id = self.session_id ) + + assert cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].object_id ) + assert cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].trash_id ) + + self.login() + result = self.http_post( "/users/revoke_invite", dict( + notebook_id = self.notebooks[ 0 ].object_id, + invite_id = invite_id, + ), session_id = self.session_id ) + + assert result[ u"message" ] + assert len( result[ u"invites" ] ) == 0 + + assert not cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].object_id ) + assert not cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].trash_id ) + + def test_revoke_invite_redeemed_self( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + self.login() + + self.user.rate_plan = 1 + self.database.save( self.user ) + + email_addresses_list = [ u"foo@example.com" ] + email_addresses = email_addresses_list[ 0 ] + + result = self.http_post( "/users/send_invites", dict( + notebook_id = self.notebooks[ 0 ].object_id, + email_addresses = email_addresses, + access = u"viewer", + invite_button = u"send invites", + ), session_id = self.session_id ) + + assert len( result[ u"invites" ] ) == 1 + matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message ) + invite_id = matches.group( 2 ) + + result = self.http_post( "/users/redeem_invite", dict( + invite_id = invite_id, + ), session_id = self.session_id ) + + assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].object_id ) + assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].trash_id ) + + result = self.http_post( "/users/revoke_invite", dict( + notebook_id = self.notebooks[ 0 ].object_id, + invite_id = invite_id, + ), session_id = self.session_id ) + + assert result[ u"message" ] + assert len( result[ u"invites" ] ) == 0 + + # the user should've been prevented from revoking their own access + assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].object_id ) + assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].trash_id ) + def test_revoke_invite_without_login( self ): Stub_smtp.reset() smtplib.SMTP = Stub_smtp diff --git a/model/Invite.py b/model/Invite.py index b7cf227..a8d21c1 100644 --- a/model/Invite.py +++ b/model/Invite.py @@ -103,11 +103,6 @@ class Invite( Persistent ): return "select id, revision, from_user_id, notebook_id, email_address, read_write, owner, redeemed_user_id from invite " + \ "where id in ( select max( id ) from invite where notebook_id = %s group by email_address ) order by email_address;" % quote( notebook_id ) - def sql_revoke_user_access( self ): - return "delete from user_notebook where notebook_id = %s and user_id in " % quote( self.__notebook_id ) + \ - "( select redeemed_user_id from invite where notebook_id = %s and email_address = %s );" % \ - ( quote( self.__notebook_id ), quote( self.__email_address ) ) - def sql_revoke_invites( self ): return "delete from invite where notebook_id = %s and email_address = %s;" % \ ( quote( self.__notebook_id ), quote( self.__email_address ) ) diff --git a/model/User.py b/model/User.py index ced036f..5c0054f 100644 --- a/model/User.py +++ b/model/User.py @@ -194,6 +194,26 @@ class User( Persistent ): ( quote( read_write and 't' or 'f' ), quote( owner and 't' or 'f' ), quote( self.object_id ), quote( notebook_id ) ) + @staticmethod + def sql_revoke_invite_access( notebook_id, trash_id, email_address, excluded_user_id ): + return \ + """ + delete from + user_notebook + where + notebook_id in ( %s, %s ) and + user_notebook.user_id != %s and + user_notebook.user_id in ( + select + redeemed_user_id + from + invite + where + notebook_id = %s and + email_address = %s + ); + """ % ( quote( notebook_id ), quote( trash_id ), quote( excluded_user_id ), quote( notebook_id ), quote( email_address ) ) + def sql_calculate_storage( self ): """ Return a SQL string to calculate the total bytes of storage usage by this user. Note that this