witten
/
luminotes
Archived
1
0
Fork 0

Completed note link rewriting support for CSV importing. Also completed unit tests for that and CSV exporting.

This commit is contained in:
Dan Helfman 2008-09-26 21:30:40 -07:00
parent 6c46951285
commit 38f4772a6a
5 changed files with 312 additions and 17 deletions

View File

@ -883,6 +883,7 @@ class Files( object ):
import csv
table_file = Upload_file.open_file( file_id )
table_file.seek( 0 ) # necessary in case the file is opened by another call to parse_csv()
sniffer = csv.Sniffer()
# attempt to determine the presence of a header

View File

@ -1,5 +1,6 @@
# originally from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/496942
import re
import urlparse
from htmllib import HTMLParser
from cgi import escape
@ -17,6 +18,8 @@ class Html_cleaner(HTMLParser):
"""
Cleans HTML of any tags not matching a whitelist.
"""
NOTE_LINK_URL_PATTERN = re.compile( '[^"]*/notebooks/\w+\?note_id=\w+', re.IGNORECASE )
def __init__( self, require_link_target = False ):
HTMLParser.__init__( self, AbstractFormatter( NullWriter() ) )
self.result = []
@ -194,7 +197,8 @@ class Html_cleaner(HTMLParser):
else:
bt += ' %s=%s' % \
(xssescape(attribute), quoteattr(attrs[attribute]))
if self.require_link_target and tag == "a" and not attrs.get( 'target' ):
if self.require_link_target and tag == "a" and not attrs.get( 'target' ) and \
( not attrs.get( 'href' ) or not self.NOTE_LINK_URL_PATTERN.search( attrs.get( 'href' ) ) ):
bt += ' target="_new"'
if bt == "<a" or bt == "<img":
return

View File

@ -1250,7 +1250,7 @@ class Notebooks( object ):
writer = csv.writer( buffer )
cherrypy.response.headerMap[ u"Content-Disposition" ] = u"attachment; filename=wiki.csv"
cherrypy.response.headerMap[ u"Content-Type" ] = u"text/csv"
cherrypy.response.headerMap[ u"Content-Type" ] = u"text/csv;charset=utf-8"
def stream():
writer.writerow( ( u"contents", u"title", u"note_id", u"startup", u"username", u"revision_date" ) )
@ -1263,7 +1263,7 @@ class Notebooks( object ):
user = self.__database.load( User, note.user_id )
writer.writerow( (
note.contents.encode( "utf8" ), # TODO: should this try to remove the title?
note.contents.encode( "utf8" ),
note.title.encode( "utf8" ),
note.object_id,
note.startup and 1 or 0,
@ -1740,6 +1740,7 @@ class Notebooks( object ):
WHITESPACE_PATTERN = re.compile( "\s+" )
NEWLINE_PATTERN = re.compile( "\r?\n" )
NOTE_LINK_PATTERN = re.compile( '(<a\s+(?:[^>]+\s+)?href=")[^"]*/notebooks/(\w+)\?note_id=(\w+)("[^>]*>)', re.IGNORECASE )
@expose( view = Json )
@strongly_expire
@ -1763,7 +1764,7 @@ class Notebooks( object ):
attributes are added to all links without targets, except internal note links.
Internal note links are rewritten such that they point to the newly imported notes. This is
accomplished by looking for a "note_id" column and determining what note each link points out.
accomplished by looking for a "note_id" column and determining what note each link points to.
Then each internal note link is rewritten to point at the new notebook id and note id.
@type file_id: unicode
@ -1800,6 +1801,15 @@ class Notebooks( object ):
if db_file is None or not self.__users.check_access( user_id, db_file.notebook_id ):
raise Access_error()
# if the file has a "note_id" header column, record its index
note_id_column = None
note_ids = {} # map of original CSV note id to imported note id
parser = self.__files.parse_csv( file_id, skip_header = False )
row = parser.next()
if row and u"note_id" in row:
note_id_column = row.index( u"note_id" )
parser = self.__files.parse_csv( file_id, skip_header = True )
# create a new notebook for the imported notes
@ -1853,8 +1863,36 @@ class Notebooks( object ):
if title and note.title is None:
note.contents = u"<h3>%s</h3>%s" % ( title, note.contents )
# if there is a note id column, then map the original CSV note id to its new imported note id
if note_id_column:
original_note_id = Valid_id( none_okay = True )( row[ note_id_column ].strip() )
if original_note_id:
note_ids[ original_note_id ] = note_id
self.__database.save( note, commit = False )
def rewrite_link( match ):
( link_start, original_notebook_id, original_note_id, link_end ) = match.groups()
note_id = note_ids.get( original_note_id )
if note_id:
return "%s/notebooks/%s?note_id=%s%s" % ( link_start, notebook.object_id, note_id, link_end )
# if we don't know how to rewrite the link (for lack of the new note id), then don't rewrite
# it and leave the link as it is
return "%s/notebooks/%s?note_id=%s%s" % ( link_start, original_notebook_id, original_note_id, link_end )
# do a pass over all the imported notes to rewrite internal note links so that they point to
# the newly imported note ids in the new notebook
for ( original_note_id, note_id ) in note_ids.items():
note = self.__database.load( Note, note_id )
if note:
( rewritten_contents, rewritten_count ) = self.NOTE_LINK_PATTERN.subn( rewrite_link, note.contents )
if rewritten_count > 0:
note.contents = rewritten_contents
self.__database.save( note )
# delete the CSV file now that it's been imported
self.__database.execute( db_file.sql_delete(), commit = False )
self.__database.uncache( db_file )

View File

@ -116,6 +116,22 @@ class Test_controller( object ):
u"stream_response": True,
u"encoding_filter.on": False,
},
u"/files/download_product": {
u"stream_response": True,
u"encoding_filter.on": False,
},
u"/files/thumbnail": {
u"stream_response": True,
u"encoding_filter.on": False,
},
u"/files/image": {
u"stream_response": True,
u"encoding_filter.on": False,
},
u"/notebooks/export_csv": {
u"stream_response": True,
u"encoding_filter.on": False,
},
u"/files/progress": {
u"stream_response": True,
},

View File

@ -1,4 +1,8 @@
# -*- coding: utf8 -*-
import re
import csv
import types
import cherrypy
import urllib
from nose.tools import raises
@ -3485,23 +3489,36 @@ class Test_notebooks( Test_controller ):
assert result.get( "notebook_name" ) == self.notebook.name
notes = result.get( "notes" )
assert len( notes ) == len( self.notebook.notes )
assert len( notes ) == self.database.select_one( int, self.notebook.sql_count_notes() )
startup_note_allowed = True
previous_revision = None
# assert that startup notes come first, then normal notes in descending revision order
for note in notes:
if self.notebook.is_startup_note( note ):
if note.startup:
assert startup_note_allowed
else:
startup_note_allowed = False
assert note in self.notebook.notes
if previous_revision:
assert note.revision < previous_revision
previous_revision = note.revision
def test_export_html( self ):
db_note = self.database.load( Note, note.object_id )
assert db_note
assert note.object_id == db_note.object_id
assert note.revision == db_note.revision
assert note.title == db_note.title
assert note.contents == db_note.contents
assert note.notebook_id == db_note.notebook_id
assert note.startup == db_note.startup
assert note.deleted_from_id == db_note.deleted_from_id
assert note.rank == db_note.rank
assert note.user_id == db_note.user_id
assert note.creation == db_note.creation
def test_export_html_without_login( self ):
note3 = Note.create( "55", u"<h3>blah</h3>foo", notebook_id = self.notebook.object_id )
self.database.save( note3 )
@ -3525,6 +3542,103 @@ class Test_notebooks( Test_controller ):
assert result.get( "error" )
def test_export_csv( self, note_text = None ):
self.login()
if not note_text:
note_text = u"foo"
note3 = Note.create( "55", u"<h3>blah</h3>%s" % note_text, notebook_id = self.notebook.object_id )
self.database.save( note3 )
result = self.http_get(
"/notebooks/export_csv/%s" % self.notebook.object_id,
session_id = self.session_id,
)
headers = result[ u"headers" ]
assert headers
assert headers[ u"Content-Type" ] == u"text/csv;charset=utf-8"
assert headers[ u"Content-Disposition" ] == 'attachment; filename=wiki.csv'
gen = result[ u"body" ]
assert isinstance( gen, types.GeneratorType )
pieces = []
try:
for piece in gen:
pieces.append( piece )
except AttributeError, exc:
if u"session_storage" not in str( exc ):
raise exc
csv_data = "".join( pieces )
reader = csv.reader( StringIO( csv_data ) )
row = reader.next()
expected_header = [ u"contents", u"title", u"note_id", u"startup", u"username", u"revision_date" ]
assert row == expected_header
expected_note_count = self.database.select_one( int, self.notebook.sql_count_notes() )
note_count = 0
startup_note_allowed = True
previous_revision = None
# assert that startup notes come first, then normal notes in descending revision order
for row in reader:
note_count += 1
assert len( row ) == len( expected_header )
( contents, title, note_id, startup, username, revision_date ) = row
if startup:
assert startup_note_allowed
else:
startup_note_allowed = False
if previous_revision:
assert revision_date < previous_revision
previous_revision = revision_date
db_note = self.database.load( Note, note_id )
assert db_note
assert contents.decode( "utf8" ) == db_note.contents
assert title.decode( "utf8" ) == db_note.title
assert note_id.decode( "utf8" ) == db_note.object_id
assert startup.decode( "utf8" ) == db_note.startup and u"1" or "0"
assert username.decode( "utf8" ) == ( db_note.user_id and self.user.username or u"" )
assert revision_date.decode( "utf8" ) == unicode( db_note.revision )
assert note_count == expected_note_count
def test_export_csv_with_unicode( self ):
self.test_export_csv( note_text = u"ümlaut.png" )
def test_export_csv_without_login( self ):
note3 = Note.create( "55", u"<h3>blah</h3>foo", notebook_id = self.notebook.object_id )
self.database.save( note3 )
path = "/notebooks/export_csv/%s" % self.notebook.object_id
result = self.http_get(
path,
session_id = self.session_id,
)
headers = result.get( "headers" )
assert headers
assert headers.get( "Location" ) == u"http:///login?after_login=%s" % urllib.quote( path )
def test_export_csv_with_unknown_notebook( self ):
self.login()
result = self.http_get(
"/notebooks/export_csv/%s" % self.unknown_notebook_id,
session_id = self.session_id,
)
assert u"access" in result[ u"body" ][ 0 ]
def test_create( self ):
self.login()
@ -4350,7 +4464,7 @@ class Test_notebooks( Test_controller ):
self.__assert_imported_notebook( expected_notes, result )
LINK_PATTERN = re.compile( '<a href="([^"]*)"\s*([^>]*)>([^<]*)</a>', re.IGNORECASE )
NOTE_URL_PATTERN = re.compile( '(.*)/notebooks/([^?]+)\?note_id=(.*)', re.IGNORECASE )
NOTE_URL_PATTERN = re.compile( '([^"]*)/notebooks/(\w+)\?note_id=(\w+)', re.IGNORECASE )
def __assert_imported_notebook( self, expected_notes, result, plaintext = True ):
assert result[ u"redirect" ].startswith( u"/notebooks/" )
@ -4394,11 +4508,19 @@ class Test_notebooks( Test_controller ):
url_match = self.NOTE_URL_PATTERN.search( url )
if url_match:
imported_notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" )
( protocol_and_host, notebook_id, note_id ) = url_match.groups()
assert attributes == u""
assert protocol_and_host == u""
assert notebook_id == self.notebook.object_id
assert note_id # TODO: assert that the note id has been rewritten properly
# assert that the link has been rewritten to point to a note in the new notebook
assert note_id
rewritten_note = self.database.load( Note, note_id )
if rewritten_note:
assert rewritten_note.notebook_id == imported_notebook.object_id
assert notebook_id == imported_notebook.object_id
else:
assert notebook_id == self.notebook.object_id
else:
assert attributes.startswith( u'target="' )
@ -5064,11 +5186,6 @@ class Test_notebooks( Test_controller ):
# one of the imported notes contains a link to one of the other imported notes
note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" )
csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.<b>3 &nbsp;</b>",idone\n"8","whee","hmm\n<a href=""%s"">foo</a>",idtwo\n3,4,5,idthree' % note_url
expected_notes = [
( "blah and stuff", "3.<b>3 &nbsp;</b>" ), # ( title, contents )
( "whee", 'hmm\n<a href="%s">foo</a>' % note_url ), # TODO: expect rewritten URL instead
( "4", "5" ),
]
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
@ -5090,6 +5207,125 @@ class Test_notebooks( Test_controller ):
import_button = u"import",
), session_id = self.session_id )
notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" )
note = self.database.select_one( Note, notebook.sql_load_note_by_title( u"4" ) )
rewritten_note_url = "/notebooks/%s?note_id=%s" % ( notebook.object_id, note.object_id )
expected_notes = [
( "blah and stuff", "3.<b>3 &nbsp;</b>" ), # ( title, contents )
( "4", "5" ),
( "whee", 'hmm\n<a href="%s">foo</a>' % rewritten_note_url ),
]
self.__assert_imported_notebook( expected_notes, result, plaintext = False )
def test_import_csv_html_content_with_internal_note_link_and_blank_note_id_value( self ):
self.login()
# one of the imported notes contains a link to one of the other imported notes
note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" )
csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.<b>3 &nbsp;</b>",\n"8","whee","hmm\n<a href=""%s"">foo</a>",idtwo\n3,4,5,idthree' % note_url
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = csv_data,
content_type = self.content_type,
session_id = self.session_id,
)
result = self.http_post( "/notebooks/import_csv/", dict(
file_id = self.file_id,
content_column = 2,
title_column = 1,
plaintext = False,
import_button = u"import",
), session_id = self.session_id )
notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" )
note = self.database.select_one( Note, notebook.sql_load_note_by_title( u"4" ) )
rewritten_note_url = "/notebooks/%s?note_id=%s" % ( notebook.object_id, note.object_id )
expected_notes = [
( "blah and stuff", "3.<b>3 &nbsp;</b>" ), # ( title, contents )
( "4", "5" ),
( "whee", 'hmm\n<a href="%s">foo</a>' % rewritten_note_url ),
]
self.__assert_imported_notebook( expected_notes, result, plaintext = False )
def test_import_csv_html_content_with_internal_note_link_to_unknown_note( self ):
self.login()
# one of the imported notes contains a link to one of the other imported notes
note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idunknown" )
csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.<b>3 &nbsp;</b>",idone\n"8","whee","hmm\n<a href=""%s"">foo</a>",idtwo\n3,4,5,idthree' % note_url
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = csv_data,
content_type = self.content_type,
session_id = self.session_id,
)
result = self.http_post( "/notebooks/import_csv/", dict(
file_id = self.file_id,
content_column = 2,
title_column = 1,
plaintext = False,
import_button = u"import",
), session_id = self.session_id )
expected_notes = [
( "blah and stuff", "3.<b>3 &nbsp;</b>" ), # ( title, contents )
( "4", "5" ),
( "whee", 'hmm\n<a href="%s">foo</a>' % note_url ), # the note url should not be rewritten
]
self.__assert_imported_notebook( expected_notes, result, plaintext = False )
def test_import_csv_html_content_with_internal_note_link_without_note_id_column( self ):
self.login()
# one of the imported notes contains a link to one of the other imported notes
note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" )
csv_data = '"label 1","label 2","label 3",\n5,"blah and stuff","3.<b>3 &nbsp;</b>"\n"8","whee","hmm\n<a href=""%s"">foo</a>"\n3,4,5' % note_url
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = csv_data,
content_type = self.content_type,
session_id = self.session_id,
)
result = self.http_post( "/notebooks/import_csv/", dict(
file_id = self.file_id,
content_column = 2,
title_column = 1,
plaintext = False,
import_button = u"import",
), session_id = self.session_id )
expected_notes = [
( "blah and stuff", "3.<b>3 &nbsp;</b>" ), # ( title, contents )
( "whee", 'hmm\n<a href="%s">foo</a>' % note_url ), # the note url should not be rewritten
( "4", "5" ),
]
self.__assert_imported_notebook( expected_notes, result, plaintext = False )
def test_import_csv_without_login( self ):