BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
Loading...
Searching...
No Matches
module.multicast.BoswatchModule Class Reference

Multicast module with multi-instance support and active trigger mechanism. More...

Public Member Functions

 __init__ (self, config)
 init preload some needed locals and then call onLoad() directly
 
 onLoad (self)
 Initialize module configuration and start the global cleanup thread.
 
 doWork (self, bwPacket)
 Process an incoming packet and handle multicast logic.
 
 onUnload (self)
 Unregister instance from the global cleanup process.
 

Data Fields

 instance_id
 
 name
 
- Data Fields inherited from module.moduleBase.ModuleBase
 config
 

Protected Member Functions

 _get_packet_data (self, bwPacket)
 Safely extract all fields from packet as a dictionary.
 
 _combine_results (self, *results)
 Combine multiple result sources into a single list or status.
 
 _add_tone_ric_packet (self, freq, packet_dict)
 Add a tone-RIC to the shared buffer.
 
 _get_queued_packets (self)
 Pop and return all packets currently in the static queue.
 
 _copy_packet_dict_to_packet (self, recipient_dict, packet, index=1)
 Copy dict fields to Packet with timestamp shift for DB uniqueness.
 
 _distribute_complete (self, freq, text_packet_dict)
 Create full multicast packets with message content.
 
 _create_incomplete_multicast (self, freq, recipient_dicts)
 Generate multicast packets for timeouts (no text message).
 
 _enrich_normal_alarm (self, bwPacket, packet_dict)
 Enrich a standard single alarm with multicast metadata.
 
 _handle_delimiter (self, freq, ric, bwPacket=None)
 Handle delimiter packet and clear orphaned tone-RICs.
 
 _set_mcast_metadata (self, packet, mode, role, source="", count="1", index="1")
 Helper to set standard multicast fields and register wildcards.
 
 _apply_list_tags (self, packet, recipient_dicts)
 Helper to aggregate fields from all recipients into comma-separated lists.
 
 _register_wildcard_safe (self, wildcard, field)
 Register wildcard if not already globally registered.
 
 _cleanup_worker (self)
 Per-instance background thread for timeout management.
 
 _check_all_my_frequencies (self)
 Monitor timeouts for all frequencies assigned to this instance.
 
 _check_instance_auto_clear (self, freq)
 Check if frequency has exceeded timeout (called from doWork).
 
 _cleanup_hard_timeout (self)
 Failsafe for really old packets.
 
 _send_wakeup_trigger (self, freq, fallback_ric)
 Send a loopback trigger using the standard TCPClient class.
 
- Protected Member Functions inherited from module.moduleBase.ModuleBase
 _cleanup (self)
 Cleanup routine calls onUnload() directly.
 
 _run (self, bwPacket)
 start an run of the module.
 
 _getStatistics (self)
 Returns statistical information's from last module run.
 

Protected Attributes

 _my_frequencies
 
 _auto_clear_timeout
 
 _hard_timeout
 
 _delimiter_rics
 
 _text_rics
 
 _netident_rics
 
 _trigger_ric
 
 _trigger_host
 
 _trigger_port
 
 _tone_ric_packets
 
 _last_tone_ric_time
 
 _processing_text_ric
 
 _processing_text_ric_started
 
 _wildcards_registered
 
 _packet_queue
 
 _lock
 
 _queue_lock
 
 _running
 
 _cleanup_thread
 
 _MAGIC_WAKEUP_MSG
 
- Protected Attributes inherited from module.moduleBase.ModuleBase
 _moduleName
 
 _cumTime
 
 _moduleTime
 
 _runCount
 
 _moduleErrorCount
 

Static Protected Attributes

str _TRIGGER_HOST = "127.0.0.1"
 
int _TRIGGER_PORT = 8080
 
str _MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
 
str _DEFAULT_TRIGGER_RIC = "9999999"
 
- Static Protected Attributes inherited from module.moduleBase.ModuleBase
list _modulesActive = []
 

Additional Inherited Members

- Static Public Member Functions inherited from module.moduleBase.ModuleBase
 registerWildcard (newWildcard, bwPacketField)
 Register a new wildcard.
 

Detailed Description

Multicast module with multi-instance support and active trigger mechanism.

This module handles multicast alarm distribution. It manages the correlation between tone-RICs (recipients) and text-RICs (message content), ensuring reliable alarm delivery even in complex multi-frequency scenarios.

Constructor & Destructor Documentation

◆ __init__()

module.multicast.BoswatchModule.__init__ (   self,
  moduleName 
)

init preload some needed locals and then call onLoad() directly

Reimplemented from module.moduleBase.ModuleBase.

49 def __init__(self, config):
50 super().__init__(__name__, config)
51

Member Function Documentation

◆ onLoad()

module.multicast.BoswatchModule.onLoad (   self)

Initialize module configuration and start the global cleanup thread.

Parameters
None
Returns
None

Reimplemented from module.moduleBase.ModuleBase.

52 def onLoad(self):
53 r"""!Initialize module configuration and start the global cleanup thread.
54
55 @param None
56 @return None"""
57 self._my_frequencies = set()
58 self.instance_id = hex(id(self))[-4:]
59 self.name = f"MCAST_{self.instance_id}"
60
61 self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
62 self._hard_timeout = self._auto_clear_timeout * 3
63
64 def parse_list(key):
65 val = self.config.get(key)
66 if val:
67 return [x.strip() for x in str(val).split(",") if x.strip()]
68 return []
69
70 self._delimiter_rics = parse_list("delimiterRics")
71 self._text_rics = parse_list("textRics")
72 self._netident_rics = parse_list("netIdentRics")
73
74 trigger_ric_cfg = self.config.get("triggerRic")
75 if trigger_ric_cfg:
76 self._trigger_ric = str(trigger_ric_cfg).strip()
77 else:
78 self._trigger_ric = None
79
80 self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
81 self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
82
83 # --- Per-instance state (replaces all former class-variables) ---
84 # Key: frequency string (e.g. "85.125M")
85 self._tone_ric_packets = defaultdict(list) # buffered tone-RICs per frequency
86 self._last_tone_ric_time = defaultdict(float) # last arrival time per frequency
87 self._processing_text_ric = defaultdict(bool) # text-RIC currently being processed?
88 self._processing_text_ric_started = defaultdict(float) # when did processing start?
89 self._wildcards_registered = set() # avoid double-registering wildcards
90 self._packet_queue = [] # deferred packets waiting for trigger
91
92 # --- Locks (only needed within this instance, no cross-instance sharing) ---
93 self._lock = threading.Lock()
94 self._queue_lock = threading.Lock()
95
96 # --- Per-instance cleanup thread ---
97 self._running = True
98 self._cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True)
99 self._cleanup_thread.start()
100
101 logging.info("[%s] Multicast module loaded", self.name)
102
103# ============================================================
104# MAIN PROCESSING
105# ============================================================
106

◆ doWork()

module.multicast.BoswatchModule.doWork (   self,
  bwPacket 
)

Process an incoming packet and handle multicast logic.

Enriches packets with multicast metadata (mode, role, source). Does NOT filter - all packets pass through, downstream modules handle filtering.

Parameters
bwPacketA BOSWatch packet instance or list of packets
Returns
bwPacket, a list of packets, or None if no processing

Reimplemented from module.moduleBase.ModuleBase.

107 def doWork(self, bwPacket):
108 r"""!Process an incoming packet and handle multicast logic.
109
110 Enriches packets with multicast metadata (mode, role, source).
111 Does NOT filter - all packets pass through, downstream modules handle filtering.
112
113 @param bwPacket: A BOSWatch packet instance or list of packets
114 @return bwPacket, a list of packets, or None if no processing"""
115 if isinstance(bwPacket, list):
116 result_packets = []
117 for single_packet in bwPacket:
118 processed = self.doWork(single_packet)
119 if processed is not None and processed is not False:
120 if isinstance(processed, list):
121 result_packets.extend(processed)
122 else:
123 result_packets.append(processed)
124 return result_packets if result_packets else None
125
126 packet_dict = self._get_packet_data(bwPacket)
127 msg = packet_dict.get("message")
128 ric = packet_dict.get("ric")
129 freq = packet_dict.get("frequency", "default")
130 mode = packet_dict.get("mode")
131
132 # Handle wakeup triggers
133 if msg == BoswatchModule._MAGIC_WAKEUP_MSG:
134 if self._trigger_ric and ric != self._trigger_ric:
135 return None
136 logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
137 queued = self._get_queued_packets()
138 return queued if queued else None
139
140 # Only process POCSAG
141 if mode != "pocsag":
142 queued = self._get_queued_packets()
143 return queued if queued else None
144
145 self._my_frequencies.add(freq)
146
147 # Determine if this is a text-RIC
148 is_text_ric = False
149 if self._text_rics:
150 is_text_ric = ric in self._text_rics and msg and msg.strip()
151 else:
152 with self._lock:
153 is_text_ric = msg and msg.strip() and len(self._tone_ric_packets[freq]) > 0
154
155 if is_text_ric:
156 with self._lock:
157 self._processing_text_ric[freq] = True
158 self._processing_text_ric_started[freq] = time.time()
159
160 queued_packets = self._get_queued_packets()
161 incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)
162
163 # === CONTROL PACKETS (netident, delimiter) ===
164 # Mark and pass through - no filtering!
165
166 if self._netident_rics and ric in self._netident_rics:
167 self._set_mcast_metadata(bwPacket, "control", "netident", ric)
168 return self._combine_results(incomplete_packets, queued_packets, [bwPacket])
169
170 if self._delimiter_rics and ric in self._delimiter_rics:
171 delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket)
172 return self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)
173
174 # === TONE-RICs (no message) ===
175 if not msg or not msg.strip():
176 self._add_tone_ric_packet(freq, packet_dict)
177 return self._combine_results(incomplete_packets, queued_packets, False)
178
179 # === TEXT-RICs (with message) ===
180 if is_text_ric and msg:
181 logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
182 alarm_packets = self._distribute_complete(freq, packet_dict)
183 with self._lock:
184 self._processing_text_ric[freq] = False
185 self._processing_text_ric_started.pop(freq, None)
186
187 if not alarm_packets:
188 logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
189 normal = self._enrich_normal_alarm(bwPacket, packet_dict)
190 return self._combine_results(normal, incomplete_packets, queued_packets)
191 else:
192 return self._combine_results(alarm_packets, incomplete_packets, queued_packets)
193
194 # === SINGLE ALARM (message but no text-RICs configured) ===
195 if msg:
196 normal = self._enrich_normal_alarm(bwPacket, packet_dict)
197 return self._combine_results(normal, incomplete_packets, queued_packets)
198
199 return self._combine_results(incomplete_packets, queued_packets)
200
201# ============================================================
202# PACKET PROCESSING HELPERS (called by doWork)
203# ============================================================
204

◆ _get_packet_data()

module.multicast.BoswatchModule._get_packet_data (   self,
  bwPacket 
)
protected

Safely extract all fields from packet as a dictionary.

Handles both dict objects and Packet instances. Dynamically extracts all fields including those added by other modules.

Parameters
bwPacketPacket instance or dict
Returns
dict: Complete dictionary of all packet fields
205 def _get_packet_data(self, bwPacket):
206 r"""!Safely extract all fields from packet as a dictionary.
207
208 Handles both dict objects and Packet instances.
209 Dynamically extracts all fields including those added by other modules.
210
211 @param bwPacket: Packet instance or dict
212 @return dict: Complete dictionary of all packet fields"""
213 # 1. Fall: Es ist bereits ein Dictionary
214 if isinstance(bwPacket, dict):
215 return bwPacket.copy()
216
217 # 2. Fall: Es ist ein Packet-Objekt (Daten liegen in _packet)
218 if hasattr(bwPacket, '_packet'):
219 return bwPacket._packet.copy()
220
221 # 3. Fallback: Falls es ein anderes Objekt ist, versuche __dict__ ohne '_' Filter für 'packet'
222 try:
223 return {k: v for k, v in bwPacket.__dict__.items() if not k.startswith('_')}
224 except Exception as e:
225 logging.warning("[%s] Error: %s", self.name, e)
226 return {}
227

◆ _combine_results()

module.multicast.BoswatchModule._combine_results (   self,
results 
)
protected

Combine multiple result sources into a single list or status.

Parameters
resultsMultiple packet objects, lists, or booleans
Returns
combined list, False or None
228 def _combine_results(self, *results):
229 r"""!Combine multiple result sources into a single list or status.
230
231 @param results: Multiple packet objects, lists, or booleans
232 @return combined list, False or None"""
233 combined = []
234 has_false = False
235 for result in results:
236 if result is False:
237 has_false = True
238 continue
239 if result is None:
240 continue
241 if isinstance(result, list):
242 combined.extend(result)
243 else:
244 combined.append(result)
245 if combined:
246 return combined
247 return False if has_false else None
248
249# ============================================================
250# TONE-RIC BUFFER MANAGEMENT
251# ============================================================
252

◆ _add_tone_ric_packet()

module.multicast.BoswatchModule._add_tone_ric_packet (   self,
  freq,
  packet_dict 
)
protected

Add a tone-RIC to the shared buffer.

Parameters
freqFrequency identifier
packet_dictDictionary containing packet data
Returns
None
253 def _add_tone_ric_packet(self, freq, packet_dict):
254 r"""!Add a tone-RIC to the shared buffer.
255
256 @param freq: Frequency identifier
257 @param packet_dict: Dictionary containing packet data
258 @return None"""
259 with self._lock:
260 stored_packet = packet_dict.copy()
261 stored_packet['_multicast_timestamp'] = time.time()
262 self._tone_ric_packets[freq].append(stored_packet)
263 self._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
264 logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(self._tone_ric_packets[freq]), freq)
265

◆ _get_queued_packets()

module.multicast.BoswatchModule._get_queued_packets (   self)
protected

Pop and return all packets currently in the static queue.

Parameters
None
Returns
list: List of packets or None
266 def _get_queued_packets(self):
267 r"""!Pop and return all packets currently in the static queue.
268
269 @param None
270 @return list: List of packets or None"""
271 with self._queue_lock:
272 if self._packet_queue:
273 packets = self._packet_queue[:]
274 self._packet_queue.clear()
275 return packets
276 return None
277
278# ============================================================
279# MULTICAST PACKET CREATION
280# ============================================================
281

◆ _copy_packet_dict_to_packet()

module.multicast.BoswatchModule._copy_packet_dict_to_packet (   self,
  recipient_dict,
  packet,
  index = 1 
)
protected

Copy dict fields to Packet with timestamp shift for DB uniqueness.

Parameters
recipient_dictSource dictionary
packetTarget Packet object
indexPacket index (1-based) - shifts timestamp by milliseconds
Returns
None
282 def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1):
283 r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness.
284
285 @param recipient_dict: Source dictionary
286 @param packet: Target Packet object
287 @param index: Packet index (1-based) - shifts timestamp by milliseconds
288 @return None"""
289 for k, v in recipient_dict.items():
290 if k.startswith('_'):
291 continue
292 if k == 'timestamp' and index > 1:
293 try:
294 dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S')
295 dt_shifted = dt + datetime.timedelta(milliseconds=index - 1)
296 packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S'))
297 except (ValueError, TypeError):
298 packet.set(k, str(v))
299 else:
300 packet.set(k, str(v))
301

◆ _distribute_complete()

module.multicast.BoswatchModule._distribute_complete (   self,
  freq,
  text_packet_dict 
)
protected

Create full multicast packets with message content.

Parameters
freqFrequency identifier
text_packet_dictData of the message-carrying packet
Returns
list: List of fully populated Packet instances
302 def _distribute_complete(self, freq, text_packet_dict):
303 r"""!Create full multicast packets with message content.
304
305 @param freq: Frequency identifier
306 @param text_packet_dict: Data of the message-carrying packet
307 @return list: List of fully populated Packet instances"""
308 with self._lock:
309 recipient_dicts = self._tone_ric_packets[freq].copy()
310 logging.debug("Text RIC found. Matching against %d stored RICs", len(recipient_dicts))
311 self._tone_ric_packets[freq].clear()
312 self._last_tone_ric_time.pop(freq, None)
313
314 if not recipient_dicts:
315 return []
316 text_ric = text_packet_dict.get("ric")
317 message_text = text_packet_dict.get("message")
318 alarm_packets = []
319
320 for idx, recipient_dict in enumerate(recipient_dicts, 1):
321 p = Packet()
322 self._copy_packet_dict_to_packet(recipient_dict, p, idx)
323 p.set("message", message_text)
324 self._apply_list_tags(p, recipient_dicts)
325 self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx)
326 alarm_packets.append(p)
327
328 logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric)
329 return alarm_packets
330

◆ _create_incomplete_multicast()

module.multicast.BoswatchModule._create_incomplete_multicast (   self,
  freq,
  recipient_dicts 
)
protected

Generate multicast packets for timeouts (no text message).

Parameters
freqFrequency identifier
recipient_dictsList of recipient data dictionaries
Returns
list: List of incomplete Packet instances
331 def _create_incomplete_multicast(self, freq, recipient_dicts):
332 r"""!Generate multicast packets for timeouts (no text message).
333
334 @param freq: Frequency identifier
335 @param recipient_dicts: List of recipient data dictionaries
336 @return list: List of incomplete Packet instances"""
337 if not recipient_dicts:
338 return []
339 first_ric = recipient_dicts[0].get("ric", "unknown")
340 incomplete_packets = []
341 for idx, recipient_dict in enumerate(recipient_dicts, 1):
342 p = Packet()
343 self._copy_packet_dict_to_packet(recipient_dict, p, idx)
344 p.set("message", "")
345 self._apply_list_tags(p, recipient_dicts)
346 self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx)
347 incomplete_packets.append(p)
348 return incomplete_packets
349

◆ _enrich_normal_alarm()

module.multicast.BoswatchModule._enrich_normal_alarm (   self,
  bwPacket,
  packet_dict 
)
protected

Enrich a standard single alarm with multicast metadata.

Parameters
bwPacketTarget Packet object
packet_dictSource data dictionary
Returns
list: List containing the enriched packet
350 def _enrich_normal_alarm(self, bwPacket, packet_dict):
351 r"""!Enrich a standard single alarm with multicast metadata.
352
353 @param bwPacket: Target Packet object
354 @param packet_dict: Source data dictionary
355 @return list: List containing the enriched packet"""
356 self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
357 self._apply_list_tags(bwPacket, [packet_dict])
358 self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
359 logging.debug("Creating single-alarm for RIC %s", packet_dict.get('ric'))
360 return [bwPacket]
361

◆ _handle_delimiter()

module.multicast.BoswatchModule._handle_delimiter (   self,
  freq,
  ric,
  bwPacket = None 
)
protected

Handle delimiter packet and clear orphaned tone-RICs.

Parameters
freqFrequency identifier
ricDelimiter RIC
bwPacketOptional delimiter packet instance
Returns
list: Incomplete packets or delimiter control packet
362 def _handle_delimiter(self, freq, ric, bwPacket=None):
363 r"""!Handle delimiter packet and clear orphaned tone-RICs.
364
365 @param freq: Frequency identifier
366 @param ric: Delimiter RIC
367 @param bwPacket: Optional delimiter packet instance
368 @return list: Incomplete packets or delimiter control packet"""
369 with self._lock:
370 orphaned = self._tone_ric_packets[freq].copy()
371 self._tone_ric_packets[freq].clear()
372 self._last_tone_ric_time.pop(freq, None)
373 self._processing_text_ric[freq] = False
374
375 if orphaned:
376 age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time())
377
378 logging.debug("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds)
379 return self._create_incomplete_multicast(freq, orphaned)
380 if bwPacket is not None:
381 self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0")
382 return [bwPacket]
383 return None
384
385# ============================================================
386# PACKET METADATA HELPERS
387# ============================================================
388

◆ _set_mcast_metadata()

module.multicast.BoswatchModule._set_mcast_metadata (   self,
  packet,
  mode,
  role,
  source = "",
  count = "1",
  index = "1" 
)
protected

Helper to set standard multicast fields and register wildcards.

Parameters
packetThe Packet instance to modify
modemulticastMode (complete, incomplete, single, control)
rolemulticastRole (recipient, single, delimiter, netident)
sourceThe originating RIC
countTotal number of recipients
indexCurrent recipient index
Returns
None
389 def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"):
390 r"""!Helper to set standard multicast fields and register wildcards.
391
392 @param packet: The Packet instance to modify
393 @param mode: multicastMode (complete, incomplete, single, control)
394 @param role: multicastRole (recipient, single, delimiter, netident)
395 @param source: The originating RIC
396 @param count: Total number of recipients
397 @param index: Current recipient index
398 @return None"""
399 logging.debug("setting Metadata - Mode: %s, Role: %s, Index: %s of %s for RIC: %s", mode, role, index, count, source)
400 mapping = {
401 "multicastMode": (mode, "{MCAST_MODE}"),
402 "multicastRole": (role, "{MCAST_ROLE}"),
403 "multicastSourceRic": (source, "{MCAST_SOURCE}"),
404 "multicastRecipientCount": (str(count), "{MCAST_COUNT}"),
405 "multicastRecipientIndex": (str(index), "{MCAST_INDEX}")
406 }
407 for key, (val, wildcard) in mapping.items():
408 packet.set(key, val)
409 self._register_wildcard_safe(wildcard, key)
410

◆ _apply_list_tags()

module.multicast.BoswatchModule._apply_list_tags (   self,
  packet,
  recipient_dicts 
)
protected

Helper to aggregate fields from all recipients into comma-separated lists.

Parameters
packetThe target Packet instance
recipient_dictsList of dictionaries of all recipients in this group
Returns
None
411 def _apply_list_tags(self, packet, recipient_dicts):
412 r"""!Helper to aggregate fields from all recipients into comma-separated lists.
413
414 @param packet: The target Packet instance
415 @param recipient_dicts: List of dictionaries of all recipients in this group
416 @return None"""
417 all_fields = set()
418 for r in recipient_dicts:
419 all_fields.update(k for k in r.keys() if not k.startswith('_'))
420
421 for f in sorted(all_fields):
422 list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts])
423 list_key = f"{f}_list"
424 packet.set(list_key, list_val)
425 self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key)
426

◆ _register_wildcard_safe()

module.multicast.BoswatchModule._register_wildcard_safe (   self,
  wildcard,
  field 
)
protected

Register wildcard if not already globally registered.

Parameters
wildcardThe wildcard string (e.g. {MCAST_MODE})
fieldThe packet field name
Returns
None
427 def _register_wildcard_safe(self, wildcard, field):
428 r"""!Register wildcard if not already globally registered.
429
430 @param wildcard: The wildcard string (e.g. {MCAST_MODE})
431 @param field: The packet field name
432 @return None"""
433 if wildcard not in self._wildcards_registered:
434 self.registerWildcard(wildcard, field)
435 self._wildcards_registered.add(wildcard)
436
437# ============================================================
438# CLEANUP & TIMEOUT MANAGEMENT
439# ============================================================
440

◆ _cleanup_worker()

module.multicast.BoswatchModule._cleanup_worker (   self)
protected

Per-instance background thread for timeout management.

441 def _cleanup_worker(self):
442 r"""!Per-instance background thread for timeout management."""
443 logging.info("[%s] Cleanup thread started", self.name)
444 while self._running:
445 time.sleep(1)
446 try:
447 self._check_all_my_frequencies()
448 except Exception as e:
449 logging.error("[%s] Error in cleanup thread: %s", self.name, e)
450 if int(time.time()) % 60 == 0:
451 self._cleanup_hard_timeout()
452

◆ _check_all_my_frequencies()

module.multicast.BoswatchModule._check_all_my_frequencies (   self)
protected

Monitor timeouts for all frequencies assigned to this instance.

Parameters
None
Returns
None
453 def _check_all_my_frequencies(self):
454 r"""!Monitor timeouts for all frequencies assigned to this instance.
455
456 @param None
457 @return None"""
458 incomplete_packets = []
459 trigger_data = []
460
461 with self._lock:
462 current_time = time.time()
463 for freq in list(self._my_frequencies):
464 if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]:
465 continue
466
467 if self._processing_text_ric.get(freq, False):
468 flag_age = current_time - self._processing_text_ric_started.get(freq, current_time)
469 if flag_age > 2:
470 self._processing_text_ric[freq] = False
471 self._processing_text_ric_started.pop(freq, None)
472 else:
473 continue
474
475 last_time = self._last_tone_ric_time.get(freq, 0)
476 if current_time - last_time > self._auto_clear_timeout:
477 recipient_dicts = self._tone_ric_packets[freq].copy()
478 safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC)
479 trigger_data.append((freq, safe_ric))
480 self._tone_ric_packets[freq].clear()
481 self._last_tone_ric_time.pop(freq, None)
482
483 logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout)
484 packets = self._create_incomplete_multicast(freq, recipient_dicts)
485 if packets:
486 incomplete_packets.extend(packets)
487
488 if incomplete_packets:
489 with self._queue_lock:
490 self._packet_queue.extend(incomplete_packets)
491 for freq, safe_ric in trigger_data:
492 self._send_wakeup_trigger(freq, safe_ric)
493

◆ _check_instance_auto_clear()

module.multicast.BoswatchModule._check_instance_auto_clear (   self,
  freq 
)
protected

Check if frequency has exceeded timeout (called from doWork).

Parameters
freqFrequency identifier
Returns
list: Incomplete packets if timeout exceeded, else None
494 def _check_instance_auto_clear(self, freq):
495 r"""!Check if frequency has exceeded timeout (called from doWork).
496
497 @param freq: Frequency identifier
498 @return list: Incomplete packets if timeout exceeded, else None"""
499 with self._lock:
500 if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]:
501 return None
502 last_time = self._last_tone_ric_time.get(freq, 0)
503 if time.time() - last_time > self._auto_clear_timeout:
504 recipient_dicts = self._tone_ric_packets[freq].copy()
505 self._tone_ric_packets[freq].clear()
506 self._last_tone_ric_time.pop(freq, None)
507 logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts))
508 return self._create_incomplete_multicast(freq, recipient_dicts)
509 return None
510

◆ _cleanup_hard_timeout()

module.multicast.BoswatchModule._cleanup_hard_timeout (   self)
protected

Failsafe for really old packets.

511 def _cleanup_hard_timeout(self):
512 r"""!Failsafe for really old packets."""
513 with self._lock:
514 current_time = time.time()
515 for freq in list(self._tone_ric_packets.keys()):
516 self._tone_ric_packets[freq] = [
517 p for p in self._tone_ric_packets[freq]
518 if current_time - p.get('_multicast_timestamp', 0) < self._hard_timeout
519 ]
520 # cleaning empty frequencies
521 if not self._tone_ric_packets[freq]:
522 del self._tone_ric_packets[freq]
523
524# ============================================================
525# TRIGGER SYSTEM
526# ============================================================
527

◆ _send_wakeup_trigger()

module.multicast.BoswatchModule._send_wakeup_trigger (   self,
  freq,
  fallback_ric 
)
protected

Send a loopback trigger using the standard TCPClient class.

528 def _send_wakeup_trigger(self, freq, fallback_ric):
529 r"""!Send a loopback trigger using the standard TCPClient class."""
530 try:
531 trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric
532 payload = {
533 "timestamp": time.time(),
534 "mode": "pocsag",
535 "bitrate": "1200",
536 "ric": trigger_ric,
537 "subric": "1",
538 "subricText": "a",
539 "message": self._MAGIC_WAKEUP_MSG,
540 "clientName": "MulticastTrigger",
541 "inputSource": "loopback",
542 "frequency": freq
543 }
544 json_str = json.dumps(payload)
545
546 # using BOSWatch-Architecture
547 client = TCPClient(timeout=2)
548 if client.connect(self._trigger_host, self._trigger_port):
549 # 1. Send
550 client.transmit(json_str)
551
552 # 2. Recieve (getting [ack] and prevents connection reset)
553 client.receive(timeout=1)
554
555 client.disconnect()
556 logging.debug("[%s] Wakeup trigger sent and acknowledged (RIC=%s)", self.name, trigger_ric)
557 else:
558 logging.error("[%s] Could not connect to local server for wakeup", self.name)
559
560 except Exception as e:
561 logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e)
562
563# ============================================================
564# LIFECYCLE (End)
565# ============================================================
566

◆ onUnload()

module.multicast.BoswatchModule.onUnload (   self)

Unregister instance from the global cleanup process.

Parameters
None
Returns
None

Reimplemented from module.moduleBase.ModuleBase.

567 def onUnload(self):
568 r"""!Unregister instance from the global cleanup process.
569
570 @param None
571 @return None"""
572 self._running = False
573 logging.debug("[%s] Multicast instance unloaded", self.name)

Field Documentation

◆ _TRIGGER_HOST

str module.multicast.BoswatchModule._TRIGGER_HOST = "127.0.0.1"
staticprotected

◆ _TRIGGER_PORT

int module.multicast.BoswatchModule._TRIGGER_PORT = 8080
staticprotected

◆ _MAGIC_WAKEUP_MSG [1/2]

str module.multicast.BoswatchModule._MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
staticprotected

◆ _DEFAULT_TRIGGER_RIC

str module.multicast.BoswatchModule._DEFAULT_TRIGGER_RIC = "9999999"
staticprotected

◆ _my_frequencies

module.multicast.BoswatchModule._my_frequencies
protected

◆ instance_id

module.multicast.BoswatchModule.instance_id

◆ name

module.multicast.BoswatchModule.name

◆ _auto_clear_timeout

module.multicast.BoswatchModule._auto_clear_timeout
protected

◆ _hard_timeout

module.multicast.BoswatchModule._hard_timeout
protected

◆ _delimiter_rics

module.multicast.BoswatchModule._delimiter_rics
protected

◆ _text_rics

module.multicast.BoswatchModule._text_rics
protected

◆ _netident_rics

module.multicast.BoswatchModule._netident_rics
protected

◆ _trigger_ric

module.multicast.BoswatchModule._trigger_ric
protected

◆ _trigger_host

module.multicast.BoswatchModule._trigger_host
protected

◆ _trigger_port

module.multicast.BoswatchModule._trigger_port
protected

◆ _tone_ric_packets

module.multicast.BoswatchModule._tone_ric_packets
protected

◆ _last_tone_ric_time

module.multicast.BoswatchModule._last_tone_ric_time
protected

◆ _processing_text_ric

module.multicast.BoswatchModule._processing_text_ric
protected

◆ _processing_text_ric_started

module.multicast.BoswatchModule._processing_text_ric_started
protected

◆ _wildcards_registered

module.multicast.BoswatchModule._wildcards_registered
protected

◆ _packet_queue

module.multicast.BoswatchModule._packet_queue
protected

◆ _lock

module.multicast.BoswatchModule._lock
protected

◆ _queue_lock

module.multicast.BoswatchModule._queue_lock
protected

◆ _running

module.multicast.BoswatchModule._running
protected

◆ _cleanup_thread

module.multicast.BoswatchModule._cleanup_thread
protected

◆ _MAGIC_WAKEUP_MSG [2/2]

module.multicast.BoswatchModule._MAGIC_WAKEUP_MSG
protected