BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
Loading...
Searching...
No Matches
plugin.telegram.TelegramSender Class Reference

Public Member Functions

 __init__ (self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None)
 
 send_message (self, text)
 
 send_location (self, latitude, longitude)
 
 shutdown (self)
 

Data Fields

 bot_token
 
 chat_ids
 
 max_retries
 
 initial_delay
 
 max_delay
 
 msg_queue
 
 parse_mode
 

Protected Member Functions

 _worker_loop (self)
 
 _send_to_telegram (self, msg_type, chat_id, content)
 

Static Protected Member Functions

 _escape_text (text, parse_mode)
 

Protected Attributes

 _stop_event
 
 _worker
 

Constructor & Destructor Documentation

◆ __init__()

plugin.telegram.TelegramSender.__init__ (   self,
  bot_token,
  chat_ids,
  max_retries = None,
  initial_delay = None,
  max_delay = None,
  parse_mode = None 
)
35 def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None):
36 self._stop_event = threading.Event()
37 self.bot_token = bot_token
38 self.chat_ids = chat_ids
39 self.max_retries = max_retries if max_retries is not None else 5
40 self.initial_delay = initial_delay if initial_delay is not None else 2
41 self.max_delay = max_delay if max_delay is not None else 300
42 self.msg_queue = queue.Queue(maxsize=100) # max 100 messages
43 self.parse_mode = parse_mode
44
45 # Start Worker Thread
46 self._worker = threading.Thread(target=self._worker_loop, daemon=True)
47 self._worker.start()
48

Member Function Documentation

◆ send_message()

plugin.telegram.TelegramSender.send_message (   self,
  text 
)
49 def send_message(self, text):
50 clean_text = self._escape_text(text, self.parse_mode)
51 for chat_id in self.chat_ids:
52 try:
53 self.msg_queue.put(("text", chat_id, clean_text, 0), block=False)
54 except queue.Full:
55 logger.error(f"Message queue full for chat_id {chat_id}, message discarded: {clean_text[:50]}...")
56

◆ send_location()

plugin.telegram.TelegramSender.send_location (   self,
  latitude,
  longitude 
)
57 def send_location(self, latitude, longitude):
58 try:
59 # Use validated float values, not original strings
60 lat = float(latitude)
61 lon = float(longitude)
62 # check Telegram API Limits
63 if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
64 logger.error(f"Invalid coordinates: lat={lat}, lon={lon} (out of range)")
65 return
66 except (TypeError, ValueError) as e:
67 logger.error(f"Invalid coordinate format: {e}, location skipped")
68 return
69
70 for chat_id in self.chat_ids:
71 try:
72 self.msg_queue.put(("location", chat_id, {"latitude": lat, "longitude": lon}, 0), block=False)
73 except queue.Full:
74 logger.error(f"Location queue full for chat_id {chat_id}, location discarded: {lat}, {lon}")
75

◆ _escape_text()

plugin.telegram.TelegramSender._escape_text (   text,
  parse_mode 
)
staticprotected
77 def _escape_text(text, parse_mode):
78 if not text:
79 return ""
80 if parse_mode == "HTML":
81 protected_tags = {}
82 tag_pattern = r'<(/?)(\w+)>'
83 allowed = ["b", "strong", "i", "em", "code", "pre", "u", "s"]
84
85 def protect_tag(match):
86 tag_full = match.group(0)
87 tag_name = match.group(2)
88 if tag_name in allowed:
89 placeholder = f"__TAG_{len(protected_tags)}__"
90 protected_tags[placeholder] = tag_full
91 return placeholder
92 return tag_full
93
94 text = re.sub(tag_pattern, protect_tag, text)
95 text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
96 for placeholder, original_tag in protected_tags.items():
97 text = text.replace(placeholder, original_tag)
98 elif parse_mode == "MarkdownV2":
99 # Escape all MarkdownV2 special characters (Telegram API requirement)
100 # See: https://core.telegram.org/bots/api#markdownv2-style
101 escape_chars = r'_*[]()~`>#+-=|{}.!'
102 text = "".join(['\\' + char if char in escape_chars else char for char in text])
103 return text[:4090] + "[...]" if len(text) > 4096 else text
104

◆ _worker_loop()

plugin.telegram.TelegramSender._worker_loop (   self)
protected
105 def _worker_loop(self):
106 delay = self.initial_delay
107 while not self._stop_event.is_set():
108 try:
109 msg_type, chat_id, content, retry_count = self.msg_queue.get(timeout=1)
110 success, permanent_failure, custom_delay, error_details = self._send_to_telegram(msg_type, chat_id, content)
111 if success:
112 delay = self.initial_delay
113 elif permanent_failure or retry_count >= self.max_retries:
114 logger.warning(f"Discarding message for {chat_id}: {error_details}")
115 else:
116 wait_time = custom_delay if custom_delay is not None else delay
117 time.sleep(wait_time)
118 self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
119 delay = min(delay * 2, self.max_delay)
120 except queue.Empty:
121 continue
122 except Exception as e:
123 logger.error(f"Error in Telegram worker: {e}")
124 time.sleep(5)
125

◆ _send_to_telegram()

plugin.telegram.TelegramSender._send_to_telegram (   self,
  msg_type,
  chat_id,
  content 
)
protected
126 def _send_to_telegram(self, msg_type, chat_id, content):
127 url = f"https://api.telegram.org/bot{self.bot_token}/"
128 url += "sendMessage" if msg_type == "text" else "sendLocation"
129 payload = {'chat_id': chat_id}
130
131 if msg_type == "text":
132 payload['text'] = content
133 if self.parse_mode:
134 payload['parse_mode'] = self.parse_mode
135 else:
136 payload.update(content)
137
138 try:
139 response = requests.post(url, data=payload, timeout=10)
140 if response.status_code == 200:
141 logger.info(f"Successfully sent to Chat-ID {chat_id}")
142 return True, False, None, None
143
144 # Rate limiting
145 if response.status_code == 429:
146 retry_after = response.json().get("parameters", {}).get("retry_after", 5)
147 logger.warning(f"Rate limited for {chat_id}, retry after {retry_after}s")
148 return False, False, retry_after, f"Rate Limit (retry after {retry_after}s)"
149
150 return False, response.status_code < 500, None, f"HTTP {response.status_code}"
151 except Exception as e:
152 return False, False, None, str(e)
153

◆ shutdown()

plugin.telegram.TelegramSender.shutdown (   self)
Graceful shutdown with queue draining
154 def shutdown(self):
155 r"""Graceful shutdown with queue draining"""
156 logger.info("Shutting down Telegram sender...")
157 self._stop_event.set()
158 timeout = time.time() + 5
159 while not self.msg_queue.empty() and time.time() < timeout:
160 time.sleep(0.1)
161 remaining = self.msg_queue.qsize()
162 if remaining > 0:
163 logger.warning(f"{remaining} messages in queue discarded during shutdown")
164 self._worker.join(timeout=5)
165
166
167# ===========================
168# BoswatchPlugin-Class
169# ===========================
170

Field Documentation

◆ _stop_event

plugin.telegram.TelegramSender._stop_event
protected

◆ bot_token

plugin.telegram.TelegramSender.bot_token

◆ chat_ids

plugin.telegram.TelegramSender.chat_ids

◆ max_retries

plugin.telegram.TelegramSender.max_retries

◆ initial_delay

plugin.telegram.TelegramSender.initial_delay

◆ max_delay

plugin.telegram.TelegramSender.max_delay

◆ msg_queue

plugin.telegram.TelegramSender.msg_queue

◆ parse_mode

plugin.telegram.TelegramSender.parse_mode

◆ _worker

plugin.telegram.TelegramSender._worker
protected