# Author: Lee Wu Love Lele # Datetime: 2024/12/6 15:24 from util import calc_one_cattle, smooth_data from models import WeightWave, DAO, ExecLog, DailyWeight from decimal import Decimal, getcontext import copy from datetime import datetime from config import DATETIME_FORMAT, TQDM_DISABLE, MIN_TIME_WINDOW, BUY_WEIGHT_MAX, BUY_WEIGHT_MIN, \ MIN_TOTAL_BUY_WEIGHT_PERCENT, \ MAX_TOTAL_BUY_WEIGHT_PERCENT, RFID_FREQUENCY_THRESHOLD_VALUE, MIN_SINGLE_BUY_WEIGHT_PERCENT, \ MAX_SINGLE_BUY_WEIGHT_PERCENT from tqdm import tqdm import numpy as np import pandas as pd from scipy.signal import savgol_filter def clear_all_data(): DAO.delete_weight_wave_all(False) DAO.delete_exec_log_all(True) def clear_data_by_time_std(time_std): DAO.delete_weight_wave_by_time_std(time_std) DAO.delete_exec_log_by_time_std(time_std) def init_data(time_std): """ 初始化数据,从远程数据库中截取地磅的连续波,同时记录连续波中的体重值出现过的对应的RFID Reader ID :return: """ search_start_time = '2024-09-02 00:00:00' # TODO: 从开始时间截取的波之前如果有日志,这个波的开始时间并不准确,因为之前可能存在连续值属于这个波。 search_end_time = "2024-12-19 23:59:59" # 查询远程日志的截止时间,当前系统时间 getcontext().prec = 5 buy_weight_max = Decimal(BUY_WEIGHT_MAX) # 609 buy_weight_min = Decimal(BUY_WEIGHT_MIN) # 363 ctrl_min_weight = buy_weight_min * Decimal(MIN_TOTAL_BUY_WEIGHT_PERCENT) # 254.10 ctrl_max_weight = buy_weight_max * Decimal(MAX_TOTAL_BUY_WEIGHT_PERCENT) # 791.70 # init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, MIN_TIME_WINDOW) init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, MIN_TIME_WINDOW) # def init_daily_weight(time_std): # """ # 初始化牛只每日体重 # :param time_std: # :return: # """ # rfid_lst = DAO.get_rfid(time_std) # 分组查询所有体重波中的牛只RFID # for rfid in rfid_lst: # weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid) # 根据RFID查询所有体重波 # for weight_wave in weight_wave_lst: # pass # pass # return def get_daily_weight(rfid, time_std): """ 根据单个RFID获取按照每日计算的体重 :param rfid: :param time_std: :return: """ weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid, time_std) record_date = None weight_lst = [] daily_weight_lst = [] index = 1 for weight_wave in weight_wave_lst: if (record_date is not None and record_date != datetime.strptime(weight_wave.start_time, DATETIME_FORMAT).date()) or index == len( weight_wave_lst): if len(weight_lst) > 2: trimmed_lst = sorted(weight_lst)[1:-1] daily_weight_lst.append(DailyWeight( weight=float(Decimal(str(sum(trimmed_lst))) / Decimal(str(len(trimmed_lst)))), date=datetime.strftime(record_date, DATETIME_FORMAT) )) weight_lst.clear() if record_date is None or record_date != datetime.strptime(weight_wave.start_time, DATETIME_FORMAT).date(): record_date = datetime.strptime(weight_wave.start_time, DATETIME_FORMAT).date() weight_lst.append(weight_wave.weight) index += 1 if not weight_lst and daily_weight_lst and len(weight_lst) > 2: trimmed_lst = sorted(weight_lst)[1:-1] daily_weight_lst.append(DailyWeight( weight=float(Decimal(str(sum(trimmed_lst))) / Decimal(str(len(trimmed_lst)))), date=datetime.strftime(record_date, DATETIME_FORMAT) )) dict_lst = [obj.__dict__ for obj in daily_weight_lst] sorted_lst = sorted(dict_lst, key=lambda x: datetime.strptime(x['date'], DATETIME_FORMAT)) return sorted_lst def get_weight_wave(rfid, time_std): """ 根据RFID获取体重波 :param rfid: str :param time_std: int 1=设备时间,2=服务器时间 :return: """ weight_wave_lst = DAO.get_weight_wave_by_rfid(rfid, time_std) # dict_lst = [obj.__dict__ for obj in weight_wave_lst] sorted_lst = sorted(weight_wave_lst, key=lambda x: datetime.strptime(x.start_time, DATETIME_FORMAT)) return sorted_lst def get_weight_wave_log(weight_wave, time_std): params = {'floor_scale_id': (weight_wave.floor_scale_id,), 'start_time': weight_wave.start_time, 'end_time': weight_wave.end_time, 'time_std': time_std} weight_wave_logs = DAO.get_weight_logs(**params) return weight_wave_logs def init_weight_wave(search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std=1, min_time_window=10): """ 初始化体重波 根据地磅列表从所有地磅日志中从开始时间逐秒轮询,截取在指定体重范围内的连续体重 :param search_start_time: :param search_end_time: :param ctrl_min_weight: :param ctrl_max_weight: :param time_std: :param min_time_window :return: """ devices = DAO.get_devices_id() # 获取所有地磅ID,从养殖场记录中获取。TODO: 是否时当前设备,如果设备更换了如何处理? exec_logs = DAO.get_exec_log((time_std,)) # 根据所需分析的时间类型获取执行记录。 for device in tqdm(devices, desc="遍历地磅设备:", position=1, disable=TQDM_DISABLE): floor_scale_id = device['reader_code'] # 取出地磅id never_logged = True # 截取进程记录控制 # if floor_scale_id != '2405085412': # continue for exec_log_item in exec_logs: if exec_log_item.get('floor_scale_id') == floor_scale_id: print('-- 地磅:' + floor_scale_id + ' | ' + ( '设备时间' if time_std == 1 else '服务器时间 :') + ' 记录存在') # 执行记录中存在地磅并且新查询的结束时间在执行记录的范围内,则不做处理。 if exec_log_item.get('end_time') < datetime.strptime( search_end_time, DATETIME_FORMAT): # 执行记录中存在地磅并且新查询的结束时间在执行记录之后,根据执行记录最后一个波的结束时间开始执行 print('---- 查询时间段在记录时间段外,补充时间段:' + exec_log_item.get('end_time').strftime( DATETIME_FORMAT) + ' 到 ' + search_end_time) weight_wave_lst = create_weight_wave(floor_scale_id, exec_log_item.get('end_time').strftime(DATETIME_FORMAT), search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, min_time_window) exec_log = save_weight_wave(floor_scale_id, weight_wave_lst, time_std) save_exec_log(exec_log) never_logged = False break if never_logged: # 执行记录中没有出现过地磅数据,则根据查询起止时间全量查询 weight_wave_lst = create_weight_wave(floor_scale_id, search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std, min_time_window) exec_log = save_weight_wave(floor_scale_id, weight_wave_lst, time_std) save_exec_log(exec_log) def create_weight_wave(floor_scale_id, search_start_time, search_end_time, ctrl_min_weight, ctrl_max_weight, time_std=1, min_time_window=10): """ 从远程服务器拉取一个地磅的日志数据,分析体重数据。 :param floor_scale_id: 地磅ID :param search_start_time: 查询起始时间(包含) :param search_end_time: 查询截止时间(包含) :param ctrl_min_weight: 体重窗口最小值 :param ctrl_max_weight: 体重窗口最大值 :param time_std: 1 = 设备时间,2 = 服务器时间 :param min_time_window: 最小时间窗口(s) :return: """ start_time = None # 一个波循环计数开始时间 prev_time = None # 上一条数据的时间指针 weight_wave_lst = [] # 存放波的结构 weight_lst = [] # 存放波的体重值 relation_reader_name_set = set() # 去重存放从体重日志中关联的RFID Reader ID for weight_log in tqdm(DAO.get_weight_logs( **{'floor_scale_id': (floor_scale_id,), 'start_time': search_start_time, 'end_time': search_end_time, 'time_std': time_std}), desc="---- 根据地磅ID迭代日志:", position=2, disable=TQDM_DISABLE): log_weight = weight_log.weight # 获取体重值 log_time = weight_log.time # 获得数据采集时间 log_relation_reader_name = weight_log.rfid_reader is_out_of_range = ctrl_min_weight > Decimal(log_weight) or Decimal(log_weight) > ctrl_max_weight is_out_of_interval = (log_time - prev_time).total_seconds() > 1 if prev_time is not None else False is_in_of_time_window = ( prev_time - start_time).total_seconds() > min_time_window if start_time is not None else False if start_time is not None and (is_out_of_interval or is_out_of_range) and is_in_of_time_window: """ 一个波的结束,在符合波长时间超过最小波长时间窗口,两个临近值间隔超过1s或者体重范围超过阈值时触发截断,形成一个波, 但是如果该波段内没有识别到牛只RFID或者识别到的牛只RFID没有明显的频率(有一个RIFD频率超过75%),放弃获取这个波。 """ rfid_count_lst = DAO.get_rfid_count(tuple(relation_reader_name_set), start_time.strftime(DATETIME_FORMAT), prev_time.strftime(DATETIME_FORMAT)) if rfid_count_lst: df = pd.DataFrame(rfid_count_lst) df["percentage"] = df["repeat_count"] / df["repeat_count"].sum() # 计算所有RFID的频率 df_sorted = df.sort_values(by="percentage", ascending=False) # 按照频率从大到小排列 # df_sorted = df_sorted.reset_index(drop=True) # 重新排序index # TODO: 这里可以考虑,虽然频率高但是如果一个10秒以上的波只有一个RFID频次出现,是否需要取这个波?在这里记录一下RFID的频次 max_frq = df_sorted.iloc[0]["percentage"] # 取最大频率的RFID对应的频率 max_rfid = df_sorted.iloc[0]["biz_data"] # 取最大频率的RFID weight_lst_len = len(weight_lst) # TODO:在这里增加一个判断,需要判断出现一定时常的连续稳定值 if max_frq > RFID_FREQUENCY_THRESHOLD_VALUE: weight = calc_one_cattle( # TODO: 这里的稳定值计算与图表显示的不一致,这里相对准确(因为图表显示数据向上取整)。 smooth_data(weight_lst, max(3, int(weight_lst_len * ( 12 / 100)) // 2 * 2 + 1))) # 取体重波长度的 12%处附近的奇数作为平滑窗口大小,最小窗口为3 if not np.isnan(weight): # weight可能为nan,当数据平均浮动过大时,可能找不到稳定值,则不取这个波 buy_weight = DAO.get_buy_weight_by_rfid(max_rfid) # 获取这个RFID牛只的购入体重 if buy_weight * Decimal(MAX_SINGLE_BUY_WEIGHT_PERCENT) > weight > buy_weight * Decimal( MIN_SINGLE_BUY_WEIGHT_PERCENT): # 根据这一头牛的购入体重上下30%区间获取体重波 weight_wave = WeightWave( id=-1, floor_scale_id=floor_scale_id, start_time=start_time.strftime(DATETIME_FORMAT), end_time=prev_time.strftime(DATETIME_FORMAT), time_std=time_std, max_frq=max_frq, max_frq_rfid=max_rfid, weight=weight ) weight_wave_lst.append(weight_wave) # 将一个体重波数据加入缓存列表 relation_reader_name_set.clear() # 清空RFID Reader集,按照每一个波的范围内识别Reader weight_lst.clear() # 清空体重值存储列表 if (start_time is None and not is_out_of_range) or ( start_time is not None and is_out_of_interval and not is_out_of_range): """ 一个新的波的开始,重置体重波开始时间,将最新时间作为开始时间。 当第一次迭代或者因为时间间隔超过1s,但是当前值在阈值范围内。 """ start_time = log_time if start_time is not None and is_out_of_range: """ 在一个波的遍历过程中(已有开始时间), 如果获取到的值超过了体重阈值,则无法将这个值作为一个新的波的开始,重置体重波开始时间,将开始时间清空待后续遍历值符合要求后再设定。 """ start_time = None if not is_out_of_range and log_relation_reader_name not in relation_reader_name_set: """ 当一个有效波获取到时(波的开始节点,中间过程和结束节点),在一个波的形成过程中,将体重值存储并将RFID Reader ID去重存储。 """ relation_reader_name_set.add(log_relation_reader_name) # 取出关联的RFID Reader ID去重存储 if not is_out_of_range: """ 当一个有效波获取到时(波的开始节点,中间过程和结束节点),在一个波的形成过程中,将体重值缓存 """ weight_lst.append(float(log_weight)) prev_time = log_time return weight_wave_lst def save_weight_wave(floor_scale_id, weight_wave_lst, time_std): """ 将体重波存储本地 :param floor_scale_id :param weight_wave_lst(已经存放了所有值) :param time_std :return: """ # 当一个地磅的波取完时,将波和波所属的地磅对应的RFID Reader存储进本地数据库 period_start_time = None # 一个地磅第一个波的开始时间 period_end_time = None # 一个地磅最后一个波的结束时间 for weight_wave_item in tqdm(weight_wave_lst, desc="---- 插入体重波", position=2, disable=TQDM_DISABLE): DAO.add_weight_wave(weight_wave_item, True) # 插入体重波数据 if period_start_time is None: # 判断exec_logs时间周期是否是截点 period_start_time = weight_wave_item.start_time period_end_time = weight_wave_item.end_time return ExecLog(floor_scale_id, period_start_time, period_end_time, time_std) def save_exec_log(exec_log): """ 储存进度日志,已存在的地磅更新,只往后更新不往前更新 :param exec_log: :return: """ if exec_log.period_start_time is None or exec_log.period_end_time is None: # 地磅波列表是空的话不做记录 return if exec_log.period_start_time == '': # 当地磅日志没有定义开始时间时,表示需要更新,仅更新该条地磅日志的结束时间 DAO.update_exec_log(exec_log) else: DAO.add_exec_log(exec_log) def get_rfid_all(): return DAO.get_rfid_all() def get_buy_weight(rfid): return DAO.get_buy_weight_by_rfid(rfid)