- import numpy as np
- from joblib import load
- import os
- # from __future__ import unicode_literals, print_function
- from socket import socket, AF_INET, SOCK_DGRAM
- from pythonosc import osc_message
- import time
-
- HOST = ''
- PORT = 8001
- class EEGPredictor:
- def __init__(self, model_dir='trained_model'):
- self.model, self.scaler = self._load_model(model_dir)
- self.class_names = {0: 'neutral', 1: 'right', 2: 'left'}
- self.feature_names = [
- 'alpha', 'beta', 'theta', 'delta', 'gamma',
- 'alpha_beta_ratio', 'theta_beta_ratio', 'alpha_theta_ratio',
- 'alpha_rel', 'beta_rel', 'theta_rel', 'delta_rel', 'gamma_rel',
- 'alpha_log', 'beta_log', 'theta_log', 'delta_log', 'gamma_log'
- ]
-
- def _load_model(self, model_dir):
- model_path = os.path.join(model_dir, 'rf_model.joblib')
- scaler_path = os.path.join(model_dir, 'scaler.joblib')
- return load(model_path), load(scaler_path)
-
- def _create_features(self, basic_features):
- """基本的な脳波データから追加の特徴量を生成"""
- features = {}
-
- # 基本特徴量
- alpha, beta, theta, delta, gamma = basic_features
- features.update({
- 'alpha': alpha, 'beta': beta, 'theta': theta,
- 'delta': delta, 'gamma': gamma
- })
-
- # 比率の計算
- features['alpha_beta_ratio'] = alpha / beta if beta != 0 else 0
- features['theta_beta_ratio'] = theta / beta if beta != 0 else 0
- features['alpha_theta_ratio'] = alpha / theta if theta != 0 else 0
-
- # 相対パワーの計算
- total_power = sum(basic_features)
- if total_power != 0:
- features.update({
- 'alpha_rel': alpha / total_power,
- 'beta_rel': beta / total_power,
- 'theta_rel': theta / total_power,
- 'delta_rel': delta / total_power,
- 'gamma_rel': gamma / total_power
- })
- else:
- features.update({
- 'alpha_rel': 0, 'beta_rel': 0, 'theta_rel': 0,
- 'delta_rel': 0, 'gamma_rel': 0
- })
-
- # 対数変換
- features.update({
- 'alpha_log': np.log1p(alpha) if alpha > 0 else 0,
- 'beta_log': np.log1p(beta) if beta > 0 else 0,
- 'theta_log': np.log1p(theta) if theta > 0 else 0,
- 'delta_log': np.log1p(delta) if delta > 0 else 0,
- 'gamma_log': np.log1p(gamma) if gamma > 0 else 0
- })
-
- # 特徴量を正しい順序で並べ替え
- return [features[name] for name in self.feature_names]
-
- def predict(self, eeg_data):
- """脳波データから予測を行う"""
- # 特徴量の生成
- features = self._create_features(eeg_data)
- features = np.array(features).reshape(1, -1)
-
- # スケーリングと予測
- scaled_data = self.scaler.transform(features)
- prediction = self.model.predict(scaled_data)[0]
- probabilities = self.model.predict_proba(scaled_data)[0]
-
- # 結果の整形
- result = {
- 'predicted_class': self.class_names[prediction],
- 'confidence': float(probabilities[prediction]),
- 'probabilities': {
- self.class_names[i]: float(prob)
- for i, prob in enumerate(probabilities)
- }
- }
-
- return result
- def process_eeg_data(raw_eeg_data):
- """
- 脳波データを処理して予測を行う
-
- Parameters:
- raw_eeg_data: [alpha, beta, theta, delta, gamma]の形式の脳波データ
- """
- predictor = EEGPredictor()
- result = predictor.predict(raw_eeg_data)
-
- print(f"Predicted class: {result['predicted_class']}")
- print(f"Confidence: {result['confidence']:.2f}")
- print("Class probabilities:")
- for class_name, prob in result['probabilities'].items():
- print(f" {class_name}: {prob:.2f}")
-
- return result
-
- def Convert_BrainWave(data):
- msg = osc_message.OscMessage(data)
- #print(msg.params)
- types = msg.address
- arguments = []
- if types == "/Attention":
- arguments.append("Attention")
- arguments.append(float(msg.params[0]))
-
- elif types == "/Meditation":
- arguments.append("Meditation")
- arguments.append(float(msg.params[0]))
-
- elif types == "/BandPower":
- arguments.append("BandPower")
- arguments += list(map(float, msg.params[0].split(";")))
- #print(map(float, msg.params[0].split(";")))
- return arguments
- s = socket(AF_INET, SOCK_DGRAM)
- s.bind((HOST, PORT))
- while True:
- # 実際の脳波データをここに入れます
- print("受信待ち")
- data, address = s.recvfrom(1024)
- received_data = Convert_BrainWave(data=data)
- received_data = received_data[1:]
- print(received_data)
- result = process_eeg_data(received_data)
- time.sleep(0.5)