witten
/
luminotes
Archived
1
0
Fork 0
This repository has been archived on 2023-12-16. You can view files and clone it, but cannot push or open issues or pull requests.
luminotes/controller/Notebooks.py

1902 lines
70 KiB
Python

import re
import cgi
import cherrypy
from datetime import datetime
from Expose import expose
from Validate import validate, Valid_string, Validation_error, Valid_bool, Valid_int
from Database import Valid_id, Valid_revision, end_transaction
from Users import grab_user_id, Access_error
from Expire import strongly_expire, weakly_expire
from Html_nuker import Html_nuker
from Html_differ import Html_differ
from Files import Upload_file
from model.Notebook import Notebook
from model.Note import Note
from model.Invite import Invite
from model.User import User
from model.User_revision import User_revision
from model.File import File
from model.Tag import Tag
from view.Main_page import Main_page
from view.Json import Json
from view.Note_tree_area import Note_tree_area
from view.Notebook_rss import Notebook_rss
from view.Updates_rss import Updates_rss
from view.Update_link_page import Update_link_page
class Import_error( Exception ):
def __init__( self, message = None ):
if message is None:
message = u"An error occurred when trying to import your file. Please try a different file, or contact support for help."
Exception.__init__( self, message )
self.__message = message
def to_dict( self ):
return dict(
error = self.__message
)
class Notebooks( object ):
WHITESPACE_PATTERN = re.compile( u"\s+" )
LINK_PATTERN = re.compile( u'<a\s+((?:[^>]+\s)?href="([^"]+)"(?:\s+target="([^"]*)")?[^>]*)>(<img [^>]+>)?([^<]*)</a>', re.IGNORECASE )
FILE_PATTERN = re.compile( u'/files/' )
NEW_FILE_PATTERN = re.compile( u'/files/new' )
EXPORT_FORMAT_PATTERN = re.compile( u"^[a-zA-Z0-9_]+$" )
"""
Controller for dealing with notebooks and their notes, corresponding to the "/notebooks" URL.
"""
def __init__( self, database, users, files, https_url ):
"""
Create a new Notebooks object.
@type database: controller.Database
@param database: database that notebooks are stored in
@type users: controller.Users
@param users: controller for all users, used here for updating storage utilization
@type files: controller.Files
@param files: controller for all uploaded files, used here for deleting files that are no longer
referenced within saved notes
@type https_url: unicode
@param https_url: base URL to use for SSL http requests, or an empty string
@return: newly constructed Notebooks
"""
self.__database = database
self.__users = users
self.__files = files
self.__https_url = https_url
@expose( view = Main_page, rss = Notebook_rss )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
parent_id = Valid_id(),
revision = Valid_revision(),
previous_revision = Valid_revision( none_okay = True ),
rename = Valid_bool(),
deleted_id = Valid_id(),
preview = Valid_string(),
user_id = Valid_id( none_okay = True ),
)
def default( self, notebook_id, note_id = None, parent_id = None, revision = None,
previous_revision = None, rename = False, deleted_id = None, preview = None,
user_id = None ):
"""
Provide the information necessary to display the page for a particular notebook. If a
particular note id is given without a revision, then the most recent version of that note is
displayed.
@type notebook_id: unicode
@param notebook_id: id of the notebook to display
@type note_id: unicode or NoneType
@param note_id: id of single note in this notebook to display (optional)
@type parent_id: unicode or NoneType
@param parent_id: id of parent notebook to this notebook (optional)
@type revision: unicode or NoneType
@param revision: revision timestamp of the provided note (optional)
@type previous_revision: unicode or NoneType
@param previous_revision: older revision timestamp to diff with the given revision (optional)
@type rename: bool or NoneType
@param rename: whether this is a new notebook and should be renamed (optional, defaults to False)
@type deleted_id: unicode or NoneType
@param deleted_id: id of the notebook that was just deleted, if any (optional)
@type preview: unicode
@param preview: type of access with which to preview this notebook, either "collaborator",
"viewer", "owner", or "default" (optional, defaults to "default"). access must
be equal to or lower than user's own access level to this notebook
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype: unicode
@return: rendered HTML page
"""
result = self.__users.current( user_id )
if preview == u"collaborator":
read_write = True
owner = False
result[ u"notebooks" ] = [
notebook for notebook in result[ "notebooks" ] if notebook.object_id == notebook_id
]
if len( result[ u"notebooks" ] ) == 1:
result[ u"notebooks" ][ 0 ].owner = False
elif preview == u"viewer":
read_write = False
owner = False
result[ u"notebooks" ] = [
notebook for notebook in result[ "notebooks" ] if notebook.object_id == notebook_id
]
if len( result[ u"notebooks" ] ) == 1:
result[ u"notebooks" ][ 0 ].read_write = Notebook.READ_ONLY
result[ u"notebooks" ][ 0 ].owner = False
elif preview in ( u"owner", u"default", None ):
read_write = True
owner = True
else:
raise Access_error()
result.update( self.contents( notebook_id, note_id, revision, previous_revision, read_write, owner, user_id ) )
result[ "parent_id" ] = parent_id
if revision:
result[ "note_read_write" ] = False
notebook = result[ u"notebook" ]
# if this is a forum thread notebook, redirect to the forum thread page
forum_tags = [ tag for tag in notebook.tags if tag.name == u"forum" ]
if forum_tags:
forum_name = forum_tags[ 0 ].value
if forum_name == "blog":
redirect = u"/blog/%s" % notebook.friendly_id
else:
redirect = u"/forums/%s/%s" % ( forum_name, notebook_id )
if note_id:
redirect += u"?note_id=%s" % note_id
return dict(
redirect = redirect,
)
if notebook.name != u"Luminotes":
result[ "recent_notes" ] = self.__database.select_many( Note, notebook.sql_load_notes_in_update_order( start = 0, count = 10 ) )
# if the user doesn't have any storage bytes yet, they're a new user, so see what type of
# conversion this is (demo or signup)
if result[ "user" ].username != u"anonymous" and result[ "user" ].storage_bytes == 0:
if u"this is a demo" in [ note.title for note in result[ "startup_notes" ] ]:
result[ "conversion" ] = u"demo"
else:
result[ "conversion" ] = u"signup"
result[ "rename" ] = rename
result[ "deleted_id" ] = deleted_id
return result
def contents( self, notebook_id, note_id = None, revision = None, previous_revision = None,
read_write = True, owner = True, user_id = None ):
"""
Return information about the requested notebook, including its startup notes. Optionally include
a single requested note as well.
@type notebook_id: unicode
@param notebook_id: id of notebook to return
@type note_id: unicode or NoneType
@param note_id: id of single note in this notebook to return (optional)
@type revision: unicode or NoneType
@param revision: revision timestamp of the provided note (optional)
@type previous_revision: unicode or NoneType
@param previous_revision: older revision timestamp to diff with the given revision (optional)
@type read_write: bool or NoneType
@param read_write: whether the notebook should be returned as read-write (optional, defaults to True).
this can only lower access, not elevate it
@type owner: bool or NoneType
@param owner: whether the notebook should be returned as owner-level access (optional, defaults to True).
this can only lower access, not elevate it
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype: dict
@return: {
'notebook': notebook,
'startup_notes': notelist,
'total_notes_count': notecount,
'notes': notelist,
'invites': invitelist
}
@raise Access_error: the current user doesn't have access to the given notebook or note
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
if notebook is None or anonymous is None:
raise Access_error()
if read_write is False:
notebook.read_write = Notebook.READ_ONLY
if owner is False:
notebook.owner = False
if note_id:
note = self.__database.load( Note, note_id, revision )
if note and note.notebook_id != notebook_id:
if note.notebook_id == notebook.trash_id:
note = None
else:
raise Access_error()
# if two revisions were provided, then make the returned note's contents into a diff
if note and revision and previous_revision:
previous_note = self.__database.load( Note, note_id, previous_revision )
if previous_note and previous_note.contents:
note.replace_contents( Html_differ().diff( previous_note.contents, note.contents ) )
else:
note = None
notebook.tags = \
self.__database.select_many( Tag, notebook.sql_load_tags( user_id ) ) + \
self.__database.select_many( Tag, notebook.sql_load_tags( anonymous.object_id ) )
startup_notes = self.__database.select_many( Note, notebook.sql_load_startup_notes() )
total_notes_count = self.__database.select_one( int, notebook.sql_count_notes(), use_cache = True )
if self.__users.load_notebook( user_id, notebook_id, owner = True ):
invites = self.__database.select_many( Invite, Invite.sql_load_notebook_invites( notebook_id ) )
else:
invites = []
return dict(
notebook = notebook,
startup_notes = startup_notes,
total_notes_count = total_notes_count,
notes = note and [ note ] or [],
invites = invites or [],
)
@expose( view = None, rss = Updates_rss )
@strongly_expire
@end_transaction
@validate(
notebook_id = Valid_id(),
notebook_name = Valid_string(),
)
def updates( self, notebook_id, notebook_name ):
"""
Provide the information necessary to display an updated notes RSS feed for the given notebook.
This method does not require any sort of login.
@type notebook_id: unicode
@param notebook_id: id of the notebook to provide updates for
@type notebook_name: unicode
@param notebook_name: name of the notebook to include in the RSS feed
@rtype: unicode
@return: rendered RSS feed
"""
notebook = self.__database.load( Notebook, notebook_id )
if not notebook:
return dict(
recent_notes = [],
notebook_id = notebook_id,
notebook_name = notebook_name,
https_url = self.__https_url,
)
recent_notes = self.__database.select_many( Note, notebook.sql_load_notes_in_update_order( start = 0, count = 10 ) )
return dict(
recent_notes = [ ( note.object_id, note.revision ) for note in recent_notes ],
notebook_id = notebook_id,
notebook_name = notebook_name,
https_url = self.__https_url,
)
@expose( view = Update_link_page )
@strongly_expire
@end_transaction
@validate(
notebook_id = Valid_id(),
notebook_name = Valid_string(),
note_id = Valid_id(),
revision = Valid_revision(),
)
def get_update_link( self, notebook_id, notebook_name, note_id, revision ):
"""
Provide the information necessary to display a link to an updated note. This method does not
require any sort of login.
@type notebook_id: unicode
@param notebook_id: id of the notebook the note is in
@type notebook_name: unicode
@param notebook_name: name of the notebook
@type note_id: unicode
@param note_id: id of the note to link to
@type revision: unicode
@param revision: ignored; present so RSS feed readers distinguish between different revisions
@rtype: unicode
@return: rendered HTML page
"""
return dict(
notebook_id = notebook_id,
notebook_name = notebook_name,
note_id = note_id,
https_url = self.__https_url,
)
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
revision = Valid_revision(),
previous_revision = Valid_revision( none_okay = True ),
summarize = Valid_bool(),
user_id = Valid_id( none_okay = True ),
)
def load_note( self, notebook_id, note_id, revision = None, previous_revision = None, summarize = False, user_id = None ):
"""
Return the information on a particular note by its id.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_id: unicode
@param note_id: id of note to return
@type revision: unicode or NoneType
@param revision: revision timestamp of the note (optional)
@type previous_revision: unicode or NoneType
@param previous_revision: older revision timestamp to diff with the given revision (optional)
@type summarize: bool or NoneType
@param summarize: True to return a summary of the note's contents, False to return full text
(optional, defaults to False)
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'note': notedict or None }
@raise Access_error: the current user doesn't have access to the given notebook or note
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
note = self.__database.load( Note, note_id, revision )
# if the note has no notebook, it has been deleted "forever"
if note and note.notebook_id is None:
return dict(
note = None,
)
if note and note.notebook_id != notebook_id:
if note.notebook_id == notebook.trash_id:
if revision:
return dict(
note = summarize and self.summarize_note( note ) or note,
)
return dict(
note = None,
note_id_in_trash = note.object_id,
)
raise Access_error()
if note and revision and previous_revision:
previous_note = self.__database.load( Note, note_id, previous_revision )
if previous_note and previous_note.contents:
note.replace_contents( Html_differ().diff( previous_note.contents, note.contents ) )
return dict(
note = summarize and self.summarize_note( note ) or note,
)
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_title = Valid_string( min = 1, max = 500 ),
summarize = Valid_bool(),
user_id = Valid_id( none_okay = True ),
)
def load_note_by_title( self, notebook_id, note_title, summarize = False, user_id = None ):
"""
Return the information on a particular note by its title. The lookup by title is performed
case-insensitively.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_title: unicode
@param note_title: title of the note to return
@type summarize: bool or NoneType
@param summarize: True to return a summary of the note's contents, False to return full text
(optional, defaults to False)
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'note': notedict or None }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
note = self.__database.select_one( Note, notebook.sql_load_note_by_title( note_title ) )
return dict(
note = summarize and self.summarize_note( note ) or note,
)
def summarize_note( self, note, max_summary_length = None, word_count = None, highlight_text = None ):
"""
Create a truncated, HTML-free note summary for the given note, and then return the note with
its summary set.
@type note: model.Note or NoneType
@param note: note to summarize, or None
@type max_summary_length: int or NoneType
@param max_summary_length: the length to which the summary is truncated (optional, defaults
to a reasonable length)
@type word_count: int or NoneType
@param word_count: the number of words to which the summary is truncated (optional, defaults
to a reasonable number of words)
@type highlight_text: unicode or NoneType
@param highlight_text: text to emphasize within the summary (optional, defaults to no emphasis)
@rtype: model.Note or NoneType
@return: note with its summary member set, or None if no note was provided
"""
DEFAULT_MAX_SUMMARY_LENGTH = 40
DEFAULT_WORD_COUNT = 10
if not max_summary_length:
max_summary_length = DEFAULT_MAX_SUMMARY_LENGTH
if not word_count:
word_count = DEFAULT_WORD_COUNT
if note is None:
return None
if note.contents is None:
return note
# remove all HTML from the contents and also remove the title
summary = Html_nuker().nuke( note.contents )
if note.title and summary.startswith( note.title ):
summary = summary[ len( note.title ) : ]
# split the summary on whitespace
words = self.WHITESPACE_PATTERN.split( summary )
def first_words( words, word_count ):
return u" ".join( words[ : word_count ] )
# find a summary less than MAX_SUMMARY_LENGTH and, if possible, truncated on a word boundary
truncated = False
summary = first_words( words, word_count )
while len( summary ) > max_summary_length:
word_count -= 1
summary = first_words( words, word_count )
# if the first word is just ridiculously long, truncate it without finding a word boundary
if word_count == 1:
summary = summary[ : max_summary_length ]
truncated = True
break
if truncated or word_count < len( words ):
summary += " ..."
if highlight_text:
summary = summary.replace( highlight_text, "<b>%s</b>" % highlight_text )
note.summary = summary
return note
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_title = Valid_string( min = 1, max = 500 ),
user_id = Valid_id( none_okay = True ),
)
def lookup_note_id( self, notebook_id, note_title, user_id ):
"""
Return a note's id by looking up its title. The lookup by title is performed
case-insensitively.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_title: unicode
@param note_title: title of the note id to return
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'note_id': noteid or None }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
note = self.__database.select_one( Note, notebook.sql_load_note_by_title( note_title ) )
return dict(
note_id = note and note.object_id or None,
)
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def load_note_revisions( self, notebook_id, note_id, user_id = None ):
"""
Return the full list of revision timestamps for this note in chronological order.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_id: unicode
@param note_id: id of note in question
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'revisions': userrevisionlist or None }
@raise Access_error: the current user doesn't have access to the given notebook or note
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
note = self.__database.load( Note, note_id )
if note:
if note and note.notebook_id is None:
return dict(
revisions = None,
)
if note.notebook_id != notebook_id:
if note.notebook_id == notebook.trash_id:
return dict(
revisions = None,
)
raise Access_error()
revisions = self.__database.select_many( User_revision, note.sql_load_revisions() )
else:
revisions = None
return dict(
revisions = revisions,
)
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def load_note_links( self, notebook_id, note_id, user_id = None ):
"""
Return a list of HTTP links found within the contents of the given note.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_id: unicode
@param note_id: id of note in question
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'tree_html': html_fragment }
@raise Access_error: the current user doesn't have access to the given notebook or note
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
note = self.__database.load( Note, note_id )
if note is None or note.notebook_id not in ( notebook_id, notebook.trash_id ):
raise Access_error()
items = []
for match in self.LINK_PATTERN.finditer( note.contents ):
( attributes, href, target, embedded_image, title ) = match.groups()
# if it has a link target, it's a link to an external web site
if target:
items.append( Note_tree_area.make_item( title, attributes, u"note_tree_external_link" ) )
continue
# if it has '/files/' in its path, it's an uploaded file link
if self.FILE_PATTERN.search( href ):
if not self.NEW_FILE_PATTERN.search( href ): # ignore files that haven't been uploaded yet
if embedded_image:
title = u"embedded image"
items.append( Note_tree_area.make_item( title, attributes, u"note_tree_file_link", target = u"_new" ) )
continue
# if it has a note_id, load that child note and see whether it has any children of its own
child_note_ids = cgi.parse_qs( href.split( '?' )[ -1 ] ).get( u"note_id" )
if child_note_ids:
child_note_id = child_note_ids[ 0 ]
child_note = self.__database.load( Note, child_note_id )
if child_note and child_note.contents and self.LINK_PATTERN.search( child_note.contents ):
items.append( Note_tree_area.make_item( title, attributes, u"note_tree_link", has_children = True ) )
continue
# otherwise, it's childless
items.append( Note_tree_area.make_item( title, attributes, u"note_tree_link", has_children = False ) )
return dict(
tree_html = unicode( Note_tree_area.make_tree( items ) ),
)
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
contents = Valid_string( min = 1, max = 50000, escape_html = False ),
startup = Valid_bool(),
previous_revision = Valid_revision( none_okay = True ),
position_after = Valid_id( none_okay = True ),
position_before = Valid_id( none_okay = True ),
user_id = Valid_id( none_okay = True ),
)
def save_note( self, notebook_id, note_id, contents, startup, previous_revision = None,
position_after = None, position_before = None, user_id = None ):
"""
Save a new revision of the given note. This function will work both for creating a new note and
for updating an existing note. If the note exists and the given contents are identical to the
existing contents for the given previous_revision, then no saving takes place and a new_revision
of None is returned. Otherwise this method returns the timestamp of the new revision.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_id: unicode
@param note_id: id of note to save
@type contents: unicode
@param contents: new textual contents of the note, including its title
@type startup: bool
@param startup: whether the note should be displayed on startup
@type previous_revision: unicode or NoneType
@param previous_revision: previous known revision timestamp of the provided note, or None if
the note is new
@type position_after: unicode or NoneType
@param position_after: id of note to position the saved note after (optional)
@type position_before: unicode or NoneType
@param position_before: id of note to position the saved note before (optional)
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: {
'new_revision': User_revision of saved note, or None if nothing was saved
'previous_revision': User_revision immediately before new_revision, or None if the note is new
'storage_bytes': current storage usage by user
'rank': float rank of the saved note, or None
}
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True, note_id = note_id )
user = self.__database.load( User, user_id )
if not user or not notebook:
raise Access_error();
note = self.__database.load( Note, note_id )
# if the user has read-write access only to their own notes in this notebook, force the startup
# flag to be True for this note. also ignore note positioning parameters
if notebook.read_write == Notebook.READ_WRITE_FOR_OWN_NOTES:
startup = True
position_before = None
position_after = None
def update_rank( position_after, position_before ):
after_note = position_after and self.__database.load( Note, position_after ) or None
before_note = position_before and self.__database.load( Note, position_before ) or None
if after_note and before_note:
new_rank = float( after_note.rank ) + 1.0
# if necessary, increment the rank of all subsequent notes to make "room" for this note
if new_rank >= before_note.rank:
# clear the cache of before_note and all notes with subsequent rank
self.__database.uncache_many(
Note,
self.__database.select_many(
unicode,
notebook.sql_load_note_ids_starting_from_rank( before_note.rank )
)
)
self.__database.execute( notebook.sql_increment_rank( before_note.rank ), commit = False )
return new_rank
elif after_note:
return float( after_note.rank ) + 1.0
elif before_note:
return float( before_note.rank ) - 1.0
return 0.0
# check whether the provided note contents have been changed since the previous revision
def update_note( current_notebook, old_note, startup, user ):
# the note hasn't been changed, so bail without updating it
if not position_after and not position_before and startup == old_note.startup and \
contents.replace( u"\n", u"" ) == old_note.contents.replace( u"\n", "" ):
new_revision = None
# the note has changed, so update it
else:
note.contents = contents
note.startup = startup
if position_after or position_before:
note.rank = update_rank( position_after, position_before )
elif note.rank is None:
note.rank = self.__database.select_one( float, notebook.sql_highest_note_rank() ) + 1
note.user_id = user.object_id
new_revision = User_revision( note.revision, note.user_id, user.username )
self.__files.purge_unused( note )
return new_revision
# if the note is already in the given notebook, load it and update it
if note and note.notebook_id == notebook.object_id:
old_note = self.__database.load( Note, note_id, previous_revision )
previous_user = self.__database.load( User, note.user_id )
previous_revision = User_revision( note.revision, note.user_id, previous_user and previous_user.username or None )
new_revision = update_note( notebook, old_note, startup, user )
# the note is not already in the given notebook, so look for it in the trash
elif note and notebook.trash_id and note.notebook_id == notebook.trash_id:
old_note = self.__database.load( Note, note_id, previous_revision )
# undelete the note, putting it back in the given notebook
previous_user = self.__database.load( User, note.user_id )
previous_revision = User_revision( note.revision, note.user_id, previous_user and previous_user.username or None )
note.notebook_id = notebook.object_id
note.deleted_from_id = None
new_revision = update_note( notebook, old_note, startup, user )
# otherwise, create a new note
else:
if position_after or position_before:
rank = update_rank( position_after, position_before )
else:
rank = self.__database.select_one( float, notebook.sql_highest_note_rank() ) + 1
previous_revision = None
note = Note.create( note_id, contents, notebook_id = notebook.object_id, startup = startup, rank = rank, user_id = user_id )
new_revision = User_revision( note.revision, note.user_id, user.username )
if new_revision:
self.__database.save( note, commit = False )
user = self.__users.update_storage( user_id, commit = False )
self.__database.uncache_command( notebook.sql_count_notes() ) # cached note count is now invalid
self.__database.commit()
user.group_storage_bytes = self.__users.calculate_group_storage( user )
else:
user = None
if note.rank is None:
rank = None
else:
rank = float( note.rank )
return dict(
new_revision = new_revision,
previous_revision = previous_revision,
storage_bytes = user and user.storage_bytes or 0,
rank = rank,
)
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
revision = Valid_revision(),
user_id = Valid_id( none_okay = True ),
)
def revert_note( self, notebook_id, note_id, revision, user_id ):
"""
Revert the contents of a note to that of an earlier revision, thereby creating a new revision.
The timestamp of the new revision is returned.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_id: unicode
@param note_id: id of note to revert
@type revision: unicode or NoneType
@param revision: revision timestamp to revert to for the provided note
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: {
'new_revision': User_revision of the reverted note
'previous_revision': User_revision immediately before new_revision
'storage_bytes': current storage usage by user,
}
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True )
user = self.__database.load( User, user_id )
if not user or not notebook:
raise Access_error()
note = self.__database.load( Note, note_id )
if not note:
raise Access_error()
if not self.__users.load_notebook( user_id, note.notebook_id, read_write = True, note_id = note.object_id ):
raise Access_error()
# check whether the provided note contents have been changed since the previous revision
def update_note( current_notebook, old_note, user ):
# if the revision to revert to is already the newest revision, bail without updating the note
if old_note.revision == note.revision:
new_revision = None
# otherwise, revert the note's contents to that of the older revision
else:
note.contents = old_note.contents
note.user_id = user.object_id
new_revision = User_revision( note.revision, note.user_id, user.username )
self.__files.purge_unused( note )
return new_revision
previous_user = self.__database.load( User, note.user_id )
previous_revision = User_revision( note.revision, note.user_id, previous_user and previous_user.username or None )
# if the note is already in the given notebook, load it and revert it
if note and note.notebook_id == notebook.object_id:
old_note = self.__database.load( Note, note_id, revision )
new_revision = update_note( notebook, old_note, user )
# the note is not already in the given notebook, so look for it in the trash
elif note and notebook.trash_id and note.notebook_id == notebook.trash_id:
old_note = self.__database.load( Note, note_id, revision )
# undelete the note, putting it back in the given notebook
note.notebook_id = notebook.object_id
note.deleted_from_id = None
new_revision = update_note( notebook, old_note, user )
# otherwise, the note doesn't exist
else:
raise Access_error()
if new_revision:
self.__database.save( note, commit = False )
user = self.__users.update_storage( user_id, commit = False )
self.__database.commit()
user.group_storage_bytes = self.__users.calculate_group_storage( user )
else:
user = None
return dict(
new_revision = new_revision,
previous_revision = previous_revision,
storage_bytes = user and user.storage_bytes or 0,
contents = note.contents,
)
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def delete_note( self, notebook_id, note_id, user_id ):
"""
Delete the given note from its notebook and move it to the notebook's trash. The note is added
as a startup note within the trash. If the given notebook is the trash and the given note is
already there, then it is deleted from the trash forever.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type note_id: unicode
@param note_id: id of note to delete
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'storage_bytes': current storage usage by user }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True, note_id = note_id )
if not notebook:
raise Access_error()
note = self.__database.load( Note, note_id )
if note and note.notebook_id == notebook_id:
if notebook.trash_id:
note.deleted_from_id = notebook_id
note.notebook_id = notebook.trash_id
note.startup = True
else:
self.__files.purge_unused( note, purge_all_links = True )
note.notebook_id = None
note.user_id = user_id
self.__database.save( note, commit = False )
user = self.__users.update_storage( user_id, commit = False )
self.__database.uncache_command( notebook.sql_count_notes() ) # cached note count is now invalid
self.__database.commit()
user.group_storage_bytes = self.__users.calculate_group_storage( user )
return dict( storage_bytes = user.storage_bytes )
else:
return dict( storage_bytes = 0 )
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
note_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def undelete_note( self, notebook_id, note_id, user_id ):
"""
Undelete the given note from the trash, moving it back into its notebook. The note is added
as a startup note within its notebook.
@type notebook_id: unicode
@param notebook_id: id of notebook the note was in
@type note_id: unicode
@param note_id: id of note to undelete
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'storage_bytes': current storage usage by user }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True, note_id = note_id )
if not notebook:
raise Access_error()
note = self.__database.load( Note, note_id )
if note and notebook.trash_id:
# if the note isn't deleted, and it's already in this notebook, just return
if note.deleted_from_id is None and note.notebook_id == notebook_id:
return dict( storage_bytes = 0 )
# if the note was deleted from a different notebook than the notebook given, raise
if note.deleted_from_id != notebook_id:
raise Access_error()
note.notebook_id = note.deleted_from_id
note.deleted_from_id = None
note.startup = True
note.user_id = user_id
self.__database.save( note, commit = False )
user = self.__users.update_storage( user_id, commit = False )
self.__database.uncache_command( notebook.sql_count_notes() ) # cached note count is now invalid
self.__database.commit()
user.group_storage_bytes = self.__users.calculate_group_storage( user )
return dict( storage_bytes = user.storage_bytes )
else:
return dict( storage_bytes = 0 )
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def delete_all_notes( self, notebook_id, user_id ):
"""
Delete all notes from the given notebook and move them to the notebook's trash (if any). The
notes are added as startup notes within the trash. If the given notebook is the trash, then
all notes in the trash are deleted forever.
@type notebook_id: unicode
@param notebook_id: id of notebook the note is in
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'storage_bytes': current storage usage by user }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True )
if not notebook or notebook.read_write == Notebook.READ_WRITE_FOR_OWN_NOTES:
raise Access_error()
notes = self.__database.select_many( Note, notebook.sql_load_notes_in_update_order() )
for note in notes:
if notebook.trash_id:
note.deleted_from_id = notebook_id
note.notebook_id = notebook.trash_id
note.startup = True
else:
self.__files.purge_unused( note, purge_all_links = True )
note.notebook_id = None
note.user_id = user_id
self.__database.save( note, commit = False )
user = self.__users.update_storage( user_id, commit = False )
self.__database.uncache_command( notebook.sql_count_notes() ) # cached note count is now invalid
self.__database.commit()
user.group_storage_bytes = self.__users.calculate_group_storage( user )
return dict(
storage_bytes = user.storage_bytes,
)
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
search_text = unicode,
user_id = Valid_id( none_okay = True ),
)
def search_titles( self, notebook_id, search_text, user_id ):
"""
Search the note titles within the given notebook for the given search text, and return matching
notes. The search is case-insensitive. The returned notes include title summaries with the
search term highlighted and are ordered by descending revision timestamp.
@type notebook_id: unicode
@param notebook_id: id of notebook to search
@type search_text: unicode
@param search_text: search term
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'notes': [ matching notes ] }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
@raise Search_error: the provided search_text is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
MAX_SEARCH_TEXT_LENGTH = 256
if len( search_text ) > MAX_SEARCH_TEXT_LENGTH:
raise Validation_error( u"search_text", None, unicode, message = u"is too long" )
if len( search_text ) == 0:
raise Validation_error( u"search_text", None, unicode, message = u"is missing" )
notes = self.__database.select_many( Note, Notebook.sql_search_titles( notebook_id, search_text ) )
for note in notes:
# do a case-insensitive replace to wrap the search term with bold
search_text_pattern = re.compile( u"(%s)" % re.escape( search_text ), re.I )
note.summary = search_text_pattern.sub( r"<b>\1</b>", note.summary )
return dict(
notes = notes,
)
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
search_text = unicode,
user_id = Valid_id( none_okay = True ),
)
def search( self, notebook_id, search_text, user_id ):
"""
Search the notes within all notebooks that the user has access to for the given search text.
Note that the search is case-insensitive, and all HTML tags are ignored. Notes with title
matches are generally ranked higher than matches that are only in the note contents. The
returned notes include content summaries with the search terms highlighted.
@type notebook_id: unicode
@param notebook_id: id of notebook to show first in search results
@type search_text: unicode
@param search_text: search term
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: json dict
@return: { 'notes': [ matching notes ] }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
@raise Search_error: the provided search_text is invalid
"""
# if the anonymous user has access to the given notebook, then run the search as the anonymous
# user instead of the given user id
anonymous = self.__database.select_one( User, User.sql_load_by_username( u"anonymous" ), use_cache = True )
if not anonymous:
raise Access_error()
notebook = self.__users.load_notebook( anonymous.object_id, notebook_id )
if notebook:
user_id = anonymous.object_id
else:
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
MAX_SEARCH_TEXT_LENGTH = 256
if len( search_text ) > MAX_SEARCH_TEXT_LENGTH:
raise Validation_error( u"search_text", None, unicode, message = u"is too long" )
if len( search_text ) == 0:
raise Validation_error( u"search_text", None, unicode, message = u"is missing" )
notes = self.__database.select_many( Note, Notebook.sql_search_notes( user_id, notebook_id, search_text, self.__database.backend ) )
# make a summary for each note that doesn't have one
notes = [
note.summary and note or
self.summarize_note( note, max_summary_length = 80, word_count = 30, highlight_text = search_text )
for note in notes
]
return dict(
notes = notes,
)
@expose()
@weakly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
format = Valid_string( min = 1, max = 100 ),
note_id = Valid_id( none_okay = True ),
user_id = Valid_id( none_okay = True ),
)
def export( self, notebook_id, format, note_id = None, user_id = None ):
"""
Download the entire contents of the given notebook as a stand-alone file.
@type notebook_id: unicode
@param notebook_id: id of notebook to export
@type format: unicode
@param format: string indicating the export plugin to use, currently one of: "html", "csv"
@type notebook_id: unicode
@param note_id: id of single note within the notebook to export (optional)
@type user_id: unicode
@param user_id: id of current logged-in user (if any), determined by @grab_user_id
@rtype: unicode or generator (for streaming files)
@return: exported file with appropriate headers to trigger a download
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid or the format is unknown
"""
if not self.EXPORT_FORMAT_PATTERN.search( format ):
raise Validation_error( u"format", format, Valid_string, message = u"is invalid" )
notebook = self.__users.load_notebook( user_id, notebook_id )
if not notebook:
raise Access_error()
if note_id:
note = self.__database.load( Note, note_id )
if not note:
raise Access_error()
notes = [ note ]
notebook = None
else:
startup_notes = self.__database.select_many( Note, notebook.sql_load_startup_notes() )
other_notes = self.__database.select_many( Note, notebook.sql_load_non_startup_notes() )
notes = startup_notes + other_notes
from plugins.Invoke import invoke
try:
return invoke(
plugin_type = u"export",
plugin_name = format,
database = self.__database,
notebook = notebook,
notes = notes,
response_headers = cherrypy.response.headerMap,
)
except ( ImportError, AttributeError ):
raise Validation_error( u"format", format, Valid_string, message = u"is unknown" )
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
user_id = Valid_id( none_okay = True ),
)
def create( self, user_id ):
"""
Create a new notebook and give it a default name.
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype dict
@return { 'redirect': new_notebook_url }
@raise Access_error: the current user doesn't have access to create a notebook
@raise Validation_error: one of the arguments is invalid
"""
if user_id is None:
raise Access_error()
user = self.__database.load( User, user_id )
notebook = self.__create_notebook( u"new notebook", user )
return dict(
redirect = u"/notebooks/%s?rename=true" % notebook.object_id,
)
def __create_notebook( self, name, user, commit = True ):
# create the notebook along with a trash
trash_id = self.__database.next_id( Notebook, commit = False )
trash = Notebook.create( trash_id, u"trash", user_id = user.object_id )
self.__database.save( trash, commit = False )
notebook_id = self.__database.next_id( Notebook, commit = False )
notebook = Notebook.create( notebook_id, name, trash_id, user_id = user.object_id )
self.__database.save( notebook, commit = False )
# record the fact that the user has access to their new notebook
rank = self.__database.select_one( float, user.sql_highest_notebook_rank() ) + 1
self.__database.execute( user.sql_save_notebook( notebook_id, read_write = True, owner = True, rank = rank ), commit = False )
self.__database.execute( user.sql_save_notebook( trash_id, read_write = True, owner = True ), commit = False )
if commit:
self.__database.commit()
return notebook
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
name = Valid_string( min = 1, max = 100 ),
user_id = Valid_id( none_okay = True ),
)
def rename( self, notebook_id, name, user_id ):
"""
Change the name of the given notebook.
@type notebook_id: unicode
@param notebook_id: id of notebook to rename
@type name: unicode
@param name: new name of the notebook
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype dict
@return {}
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True, owner = True )
# special case to allow the creator of a READ_WRITE_FOR_OWN_NOTES notebook to rename it
if notebook is None:
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True )
if not notebook or not ( notebook.read_write == Notebook.READ_WRITE_FOR_OWN_NOTES and
notebook.user_id == user_id ):
raise Access_error()
user = self.__database.load( User, user_id )
if not user or not notebook:
raise Access_error()
# prevent renaming of the trash notebook to anything
if notebook.name == u"trash":
raise Access_error()
# prevent just anyone from making official Luminotes notebooks
if name.startswith( u"Luminotes" ) and not notebook.name.startswith( u"Luminotes" ):
raise Access_error()
# prevent renaming of another notebook to "trash"
if name == u"trash":
raise Access_error()
notebook.name = name
notebook.user_id = user_id
self.__database.save( notebook, commit = False )
self.__database.commit()
return dict()
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def delete( self, notebook_id, user_id ):
"""
Delete the given notebook and redirect to a remaining read-write notebook. If there is none,
create one.
@type notebook_id: unicode
@param notebook_id: id of notebook to delete
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype dict
@return { 'redirect': remaining_notebook_url }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
if user_id is None:
raise Access_error()
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True, owner = True )
user = self.__database.load( User, user_id )
if not user or not notebook:
raise Access_error()
# prevent deletion of a trash notebook directly
if notebook.name == u"trash":
raise Access_error()
notebook.deleted = True
notebook.user_id = user_id
self.__database.save( notebook, commit = False )
# redirect to a remaining undeleted read-write notebook, or if there isn't one, create an empty notebook
remaining_notebook = self.__database.select_one( Notebook, user.sql_load_notebooks(
parents_only = True, undeleted_only = True, read_write = True,
) )
if remaining_notebook is None:
remaining_notebook = self.__create_notebook( u"my notebook", user, commit = False )
self.__database.commit()
return dict(
redirect = u"/notebooks/%s?deleted_id=%s" % ( remaining_notebook.object_id, notebook.object_id ),
)
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def delete_forever( self, notebook_id, user_id ):
"""
Delete the given notebook permanently (by simply revoking the user's access to it).
@type notebook_id: unicode
@param notebook_id: id of notebook to delete
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype dict
@return: { 'storage_bytes': current storage usage by user }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
if user_id is None:
raise Access_error()
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True, owner = True )
user = self.__database.load( User, user_id )
if not user or not notebook:
raise Access_error()
# prevent deletion of a trash notebook directly
if notebook.name == u"trash":
raise Access_error()
self.__database.execute( user.sql_remove_notebook( notebook_id ), commit = False )
user = self.__users.update_storage( user_id, commit = False )
self.__database.commit()
user.group_storage_bytes = self.__users.calculate_group_storage( user )
return dict( storage_bytes = user.storage_bytes )
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def undelete( self, notebook_id, user_id ):
"""
Undelete the given notebook and redirect to it.
@type notebook_id: unicode
@param notebook_id: id of notebook to undelete
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype dict
@return { 'redirect': notebook_url }
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
if user_id is None:
raise Access_error()
notebook = self.__users.load_notebook( user_id, notebook_id, read_write = True, owner = True )
if not notebook:
raise Access_error()
notebook.deleted = False
notebook.user_id = user_id
self.__database.save( notebook, commit = False )
self.__database.commit()
return dict(
redirect = u"/notebooks/%s" % notebook.object_id,
)
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def move_up( self, notebook_id, user_id ):
"""
Reorder the user's notebooks by moving the given notebook up by one. If the notebook is already
first, then wrap it around to be the last notebook.
@type notebook_id: unicode
@param notebook_id: id of notebook to move up
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype json dict
@return {}
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
user = self.__database.load( User, user_id )
if not user or not notebook:
raise Access_error()
# load the notebooks to which this user has access
notebooks = self.__database.select_many(
Notebook,
user.sql_load_notebooks( parents_only = True, undeleted_only = True ),
)
if not notebooks:
raise Access_error()
# find the given notebook and the one previous to it
previous_notebook = None
current_notebook = None
for notebook in notebooks:
if notebook.object_id == notebook_id:
current_notebook = notebook
break
previous_notebook = notebook
if current_notebook is None:
raise Access_error()
# if there is no previous notebook, then the current notebook is first. so, move it after the
# last notebook
if previous_notebook is None:
last_notebook = notebooks[ -1 ]
self.__database.execute(
user.sql_update_notebook_rank( current_notebook.object_id, last_notebook.rank + 1 ),
commit = False,
)
# otherwise, save the current and previous notebooks back to the database with swapped ranks
else:
self.__database.execute(
user.sql_update_notebook_rank( current_notebook.object_id, previous_notebook.rank ),
commit = False,
)
self.__database.execute(
user.sql_update_notebook_rank( previous_notebook.object_id, current_notebook.rank ),
commit = False,
)
self.__database.commit()
return dict()
@expose( view = Json )
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def move_down( self, notebook_id, user_id ):
"""
Reorder the user's notebooks by moving the given notebook down by one. If the notebook is
already last, then wrap it around to be the first notebook.
@type notebook_id: unicode
@param notebook_id: id of notebook to move down
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype json dict
@return {}
@raise Access_error: the current user doesn't have access to the given notebook
@raise Validation_error: one of the arguments is invalid
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
user = self.__database.load( User, user_id )
if not user or not notebook:
raise Access_error()
# load the notebooks to which this user has access
notebooks = self.__database.select_many(
Notebook,
user.sql_load_notebooks( parents_only = True, undeleted_only = True ),
)
if not notebooks:
raise Access_error()
# find the given notebook and the one after it
current_notebook = None
next_notebook = None
for notebook in notebooks:
if notebook.object_id == notebook_id:
current_notebook = notebook
elif current_notebook:
next_notebook = notebook
break
if current_notebook is None:
raise Access_error()
# if there is no next notebook, then the current notebook is last. so, move it before the
# first notebook
if next_notebook is None:
first_notebook = notebooks[ 0 ]
self.__database.execute(
user.sql_update_notebook_rank( current_notebook.object_id, first_notebook.rank - 1 ),
commit = False,
)
# otherwise, save the current and next notebooks back to the database with swapped ranks
else:
self.__database.execute(
user.sql_update_notebook_rank( current_notebook.object_id, next_notebook.rank ),
commit = False,
)
self.__database.execute(
user.sql_update_notebook_rank( next_notebook.object_id, current_notebook.rank ),
commit = False,
)
self.__database.commit()
return dict()
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
notebook_id = Valid_id(),
start = Valid_int( min = 0 ),
count = Valid_int( min = 1 ),
user_id = Valid_id( none_okay = True ),
)
def load_recent_updates( self, notebook_id, start, count, user_id = None ):
"""
Provide the information necessary to display a notebook's recent updated/created notes, in
reverse chronological order by update time.
@type notebook_id: unicode
@param notebook_id: id of the notebook containing the notes
@type start: unicode or NoneType
@param start: index of recent note to start with (defaults to 0, the most recent note)
@type count: int or NoneType
@param count: number of recent notes to display (defaults to 10 notes)
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype: json dict
@return: { 'notes': recent_notes_list }
@raise Access_error: the current user doesn't have access to the given notebook or note
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if notebook is None:
raise Access_error()
recent_notes = self.__database.select_many( Note, notebook.sql_load_notes_in_update_order( start = start, count = count ) )
return dict(
notes = recent_notes,
)
def recent_notes( self, notebook_id, start = 0, count = 10, user_id = None ):
"""
Return the given notebook's recently created notes in reverse chronological order by creation
time.
@type notebook_id: unicode
@param notebook_id: id of the notebook containing the notes
@type start: unicode or NoneType
@param start: index of recent note to start with (defaults to 0, the most recent note)
@type count: int or NoneType
@param count: number of recent notes to return (defaults to 10 notes)
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype: dict
@return: data for Main_page() constructor
@raise Access_error: the current user doesn't have access to the given notebook or note
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if notebook is None:
raise Access_error()
notes = self.__database.select_many( Note, notebook.sql_load_notes_in_creation_order( start, count ) )
result = self.__users.current( user_id )
result.update( self.contents( notebook_id, user_id = user_id ) )
result[ "notes" ] = notes
result[ "start" ] = start
result[ "count" ] = count
return result
def old_notes( self, notebook_id, start = 0, count = 10, user_id = None ):
"""
Return the given notebook's oldest notes in chronological order by creation time.
@type notebook_id: unicode
@param notebook_id: id of the notebook containing the notes
@type start: unicode or NoneType
@param start: index of recent note to start with (defaults to 0, the oldest note)
@type count: int or NoneType
@param count: number of notes to return (defaults to 10 notes)
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype: dict
@return: data for Main_page() constructor
@raise Access_error: the current user doesn't have access to the given notebook or note
"""
notebook = self.__users.load_notebook( user_id, notebook_id )
if notebook is None:
raise Access_error()
notes = self.__database.select_many( Note, notebook.sql_load_notes_in_creation_order( start, count, reverse = True ) )
result = self.__users.current( user_id )
result.update( self.contents( notebook_id, user_id = user_id ) )
result[ "notes" ] = notes
result[ "start" ] = start
result[ "count" ] = count
return result
WHITESPACE_PATTERN = re.compile( "\s+" )
NEWLINE_PATTERN = re.compile( "\r?\n" )
NOTE_LINK_PATTERN = re.compile( '(<a\s+(?:[^>]+\s+)?href=")[^"]*/notebooks/(\w+)\?note_id=(\w+)("[^>]*>)', re.IGNORECASE )
@expose( view = Json )
@strongly_expire
@end_transaction
@grab_user_id
@validate(
file_id = Valid_id(),
content_column = Valid_int( min = 0 ),
title_column = Valid_int( min = 0, none_okay = True ),
plaintext = Valid_bool(),
import_button = unicode,
user_id = Valid_id( none_okay = True ),
)
def import_csv( self, file_id, content_column, title_column, plaintext, import_button, user_id = None ):
"""
Import a previously uploaded CSV file of notes as a new notebook. Delete the file once the
import is complete.
Plaintext contents are left mostly untouched, just stripping HTML and converting newlines to
<br> tags. HTML contents are cleaned of any disallowed/harmful HTML tags, and target="_new"
attributes are added to all links without targets, except internal note links.
Internal note links are rewritten such that they point to the newly imported notes. This is
accomplished by looking for a "note_id" column and determining what note each link points to.
Then each internal note link is rewritten to point at the new notebook id and note id.
@type file_id: unicode
@param file_id: id of the previously uploaded CSV file to import
@type content_column: int
@param content_column: zero-based index of the column containing note contents
@type title_column: int or NoneType
@param title_column: zero-based index of the column containing note titles (None indicates
the lack of any such column, in which case titles are derived from the
first few words of each note's contents if no title is already present
in the note's contents)
@type plaintext: bool
@param plaintext: True if the note contents are plaintext, or False if they're HTML
@type import_button: unicode
@param import_button: ignored
@type user_id: unicode or NoneType
@param user_id: id of current logged-in user (if any)
@rtype: dict
@return: { 'redirect': new_notebook_url }
@raise Access_error: the current user doesn't have access to the given file
@raise Files.Parse_error: there was an error in parsing the given file
@raise Import_error: there was an error in importing the notes from the file
"""
TRUNCATED_TITLE_CHAR_LENGTH = 80
if user_id is None:
raise Access_error()
user = self.__database.load( User, user_id )
if user is None:
raise Access_error()
db_file = self.__database.load( File, file_id )
if db_file is None:
raise Access_error()
db_notebook = self.__users.load_notebook( user_id, db_file.notebook_id )
if db_notebook is None or db_notebook.read_write == Notebook.READ_WRITE_FOR_OWN_NOTES:
raise Access_error()
# if the file has a "note_id" header column, record its index
note_id_column = None
note_ids = {} # map of original CSV note id to imported note id
parser = self.__files.parse_csv( file_id, skip_header = False )
row = parser.next()
if row and u"note_id" in row:
note_id_column = row.index( u"note_id" )
parser = self.__files.parse_csv( file_id, skip_header = True )
# create a new notebook for the imported notes
notebook = self.__create_notebook( u"imported notebook", user, commit = False )
# import the notes into the new notebook
for row in parser:
row_length = len( row )
if content_column >= row_length:
raise Import_error()
if title_column is not None and title_column >= row_length:
raise Import_error()
title = None
# if there is a title column, use it. otherwise, if the note doesn't already contain a title,
# use the first line of the content column as the title
if title_column and title_column != content_column and len( row[ title_column ].strip() ) > 0:
title = Html_nuker( allow_refs = True ).nuke( Valid_string( escape_html = plaintext )( row[ title_column ].strip() ) )
elif plaintext or not Note.TITLE_PATTERN.search( row[ content_column ] ):
content_text = Html_nuker( allow_refs = True ).nuke( Valid_string( escape_html = plaintext )( row[ content_column ].strip() ) )
content_lines = [ line for line in self.NEWLINE_PATTERN.split( content_text ) if line.strip() ]
# skip notes with empty contents
if len( content_lines ) == 0:
continue
title = content_lines[ 0 ]
# truncate the makeshift title to a reasonable length, but truncate on a word boundary
if len( title ) > TRUNCATED_TITLE_CHAR_LENGTH:
title_words = self.WHITESPACE_PATTERN.split( title )
for i in range( 1, len( title_words ) ):
title_candidate = u" ".join( title_words[ : i ] )
if len( title_candidate ) <= TRUNCATED_TITLE_CHAR_LENGTH:
title = title_candidate
else:
break
contents = Valid_string( max = 50000, escape_html = plaintext, require_link_target = True )( row[ content_column ] )
if plaintext:
contents = contents.replace( u"\n", u"<br />" )
note_id = self.__database.next_id( Note, commit = False )
note = Note.create( note_id, contents, notebook_id = notebook.object_id, startup = False, rank = None, user_id = user_id )
# if the note doesn't have a title yet, then tack the given title onto the start of the contents
if title and note.title is None:
note.contents = u"<h3>%s</h3>%s" % ( title, note.contents )
# if there is a note id column, then map the original CSV note id to its new imported note id
if note_id_column:
try:
original_note_id = Valid_id( none_okay = True )( row[ note_id_column ].strip() )
except ValueError:
original_note_id = None
if original_note_id:
note_ids[ original_note_id ] = note_id
self.__database.save( note, commit = False )
def rewrite_link( match ):
( link_start, original_notebook_id, original_note_id, link_end ) = match.groups()
note_id = note_ids.get( original_note_id )
if note_id:
return "%s/notebooks/%s?note_id=%s%s" % ( link_start, notebook.object_id, note_id, link_end )
# if we don't know how to rewrite the link (for lack of the new note id), then don't rewrite
# it and leave the link as it is
return "%s/notebooks/%s?note_id=%s%s" % ( link_start, original_notebook_id, original_note_id, link_end )
# do a pass over all the imported notes to rewrite internal note links so that they point to
# the newly imported note ids in the new notebook
for ( original_note_id, note_id ) in note_ids.items():
note = self.__database.load( Note, note_id )
if note:
( rewritten_contents, rewritten_count ) = self.NOTE_LINK_PATTERN.subn( rewrite_link, note.contents )
if rewritten_count > 0:
note.contents = rewritten_contents
self.__database.save( note, commit = False )
# delete the CSV file now that it's been imported
self.__database.execute( db_file.sql_delete(), commit = False )
self.__database.uncache( db_file )
self.__database.commit()
Upload_file.delete_file( file_id )
return dict(
redirect = u"/notebooks/%s?rename=true" % notebook.object_id,
)