BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
module.multicast.BoswatchModule Class Reference

Multicast module with multi-instance support and active trigger mechanism.

Public Member Functions

 __init__ (self, config)
 Preloads some required local attributes, then calls onLoad() directly.
 
 onLoad (self)
 Initialize module configuration and start the global cleanup thread.
 
 doWork (self, bwPacket)
 Process an incoming packet and handle multicast logic.
 
 onUnload (self)
 Unregister instance from the global cleanup process.
 

Data Fields

 instance_id
 
 name
 
- Data Fields inherited from module.moduleBase.ModuleBase
 config
 

Protected Member Functions

 _get_packet_data (self, bwPacket)
 Safely extract all fields from packet as a dictionary.
 
 _combine_results (self, *results)
 Combine multiple result sources into a single list or status.
 
 _add_tone_ric_packet (self, freq, packet_dict)
 Add a tone-RIC to the shared buffer.
 
 _get_queued_packets (self)
 Pop and return all packets currently in the static queue.
 
 _copy_packet_dict_to_packet (self, recipient_dict, packet, index=1)
 Copy dict fields to Packet with timestamp shift for DB uniqueness.
 
 _distribute_complete (self, freq, text_packet_dict)
 Create full multicast packets with message content.
 
 _create_incomplete_multicast (self, freq, recipient_dicts)
 Generate multicast packets for timeouts (no text message).
 
 _enrich_normal_alarm (self, bwPacket, packet_dict)
 Enrich a standard single alarm with multicast metadata.
 
 _handle_delimiter (self, freq, ric, bwPacket=None)
 Handle delimiter packet and clear orphaned tone-RICs.
 
 _set_mcast_metadata (self, packet, mode, role, source="", count="1", index="1")
 Helper to set standard multicast fields and register wildcards.
 
 _apply_list_tags (self, packet, recipient_dicts)
 Helper to aggregate fields from all recipients into comma-separated lists.
 
 _register_wildcard_safe (self, wildcard, field)
 Register wildcard if not already globally registered.
 
 _perform_instance_tick (self)
 Tick-entry point for this specific instance.
 
 _check_all_my_frequencies (self)
 Monitor timeouts for all frequencies assigned to this instance.
 
 _check_instance_auto_clear (self, freq)
 Check if frequency has exceeded timeout (called from doWork).
 
 _send_wakeup_trigger (self, freq, fallback_ric)
 Send a loopback trigger using the standard TCPClient class.
 
- Protected Member Functions inherited from module.moduleBase.ModuleBase
 _cleanup (self)
 Cleanup routine calls onUnload() directly.
 
 _run (self, bwPacket)
 Start a run of the module.
 
 _getStatistics (self)
 Returns statistical information from the last module run.
 

Static Protected Member Functions

 _global_cleanup_worker ()
 Static background thread that ticks all active module instances.
 
 _cleanup_hard_timeout_global ()
 Global failsafe for really old packets (ignores instance config).
 

Protected Attributes

 _my_frequencies
 
 _auto_clear_timeout
 
 _hard_timeout
 
 _delimiter_rics
 
 _text_rics
 
 _netident_rics
 
 _trigger_ric
 
 _trigger_host
 
 _trigger_port
 
- Protected Attributes inherited from module.moduleBase.ModuleBase
 _moduleName
 
 _cumTime
 
 _moduleTime
 
 _runCount
 
 _moduleErrorCount
 

Static Protected Attributes

 _tone_ric_packets = defaultdict(list)
 
 _last_tone_ric_time = defaultdict(float)
 
 _processing_text_ric = defaultdict(bool)
 
 _processing_text_ric_started = defaultdict(float)
 
 _lock = threading.Lock()
 
 _cleanup_thread = None
 
bool _running = False
 
 _wildcards_registered = set()
 
list _packet_queue = []
 
 _queue_lock = threading.Lock()
 
list _instances = []
 
str _TRIGGER_HOST = "127.0.0.1"
 
int _TRIGGER_PORT = 8080
 
str _MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
 
str _DEFAULT_TRIGGER_RIC = "9999999"
 
- Static Protected Attributes inherited from module.moduleBase.ModuleBase
list _modulesActive = []
 

Additional Inherited Members

- Static Public Member Functions inherited from module.moduleBase.ModuleBase
 registerWildcard (newWildcard, bwPacketField)
 Register a new wildcard.
 

Detailed Description

Multicast module with multi-instance support and active trigger mechanism.

This module handles multicast alarm distribution. It manages the correlation between tone-RICs (recipients) and text-RICs (message content), ensuring reliable alarm delivery even in complex multi-frequency scenarios.
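
The correlation described above can be illustrated with a minimal sketch. It is not the module's implementation: `correlate` and the sample RICs are hypothetical, and packets are simplified to plain dicts. Tone-RICs (empty message) are buffered per frequency until a text-RIC arrives, whose message is then copied to every buffered recipient.

```python
from collections import defaultdict


def correlate(packets):
    """Pair buffered tone-RICs with the next text-RIC on the same frequency."""
    buffered = defaultdict(list)  # frequency -> list of waiting recipient RICs
    alarms = []
    for p in packets:
        freq = p.get("frequency", "default")
        msg = (p.get("message") or "").strip()
        if not msg:
            # Tone-RIC: no message yet, remember the recipient.
            buffered[freq].append(p["ric"])
            continue
        # Text-RIC: distribute the message to all waiting recipients.
        # Without waiting recipients the packet is treated as a single alarm.
        recipients = buffered.pop(freq, [])
        for ric in recipients or [p["ric"]]:
            alarms.append({"ric": ric, "message": msg, "frequency": freq})
    return alarms


stream = [
    {"ric": "1000001", "message": "", "frequency": "f1"},
    {"ric": "1000002", "message": "", "frequency": "f1"},
    {"ric": "1999999", "message": "Fire, Main St 5", "frequency": "f1"},
]
alarms = correlate(stream)
```

In this sketch both recipients receive the text of RIC 1999999; the real module additionally handles delimiters, timeouts and per-packet metadata.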

Constructor & Destructor Documentation

◆ __init__()

module.multicast.BoswatchModule.__init__ (   self,
  config 
)

Preloads some required local attributes, then calls onLoad() directly.

Reimplemented from module.moduleBase.ModuleBase.

def __init__(self, config):
    super().__init__(__name__, config)

Member Function Documentation

◆ onLoad()

module.multicast.BoswatchModule.onLoad (   self)

Initialize module configuration and start the global cleanup thread.

Parameters
None
Returns
None

Reimplemented from module.moduleBase.ModuleBase.

def onLoad(self):
    r"""!Initialize module configuration and start the global cleanup thread.

    @param None
    @return None"""
    self._my_frequencies = set()
    self.instance_id = hex(id(self))[-4:]
    self.name = f"MCAST_{self.instance_id}"

    self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
    self._hard_timeout = self._auto_clear_timeout * 3

    def parse_list(key):
        val = self.config.get(key)
        if val:
            return [x.strip() for x in str(val).split(",") if x.strip()]
        return []

    self._delimiter_rics = parse_list("delimiterRics")
    self._text_rics = parse_list("textRics")
    self._netident_rics = parse_list("netIdentRics")

    trigger_ric_cfg = self.config.get("triggerRic")
    if trigger_ric_cfg:
        self._trigger_ric = str(trigger_ric_cfg).strip()
    else:
        self._trigger_ric = None

    self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
    self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))

    logging.info("[%s] Multicast module loaded", self.name)

    with BoswatchModule._lock:
        if self not in BoswatchModule._instances:
            BoswatchModule._instances.append(self)

        if not BoswatchModule._running:
            BoswatchModule._running = True
            BoswatchModule._cleanup_thread = threading.Thread(
                target=BoswatchModule._global_cleanup_worker, daemon=True
            )
            BoswatchModule._cleanup_thread.start()
            logging.info("Global multicast cleanup thread started")
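
The list parsing and timeout derivation in onLoad() can be tried in isolation. The sketch below is a standalone variant under stated assumptions: `parse_list` here takes the raw value instead of a config key, and the sample inputs are made up.

```python
def parse_list(val):
    """Split a comma-separated config value into a list of trimmed strings."""
    if val:
        return [x.strip() for x in str(val).split(",") if x.strip()]
    return []


# A single numeric RIC may arrive as an int rather than a str, hence str(val).
text_rics = parse_list(" 1234567, 7654321 ,, ")
single_ric = parse_list(1234567)
missing = parse_list(None)

# The hard timeout is derived from the auto-clear timeout (factor 3).
auto_clear_timeout = int("10")
hard_timeout = auto_clear_timeout * 3
```

Empty entries between commas are dropped, so a trailing comma in the configuration does not produce an empty RIC.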
# ============================================================
# MAIN PROCESSING
# ============================================================

◆ doWork()

module.multicast.BoswatchModule.doWork (   self,
  bwPacket 
)

Process an incoming packet and handle multicast logic.

Enriches packets with multicast metadata (mode, role, source). Does NOT filter - all packets pass through, downstream modules handle filtering.

Parameters
bwPacket: A BOSWatch packet instance or list of packets
Returns
bwPacket, a list of packets, or None if no processing

Reimplemented from module.moduleBase.ModuleBase.

def doWork(self, bwPacket):
    r"""!Process an incoming packet and handle multicast logic.

    Enriches packets with multicast metadata (mode, role, source).
    Does NOT filter - all packets pass through, downstream modules handle filtering.

    @param bwPacket: A BOSWatch packet instance or list of packets
    @return bwPacket, a list of packets, or None if no processing"""
    if isinstance(bwPacket, list):
        result_packets = []
        for single_packet in bwPacket:
            processed = self.doWork(single_packet)
            if processed is not None and processed is not False:
                if isinstance(processed, list):
                    result_packets.extend(processed)
                else:
                    result_packets.append(processed)
        return result_packets if result_packets else None

    packet_dict = self._get_packet_data(bwPacket)
    msg = packet_dict.get("message")
    ric = packet_dict.get("ric")
    freq = packet_dict.get("frequency", "default")
    mode = packet_dict.get("mode")

    # Handle wakeup triggers
    if msg == BoswatchModule._MAGIC_WAKEUP_MSG:
        if self._trigger_ric and ric != self._trigger_ric:
            return None
        logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
        queued = self._get_queued_packets()
        return queued if queued else None

    # Only process POCSAG
    if mode != "pocsag":
        queued = self._get_queued_packets()
        return queued if queued else None

    self._my_frequencies.add(freq)

    # Determine if this is a text-RIC
    is_text_ric = False
    if self._text_rics:
        is_text_ric = ric in self._text_rics and msg and msg.strip()
    else:
        with BoswatchModule._lock:
            is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0

    if is_text_ric:
        with BoswatchModule._lock:
            BoswatchModule._processing_text_ric[freq] = True
            BoswatchModule._processing_text_ric_started[freq] = time.time()

    queued_packets = self._get_queued_packets()
    incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)

    # === CONTROL PACKETS (netident, delimiter) ===
    # Mark and pass through - no filtering!

    if self._netident_rics and ric in self._netident_rics:
        self._set_mcast_metadata(bwPacket, "control", "netident", ric)
        return self._combine_results(incomplete_packets, queued_packets, [bwPacket])

    if self._delimiter_rics and ric in self._delimiter_rics:
        delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket)
        return self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)

    # === TONE-RICs (no message) ===
    if not msg or not msg.strip():
        self._add_tone_ric_packet(freq, packet_dict)
        return self._combine_results(incomplete_packets, queued_packets, False)

    # === TEXT-RICs (with message) ===
    if is_text_ric and msg:
        logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
        alarm_packets = self._distribute_complete(freq, packet_dict)
        with BoswatchModule._lock:
            BoswatchModule._processing_text_ric[freq] = False
            BoswatchModule._processing_text_ric_started.pop(freq, None)

        if not alarm_packets:
            logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
            normal = self._enrich_normal_alarm(bwPacket, packet_dict)
            return self._combine_results(normal, incomplete_packets, queued_packets)
        else:
            return self._combine_results(alarm_packets, incomplete_packets, queued_packets)

    # === SINGLE ALARM (message but no text-RICs configured) ===
    if msg:
        normal = self._enrich_normal_alarm(bwPacket, packet_dict)
        return self._combine_results(normal, incomplete_packets, queued_packets)

    return self._combine_results(incomplete_packets, queued_packets)
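
The order in which doWork() checks a packet determines how it is treated. The following sketch condenses that decision order into a pure function; `classify`, its keyword arguments and the returned labels are hypothetical names chosen for illustration, and buffering, locking and packet creation are left out.

```python
MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"


def classify(packet, text_rics=(), delimiter_rics=(), netident_rics=(),
             tones_buffered=False):
    """Return the branch a packet would take, in the order doWork() checks."""
    msg = packet.get("message") or ""
    ric = packet.get("ric")
    if msg == MAGIC_WAKEUP_MSG:
        return "wakeup"
    if packet.get("mode") != "pocsag":
        return "passthrough"
    if ric in netident_rics:
        return "netident"
    if ric in delimiter_rics:
        return "delimiter"
    if not msg.strip():
        return "tone"
    if text_rics:
        # Explicit configuration: only listed RICs count as text-RICs.
        return "text" if ric in text_rics else "single"
    # No configuration: any message counts if tone-RICs are waiting.
    return "text" if tones_buffered else "single"


tone = classify({"mode": "pocsag", "ric": "1000001", "message": ""})
text = classify({"mode": "pocsag", "ric": "1999999", "message": "Fire"},
                tones_buffered=True)
single = classify({"mode": "pocsag", "ric": "1999999", "message": "Fire"})
```

Note that control RICs are checked before the empty-message test, so a delimiter without text is never buffered as a tone-RIC.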
# ============================================================
# PACKET PROCESSING HELPERS (called by doWork)
# ============================================================

◆ _get_packet_data()

module.multicast.BoswatchModule._get_packet_data (   self,
  bwPacket 
)
protected

Safely extract all fields from packet as a dictionary.

Handles both dict objects and Packet instances. Dynamically extracts all fields including those added by other modules.

Parameters
bwPacket: Packet instance or dict
Returns
dict: Complete dictionary of all packet fields

def _get_packet_data(self, bwPacket):
    r"""!Safely extract all fields from packet as a dictionary.

    Handles both dict objects and Packet instances.
    Dynamically extracts all fields including those added by other modules.

    @param bwPacket: Packet instance or dict
    @return dict: Complete dictionary of all packet fields"""
    # Case 1: it is already a dictionary
    if isinstance(bwPacket, dict):
        return bwPacket.copy()

    # Case 2: it is a Packet object (data is stored in _packet)
    if hasattr(bwPacket, '_packet'):
        return bwPacket._packet.copy()

    # Case 3 (fallback): for any other object, try __dict__ (keys starting with '_' are skipped)
    try:
        return {k: v for k, v in bwPacket.__dict__.items() if not k.startswith('_')}
    except Exception as e:
        logging.warning("[%s] Error: %s", self.name, e)
        return {}

◆ _combine_results()

module.multicast.BoswatchModule._combine_results (   self,
  *results 
)
protected

Combine multiple result sources into a single list or status.

Parameters
results: Multiple packet objects, lists, or booleans
Returns
combined list, False or None

def _combine_results(self, *results):
    r"""!Combine multiple result sources into a single list or status.

    @param results: Multiple packet objects, lists, or booleans
    @return combined list, False or None"""
    combined = []
    has_false = False
    for result in results:
        if result is False:
            has_false = True
            continue
        if result is None:
            continue
        if isinstance(result, list):
            combined.extend(result)
        else:
            combined.append(result)
    if combined:
        return combined
    return False if has_false else None
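
The three possible return values are easiest to see with concrete inputs. The sketch below repeats the merge logic as a free function (`combine_results` is a stand-in name, and strings stand in for Packet objects).

```python
def combine_results(*results):
    """Merge packets and lists; fall back to False or None if nothing is left."""
    combined = []
    has_false = False
    for result in results:
        if result is False:
            has_false = True
            continue
        if result is None:
            continue
        if isinstance(result, list):
            combined.extend(result)
        else:
            combined.append(result)
    if combined:
        return combined
    return False if has_false else None


merged = combine_results(["a", "b"], None, "c", False)  # packets win over False
only_false = combine_results(None, False)               # nothing to forward
nothing = combine_results(None, None)                   # no result at all
```

Any packet in the inputs takes precedence: False is only returned when no source produced a packet and at least one source was False.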
# ============================================================
# TONE-RIC BUFFER MANAGEMENT
# ============================================================

◆ _add_tone_ric_packet()

module.multicast.BoswatchModule._add_tone_ric_packet (   self,
  freq,
  packet_dict 
)
protected

Add a tone-RIC to the shared buffer.

Parameters
freq: Frequency identifier
packet_dict: Dictionary containing packet data
Returns
None

def _add_tone_ric_packet(self, freq, packet_dict):
    r"""!Add a tone-RIC to the shared buffer.

    @param freq: Frequency identifier
    @param packet_dict: Dictionary containing packet data
    @return None"""
    with BoswatchModule._lock:
        stored_packet = packet_dict.copy()
        stored_packet['_multicast_timestamp'] = time.time()
        BoswatchModule._tone_ric_packets[freq].append(stored_packet)
        BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
        logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq)

◆ _get_queued_packets()

module.multicast.BoswatchModule._get_queued_packets (   self)
protected

Pop and return all packets currently in the static queue.

Parameters
None
Returns
list: List of packets or None

def _get_queued_packets(self):
    r"""!Pop and return all packets currently in the static queue.

    @param None
    @return list: List of packets or None"""
    with BoswatchModule._queue_lock:
        if BoswatchModule._packet_queue:
            packets = BoswatchModule._packet_queue[:]
            BoswatchModule._packet_queue.clear()
            return packets
    return None
# ============================================================
# MULTICAST PACKET CREATION
# ============================================================

◆ _copy_packet_dict_to_packet()

module.multicast.BoswatchModule._copy_packet_dict_to_packet (   self,
  recipient_dict,
  packet,
  index = 1 
)
protected

Copy dict fields to Packet with timestamp shift for DB uniqueness.

Parameters
recipient_dict: Source dictionary
packet: Target Packet object
index: Packet index (1-based) - shifts timestamp by milliseconds
Returns
None

def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1):
    r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness.

    @param recipient_dict: Source dictionary
    @param packet: Target Packet object
    @param index: Packet index (1-based) - shifts timestamp by milliseconds
    @return None"""
    for k, v in recipient_dict.items():
        if k.startswith('_'):
            continue
        if k == 'timestamp' and index > 1:
            try:
                dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S')
                dt_shifted = dt + datetime.timedelta(milliseconds=index - 1)
                # Note: this format has one-second resolution, so the
                # millisecond shift is not visible in the resulting string.
                packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S'))
            except (ValueError, TypeError):
                packet.set(k, str(v))
        else:
            packet.set(k, str(v))
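
The timestamp handling above formats the shifted value with a pattern that has one-second resolution. The sketch below shows the effect with the standard library only; the sample timestamp is made up, and whether downstream consumers would accept a fractional-second format is an assumption that has to be checked separately.

```python
import datetime

FMT = '%d.%m.%Y %H:%M:%S'

dt = datetime.datetime.strptime("01.02.2024 12:00:00", FMT)
shifted = dt + datetime.timedelta(milliseconds=2)  # shift for index 3

# Formatting with second resolution drops the millisecond shift again.
same_string = shifted.strftime(FMT) == dt.strftime(FMT)

# A format with a fractional part keeps it (%f is microseconds; trimmed to ms).
with_ms = shifted.strftime(FMT + '.%f')[:-3]
```

If uniqueness of the formatted timestamp matters, a format with a fractional part (or a shift by whole seconds) would be needed.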

◆ _distribute_complete()

module.multicast.BoswatchModule._distribute_complete (   self,
  freq,
  text_packet_dict 
)
protected

Create full multicast packets with message content.

Parameters
freq: Frequency identifier
text_packet_dict: Data of the message-carrying packet
Returns
list: List of fully populated Packet instances

def _distribute_complete(self, freq, text_packet_dict):
    r"""!Create full multicast packets with message content.

    @param freq: Frequency identifier
    @param text_packet_dict: Data of the message-carrying packet
    @return list: List of fully populated Packet instances"""
    with BoswatchModule._lock:
        recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
        logging.debug("Text-RIC found. Matching against %d stored RICs", len(recipient_dicts))
        BoswatchModule._tone_ric_packets[freq].clear()
        BoswatchModule._last_tone_ric_time.pop(freq, None)

    if not recipient_dicts:
        return []
    text_ric = text_packet_dict.get("ric")
    message_text = text_packet_dict.get("message")
    alarm_packets = []

    for idx, recipient_dict in enumerate(recipient_dicts, 1):
        p = Packet()
        self._copy_packet_dict_to_packet(recipient_dict, p, idx)
        p.set("message", message_text)
        self._apply_list_tags(p, recipient_dicts)
        self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx)
        alarm_packets.append(p)

    logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric)
    return alarm_packets

◆ _create_incomplete_multicast()

module.multicast.BoswatchModule._create_incomplete_multicast (   self,
  freq,
  recipient_dicts 
)
protected

Generate multicast packets for timeouts (no text message).

Parameters
freq: Frequency identifier
recipient_dicts: List of recipient data dictionaries
Returns
list: List of incomplete Packet instances

def _create_incomplete_multicast(self, freq, recipient_dicts):
    r"""!Generate multicast packets for timeouts (no text message).

    @param freq: Frequency identifier
    @param recipient_dicts: List of recipient data dictionaries
    @return list: List of incomplete Packet instances"""
    if not recipient_dicts:
        return []
    first_ric = recipient_dicts[0].get("ric", "unknown")
    incomplete_packets = []
    for idx, recipient_dict in enumerate(recipient_dicts, 1):
        p = Packet()
        self._copy_packet_dict_to_packet(recipient_dict, p, idx)
        p.set("message", "")
        self._apply_list_tags(p, recipient_dicts)
        self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx)
        incomplete_packets.append(p)
    return incomplete_packets

◆ _enrich_normal_alarm()

module.multicast.BoswatchModule._enrich_normal_alarm (   self,
  bwPacket,
  packet_dict 
)
protected

Enrich a standard single alarm with multicast metadata.

Parameters
bwPacket: Target Packet object
packet_dict: Source data dictionary
Returns
list: List containing the enriched packet

def _enrich_normal_alarm(self, bwPacket, packet_dict):
    r"""!Enrich a standard single alarm with multicast metadata.

    @param bwPacket: Target Packet object
    @param packet_dict: Source data dictionary
    @return list: List containing the enriched packet"""
    self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
    self._apply_list_tags(bwPacket, [packet_dict])
    self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
    logging.debug("Creating single alarm for RIC %s", packet_dict.get('ric'))
    return [bwPacket]

◆ _handle_delimiter()

module.multicast.BoswatchModule._handle_delimiter (   self,
  freq,
  ric,
  bwPacket = None 
)
protected

Handle delimiter packet and clear orphaned tone-RICs.

Parameters
freq: Frequency identifier
ric: Delimiter RIC
bwPacket: Optional delimiter packet instance
Returns
list: Incomplete packets or delimiter control packet

def _handle_delimiter(self, freq, ric, bwPacket=None):
    r"""!Handle delimiter packet and clear orphaned tone-RICs.

    @param freq: Frequency identifier
    @param ric: Delimiter RIC
    @param bwPacket: Optional delimiter packet instance
    @return list: Incomplete packets or delimiter control packet"""
    with BoswatchModule._lock:
        orphaned = BoswatchModule._tone_ric_packets[freq].copy()
        BoswatchModule._tone_ric_packets[freq].clear()
        BoswatchModule._last_tone_ric_time.pop(freq, None)
        BoswatchModule._processing_text_ric[freq] = False

    if orphaned:
        age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time())

        logging.debug("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds)
        return self._create_incomplete_multicast(freq, orphaned)
    if bwPacket is not None:
        self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0")
        return [bwPacket]
    return None

# ============================================================
# PACKET METADATA HELPERS
# ============================================================

◆ _set_mcast_metadata()

module.multicast.BoswatchModule._set_mcast_metadata (   self,
  packet,
  mode,
  role,
  source = "",
  count = "1",
  index = "1" 
)
protected

Helper to set standard multicast fields and register wildcards.

Parameters
packet: The Packet instance to modify
mode: multicastMode (complete, incomplete, single, control)
role: multicastRole (recipient, single, delimiter, netident)
source: The originating RIC
count: Total number of recipients
index: Current recipient index
Returns
None

def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"):
    r"""!Helper to set standard multicast fields and register wildcards.

    @param packet: The Packet instance to modify
    @param mode: multicastMode (complete, incomplete, single, control)
    @param role: multicastRole (recipient, single, delimiter, netident)
    @param source: The originating RIC
    @param count: Total number of recipients
    @param index: Current recipient index
    @return None"""
    logging.debug("Setting metadata - Mode: %s, Role: %s for RIC: %s", mode, role, source)
    mapping = {
        "multicastMode": (mode, "{MCAST_MODE}"),
        "multicastRole": (role, "{MCAST_ROLE}"),
        "multicastSourceRic": (source, "{MCAST_SOURCE}"),
        "multicastRecipientCount": (str(count), "{MCAST_COUNT}"),
        "multicastRecipientIndex": (str(index), "{MCAST_INDEX}")
    }
    for key, (val, wildcard) in mapping.items():
        packet.set(key, val)
        self._register_wildcard_safe(wildcard, key)

◆ _apply_list_tags()

module.multicast.BoswatchModule._apply_list_tags (   self,
  packet,
  recipient_dicts 
)
protected

Helper to aggregate fields from all recipients into comma-separated lists.

Parameters
packet: The target Packet instance
recipient_dicts: List of dictionaries of all recipients in this group
Returns
None

def _apply_list_tags(self, packet, recipient_dicts):
    r"""!Helper to aggregate fields from all recipients into comma-separated lists.

    @param packet: The target Packet instance
    @param recipient_dicts: List of dictionaries of all recipients in this group
    @return None"""
    all_fields = set()
    for r in recipient_dicts:
        all_fields.update(k for k in r.keys() if not k.startswith('_'))

    for f in sorted(all_fields):
        list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts])
        list_key = f"{f}_list"
        packet.set(list_key, list_val)
        self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key)
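
What the aggregated `*_list` fields look like is easier to judge from an example. The sketch below reproduces the aggregation on plain dicts; `build_list_tags` is a hypothetical helper that returns the fields instead of writing them to a Packet, and the recipient data is made up.

```python
def build_list_tags(recipient_dicts):
    """Aggregate every public field of all recipients into '<field>_list' strings."""
    fields = set()
    for r in recipient_dicts:
        fields.update(k for k in r if not k.startswith("_"))
    return {
        f"{f}_list": ", ".join(str(r.get(f, "")) for r in recipient_dicts)
        for f in sorted(fields)
    }


recipients = [
    {"ric": "1000001", "subric": "1", "_multicast_timestamp": 1.0},
    {"ric": "1000002"},  # has no 'subric' field
]
tags = build_list_tags(recipients)
```

A recipient that lacks a field contributes an empty entry, so positions in every list stay aligned with the recipient order. Fields starting with an underscore are internal and not exported.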

◆ _register_wildcard_safe()

module.multicast.BoswatchModule._register_wildcard_safe (   self,
  wildcard,
  field 
)
protected

Register wildcard if not already globally registered.

Parameters
wildcard: The wildcard string (e.g. {MCAST_MODE})
field: The packet field name
Returns
None

def _register_wildcard_safe(self, wildcard, field):
    r"""!Register wildcard if not already globally registered.

    @param wildcard: The wildcard string (e.g. {MCAST_MODE})
    @param field: The packet field name
    @return None"""
    if wildcard not in BoswatchModule._wildcards_registered:
        self.registerWildcard(wildcard, field)
        BoswatchModule._wildcards_registered.add(wildcard)

# ============================================================
# CLEANUP & TIMEOUT MANAGEMENT
# ============================================================

◆ _global_cleanup_worker()

module.multicast.BoswatchModule._global_cleanup_worker ( )
static protected

Static background thread that ticks all active module instances.

Parameters
None
Returns
None

@staticmethod
def _global_cleanup_worker():
    r"""!Static background thread that ticks all active module instances.

    @param None
    @return None"""
    logging.info("Global multicast cleanup ticker active")
    while BoswatchModule._running:
        time.sleep(1)
        with BoswatchModule._lock:
            active_instances = BoswatchModule._instances[:]
        for instance in active_instances:
            try:
                instance._perform_instance_tick()
            except Exception as e:
                logging.error("Error in instance cleanup: %s", e)
        if int(time.time()) % 60 == 0:
            BoswatchModule._cleanup_hard_timeout_global()
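
The worker triggers the hard-timeout cleanup when the current wall-clock second is divisible by 60. Because each loop iteration takes one second of sleep plus the time spent ticking instances, a given second can be skipped. A deadline-based check does not depend on hitting an exact second. The sketch below is an alternative, not the module's code: `run_ticks` is a hypothetical simulation helper that takes tick times as input so that it can be tested without sleeping. In a real loop, `time.monotonic()` would supply `now`.

```python
def run_ticks(tick_times, interval=60.0):
    """Return the tick times at which a periodic job would fire."""
    fired = []
    next_due = tick_times[0] + interval
    for now in tick_times:
        if now >= next_due:
            fired.append(now)
            # Schedule relative to the actual firing time.
            next_due = now + interval
    return fired


# Simulate a loop whose iterations take 1.5 s instead of exactly 1 s.
ticks = [i * 1.5 for i in range(100)]  # 0.0, 1.5, ..., 148.5
fired = run_ticks(ticks)
```

With this approach the job fires once per interval regardless of how long individual iterations take.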

◆ _perform_instance_tick()

module.multicast.BoswatchModule._perform_instance_tick (   self)
protected

Tick-entry point for this specific instance.

Acts as an extension hook for future per-instance tick logic (e.g. statistics, heartbeat, watchdog). Do not call directly.

Parameters
None
Returns
None

def _perform_instance_tick(self):
    r"""!Tick-entry point for this specific instance.

    Acts as an extension hook for future per-instance tick logic
    (e.g. statistics, heartbeat, watchdog). Do not call directly.

    @param None
    @return None"""
    self._check_all_my_frequencies()

◆ _check_all_my_frequencies()

module.multicast.BoswatchModule._check_all_my_frequencies (   self)
protected

Monitor timeouts for all frequencies assigned to this instance.

Parameters
None
Returns
None

def _check_all_my_frequencies(self):
    r"""!Monitor timeouts for all frequencies assigned to this instance.

    @param None
    @return None"""
    incomplete_packets = []
    trigger_data = []

    with BoswatchModule._lock:
        current_time = time.time()
        for freq in list(self._my_frequencies):
            if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
                continue

            if BoswatchModule._processing_text_ric.get(freq, False):
                flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time)
                if flag_age > 2:
                    BoswatchModule._processing_text_ric[freq] = False
                    BoswatchModule._processing_text_ric_started.pop(freq, None)
                else:
                    continue

            last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
            if current_time - last_time > self._auto_clear_timeout:
                recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
                safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC)
                trigger_data.append((freq, safe_ric))
                BoswatchModule._tone_ric_packets[freq].clear()
                BoswatchModule._last_tone_ric_time.pop(freq, None)

                logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout)
                packets = self._create_incomplete_multicast(freq, recipient_dicts)
                if packets:
                    incomplete_packets.extend(packets)

    if incomplete_packets:
        with BoswatchModule._queue_lock:
            BoswatchModule._packet_queue.extend(incomplete_packets)
        for freq, safe_ric in trigger_data:
            self._send_wakeup_trigger(freq, safe_ric)

◆ _check_instance_auto_clear()

module.multicast.BoswatchModule._check_instance_auto_clear (   self,
  freq 
)
protected

Check if frequency has exceeded timeout (called from doWork).

Parameters
freq: Frequency identifier
Returns
list: Incomplete packets if timeout exceeded, else None

def _check_instance_auto_clear(self, freq):
    r"""!Check if frequency has exceeded timeout (called from doWork).

    @param freq: Frequency identifier
    @return list: Incomplete packets if timeout exceeded, else None"""
    with BoswatchModule._lock:
        if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
            return None
        last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
        if time.time() - last_time > self._auto_clear_timeout:
            recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
            BoswatchModule._tone_ric_packets[freq].clear()
            BoswatchModule._last_tone_ric_time.pop(freq, None)
            logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts))
            return self._create_incomplete_multicast(freq, recipient_dicts)
    return None

◆ _cleanup_hard_timeout_global()

module.multicast.BoswatchModule._cleanup_hard_timeout_global ( )
static protected

Global failsafe for really old packets (ignores instance config).

Parameters
None
Returns
None

@staticmethod
def _cleanup_hard_timeout_global():
    r"""!Global failsafe for really old packets (ignores instance config).

    @param None
    @return None"""
    with BoswatchModule._lock:
        current_time = time.time()
        max_hard_timeout = 120
        if BoswatchModule._instances:
            max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances)
        for freq in list(BoswatchModule._tone_ric_packets.keys()):
            BoswatchModule._tone_ric_packets[freq] = [
                p for p in BoswatchModule._tone_ric_packets[freq]
                if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout
            ]
            if not BoswatchModule._tone_ric_packets[freq]:
                del BoswatchModule._tone_ric_packets[freq]
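
The failsafe boils down to an age filter over every frequency buffer. A standalone sketch of that filter, with a hypothetical `prune` helper, explicit `now` and `max_age` arguments, and made-up sample data:

```python
def prune(buffers, now, max_age):
    """Drop buffered packets older than max_age seconds; remove empty frequencies."""
    for freq in list(buffers):  # copy the keys, entries are deleted while looping
        buffers[freq] = [
            p for p in buffers[freq]
            if now - p.get("_multicast_timestamp", 0) < max_age
        ]
        if not buffers[freq]:
            del buffers[freq]
    return buffers


buffers = {
    "f1": [
        {"ric": "1", "_multicast_timestamp": 100.0},  # 100 s old: dropped
        {"ric": "2", "_multicast_timestamp": 190.0},  # 10 s old: kept
    ],
    "f2": [{"ric": "3", "_multicast_timestamp": 10.0}],  # 190 s old: dropped
}
pruned = prune(buffers, now=200.0, max_age=30)
```

Packets without a timestamp default to 0 and are therefore always treated as expired.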
# ============================================================
# TRIGGER SYSTEM
# ============================================================

◆ _send_wakeup_trigger()

module.multicast.BoswatchModule._send_wakeup_trigger (   self,
  freq,
  fallback_ric 
)
protected

Send a loopback trigger using the standard TCPClient class.

def _send_wakeup_trigger(self, freq, fallback_ric):
    r"""!Send a loopback trigger using the standard TCPClient class."""
    try:
        trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric
        payload = {
            "timestamp": time.time(),
            "mode": "pocsag",
            "bitrate": "1200",
            "ric": trigger_ric,
            "subric": "1",
            "subricText": "a",
            "message": BoswatchModule._MAGIC_WAKEUP_MSG,
            "clientName": "MulticastTrigger",
            "inputSource": "loopback",
            "frequency": freq
        }
        json_str = json.dumps(payload)

        # Use the standard BOSWatch TCP client
        client = TCPClient(timeout=2)
        if client.connect(self._trigger_host, self._trigger_port):
            # 1. Send
            client.transmit(json_str)

            # 2. Receive (reads the [ack] and prevents a connection reset)
            client.receive(timeout=1)

            client.disconnect()
            logging.debug("[%s] Wakeup trigger sent and acknowledged (RIC=%s)", self.name, trigger_ric)
        else:
            logging.error("[%s] Could not connect to local server for wakeup", self.name)

    except Exception as e:
        logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e)
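
The trigger is an ordinary POCSAG packet whose message is the magic wakeup string; doWork() recognizes it and, if a trigger RIC is configured, also checks the RIC. The sketch below shows both sides without any network I/O. `build_wakeup_payload` and `is_wakeup` are hypothetical helpers, the payload is reduced to a few fields, and the frequency value is made up.

```python
import json
import time

MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"


def build_wakeup_payload(freq, ric):
    """Sender side: serialize a wakeup packet as JSON."""
    return json.dumps({
        "timestamp": time.time(),
        "mode": "pocsag",
        "ric": ric,
        "message": MAGIC_WAKEUP_MSG,
        "inputSource": "loopback",
        "frequency": freq,
    })


def is_wakeup(raw_json, trigger_ric=None):
    """Receiver side: accept the magic message, optionally bound to one RIC."""
    packet = json.loads(raw_json)
    if packet.get("message") != MAGIC_WAKEUP_MSG:
        return False
    return trigger_ric is None or packet.get("ric") == trigger_ric


raw = build_wakeup_payload("85.235M", "9999999")
accepted = is_wakeup(raw, trigger_ric="9999999")
rejected = is_wakeup(raw, trigger_ric="1234567")
```

Binding the trigger to a configured RIC lets several instances share one server while each reacts only to its own wakeup packets.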
# ============================================================
# LIFECYCLE (End)
# ============================================================

◆ onUnload()

module.multicast.BoswatchModule.onUnload (   self)

Unregister instance from the global cleanup process.

Parameters
None
Returns
None

Reimplemented from module.moduleBase.ModuleBase.

def onUnload(self):
    r"""!Unregister instance from the global cleanup process.

    @param None
    @return None"""
    with BoswatchModule._lock:
        if self in BoswatchModule._instances:
            BoswatchModule._instances.remove(self)
    logging.debug("[%s] Multicast instance unloaded", self.name)

Field Documentation

◆ _tone_ric_packets

module.multicast.BoswatchModule._tone_ric_packets = defaultdict(list)
static protected

◆ _last_tone_ric_time

module.multicast.BoswatchModule._last_tone_ric_time = defaultdict(float)
static protected

◆ _processing_text_ric

module.multicast.BoswatchModule._processing_text_ric = defaultdict(bool)
static protected

◆ _processing_text_ric_started

module.multicast.BoswatchModule._processing_text_ric_started = defaultdict(float)
static protected

◆ _lock

module.multicast.BoswatchModule._lock = threading.Lock()
static protected

◆ _cleanup_thread

module.multicast.BoswatchModule._cleanup_thread = None
static protected

◆ _running

bool module.multicast.BoswatchModule._running = False
static protected

◆ _wildcards_registered

module.multicast.BoswatchModule._wildcards_registered = set()
static protected

◆ _packet_queue

list module.multicast.BoswatchModule._packet_queue = []
static protected

◆ _queue_lock

module.multicast.BoswatchModule._queue_lock = threading.Lock()
static protected

◆ _instances

list module.multicast.BoswatchModule._instances = []
static protected

◆ _TRIGGER_HOST

str module.multicast.BoswatchModule._TRIGGER_HOST = "127.0.0.1"
static protected

◆ _TRIGGER_PORT

int module.multicast.BoswatchModule._TRIGGER_PORT = 8080
static protected

◆ _MAGIC_WAKEUP_MSG

str module.multicast.BoswatchModule._MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
static protected

◆ _DEFAULT_TRIGGER_RIC

str module.multicast.BoswatchModule._DEFAULT_TRIGGER_RIC = "9999999"
static protected

◆ _my_frequencies

module.multicast.BoswatchModule._my_frequencies
protected

◆ instance_id

module.multicast.BoswatchModule.instance_id

◆ name

module.multicast.BoswatchModule.name

◆ _auto_clear_timeout

module.multicast.BoswatchModule._auto_clear_timeout
protected

◆ _hard_timeout

module.multicast.BoswatchModule._hard_timeout
protected

◆ _delimiter_rics

module.multicast.BoswatchModule._delimiter_rics
protected

◆ _text_rics

module.multicast.BoswatchModule._text_rics
protected

◆ _netident_rics

module.multicast.BoswatchModule._netident_rics
protected

◆ _trigger_ric

module.multicast.BoswatchModule._trigger_ric
protected

◆ _trigger_host

module.multicast.BoswatchModule._trigger_host
protected

◆ _trigger_port

module.multicast.BoswatchModule._trigger_port
protected