witten
/
luminotes
Archived
1
0
Fork 0

Self-service password reset functionality, plus full unit tests.

GOD DAMN that was annoying to implement.
This commit is contained in:
Dan Helfman 2007-09-26 23:49:27 +00:00
parent 4a39a798cd
commit f6da052e88
24 changed files with 1090 additions and 17 deletions

View File

@ -7,6 +7,12 @@ from Validate import Validation_error
view_override = None
class Expose_error( Exception ):
def __init__( self, message ):
Exception.__init__( self, message )
self.__message = message
def expose( view = None, rss = None ):
"""
expose() can be used to tag a method as available for publishing to the web via CherryPy. In
@ -68,8 +74,7 @@ def expose( view = None, rss = None ):
return unicode( view_override( **result ) )
except:
if redirect is None:
print "error: %s" % result
raise
raise Expose_error( result.get( u"error" ) or result )
# if that doesn't work, and there's a redirect, then redirect
del( result[ u"redirect" ] )

View File

@ -120,6 +120,8 @@ class Notebooks( object ):
if notebook is None:
note = None
elif note_id == u"blank":
note = Note( note_id )
else:
note = notebook.lookup_note( note_id )

View File

@ -7,6 +7,7 @@ from Async import async
from Notebooks import Notebooks
from Users import Users
from Updater import update_client, wait_for_update
from Database import Valid_id
from view.Main_page import Main_page
from view.Json import Json
from view.Error_page import Error_page
@ -38,10 +39,24 @@ class Root( object ):
database,
settings[ u"global" ].get( u"luminotes.http_url", u"" ),
settings[ u"global" ].get( u"luminotes.https_url", u"" ),
settings[ u"global" ].get( u"luminotes.support_email", u"" ),
settings[ u"global" ].get( u"luminotes.rate_plans", [] ),
)
self.__notebooks = Notebooks( scheduler, database, self.__users )
@expose()
def default( self, password_reset_id ):
# if the value looks like an id, assume it's a password reset id, and redirect
try:
validator = Valid_id()
password_reset_id = validator( password_reset_id )
except ValueError:
raise cherrypy.NotFound
return dict(
redirect = u"/users/redeem_reset/%s" % password_reset_id,
)
@expose( view = Main_page )
def index( self ):
"""
@ -86,10 +101,20 @@ class Root( object ):
cherrypy.response.body = [ unicode( Not_found_page( self.__settings[ u"global" ].get( u"luminotes.support_email" ) ) ) ]
return
import sys
import traceback
traceback.print_exc()
cherrypy.response.body = [ unicode( Error_page( self.__settings[ u"global" ].get( u"luminotes.support_email" ) ) ) ]
exc_info = sys.exc_info()
if exc_info:
message = exc_info[ 1 ].message
else:
message = None
cherrypy.response.body = [ unicode( Error_page(
self.__settings[ u"global" ].get( u"luminotes.support_email" ),
message,
) ) ]
scheduler = property( lambda self: self.__scheduler )
database = property( lambda self: self.__database )

View File

@ -1,8 +1,10 @@
import re
import cherrypy
from datetime import datetime, timedelta
from model.User import User
from model.Notebook import Notebook
from model.Note import Note
from model.Password_reset import Password_reset
from Scheduler import Scheduler
from Expose import expose
from Validate import validate, Valid_string, Valid_bool, Validation_error
@ -11,6 +13,8 @@ from Updater import update_client, wait_for_update
from Expire import strongly_expire
from Async import async
from view.Json import Json
from view.Main_page import Main_page
from view.Redeem_reset_note import Redeem_reset_note
USERNAME_PATTERN = re.compile( "^[a-zA-Z0-9]+$" )
@ -55,6 +59,17 @@ class Authentication_error( Exception ):
)
class Password_reset_error( Exception ):
def __init__( self, message ):
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
def grab_user_id( function ):
"""
A decorator to grab the current logged in user id from the cherrypy session and pass it as a
@ -106,7 +121,7 @@ class Users( object ):
"""
Controller for dealing with users, corresponding to the "/users" URL.
"""
def __init__( self, scheduler, database, http_url, https_url, rate_plans ):
def __init__( self, scheduler, database, http_url, https_url, support_email, rate_plans ):
"""
Create a new Users object.
@ -118,6 +133,8 @@ class Users( object ):
@param http_url: base URL to use for non-SSL http requests, or an empty string
@type https_url: unicode
@param https_url: base URL to use for SSL http requests, or an empty string
@type support_email: unicode
@param support_email: email address for support requests
@type rate_plans: [ { "name": unicode, "storage_quota_bytes": int } ]
@param rate_plans: list of configured rate plans
@rtype: Users
@ -127,6 +144,7 @@ class Users( object ):
self.__database = database
self.__http_url = http_url
self.__https_url = https_url
self.__support_email = support_email
self.__rate_plans = rate_plans
@expose( view = Json )
@ -193,6 +211,13 @@ class Users( object ):
user = User( user_id, username, password, email_address, notebooks = [ notebook ] )
self.__database.save( user )
# add the new user to the user list
self.__database.load( u"User_list all", self.scheduler.thread )
user_list = ( yield Scheduler.SLEEP )
if user_list:
user_list.add_user( user )
self.__database.save( user_list )
redirect = u"/notebooks/%s" % notebook.object_id
yield dict(
@ -286,6 +311,7 @@ class Users( object ):
'login_url': url,
'rate_plan': rateplandict,
}
@raise Validation_error: one of the arguments is invalid
"""
# if there's no logged-in user, default to the anonymous user
self.__database.load( user_id or u"User anonymous", self.__scheduler.thread )
@ -371,4 +397,205 @@ class Users( object ):
yield callback, user
@expose( view = Json )
@wait_for_update
@async
@update_client
@validate(
email_address = ( Valid_string( min = 1, max = 60 ), valid_email_address ),
send_reset_button = unicode,
)
def send_reset( self, email_address, send_reset_button ):
"""
Send a password reset email to the given email address.
@type email_address: unicode
@param email_address: an existing user's email address
@type send_reset_button: unicode
@param send_reset_button: ignored
@rtype: json dict
@return: { 'error': message }
@raise Password_reset_error: an error occured when sending the password reset email
@raise Validation_error: one of the arguments is invalid
"""
import sha
import random
import smtplib
from email import Message
# check whether there are actually any users with the given email address
self.__database.load( u"User_list all", self.scheduler.thread )
user_list = ( yield Scheduler.SLEEP )
if not user_list:
raise Password_reset_error( "There was an error when sending your password reset email. Please contact %s." % self.__support_email )
users = [ user for user in user_list.users if user.email_address == email_address ]
if len( users ) == 0:
raise Password_reset_error( u"There are no Luminotes users with the email address %s" % email_address )
# record the sending of this reset email
self.__database.next_id( self.__scheduler.thread )
password_reset_id = ( yield Scheduler.SLEEP )
password_reset = Password_reset( password_reset_id, email_address )
self.__database.save( password_reset )
# create an email message with a unique link
message = Message.Message()
message[ u"from" ] = u"Luminotes support <%s>" % self.__support_email
message[ u"to" ] = email_address
message[ u"subject" ] = u"Luminotes password reset"
message.set_payload(
u"Someone has requested a password reset for a Luminotes user with your email\n" +
u"address. If this someone is you, please visit the following link for a\n" +
u"username reminder or a password reset:\n\n" +
u"%s/%s\n\n" % ( self.__https_url or self.__http_url, password_reset.object_id ) +
u"This link will expire in 24 hours.\n\n" +
u"Thanks!"
)
# send the message out through localhost's smtp server
server = smtplib.SMTP()
server.connect()
server.sendmail( message[ u"from" ], [ email_address ], message.as_string() )
server.quit()
# FIXME: this should really be "message =" instead of "error =" since it's not an error, but at
# least this gets it displayed on the client
yield dict(
error = u"Please check your inbox. A password reset email has been sent to %s" % email_address,
)
@expose( view = Main_page )
@strongly_expire
@wait_for_update
@async
@update_client
@validate(
password_reset_id = Valid_id(),
)
def redeem_reset( self, password_reset_id ):
"""
Provide the information necessary to display the web site's main page along with a dynamically
generated "complete your password reset" note.
@type password_reset_id: unicode
@param password_reset_id: id of model.Password_reset to redeem
@rtype: unicode
@return: rendered HTML page
@raise Password_reset_error: an error occured when redeeming the password reset, such as an expired link
@raise Validation_error: one of the arguments is invalid
"""
self.__database.load( u"User anonymous", self.__scheduler.thread )
anonymous = ( yield Scheduler.SLEEP )
if not anonymous or len( anonymous.notebooks ) == 0:
raise Password_reset_error( "There was an error when completing your password reset. Please contact %s." % self.__support_email )
self.__database.load( password_reset_id, self.__scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
if not password_reset or datetime.now() - password_reset.revision > timedelta( hours = 25 ):
raise Password_reset_error( "Your password reset link has expired. Please request a new password reset email." )
if password_reset.redeemed:
raise Password_reset_error( "Your password has already been reset. Please request a new password reset email." )
self.__database.load( u"User_list all", self.__scheduler.thread )
user_list = ( yield Scheduler.SLEEP )
if not user_list:
raise Password_reset_error( u"There are no Luminotes users with the email address %s" % password_reset.email_address )
# find the user(s) with the email address from the password reset request
matching_users = [ user for user in user_list.users if user.email_address == password_reset.email_address ]
if len( matching_users ) == 0:
raise Password_reset_error( u"There are no Luminotes users with the email address %s" % password_reset.email_address )
yield dict(
notebook_id = anonymous.notebooks[ 0 ].object_id,
note_id = u"blank",
note_contents = unicode( Redeem_reset_note( password_reset_id, matching_users ) ),
)
@expose( view = Json )
@wait_for_update
@async
@update_client
def reset_password( self, password_reset_id, reset_button, **new_passwords ):
"""
Reset all the users with the provided passwords.
@type password_reset_id: unicode
@param password_reset_id: id of model.Password_reset to use
@type reset_button: unicode
@param reset_button: return
@type new_passwords: { userid: [ newpassword, newpasswordrepeat ] }
@param new_passwords: map of user id to new passwords or empty strings
@rtype: json dict
@return: { 'redirect': '/' }
@raise Password_reset_error: an error occured when resetting the passwords, such as an expired link
"""
try:
id_validator = Valid_id()
id_validator( password_reset_id )
except ValueError:
raise Validation_error( "password_reset_id", password_reset_id, id_validator, "is not a valid id" )
self.__database.load( password_reset_id, self.__scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
if not password_reset or datetime.now() - password_reset.revision > timedelta( hours = 25 ):
raise Password_reset_error( "Your password reset link has expired. Please request a new password reset email." )
if password_reset.redeemed:
raise Password_reset_error( "Your password has already been reset. Please request a new password reset email." )
self.__database.load( u"User_list all", self.__scheduler.thread )
user_list = ( yield Scheduler.SLEEP )
if not user_list:
raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email )
# find the user(s) with the email address from the password reset request
matching_users = [ user for user in user_list.users if user.email_address == password_reset.email_address ]
allowed_user_ids = [ user.object_id for user in matching_users ]
# reset any passwords that are non-blank
users_to_reset = []
for ( user_id, ( new_password, new_password_repeat ) ) in new_passwords.items():
if user_id not in allowed_user_ids:
raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email )
# skip blank passwords
if new_password == u"" and new_password_repeat == u"":
continue
self.__database.load( user_id, self.__scheduler.thread )
user = ( yield Scheduler.SLEEP )
if not user:
raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email )
# ensure the passwords match
if new_password != new_password_repeat:
raise Password_reset_error( u"The new passwords you entered for user %s do not match. Please try again." % user.username )
# ensure the new password isn't too long
if len( new_password ) > 30:
raise Password_reset_error( u"Your password can be no longer than 30 characters." )
users_to_reset.append( ( user, new_password ) )
for ( user, new_password ) in users_to_reset:
user.password = new_password
self.__database.save( user )
# if all the new passwords provided are blank, bail
if not users_to_reset:
raise Password_reset_error( u"Please enter a new password. Or, if you already know your password, just click the login link above." )
password_reset.redeemed = True
self.__database.save( password_reset )
yield dict( redirect = u"/" )
scheduler = property( lambda self: self.__scheduler )

View File

@ -0,0 +1,29 @@
class Stub_smtp( object ):
"""
A stub intended to replace smtplib.SMTP for unit testing code that depends on it.
"""
connected = False
from_address = None
to_addresses = None
message = None
def connect( self ):
Stub_smtp.connected = True
def sendmail( self, from_address, to_addresses, message ):
if not Stub_smtp.connected:
raise Exception( "not connected to the server" )
Stub_smtp.from_address = from_address
Stub_smtp.to_addresses = to_addresses
Stub_smtp.message = message
def quit( self ):
Stub_smtp.connected = False
@staticmethod
def reset():
Stub_smtp.connected = False
Stub_smtp.from_address = None
Stub_smtp.to_addresses = None
Stub_smtp.message = None

View File

@ -19,14 +19,15 @@ class Test_controller( object ):
u"luminotes.https_url" : u"https://luminotes.com",
u"luminotes.http_proxy_ip" : u"127.0.0.1",
u"luminotes.https_proxy_ip" : u"127.0.0.2",
"luminotes.rate_plans": [
u"luminotes.support_email": "unittest@luminotes.com",
u"luminotes.rate_plans": [
{
"name": "super",
"storage_quota_bytes": 1337,
u"name": u"super",
u"storage_quota_bytes": 1337,
},
{
"name": "extra super",
"storage_quota_bytes": 31337,
u"name": "extra super",
u"storage_quota_bytes": 31337,
},
],
},

View File

@ -155,6 +155,29 @@ class Test_notebooks( Test_controller ):
assert note.object_id == self.note.object_id
assert self.user.storage_bytes == 0
def test_contents_with_blank_note( self ):
self.login()
result = self.http_get(
"/notebooks/contents?notebook_id=%s&note_id=blank" % self.notebook.object_id ,
session_id = self.session_id,
)
notebook = result[ "notebook" ]
startup_notes = result[ "startup_notes" ]
assert notebook.object_id == self.notebook.object_id
assert len( startup_notes ) == 1
assert startup_notes[ 0 ] == self.note
note = result[ "note" ]
assert note.object_id == u"blank"
assert note.contents == None
assert note.title == None
assert note.deleted_from == None
assert self.user.storage_bytes == 0
def test_contents_without_login( self ):
result = self.http_get(
"/notebooks/contents?notebook_id=%s" % self.notebook.object_id,

View File

@ -80,3 +80,9 @@ class Test_root( Test_controller ):
login_button = u"login",
) )
self.session_id = result[ u"session_id" ]
def test_redeem_reset( self ):
redeem_reset_id = u"foobarbaz"
result = self.http_get( "/%s" % redeem_reset_id )
assert result[ u"redirect" ] == u"/users/redeem_reset/%s" % redeem_reset_id

View File

@ -1,12 +1,20 @@
import re
import cherrypy
import smtplib
from datetime import datetime, timedelta
from nose.tools import raises
from Test_controller import Test_controller
from Stub_smtp import Stub_smtp
from controller.Scheduler import Scheduler
from model.User import User
from model.Notebook import Notebook
from model.Note import Note
from model.User_list import User_list
class Test_users( Test_controller ):
RESET_LINK_PATTERN = re.compile( "(https?://\S+)?/(\S+)" )
def setUp( self ):
Test_controller.setUp( self )
@ -16,7 +24,11 @@ class Test_users( Test_controller ):
self.new_username = u"reynolds"
self.new_password = u"shiny"
self.new_email_address = u"capn@example.com"
self.username2 = u"scully"
self.password2 = u"trustsome1"
self.email_address2 = u"outthere@example.com"
self.user = None
self.user2 = None
self.anonymous = None
self.notebooks = None
@ -45,10 +57,21 @@ class Test_users( Test_controller ):
self.database.next_id( self.scheduler.thread )
self.user = User( ( yield Scheduler.SLEEP ), self.username, self.password, self.email_address, self.notebooks )
self.database.next_id( self.scheduler.thread )
self.user2 = User( ( yield Scheduler.SLEEP ), self.username2, self.password2, self.email_address2 )
self.database.next_id( self.scheduler.thread )
self.anonymous = User( ( yield Scheduler.SLEEP ), u"anonymous", None, None, [ self.anon_notebook ] )
self.database.next_id( self.scheduler.thread )
user_list_id = ( yield Scheduler.SLEEP )
user_list = User_list( user_list_id, u"all" )
user_list.add_user( self.user )
user_list.add_user( self.user2 )
user_list.add_user( self.anonymous )
self.database.save( self.user )
self.database.save( self.user2 )
self.database.save( self.anonymous )
self.database.save( user_list )
def test_signup( self ):
result = self.http_post( "/users/signup", dict(
@ -264,3 +287,427 @@ class Test_users( Test_controller ):
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
def test_send_reset( self ):
# trick send_reset() into using a fake SMTP server
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
result = self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
session_id = result[ u"session_id" ]
assert u"has been sent to" in result[ u"error" ]
assert smtplib.SMTP.connected == False
assert "<%s>" % self.settings[ u"global" ][ u"luminotes.support_email" ] in smtplib.SMTP.from_address
assert smtplib.SMTP.to_addresses == [ self.user.email_address ]
assert u"password reset" in smtplib.SMTP.message
assert self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
def test_send_reset_to_unknown_email_address( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
result = self.http_post( "/users/send_reset", dict(
email_address = u"unknown@example.com",
send_reset_button = u"email me",
) )
assert u"no Luminotes user" in result[ u"error" ]
assert smtplib.SMTP.connected == False
assert smtplib.SMTP.from_address == None
assert smtplib.SMTP.to_addresses == None
assert smtplib.SMTP.message == None
def test_redeem_reset( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
result = self.http_get( "/users/redeem_reset/%s" % password_reset_id )
assert result[ u"notebook_id" ] == self.anonymous.notebooks[ 0 ].object_id
assert result[ u"note_id" ]
assert u"password reset" in result[ u"note_contents" ]
assert self.user.username in result[ u"note_contents" ]
assert self.user2.username in result[ u"note_contents" ]
def test_redeem_reset_unknown( self ):
password_reset_id = u"unknownresetid"
result = self.http_get( "/users/redeem_reset/%s" % password_reset_id )
assert u"expired" in result[ u"error" ]
def test_redeem_reset_expired( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
# to trigger expiration, pretend that the password reset was made 25 hours ago
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
password_reset._Persistent__revision = datetime.now() - timedelta( hours = 25 )
self.database.save( password_reset )
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
result = self.http_get( "/users/redeem_reset/%s" % password_reset_id )
assert u"expired" in result[ u"error" ]
def test_redeem_reset_already_redeemed( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
password_reset.redeemed = True
self.database.save( password_reset )
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
result = self.http_get( "/users/redeem_reset/%s" % password_reset_id )
assert u"already" in result[ u"error" ]
def test_redeem_reset_unknown_email( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
password_reset._Password_reset__email_address = u"unknown@example.com"
self.database.save( password_reset )
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
result = self.http_get( "/users/redeem_reset/%s" % password_reset_id )
assert u"email address" in result[ u"error" ]
def test_reset_password( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
new_password = u"newpass"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, new_password ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that the password reset is now marked as redeemed
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
assert password_reset.redeemed
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
# check that the password was actually reset for one of the users, but not the other
assert self.user.check_password( new_password )
assert self.user2.check_password( self.password2 )
assert result[ u"redirect" ]
def test_reset_password_unknown_reset_id( self ):
new_password = u"newpass"
password_reset_id = u"unknownresetid"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, new_password ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that neither user's password has changed
assert self.user.check_password( self.password )
assert self.user2.check_password( self.password2 )
assert u"expired" in result[ "error" ]
def test_reset_password_invalid_reset_id( self ):
new_password = u"newpass"
password_reset_id = u"invalid reset id"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, new_password ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that neither user's password has changed
assert self.user.check_password( self.password )
assert self.user2.check_password( self.password2 )
assert u"valid" in result[ "error" ]
def test_reset_password_expired( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
# to trigger expiration, pretend that the password reset was made 25 hours ago
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
password_reset._Persistent__revision = datetime.now() - timedelta( hours = 25 )
self.database.save( password_reset )
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
new_password = u"newpass"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, new_password ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that the password reset is not marked as redeemed
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
assert password_reset.redeemed == False
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
# check that neither user's password has changed
assert self.user.check_password( self.password )
assert self.user2.check_password( self.password2 )
assert u"expired" in result[ "error" ]
def test_reset_password_expired( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
password_reset.redeemed = True
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
new_password = u"newpass"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, new_password ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that neither user's password has changed
assert self.user.check_password( self.password )
assert self.user2.check_password( self.password2 )
assert u"already" in result[ "error" ]
def test_reset_password_unknown_user_id( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
new_password = u"newpass"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, new_password ),
( u"unknown", u"foo" ),
( u"unknown", u"foo" ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that neither user's password has changed
assert self.user.check_password( self.password )
assert self.user2.check_password( self.password2 )
assert result[ "error" ]
def test_reset_password_non_matching( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
new_password = u"newpass"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, u"nonmatchingpass" ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that neither user's password has changed
assert self.user.check_password( self.password )
assert self.user2.check_password( self.password2 )
assert result[ "error" ]
def test_reset_password_blank( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, u"" ),
( self.user.object_id, u"" ),
( self.user2.object_id, u"" ),
( self.user2.object_id, u"" ),
) )
# check that neither user's password has changed
assert self.user.check_password( self.password )
assert self.user2.check_password( self.password2 )
assert result[ "error" ]
def test_reset_password_multiple_users( self ):
Stub_smtp.reset()
smtplib.SMTP = Stub_smtp
self.http_post( "/users/send_reset", dict(
email_address = self.user.email_address,
send_reset_button = u"email me",
) )
matches = self.RESET_LINK_PATTERN.search( smtplib.SMTP.message )
password_reset_id = matches.group( 2 )
assert password_reset_id
new_password = u"newpass"
new_password2 = u"newpass2"
result = self.http_post( "/users/reset_password", (
( u"password_reset_id", password_reset_id ),
( u"reset_button", u"reset passwords" ),
( self.user.object_id, new_password ),
( self.user.object_id, new_password ),
( self.user2.object_id, new_password2 ),
( self.user2.object_id, new_password2 ),
) )
# check that the password reset is now marked as redeemed
def gen():
self.database.load( password_reset_id, self.scheduler.thread )
password_reset = ( yield Scheduler.SLEEP )
assert password_reset.redeemed
g = gen()
self.scheduler.add( g )
self.scheduler.wait_for( g )
# check that the password was actually reset for both users
assert self.user.check_password( new_password )
assert self.user2.check_password( new_password2 )
assert result[ u"redirect" ]

View File

@ -38,6 +38,10 @@ class Note( Persistent ):
self.update_revision()
self.__contents = contents
if contents is None:
self.__title = None
return
# parse title out of the beginning of the contents
result = Note.TITLE_PATTERN.search( contents )

29
model/Password_reset.py Normal file
View File

@ -0,0 +1,29 @@
from Persistent import Persistent
class Password_reset( Persistent ):
"""
A request for a password reset.
"""
def __init__( self, id, email_address ):
"""
Create a password reset request with the given id.
@type id: unicode
@param id: id of the password reset
@type email_address: unicode
@param email_address: where the reset confirmation was emailed
@rtype: Password_reset
@return: newly constructed password reset
"""
Persistent.__init__( self, id )
self.__email_address = email_address
self.__redeemed = False
def __set_redeemed( self, redeemed ):
if redeemed != self.__redeemed:
self.update_revision()
self.__redeemed = redeemed
email_address = property( lambda self: self.__email_address )
redeemed = property( lambda self: self.__redeemed, __set_redeemed )

48
model/User_list.py Normal file
View File

@ -0,0 +1,48 @@
from copy import copy
from Persistent import Persistent
class User_list( Persistent ):
"""
A list of users.
"""
def __init__( self, id, secondary_id = None ):
"""
Create a list of users, and give the list the provided id.
@type id: unicode
@param id: id of the user list
@type secondary_id: unicode or NoneType
@param secondary_id: convenience id for easy access (optional)
@rtype: User_list
@return: newly constructed user list
"""
Persistent.__init__( self, id, secondary_id )
self.__users = []
def add_user( self, user ):
"""
Add a user to this list.
@type user: User
@param user: user to add
"""
if user.object_id not in [ u.object_id for u in self.__users ]:
self.update_revision()
self.__users.append( user )
def remove_user( self, user ):
"""
Remove a user from this list.
@type user: User
@param user: user to remove
"""
if user in self.__users:
self.update_revision()
self.__users.remove( user )
def __set_users( self, users ):
self.__users = users
users = property( lambda self: copy( self.__users ), __set_users )

View File

@ -15,6 +15,15 @@ class Test_note( object ):
assert self.note.title == self.title
assert self.note.deleted_from == None
def test_create_blank( self ):
object_id = u"22"
blank_note = Note( object_id )
assert blank_note.object_id == object_id
assert blank_note.contents == None
assert blank_note.title == None
assert blank_note.deleted_from == None
def test_set_contents( self ):
new_title = u"new title"
new_contents = u"<h3>%s</h3>new blah" % new_title

View File

@ -0,0 +1,38 @@
from model.User import User
from model.Password_reset import Password_reset
class Test_password_reset( object ):
def setUp( self ):
self.object_id = u"17"
self.email_address = u"bob@example.com"
self.password_reset = Password_reset( self.object_id, self.email_address )
def test_create( self ):
assert self.password_reset.object_id == self.object_id
assert self.password_reset.email_address == self.email_address
assert self.password_reset.redeemed == False
def test_redeem( self ):
previous_revision = self.password_reset.revision
self.password_reset.redeemed = True
assert self.password_reset.redeemed == True
assert self.password_reset.revision > previous_revision
def test_redeem_twice( self ):
self.password_reset.redeemed = True
current_revision = self.password_reset.revision
self.password_reset.redeemed = True
assert self.password_reset.redeemed == True
assert self.password_reset.revision == current_revision
def test_unredeem( self ):
self.password_reset.redeemed = True
previous_revision = self.password_reset.revision
self.password_reset.redeemed = False
assert self.password_reset.redeemed == False
assert self.password_reset.revision > previous_revision

View File

@ -0,0 +1,57 @@
from model.User import User
from model.User_list import User_list
class Test_user_list( object ):
def setUp( self ):
self.object_id = u"17"
self.secondary_id = u"mylist"
self.user_list = User_list( self.object_id, self.secondary_id )
self.user = User( u"18", u"bob", u"pass", u"bob@example.com" )
self.user2 = User( u"19", u"rob", u"pass2", u"rob@example.com" )
def test_create( self ):
assert self.user_list.object_id == self.object_id
assert self.user_list.secondary_id == self.secondary_id
assert self.user_list.users == []
def test_add_user( self ):
previous_revision = self.user_list.revision
self.user_list.add_user( self.user )
assert self.user_list.users == [ self.user ]
assert self.user_list.revision > previous_revision
def test_add_user_twice( self ):
self.user_list.add_user( self.user )
current_revision = self.user_list.revision
self.user_list.add_user( self.user )
assert self.user_list.users == [ self.user ]
assert self.user_list.revision == current_revision
def test_add_two_users( self ):
previous_revision = self.user_list.revision
self.user_list.add_user( self.user )
self.user_list.add_user( self.user2 )
assert self.user_list.users == [ self.user, self.user2 ]
assert self.user_list.revision > previous_revision
def test_remove_user( self ):
self.user_list.add_user( self.user )
previous_revision = self.user_list.revision
self.user_list.remove_user( self.user )
assert self.user_list.users == []
assert self.user_list.revision > previous_revision
def test_remove_user_twice( self ):
self.user_list.add_user( self.user )
self.user_list.remove_user( self.user )
current_revision = self.user_list.revision
self.user_list.remove_user( self.user )
assert self.user_list.users == []
assert self.user_list.revision == current_revision

View File

@ -153,6 +153,22 @@ Editor.prototype.finish_init = function () {
} );
}
var send_reset_button = withDocument( this.document, function () { return getElement( "send_reset_button" ); } );
if ( send_reset_button ) {
var send_reset_form = withDocument( this.document, function () { return getElement( "send_reset_form" ); } );
connect( send_reset_button, "onclick", function ( event ) {
signal( self, "submit_form", "/users/send_reset", send_reset_form ); event.stop();
} );
}
var reset_button = withDocument( this.document, function () { return getElement( "reset_button" ); } );
if ( reset_button ) {
var reset_form = withDocument( this.document, function () { return getElement( "reset_form" ); } );
connect( reset_button, "onclick", function ( event ) {
signal( self, "submit_form", "/users/reset_password", reset_form ); event.stop();
} );
}
if ( this.iframe.contentDocument ) { // browsers such as Firefox
if ( this.read_write ) this.exec_command( "styleWithCSS", false );
this.resize();

View File

@ -244,7 +244,13 @@ Wiki.prototype.populate = function ( result ) {
var read_write = this.read_write;
if ( getElement( "revision" ).value ) read_write = false;
if ( result.note )
this.create_editor( result.note.object_id, result.note.contents, result.note.deleted_from, result.note.revisions_list, undefined, read_write, false, true );
this.create_editor(
result.note.object_id,
result.note.contents || getElement( "note_contents" ).value,
result.note.deleted_from,
result.note.revisions_list,
undefined, read_write, false, true
);
if ( result.startup_notes.length == 0 && !result.note )
this.display_empty_message();

View File

@ -8,6 +8,7 @@ from model.Notebook import Notebook
from model.Read_only_notebook import Read_only_notebook
from model.Note import Note
from model.User import User
from model.User_list import User_list
class Initializer( object ):
@ -80,6 +81,13 @@ class Initializer( object ):
self.anonymous = User( anonymous_user_id, u"anonymous", None, None, notebooks )
self.database.save( self.anonymous )
# create a user list
self.database.next_id( self.scheduler.thread )
user_list_id = ( yield Scheduler.SLEEP )
user_list = User_list( user_list_id, u"all" )
user_list.add_user( self.anonymous )
self.database.save( user_list )
def main():
print "IMPORTANT: Stop the Luminotes server before running this program."

View File

@ -6,6 +6,7 @@ from config.Common import settings
from controller.Database import Database
from controller.Scheduler import Scheduler
from model.Note import Note
from model.User_list import User_list
from tools.initdb import fix_note_contents
@ -30,6 +31,7 @@ class Initializer( object ):
self.navigation_note_id = navigation_note_id
threads = (
self.create_user_list(),
self.update_main_notebook(),
)
@ -37,6 +39,27 @@ class Initializer( object ):
self.scheduler.add( thread )
self.scheduler.wait_for( thread )
def create_user_list( self ):
# if there's no user list, create one and populate it with all users in the database
self.database.load( u"User_list all", self.scheduler.thread )
user_list = ( yield Scheduler.SLEEP )
if user_list is not None:
return
self.database.next_id( self.scheduler.thread )
user_list_id = ( yield Scheduler.SLEEP )
user_list = User_list( user_list_id, u"all" )
for key in self.database._Database__db.keys():
if not key.startswith( "User " ): continue
self.database.load( key, self.scheduler.thread )
user = ( yield Scheduler.SLEEP )
if user:
user_list.add_user( user )
self.database.save( user_list )
def update_main_notebook( self ):
self.database.load( u"User anonymous", self.scheduler.thread )
anonymous = ( yield Scheduler.SLEEP )

View File

@ -3,9 +3,18 @@ from Tags import Div, H2, P, A, Ul, Li, Strong
class Error_page( Page ):
def __init__( self, support_email ):
title = u"uh oh"
def __init__( self, support_email, message = None ):
if message:
title = u"whoops"
Page.__init__(
self,
H2( title ),
P( message ),
include_js = False,
)
return
title = u"uh oh"
Page.__init__(
self,
title,
@ -33,4 +42,5 @@ class Error_page( Page ):
),
class_ = u"error_box",
),
include_js = False,
)

View File

@ -1,3 +1,4 @@
from cgi import escape
from Page import Page
from Tags import Input, Div, Noscript, H2, H4, A
from Search_form import Search_form
@ -6,8 +7,9 @@ from Toolbar import Toolbar
class Main_page( Page ):
def __init__( self, notebook_id = None, note_id = None, parent_id = None, revision = None ):
def __init__( self, notebook_id = None, note_id = None, parent_id = None, revision = None, note_contents = None ):
title = None
note_contents = note_contents and escape( note_contents, quote = True ) or ""
Page.__init__(
self,
@ -16,6 +18,7 @@ class Main_page( Page ):
Input( type = u"hidden", name = u"note_id", id = u"note_id", value = note_id or "" ),
Input( type = u"hidden", name = u"parent_id", id = u"parent_id", value = parent_id or "" ),
Input( type = u"hidden", name = u"revision", id = u"revision", value = revision or "" ),
Input( type = u"hidden", name = u"note_contents", id = u"note_contents", value = note_contents ),
Div(
id = u"status_area",
),

View File

@ -20,4 +20,5 @@ class Not_found_page( Page ):
),
class_ = u"error_box",
),
include_js = False,
)

View File

@ -10,15 +10,21 @@ class Page( Html ):
if "id" not in attrs:
attrs[ "id" ] = u"content"
if "include_js" in attrs:
include_js = attrs[ "include_js" ]
del attrs[ "include_js" ]
else:
include_js = True
# move certain types of children from the body to the head
Html.__init__(
self,
Head(
Link( rel = u"stylesheet", type = u"text/css", href = u"/static/css/style.css" ),
Script( type = u"text/javascript", src = u"/static/js/MochiKit.js" ),
Script( type = u"text/javascript", src = u"/static/js/Invoker.js" ),
Script( type = u"text/javascript", src = u"/static/js/Editor.js" ),
Script( type = u"text/javascript", src = u"/static/js/Wiki.js" ),
include_js and Script( type = u"text/javascript", src = u"/static/js/MochiKit.js" ) or None,
include_js and Script( type = u"text/javascript", src = u"/static/js/Invoker.js" ) or None,
include_js and Script( type = u"text/javascript", src = u"/static/js/Editor.js" ) or None,
include_js and Script( type = u"text/javascript", src = u"/static/js/Wiki.js" ) or None,
Meta( content = u"text/html; charset=UTF-8", http_equiv = u"content-type" ),
[ child for child in children if type( child ) in head_types ],
Title( title and u"%s: %s" % ( app_name, title ) or app_name ),

50
view/Redeem_reset_note.py Normal file
View File

@ -0,0 +1,50 @@
from Tags import Span, H3, P, Form, Table, Tr, Th, Td, Input, P, Strong
class Redeem_reset_note( Span ):
def __init__( self, password_reset_id, users ):
title = None
Span.__init__(
self,
H3( u"complete your password reset" ),
P(
"""
Below is a list of Luminotes users matching your email address. You can reset
the passwords of any of these users. If you just needed a username reminder and
you already know your password, then click the login link above without performing
a password reset.
"""
),
Form(
Table(
Tr(
Th( u"username" ),
Th( u"new password" ),
Th( u"new password (again)" ),
),
[ Tr(
Td( user.username ),
Td( Input( type = u"password", name = user.object_id, size = 30, maxlength = 30, class_ = u"text_field" ) ),
Td( Input( type = u"password", name = user.object_id, size = 30, maxlength = 30, class_ = u"text_field" ) ),
) for user in users ],
),
P(
Input( type = u"hidden", id = u"password_reset_id", name = u"password_reset_id", value = password_reset_id ),
Input(
type = u"submit",
name = u"reset_button",
id = u"reset_button",
class_ = u"button",
value = ( len( users ) > 1 ) and u"reset passwords" or u"reset password" ),
),
id = "reset_form",
),
P(
Strong( u"tip:" ),
u"""
When you submit this form, you'll be redirected to the front page where you can login with
your new password.
""",
),
)