from re import DEBUG import time import logging import pyvisa from .errors import * from .enums import * from .common import * from .constants.dg2052 import * class DG2052( pyvisa.resources.MessageBasedResource ): comm: CommMethod rm: pyvisa.ResourceManager port: str def __init__( self, port: str ): if "TCPIP" in port: logging.debug("(PROG) detected TCPIP port") self.comm = CommMethod.LAN elif "USB" in port: logging.debug("(PROG) detected USB port") self.comm = CommMethod.USB else: raise UndefinedCommunicationMethodError(port) rm = pyvisa.ResourceManager() self.rm = rm self.port = port super().__init__(rm, port) logging.debug("(PROG) created dg2052 instance") self.open() logging.debug("(PROG) connected to dg2052 device") def whoami( self ) -> str: match(self.comm): case CommMethod.LAN: logging.debug("(PROG) communication method: LAN") manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(',')) # ipaddr = self.system.communicate.lan.ipaddress().strip() ipaddr = self.query(":SYST:COMM:LAN:IPAD?").strip() # mac = self.system.communicate.lan.mac().strip() mac = self.query(":SYST:COMM:LAN:MAC?").strip() return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tIPADDRESS: {ipaddr}\n\tMAC: {mac}" case CommMethod.USB: logging.debug("(PROG) communication method USB") manufacturer, model, serial, software_ver = tuple(self.query('*IDN?').strip().split(',')) # info = self.system.communicate.usb.information().strip() info = self.query(":SYST:COMM:USB:INF?").strip() return f"{manufacturer} {model}:\n\tSerial Nr.: {serial}\n\tSoftware Ver.: {software_ver}\n\tPort: {self.port}\n\tINFORMATION: {info}" case _: raise UndefinedCommunicationMethodError(self.port) def set_output(self, channel: OutputChannel, state: bool): if state: logging.debug( f"(PROG) :OUTP{channel.value} ON" ) self.write( f':OUTP{channel.value} ON' ) else: logging.debug( f"(PROG) :OUTP{channel.value} OFF" ) self.write( f':OUTP{channel.value} OFF' ) def toggle_output(self, channel: OutputChannel): state = self.query( f':OUT{channel.value}?' ).strip() logging.debug(f"(PROG) output {channel.value} state: {state}") match(state): case "ON": self.set_output(channel, False) case "OFF": self.set_output(channel, True) case _: raise UndefinedValueError(state, "ON or OFF") def get_output_volt_limits(self, channel: OutputChannel) -> tuple[float, float]: low: float = float(self.query( f':OUTP{channel.value}:VOLL:LOW?' )) high: float = float(self.query( f':OUTP{channel.value}:VOLL:HIGH?' )) logging.debug(f"(PROG) output {channel.value} limits: {low}, {high}") return low, high def get_output_impedance(self, channel: OutputChannel) -> float: impedance = float(self.query( f':OUTP{channel.value}:IMP?' )) logging.debug(f"(PROG) output {channel.value} impedance: {impedance}") return impedance def get_output_load(self, channel: OutputChannel) -> float: load = float(self.query( f':OUTP{channel.value}:LOAD?' )) logging.debug(f"(PROG) output {channel.value} load: {load}") return load def get_output_signal(self, channel: OutputChannel) -> str: signal = self.query( f':SOUR{channel.value}:APPL?' ).strip() logging.debug(f"(PROG) output {channel.value} signal: {signal}") return signal def get_output_state(self, channel: OutputChannel) -> str: state = self.query( f':OUTP{channel.value}?' ).strip() logging.debug(f"(PROG) output {channel.value} state: {state}") return state def set_dc(self, channel: OutputChannel, offset: float): logging.debug(f"(PROG) set dc signal with offset: {offset}") self.write( f':SOUR{channel.value}:APPL:DC 1,1,{offset}' ) def set_sine_wave(self, channel: OutputChannel, freq: float = 1e3, amp: float = 5.0, offset: float = 0.0, phase: int = 0): if freq < SIN_RANGE[0] and freq > SIN_RANGE[1]: raise ValueOutOfBoundsError(SIN_RANGE, freq) if phase < 0 and phase > 360: raise ValueOutOfBoundsError((0, 360), phase) logging.debug(f"(PROG) set sine signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") self.write( f':SOUR{channel.value}:APPL:SIN {freq},{amp},{offset},{phase}' ) def set_square_wave( self, channel: OutputChannel, # Sets the output channel of the ramp function freq: float = 1e3, # Sets the frequency amp: float = 5.0, # Sets the amplitude offset: float = 0.0, # Sets the amplitude offset phase: int = 0 # Sets the phase shift ): check_bounds(SQU_RANGE, freq) check_bounds((0, 360), phase) logging.debug(f"(PROG) set square signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") self.write( f':SOUR{channel.value}:APPL:SQU {freq},{amp},{offset},{phase}' ) def set_ramp( self, channel: OutputChannel, # Sets the output channel of the ramp function freq: float = 1e3, # Sets the frequency amp: float = 5, # Sets the amplitude offset: float = 0, # Sets the amplitude offset phase: int = 0 # Sets the phase shift ): check_bounds(RAMP_RANGE, freq) check_bounds((0, 360), phase) logging.debug(f"(PROG) set ramp signal with freq: {freq}, amp: {amp}, offset: {offset}, phase: {phase}") self.write( f':SOUR{channel.value}:APPL:RAMP {freq},{amp},{offset},{phase}' ) def set_sweep( self, channel: OutputChannel, # Sets the output channel of the sweep function signal_type: SweepSignalType = SweepSignalType.SINE, # Sets the type of signal being sweeped htime_start: float = 0, # Sets the start hold time of the sweep function htime_stop: float = 0, # Sets the stop hold time of the sweep function rtime: float = 0, # Sets the return time of the sweep function time: float = 1, # Sets the sweep time spacing: SweepSpacing = SweepSpacing.LIN, # Sets the sweep type step: int = 2, # Sets the number of steps of the sweep function trigger_slope: SweepTriggerSlope = SweepTriggerSlope.POSITIVE, # Sets the edge type of the trigger input signal (for external trigger only) trigger_source: SweepTriggerSource = SweepTriggerSource.INTERNAL # Sets the sweep trigger source ): time_bounds: tuple[float, float] = (0, 500) command_header = f":SOUR{channel.value}:SWE" check_bounds(time_bounds, htime_start) check_bounds(time_bounds, htime_stop) check_bounds(time_bounds, rtime) check_bounds((2, 1024), step) check_bounds((1e-3, 599.0), time) self.write( f"{command_header}:SPAC {spacing}" ) self.write( f"{command_header}:STEP {step}" ) match trigger_source: case SweepTriggerSource.INTERNAL: self.write( f"{command_header}:TRIG:SOUR INT" ) self.write( f"{command_header}:HTIM:STAR {htime_start}" ) self.write( f"{command_header}:HTIM {htime_stop}" ) self.write( f"{command_header}:RTIM {rtime}" ) self.write( f"{command_header}:TIME {time}" ) case SweepTriggerSource.EXTERNAL: self.write( f"{command_header}:TRIG:SOUR EXT" ) if trigger_slope == SweepTriggerSlope.POSITIVE: self.write( f"{command_header}:TRIG:SLOP POS" ) elif trigger_slope == SweepTriggerSlope.NEGATIVE: self.write( f"{command_header}:TRIG:SLOP NEG" ) else: UndefinedValueError(trigger_slope, "SweepTriggerSlope.Positive or SweepTriggerSlope.Negative") case SweepTriggerSource.MANUAL: self.write( f"{command_header}:TRIG:SOUR MAN" ) case _: UndefinedValueError(trigger_source, "SweepTriggerSource.[INTERNAL | EXTERNAL | MANUAL]") self.write( f"{command_header}:STAT ON" ) def trigger_sweep(self, channel: OutputChannel): self.write( f":SOUR{channel.value}:SWE:TRIG:IMM" ) # def set_pulse(self, channel: OutputChannel, duty_cycle: float, transition_leading: float, transition_trailing: float, pulse_width: float): # transition_bounds = (8e-9, 0.625*pulse_width) # duty_cycle_bounds = (0.001, 99.999) # pulse_width_bounds = (16e-9, 999.999e3) # command_header = f":SOUR{channel.value}:PULS" # check_bounds(duty_cycle_bounds, duty_cycle) # check_bounds(transition_bounds, transition_leading) # check_bounds(transition_bounds, transition_trailing) # check_bounds(pulse_width_bounds, pulse_width) # self.write( f"{command_header}:WIDT {pulse_width}" ) # self.write( f"{command_header}:DCYC {duty_cycle}" ) # self.write( f"{command_header}:TRAN:LEAD {transition_leading}" ) # self.write( f"{command_header}:TRAN:TRA {transition_trailing}" )