DivYonko commited on
Commit
146e596
·
1 Parent(s): eede559

feat: replace pytchat with YouTube Data API v3 scraper

Browse files
Files changed (2) hide show
  1. app.py +113 -39
  2. requirements.txt +1 -2
app.py CHANGED
@@ -102,53 +102,127 @@ def _safe_topic(text: str):
102
  return "General", 0.50
103
 
104
 
105
- def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Event) -> None:
106
- """Background thread that scrapes live chat and writes to in-memory store."""
107
- import pytchat
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
 
109
- logger.info("Scraper thread starting — video=%s key=%s", video_id, redis_key)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  try:
111
- chat = pytchat.create(video_id=video_id, interruptable=False)
 
 
 
 
 
112
  except Exception as exc:
113
- logger.error("pytchat.create failed: %s", exc)
 
 
 
 
 
 
 
 
114
  return
115
 
116
- if not chat.is_alive():
117
- logger.error("Live chat not available for %s", video_id)
 
 
 
 
118
  return
119
 
120
- logger.info("Live chat connected for %s", video_id)
121
 
122
- while chat.is_alive() and not stop_event.is_set():
123
- try:
124
- for c in chat.get().sync_items():
125
- if stop_event.is_set():
126
- break
127
- text = c.message.strip()
128
- author = c.author.name
129
- if not text:
130
- continue
131
-
132
- sentiment, s_conf = _safe_sentiment(text)
133
- topic, t_conf = _safe_topic(text)
134
-
135
- message_data = {
136
- "author": author,
137
- "text": text,
138
- "sentiment": sentiment,
139
- "confidence": round(s_conf, 3),
140
- "topic": topic,
141
- "topic_conf": round(t_conf, 3),
142
- "time": datetime.now().isoformat(),
143
- }
144
- store_rpush(redis_key, json.dumps(message_data))
145
-
146
- except Exception as exc:
147
- if not stop_event.is_set():
148
- logger.error("Scraper error: %s", exc, exc_info=True)
149
-
150
- if not stop_event.is_set():
151
- time.sleep(1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
152
 
153
  logger.info("Scraper thread ended — key=%s", redis_key)
154
 
 
102
  return "General", 0.50
103
 
104
 
105
def _get_live_chat_id(video_id: str, api_key: str) -> str | None:
    """Fetch the activeLiveChatId for a video via the YouTube Data API v3.

    Args:
        video_id: YouTube video ID of the (presumed live) broadcast.
        api_key: YouTube Data API v3 key.

    Returns:
        The live chat ID string, or None when the video does not exist,
        has no active live chat, or the request fails.
    """
    import urllib.parse
    import urllib.request

    # urlencode BOTH parameters — the original only quoted video_id, so an
    # api_key containing reserved characters would corrupt the query string.
    query = urllib.parse.urlencode(
        {"part": "liveStreamingDetails", "id": video_id, "key": api_key}
    )
    url = "https://www.googleapis.com/youtube/v3/videos?" + query
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = json.loads(resp.read())
        items = data.get("items", [])
        if not items:
            logger.error("No video found for id=%s", video_id)
            return None
        chat_id = items[0].get("liveStreamingDetails", {}).get("activeLiveChatId")
        if not chat_id:
            logger.error("No active live chat for video id=%s", video_id)
            return None  # normalize "" / missing to an explicit None
        return chat_id
    except Exception as exc:  # network/JSON errors — best-effort boundary, log and bail
        logger.error("Failed to get liveChatId: %s", exc)
        return None
129
 
130
+
131
def _fetch_chat_messages(
    live_chat_id: str,
    api_key: str,
    page_token: str | None = None,
    max_results: int = 200,
):
    """Fetch one page of live chat messages from the YouTube Data API v3.

    Args:
        live_chat_id: activeLiveChatId of the broadcast.
        api_key: YouTube Data API v3 key.
        page_token: nextPageToken returned by the previous call, if any.
        max_results: page size; the API accepts 200-2000. Default keeps
            the previous hard-coded value, so existing callers are unchanged.

    Returns:
        Tuple of (messages_list, next_page_token, polling_interval_ms).
        On any failure returns ([], None, 5000) so callers can keep polling.
    """
    import urllib.parse
    import urllib.request

    params = {
        "part": "snippet,authorDetails",
        "liveChatId": live_chat_id,
        "key": api_key,
        "maxResults": str(max_results),
    }
    if page_token:
        params["pageToken"] = page_token

    url = (
        "https://www.googleapis.com/youtube/v3/liveChat/messages?"
        + urllib.parse.urlencode(params)
    )
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = json.loads(resp.read())
        return (
            data.get("items", []),
            data.get("nextPageToken"),
            data.get("pollingIntervalMillis", 5000),
        )
    except Exception as exc:  # network/JSON errors — log and degrade gracefully
        logger.error("Failed to fetch chat messages: %s", exc)
        return [], None, 5000
159
+
160
+
161
def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Event) -> None:
    """Background thread — polls live chat via the YouTube Data API v3.

    Resolves the video's live chat ID, then polls liveChat/messages in a
    loop, classifying each text message (sentiment + topic) and pushing the
    JSON result onto the in-memory store under `redis_key`. Runs until
    `stop_event` is set, or returns early when startup fails (missing API
    key, no active live chat).
    """
    api_key = os.getenv("YOUTUBE_API_KEY", "")
    if not api_key:
        logger.error("YOUTUBE_API_KEY env var not set. Cannot start scraper.")
        return

    logger.info("Scraper thread starting — video=%s key=%s", video_id, redis_key)

    # Step 1: get the live chat ID
    live_chat_id = _get_live_chat_id(video_id, api_key)
    if not live_chat_id:
        logger.error("Could not get live chat ID for video=%s", video_id)
        return

    logger.info("Live chat ID obtained: %s", live_chat_id)

    # Step 2: poll for messages.
    # Dedup with an insertion-ordered dict used as an ordered set: trimming a
    # plain `set` (as before) evicts ARBITRARY ids, since sets are unordered —
    # recent ids could be dropped and their messages reprocessed as duplicates.
    page_token = None
    seen_ids: dict = {}  # dicts preserve insertion order => oldest-first trimming

    while not stop_event.is_set():
        messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token)

        for item in messages:
            if stop_event.is_set():
                break

            msg_id = item.get("id", "")
            if msg_id in seen_ids:
                continue
            seen_ids[msg_id] = None

            snippet = item.get("snippet", {})
            # only process plain text messages (skip super chats, deletions, etc.)
            if snippet.get("type") != "textMessageEvent":
                continue

            text = snippet.get("displayMessage", "").strip()
            author = item.get("authorDetails", {}).get("displayName", "Unknown")
            if not text:
                continue

            sentiment, s_conf = _safe_sentiment(text)
            topic, t_conf = _safe_topic(text)

            message_data = {
                "author": author,
                "text": text,
                "sentiment": sentiment,
                "confidence": round(s_conf, 3),
                "topic": topic,
                "topic_conf": round(t_conf, 3),
                "time": datetime.now().isoformat(),
            }
            store_rpush(redis_key, json.dumps(message_data))

        # keep the dedup structure from growing unbounded: drop the OLDEST
        # ids, keeping the most recent 2000 (insertion order makes this exact)
        if len(seen_ids) > 5000:
            for old_id in list(seen_ids)[: len(seen_ids) - 2000]:
                del seen_ids[old_id]

        # respect YouTube's requested polling interval (min 3s to be safe);
        # coerce defensively in case the API returns the millis as a string
        wait_s = max(float(poll_ms) / 1000, 3.0)
        stop_event.wait(timeout=wait_s)

    logger.info("Scraper thread ended — key=%s", redis_key)
228
 
requirements.txt CHANGED
@@ -7,8 +7,7 @@ sentencepiece>=0.1.99
7
  emoji>=2.10.0
8
  deep-translator>=1.11.4
9
 
10
- # Live chat scraping
11
- pytchat>=0.5.5
12
 
13
  # Dashboard
14
  streamlit>=1.35.0
 
7
  emoji>=2.10.0
8
  deep-translator>=1.11.4
9
 
10
+ # Live chat scraping (now uses YouTube Data API v3 — no extra package needed)
 
11
 
12
  # Dashboard
13
  streamlit>=1.35.0