Python3 多线程threading处理xlsx/csv数据

2022-07-27,,,,

Python如果单线程执行代码去处理数万个xlsx,包括读出和写入的操作,整个过程耗时会很长。本文以处理一批15000个csv文件为例,对比Python3单线程和多线程处理效率。

任务:每一个csv包括三个波段的内容,将其分解成三个波段,总共产生45000个文件。

import os
from pandas import Series
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import cmath
import  math
import shutil
from scipy.optimize import curve_fit
from scipy import signal
import time
import threading

1)单线程处理代码,如下:


def hylidar_split(input_raw,split_dir):
    """split 3 channels in one file to one channel in one file"""
    fns_raw = os.listdir(input_raw)
    for i in range(len(fns_raw)):
        df = pd.read_csv(os.path.join(input_raw + '\\', fns_raw[i]))
        ch2_name = fns_raw[i][-20:-16]
        ch3_name = fns_raw[i][-15:-11]
        ch4_name = fns_raw[i][-10:-6]
        #Emittted_bb == Emitted broad band
        df.rename(columns={'channel1': 'Emitted_bb', 'channel2': ch2_name, 'channel3': ch3_name, 'channel4': ch4_name}, inplace=True)#将列重命名一下
        #split_dir + '\\' + fns_raw[i][:-23]  + ch2_name + fns_raw[i][-6:]
        #F:\SCI1数据处理\d8_2process\d8_2split +\\+试验d8_2_X_-10_Y_-10+ch01+_1.csv
        df[['time', 'Emitted_bb', ch2_name]].to_csv(split_dir + '\\' + fns_raw[i][:-24]  + ch2_name + fns_raw[i][-6:],index=False)#选择需要的列生产新csv文件
        df[['time', 'Emitted_bb', ch3_name]].to_csv(split_dir + '\\' + fns_raw[i][:-24]  + ch3_name + fns_raw[i][-6:],index=False)
        df[['time', 'Emitted_bb', ch4_name]].to_csv(split_dir + '\\' + fns_raw[i][:-24]  + ch4_name + fns_raw[i][-6:],index=False)
        del df

if __name == "__main__":
	rawdata_dir = "F:\\leaf data\\WT梧桐\\叶片"
    split_dir = "F:\\leaf data\\WT梧桐\\split_wt"
    beginning_time = time.time()
    localtime = time.asctime( time.localtime(time.time()) )
    
    hylidar_split(rawdata_dir, split_dir)
    
    ending_time = time.time()
    time_cost = (ending_time - beginning_time)/60 
    localtime = time.asctime( time.localtime(time.time()) )
    print("end at :",localtime)
    print("process took",time_cost,"minutes!")

输出:

end at : Thu Nov 19 16:23:55 2020
process took 4.528384971618652 minutes!

2)多线程处理代码,如下:

def hylidar_split(fns_raw,rawdata_dir,split_dir):
    """split 3 channels in one file to one channel in one file"""
    for i in range(len(fns_raw)):
        df = pd.read_csv(rawdata_dir+'\\'+fns_raw[i])
        ch2_name = fns_raw[i][-20:-16]
        ch3_name = fns_raw[i][-15:-11]
        ch4_name = fns_raw[i][-10:-6]
        #Emittted_bb == Emitted broad band
        df.rename(columns={'channel1': 'Emitted_bb', 'channel2': ch2_name, 'channel3': ch3_name, 'channel4': ch4_name}, inplace=True)#将列重命名一下
        #split_dir + '\\' + fns_raw[i][:-23]  + ch2_name + fns_raw[i][-6:]
        #F:\SCI1数据处理\d8_2process\d8_2split +\\+试验d8_2_X_-10_Y_-10+ch01+_1.csv
        df[['time', 'Emitted_bb', ch2_name]].to_csv(split_dir + '\\' + fns_raw[i][:-24]  + ch2_name + fns_raw[i][-6:],index=False)#选择需要的列生产新csv文件
        df[['time', 'Emitted_bb', ch3_name]].to_csv(split_dir + '\\' + fns_raw[i][:-24]  + ch3_name + fns_raw[i][-6:],index=False)
        df[['time', 'Emitted_bb', ch4_name]].to_csv(split_dir + '\\' + fns_raw[i][:-24]  + ch4_name + fns_raw[i][-6:],index=False)
        del df

if __name__ == "__main__":
    
    rawdata_dir = "F:\\leaf data\\WT梧桐\\叶片"
    split_dir = "F:\\leaf data\\WT梧桐\\split_wt"
    
    beginning_time = time.time()
    localtime = time.asctime( time.localtime(time.time()) )
    print("beginning at :",localtime)
    #wl = [409, 425, 442, 458, 474, 491, 507, 523, 540, 556, 572, 589, 605, 621, 637, 653, 670, 686, 703, 719, 735, 751, 768, 784,800, 816, 833, 840, 865, 882, 898, 914]
    n_threads = 16
    thread_list = []
    fns_raw = os.listdir(rawdata_dir)
    step = int(len(fns_raw)/n_threads)
    count = 0
    for item in range(n_threads):
        current_fns = fns_raw[count:count+step]
        thread = threading.Thread(target = hylidar_split,args=(current_fns,rawdata_dir,split_dir,))
        thread_list.append(thread)
        thread.start()
        count+=step
    for item2 in thread_list:
        item2.join()
    ending_time = time.time()
    time_cost = (ending_time - beginning_time)/60 
    localtime = time.asctime( time.localtime(time.time()) )
    print("end at :",localtime)
    print("process took",time_cost,"minutes!")

输出:

end at : Thu Nov 19 22:10:37 2020
process took 3.288083263238271 minutes!

减少1.3mins.
从这个例子来看,并没有减少多长时间。
另外也试过Python3最新的线程池
from concurrent.futures import ThreadPoolExecutor
以及多进程处理
import multiprocessing
时间均为3分多一点。
不知是逻辑不对,理解不到位还是哪里的问题,我计算机16线程全部用上,也才减少一分钟。

本文地址:https://blog.csdn.net/qq_37970770/article/details/109816913

《Python3 多线程threading处理xlsx/csv数据.doc》

下载本文的Word格式文档,以方便收藏与打印。