import numpy as np
import pandas as pd
from scipy.spatial.distance import euclidean
# 시계열 데이터를 기준 길이로 확장하는 함수 (단순 확장 + 마지막 N개 평균으로 패딩)
def extend_series_with_tail_avg(series, target_length, tail_window=10):
# 유효 데이터: 전체를 그대로 사용
truncated = series.copy()
current_len = len(truncated)
# 마지막 N개 평균 구하기
if current_len >= tail_window:
avg_tail = np.mean(truncated[-tail_window:])
else:
avg_tail = np.mean(truncated)
# 부족한 길이만큼 평균으로 채우기
if current_len < target_length:
pad_len = target_length - current_len
padding = np.full(pad_len, avg_tail)
extended = np.concatenate([truncated, padding])
else:
extended = truncated[:target_length] # 너무 길 경우 자르기
return extended
# 두 센서 데이터 간 거리 계산 함수
def calculate_distance(series1, series2, target_length=100):
interp1 = extend_series_with_tail_avg(series1, target_length)
interp2 = extend_series_with_tail_avg(series2, target_length)
return euclidean(interp1, interp2)
# 예제: 다양한 길이의 데이터 (0도 유효 데이터로 취급)
sensor_data = {
'sensor_A': np.array([10, 15, 13, 11, 9, 6, 3, 0, 0, 0]),
'sensor_B': np.array([8, 12, 14, 13, 10, 8, 6, 4, 2, 1]),
'sensor_C': np.array([5, 7, 6, 0, 0, 0, 0, 0, 0, 0])
}
# 기준 길이
target_len = 50
# 센서 간 거리 계산
distance_matrix = pd.DataFrame(index=sensor_data.keys(), columns=sensor_data.keys())
for name1 in sensor_data:
for name2 in sensor_data:
dist = calculate_distance(sensor_data[name1], sensor_data[name2], target_len)
distance_matrix.loc[name1, name2] = dist
print("센서 간 거리 행렬:")
print(distance_matrix)
반응형
반응형