1. <track id="5jh9g"><ruby id="5jh9g"></ruby></track>

        深圳幻海軟件技術有限公司歡迎您!

        幻海優品

        Python multiprocessing 多進程間通信傳遞DataFrame的方法

        1、Python 多進程

        參考文檔:Python 異步編程 多進程

        2、使用multiprocessing.Manager的Namespace()實現

        可以使用multiprocessing.Manager為所有進程提供單例DataFrame實例。有幾種不同的方法可以達到相同的效果,但可能最簡單的方法是將DataFrame放入multiprocessing.Manager實例的Namespace中。

        from multiprocessing import Manager, Process
        import pandas as pd
        import time


        def build_frame():
            """Return the small demo DataFrame that is shared with the worker."""
            return pd.DataFrame(
                [[10, 6, 7, 8], [1, 9, 12, 14], [5, 8, 10, 6]],
                columns=['a', 'b', 'c', 'd'],
            )


        def worker(ns):
            """Print the DataFrame stored on the namespace, pause, then print "end".

            ns: object exposing a ``df`` attribute; in this demo it is the
                Namespace proxy created by the Manager in ``main``.
            """
            print(ns.df)
            time.sleep(1)
            print("end")


        def main():
            """Put the DataFrame on a Manager namespace and read it from a child process."""
            # Using the manager as a context manager shuts it down when the demo
            # is finished instead of leaving that to interpreter exit.
            with Manager() as mgr:
                ns = mgr.Namespace()
                ns.df = build_frame()

                # Another process
                p = Process(target=worker, args=(ns,))
                p.start()
                p.join()


        # The guard is required: with the "spawn" start method the child imports
        # this module again, and unguarded code would try to start a new Manager
        # and Process from inside the child.
        if __name__ == '__main__':
            main()

        3、使用BaseManager和SyncManager實現

        使用Python的BaseManager和SyncManager類,可以搭建客戶端/服務器結構。首先設置一個服務器,為數據提供代理類。代碼如下,

        1)DataServer.py

        #!/usr/bin/python
        from multiprocessing.managers import SyncManager
        import numpy
        import pandas as pd

        # Global for storing the data to be served
        gData = {}


        # Proxy class shared by the different processes.
        # Do not put big data in here, since that would force it to be piped to
        # the other process when instantiated there; instead just return a portion
        # of the global data when requested.
        class DataProxy(object):
            """Accessor for the module-level ``gData`` dictionary."""

            def __init__(self):
                pass

            def getData(self, key, default=None):
                """Return the object stored in ``gData`` under ``key``.

                key: dictionary key to look up.
                default: value returned when ``key`` is not present (None if omitted).
                """
                # gData is only read here, so no ``global`` statement is needed.
                return gData.get(key, default)


        if __name__ == '__main__':
            port = 5000
            gData[1] = pd.DataFrame(
                [[10, 6, 7, 8], [1, 9, 12, 14], [5, 8, 10, 6]],
                columns=['a', 'b', 'c', 'd'],
            )

            # Start the server on address(host,port)
            print('Serving data. Press <ctrl>-c to stop.')

            class myManager(SyncManager):
                pass

            myManager.register('DataProxy', DataProxy)
            mgr = myManager(address=('', port), authkey='DataProxy01'.encode())
            server = mgr.get_server()
            server.serve_forever()

        2)DataClient.py

        import os
        from multiprocessing.managers import BaseManager
        import psutil   # used to obtain process information


        # Obtains the shared proxy class. All methods of that class are available here.
        class DataClient(object):
            """Connects to the manager on localhost and exposes its DataProxy as ``proxy``."""

            def __init__(self, port):
                """Connect to the manager listening on ``('localhost', port)``.

                port: TCP port of the manager; the authkey is 'DataProxy01'.
                """
                # Optional sanity check, left disabled as in the original:
                #assert DataClient._checkForProcess('DataServer.py'), 'Must have DataServer running'
                class myManager(BaseManager):
                    pass

                myManager.register('DataProxy')
                self.mgr = myManager(address=('localhost', port), authkey='DataProxy01'.encode())
                self.mgr.connect()
                self.proxy = self.mgr.DataProxy()

            # Verify that the server is running (optional)
            @staticmethod
            def _checkForProcess(name):
                """Return True if a running process matches ``name``.

                A process matches when its process name equals ``name`` or when the
                base name of any of its command-line arguments equals ``name``. The
                second check is what finds a script started as ``python DataServer.py``,
                whose process name is the interpreter rather than the script.
                """
                for proc in psutil.process_iter():
                    try:
                        if proc.name() == name:
                            return True
                        if any(os.path.basename(arg) == name for arg in proc.cmdline()):
                            return True
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        # The process exited during iteration or may not be
                        # inspected by this user; it cannot be the match we want.
                        continue
                return False

        3)使用示例

        先運行DataServer.py,然後將下面的代碼保存為腳本並運行,如下,

        #!/usr/bin/python
        import time
        import multiprocessing as mp
        import numpy
        from DataClient import *

        # The "proxy" is global to each child process;
        # it is not shared between all processes.
        gProxy = None
        gMode = None
        gDummy = None


        def init(port, mode):
            """Pool initializer: set up this process's proxy, mode and local dummy data.

            port: port passed to DataClient to reach the data server.
            mode: 0 to fetch through the proxy, 1 to use the local dummy array.
            """
            global gProxy, gMode, gDummy
            gProxy = DataClient(port).proxy
            gMode = mode
            gDummy = numpy.random.rand(1000)
            print('Init proxy ', id(gProxy), 'in ', mp.current_process())


        def worker(key):
            """Run one unit of work for ``key`` according to the mode set by ``init``.

            Raises AssertionError when the mode is neither 0 nor 1.
            """
            if 0 == gMode:   # fetch from the proxy
                array = gProxy.getData(key)
                print(array)
            elif 1 == gMode:  # baseline for comparison: local data only
                array = gDummy
            else:
                # Raised explicitly so the check is not stripped under ``python -O``.
                raise AssertionError('unknown mode: %s' % gMode)


        if __name__ == '__main__':
            port = 5000

            for mode in [1, 0]:
                for nprocs in [16, 1]:
                    if 0 == mode:
                        print('使用 client/server %d processes' % nprocs)
                    if 1 == mode:
                        print('使用 local data %d processes' % nprocs)
                    pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
                    try:
                        start = time.time()
                        pool.map(worker, [1], chunksize=1)
                        print('took %4.3f seconds' % (time.time() - start))
                    finally:
                        # close() only stops new submissions; join() waits for the
                        # workers to exit so they do not overlap the next round.
                        pool.close()
                        pool.join()

        免責聲明:以上內容(如有圖片或視頻亦包括在內)有轉載其他網站資源,如有侵權請聯系刪除

        中文在线天堂中文

        1. <track id="5jh9g"><ruby id="5jh9g"></ruby></track>