個人筆記,利用python call api及DB操作
API
import requests
from requests.auth import HTTPBasicAuth
from Class.ExecutionResult import ExecutionResult
class APIClient:
def __init__(self, base_url, userid=None, pwd=None,proxies=None):
self.base_url = base_url
self.auth = HTTPBasicAuth(userid, pwd) if userid and pwd else None
self.proxies = proxies
def post(self, endpoint, params=None, headers=None
, verify=False, cls=None):
result = ExecutionResult()
if endpoint:
url = f"{self.base_url}/{endpoint}"
else:
url = self.base_url
try:
response = requests.post(url, auth=self.auth, json=params
, headers=headers, verify=verify, proxies=self.proxies)
result = self.handle_response(response, cls,result)
except Exception as e:
result.set_failure(error = e, message = "Failed to post!")
return result
# return self.handle_response(response, parse_data_func)
def get(self, endpoint, params=None, headers=None
, verify=False, cls=None):
"""
範例
result = ExecutionResult()
url='https://xxxx'
headers = {
"Authorization": "aaaa",
"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Cookie": "xxxxxxxxxxxxxxxxxxx"
}
proxies = {
"http": "http://xxxxxxx",
"https": "http://xxxxxxxxxxxxxxxxxx"
}
api = APIClient(base_url=url , proxies=proxies)
result = api.get(endpoint=None , params= None ,headers= headers
,verify=False ,cls = Dept)
for item in result.data["data"]:
print(item.d_code)
"""
result = ExecutionResult()
if endpoint:
url = f"{self.base_url}/{endpoint}"
else:
url = self.base_url
try:
response = requests.get(url, auth=self.auth
, headers=headers, params=params
, verify=verify
, proxies=self.proxies
)
result = self.handle_response(response, cls,result)
except Exception as e:
result.set_failure(error = e, message = "Failed to get!")
return result
def handle_response(self, response, cls,result):
if response.status_code == 200:
getResponse = response.json()
code = getResponse.get('code')
msg = getResponse.get('msg')
data = getResponse.get('data')
if cls:
data = self.parse_data_as_class(data,cls)
result.set_success(message = "get post/get successfully!!", data = {"code":code,"msg":msg,"data":data})
else:
response.raise_for_status()
return result
@staticmethod
def parse_data_as_class(data, cls):
"""
:param data: list or dict, 包含需要解析的数据
:param cls: class, 具有 from_dict 方法的类
:return: list, 包含指定类的实例的列表
"""
data_list = []
if isinstance(data, list):
try:
# 将列表中的每个字典转换为指定类的实例
data_list = [cls.from_dict(item) for item in data]
except (ValueError, TypeError, KeyError):
data_list = []
elif isinstance(data, dict):
try:
# 将字典转换为单个指定类的实例并放入列表中
data_list = [cls.from_dict(data)]
except (ValueError, TypeError, KeyError):
data_list = []
return data_list
DB操作
import pyodbc
import pandas as pd
import re
from Class.ExecutionResult import ExecutionResult
class DBUtil:
def __init__(self, connectionString):
self.connectionString = connectionString
def getEQRConnection(self):
result = ExecutionResult()
try:
conn = pyodbc.connect(self.connectionString)
result.set_success(message = "get connection successfully!!", data = conn)
except Exception as e:
result.set_failure(error = e, message = "Failed to get connection!")
return result
def getConnection(self,connectionString):
'''
連線字串
'''
result = ExecutionResult()
try:
conn = pyodbc.connect(connectionString)
result.set_success(message = "get connection successfully!!", data = conn)
except Exception as e:
result.set_failure(error = e, message = "Failed to get connection!")
return result
def closeConnection(self,conn):
'''
關閉Connection
'''
try:
conn.close()
except Exception as e:
print("close db error:",e)
def execute_query(self, sql_query,conn=None):
"""
Execute a SQL query on the database.
"""
result = ExecutionResult()
try:
if conn==None:
conn = pyodbc.connect(self.connectionString)
data = pd.read_sql_query(sql_query,conn)
result.set_success(message = "Exec command successfully!!", data = data)
except Exception as e:
result.set_failure(error = e, message = "Exec command failed!")
return result
def prepare_commandstring(self,query_template, params):
"""
准备带有命名参数的 SQL 查询。
:param query_template: 带有命名参数的查询模板
:param params: 参数字典
:return: (最终的查询字符串, 参数元组)
SELECT * FROM your_table_name
WHERE column1 = {param1} AND column2 = {param2} OR column3 = {param1}
"""
# 使用字符串格式化生成最终的查询
query = query_template.format(**{key: '?' for key in params.keys()})
# 使用正则表达式找到所有的占位符
param_order = re.findall(r'\{(\w+)\}', query_template)
# print(param_order)
# 生成参数元组,确保每个占位符都有对应的参数值
param_values = tuple(params[key] for key in param_order)
# 返回最终的查询字符串和参数元组
return query, param_values
def bulk_insert_or_update(self, df, table_name, key_columns, update_columns=None,conn=None):
"""
批量插入或更新数据
:param df: pandas DataFrame, 包含需要插入或更新的数据
:param table_name: str, 数据库表名,包含 schema 信息
:param key_columns: list, 用于判断是插入还是更新的关键列
:param update_columns: list, 需要更新的列,如果为 None,则更新所有列
"""
"""
範例
result = ExecutionResult()
db = DBUtil("DRIVER={SQL Server}; SERVER=TWTP1APM40\\SQLEXPRESS; DATABASE=HRSDB; UID=jojo; PWD=633605jojo")
data = {
"empNo":["AA404","AA305"]
,"empName":['陳小Jo',"HaHaa"]
}
df = pd.DataFrame(data)
result=db.bulk_insert_or_update(df, "dbo.empTest", key_columns=["empNo"], update_columns=["empName"])
"""
result = ExecutionResult()
try:
if conn==None:
conn = pyodbc.connect(self.connectionString)
cursor = conn.cursor()
# 创建临时表
temp_table_name = f"#{table_name.replace('.', '_')}_tempforpy"
create_temp_table_sql = f"SELECT TOP 0 * INTO {temp_table_name} FROM {table_name}"
cursor.execute(create_temp_table_sql)
conn.commit()
# 将数据插入临时表
insert_temp_sql = f"INSERT INTO {temp_table_name} ({', '.join(df.columns)}) VALUES ({', '.join(['?' for _ in df.columns])})"
cursor.executemany(insert_temp_sql, df.values.tolist())
conn.commit()
# 如果 update_columns 为 None,则更新所有列
if update_columns is None:
update_columns = [col for col in df.columns if col not in key_columns]
# 合并数据
merge_sql = f"""
MERGE INTO {table_name} AS target
USING {temp_table_name} AS source
ON {" AND ".join([f"target.{col} = source.{col}" for col in key_columns])}
WHEN MATCHED AND {" OR ".join([f"target.{col} <> source.{col}" for col in update_columns])} THEN
UPDATE SET {", ".join([f"target.{col} = source.{col}" for col in update_columns])}, target.updatetime = GETDATE()
WHEN NOT MATCHED THEN
INSERT ({', '.join(df.columns)}, updatetime) VALUES ({', '.join([f"source.{col}" for col in df.columns])}, GETDATE());
"""
cursor.execute(merge_sql)
conn.commit()
# 删除临时表
drop_temp_table_sql = f"DROP TABLE {temp_table_name}"
cursor.execute(drop_temp_table_sql)
conn.commit()
cursor.close()
conn.close()
result.set_success(message = "bulk insert / update successfully !!!", data = None)
except Exception as e:
result.set_failure(error = e, message = "bulk insert / update failed !!!")
return result
def bulk_insert(self, df, table_name,conn=None):
"""
批量插入数据
:param df: pandas DataFrame, 包含需要插入的数据
:param table_name: str, 数据库表名
"""
"""
範例
result = ExecutionResult()
db = DBUtil("connectionstring")
data = {
"empNo":["AB404","AB305"]
,"empName":['王大明',"陳小胖"]
}
df = pd.DataFrame(data)
result=db.bulk_insert(df, "dbo.empTest")
"""
result = ExecutionResult()
try:
if conn == None:
conn = pyodbc.connect(self.connectionString)
cursor = conn.cursor()
# 构建插入 SQL 语句
columns = ', '.join(df.columns) + ', updatetime'
placeholders = ', '.join(['?' for _ in df.columns]) + ', GETDATE()'
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
# 批量插入数据
cursor.executemany(insert_sql, df.values.tolist())
conn.commit()
cursor.close()
conn.close()
result.set_success(message = "bulk insert successfully !!!", data = None)
except Exception as e:
result.set_failure(error = e, message = "bulk insert failed !!!")
return result
class ExecutionResult:
def __init__(self):
self.success = True
self.message = ""
self.data = None
self.error = None # Exception
def set_success(self, message="", data=None):
self.success = True
self.message = message
self.data = data
def set_failure(self, error, message=""):
self.success = False
self.message = message
self.error = error
打雜打久了,就變成打雜妹
程式寫久了,就變成老乞丐