在 Windows 上尝试使用 python 多处理时出现 RuntimeError

2024-12-24 08:56:00
admin
原创
87
摘要:问题描述:我正在 Windows 机器上尝试使用线程和多处理编写我的第一个正式 Python 程序。但是我无法启动进程,python 给出了以下消息。问题是,我没有在主模块中启动线程。线程在类内的单独模块中处理。编辑:顺便说一下,此代码在 ubuntu 上运行良好。在 Windows 上不太好RuntimeE...

问题描述:

我正在 Windows 机器上尝试使用线程和多处理编写我的第一个正式 Python 程序。但是我无法启动进程,python 给出了以下消息。问题是,我没有在模块中启动线程。线程在类内的单独模块中处理。

编辑:顺便说一下,此代码在 ubuntu 上运行良好。在 Windows 上不太好

RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.

我的原始代码很长,但我能够在代码的精简版本中重现错误。它分为两个文件,第一个是主模块,除了导入处理进程/线程和调用方法的模块外,几乎没有其他作用。第二个模块是代码的核心所在。


测试Main.py:

import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)

并行测试模块.py:

import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print self.name,'
'

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr) 
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

解决方案 1:

在 Windows 上,子进程将在启动时导入(即执行)主模块。您需要if __name__ == '__main__':在主模块中插入一个保护,以避免递归创建子进程。

修改的testMain.py

import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)

解决方案 2:

尝试将代码放入 testMain.py 中的主函数中

import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)

查看文档:

"For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."

也就是说

“确保主模块可以被新的 Python 解释器安全地导入,而不会导致意外的副作用(例如启动新进程)。”

...通过使用if __name__ == '__main__'

解决方案 3:

尽管之前的答案是正确的,但有一个小问题需要评论一下。

如果您的主模块导入另一个模块,其中定义全局变量或类成员变量并将其初始化为(或使用)某些新对象,则您可能必须以相同的方式限制该导入:

if __name__ ==  '__main__':
  import my_module

解决方案 4:

你好,这是我的多进程结构

from multiprocessing import Process
import time


start = time.perf_counter()


def do_something(time_for_sleep):
    print(f'Sleeping {time_for_sleep} second...')
    time.sleep(time_for_sleep)
    print('Done Sleeping...')



p1 = Process(target=do_something, args=[1])
p2 = Process(target=do_something, args=[2])


if __name__ == '__main__':
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    finish = time.perf_counter()
    print(f'Finished in {round(finish-start,2 )} second(s)')

你不必把导入放进去if __name__ == '__main__':,只需运行你想在里面运行的程序

解决方案 5:

正如 @Ofer 所说,当您使用其他库或模块时,您应该在if __name__ == '__main__':

因此,就我而言,结局是这样的:

if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd
    run_my_program()

解决方案 6:

在 yolo v5 中使用 Python 3.8.5

if __name__ == '__main__':
    from yolov5 import train
    train.run()

解决方案 7:

我在以下非常简单的代码上尝试了上述技巧。但我仍然无法阻止它在任何装有 Python 3.8/3.10 的 Windows 机器上重置。如果您能告诉我我错在哪里,我将不胜感激。

print('script reset')

def do_something(inp):
    print('Done!')

if __name__ == '__main__':
    from multiprocessing import Process, get_start_method
    print('main reset')
    print(get_start_method())
    Process(target=do_something, args=[1]).start()
    print('Finished')

输出显示:

script reset
main reset
spawn
Finished
script reset
Done!

更新:

据我所知,你们并没有阻止包含__main__或 的脚本.start()重置(这在 Linux 中不会发生),而是建议采用变通方法,这样我们就不会看到重置。我们必须将所有导入最小化,并将它们分别放在每个函数中,但相对于 Linux 来说,这仍然很慢。

解决方案 8:

就我的情况而言,这是代码中的一个简单错误,在创建变量之前使用了该变量。在尝试上述解决方案之前,值得检查一下。为什么我会收到这个特定的错误消息,天知道。

解决方案 9:

以下解决方案适用于 python 多处理和 pytorch 多处理。

正如其他答案所提到的,修复是可以的if __name__ == '__main__':,但是我在确定从哪里开始时遇到了几个问题,因为我使用了多个脚本和模块。当我可以在 main 中调用我的第一个函数时,它开始创建多个进程之前的所有操作(不确定为什么)。

将其放在第一行(甚至在导入之前)有效。只有调用第一个函数才会返回超时错误。下面是我的代码的第一个文件,在调用几个函数后使用了多处理,但将 main 放在第一个似乎是这里唯一的解决方法。

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])

    # Get demonstration data if necessary and behavior clone
    if job_data['algorithm'] != 'NPG':
        print("========================================")
        print("Collecting expert demonstrations")
        print("========================================")
        demo_paths = pickle.load(open(job_data['demo_file'], 'rb'))

        ########################################################################################
        demo_paths = demo_paths[0:3]
        print (job_data['demo_file'], len(demo_paths))
        for d in range(len(demo_paths)):
            feats = demo_paths[d]['features']
            feats = np.vstack(feats)
            demo_paths[d]['observations'] = feats

        ########################################################################################

        bc_agent = BC(demo_paths, policy=policy, epochs=job_data['bc_epochs'], batch_size=job_data['bc_batch_size'],
                      lr=job_data['bc_learn_rate'], loss_type='MSE', set_transforms=False)

        in_shift, in_scale, out_shift, out_scale = bc_agent.compute_transformations()
        bc_agent.set_transformations(in_shift, in_scale, out_shift, out_scale)
        bc_agent.set_variance_with_data(out_scale)

        ts = timer.time()
        print("========================================")
        print("Running BC with expert demonstrations")
        print("========================================")
        bc_agent.train()
        print("========================================")
        print("BC training complete !!!")
        print("time taken = %f" % (timer.time() - ts))
        print("========================================")

        # if job_data['eval_rollouts'] >= 1:
        #     score = e.evaluate_policy(policy, num_episodes=job_data['eval_rollouts'], mean_action=True)
        #     print("Score with behavior cloning = %f" % score[0][0])

    if job_data['algorithm'] != 'DAPG':
        # We throw away the demo data when training from scratch or fine-tuning with RL without explicit augmentation
        demo_paths = None

    # ===============================================================================
    # RL Loop
    # ===============================================================================

    rl_agent = DAPG(e, policy, baseline, demo_paths,
                    normalized_step_size=job_data['rl_step_size'],
                    lam_0=job_data['lam_0'], lam_1=job_data['lam_1'],
                    seed=job_data['seed'], save_logs=True
                    )

    print("========================================")
    print("Starting reinforcement learning phase")
    print("========================================")


    ts = timer.time()
    train_agent(job_name=JOB_DIR,
                agent=rl_agent,
                seed=job_data['seed'],
                niter=job_data['rl_num_iter'],
                gamma=job_data['rl_gamma'],
                gae_lambda=job_data['rl_gae'],
                num_cpu=job_data['num_cpu'],
                sample_mode='trajectories',
                num_traj=job_data['rl_num_traj'],
                num_samples= job_data['rl_num_samples'],
                save_freq=job_data['save_freq'],
                evaluation_rollouts=job_data['eval_rollouts'])
    print("time taken = %f" % (timer.time()-ts))

解决方案 10:

我也遇到了同样的问题,@ofter方法是正确的,因为有一些细节需要注意,下面是我修改后调试成功的代码,供大家参考:


if __name__ == '__main__':
    import matplotlib.pyplot as plt
    import numpy as np
    def imgshow(img):
        img = img / 2 + 0.5
        np_img = img.numpy()
        plt.imshow(np.transpose(np_img, (1, 2, 0)))
        plt.show()

    dataiter = iter(train_loader)
    images, labels = dataiter.next()

    imgshow(torchvision.utils.make_grid(images))
    print(' '.join('%5s' % classes[labels[i]] for i in range(4)))

顺便说一下,我没有子程序,只有主程序,但我遇到了和你一样的问题。这表明,在程序段中间导入 Python 库文件时,我们应该添加:

if __name__ == '__main__':
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1120  
  IPD(Integrated Product Development,集成产品开发)流程是一种广泛应用于高科技和制造业的产品开发方法论。它通过跨职能团队的紧密协作,将产品开发周期缩短,同时提高产品质量和市场成功率。在IPD流程中,CDCP(Concept Decision Checkpoint,概念决策检查点)是一个关...
IPD培训课程   75  
  研发IPD(集成产品开发)流程作为一种系统化的产品开发方法,已经在许多行业中得到广泛应用。它不仅能够提升产品开发的效率和质量,还能够通过优化流程和资源分配,显著提高客户满意度。客户满意度是企业长期成功的关键因素之一,而IPD流程通过其独特的结构和机制,能够确保产品从概念到市场交付的每个环节都围绕客户需求展开。本文将深入...
IPD流程   66  
  IPD(Integrated Product Development,集成产品开发)流程是一种以跨职能团队协作为核心的产品开发方法,旨在通过优化资源分配、提高沟通效率以及减少返工,从而缩短项目周期并提升产品质量。随着企业对产品上市速度的要求越来越高,IPD流程的应用价值愈发凸显。通过整合产品开发过程中的各个环节,IPD...
IPD项目管理咨询   76  
  跨部门沟通是企业运营中不可或缺的一环,尤其在复杂的产品开发过程中,不同部门之间的协作效率直接影响项目的成败。集成产品开发(IPD)作为一种系统化的项目管理方法,旨在通过优化流程和增强团队协作来提升产品开发的效率和质量。然而,跨部门沟通的复杂性往往成为IPD实施中的一大挑战。部门之间的目标差异、信息不对称以及沟通渠道不畅...
IPD是什么意思   70  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用