witten
/
luminotes
Archived
1
0
Fork 0

Many more controller.Files unit tests.

This commit is contained in:
Dan Helfman 2008-02-23 22:17:02 +00:00
parent 7b76d6371b
commit 4150811d9e
5 changed files with 471 additions and 91 deletions

View File

@ -90,6 +90,9 @@ class Upload_file( object ):
def close( self ):
self.__file.close()
self.complete()
def complete( self ):
self.__complete.set()
def delete( self ):
@ -342,12 +345,12 @@ class Files( object ):
"""
global current_uploads, current_uploads_lock
uploaded_file = current_uploads.get( file_id )
if not uploaded_file:
return dict( script = general_error_script % u"Please select a file to upload." )
current_uploads_lock.acquire()
try:
uploaded_file = current_uploads.get( file_id )
if not uploaded_file:
return dict( script = general_error_script % u"Please select a file to upload." )
del( current_uploads[ file_id ] )
finally:
current_uploads_lock.release()
@ -402,6 +405,8 @@ class Files( object ):
@rtype: unicode
@return: streaming HTML progress bar
"""
global current_uploads
# release the session lock before beginning to stream the upload report. otherwise, if the
# upload is cancelled before it's done, the lock won't be released
try:

View File

@ -335,6 +335,12 @@ class Test_controller( object ):
File.sql_load_note_files = staticmethod( lambda note_id:
lambda database: sql_load_note_files( note_id, database ) )
def sql_delete( self, database ):
del( database.objects[ self.object_id ] )
File.sql_delete = lambda self: \
lambda database: sql_delete( self, database )
def setUp( self ):
from controller.Root import Root
@ -367,7 +373,7 @@ class Test_controller( object ):
},
{
u"name": "extra super",
u"storage_quota_bytes": 31337 * 10,
u"storage_quota_bytes": 31337 * 1000,
u"notebook_collaboration": True,
u"fee": 9.00,
u"button": u"[or here user %s!] button",

View File

@ -1,4 +1,6 @@
import time
import types
from threading import Thread
from StringIO import StringIO
from Test_controller import Test_controller
from model.Notebook import Notebook
@ -7,6 +9,7 @@ from model.User import User
from model.Invite import Invite
from model.File import File
from controller.Notebooks import Access_error
import controller.Files
from controller.Files import Upload_file
@ -57,9 +60,19 @@ class Test_files( Test_controller ):
del( Upload_file.fake_files[ file_id ] )
@staticmethod
def exists( file_id ):
fake_file = Upload_file.fake_files.get( file_id )
return fake_file is not None
def close( self ):
self.complete()
Upload_file.open_file = open_file
Upload_file.delete_file = delete_file
Upload_file.close = lambda self: None
Upload_file.exists = exists
Upload_file.close = close
self.make_users()
self.make_notebooks()
@ -89,6 +102,7 @@ class Test_files( Test_controller ):
self.database.save( self.user, commit = False )
self.user2 = User.create( self.database.next_id( User ), self.username2, self.password2, self.email_address2 )
self.user2.rate_plan = 1
self.database.save( self.user2, commit = False )
self.anonymous = User.create( self.database.next_id( User ), u"anonymous" )
@ -131,7 +145,7 @@ class Test_files( Test_controller ):
session_id = self.session_id,
)
assert u"error" not in result
assert u"body" not in result
assert u"script" not in result
# assert that the file metadata was actually stored in the database
@ -146,6 +160,11 @@ class Test_files( Test_controller ):
# assert that the file data was actually stored
assert Upload_file.open_file( self.file_id ).read() == self.file_data
# assert that storage bytes increased
orig_storage_bytes = self.user.storage_bytes
user = self.database.load( User, self.user.object_id )
assert user.storage_bytes > orig_storage_bytes
def test_upload_without_login( self ):
result = self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
@ -160,6 +179,10 @@ class Test_files( Test_controller ):
assert u"access" in result.get( u"script" )
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
def test_upload_without_access( self ):
self.login2()
@ -177,7 +200,121 @@ class Test_files( Test_controller ):
assert u"access" in result.get( u"script" )
def assert_streaming_error( self, result ):
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
def test_upload_without_file_id( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id
)
assert u"error" in result[ u"body" ][ 0 ]
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
def test_upload_unnamed( self ):
self.login()
result = self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = "",
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id
)
assert "select a file" in result[ "script" ]
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
def test_upload_invalid_content_length( self ):
self.login()
result = self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
headers = [ ( "Content-Length", "-10" ) ],
session_id = self.session_id
)
assert u"error" in result[ u"body" ][ 0 ]
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
def test_upload_cancel( self ):
self.login()
result = self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
simulate_cancel = True,
session_id = self.session_id
)
assert u"body" not in result
assert u"script" not in result
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
def test_upload_over_quota( self ):
large_file_data = self.file_data * 5
self.login()
result = self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = large_file_data,
content_type = self.content_type,
session_id = self.session_id,
)
assert "quota" in result[ "script" ]
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
def assert_streaming_error( self, result, error_string ):
gen = result[ u"body" ]
assert isinstance( gen, types.GeneratorType )
@ -185,7 +322,7 @@ class Test_files( Test_controller ):
try:
for piece in gen:
if "error" in piece:
if error_string in piece:
found_error = True
except AttributeError, exc:
if u"session_storage" not in str( exc ):
@ -193,88 +330,32 @@ class Test_files( Test_controller ):
assert found_error
def test_upload_unnamed( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = "",
file_data = self.file_data,
session_id = self.session_id,
)
self.assert_streaming_error( result )
def test_upload_empty( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = "",
session_id = self.session_id,
)
self.assert_streaming_error( result )
def test_upload_invalid_content_length( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
headers = [ ( "Content-Length", "-10" ) ],
session_id = self.session_id,
)
assert "invalid" in result[ "body" ][ 0 ]
def test_upload_cancel( self ):
self.login()
result = self.http_upload(
"/files/upload",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
simulate_cancel = True,
session_id = self.session_id,
)
self.assert_streaming_error( result )
def test_upload_over_quota( self ):
raise NotImplementedError()
def test_progress( self ):
raise NotImplementedError()
self.database.execute( self.user2.sql_save_notebook( self.notebook.object_id, read_write = True, owner = False ) )
self.database.execute( self.user2.sql_save_notebook( self.notebook.trash_id, read_write = True, owner = False ) )
self.login()
self.login2()
self.database.save( File( object_id = self.file_id ) )
result = self.http_upload(
"/files/progress",
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
# start a file uploading in a separate thread
def upload():
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data * 1000,
content_type = self.content_type,
session_id = self.session_id,
)
Thread( target = upload ).start()
# report on that file's upload progress
result = self.http_get(
"/files/progress?file_id=%s&filename=%s" % ( self.file_id, self.filename ),
session_id = self.session_id,
)
@ -283,6 +364,7 @@ class Test_files( Test_controller ):
tick_count = 0
tick_done = False
complete = False
try:
for piece in gen:
@ -290,6 +372,8 @@ class Test_files( Test_controller ):
tick_count += 1
if u"tick(1.0)" in piece:
tick_done = True
if u"complete" in piece:
complete = True
# during this unit test, full session info isn't available, so swallow an expected
# exception about session_storage
except AttributeError, exc:
@ -299,12 +383,295 @@ class Test_files( Test_controller ):
# assert that the progress bar is moving, and then completes
assert tick_count >= 2
assert tick_done
assert complete
def test_progress_without_login( self ):
self.database.execute( self.user2.sql_save_notebook( self.notebook.object_id, read_write = True, owner = False ) )
self.database.execute( self.user2.sql_save_notebook( self.notebook.trash_id, read_write = True, owner = False ) )
self.login2() # this login is for the upload, not the call to progress
self.database.save( File( object_id = self.file_id ) )
# start a file uploading in a separate thread
def upload():
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data * 1000,
content_type = self.content_type,
session_id = self.session_id,
)
Thread( target = upload ).start()
# report on that file's upload progress
result = self.http_get(
"/files/progress?file_id=%s&filename=%s" % ( self.file_id, self.filename ),
)
print u"access" in result[ u"body" ][ 0 ]
def test_progress_for_completed_upload( self ):
self.login()
self.database.save( File( object_id = self.file_id ) )
# upload a file completely
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id,
)
# report on that completed file's upload progress
result = self.http_get(
"/files/progress?file_id=%s&filename=%s" % ( self.file_id, self.filename ),
session_id = self.session_id,
)
gen = result[ u"body" ]
assert isinstance( gen, types.GeneratorType )
complete = False
try:
for piece in gen:
if u"complete" in piece:
complete = True
# during this unit test, full session info isn't available, so swallow an expected
# exception about session_storage
except AttributeError, exc:
if u"session_storage" not in str( exc ):
raise exc
# assert that the progress bar is moving, and then completes
assert complete
def test_progress_with_unknown_file_id( self ):
self.login()
result = self.http_get(
"/files/progress?file_id=%s&filename=%s" % ( self.file_id, self.filename ),
session_id = self.session_id,
)
assert u"error" in result[ u"body" ][ 0 ]
assert u"unknown" in result[ u"body" ][ 0 ]
def test_progress_over_quota( self ):
self.login()
self.database.save( File( object_id = self.file_id ) )
# start a large file uploading in a separate thread
def upload():
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data * 1000,
content_type = self.content_type,
session_id = self.session_id,
)
Thread( target = upload ).start()
result = self.http_get(
"/files/progress?file_id=%s&filename=%s" % ( self.file_id, self.filename ),
session_id = self.session_id,
)
self.assert_streaming_error( result, u"quota" )
def test_stats( self ):
raise NotImplementedError()
self.login()
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id,
)
result = self.http_get(
"/files/stats?file_id=%s" % self.file_id,
session_id = self.session_id,
)
assert result[ u"filename" ] == self.filename
assert result[ u"size_bytes" ] == len( self.file_data )
orig_storage_bytes = self.user.storage_bytes
user = self.database.load( User, self.user.object_id )
assert result[ u"storage_bytes" ] == user.storage_bytes
assert user.storage_bytes > orig_storage_bytes
def test_stats_without_login( self ):
self.login() # this login is for the upload, not the call to stats
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id,
)
result = self.http_get(
"/files/stats?file_id=%s" % self.file_id,
)
assert u"access" in result[ u"error" ]
def test_stats_without_access( self ):
self.login()
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id,
)
self.login2() # now login as a different user
result = self.http_get(
"/files/stats?file_id=%s" % self.file_id,
session_id = self.session_id,
)
assert u"access" in result[ u"error" ]
def test_stats_with_unknown_file_id( self ):
self.login()
result = self.http_get(
"/files/stats?file_id=%s" % self.file_id,
session_id = self.session_id,
)
assert u"access" in result[ u"error" ]
def test_delete( self ):
raise NotImplementedError()
self.login()
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id,
)
result = self.http_post(
"/files/delete",
dict(
file_id = self.file_id,
),
session_id = self.session_id,
)
db_file = self.database.load( File, self.file_id )
assert db_file is None
assert not Upload_file.exists( self.file_id )
orig_storage_bytes = self.user.storage_bytes
user = self.database.load( User, self.user.object_id )
assert result[ u"storage_bytes" ] == user.storage_bytes
assert user.storage_bytes != orig_storage_bytes
def test_delete_without_login( self ):
self.login() # this login is for the upload, not the call to delete
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id,
)
result = self.http_post(
"/files/delete",
dict(
file_id = self.file_id,
),
)
assert u"access" in result[ u"error" ]
def test_delete_without_access( self ):
self.login()
self.http_upload(
"/files/upload?file_id=%s" % self.file_id,
dict(
notebook_id = self.notebook.object_id,
note_id = self.note.object_id,
),
filename = self.filename,
file_data = self.file_data,
content_type = self.content_type,
session_id = self.session_id,
)
self.login2() # now login as a different user
result = self.http_post(
"/files/delete",
dict(
file_id = self.file_id,
),
session_id = self.session_id,
)
assert u"access" in result[ u"error" ]
def test_delete_with_unknown_file_id( self ):
self.login()
result = self.http_post(
"/files/delete",
dict(
file_id = self.file_id,
),
session_id = self.session_id,
)
assert u"access" in result[ u"error" ]
def test_rename( self ):
raise NotImplementedError()

View File

@ -3252,7 +3252,7 @@ class Test_users( Test_controller ):
rate_plan = result[ u"rate_plan" ]
assert rate_plan
assert rate_plan[ u"name" ] == u"extra super"
assert rate_plan[ u"storage_quota_bytes" ] == 31337 * 10
assert rate_plan[ u"storage_quota_bytes" ] == 31337 * 1000
assert result[ u"conversion" ] == u"subscribe_1"
assert result[ u"notebook" ].object_id == self.anon_notebook.object_id

View File

@ -55,6 +55,8 @@ def stream_progress( uploading_file, filename, fraction_reported ):
received_bytes = uploading_file.wait_for_total_received_bytes()
fraction_done = float( received_bytes ) / float( uploading_file.content_length )
if fraction_done > 1.0: fraction_done = 1.0
if fraction_done == 1.0 or fraction_done > fraction_reported + tick_increment:
fraction_reported = fraction_done
yield '<script type="text/javascript">tick(%s);</script>' % fraction_reported