Wednesday, August 08, 2007

python socket buffers and blocking

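The earlier post, python socket write/read only, discussed synchronizing mirrord and fs_mirror with the server side only writing and the client side only reading. The server side then looks like this: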
...
fconn = conn.makefile('w', 0)
...
while True:
    try:
        try:
            self.wmCond.acquire()
            while self.manager.serial == serial: self.wmCond.wait()
            sn = serial
            serial = self.manager.serial
        finally:
            self.wmCond.release()
        while sn < serial:
            action, target = self.manager.wmlog[sn]
            # #1:
            fconn.write("%s:%s\n" % (action, target)) # OR:
            # conn.sendall("%s:%s\n" % (action, target))
            if debug:
                print "after writing"
            sn += 1
            self.manager.svPool[self] = sn
The client is currently used mainly in the tests, and looks like this:
def __check_protocol(self, conn, fconn, md, expect, incr, func, *args):
    svCurrent = md.Status.svCurrent()
    sn = md.manager.serial
    if mirrord.debug: print "sn:", sn
    func(*args)
    nloop = md.Status.nloop()
    while md.Status.nloop() <= nloop + 1: time.sleep(1)
    # Before the wmlog is read, the server should reserve it for the client
    self.failUnlessEqual(md.manager.svPool[svCurrent], sn)
    result = []
    for i in range(incr):
        line = fconn.readline()[:-1]
        result.append(line)
    result.sort()
    expect.sort()
    self.failUnlessEqual(result, expect)
    nloop = md.Status.nloop()
    while md.Status.nloop() <= nloop + 1: time.sleep(1)
    # After the wmlog has been transferred, the server should delete the overdue records
    self.failUnlessEqual(md.manager.svPool[svCurrent], sn + incr)
When the test runs, it fails at the assertion above,
self.failUnlessEqual(md.manager.svPool[svCurrent], sn)
and in fact md.manager.svPool[svCurrent] > sn!

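Analyzing the output shows that the server code at #1 does not block! In other words, fconn.write() or conn.sendall() does not wait for the client to receive the data before returning. Sockets, like file operations, are buffered: write() or sendall() returns as soon as the data has been copied into the buffer.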

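Note that fconn was created with conn.makefile('w', 0), i.e. without buffering, so in principle one might expect each write to wait until the data has actually been sent. But a bufsize of 0 only affects the buffer of the Python file object: each fconn.write() goes straight to the socket object's sendall(), and sendall() returns once the data has been handed to the operating system's socket send buffer, not when the peer has received it. So the call still returns immediately.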
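A minimal sketch of this behaviour, written for Python 3 (not the Python 2.5 used in this post) and using socket.socketpair() as a stand-in for the TCP connection between mirrord and fs_mirror; the stand-in is an assumption made only to keep the example self-contained. The function name and the mkdir:/tmp/demo record are hypothetical. sendall() comes back before anyone has called recv() on the other end; the record simply waits in the kernel buffer until it is read.

```python
import socket

def send_before_reader(record):
    """Send a record, and read it on the other end only after sendall() returned."""
    server_side, client_side = socket.socketpair()  # stand-in for the TCP link
    try:
        # Like fconn.write() at #1: this returns once the bytes are in the
        # kernel's socket buffer, although nobody has called recv() yet.
        server_side.sendall(record)
        # Only now does the "client" read; the data was waiting in the buffer.
        return client_side.recv(len(record))
    finally:
        server_side.close()
        client_side.close()

received = send_before_reader(b"mkdir:/tmp/demo\n")
print(received)
```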


From Foundations of Python Network Programming (Chinese edition: 《Python 网络编程基础》):

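On many operating systems, a call that sends data over the network may return before the remote server has confirmed receipt of the information. So it is quite possible that data from a successful call to sendall() is in fact never received.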


Or received incompletely, or out of order?

In short, recv() blocks when the buffer is empty, whereas sendall() and similar calls block only when the buffers are full!
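To see the "block when the buffers are full" half of this, the following Python 3 sketch (again with socketpair() standing in for the real connection; the helper name is hypothetical) puts the writing end into non-blocking mode and writes into it while nobody reads. Instead of hanging, send() raises BlockingIOError at the point where a blocking sendall() would start to wait. The exact byte count depends on the operating system's buffer sizes, so it is not fixed here.

```python
import socket

def bytes_until_send_would_block(chunk_size=4096):
    """Write into a socket that nobody reads until its buffers are full."""
    writer, reader = socket.socketpair()
    try:
        # Non-blocking mode turns "send() would block" into BlockingIOError
        # instead of hanging the demo forever.
        writer.setblocking(False)
        chunk = b"x" * chunk_size
        total = 0
        while True:
            try:
                total += writer.send(chunk)
            except BlockingIOError:
                # The buffers are full: a blocking sendall() would wait
                # right here until the reader calls recv().
                return total
    finally:
        writer.close()
        reader.close()

filled = bytes_until_send_would_block()
print("buffers full after %d bytes" % filled)
```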

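Therefore, in the test code above, calling func(*args) (usually os.mkdir, open('...', 'w'), shutil.move or the like) changes the file system. The server's fconn.write() then returns immediately, so the serial that this service thread records in svPool is bound to increase without waiting for line = fconn.readline() further down, and the assertion that compares the serial fails.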

So unless the client sends an acknowledgement, as in the original approach, this comparison cannot be made here (or perhaps it isn't necessary at all?).


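But does this also mean that there is actually no reconciliation mechanism between the server and the client?
For example, the server has a wmlog record with serial 11138, but the client may never have received it: in one case the client has simply lost this record, in the other the client has to perform a complete re-initialization?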

So is the acknowledgement approach the right reconciliation mechanism?

Or should I pass the serial number to the client?
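One possible shape for combining the two ideas, sketched in Python 3 under assumptions that are not part of mirrord: every record is prefixed with its serial number (hypothetical wire format "<serial> <action>:<target>\n"), and the client answers with "ACK <serial>\n". The server then knows the last serial the client has confirmed, rather than only the last one it wrote. The helper names, the mkdir/rm actions and the short paths are hypothetical, and socketpair() stands in for the real connection.

```python
import socket

# Hypothetical wire format: "<serial> <action>:<target>\n", answered by "ACK <serial>\n".

def send_record(sock, serial, action, target):
    """Server side: send one wmlog record tagged with its serial number."""
    sock.sendall(("%d %s:%s\n" % (serial, action, target)).encode("utf-8"))

def receive_and_ack(sock, rfile):
    """Client side: read one record and acknowledge its serial number."""
    line = rfile.readline()
    if not line:
        raise EOFError("server closed the connection")
    serial_text, record = line.decode("utf-8").rstrip("\n").split(" ", 1)
    sock.sendall(("ACK %s\n" % serial_text).encode("utf-8"))
    return int(serial_text), record

def read_ack(rfile):
    """Server side: return the serial number the client has confirmed."""
    word, _, serial_text = rfile.readline().decode("utf-8").strip().partition(" ")
    if word != "ACK":
        raise ValueError("unexpected reply from client: %r" % word)
    return int(serial_text)

server, client = socket.socketpair()   # stand-in for the real connection
server_in, client_in = server.makefile("rb"), client.makefile("rb")
confirmed = None
for serial, action, target in [(11137, "mkdir", "/tin/tdir1"), (11138, "rm", "/tin/tfile1")]:
    send_record(server, serial, action, target)
    last_record = receive_and_ack(client, client_in)
    confirmed = read_ack(server_in)    # the server now knows this serial arrived
print(confirmed, last_record)
for f in (server_in, client_in, server, client):
    f.close()
```

After a reconnect, the client could send the last serial it acknowledged, which would let the server decide whether it can resume from wmlog or has to fall back to a full initialization.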

python socket write/read only

Originally, the server code looked like this:
while True:
    try:
        ......
        # When modified, inform the client first:
        conn.sendall("NEXT")
        data = conn.recv(1024)
        if data != "OK":
            raise ServerExc("Sync failed")
        sending = []
        while sn < serial:
            action, target = self.manager.wmlog[sn]
            sending.append("%s:%s\n" % (action, target))
            if debug:
                print "after writing"
            sn += 1
            self.manager.svPool[self] = sn
            # Assign must be an atomic operation, but dict[k] and getattr?
            # But I think it is not necessary to Lock at here.
        fconn.writelines(sending)
        fconn.write("EOF\n")
        fconn.flush()
The client looked like this (also inside a loop):
data = conn.recv(1024)
self.failUnlessEqual(data, "NEXT")
conn.sendall("OK")
result = []
for i in range(incr):
    line = fconn.readline()[:-1]
    if line == 'EOF': break
    result.append(line)
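Here, before each batch of data, the server first notifies the client and waits for the client's acknowledgement. Then it occurred to me: since both sides loop anyway, the server only needs to keep writing and the client to keep reading, because from this point until the thread exits no other kind of data is sent.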

So it was rewritten like this:
while True:
    try:
        ......
        while sn < serial:
            action, target = self.manager.wmlog[sn]
            fconn.write("%s:%s\n" % (action, target)) # OR:
            # conn.sendall("%s:%s\n" % (action, target))
            if debug:
                print "after writing"
            sn += 1
            self.manager.svPool[self] = sn
            # Assign must be an atomic operation, but dict[k] and getattr?
            # But I think it is not necessary to Lock at here.
Client:
result = []
for i in range(incr):
    line = fconn.readline()[:-1]
    result.append(line)
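But this raises a problem. With the earlier acknowledgement approach, when the client exited, the server's conn.recv() always got an empty string, so the server could check for that and decide whether to close the socket (but what about abnormal termination?).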
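A small Python 3 sketch of that check, with socket.socketpair() standing in for the mirrord connection and a hypothetical helper name: once the peer has closed its end, recv() returns an empty bytes object immediately instead of blocking.

```python
import socket

def peer_closed(sock):
    """True if recv() reports that the peer has closed the connection."""
    # recv() blocks while the buffer is empty, but once the peer has closed
    # its end it returns b"" at once: that is the "empty" read.
    return sock.recv(1024) == b""

server, client = socket.socketpair()   # stand-in for the mirrord connection
client.close()                         # the client exits
closed = peer_closed(server)
server.close()
print(closed)  # True
```

If the client process is killed, its operating system still closes the socket, so the server normally sees the same empty read (or a "connection reset" error if unread data was pending on the client side). Only when the client machine or the network disappears does nothing arrive at all; detecting that needs a timeout or TCP keepalive.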

Now the server no longer checks anything. If the client exits or terminates abnormally, what happens on the server? Let's find out with a prototype:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

"""
Prototype: server write only, client read only,
if client terminated, what about the server?
This is the server side: write only

Author: Roc Zhou
Date: 2007-08-08
Email: chowroc.z@gmail.com
"""

import sys
import time
import socket
import traceback
import threading

def server(conn):
    name = threading.currentThread().getName()
    print "Client server: %s" % name
    # print "Got connection from: ", conn.getpeername()
    conn.sendall("START")
    data = conn.recv(1024)
    if data != "OK": raise RuntimeError("FAILED")
    fconn = conn.makefile('w', 0)
    i = 0
    while True:
        # conn.sendall("message %d" % i)
        fconn.write("message %d\n" % i)
        i += 1
        time.sleep(5)
    print "Working thread of %s ending ..." % name
    conn.close()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 2123))
print "Waiting for connections ..."
sock.listen(5)

while True:
    try:
        conn, addr = sock.accept()

        print "Got connection from:", addr
        that = threading.Thread(target=server, name='server', args=(conn,))
        that.setDaemon(1)
        that.start()
    except KeyboardInterrupt:
        raise
    except:
        print >> sys.stderr, "CATCH ALL:"
        traceback.print_exc()
        continue
Client:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

"""
Prototype: server write only, client read only,
if client terminated, what about the server?
This is the client side: read only

Author: Roc Zhou
Date: 2007-08-08
Email: chowroc.z@gmail.com
"""

import socket

host = 'localhost'
port = 2123

print "Creating socket ..."
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print "Connecting ..."
conn.connect((host, port))
print "Done."

data = conn.recv(1024)
if data != "START": raise RuntimeError("FAILED")
conn.sendall("OK")
fconn = conn.makefile('r', 0)

while True:
    try:
        message = fconn.readline()[:-1]
        print message
    except KeyboardInterrupt:
        raise
Running it, and the output:

Client:
sh$ python fs_mirror_10_read_only.py
Creating socket ...
Connecting ...
Done.
message 0
message 1
message 2
message 3
message 4
message 5
message 6
Traceback (most recent call last):
File "fs_mirror_10_read_only.py", line 32, in <module>
message = fconn.readline()[:-1]
File "/usr/lib/python2.5/socket.py", line 331, in readline
data = recv(1)
KeyboardInterrupt


Server:
sh# python mirrord_10_write_only.py
Waiting for connections ...
Got connection from: ('127.0.0.1', 38618)
Client server: server
Exception in thread server:
Traceback (most recent call last):
File "/usr/lib/python2.5/threading.py", line 460, in __bootstrap
self.run()
File "/usr/lib/python2.5/threading.py", line 440, in run
self.__target(*self.__args, **self.__kwargs)
File "mirrord_10_write_only.py", line 31, in server
fconn.write("message %d\n" % i)
File "/usr/lib/python2.5/socket.py", line 262, in write
self.flush()
File "/usr/lib/python2.5/socket.py", line 249, in flush
self._sock.sendall(buffer)
error: (32, 'Broken pipe')
Or, when time.sleep(5) is not used, the server's error message is:
Waiting for connections ...
Got connection from: ('127.0.0.1', 56912)
Client server: server
Exception in thread server:
Traceback (most recent call last):
File "/usr/lib/python2.5/threading.py", line 460, in __bootstrap
self.run()
File "/usr/lib/python2.5/threading.py", line 440, in run
self.__target(*self.__args, **self.__kwargs)
File "mirrord_10_write_only.py", line 30, in server
fconn.write("message %d\n" % i)
File "/usr/lib/python2.5/socket.py", line 262, in write
self.flush()
File "/usr/lib/python2.5/socket.py", line 249, in flush
self._sock.sendall(buffer)
error: (104, 'Connection reset by peer')
In short, a socket.error exception is raised, and the server can catch it in order to exit.
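A hedged Python 3 sketch of catching that error, with socketpair() standing in for the real connection and a hypothetical helper name: the client end is closed before the server writes, and the loop stops on the first EPIPE or ECONNRESET, the two errors in the tracebacks above. Python ignores SIGPIPE by default, which is why the failure arrives as an exception instead of killing the process.

```python
import errno
import socket

def write_until_peer_gone(sock, max_writes=1000):
    """Write "message N" lines like the prototype server and stop on the
    first disconnect error; return (successful_writes, exception_or_None)."""
    for i in range(max_writes):
        try:
            sock.sendall(("message %d\n" % i).encode("ascii"))
        except OSError as exc:   # socket.error is an alias of OSError in Python 3
            # EPIPE ("Broken pipe") and ECONNRESET ("Connection reset by peer")
            # are the two errors seen in the tracebacks above.
            if exc.errno in (errno.EPIPE, errno.ECONNRESET):
                return i, exc
            raise
    return max_writes, None

server, client = socket.socketpair()   # stand-in for the real connection
client.close()                         # simulate the client going away
count, error = write_until_peer_gone(server)
server.close()
print(count, error)
```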

But when one side only reads and the other only writes, you have to take into account how sockets use buffers and how that relates to blocking!

The question now is: which approach is better, and why?

Monday, August 06, 2007

python socket file object and shutdown()

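Today, while writing the server side of mirrord.py and the test code for the protocol, I ran into a problem using the socket's file-like object. The working part of the code is as follows: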
try:
    conn = self.conn
    fconn = conn.makefile('w', 0)
    data = conn.recv(1024)
    if data != "INIT SYNC":
        raise ServerExc("Invalid init sync command")
    try:
        self.wmLock.acquire()
        # Copy the content from the manager queue at first:
        wdirs = self.manager.wdirs.keys()
        files = self.manager.files.keys()
        serial = self.manager.serial
        # And then register itself in svPool so that the scheduler
        # reserves manager.wmlog for it:
        self.manager.svPool[self] = serial
    finally:
        self.wmLock.release()
    conn.sendall("OK")
    data = conn.recv(1024)
    if data != "INIT SYNC DIRS":
        raise ServerExc("Invalid init sync dirs command")
    if debug:
        print "server wdirs send", wdirs
    # #1:
    # fconn = conn.makefile('w', 0)
    # fconn.writelines("%s\n" % fn for fn in wdirs)
    # fconn.close()
    # #2:
    fconn.writelines("%s\n" % fn for fn in wdirs)
    conn.shutdown(1)
    # #3:
    # fconn = conn.makefile('w', 0)
    # for line in wdirs:
    #     # conn.sendall("%s\n" % line)
    #     fconn.write("%s\n" % line)
    # # fconn.write("^D")
    # fconn.write("")
    # fconn.close()
    # #4:
    # for line in wdirs:
    #     conn.sendall("%s\n" % line)
The test code is really just a client; it looks like this:
try:
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn.connect(('localhost', 2123))
    fconn = conn.makefile('r', 0)
    conn.sendall("START")
    data = conn.recv(1024)
    conn.sendall("INIT SYNC")
    data = conn.recv(1024)
    svCurrent = md.Status.svCurrent()
    conn.sendall("INIT SYNC DIRS")
    # #1:
    result_wdirs = [S.strip() for S in fconn.readlines()]
    # #2:
    # result_wdirs = []
    # for line in fconn:
    #     line = line.strip()
    #     print line
    #     result_wdirs.append(line)
    # #3:
    # data = conn.recv(2048)
    # print "DEBUG", data
    # #4:
    # while 1:
    #     data = fconn.readline()
    #     print data
    #     if not data: break
    if mirrord.debug:
        print "client wdirs", result_wdirs
    expect_wdirs = snap_wdirs
    result_wdirs.sort()
    expect_wdirs.sort()
    self.failUnlessEqual(result_wdirs, expect_wdirs)
    conn.sendall("INVALID INIT SYNC FILES CMD")
    self.failIf(conn.recv(1024))
    self.failUnlessEqual(svCurrent.Status.stage(), mirrord.STAGE_ABORTED)
    self.failIf(svCurrent.that.isAlive())
    nloop = md.Status.nloop()
    while md.Status.nloop() < nloop + 1: time.sleep(1)
    self.failIf(md.manager.svPool.has_key(svCurrent))
except socket.error, socex:
    self.fail("Communication failed: %s" % socex)
At first the test code always hung when it reached the fragment above. Pressing Ctrl-C to quit produced the following exception:
Thread SERVER ABORTED
loop conitnue at 1186393237.81
Thread SERVER ABORTED
loop conitnue at 1186393238.82
loop conitnue at 1186393239.82
server wdirs send ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
server wdirs sent ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
loop conitnue at 1186393240.83
loop conitnue at 1186393241.83
loop conitnue at 1186393242.84
loop conitnue at 1186393243.84
loop conitnue at 1186393244.85
loop conitnue at 1186393245.85
loop conitnue at 1186393246.85
loop conitnue at 1186393247.86
loop conitnue at 1186393248.86
DELETE FILE: /root/ulfs/cutils/trunk/testdata/mirrord/tin/tfile1
Traceback (most recent call last):
File "mirrord_ut.py", line 1142, in <module>
runner.run(suite)
File "/usr/lib/python2.5/unittest.py", line 705, in run
test(result)
File "/usr/lib/python2.5/unittest.py", line 437, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.5/unittest.py", line 433, in run
test(result)
File "/usr/lib/python2.5/unittest.py", line 281, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.5/unittest.py", line 260, in run
testMethod()
File "mirrord_ut.py", line 1047, in test_protocol
data = fconn.readline()
File "/usr/lib/python2.5/socket.py", line 331, in readline
data = recv(1)
KeyboardInterrupt
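It was the same every time, no matter which reading method the client used, e.g. any of #1 to #4 above. The client was in fact blocked in the socket's recv(1). Client code #2, #3 or #4 prints the data it reads, and the output is basically this: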
Thread SERVER ABORTED
loop conitnue at 1186393237.81
Thread SERVER ABORTED
loop conitnue at 1186393238.82
loop conitnue at 1186393239.82
server wdirs send ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
server wdirs sent ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1

/root/ulfs/cutils/trunk/testdata/mirrord/tin


loop conitnue at 1186393240.83
loop conitnue at 1186393241.83
loop conitnue at 1186393242.84
loop conitnue at 1186393243.84
loop conitnue at 1186393244.85
loop conitnue at 1186393245.85
loop conitnue at 1186393246.85
loop conitnue at 1186393247.86
loop conitnue at 1186393248.86
DELETE FILE: /root/ulfs/cutils/trunk/testdata/mirrord/tin/tfile1
Traceback (most recent call last):
File "mirrord_ut.py", line 1142, in <module>
runner.run(suite)
File "/usr/lib/python2.5/unittest.py", line 705, in run
test(result)
File "/usr/lib/python2.5/unittest.py", line 437, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.5/unittest.py", line 433, in run
test(result)
File "/usr/lib/python2.5/unittest.py", line 281, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.5/unittest.py", line 260, in run
testMethod()
File "mirrord_ut.py", line 1047, in test_protocol
data = fconn.readline()
File "/usr/lib/python2.5/socket.py", line 331, in readline
data = recv(1)
KeyboardInterrupt
This further confirms that it blocks exactly where the sent data ends!

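So does it have to do with EOF: is it because no end-of-file marker was sent? The trouble is that I don't know how to write EOF to a file object. I tried "^D" and also an empty string; neither helped. Then I wondered whether the file object had to be closed before an EOF would be written, but that didn't work in the attempts above either. Using conn.sendall() didn't help: the client still blocked, unless it used conn.recv(), but then the data read was incomplete (see #1, #3 and #4 in the server working code above). (In fact EOF is not a character that can be written into a stream at all: the reader only sees end-of-file once the peer closes its socket or shuts down its sending side. Closing fconn does not do that, because a file object returned by makefile() leaves the underlying socket open.)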
loop conitnue at 1186395856.35
Thread SERVER ABORTED
Thread SERVER ABORTED
loop conitnue at 1186395857.36
server wdirs send ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
DEBUG /root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1

server wdirs sent ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
...
In the end, the server used the code at #2, calling conn.shutdown(1), and only then did the client finally stop blocking!
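A minimal Python 3 sketch of what shutdown(1) does, using socketpair() as a stand-in connection and made-up short paths: after shutdown(socket.SHUT_WR), the client's readlines() returns once the buffered lines have been read, because it now sees end-of-file. The reverse direction is still usable, so the client can still send a reply.

```python
import socket

server, client = socket.socketpair()   # stand-in for the mirrord connection
client_in = client.makefile("rb")

server.sendall(b"/tin/tdir1\n/tin\n")
server.shutdown(socket.SHUT_WR)        # SHUT_WR == 1: "I will send nothing more"

# readlines() can now finish: after the last line the client sees end-of-file.
wdirs = [line.rstrip(b"\n") for line in client_in.readlines()]

# Only the server's sending direction is closed; the client can still reply.
client.sendall(b"OK")
reply = server.recv(1024)

client_in.close()
client.close()
server.close()
print(wdirs, reply)
```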

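This is from Foundations of Python Network Programming:
On many operating systems, a call that sends data over the network may return before the remote server has confirmed receipt of the information. So it is quite possible that data from a successful call to sendall() is in fact never received.

Once you have finished writing, you should call shutdown() immediately. This forces the buffered contents to be flushed, and raises an exception if anything goes wrong.

...use shutdown() to make sure the server has received the complete request...

...remember, data is only guaranteed to have been sent after you have called shutdown()...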


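But judging by the Python library documentation, that doesn't seem to be what it means: it looks as if all subsequent writes are affected (the call above is shutdown(1)). Is that right? I'll have to write another program to check.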

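In fact, shutdown(1) makes every subsequent write invalid. Data queued before the call is not discarded, though: the kernel still sends it, followed by the end-of-stream marker. The half-open sockets described in Foundations of Python Network Programming use exactly this technique, and with fork or multiple threads it is useful for keeping the reads and writes of different processes/threads from getting mixed up. But here, if no more data can be written after shutdown(1), none of the later steps can run, which is clearly not what I want.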
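A small Python 3 check of the "later writes are invalid" part, under the same socketpair() assumption and with a hypothetical function name. On Linux, the second sendall() after shutdown(socket.SHUT_WR) fails with EPIPE (BrokenPipeError in Python 3) instead of being silently dropped; other platforms may report a different error.

```python
import socket

def write_after_shutdown():
    """Return the error raised by a write issued after shutdown(SHUT_WR)."""
    server, client = socket.socketpair()   # stand-in for the real connection
    try:
        server.sendall(b"EOF\n")
        server.shutdown(socket.SHUT_WR)
        try:
            server.sendall(b"EOF...\n")   # like fconn.write("EOF...\n") after shutdown(1)
        except OSError as exc:
            return exc
        return None
    finally:
        server.close()
        client.close()

error = write_after_shutdown()
print(repr(error))
```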

Let's verify this. The server code is as follows:
# #1:
fconn.writelines("%s\n" % fn for fn in wdirs)
fconn.write("EOF\n")
# fconn.flush()
conn.shutdown(1)
fconn.write("EOF...\n")
Client code:
while 1:
    data = fconn.readline()[:-1]
    if not data: break
    # if data == 'EOF': break
    result_wdirs.append(data)
    print "client readline:", data
if mirrord.debug:
    print "client wdirs", result_wdirs
Output:
server wdirs send ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1']
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1
client readline: EOF
client wdirs ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', 'EOF']
F
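As you can see, the second line, EOF..., was not printed, because conn.shutdown(1) had already been executed and the later fconn.write() had no effect. But why was no socket.error exception raised??? And yet the following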
if debug:
    print "server wdirs sent", wdirs
really was not executed either! It seems to be blocked as well!!! Even if more reading statements are added to the client:
result_wdirs = []
while 1:
    data = fconn.readline()[:-1]
    # if not data: break
    if data == 'EOF': break
    result_wdirs.append(data)
    print "client readline:", data
data = conn.recv(1024)
print data
the result is the same: data is never read, and the final output is:
loop conitnue at 1186411462.36
Thread SERVER ABORTED
Thread SERVER ABORTED
loop conitnue at 1186411463.41
server wdirs send ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1']
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1
after shutdown(1)
client wdirs ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1']
F
======================================================================
FAIL: testing server and the protocol for communication ...
----------------------------------------------------------------------
Traceback (most recent call last):
File "mirrord_ut.py", line 1069, in test_protocol
self.failUnlessEqual(svCurrent.Status.stage(), mirrord.STAGE_ABORTED)
AssertionError: 1 != 127

----------------------------------------------------------------------
Ran 1 test in 2.241s

FAILED (failures=1)
stage == 1 means the server thread is still in the running state, so it can only be blocked.

But why does it block instead of raising a socket.error exception?

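Setting that question aside, let's analyze why the client's readline() blocks. readline() is called in a loop (readlines() simply calls readline() repeatedly), and socket.py shows that it calls recv(1) and does not stop until it gets a "\n" or EOF. For a socket, however, EOF is only returned once the peer closes the socket (or shuts down its sending side, as shutdown(1) does), and since more data still has to be exchanged further down, conn cannot be closed here. So the client's readline/readlines assumes more data is coming and blocks waiting for it.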
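The loop described above can be sketched in Python 3 as follows. It is a simplified stand-in for the library's readline(), not a copy of socket.py; the helper name is hypothetical and socketpair() replaces the real connection. The first call returns one complete line, the second returns the partial last line when end-of-file arrives, and the third returns b"" because the peer has shut down its side. Without the shutdown() call, the second call would wait in recv(1) forever, which is exactly the hang seen in the test.

```python
import socket

def readline_via_recv(sock):
    """Simplified unbuffered readline(): call recv(1) until a newline or EOF."""
    chars = []
    while True:
        ch = sock.recv(1)        # blocks while nothing has arrived yet
        if not ch:               # b"": the peer closed or shut down its side
            break
        chars.append(ch)
        if ch == b"\n":
            break
    return b"".join(chars)

server, client = socket.socketpair()   # stand-in for the real connection
server.sendall(b"/tin\n/tin/tdir1")    # the second line has no newline
server.shutdown(socket.SHUT_WR)        # without this, the second call would block forever
lines = [readline_via_recv(client) for _ in range(3)]
server.close()
client.close()
print(lines)
```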

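As the analysis above shows, calling shutdown(1) is not an option either. So the only way out is through the design of the protocol! After writelines(), the server can append one more line, "EOF\n" (note that this is the string "EOF", not the end-of-file condition discussed earlier). When the client reads the string EOF, it knows the transfer is over and leaves the loop. There is no risk of a legitimate line that happens to read EOF, because everything transferred is an absolute path, which necessarily starts with "/", so there can never be a conflict!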
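A sketch of that protocol in Python 3, assuming socketpair() as a stand-in for the mirrord connection and hypothetical helper names send_paths and recv_paths. The marker ends one batch of paths without closing anything, so the same connection carries the next message. A real end-of-file before the marker is treated as an error rather than as an empty batch.

```python
import socket

END_MARKER = b"EOF"   # cannot clash with a path: every real line starts with "/"

def send_paths(sock, paths):
    """Send absolute paths one per line, followed by the EOF marker line."""
    payload = b"".join(p.encode("utf-8") + b"\n" for p in paths)
    sock.sendall(payload + END_MARKER + b"\n")

def recv_paths(rfile):
    """Read lines until the EOF marker; the connection stays open afterwards."""
    paths = []
    while True:
        line = rfile.readline()
        if not line:
            # Real end-of-file before the marker: the sender went away.
            raise EOFError("connection closed before the EOF marker")
        line = line.rstrip(b"\n")
        if line == END_MARKER:
            return paths
        paths.append(line.decode("utf-8"))

server, client = socket.socketpair()   # stand-in for the real connection
client_in = client.makefile("rb")
send_paths(server, ["/tin/tdir1", "/tin"])
wdirs = recv_paths(client_in)
server.sendall(b"next message\n")      # no shutdown(): later messages still work
following = client_in.readline()
for f in (client_in, client, server):
    f.close()
print(wdirs, following)
```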

Note that if you forget to add the fconn.write("EOF\n") line before shutdown(1) above, the output looks like this:
......
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client r
Traceback (most recent call last):
File "mirrord_ut.py", line 1155, in ?
runner.run(suite)
File "/usr/lib/python2.4/unittest.py", line 696, in run
test(result)
File "/usr/lib/python2.4/unittest.py", line 428, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.4/unittest.py", line 424, in run
test(result)
File "/usr/lib/python2.4/unittest.py", line 281, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.4/unittest.py", line 260, in run
testMethod()
File "mirrord_ut.py", line 1058, in test_protocol
print "client readline:", data
KeyboardInterrupt
unless the loop condition is if not data: break rather than if data == 'EOF': break.
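A short Python 3 demonstration of why that loop never ends, using socketpair() in place of the real connection: once end-of-file has been reached, every readline() returns b"" immediately instead of blocking, so a loop that only looks for the string "EOF" spins forever.

```python
import socket

server, client = socket.socketpair()   # stand-in for the real connection
client_in = client.makefile("rb")
server.sendall(b"/tin\n")
server.shutdown(socket.SHUT_WR)        # shutdown(1) without a final "EOF\n" line

# After end-of-file, every readline() returns b"" at once instead of blocking.
reads = [client_in.readline() for _ in range(4)]

client_in.close()
client.close()
server.close()
print(reads)
```

Note also that readline()[:-1] turns both a blank line ("\n") and end-of-file ("") into an empty string, so checking the raw readline() result for emptiness before stripping the newline is the only way to tell the two apart.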

This means that when using sockets, you have to be careful with constructs like fconn.readlines() and for line in fconn; in general you have to use something like:
while 1:
    line = fconn.readline()[:-1]
    if line == 'EOF': break
    ...
that is, an explicit readline() loop that stops at the agreed end marker!

Wednesday, August 01, 2007

python thread setDaemon

Running a unit test program raised an exception:
roc@crablfs:~/ulfs/cutils/trunk/lib$ python mirrord_ut.py

testing boot init ...
.
testing monitoring ...
Add a dir to watch: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin
Add a file to record: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tfile1
Add a dir to watch: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1
Add a file to record: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1/tfile2
boot init finished
run schedule finished
loop conitnue at 1185979072.2
Staring socket failed: ('', 2123), (98, 'Address already in use')
Exception in thread schedule:
Traceback (most recent call last):
File "/usr/lib/python2.4/threading.py", line 442, in __bootstrap
self.run()
File "/usr/lib/python2.4/threading.py", line 422, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/roc/ulfs/cutils/trunk/lib/mirrord.py", line 426, in _run_schedule
raise MirrordExc(strerr)
MirrordExc:

......

F
======================================================================
FAIL: test_Monitor (__main__.TestMirrord)
----------------------------------------------------------------------
Traceback (most recent call last):
File "mirrord_ut.py", line 181, in test_Monitor
self.failUnless(md.manager.wdirs.has_key(wd))
AssertionError

----------------------------------------------------------------------
Ran 2 tests in 72.394s

FAILED (failures=1)
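The test code spawns a thread to run the working code, so that it can monitor the working code's variables while it runs. The test code is as follows: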
    def test_Monitor(self):
        print "\ntesting monitoring ..."
        try:
            mirrord.testing = 1
            testdir = self.testdir

            # self.setUp()
            failed = self.__build_2()
            self.failIf(failed)
            md = mirrord.Mirrord(identities=['test2'], datadir="/tmp/mirrord")
            # An invalid Server should not be able to boot run()
            self.failUnlessRaises(mirrord.MirrordExc, md.run, address=('localhost', 2123), Server=1)
            # The infinite loop should be terminated correctly when requested:
            mdt = threading.Thread(target=md.run, name="mirrord", args=(('', 2123),))
            mdt.setDaemon(1)
            mdt.start()
            while md.Status.stage() < 2: time.sleep(1)
            md.stop()
            nloop = md.Status.nloop()
            while md.Status.nloop() == nloop: time.sleep(1)
            self.failUnlessEqual(md.Terminate, True)
            self.failIf(mdt.isAlive())
            self.tearDown()

            mirrord.debug = 1
            # Test inotify functionality:
            self.setUp()
            self.__build_1()
            snap_wdirs = [testdir, "%s/tin/" % testdir, "%s/tin/tdir1" % testdir]
            snap_files = ["%s/tin/tfile1" % testdir, "%s/tin/tdir1/tfile2" % testdir]
            md = mirrord.Mirrord(identities=['test1'], datadir="/tmp/mirrord")
            mdt = threading.Thread(target=md.run, name="mirrord", args=(('', 2123),))
            mdt.setDaemon(1)
            mdt.start()
            while md.Status.stage() < 2: time.sleep(1)
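In the test, md.run is run twice, and md.run (Mirrord.run) in turn spawns a thread that binds a socket to port 2123 and listens for requests. The 'Address already in use' error reported here shows that the socket thread created by the first call has not exited. But the self.failIf(mdt.isAlive()) check passed, which means the md.run thread itself has exited. So why is the socket thread still running?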

Does this have to do with setDaemon() in thread scheduling? The working code was originally written like this:
def _schedule_thread(self, Server, address):
    that = threading.Thread(
        target=self._run_schedule, name='schedule', args=(Server, address))
    that.setDaemon(1)
    that.start()
    # that.join()

class Mirrord:
    ...
    def _run_schedule(self, Server, address):
        """
        This method runs in the child process/thread
        to bind to a port for remote clients' requests
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(address)
            sock.listen(5)
        except socket.error, socex:
            strerr = _("Staring socket failed: %s, %s" % (address, socex))
            syslog.syslog(syslog.LOG_ERR, strerr)
            if debug: print >> sys.stderr, strerr
            raise MirrordExc(strerr)

        while not self.Terminate:
            try:
                conn, addr = sock.accept()
                # TODO: Add authentication code here:
                # _() takes a single string, so format after translating:
                output = _("Got connection from: %s") % (addr,)
                syslog.syslog(syslog.LOG_INFO, output)
                serv = Server(self.manager, self.wmLock, conn, addr)
                serv.start()
                # After the sync, create a child process or thread to serve the client
            except socket.error, socex:
                strerr = _("Get connections error: %s" % socex)
                syslog.syslog(syslog.LOG_ERR, strerr)
            except:
                strerr = traceback.format_exc()
                syslog.syslog(syslog.LOG_ERR, strerr)

    def run(self, address, Server=ServerThread):
        ...
        self._schedule(Server, address)
        ...

        while not self.Terminate:
            try:
                if debug:
                    output = _("loop conitnue at %s" % time.time())
                    print output
                    syslog.syslog(syslog.LOG_DEBUG, output)
                if self.notifier.check_events():
                    self.notifier.read_events()
                try:
                    self.wmLock.acquire()
                    self.notifier.process_events()
                    # Process the dirs/files moved to the to location unmonitored:
                    # the question is: will IN_MOVED_FROM and IN_MOVED_TO occur in one polling?
                    self.processor.clean_cookies()
                    # TODO: Clean up the overdue manager.wmlog here
                    temp = self.manager.progress.values()
                    if temp: temp.sort(); min = temp[0]
                    else: min = self.manager.serial
                    for sn in self.manager.wmlog.keys():
                        if sn < min: self.manager.wmlog.pop(sn)
                finally:
                    self.wmLock.release()
                # self.Status.nloop += 1
                self.Status.nloop = self.Status.nloop() + 1
            except:
                strerr = traceback.format_exc()
                syslog.syslog(syslog.LOG_ERR, strerr)
                if debug: print strerr
                break
        self.notifier.stop()

    def stop(self):
        self.Terminate = True
        # Or threading Event()?
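Taken literally, if a child thread is set to daemonic, it keeps running after the thread that created it exits. Although self.Terminate is set here, the thread is blocked in conn, addr = sock.accept(), and because the test code never makes a connection request, the child thread never gets a chance to notice that self.Terminate has changed.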
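The comment # Or threading Event()? points at one common remedy, sketched here in Python 3 as an alternative technique, not the code mirrord uses: give the listening socket a timeout so that accept() wakes up periodically and the loop can check a threading.Event, and close the socket on the way out so that the port is released for the next run. The 0.2 s interval, the function names and the close_connection handler are hypothetical; port 0 lets the OS pick a free port for the demo.

```python
import socket
import threading

def serve_until_stopped(sock, stop_event, handle):
    """Accept connections, waking up every 0.2 s to check stop_event."""
    sock.settimeout(0.2)          # accept() raises socket.timeout instead of blocking forever
    try:
        while not stop_event.is_set():
            try:
                conn, addr = sock.accept()
            except socket.timeout:
                continue          # no client yet: go back and re-check the flag
            handle(conn, addr)
    finally:
        sock.close()              # release the port so a second run can bind it again

def close_connection(conn, addr):
    """Hypothetical handler: a real server would start a ServerThread here."""
    conn.close()

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("127.0.0.1", 0))   # port 0: let the OS choose a free port
listener.listen(5)

stop = threading.Event()
worker = threading.Thread(target=serve_until_stopped,
                          args=(listener, stop, close_connection), name="schedule")
worker.start()
stop.set()                        # what stop() could do instead of setting self.Terminate
worker.join(timeout=5)
print(worker.is_alive())
```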

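In fact, setDaemon() has nothing to do with the thread that created the child: with or without it, stopping md.run does not make the schedule thread exit. The daemon flag only matters when the main thread (i.e. the whole process, here the test code) finishes: Python exits once only daemonic threads are left, so a daemonic schedule thread disappears together with the process, whereas a non-daemonic schedule thread keeps running and the program never returns to the command prompt!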
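A small Python 3 sketch of both points, with hypothetical function and thread names: a thread that finishes does not take the thread it started with it, and a new thread inherits its daemon flag from the thread that creates it (Thread.daemon is copied from the creating thread when the Thread object is constructed). In the prototype below, this means a thread started by a daemonic schedule thread is itself daemonic even without calling setDaemon().

```python
import threading

def run_schedule(release, info):
    """Plays the role of md.run: start a child thread, then return at once."""
    child = threading.Thread(target=release.wait, name="schedule")
    info["child"] = child
    info["inherited_daemon"] = child.daemon   # copied from the creating thread
    child.start()

release = threading.Event()
info = {}
parent = threading.Thread(target=run_schedule, args=(release, info), name="mirrord")
parent.daemon = True                      # same as parent.setDaemon(1)
parent.start()
parent.join()                             # the parent thread has finished ...
still_running = info["child"].is_alive()  # ... but its child has not
release.set()                             # let the child finish so the demo exits cleanly
info["child"].join()
print(info["inherited_daemon"], still_running)  # True True
```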

On this point, see the following prototype test code:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

"""
Python threading daemonic:
If the second thread exits, the third thread will not exit,
no matter whether setDaemon() is called.
Only when the main thread (process) exits does the third one exit!
Author: Roc Zhou
Date: 2007-08-02
Email: chowroc.z@gmail.com
"""

import time
import threading

def run_thread():
    while 1:
        print "child thread is running ..."
        time.sleep(1)

def run_schedule():
    this = threading.Thread(target=run_thread, name="thread")
    # this.setDaemon(1)
    this.start()
    print "schedule exiting ..."

that = threading.Thread(target=run_schedule, name="schedule")
# that.setDaemon(1)
that.start()
for i in range(10):
    print "main thread is running ..."
    time.sleep(1)
print "main thread exiting ..."
Run the program with each of the different setDaemon() settings: