Wednesday, August 01, 2007

python thread setDaemon

Running a unit test program raised an exception:
roc@crablfs:~/ulfs/cutils/trunk/lib$ python mirrord_ut.py

testing boot init ...
.
testing monitoring ...
Add a dir to watch: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin
Add a file to record: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tfile1
Add a dir to watch: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1
Add a file to record: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1/tfile2
boot init finished
run schedule finished
loop conitnue at 1185979072.2
Staring socket failed: ('', 2123), (98, 'Address already in use')
Exception in thread schedule:
Traceback (most recent call last):
  File "/usr/lib/python2.4/threading.py", line 442, in __bootstrap
    self.run()
  File "/usr/lib/python2.4/threading.py", line 422, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/roc/ulfs/cutils/trunk/lib/mirrord.py", line 426, in _run_schedule
    raise MirrordExc(strerr)
MirrordExc:

......

F
======================================================================
FAIL: test_Monitor (__main__.TestMirrord)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "mirrord_ut.py", line 181, in test_Monitor
    self.failUnless(md.manager.wdirs.has_key(wd))
AssertionError

----------------------------------------------------------------------
Ran 2 tests in 72.394s

FAILED (failures=1)
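The test code spawns a thread to run the code under test, so that it can monitor that code's variables while it runs. The test code looks like this: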
    def test_Monitor(self):
        print "\ntesting monitoring ..."
        try:
            mirrord.testing = 1
            testdir = self.testdir

            # self.setUp()
            failed = self.__build_2()
            self.failIf(failed)
            md = mirrord.Mirrord(identities=['test2'], datadir="/tmp/mirrord")
            # An invalid Server must not be able to start run():
            self.failUnlessRaises(mirrord.MirrordExc, md.run, address=('localhost', 2123), Server=1)
            # The infinite loop should terminate correctly when requested:
            mdt = threading.Thread(target=md.run, name="mirrord", args=(('', 2123),))
            mdt.setDaemon(1)
            mdt.start()
            while md.Status.stage() < 2: time.sleep(1)
            md.stop()
            nloop = md.Status.nloop()
            while md.Status.nloop() == nloop: time.sleep(1)
            self.failUnlessEqual(md.Terminate, True)
            self.failIf(mdt.isAlive())
            self.tearDown()

            mirrord.debug = 1
            # Test inotify functionality:
            self.setUp()
            self.__build_1()
            snap_wdirs = [testdir, "%s/tin/" % testdir, "%s/tin/tdir1" % testdir]
            snap_files = ["%s/tin/tfile1" % testdir, "%s/tin/tdir1/tfile2" % testdir]
            md = mirrord.Mirrord(identities=['test1'], datadir="/tmp/mirrord")
            mdt = threading.Thread(target=md.run, name="mirrord", args=(('', 2123),))
            mdt.setDaemon(1)
            mdt.start()
            while md.Status.stage() < 2: time.sleep(1)
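The test pattern above runs the code under test in its own thread and polls its status from the test thread. It can be sketched in Python 3 with a hypothetical Worker class standing in for Mirrord. stage(), nloop() and stop() mirror the names used in the test, but the class and the wait_until() helper are invented for illustration. Unlike the bare `while ...: time.sleep(1)` loops, each wait here has a timeout, so a stuck worker makes the test fail instead of hang.

```python
import threading
import time

class Worker:
    """Hypothetical stand-in for Mirrord: a main loop with observable status."""

    def __init__(self):
        self._lock = threading.Lock()   # guards the status counters
        self._stage = 0
        self._nloop = 0
        self.terminate = False

    def stage(self):
        with self._lock:
            return self._stage

    def nloop(self):
        with self._lock:
            return self._nloop

    def run(self):
        with self._lock:
            self._stage = 2             # pretend boot/init has finished
        while not self.terminate:
            time.sleep(0.05)            # one iteration of "work"
            with self._lock:
                self._nloop += 1

    def stop(self):
        self.terminate = True

def wait_until(predicate, timeout=5.0, interval=0.05):
    """Poll predicate() until it is true; return False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            return False
        time.sleep(interval)
    return True

w = Worker()
t = threading.Thread(target=w.run, name="worker", daemon=True)
t.start()
assert wait_until(lambda: w.stage() >= 2)
w.stop()
t.join(timeout=5.0)
print(t.is_alive())   # False
```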
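The test runs md.run twice in succession, and md.run (Mirrord.run) spawns yet another thread that binds a socket to port 2123 and listens for requests. The 'Address already in use' error means that the socket thread spawned by the first call has not exited. Yet the self.failIf(mdt.isAlive()) check has already passed, so the md.run thread itself has exited. Why, then, is the socket thread still running?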
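The situation can be reproduced in isolation. The following Python 3 sketch is not the mirrord code. fake_run() plays md.run, listener() plays the schedule thread, and bind_errno() is a hypothetical helper. It shows that a thread blocked in accept() stays alive after the thread that started it has returned, and that the port stays taken. The port is chosen by the OS (port 0) rather than 2123.

```python
import errno
import socket
import threading

def listener(ready, out):
    # Plays the "schedule" thread: bind, listen, then block in accept().
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 0))      # port 0: let the OS pick a free port
    sock.listen(5)
    out["port"] = sock.getsockname()[1]
    ready.set()
    sock.accept()                    # nobody ever connects, so this never returns

def fake_run(out):
    # Plays md.run: start the listener thread, then return ("exit").
    ready = threading.Event()
    child = threading.Thread(target=listener, args=(ready, out),
                             name="schedule", daemon=True)
    child.start()
    ready.wait()
    out["child"] = child

def bind_errno(port):
    """Try to bind the same port again; return the errno, or None on success."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        s.bind(("127.0.0.1", port))
        return None
    except OSError as exc:
        return exc.errno
    finally:
        s.close()

out = {}
parent = threading.Thread(target=fake_run, args=(out,), name="mirrord", daemon=True)
parent.start()
parent.join()

print("mirrord alive: ", parent.is_alive())         # False
print("schedule alive:", out["child"].is_alive())   # True
print(errno.errorcode.get(bind_errno(out["port"]))) # EADDRINUSE on Linux
```

SO_REUSEADDR does not help here. On Linux it lets a socket bind over connections lingering in TIME_WAIT, but not over a socket that is still actively listening.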

Does this have something to do with setDaemon() in thread scheduling? The working code was originally written like this:
class Mirrord:
    ...

    def _schedule_thread(self, Server, address):
        that = threading.Thread(
            target=self._run_schedule, name='schedule', args=(Server, address))
        that.setDaemon(1)
        that.start()
        # that.join()

    def _run_schedule(self, Server, address):
        """
        This method runs in the child process/thread
        to bind to a port for remote clients' requests
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(address)
            sock.listen(5)
        except socket.error, socex:
            strerr = _("Staring socket failed: %s, %s" % (address, socex))
            syslog.syslog(syslog.LOG_ERR, strerr)
            if debug: print >> sys.stderr, strerr
            raise MirrordExc(strerr)

        while not self.Terminate:
            try:
                # Blocks until a client connects; self.Terminate is not
                # re-checked while we are waiting here.
                conn, addr = sock.accept()
                # TODO: Add authentication code here:
                output = _("Got connection from: %s" % (addr,))
                syslog.syslog(syslog.LOG_INFO, output)
                serv = Server(self.manager, self.wmLock, conn, addr)
                serv.start()
                # After the sync, create a child process or thread to serve the client
            except socket.error, socex:
                strerr = _("Get connections error: %s" % socex)
                syslog.syslog(syslog.LOG_ERR, strerr)
            except:
                strerr = traceback.format_exc()
                syslog.syslog(syslog.LOG_ERR, strerr)

    def run(self, address, Server=ServerThread):
        ...
        self._schedule_thread(Server, address)
        ...

        while not self.Terminate:
            try:
                if debug:
                    output = _("loop conitnue at %s" % time.time())
                    print output
                    syslog.syslog(syslog.LOG_DEBUG, output)
                if self.notifier.check_events():
                    self.notifier.read_events()
                    self.wmLock.acquire()
                    try:
                        self.notifier.process_events()
                        # Process dirs/files moved to an unmonitored location.
                        # Open question: do IN_MOVED_FROM and IN_MOVED_TO always
                        # arrive in the same poll?
                        self.processor.clean_cookies()
                        # TODO: Clean up the overdue manager.wmlog here
                        temp = self.manager.progress.values()
                        if temp: temp.sort(); min_sn = temp[0]
                        else: min_sn = self.manager.serial
                        for sn in self.manager.wmlog.keys():
                            if sn < min_sn: self.manager.wmlog.pop(sn)
                    finally:
                        self.wmLock.release()
                # self.Status.nloop += 1
                self.Status.nloop = self.Status.nloop() + 1
            except:
                strerr = traceback.format_exc()
                syslog.syslog(syslog.LOG_ERR, strerr)
                if debug: print strerr
                break
        self.notifier.stop()

    def stop(self):
        self.Terminate = True
        # Or threading Event()?
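Read literally, marking a child thread as daemonic would mean that it keeps running after the thread that spawned it has exited. Here self.Terminate is set, but the child thread is blocked in conn, addr = sock.accept(). Because the test code never makes a connection request, the child thread never gets a chance to notice that self.Terminate has changed.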
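One common way around the blocking accept() is to give the listening socket a timeout. accept() then returns periodically and the loop can re-check its stop flag. This is a swapped-in technique, not what the original code does. The "# Or threading Event()?" comment in stop() hints at using an Event as that flag. The following is a minimal Python 3 sketch. The Scheduler class and its poll_interval parameter are hypothetical.

```python
import socket
import threading

class Scheduler:
    """Hypothetical stand-in for the socket-listening part of Mirrord."""

    def __init__(self, address, poll_interval=0.5):
        self.terminate = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(address)
        self.sock.listen(5)
        # accept() now gives up after poll_interval seconds instead of blocking forever
        self.sock.settimeout(poll_interval)
        self.address = self.sock.getsockname()
        self.thread = threading.Thread(target=self._serve, name="schedule", daemon=True)

    def _serve(self):
        try:
            while not self.terminate.is_set():
                try:
                    conn, addr = self.sock.accept()
                except socket.timeout:
                    continue            # no client yet: loop and re-check the flag
                conn.close()            # a real server would hand conn to a worker here
        finally:
            self.sock.close()           # release the port when the loop ends

    def start(self):
        self.thread.start()

    def stop(self):
        self.terminate.set()
        self.thread.join()              # returns within about one poll_interval

s = Scheduler(("127.0.0.1", 0))
s.start()
port = s.address[1]
s.stop()
print(s.thread.is_alive())              # False
again = Scheduler(("127.0.0.1", port))  # the same port can now be bound again
again.start()
again.stop()
```

The trade-off is a shutdown delay of up to poll_interval. Another option is for stop() to close or shut down the socket from the other thread, but a timeout keeps the shutdown path simple.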

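In reality, with or without setDaemon(), stopping md.run does not make the schedule thread exit. Only the exit of the main thread (that is, of the whole process, which here is the test code) does. Worse, if the schedule thread is not daemonic, it keeps running even after the main thread finishes, and the program never returns to the shell prompt!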
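The Python threading documentation states that a thread's daemon flag starts out inherited from the thread that creates it. It also states that the program exits once only daemon threads are left. Nothing ties a thread's lifetime to its creator's. The following Python 3 sketch checks the inheritance part. Newer Pythons expose the flag as the `daemon` attribute, while Python 2.4 only had setDaemon()/isDaemon(). spawn_child() is a hypothetical helper.

```python
import threading

def spawn_child(results, key):
    # Create (but do not start) a thread from inside this thread and
    # record the daemon flag it got by default.
    child = threading.Thread(target=lambda: None)
    results[key] = child.daemon

results = {}
for daemonic in (False, True):
    t = threading.Thread(target=spawn_child, args=(results, daemonic), daemon=daemonic)
    t.start()
    t.join()

print(results)   # {False: False, True: True}
```

In mirrord, mdt is daemonic, so any thread it creates is daemonic unless told otherwise. Such a thread simply outlives mdt until the interpreter shuts down.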

To illustrate this point, here is a prototype test program:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

"""
Python threading daemonic:
If the second thread exits, the third thread does not exit,
no matter whether setDaemon() is called.
Only when the main thread (process) exits does the third one exit!
Author: Roc Zhou
Date: 2007-08-02
Email: chowroc.z@gmail.com
"""

import time
import threading

def run_thread():
    # Third thread: loops forever.
    while 1:
        print "child thread is running ..."
        time.sleep(1)

def run_schedule():
    # Second thread: starts the third thread, then returns immediately.
    this = threading.Thread(target=run_thread, name="thread")
    # this.setDaemon(1)
    this.start()
    print "schedule exiting ..."

that = threading.Thread(target=run_schedule, name="schedule")
# that.setDaemon(1)
that.start()
for i in range(10):
    print "main thread is running ..."
    time.sleep(1)
print "main thread exiting ..."
Run this program under several different setDaemon() combinations:
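The combinations can also be run automatically. The following Python 3 sketch is not part of the original post. It rebuilds a shortened, silent version of the prototype with a 0.5 s main loop and runs each variant in a child interpreter. A child that is still alive after a timeout is reported as "hangs". TEMPLATE and exits_on_its_own() are hypothetical names.

```python
import subprocess
import sys
import textwrap

# Shortened prototype; the two placeholders switch the daemon flags on or off.
TEMPLATE = textwrap.dedent("""\
    import threading, time

    def run_thread():
        while True:
            time.sleep(0.1)        # third thread: loops forever

    def run_schedule():
        this = threading.Thread(target=run_thread, name="thread")
        {this_daemon}
        this.start()               # second thread returns right away

    that = threading.Thread(target=run_schedule, name="schedule")
    {that_daemon}
    that.start()
    time.sleep(0.5)                # main thread works briefly, then exits
    """)

def exits_on_its_own(that_daemon, this_daemon, timeout=5.0):
    source = TEMPLATE.format(
        this_daemon="this.daemon = True" if this_daemon else "pass",
        that_daemon="that.daemon = True" if that_daemon else "pass",
    )
    try:
        subprocess.run([sys.executable, "-c", source], timeout=timeout,
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True
    except subprocess.TimeoutExpired:
        return False               # still running; subprocess.run has killed it

outcomes = {}
for that_d in (False, True):
    for this_d in (False, True):
        outcomes[(that_d, this_d)] = exits_on_its_own(that_d, this_d)
        print("schedule daemon=%-5s thread daemon=%-5s -> %s"
              % (that_d, this_d, "exits" if outcomes[(that_d, this_d)] else "hangs"))
```

A new thread inherits its daemon flag from the thread that creates it, per the Python threading documentation. So only the variant where neither thread is daemonic should hang. If `that` is daemonic, `this` inherits the flag even without its own setDaemon(1) call.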
