Python异步编程|PySimpleGUI界面读取PDF转换Excel
创始人
2025-01-08 13:34:19
0

目录

实例要求

原始pdf文件格式

输出xls文件格式

运行界面

完整代码

代码分析

遍历表格

布局界面

控件简介

写入表格

表格排序

事件循环

异步编程


实例要求

使用PySimpleGUI做一个把单位考勤系统导出的pdf文件合并输出Excel的应用,故事出自:

https://hannyang.blog.csdn.net/article/details/135395946

当时时间紧,没有好好做界面且输出csv文件了事。今天趁周六休息,把代码做一下升级处理,使用库PySimpleGUI做了一个稍微漂亮一点的界面;又用pdfplumber直接遍历多个pdf文件,得到数据后输出Excel文件,比我原本先做合并pdf文件再去取数要快,原先的pdf文件合并操作纯粹有点多余。最后,又尝试对pdf文件读取函数的改造,使用了asyncio异步编程效果非常不错。

下面请听我慢慢道来:

原始pdf文件格式

输出xls文件格式

运行界面

完整代码

import xlwt, pyperclip, asyncio, pdfplumber import os, time, datetime as dt import PySimpleGUI as sg  # 全局变量 table_head = '姓名,部门,应到,实到,出勤率,迟到次数,早退次数,加班(分钟)' path, font = '', ('宋体',12) date, data = [], [] DateFormat = '    .  . -    .  .  ' ErrMessage = '错误' SortedType = ["出勤率排序","加班时长排序","迟到次数排序","早退次数排序"]  # 定义布局 layout = [     [sg.Text("昆山分行考勤表",font=('',16)),      sg.Text(pad=(132,10)),      sg.Text("请选择考勤文件:",font=font),      sg.Input(key="-FOLDER-", enable_events=True, readonly=True,font=font,size=18),      sg.FolderBrowse(button_text='...', enable_events=True, initial_folder='./')      ],     [sg.Text("考勤日期:",font=font),      sg.Text(DateFormat,key='-DATE-',font=font)      ],     [sg.Table(values='',               headings=table_head.split(','),               key='-TABLE-',               auto_size_columns=False,               justification='left',               num_rows=10)],     [sg.Button("输出Excel文件",size=(12,1),pad=(15,30)),      sg.Button(SortedType[0], enable_events=True,size=(10,1),pad=(15,30)),      sg.Button(SortedType[1], enable_events=True,size=(10,1),pad=(15,30)),      sg.Button(SortedType[2], enable_events=True,size=(10,1),pad=(15,30)),      sg.Button(SortedType[3], enable_events=True,size=(10,1),pad=(15,30)),      sg.Button("退出",size=(10,1),pad=(15,30))],     [sg.StatusBar('',key="-BAR-",font=font,size=92)] ]  # 读取pdf表格 async def read_table(file):     dct = dict()     with pdfplumber.open(file) as pdf:         for page in pdf.pages:             tables = page.extract_tables(table_settings = {})             for table in tables:                 for lst in table:                     tmp = lst[1:]                     if not any(tmp): continue                     tmp = [tmp[0]]+tmp[3:8]+[tmp[-1]]                     tmp[0] = tmp[0].replace('\n','')                     tmp[0] = tmp[0].split('/')                     tmp[0] = tmp[0][-1]                     if lst[0]=='时间':                         dct[lst[0]] = tmp[0]                     else:                         dct[','.join([lst[0],tmp[0]])] = ','.join(tmp[1:])     return dct  # 写入xls文件 def write_sheet():     global data, date, table_head, ErrMessage     if ErrMessage[:2] in ('错误','文件'): return     myxl = xlwt.Workbook()     style = xlwt.easyxf('align: wrap yes; align: horiz center; font: bold yes;')      sheet = myxl.add_sheet('考勤表')     wcol = [20,40,60,30,30,40,40,40,60]     for i,w in enumerate(wcol):         sheet.col(i).width = w * 80     sheet.write_merge(0,0,0,8,'出勤统计报表',style)     style = xlwt.easyxf('borders:top thin; borders:bottom thin; borders:left thin; borders:right thin;')      sheet.write_merge(1,1,0,2,'考勤日期:'+date[0])     for i,head in enumerate(['序号']+table_head.split(',')):         sheet.write(2,i,head,style)     for i,row in enumerate(data):         for j,col in enumerate([str(i+1)]+row):             sheet.write(3+i,j,col,style)     for i,t in enumerate(SortedType):         if t in ErrMessage:             tmp = SortedType[i]             break     else: tmp = ""     excel_file = f'昆山分行考勤表{date[0]}({tmp}{strDateTime()}).xls'     ErrMessage = f'文件输出为:{excel_file}'     try:         myxl.save(excel_file)     except:         ErrMessage = '写入excel文件失败!'     finally:         pyperclip.copy('\\'.join((os.getcwd(),excel_file)))         window['-BAR-'].update(ErrMessage)  # 获取当前时间 def strDateTime(diff=0):     now = dt.datetime.now()     time = now + dt.timedelta(days=diff)         return f'{time.year}{time.month:02}{time.day:02}{time.hour:02}{time.minute:02}{time.second:02}'  # 选择并处理文件 async def on_text_changed(event, values):     global date, data, path, ErrMessage     new_path = values["-FOLDER-"]     window["-FOLDER-"].update(new_path.split('/')[-1])     if path==new_path: return     else: path = new_path     pdfs = [f for f in os.listdir(path) if f.endswith('.pdf') and not f.startswith('PDFmerged')]     nums = len(pdfs)     if nums==0:         ErrMessage = '错误:所选文件夹中没有PDF文件!'         window['-BAR-'].update(ErrMessage)         window['-DATE-'].update(DateFormat)         window['-TABLE-'].update(values=[])         return     date, data, sheet = [], [], dict()     tasks = []     for pdf in pdfs:         tasks.append(read_table('/'.join([path,pdf])))     ErrMessage = f'文件读取中(共{nums}个PDF文件)......'     window['-BAR-'].update(ErrMessage)     window.refresh()     results = await asyncio.gather(*tasks)     for r in results:         dt = r.get('时间',None)         if dt: date.append(dt)         sheet.update(r)     if date:         window['-DATE-'].update(date[-1])     for k,v in sheet.items():         if k in ('时间','姓名,所属组织','普通班个人出勤统计报表,'): continue         data.append(','.join([k,v]).split(','))         window['-TABLE-'].update(values=data)     persons = len(data)     departments = len(set([d[1] for d in data]))     if 0:#len(set(date))!=1:         data = []         ErrMessage = f'错误:请检查所选文件存在多个时间段:{",".join(set(date))}'     else:         ErrMessage = f'考勤人数:{persons} / 部门数:{departments}'     window['-BAR-'].update(ErrMessage)  # 表格排序 def on_table_sorted(event, data):     global ErrMessage     if not data: return     slist = ['x[-4][:-1]', 'x[-1]', 'x[-3]', 'x[-2]']     style = slist[SortedType.index(event)]     data = sorted(data, key=lambda x: float(eval(style)), reverse=True)     window['-TABLE-'].update(values=data)     ErrMessage = f'已按{event}更新!'     window['-BAR-'].update(ErrMessage)  # 创建窗口 window = sg.Window("考勤表汇总", layout, finalize=True)  # 事件循环 while True:     event, values = window.read()     if event == sg.WINDOW_CLOSED or event == "退出":         break     elif event == "-FOLDER-":         asyncio.run(on_text_changed(event, values))     elif event in SortedType:         on_table_sorted(event, data)     elif event == "输出Excel文件":         write_sheet()  # 关闭窗口 window.close() 

代码分析

重点代码都用彩色字体加粗标注了:

遍历表格

读取代码如下:

import pdfplumber

......
    with pdfplumber.open(file) as pdf:
        for page in pdf.pages:
            tables = page.extract_tables(table_settings = {})
            for table in tables:
                for lst in table:
                    # 根据表格实际情况来清洗数据
    return dct

布局界面

import PySimpleGUI as pg

layout = [
    [sg.Text("昆山分行考勤表",font=('',16)),
     sg.Text(pad=(132,10)),
     sg.Text("请选择考勤文件:",font=font),
     sg.Input(key="-FOLDER-", enable_events=True, readonly=True,font=font,size=18),
     sg.FolderBrowse(button_text='...', enable_events=True, initial_folder='./')
     ],
    [sg.Text("考勤日期:",font=font),
     sg.Text(DateFormat,key='-DATE-',font=font)
     ],
    [sg.Table(values='',
              headings=table_head.split(','),
              key='-TABLE-',
              auto_size_columns=False,
              justification='left',
              num_rows=10)],
    [sg.Button("输出Excel文件",size=(12,1),pad=(15,30)),
     sg.Button(SortedType[0], enable_events=True,size=(10,1),pad=(15,30)),
     sg.Button(SortedType[1], enable_events=True,size=(10,1),pad=(15,30)),
     sg.Button(SortedType[2], enable_events=True,size=(10,1),pad=(15,30)),
     sg.Button(SortedType[3], enable_events=True,size=(10,1),pad=(15,30)),
     sg.Button("退出",size=(10,1),pad=(15,30))],
    [sg.StatusBar('',key="-BAR-",font=font,size=92)]]

控件简介

除了最常用的Text, Input, Button,使用了 FolderBrowse、Table、StatsBar三个不是最常用的控件,分别是文件夹打开框、表格和状态栏。

表格最重要的三个参数: values, headings, auto_size_columns

sg.Table(values='', headings=table_head.split(','), auto_size_columns=False)

表格数据values和表头headings都列表(分别是二维和一维的),auto_size_columns=False建议不要缺省,否则列宽不可控,各列都自动缩进紧靠在一起。

表格更新数据的方法:window['-TABLE-'].update(values=data)

写入表格

import xlwt

def write_sheet():
    global data, date, table_head, ErrMessage
    if ErrMessage[:2] in ('错误','输出'): return
    myxl = xlwt.Workbook()
    style = xlwt.easyxf('align: wrap yes; align: horiz center; font: bold yes;') 
    sheet = myxl.add_sheet('考勤表')
    wcol = [20,40,60,30,30,40,40,40,60]
    for i,w in enumerate(wcol):
        sheet.col(i).width = w * 80
    sheet.write_merge(0,0,0,8,'出勤统计报表',style)
    style = xlwt.easyxf('borders:top thin; borders:bottom thin; borders:left thin; borders:right thin;') 
    sheet.write_merge(1,1,0,2,'考勤日期:'+date[0])
    for i,head in enumerate(['序号']+table_head.split(',')):
        sheet.write(2,i,head,style)
    for i,row in enumerate(data):
        for j,col in enumerate([str(i+1)]+row):
            sheet.write(3+i,j,col,style)
    for i,t in enumerate(SortedType):
        if t in ErrMessage:
            tmp = SortedType[i]
            break
    else: tmp = ""
    excel_file = f'昆山分行考勤表{date[0]}({tmp}{strDateTime()}).xls'
    ErrMessage = f'输出文件为:{excel_file}'
    try:
        myxl.save(excel_file)
    except:
        ErrMessage = '写入excel文件失败!'

注意单格和多个单元格的写入区别: sheet.write()  sheet.write_merge()

表格排序

SortedType = ["出勤率排序","加班时长排序","迟到次数排序","早退次数排序"]
def on_table_sorted(event, data):
    global ErrMessage
    if not data: return
    slist = ['x[-4][:-1]', 'x[-1]', 'x[-3]', 'x[-2]']
    style = slist[SortedType.index(event)]
    data = sorted(data, key=lambda x: float(eval(style)), reverse=True)
    window['-TABLE-'].update(values=data)
    ErrMessage = f'已按{event}更新!'
    window['-BAR-'].update(ErrMessage)

虽然经常有人诟病eval()函数的安全性,但这里还是用eval()简化表格排序事件,否则要多写很多代码。

事件循环

while True:
    event, values = window.read()
    if event == sg.WINDOW_CLOSED or event == "退出":
        break
    elif event == "-FOLDER-":
        asyncio.run(on_text_changed(event, values))
    elif event in SortedType:
        on_table_sorted(event, data)
    elif event == "输出Excel文件":
        write_sheet()

异步编程

此时,请出本篇的主角“异步编程”,什么是异步编程呢?就是有点多任务操作的意思。

异步编程是一种编程范式,它允许某些操作在等待结果时不阻塞整个程序。在传统的同步编程中,程序会按照顺序执行,一旦遇到需要等待的操作(如文件I/O或网络请求),整个程序就会被阻塞,等待操作完成。而在异步编程中,程序并不会因为某个耗时的IO操作而停下其他所有任务,而是将这个任务交给系统处理,自身继续执行后续的操作,等到IO操作完成后,系统会通知程序进行下一步的处理。

asyncio

在上一段代码中,响应"-FOLDER-"时使用了asyncio.run()函数:

import asyncio
....... ......

while True:
    event, values = window.read()
    if event == sg.WINDOW_CLOSED or event == "退出":
        break
    elif event == "-FOLDER-":
        asyncio.run(on_text_changed(event, values))

asyncio.run运行的这个是异步编程的主函数,需要用asyncdef来定义:

async def

asyncdefon_text_changed(event, values):
    ......其它代码略......
    tasks = []
    for pdf in pdfs:
        tasks.append(read_table('/'.join([path,pdf])))
    ErrMessage = f'文件读取中(共{nums}个PDF文件)......'
    window['-BAR-'].update(ErrMessage)
    window.refresh()
    results = awaitasyncio.gather(*tasks)
    for r in results:
       ......遍历取回的被调异步函数返回值的列表......

await

异步主函数中使用 awaitasyncio.gather(*tasks) 取回被函数的返回结果,返回结果是多个任务的返回值组成的列表;而主函数的任务呢就,是被调函数组成的列表:asks.append(read_table())

同样的,被调函数也需要用asyncdef来定义,它一般都是文件I/O或网络请求等比较耗时的操作:

asyncdefread_table(file):
    dct = dict()
    with pdfplumber.open(file) as pdf:
        # 读取pdf文件 I/O操作
    return dct


源码和2个例表已绑定上传资源,欢迎下载测试。

相关内容

热门资讯

从基础知识到应用实例,一站式掌... 前言大家好,我是阔升。今天,我要和大家聊聊 Python 中的正则表达式...
UNiapp微信小程序Ucha... 效果图如下以上为加载接口所得数据的玫瑰图与折线图具体步骤如下1,将插件导入Hbuile...
模拟器小程序/APP抓包(Re... 一、使用adb连接上MUMU模拟器打开多开器点击ADB图标连接模拟器端口: adb c...
设计模式-概述* 1.代码的质量的评判可维护性:不破坏原有代码设计以及不引入新的bug的前提下ÿ...
速学Django:Web开发从... 编辑推荐适读人群 :Web开发 和100000+读者一起跟小楼老师学习...
Django REST fra... 系列文章目录Django入门全攻略:从零搭建你的第一个Web项目Django ORM入...
快速搭建Python(Djan... 文章目录一. 创建vue项目及环境搭建1. 创建vue项目2. 配置axios3. 创建vue组件l...
昇思25天学习打卡营第11天|... 打卡目录打卡序列标注条件随机场(Conditional Random Field, CRF)SCOR...
【数据结构】二叉树的顺序结构及... 目录一.二叉树的顺序结构二.堆的概念及结构三.堆的实现1.向上调整算法2.向下调整算法3.添加数据4...
数据结构——排序之冒泡排序 💞💞 前言hello hello~ ,这里是大耳朵土土...