import sys, random import os import pygame import time from pygame.locals import * import nouha_recv as bwr from socket import socket, AF_INET, SOCK_DGRAM import TaskManager #”重要” offLineModeをFalseにすると脳波データの通信が行われるため、UDP通信が起動していない場合にFalseにするとバグ発生の可能性あり offLineMode = False WIDTH = 3200 #ウィンドウの横サイズ HEIGHT = 1800 #ウィンドウの縦サイズ TIME = 30000 #タスクの継続時間(ミリ秒) BREAK_TIME = 15000 #タスク間の休憩時間(ミリ秒) LONG_BREAK_TIME = 120000 #長めの休憩時間(ミリ秒) LONG_BREAK_INTERVAL = 5 #何タスクごとに長めの休憩を取るか PREP_TIME = 10000 #各タスクの「慣れ」時間(ミリ秒) TASK_COUNT = 10 #右、左、ニュートラルそれぞれのタスク数(デフォルトの場合それぞれ5回ずつタスクを実施) #UDPの準備 HOST = '' PORT = 8001 #サウンドファイルの指定 SOUND_DIR = "sounds" BLINK_INTERVAL = 1000 white = (255, 255, 255) black = (0, 0, 0) screen_type = ["sound", "screen", "task"] text_type = {"sound":"音声再生・脳波測定中…", "screen":"画面指示・脳波測定中…", "task":"タスク指示・脳波測定中…"} mind_type = ["right", "left", "neutral"] musics = ["001", "002", "003", "420"] position_rect = (0, 0, 0, 0) position_circle = (0, 0) get_tick_time = [0, 0] #[before, now] # 指示内容を定義 instructions = { "right": [ "右手を想像して箸で食べ物を掴んでください。", "右手のボタンを押してください。", "右手のレバーを操作してください。", ], "left": [ "左手を想像して箸で食べ物を掴んでください。", "左手のボタンを押してください。", "左手のレバーを操作してください。", ], "neutral": [ "目を閉じてリラックスしてください。", "深呼吸をしてください。", "座禅を組んで瞑想してください。" ] } #アプリの初期化 def initialize_pygame(): try: pygame.init() screen = pygame.display.set_mode((WIDTH, HEIGHT)) get_tick_time = [pygame.time.get_ticks(), pygame.time.get_ticks()] return screen except pygame.error as e: print(f"Pygameの初期化エラー: \n{e}") sys.exit(1) #音声・音楽の再生 def load_and_play_sound(filename): try: pygame.mixer.music.load(filename) pygame.mixer.music.play() except pygame.error as e: print(f"音楽ファイル{filename}の読み込みエラー: \n{e}") return False return True #脳波データの受け取りと処理 def send_nouhadata(s, choice, mind, process): print_text = text_type[choice] print("タスク経過時間:", str(get_tick_time[1] - get_tick_time[0]), print_text) if offLineMode == False and get_tick_time[1] - get_tick_time[0] >= PREP_TIME: data, address = s.recvfrom(1024) process.Receive_BrainWave(nouha=data, key=mind, address=address) get_tick_time[1] = pygame.time.get_ticks() #アプリの終了時の処理 def check_exit(s, choice, process): for event in pygame.event.get(): if event.type == QUIT: try: if not offLineMode: send_nouhadata(s, choice, mind="quit", process=process) pygame.quit() s.close() sys.exit() except Exception as e: print(f"終了処理中にエラー発生: \n{e}") sys.exit(1) return False #メイン def main(): try: s = socket(AF_INET, SOCK_DGRAM) try: s.bind((HOST, PORT)) except OSError as e: print(f"ポート{PORT}のバインドに失敗。ポートが既に開かれている可能性があります。: \n{e}") return screen = initialize_pygame() process = bwr.BrainWave_Receive() task_manager = TaskManager.TaskManager(TASK_COUNT) while True: try: get_tick_time[0] = pygame.time.get_ticks() mind = task_manager.get_next_type("mind") if mind is None: print("すべてのタスクが完了しました。ウィンドウを閉じて終了してください。") while True: if check_exit(s, choice, process): break choice = task_manager.get_next_type("task") if choice != "task" and mind == "neutral": task_manager.sub_counts(mind, choice, 1) continue if choice == "screen" and mind != "neutral": try: load_and_play_sound(os.path.join(SOUND_DIR, "画面を注視してください.wav")) last_blink = pygame.time.get_ticks() is_visible = True screen.fill(black) #円の描画 if mind == "right": position_rect = (WIDTH / 2, 0, WIDTH, HEIGHT) position_circle = (WIDTH * 3 / 4, HEIGHT / 2) elif mind == "left": position_rect = (0, 0, WIDTH / 2, HEIGHT) position_circle = (WIDTH / 4, HEIGHT / 2) while get_tick_time[1] - get_tick_time[0] <= TIME: current_time = pygame.time.get_ticks() if current_time - last_blink >= BLINK_INTERVAL: is_visible = not is_visible last_blink = current_time if is_visible: screen.fill(black) screen.fill((255, 255, 0), position_rect) pygame.draw.circle(screen, (255, 0, 255), position_circle, 150) else: screen.fill(black) pygame.display.update() send_nouhadata(s, choice, mind, process) if check_exit(s, choice, process): break screen.fill(black) except pygame.error as e: print(f"画面描画エラー: \n{e}") elif choice == "sound" and mind != "neutral": screen.fill(black) music_choice = random.choice(musics) filename = os.path.join(SOUND_DIR, music_choice+"_"+mind+".mp3") if not load_and_play_sound(filename): continue while pygame.mixer.music.get_busy(): send_nouhadata(s, choice, mind, process) if check_exit(s, choice, process): break screen.fill(black) elif choice == "task": screen.fill(black) instruction = random.choice(instructions[mind]) filename = os.path.join(SOUND_DIR, mind+"_"+instruction+".wav") load_and_play_sound(filename) print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {instruction}") while get_tick_time[1] - get_tick_time[0] <= TIME: send_nouhadata(s, choice, mind, process) if check_exit(s, choice, process): break load_and_play_sound(filename = os.path.join(SOUND_DIR, "タスクを終了してください.wav")) screen.fill(black) #画面の更新 pygame.display.update() if task_manager.get_sum_count() % LONG_BREAK_INTERVAL == 0: #タスク間の休憩(長め) get_tick_time[0] = pygame.time.get_ticks() print(f"現在の実行回数: {task_manager.get_counts()}\nタスク実行回数: {task_manager.get_taskCounts()}\n2分間休憩してください。") while get_tick_time[1] - get_tick_time[0] <= LONG_BREAK_TIME: if check_exit(s, choice, process): break get_tick_time[1] = pygame.time.get_ticks() else: #タスク間の休憩 get_tick_time[0] = pygame.time.get_ticks() print(f"現在の実行回数: {task_manager.get_counts()}\nタスク実行回数: {task_manager.get_taskCounts()}") while get_tick_time[1] - get_tick_time[0] <= BREAK_TIME: if check_exit(s, choice, process): break get_tick_time[1] = pygame.time.get_ticks() except SystemExit: pygame.quit() s.close() sys.exit() except Exception as e: print(f"予期せぬエラー: \n{e}") continue except Exception as e: print(f"重大なエラー: \n{e}") finally: try: pygame.quit() s.close() except Exception as e: print(f"終了処理中にエラーが発生しました:\n{e}") sys.exit(1) #直接起動時の処理 if __name__ == "__main__": main() # process = bwr.BrainWave_Receive #インスタンス作成 # while True: # timeClock.tick(250) # mind = random.choice(mind_type) # choice = random.choice(screen_type) # if choice == "screen" and mind != "neutral": # screen.fill(black) # #円の描画 # if mind == "right": # position_rect = (width / 2, 0, width, height) # position_circle = (width * 3 / 4, height / 2) # elif mind == "left": # position_rect = (0, 0, width / 2, height) # position_circle = (width / 4, height / 2) # screen.fill((255, 255, 0), position_rect) # pygame.draw.circle(screen, (255, 0, 255), position_circle, 150) # pygame.display.update() # while timeClock.get_time() <= 60000: # print(str(timeClock.get_time())) # if offLineMode == False: # data, address = s.recvfrom(1024) # process.Receive_BrainWave(nouha=data, key=mind, address=address) # elif choice == "sound" and mind != "neutral": # screen.fill(black) # if mind == "right": # pygame.mixer.music.load(filename_right) # pygame.mixer.music.rewind() # elif mind == "left": # pygame.mixer.music.load(filename_left) # pygame.mixer.music.rewind() # while pygame.mixer.music.get_busy(): # print("Playing...") # if offLineMode == False: # data, address = s.recvfrom(1024) # process.Receive_BrainWave(nouha=data, key=mind, address=address) # elif choice == "task": # screen.fill(black) # instruction = random.choice(instructions[mind]) # filename = ".\\" + mind + "_" + instruction + ".wav" # pygame.mixer.music.load(filename) # pygame.mixer.music.play() # text = instruction # print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {instruction}") # while pygame.mixer.music.get_busy(): # print("Playing...") # while timeClock.get_time() <= 60000: # print(str(timeClock.get_time())) # if offLineMode == False: # data, address = s.recvfrom(1024) # process.Receive_BrainWave(nouha=data, key=mind, address=address) # #画面の更新 # pygame.display.update() # pygame.time.wait(10000) #10秒間クールダウン # #終了イベント # for event in pygame.event.get(): # if event.type == QUIT: # process.Receive_BrainWave(nouha=data, key=9, address=address) # pygame.quit() # sys.exit()