Python: periodically copy the previous day's data from selected tables in other SQL Server databases into same-named tables in your own database
Founder
2025-01-08 20:33:54
import schedule
import time
import pyodbc
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
import warnings
import logging

# Source database connection settings (one entry per branch)
source_databases = [
    {
        'database_name': '',
        'server': '',
        'database': '',
        'username': '',
        'password': '',
        'branch_id': 0  # branch ID
    },
]

# Target database settings
target_database = {
    'database_name': '',
    'server': '',
    'database': '',
    'username': '',
    'password': ''
}

# Tables to process, with their unique-key column, date column and branch-ID column
tables = {
    'cmis_patientinfo': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
    'cmis_yuyue': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
}


# Connect to a database
def connect_to_db(config):
    connection_string = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER={config['server']};DATABASE={config['database']};"
        f"UID={config['username']};PWD={config['password']}"
    )
    return pyodbc.connect(connection_string)


# Process the data
def process_data(df, branch_id, fendian_field):
    df[fendian_field] = branch_id  # overwrite the branch-ID column for this source
    return df


# Fetch the previous day's data
def get_yesterday_data(connection, table, date_field):
    # Start of yesterday (00:00:00)
    yesterday_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
    # End of yesterday (23:59:59)
    yesterday_end = yesterday_start + timedelta(hours=23, minutes=59, seconds=59)

    # Only rows whose upload flag is not 5 (or is NULL) still need to be transferred
    query = f"SELECT * FROM {table} WHERE {date_field} BETWEEN ? AND ? AND (upload != 5 OR upload IS NULL)"

    # Suppress the specific UserWarning pandas raises for raw DBAPI connections
    warnings.filterwarnings('ignore', category=UserWarning,
                            message="pandas only supports SQLAlchemy connectable")

    return pd.read_sql(query, connection, params=[yesterday_start, yesterday_end])


# Get the column names of the target table
def get_target_columns(connection, table):
    cursor = connection.cursor()
    cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'")
    columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    cursor.close()
    return columns


# Convert pandas values into types pyodbc can bind
def convert_data_types(row):
    new_row = []
    for value in row:
        if pd.isnull(value):
            new_row.append(None)
        elif isinstance(value, pd.Timestamp):
            new_row.append(value.to_pydatetime())
        else:
            new_row.append(value)
    return tuple(new_row)


# Insert data into the target database and update the upload flag
def insert_data_to_target(source_connection, target_connection, table, unique_field, df, db_config):
    print(f"{time.ctime()} - {db_config['database_name']}: table {table}: inserting data...")
    source_cursor = source_connection.cursor()
    target_cursor = target_connection.cursor()

    target_columns = get_target_columns(target_connection, table)
    df_columns = df.columns.tolist()

    # Keep only the columns that also exist in the target table
    common_columns = [col for col in df_columns if col in target_columns]

    success_count = 0
    failure_count = 0
    error = ''

    for index, row in df.iterrows():
        columns = ', '.join(common_columns)
        placeholders = ', '.join(['?' for _ in common_columns])
        values = convert_data_types(row[common_columns])

        insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

        try:
            target_cursor.execute(insert_query, values)
            target_connection.commit()

            unique_value = row[unique_field]
            # Mark the row as transferred in the source database (upload = 5)
            update_source_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"
            source_cursor.execute(update_source_query, unique_value)
            source_connection.commit()
            # Mark the row as transferred in the target database (upload = 5)
            update_target_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"
            target_cursor.execute(update_target_query, unique_value)
            target_connection.commit()

            success_count += 1
        except Exception as e:
            failure_count += 1
            error = e
            target_connection.rollback()
            source_connection.rollback()

    source_cursor.close()
    target_cursor.close()

    message = (f"{time.ctime()} - {db_config['database_name']}: table {table}: "
               f"{success_count} rows inserted, {failure_count} rows failed")

    if failure_count > 0:
        message += f", last error: {error}"

    print(message)


# Main task
def main_task():
    source_conn = None
    target_conn = None
    for db_config in source_databases:
        try:
            # Open the connections
            source_conn = connect_to_db(db_config)
            print(f"{time.ctime()} - {db_config['database_name']}: database connection established!")
            target_conn = connect_to_db(target_database)
            print(f"{time.ctime()} - {target_database['database_name']}: database connection established!")

            for table, fields in tables.items():
                unique_field = fields['unique_field']
                date_field = fields['date_field']
                fendian_field = fields['fendian_field']
                try:
                    df = get_yesterday_data(source_conn, table, date_field)
                    if not df.empty:
                        processed_df = process_data(df, db_config['branch_id'], fendian_field)
                        insert_data_to_target(source_conn, target_conn, table, unique_field, processed_df, db_config)
                    else:
                        print(f"{time.ctime()} - {db_config['database_name']}: table {table}: no data from yesterday to process!")
                except Exception as e:
                    print(f"{time.ctime()} - {db_config['database_name']}: table {table}: failed to process data! error: {e}")
        except Exception as e:
            logging.error(f"{time.ctime()} - {db_config['database_name']}: database connection failed! error: {e}")
            continue
        finally:
            try:
                if source_conn is not None:
                    source_conn.close()
                    source_conn = None
                    print(f"{time.ctime()} - closed database connection {db_config['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} - error while closing connection {db_config['database_name']}: {e}")

            try:
                if target_conn is not None:
                    target_conn.close()
                    target_conn = None
                    print(f"{time.ctime()} - closed database connection {target_database['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} - error while closing connection {target_database['database_name']}: {e}")


# Scheduled job: run the transfer every day at 03:00
schedule.every().day.at("03:00").do(main_task)

while True:
    schedule.run_pending()
    time.sleep(40)
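For reference, a minimal sketch of how the configuration blocks at the top of the script might be filled in. All server addresses, credentials, database names, and column names below are hypothetical placeholders, not values from the original post:

# Hypothetical example values - replace with your own servers, credentials and column names.
source_databases = [
    {
        'database_name': 'Branch 1',      # label used only in log messages
        'server': '192.168.1.10,1433',    # hypothetical SQL Server host
        'database': 'clinic_branch1',     # hypothetical source database
        'username': 'reader',
        'password': '***',
        'branch_id': 1                    # value written into the branch-ID column
    },
]

target_database = {
    'database_name': 'Head office',
    'server': '192.168.1.20,1433',
    'database': 'clinic_central',
    'username': 'writer',
    'password': '***'
}

# Keys are table names; values name the unique-key, date and branch-ID columns of each table.
tables = {
    'cmis_patientinfo': {'unique_field': 'patient_id', 'date_field': 'created_at', 'fendian_field': 'branch_id'},
    'cmis_yuyue': {'unique_field': 'booking_id', 'date_field': 'booking_date', 'fendian_field': 'branch_id'},
}

Note that the script assumes the schedule, pyodbc, pandas and sqlalchemy packages plus the Microsoft ODBC Driver 17 for SQL Server are installed, and that every table being copied has an upload column the script can set to 5 once a row has been transferred.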
