forked from pla10/hassio-addons
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipewire-mqtt.py
executable file
·226 lines (194 loc) · 10.1 KB
/
pipewire-mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/python3
"""Monitor pipewire state."""
import json
import socket
import subprocess
import typing
import uuid
import dns.resolver
import paho.mqtt.client
# FIXME: Make async I/O work
# In my experience, the role can be whatever string I want it to be.
# But in practice, it's barely actually used.
# And in the documentation there's a limited number of options for it.
# https://www.freedesktop.org/wiki/Software/PulseAudio/Documentation/Developer/Clients/ApplicationProperties/#pa_prop_media_role
# Currently playing stream IDs grouped by media role: {role: {stream_id, ...}}.
# Every role listed here gets its own Home Assistant binary sensor.
PW_ROLE_NUM_STREAMS: dict[str, set] = {
    role: set()
    for role in (
        # Should be ignored, as this is what we're controlling anyway.
        "music",
        # Must mute the music and steal all attention.
        "phone",
        # Should probably dim the lights the same as the TV inhibitor.
        "game",
        # Should this dim the lights? Probably not, this'll be YouTube videos and such.
        "video",
        # Chat notification blips. Ideally these would not mute the music, but realistically
        # they probably can't be told apart from 'phone' because the role is set per application.
        "event",
        # Not expected to be used currently.
        # It maybe shouldn't mute the music, but maybe lower the music volume.
        "a11y",
        # Unclear whether these two are ever going to be used.
        "animation",
        "production",
        # FIXME: Should this be phone?
        #        This is what shows up for Discord, even with `Environment=PULSE_PROP="media.role=phone"` set.
        "communication",
        # Not a real role, just a bucket for anything that doesn't have a defined role.
        # FIXME: Should probably use this for any roles that are not in this list either
        "other",
    )
}
# All topics live under the Home Assistant binary_sensor discovery prefix for this host.
MQTT_TOPIC_BASE: str = f"homeassistant/binary_sensor/{socket.gethostname()}"
AVAILABILITY_TOPIC: str = f"{MQTT_TOPIC_BASE}/availability"
def mqtt_discovery(client: paho.mqtt.client.Client):
    """Send MQTT discovery info for Home Assistant.

    Publishes one retained ``.../pipewire_<role>/config`` message per role in
    PW_ROLE_NUM_STREAMS, each describing a "sound" binary sensor whose state
    topic is ``.../pipewire_<role>/state``.

    client is an MQTT client object providing ``publish(topic=, payload=, retain=)``.
    """
    # FIXME: Why do the DLNA & Nmap devices combine into one, but I can't make this combine with them?
    # uuid.getnode() hands back the hardware address as a 48-bit integer.
    # Zero-pad to the full 12 hex digits, otherwise an address with a leading zero
    # (e.g. 00:1a:...) comes out short and gets split into the wrong octets.
    mac_address: str = f'{uuid.getnode():012x}'
    hex_mac: str = ':'.join(mac_address[i:i + 2] for i in range(0, len(mac_address), 2))

    # The device description is the same for every sensor, so build it once.
    device = {
        "connections": [("mac", hex_mac)],  # json.dumps emits the tuple as a 2-element array
        "name": socket.gethostname(),
    }
    for role in PW_ROLE_NUM_STREAMS:
        sensor_topic = '/'.join((MQTT_TOPIC_BASE, f"pipewire_{role}"))
        client.publish(topic='/'.join((sensor_topic, "config")),
                       payload=json.dumps({
                           "availability_topic": AVAILABILITY_TOPIC,
                           "device": device,
                           "device_class": "sound",
                           # FIXME: "category" (config/diagnostic), "icon" and "json_attributes_topic"
                           #        are not sent; no attributes are published anywhere yet.
                           "name": f"Audio playback - {role}",
                           "state_topic": '/'.join((sensor_topic, "state")),
                           "unique_id": 'pipewire' + hex_mac + role,
                       }),
                       retain=True)
def read_pretty_json_list(fp: typing.TextIO):
    """Read & decode the next pretty-printed JSON list from file object.

    The list must open with a line containing only '[' and close with a line
    containing only ']'; everything in between is read line by line and the
    whole span is decoded in one go.

    Returns the decoded list.
    Raises NotImplementedError if the next line is not exactly "[\\n".
    Raises EOFError if the stream ends before the closing "]\\n" line.
    Raises json.JSONDecodeError if the collected text is not valid JSON.
    """
    line: str = fp.readline()
    if line != "[\n":
        raise NotImplementedError("Must start with a '['")

    lines = [line]
    while line != "]\n":
        line = fp.readline()
        if not line:
            # readline() only returns '' at end-of-file, and keeps returning it,
            # so without this check a truncated stream would spin here forever.
            raise EOFError("Stream ended before the closing ']' of the JSON list")
        lines.append(line)

    return json.loads(''.join(lines))
def pipewire_events():
    """Yield the objects reported by a long-running ``pw-dump --monitor``.

    Starts ``pw-dump --monitor --no-colors`` and, for as long as that process
    is alive, reads one pretty-printed JSON list at a time from its stdout and
    yields each element of the list individually.

    The child process is terminated and reaped when the generator finishes,
    is closed, or has an exception propagate through it.
    """
    pw_dump = subprocess.Popen(['pw-dump', '--monitor', '--no-colors'],
                               text=True, stdout=subprocess.PIPE)
    try:
        while pw_dump.poll() is None:
            yield from read_pretty_json_list(pw_dump.stdout)
    finally:
        # Don't leave the monitor process (or its pipe) behind if the consumer
        # stops iterating early or something above us blows up.
        if pw_dump.poll() is None:
            pw_dump.terminate()
        pw_dump.stdout.close()
        pw_dump.wait()
# Since upstream **still** hasn't fixed this 3yr old bug
# https://github.com/eclipse/paho.mqtt.python/issues/493
# I've copy/pasted upstream's connect_srv function to fix it internally.
# I've also pushed my own pull request for them
# https://github.com/eclipse/paho.mqtt.python/pull/759
# but I get the feeling it's going to be ignored
# -- mijofa, 2023-10-23
def connect_srv(mqtt_client, domain=None, *args, **kwargs):
    """Connect to a remote broker.

    domain is the DNS domain to search for SRV records; if None,
    try to determine local domain name.
    All other args are used as is for connect()

    The SRV targets are tried in ascending priority order and the result of
    the first successful connect() is returned.

    Raises ValueError if the SRV lookup gives no usable answer.
    If every target fails to connect, the error from the last attempt is re-raised.
    """
    if domain is None:
        domain = socket.getfqdn()
        domain = domain[domain.find('.') + 1:]

    # IANA specifies secure-mqtt (not mqtts) for port 8883
    service = '_secure-mqtt' if mqtt_client._ssl else '_mqtt'
    rr = '%s._tcp.%s' % (service, domain)

    try:
        answers = [
            # to_text() gives an absolute name; drop its trailing '.'
            (answer.target.to_text()[:-1], answer.port, answer.priority, answer.weight)
            for answer in dns.resolver.resolve(rr, dns.rdatatype.SRV)
        ]
    except (dns.resolver.NXDOMAIN,
            dns.resolver.NoAnswer,
            dns.resolver.NoNameservers) as err:
        raise ValueError("No answer/NXDOMAIN for SRV in %s" % (domain)) from err

    # Lower priority value means "try me first"; the sort is stable so records
    # of equal priority keep the order the resolver returned them in.
    # FIXME: doesn't account for weight
    answers.sort(key=lambda answer: answer[2])

    last_error = None
    for host, port, _prio, _weight in answers:
        try:
            return mqtt_client.connect(host, port, *args, **kwargs)
        except Exception as err:
            # Deliberately broad: whatever went wrong with this target,
            # the next one still deserves a try. Nothing is swallowed,
            # the last failure is re-raised below.
            last_error = err

    if last_error is not None:
        raise last_error
    raise ValueError("No SRV hosts responded")
mqtt_client = paho.mqtt.client.Client()
# NOTE: The last-will has to be registered before the connection is made.
mqtt_client.will_set(topic=AVAILABILITY_TOPIC, payload='offline')
# FIXME: Try anonymous, and fallback on guest:guest when that fails
mqtt_client.username_pw_set(username='guest', password='guest')
connect_srv(mqtt_client)
mqtt_client.loop_start()

# FIXME: Can this wait until after the first load of events is handled before first updating the payload?
#        That first set of dumped events tends to bounce the inhibitor on/off annoyingly.
# FIXME: Notify Systemd that we're ready, perhaps at the same time as the FIXME above?
mqtt_discovery(mqtt_client)
mqtt_client.publish(topic=AVAILABILITY_TOPIC, payload='online', retain=False)

# Start from a known baseline: every role sensor reports OFF until a stream shows up.
for mqtt_topic in (f"{MQTT_TOPIC_BASE}/pipewire_{role}/state" for role in PW_ROLE_NUM_STREAMS):
    mqtt_client.publish(topic=mqtt_topic, payload='OFF', retain=True)
# Reverse index {stream_id: stream_role}, so a removal event can go straight to the right
# role set instead of searching every set in 'PW_ROLE_NUM_STREAMS'.
playback_streams: dict[int, str] = {}

for ev in pipewire_events():
    if 'type' not in ev and ev.get('info') is None:
        # Something was removed. The event doesn't say what kind of object it was,
        # so only react when the ID is one of the streams recorded below.
        if ev['id'] not in playback_streams:
            continue
        stream_role = playback_streams.pop(ev['id'])
        if stream_role not in PW_ROLE_NUM_STREAMS:
            continue

        PW_ROLE_NUM_STREAMS[stream_role].remove(ev['id'])
        print(f"{ev['id']} - del {stream_role} stream,",
              f"total = {len(PW_ROLE_NUM_STREAMS[stream_role])}",
              PW_ROLE_NUM_STREAMS[stream_role])
        if not PW_ROLE_NUM_STREAMS[stream_role]:
            # That was the last stream of this role, so the sensor goes off.
            mqtt_client.publish(topic=f"{MQTT_TOPIC_BASE}/pipewire_{stream_role}/state",
                                payload='OFF',
                                retain=True)
        continue

    if ev.get('type') != 'PipeWire:Interface:Node':
        continue

    info = ev['info']
    props = info['props']
    media_class = props.get('media.class')

    if media_class == 'Stream/Output/Audio':
        # Pretty sure this is a playback stream.
        if props.get('node.passive', False) is True:
            # Seen with internal loopback & echo canceling modules.
            continue
        change_mask = info['change-mask']
        if not ('state' in change_mask or 'params' in change_mask):
            # Only 'state' or 'params' changes are of interest.
            continue
        if info['state'] != 'running':
            # FIXME: Why is this not relevant?
            continue

        # FIXME: Why does the role sometimes arrive titlecased?
        stream_role: str = props.get('media.role', 'other').lower()
        if stream_role not in PW_ROLE_NUM_STREAMS:
            stream_role = 'other'

        if ev['id'] in PW_ROLE_NUM_STREAMS[stream_role]:
            # Already counted; skipping the repeat log & publish just reduces log spam.
            continue
        PW_ROLE_NUM_STREAMS[stream_role].add(ev['id'])

        # Label for the log line: node name, else application name, else binary name.
        binary_name = props.get('application.process.binary')
        app_name = props.get('application.name', binary_name)
        print(f"{ev['id']} - new {stream_role} stream ",
              props.get('node.name', app_name),
              f"[{props.get('application.process.id')}], ",
              f"total = {len(PW_ROLE_NUM_STREAMS[stream_role])}",
              sep='')
        mqtt_client.publish(topic=f"{MQTT_TOPIC_BASE}/pipewire_{stream_role}/state",
                            payload='ON',
                            retain=True)
        # Remember which role this ID was counted under, for when it gets removed.
        playback_streams[ev['id']] = stream_role

    elif media_class == 'Audio/Sink':
        # Pretty sure these are the output sinks; only their 'params' changes matter here.
        if 'params' in info['change-mask']:
            # Ping the availability topic in case HA has restarted and forgotten latest state
            mqtt_client.publish(topic=AVAILABILITY_TOPIC, payload='online', retain=False)