1 import Cookie
2 import cStringIO as StringIO
3 import os
4 import types
5 import logging
6 import string
7 import unittest
8
9 import cherrypy
10 from cherrypy._cphttptools import Request, Response
11
12 from webtest import TestApp
13
14 try:
15 import sqlobject
16 from sqlobject.inheritance import InheritableSQLObject
17 except ImportError:
18 sqlobject = None
19 try:
20 import sqlalchemy
21 except ImportError:
22 sqlalchemy = None
23
24 from turbogears import (config, controllers, database, startup, update_config,
25 validators)
26 from turbogears.identity import current_provider
27 from turbogears.util import get_model, deprecated
28
29
30
# Delete stale byte-compiled Kid templates: for every ``*.kid`` file found
# below the current directory, remove the ``*.pyc`` file sitting next to it.
for w in os.walk('.'):
    # Ignore any directory whose path contains a component starting with '.'.
    if os.sep + '.' in w[0]:
        continue
    for f in w[2]:
        if not f.endswith('.kid'):
            continue
        # Swap the trailing "kid" for "pyc" and anchor it in the walked dir.
        f = os.path.join(w[0], f[:-3] + 'pyc')
        if os.path.exists(f):
            os.remove(f)
38
39
# Load the test configuration.  If a ``test.cfg`` file exists in the current
# directory it is passed to ``update_config``; otherwise the database URI is
# set to an in-memory SQLite database.
# NOTE(review): the nesting below was reconstructed from a listing that had
# lost its indentation -- confirm that the last ``else`` pairs with the
# ``os.path.exists('test.cfg')`` test.
if os.path.exists('test.cfg'):
    # Walk the tree looking for a directory named ``config`` that contains an
    # ``__init__.py`` and whose parent directory name starts with a letter or
    # an underscore.
    for dirpath, dirs, dummy2 in os.walk('.'):
        basename = os.path.basename(dirpath)
        dirname = os.path.basename(os.path.dirname(dirpath))
        init_py = os.path.join(dirpath, '__init__.py')
        if basename == 'config' and dirname[0] in string.ascii_letters + '_' \
                and os.path.exists(init_py):
            # Drop the first two characters of the walked path (presumably
            # the leading "./") and turn the rest into a dotted module name.
            modulename = "%s.app" % dirpath[2:].replace(os.sep, ".")
            break
    else:
        # The walk finished without a ``break``: no config package was found.
        modulename = None

    # If importing the discovered module fails, warn and fall back to loading
    # the config file alone.
    try:
        update_config(configfile="test.cfg", modulename=modulename)
    except ImportError, exc:
        import warnings
        warnings.warn("Could not import configuration from module: %s" % exc,
            RuntimeWarning)
        update_config(configfile="test.cfg", modulename=None)
else:
    database.set_db_uri("sqlite:///:memory:")

# Applied in both cases: switch autoreload off and new-style logging on.
config.update({'global':
        {'autoreload.on': False, 'tg.new_style_logging': True}})
67
68
69
70
71
def mount(controller, path="/"):
    """Mount a controller at a path and return a WSGI application.

    A path of "/" installs the controller as ``cherrypy.root``; any other
    path is registered through ``cherrypy.tree.mount``.  The returned value
    is whatever ``make_wsgiapp()`` produces.

    """
    if path != '/':
        cherrypy.tree.mount(controller, path)
    else:
        cherrypy.root = controller
    return make_wsgiapp()
79
81 """Remove an application from the object traversal tree."""
82
83
84
85
86 cherrypy.root = None
87 cherrypy.tree.mount_points = {}
88
90 """Return a WSGI application from CherryPy's root object."""
91 wsgiapp = cherrypy._cpwsgi.wsgiApp
92 return wsgiapp
93
95 """Return a WebTest.TestApp instance from CherryPy.
96
97 If a Controller object is provided, it will be mounted at the root level.
98 If not, it'll look for an already mounted root.
99
100 """
101 if controller:
102 wsgiapp = mount(controller(), '/')
103 else:
104 wsgiapp = make_wsgiapp()
105 return TestApp(wsgiapp)
106
116
118 """Stop the server and unmount the application.
119
120 Use tg_only=True to leave CherryPy running (for faster tests).
121
122 """
123 unmount()
124 if config.get("cp_started") and not tg_only:
125 cherrypy.server.stop()
126 config.update({"cp_started" : False})
127
128 if config.get("server_started"):
129 startup.stopTurboGears()
130 config.update({"server_started" : False})
131
132
133
134 _currentcat = None
137
139 logging.Handler.__init__(self, level=logging.DEBUG)
140 self.log = []
141
def emit(self, record):
    """Echo the record to stdout and remember its formatted text.

    Prints the raw record, then its formatted form, and finally appends the
    formatted form to ``self.log``.

    """
    print("Got record: %s" % record)
    formatted = self.format(record)
    print("formatted as: %s" % formatted)
    self.log.append(formatted)
146
148 print "\n".join(self.log)
149 self.log = []
150
155
156 _memhandler = MemoryListHandler()
170
172 """Capture log for one category.
173
174 The category can either be a single category (a string like 'foo.bar')
175 or a list of them. You *must* call print_log() to reset when you're done.
176
177 """
178 global _currentcat
179 assert not _currentcat, "_currentcat not cleared. Use get_log to reset."
180 if not isinstance(category, list) and not isinstance(category, tuple):
181 category = [category]
182 _currentcat = category
183 for cat in category:
184 log = logging.getLogger(cat)
185 log.setLevel(logging.DEBUG)
186 log.addHandler(_memhandler)
187
197
199 """Print the log captured by capture_log to stdout.
200
201 Resets that log and resets the temporarily added handlers.
202
203 """
204 _reset_logging()
205 _memhandler.print_log()
206
208 """Return the list of log messages captured by capture_log.
209
210 Resets that log and resets the temporarily added handlers.
211
212 """
213 _reset_logging()
214 return _memhandler.get_log()
215
225
226
227
228
229 -class TGTest(unittest.TestCase):
230 """A WebTest enabled unit testing class.
231
232 To use, subclass & set root to your controller object, or set app to a
233 webtest.TestApp instance.
234
235 In your tests, use self.app to make WebTest calls.
236
237 """
238 root = None
239 app = None
240 stop_tg_only = False
241 config = None
242
244 """Set up the WebTest by starting the server.
245
246 You should override this and make sure you have properly
247 mounted a root for your server before calling super,
248 or simply pass a root controller to super.
249 Otherwise the CherryPy filters for TurboGears will not be used.
250
251 """
252 assert self.root or self.app, "Either self.root or self.app must be set"
253 if not self.app:
254 self.app = make_app(self.root)
255 if not self.config:
256 self.config = config.copy()
257 start_server()
258
263
264
269
272
274 self.visit = None
275 self.response, self.status = None, None
276 self.cookie = Cookie.SimpleCookie()
277 self.app = make_app()
278
def goto(self, *args, **kwargs):
    """Issue a GET request through ``self.app`` and record the outcome.

    All positional and keyword arguments are forwarded to ``self.app.get``.
    When ``self.cookie`` is set, ``self.cookie_encoded`` is sent as the
    ``Cookie`` request header.

    After the call these attributes are updated from the response:
    ``response`` (the body), ``full_response`` (the response object),
    ``status``, ``cookie`` and ``cookie_encoded``.  ``unicode_response`` is
    assigned only when a Content-Type parameter names an encoding that
    successfully decodes the body; otherwise it is left untouched.

    """
    if self.cookie:
        headers = kwargs.setdefault('headers', {})
        headers['Cookie'] = self.cookie_encoded
    response = self.app.get(*args, **kwargs)

    # If the content type carries an encoding, use it to decode the body.
    # A response without a Content-Type header simply has no parameters.
    content_type = response.headers.get('Content-Type', '')
    for parameter in content_type.split(';')[1:]:
        pieces = parameter.strip().split('=', 1)
        if len(pieces) != 2:
            # Not of the form "attribute=value": nothing to decode with.
            continue
        # Parameter values may be written as quoted strings.
        value = pieces[1].strip().strip('"')
        try:
            decoded = response.body.decode(value)
        except Exception:
            # Deliberately best-effort: if the named encoding does not work
            # we just do not create the unicode_response field.
            continue
        self.unicode_response = decoded
        break

    self.response = response.body
    self.full_response = response
    self.status = response.status
    self.cookie = response.cookies_set
    self.cookie_encoded = response.headers.get('Set-Cookie', '')
303
306 """A very simple dummy session."""
307
308 session_storage = dict
309 to_be_loaded = None
310
313 """A very simple dummy request."""
314
315 remote_host = "127.0.0.1"
316
317 - def __init__(self, method='GET', path='/', headers=None):
324
327
330 """A very simple dummy response."""
331
332 headers = {}
333
336 """A database enabled unit testing class.
337
338 Creates and destroys your database before and after each unit test.
339 You must set the model attribute in order for this class to
340 function correctly.
341
342 """
343 model = None
344
346 raise NotImplementedError()
347
349 raise NotImplementedError()
350
353
355 try:
356 return [self.model.__dict__[x] for x in self.model.soClasses]
357 except AttributeError:
358 return self.model.__dict__.values()
359
361 if not self.model:
362 self.model = get_model()
363 if not self.model:
364 raise Exception("Unable to run database tests without a model")
365
366
367 constraints = list()
368
369 for item in self._get_soClasses():
370 if isinstance(item, types.TypeType) and issubclass(item,
371 sqlobject.SQLObject) and item != sqlobject.SQLObject \
372 and item != InheritableSQLObject:
373
374
375
376
377 collected_constraints = item.createTable(ifNotExists=True,
378 applyConstraints=False)
379
380 if collected_constraints:
381 constraints.extend(collected_constraints)
382
383
384 for postponed_constraint in constraints:
385
386 item._connection.query(postponed_constraint)
387
389 database.rollback_all()
390 for item in reversed(self._get_soClasses()):
391 if isinstance(item, types.TypeType) and issubclass(item,
392 sqlobject.SQLObject) and item != sqlobject.SQLObject \
393 and item != InheritableSQLObject:
394 item.dropTable(ifExists=True, cascade=True)
395
405
406
407
408
# Bind ``DBTest`` to the test class matching the configured database URI.
# The SQLObject setting is checked first; with neither URI configured the
# module raises at import time.
if config.get("sqlobject.dburi"):
    DBTest = DBTestSO
elif config.get("sqlalchemy.dburi"):
    DBTest = DBTestSA
else:
    raise Exception("Unable to find sqlalchemy or sqlobject dburi")
415
416
417
418
# Old names kept as aliases: each one is the current function wrapped by the
# ``deprecated`` decorator with the message shown.
start_cp = deprecated('start_cp is superceded by start_server')(start_server)

reset_cp = deprecated('reset_cp has been superceded by unmount.')(unmount)

# NOTE(review): presumably the user that the identity helpers attach to test
# requests -- confirm against the code that reads it.
test_user = None
430
437
@deprecated("create_request is deprecated. See TestMigration on the TG Wiki")
def create_request(request, method="GET", protocol="HTTP/1.1",
        headers=None, rfile=None, clientAddress="127.0.0.1",
        remoteHost="localhost", scheme="http"):
    """Run a request through CherryPy and return the resulting response.

    ``request`` is the request target; it is joined with ``method`` and
    ``protocol`` to form the request line.  ``headers`` may be a dict, a
    sequence of ``(name, value)`` pairs, or None for no extra headers; a
    ``("Host", "localhost")`` entry is always appended.  ``rfile`` is the
    request body stream and defaults to an empty one.  ``clientAddress``,
    ``remoteHost`` and ``scheme`` are handed to the ``Request`` constructor
    together with port 80.

    The server is started first, and TurboGears is started as well when
    ``cherrypy.root`` has no ``started`` attribute yet.

    Returns ``cherrypy.serving.response``.

    """
    start_server()
    if not rfile:
        rfile = StringIO.StringIO("")
    # Always work on a fresh list so that appending the Host header never
    # modifies a list owned by the caller.
    if isinstance(headers, dict):
        headerList = list(headers.items())
    else:
        headerList = list(headers or [])
    headerList.append(("Host", "localhost"))
    if not hasattr(cherrypy.root, "started"):
        startup.startTurboGears()
        cherrypy.root.started = True
    req = Request(clientAddress, 80, remoteHost, scheme)
    cherrypy.serving.request = req
    attach_identity(req)
    cherrypy.serving.response = Response()
    req.run(" ".join((method, request, protocol)), headerList, rfile)
    return cherrypy.serving.response
459
460
# camelCase alias of create_request (presumably kept for older callers).
createRequest = create_request
465
@deprecated("Please see the TestMigration page in the TG wiki.")
def call(method, *args, **kw):
    """Start the server, invoke ``method`` and return only its output.

    The call is delegated to ``call_with_request`` with a fresh
    ``DummyRequest``; the response half of its result is discarded.

    """
    start_server()
    result = call_with_request(method, DummyRequest(), *args, **kw)
    return result[0]
471
@deprecated("Please see the TestMigration page in the TG wiki.")
def call_with_request(method, request, *args, **kw):
    """More fine-grained version of call method.

    This allows using request/response.

    ``request`` is installed as ``cherrypy.serving.request`` for the duration
    of the call and a fresh ``Response`` as ``cherrypy.serving.response``.
    An identity is attached to the request unless it already has an
    ``identity`` attribute.  While ``method(*args, **kw)`` runs,
    ``controllers._process_output`` is replaced by ``_return_directly``.

    Returns a tuple ``(output, response)``.  Any exception raised during
    setup or by ``method`` propagates after the cleanup has run.

    """
    orig_proc_output = controllers._process_output
    controllers._process_output = _return_directly
    # Everything after the patch runs under try/finally so that a failure
    # during setup (not only inside ``method``) still restores the hook.
    try:
        cherrypy.serving.response = Response()
        cherrypy.serving.request = request
        try:
            if not hasattr(request, "identity"):
                attach_identity(request)
            output = method(*args, **kw)
        finally:
            del cherrypy.serving.request
    finally:
        controllers._process_output = orig_proc_output
    return output, cherrypy.serving.response
493
494
# Public names exported by ``from <module> import *``.  Every entry ends with
# a comma: adjacent string literals without one are silently concatenated
# into a single (non-existent) name.
__all__ = [
    "BrowsingSession",
    "DBTest",
    "DBTestSA",
    "DBTestSO",
    "DummyRequest",
    "DummyResponse",
    "DummySession",
    "TGTest",
    "capture_log",
    "make_app",
    "make_wsgiapp",
    "mount",
    "print_log",
    "get_log",
    "sqlalchemy_cleanup",
    "start_server",
    "stop_server",
    "unmount",
]
515