hms_cattle_research/models.py
2024-12-22 09:58:15 +08:00

430 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Author: Lee Wu
# Datetime: 2024/12/6 14:44
import datetime
import decimal
from abc import abstractmethod
import mysql.connector
from config import RDB_HOST, RDB_USER, RDB_NAME, RDB_PASSWORD, LDB_HOST, LDB_PORT, LDB_NAME, LDB_USER, LDB_PASSWORD, \
DATETIME_FORMAT, TOTAL_BUY_WEIGHT_AVG
import sql_statements
from dataclasses import dataclass, astuple
from decimal import Decimal
# from datetime import datetime
@dataclass
class RFID:
    """Record pairing a cattle RFID code with its ear tag.

    Field order is part of the contract: instances may be built positionally.
    """

    id: int  # primary key of the source row
    rfid: str  # RFID code
    ear_tag: str  # ear-tag identifier associated with the RFID
@dataclass
class WeightWave:
    """One "weight wave": a weighing episode recorded on a floor scale.

    Field order is part of the contract: instances may be built positionally.
    """

    id: int  # primary key of the stored weight wave
    weight: float  # weight value attributed to this wave
    start_time: str  # start of the wave, as a formatted timestamp string
    end_time: str  # end of the wave, as a formatted timestamp string
    time_std: int  # time-standard selector the wave was computed with
    max_frq_rfid: str  # RFID associated with the wave (presumably the most frequently read one)
    max_frq: float  # frequency value stored alongside max_frq_rfid
    floor_scale_id: str  # identifier of the floor scale that produced the wave
@dataclass
class Weight:
    """A single weight reading taken from the raw logs.

    Field order is part of the contract: instances may be built positionally.
    """

    rfid: str  # RFID attributed to the reading ('' when not yet known)
    weight: float  # measured weight value
    time: str  # timestamp of the reading
    time_std: int  # time-standard selector that decided which timestamp was used
    rfid_reader: str = ""  # name of the related RFID reader; optional
@dataclass
class RFIDReader:
    """Association between a stored weight wave and an RFID reader name.

    Instances are flattened with ``astuple`` when inserted, so the field
    order must match the column order of the insert statement.
    """

    id_weight_wave: int  # primary key of the weight wave
    reader_name: str  # name of the RFID reader related to that wave
@dataclass
class ExecLog:
    """Execution-log entry recording which period was processed for a floor scale.

    Field order is part of the contract: instances may be built positionally.
    """

    floor_scale_id: str  # identifier of the floor scale that was processed
    period_start_time: str  # start of the processed period
    period_end_time: str  # end of the processed period
    time_std: int  # time-standard selector used for the run
@dataclass
class DailyWeight:
    """A weight value attributed to one calendar date.

    Field order is part of the contract: instances may be built positionally.
    """

    weight: float  # weight value for the day
    date: str  # the day, as a string
# Base class that manages a database connection
class DataBaseModel:
    """Class-level holder of one MySQL connection and one dictionary cursor.

    All methods are classmethods and store their state through ``cls``, so a
    subclass that opens a connection gets its own ``connection`` / ``cursor``
    attributes. Subclasses only need to override ``get_connection_params``.
    The connection is opened lazily on first use and released by ``close``.
    """

    connection = None
    cursor = None

    @classmethod
    @abstractmethod
    def get_connection_params(cls):
        """Return the keyword arguments passed to ``mysql.connector.connect``.

        :return: dict of connection parameters
        :raises NotImplementedError: on the base class; subclasses must override
        """
        # The class does not use ABCMeta, so @abstractmethod alone enforces
        # nothing. Fail loudly here instead of returning None, which would
        # surface later as an opaque TypeError in connect(**None).
        raise NotImplementedError(
            f"{cls.__name__} must override get_connection_params()"
        )

    @classmethod
    def init(cls):
        """Open the connection and cursor if they are not open yet.

        :return: the (possibly already existing) connection
        """
        if cls.connection is None:  # avoid initializing twice
            params = cls.get_connection_params()
            connection = mysql.connector.connect(**params)
            try:
                cursor = connection.cursor(dictionary=True)
            except Exception:
                # Do not leave a half-initialized state (connection set but no
                # cursor), which would break every later call.
                connection.close()
                raise
            cls.connection = connection
            cls.cursor = cursor
        return cls.connection

    @classmethod
    def query(cls, sql, params=None):
        """Execute a query and return all rows.

        :param sql: SQL statement
        :param params: statement parameters, or None
        :return: result of ``cursor.fetchall()`` (rows are dicts because the
                 cursor is created with ``dictionary=True``)
        """
        cls.init()
        cls.cursor.execute(sql, params)
        return cls.cursor.fetchall()

    @classmethod
    def execute_trans(cls, sql, is_commit=True, params=None):
        """Execute one statement for every parameter set via ``executemany``.

        :param sql: SQL statement
        :param is_commit: commit after executing when True
        :param params: sequence of parameter sets
        :return: None
        """
        cls.init()
        cls.cursor.executemany(sql, params)
        if is_commit:
            cls.connection.commit()

    @classmethod
    def execute(cls, sql, is_commit=True, params=None):
        """Execute a single statement.

        :param sql: SQL statement
        :param is_commit: commit after executing when True
        :param params: statement parameters, or None
        :return: ``cursor.lastrowid`` read right after the statement ran
        """
        cls.init()
        cls.cursor.execute(sql, params)
        parent_id = cls.cursor.lastrowid
        if is_commit:
            cls.connection.commit()
        return parent_id

    @classmethod
    def close(cls):
        """Close the cursor and the connection and reset the class state.

        The state is reset and the connection is closed even when closing the
        cursor raises, so the next call can open a fresh connection.
        """
        cursor, connection = cls.cursor, cls.connection
        cls.cursor = None
        cls.connection = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if connection is not None:
                connection.close()
# Connects to the remote database, mainly used to fetch the raw data
class RemoteDataBaseModel(DataBaseModel):
    """Concrete database model bound to the remote database (RDB_* settings)."""

    @classmethod
    def get_connection_params(cls):
        """Return the connection parameters of the remote database.

        :return: dict with host, user, password and database
        """
        return dict(
            host=RDB_HOST,
            user=RDB_USER,
            password=RDB_PASSWORD,
            database=RDB_NAME,
        )
# Connects to the local database, mainly used to store the analysis results
class LocalDataBaseModel(DataBaseModel):
    """Concrete database model bound to the local database (LDB_* settings)."""

    @classmethod
    def get_connection_params(cls):
        """Return the connection parameters of the local database.

        :return: dict with host, port, user, password and database
        """
        return dict(
            host=LDB_HOST,
            port=LDB_PORT,
            user=LDB_USER,
            password=LDB_PASSWORD,
            database=LDB_NAME,
        )
class DAO:
    """Static data-access helpers.

    Raw data is read through ``RemoteDataBaseModel``; analysis results are read
    and written through ``LocalDataBaseModel``. Unless a method says otherwise,
    it closes the connection it used before returning.
    """

    @staticmethod
    def _query_and_close(model, sql, params=None):
        """Run ``model.query`` and close the model's connection, even on failure."""
        try:
            return model.query(sql, params)
        finally:
            model.close()

    # Query all floor-scale ID / RFID reader ID associations
    @staticmethod
    def get_devices_id():
        """Return every row of ``sql_statements.QUERY_DEVICES`` from the remote database.

        :return: list of row dicts
        """
        return DAO._query_and_close(RemoteDataBaseModel, sql_statements.QUERY_DEVICES)

    @staticmethod
    def get_buy_weight_by_rfid(rfid):
        """Return the purchase weight recorded for one RFID.

        :param rfid: RFID of the animal
        :return: the ``buy_weight`` column of the first matching row, or
                 ``Decimal(TOTAL_BUY_WEIGHT_AVG)`` when no purchase weight is
                 available
        """
        rows = DAO._query_and_close(
            RemoteDataBaseModel, sql_statements.sql_query_buy_weight_by_rfid(), (rfid,)
        )
        buy_weight = rows[0].get('buy_weight') if rows else None
        if buy_weight is None:
            # No purchase weight (no row, or a NULL column): fall back to the
            # average purchase weight of all cattle.
            return Decimal(TOTAL_BUY_WEIGHT_AVG)
        return buy_weight

    # Query weight logs for floor scales within a time window
    @staticmethod
    def get_weight_logs(**kwargs):
        """Fetch weight log rows from the remote database as ``Weight`` objects.

        :param kwargs: floor_scale_id (tuple), start_time (str), end_time (str),
                       time_std (int); all four keys are required
        :return: list of ``Weight`` with an empty ``rfid``; ``time`` is taken
                 from ``biz_time`` when time_std == 1, otherwise from
                 ``create_time``
        :raises KeyError: when a required key is missing
        """
        time_std = kwargs['time_std']
        floor_scale_id = kwargs['floor_scale_id']
        sql = sql_statements.sql_query_logs_by_reader_name_and_duartion(floor_scale_id, time_std)
        # The floor-scale IDs come first, followed by the time window bounds.
        params = floor_scale_id + (kwargs['start_time'], kwargs['end_time'])
        rows = DAO._query_and_close(RemoteDataBaseModel, sql, params)
        time_key = 'biz_time' if time_std == 1 else 'create_time'
        return [
            Weight(
                rfid='',
                weight=row.get('biz_data'),
                time=row.get(time_key),
                time_std=time_std,
                rfid_reader=row.get('relation_reader_name'),
            )
            for row in rows
        ]

    @staticmethod
    def get_rfid_count(rfid_reader_id, start_time, end_time):
        """Query how often each RFID was read by the given RFID readers.

        :param rfid_reader_id: tuple of RFID reader IDs
        :param start_time: str, start of the time window
        :param end_time: str, end of the time window
        :return: list of row dicts
        """
        sql = sql_statements.sql_query_logs_count_group_by_biz_data(rfid_reader_id)
        params = rfid_reader_id + (start_time, end_time)
        return DAO._query_and_close(RemoteDataBaseModel, sql, params)

    # Look up the positioning ear-tag shell number that belongs to an RFID
    @staticmethod
    def get_ear_tag(rfid):
        """Return the rows of ``sql_statements.QUERY_EAR_TAG_ID_BY_RFID`` for one RFID.

        :param rfid: RFID to look up
        :return: list of row dicts
        """
        return DAO._query_and_close(
            RemoteDataBaseModel, sql_statements.QUERY_EAR_TAG_ID_BY_RFID, [rfid]
        )

    # Query weight waves by floor-scale ID (local database)
    @staticmethod
    def get_weight_wave_by_floor_scale_id(floor_scale_id):
        """Fetch the basic weight-wave rows stored for one floor scale.

        :param floor_scale_id: floor-scale ID
        :return: list of row dicts
        """
        return DAO._query_and_close(
            LocalDataBaseModel,
            sql_statements.sql_query_weight_wave_by_floor_scale_id(),
            (floor_scale_id,),
        )

    @staticmethod
    def get_weight_wave_by_rfid(rfid, time_std):
        """Fetch all weight waves stored for one animal.

        :param rfid: RFID of the animal
        :param time_std: time-standard selector to filter by
        :return: list of ``WeightWave``; start/end times are formatted with
                 ``DATETIME_FORMAT``
        """
        rows = DAO._query_and_close(
            LocalDataBaseModel, sql_statements.sql_query_weight_wave_by_rfid(), (rfid, time_std)
        )
        return [
            WeightWave(
                id=row.get('id_weight_wave'),
                weight=row.get('weight'),
                start_time=row.get('start_time').strftime(DATETIME_FORMAT),
                end_time=row.get('end_time').strftime(DATETIME_FORMAT),
                time_std=row.get('time_std'),
                max_frq_rfid=rfid,
                max_frq=row.get('max_frq'),
                floor_scale_id=row.get('floor_scale_id'),
            )
            for row in rows
        ]

    @staticmethod
    def get_reader(id_weight_wave):
        """Look up the RFID reader name(s) related to a weight wave.

        :param id_weight_wave: weight-wave ID
        :return: list of row dicts
        """
        return DAO._query_and_close(
            LocalDataBaseModel,
            sql_statements.sql_query_relation_reader_by_id_weight_wave(),
            (id_weight_wave,),
        )

    @staticmethod
    def get_rfid_all():
        """Fetch every RFID record returned by ``sql_query_rfid_all_in_cattle``.

        :return: list of ``RFID``
        """
        rows = DAO._query_and_close(
            RemoteDataBaseModel, sql_statements.sql_query_rfid_all_in_cattle()
        )
        return [
            RFID(id=row.get('id'), rfid=row.get('rfid'), ear_tag=row.get('ear_tag'))
            for row in rows
        ]

    # Insert one weight wave
    @staticmethod
    def add_weight_wave(weight_wave, is_commit):
        """Insert one weight wave into the local database.

        The connection is left open, so a caller can pass ``is_commit=False``
        and let a later committing call on the same connection (for example
        ``add_rfid_readers``) finish the transaction.

        Inserted values, in order: floor_scale_id, start_time, end_time,
        time_std, max_frq_rfid, max_frq, weight.

        :param weight_wave: ``WeightWave`` to store
        :param is_commit: whether to commit right after the insert
        :return: primary key of the inserted row (the cursor's ``lastrowid``)
        """
        values = (
            weight_wave.floor_scale_id,
            weight_wave.start_time,
            weight_wave.end_time,
            weight_wave.time_std,
            weight_wave.max_frq_rfid,
            weight_wave.max_frq,
            weight_wave.weight,
        )
        return LocalDataBaseModel.execute(sql_statements.INSERT_WEIGHT_WAVE, is_commit, values)

    @staticmethod
    def add_rfid_frq(is_commit=False, params=None, is_close=True):
        """Insert several RFID frequency values -- currently disabled (no-op).

        The parameters are accepted for interface compatibility only.

        :param is_commit: whether to commit
        :param params: list of value tuples
                       (id_weight_wave, ear_tag, rfid, rfid_frq, time_std)
        :param is_close: whether to close the connection
        :return: None
        """
        pass

    @staticmethod
    def add_rfid_readers(rfid_reader_lst):
        """Insert RFID reader relations, commit, then close the local connection.

        :param rfid_reader_lst: iterable of ``RFIDReader``
        :return: None
        """
        reader_rows = [astuple(rfid_reader) for rfid_reader in rfid_reader_lst]
        LocalDataBaseModel.execute_trans(sql_statements.sql_insert_rfid_reader(), True, reader_rows)
        LocalDataBaseModel.close()

    @staticmethod
    def add_exec_log(exec_log, is_close=True):
        """Insert an execution-log entry and commit.

        :param exec_log: ``ExecLog`` to store
        :param is_close: close the local connection afterwards when True
        :return: None
        """
        values = (
            exec_log.floor_scale_id,
            exec_log.period_start_time,
            exec_log.period_end_time,
            exec_log.time_std,
        )
        LocalDataBaseModel.execute(sql_statements.sql_insert_exec_log(), True, values)
        if is_close:
            LocalDataBaseModel.close()

    @staticmethod
    def update_exec_log(exec_log, is_close=True):
        """Update an execution-log entry with (period_end_time, floor_scale_id) and commit.

        :param exec_log: ``ExecLog`` carrying the new end time and the floor-scale ID
        :param is_close: close the local connection afterwards when True
        :return: None
        """
        values = (exec_log.period_end_time, exec_log.floor_scale_id)
        LocalDataBaseModel.execute(sql_statements.sql_update_exec_log(), True, values)
        if is_close:
            LocalDataBaseModel.close()

    # NOTE(review): the delete_* helpers below return whatever
    # LocalDataBaseModel.execute returns, i.e. the cursor's lastrowid, not the
    # number of deleted rows.
    @staticmethod
    def delete_weight_wave_all(is_close=True):
        """Delete all weight waves and commit.

        :param is_close: close the local connection afterwards when True
        :return: value returned by ``LocalDataBaseModel.execute``
        """
        rows = LocalDataBaseModel.execute(sql_statements.sql_delete_weight_wave_all())
        if is_close:
            LocalDataBaseModel.close()
        return rows

    @staticmethod
    def delete_weight_wave_by_time_std(time_std):
        """Delete the weight waves of one time standard, commit and close.

        :param time_std: time-standard selector
        :return: value returned by ``LocalDataBaseModel.execute``
        """
        rows = LocalDataBaseModel.execute(
            sql_statements.sql_delete_weight_wave_by_time_std(), True, (time_std,)
        )
        LocalDataBaseModel.close()
        return rows

    @staticmethod
    def delete_rfid_frq(is_close=True):
        """Run ``sql_statements.DELETE_RFID_FRQ`` and commit.

        :param is_close: close the local connection afterwards when True
        :return: value returned by ``LocalDataBaseModel.execute``
        """
        rows = LocalDataBaseModel.execute(sql_statements.DELETE_RFID_FRQ)
        if is_close:
            LocalDataBaseModel.close()
        return rows

    @staticmethod
    def get_exec_log(time_std):
        """Fetch the execution-log rows of ``sql_statements.QUERY_EXEC_LOG``.

        :param time_std: time-standard selector; a bare value is wrapped into a
                         one-element tuple, an existing tuple/list/dict is
                         passed through unchanged
        :return: list of row dicts
        """
        # mysql.connector only accepts a tuple, list or dict as parameters.
        if isinstance(time_std, (tuple, list, dict)):
            params = time_std
        else:
            params = (time_std,)
        return DAO._query_and_close(LocalDataBaseModel, sql_statements.QUERY_EXEC_LOG, params)

    @staticmethod
    def delete_exec_log_all(is_close=True):
        """Delete all execution-log entries and commit.

        :param is_close: close the local connection afterwards when True
        :return: value returned by ``LocalDataBaseModel.execute``
        """
        rows = LocalDataBaseModel.execute(sql_statements.sql_delete_exec_log_all())
        if is_close:
            LocalDataBaseModel.close()
        return rows

    @staticmethod
    def delete_exec_log_by_time_std(time_std):
        """Delete the execution-log entries of one time standard, commit and close.

        :param time_std: time-standard selector
        :return: value returned by ``LocalDataBaseModel.execute``
        """
        rows = LocalDataBaseModel.execute(
            sql_statements.sql_delete_exec_log_by_time_std(), True, (time_std,)
        )
        LocalDataBaseModel.close()
        return rows

    @staticmethod
    def delete_relation_reader(is_close=True):
        """Delete the RFID readers related to weight waves and commit.

        :param is_close: close the local connection afterwards when True
        :return: value returned by ``LocalDataBaseModel.execute``
        """
        rows = LocalDataBaseModel.execute(sql_statements.sql_delete_relatioin_rfid_reader())
        if is_close:
            LocalDataBaseModel.close()
        return rows