# Author: Lee Wu
# Datetime: 2024/12/6 14:44
"""Database access layer: record dataclasses, connection managers and the DAO.

``RemoteDataBaseModel`` and ``LocalDataBaseModel`` each hold one class-level
MySQL connection/cursor pair; ``DAO`` wraps the SQL defined in
``sql_statements`` behind static methods.
"""
import datetime
import decimal
from abc import abstractmethod
from dataclasses import dataclass, astuple
from decimal import Decimal

import mysql.connector

import sql_statements
from config import RDB_HOST, RDB_USER, RDB_NAME, RDB_PASSWORD, LDB_HOST, LDB_PORT, LDB_NAME, LDB_USER, LDB_PASSWORD, \
    DATETIME_FORMAT, TOTAL_BUY_WEIGHT_AVG


# from datetime import datetime


@dataclass
class RFID:
    """One row produced by ``DAO.get_rfid_all``."""
    id: int
    rfid: str
    ear_tag: str


@dataclass
class WeightWave:
    """A weight wave record as read from / written to the local database."""
    id: int
    weight: float
    start_time: str
    end_time: str
    time_std: int
    # presumably the RFID read most often during the wave -- TODO confirm
    max_frq_rfid: str
    # presumably the read frequency of ``max_frq_rfid`` -- TODO confirm
    max_frq: float
    floor_scale_id: str


@dataclass
class Weight:
    """A single weight log sample built by ``DAO.get_weight_logs``."""
    rfid: str
    weight: float
    time: str
    time_std: int
    rfid_reader: str = ""


@dataclass
class RFIDReader:
    """Association between a weight wave id and an RFID reader name.

    Field order matters: ``DAO.add_rfid_readers`` feeds ``astuple(self)``
    straight into the insert statement.
    """
    id_weight_wave: int
    reader_name: str


@dataclass
class ExecLog:
    """Execution log entry for one floor scale and processing period."""
    floor_scale_id: str
    period_start_time: str
    period_end_time: str
    time_std: int


@dataclass
class DailyWeight:
    """A weight value paired with a date string."""
    weight: float
    date: str


# Base class managing a database connection.
class DataBaseModel:
    """Class-level holder of one MySQL connection and one dictionary cursor.

    All members are class methods; a subclass supplies the connection
    parameters via ``get_connection_params``. Because ``init``/``close``
    assign through ``cls``, each subclass ends up with its own
    ``connection``/``cursor`` attributes.
    """
    connection = None
    cursor = None

    @classmethod
    @abstractmethod
    def get_connection_params(cls):
        """Return the keyword arguments for ``mysql.connector.connect``.

        :raises NotImplementedError: always, in this base class.
        """
        # ``@abstractmethod`` is not enforced because this class does not use
        # ``ABCMeta``; fail loudly here instead of returning ``None`` and
        # crashing later inside ``connect(**None)``.
        raise NotImplementedError(
            f"{cls.__name__} must override get_connection_params()")

    @classmethod
    def init(cls):
        """Open the connection and cursor if not already open.

        :return: the (possibly pre-existing) connection object.
        """
        if cls.connection is None:  # guard against repeated initialisation
            params = cls.get_connection_params()
            cls.connection = mysql.connector.connect(**params)
            cls.cursor = cls.connection.cursor(dictionary=True)
        return cls.connection

    @classmethod
    def query(cls, sql, params=None):
        """Execute a query and return every fetched row.

        :param sql: SQL statement text.
        :param params: parameters bound to the statement, or ``None``.
        :return: result of ``cursor.fetchall()`` (rows are dicts because the
            cursor is created with ``dictionary=True``).
        """
        if cls.connection is None:
            cls.init()
        cls.cursor.execute(sql, params)
        return cls.cursor.fetchall()

    @classmethod
    def execute_trans(cls, sql, is_commit=True, params=None):
        """Run ``sql`` once per parameter set via ``cursor.executemany``.

        :param sql: SQL statement text.
        :param is_commit: commit after executing when true.
        :param params: sequence of parameter sequences.
        """
        # NOTE(review): an earlier variant with explicit start_transaction /
        # rollback handling was commented out here and has been dropped.
        if cls.connection is None:
            cls.init()
        cls.cursor.executemany(sql, params)
        if is_commit:
            cls.connection.commit()

    @classmethod
    def execute(cls, sql, is_commit=True, params=None):
        """Execute a single statement.

        :param sql: SQL statement text.
        :param is_commit: commit after executing when true.
        :param params: parameters bound to the statement, or ``None``.
        :return: ``cursor.lastrowid`` as read right after execution.
        """
        if cls.connection is None:
            cls.init()
        cls.cursor.execute(sql, params)
        parent_id = cls.cursor.lastrowid
        if is_commit:
            cls.connection.commit()
        return parent_id

    @classmethod
    def close(cls):
        """Close the cursor and connection and reset both to ``None``."""
        cursor, connection = cls.cursor, cls.connection
        try:
            if cursor is not None:
                cursor.close()
        finally:
            try:
                if connection is not None:
                    connection.close()
            finally:
                # Always drop the references so the next call re-initialises,
                # even when one of the close() calls raised.
                cls.cursor = None
                cls.connection = None


# Connects to the remote database; mainly used to fetch the raw data.
class RemoteDataBaseModel(DataBaseModel):
    """Concrete model bound to the remote (``RDB_*``) connection settings."""

    @classmethod
    def get_connection_params(cls):
        """Return the remote database connection parameters."""
        return {
            "host": RDB_HOST,
            "user": RDB_USER,
            "password": RDB_PASSWORD,
            "database": RDB_NAME
        }


# Connects to the local database; mainly used to store analysis results.
class LocalDataBaseModel(DataBaseModel):
    """Concrete model bound to the local (``LDB_*``) connection settings."""

    @classmethod
    def get_connection_params(cls):
        """Return the local database connection parameters."""
        return {
            "host": LDB_HOST,
            'port': LDB_PORT,
            "user": LDB_USER,
            "password": LDB_PASSWORD,
            "database": LDB_NAME
        }


class DAO:
    """Static data-access helpers over the remote and local databases.

    Unless a method takes ``is_close`` (or documents otherwise), it closes the
    connection it used before returning, including when the statement raises.
    """

    @staticmethod
    def _query_and_close(db, sql, params=None):
        """Run ``db.query`` and close ``db`` afterwards, even on error."""
        try:
            return db.query(sql, params)
        finally:
            db.close()

    @staticmethod
    def _execute_local(sql, params=None, is_close=True):
        """Execute and commit on the local DB; close afterwards if asked.

        :return: whatever ``LocalDataBaseModel.execute`` returns
            (``cursor.lastrowid``).
        """
        try:
            return LocalDataBaseModel.execute(sql, True, params)
        finally:
            if is_close:
                LocalDataBaseModel.close()

    # Query the floor scale / RFID reader device associations.
    @staticmethod
    def get_devices_id():
        """Return all rows of ``sql_statements.QUERY_DEVICES`` (remote DB)."""
        return DAO._query_and_close(RemoteDataBaseModel, sql_statements.QUERY_DEVICES)

    @staticmethod
    def get_buy_weight_by_rfid(rfid):
        """Return the purchase weight recorded for ``rfid``.

        :param rfid: RFID value bound to the query.
        :return: the ``buy_weight`` column of the first row, or
            ``Decimal(TOTAL_BUY_WEIGHT_AVG)`` when the query returns no rows.
        """
        result = DAO._query_and_close(
            RemoteDataBaseModel, sql_statements.sql_query_buy_weight_by_rfid(), (rfid,))
        if result:
            return result[0].get('buy_weight')
        # No purchase weight on record: fall back to the configured average.
        return Decimal(TOTAL_BUY_WEIGHT_AVG)

    # NOTE(review): a commented-out ``get_rfid_reader_id(**kwargs)`` lived here
    # (grouped reader-name lookup by floor scale id and time range); removed as
    # dead code.

    # Query weight logs for floor scale ids within a time range.
    @staticmethod
    def get_weight_logs(**kwargs):
        """Fetch weight log samples from the remote database.

        :param kwargs: ``floor_scale_id`` (tuple), ``start_time`` (str),
            ``end_time`` (str), ``time_std`` (int); all four are required.
        :return: list of ``Weight``; ``time`` is taken from ``biz_time`` when
            ``time_std == 1`` and from ``create_time`` otherwise.
        :raises KeyError: if a required keyword is missing.
        """
        time_std = kwargs['time_std']
        floor_scale_id = kwargs['floor_scale_id']
        sql = sql_statements.sql_query_logs_by_reader_name_and_duartion(floor_scale_id, time_std)
        params = floor_scale_id + (kwargs['start_time'], kwargs['end_time'])
        rows = DAO._query_and_close(RemoteDataBaseModel, sql, params)
        return [
            Weight(
                weight=row.get('biz_data'),
                time=row.get('biz_time') if time_std == 1 else row.get('create_time'),
                time_std=time_std,
                rfid='',
                rfid_reader=row.get('relation_reader_name'),
            )
            for row in rows
        ]

    @staticmethod
    def get_rfid_count(rfid_reader_id, start_time, end_time):
        """Query RFID read counts for the given reader ids and time range.

        :param rfid_reader_id: tuple of reader ids.
        :param start_time: str
        :param end_time: str
        :return: rows returned by the query.
        """
        sql = sql_statements.sql_query_logs_count_group_by_biz_data(rfid_reader_id)
        params = rfid_reader_id + (start_time, end_time)
        return DAO._query_and_close(RemoteDataBaseModel, sql, params)

    # Look up the ear tag for an RFID.
    @staticmethod
    def get_ear_tag(rfid):
        """Return rows of ``QUERY_EAR_TAG_ID_BY_RFID`` for ``rfid``."""
        return DAO._query_and_close(
            RemoteDataBaseModel, sql_statements.QUERY_EAR_TAG_ID_BY_RFID, [rfid])

    # Local query of weight waves by floor scale id.
    @staticmethod
    def get_weight_wave_by_floor_scale_id(floor_scale_id):
        """Return weight wave rows for ``floor_scale_id`` (local DB).

        :param floor_scale_id: value bound to the query.
        :return: rows returned by the query.
        """
        return DAO._query_and_close(
            LocalDataBaseModel,
            sql_statements.sql_query_weight_wave_by_floor_scale_id(),
            (floor_scale_id,))

    @staticmethod
    def get_weight_wave_by_rfid(rfid, time_std):
        """Return every weight wave stored for one RFID (local DB).

        :param rfid: RFID bound to the query and copied into each result.
        :param time_std: time standard bound to the query.
        :return: list of ``WeightWave`` with start/end times formatted using
            ``DATETIME_FORMAT``.
        """
        rows = DAO._query_and_close(
            LocalDataBaseModel, sql_statements.sql_query_weight_wave_by_rfid(), (rfid, time_std))
        return [
            WeightWave(
                floor_scale_id=row.get('floor_scale_id'),
                max_frq_rfid=rfid,
                start_time=row.get('start_time').strftime(DATETIME_FORMAT),
                end_time=row.get('end_time').strftime(DATETIME_FORMAT),
                time_std=row.get('time_std'),
                max_frq=row.get('max_frq'),
                weight=row.get('weight'),
                id=row.get('id_weight_wave'),
            )
            for row in rows
        ]

    @staticmethod
    def get_reader(id_weight_wave):
        """Return the RFID reader rows linked to a weight wave id (local DB).

        :param id_weight_wave: weight wave id bound to the query.
        :return: rows returned by the query.
        """
        return DAO._query_and_close(
            LocalDataBaseModel,
            sql_statements.sql_query_relation_reader_by_id_weight_wave(),
            (id_weight_wave,))

    @staticmethod
    def get_rfid_all():
        """Return every row of ``sql_query_rfid_all_in_cattle`` as ``RFID``."""
        rows = DAO._query_and_close(
            RemoteDataBaseModel, sql_statements.sql_query_rfid_all_in_cattle())
        return [
            RFID(id=row.get('id'), rfid=row.get('rfid'), ear_tag=row.get('ear_tag'))
            for row in rows
        ]

    # NOTE(review): a commented-out ``get_rfid(time_std)`` (weight waves grouped
    # by RFID, local DB) lived here; removed as dead code.

    # Insert one weight wave.
    @staticmethod
    def add_weight_wave(weight_wave, is_commit):
        """Insert a weight wave into the local database.

        The connection is deliberately left open so that follow-up inserts can
        share it; the caller decides whether to commit now.

        :param weight_wave: ``WeightWave`` providing the inserted values.
        :param is_commit: commit immediately when true.
        :return: ``cursor.lastrowid`` of the insert (the new primary key).
        """
        return LocalDataBaseModel.execute(sql_statements.INSERT_WEIGHT_WAVE, is_commit, (
            weight_wave.floor_scale_id, weight_wave.start_time, weight_wave.end_time,
            weight_wave.time_std, weight_wave.max_frq_rfid, weight_wave.max_frq,
            weight_wave.weight))

    @staticmethod
    def add_rfid_frq(is_commit=False, params=None, is_close=True):
        """Currently a no-op placeholder for bulk-inserting RFID frequencies.

        :param is_commit: unused.
        :param params: unused.
        :param is_close: unused.
        :return: ``None``.
        """
        # NOTE(review): the previous body (bulk insert through
        # ``execute_trans`` with ``sql_statements.INSERT_RIFD_FRQ``) was
        # commented out, so this method intentionally does nothing.
        pass

    @staticmethod
    def add_rfid_readers(rfid_reader_lst):
        """Insert the given readers, commit, then close the local connection.

        :param rfid_reader_lst: iterable of ``RFIDReader``.
        """
        try:
            LocalDataBaseModel.execute_trans(
                sql_statements.sql_insert_rfid_reader(), True,
                [astuple(rfid_reader) for rfid_reader in rfid_reader_lst])
        finally:
            LocalDataBaseModel.close()

    @staticmethod
    def add_exec_log(exec_log, is_close=True):
        """Insert and commit an execution log entry (local DB).

        :param exec_log: ``ExecLog`` providing the inserted values.
        :param is_close: close the local connection afterwards when true.
        """
        DAO._execute_local(sql_statements.sql_insert_exec_log(), (
            exec_log.floor_scale_id, exec_log.period_start_time,
            exec_log.period_end_time, exec_log.time_std), is_close)

    @staticmethod
    def update_exec_log(exec_log, is_close=True):
        """Update and commit an execution log entry (local DB).

        Binds ``period_end_time`` then ``floor_scale_id`` to the statement.

        :param exec_log: ``ExecLog`` providing the bound values.
        :param is_close: close the local connection afterwards when true.
        """
        DAO._execute_local(sql_statements.sql_update_exec_log(),
                           (exec_log.period_end_time, exec_log.floor_scale_id), is_close)

    # NOTE(review): the delete_* methods below return ``cursor.lastrowid`` (that
    # is what ``DataBaseModel.execute`` returns), not an affected-row count.

    @staticmethod
    def delete_weight_wave_all(is_close=True):
        """Run ``sql_delete_weight_wave_all`` and commit.

        :param is_close: close the local connection afterwards when true.
        :return: ``cursor.lastrowid`` after the statement.
        """
        return DAO._execute_local(sql_statements.sql_delete_weight_wave_all(), None, is_close)

    @staticmethod
    def delete_weight_wave_by_time_std(time_std):
        """Run ``sql_delete_weight_wave_by_time_std`` for ``time_std``, commit, close.

        :return: ``cursor.lastrowid`` after the statement.
        """
        return DAO._execute_local(
            sql_statements.sql_delete_weight_wave_by_time_std(), (time_std,))

    @staticmethod
    def delete_rfid_frq(is_close=True):
        """Run ``DELETE_RFID_FRQ`` and commit.

        :param is_close: close the local connection afterwards when true.
        :return: ``cursor.lastrowid`` after the statement.
        """
        return DAO._execute_local(sql_statements.DELETE_RFID_FRQ, None, is_close)

    @staticmethod
    def get_exec_log(time_std):
        """Return execution log rows for ``time_std`` (local DB).

        :param time_std: value bound to ``QUERY_EXEC_LOG``; a bare scalar is
            wrapped in a 1-tuple because the cursor only accepts a
            tuple/list/dict as parameters. An already-wrapped value is passed
            through unchanged.
        :return: rows returned by the query.
        """
        params = time_std if isinstance(time_std, (tuple, list, dict)) else (time_std,)
        return DAO._query_and_close(LocalDataBaseModel, sql_statements.QUERY_EXEC_LOG, params)

    @staticmethod
    def delete_exec_log_all(is_close=True):
        """Run ``sql_delete_exec_log_all`` and commit.

        :param is_close: close the local connection afterwards when true.
        :return: ``cursor.lastrowid`` after the statement.
        """
        return DAO._execute_local(sql_statements.sql_delete_exec_log_all(), None, is_close)

    @staticmethod
    def delete_exec_log_by_time_std(time_std):
        """Run ``sql_delete_exec_log_by_time_std`` for ``time_std``, commit, close.

        :return: ``cursor.lastrowid`` after the statement.
        """
        return DAO._execute_local(
            sql_statements.sql_delete_exec_log_by_time_std(), (time_std,))

    @staticmethod
    def delete_relation_reader(is_close=True):
        """Delete the RFID readers associated with weight waves and commit.

        :param is_close: close the local connection afterwards when true.
        :return: ``cursor.lastrowid`` after the statement.
        """
        return DAO._execute_local(
            sql_statements.sql_delete_relatioin_rfid_reader(), None, is_close)