import os
import re
import struct
import logging
import numpy as np
import mne
from pathlib import Path

# Optional: for EDF export
try:
    import pyedflib
    HAVE_PYEDFLIB = True
except ImportError:
    HAVE_PYEDFLIB = False
def setup_logging(logfile="ng_inspect.log"):
    """Configure logging to both a file and the console."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(logfile, mode='w', encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
def log_data_stats(data, context=""):
    """Log shape, min/max/mean/std, all-zeros/NaNs for a numpy array or MNE Raw."""
    if isinstance(data, mne.io.BaseRaw):
        arr = data.get_data()
        ch_names = data.ch_names
        sfreq = data.info.get('sfreq', None)
        logging.info(f"[DATA STATS] {context} Raw shape: {arr.shape}, sfreq: {sfreq}")
        logging.info(f"[DATA STATS] {context} Channels: {ch_names}")
    elif isinstance(data, np.ndarray):
        arr = data
        logging.info(f"[DATA STATS] {context} Array shape: {arr.shape}")
    else:
        logging.warning(f"[DATA STATS] {context} Unknown data type: {type(data)}")
        return
    logging.info(
        f"[DATA STATS] {context} min={np.nanmin(arr):.4f}, max={np.nanmax(arr):.4f}, "
        f"mean={np.nanmean(arr):.4f}, std={np.nanstd(arr):.4f}"
    )
    if np.all(arr == 0):
        logging.warning(f"[DATA STATS] {context} All values are zero!")
    if np.isnan(arr).any():
        logging.warning(f"[DATA STATS] {context} Contains NaNs!")
def parse_neuroguide_ng(file_path, n_channels=19, sample_dtype='float32', header_size=2048):
    """Parse a NeuroGuide .ng file: scan the header for metadata, then read the
    remaining bytes as interleaved little-endian samples (float32 or int16)."""
    dtype_size = struct.calcsize('f') if sample_dtype == 'float32' else 2
    with open(file_path, 'rb') as f:
        header_bytes = f.read(header_size)
        raw_bytes = f.read()
    # Printable-ASCII view of the header for regex-based metadata detection
    header_text = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in header_bytes)
    metadata = {}

    # --- Auto-detect sample rate ---
    sfreq = None
    for pattern in (r'(\d{2,4})\s*Hz',
                    r'sfreq\s*[=:]\s*(\d{2,4})',
                    r'Sampling Rate\s*[:=]\s*(\d{2,4})'):
        sr_matches = re.findall(pattern, header_text, re.IGNORECASE)
        if sr_matches:
            sfreq = int(sr_matches[0])
            metadata['sfreq'] = sfreq
            break

    # --- Auto-detect channel names ---
    ch_list_match = re.search(r'Channels?\s*[:=]\s*([A-Za-z0-9_,\- ]+)', header_text)
    if ch_list_match:
        ch_names_raw = ch_list_match.group(1)
        ch_names = [ch.strip() for ch in re.split(r'[ ,]+', ch_names_raw) if ch.strip()]
        metadata['ch_names'] = ch_names
    else:
        standard_1020 = ['Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'T3', 'C3', 'Cz',
                         'C4', 'T4', 'T5', 'P3', 'Pz', 'P4', 'T6', 'O1', 'O2']
        ch_names = [x for x in standard_1020 if x in header_text]
        if len(ch_names) >= 8:
            metadata['ch_names'] = ch_names

    # Interpret the data block as interleaved samples:
    # sample 0 ch 0..n-1, sample 1 ch 0..n-1, ...
    total_samples = len(raw_bytes) // (dtype_size * n_channels)
    signals = [[] for _ in range(n_channels)]
    fmt = '<f' if sample_dtype == 'float32' else '<h'
    for i in range(total_samples):
        offset = i * n_channels * dtype_size
        for ch in range(n_channels):
            val_bytes = raw_bytes[offset + ch * dtype_size:offset + (ch + 1) * dtype_size]
            if len(val_bytes) == dtype_size:
                signals[ch].append(struct.unpack(fmt, val_bytes)[0])
    return {'metadata': metadata, 'signals': signals}
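
# Sketch of a vectorized alternative to the per-sample loop above, assuming the
# same interleaved little-endian layout. np.frombuffer decodes the whole block in
# one call and is much faster on long recordings. The function name
# parse_ng_signals_fast is illustrative, not part of any NeuroGuide format spec.
def parse_ng_signals_fast(raw_bytes, n_channels=19, sample_dtype='float32'):
    dt = np.dtype('<f4') if sample_dtype == 'float32' else np.dtype('<i2')
    total_samples = len(raw_bytes) // (dt.itemsize * n_channels)
    # Drop any trailing partial frame, then reshape to (n_samples, n_channels)
    usable = raw_bytes[:total_samples * n_channels * dt.itemsize]
    arr = np.frombuffer(usable, dtype=dt).reshape(total_samples, n_channels)
    return arr.T  # shape (n_channels, n_samples), matching parse_neuroguide_ng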
def load_ng_as_raw(file_path, sfreq_guess=256, ch_names=None):
    """Try to load .ng as an MNE RawArray, with a robust fallback over dtype and n_channels."""
    logging.info(f"[NG] Attempting to load {file_path} as NeuroGuide .ng")
    tried = []
    for dtype in ['float32', 'int16']:
        for n_channels in [19, 21, 32]:
            try:
                parsed = parse_neuroguide_ng(file_path, n_channels=n_channels, sample_dtype=dtype)
                signals = np.array(parsed['signals'])
                meta = parsed.get('metadata', {})
                sfreq = meta.get('sfreq', sfreq_guess)
                ch_names_guess = ch_names or meta.get('ch_names')
                if not ch_names_guess:
                    if n_channels == 19:
                        # Standard 10-20 montage used for 19-channel recordings
                        ch_names_guess = ['Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'T3', 'C3', 'Cz',
                                          'C4', 'T4', 'T5', 'P3', 'Pz', 'P4', 'T6', 'O1', 'O2']
                    else:
                        ch_names_guess = [f'Ch{i+1}' for i in range(n_channels)]
                info = mne.create_info(ch_names=ch_names_guess, sfreq=sfreq, ch_types='eeg')
                raw = mne.io.RawArray(signals, info)
                logging.info(f"[NG] Loaded .ng as RawArray: dtype={dtype}, n_channels={n_channels}, "
                             f"shape={signals.shape}, sfreq={sfreq}")
                return raw
            except Exception as e:
                tried.append((dtype, n_channels, str(e)))
                logging.warning(f"[NG] Failed with dtype={dtype}, n_channels={n_channels}: {e}")
    raise RuntimeError(f"Could not load .ng file {file_path}. Tried: {tried}")
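
# Example (hedged): load a recording and attach a standard montage before further
# processing. "sub01.ng" is a placeholder path; the montage calls use MNE's public
# API (mne.channels.make_standard_montage / Raw.set_montage).
#
#     raw = load_ng_as_raw("sub01.ng", sfreq_guess=256)
#     raw.set_montage(mne.channels.make_standard_montage("standard_1020"),
#                     on_missing="ignore")
#     raw.plot(duration=10.0)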
def export_raw_to_edf(raw, out_path):
    """Export an MNE Raw object to EDF+ using pyedflib."""
    if not HAVE_PYEDFLIB:
        raise ImportError("pyedflib is required for EDF export. Install with `pip install pyedflib`.")
    try:
        signals = raw.get_data()
        f = pyedflib.EdfWriter(out_path, raw.info['nchan'], file_type=pyedflib.FILETYPE_EDFPLUS)
        channel_info = []
        for ch in raw.info['chs']:
            channel_info.append({
                'label': ch['ch_name'],
                'dimension': 'uV',
                'sample_rate': int(raw.info['sfreq']),
                'physical_min': float(np.min(signals)),
                'physical_max': float(np.max(signals)),
                'digital_min': -32768,
                'digital_max': 32767,
                'transducer': '',
                'prefilter': ''
            })
        f.setSignalHeaders(channel_info)
        f.writeSamples(signals)
        f.close()
        logging.info(f"[EDF] Exported to {out_path} using pyedflib.")
    except Exception as e:
        logging.error(f"[EDF] Failed to export to {out_path}: {e}")
        raise
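
# Example (hedged): convert a recording to EDF+. The header above labels the unit
# 'uV' without rescaling, but samples read straight from .ng bytes may be in
# unknown units; if they are actually volts (MNE's internal convention), scale
# them first. "recording.edf" is a placeholder path.
#
#     raw_scaled = raw.copy().apply_function(lambda x: x * 1e6)  # volts -> uV (assumption)
#     export_raw_to_edf(raw_scaled, "recording.edf")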
def inspect_ng_file(file_path):
    """Inspect a NeuroGuide .ng file: log header metadata, detected sample rate,
    channel names, and per-channel data stats."""
    for dtype in ['float32', 'int16']:
        for n_channels in [19, 21, 32]:
            try:
                parsed = parse_neuroguide_ng(file_path, n_channels=n_channels, sample_dtype=dtype)
                meta = parsed.get('metadata', {})
                signals = parsed.get('signals', [])
                logging.info(f"[NG INSPECT] File: {file_path} | dtype={dtype} | n_channels={n_channels}")
                logging.info(f"[NG INSPECT] Metadata: {meta}")
                n_channels_found = len(signals)
                n_samples = len(signals[0]) if n_channels_found > 0 else 0
                logging.info(f"[NG INSPECT] Data shape: ({n_channels_found}, {n_samples})")
                for i, ch_data in enumerate(signals):
                    arr = np.array(ch_data)
                    logging.info(f"[NG INSPECT] Channel {i}: min={np.min(arr)}, max={np.max(arr)}, "
                                 f"mean={np.mean(arr)}, std={np.std(arr)}, first 5: {arr[:5]}")
                    if np.all(arr == 0):
                        logging.warning(f"[NG INSPECT] Channel {i} is all zeros!")
                    if np.isnan(arr).any():
                        logging.warning(f"[NG INSPECT] Channel {i} contains NaNs!")
                if n_channels_found == 0 or n_samples == 0:
                    logging.warning("[NG INSPECT] No data found in file!")
                return  # Only report the first successful parse
            except Exception as e:
                logging.info(f"[NG INSPECT] Failed with dtype={dtype}, n_channels={n_channels}: {e}")
if __name__ == "__main__":
    import argparse
    setup_logging()
    parser = argparse.ArgumentParser(description="NeuroGuide .ng Inspector/Converter")
    parser.add_argument("ng_file", help="Path to .ng file")
    parser.add_argument("--to-edf", dest="edf_path", help="Convert to EDF at this path")
    args = parser.parse_args()
    inspect_ng_file(args.ng_file)
    try:
        raw = load_ng_as_raw(args.ng_file)
        log_data_stats(raw, context=f"Loaded NG {args.ng_file}")
        if args.edf_path:
            export_raw_to_edf(raw, args.edf_path)
            logging.info(f"[MAIN] Exported EDF to {args.edf_path}")
    except Exception as e:
        logging.error(f"[MAIN] Could not load or convert .ng file: {e}")
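
# Example invocations (hedged; the script filename ng_inspect.py and the .ng/.edf
# paths are placeholders):
#
#     python ng_inspect.py recording.ng                         # inspect only
#     python ng_inspect.py recording.ng --to-edf recording.edf  # inspect + convert
#
# Both runs also write a full log to ng_inspect.log (see setup_logging above).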