Tuesday, July 31, 2007

Python: assembling methods at runtime?

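In an earlier post I discussed an example, python unzip, where I defined a function outside the class and then used setattr to add it directly to the ZipFile class object. A simplified version of the same idea: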
>>> class test:
...     pass
...
>>> def func(self):
...     print self.__dict__
...
>>> test.func = func
>>> t = test()
>>> t.func()
{}
>>>
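For reference, calling setattr on the class has the same effect as the plain assignment in the session above. A minimal sketch in Python 3 syntax; the names Test and func mirror the session, and func returns the dictionary instead of printing it (an assumption made here so the result is easy to check):

```python
class Test:
    pass


def func(self):
    # Return the instance dictionary instead of printing it.
    return self.__dict__


# Attach to the class; equivalent to `Test.func = func`.
setattr(Test, 'func', func)

t = Test()
t.x = 1
# func is found on the class, so `t` is passed as self automatically.
result = t.func()
print(result)
```

Because the function lives on the class, every instance of Test, including ones created before the assignment, picks it up as a method.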
However, assigning the function to the instance from inside the class definition does not work:
>>> def func(self):
...     print self.__dict__
...
>>> class test:
...     def __init__(self):
...         self.func = func
...
>>> t = test()
>>> t.func()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: func() takes exactly 1 argument (0 given)
Using setattr on the instance does not work either:
>>> def func(self):
...     print self.__dict__
...
>>> class test:
...     def __init__(self):
...         setattr(self, 'func', func)
...
>>> t = test()
>>> t.func()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: func() takes exactly 1 argument (0 given)
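Both failures have the same cause: a function only turns into a bound method when it is found on the class. A function stored in the instance's own dictionary is returned as-is, so nothing is passed as self. One possible way to attach a function per instance is to bind it explicitly with types.MethodType. A minimal sketch in Python 3 syntax (Test and func mirror the sessions above; func returns the dictionary rather than printing it, which is an assumption made for illustration):

```python
import types


def func(self):
    return self.__dict__


class Test:
    def __init__(self):
        # Bind func to this instance explicitly. A plain
        # `self.func = func` would store an unbound function.
        self.func = types.MethodType(func, self)


t = Test()
# The bound method supplies `t` as self, so no TypeError is raised.
result = t.func()
```

The bound method is stored in the instance dictionary, so it affects only this instance and not the class.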
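But does defining the function outside the class really avoid all problems? In mirrord.py I wanted to assemble a method at runtime. Since assigning the function inside __init__ does not work, I did this: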
def _schedule_thread(self, Server, address):
    that = threading.Thread(
        target=self.__run_schedule, name='schedule', args=(Server, address))
    that.setDaemon(1)
    that.start()
    that.join()

def _schedule_process(self, Server, address):
    """
    A sharing problem has not been solved yet:
    manager (WatchManager) should be shared among processes
    """
    pid = os.fork()
    if pid == 0: self.__run_schedule(Server, address)

class Mirrord:
    ...
    def __run_schedule(self, Server, address):
        ...
Mirrord._schedule = _schedule_thread
The test then fails with:
Exception in thread mirrord:
Traceback (most recent call last):
  File "/usr/lib/python2.5/threading.py", line 460, in __bootstrap
    self.run()
  File "/usr/lib/python2.5/threading.py", line 440, in run
    self.__target(*self.__args, **self.__kwargs)
  File "mirrord_ut.py", line 56, in __run_thread
    self.mrd.run(('localhost', 2123))
  File "/root/ulfs/cutils/trunk/lib/mirrord.py", line 497, in run
    self._schedule(Server, address)
  File "/root/ulfs/cutils/trunk/lib/mirrord.py", line 380, in _schedule_thread
    target=self.__run_schedule, name='schedule', args=(Server, address))
AttributeError: Mirrord instance has no attribute '__run_schedule'
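This AttributeError comes from name mangling. Inside a class body, the compiler rewrites identifiers of the form __name to _ClassName__name, but code compiled outside the class body is left untouched. The method is therefore stored as _Mirrord__run_schedule, while a module-level function looks up the literal name __run_schedule. A minimal sketch of the effect in Python 3 syntax, using a hypothetical class Demo rather than the real Mirrord:

```python
class Demo:
    def __secret(self):
        return 'secret'

    def inside(self):
        # Compiled inside the class body: becomes self._Demo__secret().
        return self.__secret()


def outside(self):
    # Compiled at module level: no mangling, so this looks up the
    # literal attribute name '__secret', which does not exist.
    return self.__secret()


Demo.outside = outside

d = Demo()
inside_result = d.inside()

try:
    d.outside()
    outside_error = None
except AttributeError as exc:
    outside_error = exc

# The method is actually stored under the mangled name.
mangled_present = hasattr(d, '_Demo__secret')
```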
If the two functions are changed to the following instead:
def _schedule_thread(self, Server, address):
    that = threading.Thread(
        target=self._Mirrord__run_schedule, name='schedule', args=(Server, address))
    that.setDaemon(1)
    that.start()
    that.join()

def _schedule_process(self, Server, address):
    """
    A sharing problem has not been solved yet:
    manager (WatchManager) should be shared among processes
    """
    pid = os.fork()
    if pid == 0: self._Mirrord__run_schedule(Server, address)
then the test passes. But spelling out _Mirrord__run_schedule like this obviously does not look very nice!
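One possible alternative, not something the original mirrord.py does, is to give the helper a single leading underscore. Such names are not mangled, so functions defined outside the class can call them without knowing the class name. A minimal sketch in Python 3 syntax; Worker, schedule_direct and schedule_logged are hypothetical stand-ins for Mirrord and its two scheduling functions:

```python
def schedule_direct(self, address):
    # A single underscore is not mangled, so this lookup works
    # from a module-level function.
    return self._run_schedule(address)


def schedule_logged(self, address):
    # Second strategy: record the address before scheduling.
    self.log.append(address)
    return self._run_schedule(address)


class Worker:
    def __init__(self):
        self.log = []

    def _run_schedule(self, address):
        return 'scheduled %s:%d' % address


# Pick the strategy at runtime by assigning it on the class.
Worker._schedule = schedule_logged

w = Worker()
message = w._schedule(('localhost', 2123))
```

The trade-off is that a single underscore marks the method as internal by convention only, and a subclass can override it, which double-underscore names are designed to prevent.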
