hms_cattle_research/main.py
2024-09-19 20:55:26 +08:00

100 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import db_dao as dd
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.dates as mdates
import pandas as pd
from datetime import datetime,timedelta
m_weight = []
m_datetime = []
m_start_time = '2024-09-09 06:50:00'
m_end_time = '2024-09-09 07:10:00'
# m_reader = '2405085412' # 客户看不到视频
m_weighbridge_reader = '2405085474' # 客户可以看到设备
# m_reader = '2404049517'
# m_reader = '2405085489'
m_rfid_reader = '0A26AE180665' # 对应地磅的 2405085474
date_format = "%Y-%m-%d %H:%M:%S"
# Press the green button in the gutter to run the script.
# 创建查询时间范围的序列
def create_timestamp_seq():
# 将起止时间转换为 datetime 对象
m_datetime_start_obj = datetime.strptime(m_start_time, date_format)
m_datetime_end_obj = datetime.strptime(m_end_time, date_format)
# 计算时间差并转为秒数
time_diff = m_datetime_end_obj - m_datetime_start_obj
seconds = time_diff.total_seconds()+1
# 创建时间序列,单位为秒
start_time = pd.Timestamp(m_start_time)
time_seconds_seq = pd.date_range(start=start_time, periods=int(seconds), freq='s') # 生成日期差秒数个时间点每个时间点间隔1秒
return time_seconds_seq
# 补充缺失的时间秒数
def time_data_suppl(timestamp,data):
i = 1
sorted_data = []
for ts in timestamp:
format_ts = ts.strftime(date_format)
if format_ts not in data:
sorted_data.append(0)
else:
sorted_data.append(float(data[format_ts]))
# print(str(i)+' 存在数据:'+format_ts+' ------> '+data[format_ts])
i += 1
return sorted_data
# 对相同时间的数据去重复,取体重最大值
def time_data_dedup(data):
result_dict = {}
for row in data:
key = row[0].strftime(date_format)
if key not in result_dict or float(row[1]) > float(result_dict[key]):
result_dict[key] = float(row[1])
return result_dict
# rfid数据去重
def rfid_data_dedup(data):
# 保留顺序的去重方法
seen = set()
unique_tuple_list = []
for item in data:
if item not in seen:
unique_tuple_list.append(item)
seen.add(item)
return unique_tuple_list
if __name__ == '__main__':
timestamp_seq = create_timestamp_seq()
mysql_weight_result = dd.get_cattle_weight_by_datetime(m_start_time, m_end_time, m_weighbridge_reader)
mysql_rfid_result = dd.get_rfid_by_datetime(m_start_time, m_end_time, m_rfid_reader)
dd.close()
clean_weight_result = time_data_dedup(mysql_weight_result)
clean_rfid_result = rfid_data_dedup(mysql_rfid_result)
final_weight_result = time_data_suppl(timestamp_seq, clean_weight_result)
plt.figure(figsize=(24, 4))
plt.plot(timestamp_seq, final_weight_result, color='blue', linestyle='-', linewidth=1)
for rst in clean_rfid_result:
# 添加纵向分割线
plt.axvline(x=rst[0], color='r', linestyle='--', label=rst[1], linewidth = 0.5)
# 添加图例
plt.legend()
plt.xlabel('datetime')
plt.ylabel('weight(kg)')
plt.title('reader: ' + m_weighbridge_reader + '\n' + m_start_time + ' ---> ' + m_end_time)
plt.show()