登陆注册
2272

一个读取excel数据处理完成后读入数据库的例子

站长网2023-05-23 17:14:050

最近收集了一批数据,各地根据问题数据做出反馈,但是各地在反馈的时候字段都进行了创新,好在下发的数据内容并没有改变,开始写的单进程的,由于时间较长,耗时380 秒,又改成多进程的,时间缩短为80-秒。现在把程序发出来,请各位大神进行指正。

import multiprocessing

import os

import time

import pandas as pd

from sqlalchemy import create_engine

import asyncio

import warnings

# warnings.simplefilter("ignore")

ywstrList=['经办机构', '原子业务编号', '原子业务名称', '风险名称','风险描述',

'校验规则结果', '创建时间', '风险提示信息', '业务日志号']

ywstrListMemo=['经办机构', '原子业务编号', '原子业务名称', '风险名称','风险描述',

'校验规则结果', '创建时间', '风险提示信息', '业务日志号','memo','time']

szlist = ['省本', '成都', '自贡', '攀枝', '泸州', '德阳', '绵阳', '广元',

'遂宁', '内江', '乐山', '南充', '眉山', '宜宾', '广安', '达州',

'雅安', '巴中', '资阳', '阿坝', '甘孜', '凉山']

# connect = create_engine('mysql pymysql://root:@127.0.0.1:3306/ywgk?charset=utf8')

connect = create_engine('mysql mysqlconnector://root:@127.0.0.1:3306/ywgk?charset=utf8')

# engine = create_engine('mysql mysqlconnector://scott:tiger@localhost/foo')

def findSZ(filename):

for sz in szlist:

if filename.find(sz) != -1:

return sz

return None

def ReadExcel(filename):

xlsdf = ''

xlsdf = pd.read_excel(filename)

""""

remove columns='sz' or '市'

"""

if "市" in list(xlsdf.keys().to_list()):

xlsdf.drop(columns='市', axis=1, inplace=True)

if "sz" in list(xlsdf.keys().to_list()):

xlsdf.drop(columns='sz', axis=1, inplace=True)

xlsdf = xlsdf.fillna("").astype('string')

return xlsdf

def filterDataOfSz(filename, xlsdf):

sz = findSZ(filename)

print(sz)

"""

筛选出包含对应市州的数据。

"""

if sz != None:

xlsdf = xlsdf[xlsdf['经办机构'].str[:2] == sz] # 筛出本市州数据

return xlsdf

def ConCatRestCols(xlsdf):

"""

去掉业务部分字段,保留市州反馈意见。

"""

print(xlsdf)

# if xlsdf==None:

#

print(filename "为空,需要处理")

#

return

xlfdf_keys_set = set(xlsdf.keys().to_list())

xlsdf_restkeys_set = xlfdf_keys_set - set(ywstrList)

xlsdf_restkeys_list = list(xlsdf_restkeys_set)

xls_rest_df = xlsdf.loc[:, xlsdf_restkeys_list] # 可以正确操作

xlsdf['memo'] = '#'

for col in xlsdf_restkeys_list:

xlsdf['memo'] = xlsdf[col]

#

return xlsdf

def SetTimeStamp(filename, xlsdf):

xlsdf['time'] = os.stat(str(filename)).st_mtime

return xlsdf

async def ProcessExcelAndtosql(filename, table):

df = ReadExcel(filename)

df = filterDataOfSz(filename=filename, xlsdf=df)

df = ConCatRestCols(df)

df = SetTimeStamp(filename=filename, xlsdf=df)

df = df.loc[:, ywstrListMemo]

print(filename)

print(df)

df.to_sql(name=table, con=connect, if_exists='append', index=False, chunksize=1000, method='multi')

def profile(func):

def wrapers(*args,**kwargs):

print("测试开始")

begin=time.time()

func(*args,**kwargs)

end=time.time()

print(f"耗时{end-begin}秒")

return wrapers

# async def getmsg(msg):

#

print(f'#{msg}')

#

await asyncio.sleep(1)

def getFiles(src:str):

import pathlib

files=[]

for file in pathlib.Path(src).rglob("*.xls?"):

files.append(str(file))

return files

def process_asyncio(files,table):

loop=asyncio.new_event_loop()

tasks=[loop.create_task(ProcessExcelAndtosql(filename,table)) for filename in files]

loop.run_until_complete(asyncio.wait(tasks))

@profile

def run(iterable,table):

process_count = multiprocessing.cpu_count()

# print(process_count)

pool = multiprocessing.Pool(process_count-2)

iterable=get_chunks(iterable, process_count)

for lst in iterable:

pool.apply_async(process_asyncio, args=(lst,table))

pool.close()

pool.join()

def main():

files = getFiles(r"e:\市州返回")

run(files, 'ywgk3')

def get_chunks(iterable,num):

# global iterable

import numpy as np

return np.array_split(iterable, num)

# import profile

if __name__=="__main__":

main()

本人只是编程的业余爱好者,只是把技术用于辅助工作,并没有深入研究技术理论,都是野路子,还请批评指正。

0000
评论列表
共(0)条