# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings. import matplotlib.pyplot as plt import numpy as np import matplotlib.dates as mdates from itertools import product import pandas as pd from datetime import datetime from config import DATETIME_FORMAT import config import sql_statements import models import sql_statements import controller from decimal import Decimal, getcontext from util import smooth_data, calc_one_cattle import util import math SPLIT_LINE = "----------------------------------------" # 创建查询时间范围的序列 # def create_timestamp_seq(): # # 将起止时间转换为 datetime 对象 # m_datetime_start_obj = datetime.strptime(m_start_time, date_format) # m_datetime_end_obj = datetime.strptime(m_end_time, date_format) # # # 计算时间差并转为秒数 # time_diff = m_datetime_end_obj - m_datetime_start_obj # seconds = time_diff.total_seconds() + 1 # # # 创建时间序列,单位为秒 # start_time = pd.Timestamp(m_start_time) # time_seconds_seq = pd.date_range(start=start_time, periods=int(seconds), freq='s') # 生成日期差秒数个时间点,每个时间点间隔1秒 # return time_seconds_seq # 补充缺失的时间秒数 # def time_data_suppl(timestamp, data): # i = 1 # sorted_data = [] # for ts in timestamp: # format_ts = ts.strftime(date_format) # if format_ts not in data: # sorted_data.append(0) # else: # sorted_data.append(float(data[format_ts])) # # print(str(i)+' 存在数据:'+format_ts+' ------> '+data[format_ts]) # i += 1 # return sorted_data # 对相同时间的数据去重复,取体重最大值 # def time_data_dedup(data): # result_dict = {} # for row in data: # key = row[0].strftime(date_format) # if key not in result_dict or float(row[1]) > float(result_dict[key]): # result_dict[key] = float(row[1]) # # return result_dict def weight_wave_menu(rfid, buy_weight, time_std): # TODO: 需要修改为可以多个输入ID weight_wave_lst = controller.get_weight_wave(rfid, time_std) if not weight_wave_lst: print('未找到体重数据,请重新输入牛只ID') return -1 print(SPLIT_LINE) for weight_wave in weight_wave_lst: print(str(weight_wave)) print(SPLIT_LINE) while True: weight_wave_id = input('请输入体重波ID(或输入-1返回):') if weight_wave_id == '-1': cattle_menu(time_std) elif not any(str(i.id) == weight_wave_id for i in weight_wave_lst): print('输入不合法,请重试') else: item = next((obj for obj in weight_wave_lst if obj.id == int(weight_wave_id)), None) diff_s = (datetime.strptime(item.end_time, DATETIME_FORMAT) - datetime.strptime(item.start_time, DATETIME_FORMAT)).total_seconds() if item is not None: duration = item.start_time + " ~ " + item.end_time + " ( " + str(diff_s) + "s )" print('正在获取体重日志...') weight_wave_logs = controller.get_weight_wave_log(item, time_std) print('正在显示图表...') show_log_diagram(weight_wave_logs, buy_weight, daily_weight=item.weight, duration=duration, rfid_frq=item.max_frq, id=item.id) def show_log_diagram(weight_wave_logs, buy_weight, daily_weight, duration, rfid_frq, id): index = 0 x = [] y = [] for item in weight_wave_logs: y.append(float(item.weight)) x.append(item.time) index += 1 y_smooth = smooth_data(y, max(5, int(len(weight_wave_logs) * ( 12 / 100)) // 2 * 2 + 1)) m_daily_weight = calc_one_cattle(smooth_data(y, max(5, int(len(weight_wave_logs) * ( 12 / 100)) // 2 * 2 + 1))) plt.figure(1, figsize=(20, 4)) plt.plot(x, y, color='blue', linestyle='-', linewidth=1) # 添加横向分割线 plt.axhline(y=math.ceil(float(buy_weight)), color='red', linestyle='--', linewidth=1, label='buy_weight: ' + str(math.ceil(float(buy_weight)))) if not np.isnan(m_daily_weight): # TODO: 为什么 4 - 1307 - 8311 体重波没有稳定值但是记录了 486.0952 plt.axhline(y=math.ceil(m_daily_weight), color='green', linestyle='--', linewidth=1, label='daily_weight: ' + str(math.ceil(m_daily_weight))) plt.legend() plt.xlabel('datetime') plt.ylabel('weight(kg)') plt.title( "id: " + str(id) + " len: " + str(len(weight_wave_logs)) + " duration: " + duration + " rfid_frq: " + str(rfid_frq)) plt.figure(2, figsize=(20, 4)) plt.plot(x, y_smooth, color='blue', linestyle='-', linewidth=1) plt.axhline(y=math.ceil(float(buy_weight)), color='red', linestyle='--', linewidth=1, label='buy_weight: ' + str(math.ceil(float(buy_weight)))) if not np.isnan(m_daily_weight): # TODO: 为什么 4 - 1307 - 8311 体重波没有稳定值但是记录了 486.0952 plt.axhline(y=math.ceil(m_daily_weight), color='green', linestyle='--', linewidth=1, label='daily_weight: ' + str(math.ceil(m_daily_weight))) # 添加标签和图例 plt.legend() plt.xlabel('datetime') plt.ylabel('weight(kg)') plt.title( "id: " + str(id) + " len: " + str(len(weight_wave_logs)) + " duration: " + duration + " rfid_frq: " + str(rfid_frq)) plt.show() return def cattle_menu(time_std): rfid_lst = sorted(controller.get_rfid_all(), key=lambda x: x.id) # 获取所有牛只的RFID,根据记录主键从小到达排序 print(SPLIT_LINE) for item in rfid_lst: print(vars(item)) print(SPLIT_LINE) while True: cattle_id = input('请输入id(返回输入-1):') if cattle_id == '-1': main_menu() elif not any(i.id == int(cattle_id) for i in rfid_lst): print('输入无效请重新输入') continue else: item = next((obj for obj in rfid_lst if obj.id == int(cattle_id)), None) print('RFID:', item.rfid) buy_weight = controller.get_buy_weight(item.rfid) rt = weight_wave_menu(item.rfid, buy_weight, time_std) if rt == -1: continue # daily_weight_lst = controller.get_daily_weight(item.rfid) # if not daily_weight_lst: # print('未找到体重数据,请重新输入牛只ID') # continue # for daily_weight in daily_weight_lst: # print(daily_weight) def main_menu(): while True: print('主菜单') print(SPLIT_LINE) print('0 - 退出程序') print('1 - 初始化体重数据(设备时间)') print('2 - 查看牛只日体重(设备时间)') print('3 - 初始化体重数据(服务器时间)') print('4 - 查看牛只日体重(服务器时间)') print('5 - 重置本地数据库') print(SPLIT_LINE) ctrl = input('请输入:') if ctrl == '-1': test() break elif ctrl == '0': break elif ctrl == '1': controller.clear_data_by_time_std(1) controller.init_data(1) cattle_menu(1) elif ctrl == '2': cattle_menu(1) elif ctrl == '3': controller.clear_data_by_time_std(2) controller.init_data(2) cattle_menu(2) elif ctrl == '4': cattle_menu(2) elif ctrl == '5': confirm = input('确定要清除所有数据吗?y :确定 / any key:退出') if confirm == 'y' or 'Y': controller.clear_all_data() print('重置成功!') continue else: print('输入错误,请重新输入') continue def test(): data = [1, 2, 4, 7, 2, 3, 7, 10, 12, 5, 6, 5, 5, 20, 23, 21, 20, 23, 22, 19] util.calc_one_cattle(data) return if __name__ == '__main__': main_menu() # timestamp_seq = create_timestamp_seq() # mysql_weight_result = dd.get_cattle_weight_by_datetime(m_start_time, m_end_time, m_weighbridge_reader) # mysql_rfid_result = dd.get_rfid_by_datetime(m_start_time, m_end_time, m_rfid_reader) # dd.close() # clean_weight_result = time_data_dedup(mysql_weight_result) # clean_rfid_result = rfid_data_dedup(mysql_rfid_result) # # final_weight_result = time_data_suppl(timestamp_seq, clean_weight_result) # weight_wave_log = controller.get_weight_wave_log() # time_seq = [] # weight_seq = [float(item.get('biz_data')) for item in weight_wave_log] # # weight_seq = [100, 90, 110, 130, 150, 170, 190, 192, 195, 199, 210, 220, 230, 240, 248, 255, 245, 248, 250, 252, # # 253, 253, 257, 254, 250, 255, 259, 265, 275, 285, 295, 310, 320, 305, 295, 283, 273, 263, 253, 240, # # 229, 209, 190, 174, 162, 150, 139, 114, 100, 90, 100, 101, 102, 100, 95, 97] # index_seq = [i for i in range(1, len(weight_seq) + 1)] # print('weight_seq: ' + str(weight_seq) + '\nindex_seq: ' + str(index_seq)) # weight_seq_smooth = (smooth_data(weight_seq)) # plt.figure(figsize=(20, 4)) # plt.plot(index_seq, weight_seq_smooth, color='blue', linestyle='-', linewidth=1) # plt.xlabel('sequence') # plt.ylabel('weight(kg)') # plt.title('weight_wave') # plt.show() # calc_one_cattle(weight_seq) """ x = np.linspace(0, 10, 100) y1 = np.sin(x) y2 = np.cos(x) # 第1个图形窗口 plt.figure(1) plt.plot(x, y1, color='r') plt.title("Figure 1: Sine Function") # 第2个图形窗口 plt.figure(2) plt.plot(x, y2, color='b') plt.title("Figure 2: Cosine Function") # 显示所有窗口 plt.show() """ # index = 1 # for log in weight_wave_log: # m_dt = datetime.strftime(log.get('biz_time'), controller.DATETIME_FORMAT) # print(m_dt) # time_seq.append(m_dt) # index_seq.append(index) # weight_seq.append(log.get('biz_data')) # index += 1 # for rst in clean_rfid_result: # # 添加纵向分割线 # plt.axvline(x=rst[0], color='r', linestyle='--', label=rst[1], linewidth = 0.5) # # # 添加图例 # plt.legend() # # # result = models.get_logs(floor_scale_id ="2404049517", start_time ="2024-09-01 00:00:00", end_time ="2024-09-02 00:00:00") # result = models.get_ear_tag('9057970969300989') # print(result) # floor_scale_id, start_time, end_time, max_weight, max_weight_time, max_frq_ear_tag, max_frq_rfid, time_std # weight_wave = ('floor_scale_id', '2024-12-01 00:00:00', # '2024-12-07 00:00:00', # 100.00, '2024-12-06 19:00:00', 'max_frq_ear_tag', # 'max_frq_rfid', 1) # parent_id = models.add_weight_wave(False, weight_wave, False) # rfid_frq = [] # for i in range(1, 11): # rfid_frq.append( # (str(parent_id), str(i), 'rfid' + str(i), i, 1)) # models.add_rfid_frq(True, rfid_frq,True) # # print('parent_id:'+str(parent_id)) # # models.delete_weight_wave(False) # models.delete_rfid_frq(True) # controller.init_data() # controller.clean_data()