witten
/
luminotes
Archived
1
0
Fork 0
This repository has been archived on 2023-12-16. You can view files and clone it, but cannot push or open issues or pull requests.
luminotes/controller/Users.py

1762 lines
69 KiB
Python

import re
import urllib
import urllib2
import cherrypy
import smtplib
from email import Message
from pytz import utc
from datetime import datetime, timedelta
from model.User import User
from model.Group import Group
from model.Notebook import Notebook
from model.Note import Note
from model.Password_reset import Password_reset
from model.Download_access import Download_access
from model.Invite import Invite
from model.Tag import Tag
from Expose import expose
from Validate import validate, Valid_string, Valid_bool, Valid_int, Validation_error
from Database import Valid_id, end_transaction
from Expire import strongly_expire
from view.Json import Json
from view.Main_page import Main_page
from view.Redeem_reset_note import Redeem_reset_note
from view.Redeem_invite_note import Redeem_invite_note
from view.Blank_page import Blank_page
from view.Thanks_note import Thanks_note
from view.Thanks_error_note import Thanks_error_note
from view.Thanks_download_note import Thanks_download_note
from view.Thanks_download_error_note import Thanks_download_error_note
from view.Processing_note import Processing_note
from view.Processing_download_note import Processing_download_note
from view.Form_submit_page import Form_submit_page
USERNAME_PATTERN = re.compile( "^[a-zA-Z0-9]+$" )
EMAIL_ADDRESS_PATTERN = re.compile( "^[\w.%+-]+@[\w-]+(\.[\w-]+)+$" )
EMBEDDED_EMAIL_ADDRESS_PATTERN = re.compile( "(?:^|[\s,<])([\w.%+-]+@[\w-]+(?:\.[\w-]+)+)(?:[\s,>]|$)" )
WHITESPACE_OR_COMMA_PATTERN = re.compile( "[\s,]" )
def valid_username( username ):
if USERNAME_PATTERN.search( username ) is None:
raise ValueError()
return username
valid_username.message = u"can only contain letters and digits"
def valid_email_address( email_address ):
if email_address == "" or EMAIL_ADDRESS_PATTERN.search( email_address ) is None:
raise ValueError()
return email_address
class Signup_error( Exception ):
def __init__( self, message ):
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
class Authentication_error( Exception ):
def __init__( self, message ):
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
class Password_reset_error( Exception ):
def __init__( self, message ):
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
class Invite_error( Exception ):
def __init__( self, message ):
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
class Access_error( Exception ):
def __init__( self, message = None ):
if message is None:
message = u"Sorry, you don't have access to do that. Please make sure you're logged in as the correct user."
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
class Payment_error( Exception ):
def __init__( self, message, params ):
message += "\n" + unicode( params )
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
def grab_user_id( function ):
"""
A decorator to grab the current logged in user id from the cherrypy session and pass it as a
user_id argument to the decorated function. This decorator must be used from within the main
cherrypy request thread.
"""
def get_id( *args, **kwargs ):
arg_names = list( function.func_code.co_varnames )
if "user_id" in arg_names:
arg_index = arg_names.index( "user_id" )
args = list( args )
args[ arg_index - 1 ] = cherrypy.session.get( "user_id" )
else:
kwargs[ "user_id" ] = cherrypy.session.get( "user_id" )
try:
return function( *args, **kwargs )
except Access_error:
# if there was an Access_error, and the user isn't logged in, and this is an HTTP GET request,
# redirect to the login page. that is, unless there is an auto-login username
if cherrypy.session.get( "user_id" ) is None and cherrypy.request.method == "GET":
if cherrypy.config.configs[ u"global" ].get( u"luminotes.auto_login_username" ):
raise cherrypy.HTTPRedirect( u"%s/" % cherrypy.request.base )
original_path = cherrypy.request.path + \
( cherrypy.request.query_string and u"?%s" % cherrypy.request.query_string or "" )
raise cherrypy.HTTPRedirect( u"%s/login?after_login=%s" % ( cherrypy.request.base, urllib.quote( original_path ) ) )
else:
raise
return get_id
def update_auth( function ):
"""
Based on the return value of the decorated function, update the current session's authentication
status. This decorator must be used from within the main cherrypy request thread.
If the return value of the decorated function (which is expected to be a dictionary) contains an
"authenticated" key with a User value, then mark the user as logged in. If the return value of the
decorated function contains a "deauthenticated" key with any value, then mark the user as logged
out.
"""
def handle_result( *args, **kwargs ):
result = function( *args, **kwargs )
# peek in the function's return value to see if we should tweak authentication status
user = result.get( "authenticated" )
if user:
result.pop( "authenticated", None )
cherrypy.session[ u"user_id" ] = user.object_id
cherrypy.session[ u"username" ] = user.username
if result.get( "deauthenticated" ):
result.pop( "deauthenticated", None )
cherrypy.session.pop( u"user_id", None )
cherrypy.session.pop( u"username", None )
return result
return handle_result
class Users( object ):
"""
Controller for dealing with users, corresponding to the "/users" URL.
"""
def __init__( self, database, http_url, https_url, support_email, payment_email, rate_plans, download_products ):
"""
Create a new Users object.
@type database: controller.Database
@param database: database that users are stored in
@type http_url: unicode
@param http_url: base URL to use for non-SSL http requests, or an empty string
@type https_url: unicode
@param https_url: base URL to use for SSL http requests, or an empty string
@type support_email: unicode
@param support_email: email address for support requests
@type payment_email: unicode
@param payment_email: email address for payment
@type rate_plans: [ { "name": unicode, ... } ]
@param rate_plans: list of configured rate plans
@type download_products: [ { "name": unicode, ... } ]
@param download_products: list of configured downloadable products
@rtype: Users
@return: newly constructed Users
"""
self.__database = database
self.__http_url = http_url
self.__https_url = https_url
self.__support_email = support_email
self.__payment_email = payment_email
self.__rate_plans = rate_plans
self.__download_products = download_products
def create_user( self, username, password = None, password_repeat = None, email_address = None, initial_rate_plan = None ):
"""
Create a new User based on the given information. Start that user with their own Notebook and a
"welcome to your wiki" Note. This method does not commit the transaction to the database.
@type username: unicode (alphanumeric only)
@param username: username to use for this new user
@type password: unicode or NoneType
@param password: password to use (optional, defaults to None)
@type password_repeat: unicode or NoneType
@param password_repeat: password to use, again (optional, defaults to None)
@type email_address: unicode or NoneType
@param email_address: user's email address (optional, defaults to None)
@type initial_rate_plan: int or NoneType
@param initial_rate_plan: index of rate plan to start the user with before they even subscribe
(defaults to None)
@type user: ( model.User, model.Notebook )
@parm user: ( newly created user, newly created notebook )
@raise Signup_error: passwords don't match or the username is unavailable
@raise Validation_error: the email address is invalid
"""
if password != password_repeat:
raise Signup_error( u"The passwords you entered do not match. Please try again." )
user = self.__database.select_one( User, User.sql_load_by_username( username ) )
if user is not None:
raise Signup_error( u"Sorry, that username is not available. Please try something else." )
if email_address:
try:
email_address = valid_email_address( email_address )
except ValueError:
raise Validation_error( "email_address", email_address, valid_email_address )
# create a notebook for this user, along with a trash for that notebook
trash_id = self.__database.next_id( Notebook, commit = False )
trash = Notebook.create( trash_id, u"trash" )
self.__database.save( trash, commit = False )
notebook_id = self.__database.next_id( Notebook, commit = False )
notebook = Notebook.create( notebook_id, u"my notebook", trash_id )
self.__database.save( notebook, commit = False )
# create a startup note for this user's notebook
note_id = self.__database.next_id( Note, commit = False )
note_contents = file( u"static/html/welcome to your wiki.html" ).read()
note = Note.create( note_id, note_contents, notebook_id, startup = True, rank = 0 )
self.__database.save( note, commit = False )
# actually create the new user
user_id = self.__database.next_id( User, commit = False )
user = User.create( user_id, username, password, email_address, rate_plan = initial_rate_plan )
self.__database.save( user, commit = False )
# record the fact that the new user has access to their new notebook
self.__database.execute( user.sql_save_notebook( notebook_id, read_write = True, owner = True, rank = 0 ), commit = False )
self.__database.execute( user.sql_save_notebook( trash_id, read_write = True, owner = True ), commit = False )
return ( user, notebook )
@expose( view = Json )
@end_transaction
@update_auth
@validate(
username = ( Valid_string( min = 1, max = 30 ), valid_username ),
password = Valid_string( min = 1, max = 30 ),
password_repeat = Valid_string( min = 1, max = 30 ),
email_address = ( Valid_string( min = 0, max = 60 ) ),
signup_button = unicode,
invite_id = Valid_id( none_okay = True ),
rate_plan = Valid_int( none_okay = True ),
yearly = Valid_bool( none_okay = True ),
)
def signup( self, username, password, password_repeat, email_address, signup_button, invite_id = None, rate_plan = None, yearly = False ):
"""
Create a new User based on the given information. For convenience, login the newly created user
as well.
@type username: unicode (alphanumeric only)
@param username: username to use for this new user
@type password: unicode
@param password: password to use
@type password_repeat: unicode
@param password_repeat: password to use, again
@type email_address: unicode
@param email_address: user's email address
@type signup_button: unicode
@param signup_button: ignored
@type invite_id: unicode
@param invite_id: id of invite to redeem upon signup (optional)
@type rate_plan: int
@param rate_plan: index of rate plan to signup for (optional). if greater than zero, redirect
to PayPal subscribe page after signup
@type yearly: bool
@param yearly: True for a yearly rate plan, False for monthly (optional, defaults to False )
@rtype: json dict
@return: { 'redirect': url, 'authenticated': userdict }
@raise Signup_error: passwords don't match or the username is unavailable
@raise Validation_error: one of the arguments is invalid
"""
( user, notebook ) = self.create_user( username, password, password_repeat, email_address )
self.__database.commit()
# if there's an invite_id, then redeem that invite and redirect to the invite's notebook
if invite_id:
invite = self.__database.load( Invite, invite_id )
if not invite:
raise Signup_error( u"The invite is unknown." )
self.convert_invite_to_access( invite, user.object_id )
redirect = u"/notebooks/%s" % invite.notebook_id
# if there's a requested rate plan, then redirect to the PayPal subscribe page
elif rate_plan and rate_plan > 0:
redirect = u"/users/subscribe?rate_plan=%s&yearly=%s" % ( rate_plan, yearly )
# otherwise, just redirect to the newly created notebook
else:
redirect = u"/notebooks/%s" % notebook.object_id
return dict(
redirect = redirect,
authenticated = user,
)
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
group_id = Valid_id(),
username = ( Valid_string( min = 1, max = 30 ), valid_username ),
password = Valid_string( min = 1, max = 30 ),
password_repeat = Valid_string( min = 1, max = 30 ),
email_address = ( Valid_string( min = 0, max = 60 ) ),
create_user_button = unicode,
user_id = Valid_id( none_okay = True )
)
def signup_group_member( self, group_id, username, password, password_repeat, email_address, create_user_button, user_id ):
"""
Create a new User in a particular group based on the given information. Start that user with
their own Notebook and a "welcome to your wiki" Note. This method is only available to a user
with admin access to the group.
@type group_id: unicode
@param group_id: id of the group to which the new user should be added
@type username: unicode (alphanumeric only)
@param username: username to use for this new user
@type password: unicode
@param password: password to use
@type password_repeat: unicode
@param password_repeat: password to use, again
@type email_address: unicode
@param email_address: user's email address
@type create_user_button: unicode
@param create_user_button: ignored
@type user_id: unicode
@param user_id: id of current logged-in user
@rtype: json dict
@return: { 'message': message }
@raise Signup_error: passwords don't match or the username is unavailable
@raise Validation_error: one of the arguments is invalid
@raise Access_error: the current user doesn't have admin membership to the given group
"""
if not self.check_group( user_id, group_id, admin = True ):
raise Access_error()
user = self.__database.load( User, user_id )
if not user:
raise Access_error()
if user.rate_plan < 0 or user.rate_plan >= len( self.__rate_plans ):
raise Access_error()
plan = self.__rate_plans[ user.rate_plan ]
if not plan.get( u"user_admin" ):
raise Access_error()
# the current user's rate plan has a maximum number of included users. make sure we're not
# exceeding that number
included_users_count = plan.get( u"included_users" )
if not included_users_count:
raise Access_error()
group = self.__database.load( Group, group_id )
if not group:
raise Access_error()
# TODO: once multiple groups per account are supported, this needs to count all users in all
# groups of the current admin user
group_users = self.__database.select_many( User, group.sql_load_users() )
if len( group_users ) >= included_users_count:
raise Signup_error( 'Your current rate plan includes a maximum of %s users. Please upgrade your account for additional users.' % included_users_count )
# create a new user with the same rate plan as the currently logged-in user
( created_user, notebook ) = self.create_user(
username,
password,
password_repeat,
email_address,
initial_rate_plan = user.rate_plan,
)
# add the new user to the group
self.__database.execute( created_user.sql_save_group( group_id, admin = False ), commit = False )
self.__database.commit()
return dict(
message = u"A new group member has been created."
)
@expose( view = Form_submit_page )
@grab_user_id
@validate(
rate_plan = Valid_int(),
yearly = Valid_bool( none_okay = True ),
user_id = Valid_id(),
)
def subscribe( self, rate_plan, yearly = False, user_id = None ):
"""
Submit a subscription form to PayPal, allowing the user to subscribe to the given rate plan.
@type rate_plan: int
@param rate_plan: index of rate plan to subscribe to
@type yearly: bool
@param yearly: True for a yearly rate plan, False for monthly (optional, defaults to False )
@type user_id: unicode
@param user_id: id of current logged-in user
@rtype: dict
@return: { 'form': subscription_form_html }
@raise Signup_error: invalid rate plan, no logged-in user, or missing subscribe button
"""
if rate_plan == 0 or rate_plan >= len( self.__rate_plans ):
raise Signup_error( u"The rate plan is invalid." )
plan = self.__rate_plans[ rate_plan ]
if yearly:
button = plan.get( u"yearly_button" )
else:
button = plan.get( u"button" )
if not button or not button.strip():
raise Signup_error(
u"Sorry, that rate plan is not configured for subscriptions. Please contact %s." % \
( self.__support_email or u"support" )
)
return dict(
form = button % ( user_id, 0 ) # 0 = new subscription, 1 = modify an existing subscription
)
@expose()
@end_transaction
@grab_user_id
@update_auth
def demo( self, user_id = None ):
"""
Create a new guest User for purposes of the demo. Start that user with their own Notebook and
"welcome to your wiki" and "this is a demo" notes. For convenience, login the newly created
user as well.
If the user is already logged in as a guest user when calling this function, then skip
creating a new user and notebook, and just redirect to the guest user's existing notebook.
@type user_id: unicode
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'redirect': url, 'authenticated': userdict }
"""
# if the user is already logged in as a guest, then just redirect to their existing demo
# notebook
if user_id:
user = self.__database.load( User, user_id )
first_notebook = self.__database.select_one( Notebook, user.sql_load_notebooks( parents_only = True, undeleted_only = True ) )
if user.username is None and first_notebook:
redirect = u"/notebooks/%s" % first_notebook.object_id
return dict( redirect = redirect )
# create a demo notebook for this user, along with a trash for that notebook
trash_id = self.__database.next_id( Notebook, commit = False )
trash = Notebook.create( trash_id, u"trash" )
self.__database.save( trash, commit = False )
notebook_id = self.__database.next_id( Notebook, commit = False )
notebook = Notebook.create( notebook_id, u"my notebook", trash_id )
self.__database.save( notebook, commit = False )
# create startup notes for this user's notebook
note_id = self.__database.next_id( Note, commit = False )
note_contents = file( u"static/html/this is a demo.html" ).read()
note = Note.create( note_id, note_contents, notebook_id, startup = True, rank = 0 )
self.__database.save( note, commit = False )
note_id = self.__database.next_id( Note, commit = False )
note_contents = file( u"static/html/welcome to your wiki.html" ).read()
note = Note.create( note_id, note_contents, notebook_id, startup = True, rank = 1 )
self.__database.save( note, commit = False )
# actually create the new user
user_id = self.__database.next_id( User, commit = False )
user = User.create( user_id, username = None, password = None, email_address = None )
self.__database.save( user, commit = False )
# record the fact that the new user has access to their new notebook
self.__database.execute( user.sql_save_notebook( notebook_id, read_write = True, owner = True, rank = 0 ), commit = False )
self.__database.execute( user.sql_save_notebook( trash_id, read_write = True, owner = True ), commit = False )
self.__database.commit()
redirect = u"/notebooks/%s" % notebook.object_id
return dict(
redirect = redirect,
authenticated = user,
)
@expose( view = Json )
@end_transaction
@update_auth
@validate(
username = ( Valid_string( min = 1, max = 30 ), valid_username ),
password = Valid_string( min = 1, max = 30 ),
login_button = unicode,
invite_id = Valid_id( none_okay = True ),
after_login = Valid_string( min = 0, max = 1000 ),
)
def login( self, username, password, login_button, invite_id = None, after_login = None ):
"""
Attempt to authenticate the user. If successful, associate the given user with the current
session.
@type username: unicode (alphanumeric only)
@param username: username to login
@type password: unicode
@param password: the user's password
@type invite_id: unicode
@param invite_id: id of invite to redeem upon login (optional)
@type after_login: unicode
@param after_login: URL to redirect to after login (optional, must start with "/")
@rtype: json dict
@return: { 'redirect': url, 'authenticated': userdict }
@raise Authentication_error: invalid username or password
@raise Validation_error: one of the arguments is invalid
"""
user = self.__database.select_one( User, User.sql_load_by_username( username ) )
if user is None or user.check_password( password ) is False:
raise Authentication_error( u"Invalid username or password." )
first_notebook = self.__database.select_one( Notebook, user.sql_load_notebooks( parents_only = True, undeleted_only = True ) )
# if there's an invite_id, then redeem that invite and redirect to the invite's notebook
if invite_id:
invite = self.__database.load( Invite, invite_id )
if not invite:
raise Authentication_error( u"The invite is unknown." )
self.convert_invite_to_access( invite, user.object_id )
redirect = u"/notebooks/%s" % invite.notebook_id
# if there's an after_login URL, redirect to it
elif after_login and after_login.startswith( "/" ):
redirect = after_login
# otherwise, just redirect to the user's first notebook (if any)
elif first_notebook:
redirect = u"/notebooks/%s" % first_notebook.object_id
else:
redirect = u"/"
return dict(
redirect = redirect,
authenticated = user,
)
@expose()
@end_transaction
@update_auth
def logout( self ):
"""
Deauthenticate the user and log them out of their current session.
@rtype: dict
@return: { 'redirect': url, 'deauthenticated': True }
"""
return dict(
redirect = self.__http_url + u"/",
deauthenticated = True,
)
def current( self, user_id ):
"""
Return information on the currently logged-in user. If not logged in, default to the anonymous
user.
@type user_id: unicode
@param user_id: id of current logged-in user (if any)
@rtype: json dict
@return: {
'user': user or None,
'notebooks': notebookslist,
'login_url': url,
'logout_url': url,
'rate_plan': rateplandict,
'groups': groups,
}
@raise Validation_error: one of the arguments is invalid
@raise Access_error: user_id or anonymous user unknown
"""
# if there's no logged-in user, default to the anonymous user
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
if user_id:
user = self.__database.load( User, user_id )
else:
user = anonymous
if not user or not anonymous:
raise Access_error()
user.group_storage_bytes = self.calculate_group_storage( user )
login_url = None
if user_id and user_id != anonymous.object_id:
notebooks = self.__database.select_many( Notebook, user.sql_load_notebooks( parents_only = True ) )
groups = self.__database.select_many( Group, user.sql_load_groups() )
# if the user is not logged in, return a login URL
else:
notebooks = self.__database.select_many( Notebook, anonymous.sql_load_notebooks( undeleted_only = True ) )
groups = []
if len( notebooks ) > 0 and notebooks[ 0 ]:
main_notebook = notebooks[ 0 ]
login_note = self.__database.select_one( Note, main_notebook.sql_load_note_by_title( u"login" ) )
if login_note:
login_url = "%s/notebooks/%s?note_id=%s" % ( self.__https_url, main_notebook.object_id, login_note.object_id )
for notebook in notebooks:
notebook.tags = \
self.__database.select_many( Tag, notebook.sql_load_tags( user_id ) ) + \
self.__database.select_many( Tag, notebook.sql_load_tags( anonymous.object_id ) )
return dict(
user = user,
notebooks = notebooks,
login_url = login_url,
logout_url = self.__https_url + u"/users/logout",
rate_plan = ( user.rate_plan < len( self.__rate_plans ) ) and self.__rate_plans[ user.rate_plan ] or {},
groups = groups,
)
def calculate_storage( self, user ):
"""
Calculate total storage utilization for all notes of the given user, including storage for all
past revisions.
@type user: User
@param user: user for which to calculate storage utilization
@rtype: int
@return: total bytes used for storage
"""
return sum( self.__database.select_one( tuple, user.sql_calculate_storage( self.__database.backend ) ), 0 )
def calculate_group_storage( self, user ):
"""
Calculate total storage utilization for all groups that the given user is a member of.
@type user: User
@param user: user for which to calculate storage utilization
@rtype: int
@return: total bytes used for group storage
"""
return sum( self.__database.select_one( tuple, user.sql_calculate_group_storage() ), 0 )
def update_storage( self, user_id, commit = True ):
"""
Calculate and record total storage utilization for the given user.
@type user_id: unicode
@param user_id: id of user for which to calculate storage utilization
@type commit: bool
@param commit: True to automatically commit after the update
@rtype: model.User
@return: object of the user corresponding to user_id
"""
user = self.__database.load( User, user_id )
if user is None:
return None
user.storage_bytes = self.calculate_storage( user )
self.__database.save( user, commit )
return user
def load_notebook( self, user_id, notebook_id, read_write = False, owner = False, note_id = None ):
"""
Determine whether the given user has access to the given notebook, and if so, return that
notebook.
If the notebook.read_write member is READ_WRITE_FOR_OWN_NOTES, and a particular note_id is
given, then make sure that the given note_id is one of the user's own notes.
@type user_id: unicode
@param user_id: id of user whose access to check
@type notebook_id: unicode
@param notebook_id: id of notebook to check access for
@type read_write: boolean
@param read_write: True if the notebook must be READ_WRITE or READ_WRITE_FOR_OWN_NOTES,
False if read-write access is not to be checked (defaults to False)
@type owner: bool
@param owner: True if owner-level access is being checked (defaults to False)
@type note_id: unicode
@param note_id: id of the note in the given notebook that the user is trying to access.
if the notebook is READ_WRITE_FOR_OWN_NOTES, then the given note is checked
to make sure its user_id is the same as the given user_id. for READ_WRITE
and READ_ONLY notebooks, this note_id parameter is ignored
@rtype: Notebook or NoneType
@return: the loaded notebook if the user has access to it, None otherwise
"""
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
user = user_id and self.__database.load( User, user_id ) or anonymous
notebook = None
# first try loading the notebook as the given user (if any)
if user:
notebook = self.__database.select_one( Notebook, user.sql_load_notebooks( notebook_id = notebook_id ) )
# if that doesn't work, try loading the notebook as the anonymous user
if notebook is None:
notebook = self.__database.select_one( Notebook, anonymous.sql_load_notebooks( notebook_id = notebook_id ) )
# if the user has no access to this notebook, bail
if notebook is None:
return None
if read_write and notebook.read_write == Notebook.READ_ONLY:
return None
if owner and not notebook.owner:
return None
# if a particular note_id is given, and the notebook is READ_WRITE_FOR_OWN_NOTES, then check
# that the user is associated with that note (if the note exists). this prevents a user
# from modifying someone else's note in a READ_WRITE_FOR_OWN_NOTES notebook
if note_id and notebook.read_write == Notebook.READ_WRITE_FOR_OWN_NOTES:
note = self.__database.load( Note, note_id )
if note and (
( note.user_id and user_id != note.user_id ) or
( note.notebook_id and notebook_id != note.notebook_id )
):
return None
# also, prevent anonymous/demo read-write or owner access to READ_WRITE_FOR_OWN_NOTES notebooks
if notebook.read_write == Notebook.READ_WRITE_FOR_OWN_NOTES and \
( read_write is True or owner is True ) and \
( user is None or user.username is None or user.username == u"anonymous" ):
return None
return notebook
def check_group( self, user_id, group_id, admin = False ):
"""
Determine whether the given user has membership to the given group.
@type user_id: unicode
@param user_id: id of user whose membership to check
@type group_id: unicode
@param group_id: id of group to check membership in
@type admin: bool
@param admin: True if admin-level membership is being checked (defaults to False)
@rtype: bool
@return: True if the user has membership
"""
# check if the given user has access to this notebook
user = self.__database.load( User, user_id )
if user and self.__database.select_one( bool, user.sql_in_group( group_id, admin ) ):
return True
return False
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
user_id_to_remove = Valid_id(),
group_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def remove_group( self, user_id_to_remove, group_id, user_id = None ):
"""
Remove a user's membership from the given group. For now, this also sets them to the lowest
rate plan.
@type user_id_to_remove: unicode
@param user_id_to_remove: id of the user to remove from the group
@type group_id: unicode
@param group_id: id of the group to remove membership from
@type user_id: unicode
@param user_id: id of current logged-in user (if any)
@raise Validation_error: one of the arguments is invalid
@raise Access_error: the current user doesn't have admin membership to the given group
"""
if not self.check_group( user_id, group_id, admin = True ):
raise Access_error()
user = self.__database.load( User, user_id_to_remove )
if not user:
raise Access_error()
self.__database.execute( user.sql_remove_group( group_id ) )
# setting the user's rate plan to 0 upon group removal prevents a group admin from creating
# an unlimited number of users with high-end rate plans
user.rate_plan = 0
self.__database.save( user )
return dict(
message = u"Group membership for %s has been revoked." % user.username,
)
@expose( view = Json )
@end_transaction
@validate(
email_address = ( Valid_string( min = 1, max = 60 ), valid_email_address ),
send_reset_button = unicode,
)
def send_reset( self, email_address, send_reset_button ):
"""
Send a password reset email to the given email address.
@type email_address: unicode
@param email_address: an existing user's email address
@type send_reset_button: unicode
@param send_reset_button: ignored
@rtype: json dict
@return: { 'message': message }
@raise Password_reset_error: an error occured when sending the password reset email
@raise Validation_error: one of the arguments is invalid
"""
# check whether there are actually any users with the given email address
users = self.__database.select_many( User, User.sql_load_by_email_address( email_address ) )
if len( users ) == 0:
raise Password_reset_error( u"There are no Luminotes users with the email address %s" % email_address )
# record the sending of this reset email
password_reset_id = self.__database.next_id( Password_reset, commit = False )
password_reset = Password_reset.create( password_reset_id, email_address )
self.__database.save( password_reset )
# create an email message with a unique link
message = Message.Message()
message[ u"From" ] = u"Luminotes support <%s>" % self.__support_email
message[ u"To" ] = email_address
message[ u"Subject" ] = u"Luminotes password reset"
message.set_payload(
u"Someone has requested a password reset for a Luminotes user with your email\n" +
u"address. If this someone is you, please visit the following link for a\n" +
u"username reminder or a password reset:\n\n" +
u"%s/r/%s\n\n" % ( self.__https_url or self.__http_url, password_reset.object_id ) +
u"This link will expire in 24 hours.\n\n" +
u"Thanks!"
)
# send the message out through localhost's smtp server
server = smtplib.SMTP()
server.connect()
server.sendmail( message[ u"From" ], [ email_address ], message.as_string() )
server.quit()
return dict(
message = u"Please check your inbox. A password reset email has been sent to %s" % email_address,
)
@expose( view = Main_page )
@strongly_expire
@end_transaction
@validate(
password_reset_id = Valid_id(),
)
def redeem_reset( self, password_reset_id ):
"""
Provide the information necessary to display the web site's main page along with a dynamically
generated "complete your password reset" note.
@type password_reset_id: unicode
@param password_reset_id: id of model.Password_reset to redeem
@rtype: unicode
@return: rendered HTML page
@raise Password_reset_error: an error occured when redeeming the password reset, such as an expired link
@raise Validation_error: one of the arguments is invalid
"""
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
if anonymous:
main_notebook = self.__database.select_one( Notebook, anonymous.sql_load_notebooks( undeleted_only = True ) )
if not anonymous or not main_notebook:
raise Password_reset_error( "There was an error when completing your password reset. Please contact %s." % self.__support_email )
password_reset = self.__database.load( Password_reset, password_reset_id )
if not password_reset or datetime.now( tz = utc ) - password_reset.revision > timedelta( hours = 25 ):
raise Password_reset_error( "Your password reset link has expired. Please request a new password reset email." )
if password_reset.redeemed:
raise Password_reset_error( "Your password has already been reset. Please request a new password reset email." )
# find the user(s) with the email address from the password reset request
matching_users = self.__database.select_many( User, User.sql_load_by_email_address( password_reset.email_address ) )
if len( matching_users ) == 0:
raise Password_reset_error( u"There are no Luminotes users with the email address %s" % password_reset.email_address )
result = self.current( anonymous.object_id )
result[ "notebook" ] = main_notebook
result[ "startup_notes" ] = self.__database.select_many( Note, main_notebook.sql_load_startup_notes() )
result[ "total_notes_count" ] = self.__database.select_one( Note, main_notebook.sql_count_notes(), use_cache = True )
result[ "note_read_write" ] = False
result[ "notes" ] = [ Note.create(
object_id = u"password_reset",
contents = unicode( Redeem_reset_note( password_reset_id, matching_users ) ),
notebook_id = main_notebook.object_id,
) ]
result[ "invites" ] = []
return result
@expose( view = Json )
@end_transaction
def reset_password( self, password_reset_id, reset_button, **new_passwords ):
"""
Reset all the users with the provided passwords.
@type password_reset_id: unicode
@param password_reset_id: id of model.Password_reset to use
@type reset_button: unicode
@param reset_button: return
@type new_passwords: { userid: [ newpassword, newpasswordrepeat ] }
@param new_passwords: map of user id to new passwords or empty strings
@rtype: json dict
@return: { 'redirect': '/' }
@raise Password_reset_error: an error occured when resetting the passwords, such as an expired link
"""
try:
id_validator = Valid_id()
id_validator( password_reset_id )
except ValueError:
raise Validation_error( "password_reset_id", password_reset_id, id_validator, "is not a valid id" )
password_reset = self.__database.load( Password_reset, password_reset_id )
if not password_reset or datetime.now( tz = utc ) - password_reset.revision > timedelta( hours = 25 ):
raise Password_reset_error( "Your password reset link has expired. Please request a new password reset email." )
if password_reset.redeemed:
raise Password_reset_error( "Your password has already been reset. Please request a new password reset email." )
matching_users = self.__database.select_many( User, User.sql_load_by_email_address( password_reset.email_address ) )
allowed_user_ids = [ user.object_id for user in matching_users ]
# reset any passwords that are non-blank
at_least_one_reset = False
for ( user_id, ( new_password, new_password_repeat ) ) in new_passwords.items():
if user_id not in allowed_user_ids:
raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email )
# skip blank passwords
if new_password == u"" and new_password_repeat == u"":
continue
user = self.__database.load( User, user_id )
if not user:
raise Password_reset_error( "There was an error when resetting your password. Please contact %s." % self.__support_email )
# ensure the passwords match
if new_password != new_password_repeat:
raise Password_reset_error( u"The new passwords you entered for user %s do not match. Please try again." % user.username )
# ensure the new password isn't too long
if len( new_password ) > 30:
raise Password_reset_error( u"Your password can be no longer than 30 characters." )
at_least_one_reset = True
user.password = new_password
self.__database.save( user, commit = False )
# if all the new passwords provided are blank, bail
if not at_least_one_reset:
raise Password_reset_error( u"Please enter a new password. Or, if you already know your password, just click the login link above." )
password_reset.redeemed = True
self.__database.save( password_reset, commit = False )
self.__database.commit()
return dict( redirect = u"/" )
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
email_addresses = unicode,
access = Valid_string(),
invite_button = unicode,
user_id = Valid_id( none_okay = True ),
)
def send_invites( self, notebook_id, email_addresses, access, invite_button, user_id = None ):
"""
Send notebook invitations to the given email addresses.
@type notebook_id: unicode
@param notebook_id: id of the notebook that the invitation is for
@type email_addresses: unicode
@param email_addresses: a string containing whitespace- or comma-separated email addresses
@type access: unicode
@param access: type of access to grant, either "collaborator", "viewer", or "owner". with
certain rate plans, only "viewer" is allowed
@type invite_button: unicode
@param invite_button: ignored
@type user_id: unicode
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'message': message, 'invites': invites }
@raise Invite_error: an error occured when sending the invite
@raise Validation_error: one of the arguments is invalid
@raise Access_error: user_id doesn't have owner-level notebook access to send an invite or
doesn't have a rate plan supporting notebook collaboration
"""
if len( email_addresses ) < 5:
raise Invite_error( u"Please enter at least one valid email address." )
if len( email_addresses ) > 5000:
raise Invite_error( u"Please enter fewer email addresses." )
notebook = self.load_notebook( user_id, notebook_id, read_write = True, owner = True )
if not notebook:
raise Access_error()
# except for viewer-only invites, this feature requires a rate plan above basic
user = self.__database.load( User, user_id )
plan = self.__rate_plans[ user.rate_plan ]
if user is None or user.username is None or ( plan[ u"notebook_collaboration" ] != True and access != u"viewer" ):
raise Access_error()
if access == u"collaborator":
read_write = True
owner = False
elif access == u"viewer":
read_write = False
owner = False
elif access == u"owner":
read_write = True
owner = True
else:
raise Access_error()
# parse email_addresses string into individual email addresses
email_addresses_list = set()
for piece in WHITESPACE_OR_COMMA_PATTERN.split( email_addresses ):
for match in EMBEDDED_EMAIL_ADDRESS_PATTERN.finditer( piece ):
email_addresses_list.add( match.groups( 0 )[ 0 ] )
email_count = len( email_addresses_list )
if email_count == 0:
raise Invite_error( u"Please enter at least one valid email address." )
import smtplib
from email import Message, Charset
for email_address in email_addresses_list:
# record the sending of this invite email
invite_id = self.__database.next_id( Invite, commit = False )
invite = Invite.create( invite_id, user_id, notebook_id, email_address, read_write, owner )
self.__database.save( invite, commit = False )
# update any invitations for this notebook already sent to the same email address
similar_invites = self.__database.select_many( Invite, invite.sql_load_similar() )
for similar in similar_invites:
similar.read_write = read_write
similar.owner = owner
self.__database.save( similar, commit = False )
# if the invite is already redeemed, then update the relevant entry in the user_notebook
# access table as well
if similar.redeemed_user_id is not None:
redeemed_user = self.__database.load( User, similar.redeemed_user_id )
if redeemed_user:
self.__database.execute( redeemed_user.sql_update_access( notebook_id, read_write, owner ) )
notebook = self.__database.load( Notebook, notebook_id )
if notebook:
self.__database.execute( redeemed_user.sql_update_access( notebook.trash_id, read_write, owner ) )
# create an email message with a unique invitation link
notebook_name = notebook.name.strip().replace( "\n", " " ).replace( "\r", " " )
message = Message.Message()
message[ u"From" ] = user.email_address or u"Luminotes personal wiki <%s>" % self.__support_email
if user.email_address:
message[ u"Sender" ] = u"Luminotes personal wiki <%s>" % self.__support_email
message[ u"To" ] = email_address
message[ u"Subject" ] = notebook_name
payload = \
u"I've shared a wiki with you called \"%s\".\n" % notebook_name + \
u"Please visit the following link to view it online:\n\n" + \
u"%s/i/%s\n\n" % ( self.__https_url or self.__http_url, invite.object_id )
# try representing the payload as plain 7-bit ASCII for greatest compatibility
try:
str( notebook_name )
message.set_payload( payload )
# if that doesn't work, encode the payload as UTF-8 instead
except UnicodeEncodeError:
message.set_payload( payload.encode( "utf-8" ) )
charset = Charset.Charset( "utf-8" )
charset.body_encoding = Charset.QP
message.set_charset( charset )
# send the message out through localhost's smtp server
server = smtplib.SMTP()
server.connect()
server.sendmail( message[ u"From" ], [ email_address ], message.as_string() )
server.quit()
self.__database.commit()
invites = self.__database.select_many( Invite, Invite.sql_load_notebook_invites( notebook_id ) )
if email_count == 1:
return dict(
message = u"An invitation has been sent. The person you invited will receive an invite link (shown below) by email. Feel free to copy and paste the invite link to them yourself.",
invites = invites,
)
else:
return dict(
message = u"%s invitations have been sent. The people you invited will each receive an invite link (shown below) by email. Feel free to copy and paste the invite links to them yourself." % email_count,
invites = invites,
)
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
invite_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def revoke_invite( self, notebook_id, invite_id, user_id = None ):
"""
Revoke the invite's access to the given notebook.
@type notebook_id: unicode
@param notebook_id: id of the notebook that the invitation is for
@type invite_id: unicode
@param invite_id: id of the invite to revoke
@type user_id: unicode
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'message': message, 'invites': invites }
@raise Validation_error: one of the arguments is invalid
@raise Access_error: user_id doesn't have owner-level notebook access to revoke an invite
"""
notebook = self.load_notebook( user_id, notebook_id, read_write = True, owner = True )
if not notebook:
raise Access_error()
invite = self.__database.load( Invite, invite_id )
if not invite or not invite.email_address or invite.notebook_id != notebook_id:
raise Access_error()
self.__database.execute(
User.sql_revoke_invite_access( notebook_id, notebook.trash_id, invite.email_address ),
commit = False,
)
self.__database.execute( invite.sql_revoke_invites(), commit = False )
self.__database.commit()
invites = self.__database.select_many( Invite, Invite.sql_load_notebook_invites( notebook_id ) )
return dict(
message = u"Notebook access for %s has been revoked." % invite.email_address,
invites = invites,
)
@expose( view = Main_page )
@end_transaction
@grab_user_id
@validate(
invite_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def redeem_invite( self, invite_id, user_id = None ):
"""
Begin the process of redeeming a notebook invite.
@type invite_id: unicode
@param invite_id: id of invite to redeem
@type user_id: unicode
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: unicode
@return: rendered HTML page
@raise Validation_error: one of the arguments is invalid
@raise Invite_error: an error occured when redeeming the invite
"""
invite = self.__database.load( Invite, invite_id )
if not invite:
raise Invite_error( "That invite is unknown. Please make sure that you typed the address correctly." )
if user_id is not None:
# if the user is logged in but the invite is unredeemed, redeem it and redirect to the notebook
if invite.redeemed_user_id is None:
self.convert_invite_to_access( invite, user_id )
return dict( redirect = u"/notebooks/%s" % invite.notebook_id )
# if the user is logged in and has already redeemed this invite, then just redirect to the notebook
if invite.redeemed_user_id == user_id:
return dict( redirect = u"/notebooks/%s" % invite.notebook_id )
else:
raise Invite_error( u"That invite has already been used by someone else." )
if invite.redeemed_user_id:
raise Invite_error( u"That invite has already been used. If you were the one who used it, then simply <a href=\"/login\">login</a> to your account." )
notebook = self.__database.load( Notebook, invite.notebook_id )
if not notebook:
raise Invite_error( "That notebook you've been invited to is unknown." )
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
if anonymous:
main_notebook = self.__database.select_one( Notebook, anonymous.sql_load_notebooks( undeleted_only = True ) )
invite_notebook = self.__database.load( Notebook, invite.notebook_id )
if not anonymous or not main_notebook or not invite_notebook:
raise Password_reset_error( "There was an error when redeeming your invite. Please contact %s." % self.__support_email )
# give the user the option to sign up or login in order to redeem the invite
result = self.current( anonymous.object_id )
result[ "notebook" ] = main_notebook
result[ "startup_notes" ] = self.__database.select_many( Note, main_notebook.sql_load_startup_notes() )
result[ "total_notes_count" ] = self.__database.select_one( int, main_notebook.sql_count_notes(), use_cache = True )
result[ "note_read_write" ] = False
result[ "notes" ] = [ Note.create(
object_id = u"redeem_invite",
contents = unicode( Redeem_invite_note( invite, invite_notebook ) ),
notebook_id = main_notebook.object_id,
) ]
result[ "invites" ] = []
return result
def convert_invite_to_access( self, invite, user_id ):
"""
Grant the given user access to the notebook specified in the invite, and mark that invite as
redeemed.
@type invite: model.Invite
@param invite: invite to convert to notebook access
@type user_id: unicode
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@raise Invite_error: an error occured when redeeming the invite
"""
# prevent a user from redeeming their own invite
if invite.from_user_id == user_id:
return
user = self.__database.load( User, user_id )
notebook = self.__database.load( Notebook, invite.notebook_id )
if not user or not notebook:
raise Invite_error( "There was an error when redeeming your invite. Please contact %s." % self.__support_email )
# if the user doesn't already have access to this notebook, then grant access
if not self.__database.select_one( bool, user.sql_has_access( notebook.object_id ) ):
rank = self.__database.select_one( float, user.sql_highest_notebook_rank() ) + 1
self.__database.execute( user.sql_save_notebook( notebook.object_id, invite.read_write, invite.owner, rank = rank ), commit = False )
# the same goes for the trash notebook
if not self.__database.select_one( bool, user.sql_has_access( notebook.trash_id ) ):
self.__database.execute( user.sql_save_notebook( notebook.trash_id, invite.read_write, invite.owner ), commit = False )
invite.redeemed_user_id = user_id
self.__database.save( invite, commit = False )
self.__database.commit()
#PAYPAL_URL = u"https://www.sandbox.paypal.com/cgi-bin/webscr"
PAYPAL_URL = u"https://www.paypal.com/cgi-bin/webscr"
@expose( view = Blank_page )
@end_transaction
def paypal_notify( self, **params ):
"""
Notify Luminotes of payments, subscriptions, cancellations, refunds, etc.
This method is responsible for validating the request, POSTing back to
PayPal to make sure the request is valid, and then updating the user's
record in the database with their new rate plan. paypal_notify() is
invoked by PayPal itself.
"""
# check that payment_status is Completed
payment_status = params.get( u"payment_status" )
if payment_status == u"Refunded":
return dict() # for now, ignore refunds and let paypal handle them
if payment_status and payment_status != u"Completed":
raise Payment_error( u"payment_status is not Completed", params )
# TODO: check that txn_id is not a duplicate
# check that receiver_email is mine
if params.get( u"receiver_email" ) != self.__payment_email:
raise Payment_error( u"incorrect receiver_email", params )
# verify mc_currency
if params.get( u"mc_currency" ) != u"USD":
raise Payment_error( u"unsupported mc_currency", params )
# verify item_number
item_number = params.get( u"item_number" )
if item_number == None or item_number == u"":
return dict() # ignore this transaction if there's no item number
try:
int( item_number )
except ValueError:
raise Payment_error( u"invalid item_number", params )
product = None
for potential_product in self.__download_products:
if unicode( item_number ) == potential_product.get( u"item_number" ):
product = potential_product
if product:
self.__paypal_notify_download( params, product, unicode( item_number ) )
else:
plan_index = int( item_number )
try:
rate_plan = self.__rate_plans[ plan_index ]
except IndexError:
raise Payment_error( u"invalid item_number", params )
self.__paypal_notify_subscribe( params, rate_plan, plan_index )
return dict()
TRANSACTION_ID_PATTERN = re.compile( u"^[a-zA-Z0-9]+$" )
@staticmethod
def urlencode( params ):
# unicode-safe wrapper for urllib.urlencode()
if isinstance( params, dict ):
params = params.items()
return urllib.urlencode(
[ ( key, isinstance( value, unicode ) and value.encode( "utf-8" ) or value )
for ( key, value ) in params ]
)
def __paypal_notify_download( self, params, product, item_number ):
# verify that quantity * the expected fee == mc_gross
fee = float( product[ u"fee" ] )
try:
mc_gross = float( params.get( u"mc_gross" ) )
if not mc_gross: raise ValueError()
except ( TypeError, ValueError ):
raise Payment_error( u"invalid mc_gross", params )
try:
quantity = float( params.get( u"quantity" ) )
if not quantity: raise ValueError()
except ( TypeError, ValueError ):
raise Payment_error( u"invalid quantity", params )
if quantity * fee != mc_gross:
raise Payment_error( u"invalid mc_gross", params )
# verify item_name
item_name = params.get( u"item_name" )
if item_name and product[ u"name" ].lower() not in item_name.lower():
raise Payment_error( u"invalid item_name", params )
params[ u"cmd" ] = u"_notify-validate"
encoded_params = self.urlencode( params )
# verify txn_type
txn_type = params.get( u"txn_type" )
if txn_type and txn_type != u"web_accept":
raise Payment_error( u"invalid txn_type", params )
# verify txn_id
txn_id = params.get( u"txn_id" )
if not self.TRANSACTION_ID_PATTERN.search( txn_id ):
raise Payment_error( u"invalid txn_id", params )
# ask paypal to verify the request
request = urllib2.Request( self.PAYPAL_URL )
request.add_header( u"Content-type", u"application/x-www-form-urlencoded" )
request_file = urllib2.urlopen( self.PAYPAL_URL, encoded_params )
result = request_file.read()
if result != u"VERIFIED":
raise Payment_error( result, params )
# update the database with a record of the transaction, thereby giving the user access to the
# download
download_access_id = self.__database.next_id( Download_access, commit = False )
download_access = Download_access.create( download_access_id, item_number, txn_id )
self.__database.save( download_access, commit = False )
self.__database.commit()
# using the reported payer email, send the user an email with a download link
email_address = params.get( u"payer_email" )
if not email_address:
return
# create an email message with a unique invitation link
message = Message.Message()
message[ u"From" ] = u"Luminotes personal wiki <%s>" % self.__support_email
message[ u"To" ] = email_address
message[ u"Subject" ] = u"Luminotes Desktop download"
payload = \
u"Thank you for purchasing Luminotes Desktop!\n\n" + \
u"To download the installer, please follow this link:\n\n" + \
u"%s/d/%s\n\n" % ( self.__https_url or self.__http_url, download_access_id ) + \
u"You can use this link anytime to download Luminotes Desktop or upgrade\n" + \
u"to new versions as they are released. So you should probably keep the\n" + \
u"link around.\n\n" + \
u"If you have any questions, please email support@luminotes.com\n\n" + \
u"Enjoy!"
message.set_payload( payload )
# send the message out through localhost's smtp server
server = smtplib.SMTP()
server.connect()
server.sendmail( message[ u"From" ], [ email_address ], message.as_string() )
server.quit()
def __paypal_notify_subscribe( self, params, rate_plan, plan_index ):
# verify mc_gross
fee = u"%0.2f" % rate_plan[ u"fee" ]
yearly_fee = u"%0.2f" % rate_plan[ u"yearly_fee" ]
mc_gross = params.get( u"mc_gross" )
if mc_gross and mc_gross not in ( fee, yearly_fee ):
raise Payment_error( u"invalid mc_gross", params )
# verify mc_amount1 (free 30-day trial)
mc_amount1 = params.get( u"mc_amount1" )
if mc_amount1 and mc_amount1 != "0.00":
raise Payment_error( u"invalid mc_amount1", params )
# verify mc_amount3 (actual payment)
mc_amount3 = params.get( u"mc_amount3" )
if mc_amount3 and mc_amount3 not in ( fee, yearly_fee ):
raise Payment_error( u"invalid mc_amount3", params )
# verify item_name
item_name = params.get( u"item_name" )
if item_name and item_name.lower() != u"luminotes " + rate_plan[ u"name" ].lower():
raise Payment_error( u"invalid item_name", params )
# verify period1 (free 30-day trial)
period1 = params.get( u"period1" )
if period1 and period1 != "30 D":
raise Payment_error( u"invalid period1", params )
# verify period2 (should not be present)
if params.get( u"period2" ):
raise Payment_error( u"invalid period2", params )
# verify period3
period3 = params.get( u"period3" )
if mc_amount3 == yearly_fee:
if period3 and period3 != u"1 Y": # one-year subscription
raise Payment_error( u"invalid period3", params )
else:
if period3 and period3 != u"1 M": # one-month subscription
raise Payment_error( u"invalid period3", params )
params[ u"cmd" ] = u"_notify-validate"
encoded_params = self.urlencode( params )
# ask paypal to verify the request
request = urllib2.Request( self.PAYPAL_URL )
request.add_header( u"Content-type", u"application/x-www-form-urlencoded" )
request_file = urllib2.urlopen( self.PAYPAL_URL, encoded_params )
result = request_file.read()
if result != u"VERIFIED":
raise Payment_error( result, params )
# update the database based on the type of transaction
txn_type = params.get( u"txn_type" )
user_id = params.get( u"custom", u"" )
try:
user_id = Valid_id()( user_id )
except ValueError:
raise Payment_error( u"invalid custom", params )
user = self.__database.load( User, user_id )
if not user:
raise Payment_error( u"unknown custom", params )
if txn_type in ( u"subscr_signup", u"subscr_modify" ):
if params.get( u"recurring" ) != u"1":
raise Payment_error( u"invalid recurring", params )
user.rate_plan = plan_index
self.__database.save( user, commit = False )
self.update_groups( user )
self.__database.commit()
elif txn_type == u"subscr_cancel":
pass # HACK: for now, ignore cancellations
# user.rate_plan = 0 # return the user to the free account level
# self.__database.save( user, commit = False )
# self.update_groups( user )
# self.__database.commit()
elif txn_type in ( u"subscr_payment", u"subscr_failed", "subscr_eot" ):
pass # for now, ignore payments and let paypal handle them
else:
raise Payment_error( "unknown txn_type", params )
def update_groups( self, user ):
"""
Update a user's group membership as a result of a rate plan change. This method does not commit
the current database transaction.
"""
rate_plan = self.__rate_plans[ user.rate_plan ]
# if the user has a rate plan with admin capabilities
if rate_plan.get( u"user_admin" ) is True:
has_an_admin_group = False
groups = self.__database.select_many( Group, user.sql_load_groups() )
# determine whether the user is the admin of at least one group
for group in groups:
if group.admin is False: continue
has_an_admin_group = True
# set all users in this group to the same rate plan as the admin
group_users = self.__database.select_many( User, group.sql_load_users() )
for group_user in group_users:
group_user.rate_plan = user.rate_plan
self.__database.save( group_user )
# if the user is not an admin of any group, create one for them and make them the admin
if has_an_admin_group is False:
group_id = self.__database.next_id( Group, commit = False )
group = Group.create( group_id, name = u"my group", admin = True )
self.__database.save( group, commit = False )
self.__database.execute( user.sql_save_group( group_id, admin = True ), commit = False )
return
# otherwise, downgrade the user's group admin access to normal group membership
groups = self.__database.select_many( Group, user.sql_load_groups() )
for group in groups:
if group.admin is False: continue
self.__database.execute( user.sql_update_group_admin( group.object_id, admin = False ), commit = False )
# also return all users in this group to the free account level
group_users = self.__database.select_many( User, group.sql_load_users() )
for group_user in group_users:
group_user.rate_plan = 0
self.__database.save( group_user )
@expose( view = Main_page )
@end_transaction
@grab_user_id
def thanks( self, **params ):
"""
Provide the information necessary to display the subscription thanks page.
"""
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
if anonymous:
main_notebook = self.__database.select_one( Notebook, anonymous.sql_load_notebooks( undeleted_only = True ) )
else:
main_notebook = None
result = self.current( params.get( u"user_id" ) )
rate_plan = params.get( u"item_number", "" )
try:
rate_plan = int( rate_plan )
except ValueError:
rate_plan = None
retry_count = params.get( u"retry_count", "" )
try:
retry_count = int( retry_count )
except ValueError:
retry_count = None
# if there's no rate plan or we've retried too many times, give up and display an error
RETRY_TIMEOUT = 15
if retry_count > RETRY_TIMEOUT:
note = Thanks_error_note()
# if the rate plan of the subscription matches the user's current rate plan, success
elif rate_plan == result[ u"user" ].rate_plan:
note = Thanks_note( self.__rate_plans[ rate_plan ][ u"name" ].capitalize() )
result[ "conversion" ] = "subscribe_%s" % rate_plan
# if a rate plan is given, display an auto-reloading "processing..." page
elif rate_plan is not None:
note = Processing_note( rate_plan, retry_count )
# otherwise, assume that this is a free trial and default to a generic thanks page
else:
note = Thanks_note()
result[ "notebook" ] = main_notebook
result[ "startup_notes" ] = self.__database.select_many( Note, main_notebook.sql_load_startup_notes() )
result[ "total_notes_count" ] = self.__database.select_one( Note, main_notebook.sql_count_notes(), use_cache = True )
result[ "note_read_write" ] = False
result[ "notes" ] = [ Note.create(
object_id = u"thanks",
contents = unicode( note ),
notebook_id = main_notebook.object_id,
) ]
result[ "invites" ] = []
return result
def rate_plan( self, plan_index ):
return self.__rate_plans[ plan_index ]
@expose( view = Main_page )
@end_transaction
@grab_user_id
def thanks_download( self, **params ):
"""
Provide the information necessary to display the download thanks page, including a product
download link. This information can be accessed with either a tx (transaction id) or a download
access_id.
"""
# if a valid tx is provided, redirect to this page with the corresponding access_id.
# that way, if the user bookmarks the page, they'll bookmark it with the access_id rather
# than the tx
tx = params.get( u"tx" )
if tx:
if not self.TRANSACTION_ID_PATTERN.search( tx ):
raise Payment_error( u"invalid tx", params )
download_access = self.__database.select_one( Download_access, Download_access.sql_load_by_transaction_id( tx ) )
if download_access:
return dict(
redirect = u"/users/thanks_download?access_id=%s" % download_access.object_id
)
download_access_id = params.get( u"access_id" )
download_url = None
item_number = None
if download_access_id:
try:
Valid_id()( download_access_id )
except ValueError:
raise Payment_error( u"invalid access_id", params )
download_access = self.__database.load( Download_access, download_access_id )
if download_access:
download_url = u"%s/files/download_product?access_id=%s" % \
( self.__https_url or self.__http_url, download_access_id )
item_number = download_access.item_number
if not tx and not download_access_id:
raise Payment_error( u"either tx or access_id required", params )
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
if anonymous:
main_notebook = self.__database.select_one( Notebook, anonymous.sql_load_notebooks( undeleted_only = True ) )
else:
main_notebook = None
result = self.current( params.get( u"user_id" ) )
retry_count = params.get( u"retry_count", "" )
try:
retry_count = int( retry_count )
except ValueError:
retry_count = None
# if there's no download access or we've retried too many times, give up and display an error
RETRY_TIMEOUT = 15
if download_url is None and retry_count > RETRY_TIMEOUT:
note = Thanks_download_error_note()
# if the rate plan of the subscription matches the user's current rate plan, success
elif download_url:
note = Thanks_download_note( download_url )
result[ "conversion" ] = "download_%s" % item_number
# otherwise, display an auto-reloading "processing..." page
else:
note = Processing_download_note( download_access_id, tx, retry_count )
result[ "notebook" ] = main_notebook
result[ "startup_notes" ] = self.__database.select_many( Note, main_notebook.sql_load_startup_notes() )
result[ "total_notes_count" ] = self.__database.select_one( Note, main_notebook.sql_count_notes(), use_cache = True )
result[ "note_read_write" ] = False
result[ "notes" ] = [ Note.create(
object_id = u"thanks",
contents = unicode( note ),
notebook_id = main_notebook.object_id,
) ]
result[ "invites" ] = []
return result
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
email_address = ( Valid_string( min = 0, max = 60 ) ),
settings_button = unicode,
user_id = Valid_id( none_okay = True ),
)
def update_settings( self, email_address, settings_button, user_id ):
"""
Update the settings for a particular user.
@type email_address: unicode
@param email_address: new email address
@type settings_button: unicode
@param settings_button: ignored
@type user_id: unicode
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { "email_address": new_email_address }
@raise Validation_error: one of the arguments is invalid
@raise Access_error: the given user id is unknown
"""
if len( email_address ) > 0:
try:
email_address = valid_email_address( email_address )
except ValueError:
raise Validation_error( "email_address", email_address, valid_email_address )
else:
email_address = None
user = self.__database.load( User, user_id )
if not user:
raise Access_error()
if email_address != user.email_address:
user.email_address = email_address
self.__database.save( user )
return dict(
email_address = email_address,
)