"""
Low level inteface to APO Ash Dome
"""
import time
import timer
import numpy as np
from threading import Timer, Lock, Thread
from logging import Logger
from tristate import Tristate
import Encoder
import subprocess
# put in try/except for readthedocs
try :
import piplates.RELAYplate as RELAY
import APOSafety
import RPi.GPIO as GPIO
except :
print('no APOSafety or RPi.GPIO')
# RELAYPlates relay numbers for different operations
DOME_POWER = 1 #relay 207 pin 47 on old Autoscope box
DOME_DIRECTION = 2 #relay 208 pin 5
UPPER_POWER = 3 #relay 205 pin 48
UPPER_DIRECTION = 4 #relay 206 pin 4
WATCHDOG_RESET = 5 #relay 201 pin 50
LOWER_DIRECTION = 6
LOWER_POWER = 7
# GPIO bit for home sensor
HOME = 16
# time before shutters are reigster open or closed
UPPER_TIME = 86
LOWER_TIME = 120
# Park and home positions
PARK_POSITION = 60
HOME_POSITION = 80
DOME_TOLERANCE = 3
# GPIO pins for azimuth encoder
ENCODER_A = 5
ENCODER_B = 12
# scale for azimuth encoder
steps_per_degree = -724
from enum import Enum
[docs]
class ShutterState(Enum) :
shutterOpen = 0 # Dome shutter status open
shutterClosed = 1 # Dome shutter status closed
shutterOpening = 2 # Dome shutter status opening
shutterClosing = 3 # Dome shutter status closing
shutterError = 4 # Dome shutter status error
class APOAshDome() :
def __init__(self, logger=None, lower=False, upper_time=UPPER_TIME, lower_time=LOWER_TIME ) :
""" Initialize dome properties and capabilities
"""
self.connected = True
self.altitude = None
#try:
self.lower = lower
self.is_upper_open = Tristate()
self.is_upper_closed = Tristate()
self.is_lower_open = Tristate()
self.is_lower_closed = Tristate()
self.cansetaltitude = False
self.cansetazimuth = True
self.cansetpark = True
self.cansetshutter = True
self.canfindhome = True
self.canslave = False
self.canpark = True
self.cansyncazimuth = True
self.slaved = False
self.shutterstatus = ShutterState.shutterError.value
self.slewing = False
self.park_position = PARK_POSITION
self.verbose = True
self.enc = Encoder.Encoder(ENCODER_A,ENCODER_B)
GPIO.setup(HOME, GPIO.IN)
self.upper_time = upper_time
self.lower_time = lower_time
self.SupportedActions=['lower','upper_time','lower_time']
try:
with open("SavedPosition.txt") as fp :
self.azimuth=float(fp.read())
self.enc.pos = int((self.azimuth - HOME_POSITION) * steps_per_degree)
except:
self.azimuth = HOME_POSITION
self.logger = logger
if self.logger is not None : self.logger.info('Instantiating dome device')
self.start_watchdog()
self.start_weather()
def save_position(self) :
""" Save current position to file
"""
self.get_azimuth()
with open("SavedPosition.txt","w") as fp :
fp.write('{:7.1f}'.format(self.azimuth))
def start_weather(self) :
""" Start weather monitoring thread
"""
self.safety=APOSafety.APOSafety(warnonly=False,use25m=True)
t=Thread(target=self.monitor_weather)
t.start()
def monitor_weather(self,timeout=90) :
""" Check weather periodically
"""
while True :
try :
if not self.safety.issafe() and \
not (self.shutterstatus == ShutterState.shutterClosed.value) and\
not (self.shutterstatus == ShutterState.shutterClosing.value) :
self.close_shutter()
if self.logger is not None : self.logger.info('closing dome from monitor_weather')
if self.logger is not None : self.logger.info('closing louvers from monitor_weather')
subprocess.run("ms1 off 9",shell=True)
time.sleep(timeout)
except Exception as e :
print('Error: ', e)
if self.logger is not None : self.logger.exception(e)
def reset_watchdog(self,timeout=110) :
""" Reset watchdog periodically
"""
while True :
if self.logger is not None : self.logger.info('resetting watchdog')
set_relay(WATCHDOG_RESET,1)
time.sleep(5)
set_relay(WATCHDOG_RESET,0)
if not self.safety.issafe() and \
not (self.shutterstatus == ShutterState.shutterClosed.value) and\
not (self.shutterstatus == ShutterState.shutterClosing.value) :
self.close_shutter()
if self.logger is not None : self.logger.info('closing dome from reset_watchdog')
time.sleep(timeout)
def start_watchdog(self) :
""" Start thread to periodically reset watchdog
"""
t=Thread(target=self.reset_watchdog)
t.start()
def home(self) :
""" Send dome to home asynchronously
"""
if not self.athome() :
t=Thread(target=self.sendhome)
t.start()
def sendhome(self,timeout=180) :
""" Go to home
"""
if self.logger is not None : self.logger.info('sending home')
self.rotate(1)
t=timer.Timer()
t.start()
while not self.athome() and t.elapsed()<timeout :
time.sleep(0.1)
continue
if t.elapsed() < timeout :
if self.logger is not None : self.logger.info('Encoder position at home: {:d}'.format(self.enc.pos))
self.enc.pos = 0
if self.logger is not None : self.logger.info('Setting to zero{:d}'.format(self.enc.pos))
self.azimuth = HOME_POSITION
set_relay(DOME_POWER,0)
self.slewing = False
if self.logger is not None : self.logger.info('hit home')
else :
if self.logger is not None : self.logger.info('Home timer expired before finding home !')
t.stop()
self.save_position()
def athome(self) :
""" Check if at home position
"""
if GPIO.input(HOME) :
return True
else :
return False
def set_upper_open(self) :
""" Set upper shutter status to open and turn off shutter power
"""
if self.logger is not None : self.logger.info('setting upper shutter open')
self.is_upper_open = True
self.shutterstatus = ShutterState.shutterOpen.value
set_relay(UPPER_POWER,0)
set_relay(UPPER_DIRECTION,0)
def set_upper_closed(self) :
""" Set upper shutter status to closed and turn off shutter power
"""
if self.logger is not None : self.logger.info('setting upper shutter closed')
self.is_upper_open = False
self.shutterstatus = ShutterState.shutterClosed.value
set_relay(UPPER_POWER,0)
def open_upper(self) :
""" Open upper shutter asynchronously
"""
set_relay(UPPER_POWER,0)
if self.logger is not None : self.logger.info('starting shutter open')
set_relay(UPPER_DIRECTION,0)
set_relay(UPPER_POWER,1)
self.shutterstatus = ShutterState.shutterOpening.value
t=Timer(self.upper_time,self.set_upper_open)
t.start()
def close_upper(self) :
""" Close upper shutter
"""
set_relay(UPPER_POWER,0)
if self.logger is not None : self.logger.info('starting shutter close')
set_relay(UPPER_DIRECTION,1)
set_relay(UPPER_POWER,1)
self.shutterstatus = ShutterState.shutterClosing.value
t=Timer(self.upper_time,self.set_upper_closed)
t.start()
def set_lower_open(self) :
""" Set lower shutter status to open and turn off shutter power
"""
self.is_lower_open = True
set_relay(LOWER_POWER,0)
set_relay(LOWER_DIRECTION,0)
def set_lower_closed(self) :
""" Set lower shutter status to closed and turn off shutter power
"""
self.is_lower_open = True
set_relay(LOWER_POWER,0)
set_relay(LOWER_DIRECTION,0)
def open_lower(self) :
""" Open lower shutter asynchronously
"""
if self.is_upper_open == True :
print('opening lower')
set_relay(LOWER_POWER,0)
set_relay(LOWER_DIRECTION,1)
set_relay(LOWER_POWER,1)
t=Timer(self.lower_time,self.set_lower_open)
t.start()
else :
raise RuntimeError('cannot open lower shutter when upper shutter is not open')
def close_lower(self) :
""" Close lower shutter
"""
if self.is_upper_open == True :
print('closing lower')
set_relay(LOWER_POWER,0)
set_relay(LOWER_DIRECTION,0)
set_relay(LOWER_POWER,1)
t=Timer(self.lower_time+10,self.set_lower_closed)
t.start()
else :
print('cannot close lower shutter when upper shutter is not open')
def open_shutter(self) :
""" Open the dome shutter(s). If lower, wait 10s after starting upper to start lower
"""
if not self.safety.issafe() :
if self.logger is not None : self.logger.info('cannot open shutter due to weather condition!')
return
self.open_upper()
if self.lower :
t=Timer(self.upper_time+10,self.open_lower)
t.start()
def close_shutter(self) :
""" Close the dome shutter(s). If lower, wait 30s after starting lower to start upper
"""
if self.lower :
self.close_lower()
t=Timer(self.lower_time+10,self.close_upper)
t.start()
else :
self.close_upper()
def atpark(self) :
""" Is telescope at park position?
"""
az = self.get_azimuth()
if abs(az-self.park_position) < DOME_TOLERANCE :
return True
else :
return False
def set_park(self) :
""" Set park position to current position
"""
self.park_position = self.get_azimuth()
def park(self) :
""" Send dome to park asynchronously
"""
if not self.atpark() :
t=Thread(target=self.sendpark)
t.start()
def sendpark(self) :
""" Go to park
"""
if self.logger is not None : self.logger.info('sending to park')
self.slewtoazimuth(self.park_position)
def abort_slew(self) :
""" Turn off dome rotation power
"""
if self.logger is not None : self.logger.info('abort: turning dome rotation power off')
self.stop()
def stop(self) :
""" Stop dome rotation
"""
if self.logger is not None : self.logger.info('stopping dome rotation ')
set_relay(DOME_POWER,0)
self.slewing = False
self.enc.delta=np.array([0,1,-1,2,-1,0,-2,1,1,-2,0,-1,2,-1,1,0])
#self.logger.debug('counter: {:d}'.format(self.enc.counter))
#self.logger.debug('counter16: {:d}'.format(self.enc.counter16))
#self.logger.debug('delta: {:f}'.format(self.enc.delta))
#self.logger.debug('total events: {:d}'.format(np.sum(self.enc.counter16)))
#self.logger.debug('sum: {:f}'.format(np.sum(self.enc.delta*self.enc.counter16)))
#self.logger.debug('pos: {:d}'.format(self.enc.pos))
def rotate(self,cw=True) :
""" Start dome rotating
"""
self.stop()
if self.verbose and self.logger is not None: self.logger.info('starting dome rotation: {:d} '.format(cw))
self.enc.reset()
if cw :
set_relay(DOME_DIRECTION,0)
#self.enc.delta=[0,1,3,2,3,0,2,1,1,2,0,3,2,3,1,0]
else :
set_relay(DOME_DIRECTION,1)
#self.enc.delta=[0,-3,-1,-2,-1,0,-2,-3,-3,-2,0,-1,-2,-1,-3,0]
set_relay(DOME_POWER,1)
self.slewing = True
def get_azimuth(self) :
""" Get current dome azimuth
"""
self.azimuth = self.enc.pos/steps_per_degree + HOME_POSITION
self.azimuth %= 360
return self.azimuth
def slewtoazimuth(self,azimuth) :
""" Start slew to requested azimuth
"""
if not self.safety.issafe() and \
not (self.shutterstatus == ShutterState.shutterClosed.value) and\
not (self.shutterstatus == ShutterState.shutterClosing.value) :
self.close_shutter()
if self.logger is not None : self.logger.info('closing dome from slewtoazimuth')
else :
t=Thread(target=lambda : self.gotoazimuth(azimuth))
t.start()
def gotoazimuth(self,azimuth,timeout=180) :
""" slew to requested azimuth
"""
current_az = self.azimuth
if self.logger is not None : self.logger.info('desired_az: {:f}'.format(azimuth))
if self.logger is not None : self.logger.info(' current_az: {:f}'.format(current_az))
delta = diff(azimuth,current_az)
if self.logger is not None : self.logger.info(' delta: {:f}'.format(delta))
if abs(delta) < DOME_TOLERANCE :
return
elif delta > 0 :
self.rotate(1)
#if going CW, undershoot to allow coast
delta = -1.0
else :
self.rotate(0)
#if going CCW, overshoot to allow coast
delta = +1.0
t=timer.Timer()
t.start()
x = lambda azimuth : diff(azimuth,self.get_azimuth())
while abs(x(azimuth+delta)) > DOME_TOLERANCE/4. and t.elapsed()<timeout :
#print(self.azimuth)
#time.sleep(1)
continue
self.stop()
if t.elapsed() > timeout :
if self.logger is not None : self.logger.info('Rotate timer expired before reaching desired azimuth !')
if self.logger is not None : self.logger.info('self.azimuth: {:f}'.format(self.azimuth))
t.stop()
time.sleep(2)
self.save_position()
def slewtoaltitude(self, altitude) :
return
#raise RuntimeError('altitude slew not implemented')
def slave(self,val) :
#self.slaved=True
raise RuntimeError('slaving not available')
def set_lower(self,val) :
self.logger.info('setting lower to {:d}: '.format(val))
self.lower = val
def set_lower_time(self,val) :
self.logger.info('setting lower_time to {:d}: '.format(val))
self.lower_time = val
def set_upper_time(self,val) :
self.logger.info('setting upper_time to {:d}: '.format(val))
self.upper_time = val
[docs]
def set_relay(bit,value) :
""" Utility routine to turn RELAYplates relays on (1) or off
Always sleep 200 ms after a call to give system time to respond before subsequent call
"""
if value == 1 :
RELAY.relayON(0,bit)
else :
RELAY.relayOFF(0,bit)
time.sleep(0.2)
return
def get_bit(bit,fake=None) :
if fake is not None :
return fake
else :
return 0
[docs]
def diff(azimuth,current_az) :
""" Get proper delta dome motion
"""
delta = ( azimuth - current_az )
if delta > 180 : delta-=360
elif delta < -180 : delta+=360
return delta