From 38f4772a6a9ccc593f4d8f31e98cde57217a1605 Mon Sep 17 00:00:00 2001 From: Dan Helfman Date: Fri, 26 Sep 2008 21:30:40 -0700 Subject: [PATCH] Completed note link rewriting support for CSV importing. Also completed unit tests for that and CSV exporting. --- controller/Files.py | 1 + controller/Html_cleaner.py | 6 +- controller/Notebooks.py | 44 ++++- controller/test/Test_controller.py | 16 ++ controller/test/Test_notebooks.py | 262 +++++++++++++++++++++++++++-- 5 files changed, 312 insertions(+), 17 deletions(-) diff --git a/controller/Files.py b/controller/Files.py index e100fd1..a112d6c 100644 --- a/controller/Files.py +++ b/controller/Files.py @@ -883,6 +883,7 @@ class Files( object ): import csv table_file = Upload_file.open_file( file_id ) + table_file.seek( 0 ) # necessary in case the file is opened by another call to parse_csv() sniffer = csv.Sniffer() # attempt to determine the presence of a header diff --git a/controller/Html_cleaner.py b/controller/Html_cleaner.py index 3c61cc0..9bbfb3d 100644 --- a/controller/Html_cleaner.py +++ b/controller/Html_cleaner.py @@ -1,5 +1,6 @@ # originally from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/496942 +import re import urlparse from htmllib import HTMLParser from cgi import escape @@ -17,6 +18,8 @@ class Html_cleaner(HTMLParser): """ Cleans HTML of any tags not matching a whitelist. """ + NOTE_LINK_URL_PATTERN = re.compile( '[^"]*/notebooks/\w+\?note_id=\w+', re.IGNORECASE ) + def __init__( self, require_link_target = False ): HTMLParser.__init__( self, AbstractFormatter( NullWriter() ) ) self.result = [] @@ -194,7 +197,8 @@ class Html_cleaner(HTMLParser): else: bt += ' %s=%s' % \ (xssescape(attribute), quoteattr(attrs[attribute])) - if self.require_link_target and tag == "a" and not attrs.get( 'target' ): + if self.require_link_target and tag == "a" and not attrs.get( 'target' ) and \ + ( not attrs.get( 'href' ) or not self.NOTE_LINK_URL_PATTERN.search( attrs.get( 'href' ) ) ): bt += ' target="_new"' if bt == "]+\s+)?href=")[^"]*/notebooks/(\w+)\?note_id=(\w+)("[^>]*>)', re.IGNORECASE ) @expose( view = Json ) @strongly_expire @@ -1763,7 +1764,7 @@ class Notebooks( object ): attributes are added to all links without targets, except internal note links. Internal note links are rewritten such that they point to the newly imported notes. This is - accomplished by looking for a "note_id" column and determining what note each link points out. + accomplished by looking for a "note_id" column and determining what note each link points to. Then each internal note link is rewritten to point at the new notebook id and note id. @type file_id: unicode @@ -1800,6 +1801,15 @@ class Notebooks( object ): if db_file is None or not self.__users.check_access( user_id, db_file.notebook_id ): raise Access_error() + # if the file has a "note_id" header column, record its index + note_id_column = None + note_ids = {} # map of original CSV note id to imported note id + + parser = self.__files.parse_csv( file_id, skip_header = False ) + row = parser.next() + if row and u"note_id" in row: + note_id_column = row.index( u"note_id" ) + parser = self.__files.parse_csv( file_id, skip_header = True ) # create a new notebook for the imported notes @@ -1853,8 +1863,36 @@ class Notebooks( object ): if title and note.title is None: note.contents = u"

%s

%s" % ( title, note.contents ) + # if there is a note id column, then map the original CSV note id to its new imported note id + if note_id_column: + original_note_id = Valid_id( none_okay = True )( row[ note_id_column ].strip() ) + if original_note_id: + note_ids[ original_note_id ] = note_id + self.__database.save( note, commit = False ) + def rewrite_link( match ): + ( link_start, original_notebook_id, original_note_id, link_end ) = match.groups() + + note_id = note_ids.get( original_note_id ) + if note_id: + return "%s/notebooks/%s?note_id=%s%s" % ( link_start, notebook.object_id, note_id, link_end ) + + # if we don't know how to rewrite the link (for lack of the new note id), then don't rewrite + # it and leave the link as it is + return "%s/notebooks/%s?note_id=%s%s" % ( link_start, original_notebook_id, original_note_id, link_end ) + + # do a pass over all the imported notes to rewrite internal note links so that they point to + # the newly imported note ids in the new notebook + for ( original_note_id, note_id ) in note_ids.items(): + note = self.__database.load( Note, note_id ) + + if note: + ( rewritten_contents, rewritten_count ) = self.NOTE_LINK_PATTERN.subn( rewrite_link, note.contents ) + if rewritten_count > 0: + note.contents = rewritten_contents + self.__database.save( note ) + # delete the CSV file now that it's been imported self.__database.execute( db_file.sql_delete(), commit = False ) self.__database.uncache( db_file ) diff --git a/controller/test/Test_controller.py b/controller/test/Test_controller.py index 49a714a..8b5554a 100644 --- a/controller/test/Test_controller.py +++ b/controller/test/Test_controller.py @@ -116,6 +116,22 @@ class Test_controller( object ): u"stream_response": True, u"encoding_filter.on": False, }, + u"/files/download_product": { + u"stream_response": True, + u"encoding_filter.on": False, + }, + u"/files/thumbnail": { + u"stream_response": True, + u"encoding_filter.on": False, + }, + u"/files/image": { + u"stream_response": True, + u"encoding_filter.on": False, + }, + u"/notebooks/export_csv": { + u"stream_response": True, + u"encoding_filter.on": False, + }, u"/files/progress": { u"stream_response": True, }, diff --git a/controller/test/Test_notebooks.py b/controller/test/Test_notebooks.py index a2c7d19..23faeab 100644 --- a/controller/test/Test_notebooks.py +++ b/controller/test/Test_notebooks.py @@ -1,4 +1,8 @@ +# -*- coding: utf8 -*- + import re +import csv +import types import cherrypy import urllib from nose.tools import raises @@ -3485,23 +3489,36 @@ class Test_notebooks( Test_controller ): assert result.get( "notebook_name" ) == self.notebook.name notes = result.get( "notes" ) - assert len( notes ) == len( self.notebook.notes ) + assert len( notes ) == self.database.select_one( int, self.notebook.sql_count_notes() ) startup_note_allowed = True previous_revision = None # assert that startup notes come first, then normal notes in descending revision order for note in notes: - if self.notebook.is_startup_note( note ): + if note.startup: assert startup_note_allowed else: startup_note_allowed = False - assert note in self.notebook.notes + if previous_revision: assert note.revision < previous_revision previous_revision = note.revision - - def test_export_html( self ): + + db_note = self.database.load( Note, note.object_id ) + assert db_note + assert note.object_id == db_note.object_id + assert note.revision == db_note.revision + assert note.title == db_note.title + assert note.contents == db_note.contents + assert note.notebook_id == db_note.notebook_id + assert note.startup == db_note.startup + assert note.deleted_from_id == db_note.deleted_from_id + assert note.rank == db_note.rank + assert note.user_id == db_note.user_id + assert note.creation == db_note.creation + + def test_export_html_without_login( self ): note3 = Note.create( "55", u"

blah

foo", notebook_id = self.notebook.object_id ) self.database.save( note3 ) @@ -3525,6 +3542,103 @@ class Test_notebooks( Test_controller ): assert result.get( "error" ) + def test_export_csv( self, note_text = None ): + self.login() + + if not note_text: + note_text = u"foo" + + note3 = Note.create( "55", u"

blah

%s" % note_text, notebook_id = self.notebook.object_id ) + self.database.save( note3 ) + + result = self.http_get( + "/notebooks/export_csv/%s" % self.notebook.object_id, + session_id = self.session_id, + ) + + headers = result[ u"headers" ] + assert headers + assert headers[ u"Content-Type" ] == u"text/csv;charset=utf-8" + assert headers[ u"Content-Disposition" ] == 'attachment; filename=wiki.csv' + + gen = result[ u"body" ] + assert isinstance( gen, types.GeneratorType ) + pieces = [] + + try: + for piece in gen: + pieces.append( piece ) + except AttributeError, exc: + if u"session_storage" not in str( exc ): + raise exc + + csv_data = "".join( pieces ) + reader = csv.reader( StringIO( csv_data ) ) + + row = reader.next() + expected_header = [ u"contents", u"title", u"note_id", u"startup", u"username", u"revision_date" ] + assert row == expected_header + + expected_note_count = self.database.select_one( int, self.notebook.sql_count_notes() ) + note_count = 0 + startup_note_allowed = True + previous_revision = None + + # assert that startup notes come first, then normal notes in descending revision order + for row in reader: + note_count += 1 + + assert len( row ) == len( expected_header ) + ( contents, title, note_id, startup, username, revision_date ) = row + + if startup: + assert startup_note_allowed + else: + startup_note_allowed = False + + if previous_revision: + assert revision_date < previous_revision + + previous_revision = revision_date + + db_note = self.database.load( Note, note_id ) + assert db_note + assert contents.decode( "utf8" ) == db_note.contents + assert title.decode( "utf8" ) == db_note.title + assert note_id.decode( "utf8" ) == db_note.object_id + assert startup.decode( "utf8" ) == db_note.startup and u"1" or "0" + assert username.decode( "utf8" ) == ( db_note.user_id and self.user.username or u"" ) + assert revision_date.decode( "utf8" ) == unicode( db_note.revision ) + + assert note_count == expected_note_count + + def test_export_csv_with_unicode( self ): + self.test_export_csv( note_text = u"ümlaut.png" ) + + def test_export_csv_without_login( self ): + note3 = Note.create( "55", u"

blah

foo", notebook_id = self.notebook.object_id ) + self.database.save( note3 ) + + path = "/notebooks/export_csv/%s" % self.notebook.object_id + result = self.http_get( + path, + session_id = self.session_id, + ) + + headers = result.get( "headers" ) + assert headers + assert headers.get( "Location" ) == u"http:///login?after_login=%s" % urllib.quote( path ) + + def test_export_csv_with_unknown_notebook( self ): + self.login() + + result = self.http_get( + "/notebooks/export_csv/%s" % self.unknown_notebook_id, + session_id = self.session_id, + ) + + assert u"access" in result[ u"body" ][ 0 ] + def test_create( self ): self.login() @@ -4350,7 +4464,7 @@ class Test_notebooks( Test_controller ): self.__assert_imported_notebook( expected_notes, result ) LINK_PATTERN = re.compile( ']*)>([^<]*)', re.IGNORECASE ) - NOTE_URL_PATTERN = re.compile( '(.*)/notebooks/([^?]+)\?note_id=(.*)', re.IGNORECASE ) + NOTE_URL_PATTERN = re.compile( '([^"]*)/notebooks/(\w+)\?note_id=(\w+)', re.IGNORECASE ) def __assert_imported_notebook( self, expected_notes, result, plaintext = True ): assert result[ u"redirect" ].startswith( u"/notebooks/" ) @@ -4394,11 +4508,19 @@ class Test_notebooks( Test_controller ): url_match = self.NOTE_URL_PATTERN.search( url ) if url_match: + imported_notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" ) ( protocol_and_host, notebook_id, note_id ) = url_match.groups() assert attributes == u"" assert protocol_and_host == u"" - assert notebook_id == self.notebook.object_id - assert note_id # TODO: assert that the note id has been rewritten properly + + # assert that the link has been rewritten to point to a note in the new notebook + assert note_id + rewritten_note = self.database.load( Note, note_id ) + if rewritten_note: + assert rewritten_note.notebook_id == imported_notebook.object_id + assert notebook_id == imported_notebook.object_id + else: + assert notebook_id == self.notebook.object_id else: assert attributes.startswith( u'target="' ) @@ -5064,11 +5186,6 @@ class Test_notebooks( Test_controller ): # one of the imported notes contains a link to one of the other imported notes note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" ) csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.3  ",idone\n"8","whee","hmm\nfoo",idtwo\n3,4,5,idthree' % note_url - expected_notes = [ - ( "blah and stuff", "3.3  " ), # ( title, contents ) - ( "whee", 'hmm\nfoo' % note_url ), # TODO: expect rewritten URL instead - ( "4", "5" ), - ] self.http_upload( "/files/upload?file_id=%s" % self.file_id, @@ -5090,6 +5207,125 @@ class Test_notebooks( Test_controller ): import_button = u"import", ), session_id = self.session_id ) + notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" ) + note = self.database.select_one( Note, notebook.sql_load_note_by_title( u"4" ) ) + + rewritten_note_url = "/notebooks/%s?note_id=%s" % ( notebook.object_id, note.object_id ) + expected_notes = [ + ( "blah and stuff", "3.3  " ), # ( title, contents ) + ( "4", "5" ), + ( "whee", 'hmm\nfoo' % rewritten_note_url ), + ] + + self.__assert_imported_notebook( expected_notes, result, plaintext = False ) + + def test_import_csv_html_content_with_internal_note_link_and_blank_note_id_value( self ): + self.login() + + # one of the imported notes contains a link to one of the other imported notes + note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" ) + csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.3  ",\n"8","whee","hmm\nfoo",idtwo\n3,4,5,idthree' % note_url + + self.http_upload( + "/files/upload?file_id=%s" % self.file_id, + dict( + notebook_id = self.notebook.object_id, + note_id = self.note.object_id, + ), + filename = self.filename, + file_data = csv_data, + content_type = self.content_type, + session_id = self.session_id, + ) + + result = self.http_post( "/notebooks/import_csv/", dict( + file_id = self.file_id, + content_column = 2, + title_column = 1, + plaintext = False, + import_button = u"import", + ), session_id = self.session_id ) + + notebook = self.database.select_one( Notebook, "select * from notebook where name = 'imported notebook' limit 1;" ) + note = self.database.select_one( Note, notebook.sql_load_note_by_title( u"4" ) ) + + rewritten_note_url = "/notebooks/%s?note_id=%s" % ( notebook.object_id, note.object_id ) + expected_notes = [ + ( "blah and stuff", "3.3  " ), # ( title, contents ) + ( "4", "5" ), + ( "whee", 'hmm\nfoo' % rewritten_note_url ), + ] + + self.__assert_imported_notebook( expected_notes, result, plaintext = False ) + + def test_import_csv_html_content_with_internal_note_link_to_unknown_note( self ): + self.login() + + # one of the imported notes contains a link to one of the other imported notes + note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idunknown" ) + csv_data = '"label 1","label 2","label 3","note_id",\n5,"blah and stuff","3.3  ",idone\n"8","whee","hmm\nfoo",idtwo\n3,4,5,idthree' % note_url + + self.http_upload( + "/files/upload?file_id=%s" % self.file_id, + dict( + notebook_id = self.notebook.object_id, + note_id = self.note.object_id, + ), + filename = self.filename, + file_data = csv_data, + content_type = self.content_type, + session_id = self.session_id, + ) + + result = self.http_post( "/notebooks/import_csv/", dict( + file_id = self.file_id, + content_column = 2, + title_column = 1, + plaintext = False, + import_button = u"import", + ), session_id = self.session_id ) + + expected_notes = [ + ( "blah and stuff", "3.3  " ), # ( title, contents ) + ( "4", "5" ), + ( "whee", 'hmm\nfoo' % note_url ), # the note url should not be rewritten + ] + + self.__assert_imported_notebook( expected_notes, result, plaintext = False ) + + def test_import_csv_html_content_with_internal_note_link_without_note_id_column( self ): + self.login() + + # one of the imported notes contains a link to one of the other imported notes + note_url = "/notebooks/%s?note_id=%s" % ( self.notebook.object_id, "idthree" ) + csv_data = '"label 1","label 2","label 3",\n5,"blah and stuff","3.3  "\n"8","whee","hmm\nfoo"\n3,4,5' % note_url + + self.http_upload( + "/files/upload?file_id=%s" % self.file_id, + dict( + notebook_id = self.notebook.object_id, + note_id = self.note.object_id, + ), + filename = self.filename, + file_data = csv_data, + content_type = self.content_type, + session_id = self.session_id, + ) + + result = self.http_post( "/notebooks/import_csv/", dict( + file_id = self.file_id, + content_column = 2, + title_column = 1, + plaintext = False, + import_button = u"import", + ), session_id = self.session_id ) + + expected_notes = [ + ( "blah and stuff", "3.3  " ), # ( title, contents ) + ( "whee", 'hmm\nfoo' % note_url ), # the note url should not be rewritten + ( "4", "5" ), + ] + self.__assert_imported_notebook( expected_notes, result, plaintext = False ) def test_import_csv_without_login( self ):