Monday, August 06, 2007

python socket file object and shutdown()

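Today I was writing test code for the server side and the protocol of mirrord.py, and ran into a problem when using a socket's file-like object. The relevant part of the server's working code is: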
try:
    conn = self.conn
    fconn = conn.makefile('w', 0)
    data = conn.recv(1024)
    if data != "INIT SYNC":
        raise ServerExc("Invalid init sync command")
    # Acquire before try, so that finally only ever releases a lock we hold
    self.wmLock.acquire()
    try:
        # Copy the content from the manager queue at first:
        wdirs = self.manager.wdirs.keys()
        files = self.manager.files.keys()
        serial = self.manager.serial
        # And then register itself to the svPool to inform the scheduler
        # reserving the manager.wmlog for it:
        self.manager.svPool[self] = serial
    finally:
        self.wmLock.release()
    conn.sendall("OK")
    data = conn.recv(1024)
    if data != "INIT SYNC DIRS":
        raise ServerExc("Invalid init sync dirs command")
    if debug:
        print "server wdirs send", wdirs
    # #1:
    # fconn = conn.makefile('w', 0)
    # fconn.writelines("%s\n" % fn for fn in wdirs)
    # fconn.close()
    # #2:
    fconn.writelines("%s\n" % fn for fn in wdirs)
    conn.shutdown(1)
    # #3:
    # fconn = conn.makefile('w', 0)
    # for line in wdirs:
    #     # conn.sendall("%s\n" % line)
    #     fconn.write("%s\n" % line)
    # # fconn.write("^D")
    # fconn.write("")
    # fconn.close()
    # #4:
    # for line in wdirs:
    #     conn.sendall("%s\n" % line)
    # ... (rest of the handler and its except clause not shown)
The test code is essentially a client:
try:
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn.connect(('localhost', 2123))
    fconn = conn.makefile('r', 0)
    conn.sendall("START")
    data = conn.recv(1024)
    conn.sendall("INIT SYNC")
    data = conn.recv(1024)
    svCurrent = md.Status.svCurrent()
    conn.sendall("INIT SYNC DIRS")
    # #1:
    result_wdirs = [S.strip() for S in fconn.readlines()]
    # #2:
    # result_wdirs = []
    # for line in fconn:
    #     line = line.strip()
    #     print line
    #     result_wdirs.append(line)
    # #3:
    # data = conn.recv(2048)
    # print "DEBUG", data
    # #4:
    # while 1:
    #     data = fconn.readline()
    #     print data
    #     if not data: break
    if mirrord.debug:
        print "client wdirs", result_wdirs
    expect_wdirs = snap_wdirs
    result_wdirs.sort()
    expect_wdirs.sort()
    self.failUnlessEqual(result_wdirs, expect_wdirs)
    conn.sendall("INVALID INIT SYNC FILES CMD")
    self.failIf(conn.recv(1024))
    self.failUnlessEqual(svCurrent.Status.stage(), mirrord.STAGE_ABORTED)
    self.failIf(svCurrent.that.isAlive())
    nloop = md.Status.nloop()
    while md.Status.nloop() < nloop + 1: time.sleep(1)
    self.failIf(md.manager.svPool.has_key(svCurrent))
except socket.error, socex:
    self.fail("Communication failed: %s" % socex)
At first, the test always hung when it reached the fragment above. After I pressed Ctrl-C to quit, the output and exception were as follows:
Thread SERVER ABORTED
loop conitnue at 1186393237.81
Thread SERVER ABORTED
loop conitnue at 1186393238.82
loop conitnue at 1186393239.82
server wdirs send ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
server wdirs sent ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
loop conitnue at 1186393240.83
loop conitnue at 1186393241.83
loop conitnue at 1186393242.84
loop conitnue at 1186393243.84
loop conitnue at 1186393244.85
loop conitnue at 1186393245.85
loop conitnue at 1186393246.85
loop conitnue at 1186393247.86
loop conitnue at 1186393248.86
DELETE FILE: /root/ulfs/cutils/trunk/testdata/mirrord/tin/tfile1
Traceback (most recent call last):
  File "mirrord_ut.py", line 1142, in <module>
    runner.run(suite)
  File "/usr/lib/python2.5/unittest.py", line 705, in run
    test(result)
  File "/usr/lib/python2.5/unittest.py", line 437, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.5/unittest.py", line 433, in run
    test(result)
  File "/usr/lib/python2.5/unittest.py", line 281, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.5/unittest.py", line 260, in run
    testMethod()
  File "mirrord_ut.py", line 1047, in test_protocol
    data = fconn.readline()
  File "/usr/lib/python2.5/socket.py", line 331, in readline
    data = recv(1)
KeyboardInterrupt
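It was the same every time, no matter which reading method the client used (for example variants #1 to #4 above): the client was actually blocked in the socket's recv(1). With client variants #2, #3 or #4, the data that had been read gets printed, roughly like this: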
Thread SERVER ABORTED
loop conitnue at 1186393237.81
Thread SERVER ABORTED
loop conitnue at 1186393238.82
loop conitnue at 1186393239.82
server wdirs send ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
server wdirs sent ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1

/root/ulfs/cutils/trunk/testdata/mirrord/tin


loop conitnue at 1186393240.83
loop conitnue at 1186393241.83
loop conitnue at 1186393242.84
loop conitnue at 1186393243.84
loop conitnue at 1186393244.85
loop conitnue at 1186393245.85
loop conitnue at 1186393246.85
loop conitnue at 1186393247.86
loop conitnue at 1186393248.86
DELETE FILE: /root/ulfs/cutils/trunk/testdata/mirrord/tin/tfile1
Traceback (most recent call last):
  File "mirrord_ut.py", line 1142, in <module>
    runner.run(suite)
  File "/usr/lib/python2.5/unittest.py", line 705, in run
    test(result)
  File "/usr/lib/python2.5/unittest.py", line 437, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.5/unittest.py", line 433, in run
    test(result)
  File "/usr/lib/python2.5/unittest.py", line 281, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.5/unittest.py", line 260, in run
    testMethod()
  File "mirrord_ut.py", line 1047, in test_protocol
    data = fconn.readline()
  File "/usr/lib/python2.5/socket.py", line 331, in readline
    data = recv(1)
KeyboardInterrupt
This further confirms that the client blocks exactly where the sent data ends!

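So does it have something to do with EOF, that is, is the problem that no end-of-file marker was sent? But I didn't know how to write an EOF to a file object. I tried "^D" and an empty string, and neither worked. Then I thought the file object might have to be closed for an EOF to be written, but that didn't work in the attempts above either. Using conn.sendall() didn't help: the client still blocked, unless it used conn.recv(), and then the data it read was incomplete, because recv() returns only whatever has arrived so far (see variants #1, #3 and #4 of the server-side working code above):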
loop conitnue at 1186395856.35
Thread SERVER ABORTED
Thread SERVER ABORTED
loop conitnue at 1186395857.36
server wdirs send ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
DEBUG /root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1

server wdirs sent ['/root/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', '/root/ulfs/cutils/trunk/testdata/mirrord/tin']
...
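On a stream socket, EOF is not a byte or string that can be written. The reading side sees it only as recv() returning an empty string, after the peer has closed the connection or shut down its sending half. Closing the file object returned by makefile() does not close the underlying socket while conn itself is still open, so it does not produce EOF either.

The sketch below illustrates this under a few assumptions. It is written for Python 3, unlike the Python 2.5 code in this post. It uses socket.socketpair() as a stand-in for the real client/server connection. The path and the drain() helper are made up for the demo.

```python
import socket


def drain(sock):
    """Read until EOF or until nothing arrives before the socket timeout.

    Returns (data, saw_eof).
    """
    chunks = []
    while True:
        try:
            chunk = sock.recv(1024)
        except socket.timeout:
            return b"".join(chunks), False  # connection still open, no EOF yet
        if not chunk:
            return b"".join(chunks), True   # recv() returned b"": that is EOF
        chunks.append(chunk)


def demo():
    writer, reader = socket.socketpair()
    reader.settimeout(0.5)  # stand-in for pressing Ctrl-C after a while
    try:
        writer.sendall(b"/tmp/tin/tdir1\n")
        writer.sendall(b"^D")  # just two ordinary bytes, not an EOF marker
        writer.sendall(b"")    # sends nothing at all
        before = drain(reader)
        writer.shutdown(socket.SHUT_WR)  # the post's shutdown(1)
        after = drain(reader)
        return before, after
    finally:
        writer.close()
        reader.close()


before, after = demo()
print(before)  # (b'/tmp/tin/tdir1\n^D', False)
print(after)   # (b'', True)
```

The 0.5-second timeout only keeps the demo from hanging; it plays the role of the Ctrl-C in the runs above.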
In the end I used the code at #2 on the server side, calling conn.shutdown(1), and only then did the client finally stop blocking!

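This comes from Foundations of Python Network Programming (《Python 网络编程基础》):

On many operating systems, a call that sends data over the network may return before the remote server has made sure it received the information. It is therefore quite possible that data from a sendall() call that returned successfully is never actually received.

Once you have finished writing, you should call shutdown() immediately. This forces the buffered contents to be flushed, and raises an exception if anything goes wrong.

...use shutdown() to make sure the server has fully received the request...

...keep in mind that data is only guaranteed to have been sent after you call shutdown()...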


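Judging from the Python library reference, though, that doesn't seem to be what it means: subsequent writes appear to be affected as well (the call above is shutdown(1)). Is that really so? I'll have to write another program to find out.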

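In fact, shutdown(1), i.e. shutdown(socket.SHUT_WR), disallows all further writes on the socket. Data written before the call is not discarded: the kernel still transmits it, followed by the end-of-stream signal that the peer reads as EOF. Data still sitting in a buffered file object's own buffer has to be flushed first, since shutdown() knows nothing about it.

The half-open socket described in Foundations of Python Network Programming uses exactly this technique. It is useful with fork or multiple threads, to keep the reads and writes of several processes/threads from getting mixed up. Here, however, if no more data can be written after shutdown(1), none of the later protocol steps can run, which is obviously not what I want.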
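Here is a minimal sketch of such a half-close. It is again in Python 3, with socket.socketpair() standing in for the real connection; srv, cli and the ACK reply are made-up names. Three things happen:

- The data sent before shutdown(socket.SHUT_WR) still arrives.
- The reader then sees EOF.
- The opposite direction keeps working, which is what makes the socket half-open rather than closed.

```python
import socket


def half_close_demo():
    # srv/cli are hypothetical stand-ins for the post's server and client ends.
    srv, cli = socket.socketpair()
    srv.settimeout(1.0)
    cli.settimeout(1.0)
    try:
        srv.sendall(b"/tmp/tin\n/tmp/tin/tdir1\n")
        srv.shutdown(socket.SHUT_WR)  # shutdown(1): srv may no longer send

        # The client still receives everything sent before the shutdown,
        # followed by EOF (recv() returning b"").
        received = b""
        while True:
            chunk = cli.recv(1024)
            if not chunk:
                break
            received += chunk

        # The other direction is untouched: the client can still reply,
        # and the server, which can no longer send, can still receive.
        cli.sendall(b"ACK\n")
        reply = srv.recv(1024)
        return received, reply
    finally:
        srv.close()
        cli.close()


received, reply = half_close_demo()
print(received)  # b'/tmp/tin\n/tmp/tin/tdir1\n'
print(reply)     # b'ACK\n'
```

What the sketch cannot do is let srv send anything after the shutdown. That is exactly the limitation that rules the technique out for mirrord, where the server still has more to say.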

Let's verify this. The server-side code:
# #1:
fconn.writelines("%s\n" % fn for fn in wdirs)
fconn.write("EOF\n")
# fconn.flush()
conn.shutdown(1)
fconn.write("EOF...\n")
Client code:
while 1:
    data = fconn.readline()[:-1]
    if not data: break
    # if data == 'EOF': break
    result_wdirs.append(data)
    print "client readline:", data
if mirrord.debug:
    print "client wdirs", result_wdirs
Output:
server wdirs send ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1']
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1
client readline: EOF
client wdirs ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1', 'EOF']
F
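As you can see, the second marker, EOF..., is not printed. conn.shutdown(1) had already been executed, so the subsequent fconn.write() had no effect. But why was no socket.error exception raised??? And the statement that follows,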
if debug:
    print "server wdirs sent", wdirs
was indeed not executed either! It seems to be blocked too!!! Even if I add another read on the client side:
result_wdirs = []
while 1:
    data = fconn.readline()[:-1]
    # if not data: break
    if data == 'EOF': break
    result_wdirs.append(data)
    print "client readline:", data
data = conn.recv(1024)
print data
nothing changes: that data is never read, and the final output is:
loop conitnue at 1186411462.36
Thread SERVER ABORTED
Thread SERVER ABORTED
loop conitnue at 1186411463.41
server wdirs send ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1']
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin
client readline: /home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1
after shutdown(1)
client wdirs ['/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin', '/home/roc/ulfs/cutils/trunk/testdata/mirrord/tin/tdir1']
F
======================================================================
FAIL: testing server and the protocol for communication ...
----------------------------------------------------------------------
Traceback (most recent call last):
  File "mirrord_ut.py", line 1069, in test_protocol
    self.failUnlessEqual(svCurrent.Status.stage(), mirrord.STAGE_ABORTED)
AssertionError: 1 != 127

----------------------------------------------------------------------
Ran 1 test in 2.241s

FAILED (failures=1)
stage == 1 means the server thread is still in the running state, which can only mean it is blocked.

But why does it block instead of raising a socket.error exception?
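A small experiment shows what the operating system does in this situation. It is sketched here in Python 3 with socket.socketpair(); that is an assumption, since the post used a TCP connection and Python 2.5. The experiment writes through an unbuffered file object after shutdown(socket.SHUT_WR) and checks whether anything is raised.

On Linux this raises BrokenPipeError (EPIPE), a subclass of OSError, rather than silently dropping the data. If the same holds for the post's setup, the write did fail. The exception may then have been swallowed by error handling that the excerpt does not show, but that part is only a guess.

```python
import socket


def write_after_shutdown():
    a, b = socket.socketpair()
    wfile = a.makefile("wb", buffering=0)  # unbuffered, like makefile('w', 0)
    try:
        a.shutdown(socket.SHUT_WR)
        try:
            wfile.write(b"EOF...\n")
        except OSError as exc:
            return type(exc).__name__   # the write was refused
        return "no error"               # the write was silently accepted
    finally:
        wfile.close()
        a.close()
        b.close()


print(write_after_shutdown())  # e.g. BrokenPipeError on Linux
```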

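Setting that question aside, let's analyze why the client's readline() blocks. Consider readline() called in a loop; readlines() is itself just readline() called in a loop. Reading socket.py shows that, for an unbuffered file object like the one created with makefile('r', 0), readline() calls recv(1) repeatedly and does not return until it gets a "\n" or EOF. So readlines(), "for line in fconn" and a loop that stops on an empty string only finish at EOF.

On a socket, however, EOF is returned only once the other end closes the socket or shuts down its sending side, as shutdown(1) does. Since more data still has to be exchanged afterwards, conn cannot be closed here. The client's readline/readlines therefore assumes there is more to read and blocks waiting for it.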
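The difference between readline() and readlines() can be shown with a short Python 3 sketch. socket.socketpair() stands in for the real connection, and the paths are made up. Python 3's makefile() returns a buffered reader rather than Python 2's _fileobject, so it reads in larger chunks instead of recv(1). The rule is the same, though: readline() returns at each "\n", whereas readlines() returns only at EOF.

```python
import socket


def readline_vs_readlines():
    srv, cli = socket.socketpair()
    rfile = cli.makefile("rb")
    try:
        srv.sendall(b"/tmp/tin\n/tmp/tin/tdir1\n")
        # readline() returns as soon as it has seen a b"\n" ...
        first = rfile.readline()
        second = rfile.readline()
        # ... but readlines() keeps reading until EOF, which only arrives
        # once the sender closes or shuts down its sending side.
        srv.sendall(b"/tmp/tin/tdir2\n")
        srv.shutdown(socket.SHUT_WR)
        rest = rfile.readlines()
        return first, second, rest
    finally:
        rfile.close()
        srv.close()
        cli.close()


def readlines_without_eof(timeout=0.5):
    srv, cli = socket.socketpair()
    cli.settimeout(timeout)  # the timeout replaces the post's Ctrl-C
    rfile = cli.makefile("rb")
    try:
        srv.sendall(b"/tmp/tin\n/tmp/tin/tdir1\n")
        try:
            rfile.readlines()  # both lines arrive, but EOF never does
        except socket.timeout:
            return "blocked until timeout"
        return "returned"
    finally:
        rfile.close()
        srv.close()
        cli.close()


print(readline_vs_readlines())
# (b'/tmp/tin\n', b'/tmp/tin/tdir1\n', [b'/tmp/tin/tdir2\n'])
print(readlines_without_eof())  # blocked until timeout
```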

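Judging from the analysis above, calling shutdown(1) isn't an option either. So the only way out is to solve this in the protocol design!

After writelines(), the server appends one more line, "EOF\n". Note that this is the string "EOF", not the end-of-file condition discussed earlier. When the client reads the string EOF, it knows the transfer is over and leaves the loop. A legitimate entry named EOF can't occur: everything transmitted is an absolute path, which always starts with "/", so there can never be a collision!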
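Here is a minimal Python 3 sketch of this sentinel protocol, assuming socket.socketpair() as a stand-in for the real connection. send_paths(), recv_paths() and the "NEXT COMMAND" message are hypothetical names for illustration, not mirrord.py's code.

Because the connection is never shut down, it stays usable for the next protocol step. The reader also raises an error if a real EOF arrives before the marker, instead of looping forever.

```python
import socket

SENTINEL = b"EOF"  # the post's marker line; paths always start with "/"


def send_paths(wfile, paths):
    """Server side (hypothetical helper): one path per line, then the marker."""
    for path in paths:
        wfile.write(path.encode() + b"\n")
    wfile.write(SENTINEL + b"\n")
    wfile.flush()


def recv_paths(rfile):
    """Client side (hypothetical helper): read lines until the marker."""
    paths = []
    while True:
        line = rfile.readline()
        if not line:
            # A real EOF before the marker means the peer went away mid-list.
            raise ConnectionError("connection closed before EOF marker")
        line = line.rstrip(b"\n")
        if line == SENTINEL:
            return paths
        paths.append(line.decode())


def demo():
    srv, cli = socket.socketpair()
    wfile = srv.makefile("wb")
    rfile = cli.makefile("rb")
    try:
        send_paths(wfile, ["/tmp/tin", "/tmp/tin/tdir1"])
        # No shutdown(): the same connection carries the next protocol step.
        wfile.write(b"NEXT COMMAND\n")
        wfile.flush()
        paths = recv_paths(rfile)
        next_line = rfile.readline()
        return paths, next_line
    finally:
        wfile.close()
        rfile.close()
        srv.close()
        cli.close()


print(demo())  # (['/tmp/tin', '/tmp/tin/tdir1'], b'NEXT COMMAND\n')
```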

Note that if, in the code above, you forget to add fconn.write("EOF\n") before shutdown(1), you get output like this:
......
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client readline:
client r
Traceback (most recent call last):
  File "mirrord_ut.py", line 1155, in ?
    runner.run(suite)
  File "/usr/lib/python2.4/unittest.py", line 696, in run
    test(result)
  File "/usr/lib/python2.4/unittest.py", line 428, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.4/unittest.py", line 424, in run
    test(result)
  File "/usr/lib/python2.4/unittest.py", line 281, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.4/unittest.py", line 260, in run
    testMethod()
  File "mirrord_ut.py", line 1058, in test_protocol
    print "client readline:", data
KeyboardInterrupt
unless the loop condition is if not data: break rather than if data == 'EOF': break.
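The loop spins because, once the sender has shut down its side, every further readline() returns an empty string immediately. ""[:-1] is still "", so a test against the 'EOF' marker alone can never succeed. A bounded Python 3 sketch makes the repeated empty reads visible; socket.socketpair() again stands in for the real connection.

```python
import socket


def count_empty_reads(limit=5):
    srv, cli = socket.socketpair()
    rfile = cli.makefile("rb")
    try:
        srv.sendall(b"/tmp/tin\n")
        srv.shutdown(socket.SHUT_WR)  # shut down without sending "EOF\n"
        seen, empties = [], 0
        for _ in range(limit):        # bounded, unlike the post's "while 1"
            data = rfile.readline()[:-1]
            if data == b"EOF":        # never true: the marker was not sent
                break
            if data:
                seen.append(data)
            else:
                empties += 1          # EOF: readline() keeps returning b""
        return seen, empties
    finally:
        rfile.close()
        srv.close()
        cli.close()


print(count_empty_reads())  # ([b'/tmp/tin'], 4)
```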

This means that when using sockets you have to be careful with constructs like fconn.readlines() and for line in fconn. In general the only safe option is something like:
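while 1:
    line = fconn.readline()
    if not line:
        # real EOF before the marker: the peer closed the connection
        raise socket.error("connection closed before EOF marker")
    line = line[:-1]    # strip the trailing "\n"
    if line == 'EOF': break
    ...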
That is the pattern to use!
