Skip to content

Commit

Permalink
Utilize zigpy energy scanning API (#31)
Browse files Browse the repository at this point in the history
* Implement energy scanning using the new zigpy API

* Disable bellows UART thread

* Bump minimum package versions
  • Loading branch information
puddly authored Mar 31, 2023
1 parent d380e1f commit 54ec10f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 23 deletions.
9 changes: 5 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ dependencies = [
"click",
"coloredlogs",
"scapy",
"zigpy>=0.53.0",
"bellows>=0.34.3",
"zigpy-deconz>=0.18.0",
"zigpy-znp>=0.8.0",
"zigpy>=0.54.0",
"bellows>=0.35.0",
"zigpy-deconz>=0.12.0",
"zigpy-znp>=0.10.0",
"zigpy-xbee>=0.17.0",
]

[tool.setuptools.packages.find]
Expand Down
27 changes: 8 additions & 19 deletions zigpy_cli/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import zigpy.zdo.types

from zigpy_cli.cli import cli, click_coroutine
from zigpy_cli.common import HEX_OR_DEC_INT
from zigpy_cli.const import RADIO_LOGGING_CONFIGS, RADIO_TO_PACKAGE, RADIO_TO_PYPI

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,7 +54,9 @@ async def radio(ctx, radio, port, baudrate=None, database=None):
{
"device": {"path": port},
"backup_enabled": False,
"startup_energy_scan": False,
"database_path": database,
"use_thread": False,
}
)

Expand Down Expand Up @@ -171,45 +172,33 @@ async def permit(app, join_time):

@radio.command()
@click.pass_obj
@click.option("--nwk", type=HEX_OR_DEC_INT, default=0x0000)
@click.option("-n", "--num-scans", type=int, default=-1)
@click_coroutine
async def energy_scan(app, nwk, num_scans):
async def energy_scan(app, num_scans):
await app.startup()
LOGGER.info("Running scan...")

# Temporarily create a zigpy device for scans not using the coordinator itself
if nwk != 0x0000:
app.add_device(
nwk=nwk,
ieee=zigpy.types.EUI64.convert("AA:AA:AA:AA:AA:AA:AA:AA"),
)

# We compute an average over the last 5 scans
channel_energies = collections.defaultdict(lambda: collections.deque([], maxlen=5))

for scan in itertools.count():
if num_scans != -1 and scan > num_scans:
break

rsp = await app.get_device(nwk=nwk).zdo.Mgmt_NWK_Update_req(
zigpy.zdo.types.NwkUpdate(
ScanChannels=zigpy.types.Channels.ALL_CHANNELS,
ScanDuration=0x02,
ScanCount=1,
)
results = await app.energy_scan(
channels=zigpy.types.Channels.ALL_CHANNELS, duration_exp=2, count=1
)

_, scanned_channels, _, _, energy_values = rsp

for channel, energy in zip(scanned_channels, energy_values):
for channel, energy in results.items():
energies = channel_energies[channel]
energies.append(energy)

total = 0xFF * len(energies)

print(f"Channel energy (mean of {len(energies)} / {energies.maxlen}):")
print("------------------------------------------------")
print(" ! Different radios compute channel energy differently")
print()
print(" + Lower energy is better")
print(" + Active Zigbee networks on a channel may still cause congestion")
print(" + TX on 26 in North America may be with lower power due to regulations")
Expand Down

0 comments on commit 54ec10f

Please sign in to comment.