Archived
1
0

Wrote initial unit tests for new Files controller. A few more still needed.

This commit is contained in:
Dan Helfman 2008-02-06 01:34:12 +00:00
parent 90c83ad1e5
commit 1398e89f96
6 changed files with 291 additions and 14 deletions

View File

@ -63,7 +63,7 @@ settings = {
"""
""",
},
"/files/upload_file": {
"/files/upload": {
"stream_response": True
},
}

View File

@ -84,7 +84,7 @@ class Files( object ):
note_id = Valid_id(),
user_id = Valid_id( none_okay = True ),
)
def upload_file( self, upload, notebook_id, note_id, user_id ):
def upload( self, upload, notebook_id, note_id, user_id ):
"""
Upload a file from the client for attachment to a particular note.
@ -107,8 +107,7 @@ class Files( object ):
cherrypy.server.max_request_body_size = 0 # remove file size limit of 100 MB
cherrypy.response.timeout = 3600 # increase upload timeout to one hour (default is 5 min)
cherrypy.server.socket_timeout = 60 # increase socket timeout to one minute (default is 10 sec)
# TODO: increase to 8k
CHUNK_SIZE = 1#8 * 1024 # 8 Kb
CHUNK_SIZE = 8 * 1024 # 8 Kb
headers = {}
for key, val in cherrypy.request.headers.iteritems():
@ -142,7 +141,7 @@ class Files( object ):
<script type="text/javascript" src="/static/js/MochiKit.js"></script>
<meta content="text/html; charset=UTF-8" http_equiv="content-type" />
</head>
<body onload="withDocument( window.parent.document, function () { getElement( 'upload_frame' ).pulldown.upload_complete(); } );">
<body>
"""
if not filename:
@ -150,6 +149,7 @@ class Files( object ):
u"""
<div class="field_label">upload error: </div>
Please check that the filename is valid.
</body></html>
"""
return
@ -188,31 +188,38 @@ class Files( object ):
if fraction_done > fraction_reported + tick_increment:
yield '<script type="text/javascript">tick(%s);</script>' % fraction_reported
fraction_reported += tick_increment
import time
time.sleep(0.05) # TODO: removeme
fraction_reported = fraction_done
# TODO: write to the database
if fraction_reported == 0:
yield "An error occurred when uploading the file."
yield "An error occurred when uploading the file.</body></html>"
return
# the file finished uploading, so fill out the progress meter to 100%
if fraction_reported < 1.0:
yield '<script type="text/javascript">tick(1.0);</script>'
# the setTimeout() below ensures that the 100% progress bar is displayed for at least a moment
yield \
u"""
<script type="text/javascript">
setTimeout( 'withDocument( window.parent.document, function () { getElement( "upload_frame" ).pulldown.upload_complete(); } );', 10 );
</script>
</body>
</html>
"""
upload.file.close()
cherrypy.request.rfile.close()
# release the session lock before beginning the upload, because if the upload is cancelled
# before it's done, the lock won't be released
cherrypy.session.release_lock()
return process_upload()
def stats( file_id ):
pass
def rename( file_id, filename ):
pass

View File

@ -22,7 +22,7 @@ class Root( object ):
"""
The root of the controller hierarchy, corresponding to the "/" URL.
"""
def __init__( self, database, settings ):
def __init__( self, database, settings, suppress_exceptions = False ):
"""
Create a new Root object with the given settings.
@ -45,6 +45,7 @@ class Root( object ):
)
self.__notebooks = Notebooks( database, self.__users )
self.__files = Files( database, self.__users )
self.__suppress_exceptions = suppress_exceptions # used for unit tests
@expose( Main_page )
@grab_user_id
@ -310,7 +311,8 @@ class Root( object ):
return
import traceback
traceback.print_exc()
if not self.__suppress_exceptions:
traceback.print_exc()
self.report_traceback()
import sys

View File

@ -307,6 +307,8 @@ class Test_controller( object ):
self.database = Stub_database()
self.settings = {
u"global": {
u"session_filter.on": True,
u"session_filter.storage_type": u"ram",
u"luminotes.http_url" : u"http://luminotes.com",
u"luminotes.https_url" : u"https://luminotes.com",
u"luminotes.http_proxy_ip" : u"127.0.0.1",
@ -330,9 +332,12 @@ class Test_controller( object ):
},
],
},
u"/files/upload": {
u"stream_response": True
},
}
cherrypy.root = Root( self.database, self.settings )
cherrypy.root = Root( self.database, self.settings, suppress_exceptions = True )
cherrypy.config.update( Common.settings )
cherrypy.config.update( { u"server.log_to_screen": False } )
cherrypy.server.start( init_only = True, server_class = None )
@ -369,6 +374,7 @@ class Test_controller( object ):
try:
if Stub_view.result is not None:
result = Stub_view.result
Stub_view.result = None
else:
result = dict(
status = response.status,
@ -408,6 +414,61 @@ class Test_controller( object ):
try:
if Stub_view.result is not None:
result = Stub_view.result
Stub_view.result = None
else:
result = dict(
status = response.status,
headers = response.headers,
body = response.body,
)
result[ u"session_id" ] = session_id
return result
finally:
request.close()
def http_upload( self, http_path, form_args, filename, file_data, headers = None, session_id = None ):
"""
Perform an HTTP POST with the given path on the test server, sending the provided form_args
and file_data as a multipart form file upload. Return the result dict as returned by the
invoked method.
"""
boundary = "boundarygoeshere"
post_data = [ "--%s\n" % boundary ]
for ( name, value ) in form_args.items():
post_data.append( 'Content-Disposition: form-data; name="%s"\n\n%s\n--%s\n' % (
name, value, boundary
) )
post_data.append( 'Content-Disposition: form-data; name="upload"; filename="%s"\n' % (
filename
) )
post_data.append( "Content-Type: image/png\n\n%s\n--%s--\n" % (
file_data, boundary
) )
if headers is None:
headers = []
post_data = str( "".join( post_data ) )
headers.extend( [
( "Content-Type", "multipart/form-data; boundary=%s" % boundary ),
( "Content-Length", str( len( post_data ) ) ),
] )
if session_id:
headers.append( ( u"Cookie", "session_id=%s" % session_id ) ) # will break if unicode is used for the value
request = cherrypy.server.request( ( u"127.0.0.1", 1234 ), u"127.0.0.5" )
response = request.run( "POST %s HTTP/1.0" % str( http_path ), headers = headers, rfile = StringIO( post_data ) )
session_id = response.simple_cookie.get( u"session_id" )
if session_id: session_id = session_id.value
try:
if Stub_view.result is not None:
result = Stub_view.result
Stub_view.result = None
else:
result = dict(
status = response.status,

View File

@ -0,0 +1,207 @@
import types
import cherrypy
from Test_controller import Test_controller
from model.Notebook import Notebook
from model.Note import Note
from model.User import User
from model.Invite import Invite
from controller.Notebooks import Access_error
class Test_files( Test_controller ):
def setUp( self ):
Test_controller.setUp( self )
self.notebook = None
self.anon_notebook = None
self.username = u"mulder"
self.password = u"trustno1"
self.email_address = u"outthere@example.com"
self.username2 = u"deepthroat"
self.password2 = u"mmmtobacco"
self.email_address2 = u"parkinglot@example.com"
self.user = None
self.user2 = None
self.anonymous = None
self.session_id = None
self.filename = "file.png"
self.file_data = "foobar\x07`-=[]\;',./~!@#$%^&*()_+{}|:\"<>?" * 100
self.make_users()
self.make_notebooks()
self.database.commit()
def make_notebooks( self ):
user_id = self.user.object_id
self.trash = Notebook.create( self.database.next_id( Notebook ), u"trash", user_id = user_id )
self.database.save( self.trash, commit = False )
self.notebook = Notebook.create( self.database.next_id( Notebook ), u"notebook", self.trash.object_id, user_id = user_id )
self.database.save( self.notebook, commit = False )
note_id = self.database.next_id( Note )
self.note = Note.create( note_id, u"<h3>my title</h3>blah", notebook_id = self.notebook.object_id, startup = True, user_id = user_id )
self.database.save( self.note, commit = False )
self.anon_notebook = Notebook.create( self.database.next_id( Notebook ), u"anon_notebook", user_id = user_id )
self.database.save( self.anon_notebook, commit = False )
self.database.execute( self.user.sql_save_notebook( self.notebook.object_id, read_write = True, owner = True ) )
self.database.execute( self.user.sql_save_notebook( self.notebook.trash_id, read_write = True, owner = True ) )
self.database.execute( self.user.sql_save_notebook( self.anon_notebook.object_id, read_write = False, owner = False ) )
def make_users( self ):
self.user = User.create( self.database.next_id( User ), self.username, self.password, self.email_address )
self.database.save( self.user, commit = False )
self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 )
self.database.save( self.user2, commit = False )
self.anonymous = User.create( self.database.next_id( User ), u"anonymous" )
self.database.save( self.anonymous, commit = False )
def test_upload_page( self ):
result = self.http_get(
"/files/upload_page?notebook_id=%s&note_id=%s" % ( self.notebook.object_id, self.note.object_id ),
)
assert result.get( u"notebook_id" ) == self.notebook.object_id
assert result.get( u"note_id" ) == self.note.object_id
def test_upload_file( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
session_id = self.session_id,
)
gen = result[ u"body" ]
assert isinstance( gen, types.GeneratorType )
tick_count = 0
tick_done = False
try:
for piece in gen:
if u"tick(" in piece:
tick_count += 1
if u"tick(1.0)" in piece:
tick_done = True
# during this unit test, full session info isn't available, so swallow an expected
# exception about session_storage
except AttributeError, exc:
if u"session_storage" not in str( exc ):
raise exc
# assert that the progress bar is moving, and then completes
assert tick_count >= 3
assert tick_done
# TODO: assert that the uploaded file actually got stored somewhere
def test_upload_file_without_login( self ):
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
session_id = self.session_id,
)
assert u"access" in result.get( u"body" )[ 0 ]
def test_upload_file_without_access( self ):
self.login2()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
session_id = self.session_id,
)
assert u"access" in result.get( u"body" )[ 0 ]
def assert_inline_error( self, result ):
gen = result[ u"body" ]
assert isinstance( gen, types.GeneratorType )
found_error = False
try:
for piece in gen:
if "error" in piece:
found_error = True
except AttributeError, exc:
if u"session_storage" not in str( exc ):
raise exc
assert found_error
def test_upload_file_unnamed( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = "",
file_data = self.file_data,
session_id = self.session_id,
)
self.assert_inline_error( result )
def test_upload_file_empty( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = "",
session_id = self.session_id,
)
self.assert_inline_error( result )
def test_upload_file_cancel( self ):
raise NotImplementError()
def test_upload_file_over_quota( self ):
raise NotImplementError()
def login( self ):
result = self.http_post( "/users/login", dict(
username = self.username,
password = self.password,
login_button = u"login",
) )
self.session_id = result[ u"session_id" ]
def login2( self ):
result = self.http_post( "/users/login", dict(
username = self.username2,
password = self.password2,
login_button = u"login",
) )
self.session_id = result[ u"session_id" ]

View File

@ -16,7 +16,7 @@ class Upload_page( Html ):
Input( type = u"submit", id = u"upload_button", class_ = u"button", value = u"upload" ),
Input( type = u"hidden", id = u"notebook_id", name = u"notebook_id", value = notebook_id ),
Input( type = u"hidden", id = u"note_id", name = u"note_id", value = note_id ),
action = u"/files/upload_file",
action = u"/files/upload",
method = u"post",
enctype = u"multipart/form-data",
),