Python 实现：定时把其他 SQL Server 数据库中指定表的前一天数据存储到本数据库的同名表中
创始人
2025-01-08 20:33:54
0
import schedule
import time
import pyodbc
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
import warnings
import logging

# Source database connections (one entry per branch database).
source_databases = [
    {
        'database_name': '',  # display name, used only in log messages
        'server': '',
        'database': '',
        'username': '',
        'password': '',
        'branch_id': 0  # branch ID written into each copied row's branch column
    },
]

# Target database configuration.
target_database = {
    'database_name': '',
    'server': '',
    'database': '',
    'username': '',
    'password': ''
}

# Tables to copy, with their unique-key column, date column and branch-ID column.
# NOTE: these table/column names are interpolated directly into SQL text, so
# they must only ever come from this trusted configuration.
tables = {
    'cmis_patientinfo': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
    'cmis_yuyue': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
}


def connect_to_db(config):
    """Open a pyodbc connection to the SQL Server described by *config*.

    *config* must provide 'server', 'database', 'username' and 'password'.
    Autocommit is not enabled here, so callers commit/roll back explicitly.
    """
    connection_string = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={config['server']};DATABASE={config['database']};UID={config['username']};PWD={config['password']}"
    return pyodbc.connect(connection_string)


def process_data(df, branch_id, fendian_field):
    """Overwrite column *fendian_field* of *df* with *branch_id* (in place) and return *df*."""
    df[fendian_field] = branch_id
    return df


def get_yesterday_data(connection, table, date_field):
    """Return yesterday's not-yet-uploaded rows of *table* as a DataFrame.

    A row is "yesterday's" when *date_field* lies in the half-open window
    [yesterday 00:00, today 00:00). It is "not yet uploaded" when its
    ``upload`` column is NULL or not equal to 5.
    """
    today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday_start = today_start - timedelta(days=1)

    # Half-open range rather than BETWEEN 00:00:00 AND 23:59:59: the old upper
    # bound dropped rows stamped inside the final second of the day
    # (e.g. 23:59:59.500), which datetime/datetime2 columns can hold.
    query = (
        f"SELECT * FROM {table} "
        f"WHERE {date_field} >= ? AND {date_field} < ? "
        "AND (upload != 5 OR upload IS NULL)"
    )

    # pandas warns when handed a raw DBAPI connection instead of a SQLAlchemy
    # connectable; the query still works, so that specific warning is ignored.
    warnings.filterwarnings('ignore', category=UserWarning,
                            message="pandas only supports SQLAlchemy connectable")

    return pd.read_sql(query, connection, params=[yesterday_start, today_start])


def get_target_columns(connection, table):
    """Return the column names INFORMATION_SCHEMA.COLUMNS reports for *table*."""
    cursor = connection.cursor()
    try:
        # Bind the table name as a parameter instead of formatting it into the SQL.
        cursor.execute(
            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
            table,
        )
        return [row.COLUMN_NAME for row in cursor.fetchall()]
    finally:
        cursor.close()


def _to_db_value(value):
    """Convert one pandas/numpy cell value into a type pyodbc can bind."""
    if pd.isnull(value):
        return None  # NaN / NaT / None all become SQL NULL
    if isinstance(value, pd.Timestamp):
        return value.to_pydatetime()
    if isinstance(value, np.generic):
        # pyodbc rejects numpy scalars such as numpy.int64; unwrap to Python.
        return value.item()
    return value


def convert_data_types(row):
    """Return *row*'s values as a tuple of pyodbc-bindable Python values."""
    return tuple(_to_db_value(value) for value in row)


def insert_data_to_target(source_connection, target_connection, table, unique_field, df, db_config):
    """Copy each row of *df* into *table* on the target DB and mark it uploaded.

    For every row, three statements run without committing in between:
    INSERT into the target table (only columns that exist there), then
    ``UPDATE ... SET upload = 5`` by *unique_field* on the target, then the
    same UPDATE on the source. Both connections are committed only after all
    three succeed; on any error both are rolled back and the row is counted
    as a failure. A summary line is printed at the end.

    NOTE(review): this is still not a distributed transaction. If the target
    commit succeeds but the source commit then fails, the row is copied again
    next run. The target-side UPDATE also touches every target row sharing the
    unique value; confirm keys cannot collide across branches.
    """
    print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 数据插入中...")

    target_columns = set(get_target_columns(target_connection, table))
    # Keep only the DataFrame columns that exist in the target table.
    common_columns = [col for col in df.columns.tolist() if col in target_columns]

    # The statements are the same for every row, so build them once.
    columns = ', '.join(common_columns)
    placeholders = ', '.join(['?' for _ in common_columns])
    insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    mark_uploaded_query = f"UPDATE {table} SET upload = 5 WHERE {unique_field} = ?"

    success_count = 0
    failure_count = 0
    error = ''

    source_cursor = source_connection.cursor()
    target_cursor = target_connection.cursor()
    try:
        for _, row in df.iterrows():
            values = convert_data_types(row[common_columns])
            unique_value = _to_db_value(row[unique_field])
            try:
                target_cursor.execute(insert_query, values)
                target_cursor.execute(mark_uploaded_query, unique_value)
                source_cursor.execute(mark_uploaded_query, unique_value)
                # Commit only once every statement succeeded, so a failed source
                # update can no longer leave an already-committed target insert.
                target_connection.commit()
                source_connection.commit()
                success_count += 1
            except Exception as e:
                failure_count += 1
                error = e  # only the most recent failure reason is reported
                target_connection.rollback()
                source_connection.rollback()
    finally:
        source_cursor.close()
        target_cursor.close()

    message = f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 插入成功 {success_count} 条, 插入失败 {failure_count} 条"

    if failure_count > 0:
        message += f", 失败原因: {error}"

    print(message)


def main_task():
    """Sync yesterday's rows of every configured table from every source DB.

    For each entry in ``source_databases``: connect to it and to
    ``target_database``, then for each table in ``tables`` fetch yesterday's
    un-uploaded rows, stamp the branch ID and copy them to the target.
    Per-table errors are printed and the next table is tried. Connection
    errors are logged and the next source is tried. Both connections are
    closed after each source.
    """
    for db_config in source_databases:
        # Reset per source: a failed connect must not make the finally block
        # re-close (and report closing) the previous iteration's connections.
        source_conn = None
        target_conn = None
        try:
            source_conn = connect_to_db(db_config)
            print(f"{time.ctime()} —— {db_config['database_name']}-数据库连接成功!")
            target_conn = connect_to_db(target_database)
            print(f"{time.ctime()} —— {target_database['database_name']}-数据库连接成功!")

            for table, fields in tables.items():
                unique_field = fields['unique_field']
                date_field = fields['date_field']
                fendian_field = fields['fendian_field']
                try:
                    df = get_yesterday_data(source_conn, table, date_field)
                    if not df.empty:
                        processed_df = process_data(df, db_config['branch_id'], fendian_field)
                        insert_data_to_target(source_conn, target_conn, table, unique_field, processed_df, db_config)
                    else:
                        print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 暂无待处理的昨日数据!")
                except Exception as e:
                    print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}:处理数据失败!error: {e}")
        except Exception as e:
            logging.error(f"{time.ctime()} —— {db_config['database_name']}-数据库连接失败! error: {e}")
        finally:
            try:
                if source_conn is not None:
                    source_conn.close()
                    print(f"{time.ctime()} —— 关闭数据库连接 {db_config['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} —— 关闭 {db_config['database_name']}-数据库连接时出错: {e}")

            try:
                if target_conn is not None:
                    target_conn.close()
                    print(f"{time.ctime()} —— 关闭数据库连接 {target_database['database_name']}")
            except Exception as e:
                logging.error(f"{time.ctime()} —— 关闭 {target_database['database_name']}-数据库连接时出错: {e}")


if __name__ == "__main__":
    # Run the sync daily at 03:00; poll the scheduler every 40 seconds.
    # Guarded so importing this module does not block in the infinite loop.
    schedule.every().day.at("03:00").do(main_task)

    while True:
        schedule.run_pending()
        time.sleep(40)

相关内容

热门资讯

绝活儿辅助!广西老友玩老是输怎... 绝活儿辅助!广西老友玩老是输怎么办(辅助挂)都是真的有辅助app(讲解有挂)在进入广西老友玩老是输怎...
法门辅助!福建13水插件(辅助... 法门辅助!福建13水插件(辅助挂)一贯是有辅助技巧(有挂技术)1、许多玩家不知道福建13水插件辅助怎...
办法辅助!潮友会app下载官方... 办法辅助!潮友会app下载官方辅助器(辅助挂)真是真的是有辅助app(有挂教程)该软件可以轻松地帮助...
妙招辅助!邯郸胡乐挂辅助(辅助... 妙招辅助!邯郸胡乐挂辅助(辅助挂)好像存在有辅助插件(有挂方略)1、上手简单,内置详细流程视频教学,...
教程书辅助!乐酷辅助(辅助挂)... 教程书辅助!乐酷辅助(辅助挂)其实存在有辅助脚本(有挂细节)乐酷辅助能透视中分为三种模型:乐酷辅助模...
学习辅助!决战卡五星辅助(辅助... 学习辅助!决战卡五星辅助(辅助挂)本来真的是有辅助软件(有人有挂)学习辅助!决战卡五星辅助(辅助挂)...
绝活辅助!边锋嘉兴麻将辅助器(... 绝活辅助!边锋嘉兴麻将辅助器(辅助挂)真是真的有辅助神器(新版有挂)1、边锋嘉兴麻将辅助器公共底牌简...
举措辅助!枫叶辅助器(辅助挂)... 举措辅助!枫叶辅助器(辅助挂)本来存在有辅助技巧(竟然有挂)1、下载好枫叶辅助器正确养号方法之后点击...
讲义辅助!点我达辅助(辅助挂)... 讲义辅助!点我达辅助(辅助挂)一直存在有辅助技巧(有人有挂)1、点我达辅助辅助器安装包、点我达辅助辅...
模块辅助!威信茶馆有挂的吗(辅... 模块辅助!威信茶馆有挂的吗(辅助挂)一直真的是有辅助脚本(揭秘有挂)1、玩家可以在威信茶馆有挂的吗线...