Source code for py4syn.epics.DigitalIOClass

"""
EPICS Digital I/O

Python Class for EPICS Digital I/O control.

:platform: Unix
:synopsis: Python Class for Digital I/O control using EPICS.

.. moduleauthor:: Laís Pessine do Carmo <lais.carmo@lnls.br>
    .. note:: 12/04/2016 [lais.carmo]  First version released.
    .. note:: 15/04/2016 [lais.carmo]  Internal methods as '__private'; __getPortNumbers() added.

"""

from epics import PV

class ErrorPortSequence(Exception):
    """
    Exception raised when the given port sequence is invalid.
    """
    def __init__(self, port_sequences):
        self.arg = port_sequences
    def __str__(self):
        return "ERROR: Port sequence parameter '" + str(self.args) + "' is not valid. \nTIP: ';' and '-' can be used to define a port sequence. Example: '0-3;5;10-15'"

class ErrorPortType(Exception):
    """
    Exception raised when the given port type is not 'INPUT' neither 'OUTPUT'.
    """
    def __init__(self):
        pass
    def __str__(self):
        return str("Port type must be 'INPUT' or 'OUTPUT'.")

class ErrorIndexNotFound(Exception):
    """
    Exception raised when the port index in not found'.
    """
    def __init__(self, port_index):
        self.arg = port_index
    def __str__(self):
        return str("Port index " + str(self.arg) + " not found.")

class ErrorInputAssignment(Exception):
    """
    Exception raised when it is tried to assign a value to port of type 'INPUT'.
    """
    def __init__(self):
        pass
    def __str__(self):
        return str("Cannot assign a value to an INPUT port (read only).")


[docs]class DigitalIO(object): """ Class for Digital I/O using EPICS. Example: -------- >>> from py4syn.epics.DigitalIOClass import DigitalIO >>> ports_out = DigitalIO('SOL3:DIO:bo', 'OUTPUT', '0-3;7') >>> ports_out.list() ... SOL3:DIO:bo0 ... SOL3:DIO:bo1 ... SOL3:DIO:bo2 ... SOL3:DIO:bo3 ... SOL3:DIO:bo7 >>> ports_out.getValue(3) ... 0 >>> ports_out.putValue(3,1) >>> ports_out.getValue(3) ... 1 >>> ports_out.addPorts('5-6') >>> ports_out.list() ... SOL3:DIO:bo0 ... SOL3:DIO:bo1 ... SOL3:DIO:bo2 ... SOL3:DIO:bo3 ... SOL3:DIO:bo5 ... SOL3:DIO:bo6 ... SOL3:DIO:bo7 >>> ports_out.deletePorts('3') >>> ports_out.list() ... SOL3:DIO:bo0 ... SOL3:DIO:bo1 ... SOL3:DIO:bo2 ... SOL3:DIO:bo5 ... SOL3:DIO:bo6 ... SOL3:DIO:bo7 >>> ports_out.getName(2) ... 'SOL3:DIO:bo2' """ def __init__(self, digitalIO_prefix, port_type, port_sequence): """ **Constructor** Parameters ---------- digitalIO_prefix : `string` Prefix of the ports. port_type : `string` 'INPUT' or 'OUTPUT'. port_sequence : `string` Index of the ports. Example: '0-3;7' means ports from 0 to 3 and 7. """ self.digitalIO_prefix = None if port_type == 'INPUT' or port_type == 'OUTPUT': self.port_type = port_type else: raise ErrorPortType() self.port_name_dict = {} self.port_pv_dict = {} self.digitalIO_prefix = digitalIO_prefix self.addPorts(port_sequence)
[docs] def getValue(self, port_index): """ Returns the current value of a port PV. Parameters ---------- port_index : `int` Index of the port. """ port_index = int(port_index) if port_index not in self.port_name_dict.keys(): raise ErrorIndexNotFound(port_index) return self.port_pv_dict[port_index].get()
[docs] def putValue(self, port_index, new_value): """ Set a new value for a port PV. Parameters ---------- port_index : `int` Index of the port. new_value New value of the port PV. """ port_index = int(port_index) if port_index not in self.port_name_dict.keys(): raise ErrorIndexNotFound(port_index) if self.port_type == 'OUTPUT': self.port_pv_dict[port_index].put(new_value); else: raise ErrorInputAssignment()
[docs] def list(self): """ Lists name of all ports in use. """ for k in self.port_name_dict.keys(): print(self.port_name_dict[k])
[docs] def getName(self, port_index): """ Returns the name of a port PV. Parameters ---------- port_index : `int` Index of the port. """ port_index = int(port_index) if port_index not in self.port_name_dict.keys(): raise ErrorIndexNotFound(port_index) return self.port_name_dict[port_index]
def __getPortNumbers(self, port_sequences): """ Receive a sequece of ports and return an integer array with all port numbers. Parameters ---------- port_sequences : `string` Sequences of port indexes. Returns ---------- port_sequence_array : `int` Array of all port numbers. """ try: port_sequences = str(port_sequences) port_sequence = port_sequences.split(';') port_sequence_array = [] for p in port_sequence: if len(p.split('-')) >= 1: port_init = p.split('-')[0] port_final = port_init if len(p.split('-')) == 2: port_final = p.split('-')[1] for x in range(int(port_init),int(port_final)+1): port_sequence_array.append(x) return port_sequence_array except: raise ErrorPortSequence(port_sequences) def __setPortNames(self, port_sequences): """ Set a dictionary with all port names (based on given prefix and port indexes). Parameters ---------- port_sequences : `string` Index of the ports. """ try: port_numbers = self.__getPortNumbers(port_sequences) for pn in port_numbers: self.port_name_dict[pn] = self.digitalIO_prefix + str(pn) except: raise ErrorPortSequence(port_sequences) def __setPortPVs(self): """ Set a dictionary with all port PVs (based on prefix and port indexes). """ for x in self.port_name_dict.keys(): if x not in self.port_pv_dict.keys(): port_name = self.port_name_dict[x] port_pv = PV(port_name) self.port_pv_dict[x] = port_pv
[docs] def addPorts(self, port_sequences): """ Add ports to name and PVs dictionaries. Parameters ---------- port_sequences : `string` Index of the ports to be added. """ self.__setPortNames(port_sequences) self.__setPortPVs()
[docs] def deletePorts(self, port_sequences): """ Delete ports from name and PVs dictionaries. Parameters ---------- port_sequences : `string` Index of the ports to be deleted. """ try: port_sequences = str(port_sequences) port_numbers = self.__getPortNumbers(port_sequences) for pn in port_numbers: if pn in self.port_name_dict.keys(): del self.port_name_dict[pn] if pn in self.port_pv_dict.keys(): del self.port_pv_dict[pn] except: raise ErrorPortSequence(port_sequences)