[筆記] API VS DB操作

個人筆記,利用python call api及DB操作

API

import requests
from requests.auth import HTTPBasicAuth
from Class.ExecutionResult import ExecutionResult

class APIClient:
    def __init__(self, base_url, userid=None, pwd=None,proxies=None):        
        self.base_url = base_url
        self.auth = HTTPBasicAuth(userid, pwd) if userid and pwd else None
        self.proxies = proxies

    def post(self, endpoint, params=None, headers=None
             , verify=False, cls=None):
        result = ExecutionResult()
        if endpoint:
            url = f"{self.base_url}/{endpoint}"
        else:
            url = self.base_url
        
        try:
            response = requests.post(url, auth=self.auth, json=params
                                     , headers=headers, verify=verify, proxies=self.proxies)
            result = self.handle_response(response, cls,result)            
        except Exception as e:
            result.set_failure(error = e, message = "Failed to post!")
        
        return result
        # return self.handle_response(response, parse_data_func)
            
    def get(self, endpoint, params=None, headers=None
            , verify=False, cls=None):
        """
        範例
        result = ExecutionResult()
        url='https://xxxx'
        headers = {
                "Authorization": "aaaa",           
                "user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
                "Cookie": "xxxxxxxxxxxxxxxxxxx"
            }
        proxies = {
                "http":  "http://xxxxxxx",
                "https": "http://xxxxxxxxxxxxxxxxxx"
            }
        api = APIClient(base_url=url , proxies=proxies)
        result = api.get(endpoint=None , params= None ,headers= headers 
                ,verify=False ,cls = Dept)
        for item in result.data["data"]:
            print(item.d_code)
        """
        result = ExecutionResult()
        if endpoint:
            url = f"{self.base_url}/{endpoint}"
        else:
            url = self.base_url
                    
        try:
            response = requests.get(url, auth=self.auth
                                    , headers=headers, params=params
                                    , verify=verify
                                    , proxies=self.proxies
                                    )
            result = self.handle_response(response, cls,result)            
        except Exception as e:
            result.set_failure(error = e, message = "Failed to get!")
                    
        return result

    def handle_response(self, response, cls,result):        
        if response.status_code == 200:            
            getResponse = response.json()
            code = getResponse.get('code')
            msg = getResponse.get('msg')
            data = getResponse.get('data')

            if cls:
                data = self.parse_data_as_class(data,cls)            
            result.set_success(message = "get post/get successfully!!", data = {"code":code,"msg":msg,"data":data})                        
        else:
            response.raise_for_status()
        return result
    
    @staticmethod
    def parse_data_as_class(data, cls):
        """

        :param data: list or dict, 包含需要解析的数据
        :param cls: class, 具有 from_dict 方法的类
        :return: list, 包含指定类的实例的列表
        """
        data_list = []
        if isinstance(data, list):
            try:
                # 将列表中的每个字典转换为指定类的实例
                data_list = [cls.from_dict(item) for item in data]
            except (ValueError, TypeError, KeyError):
                data_list = []
        elif isinstance(data, dict):
            try:
                # 将字典转换为单个指定类的实例并放入列表中
                data_list = [cls.from_dict(data)]
            except (ValueError, TypeError, KeyError):
                data_list = []
        return data_list
                    

DB操作

import pyodbc
import pandas as pd
import re
from Class.ExecutionResult import ExecutionResult

class DBUtil:
    def __init__(self, connectionString):
        self.connectionString = connectionString
        
    def getEQRConnection(self):
        result = ExecutionResult()
        try:
            conn = pyodbc.connect(self.connectionString)
            result.set_success(message = "get connection successfully!!", data = conn)
        except Exception as e:
            result.set_failure(error = e, message = "Failed to get connection!")
        return result
        
    def getConnection(self,connectionString):
        '''
        連線字串
        '''
        result = ExecutionResult()
                
        try:
            conn = pyodbc.connect(connectionString)
            result.set_success(message = "get connection successfully!!", data = conn)
        except Exception as e:
            result.set_failure(error = e, message = "Failed to get connection!")
            
        return result        

    def closeConnection(self,conn):
        '''
        關閉Connection
        '''
        try:
            conn.close()
        except Exception as e:
            print("close db error:",e)

    def execute_query(self, sql_query,conn=None):
        """
        Execute a SQL query on the database.
        """        
        
        result = ExecutionResult()
                
        try:
            if conn==None:            
                conn = pyodbc.connect(self.connectionString)            
            data = pd.read_sql_query(sql_query,conn)
            result.set_success(message = "Exec command successfully!!", data = data)
        except Exception as e:
            result.set_failure(error = e, message = "Exec command failed!")

        return result

    def prepare_commandstring(self,query_template, params):
        """
        准备带有命名参数的 SQL 查询。
        :param query_template: 带有命名参数的查询模板
        :param params: 参数字典
        :return: (最终的查询字符串, 参数元组)
        
            SELECT * FROM your_table_name 
        WHERE column1 = {param1} AND column2 = {param2} OR column3 = {param1}
        """
        # 使用字符串格式化生成最终的查询
        query = query_template.format(**{key: '?' for key in params.keys()})

        # 使用正则表达式找到所有的占位符
        param_order = re.findall(r'\{(\w+)\}', query_template)
        # print(param_order)

        # 生成参数元组,确保每个占位符都有对应的参数值
        param_values = tuple(params[key] for key in param_order)

        # 返回最终的查询字符串和参数元组
        return query, param_values

    def bulk_insert_or_update(self, df, table_name, key_columns, update_columns=None,conn=None):
        """
        批量插入或更新数据
        :param df: pandas DataFrame, 包含需要插入或更新的数据
        :param table_name: str, 数据库表名,包含 schema 信息
        :param key_columns: list, 用于判断是插入还是更新的关键列
        :param update_columns: list, 需要更新的列,如果为 None,则更新所有列
        """
        
        """
        範例
        result = ExecutionResult()
        db = DBUtil("DRIVER={SQL Server}; SERVER=TWTP1APM40\\SQLEXPRESS; DATABASE=HRSDB; UID=jojo; PWD=633605jojo")
        data = {
            "empNo":["AA404","AA305"]
            ,"empName":['陳小Jo',"HaHaa"]
        }
        df = pd.DataFrame(data)
        result=db.bulk_insert_or_update(df, "dbo.empTest", key_columns=["empNo"], update_columns=["empName"])
        """
        result = ExecutionResult()
        try:
            if conn==None:            
                conn = pyodbc.connect(self.connectionString)
            cursor = conn.cursor()

            # 创建临时表
            temp_table_name = f"#{table_name.replace('.', '_')}_tempforpy"

            create_temp_table_sql = f"SELECT TOP 0 * INTO {temp_table_name} FROM {table_name}"

            cursor.execute(create_temp_table_sql)
            conn.commit()

            # 将数据插入临时表
            insert_temp_sql = f"INSERT INTO {temp_table_name} ({', '.join(df.columns)}) VALUES ({', '.join(['?' for _ in df.columns])})"
            cursor.executemany(insert_temp_sql, df.values.tolist())
            conn.commit()

            # 如果 update_columns 为 None,则更新所有列
            if update_columns is None:
                update_columns = [col for col in df.columns if col not in key_columns]

            # 合并数据
            merge_sql = f"""
            MERGE INTO {table_name} AS target
            USING {temp_table_name} AS source
            ON {" AND ".join([f"target.{col} = source.{col}" for col in key_columns])}
            WHEN MATCHED AND {" OR ".join([f"target.{col} <> source.{col}" for col in update_columns])} THEN
                UPDATE SET {", ".join([f"target.{col} = source.{col}" for col in update_columns])}, target.updatetime = GETDATE()
            WHEN NOT MATCHED THEN
                INSERT ({', '.join(df.columns)}, updatetime) VALUES ({', '.join([f"source.{col}" for col in df.columns])}, GETDATE());
            """
            cursor.execute(merge_sql)
            conn.commit()

            # 删除临时表
            drop_temp_table_sql = f"DROP TABLE {temp_table_name}"
            cursor.execute(drop_temp_table_sql)
            conn.commit()

            cursor.close()
            conn.close()
            result.set_success(message = "bulk insert / update successfully !!!", data = None)            
        except Exception as e:
            result.set_failure(error = e, message = "bulk insert / update failed !!!")            
        
        return result
    
    def bulk_insert(self, df, table_name,conn=None):
        """
        批量插入数据
        :param df: pandas DataFrame, 包含需要插入的数据
        :param table_name: str, 数据库表名
        """
        
        """
        範例
        result = ExecutionResult()
        db = DBUtil("connectionstring")
        data = {
            "empNo":["AB404","AB305"]
            ,"empName":['王大明',"陳小胖"]
        }
        df = pd.DataFrame(data)
        result=db.bulk_insert(df, "dbo.empTest")
        """
        
        result = ExecutionResult()
        try:
            if conn ==  None:
                conn = pyodbc.connect(self.connectionString)
                        
            cursor = conn.cursor()

            # 构建插入 SQL 语句
            columns = ', '.join(df.columns) + ', updatetime'
            placeholders = ', '.join(['?' for _ in df.columns]) + ', GETDATE()'
            insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

            # 批量插入数据
            cursor.executemany(insert_sql, df.values.tolist())
            conn.commit()
            cursor.close()
            conn.close()
            result.set_success(message = "bulk insert successfully !!!", data = None)            
        except Exception as e:            
            result.set_failure(error = e, message = "bulk insert failed !!!")
            
        return result
class ExecutionResult:
    def __init__(self):
        self.success = True
        self.message = ""
        self.data = None
        self.error = None # Exception

    def set_success(self, message="", data=None):
        self.success = True
        self.message = message
        self.data = data

    def set_failure(self, error, message=""):
        self.success = False
        self.message = message
        self.error = error

 

 

打雜打久了,就變成打雜妹

程式寫久了,就變成老乞丐