BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
Loading...
Searching...
No Matches
module.multicast.BoswatchModule Class Reference

Multicast module with multi-instance support and active trigger mechanism. More...

Public Member Functions

 __init__ (self, config)
 init preload some needed locals and then call onLoad() directly
 
 onLoad (self)
 Initialize module configuration and start the global cleanup thread.
 
 doWork (self, bwPacket)
 Process an incoming packet and handle multicast logic.
 
 onUnload (self)
 Unregister instance from the global cleanup process.
 

Data Fields

 instance_id
 
 name
 
- Data Fields inherited from module.moduleBase.ModuleBase
 config
 

Protected Member Functions

 _get_packet_data (self, bwPacket)
 Safely extract data dictionary from packet.
 
 _filter_output (self, result)
 Apply multicastRole filtering before output.
 
 _combine_results (self, *results)
 Combine multiple result sources into a single list or status.
 
 _should_output_packet (self, multicast_role)
 Check if packet should be output based on role.
 
 _add_tone_ric_packet (self, freq, packet_dict)
 Add a tone-RIC to the shared buffer.
 
 _get_queued_packets (self)
 Pop and return all packets currently in the static queue.
 
 _copy_packet_dict_to_packet (self, recipient_dict, packet, index=1)
 Copy dict fields to Packet with timestamp shift for DB uniqueness.
 
 _distribute_complete (self, freq, text_packet_dict)
 Create full multicast packets with message content.
 
 _create_incomplete_multicast (self, freq, recipient_dicts)
 Generate multicast packets for timeouts (no text message).
 
 _enrich_normal_alarm (self, bwPacket, packet_dict)
 Enrich a standard single alarm with multicast metadata.
 
 _handle_delimiter (self, freq, ric, bwPacket=None)
 Handle delimiter packet and clear orphaned tone-RICs.
 
 _set_mcast_metadata (self, packet, mode, role, source="", count="1", index="1")
 Helper to set standard multicast fields and register wildcards.
 
 _apply_list_tags (self, packet, recipient_dicts)
 Helper to aggregate fields from all recipients into comma-separated lists.
 
 _register_wildcard_safe (self, wildcard, field)
 Register wildcard if not already globally registered.
 
 _perform_instance_tick (self)
 Tick-entry point for this specific instance.
 
 _check_all_my_frequencies (self)
 Monitor timeouts for all frequencies assigned to this instance.
 
 _check_instance_auto_clear (self, freq)
 Check if frequency has exceeded timeout (called from doWork).
 
 _send_wakeup_trigger (self, freq, fallback_ric)
 Send a loopback trigger via socket to wake up the system.
 
- Protected Member Functions inherited from module.moduleBase.ModuleBase
 _cleanup (self)
 Cleanup routine calls onUnload() directly.
 
 _run (self, bwPacket)
 start an run of the module.
 
 _getStatistics (self)
 Returns statistical information's from last module run.
 

Static Protected Member Functions

 _global_cleanup_worker ()
 Static background thread that ticks all active module instances.
 
 _cleanup_hard_timeout_global ()
 Global failsafe for really old packets (ignores instance config).
 

Protected Attributes

 _my_frequencies
 
 _auto_clear_timeout
 
 _hard_timeout
 
 _delimiter_rics
 
 _text_rics
 
 _netident_rics
 
 _trigger_ric
 
 _trigger_ric_mode
 
 _trigger_host
 
 _trigger_port
 
 _block_delimiter
 
 _block_netident
 
- Protected Attributes inherited from module.moduleBase.ModuleBase
 _moduleName
 
 _cumTime
 
 _moduleTime
 
 _runCount
 
 _moduleErrorCount
 

Static Protected Attributes

 _tone_ric_packets = defaultdict(list)
 
 _last_tone_ric_time = defaultdict(float)
 
 _processing_text_ric = defaultdict(bool)
 
 _processing_text_ric_started = defaultdict(float)
 
 _lock = threading.Lock()
 
 _cleanup_thread = None
 
bool _running = False
 
 _wildcards_registered = set()
 
list _packet_queue = []
 
 _queue_lock = threading.Lock()
 
list _instances = []
 
str _TRIGGER_HOST = "127.0.0.1"
 
int _TRIGGER_PORT = 8080
 
str _MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
 
str _DEFAULT_TRIGGER_RIC = "9999999"
 
- Static Protected Attributes inherited from module.moduleBase.ModuleBase
list _modulesActive = []
 

Additional Inherited Members

- Static Public Member Functions inherited from module.moduleBase.ModuleBase
 registerWildcard (newWildcard, bwPacketField)
 Register a new wildcard.
 

Detailed Description

Multicast module with multi-instance support and active trigger mechanism.

This module handles multicast alarm distribution. It manages the correlation between tone-RICs (recipients) and text-RICs (message content), ensuring reliable alarm delivery even in complex multi-frequency scenarios.

Constructor & Destructor Documentation

◆ __init__()

module.multicast.BoswatchModule.__init__ (   self,
  moduleName 
)

init preload some needed locals and then call onLoad() directly

Reimplemented from module.moduleBase.ModuleBase.

64 def __init__(self, config):
65 super().__init__(__name__, config)
66

Member Function Documentation

◆ onLoad()

module.multicast.BoswatchModule.onLoad (   self)

Initialize module configuration and start the global cleanup thread.

Parameters
None
Returns
None

Reimplemented from module.moduleBase.ModuleBase.

67 def onLoad(self):
68 r"""!Initialize module configuration and start the global cleanup thread.
69
70 @param None
71 @return None"""
72 self._my_frequencies = set()
73 self.instance_id = hex(id(self))[-4:]
74 self.name = f"MCAST_{self.instance_id}"
75
76 self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
77 self._hard_timeout = self._auto_clear_timeout * 3
78
79 def parse_list(key):
80 val = self.config.get(key)
81 if val:
82 return [x.strip() for x in str(val).split(",") if x.strip()]
83 return []
84
85 self._delimiter_rics = parse_list("delimiterRics")
86 self._text_rics = parse_list("textRics")
87 self._netident_rics = parse_list("netIdentRics")
88
89 trigger_ric_cfg = self.config.get("triggerRic")
90 if trigger_ric_cfg:
91 self._trigger_ric = str(trigger_ric_cfg).strip()
92 self._trigger_ric_mode = "explicit"
93 else:
94 self._trigger_ric = None
95 self._trigger_ric_mode = "dynamic"
96
97 self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
98 self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
99
100 self._block_delimiter = bool(self._delimiter_rics)
101 self._block_netident = bool(self._netident_rics)
102
103 logging.info("[%s] Multicast module loaded", self.name)
104
105 with BoswatchModule._lock:
106 if self not in BoswatchModule._instances:
107 BoswatchModule._instances.append(self)
108
109 if not BoswatchModule._running:
110 BoswatchModule._running = True
111 BoswatchModule._cleanup_thread = threading.Thread(
112 target=BoswatchModule._global_cleanup_worker, daemon=True
113 )
114 BoswatchModule._cleanup_thread.start()
115 logging.info("Global multicast cleanup thread started")
116
117# ============================================================
118# MAIN PROCESSING
119# ============================================================
120

◆ doWork()

module.multicast.BoswatchModule.doWork (   self,
  bwPacket 
)

Process an incoming packet and handle multicast logic.

Parameters
bwPacketA BOSWatch packet instance or list of packets
Returns
bwPacket, a list of packets, or False if blocked

Reimplemented from module.moduleBase.ModuleBase.

121 def doWork(self, bwPacket):
122 r"""!Process an incoming packet and handle multicast logic.
123
124 @param bwPacket: A BOSWatch packet instance or list of packets
125 @return bwPacket, a list of packets, or False if blocked"""
126 if isinstance(bwPacket, list):
127 result_packets = []
128 for single_packet in bwPacket:
129 processed = self.doWork(single_packet)
130 if processed is None:
131 result_packets.append(single_packet)
132 elif processed is not False:
133 if isinstance(processed, list):
134 result_packets.extend(processed)
135 else:
136 result_packets.append(processed)
137 return result_packets if result_packets else None
138
139 packet_dict = self._get_packet_data(bwPacket)
140 msg = packet_dict.get("message")
141 ric = packet_dict.get("ric")
142 freq = packet_dict.get("frequency", "default")
143 mode = packet_dict.get("mode")
144
145 if msg == BoswatchModule._MAGIC_WAKEUP_MSG:
146 if self._trigger_ric and ric != self._trigger_ric:
147 pass
148 else:
149 logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
150 queued = self._get_queued_packets()
151 return queued if queued else False
152
153 if mode != "pocsag":
154 queued = self._get_queued_packets()
155 return queued if queued else None
156
157 self._my_frequencies.add(freq)
158
159 is_text_ric = False
160 if self._text_rics:
161 is_text_ric = ric in self._text_rics and msg and msg.strip()
162 else:
163 with BoswatchModule._lock:
164 is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0
165
166 if is_text_ric:
167 with BoswatchModule._lock:
168 BoswatchModule._processing_text_ric[freq] = True
169 BoswatchModule._processing_text_ric_started[freq] = time.time()
170 logging.debug("[%s] Text-RIC %s detected", self.name, ric)
171
172 queued_packets = self._get_queued_packets()
173 incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)
174
175 if self._netident_rics and ric in self._netident_rics:
176 self._set_mcast_metadata(bwPacket, "control", "netident")
177 result = self._combine_results(incomplete_packets, queued_packets, [bwPacket])
178 return self._filter_output(result)
179
180 if self._delimiter_rics and ric in self._delimiter_rics:
181 delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket)
182 result = self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)
183 return self._filter_output(result)
184
185 if not msg or not msg.strip():
186 self._add_tone_ric_packet(freq, packet_dict)
187 result = self._combine_results(incomplete_packets, queued_packets, False)
188 return self._filter_output(result)
189
190 if is_text_ric and msg:
191 logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
192 alarm_packets = self._distribute_complete(freq, packet_dict)
193 with BoswatchModule._lock:
194 BoswatchModule._processing_text_ric[freq] = False
195 BoswatchModule._processing_text_ric_started.pop(freq, None)
196
197 if not alarm_packets:
198 logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
199 normal = self._enrich_normal_alarm(bwPacket, packet_dict)
200 result = self._combine_results(normal, incomplete_packets, queued_packets)
201 else:
202 result = self._combine_results(alarm_packets, incomplete_packets, queued_packets)
203 return self._filter_output(result)
204
205 if msg:
206 normal = self._enrich_normal_alarm(bwPacket, packet_dict)
207 result = self._combine_results(normal, incomplete_packets, queued_packets)
208 return self._filter_output(result)
209
210 result = self._combine_results(incomplete_packets, queued_packets)
211 return self._filter_output(result)
212
213# ============================================================
214# PACKET PROCESSING HELPERS (called by doWork)
215# ============================================================
216

◆ _get_packet_data()

module.multicast.BoswatchModule._get_packet_data (   self,
  bwPacket 
)
protected

Safely extract data dictionary from packet.

Parameters
bwPacketPacket instance or dict
Returns
dict: The packet data dictionary
217 def _get_packet_data(self, bwPacket):
218 r"""!Safely extract data dictionary from packet.
219
220 @param bwPacket: Packet instance or dict
221 @return dict: The packet data dictionary"""
222 if isinstance(bwPacket, dict):
223 return bwPacket
224 try:
225 if hasattr(bwPacket, 'data'):
226 return bwPacket.data
227 return {}
228 except Exception:
229 return {}
230

◆ _filter_output()

module.multicast.BoswatchModule._filter_output (   self,
  result 
)
protected

Apply multicastRole filtering before output.

Parameters
resultSingle packet, list of packets, None or False
Returns
Final packet(s) or False if blocked
231 def _filter_output(self, result):
232 r"""!Apply multicastRole filtering before output.
233
234 @param result: Single packet, list of packets, None or False
235 @return Final packet(s) or False if blocked"""
236 if result is None or result is False:
237 return result
238
239 if isinstance(result, list):
240 filtered = [p for p in result if self._should_output_packet(p.get("multicastRole"))]
241 if not filtered:
242 logging.debug("All packets filtered out by multicastRole")
243 return False
244 return filtered if len(filtered) > 1 else filtered[0]
245 else:
246 if self._should_output_packet(result.get("multicastRole")):
247 return result
248 logging.debug("Packet filtered out: multicastRole=%s", result.get("multicastRole"))
249 return False
250

◆ _combine_results()

module.multicast.BoswatchModule._combine_results (   self,
results 
)
protected

Combine multiple result sources into a single list or status.

Parameters
resultsMultiple packet objects, lists, or booleans
Returns
combined list, False or None
251 def _combine_results(self, *results):
252 r"""!Combine multiple result sources into a single list or status.
253
254 @param results: Multiple packet objects, lists, or booleans
255 @return combined list, False or None"""
256 combined = []
257 has_false = False
258 for result in results:
259 if result is False:
260 has_false = True
261 continue
262 if result is None:
263 continue
264 if isinstance(result, list):
265 combined.extend(result)
266 else:
267 combined.append(result)
268 if combined:
269 return combined
270 return False if has_false else None
271

◆ _should_output_packet()

module.multicast.BoswatchModule._should_output_packet (   self,
  multicast_role 
)
protected

Check if packet should be output based on role.

Parameters
multicast_roleThe role string to check
Returns
bool: True if allowed
272 def _should_output_packet(self, multicast_role):
273 r"""!Check if packet should be output based on role.
274
275 @param multicast_role: The role string to check
276 @return bool: True if allowed"""
277 if self._block_delimiter and multicast_role == "delimiter":
278 return False
279 if self._block_netident and multicast_role == "netident":
280 return False
281 return True
282
283# ============================================================
284# TONE-RIC BUFFER MANAGEMENT
285# ============================================================
286

◆ _add_tone_ric_packet()

module.multicast.BoswatchModule._add_tone_ric_packet (   self,
  freq,
  packet_dict 
)
protected

Add a tone-RIC to the shared buffer.

Parameters
freqFrequency identifier
packet_dictDictionary containing packet data
Returns
None
287 def _add_tone_ric_packet(self, freq, packet_dict):
288 r"""!Add a tone-RIC to the shared buffer.
289
290 @param freq: Frequency identifier
291 @param packet_dict: Dictionary containing packet data
292 @return None"""
293 with BoswatchModule._lock:
294 stored_packet = packet_dict.copy()
295 stored_packet['_multicast_timestamp'] = time.time()
296 BoswatchModule._tone_ric_packets[freq].append(stored_packet)
297 BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
298 logging.info("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq)
299

◆ _get_queued_packets()

module.multicast.BoswatchModule._get_queued_packets (   self)
protected

Pop and return all packets currently in the static queue.

Parameters
None
Returns
list: List of packets or None
300 def _get_queued_packets(self):
301 r"""!Pop and return all packets currently in the static queue.
302
303 @param None
304 @return list: List of packets or None"""
305 with BoswatchModule._queue_lock:
306 if BoswatchModule._packet_queue:
307 packets = BoswatchModule._packet_queue[:]
308 BoswatchModule._packet_queue.clear()
309 return packets
310 return None
311
312# ============================================================
313# MULTICAST PACKET CREATION
314# ============================================================
315

◆ _copy_packet_dict_to_packet()

module.multicast.BoswatchModule._copy_packet_dict_to_packet (   self,
  recipient_dict,
  packet,
  index = 1 
)
protected

Copy dict fields to Packet with timestamp shift for DB uniqueness.

Parameters
recipient_dictSource dictionary
packetTarget Packet object
indexPacket index (1-based) - shifts timestamp by milliseconds
Returns
None
316 def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1):
317 r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness.
318
319 @param recipient_dict: Source dictionary
320 @param packet: Target Packet object
321 @param index: Packet index (1-based) - shifts timestamp by milliseconds
322 @return None"""
323 for k, v in recipient_dict.items():
324 if k.startswith('_'):
325 continue
326 if k == 'timestamp' and index > 1:
327 try:
328 dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S')
329 dt_shifted = dt + datetime.timedelta(milliseconds=index - 1)
330 packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S'))
331 except:
332 packet.set(k, str(v))
333 else:
334 packet.set(k, str(v))
335

◆ _distribute_complete()

module.multicast.BoswatchModule._distribute_complete (   self,
  freq,
  text_packet_dict 
)
protected

Create full multicast packets with message content.

Parameters
freqFrequency identifier
text_packet_dictData of the message-carrying packet
Returns
list: List of fully populated Packet instances
336 def _distribute_complete(self, freq, text_packet_dict):
337 r"""!Create full multicast packets with message content.
338
339 @param freq: Frequency identifier
340 @param text_packet_dict: Data of the message-carrying packet
341 @return list: List of fully populated Packet instances"""
342 with BoswatchModule._lock:
343 recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
344 BoswatchModule._tone_ric_packets[freq].clear()
345 BoswatchModule._last_tone_ric_time.pop(freq, None)
346
347 if not recipient_dicts:
348 return []
349 text_ric = text_packet_dict.get("ric")
350 message_text = text_packet_dict.get("message")
351 alarm_packets = []
352
353 for idx, recipient_dict in enumerate(recipient_dicts, 1):
354 p = Packet()
355 self._copy_packet_dict_to_packet(recipient_dict, p, idx)
356 p.set("message", message_text)
357 self._apply_list_tags(p, recipient_dicts)
358 self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx)
359 alarm_packets.append(p)
360
361 logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric)
362 return alarm_packets
363

◆ _create_incomplete_multicast()

module.multicast.BoswatchModule._create_incomplete_multicast (   self,
  freq,
  recipient_dicts 
)
protected

Generate multicast packets for timeouts (no text message).

Parameters
freqFrequency identifier
recipient_dictsList of recipient data dictionaries
Returns
list: List of incomplete Packet instances
364 def _create_incomplete_multicast(self, freq, recipient_dicts):
365 r"""!Generate multicast packets for timeouts (no text message).
366
367 @param freq: Frequency identifier
368 @param recipient_dicts: List of recipient data dictionaries
369 @return list: List of incomplete Packet instances"""
370 if not recipient_dicts:
371 return []
372 first_ric = recipient_dicts[0].get("ric", "unknown")
373 incomplete_packets = []
374 for idx, recipient_dict in enumerate(recipient_dicts, 1):
375 p = Packet()
376 self._copy_packet_dict_to_packet(recipient_dict, p, idx)
377 p.set("message", "")
378 self._apply_list_tags(p, recipient_dicts)
379 self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx)
380 incomplete_packets.append(p)
381 return incomplete_packets
382

◆ _enrich_normal_alarm()

module.multicast.BoswatchModule._enrich_normal_alarm (   self,
  bwPacket,
  packet_dict 
)
protected

Enrich a standard single alarm with multicast metadata.

Parameters
bwPacketTarget Packet object
packet_dictSource data dictionary
Returns
list: List containing the enriched packet
383 def _enrich_normal_alarm(self, bwPacket, packet_dict):
384 r"""!Enrich a standard single alarm with multicast metadata.
385
386 @param bwPacket: Target Packet object
387 @param packet_dict: Source data dictionary
388 @return list: List containing the enriched packet"""
389 self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
390 self._apply_list_tags(bwPacket, [packet_dict])
391 self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
392 return [bwPacket]
393

◆ _handle_delimiter()

module.multicast.BoswatchModule._handle_delimiter (   self,
  freq,
  ric,
  bwPacket = None 
)
protected

Handle delimiter packet and clear orphaned tone-RICs.

Parameters
freqFrequency identifier
ricDelimiter RIC
bwPacketOptional delimiter packet instance
Returns
list: Incomplete packets or delimiter control packet
394 def _handle_delimiter(self, freq, ric, bwPacket=None):
395 r"""!Handle delimiter packet and clear orphaned tone-RICs.
396
397 @param freq: Frequency identifier
398 @param ric: Delimiter RIC
399 @param bwPacket: Optional delimiter packet instance
400 @return list: Incomplete packets or delimiter control packet"""
401 with BoswatchModule._lock:
402 orphaned = BoswatchModule._tone_ric_packets[freq].copy()
403 BoswatchModule._tone_ric_packets[freq].clear()
404 BoswatchModule._last_tone_ric_time.pop(freq, None)
405 BoswatchModule._processing_text_ric[freq] = False
406
407 if orphaned:
408 age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time())
409 logging.info("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds)
410 return self._create_incomplete_multicast(freq, orphaned)
411 if bwPacket is not None:
412 self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0")
413 return [bwPacket]
414 return None
415
416# ============================================================
417# PACKET METADATA HELPERS
418# ============================================================
419

◆ _set_mcast_metadata()

module.multicast.BoswatchModule._set_mcast_metadata (   self,
  packet,
  mode,
  role,
  source = "",
  count = "1",
  index = "1" 
)
protected

Helper to set standard multicast fields and register wildcards.

Parameters
packetThe Packet instance to modify
modemulticastMode (complete, incomplete, single, control)
rolemulticastRole (recipient, single, delimiter, netident)
sourceThe originating RIC
countTotal number of recipients
indexCurrent recipient index
Returns
None
420 def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"):
421 r"""!Helper to set standard multicast fields and register wildcards.
422
423 @param packet: The Packet instance to modify
424 @param mode: multicastMode (complete, incomplete, single, control)
425 @param role: multicastRole (recipient, single, delimiter, netident)
426 @param source: The originating RIC
427 @param count: Total number of recipients
428 @param index: Current recipient index
429 @return None"""
430 mapping = {
431 "multicastMode": (mode, "{MCAST_MODE}"),
432 "multicastRole": (role, "{MCAST_ROLE}"),
433 "multicastSourceRic": (source, "{MCAST_SOURCE}"),
434 "multicastRecipientCount": (str(count), "{MCAST_COUNT}"),
435 "multicastRecipientIndex": (str(index), "{MCAST_INDEX}")
436 }
437 for key, (val, wildcard) in mapping.items():
438 packet.set(key, val)
439 self._register_wildcard_safe(wildcard, key)
440

◆ _apply_list_tags()

module.multicast.BoswatchModule._apply_list_tags (   self,
  packet,
  recipient_dicts 
)
protected

Helper to aggregate fields from all recipients into comma-separated lists.

Parameters
packetThe target Packet instance
recipient_dictsList of dictionaries of all recipients in this group
Returns
None
441 def _apply_list_tags(self, packet, recipient_dicts):
442 r"""!Helper to aggregate fields from all recipients into comma-separated lists.
443
444 @param packet: The target Packet instance
445 @param recipient_dicts: List of dictionaries of all recipients in this group
446 @return None"""
447 all_fields = set()
448 for r in recipient_dicts:
449 all_fields.update(k for k in r.keys() if not k.startswith('_'))
450
451 for f in sorted(all_fields):
452 list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts])
453 list_key = f"{f}_list"
454 packet.set(list_key, list_val)
455 self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key)
456

◆ _register_wildcard_safe()

module.multicast.BoswatchModule._register_wildcard_safe (   self,
  wildcard,
  field 
)
protected

Register wildcard if not already globally registered.

Parameters
wildcardThe wildcard string (e.g. {MCAST_MODE})
fieldThe packet field name
Returns
None
457 def _register_wildcard_safe(self, wildcard, field):
458 r"""!Register wildcard if not already globally registered.
459
460 @param wildcard: The wildcard string (e.g. {MCAST_MODE})
461 @param field: The packet field name
462 @return None"""
463 if wildcard not in BoswatchModule._wildcards_registered:
464 self.registerWildcard(wildcard, field)
465 BoswatchModule._wildcards_registered.add(wildcard)
466
467# ============================================================
468# CLEANUP & TIMEOUT MANAGEMENT
469# ============================================================
470

◆ _global_cleanup_worker()

module.multicast.BoswatchModule._global_cleanup_worker ( )
staticprotected

Static background thread that ticks all active module instances.

Parameters
None
Returns
None
472 def _global_cleanup_worker():
473 r"""!Static background thread that ticks all active module instances.
474
475 @param None
476 @return None"""
477 logging.info("Global multicast cleanup ticker active")
478 while BoswatchModule._running:
479 time.sleep(1)
480 with BoswatchModule._lock:
481 active_instances = BoswatchModule._instances[:]
482 for instance in active_instances:
483 try:
484 instance._perform_instance_tick()
485 except Exception as e:
486 logging.error("Error in instance cleanup: %s", e)
487 if int(time.time()) % 60 == 0:
488 BoswatchModule._cleanup_hard_timeout_global()
489

◆ _perform_instance_tick()

module.multicast.BoswatchModule._perform_instance_tick (   self)
protected

Tick-entry point for this specific instance.

Parameters
None
Returns
None
490 def _perform_instance_tick(self):
491 r"""!Tick-entry point for this specific instance.
492
493 @param None
494 @return None"""
495 self._check_all_my_frequencies()
496

◆ _check_all_my_frequencies()

module.multicast.BoswatchModule._check_all_my_frequencies (   self)
protected

Monitor timeouts for all frequencies assigned to this instance.

Parameters
None
Returns
None
497 def _check_all_my_frequencies(self):
498 r"""!Monitor timeouts for all frequencies assigned to this instance.
499
500 @param None
501 @return None"""
502 incomplete_packets = []
503 trigger_data = []
504
505 with BoswatchModule._lock:
506 current_time = time.time()
507 for freq in list(self._my_frequencies):
508 if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
509 continue
510
511 if BoswatchModule._processing_text_ric.get(freq, False):
512 flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time)
513 if flag_age > 2:
514 BoswatchModule._processing_text_ric[freq] = False
515 BoswatchModule._processing_text_ric_started.pop(freq, None)
516 else:
517 continue
518
519 last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
520 if current_time - last_time > self._auto_clear_timeout:
521 recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
522 safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC)
523 trigger_data.append((freq, safe_ric))
524 BoswatchModule._tone_ric_packets[freq].clear()
525 BoswatchModule._last_tone_ric_time.pop(freq, None)
526
527 logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout)
528 packets = self._create_incomplete_multicast(freq, recipient_dicts)
529 if packets:
530 incomplete_packets.extend(packets)
531
532 if incomplete_packets:
533 with BoswatchModule._queue_lock:
534 BoswatchModule._packet_queue.extend(incomplete_packets)
535 for freq, safe_ric in trigger_data:
536 self._send_wakeup_trigger(freq, safe_ric)
537

◆ _check_instance_auto_clear()

module.multicast.BoswatchModule._check_instance_auto_clear (   self,
  freq 
)
protected

Check if frequency has exceeded timeout (called from doWork).

Parameters
freqFrequency identifier
Returns
list: Incomplete packets if timeout exceeded, else None
538 def _check_instance_auto_clear(self, freq):
539 r"""!Check if frequency has exceeded timeout (called from doWork).
540
541 @param freq: Frequency identifier
542 @return list: Incomplete packets if timeout exceeded, else None"""
543 with BoswatchModule._lock:
544 if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
545 return None
546 last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
547 if time.time() - last_time > self._auto_clear_timeout:
548 recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
549 BoswatchModule._tone_ric_packets[freq].clear()
550 BoswatchModule._last_tone_ric_time.pop(freq, None)
551 logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts))
552 return self._create_incomplete_multicast(freq, recipient_dicts)
553 return None
554

◆ _cleanup_hard_timeout_global()

module.multicast.BoswatchModule._cleanup_hard_timeout_global ( )
staticprotected

Global failsafe for really old packets (ignores instance config).

Parameters
None
Returns
None
556 def _cleanup_hard_timeout_global():
557 r"""!Global failsafe for really old packets (ignores instance config).
558
559 @param None
560 @return None"""
561 with BoswatchModule._lock:
562 current_time = time.time()
563 max_hard_timeout = 120
564 if BoswatchModule._instances:
565 max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances)
566 for freq in list(BoswatchModule._tone_ric_packets.keys()):
567 BoswatchModule._tone_ric_packets[freq] = [
568 p for p in BoswatchModule._tone_ric_packets[freq]
569 if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout
570 ]
571 if not BoswatchModule._tone_ric_packets[freq]:
572 del BoswatchModule._tone_ric_packets[freq]
573
574# ============================================================
575# TRIGGER SYSTEM
576# ============================================================
577

◆ _send_wakeup_trigger()

module.multicast.BoswatchModule._send_wakeup_trigger (   self,
  freq,
  fallback_ric 
)
protected

Send a loopback trigger via socket to wake up the system.

Parameters
freqFrequency identifier
fallback_ricRIC to use if no explicit trigger RIC is configured
Returns
None
578 def _send_wakeup_trigger(self, freq, fallback_ric):
579 r"""!Send a loopback trigger via socket to wake up the system.
580
581 @param freq: Frequency identifier
582 @param fallback_ric: RIC to use if no explicit trigger RIC is configured
583 @return None"""
584 try:
585 trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric
586 payload = {
587 "timestamp": time.time(),
588 "mode": "pocsag",
589 "bitrate": "1200",
590 "ric": trigger_ric,
591 "subric": "1",
592 "subricText": "a",
593 "message": BoswatchModule._MAGIC_WAKEUP_MSG,
594 "clientName": "MulticastTrigger",
595 "inputSource": "loopback",
596 "frequency": freq
597 }
598 json_str = json.dumps(payload)
599 header = f"{len(json_str):<10}"
600 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
601 sock.settimeout(1.0)
602 sock.connect((self._trigger_host, self._trigger_port))
603 sock.sendall(header.encode('utf-8'))
604 sock.sendall(json_str.encode('utf-8'))
605 logging.debug("[%s] Wakeup trigger sent for freq %s (RIC=%s)", self.name, freq, trigger_ric)
606 except Exception as e:
607 logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e)
608
609# ============================================================
610# LIFECYCLE (End)
611# ============================================================
612

◆ onUnload()

module.multicast.BoswatchModule.onUnload (   self)

Unregister instance from the global cleanup process.

Parameters
None
Returns
None

Reimplemented from module.moduleBase.ModuleBase.

613 def onUnload(self):
614 r"""!Unregister instance from the global cleanup process.
615
616 @param None
617 @return None"""
618 with BoswatchModule._lock:
619 if self in BoswatchModule._instances:
620 BoswatchModule._instances.remove(self)
621 logging.debug("[%s] Multicast instance unloaded", self.name)

Field Documentation

◆ _tone_ric_packets

module.multicast.BoswatchModule._tone_ric_packets = defaultdict(list)
staticprotected

◆ _last_tone_ric_time

module.multicast.BoswatchModule._last_tone_ric_time = defaultdict(float)
staticprotected

◆ _processing_text_ric

module.multicast.BoswatchModule._processing_text_ric = defaultdict(bool)
staticprotected

◆ _processing_text_ric_started

module.multicast.BoswatchModule._processing_text_ric_started = defaultdict(float)
staticprotected

◆ _lock

module.multicast.BoswatchModule._lock = threading.Lock()
staticprotected

◆ _cleanup_thread

module.multicast.BoswatchModule._cleanup_thread = None
staticprotected

◆ _running

bool module.multicast.BoswatchModule._running = False
staticprotected

◆ _wildcards_registered

module.multicast.BoswatchModule._wildcards_registered = set()
staticprotected

◆ _packet_queue

list module.multicast.BoswatchModule._packet_queue = []
staticprotected

◆ _queue_lock

module.multicast.BoswatchModule._queue_lock = threading.Lock()
staticprotected

◆ _instances

list module.multicast.BoswatchModule._instances = []
staticprotected

◆ _TRIGGER_HOST

str module.multicast.BoswatchModule._TRIGGER_HOST = "127.0.0.1"
staticprotected

◆ _TRIGGER_PORT

int module.multicast.BoswatchModule._TRIGGER_PORT = 8080
staticprotected

◆ _MAGIC_WAKEUP_MSG

str module.multicast.BoswatchModule._MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
staticprotected

◆ _DEFAULT_TRIGGER_RIC

str module.multicast.BoswatchModule._DEFAULT_TRIGGER_RIC = "9999999"
staticprotected

◆ _my_frequencies

module.multicast.BoswatchModule._my_frequencies
protected

◆ instance_id

module.multicast.BoswatchModule.instance_id

◆ name

module.multicast.BoswatchModule.name

◆ _auto_clear_timeout

module.multicast.BoswatchModule._auto_clear_timeout
protected

◆ _hard_timeout

module.multicast.BoswatchModule._hard_timeout
protected

◆ _delimiter_rics

module.multicast.BoswatchModule._delimiter_rics
protected

◆ _text_rics

module.multicast.BoswatchModule._text_rics
protected

◆ _netident_rics

module.multicast.BoswatchModule._netident_rics
protected

◆ _trigger_ric

module.multicast.BoswatchModule._trigger_ric
protected

◆ _trigger_ric_mode

module.multicast.BoswatchModule._trigger_ric_mode
protected

◆ _trigger_host

module.multicast.BoswatchModule._trigger_host
protected

◆ _trigger_port

module.multicast.BoswatchModule._trigger_port
protected

◆ _block_delimiter

module.multicast.BoswatchModule._block_delimiter
protected

◆ _block_netident

module.multicast.BoswatchModule._block_netident
protected