witten
/
luminotes
Archived
1
0
Fork 0

* Preventing a user from revoking their own access.

* More unit tests for revoking access.
This commit is contained in:
Dan Helfman 2007-12-30 01:20:38 +00:00
parent 05876e2d7e
commit d0d87731d2
5 changed files with 316 additions and 104 deletions

View File

@ -756,7 +756,7 @@ class Users( object ):
invite = Invite.create( invite_id, user_id, notebook_id, email_address, read_write, owner )
self.__database.save( invite, commit = False )
# update any unredeemed invitations for this notebook already sent to the same email address
# update any invitations for this notebook already sent to the same email address
similar_invites = self.__database.select_many( Invite, invite.sql_load_similar() )
for similar in similar_invites:
similar.read_write = read_write
@ -764,11 +764,14 @@ class Users( object ):
self.__database.save( similar, commit = False )
# if the invite is already redeemed, then update the relevant entry in the user_notebook
# access table as well
if similar.redeemed_user_id is not None:
# access table as well. prevent the user from updating their own access
if similar.redeemed_user_id is not None and similar.redeemed_user_id != user_id:
redeemed_user = self.__database.load( User, similar.redeemed_user_id )
if redeemed_user:
self.__database.execute( redeemed_user.sql_update_access( notebook_id, read_write, owner ) )
notebook = self.__database.load( Notebook, notebook_id )
if notebook:
self.__database.execute( redeemed_user.sql_update_access( notebook.trash_id, read_write, owner ) )
# create an email message with a unique invitation link
notebook_name = notebook.name.strip().replace( "\n", " " ).replace( "\r", " " )
@ -830,10 +833,14 @@ class Users( object ):
raise Access_error()
invite = self.__database.load( Invite, invite_id )
if not invite or not invite.email_address or invite.notebook_id != notebook_id:
notebook = self.__database.load( Notebook, notebook_id )
if not notebook or not invite or not invite.email_address or invite.notebook_id != notebook_id:
raise Access_error()
self.__database.execute( invite.sql_revoke_user_access(), commit = False )
self.__database.execute(
User.sql_revoke_invite_access( notebook_id, notebook.trash_id, invite.email_address, user_id ),
commit = False,
)
self.__database.execute( invite.sql_revoke_invites(), commit = False )
self.__database.commit()

View File

@ -121,6 +121,23 @@ class Test_controller( object ):
User.sql_update_access = lambda self, notebook_id, read_write = False, owner = False: \
lambda database: sql_update_access( self, notebook_id, read_write, owner, database )
def sql_revoke_invite_access( notebook_id, trash_id, email_address, excluded_user_id, database ):
invites = []
for ( user_id, notebook_infos ) in database.user_notebook.items():
if user_id == excluded_user_id: continue
for notebook_info in list( notebook_infos ):
( db_notebook_id, read_write, owner ) = notebook_info
if db_notebook_id not in ( notebook_id, trash_id ): continue
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Invite ) and obj.notebook_id == notebook_id and \
obj.email_address == email_address:
database.user_notebook[ user_id ].remove( notebook_info )
User.sql_revoke_invite_access = staticmethod( lambda notebook_id, trash_id, email_address, excluded_user_id: \
lambda database: sql_revoke_invite_access( notebook_id, trash_id, email_address, excluded_user_id, database ) )
def sql_load_revisions( self, database ):
note_list = database.objects.get( self.object_id )
if not note_list: return None
@ -258,21 +275,6 @@ class Test_controller( object ):
Invite.sql_load_notebook_invites = staticmethod( lambda notebook_id:
lambda database: sql_load_notebook_invites( notebook_id, database ) )
def sql_revoke_user_access( self, database ):
invites = []
for ( user_id, notebook_infos ) in database.user_notebook.items():
for ( index, ( notebook_id, read_write, owner ) ) in enumerate( notebook_infos ):
if notebook_id != self.notebook_id: continue
for ( object_id, obj_list ) in database.objects.items():
obj = obj_list[ -1 ]
if isinstance( obj, Invite ) and obj.notebook_id == self.notebook_id and \
obj.email_address == self.email_address:
del( database.user_notebook[ user_id ][ index ] )
Invite.sql_revoke_user_access = lambda self: \
lambda database: sql_revoke_user_access( self, database )
def sql_revoke_invites( self, database ):
invites = []

View File

@ -65,7 +65,9 @@ class Test_users( Test_controller ):
self.user = User.create( self.database.next_id( User ), self.username, self.password, self.email_address )
self.database.save( self.user, commit = False )
self.database.execute( self.user.sql_save_notebook( notebook_id1, read_write = True, owner = True ), commit = False )
self.database.execute( self.user.sql_save_notebook( trash_id1, read_write = True, owner = True ), commit = False )
self.database.execute( self.user.sql_save_notebook( notebook_id2, read_write = True, owner = True ), commit = False )
self.database.execute( self.user.sql_save_notebook( trash_id2, read_write = True, owner = True ), commit = False )
self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 )
self.database.save( self.user2, commit = False )
@ -368,16 +370,27 @@ class Test_users( Test_controller ):
assert result[ u"user" ]
assert result[ u"user" ].object_id == self.user.object_id
assert result[ u"user" ].username == self.user.username
assert len( result[ u"notebooks" ] ) == 3
assert len( result[ u"notebooks" ] ) == 5
assert result[ u"notebooks" ][ 0 ].object_id == self.notebooks[ 0 ].object_id
assert result[ u"notebooks" ][ 0 ].name == self.notebooks[ 0 ].name
assert result[ u"notebooks" ][ 0 ].read_write == True
assert result[ u"notebooks" ][ 0 ].owner == True
assert result[ u"notebooks" ][ 1 ].object_id == self.notebooks[ 1 ].object_id
assert result[ u"notebooks" ][ 1 ].object_id
assert result[ u"notebooks" ][ 1 ].name == u"trash"
assert result[ u"notebooks" ][ 1 ].read_write == True
assert result[ u"notebooks" ][ 1 ].owner == True
assert result[ u"notebooks" ][ 2 ].object_id == self.anon_notebook.object_id
assert result[ u"notebooks" ][ 2 ].read_write == False
assert result[ u"notebooks" ][ 2 ].owner == False
assert result[ u"notebooks" ][ 2 ].object_id == self.notebooks[ 1 ].object_id
assert result[ u"notebooks" ][ 2 ].name == self.notebooks[ 1 ].name
assert result[ u"notebooks" ][ 2 ].read_write == True
assert result[ u"notebooks" ][ 2 ].owner == True
assert result[ u"notebooks" ][ 3 ].object_id
assert result[ u"notebooks" ][ 3 ].name == u"trash"
assert result[ u"notebooks" ][ 3 ].read_write == True
assert result[ u"notebooks" ][ 3 ].owner == True
assert result[ u"notebooks" ][ 4 ].object_id == self.anon_notebook.object_id
assert result[ u"notebooks" ][ 4 ].name == self.anon_notebook.name
assert result[ u"notebooks" ][ 4 ].read_write == False
assert result[ u"notebooks" ][ 4 ].owner == False
assert result[ u"login_url" ] is None
assert result[ u"logout_url" ] == self.settings[ u"global" ][ u"luminotes.https_url" ] + u"/"
@ -1111,81 +1124,7 @@ class Test_users( Test_controller ):
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id1 = matches.group( 2 )
assert invite_id1
# update the user_notebook table accordingly. this normally happens when an invite is redeemed
self.database.execute( self.user.sql_save_notebook(
self.notebooks[ 0 ].object_id,
read_write = False,
owner = False,
) )
# then send a similar invite to the same email address with read_write and owner set to True
result = self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"owner",
invite_button = u"send invites",
), session_id = self.session_id )
invites = result[ u"invites" ]
assert len( invites ) == 1
invite = invites[ 0 ]
assert invite
assert invite.read_write is True
assert invite.owner is True
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id2 = matches.group( 2 )
assert invite_id2
# assert that both invites have the read_write / owner flags set to True now
invite1_list = self.database.objects.get( invite_id1 )
assert invite1_list
assert len( invite1_list ) >= 2
invite1 = invite1_list[ -1 ]
assert invite1
assert invite1.read_write is True
assert invite1.owner is True
invite2_list = self.database.objects.get( invite_id2 )
assert invite2_list
assert len( invite2_list ) >= 1
invite2 = invite2_list[ -1 ]
assert invite2
assert invite2.read_write is True
assert invite2.owner is True
# assert that the user_notebook table has also been updated accordingly
access = self.database.select_one( bool, self.user.sql_has_access(
self.notebooks[ 0 ].object_id,
read_write = True,
owner = True,
) )
assert access is True
def test_send_invites_similar_already_redeemed( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
# first send an invite with read_write and owner set to False
self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id1 = matches.group( 2 )
assert invite_id1
@ -1234,12 +1173,178 @@ class Test_users( Test_controller ):
assert invite2.owner is True
# assert that the user_notebook table has also been updated accordingly
access = self.database.select_one( bool, self.user2.sql_has_access(
self.notebooks[ 0 ].object_id,
read_write = True,
owner = True,
) )
assert access is True
access = self.database.select_one( bool, self.user2.sql_has_access(
self.notebooks[ 0 ].trash_id,
read_write = True,
owner = True,
) )
assert access is True
def test_send_invites_similar_downgrade( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
# first send an invite with read_write and owner set to False
self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"collaborator",
invite_button = u"send invites",
), session_id = self.session_id )
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id1 = matches.group( 2 )
assert invite_id1
# login as another user and redeem the invite
self.login2()
result = self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id1,
), session_id = self.session_id )
# then send a similar invite to the same email address with read_write and owner set to False
self.login()
result = self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
invites = result[ u"invites" ]
assert len( invites ) == 1
invite = invites[ 0 ]
assert invite
assert invite.read_write is False
assert invite.owner is False
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id2 = matches.group( 2 )
assert invite_id2
# assert that both invites have the read_write / owner flags set to False now
invite1_list = self.database.objects.get( invite_id1 )
assert invite1_list
assert len( invite1_list ) >= 2
invite1 = invite1_list[ -1 ]
assert invite1
assert invite1.read_write is False
assert invite1.owner is False
invite2_list = self.database.objects.get( invite_id2 )
assert invite2_list
assert len( invite2_list ) >= 1
invite2 = invite2_list[ -1 ]
assert invite2
assert invite2.read_write is False
assert invite2.owner is False
# assert that the user_notebook table has also been updated accordingly
access = self.database.select_one( bool, self.user2.sql_has_access(
self.notebooks[ 0 ].object_id,
read_write = False,
owner = False,
) )
assert access is True
access = self.database.select_one( bool, self.user2.sql_has_access(
self.notebooks[ 0 ].trash_id,
read_write = False,
owner = False,
) )
assert access is True
def test_send_invites_similar_downgrade_self( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
# first send an invite with read_write and owner set to False
self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"collaborator",
invite_button = u"send invites",
), session_id = self.session_id )
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id1 = matches.group( 2 )
assert invite_id1
# redeem the invite as the same user
result = self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id1,
), session_id = self.session_id )
# then send a similar invite to the same email address with read_write and owner set to False
result = self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
# assert that both invites have the read_write / owner flags set to False now
invites = result[ u"invites" ]
assert len( invites ) == 1
invite = invites[ 0 ]
assert invite
assert invite.read_write is False
assert invite.owner is False
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id2 = matches.group( 2 )
assert invite_id2
# assert that both invites have the read_write / owner flags set to False now
invite1_list = self.database.objects.get( invite_id1 )
assert invite1_list
assert len( invite1_list ) >= 2
invite1 = invite1_list[ -1 ]
assert invite1
assert invite1.read_write is False
assert invite1.owner is False
invite2_list = self.database.objects.get( invite_id2 )
assert invite2_list
assert len( invite2_list ) >= 1
invite2 = invite2_list[ -1 ]
assert invite2
assert invite2.read_write is False
assert invite2.owner is False
# since the user is trying to downgrade their own access, assert that the downgrade was
# prevented and the user still retains their original access
access = self.database.select_one( bool, self.user.sql_has_access(
self.notebooks[ 0 ].object_id,
read_write = True,
owner = True,
) )
assert access is True
access = self.database.select_one( bool, self.user.sql_has_access(
self.notebooks[ 0 ].trash_id,
read_write = True,
owner = True,
) )
assert access is True
def test_send_invites_with_generic_from_address( self ):
Stub_smtp.reset()
@ -1612,6 +1717,89 @@ class Test_users( Test_controller ):
assert len( result[ u"invites" ] ) == 1
assert result[ u"invites" ][ 0 ].email_address == email_addresses_list[ 1 ]
def test_revoke_invite_redeemed( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
result = self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
assert len( result[ u"invites" ] ) == 1
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id = matches.group( 2 )
self.login2()
result = self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id,
), session_id = self.session_id )
assert cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].object_id )
assert cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].trash_id )
self.login()
result = self.http_post( "/users/revoke_invite", dict(
notebook_id = self.notebooks[ 0 ].object_id,
invite_id = invite_id,
), session_id = self.session_id )
assert result[ u"message" ]
assert len( result[ u"invites" ] ) == 0
assert not cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].object_id )
assert not cherrypy.root.users.check_access( self.user2.object_id, self.notebooks[ 0 ].trash_id )
def test_revoke_invite_redeemed_self( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.login()
self.user.rate_plan = 1
self.database.save( self.user )
email_addresses_list = [ u"foo@example.com" ]
email_addresses = email_addresses_list[ 0 ]
result = self.http_post( "/users/send_invites", dict(
notebook_id = self.notebooks[ 0 ].object_id,
email_addresses = email_addresses,
access = u"viewer",
invite_button = u"send invites",
), session_id = self.session_id )
assert len( result[ u"invites" ] ) == 1
matches = self.INVITE_LINK_PATTERN.search( smtplib.SMTP.message )
invite_id = matches.group( 2 )
result = self.http_post( "/users/redeem_invite", dict(
invite_id = invite_id,
), session_id = self.session_id )
assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].object_id )
assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].trash_id )
result = self.http_post( "/users/revoke_invite", dict(
notebook_id = self.notebooks[ 0 ].object_id,
invite_id = invite_id,
), session_id = self.session_id )
assert result[ u"message" ]
assert len( result[ u"invites" ] ) == 0
# the user should've been prevented from revoking their own access
assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].object_id )
assert cherrypy.root.users.check_access( self.user.object_id, self.notebooks[ 0 ].trash_id )
def test_revoke_invite_without_login( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp

View File

@ -103,11 +103,6 @@ class Invite( Persistent ):
return "select id, revision, from_user_id, notebook_id, email_address, read_write, owner, redeemed_user_id from invite " + \
"where id in ( select max( id ) from invite where notebook_id = %s group by email_address ) order by email_address;" % quote( notebook_id )
def sql_revoke_user_access( self ):
return "delete from user_notebook where notebook_id = %s and user_id in " % quote( self.__notebook_id ) + \
"( select redeemed_user_id from invite where notebook_id = %s and email_address = %s );" % \
( quote( self.__notebook_id ), quote( self.__email_address ) )
def sql_revoke_invites( self ):
return "delete from invite where notebook_id = %s and email_address = %s;" % \
( quote( self.__notebook_id ), quote( self.__email_address ) )

View File

@ -194,6 +194,26 @@ class User( Persistent ):
( quote( read_write and 't' or 'f' ), quote( owner and 't' or 'f' ), quote( self.object_id ),
quote( notebook_id ) )
@staticmethod
def sql_revoke_invite_access( notebook_id, trash_id, email_address, excluded_user_id ):
return \
"""
delete from
user_notebook
where
notebook_id in ( %s, %s ) and
user_notebook.user_id != %s and
user_notebook.user_id in (
select
redeemed_user_id
from
invite
where
notebook_id = %s and
email_address = %s
);
""" % ( quote( notebook_id ), quote( trash_id ), quote( excluded_user_id ), quote( notebook_id ), quote( email_address ) )
def sql_calculate_storage( self ):
"""
Return a SQL string to calculate the total bytes of storage usage by this user. Note that this