fork(4) download
  1. import csv
  2. import os
  3. from datetime import datetime
  4.  
  5. def calculate_device_usage(input_file, output_file):
  6. """
  7. 统计智能设备使用时间
  8. :param input_file: 输入CSV文件路径
  9. :param output_file: 输出CSV文件路径
  10. """
  11. # 检查输入文件是否存在
  12. if not os.path.exists(input_file):
  13. raise FileNotFoundError(f"输入文件不存在: {input_file}")
  14.  
  15. # 读取并处理数据
  16. device_data = {}
  17. with open(input_file, 'r', encoding='utf-8') as f:
  18. reader = csv.DictReader(f)
  19. for row_num, row in enumerate(reader, 1):
  20. try:
  21. device_id = row['device_id']
  22. start = datetime.fromisoformat(row['start_time'])
  23. end = datetime.fromisoformat(row['end_time']) if row['end_time'] else datetime.now()
  24.  
  25. if device_id not in device_data:
  26. device_data[device_id] = []
  27. device_data[device_id].append((start, end))
  28. except (KeyError, ValueError) as e:
  29. print(f"警告: 第 {row_num} 行数据格式错误 - {str(e)}")
  30. continue
  31.  
  32. # 如果没有找到有效数据
  33. if not device_data:
  34. print("错误: 输入文件中没有找到有效数据")
  35. return
  36.  
  37. # 计算每个设备的总使用时间(秒)
  38. results = []
  39. for device_id, periods in device_data.items():
  40. # 按开始时间排序
  41. periods.sort()
  42.  
  43. # 合并重叠时间段
  44. merged = []
  45. current_start, current_end = periods[0]
  46. for start, end in periods[1:]:
  47. if start <= current_end:
  48. current_end = max(current_end, end)
  49. else:
  50. merged.append((current_start, current_end))
  51. current_start, current_end = start, end
  52. merged.append((current_start, current_end))
  53.  
  54. # 计算总时间(秒)
  55. total_seconds = sum((end - start).total_seconds() for start, end in merged)
  56.  
  57. # 转换为可读格式
  58. hours, remainder = divmod(total_seconds, 3600)
  59. minutes, seconds = divmod(remainder, 60)
  60. readable = f"{int(hours)}小时{int(minutes)}分{int(seconds)}秒"
  61.  
  62. results.append({
  63. 'device_id': device_id,
  64. 'total_seconds': total_seconds,
  65. 'readable_time': readable
  66. })
  67.  
  68. # 写入结果
  69. with open(output_file, 'w', encoding='utf-8', newline='') as f:
  70. writer = csv.DictWriter(f, fieldnames=['device_id', 'total_seconds', 'readable_time'])
  71. writer.writeheader()
  72. writer.writerows(results)
  73.  
  74. print(f"统计完成!结果已保存至: {output_file}")
  75. print(f"处理了 {len(results)} 台设备的使用记录")
  76.  
  77. # 使用示例
  78. if __name__ == "__main__":
  79. try:
  80. # 替换为实际文件路径
  81. input_file = "device_logs.csv" # 输入文件名
  82. output_file = "usage_report.csv" # 输出文件名
  83.  
  84. # 检查输入文件是否存在
  85. if not os.path.exists(input_file):
  86. # 创建示例文件(仅用于测试)
  87. print(f"创建示例文件: {input_file}")
  88. sample_data = [
  89. ['device_id', 'start_time', 'end_time'],
  90. ['D001', '2023-06-01 08:30:00', '2023-06-01 10:15:00'],
  91. ['D001', '2023-06-01 14:00:00', '2023-06-01 16:45:00'],
  92. ['D002', '2023-06-01 09:00:00', '2023-06-01 11:30:00'],
  93. ['D001', '2023-06-02 13:30:00', '2023-06-02 15:00:00'],
  94. ['D003', '2023-06-02 10:00:00', ''], # 未结束的使用记录
  95. ]
  96.  
  97. with open(input_file, 'w', encoding='utf-8', newline='') as f:
  98. writer = csv.writer(f)
  99. writer.writerows(sample_data)
  100. print("已创建示例数据文件,程序将使用此文件运行")
  101.  
  102. # 执行统计
  103. calculate_device_usage(input_file, output_file)
  104.  
  105. except Exception as e:
  106. print(f"程序运行时出错: {str(e)}")
  107. print("请检查输入文件格式是否正确")
  108. print("文件格式要求:")
  109. print("1. CSV格式,包含表头: device_id,start_time,end_time")
  110. print("2. 时间格式: YYYY-MM-DD HH:MM:SS")
  111. print("3. 空结束时间表示设备仍在运行")
Success #stdin #stdout 0.04s 10096KB
stdin
Standard input is empty
stdout
创建示例文件: device_logs.csv
程序运行时出错: [Errno 13] Permission denied: 'device_logs.csv'
请检查输入文件格式是否正确
文件格式要求:
1. CSV格式,包含表头: device_id,start_time,end_time
2. 时间格式: YYYY-MM-DD HH:MM:SS
3. 空结束时间表示设备仍在运行