"""Plot weighbridge weight readings together with RFID detections.

When run as a script, this module queries weight rows and RFID rows for one
fixed time window through ``db_dao``, collapses the weight rows to one value
per second, fills seconds without a reading with 0, and draws the weight curve
with a dashed vertical line for every distinct RFID row.
"""

from datetime import datetime, timedelta

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import db_dao as dd

m_weight = []
m_datetime = []
m_start_time = '2024-09-09 06:50:00'
m_end_time = '2024-09-09 07:10:00'
# Other weighbridge reader IDs that were tried before:
#   '2405085412'  (customer cannot see the video)
#   '2404049517'
#   '2405085489'
m_weighbridge_reader = '2405085474'  # customer can see the device
m_rfid_reader = '0A26AE180665'  # corresponds to weighbridge 2405085474
date_format = "%Y-%m-%d %H:%M:%S"


def create_timestamp_seq(start_time=None, end_time=None):
    """Build the per-second timestamp sequence for the query window.

    Args:
        start_time: Window start as a string in ``date_format``. Defaults to
            the module-level ``m_start_time``.
        end_time: Window end as a string in ``date_format``. Defaults to the
            module-level ``m_end_time``.

    Returns:
        A ``pandas.DatetimeIndex`` with one entry per second, beginning at
        ``start_time`` and including ``end_time``.

    Raises:
        ValueError: If either string does not match ``date_format``.
    """
    # Resolve the defaults at call time so later changes to the module-level
    # constants are honoured.
    if start_time is None:
        start_time = m_start_time
    if end_time is None:
        end_time = m_end_time

    window_start = datetime.strptime(start_time, date_format)
    window_end = datetime.strptime(end_time, date_format)

    # +1 so that the final second of the window is part of the sequence.
    point_count = int((window_end - window_start).total_seconds() + 1)

    return pd.date_range(
        start=pd.Timestamp(start_time), periods=point_count, freq='s'
    )


def time_data_suppl(timestamp, data):
    """Align per-second readings to a timestamp sequence, filling gaps with 0.

    Args:
        timestamp: Iterable of objects with ``strftime`` (for example the
            result of ``create_timestamp_seq``).
        data: Mapping from a ``date_format`` string to a value accepted by
            ``float``.

    Returns:
        A list with one entry per timestamp: ``float(data[key])`` when a
        reading exists for that second, otherwise ``0``.
    """
    filled = []
    for ts in timestamp:
        key = ts.strftime(date_format)
        filled.append(float(data[key]) if key in data else 0)
    return filled


def time_data_dedup(data):
    """Collapse rows that share a second, keeping the largest weight.

    Args:
        data: Iterable of rows where ``row[0]`` has ``strftime`` and
            ``row[1]`` is a value accepted by ``float``.

    Returns:
        A dict mapping each ``date_format`` string to the maximum weight
        (as ``float``) seen for that second.
    """
    heaviest = {}
    for row in data:
        key = row[0].strftime(date_format)
        weight = float(row[1])
        if key not in heaviest or weight > heaviest[key]:
            heaviest[key] = weight
    return heaviest


def rfid_data_dedup(data):
    """Remove duplicate RFID rows while keeping first-seen order.

    Args:
        data: Iterable of hashable rows.

    Returns:
        A list containing each distinct row once, in order of first
        appearance.
    """
    # dict keys are unique and keep insertion order, which gives an
    # order-preserving dedup in a single pass.
    return list(dict.fromkeys(data))


def main():
    """Query the configured window, clean the rows and show the plot."""
    timestamp_seq = create_timestamp_seq()

    # Always release the database handle, even when a query fails.
    try:
        mysql_weight_result = dd.get_cattle_weight_by_datetime(
            m_start_time, m_end_time, m_weighbridge_reader
        )
        mysql_rfid_result = dd.get_rfid_by_datetime(
            m_start_time, m_end_time, m_rfid_reader
        )
    finally:
        dd.close()

    clean_weight_result = time_data_dedup(mysql_weight_result)
    clean_rfid_result = rfid_data_dedup(mysql_rfid_result)
    final_weight_result = time_data_suppl(timestamp_seq, clean_weight_result)

    plt.figure(figsize=(24, 4))
    plt.plot(
        timestamp_seq,
        final_weight_result,
        color='blue',
        linestyle='-',
        linewidth=1,
    )

    # One vertical divider per RFID row: rst[0] is used as the x position
    # and rst[1] as the legend label.
    for rst in clean_rfid_result:
        plt.axvline(
            x=rst[0], color='r', linestyle='--', label=rst[1], linewidth=0.5
        )

    # Only the divider lines carry labels, so a legend is drawn only when
    # at least one of them exists.
    if clean_rfid_result:
        plt.legend()

    plt.xlabel('datetime')
    plt.ylabel('weight(kg)')
    plt.title(f'reader: {m_weighbridge_reader}\n{m_start_time} ---> {m_end_time}')
    plt.show()


if __name__ == '__main__':
    main()