Python 实现把其他 SQL Server 数据库的某些表的前一天数据定时存储到自己数据库同名的表中
创始人
2025-01-08 20:33:54
0
"""Nightly copy of yesterday's rows from branch SQL Server databases.

For every configured source database, the rows of each configured table
whose date column falls on the previous calendar day (and whose ``upload``
flag is not 5) are read, stamped with the branch id, inserted into the
same-named table of the target database, and then flagged ``upload = 5``
on both sides.  The job is scheduled daily at 03:00 when the file is run
as a script.
"""
import logging
import time
import warnings
from datetime import datetime, timedelta

import pandas as pd
import pyodbc
import schedule
from sqlalchemy import create_engine, text  # noqa: F401 - kept from the original file, unused here

# Source database connections (one entry per branch).
source_databases = [
    {
        'database_name': '',
        'server': '',
        'database': '',
        'username': '',
        'password': '',
        'branch_id': 0  # branch id written into the branch column of copied rows
    },
]

# Target database connection.
target_database = {
    'database_name': '',
    'server': '',
    'database': '',
    'username': '',
    'password': ''
}

# Tables to process, with the unique-key column, the date column used for
# the "yesterday" filter, and the branch-id column for each table.
tables = {
    'cmis_patientinfo': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
    'cmis_yuyue': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
}


def connect_to_db(config):
    """Open a pyodbc connection described by *config*.

    Args:
        config: Mapping with the keys ``server``, ``database``,
            ``username`` and ``password``.

    Returns:
        The connection returned by ``pyodbc.connect``.
    """
    connection_string = (
        f"DRIVER={{ODBC Driver 17 for SQL Server}};"
        f"SERVER={config['server']};"
        f"DATABASE={config['database']};"
        f"UID={config['username']};"
        f"PWD={config['password']}"
    )
    return pyodbc.connect(connection_string)


def process_data(df, branch_id, fendian_field):
    """Set column *fendian_field* of *df* to *branch_id* for every row.

    The frame is modified in place and also returned.
    """
    df[fendian_field] = branch_id
    return df


def _yesterday_bounds(now=None):
    """Return ``(start, end)`` of the previous calendar day.

    ``start`` is yesterday 00:00:00 and ``end`` is today 00:00:00, so the
    pair describes the half-open interval ``[start, end)``.

    Args:
        now: Reference time; defaults to ``datetime.now()``.
    """
    if now is None:
        now = datetime.now()
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return today_start - timedelta(days=1), today_start


def get_yesterday_data(connection, table, date_field):
    """Read yesterday's not-yet-uploaded rows of *table* into a DataFrame.

    Rows are selected when *date_field* lies within the previous calendar
    day and ``upload`` is NULL or different from 5.

    Args:
        connection: Open DB-API connection to the source database.
        table: Table name (interpolated into the SQL text, so it must come
            from trusted configuration, never from user input).
        date_field: Name of the date column (same trust requirement).

    Returns:
        A ``pandas.DataFrame`` with the matching rows.
    """
    yesterday_start, today_start = _yesterday_bounds()

    # A half-open range is used instead of BETWEEN ... 23:59:59 so that rows
    # stamped in the final second of the day (23:59:59.xxx) are not skipped.
    query = (
        f"SELECT * FROM {table} "
        f"WHERE {date_field} >= ? AND {date_field} < ? "
        "AND (upload != 5 OR upload IS NULL)"
    )

    # pandas warns when handed a raw DB-API connection; silence that one
    # warning for this call only instead of altering the global filters.
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=UserWarning,
                                message="pandas only supports SQLAlchemy connectable")
        return pd.read_sql(query, connection, params=[yesterday_start, today_start])


def get_target_columns(connection, table):
    """Return the column names of *table* from INFORMATION_SCHEMA.COLUMNS."""
    cursor = connection.cursor()
    try:
        # The table name is bound as a parameter rather than spliced into
        # the SQL string.
        cursor.execute(
            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
            table,
        )
        return [row.COLUMN_NAME for row in cursor.fetchall()]
    finally:
        cursor.close()


def convert_data_types(row):
    """Convert one row of values into a tuple suitable as SQL parameters.

    Null-like values (as judged by ``pd.isnull``) become ``None`` and
    ``pd.Timestamp`` values become ``datetime`` objects; everything else is
    passed through unchanged.
    """
    converted = []
    for value in row:
        if pd.isnull(value):
            converted.append(None)
        elif isinstance(value, pd.Timestamp):
            converted.append(value.to_pydatetime())
        else:
            converted.append(value)
    return tuple(converted)


def insert_data_to_target(source_connection, target_connection, table, unique_field, df, db_config):
    """Insert the rows of *df* into *table* on the target and flag them uploaded.

    Only columns that exist both in *df* and in the target table are
    inserted.  For each row the target INSERT, the target ``upload = 5``
    update and the source ``upload = 5`` update are executed first and
    committed together afterwards, so a failure on any statement rolls back
    both sides instead of leaving a row inserted but unflagged.  A summary
    line with success/failure counts (and the last error, if any) is
    printed at the end.

    Args:
        source_connection: Open connection to the source database.
        target_connection: Open connection to the target database.
        table: Table name, identical on both sides.
        unique_field: Column identifying a row for the ``upload`` updates.
        df: Rows to copy.
        db_config: Source database config; ``database_name`` is used in
            the printed messages.
    """
    print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 数据插入中...")

    target_columns = set(get_target_columns(target_connection, table))
    # Keep the DataFrame's column order, restricted to columns the target has.
    common_columns = [col for col in df.columns.tolist() if col in target_columns]

    # The statements are identical for every row, so build them once.
    columns = ', '.join(common_columns)
    placeholders = ', '.join('?' for _ in common_columns)
    insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    mark_uploaded_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"

    success_count = 0
    failure_count = 0
    error = ''

    source_cursor = source_connection.cursor()
    target_cursor = target_connection.cursor()
    try:
        for _, row in df.iterrows():
            values = convert_data_types(row[common_columns])
            unique_value = row[unique_field]
            try:
                target_cursor.execute(insert_query, values)
                target_cursor.execute(mark_uploaded_query, unique_value)
                source_cursor.execute(mark_uploaded_query, unique_value)
                # Commit only after every statement for this row succeeded.
                target_connection.commit()
                source_connection.commit()
                success_count += 1
            except Exception as e:
                failure_count += 1
                error = e  # only the most recent failure is reported
                target_connection.rollback()
                source_connection.rollback()
    finally:
        source_cursor.close()
        target_cursor.close()

    message = f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 插入成功 {success_count} 条, 插入失败 {failure_count} 条"

    if failure_count > 0:
        message += f", 失败原因: {error}"

    print(message)


def main_task():
    """Run one full copy pass over every source database and table.

    For each source database a fresh source and target connection are
    opened, every configured table is processed, and both connections are
    closed again.  A failure while connecting is logged and the next source
    database is tried; a failure on one table is printed and the next table
    is tried.
    """
    for db_config in source_databases:
        # Reset per iteration so the cleanup below never touches a
        # connection that a previous iteration already closed.
        source_conn = None
        target_conn = None
        try:
            source_conn = connect_to_db(db_config)
            print(f"{time.ctime()} —— {db_config['database_name']}-数据库连接成功!")
            target_conn = connect_to_db(target_database)
            print(f"{time.ctime()} —— {target_database['database_name']}-数据库连接成功!")

            for table, fields in tables.items():
                unique_field = fields['unique_field']
                date_field = fields['date_field']
                fendian_field = fields['fendian_field']

                try:
                    df = get_yesterday_data(source_conn, table, date_field)
                    if not df.empty:
                        processed_df = process_data(df, db_config['branch_id'], fendian_field)
                        insert_data_to_target(source_conn, target_conn, table, unique_field, processed_df, db_config)
                    else:
                        print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 暂无待处理的昨日数据!")
                except Exception as e:
                    print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}:处理数据失败!error: {e}")
        except Exception as e:
            logging.error(f"{time.ctime()} —— {db_config['database_name']}-数据库连接失败! error: {e}")
        finally:
            try:
                if source_conn is not None:
                    source_conn.close()
                    print(f"{time.ctime()} —— 关闭数据库连接 {db_config['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} —— 关闭 {db_config['database_name']}-数据库连接时出错: {e}")

            try:
                if target_conn is not None:
                    target_conn.close()
                    print(f"{time.ctime()} —— 关闭数据库连接 {target_database['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} —— 关闭 {target_database['database_name']}-数据库连接时出错: {e}")


def _run_scheduler():
    """Register the daily 03:00 job and poll the scheduler forever."""
    schedule.every().day.at("03:00").do(main_task)

    while True:
        schedule.run_pending()
        time.sleep(40)


# Only start the endless scheduler loop when executed as a script, so the
# module can be imported (e.g. to call main_task() manually) without blocking.
if __name__ == "__main__":
    _run_scheduler()

相关内容

热门资讯

必备了解!赖子三加一辅助(辅助... 必备了解!赖子三加一辅助(辅助)潘潘讲故事切实是真的辅助软件(哔哩哔哩)1、让任何用户在无需赖子三加...
教你了解!微乐自建房脚本免费下... 教你了解!微乐自建房脚本免费下载(辅助)桂林字牌本来是有辅助器(哔哩哔哩)亲,关键说明,微乐自建房脚...
总结了解!微信途游辅助器(辅助... 总结了解!微信途游辅助器(辅助)趣乐互娱好像是有辅助app(哔哩哔哩)小薇(辅助器软件下载)致您一封...
分享了解!腾威填大坑辅助(辅助... 分享了解!腾威填大坑辅助(辅助)七七麻将本来是有辅助插件(哔哩哔哩);1、完成腾威填大坑辅助辅助器v...
专业了解!顺欣茶楼辅助(辅助)... 专业了解!顺欣茶楼辅助(辅助)新道游一直是有辅助神器(哔哩哔哩)1、许多玩家不知道顺欣茶楼辅助辅助怎...
总结了解!中至九江脚本(辅助)... 总结了解!中至九江脚本(辅助)panda本来是真的辅助工具(哔哩哔哩)1)中至九江脚本免费钻石:进一...
开挂了解!衢州都莱罗松透视辅助... 开挂了解!衢州都莱罗松透视辅助工具(辅助)新鸿狐切实存在有辅助脚本(哔哩哔哩)1)衢州都莱罗松透视辅...
有挂了解!罗松十三道辅助器(辅... 有挂了解!罗松十三道辅助器(辅助)钢铁互娱果然存在有辅助平台(哔哩哔哩)暗藏猫腻,小编详细说明罗松十...
必备了解!约逗东乡辅助器(辅助... 必备了解!约逗东乡辅助器(辅助)琼雀海南麻将好像有挂辅助安装(哔哩哔哩)1、全新机制【约逗东乡辅助器...
关于了解!填大坑辅助工具排行(... 关于了解!填大坑辅助工具排行(辅助)来趣广西麻将切实是真的辅助挂(哔哩哔哩)1、许多玩家不知道填大坑...