用 Python 实现：定时把其他 SQL Server 数据库中指定表的前一天数据，存储到自己数据库的同名表中
创始人
2025-01-08 20:33:54
0
import schedule
import time
import pyodbc
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text  # NOTE(review): not used anywhere in this script
import warnings
import logging

# Source databases to copy from (one entry per branch/store).
source_databases = [
    {
        'database_name': '',  # display name, only used in log messages
        'server': '',
        'database': '',
        'username': '',
        'password': '',
        'branch_id': 0  # branch ID written into `fendian_field` of every copied row
    },
]

# Target database that receives the copied rows.
target_database = {
    'database_name': '',
    'server': '',
    'database': '',
    'username': '',
    'password': ''
}

# Tables to copy; each table has the same name in the source and the target.
#   unique_field  - column used to find a row again when marking it as uploaded
#   date_field    - column filtered to "yesterday"
#   fendian_field - branch-ID column, overwritten with the source's branch_id
tables = {
    'cmis_patientinfo': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
    'cmis_yuyue': {'unique_field': '唯一标识', 'date_field': '日期字段', 'fendian_field': '分店ID'},
}

# Value written to the `upload` column once a row has been copied; rows that
# already carry it are skipped by get_yesterday_data.
UPLOADED_FLAG = 5

# pandas warns when read_sql is given a raw DBAPI (pyodbc) connection instead
# of a SQLAlchemy connectable. The query still works, so silence the warning
# once here instead of re-registering the filter on every query.
warnings.filterwarnings('ignore', category=UserWarning,
                        message="pandas only supports SQLAlchemy connectable")


def connect_to_db(config):
    """Open and return a pyodbc connection to the SQL Server described by *config*.

    *config* must contain the keys 'server', 'database', 'username' and 'password'.
    """
    connection_string = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={config['server']};DATABASE={config['database']};UID={config['username']};PWD={config['password']}"
    return pyodbc.connect(connection_string)


def process_data(df, branch_id, fendian_field):
    """Set column *fendian_field* to *branch_id* on every row of *df* (in place) and return *df*."""
    df[fendian_field] = branch_id
    return df


def _yesterday_range(now=None):
    """Return ``(yesterday_start, today_start)`` bounding the previous calendar day.

    The interval is half-open: ``yesterday_start <= t < today_start`` covers every
    instant of yesterday, including fractional seconds after 23:59:59.
    *now* defaults to ``datetime.now()`` (naive local time).
    """
    now = datetime.now() if now is None else now
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return today_start - timedelta(days=1), today_start


def get_yesterday_data(connection, table, date_field):
    """Return yesterday's not-yet-uploaded rows of *table* as a DataFrame.

    A row is selected when *date_field* falls on the previous calendar day and its
    `upload` column is not UPLOADED_FLAG (or is NULL).
    """
    yesterday_start, today_start = _yesterday_range()
    # Half-open range instead of `BETWEEN start AND start + 23:59:59`, which
    # missed rows stamped in the last fraction of a second (e.g. 23:59:59.500).
    query = (
        f"SELECT * FROM {table} "
        f"WHERE {date_field} >= ? AND {date_field} < ? "
        f"AND (upload != {UPLOADED_FLAG} OR upload IS NULL)"
    )
    return pd.read_sql(query, connection, params=[yesterday_start, today_start])


def get_target_columns(connection, table):
    """Return the column names that INFORMATION_SCHEMA reports for *table*."""
    cursor = connection.cursor()
    try:
        # Table name passed as a bound parameter rather than spliced into the SQL.
        cursor.execute(
            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
            table,
        )
        return [row.COLUMN_NAME for row in cursor.fetchall()]
    finally:
        cursor.close()


def _to_db_value(value):
    """Convert a single pandas/numpy value into a type pyodbc accepts as a parameter."""
    if pd.isnull(value):  # None, NaN, NaT
        return None
    if isinstance(value, (pd.Timestamp, np.datetime64)):
        return pd.Timestamp(value).to_pydatetime()
    if isinstance(value, np.generic):
        # pyodbc rejects numpy scalars (e.g. numpy.int64 from an all-numeric row);
        # .item() yields the equivalent native Python scalar.
        return value.item()
    return value


def convert_data_types(row):
    """Return the values of *row* as a tuple of pyodbc-compatible parameters.

    Missing values become None, timestamps become ``datetime`` objects and numpy
    scalars become native Python scalars; anything else passes through unchanged.
    """
    return tuple(_to_db_value(value) for value in row)


def insert_data_to_target(source_connection, target_connection, table, unique_field, df, db_config):
    """Copy every row of *df* into *table* on the target DB and mark it uploaded.

    Only columns present in both *df* and the target table are inserted. For each
    row, the INSERT and the target-side ``upload`` update are committed together;
    the source row's ``upload`` is then set to UPLOADED_FLAG and committed. A row
    that fails at any step is counted as a failure. A summary (with the last
    error) is printed at the end.
    """
    print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 数据插入中...")

    target_columns = set(get_target_columns(target_connection, table))
    common_columns = [col for col in df.columns.tolist() if col in target_columns]
    if not common_columns:
        print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 目标表中没有可插入的同名列, 跳过!")
        return

    # These statements are identical for every row, so build them once.
    columns = ', '.join(common_columns)
    placeholders = ', '.join('?' for _ in common_columns)
    insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    # Same statement for source and target (the table name is shared).
    # NOTE(review): on the target this matches on unique_field only; if several
    # branches can share a unique value, it also touches other branches' rows.
    mark_uploaded_query = f"UPDATE {table} SET upload = {UPLOADED_FLAG} WHERE {unique_field} = ?"

    success_count = 0
    failure_count = 0
    error = ''

    source_cursor = source_connection.cursor()
    target_cursor = target_connection.cursor()
    try:
        for _, row in df.iterrows():
            try:
                # Read the key first so a missing/bad key fails before anything is written.
                unique_value = _to_db_value(row[unique_field])
                values = convert_data_types(row[common_columns])

                # Insert and mark in ONE target transaction. Previously the insert
                # was committed on its own, so a later failure left a copied row
                # that rollback() could no longer undo.
                target_cursor.execute(insert_query, values)
                target_cursor.execute(mark_uploaded_query, unique_value)
                target_connection.commit()

                # Mark the source row so it is not picked up again on the next run.
                source_cursor.execute(mark_uploaded_query, unique_value)
                source_connection.commit()

                success_count += 1
            except Exception as e:
                failure_count += 1
                error = e  # only the most recent error is reported
                target_connection.rollback()
                source_connection.rollback()
    finally:
        source_cursor.close()
        target_cursor.close()

    message = f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 插入成功 {success_count} 条, 插入失败 {failure_count} 条"
    if failure_count > 0:
        message += f", 失败原因: {error}"
    print(message)


def _close_quietly(conn, database_name):
    """Close *conn* if it was opened; log (rather than raise) any error from closing."""
    if conn is None:
        return
    try:
        conn.close()
        print(f"{time.ctime()} —— 关闭数据库连接 {database_name}")
    except Exception as e:
        logging.error(f"{time.ctime()} —— 关闭 {database_name}-数据库连接时出错: {e}")


def main_task():
    """Copy yesterday's pending rows of every table in `tables` from each source DB to the target DB.

    A failure in one table is printed and the remaining tables still run. A failure
    connecting to a source (or the target) is logged and the next source is tried.
    Connections opened for a source are always closed.
    """
    for db_config in source_databases:
        # Reset per source DB. These used to be initialised once before the loop,
        # so a failed connect here made `finally` close/log the previous source's
        # connections again, under this source's name.
        source_conn = None
        target_conn = None
        try:
            source_conn = connect_to_db(db_config)
            print(f"{time.ctime()} —— {db_config['database_name']}-数据库连接成功!")
            target_conn = connect_to_db(target_database)
            print(f"{time.ctime()} —— {target_database['database_name']}-数据库连接成功!")

            for table, fields in tables.items():
                try:
                    df = get_yesterday_data(source_conn, table, fields['date_field'])
                    if df.empty:
                        print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}: 暂无待处理的昨日数据!")
                        continue
                    processed_df = process_data(df, db_config['branch_id'], fields['fendian_field'])
                    insert_data_to_target(source_conn, target_conn, table, fields['unique_field'],
                                          processed_df, db_config)
                except Exception as e:
                    print(f"{time.ctime()} —— {db_config['database_name']}:表-{table}:处理数据失败!error: {e}")
        except Exception as e:
            logging.error(f"{time.ctime()} —— {db_config['database_name']}-数据库连接失败! error: {e}")
        finally:
            _close_quietly(source_conn, db_config['database_name'])
            _close_quietly(target_conn, target_database['database_name'])


def run_scheduler():
    """Run main_task every day at 03:00, polling the schedule every 40 seconds (blocks forever)."""
    schedule.every().day.at("03:00").do(main_task)
    while True:
        schedule.run_pending()
        time.sleep(40)


# Start the blocking scheduler only when run as a script, so that importing this
# module (e.g. to run main_task by hand) does not hang forever.
if __name__ == "__main__":
    run_scheduler()

相关内容

热门资讯

盘点几款!微信牵手跑有没有挂,... 微信牵手跑有没有挂是一款专注玩家量身打造的游戏记牌类型软件,在微信牵手跑有没有挂这款游戏中我们可以记...
重大发现!丽水都来脚本辅助,七... >>您好:丽水都来脚本辅助确实是有挂的,很多玩家在这款丽水都来脚本辅助游戏中打牌都会发现很多用户的牌...
三分钟了解!瓜瓜丰城手机辅助,... 三分钟了解!瓜瓜丰城手机辅助,新海贝之城脚本(有挂开挂辅助脚本);无需打开直接搜索打开薇:13670...
7分钟知晓!天天贵阳app修改... >>您好:边锋透视器辅助器微信确实是有挂的,很多玩家在这款边锋透视器辅助器微信游戏中打牌都会发现很多...
透视存在!wpk俱乐部是真的吗... 广东雀用的是什么智能插件官是一款专注玩家量身打造的游戏记牌类型软件,在广东雀用的是什么智能插件官这款...
一分钟了解!玉海楼茶苑脚本,随... 玉海楼茶苑脚本是一款专注玩家量身打造的游戏记牌类型软件,在玉海楼茶苑脚本这款游戏中我们可以记录下每张...
第七瞬间精通!兴动助手脚本有辅... 您好:这款兴动助手脚本有辅助游戏是可以开挂的,确实是有挂的,很多玩家在这款兴动助手脚本有辅助游戏中打...
透视脚本!雀友会广东潮汕麻雀万... 大家好,今天小编来为大家解答雀友会广东潮汕麻雀万能辅助器这个问题咨询软件客服可以免费测试直接加微信(...
总算了解!德扑之星安卓插件,开... 总算了解!德扑之星安卓插件,开心泉州小程序辅助哪里查看(有挂开挂辅助神器);无需打开直接搜索加薇13...
第3阶段熟悉!福建兄弟十三水辅... 第3阶段熟悉!福建兄弟十三水辅助器下载,雀神麻将小程序辅助软件(有挂开挂辅助安装)这是一款可以让一直...