diff --git a/controller/Files.py b/controller/Files.py
index e100fd1..a112d6c 100644
--- a/controller/Files.py
+++ b/controller/Files.py
@@ -883,6 +883,7 @@ class Files( object ):
import csv
table_file = Upload_file.open_file( file_id )
+ table_file.seek( 0 ) # necessary in case the file is opened by another call to parse_csv()
sniffer = csv.Sniffer()
# attempt to determine the presence of a header
diff --git a/controller/Html_cleaner.py b/controller/Html_cleaner.py
index 3c61cc0..9bbfb3d 100644
--- a/controller/Html_cleaner.py
+++ b/controller/Html_cleaner.py
@@ -1,5 +1,6 @@
# originally from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/496942
+import re
import urlparse
from htmllib import HTMLParser
from cgi import escape
@@ -17,6 +18,8 @@ class Html_cleaner(HTMLParser):
"""
Cleans HTML of any tags not matching a whitelist.
"""
+ NOTE_LINK_URL_PATTERN = re.compile( '[^"]*/notebooks/\w+\?note_id=\w+', re.IGNORECASE )
+
def __init__( self, require_link_target = False ):
HTMLParser.__init__( self, AbstractFormatter( NullWriter() ) )
self.result = []
@@ -194,7 +197,8 @@ class Html_cleaner(HTMLParser):
else:
bt += ' %s=%s' % \
(xssescape(attribute), quoteattr(attrs[attribute]))
- if self.require_link_target and tag == "a" and not attrs.get( 'target' ):
+ if self.require_link_target and tag == "a" and not attrs.get( 'target' ) and \
+ ( not attrs.get( 'href' ) or not self.NOTE_LINK_URL_PATTERN.search( attrs.get( 'href' ) ) ):
bt += ' target="_new"'
if bt == "]+\s+)?href=")[^"]*/notebooks/(\w+)\?note_id=(\w+)("[^>]*>)', re.IGNORECASE )
@expose( view = Json )
@strongly_expire
@@ -1763,7 +1764,7 @@ class Notebooks( object ):
attributes are added to all links without targets, except internal note links.
Internal note links are rewritten such that they point to the newly imported notes. This is
- accomplished by looking for a "note_id" column and determining what note each link points out.
+ accomplished by looking for a "note_id" column and determining what note each link points to.
Then each internal note link is rewritten to point at the new notebook id and note id.
@type file_id: unicode
@@ -1800,6 +1801,15 @@ class Notebooks( object ):
if db_file is None or not self.__users.check_access( user_id, db_file.notebook_id ):
raise Access_error()
+ # if the file has a "note_id" header column, record its index
+ note_id_column = None
+ note_ids = {} # map of original CSV note id to imported note id
+
+ parser = self.__files.parse_csv( file_id, skip_header = False )
+ row = parser.next()
+ if row and u"note_id" in row:
+ note_id_column = row.index( u"note_id" )
+
parser = self.__files.parse_csv( file_id, skip_header = True )
# create a new notebook for the imported notes
@@ -1853,8 +1863,36 @@ class Notebooks( object ):
if title and note.title is None:
note.contents = u"%s
%s" % ( title, note.contents )
+ # if there is a note id column, then map the original CSV note id to its new imported note id
+ if note_id_column:
+ original_note_id = Valid_id( none_okay = True )( row[ note_id_column ].strip() )
+ if original_note_id:
+ note_ids[ original_note_id ] = note_id
+
self.__database.save( note, commit = False )
+ def rewrite_link( match ):
+ ( link_start, original_notebook_id, original_note_id, link_end ) = match.groups()
+
+ note_id = note_ids.get( original_note_id )
+ if note_id:
+ return "%s/notebooks/%s?note_id=%s%s" % ( link_start, notebook.object_id, note_id, link_end )
+
+ # if we don't know how to rewrite the link (for lack of the new note id), then don't rewrite
+ # it and leave the link as it is
+ return "%s/notebooks/%s?note_id=%s%s" % ( link_start, original_notebook_id, original_note_id, link_end )
+
+ # do a pass over all the imported notes to rewrite internal note links so that they point to
+ # the newly imported note ids in the new notebook
+ for ( original_note_id, note_id ) in note_ids.items():
+ note = self.__database.load( Note, note_id )
+
+ if note:
+ ( rewritten_contents, rewritten_count ) = self.NOTE_LINK_PATTERN.subn( rewrite_link, note.contents )
+ if rewritten_count > 0:
+ note.contents = rewritten_contents
+ self.__database.save( note )
+
# delete the CSV file now that it's been imported
self.__database.execute( db_file.sql_delete(), commit = False )
self.__database.uncache( db_file )
diff --git a/controller/test/Test_controller.py b/controller/test/Test_controller.py
index 49a714a..8b5554a 100644
--- a/controller/test/Test_controller.py
+++ b/controller/test/Test_controller.py
@@ -116,6 +116,22 @@ class Test_controller( object ):
u"stream_response": True,
u"encoding_filter.on": False,
},
+ u"/files/download_product": {
+ u"stream_response": True,
+ u"encoding_filter.on": False,
+ },
+ u"/files/thumbnail": {
+ u"stream_response": True,
+ u"encoding_filter.on": False,
+ },
+ u"/files/image": {
+ u"stream_response": True,
+ u"encoding_filter.on": False,
+ },
+ u"/notebooks/export_csv": {
+ u"stream_response": True,
+ u"encoding_filter.on": False,
+ },
u"/files/progress": {
u"stream_response": True,
},
diff --git a/controller/test/Test_notebooks.py b/controller/test/Test_notebooks.py
index a2c7d19..23faeab 100644
--- a/controller/test/Test_notebooks.py
+++ b/controller/test/Test_notebooks.py
@@ -1,4 +1,8 @@
+# -*- coding: utf8 -*-
+
import re
+import csv
+import types
import cherrypy
import urllib
from nose.tools import raises
@@ -3485,23 +3489,36 @@ class Test_notebooks( Test_controller ):
assert result.get( "notebook_name" ) == self.notebook.name
notes = result.get( "notes" )
- assert len( notes ) == len( self.notebook.notes )
+ assert len( notes ) == self.database.select_one( int, self.notebook.sql_count_notes() )
startup_note_allowed = True
previous_revision = None
# assert that startup notes come first, then normal notes in descending revision order
for note in notes:
- if self.notebook.is_startup_note( note ):
+ if note.startup:
assert startup_note_allowed
else:
startup_note_allowed = False
- assert note in self.notebook.notes
+
if previous_revision:
assert note.revision < previous_revision
previous_revision = note.revision
-
- def test_export_html( self ):
+
+ db_note = self.database.load( Note, note.object_id )
+ assert db_note
+ assert note.object_id == db_note.object_id
+ assert note.revision == db_note.revision
+ assert note.title == db_note.title
+ assert note.contents == db_note.contents
+ assert note.notebook_id == db_note.notebook_id
+ assert note.startup == db_note.startup
+ assert note.deleted_from_id == db_note.deleted_from_id
+ assert note.rank == db_note.rank
+ assert note.user_id == db_note.user_id
+ assert note.creation == db_note.creation
+
+ def test_export_html_without_login( self ):
note3 = Note.create( "55", u"blah
foo", notebook_id = self.notebook.object_id )
self.database.save( note3 )
@@ -3525,6 +3542,103 @@ class Test_notebooks( Test_controller ):
assert result.get( "error" )
+ def test_export_csv( self, note_text = None ):
+ self.login()
+
+ if not note_text:
+ note_text = u"foo"
+
+ note3 = Note.create( "55", u"blah
%s" % note_text, notebook_id = self.notebook.object_id )
+ self.database.save( note3 )
+
+ result = self.http_get(
+ "/notebooks/export_csv/%s" % self.notebook.object_id,
+ session_id = self.session_id,
+ )
+
+ headers = result[ u"headers" ]
+ assert headers
+ assert headers[ u"Content-Type" ] == u"text/csv;charset=utf-8"
+ assert headers[ u"Content-Disposition" ] == 'attachment; filename=wiki.csv'
+
+ gen = result[ u"body" ]
+ assert isinstance( gen, types.GeneratorType )
+ pieces = []
+
+ try:
+ for piece in gen:
+ pieces.append( piece )
+ except AttributeError, exc:
+ if u"session_storage" not in str( exc ):
+ raise exc
+
+ csv_data = "".join( pieces )
+ reader = csv.reader( StringIO( csv_data ) )
+
+ row = reader.next()
+ expected_header = [ u"contents", u"title", u"note_id", u"startup", u"username", u"revision_date" ]
+ assert row == expected_header
+
+ expected_note_count = self.database.select_one( int, self.notebook.sql_count_notes() )
+ note_count = 0
+ startup_note_allowed = True
+ previous_revision = None
+
+ # assert that startup notes come first, then normal notes in descending revision order
+ for row in reader:
+ note_count += 1
+
+ assert len( row ) == len( expected_header )
+ ( contents, title, note_id, startup, username, revision_date ) = row
+
+ if startup:
+ assert startup_note_allowed
+ else:
+ startup_note_allowed = False
+
+ if previous_revision:
+ assert revision_date < previous_revision
+
+ previous_revision = revision_date
+
+ db_note = self.database.load( Note, note_id )
+ assert db_note
+ assert contents.decode( "utf8" ) == db_note.contents
+ assert title.decode( "utf8" ) == db_note.title
+ assert note_id.decode( "utf8" ) == db_note.object_id
+ assert startup.decode( "utf8" ) == db_note.startup and u"1" or "0"
+ assert username.decode( "utf8" ) == ( db_note.user_id and self.user.username or u"" )
+ assert revision_date.decode( "utf8" ) == unicode( db_note.revision )
+
+ assert note_count == expected_note_count
+
+ def test_export_csv_with_unicode( self ):
+ self.test_export_csv( note_text = u"ümlaut.png" )
+
+ def test_export_csv_without_login( self ):
+ note3 = Note.create( "55", u"blah
foo", notebook_id = self.notebook.object_id )
+ self.database.save( note3 )
+
+ path = "/notebooks/export_csv/%s" % self.notebook.object_id
+ result = self.http_get(
+ path,
+ session_id = self.session_id,
+ )
+
+ headers = result.get( "headers" )
+ assert headers
+ assert headers.get( "Location" ) == u"http:///login?after_login=%s" % urllib.quote( path )
+
+ def test_export_csv_with_unknown_notebook( self ):
+ self.login()
+
+ result = self.http_get(
+ "/notebooks/export_csv/%s" % self.unknown_notebook_id,
+ session_id = self.session_id,
+ )
+
+ assert u"access" in result[ u"body" ][ 0 ]
+
def test_create( self ):
self.login()
@@ -4350,7 +4464,7 @@ class Test_notebooks( Test_controller ):
self.__assert_imported_notebook( expected_notes, result )
LINK_PATTERN = re.compile( ']*)>([^<]*)', re.IGNORECASE )
- NOTE_URL_PATTERN = re.compile( '(.*)/notebooks/([^?]+)\?note_id=(.*)', re.IGNORECASE )
+ NOTE_URL_PATTERN = re.compile( '([^"]*)/notebooks/(\w+)\?note_id=(\w+)', re.IGNORECASE )
def __assert_imported_notebook( self, expected_notes, result, plaintext = True ):
assert result[ u"redirect" ].startswith( u"/notebooks/" )
@@ -4394,11 +4508,19 @@ class Test_notebooks( Test_controller ):
url_match = self.NOTE_URL_PATTERN.search( url )
if url_match:
+ imported_notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" )
( protocol_and_host, notebook_id, note_id ) = url_match.groups()
assert attributes == u""
assert protocol_and_host == u""
- assert notebook_id == self.notebook.object_id
- assert note_id # TODO: assert that the note id has been rewritten properly
+
+ # assert that the link has been rewritten to point to a note in the new notebook
+ assert note_id
+ rewritten_note = self.database.load( Note, note_id )
+ if rewritten_note:
+ assert rewritten_note.notebook_id == imported_notebook.object_id
+ assert notebook_id == imported_notebook.object_id
+ else:
+ assert notebook_id == self.notebook.object_id
else:
assert attributes.startswith( u'target="' )
@@ -5064,11 +5186,6 @@ class Test_notebooks( Test_controller ):
# one of the imported notes contains a link to one of the other imported notes
note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" )
csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.3 ",idone\n"8","whee","hmm\nfoo",idtwo\n3,4,5,idthree' % note_url
- expected_notes = [
- ( "blah and stuff", "3.3 " ), # ( title, contents )
- ( "whee", 'hmm\nfoo' % note_url ), # TODO: expect rewritten URL instead
- ( "4", "5" ),
- ]
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
@@ -5090,6 +5207,125 @@ class Test_notebooks( Test_controller ):
import_button = u"import",
), session_id = self.session_id )
+ notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" )
+ note = self.database.select_one( Note, notebook.sql_load_note_by_title( u"4" ) )
+
+ rewritten_note_url = "/notebooks/%s?note_id=%s" % ( notebook.object_id, note.object_id )
+ expected_notes = [
+ ( "blah and stuff", "3.3 " ), # ( title, contents )
+ ( "4", "5" ),
+ ( "whee", 'hmm\nfoo' % rewritten_note_url ),
+ ]
+
+ self.__assert_imported_notebook( expected_notes, result, plaintext = False )
+
+ def test_import_csv_html_content_with_internal_note_link_and_blank_note_id_value( self ):
+ self.login()
+
+ # one of the imported notes contains a link to one of the other imported notes
+ note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" )
+ csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.3 ",\n"8","whee","hmm\nfoo",idtwo\n3,4,5,idthree' % note_url
+
+ self.http_upload(
+ "/files/upload?file_id=%s" % self.file_id,
+ dict(
+ notebook_id = self.notebook.object_id,
+ note_id = self.note.object_id,
+ ),
+ filename = self.filename,
+ file_data = csv_data,
+ content_type = self.content_type,
+ session_id = self.session_id,
+ )
+
+ result = self.http_post( "/notebooks/import_csv/", dict(
+ file_id = self.file_id,
+ content_column = 2,
+ title_column = 1,
+ plaintext = False,
+ import_button = u"import",
+ ), session_id = self.session_id )
+
+ notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" )
+ note = self.database.select_one( Note, notebook.sql_load_note_by_title( u"4" ) )
+
+ rewritten_note_url = "/notebooks/%s?note_id=%s" % ( notebook.object_id, note.object_id )
+ expected_notes = [
+ ( "blah and stuff", "3.3 " ), # ( title, contents )
+ ( "4", "5" ),
+ ( "whee", 'hmm\nfoo' % rewritten_note_url ),
+ ]
+
+ self.__assert_imported_notebook( expected_notes, result, plaintext = False )
+
+ def test_import_csv_html_content_with_internal_note_link_to_unknown_note( self ):
+ self.login()
+
+ # one of the imported notes contains a link to one of the other imported notes
+ note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idunknown" )
+ csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.3 ",idone\n"8","whee","hmm\nfoo",idtwo\n3,4,5,idthree' % note_url
+
+ self.http_upload(
+ "/files/upload?file_id=%s" % self.file_id,
+ dict(
+ notebook_id = self.notebook.object_id,
+ note_id = self.note.object_id,
+ ),
+ filename = self.filename,
+ file_data = csv_data,
+ content_type = self.content_type,
+ session_id = self.session_id,
+ )
+
+ result = self.http_post( "/notebooks/import_csv/", dict(
+ file_id = self.file_id,
+ content_column = 2,
+ title_column = 1,
+ plaintext = False,
+ import_button = u"import",
+ ), session_id = self.session_id )
+
+ expected_notes = [
+ ( "blah and stuff", "3.3 " ), # ( title, contents )
+ ( "4", "5" ),
+ ( "whee", 'hmm\nfoo' % note_url ), # the note url should not be rewritten
+ ]
+
+ self.__assert_imported_notebook( expected_notes, result, plaintext = False )
+
+ def test_import_csv_html_content_with_internal_note_link_without_note_id_column( self ):
+ self.login()
+
+ # one of the imported notes contains a link to one of the other imported notes
+ note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" )
+ csv_data = '"label 1","label 2","label 3",\n5,"blah and stuff","3.3 "\n"8","whee","hmm\nfoo"\n3,4,5' % note_url
+
+ self.http_upload(
+ "/files/upload?file_id=%s" % self.file_id,
+ dict(
+ notebook_id = self.notebook.object_id,
+ note_id = self.note.object_id,
+ ),
+ filename = self.filename,
+ file_data = csv_data,
+ content_type = self.content_type,
+ session_id = self.session_id,
+ )
+
+ result = self.http_post( "/notebooks/import_csv/", dict(
+ file_id = self.file_id,
+ content_column = 2,
+ title_column = 1,
+ plaintext = False,
+ import_button = u"import",
+ ), session_id = self.session_id )
+
+ expected_notes = [
+ ( "blah and stuff", "3.3 " ), # ( title, contents )
+ ( "whee", 'hmm\nfoo' % note_url ), # the note url should not be rewritten
+ ( "4", "5" ),
+ ]
+
self.__assert_imported_notebook( expected_notes, result, plaintext = False )
def test_import_csv_without_login( self ):