hms_cattle_research/controller.py
2024-12-22 09:58:15 +08:00

296 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Author: Lee Wu Love Lele
# Datetime: 2024/12/6 15:24
from util import calc_one_cattle, smooth_data
from models import WeightWave, DAO, ExecLog, DailyWeight
from decimal import Decimal, getcontext
import copy
from datetime import datetime
from config import DATETIME_FORMAT, TQDM_DISABLE, MIN_TIME_WINDOW, BUY_WEIGHT_MAX, BUY_WEIGHT_MIN, \
MIN_TOTAL_BUY_WEIGHT_PERCENT, \
MAX_TOTAL_BUY_WEIGHT_PERCENT, RFID_FREQUENCY_THRESHOLD_VALUE, MIN_SINGLE_BUY_WEIGHT_PERCENT, \
MAX_SINGLE_BUY_WEIGHT_PERCENT
from tqdm import tqdm
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
def clear_all_data():
DAO.delete_weight_wave_all(False)
DAO.delete_exec_log_all(True)
def clear_data_by_time_std(time_std):
DAO.delete_weight_wave_by_time_std(time_std)
DAO.delete_exec_log_by_time_std(time_std)
def init_data(time_std):
"""
初始化数据从远程数据库中截取地磅的连续波同时记录连续波中的体重值出现过的对应的RFID Reader ID
:return:
"""
search_start_time = '2024-09-02 00:00:00' # TODO: 从开始时间截取的波之前如果有日志,这个波的开始时间并不准确,因为之前可能存在连续值属于这个波。
search_end_time = "2024-12-19 23:59:59" # 查询远程日志的截止时间,当前系统时间
getcontext().prec = 5
buy_weight_max = Decimal(BUY_WEIGHT_MAX) # 609
buy_weight_min = Decimal(BUY_WEIGHT_MIN) # 363
ctrl_min_weight = buy_weight_min * Decimal(MIN_TOTAL_BUY_WEIGHT_PERCENT) # 254.10
ctrl_max_weight = buy_weight_max * Decimal(MAX_TOTAL_BUY_WEIGHT_PERCENT) # 791.70
# init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, MIN_TIME_WINDOW)
init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, MIN_TIME_WINDOW)
# def init_daily_weight(time_std):
# """
# 初始化牛只每日体重
# :param time_std:
# :return:
# """
# rfid_lst = DAO.get_rfid(time_std) # 分组查询所有体重波中的牛只RFID
# for rfid in rfid_lst:
# weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid) # 根据RFID查询所有体重波
# for weight_wave in weight_wave_lst:
# pass
# pass
# return
def get_daily_weight(rfid, time_std):
"""
根据单个RFID获取按照每日计算的体重
:param rfid:
:param time_std:
:return:
"""
weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid, time_std)
record_date = None
weight_lst = []
daily_weight_lst = []
index = 1
for weight_wave in weight_wave_lst:
if (record_date is not None and record_date != datetime.strptime(weight_wave.start_time,
DATETIME_FORMAT).date()) or index == len(
weight_wave_lst):
if len(weight_lst) > 2:
trimmed_lst = sorted(weight_lst)[1:-1]
daily_weight_lst.append(DailyWeight(
weight=float(Decimal(str(sum(trimmed_lst))) / Decimal(str(len(trimmed_lst)))),
date=datetime.strftime(record_date, DATETIME_FORMAT)
))
weight_lst.clear()
if record_date is None or record_date != datetime.strptime(weight_wave.start_time, DATETIME_FORMAT).date():
record_date = datetime.strptime(weight_wave.start_time, DATETIME_FORMAT).date()
weight_lst.append(weight_wave.weight)
index += 1
if not weight_lst and daily_weight_lst and len(weight_lst) > 2:
trimmed_lst = sorted(weight_lst)[1:-1]
daily_weight_lst.append(DailyWeight(
weight=float(Decimal(str(sum(trimmed_lst))) / Decimal(str(len(trimmed_lst)))),
date=datetime.strftime(record_date, DATETIME_FORMAT)
))
dict_lst = [obj.__dict__ for obj in daily_weight_lst]
sorted_lst = sorted(dict_lst, key=lambda x: datetime.strptime(x['date'], DATETIME_FORMAT))
return sorted_lst
def get_weight_wave(rfid, time_std):
"""
根据RFID获取体重波
:param rfid: str
:param time_std: int 1=设备时间2=服务器时间
:return:
"""
weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid, time_std)
# dict_lst = [obj.__dict__ for obj in weight_wave_lst]
sorted_lst = sorted(weight_wave_lst, key=lambda x: datetime.strptime(x.start_time, DATETIME_FORMAT))
return sorted_lst
def get_weight_wave_log(weight_wave, time_std):
params = {'floor_scale_id': (weight_wave.floor_scale_id,), 'start_time': weight_wave.start_time,
'end_time': weight_wave.end_time, 'time_std': time_std}
weight_wave_logs = DAO.get_weight_logs(**params)
return weight_wave_logs
def init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std=1,
min_time_window=10):
"""
初始化体重波
根据地磅列表从所有地磅日志中从开始时间逐秒轮询,截取在指定体重范围内的连续体重
:param search_start_time:
:param search_end_time:
:param ctrl_min_weight:
:param ctrl_max_weight:
:param time_std:
:param min_time_window
:return:
"""
devices = DAO.get_devices_id() # 获取所有地磅ID从养殖场记录中获取。TODO: 是否时当前设备,如果设备更换了如何处理?
exec_logs = DAO.get_exec_log((time_std,)) # 根据所需分析的时间类型获取执行记录。
for device in tqdm(devices, desc="遍历地磅设备:", position=1, disable=TQDM_DISABLE):
floor_scale_id = device['reader_code'] # 取出地磅id
never_logged = True # 截取进程记录控制
# if floor_scale_id != '2405085412':
# continue
for exec_log_item in exec_logs:
if exec_log_item.get('floor_scale_id') == floor_scale_id:
print('-- 地磅:' + floor_scale_id + ' | ' + (
'设备时间' if time_std == 1 else '服务器时间 :') + ' 记录存在')
# 执行记录中存在地磅并且新查询的结束时间在执行记录的范围内,则不做处理。
if exec_log_item.get('end_time') < datetime.strptime(
search_end_time, DATETIME_FORMAT):
# 执行记录中存在地磅并且新查询的结束时间在执行记录之后,根据执行记录最后一个波的结束时间开始执行
print('---- 查询时间段在记录时间段外,补充时间段:' + exec_log_item.get('end_time').strftime(
DATETIME_FORMAT) + '' + search_end_time)
weight_wave_lst = create_weight_wave(floor_scale_id,
exec_log_item.get('end_time').strftime(DATETIME_FORMAT),
search_end_time, ctrl_min_weight, ctrl_max_weight,
time_std, min_time_window)
exec_log = save_weight_wave(floor_scale_id, weight_wave_lst, time_std)
save_exec_log(exec_log)
never_logged = False
break
if never_logged: # 执行记录中没有出现过地磅数据,则根据查询起止时间全量查询
weight_wave_lst = create_weight_wave(floor_scale_id, search_start_time, search_end_time, ctrl_min_weight,
ctrl_max_weight,
time_std, min_time_window)
exec_log = save_weight_wave(floor_scale_id, weight_wave_lst, time_std)
save_exec_log(exec_log)
def create_weight_wave(floor_scale_id, search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight,
time_std=1, min_time_window=10):
"""
从远程服务器拉取一个地磅的日志数据,分析体重数据。
:param floor_scale_id: 地磅ID
:param search_start_time: 查询起始时间(包含)
:param search_end_time: 查询截止时间(包含)
:param ctrl_min_weight: 体重窗口最小值
:param ctrl_max_weight: 体重窗口最大值
:param time_std: 1 = 设备时间2 = 服务器时间
:param min_time_window: 最小时间窗口s
:return:
"""
start_time = None # 一个波循环计数开始时间
prev_time = None # 上一条数据的时间指针
weight_wave_lst = [] # 存放波的结构
weight_lst = [] # 存放波的体重值
relation_reader_name_set = set() # 去重存放从体重日志中关联的RFID Reader ID
for weight_log in tqdm(DAO.get_weight_logs(
**{'floor_scale_id': (floor_scale_id,), 'start_time': search_start_time, 'end_time': search_end_time,
'time_std': time_std}),
desc="---- 根据地磅ID迭代日志", position=2, disable=TQDM_DISABLE):
log_weight = weight_log.weight # 获取体重值
log_time = weight_log.time # 获得数据采集时间
log_relation_reader_name = weight_log.rfid_reader
is_out_of_range = ctrl_min_weight > Decimal(log_weight) or Decimal(log_weight) > ctrl_max_weight
is_out_of_interval = (log_time - prev_time).total_seconds() > 1 if prev_time is not None else False
is_in_of_time_window = (
prev_time - start_time).total_seconds() > min_time_window if start_time is not None else False
if start_time is not None and (is_out_of_interval or is_out_of_range) and is_in_of_time_window:
"""
一个波的结束在符合波长时间超过最小波长时间窗口两个临近值间隔超过1s或者体重范围超过阈值时触发截断形成一个波
但是如果该波段内没有识别到牛只RFID或者识别到的牛只RFID没有明显的频率有一个RIFD频率超过75%),放弃获取这个波。
"""
rfid_count_lst = DAO.get_rfid_count(tuple(relation_reader_name_set), start_time.strftime(DATETIME_FORMAT),
prev_time.strftime(DATETIME_FORMAT))
if rfid_count_lst:
df = pd.DataFrame(rfid_count_lst)
df["percentage"] = df["repeat_count"] / df["repeat_count"].sum() # 计算所有RFID的频率
df_sorted = df.sort_values(by="percentage", ascending=False) # 按照频率从大到小排列
# df_sorted = df_sorted.reset_index(drop=True) # 重新排序index
# TODO: 这里可以考虑虽然频率高但是如果一个10秒以上的波只有一个RFID频次出现是否需要取这个波在这里记录一下RFID的频次
max_frq = df_sorted.iloc[0]["percentage"] # 取最大频率的RFID对应的频率
max_rfid = df_sorted.iloc[0]["biz_data"] # 取最大频率的RFID
weight_lst_len = len(weight_lst)
# TODO在这里增加一个判断需要判断出现一定时常的连续稳定值
if max_frq > RFID_FREQUENCY_THRESHOLD_VALUE:
weight = calc_one_cattle( # TODO: 这里的稳定值计算与图表显示的不一致,这里相对准确(因为图表显示数据向上取整)。
smooth_data(weight_lst, max(3, int(weight_lst_len * (
12 / 100)) // 2 * 2 + 1))) # 取体重波长度的 12%处附近的奇数作为平滑窗口大小最小窗口为3
if not np.isnan(weight): # weight可能为nan当数据平均浮动过大时可能找不到稳定值则不取这个波
buy_weight = DAO.get_buy_weight_by_rfid(max_rfid) # 获取这个RFID牛只的购入体重
if buy_weight * Decimal(MAX_SINGLE_BUY_WEIGHT_PERCENT) > weight > buy_weight * Decimal(
MIN_SINGLE_BUY_WEIGHT_PERCENT): # 根据这一头牛的购入体重上下30%区间获取体重波
weight_wave = WeightWave(
id=-1,
floor_scale_id=floor_scale_id,
start_time=start_time.strftime(DATETIME_FORMAT),
end_time=prev_time.strftime(DATETIME_FORMAT),
time_std=time_std,
max_frq=max_frq,
max_frq_rfid=max_rfid,
weight=weight
)
weight_wave_lst.append(weight_wave) # 将一个体重波数据加入缓存列表
relation_reader_name_set.clear() # 清空RFID Reader集按照每一个波的范围内识别Reader
weight_lst.clear() # 清空体重值存储列表
if (start_time is None and not is_out_of_range) or (
start_time is not None and is_out_of_interval and not is_out_of_range):
"""
一个新的波的开始,重置体重波开始时间,将最新时间作为开始时间。
当第一次迭代或者因为时间间隔超过1s但是当前值在阈值范围内。
"""
start_time = log_time
if start_time is not None and is_out_of_range:
"""
在一个波的遍历过程中(已有开始时间),
如果获取到的值超过了体重阈值,则无法将这个值作为一个新的波的开始,重置体重波开始时间,将开始时间清空待后续遍历值符合要求后再设定。
"""
start_time = None
if not is_out_of_range and log_relation_reader_name not in relation_reader_name_set:
"""
当一个有效波获取到时波的开始节点中间过程和结束节点在一个波的形成过程中将体重值存储并将RFID Reader ID去重存储。
"""
relation_reader_name_set.add(log_relation_reader_name) # 取出关联的RFID Reader ID去重存储
if not is_out_of_range:
"""
当一个有效波获取到时(波的开始节点,中间过程和结束节点),在一个波的形成过程中,将体重值缓存
"""
weight_lst.append(float(log_weight))
prev_time = log_time
return weight_wave_lst
def save_weight_wave(floor_scale_id, weight_wave_lst, time_std):
"""
将体重波存储本地
:param floor_scale_id
:param weight_wave_lst已经存放了所有值
:param time_std
:return:
"""
# 当一个地磅的波取完时将波和波所属的地磅对应的RFID Reader存储进本地数据库
period_start_time = None # 一个地磅第一个波的开始时间
period_end_time = None # 一个地磅最后一个波的结束时间
for weight_wave_item in tqdm(weight_wave_lst, desc="---- 插入体重波", position=2,
disable=TQDM_DISABLE):
DAO.add_weight_wave(weight_wave_item, True) # 插入体重波数据
if period_start_time is None: # 判断exec_logs时间周期是否是截点
period_start_time = weight_wave_item.start_time
period_end_time = weight_wave_item.end_time
return ExecLog(floor_scale_id, period_start_time, period_end_time, time_std)
def save_exec_log(exec_log):
"""
储存进度日志,已存在的地磅更新,只往后更新不往前更新
:param exec_log:
:return:
"""
if exec_log.period_start_time is None or exec_log.period_end_time is None: # 地磅波列表是空的话不做记录
return
if exec_log.period_start_time == '': # 当地磅日志没有定义开始时间时,表示需要更新,仅更新该条地磅日志的结束时间
DAO.update_exec_log(exec_log)
else:
DAO.add_exec_log(exec_log)
def get_rfid_all():
return DAO.get_rfid_all()
def get_buy_weight(rfid):
return DAO.get_buy_weight_by_rfid(rfid)