# Rigol-DG2052-Function-Gener.../fn_gen/dg2052.py
#
# 192 lines
# 9.8 KiB
# Python

from re import DEBUG
import time
import logging
import pyvisa
from .errors import *
from .enums import *
from .common import *
from .constants.dg2052 import *
class DG2052( pyvisa.resources.MessageBasedResource ):
comm: CommMethod
rm: pyvisa.ResourceManager
port: str
def __init__( self, port: str ):
if "TCPIP" in port:
logging.debug("(PROG) detected TCPIP port")
self.comm = CommMethod.LAN
elif "USB" in port:
logging.debug("(PROG) detected USB port")
self.comm = CommMethod.USB
else:
raise UndefinedCommunicationMethodError(port)
rm = pyvisa.ResourceManager()
self.rm = rm
self.port = port
super().__init__(rm, port)
logging.debug("(PROG) created dg2052 instance")
self.open()
logging.debug("(PROG) connected to dg2052 device")
def whoami( self ) -> str:
match(self.comm):
case CommMethod.LAN:
logging.debug("(PROG) communication method: LAN")
manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(','))
# ipaddr = self.system.communicate.lan.ipaddress().strip()
ipaddr = self.query(":SYST:COMM:LAN:IPAD?").strip()
# mac = self.system.communicate.lan.mac().strip()
mac = self.query(":SYST:COMM:LAN:MAC?").strip()
return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tIPADDRESS: {ipaddr}\n\tMAC: {mac}"
case CommMethod.USB:
logging.debug("(PROG) communication method USB")
manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(','))
# info = self.system.communicate.usb.information().strip()
info = self.query(":SYST:COMM:USB:INF?").strip()
return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tINFORMATION: {info}"
case _:
raise UndefinedCommunicationMethodError(self.port)
def set_output(self, channel: OutputChannel, state: bool):
if state:
logging.debug( f"(PROG) :OUTP{channel.value} ON" )
self.write( f':OUTP{channel.value} ON' )
else:
logging.debug( f"(PROG) :OUTP{channel.value} OFF" )
self.write( f':OUTP{channel.value} OFF' )
def toggle_output(self, channel: OutputChannel):
state = self.query( f':OUT{channel.value}?' ).strip()
logging.debug(f"(PROG) output {channel.value} state: {state}")
match(state):
case "ON":
self.set_output(channel, False)
case "OFF":
self.set_output(channel, True)
case _:
raise UndefinedValueError(state, "ON or OFF")
def get_output_volt_limits(self, channel: OutputChannel) -> tuple[float, float]:
low: float = float(self.query( f':OUTP{channel.value}:VOLL:LOW?' ))
high: float = float(self.query( f':OUTP{channel.value}:VOLL:HIGH?' ))
logging.debug(f"(PROG) output {channel.value} limits: {low}, {high}")
return low, high
def get_output_impedance(self, channel: OutputChannel) -> float:
impedance = float(self.query( f':OUTP{channel.value}:IMP?' ))
logging.debug(f"(PROG) output {channel.value} impedance: {impedance}")
return impedance
def get_output_load(self, channel: OutputChannel) -> float:
load = float(self.query( f':OUTP{channel.value}:LOAD?' ))
logging.debug(f"(PROG) output {channel.value} load: {load}")
return load
def get_output_signal(self, channel: OutputChannel) -> str:
signal = self.query( f':SOUR{channel.value}:APPL?' ).strip()
logging.debug(f"(PROG) output {channel.value} signal: {signal}")
return signal
def get_output_state(self, channel: OutputChannel) -> str:
state = self.query( f':OUTP{channel.value}?' ).strip()
logging.debug(f"(PROG) output {channel.value} state: {state}")
return state
def set_dc(self, channel: OutputChannel, offset: float):
logging.debug(f"(PROG) set dc signal with offset: {offset}")
self.write( f':SOUR{channel.value}:APPL:DC 1,1,{offset}' )
def set_sine_wave(self, channel: OutputChannel, freq: float = 1e3, amp: float = 5.0, offset: float = 0.0, phase: int = 0):
if freq < SIN_RANGE[0] and freq > SIN_RANGE[1]:
raise ValueOutOfBoundsError(SIN_RANGE, freq)
if phase < 0 and phase > 360:
raise ValueOutOfBoundsError((0, 360), phase)
logging.debug(f"(PROG) set sine signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}")
self.write( f':SOUR{channel.value}:APPL:SIN {freq},{amp},{offset},{phase}' )
def set_square_wave(
self,
channel: OutputChannel, # Sets the output channel of the ramp function
freq: float = 1e3, # Sets the frequency
amp: float = 5.0, # Sets the amplitude
offset: float = 0.0, # Sets the amplitude offset
phase: int = 0 # Sets the phase shift
):
check_bounds(SQU_RANGE, freq)
check_bounds((0, 360), phase)
logging.debug(f"(PROG) set square signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}")
self.write( f':SOUR{channel.value}:APPL:SQU {freq},{amp},{offset},{phase}' )
def set_ramp(
self,
channel: OutputChannel, # Sets the output channel of the ramp function
freq: float = 1e3, # Sets the frequency
amp: float = 5, # Sets the amplitude
offset: float = 0, # Sets the amplitude offset
phase: int = 0 # Sets the phase shift
):
check_bounds(RAMP_RANGE, freq)
check_bounds((0, 360), phase)
logging.debug(f"(PROG) set ramp signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}")
self.write( f':SOUR{channel.value}:APPL:RAMP {freq},{amp},{offset},{phase}' )
def set_sweep(
self,
channel: OutputChannel, # Sets the output channel of the sweep function
signal_type: SweepSignalType = SweepSignalType.SINE, # Sets the type of signal being sweeped
htime_start: float = 0, # Sets the start hold time of the sweep function
htime_stop: float = 0, # Sets the stop hold time of the sweep function
rtime: float = 0, # Sets the return time of the sweep function
time: float = 1, # Sets the sweep time
spacing: SweepSpacing = SweepSpacing.LIN, # Sets the sweep type
step: int = 2, # Sets the number of steps of the sweep function
trigger_slope: SweepTriggerSlope = SweepTriggerSlope.POSITIVE, # Sets the edge type of the trigger input signal (for external trigger only)
trigger_source: SweepTriggerSource = SweepTriggerSource.INTERNAL # Sets the sweep trigger source
):
time_bounds: tuple[float, float] = (0, 500)
command_header = f":SOUR{channel.value}:SWE"
check_bounds(time_bounds, htime_start)
check_bounds(time_bounds, htime_stop)
check_bounds(time_bounds, rtime)
check_bounds((2, 1024), step)
check_bounds((1e-3, 599.0), time)
self.write( f"{command_header}:SPAC {spacing}" )
self.write( f"{command_header}:STEP {step}" )
match trigger_source:
case SweepTriggerSource.INTERNAL:
self.write( f"{command_header}:TRIG:SOUR INT" )
self.write( f"{command_header}:HTIM:STAR {htime_start}" )
self.write( f"{command_header}:HTIM {htime_stop}" )
self.write( f"{command_header}:RTIM {rtime}" )
self.write( f"{command_header}:TIME {time}" )
case SweepTriggerSource.EXTERNAL:
self.write( f"{command_header}:TRIG:SOUR EXT" )
if trigger_slope == SweepTriggerSlope.POSITIVE:
self.write( f"{command_header}:TRIG:SLOP POS" )
elif trigger_slope == SweepTriggerSlope.NEGATIVE:
self.write( f"{command_header}:TRIG:SLOP NEG" )
else:
UndefinedValueError(trigger_slope, "SweepTriggerSlope.Positive or SweepTriggerSlope.Negative")
case SweepTriggerSource.MANUAL:
self.write( f"{command_header}:TRIG:SOUR MAN" )
case _:
UndefinedValueError(trigger_source, "SweepTriggerSource.[INTERNAL | EXTERNAL | MANUAL]")
self.write( f"{command_header}:STAT ON" )
def trigger_sweep(self, channel: OutputChannel):
self.write( f":SOUR{channel.value}:SWE:TRIG:IMM" )
# def set_pulse(self, channel: OutputChannel, duty_cycle: float, transition_leading: float, transition_trailing: float, pulse_width: float):
# transition_bounds = (8e-9, 0.625*pulse_width)
# duty_cycle_bounds = (0.001, 99.999)
# pulse_width_bounds = (16e-9, 999.999e3)
# command_header = f":SOUR{channel.value}:PULS"
# check_bounds(duty_cycle_bounds, duty_cycle)
# check_bounds(transition_bounds, transition_leading)
# check_bounds(transition_bounds, transition_trailing)
# check_bounds(pulse_width_bounds, pulse_width)
# self.write( f"{command_header}:WIDT {pulse_width}" )
# self.write( f"{command_header}:DCYC {duty_cycle}" )
# self.write( f"{command_header}:TRAN:LEAD {transition_leading}" )
# self.write( f"{command_header}:TRAN:TRA {transition_trailing}" )