Source code for py4syn.utils.counter

import py4syn
from py4syn.epics.ICountable import ICountable
from py4syn.epics.ScalerClass import Scaler
import collections

def findMonitor():
    for k, v in py4syn.counterDB.items():
        if v['monitor'] and v['enable']:
            return k, v

    return None, None

[docs]def createCounter(mnemonic, device, channel=None, monitor=False, factor=1): """ Add a countable to the counterDB dictionary Parameters ---------- mnemonic : `string` Mnemonic of the counter device : `ICountable` The device that handles this counter and implements ICountable interface channel : `int` Specify where usable the channel index for this counter monitor : `bool` If monitor is set to True the process will be by counts and not by time factor : `int` Specify the factor to be used when reading the counter value Examples ---------- >>> scaler = Scaler("SOL:SCALER", 10, "scaler1") # Creates an EPICS Scaler >>> scalerSIM = SimCountable("ANY:PV:HERE", "simMnemonic") >>> createCounter("seconds", scaler, 1, factor=1e+7) # Create a channel using the real >>> # scaler, channel number 1 and a factor. >>> createCounter("det", scaler, 3) >>> createCounter("mon", scaler, 10, monitor=True) # Create a channel that will act as a monitor >>> createCounter("cyberSim", scalerSIM, 2) >>> ct(10000) # Start a count process until 10000 counts in the monitor >>> # and show the results for each channel at the end. >>> counts = ctr(2, use_monitor=False) # Start a count process with 2 seconds >>> # of integration time. Returns the counts as a map. >>> print("Counts on det: ", counts['det']) """ try: if(not isinstance(device, ICountable)): raise Exception("The device is not a ICountable. Please check!") if(mnemonic in py4syn.counterDB): raise Exception("Mnemonic already in use at counterDB. Please check!") mne, pars = findMonitor() if(monitor and mne is not None): raise Exception("There is already one monitor at counterDB. Monitor mnemonic is: "+str(mne)+". Please check.") if(monitor and not device.canMonitor()): raise Exception("The device cannot be used as a monitor. Please check.") counterData = {} counterData['device'] = device counterData['channel'] = channel counterData['monitor'] = monitor counterData['factor'] = factor counterData['enable'] = True py4syn.counterDB[mnemonic] = counterData except Exception as e: print("\tError: ",e)
def waitAll(monitor=False): if(monitor): k, v = findMonitor() dev = v['device'] dev.wait() else: for k, v in py4syn.counterDB.items(): v = py4syn.counterDB[k] if v['enable']: dev = v['device'] dev.wait() def stopAll(): for k, v in py4syn.counterDB.items(): dev = v['device'] dev.stopCount() def startCounters(use_monitor=False): for k, v in py4syn.counterDB.items(): dev = v['device'] if not dev.isCounting() and v['enable']: if(use_monitor and not dev.canStopCount()): continue dev.startCount() def printCountersValue(data): print("-"*32) print("Counts:") print("-"*32) for m, v in data.items(): if(isinstance(v, collections.Iterable)): print("{0:>10} {1:>20}".format(m, v.sum())) else: print("{0:>10} {1:>20}".format(m, int(v))) print("-"*32) def ctr(t=1, use_monitor=False, wait=True): from epics import ca k, v = findMonitor() if(k is not None and use_monitor): # We have a monitor, so we start the monitor, wait until it finishes, stop all other counters and grab count values v['device'].setPresetValue(v['channel'], t) for kT, vT in py4syn.counterDB.items(): dev = vT['device'] if(dev == v['device']): continue if not dev.isCounting() and dev.canStopCount() and v['enable']: dev.setCountTime(ICountable.INFINITY_TIME) startCounters(use_monitor=True) ca.poll() if(wait): waitAll(monitor=True) stopAll() return getCountersData() else: for k, v in py4syn.counterDB.items(): dev = v['device'] if not dev.isCounting() and v['enable']: dev.setCountTime(t) startCounters() ca.poll() if(wait): waitAll() return getCountersData() def getCountersData(): data = collections.OrderedDict() for k, v in py4syn.counterDB.items(): if (v['enable'] == True): dev = v['device'] cnt = 0 if(v['channel'] is not None): cnt = dev.getValue(channel=v['channel']) else: cnt = dev.getValue() data[k] = cnt/(v['factor']*1.0) return data
[docs]def ct(t=1, use_monitor=False): """ Count all counters available using the given time Parameters ---------- t : `double` Time of Preset to be used use_monitor : `bool` If monitor is set to True the process will be by counts and not by time """ data = ctr(t, use_monitor) printCountersValue(data)
[docs]def getActiveCountersNumber(): """ Count all counters available using the given time Returns ---------- `int` The number of enabled counters in counterDB """ cnt = 0 for k, v in py4syn.counterDB.items(): v = py4syn.counterDB[k] if v['enable']: cnt += 1 return cnt
[docs]def disableCounter(mne): """ Disable an specific counter Parameters ---------- mne : `string` The counter mnemonic """ if(mne not in py4syn.counterDB): raise Exception("Counter "+mne+" not found in counterDB. Please check!") c = py4syn.counterDB[mne] c['enable'] = False
[docs]def enableCounter(mne): """ Enable an specific counter Parameters ---------- mne : `string` The counter mnemonic """ if(mne not in py4syn.counterDB): raise Exception("Counter "+mne+" not found in counterDB. Please check!") c = py4syn.counterDB[mne] c['enable'] = True
def clearCounterDB(): py4syn.counterDB = collections.OrderedDict() if __name__ == "__main__": scalerSIM = Scaler("IMX:SCALER", 13, "scalerSIM") createCounter("seconds", scalerSIM, 1) createCounter("cyberstar", scalerSIM, 13, monitor=True) ct(1) disableCounter("seconds") disableCounter("mon") ct(10000, use_monitor=True) #timescan(0.1, delay=0.1) ''' import datetime countTime = 1 s = datetime.datetime.now() ct(countTime) e = datetime.datetime.now() print("Elapsed Time: ", e-s) #scaler.setCountTime(1) #scaler.setCountStart() #scaler.wait() #print("S1 = ", scaler.getIntensity(1)) #print("S10 = ", scaler.getIntensity(10)) '''