119 lines
4.1 KiB
Python
119 lines
4.1 KiB
Python
"""
|
|
Parser for Gwinstek oscilloscope CSV files.
|
|
"""
|
|
|
|
import csv
|
|
from typing import Dict, List
|
|
import numpy as np
|
|
|
|
from .base_parser import BaseOscilloscopeParser
|
|
from .data import ScopeData, ChannelData
|
|
|
|
|
|
class GwinstekParser(BaseOscilloscopeParser):
    """Parser for Gwinstek oscilloscope CSV files.

    The layout handled here is a header of ``key,value,`` lines, a line
    starting with ``Waveform Data``, and then one raw sample per line
    (optionally followed by a trailing comma).
    """

    # Prefix of the line that separates the header from the samples.
    _DATA_MARKER = "Waveform Data"
    # Text searched for in the first lines of a file by ``can_parse``.
    _SIGNATURE = "Memory Length"
    _SIGNATURE_SCAN_LINES = 10
    # Raw samples are stored in display pixels: 25 pixels per division.
    _PIXELS_PER_DIVISION = 25.0
    # Fallback sampling period in seconds (2 µs).
    _DEFAULT_SAMPLING_PERIOD = 2.0e-6

    def can_parse(self, file_path: str) -> bool:
        """Check if file is from a Gwinstek scope.

        Args:
            file_path: Path of the candidate CSV file.

        Returns:
            True if one of the first 10 lines contains ``Memory Length``;
            False otherwise, including when the file cannot be opened or
            decoded.
        """
        try:
            with open(file_path, "r") as f:
                first_lines = [
                    f.readline() for _ in range(self._SIGNATURE_SCAN_LINES)
                ]
        except (OSError, ValueError, TypeError):
            # Unreadable / undecodable / invalid path: simply "not ours".
            # (UnicodeDecodeError is a ValueError.) Unlike a bare except,
            # this lets KeyboardInterrupt and SystemExit propagate.
            return False

        return any(self._SIGNATURE in line for line in first_lines)

    def parse(self, file_path: str) -> ScopeData:
        """Parse a Gwinstek oscilloscope CSV file.

        Args:
            file_path: Path of the CSV file to read.

        Returns:
            A ``ScopeData`` holding a single channel, keyed by the header's
            ``Source`` value (``CH1`` when absent). Sample ``k`` is placed at
            ``k * sampling_period``; blank or non-numeric lines are skipped
            and do not advance the time axis.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        with open(file_path, "r") as f:
            lines = [raw.strip() for raw in f]

        marker_index = next(
            (
                idx
                for idx, text in enumerate(lines)
                if text.startswith(self._DATA_MARKER)
            ),
            None,
        )
        if marker_index is None:
            # No separator line: every line is scanned for metadata and is
            # also tried as a sample (non-numeric lines are dropped below).
            header_lines = lines
            sample_lines = lines
        else:
            header_lines = lines[:marker_index]
            sample_lines = lines[marker_index + 1:]

        metadata = self._parse_metadata(header_lines)

        # Scaling parameters are constant for the whole file, so they are
        # read once. A malformed value falls back to its default instead of
        # silently discarding every sample.
        volts_per_division = self._metadata_float(metadata, "Vertical Scale", 1.0)
        vertical_position = self._metadata_float(
            metadata, "Vertical Position", 0.0
        )
        sampling_period = self._extract_time_interval(metadata)

        pixel_values = []
        for text in sample_lines:
            if not text:
                continue
            # Remove trailing comma. TODO: with 2 channels this might not be
            # trailing.
            if text.endswith(","):
                text = text[:-1]
            try:
                pixel_values.append(float(text))
            except ValueError:
                # Skip non-numeric lines.
                continue

        # Convert pixels to voltage. Pixel 0 is in the middle, so the result
        # is centred around vertical_position.
        pixels = np.array(pixel_values, dtype=float)
        voltage_values = (
            pixels / self._PIXELS_PER_DIVISION
        ) * volts_per_division + vertical_position
        # Index by sample number, not by file line number.
        time_values = np.arange(len(pixel_values)) * sampling_period

        channel_name = metadata.get("Source", "CH1")
        channel_data = ChannelData(
            channel_name=channel_name,
            voltage_values=voltage_values,
            time_values=time_values,
            metadata=metadata,
        )

        return ScopeData(channels={channel_name: channel_data}, metadata=metadata)

    def _parse_metadata(self, metadata_lines: List[str]) -> Dict[str, str]:
        """Parse metadata from header lines.

        Args:
            metadata_lines: Header lines of the form ``key,value[,...]``.

        Returns:
            Mapping of stripped key to stripped value. Lines without a comma
            and entries with an empty value are ignored; a repeated key keeps
            its last value.
        """
        metadata: Dict[str, str] = {}

        for line in metadata_lines:
            if "," not in line:
                continue
            # Gwinstek has 3 parts: key, value, empty. Only the first two
            # are used.
            key, value = (part.strip() for part in line.split(",", 2)[:2])
            if value:
                metadata[key] = value

        return metadata

    def _extract_time_interval(self, metadata: Dict[str, str]) -> float:
        """Extract time interval between samples for Gwinstek format.

        Args:
            metadata: Parsed header metadata.

        Returns:
            The ``Sampling Period`` value as a float, or 2 µs when the key is
            missing or not numeric.
        """
        return self._metadata_float(
            metadata, "Sampling Period", self._DEFAULT_SAMPLING_PERIOD
        )

    @staticmethod
    def _metadata_float(metadata: Dict[str, str], key: str, default: float) -> float:
        """Return ``metadata[key]`` as a float, or ``default`` if missing or invalid."""
        try:
            return float(metadata[key])
        except (KeyError, ValueError):
            return default
|