Python:从multiprocessing.Process中获取回溯

| 我试图从multiprocessing.Process获取一个追溯对象。 不幸的是,通过管道传递异常信息不起作用,因为不能对回溯对象进行腌制:
def foo(pipe_to_parent):
    try:
        raise Exception(\'xxx\')
    except:
        pipe_to_parent.send(sys.exc_info())

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print traceback.format_exception(*exc_info)
to_child.close()
to_self.close()
追溯:
Traceback (most recent call last):
  File \"/usr/lib/python2.6/multiprocessing/process.py\", line 231, in _bootstrap
    self.run()
  File \"/usr/lib/python2.6/multiprocessing/process.py\", line 88, in run
    self._target(*self._args, **self._kwargs)
  File \"foo\", line 7, in foo
    to_parent.send(sys.exc_info())
PicklingError: Can\'t pickle <type \'traceback\'>: attribute lookup __builtin__.traceback failed
还有另一种访问异常信息的方法吗?我想避免传递格式化的字符串。     
已邀请:
使用
tblib
,您可以传递包装的异常并在以后重新引发它们:
import tblib.pickling_support
tblib.pickling_support.install()

from multiprocessing import Pool
import sys


class ExceptionWrapper(object):

    def __init__(self, ee):
        self.ee = ee
        __,  __, self.tb = sys.exc_info()

    def re_raise(self):
        raise self.ee.with_traceback(self.tb)
        # for Python 2 replace the previous line by:
        # raise self.ee, None, self.tb


# example how to use ExceptionWrapper

def inverse(i):
    \"\"\"will fail for i == 0\"\"\"
    try:
        return 1.0 / i
    except Exception as e:
        return ExceptionWrapper(e)


def main():
    p = Pool(1)
    results = p.map(inverse, [0, 1, 2, 3])
    for result in results:
        if isinstance(result, ExceptionWrapper):
            result.re_raise()


if __name__ == \"__main__\":
    main()
因此,如果在远程进程中捕获到异常,请用with4ѭ将其包装,然后将其传递回去。在主过程中调用ѭ5即可完成工作。     
由于ѭ6确实打印了子进程中引发的异常的字符串内容,因此您可以将所有子进程代码包装在一个try-except中,该异常可以捕获任何异常,格式化relavent堆栈跟踪,并引发一个新的ѭ7holds,以保存所有相关信息。它的字符串: 我在
multiprocessing.map
中使用的一个函数示例:
def run_functor(functor):
    \"\"\"
    Given a no-argument functor, run it and return its result. We can 
    use this with multiprocessing.map and map it over a list of job 
    functors to do them.

    Handles getting more than multiprocessing\'s pitiful exception output
    \"\"\"

    try:
        # This is where you do your actual work
        return functor()
    except:
        # Put all exception text into an exception and raise that
        raise Exception(\"\".join(traceback.format_exception(*sys.exc_info())))
您得到的是一个堆栈跟踪,其中另一个格式化的堆栈跟踪作为错误消息,这有助于调试。     
似乎很难使可追溯的对象成为可拾取的。 但是,您只能使用traceback.extract_tb方法发送
sys.exc_info()
的前两项和预格式化的追溯信息:
import multiprocessing
import sys
import traceback

def foo(pipe_to_parent):
    try:
        raise Exception(\'xxx\')
    except:
        except_type, except_class, tb = sys.exc_info()
        pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb)))

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print exc_info
to_child.close()
to_self.close()
这给你:
(<type \'exceptions.Exception\'>, Exception(\'xxx\',), [(\'test_tb.py\', 7, \'foo\', \"raise Exception(\'xxx\')\")])
然后,您将能够获取有关异常原因的更多信息(文件名,引发异常的行号,方法名称和引发异常的语句)     
Python 3 在Python 3中,现在
multiprocessing.pool.Async
get
方法返回完整的追溯,请参阅http://bugs.python.org/issue13831。 Python 2 使用“ 15”(表示格式化的expetion)获取回溯字符串。 如下制作装饰器会更加方便。
def full_traceback(func):
    import traceback, functools
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = \"{}\\n\\nOriginal {}\".format(e, traceback.format_exc())
            raise type(e)(msg)
    return wrapper
例:
def func0():
    raise NameError(\"func0 exception\")

def func1():
    return func0()

# Key is here!
@full_traceback
def main(i):
    return func1()

if __name__ == \'__main__\':
    from multiprocessing import Pool
    pool = Pool(4)
    try:
        results = pool.map_async(main, range(5)).get(1e5)
    finally:
        pool.close()
        pool.join()
装饰器的追溯:
Traceback (most recent call last):
  File \"bt.py\", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File \"/opt/anaconda/lib/python2.7/multiprocessing/pool.py\", line 567, in get
    raise self._value
NameError: Exception in func0

Original Traceback (most recent call last):
  File \"bt.py\", line 13, in wrapper
    return func(*args, **kwargs)
  File \"bt.py\", line 27, in main
    return func1()
  File \"bt.py\", line 23, in func1
    return func0()
  File \"bt.py\", line 20, in func0
    raise NameError(\"Exception in func0\")
NameError: Exception in func0
没有装饰器的回溯:
Traceback (most recent call last):
  File \"bt.py\", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File \"/opt/anaconda/lib/python2.7/multiprocessing/pool.py\", line 567, in get
    raise self._value
NameError: Exception in func0
    
这是此出色答案的一种形式。两者都依靠tblib来存储回溯。 但是,不必返回异常对象(如OP所要求的),可以将
worker
函数保持不变,并将其包装在
try
/
except
中,以存储要重新引发的异常。
import tblib.pickling_support
tblib.pickling_support.install()

import sys

class DelayedException(Exception):

    def __init__(self, ee):
        self.ee = ee
        __,  __, self.tb = sys.exc_info()
        super(DelayedException, self).__init__(str(ee))

    def re_raise(self):
        raise self.ee, None, self.tb
def worker():
    try:
        raise ValueError(\'Something went wrong.\')
    except Exception as e:
        raise DelayedException(e)


if __name__ == \'__main__\':

    import multiprocessing

    pool = multiprocessing.Pool()
    try:
        pool.imap(worker, [1, 2, 3])
    except DelayedException as e:
        e.re_raise()
    
与@Syrtis Major和@interfect相同的解决方案,但经过Python 3.6测试:
import sys
import traceback
import functools

def catch_remote_exceptions(wrapped_function):
    \"\"\" https://stackoverflow.com/questions/6126007/python-getting-a-traceback \"\"\"

    @functools.wraps(wrapped_function)
    def new_function(*args, **kwargs):
        try:
            return wrapped_function(*args, **kwargs)

        except:
            raise Exception( \"\".join(traceback.format_exception(*sys.exc_info())) )

    return new_function
用法:
class ProcessLocker(object):
    @catch_remote_exceptions
    def __init__(self):
        super().__init__()

    @catch_remote_exceptions
    def create_process_locks(self, total_processes):
        self.process_locks = []
        # ...
    

要回复问题请先登录注册