# BrainWave-Task-Visualizer / preprocessing.py
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from scipy import signal
from scipy.fft import fft, fftfreq
from typing import List, Dict, Tuple, Optional, Any
import logging

def validate_data(df: pd.DataFrame) -> Tuple[bool, str, Tuple[List[int], List[str]]]:
    """
    データの妥当性をチェック
    
    Parameters:
    df: DataFrame 入力データ
    
    Returns:
    bool: データが有効かどうか
    str: エラーメッセージ(エラーがある場合)
    tuple: (クリーニングが必要な行のインデックス, クリーニングの理由)
    """
    expected_columns = ['alpha', 'beta', 'theta', 'delta', 'gamma', 'key']
    valid_keys = [0, 1, 2]  # neutral, right, left
    rows_to_clean = []
    cleaning_reasons = []
    
    # Check that all expected columns are present
    if not all(col in df.columns for col in expected_columns):
        missing_cols = [col for col in expected_columns if col not in df.columns]
        return False, f"Missing columns: {missing_cols}", ([], [])
        
    # Blank (all-NaN) rows separate task segments and are not treated as data errors
    separator_mask = df.isnull().all(axis=1)

    # Check that the band-power columns are numeric
    numerical_cols = ['alpha', 'beta', 'theta', 'delta', 'gamma']
    for col in numerical_cols:
        if not np.issubdtype(df[col].dtype, np.number):
            return False, f"Column {col} is not numeric", ([], [])
        
        # Flag negative values
        negative_mask = df[col] < 0
        if negative_mask.any():
            negative_rows = df[negative_mask].index.tolist()
            rows_to_clean.extend(negative_rows)
            cleaning_reasons.extend([f'Negative value in {col}'] * len(negative_rows))
        
        # Flag NaN values (blank segment-separator rows are excluded)
        nan_mask = df[col].isna() & ~separator_mask
        if nan_mask.any():
            nan_rows = df[nan_mask].index.tolist()
            rows_to_clean.extend(nan_rows)
            cleaning_reasons.extend([f'NaN in {col}'] * len(nan_rows))

    # Check the key column
    key_nan_mask = df['key'].isna() & ~separator_mask
    if key_nan_mask.any():
        key_nan_rows = df[key_nan_mask].index.tolist()
        rows_to_clean.extend(key_nan_rows)
        cleaning_reasons.extend(['NaN in key'] * len(key_nan_rows))

    valid_rows = ~df['key'].isna()
    invalid_keys = [k for k in df[valid_rows]['key'].unique() if k not in valid_keys]
    if invalid_keys:
        invalid_key_mask = df['key'].isin(invalid_keys)
        invalid_key_rows = df[invalid_key_mask].index.tolist()
        rows_to_clean.extend(invalid_key_rows)
        cleaning_reasons.extend(['Invalid key value'] * len(invalid_key_rows))
    
    # Deduplicate flagged rows, keeping the first reason recorded for each row
    if rows_to_clean:
        first_reason = {}
        for idx, reason in zip(rows_to_clean, cleaning_reasons):
            first_reason.setdefault(idx, reason)
        sorted_indices = sorted(first_reason)
        return True, "Data needs cleaning", (sorted_indices,
                                             [first_reason[i] for i in sorted_indices])

    return True, "Data is valid", ([], [])
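
# Illustrative example (not from the original repository): for a frame in which row 3 has a
# negative 'beta' value and row 7 has key=5, validate_data is expected to return
#   (True, "Data needs cleaning", ([3, 7], ['Negative value in beta', 'Invalid key value']))
# whereas a missing 'gamma' column would yield (False, "Missing columns: ['gamma']", ([], [])).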

def split_task_segments(df: pd.DataFrame) -> List[pd.DataFrame]:
    """
    Split a DataFrame into task segments separated by blank (all-NaN) rows.

    Parameters:
    df: DataFrame whose all-NaN rows mark segment boundaries

    Returns:
    List[DataFrame]: list of task segments
    """
    df = df.reset_index(drop=True)  # ensure positional and label indices agree
    empty_rows = df.index[df.isnull().all(axis=1)].tolist()

    task_segments = []
    start_idx = 0

    for end_idx in empty_rows:
        if end_idx > start_idx:
            segment = df.iloc[start_idx:end_idx].reset_index(drop=True)
            if not segment.empty:
                task_segments.append(segment)
        start_idx = end_idx + 1

    if start_idx < len(df):
        segment = df.iloc[start_idx:].reset_index(drop=True)
        if not segment.empty:
            task_segments.append(segment)

    return task_segments

def load_task_segments(file_path: str) -> List[pd.DataFrame]:
    """
    Load task data separated by blank rows from a CSV file.

    Parameters:
    file_path: path to the CSV file

    Returns:
    List[DataFrame]: list of task segments
    """
    return split_task_segments(pd.read_csv(file_path))
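
# Expected input layout (an assumption based on the splitting logic above): each task is a
# block of consecutive rows, and an all-empty CSV row marks the boundary between tasks, e.g.
#
#   alpha,beta,theta,delta,gamma,key
#   12.1,8.4,5.2,3.3,2.1,1
#   11.9,8.6,5.0,3.1,2.2,1
#   ,,,,,
#   10.2,9.1,6.3,3.8,1.9,2
#
# Note that with pandas' default skip_blank_lines=True, a separator written as ',,,,,'
# survives as an all-NaN row, while a truly empty line would be dropped during parsing.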

def compute_spectral_features(data: np.ndarray, fs: float = 100.0) -> Dict[str, float]:
    """スペクトル特徴量の計算"""
    features = {}
    
    # Compute the FFT and the one-sided power spectrum
    n = len(data)
    yf = fft(data)
    xf = fftfreq(n, 1/fs)[:n//2]
    power_spectrum = 2.0/n * np.abs(yf[0:n//2])
    
    # Summary statistics of the power spectrum
    features['spectral_mean'] = np.mean(power_spectrum)
    features['spectral_std'] = np.std(power_spectrum)
    features['spectral_skew'] = pd.Series(power_spectrum).skew()
    features['spectral_kurtosis'] = pd.Series(power_spectrum).kurtosis()
    
    # Peak frequency
    peak_freq = xf[np.argmax(power_spectrum)]
    features['peak_frequency'] = peak_freq
    
    return features
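
# Minimal usage sketch (illustrative; the 10 Hz sine below is not part of the dataset):
#   t = np.arange(200) / 100.0                      # 2 s at the default fs = 100 Hz
#   feats = compute_spectral_features(np.sin(2 * np.pi * 10 * t))
#   feats['peak_frequency']                         # expected to be 10.0 for this input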

def create_features(segment: pd.DataFrame) -> Dict[str, float]:
    """1タスク分のデータから特徴量を生成"""
    features = {}
    basic_features = ['alpha', 'beta', 'theta', 'delta', 'gamma']
    
    # Basic statistics
    for feature in basic_features:
        # Time-series statistics
        features[f'{feature}_mean'] = segment[feature].mean()
        features[f'{feature}_std'] = segment[feature].std()
        features[f'{feature}_var'] = segment[feature].var()
        features[f'{feature}_skew'] = segment[feature].skew()
        features[f'{feature}_kurtosis'] = segment[feature].kurtosis()
        
        # Trend and rate of change
        x = np.arange(len(segment))
        features[f'{feature}_trend'] = np.polyfit(x, segment[feature].values, 1)[0]
        features[f'{feature}_max_diff'] = segment[feature].diff().max()
        features[f'{feature}_min_diff'] = segment[feature].diff().min()
        
        # Spectral features
        spectral_features = compute_spectral_features(segment[feature].values)
        for spec_name, spec_value in spectral_features.items():
            features[f'{feature}_{spec_name}'] = spec_value
    
    # Ratios between wave bands
    ratios = {
        'alpha_beta_ratio': segment['alpha'] / segment['beta'],
        'theta_beta_ratio': segment['theta'] / segment['beta'],
        'alpha_theta_ratio': segment['alpha'] / segment['theta']
    }
    
    for ratio_name, ratio_data in ratios.items():
        features[f'{ratio_name}_mean'] = ratio_data.mean()
        features[f'{ratio_name}_std'] = ratio_data.std()
    
    # Total power and relative power
    total_power = segment[basic_features].sum(axis=1)
    for feature in basic_features:
        rel_power = segment[feature] / total_power
        features[f'{feature}_rel_mean'] = rel_power.mean()
        features[f'{feature}_rel_std'] = rel_power.std()
    
    return features
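
# Feature naming convention produced above (illustrative): per band, e.g. 'alpha_mean',
# 'alpha_trend', 'alpha_peak_frequency'; per ratio, e.g. 'theta_beta_ratio_std'; and
# relative power, e.g. 'gamma_rel_mean'. With 5 bands this should come to
# 5 * 13 + 3 * 2 + 5 * 2 = 81 features per segment.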

def preprocess_eeg_data(file_path: str, test_size: float = 0.2,
                        random_state: int = 42, validate: bool = True,
                        clean_data: bool = True) -> Optional[Dict[str, Any]]:
    """
    EEGデータの前処理パイプライン
    
    Parameters:
    file_path: str, 入力CSVファイルのパス
    test_size: float, テストデータの割合
    random_state: int, 乱数シード
    validate: bool, データの妥当性チェックを行うかどうか
    clean_data: bool, 問題のあるデータを自動的に除去するかどうか
    
    Returns:
    Dict: 前処理済みのデータセット
    """
    # Load the data and validate it
    df = pd.read_csv(file_path)
    if validate:
        is_valid, message, (rows_to_clean, cleaning_reasons) = validate_data(df)
        
        if not is_valid:
            raise ValueError(f"Invalid data: {message}")
            
        if rows_to_clean and clean_data:
            logging.warning(f"Found {len(rows_to_clean)} rows that need cleaning:")
            for row, reason in zip(rows_to_clean, cleaning_reasons):
                logging.warning(f"Row {row}: {reason}")
            
            # Drop the problematic rows
            df = df.drop(rows_to_clean).reset_index(drop=True)
            logging.info(f"Removed {len(rows_to_clean)} problematic rows. Remaining rows: {len(df)}")
            
        elif rows_to_clean and not clean_data:
            raise ValueError(f"Data contains {len(rows_to_clean)} problematic rows. Set clean_data=True to automatically remove them.")
    
    # Split the (already cleaned) DataFrame into task segments
    segments = split_task_segments(df)
    
    X = []
    y = []
    
    # Process each segment
    for segment in segments:
        if not segment.empty:
            features = create_features(segment)
            label = segment['key'].iloc[0]  # use the first label in the segment
            
            X.append(features)
            y.append(label)
    
    if X and y:  # only build the dataset when segments were found
        X = pd.DataFrame(X)
        y = np.array(y)
        
        # Feature cleanup: replace infinities and fill NaN with column means
        X = X.replace([np.inf, -np.inf], np.nan)
        X = X.fillna(X.mean())
        
        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=y
        )
        
        # Scaling
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        return {
            'X_train': X_train_scaled,
            'X_test': X_test_scaled,
            'y_train': y_train,
            'y_test': y_test,
            'feature_names': X.columns,
            'scaler': scaler
        }
    
    return None  # no task segments found
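
if __name__ == "__main__":
    # Minimal usage sketch. The file name below is an assumption for illustration,
    # not a path taken from the original repository.
    logging.basicConfig(level=logging.INFO)
    dataset = preprocess_eeg_data('eeg_task_data.csv', test_size=0.2, clean_data=True)
    if dataset is None:
        print("No task segments found in the input file.")
    else:
        print(f"Train shape: {dataset['X_train'].shape}, "
              f"Test shape: {dataset['X_test'].shape}")
        print(f"Number of features: {len(dataset['feature_names'])}")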