From db56a898899c4e1dba77712880032aa8a2750d97 Mon Sep 17 00:00:00 2001 From: wulili Date: Sun, 22 Dec 2024 09:58:15 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=BC=98=E5=8C=96=E7=89=9B=E5=8F=AA?= =?UTF-8?q?=E4=BD=93=E9=87=8D=E6=95=B0=E6=8D=AE=E5=88=86=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/sqldialects.xml | 6 + config.py | 49 +++++ controller.py | 295 +++++++++++++++++++++++++++++ db_connection.py | 14 -- db_dao.py | 2 +- main.py | 348 ++++++++++++++++++++++++++-------- models.py | 429 ++++++++++++++++++++++++++++++++++++++++++ sql_statements.py | 154 +++++++++++++++ util.py | 74 ++++++++ 9 files changed, 1278 insertions(+), 93 deletions(-) create mode 100644 .idea/sqldialects.xml create mode 100644 config.py create mode 100644 controller.py delete mode 100644 db_connection.py create mode 100644 models.py create mode 100644 sql_statements.py create mode 100644 util.py diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml new file mode 100644 index 0000000..de2d928 --- /dev/null +++ b/.idea/sqldialects.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..b018210 --- /dev/null +++ b/config.py @@ -0,0 +1,49 @@ +# Author: Lee Wu Love Lele +# Datetime: 2024/12/6 15:31 + +# 远程数据库配置 +RDB_HOST = "rm-bp1442up0e0nlz95szo.mysql.rds.aliyuncs.com" +RDB_USER = "rhm_read_only" +RDB_PASSWORD = "aA123456+" +RDB_NAME = "rhm_insure_dev" + +# 本地数据库配置 +LDB_HOST = "127.0.0.1" +LDB_PORT = "3306" +LDB_USER = "root" +LDB_PASSWORD = "1QAZ9ol." +LDB_NAME = "hm_data" + +# 数据库日期时间格式 +DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" + +# tqdm执行进度条开关 +TQDM_DISABLE = False + +# 计算牛只体重波动时,变化率小于此值视为稳定 +STABILITY_THRESHOLD = 3 + +# 提取体重日志时的最小时间窗口,体重波小于10s的不提取 +MIN_TIME_WINDOW = 10 + +# 所有牛只购入体重最大值 +BUY_WEIGHT_MAX = '609' + +# 所有牛只购入体重最小值 +BUY_WEIGHT_MIN = '363' + +# 体重日志截取最小值,BUY_WEIGHT_AVG 的百分比 +MIN_TOTAL_BUY_WEIGHT_PERCENT = '0.7' + +MIN_SINGLE_BUY_WEIGHT_PERCENT = '0.7' + +# 体重日志截取最大值,BUY_WEIGHT_AVG 的百分比 +MAX_TOTAL_BUY_WEIGHT_PERCENT = '1.5' + +# 根据特定牛只的购入体重截取最大值比例,在目前的数据中,有一头牛出栏体重比购入体重大1.8倍,已这个为参考值定义2倍。 +MAX_SINGLE_BUY_WEIGHT_PERCENT = '2' + +TOTAL_BUY_WEIGHT_AVG = '439.24' + +# 判断一个波的时间周期内,RFID频率的阈值(大于) +RFID_FREQUENCY_THRESHOLD_VALUE = 0.8 \ No newline at end of file diff --git a/controller.py b/controller.py new file mode 100644 index 0000000..12b8e38 --- /dev/null +++ b/controller.py @@ -0,0 +1,295 @@ +# Author: Lee Wu Love Lele +# Datetime: 2024/12/6 15:24 +from util import calc_one_cattle, smooth_data +from models import WeightWave, DAO, ExecLog, DailyWeight +from decimal import Decimal, getcontext +import copy +from datetime import datetime +from config import DATETIME_FORMAT, TQDM_DISABLE, MIN_TIME_WINDOW, BUY_WEIGHT_MAX, BUY_WEIGHT_MIN, \ + MIN_TOTAL_BUY_WEIGHT_PERCENT, \ + MAX_TOTAL_BUY_WEIGHT_PERCENT, RFID_FREQUENCY_THRESHOLD_VALUE, MIN_SINGLE_BUY_WEIGHT_PERCENT, \ + MAX_SINGLE_BUY_WEIGHT_PERCENT +from tqdm import tqdm +import numpy as np +import pandas as pd +from scipy.signal import savgol_filter + +def clear_all_data(): + DAO.delete_weight_wave_all(False) + DAO.delete_exec_log_all(True) +def clear_data_by_time_std(time_std): + DAO.delete_weight_wave_by_time_std(time_std) + DAO.delete_exec_log_by_time_std(time_std) + +def init_data(time_std): + """ + 初始化数据,从远程数据库中截取地磅的连续波,同时记录连续波中的体重值出现过的对应的RFID Reader ID + :return: + """ + search_start_time = '2024-09-02 00:00:00' # TODO: 从开始时间截取的波之前如果有日志,这个波的开始时间并不准确,因为之前可能存在连续值属于这个波。 + search_end_time = "2024-12-19 23:59:59" # 查询远程日志的截止时间,当前系统时间 + getcontext().prec = 5 + buy_weight_max = Decimal(BUY_WEIGHT_MAX) # 609 + buy_weight_min = Decimal(BUY_WEIGHT_MIN) # 363 + ctrl_min_weight = buy_weight_min * Decimal(MIN_TOTAL_BUY_WEIGHT_PERCENT) # 254.10 + ctrl_max_weight = buy_weight_max * Decimal(MAX_TOTAL_BUY_WEIGHT_PERCENT) # 791.70 + # init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, MIN_TIME_WINDOW) + init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, MIN_TIME_WINDOW) + + +# def init_daily_weight(time_std): +# """ +# 初始化牛只每日体重 +# :param time_std: +# :return: +# """ +# rfid_lst = DAO.get_rfid(time_std) # 分组查询所有体重波中的牛只RFID +# for rfid in rfid_lst: +# weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid) # 根据RFID查询所有体重波 +# for weight_wave in weight_wave_lst: +# pass +# pass +# return +def get_daily_weight(rfid, time_std): + """ + 根据单个RFID获取按照每日计算的体重 + :param rfid: + :param time_std: + :return: + """ + weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid, time_std) + record_date = None + weight_lst = [] + daily_weight_lst = [] + index = 1 + for weight_wave in weight_wave_lst: + if (record_date is not None and record_date != datetime.strptime(weight_wave.start_time, + DATETIME_FORMAT).date()) or index == len( + weight_wave_lst): + if len(weight_lst) > 2: + trimmed_lst = sorted(weight_lst)[1:-1] + daily_weight_lst.append(DailyWeight( + weight=float(Decimal(str(sum(trimmed_lst))) / Decimal(str(len(trimmed_lst)))), + date=datetime.strftime(record_date, DATETIME_FORMAT) + )) + weight_lst.clear() + if record_date is None or record_date != datetime.strptime(weight_wave.start_time, DATETIME_FORMAT).date(): + record_date = datetime.strptime(weight_wave.start_time, DATETIME_FORMAT).date() + weight_lst.append(weight_wave.weight) + index += 1 + + if not weight_lst and daily_weight_lst and len(weight_lst) > 2: + trimmed_lst = sorted(weight_lst)[1:-1] + daily_weight_lst.append(DailyWeight( + weight=float(Decimal(str(sum(trimmed_lst))) / Decimal(str(len(trimmed_lst)))), + date=datetime.strftime(record_date, DATETIME_FORMAT) + )) + dict_lst = [obj.__dict__ for obj in daily_weight_lst] + sorted_lst = sorted(dict_lst, key=lambda x: datetime.strptime(x['date'], DATETIME_FORMAT)) + return sorted_lst + + +def get_weight_wave(rfid, time_std): + """ + 根据RFID获取体重波 + :param rfid: str + :param time_std: int 1=设备时间,2=服务器时间 + :return: + """ + weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid, time_std) + # dict_lst = [obj.__dict__ for obj in weight_wave_lst] + sorted_lst = sorted(weight_wave_lst, key=lambda x: datetime.strptime(x.start_time, DATETIME_FORMAT)) + return sorted_lst + + +def get_weight_wave_log(weight_wave, time_std): + params = {'floor_scale_id': (weight_wave.floor_scale_id,), 'start_time': weight_wave.start_time, + 'end_time': weight_wave.end_time, 'time_std': time_std} + weight_wave_logs = DAO.get_weight_logs(**params) + return weight_wave_logs + + +def init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std=1, + min_time_window=10): + """ + 初始化体重波 + 根据地磅列表从所有地磅日志中从开始时间逐秒轮询,截取在指定体重范围内的连续体重 + :param search_start_time: + :param search_end_time: + :param ctrl_min_weight: + :param ctrl_max_weight: + :param time_std: + :param min_time_window + :return: + """ + devices = DAO.get_devices_id() # 获取所有地磅ID,从养殖场记录中获取。TODO: 是否时当前设备,如果设备更换了如何处理? + exec_logs = DAO.get_exec_log((time_std,)) # 根据所需分析的时间类型获取执行记录。 + for device in tqdm(devices, desc="遍历地磅设备:", position=1, disable=TQDM_DISABLE): + floor_scale_id = device['reader_code'] # 取出地磅id + never_logged = True # 截取进程记录控制 + # if floor_scale_id != '2405085412': + # continue + for exec_log_item in exec_logs: + if exec_log_item.get('floor_scale_id') == floor_scale_id: + print('-- 地磅:' + floor_scale_id + ' | ' + ( + '设备时间' if time_std == 1 else '服务器时间 :') + ' 记录存在') + # 执行记录中存在地磅并且新查询的结束时间在执行记录的范围内,则不做处理。 + if exec_log_item.get('end_time') < datetime.strptime( + search_end_time, DATETIME_FORMAT): + # 执行记录中存在地磅并且新查询的结束时间在执行记录之后,根据执行记录最后一个波的结束时间开始执行 + print('---- 查询时间段在记录时间段外,补充时间段:' + exec_log_item.get('end_time').strftime( + DATETIME_FORMAT) + ' 到 ' + search_end_time) + weight_wave_lst = create_weight_wave(floor_scale_id, + exec_log_item.get('end_time').strftime(DATETIME_FORMAT), + search_end_time, ctrl_min_weight, ctrl_max_weight, + time_std, min_time_window) + exec_log = save_weight_wave(floor_scale_id, weight_wave_lst, time_std) + save_exec_log(exec_log) + never_logged = False + break + if never_logged: # 执行记录中没有出现过地磅数据,则根据查询起止时间全量查询 + weight_wave_lst = create_weight_wave(floor_scale_id, search_start_time, search_end_time, ctrl_min_weight, + ctrl_max_weight, + time_std, min_time_window) + exec_log = save_weight_wave(floor_scale_id, weight_wave_lst, time_std) + save_exec_log(exec_log) + + +def create_weight_wave(floor_scale_id, search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, + time_std=1, min_time_window=10): + """ + 从远程服务器拉取一个地磅的日志数据,分析体重数据。 + :param floor_scale_id: 地磅ID + :param search_start_time: 查询起始时间(包含) + :param search_end_time: 查询截止时间(包含) + :param ctrl_min_weight: 体重窗口最小值 + :param ctrl_max_weight: 体重窗口最大值 + :param time_std: 1 = 设备时间,2 = 服务器时间 + :param min_time_window: 最小时间窗口(s) + :return: + """ + start_time = None # 一个波循环计数开始时间 + prev_time = None # 上一条数据的时间指针 + weight_wave_lst = [] # 存放波的结构 + weight_lst = [] # 存放波的体重值 + relation_reader_name_set = set() # 去重存放从体重日志中关联的RFID Reader ID + for weight_log in tqdm(DAO.get_weight_logs( + **{'floor_scale_id': (floor_scale_id,), 'start_time': search_start_time, 'end_time': search_end_time, + 'time_std': time_std}), + desc="---- 根据地磅ID迭代日志:", position=2, disable=TQDM_DISABLE): + log_weight = weight_log.weight # 获取体重值 + log_time = weight_log.time # 获得数据采集时间 + log_relation_reader_name = weight_log.rfid_reader + is_out_of_range = ctrl_min_weight > Decimal(log_weight) or Decimal(log_weight) > ctrl_max_weight + is_out_of_interval = (log_time - prev_time).total_seconds() > 1 if prev_time is not None else False + is_in_of_time_window = ( + prev_time - start_time).total_seconds() > min_time_window if start_time is not None else False + if start_time is not None and (is_out_of_interval or is_out_of_range) and is_in_of_time_window: + """ + 一个波的结束,在符合波长时间超过最小波长时间窗口,两个临近值间隔超过1s或者体重范围超过阈值时触发截断,形成一个波, + 但是如果该波段内没有识别到牛只RFID或者识别到的牛只RFID没有明显的频率(有一个RIFD频率超过75%),放弃获取这个波。 + """ + rfid_count_lst = DAO.get_rfid_count(tuple(relation_reader_name_set), start_time.strftime(DATETIME_FORMAT), + prev_time.strftime(DATETIME_FORMAT)) + if rfid_count_lst: + df = pd.DataFrame(rfid_count_lst) + df["percentage"] = df["repeat_count"] / df["repeat_count"].sum() # 计算所有RFID的频率 + df_sorted = df.sort_values(by="percentage", ascending=False) # 按照频率从大到小排列 + # df_sorted = df_sorted.reset_index(drop=True) # 重新排序index + # TODO: 这里可以考虑,虽然频率高但是如果一个10秒以上的波只有一个RFID频次出现,是否需要取这个波?在这里记录一下RFID的频次 + max_frq = df_sorted.iloc[0]["percentage"] # 取最大频率的RFID对应的频率 + max_rfid = df_sorted.iloc[0]["biz_data"] # 取最大频率的RFID + weight_lst_len = len(weight_lst) + # TODO:在这里增加一个判断,需要判断出现一定时常的连续稳定值 + if max_frq > RFID_FREQUENCY_THRESHOLD_VALUE: + weight = calc_one_cattle( # TODO: 这里的稳定值计算与图表显示的不一致,这里相对准确(因为图表显示数据向上取整)。 + smooth_data(weight_lst, max(3, int(weight_lst_len * ( + 12 / 100)) // 2 * 2 + 1))) # 取体重波长度的 12%处附近的奇数作为平滑窗口大小,最小窗口为3 + if not np.isnan(weight): # weight可能为nan,当数据平均浮动过大时,可能找不到稳定值,则不取这个波 + buy_weight = DAO.get_buy_weight_by_rfid(max_rfid) # 获取这个RFID牛只的购入体重 + if buy_weight * Decimal(MAX_SINGLE_BUY_WEIGHT_PERCENT) > weight > buy_weight * Decimal( + MIN_SINGLE_BUY_WEIGHT_PERCENT): # 根据这一头牛的购入体重上下30%区间获取体重波 + weight_wave = WeightWave( + id=-1, + floor_scale_id=floor_scale_id, + start_time=start_time.strftime(DATETIME_FORMAT), + end_time=prev_time.strftime(DATETIME_FORMAT), + time_std=time_std, + max_frq=max_frq, + max_frq_rfid=max_rfid, + weight=weight + ) + weight_wave_lst.append(weight_wave) # 将一个体重波数据加入缓存列表 + relation_reader_name_set.clear() # 清空RFID Reader集,按照每一个波的范围内识别Reader + weight_lst.clear() # 清空体重值存储列表 + if (start_time is None and not is_out_of_range) or ( + start_time is not None and is_out_of_interval and not is_out_of_range): + """ + 一个新的波的开始,重置体重波开始时间,将最新时间作为开始时间。 + 当第一次迭代或者因为时间间隔超过1s,但是当前值在阈值范围内。 + """ + start_time = log_time + + if start_time is not None and is_out_of_range: + """ + 在一个波的遍历过程中(已有开始时间), + 如果获取到的值超过了体重阈值,则无法将这个值作为一个新的波的开始,重置体重波开始时间,将开始时间清空待后续遍历值符合要求后再设定。 + """ + start_time = None + + if not is_out_of_range and log_relation_reader_name not in relation_reader_name_set: + """ + 当一个有效波获取到时(波的开始节点,中间过程和结束节点),在一个波的形成过程中,将体重值存储并将RFID Reader ID去重存储。 + """ + relation_reader_name_set.add(log_relation_reader_name) # 取出关联的RFID Reader ID去重存储 + + if not is_out_of_range: + """ + 当一个有效波获取到时(波的开始节点,中间过程和结束节点),在一个波的形成过程中,将体重值缓存 + """ + weight_lst.append(float(log_weight)) + + prev_time = log_time + return weight_wave_lst + + +def save_weight_wave(floor_scale_id, weight_wave_lst, time_std): + """ + 将体重波存储本地 + :param floor_scale_id + :param weight_wave_lst(已经存放了所有值) + :param time_std + :return: + """ + # 当一个地磅的波取完时,将波和波所属的地磅对应的RFID Reader存储进本地数据库 + period_start_time = None # 一个地磅第一个波的开始时间 + period_end_time = None # 一个地磅最后一个波的结束时间 + for weight_wave_item in tqdm(weight_wave_lst, desc="---- 插入体重波", position=2, + disable=TQDM_DISABLE): + DAO.add_weight_wave(weight_wave_item, True) # 插入体重波数据 + if period_start_time is None: # 判断exec_logs时间周期是否是截点 + period_start_time = weight_wave_item.start_time + period_end_time = weight_wave_item.end_time + return ExecLog(floor_scale_id, period_start_time, period_end_time, time_std) + + +def save_exec_log(exec_log): + """ + 储存进度日志,已存在的地磅更新,只往后更新不往前更新 + :param exec_log: + :return: + """ + if exec_log.period_start_time is None or exec_log.period_end_time is None: # 地磅波列表是空的话不做记录 + return + if exec_log.period_start_time == '': # 当地磅日志没有定义开始时间时,表示需要更新,仅更新该条地磅日志的结束时间 + DAO.update_exec_log(exec_log) + else: + DAO.add_exec_log(exec_log) + + +def get_rfid_all(): + return DAO.get_rfid_all() + + +def get_buy_weight(rfid): + return DAO.get_buy_weight_by_rfid(rfid) diff --git a/db_connection.py b/db_connection.py deleted file mode 100644 index 150ea3e..0000000 --- a/db_connection.py +++ /dev/null @@ -1,14 +0,0 @@ -# Author: Lee Wu Love Lele -# Datetime: 2024/9/7 21:20 -import mysql.connector - -# 连接 MySQL 数据库 -connection = mysql.connector.connect( - host="rm-bp1442up0e0nlz95szo.mysql.rds.aliyuncs.com", - user="rhm_read_only", - password="aA123456+", - database="rhm_insure_dev" -) - -def get_connection(): - return connection diff --git a/db_dao.py b/db_dao.py index d69e423..9e63bc5 100644 --- a/db_dao.py +++ b/db_dao.py @@ -1,6 +1,6 @@ # Author: Lee Wu Love Lele # Datetime: 2024/9/7 21:28 -import db_connection +import models # 创建游标对象 connection = db_connection.get_connection() diff --git a/main.py b/main.py index 2e7b946..4c7ae9d 100644 --- a/main.py +++ b/main.py @@ -1,99 +1,291 @@ # Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings. -import db_dao as dd import matplotlib.pyplot as plt import numpy as np import matplotlib.dates as mdates +from itertools import product import pandas as pd -from datetime import datetime,timedelta +from datetime import datetime +from config import DATETIME_FORMAT +import config +import sql_statements +import models +import sql_statements +import controller +from decimal import Decimal, getcontext +from util import smooth_data, calc_one_cattle +import util +import math -m_weight = [] -m_datetime = [] -m_start_time = '2024-09-09 06:50:00' -m_end_time = '2024-09-09 07:10:00' -# m_reader = '2405085412' # 客户看不到视频 -m_weighbridge_reader = '2405085474' # 客户可以看到设备 -# m_reader = '2404049517' -# m_reader = '2405085489' - -m_rfid_reader = '0A26AE180665' # 对应地磅的 2405085474 - - - -date_format = "%Y-%m-%d %H:%M:%S" -# Press the green button in the gutter to run the script. +SPLIT_LINE = "----------------------------------------" # 创建查询时间范围的序列 -def create_timestamp_seq(): - # 将起止时间转换为 datetime 对象 - m_datetime_start_obj = datetime.strptime(m_start_time, date_format) - m_datetime_end_obj = datetime.strptime(m_end_time, date_format) +# def create_timestamp_seq(): +# # 将起止时间转换为 datetime 对象 +# m_datetime_start_obj = datetime.strptime(m_start_time, date_format) +# m_datetime_end_obj = datetime.strptime(m_end_time, date_format) +# +# # 计算时间差并转为秒数 +# time_diff = m_datetime_end_obj - m_datetime_start_obj +# seconds = time_diff.total_seconds() + 1 +# +# # 创建时间序列,单位为秒 +# start_time = pd.Timestamp(m_start_time) +# time_seconds_seq = pd.date_range(start=start_time, periods=int(seconds), freq='s') # 生成日期差秒数个时间点,每个时间点间隔1秒 +# return time_seconds_seq - # 计算时间差并转为秒数 - time_diff = m_datetime_end_obj - m_datetime_start_obj - seconds = time_diff.total_seconds()+1 - - # 创建时间序列,单位为秒 - start_time = pd.Timestamp(m_start_time) - time_seconds_seq = pd.date_range(start=start_time, periods=int(seconds), freq='s') # 生成日期差秒数个时间点,每个时间点间隔1秒 - return time_seconds_seq # 补充缺失的时间秒数 -def time_data_suppl(timestamp,data): - i = 1 - sorted_data = [] - for ts in timestamp: - format_ts = ts.strftime(date_format) - if format_ts not in data: - sorted_data.append(0) - else: - sorted_data.append(float(data[format_ts])) - # print(str(i)+' 存在数据:'+format_ts+' ------> '+data[format_ts]) - i += 1 - return sorted_data +# def time_data_suppl(timestamp, data): +# i = 1 +# sorted_data = [] +# for ts in timestamp: +# format_ts = ts.strftime(date_format) +# if format_ts not in data: +# sorted_data.append(0) +# else: +# sorted_data.append(float(data[format_ts])) +# # print(str(i)+' 存在数据:'+format_ts+' ------> '+data[format_ts]) +# i += 1 +# return sorted_data + # 对相同时间的数据去重复,取体重最大值 -def time_data_dedup(data): - result_dict = {} - for row in data: - key = row[0].strftime(date_format) - if key not in result_dict or float(row[1]) > float(result_dict[key]): - result_dict[key] = float(row[1]) +# def time_data_dedup(data): +# result_dict = {} +# for row in data: +# key = row[0].strftime(date_format) +# if key not in result_dict or float(row[1]) > float(result_dict[key]): +# result_dict[key] = float(row[1]) +# +# return result_dict - return result_dict -# rfid数据去重 -def rfid_data_dedup(data): - # 保留顺序的去重方法 - seen = set() - unique_tuple_list = [] - for item in data: - if item not in seen: - unique_tuple_list.append(item) - seen.add(item) - return unique_tuple_list +def weight_wave_menu(rfid, buy_weight, time_std): # TODO: 需要修改为可以多个输入ID + weight_wave_lst = controller.get_weight_wave(rfid, time_std) + if not weight_wave_lst: + print('未找到体重数据,请重新输入牛只ID') + return -1 + print(SPLIT_LINE) + for weight_wave in weight_wave_lst: + print(str(weight_wave)) + print(SPLIT_LINE) + while True: + weight_wave_id = input('请输入体重波ID(或输入-1返回):') + if weight_wave_id == '-1': + cattle_menu(time_std) + elif not any(str(i.id) == weight_wave_id for i in weight_wave_lst): + print('输入不合法,请重试') + else: + item = next((obj for obj in weight_wave_lst if obj.id == int(weight_wave_id)), None) + diff_s = (datetime.strptime(item.end_time, DATETIME_FORMAT) - datetime.strptime(item.start_time, + DATETIME_FORMAT)).total_seconds() + if item is not None: + duration = item.start_time + " ~ " + item.end_time + " ( " + str(diff_s) + "s )" + print('正在获取体重日志...') + weight_wave_logs = controller.get_weight_wave_log(item, time_std) + print('正在显示图表...') + show_log_diagram(weight_wave_logs, buy_weight, daily_weight=item.weight, duration=duration, + rfid_frq=item.max_frq, id=item.id) -if __name__ == '__main__': - timestamp_seq = create_timestamp_seq() - mysql_weight_result = dd.get_cattle_weight_by_datetime(m_start_time, m_end_time, m_weighbridge_reader) - mysql_rfid_result = dd.get_rfid_by_datetime(m_start_time, m_end_time, m_rfid_reader) - dd.close() - clean_weight_result = time_data_dedup(mysql_weight_result) - clean_rfid_result = rfid_data_dedup(mysql_rfid_result) - final_weight_result = time_data_suppl(timestamp_seq, clean_weight_result) - plt.figure(figsize=(24, 4)) - plt.plot(timestamp_seq, final_weight_result, color='blue', linestyle='-', linewidth=1) - - for rst in clean_rfid_result: - # 添加纵向分割线 - plt.axvline(x=rst[0], color='r', linestyle='--', label=rst[1], linewidth = 0.5) - - # 添加图例 +def show_log_diagram(weight_wave_logs, buy_weight, daily_weight, duration, rfid_frq, id): + index = 0 + x = [] + y = [] + for item in weight_wave_logs: + y.append(float(item.weight)) + x.append(item.time) + index += 1 + y_smooth = smooth_data(y, max(5, int(len(weight_wave_logs) * ( + 12 / 100)) // 2 * 2 + 1)) + m_daily_weight = calc_one_cattle(smooth_data(y, max(5, int(len(weight_wave_logs) * ( + 12 / 100)) // 2 * 2 + 1))) + plt.figure(1, figsize=(20, 4)) + plt.plot(x, y, color='blue', linestyle='-', linewidth=1) + # 添加横向分割线 + plt.axhline(y=math.ceil(float(buy_weight)), color='red', linestyle='--', linewidth=1, + label='buy_weight: ' + str(math.ceil(float(buy_weight)))) + if not np.isnan(m_daily_weight): # TODO: 为什么 4 - 1307 - 8311 体重波没有稳定值但是记录了 486.0952 + plt.axhline(y=math.ceil(m_daily_weight), color='green', linestyle='--', linewidth=1, + label='daily_weight: ' + str(math.ceil(m_daily_weight))) plt.legend() - - plt.xlabel('datetime') plt.ylabel('weight(kg)') - plt.title('reader: ' + m_weighbridge_reader + '\n' + m_start_time + ' ---> ' + m_end_time) - plt.show() \ No newline at end of file + plt.title( + "id: " + str(id) + " len: " + str(len(weight_wave_logs)) + " duration: " + duration + " rfid_frq: " + str(rfid_frq)) + plt.figure(2, figsize=(20, 4)) + plt.plot(x, y_smooth, color='blue', linestyle='-', linewidth=1) + plt.axhline(y=math.ceil(float(buy_weight)), color='red', linestyle='--', linewidth=1, + label='buy_weight: ' + str(math.ceil(float(buy_weight)))) + if not np.isnan(m_daily_weight): # TODO: 为什么 4 - 1307 - 8311 体重波没有稳定值但是记录了 486.0952 + plt.axhline(y=math.ceil(m_daily_weight), color='green', linestyle='--', linewidth=1, + label='daily_weight: ' + str(math.ceil(m_daily_weight))) + # 添加标签和图例 + plt.legend() + plt.xlabel('datetime') + plt.ylabel('weight(kg)') + plt.title( + "id: " + str(id) + " len: " + str(len(weight_wave_logs)) + " duration: " + duration + " rfid_frq: " + str(rfid_frq)) + plt.show() + return + + +def cattle_menu(time_std): + rfid_lst = sorted(controller.get_rfid_all(), key=lambda x: x.id) # 获取所有牛只的RFID,根据记录主键从小到达排序 + print(SPLIT_LINE) + for item in rfid_lst: + print(vars(item)) + print(SPLIT_LINE) + while True: + cattle_id = input('请输入id(返回输入-1):') + if cattle_id == '-1': + main_menu() + elif not any(i.id == int(cattle_id) for i in rfid_lst): + print('输入无效请重新输入') + continue + else: + item = next((obj for obj in rfid_lst if obj.id == int(cattle_id)), None) + print('RFID:', item.rfid) + buy_weight = controller.get_buy_weight(item.rfid) + rt = weight_wave_menu(item.rfid, buy_weight, time_std) + if rt == -1: + continue + # daily_weight_lst = controller.get_daily_weight(item.rfid) + # if not daily_weight_lst: + # print('未找到体重数据,请重新输入牛只ID') + # continue + # for daily_weight in daily_weight_lst: + # print(daily_weight) + + +def main_menu(): + while True: + print('主菜单') + print(SPLIT_LINE) + print('0 - 退出程序') + print('1 - 初始化体重数据(设备时间)') + print('2 - 查看牛只日体重(设备时间)') + print('3 - 初始化体重数据(服务器时间)') + print('4 - 查看牛只日体重(服务器时间)') + print('5 - 重置本地数据库') + print(SPLIT_LINE) + ctrl = input('请输入:') + if ctrl == '-1': + test() + break + elif ctrl == '0': + break + elif ctrl == '1': + controller.clear_data_by_time_std(1) + controller.init_data(1) + cattle_menu(1) + elif ctrl == '2': + cattle_menu(1) + elif ctrl == '3': + controller.clear_data_by_time_std(2) + controller.init_data(2) + cattle_menu(2) + elif ctrl == '4': + cattle_menu(2) + elif ctrl == '5': + confirm = input('确定要清除所有数据吗?y :确定 / any key:退出') + if confirm == 'y' or 'Y': + controller.clear_all_data() + print('重置成功!') + continue + else: + print('输入错误,请重新输入') + continue + + +def test(): + data = [1, 2, 4, 7, 2, 3, 7, 10, 12, 5, 6, 5, 5, 20, 23, 21, 20, 23, 22, 19] + util.calc_one_cattle(data) + return + + +if __name__ == '__main__': + main_menu() + # timestamp_seq = create_timestamp_seq() + # mysql_weight_result = dd.get_cattle_weight_by_datetime(m_start_time, m_end_time, m_weighbridge_reader) + # mysql_rfid_result = dd.get_rfid_by_datetime(m_start_time, m_end_time, m_rfid_reader) + # dd.close() + # clean_weight_result = time_data_dedup(mysql_weight_result) + # clean_rfid_result = rfid_data_dedup(mysql_rfid_result) + # + # final_weight_result = time_data_suppl(timestamp_seq, clean_weight_result) + + # weight_wave_log = controller.get_weight_wave_log() + # time_seq = [] + # weight_seq = [float(item.get('biz_data')) for item in weight_wave_log] + # # weight_seq = [100, 90, 110, 130, 150, 170, 190, 192, 195, 199, 210, 220, 230, 240, 248, 255, 245, 248, 250, 252, + # # 253, 253, 257, 254, 250, 255, 259, 265, 275, 285, 295, 310, 320, 305, 295, 283, 273, 263, 253, 240, + # # 229, 209, 190, 174, 162, 150, 139, 114, 100, 90, 100, 101, 102, 100, 95, 97] + # index_seq = [i for i in range(1, len(weight_seq) + 1)] + # print('weight_seq: ' + str(weight_seq) + '\nindex_seq: ' + str(index_seq)) + # weight_seq_smooth = (smooth_data(weight_seq)) + # plt.figure(figsize=(20, 4)) + # plt.plot(index_seq, weight_seq_smooth, color='blue', linestyle='-', linewidth=1) + # plt.xlabel('sequence') + # plt.ylabel('weight(kg)') + # plt.title('weight_wave') + # plt.show() + # calc_one_cattle(weight_seq) + """ + x = np.linspace(0, 10, 100) + y1 = np.sin(x) + y2 = np.cos(x) + + # 第1个图形窗口 + plt.figure(1) + plt.plot(x, y1, color='r') + plt.title("Figure 1: Sine Function") + + # 第2个图形窗口 + plt.figure(2) + plt.plot(x, y2, color='b') + plt.title("Figure 2: Cosine Function") + + # 显示所有窗口 + plt.show() + """ + + # index = 1 + # for log in weight_wave_log: + # m_dt = datetime.strftime(log.get('biz_time'), controller.DATETIME_FORMAT) + # print(m_dt) + # time_seq.append(m_dt) + # index_seq.append(index) + # weight_seq.append(log.get('biz_data')) + # index += 1 + # for rst in clean_rfid_result: + # # 添加纵向分割线 + # plt.axvline(x=rst[0], color='r', linestyle='--', label=rst[1], linewidth = 0.5) + # + # # 添加图例 + # plt.legend() + # + # + # result = models.get_logs(floor_scale_id ="2404049517", start_time ="2024-09-01 00:00:00", end_time ="2024-09-02 00:00:00") + # result = models.get_ear_tag('9057970969300989') + # print(result) + + # floor_scale_id, start_time, end_time, max_weight, max_weight_time, max_frq_ear_tag, max_frq_rfid, time_std + # weight_wave = ('floor_scale_id', '2024-12-01 00:00:00', + # '2024-12-07 00:00:00', + # 100.00, '2024-12-06 19:00:00', 'max_frq_ear_tag', + # 'max_frq_rfid', 1) + # parent_id = models.add_weight_wave(False, weight_wave, False) + # rfid_frq = [] + # for i in range(1, 11): + # rfid_frq.append( + # (str(parent_id), str(i), 'rfid' + str(i), i, 1)) + # models.add_rfid_frq(True, rfid_frq,True) + # + # print('parent_id:'+str(parent_id)) + # + # models.delete_weight_wave(False) + # models.delete_rfid_frq(True) + # controller.init_data() + # controller.clean_data() diff --git a/models.py b/models.py new file mode 100644 index 0000000..a887395 --- /dev/null +++ b/models.py @@ -0,0 +1,429 @@ +# Author: Lee Wu +# Datetime: 2024/12/6 14:44 +import datetime +import decimal +from abc import abstractmethod + +import mysql.connector +from config import RDB_HOST, RDB_USER, RDB_NAME, RDB_PASSWORD, LDB_HOST, LDB_PORT, LDB_NAME, LDB_USER, LDB_PASSWORD, \ + DATETIME_FORMAT, TOTAL_BUY_WEIGHT_AVG +import sql_statements +from dataclasses import dataclass, astuple +from decimal import Decimal + + +# from datetime import datetime +@dataclass +class RFID: + id: int + rfid: str + ear_tag: str + + +@dataclass +class WeightWave: + id: int + weight: float + start_time: str + end_time: str + time_std: int + max_frq_rfid: str + max_frq: float + floor_scale_id: str + + +@dataclass +class Weight: + rfid: str + weight: float + time: str + time_std: int + rfid_reader: str = "" + + +@dataclass +class RFIDReader: + id_weight_wave: int + reader_name: str + + +@dataclass +class ExecLog: + floor_scale_id: str + period_start_time: str + period_end_time: str + time_std: int + + +@dataclass +class DailyWeight: + weight: float + date: str + + +# 数据库链接管理父类 +class DataBaseModel: + connection = None + cursor = None + + @classmethod + @abstractmethod + def get_connection_params(cls): + """ + 返回数据库连接参数。 + """ + pass + + @classmethod + def init(cls): + if cls.connection is None: # 防止重复初始化 + params = cls.get_connection_params() + cls.connection = mysql.connector.connect(**params) + cls.cursor = cls.connection.cursor(dictionary=True) + return cls.connection + + @classmethod + def query(cls, sql, params=None): + """ + 执行查询并返回结果 + """ + if cls.connection is None: + cls.init() + cls.cursor.execute(sql, params) + return cls.cursor.fetchall() + + @classmethod + def execute_trans(cls, sql, is_commit=True, params=None): + if cls.connection is None: + cls.init() + cls.cursor.executemany(sql, params) + if is_commit: + cls.connection.commit() + + # try: + # # 检查事务是否已启动,未启动则启动 + # if cls.connection.in_transaction is False: + # # 事务未启动 + # cls.connection.start_transaction() + # if type(params) is tuple: + # cls.cursor.execute(sql, params) + # parent_id = cls.cursor.lastrowid + # elif type(params) is list: + # cls.cursor.executemany(sql, params) + # if is_commit: + # cls.connection.commit() + # except Exception as e: + # print(f"捕获错误: {e} \n --> " + cls.cursor.statement + '\n ----> ' + params) + # cls.connection.rollback() + # finally: + + @classmethod + def execute(cls, sql, is_commit=True, params=None): + if cls.connection is None: + cls.init() + cls.cursor.execute(sql, params) + parent_id = cls.cursor.lastrowid + if is_commit: + cls.connection.commit() + return parent_id + + @classmethod + def close(cls): + """ + 关闭数据库连接 + """ + if cls.cursor: + cls.cursor.close() + if cls.connection: + cls.connection.close() + cls.cursor = None + cls.connection = None + + +# 链接远程数据库,主要用户获取原始数据 +class RemoteDataBaseModel(DataBaseModel): + """ + 具体的子类,定义具体的数据库连接参数。 + """ + + @classmethod + def get_connection_params(cls): + """ + 返回服务器数据库参数 + """ + return { + "host": RDB_HOST, + "user": RDB_USER, + "password": RDB_PASSWORD, + "database": RDB_NAME + } + + +# 链接本地数据库,主要将分析结果储存到本地数据库 +class LocalDataBaseModel(DataBaseModel): + """ + 具体的子类,定义具体的数据库连接参数。 + """ + + @classmethod + def get_connection_params(cls): + """ + 返回服务器数据库参数 + """ + return { + "host": LDB_HOST, + 'port': LDB_PORT, + "user": LDB_USER, + "password": LDB_PASSWORD, + "database": LDB_NAME + } + + +class DAO: + + # 查询所有地磅ID和RFID Reader ID的关联数据 + @staticmethod + def get_devices_id(): + result = RemoteDataBaseModel.query(sql_statements.QUERY_DEVICES) + RemoteDataBaseModel.close() + return result + + @staticmethod + def get_buy_weight_by_rfid(rfid): + result = RemoteDataBaseModel.query(sql_statements.sql_query_buy_weight_by_rfid(), (rfid,)) + RemoteDataBaseModel.close() + if result: + return result[0].get('buy_weight') + else: + return Decimal(TOTAL_BUY_WEIGHT_AVG) # 如果没有购入体重的,则取所有牛只购入体重的平均值 + + # @staticmethod + # def get_rfid_reader_id(**kwargs): + # """ + # 根据地磅ID、时间段获取对应RFIDReaderID,若多个返回值则表示设备ID关系变化,对应的RFIDReader设备识别的RFID都要参考。 + # :param kwargs: floor_scale_id(tuple),start_time(str),end_time(str) + # :return: + # """ + # if 'floor_scale_id' not in kwargs: + # return + # sql = sql_statements.sql_query_logs_group_by_relation_reader_name(kwargs['floor_scale_id']) + # params = kwargs['floor_scale_id'] + (kwargs['start_time'], kwargs['end_time']) + # result = RemoteDataBaseModel.query(sql, params) + # RemoteDataBaseModel.close() + # return result + + # 根据地磅ID查询所有日志或者根据地磅ID和一个时间段查询日志 + @staticmethod + def get_weight_logs(**kwargs): + """ + 根据地磅ID或RFID读取器ID,结合开始时间和结束时间获取日志数据 + :param kwargs: floor_scale_id(tuple),start_time(str),end_time(str),time_std(int) + :return: + """ + time_std = kwargs['time_std'] + sql = sql_statements.sql_query_logs_by_reader_name_and_duartion(kwargs['floor_scale_id'], + time_std) + params = kwargs['floor_scale_id'] + (kwargs['start_time'], kwargs['end_time']) + result = RemoteDataBaseModel.query(sql, params) + RemoteDataBaseModel.close() + weight_lst = [Weight( + weight=item.get('biz_data'), + time=item.get('biz_time') if time_std == 1 else item.get('create_time'), + time_std=kwargs['time_std'], + rfid='', + rfid_reader=item.get('relation_reader_name') + ) for item in result] + RemoteDataBaseModel.close() + return weight_lst + + @staticmethod + def get_rfid_count(rfid_reader_id, start_time, end_time): + """ + 根据多个RFID Reader ID查询其识别的RFID频次 + :param rfid_reader_id: tuple + :param start_time: str + :param end_time: str + :return: + """ + sql = sql_statements.sql_query_logs_count_group_by_biz_data(rfid_reader_id) + params = rfid_reader_id + (start_time, end_time) + result = RemoteDataBaseModel.query(sql, params) + RemoteDataBaseModel.close() + return result + + # 根据RFID查询对应的定位耳标外壳号 + @staticmethod + def get_ear_tag(rfid): + result = RemoteDataBaseModel.query(sql_statements.QUERY_EAR_TAG_ID_BY_RFID, [rfid]) + RemoteDataBaseModel.close() + return result + + # 根据定位耳标外壳号和日期查询体重波,本地查询 + @staticmethod + def get_weight_wave_by_floor_scale_id(floor_scale_id): + """ + 获取体重波基本信息 + :param floor_scale_id: + :return: + """ + result = LocalDataBaseModel.query(sql_statements.sql_query_weight_wave_by_floor_scale_id(), (floor_scale_id,)) + LocalDataBaseModel.close() + return result + + @staticmethod + def get_weight_wave_by_rfid(rfid, time_std): + """ + 根据牛只RFID获取所有体重波 + :param rfid: + :param time_std: + :return: + """ + result = LocalDataBaseModel.query(sql_statements.sql_query_weight_wave_by_rfid(), (rfid, time_std)) + LocalDataBaseModel.close() + weight_wave_lst = [WeightWave( + floor_scale_id=item.get('floor_scale_id'), + max_frq_rfid=rfid, + start_time=item.get('start_time').strftime(DATETIME_FORMAT), + end_time=item.get('end_time').strftime(DATETIME_FORMAT), + time_std=item.get('time_std'), + max_frq=item.get('max_frq'), + weight=item.get('weight'), + id=item.get('id_weight_wave') + ) for item in result] + return weight_wave_lst + + @staticmethod + def get_reader(id_weight_wave): + """ + 根据体重波ID查询对应的RFID Reader Name + :param id_weight_wave: + :return: + """ + result = LocalDataBaseModel.query(sql_statements.sql_query_relation_reader_by_id_weight_wave(), + (id_weight_wave,)) + LocalDataBaseModel.close() + return result + + @staticmethod + def get_rfid_all(): + result = RemoteDataBaseModel.query(sql_statements.sql_query_rfid_all_in_cattle()) + rfid_lst = [RFID( + id=item.get('id'), + rfid=item.get('rfid'), + ear_tag=item.get('ear_tag') + ) for item in result] + RemoteDataBaseModel.close() + return rfid_lst + + # @staticmethod + # def get_rfid(time_std): + # result = LocalDataBaseModel.query(sql_statements.sql_query_weight_wave_group_by_rfid(), (time_std,)) + # LocalDataBaseModel.close() + # return result + + # 插入一个体重波 + @staticmethod + def add_weight_wave(weight_wave, is_commit): + """ + 体重波插入值,不直接提交,等RFID Reader获取后统一提交 floor_scale_id, start_time, end_time, max_weight, max_weight_time, time_std, create_time, last_update_time + :param weight_wave: + :param is_commit: 是否提交查询 + :return: 插入后的主键 + """ + id_weight_wave = LocalDataBaseModel.execute(sql_statements.INSERT_WEIGHT_WAVE, is_commit, ( + weight_wave.floor_scale_id, weight_wave.start_time, weight_wave.end_time, weight_wave.time_std, + weight_wave.max_frq_rfid, weight_wave.max_frq, weight_wave.weight)) + return id_weight_wave + + @staticmethod + def add_rfid_frq(is_commit=False, params=None, is_close=True): + """ + 插入多条RFID频次值 + :param is_commit: 是否继续事务 + :param params: RFID频次插入值列表 id_weight_wave,ear_tag,rfid,rfid_frq,time_std + :param is_close: 是否关闭链接 + :return: + """ + # if params is None: + # return + # LocalDataBaseModel.execute_trans(sql_statements.INSERT_RIFD_FRQ, params, is_commit) + # if is_close: + # LocalDataBaseModel.close() + pass + + @staticmethod + def add_rfid_readers(rfid_reader_lst): + """ + 插入Reader后关闭链接 + :param rfid_reader_lst: + :return: + """ + LocalDataBaseModel.execute_trans(sql_statements.sql_insert_rfid_reader(), True, + [astuple(rfid_reader) for rfid_reader in rfid_reader_lst]) + LocalDataBaseModel.close() + + @staticmethod + def add_exec_log(exec_log, is_close=True): + + LocalDataBaseModel.execute(sql_statements.sql_insert_exec_log(), True, ( + exec_log.floor_scale_id, exec_log.period_start_time, exec_log.period_end_time, exec_log.time_std)) + if is_close: + LocalDataBaseModel.close() + + @staticmethod + def update_exec_log(exec_log, is_close=True): + LocalDataBaseModel.execute(sql_statements.sql_update_exec_log(), True, + (exec_log.period_end_time, exec_log.floor_scale_id)) + if is_close: + LocalDataBaseModel.close() + + @staticmethod + def delete_weight_wave_all(is_close=True): + rows = LocalDataBaseModel.execute(sql_statements.sql_delete_weight_wave_all()) + if is_close: + LocalDataBaseModel.close() + return rows + + @staticmethod + def delete_weight_wave_by_time_std(time_std): + rows = LocalDataBaseModel.execute(sql_statements.sql_delete_weight_wave_by_time_std(), True, (time_std,)) + LocalDataBaseModel.close() + return rows + + @staticmethod + def delete_rfid_frq(is_close=True): + rows = LocalDataBaseModel.execute(sql_statements.DELETE_RFID_FRQ) + if is_close: + LocalDataBaseModel.close() + return rows + + @staticmethod + def get_exec_log(time_std): + result = LocalDataBaseModel.query(sql_statements.QUERY_EXEC_LOG, time_std) + LocalDataBaseModel.close() + return result + + @staticmethod + def delete_exec_log_all(is_close=True): + rows = LocalDataBaseModel.execute(sql_statements.sql_delete_exec_log_all()) + if is_close: + LocalDataBaseModel.close() + return rows + + @staticmethod + def delete_exec_log_by_time_std(time_std): + rows = LocalDataBaseModel.execute(sql_statements.sql_delete_exec_log_by_time_std(), True, (time_std,)) + LocalDataBaseModel.close() + return rows + + @staticmethod + def delete_relation_reader(is_close=True): + """ + 删除与体重波关联的RFID Reader + :param is_close: + :return: + """ + rows = LocalDataBaseModel.execute(sql_statements.sql_delete_relatioin_rfid_reader()) + if is_close: + LocalDataBaseModel.close() + return rows diff --git a/sql_statements.py b/sql_statements.py new file mode 100644 index 0000000..fb6daec --- /dev/null +++ b/sql_statements.py @@ -0,0 +1,154 @@ +# Author: Lee Wu Love Lele +# Datetime: 2024/12/6 19:53 + +""" + SELECT ff.reader_code FROM farm_field AS ff + WHERE ff.is_deleted = 1 + AND ff.reader_code IS NOT NULL AND ff.reader_code != '' AND ff.rfid_reader_code IS NOT NULL AND ff.rfid_reader_code != '' + GROUP BY ff.reader_code +""" +QUERY_DEVICES = "SELECT ff.reader_code FROM farm_field AS ff WHERE ff.is_deleted = 1 AND ff.reader_code IS NOT NULL AND ff.reader_code != '' AND ff.rfid_reader_code IS NOT NULL AND ff.rfid_reader_code != '' GROUP BY ff.reader_code;" + +QUERY_EAR_TAG_ID_BY_RFID = """ + SELECT sb.shell_subject_code + FROM subject_base AS sb + WHERE is_deleted = 1 + AND lot_subject_code = %s + AND shell_subject_code != '' +""" + +""" +查询本地日志中是否分析过地磅或者分析到了什么时间 +""" +QUERY_EXEC_LOG = "SELECT * FROM exec_log AS el WHERE el.time_std = %s;" + +INSERT_WEIGHT_WAVE = """ + INSERT INTO weight_wave (floor_scale_id, start_time, end_time, time_std , max_frq_rfid , max_frq, weight, create_time,last_update_time) + VALUES (%s,%s,%s,%s,%s,%s,%s,NOW(),NOW()) +""" + +INSERT_RIFD_FRQ = """ + INSERT INTO rfid_value_frq(id_weight_wave,ear_tag,rfid,rfid_frq,time_std,create_time,last_update_time) + VALUES (%s,%s,%s,%s,%s,%s,%s) +""" + + +def sql_delete_weight_wave_all(): + return "TRUNCATE TABLE weight_wave" + + +def sql_delete_weight_wave_by_time_std(): + return "DELETE FROM weight_wave WHERE time_std = %s;" + + +DELETE_RFID_FRQ = "TRUNCATE TABLE rfid_value_frq" + + +def sql_query_buy_weight_by_rfid(): + return "SELECT buy_weight FROM subject_base WHERE lot_subject_code = %s;" + + +def sql_query_logs_count_group_by_biz_data(reader_names=None): + """ + 耳标RFID识别结果频次,biz_data返回的是RFID + :param reader_names: RFID Reader ID (tuple) + :return: + """ + placeholders = ', '.join(['%s'] * len(reader_names)) + return f"SELECT biz_data, COUNT(*) AS repeat_count FROM notify_log WHERE notify_group = 'rfidCallback' AND reader_name IN ({placeholders}) AND biz_time BETWEEN %s AND %s GROUP BY biz_data ORDER BY repeat_count DESC;" + + +# def sql_query_logs_by_reader_name(reader_names=None): +# """ +# 根据设备ID查询全时间周期 +# :param reader_name: (tuple) +# :return: +# """ +# placeholders = ', '.join(['%s'] * len(reader_names)) +# return f"SELECT nl.biz_time, nl.biz_data, nl.create_time, nl.relation_reader_name FROM notify_log AS nl WHERE reader_name IN ({placeholders}) AND is_deleted = 1 ORDER BY biz_time ASC, create_time ASC;" + + +def sql_query_logs_by_reader_name_and_duartion(reader_names=None, time_std=1): + """ + 根据设备ID查询特定时间段 + :param reader_name: (tuple) + :return: + """ + placeholders = ', '.join(['%s'] * len(reader_names)) + first_order = 'biz_data' + second_order = 'create_time' + time = 'biz_time' + if time_std == 1: + pass + elif time_std == 2: + first_order = 'create_time' + second_order = 'biz_data' + time = 'create_time' + pass + return f"SELECT nl.notify_id, nl.biz_data ,nl.biz_time, nl.create_time, nl.relation_reader_name FROM notify_log AS nl WHERE reader_name IN({placeholders}) AND is_deleted = 1 AND {time} BETWEEN %s AND %s ORDER BY {first_order} ASC, {second_order} ASC, notify_id ASC;" + + +def sql_query_logs_group_by_relation_reader_name(reader_names=None): + """ + 根据地磅ID查询日志中对应的RFIDReaderID出现次数,识别一个时间段中地磅ID与RFIDReaderID对应关系的变化。 + :param reader_name: (tuple) + :return: + """ + placeholders = ', '.join(['%s'] * len(reader_names)) + return f"SELECT relation_reader_name, COUNT(*) AS repeat_count FROM notify_log WHERE reader_name IN ({placeholders}) AND biz_time BETWEEN %s AND %s GROUP BY relation_reader_name ORDER BY repeat_count DESC;" + + +def sql_insert_rfid_reader(): + return "INSERT INTO relation_rfid_reader(id_weight_wave,reader_name,create_time,last_update_time) VALUES (%s,%s,NOW(),NOW());" + + +def sql_insert_exec_log(): + return "INSERT INTO exec_log(floor_scale_id,start_time,end_time,time_std,create_time,last_update_time) VALUES(%s,%s,%s,%s,NOW(),NOW());" + + +def sql_delete_exec_log_all(): + return "TRUNCATE TABLE exec_log;" + + +def sql_delete_exec_log_by_time_std(): + return "DELETE FROM weight_wave WHERE time_std = %s;" + + +def sql_delete_relatioin_rfid_reader(): + return "TRUNCATE TABLE relation_rfid_reader;" + + +def sql_update_exec_log(): + return "UPDATE exec_log SET end_time = %s, last_update_time = NOW() WHERE floor_scale_id = %s;" + + +def sql_query_weight_wave_by_floor_scale_id(): + """ + 通过地磅ID查询全部体重波 + :return: + """ + return "SELECT * FROM weight_wave where floor_scale_id = %s" + + +def sql_query_weight_wave_by_rfid(): + """ + 最高出现频率的RFID查询全部体重波 + :return: + """ + return "SELECT * FROM weight_wave where max_frq_rfid = %s" + + +def sql_query_relation_reader_by_id_weight_wave(): + return "SELECT * FROM relation_rfid_reader AS rrr WHERE rrr.id_weight_wave = %s" + + +def sql_query_weight_wave_group_by_rfid(): + return "SELECT max_frq_rfid, COUNT(*) AS repeat_count FROM weight_wave WHERE time_std = %s GROUP BY max_frq_rfid;" + + +def sql_query_weight_wave_by_rfid(): + return "SELECT * FROM weight_wave WHERE max_frq_rfid = %s AND time_std = %s ORDER BY start_time ASC;" + + +def sql_query_rfid_all_in_cattle(): + return "SELECT subject_id AS id ,lot_subject_code AS rfid, shell_subject_code AS ear_tag FROM subject_base WHERE is_deleted = 1 AND buy_weight != 0 AND lot_subject_code != '' AND shell_subject_code != '';" diff --git a/util.py b/util.py new file mode 100644 index 0000000..d8115cb --- /dev/null +++ b/util.py @@ -0,0 +1,74 @@ +# Author: Lee Wu Love Lele +# Datetime: 2024/12/8 12:58 +from config import DATETIME_FORMAT +from datetime import datetime +from scipy.signal import savgol_filter +import numpy as np +from config import STABILITY_THRESHOLD +import pandas as pd + + +def smooth_data(data, window_length=5, polyorder=3): + """ + 平滑数据降噪,使用Savitzky-Golay滤波 + * 通过算法(Savitzky-Golay滤波),将短期波动平滑化,突出重量变化的主要趋势,从而更接近牛的真实体重曲线。 + * 在牛走上或走下地磅时,体重数据会从小到大(走上)或从大到小(走下)发生变化。平滑处理可以让“从小到大”或“从大到小”的趋势更加清晰,便于后续算法找到牛完整通过地磅的时间段。 + * 如果直接对噪声较大的数据进行分析(如提取最大值、计算均值等),可能需要大量的异常值处理或复杂的算法来排除干扰。 + * 平滑数据可以简化后续计算过程,使得分析模型更加高效。 + * 牛在地磅上可能出现诸如抬腿、摇晃等小幅动作,这些动作会导致短时间内体重数据发生波动。平滑能够消除这些短时间的小波动,使得体重数据更加稳定。 + * 平滑可以让真正的异常值(如多头牛站上地磅)更加显著,便于识别。例如:没有平滑时,普通波动和异常值可能混淆在一起。平滑后,正常数据的波动范围变小,异常值更加容易识别。 + :param data: + :param window_length: int + :param polyorder: int + :return: + """ + + """ + window_length + * 含义: + * 表示滑动窗口的长度,即在每次滤波计算中使用的点的数量。 + * 它必须是一个正奇数,如 3、5、7 等,因为 Savitzky-Golay 滤波需要确保窗口中心有一个对称点。 + * 作用: + * 窗口越大: + * 数据被平滑得越明显,但可能会导致细节丢失。 + * 更适合处理噪声较多但关注长期趋势的情况。 + * 窗口越小: + * 保留更多的细节,但对噪声的抑制效果较差。 + * 更适合处理数据变化较快且噪声较少的情况。 + + polyorder + * 含义: + * 表示拟合多项式的阶数,用于滑动窗口内的数据拟合。 + * 它必须小于 window_length,因为阶数不能超过拟合点的数量(否则拟合就无意义)。 + * 作用: + * 多项式阶数越高: + * 滤波器拟合数据的能力更强,可以保留更多复杂的细节,但同时可能引入噪声(过拟合)。 + * 更适合处理具有快速变化趋势的数据。 + * 多项式阶数越低: + * 滤波器更倾向于平滑数据,减少噪声,但可能会忽略一些数据细节。 + * 更适合处理平稳的数据或去除高频噪声。 + """ + print(f'window_len: {window_length}, polyorder: {polyorder}') + smoothed_data = savgol_filter(data, window_length=window_length, polyorder=polyorder) + return smoothed_data + + +def calc_one_cattle(data): + print('data: ', data) + # print('data len: ', len(data)) + # 计算变化率 + rate_of_change = np.abs(np.diff(data)) # 一阶差分后的绝对值,比data长度少1 + print('np.diff: ', np.diff(data)) + # print('np.diff len: ', len(np.diff(data))) + print('rate_of_change: ', rate_of_change) + # print('rate_of_change len: ', len(rate_of_change)) + # 设置阈值,识别稳定时间段 + stability_threshold = STABILITY_THRESHOLD # 变化率小于此值视为稳定 + stable_indices = np.where(rate_of_change < stability_threshold)[0] + print('stable_indices: ', stable_indices) + # 找到稳定时间段的体重中值 + stable_weights = [data[i] for i in stable_indices] + print('stable_weights: ', stable_weights) + stable_median_weight = np.median(stable_weights) # TODO:这里取了所有稳定值的中位数,这里包含了异常稳定值形成干扰,如何识别这些异常? + print('stable_median_weight: ', stable_median_weight) + return stable_median_weight