diff --git a/controller/Expose.py b/controller/Expose.py index bd7080b..e353390 100644 --- a/controller/Expose.py +++ b/controller/Expose.py @@ -7,6 +7,12 @@ from Validate import Validation_error view_override = None +class Expose_error( Exception ): + def __init__( self, message ): + Exception.__init__( self, message ) + self.__message = message + + def expose( view = None, rss = None ): """ expose() can be used to tag a method as available for publishing to the web via CherryPy. In @@ -68,8 +74,7 @@ def expose( view = None, rss = None ): return unicode( view_override( **result ) ) except: if redirect is None: - print "error: %s" % result - raise + raise Expose_error( result.get( u"error" ) or result ) # if that doesn't work, and there's a redirect, then redirect del( result[ u"redirect" ] ) diff --git a/controller/Notebooks.py b/controller/Notebooks.py index 731b99e..307bc2f 100644 --- a/controller/Notebooks.py +++ b/controller/Notebooks.py @@ -120,6 +120,8 @@ class Notebooks( object ): if notebook is None: note = None + elif note_id == u"blank": + note = Note( note_id ) else: note = notebook.lookup_note( note_id ) diff --git a/controller/Root.py b/controller/Root.py index 031f80c..87b21f5 100644 --- a/controller/Root.py +++ b/controller/Root.py @@ -7,6 +7,7 @@ from Async import async from Notebooks import Notebooks from Users import Users from Updater import update_client, wait_for_update +from Database import Valid_id from view.Main_page import Main_page from view.Json import Json from view.Error_page import Error_page @@ -38,10 +39,24 @@ class Root( object ): database, settings[ u"global" ].get( u"luminotes.http_url", u"" ), settings[ u"global" ].get( u"luminotes.https_url", u"" ), + settings[ u"global" ].get( u"luminotes.support_email", u"" ), settings[ u"global" ].get( u"luminotes.rate_plans", [] ), ) self.__notebooks = Notebooks( scheduler, database, self.__users ) + @expose() + def default( self, password_reset_id ): + # if the value looks like an id, assume it's a password reset id, and redirect + try: + validator = Valid_id() + password_reset_id = validator( password_reset_id ) + except ValueError: + raise cherrypy.NotFound + + return dict( + redirect = u"/users/redeem_reset/%s" % password_reset_id, + ) + @expose( view = Main_page ) def index( self ): """ @@ -86,10 +101,20 @@ class Root( object ): cherrypy.response.body = [ unicode( Not_found_page( self.__settings[ u"global" ].get( u"luminotes.support_email" ) ) ) ] return + import sys import traceback traceback.print_exc() - cherrypy.response.body = [ unicode( Error_page( self.__settings[ u"global" ].get( u"luminotes.support_email" ) ) ) ] + exc_info = sys.exc_info() + if exc_info: + message = exc_info[ 1 ].message + else: + message = None + + cherrypy.response.body = [ unicode( Error_page( + self.__settings[ u"global" ].get( u"luminotes.support_email" ), + message, + ) ) ] scheduler = property( lambda self: self.__scheduler ) database = property( lambda self: self.__database ) diff --git a/controller/Users.py b/controller/Users.py index 0eea8cb..cbb5f72 100644 --- a/controller/Users.py +++ b/controller/Users.py @@ -1,8 +1,10 @@ import re import cherrypy +from datetime import datetime, timedelta from model.User import User from model.Notebook import Notebook from model.Note import Note +from model.Password_reset import Password_reset from Scheduler import Scheduler from Expose import expose from Validate import validate, Valid_string, Valid_bool, Validation_error @@ -11,6 +13,8 @@ from Updater import update_client, wait_for_update from Expire import strongly_expire from Async import async from view.Json import Json +from view.Main_page import Main_page +from view.Redeem_reset_note import Redeem_reset_note USERNAME_PATTERN = re.compile( "^[a-zA-Z0-9]+$" ) @@ -55,6 +59,17 @@ class Authentication_error( Exception ): ) +class Password_reset_error( Exception ): + def __init__( self, message ): + Exception.__init__( self, message ) + self.__message = message + + def to_dict( self ): + return dict( + error = self.__message + ) + + def grab_user_id( function ): """ A decorator to grab the current logged in user id from the cherrypy session and pass it as a @@ -106,7 +121,7 @@ class Users( object ): """ Controller for dealing with users, corresponding to the "/users" URL. """ - def __init__( self, scheduler, database, http_url, https_url, rate_plans ): + def __init__( self, scheduler, database, http_url, https_url, support_email, rate_plans ): """ Create a new Users object. @@ -118,6 +133,8 @@ class Users( object ): @param http_url: base URL to use for non-SSL http requests, or an empty string @type https_url: unicode @param https_url: base URL to use for SSL http requests, or an empty string + @type support_email: unicode + @param support_email: email address for support requests @type rate_plans: [ { "name": unicode, "storage_quota_bytes": int } ] @param rate_plans: list of configured rate plans @rtype: Users @@ -127,6 +144,7 @@ class Users( object ): self.__database = database self.__http_url = http_url self.__https_url = https_url + self.__support_email = support_email self.__rate_plans = rate_plans @expose( view = Json ) @@ -193,6 +211,13 @@ class Users( object ): user = User( user_id, username, password, email_address, notebooks = [ notebook ] ) self.__database.save( user ) + # add the new user to the user list + self.__database.load( u"User_list all", self.scheduler.thread ) + user_list = ( yield Scheduler.SLEEP ) + if user_list: + user_list.add_user( user ) + self.__database.save( user_list ) + redirect = u"/notebooks/%s" % notebook.object_id yield dict( @@ -286,6 +311,7 @@ class Users( object ): 'login_url': url, 'rate_plan': rateplandict, } + @raise Validation_error: one of the arguments is invalid """ # if there's no logged-in user, default to the anonymous user self.__database.load( user_id or u"User anonymous", self.__scheduler.thread ) @@ -371,4 +397,205 @@ class Users( object ): yield callback, user + @expose( view = Json ) + @wait_for_update + @async + @update_client + @validate( + email_address = ( Valid_string( min = 1, max = 60 ), valid_email_address ), + send_reset_button = unicode, + ) + def send_reset( self, email_address, send_reset_button ): + """ + Send a password reset email to the given email address. + @type email_address: unicode + @param email_address: an existing user's email address + @type send_reset_button: unicode + @param send_reset_button: ignored + @rtype: json dict + @return: { 'error': message } + @raise Password_reset_error: an error occured when sending the password reset email + @raise Validation_error: one of the arguments is invalid + """ + import sha + import random + import smtplib + from email import Message + + # check whether there are actually any users with the given email address + self.__database.load( u"User_list all", self.scheduler.thread ) + user_list = ( yield Scheduler.SLEEP ) + + if not user_list: + raise Password_reset_error( "There was an error when sending your password reset email. Please contact %s." % self.__support_email ) + + users = [ user for user in user_list.users if user.email_address == email_address ] + if len( users ) == 0: + raise Password_reset_error( u"There are no Luminotes users with the email address %s" % email_address ) + + # record the sending of this reset email + self.__database.next_id( self.__scheduler.thread ) + password_reset_id = ( yield Scheduler.SLEEP ) + password_reset = Password_reset( password_reset_id, email_address ) + self.__database.save( password_reset ) + + # create an email message with a unique link + message = Message.Message() + message[ u"from" ] = u"Luminotes support <%s>" % self.__support_email + message[ u"to" ] = email_address + message[ u"subject" ] = u"Luminotes password reset" + message.set_payload( + u"Someone has requested a password reset for a Luminotes user with your email\n" + + u"address. If this someone is you, please visit the following link for a\n" + + u"username reminder or a password reset:\n\n" + + u"%s/%s\n\n" % ( self.__https_url or self.__http_url, password_reset.object_id ) + + u"This link will expire in 24 hours.\n\n" + + u"Thanks!" + ) + + # send the message out through localhost's smtp server + server = smtplib.SMTP() + server.connect() + server.sendmail( message[ u"from" ], [ email_address ], message.as_string() ) + server.quit() + + # FIXME: this should really be "message =" instead of "error =" since it's not an error, but at + # least this gets it displayed on the client + yield dict( + error = u"Please check your inbox. A password reset email has been sent to %s" % email_address, + ) + + @expose( view = Main_page ) + @strongly_expire + @wait_for_update + @async + @update_client + @validate( + password_reset_id = Valid_id(), + ) + def redeem_reset( self, password_reset_id ): + """ + Provide the information necessary to display the web site's main page along with a dynamically + generated "complete your password reset" note. + @type password_reset_id: unicode + @param password_reset_id: id of model.Password_reset to redeem + @rtype: unicode + @return: rendered HTML page + @raise Password_reset_error: an error occured when redeeming the password reset, such as an expired link + @raise Validation_error: one of the arguments is invalid + """ + self.__database.load( u"User anonymous", self.__scheduler.thread ) + anonymous = ( yield Scheduler.SLEEP ) + + if not anonymous or len( anonymous.notebooks ) == 0: + raise Password_reset_error( "There was an error when completing your password reset. Please contact %s." % self.__support_email ) + + self.__database.load( password_reset_id, self.__scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + + if not password_reset or datetime.now() - password_reset.revision > timedelta( hours = 25 ): + raise Password_reset_error( "Your password reset link has expired. Please request a new password reset email." ) + + if password_reset.redeemed: + raise Password_reset_error( "Your password has already been reset. Please request a new password reset email." ) + + self.__database.load( u"User_list all", self.__scheduler.thread ) + user_list = ( yield Scheduler.SLEEP ) + + if not user_list: + raise Password_reset_error( u"There are no Luminotes users with the email address %s" % password_reset.email_address ) + + # find the user(s) with the email address from the password reset request + matching_users = [ user for user in user_list.users if user.email_address == password_reset.email_address ] + + if len( matching_users ) == 0: + raise Password_reset_error( u"There are no Luminotes users with the email address %s" % password_reset.email_address ) + + yield dict( + notebook_id = anonymous.notebooks[ 0 ].object_id, + note_id = u"blank", + note_contents = unicode( Redeem_reset_note( password_reset_id, matching_users ) ), + ) + + @expose( view = Json ) + @wait_for_update + @async + @update_client + def reset_password( self, password_reset_id, reset_button, **new_passwords ): + """ + Reset all the users with the provided passwords. + @type password_reset_id: unicode + @param password_reset_id: id of model.Password_reset to use + @type reset_button: unicode + @param reset_button: return + @type new_passwords: { userid: [ newpassword, newpasswordrepeat ] } + @param new_passwords: map of user id to new passwords or empty strings + @rtype: json dict + @return: { 'redirect': '/' } + @raise Password_reset_error: an error occured when resetting the passwords, such as an expired link + """ + try: + id_validator = Valid_id() + id_validator( password_reset_id ) + except ValueError: + raise Validation_error( "password_reset_id", password_reset_id, id_validator, "is not a valid id" ) + + self.__database.load( password_reset_id, self.__scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + + if not password_reset or datetime.now() - password_reset.revision > timedelta( hours = 25 ): + raise Password_reset_error( "Your password reset link has expired. Please request a new password reset email." ) + + if password_reset.redeemed: + raise Password_reset_error( "Your password has already been reset. Please request a new password reset email." ) + + self.__database.load( u"User_list all", self.__scheduler.thread ) + user_list = ( yield Scheduler.SLEEP ) + + if not user_list: + raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email ) + + # find the user(s) with the email address from the password reset request + matching_users = [ user for user in user_list.users if user.email_address == password_reset.email_address ] + allowed_user_ids = [ user.object_id for user in matching_users ] + + # reset any passwords that are non-blank + users_to_reset = [] + for ( user_id, ( new_password, new_password_repeat ) ) in new_passwords.items(): + if user_id not in allowed_user_ids: + raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email ) + + # skip blank passwords + if new_password == u"" and new_password_repeat == u"": + continue + + self.__database.load( user_id, self.__scheduler.thread ) + user = ( yield Scheduler.SLEEP ) + + if not user: + raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email ) + + # ensure the passwords match + if new_password != new_password_repeat: + raise Password_reset_error( u"The new passwords you entered for user %s do not match. Please try again." % user.username ) + + # ensure the new password isn't too long + if len( new_password ) > 30: + raise Password_reset_error( u"Your password can be no longer than 30 characters." ) + + users_to_reset.append( ( user, new_password ) ) + + for ( user, new_password ) in users_to_reset: + user.password = new_password + self.__database.save( user ) + + # if all the new passwords provided are blank, bail + if not users_to_reset: + raise Password_reset_error( u"Please enter a new password. Or, if you already know your password, just click the login link above." ) + + password_reset.redeemed = True + self.__database.save( password_reset ) + + yield dict( redirect = u"/" ) + scheduler = property( lambda self: self.__scheduler ) diff --git a/controller/test/Stub_smtp.py b/controller/test/Stub_smtp.py new file mode 100644 index 0000000..392ba46 --- /dev/null +++ b/controller/test/Stub_smtp.py @@ -0,0 +1,29 @@ +class Stub_smtp( object ): + """ + A stub intended to replace smtplib.SMTP for unit testing code that depends on it. + """ + connected = False + from_address = None + to_addresses = None + message = None + + def connect( self ): + Stub_smtp.connected = True + + def sendmail( self, from_address, to_addresses, message ): + if not Stub_smtp.connected: + raise Exception( "not connected to the server" ) + + Stub_smtp.from_address = from_address + Stub_smtp.to_addresses = to_addresses + Stub_smtp.message = message + + def quit( self ): + Stub_smtp.connected = False + + @staticmethod + def reset(): + Stub_smtp.connected = False + Stub_smtp.from_address = None + Stub_smtp.to_addresses = None + Stub_smtp.message = None diff --git a/controller/test/Test_controller.py b/controller/test/Test_controller.py index 483bcb8..8ad62a6 100644 --- a/controller/test/Test_controller.py +++ b/controller/test/Test_controller.py @@ -19,14 +19,15 @@ class Test_controller( object ): u"luminotes.https_url" : u"https://luminotes.com", u"luminotes.http_proxy_ip" : u"127.0.0.1", u"luminotes.https_proxy_ip" : u"127.0.0.2", - "luminotes.rate_plans": [ + u"luminotes.support_email": "unittest@luminotes.com", + u"luminotes.rate_plans": [ { - "name": "super", - "storage_quota_bytes": 1337, + u"name": u"super", + u"storage_quota_bytes": 1337, }, { - "name": "extra super", - "storage_quota_bytes": 31337, + u"name": "extra super", + u"storage_quota_bytes": 31337, }, ], }, diff --git a/controller/test/Test_notebooks.py b/controller/test/Test_notebooks.py index 7b8fe7b..a3c3a57 100644 --- a/controller/test/Test_notebooks.py +++ b/controller/test/Test_notebooks.py @@ -155,6 +155,29 @@ class Test_notebooks( Test_controller ): assert note.object_id == self.note.object_id assert self.user.storage_bytes == 0 + def test_contents_with_blank_note( self ): + self.login() + + result = self.http_get( + "/notebooks/contents?notebook_id=%s¬e_id=blank" % self.notebook.object_id , + session_id = self.session_id, + ) + + notebook = result[ "notebook" ] + startup_notes = result[ "startup_notes" ] + + assert notebook.object_id == self.notebook.object_id + assert len( startup_notes ) == 1 + assert startup_notes[ 0 ] == self.note + + note = result[ "note" ] + + assert note.object_id == u"blank" + assert note.contents == None + assert note.title == None + assert note.deleted_from == None + assert self.user.storage_bytes == 0 + def test_contents_without_login( self ): result = self.http_get( "/notebooks/contents?notebook_id=%s" % self.notebook.object_id, diff --git a/controller/test/Test_root.py b/controller/test/Test_root.py index 38c8b90..19c8f54 100644 --- a/controller/test/Test_root.py +++ b/controller/test/Test_root.py @@ -80,3 +80,9 @@ class Test_root( Test_controller ): login_button = u"login", ) ) self.session_id = result[ u"session_id" ] + + def test_redeem_reset( self ): + redeem_reset_id = u"foobarbaz" + result = self.http_get( "/%s" % redeem_reset_id ) + + assert result[ u"redirect" ] == u"/users/redeem_reset/%s" % redeem_reset_id diff --git a/controller/test/Test_users.py b/controller/test/Test_users.py index 64f9836..15052f3 100644 --- a/controller/test/Test_users.py +++ b/controller/test/Test_users.py @@ -1,12 +1,20 @@ +import re import cherrypy +import smtplib +from datetime import datetime, timedelta +from nose.tools import raises from Test_controller import Test_controller +from Stub_smtp import Stub_smtp from controller.Scheduler import Scheduler from model.User import User from model.Notebook import Notebook from model.Note import Note +from model.User_list import User_list class Test_users( Test_controller ): + RESET_LINK_PATTERN = re.compile( "(https?://\S+)?/(\S+)" ) + def setUp( self ): Test_controller.setUp( self ) @@ -16,7 +24,11 @@ class Test_users( Test_controller ): self.new_username = u"reynolds" self.new_password = u"shiny" self.new_email_address = u"capn@example.com" + self.username2 = u"scully" + self.password2 = u"trustsome1" + self.email_address2 = u"outthere@example.com" self.user = None + self.user2 = None self.anonymous = None self.notebooks = None @@ -45,10 +57,21 @@ class Test_users( Test_controller ): self.database.next_id( self.scheduler.thread ) self.user = User( ( yield Scheduler.SLEEP ), self.username, self.password, self.email_address, self.notebooks ) self.database.next_id( self.scheduler.thread ) + self.user2 = User( ( yield Scheduler.SLEEP ), self.username2, self.password2, self.email_address2 ) + self.database.next_id( self.scheduler.thread ) self.anonymous = User( ( yield Scheduler.SLEEP ), u"anonymous", None, None, [ self.anon_notebook ] ) + self.database.next_id( self.scheduler.thread ) + user_list_id = ( yield Scheduler.SLEEP ) + user_list = User_list( user_list_id, u"all" ) + user_list.add_user( self.user ) + user_list.add_user( self.user2 ) + user_list.add_user( self.anonymous ) + self.database.save( self.user ) + self.database.save( self.user2 ) self.database.save( self.anonymous ) + self.database.save( user_list ) def test_signup( self ): result = self.http_post( "/users/signup", dict( @@ -264,3 +287,427 @@ class Test_users( Test_controller ): g = gen() self.scheduler.add( g ) self.scheduler.wait_for( g ) + + def test_send_reset( self ): + # trick send_reset() into using a fake SMTP server + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + result = self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + session_id = result[ u"session_id" ] + + assert u"has been sent to" in result[ u"error" ] + assert smtplib.SMTP.connected == False + assert "<%s>" % self.settings[ u"global" ][ u"luminotes.support_email" ] in smtplib.SMTP.from_address + assert smtplib.SMTP.to_addresses == [ self.user.email_address ] + assert u"password reset" in smtplib.SMTP.message + assert self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + + def test_send_reset_to_unknown_email_address( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + result = self.http_post( "/users/send_reset", dict( + email_address = u"unknown@example.com", + send_reset_button = u"email me", + ) ) + + assert u"no Luminotes user" in result[ u"error" ] + assert smtplib.SMTP.connected == False + assert smtplib.SMTP.from_address == None + assert smtplib.SMTP.to_addresses == None + assert smtplib.SMTP.message == None + + def test_redeem_reset( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + result = self.http_get( "/users/redeem_reset/%s" % password_reset_id ) + + assert result[ u"notebook_id" ] == self.anonymous.notebooks[ 0 ].object_id + assert result[ u"note_id" ] + assert u"password reset" in result[ u"note_contents" ] + assert self.user.username in result[ u"note_contents" ] + assert self.user2.username in result[ u"note_contents" ] + + def test_redeem_reset_unknown( self ): + password_reset_id = u"unknownresetid" + result = self.http_get( "/users/redeem_reset/%s" % password_reset_id ) + + assert u"expired" in result[ u"error" ] + + def test_redeem_reset_expired( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + # to trigger expiration, pretend that the password reset was made 25 hours ago + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + password_reset._Persistent__revision = datetime.now() - timedelta( hours = 25 ) + self.database.save( password_reset ) + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + result = self.http_get( "/users/redeem_reset/%s" % password_reset_id ) + + assert u"expired" in result[ u"error" ] + + def test_redeem_reset_already_redeemed( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + password_reset.redeemed = True + self.database.save( password_reset ) + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + result = self.http_get( "/users/redeem_reset/%s" % password_reset_id ) + + assert u"already" in result[ u"error" ] + + def test_redeem_reset_unknown_email( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + password_reset._Password_reset__email_address = u"unknown@example.com" + self.database.save( password_reset ) + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + result = self.http_get( "/users/redeem_reset/%s" % password_reset_id ) + + assert u"email address" in result[ u"error" ] + + def test_reset_password( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + new_password = u"newpass" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, new_password ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that the password reset is now marked as redeemed + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + assert password_reset.redeemed + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + # check that the password was actually reset for one of the users, but not the other + assert self.user.check_password( new_password ) + assert self.user2.check_password( self.password2 ) + assert result[ u"redirect" ] + + def test_reset_password_unknown_reset_id( self ): + new_password = u"newpass" + password_reset_id = u"unknownresetid" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, new_password ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that neither user's password has changed + assert self.user.check_password( self.password ) + assert self.user2.check_password( self.password2 ) + assert u"expired" in result[ "error" ] + + def test_reset_password_invalid_reset_id( self ): + new_password = u"newpass" + password_reset_id = u"invalid reset id" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, new_password ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that neither user's password has changed + assert self.user.check_password( self.password ) + assert self.user2.check_password( self.password2 ) + assert u"valid" in result[ "error" ] + + def test_reset_password_expired( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + # to trigger expiration, pretend that the password reset was made 25 hours ago + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + password_reset._Persistent__revision = datetime.now() - timedelta( hours = 25 ) + self.database.save( password_reset ) + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + new_password = u"newpass" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, new_password ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that the password reset is not marked as redeemed + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + assert password_reset.redeemed == False + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + # check that neither user's password has changed + assert self.user.check_password( self.password ) + assert self.user2.check_password( self.password2 ) + assert u"expired" in result[ "error" ] + + def test_reset_password_expired( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + password_reset.redeemed = True + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + new_password = u"newpass" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, new_password ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that neither user's password has changed + assert self.user.check_password( self.password ) + assert self.user2.check_password( self.password2 ) + assert u"already" in result[ "error" ] + + def test_reset_password_unknown_user_id( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + new_password = u"newpass" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, new_password ), + ( u"unknown", u"foo" ), + ( u"unknown", u"foo" ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that neither user's password has changed + assert self.user.check_password( self.password ) + assert self.user2.check_password( self.password2 ) + assert result[ "error" ] + + def test_reset_password_non_matching( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + new_password = u"newpass" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, u"nonmatchingpass" ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that neither user's password has changed + assert self.user.check_password( self.password ) + assert self.user2.check_password( self.password2 ) + assert result[ "error" ] + + def test_reset_password_blank( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, u"" ), + ( self.user.object_id, u"" ), + ( self.user2.object_id, u"" ), + ( self.user2.object_id, u"" ), + ) ) + + # check that neither user's password has changed + assert self.user.check_password( self.password ) + assert self.user2.check_password( self.password2 ) + assert result[ "error" ] + + def test_reset_password_multiple_users( self ): + Stub_smtp.reset() + smtplib.SMTP = Stub_smtp + + self.http_post( "/users/send_reset", dict( + email_address = self.user.email_address, + send_reset_button = u"email me", + ) ) + + matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message ) + password_reset_id = matches.group( 2 ) + assert password_reset_id + + new_password = u"newpass" + new_password2 = u"newpass2" + result = self.http_post( "/users/reset_password", ( + ( u"password_reset_id", password_reset_id ), + ( u"reset_button", u"reset passwords" ), + ( self.user.object_id, new_password ), + ( self.user.object_id, new_password ), + ( self.user2.object_id, new_password2 ), + ( self.user2.object_id, new_password2 ), + ) ) + + # check that the password reset is now marked as redeemed + def gen(): + self.database.load( password_reset_id, self.scheduler.thread ) + password_reset = ( yield Scheduler.SLEEP ) + assert password_reset.redeemed + + g = gen() + self.scheduler.add( g ) + self.scheduler.wait_for( g ) + + # check that the password was actually reset for both users + assert self.user.check_password( new_password ) + assert self.user2.check_password( new_password2 ) + assert result[ u"redirect" ] diff --git a/model/Note.py b/model/Note.py index cfbc989..33b8c96 100644 --- a/model/Note.py +++ b/model/Note.py @@ -38,6 +38,10 @@ class Note( Persistent ): self.update_revision() self.__contents = contents + if contents is None: + self.__title = None + return + # parse title out of the beginning of the contents result = Note.TITLE_PATTERN.search( contents ) diff --git a/model/Password_reset.py b/model/Password_reset.py new file mode 100644 index 0000000..a2d4a30 --- /dev/null +++ b/model/Password_reset.py @@ -0,0 +1,29 @@ +from Persistent import Persistent + + +class Password_reset( Persistent ): + """ + A request for a password reset. + """ + def __init__( self, id, email_address ): + """ + Create a password reset request with the given id. + + @type id: unicode + @param id: id of the password reset + @type email_address: unicode + @param email_address: where the reset confirmation was emailed + @rtype: Password_reset + @return: newly constructed password reset + """ + Persistent.__init__( self, id ) + self.__email_address = email_address + self.__redeemed = False + + def __set_redeemed( self, redeemed ): + if redeemed != self.__redeemed: + self.update_revision() + self.__redeemed = redeemed + + email_address = property( lambda self: self.__email_address ) + redeemed = property( lambda self: self.__redeemed, __set_redeemed ) diff --git a/model/User_list.py b/model/User_list.py new file mode 100644 index 0000000..fc53683 --- /dev/null +++ b/model/User_list.py @@ -0,0 +1,48 @@ +from copy import copy +from Persistent import Persistent + + +class User_list( Persistent ): + """ + A list of users. + """ + def __init__( self, id, secondary_id = None ): + """ + Create a list of users, and give the list the provided id. + + @type id: unicode + @param id: id of the user list + @type secondary_id: unicode or NoneType + @param secondary_id: convenience id for easy access (optional) + @rtype: User_list + @return: newly constructed user list + """ + Persistent.__init__( self, id, secondary_id ) + self.__users = [] + + def add_user( self, user ): + """ + Add a user to this list. + + @type user: User + @param user: user to add + """ + if user.object_id not in [ u.object_id for u in self.__users ]: + self.update_revision() + self.__users.append( user ) + + def remove_user( self, user ): + """ + Remove a user from this list. + + @type user: User + @param user: user to remove + """ + if user in self.__users: + self.update_revision() + self.__users.remove( user ) + + def __set_users( self, users ): + self.__users = users + + users = property( lambda self: copy( self.__users ), __set_users ) diff --git a/model/test/Test_note.py b/model/test/Test_note.py index b485986..684dc91 100644 --- a/model/test/Test_note.py +++ b/model/test/Test_note.py @@ -15,6 +15,15 @@ class Test_note( object ): assert self.note.title == self.title assert self.note.deleted_from == None + def test_create_blank( self ): + object_id = u"22" + blank_note = Note( object_id ) + + assert blank_note.object_id == object_id + assert blank_note.contents == None + assert blank_note.title == None + assert blank_note.deleted_from == None + def test_set_contents( self ): new_title = u"new title" new_contents = u"