139 lines
4.9 KiB
Python
139 lines
4.9 KiB
Python
"""
Parser for National Instruments LVM files.
"""
|
|
|
|
import re
|
|
from typing import Dict, List
|
|
import numpy as np
|
|
|
|
from .base_parser import BaseOscilloscopeParser
|
|
from .data import ScopeData, ChannelData
|
|
|
|
|
|
class NIParser(BaseOscilloscopeParser):
    """Parser for National Instruments LVM files.

    The file is treated as tab-separated text made of:

    * a global header terminated by the first ``***End_of_Header***`` line,
    * one or more data blocks, each introduced by a line starting with
      ``X_Value_1`` that follows an ``***End_of_Header***`` marker.

    Only the first two tab-separated columns of each data row are read
    (time, then voltage); every block becomes one channel named ``CH<n>``.
    """

    _END_OF_HEADER = "***End_of_Header***"
    _DATA_HEADER_PREFIX = "X_Value_1"
    _DATE_TIME_PATTERN = re.compile(r"Date\s*(.*)\s*Time\s*(.*)")

    # "<date> <time>" captured by the most recent can_parse() call, or None
    # when that file had no line matching the date/time pattern.
    date_time = None

    def can_parse(self, file_path: str) -> bool:
        """Check whether *file_path* is a readable LVM file.

        A file is accepted when its first line starts with
        ``LabVIEW Measurement`` and some line contains
        ``***End_of_Header***``.  As a side effect, the first line matching
        the date/time pattern is stored in ``self.date_time``.

        Args:
            file_path: Path of the file to probe.

        Returns:
            True if the file looks like an LVM file, False otherwise
            (including when it cannot be opened or decoded).
        """
        # Reset first so a date/time from a previously probed file never
        # leaks into the metadata of this one.
        self.date_time = None

        try:
            with open(file_path, "r") as f:
                lines = f.readlines()
        except (OSError, ValueError):
            # OSError: missing/unreadable file.  ValueError covers
            # UnicodeDecodeError (binary content) and malformed paths.
            return False

        if not lines or not lines[0].strip().startswith("LabVIEW Measurement"):
            return False
        if not any(self._END_OF_HEADER in line for line in lines):
            return False

        for line in lines:
            match = self._DATE_TIME_PATTERN.search(line)
            if match:
                date = match.group(1).strip()
                time = match.group(2).strip()
                self.date_time = f"{date} {time}"
                break

        return True

    def parse(self, file_path: str) -> ScopeData:
        """Parse an NI LVM file into a :class:`ScopeData` object.

        Args:
            file_path: Path of the LVM file to read.

        Returns:
            ScopeData whose ``channels`` maps ``CH1``, ``CH2``, ... to one
            ChannelData per non-empty data block and whose ``metadata`` is
            the global header.  Each channel's metadata is the global header
            overlaid with that block's own header, plus ``Date_Time`` when
            ``can_parse`` captured one.

        Raises:
            OSError: If the file cannot be opened.
        """
        with open(file_path, "r") as f:
            lines = f.readlines()

        global_metadata = self._parse_global_metadata(lines)
        channels: Dict[str, ChannelData] = {}

        for block_idx, (metadata_start, header_line) in enumerate(
            self._find_blocks(lines)
        ):
            # Skip the X_Value_1 column-header line itself.
            time_data, voltage_data = self._read_samples(lines, header_line + 1)
            if not (time_data and voltage_data):
                continue

            block_metadata = self._collect_metadata(
                lines[metadata_start:header_line],
                ignored_prefixes=("***", "Page_"),
                ignored_values=("", "---"),
            )

            # Block-level keys override global keys of the same name.
            channel_metadata = {**global_metadata, **block_metadata}
            if self.date_time:
                channel_metadata["Date_Time"] = self.date_time

            # Numbering follows block position, so an empty block still
            # consumes its index (same naming as before).
            channel_name = f"CH{block_idx + 1}"
            channels[channel_name] = ChannelData(
                channel_name=channel_name,
                voltage_values=np.array(voltage_data),
                time_values=np.array(time_data),
                metadata=channel_metadata,
            )

        return ScopeData(channels=channels, metadata=global_metadata)

    def _parse_global_metadata(self, lines: List[str]) -> Dict[str, str]:
        """Return key/value pairs found before the first end-of-header line."""
        header_end = next(
            (
                idx
                for idx, line in enumerate(lines)
                if line.strip() == self._END_OF_HEADER
            ),
            len(lines),  # no marker: the whole file is scanned
        )
        return self._collect_metadata(lines[:header_end])

    @staticmethod
    def _collect_metadata(
        lines: List[str],
        ignored_prefixes: tuple = ("***",),
        ignored_values: tuple = ("",),
    ) -> Dict[str, str]:
        """Extract ``key<TAB>value`` pairs from *lines*.

        Lines without a tab (after stripping) or starting with one of
        *ignored_prefixes* are skipped, as are pairs whose value is in
        *ignored_values*.  Only the first two tab-separated fields are used.
        """
        metadata: Dict[str, str] = {}
        for raw in lines:
            line = raw.strip()
            if "\t" not in line or line.startswith(ignored_prefixes):
                continue
            parts = line.split("\t")
            key = parts[0].strip()
            value = parts[1].strip()
            if value not in ignored_values:
                metadata[key] = value
        return metadata

    def _find_blocks(self, lines: List[str]) -> List[tuple]:
        """Locate data blocks in a single pass.

        Returns:
            ``(metadata_start, header_line)`` index pairs, in file order.
            ``header_line`` is the first ``X_Value_1`` line after an
            end-of-header marker; ``metadata_start`` is the line right after
            the closest such marker preceding it.
        """
        blocks: List[tuple] = []
        last_header = None  # most recent marker not yet paired with a block
        for idx, raw in enumerate(lines):
            line = raw.strip()
            if line == self._END_OF_HEADER:
                last_header = idx
            elif last_header is not None and line.startswith(
                self._DATA_HEADER_PREFIX
            ):
                blocks.append((last_header + 1, idx))
                # A further X_Value_1 line only opens a new block after
                # another end-of-header marker has been seen.
                last_header = None
        return blocks

    @staticmethod
    def _read_samples(lines: List[str], start: int) -> tuple:
        """Read (time, voltage) rows from *start* until a blank or ``***`` line.

        Rows with fewer than two fields or a non-numeric field are skipped.
        Both fields are converted before either list is extended, so the
        two returned lists always have the same length.
        """
        time_data: List[float] = []
        voltage_data: List[float] = []
        for idx in range(start, len(lines)):
            raw = lines[idx]
            stripped = raw.strip()
            if not stripped or stripped.startswith("***"):
                break
            parts = raw.split("\t")
            if len(parts) < 2:
                continue
            try:
                time_value = float(parts[0])
                voltage_value = float(parts[1])
            except ValueError:
                continue
            time_data.append(time_value)
            voltage_data.append(voltage_value)
        return time_data, voltage_data
|