Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
gmegh committed Sep 13, 2023
1 parent d480aa2 commit dde07e9
Showing 1 changed file with 139 additions and 0 deletions.
139 changes: 139 additions & 0 deletions tests/test_maintel_closed_loop_cwfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# This file is part of ts_standardscripts
#
# Developed for the LSST Telescope and Site Systems.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import asyncio
import random
import types
import unittest

import numpy as np
from lsst.ts import standardscripts
from lsst.ts.salobj import State
from lsst.ts.standardscripts.maintel import ClosedLoopCwfs

random.seed(47) # for set_random_lsst_dds_partition_prefix


class TestClosedLoopCwfs(
standardscripts.BaseScriptTestCase, unittest.IsolatedAsyncioTestCase
):
async def basic_make_script(self, index):
self.script = ClosedLoopCwfs(index=index, add_remotes=False)

# MTCS mocks
self.script.mtcs.assert_all_enabled = unittest.mock.AsyncMock()

# LSSTCam mocks
self.script.lsstcam.assert_all_enabled = unittest.mock.AsyncMock()
self.script.lsstcam.take_acq = unittest.mock.AsyncMock()

# MTAOS mocks
self.script.mtaos.rem.mtaos = unittest.mock.AsyncMock()
self.script.mtaos.rem.mtaos.configure_mock(
**{
"cmd_runWEP.set_start": unittest.mock.AsyncMock(),
"cmd_runOFC.set_start": self.get_offsets,
"evt_wavefrontError.next": self.return_zernikes,
"evt_degreeOfFreedom.next": self.return_offsets,
"cmd_issueCorrection.start": self.apply_offsets,
}
)

self.state_0 = np.zeros(50)
self.state_0[:5] += 1

self.corrections = np.zeros(50)
return (self.script,)

async def get_summary_state(self, *args, **kwargs):
return types.SimpleNamespace(summaryState=State.ENABLED)

async def return_zernikes(self, *args, **kwargs):
return np.random.rand(19)

async def return_offsets(self, *args, **kwargs):
return self.corrections

async def apply_offsets(self, *args, **kwags):
await asyncio.sleep(0.5)
self.state_0 += self.corrections

async def get_offsets(self, *args, **kwags):
# return corrections to be non zero the first time this is called
await asyncio.sleep(0.5)
offsets = np.zeros(50)

if any(self.state_0):
offsets[:5] -= 0.5

return offsets

async def test_configure(self):
# Try configure with minimum set of parameters declared
async with self.make_script():
max_iter = 10
rotation_angle = 30.0
exposure_time = 30
filter = "r"
used_dofs = ["M2_dz", "M2_dx", "M2_dy", "M2_rx", "M2_ry"]
threshold = 1e-3
apply_corrections = True

await self.configure_script(
max_iter=max_iter,
rotation_angle=rotation_angle,
exposure_time=exposure_time,
filter=filter,
used_dofs=used_dofs,
threshold=threshold,
apply_corrections=apply_corrections,
)

assert self.script.max_iter == max_iter
assert self.script.rotation_angle == rotation_angle
assert self.script.exposure_time == exposure_time
assert self.script.filter == filter
assert self.script.used_dofs == [0, 1, 2, 3, 4]
assert self.script.threshold == threshold
assert self.script.apply_corrections == apply_corrections

async def test_run(self):
# Start the test itself
async with self.make_script():
await self.configure_script(
max_iter=10,
filter="r",
used_dofs=[0, 1, 2, 3, 4],
)

# Run the script
await self.run_script()

assert self.state_0 == [0, 0, 0, 0, 0]

async def test_executable(self):
scripts_dir = standardscripts.get_scripts_dir()
script_path = scripts_dir / "maintel" / "closed_loop_cwfs.py"
await self.check_executable(script_path)


if __name__ == "__main__":
unittest.main()

0 comments on commit dde07e9

Please sign in to comment.