Advertisement
El_Chaderino

.ng to .edf converter with cross checking

May 20th, 2025
142
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.98 KB | None | 0 0
import logging
import os
import re
import struct
from pathlib import Path

import mne
import numpy as np

# Optional: for EDF export
try:
    import pyedflib
    HAVE_PYEDFLIB = True
except ImportError:
    HAVE_PYEDFLIB = False
  14.  
  15. def setup_logging(logfile="ng_inspect.log"):
  16.     logging.basicConfig(
  17.         level=logging.INFO,
  18.         format='%(asctime)s - %(levelname)s - %(message)s',
  19.         handlers=[
  20.             logging.FileHandler(logfile, mode='w', encoding='utf-8'),
  21.             logging.StreamHandler()
  22.         ]
  23.     )
  24.  
  25. def log_data_stats(data, context=""):
  26.     """Log shape, min/max/mean/std, all-zeros/NaNs for numpy array or MNE Raw."""
  27.     if isinstance(data, mne.io.BaseRaw):
  28.         arr = data.get_data()
  29.         ch_names = data.ch_names
  30.         sfreq = data.info.get('sfreq', None)
  31.         logging.info(f"[DATA STATS] {context} Raw shape: {arr.shape}, sfreq: {sfreq}")
  32.         logging.info(f"[DATA STATS] {context} Channels: {ch_names}")
  33.     elif isinstance(data, np.ndarray):
  34.         arr = data
  35.         logging.info(f"[DATA STATS] {context} Array shape: {arr.shape}")
  36.     else:
  37.         logging.warning(f"[DATA STATS] {context} Unknown data type: {type(data)}")
  38.         return
  39.     logging.info(f"[DATA STATS] {context} min={np.nanmin(arr):.4f}, max={np.nanmax(arr):.4f}, mean={np.nanmean(arr):.4f}, std={np.nanstd(arr):.4f}")
  40.     if np.all(arr == 0):
  41.         logging.warning(f"[DATA STATS] {context} All values are zero!")
  42.     if np.isnan(arr).any():
  43.         logging.warning(f"[DATA STATS] {context} Contains NaNs!")
  44.  
  45. def parse_neuroguide_ng(file_path, n_channels=19, sample_dtype='float32', header_size=2048):
  46.     dtype_size = struct.calcsize('f') if sample_dtype == 'float32' else 2
  47.     with open(file_path, 'rb') as f:
  48.         header_bytes = f.read(header_size)
  49.         raw_bytes = f.read()
  50.     header_text = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in header_bytes)
  51.     metadata = {}
  52.     # --- Auto-detect sample rate ---
  53.     import re
  54.     sfreq = None
  55.     sr_matches = re.findall(r'(\d{2,4})\s*Hz', header_text)
  56.     if sr_matches:
  57.         sfreq = int(sr_matches[0])
  58.         metadata['sfreq'] = sfreq
  59.     else:
  60.         sr_matches = re.findall(r'sfreq\\s*[=:]\\s*(\\d{2,4})', header_text, re.IGNORECASE)
  61.         if sr_matches:
  62.             sfreq = int(sr_matches[0])
  63.             metadata['sfreq'] = sfreq
  64.         else:
  65.             sr_matches = re.findall(r'Sampling Rate\\s*[:=]\\s*(\\d{2,4})', header_text, re.IGNORECASE)
  66.             if sr_matches:
  67.                 sfreq = int(sr_matches[0])
  68.                 metadata['sfreq'] = sfreq
  69.     # --- Auto-detect channel names ---
  70.     ch_names = None
  71.     ch_list_match = re.search(r'Channels?\\s*[:=]\\s*([A-Za-z0-9_,\\- ]+)', header_text)
  72.     if ch_list_match:
  73.         ch_names_raw = ch_list_match.group(1)
  74.         ch_names = [ch.strip() for ch in re.split(r'[ ,]+', ch_names_raw) if ch.strip()]
  75.         metadata['ch_names'] = ch_names
  76.     elif any(x in header_text for x in ['Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'T3', 'C3', 'Cz', 'C4', 'T4', 'T5', 'P3', 'Pz', 'P4', 'T6', 'O1', 'O2']):
  77.         ch_names = [x for x in ['Fp1','Fp2','F7','F3','Fz','F4','F8','T3','C3','Cz','C4','T4','T5','P3','Pz','P4','T6','O1','O2'] if x in header_text]
  78.         if len(ch_names) >= 8:
  79.             metadata['ch_names'] = ch_names
  80.     total_samples = len(raw_bytes) // (dtype_size * n_channels)
  81.     signals = [[] for _ in range(n_channels)]
  82.     for i in range(total_samples):
  83.         offset = i * n_channels * dtype_size
  84.         for ch in range(n_channels):
  85.             val_bytes = raw_bytes[offset + ch * dtype_size : offset + (ch + 1) * dtype_size]
  86.             if len(val_bytes) == dtype_size:
  87.                 val = struct.unpack('<f', val_bytes)[0] if sample_dtype == 'float32' else struct.unpack('<h', val_bytes)[0]
  88.                 signals[ch].append(val)
  89.     return {'metadata': metadata, 'signals': signals}
  90.  
  91. def load_ng_as_raw(file_path, sfreq_guess=256, ch_names=None):
  92.     """Try to load .ng as MNE RawArray, with robust fallback for dtype and n_channels."""
  93.     logging.info(f"[NG] Attempting to load {file_path} as NeuroGuide .ng")
  94.     tried = []
  95.     for dtype in ['float32', 'int16']:
  96.         for n_channels in [19, 21, 32]:
  97.             try:
  98.                 parsed = parse_neuroguide_ng(file_path, n_channels=n_channels, sample_dtype=dtype)
  99.                 signals = np.array(parsed['signals'])
  100.                 meta = parsed.get('metadata', {})
  101.                 sfreq = meta.get('sfreq', sfreq_guess)
  102.                 ch_names_guess = ch_names or meta.get('ch_names')
  103.                 if not ch_names_guess:
  104.                     if n_channels == 19:
  105.                         ch_names_guess = ['Fp1','Fp2','F7','F3','Fz','F4','F8','T3','C3','Cz','C4','T4','T5','P3','Pz','P4','T6','O1','O2']
  106.                     else:
  107.                         ch_names_guess = [f'Ch{i+1}' for i in range(n_channels)]
  108.                 info = mne.create_info(ch_names=ch_names_guess, sfreq=sfreq, ch_types='eeg')
  109.                 raw = mne.io.RawArray(signals, info)
  110.                 logging.info(f"[NG] Loaded .ng as RawArray: dtype={dtype}, n_channels={n_channels}, shape={signals.shape}, sfreq={sfreq}")
  111.                 return raw
  112.             except Exception as e:
  113.                 tried.append((dtype, n_channels, str(e)))
  114.                 logging.warning(f"[NG] Failed with dtype={dtype}, n_channels={n_channels}: {e}")
  115.     raise RuntimeError(f"Could not load .ng file {file_path}. Tried: {tried}")
  116.  
  117. def export_raw_to_edf(raw, out_path):
  118.     """Export an MNE Raw object to EDF format using pyedflib."""
  119.     if not HAVE_PYEDFLIB:
  120.         raise ImportError("pyedflib is required for EDF export. Install with `pip install pyedflib`.")
  121.     try:
  122.         signals = raw.get_data()
  123.         f = pyedflib.EdfWriter(out_path, raw.info['nchan'], file_type=pyedflib.FILETYPE_EDFPLUS)
  124.         channel_info = []
  125.         for ch in raw.info['chs']:
  126.             channel_info.append({
  127.                 'label': ch['ch_name'],
  128.                 'dimension': 'uV',
  129.                 'sample_rate': int(raw.info['sfreq']),
  130.                 'physical_min': float(np.min(signals)),
  131.                 'physical_max': float(np.max(signals)),
  132.                 'digital_min': -32768,
  133.                 'digital_max': 32767,
  134.                 'transducer': '',
  135.                 'prefilter': ''
  136.             })
  137.         f.setSignalHeaders(channel_info)
  138.         f.writeSamples(signals)
  139.         f.close()
  140.         logging.info(f"[EDF] Exported to {out_path} using pyedflib.")
  141.     except Exception as e:
  142.         logging.error(f"[EDF] Failed to export to {out_path}: {e}")
  143.         raise
  144.  
  145. def inspect_ng_file(file_path):
  146.     """Inspect a NeuroGuide .ng file: print/log header info, detected sample rate, channel names, data stats."""
  147.     for dtype in ['float32', 'int16']:
  148.         for n_channels in [19, 21, 32]:
  149.             try:
  150.                 parsed = parse_neuroguide_ng(file_path, n_channels=n_channels, sample_dtype=dtype)
  151.                 meta = parsed.get('metadata', {})
  152.                 signals = parsed.get('signals', [])
  153.                 logging.info(f\"[NG INSPECT] File: {file_path} | dtype={dtype} | n_channels={n_channels}\")
  154.                logging.info(f\"[NG INSPECT] Metadata: {meta}\")
  155.                n_channels_found = len(signals)
  156.                n_samples = len(signals[0]) if n_channels_found > 0 else 0
  157.                logging.info(f\"[NG INSPECT] Data shape: ({n_channels_found}, {n_samples})\")
  158.                for i, ch_data in enumerate(signals):
  159.                    arr = np.array(ch_data)
  160.                    logging.info(f\"[NG INSPECT] Channel {i}: min={np.min(arr)}, max={np.max(arr)}, mean={np.mean(arr)}, std={np.std(arr)}, first 5: {arr[:5]}\")
  161.                    if np.all(arr == 0):
  162.                        logging.warning(f\"[NG INSPECT] Channel {i} is all zeros!\")
  163.                    if np.isnan(arr).any():
  164.                        logging.warning(f\"[NG INSPECT] Channel {i} contains NaNs!\")
  165.                if n_channels_found == 0 or n_samples == 0:
  166.                    logging.warning(f\"[NG INSPECT] No data found in file!\")
  167.                return  # Only print for first successful parse
  168.            except Exception as e:
  169.                logging.info(f\"[NG INSPECT] Failed with dtype={dtype}, n_channels={n_channels}: {e}\")
  170.  
  171. if __name__ == \"__main__\":
  172.    import argparse
  173.    setup_logging()
  174.    parser = argparse.ArgumentParser(description=\"NeuroGuide .ng Inspector/Converter\")
  175.    parser.add_argument(\"ng_file\", help=\"Path to .ng file\")
  176.    parser.add_argument(\"--to-edf\", dest=\"edf_path\", help=\"Convert to EDF at this path\")
  177.    args = parser.parse_args()
  178.  
  179.    inspect_ng_file(args.ng_file)
  180.    try:
  181.        raw = load_ng_as_raw(args.ng_file)
  182.        log_data_stats(raw, context=f\"Loaded NG {args.ng_file}\")
  183.        if args.edf_path:
  184.            export_raw_to_edf(raw, args.edf_path)
  185.            logging.info(f\"[MAIN] Exported EDF to {args.edf_path}\")
  186.    except Exception as e:
  187.        logging.error(f\"[MAIN] Could not load or convert .ng file: {e}\")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement