一个基于 Selenium 的抖音自动评论系统,支持自动获取视频评论并通过 AI 生成回复。
B站视频演示
主要功能
- 自动浏览抖音视频
- 获取视频标题和评论
- 通过 AI 接口生成智能回复
- 自动发送评论
- 支持直播检测和处理
- 图形化操作界面
文件说明
主要文件
- run.py: 主程序入口,提供图形界面
  - 服务控制(启动/停止)
  - 脚本控制(启动/暂停/停止)
  - 配置管理(等待时间、API提示词)
  - 窗口管理(置顶/取消置顶)
  - 控制台输出
- douyin/main.py: 核心运行脚本
  - 循环处理视频
  - 直播检测
  - 评论获取和发送
  - 自动切换视频
功能模块
- douyin/douyinrespond.py: 浏览器管理模块
  - 浏览器初始化
  - Cookie 管理
  - 登录状态维护
  - 单例模式实现
- douyin/getcomment.py: 评论获取模块
  - 视频标题获取
  - 评论内容获取
  - 评论过滤
  - API 调用
- douyin/inputdef.py: 评论输入模块
  - 评论框定位
  - 文本输入
  - 评论发送
  - 坐标定位方案
- douyin/testiflive.py: 直播测试模块
  - 直播页面检测
  - 直播特征识别
  - 测试工具
API 服务
- douban_chat_selenium.py: 豆包聊天爬虫
  - 浏览器自动化
  - 登录状态管理
  - 网络请求获取
- douban_chat_client.py: 豆包客户端
  - 单例模式
  - 聊天功能封装
  - 会话管理
- douban_chat_service.py: API 服务
  - Flask Web 服务
  - 聊天请求处理
  - 状态管理
配置文件
- douyin/config.txt: 视频切换等待时间配置
- douyin/commandtodoubao.txt: API 追加提示词配置
- douyin/browser_info.json: 浏览器信息
- douyin/login_state.json: 登录状态信息
环境要求
- Python 3.7+
- Chrome 浏览器
- ChromeDriver(与 Chrome 版本匹配)
依赖安装
```bash
pip install -r requirements.txt
```
使用说明
- 确保 Chrome 和 ChromeDriver 已正确安装
- 将 Chrome(chrome-win64 目录,内含 chrome.exe)和 chromedriver.exe 放在项目根目录
- 安装所需依赖
- 运行 run.py 启动图形界面
- 先启动服务,再启动脚本
- 根据需要调整配置
注意事项
- 首次运行需要手动登录抖音
- 确保网络连接稳定
- 不要手动关闭浏览器窗口
- 建议使用管理员权限运行
- 遇到问题可查看控制台输出
开发说明
- 使用 PyQt5 构建图形界面
- 采用单例模式管理浏览器实例
- 支持热更新配置
- 提供完整的错误处理
- 包含详细的日志记录
更新日志
- 2024.03: 添加坐标定位方案
- 2024.02: 优化直播检测
- 2024.01: 实现图形界面
部分前端重要代码:
run.py
main.py
from getcomment import CommentCrawler
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import time
import sys
def simulate_x_key(crawler):
    """Press the X key on the page by delegating to the crawler.

    Args:
        crawler: Object exposing a ``_simulate_x_key()`` method
            (the one implemented in getcomment.py).

    Any exception raised by the crawler is reported on stdout and
    suppressed; nothing is returned.
    """
    try:
        crawler._simulate_x_key()
    except Exception as exc:
        print(f"模拟按X键失败: {str(exc)}")
    else:
        print("已模拟按X键")
#检测直播方法
def check_is_live(crawler):
    """Tell whether the page currently shown by the crawler is a live stream.

    The decision is made in the browser: a page containing the regular
    comment placeholder text is treated as a normal video; otherwise the
    page is a live stream only if an "enter live room" prompt is found.

    Args:
        crawler: Crawler instance; must expose a ``driver`` attribute.

    Returns:
        The value produced by the in-page script: True for a live page,
        False for a normal video page.
    """
    detection_script = """
        // 优先检查是否存在评论提示文字(普通视频特有)
        function checkCommentText() {
            let spans = document.getElementsByTagName('span');
            for (let span of spans) {
                // 检查span是否有class属性且文本内容完全匹配
                if (span.className &&
                    span.textContent === '留下你的精彩评论吧') {
                    console.log("找到了评论提示,是普通视频");
                    return true; // 找到评论提示,说明是普通视频
                }
            }
            return false; // 没找到评论提示
        }
        // 检查是否存在直播间提示文字
        function checkLiveText() {
            let divs = document.getElementsByTagName('div');
            for (let div of divs) {
                if (div.children.length === 1 &&
                    div.children[0].tagName === 'DIV' &&
                    div.textContent &&
                    div.textContent.includes('进入直播间')) {
                    if (div.className && div.children[0].className) {
                        console.log("找到了直播间提示");
                        return true; // 找到直播间提示,说明是直播
                    }
                }
            }
            return false; // 没找到直播间提示
        }
        // 先检查是否是普通视频
        if (checkCommentText()) {
            return false; // 是普通视频,返回not live
        }
        // 如果不是普通视频,再检查是否是直播
        return checkLiveText();
    """
    return crawler.driver.execute_script(detection_script)
def _skip_live_videos(crawler):
    """Keep switching to the next video until the current page is not a live stream."""
    while check_is_live(crawler):
        print("检测到直播页面,开始处理直播...")
        crawler._handle_live_video()
        print("直播处理完成,继续检测当前视频是否是直播...")
    print("这是普通视频,继续正常处理...")


def _read_wait_seconds(path='config.txt', default=5):
    """Return the wait time stored on the first line of *path*, or *default* if unreadable."""
    try:
        with open(path, 'r') as f:
            return int(f.readline().strip())
    except (OSError, ValueError):
        # Missing file or non-numeric content: fall back to the default.
        return default


def main():
    """Main loop: repeatedly fetch comments for the current video and reply.

    Each round skips live streams, collects the comments and title,
    sends them to the reply API through the crawler, waits the configured
    number of seconds and moves to the next video. After five consecutive
    rounds without comments the Douyin home page is reopened.

    The loop runs until Ctrl+C or an unexpected error; the crawler is
    closed on the way out.
    """
    # Bound before the try block so the except/finally clauses can always
    # use them, even when CommentCrawler() itself fails.
    runacount = 0
    crawler = None
    try:
        # Create the crawler once for the whole run.
        crawler = CommentCrawler()
        print("\n初始化完成,开始循环执行...")
        last_title = ""
        # Number of consecutive rounds that produced no comments.
        no_comments_count = 0

        crawler._simulate_x_key()
        time.sleep(2)  # give the key press time to take effect
        _skip_live_videos(crawler)

        while True:
            try:
                runacount += 1
                print(f"\n当前是第 {runacount} 次循环")
                print("="*50)
                print("开始新一轮评论处理")
                print("="*50)
                _skip_live_videos(crawler)

                comments = crawler.get_comments_by_xpath()
                current_title = crawler.get_video_title()
                # A title identical to the previous round is sent as empty.
                if current_title == last_title:
                    print("检测到重复标题,设置为空")
                    title = ""
                else:
                    title = current_title
                    last_title = current_title

                if comments:
                    no_comments_count = 0
                    formatted_text = crawler.format_output(title, comments)
                    # Send to the API, which also posts the generated comment.
                    crawler.send_to_api(formatted_text)
                else:
                    print("\n未获取到有效评论")
                    no_comments_count += 1
                    # Five empty rounds in a row: reload the home page.
                    if no_comments_count >= 5:
                        print("\n警告:连续5次未获取到评论!")
                        print("正在重新打开抖音首页...")
                        crawler.driver.get("https://www.douyin.com/")
                        time.sleep(3)  # wait for the page to load
                        crawler._simulate_x_key()
                        time.sleep(2)  # give the key press time to take effect
                        print("已重新打开抖音首页")
                        no_comments_count = 0
                        continue  # restart the round on the fresh page

                wait_time = _read_wait_seconds()
                print(f"\n等待{wait_time}秒后切换到下一个视频...")
                time.sleep(wait_time)
                # Despite its name, this helper switches to the next video.
                crawler._handle_live_video()
                print("\n等待新视频加载...")
                time.sleep(3)
                print("\n新视频加载后跳回检测起点...")
            except Exception as e:
                print(f"\n本轮处理出错: {str(e)}")
                print("等待10秒后继续下一轮...")
                time.sleep(10)
    except KeyboardInterrupt:
        print(f"\n\n检测到Ctrl+C,正在退出程序...")
        print(f"本次共执行了 {runacount} 次循环")
    except Exception as e:
        print(f"\n程序发生错误: {str(e)}")
        print(f"本次共执行了 {runacount} 次循环")
    finally:
        if crawler is not None:
            try:
                crawler.close()
            except Exception:
                # Best-effort cleanup: a failing close must not mask the exit path.
                pass
        print("\n程序已退出")
# Start the comment loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
from douyinrespond import DouyinBrowser
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
import time
import re
import requests
import sys
from inputdef import CommentInput
import os
class CommentCrawler:
    """Read titles and comments from the Douyin page and post AI-generated replies.

    The browser is obtained from ``DouyinBrowser.get_instance()``; all page
    access goes through ``self.driver``.
    """

    # Seconds get_video_title() waits before querying the page.
    TITLE_WAIT_SECONDS = 2
    # Any comment containing one of these substrings is discarded.
    FILTER_WORDS = ('作者回复过', '回复', '展开', '收起', '复制链接')
    # Texts that START with a date / relative-time stamp are discarded.
    DATE_PATTERNS = (
        r'\d{1,2}-\d{1,2}',          # MM-DD
        r'\d{4}-\d{1,2}-\d{1,2}',    # YYYY-MM-DD
        r'\d{1,2}月\d{1,2}日',        # MM月DD日
        r'\d+天前',                   # N days ago
        r'\d+分钟前',                 # N minutes ago
        r'\d+小时前',                 # N hours ago
    )

    def __init__(self):
        """Attach to the shared browser and remember the project root directory."""
        self.browser = DouyinBrowser.get_instance()
        self.driver = None
        self.mycomment = ""
        self.initialize()
        # Directory two levels above this file.
        self.root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    def initialize(self):
        """Initialise the browser if needed and take over its driver."""
        if not self.browser.is_initialized:
            self.browser.initialize()
        self.driver = self.browser.driver
        # Give the page time to finish loading.
        time.sleep(2)

    def _simulate_x_key(self):
        """Click the page body to focus it, then send the 'x' key.

        Does nothing when no driver is attached; failures are printed and
        suppressed.
        """
        try:
            if not self.driver:
                return
            time.sleep(2)  # wait for the page to load
            # Make sure keyboard focus is on the page.
            body = self.driver.find_element(By.TAG_NAME, "body")
            body.click()
            ActionChains(self.driver).send_keys('x').perform()
            time.sleep(2)  # wait for the key press to take effect
        except Exception as e:
            print(f"模拟按X键失败: {str(e)}")

    def _filter_comment(self, text):
        """Return the stripped comment text, or None if it should be ignored.

        A text is ignored when it is empty, contains a UI keyword from
        ``FILTER_WORDS``, is a bare number, starts with a date stamp, or
        has fewer than 10 Chinese characters.
        """
        text = text.strip()
        if not text:
            return None
        if any(word in text for word in self.FILTER_WORDS):
            return None
        # Pure numbers (like counters), optionally padded with whitespace.
        if re.match(r'^\s*\d+\s*$', text):
            return None
        if any(re.match(pattern, text) for pattern in self.DATE_PATTERNS):
            return None
        chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
        if len(chinese_chars) < 10:
            return None
        return text

    def get_video_title(self):
        """Return the title of the current video with #topics and @mentions removed.

        Returns:
            str: The cleaned title, "未知标题" when the page exposes no
            title, or "获取标题失败" when querying the page raised.
        """
        try:
            time.sleep(self.TITLE_WAIT_SECONDS)  # wait for the page to load
            title = self.driver.execute_script("""
                // 查找所有可能的标题元素
                function findTitle() {
                    // 尝试多种选择器
                    let selectors = [
                        '.video-info-detail .title.notBideoTags span.HGWtaPTo', // 主标题
                        '.video-info-detail .title span.arnSiSbK', // 备选标题
                        '.video-info-detail .title span', // 通用标题
                        '.video-info-detail .title', // 标题容器
                        '.video-info-detail .desc' // 描述文本
                    ];
                    // 遍历选择器
                    for (let selector of selectors) {
                        let element = document.querySelector(selector);
                        if (element && element.textContent.trim()) {
                            return element.textContent.trim();
                        }
                    }
                    return null;
                }
                // 等待标题加载
                let maxAttempts = 5;
                let attempt = 0;
                let title = null;
                while (attempt < maxAttempts) {
                    title = findTitle();
                    if (title) break;
                    attempt++;
                }
                return title;
            """)
            if not title:
                # Checked before printing: concatenating None would raise
                # and misreport a missing title as a failure.
                return "未知标题"
            print('原始标题内容:' + title)
            title = re.sub(r'#[\w\u4e00-\u9fff]+', '', title)  # drop #topic tags
            title = re.sub(r'@[\w\u4e00-\u9fff]+', '', title)  # drop @mentions
            return title.strip()
        except Exception as e:
            print(f"获取视频标题失败: {str(e)}")
            return "获取标题失败"

    def _handle_live_video(self):
        """Switch to the next video (ARROW_DOWN), then press SPACE.

        Failures are printed and suppressed.
        """
        try:
            ActionChains(self.driver).send_keys(Keys.ARROW_DOWN).perform()
            time.sleep(5)  # wait for the page to switch
            print("已切换到下一个视频")
            ActionChains(self.driver).send_keys(Keys.SPACE).perform()
            time.sleep(0.5)  # wait for the space key to take effect
        except Exception as e:
            print(f"切换视频失败: {str(e)}")

    def get_comments_by_xpath(self, max_comments=50):
        """Collect up to *max_comments* distinct comments from the page.

        Comment spans are located by XPath relative to the "回复" buttons,
        filtered through ``_filter_comment``, and the page is scrolled to
        the bottom between passes. Collection stops once three "no
        progress" signals (no new comment, or unchanged page height) have
        accumulated.

        Returns:
            list[str]: The comments found; an empty list on any error.
        """
        try:
            time.sleep(3)
            title = self.get_video_title()
            if title:
                print(f"\n视频标题: {title}")
            else:
                print("\n未能获取到视频标题")
            print("\n开始获取评论...")
            comments = []
            last_height = self.driver.execute_script("return document.documentElement.scrollHeight")
            no_new_comments_count = 0
            while len(comments) < max_comments and no_new_comments_count < 3:
                initial_comment_count = len(comments)
                comment_elements = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located((
                        By.XPATH,
                        "//span[text()='回复']/ancestor::div[contains(@class, 'comment')]//span[not(contains(@class, 'action')) and not(text()='回复')]"
                    ))
                )
                for element in comment_elements:
                    comment_text = self._filter_comment(element.text)
                    if comment_text and comment_text not in comments:
                        comments.append(comment_text)
                        print(f"找到评论 [{len(comments)}]: {comment_text}")
                # Scroll to the bottom to trigger loading of more comments.
                self.driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
                time.sleep(2)
                # No new comment in this pass counts as one "no progress" signal.
                if len(comments) == initial_comment_count:
                    no_new_comments_count += 1
                else:
                    no_new_comments_count = 0
                # An unchanged page height counts as another one.
                new_height = self.driver.execute_script("return document.documentElement.scrollHeight")
                if new_height == last_height:
                    no_new_comments_count += 1
                last_height = new_height
            return comments
        except Exception as e:
            print(f"获取评论失败: {str(e)}")
            return []

    def close(self):
        """Close the browser; errors are reported but never raised.

        The previous version also called ``sys.exit(0)`` here, but inside
        a bare ``except`` that swallowed the resulting SystemExit, so it
        never took effect. The call is dropped rather than made effective,
        to keep the behaviour callers already rely on.
        """
        try:
            self.browser.close()
        except Exception as e:
            print(f"关闭浏览器失败: {str(e)}")

    def format_output(self, title, comments):
        """Build the prompt text: title, numbered comments and the extra instruction.

        The extra instruction is read from ``commandtodoubao.txt`` in the
        current working directory and appended after a single space; a
        missing or unreadable file is reported and skipped.

        Returns:
            str: "标题:<title> 评论:<1、...2、...>[ <instruction>]".
        """
        try:
            title_text = f"标题:{title if title else '未获取到标题'}"
            comment_texts = [f"{i}、{comment}" for i, comment in enumerate(comments, 1)]
            command_text = ""
            try:
                with open('commandtodoubao.txt', 'r', encoding='utf-8') as f:
                    # A leading space replaces the line break before the instruction.
                    command_text = " " + f.read().strip()
                    time.sleep(1)
            except Exception as e:
                print(f"\n读取commandtodoubao.txt失败: {str(e)}")
            comment_text = "评论:" + "".join(comment_texts)
            final_text = title_text + " " + comment_text
            if command_text:
                final_text += command_text
            return final_text
        except Exception as e:
            print(f"格式化输出失败: {str(e)}")
            return f"标题:{title if title else '未获取到标题'} 评论:获取失败"

    def send_to_api(self, formatted_text):
        """Send the prompt to the local chat API and post the reply as a comment.

        The reply is reduced to lines with at least four Chinese characters
        (lines containing "回复完毕" are dropped); replies that look like a
        refusal or a send failure are replaced by "666". The filtered text
        is stored in ``self.mycomment`` and posted either through
        ``CommentInput`` (when the URL contains "search") or by scripting
        the comment box directly.

        All errors are printed; nothing is raised or returned.
        """
        self.mycomment = ""
        try:
            response = requests.post(
                'http://localhost:5000/chat',
                json={'message': formatted_text}
            )
            result = response.json()
            if not result.get('success'):
                print(f"错误: {result.get('error', '未知错误')}")
                print("-" * 50)
                return

            response_text = result.get('response')
            if not isinstance(response_text, str) or not response_text.strip():
                # The service may answer success with a null/empty reply;
                # there is nothing meaningful to post in that case.
                print("\nAPI未返回有效回复,跳过本次评论")
                return

            if '具体需求' in response_text or '消息发送失败' in response_text:
                filtered_response = "666"
            else:
                filtered_lines = []
                for line in response_text.split('\n'):
                    # Skip the completion-marker line.
                    if '回复完毕' in line:
                        continue
                    line = line.strip()
                    chinese_chars = len([c for c in line if '\u4e00' <= c <= '\u9fff'])
                    if chinese_chars >= 4:
                        filtered_lines.append(line)
                filtered_response = '\n'.join(filtered_lines)
            self.mycomment = filtered_response

            print("\nAPI响应:")
            print("-" * 50)
            print(filtered_response)
            print("-" * 50)

            current_url = self.driver.current_url
            if 'search' in current_url.lower():
                print("\n检测到search在网址里,采用inputdef.py方法发送评论")
                comment_input = CommentInput(self.driver)
                # Post the filtered text, the same content the default path
                # posts (the raw reply still contains marker lines).
                if comment_input.post_comment(filtered_response):
                    print("评论已发送")
                else:
                    print("评论发送失败")
                return

            print("\n使用默认方法发送评论")
            set_result = self.driver.execute_script("""
                // 查找输入框容器
                let container = document.querySelector('.comment-input-inner-container');
                if (!container) {
                    return { success: false, message: '未找到评论容器' };
                }
                // 点击容器以激活输入
                container.click();
                // 等待输入框出现并设置内容
                return new Promise((resolve) => {
                    setTimeout(() => {
                        // 查找实际的输入框
                        let input = container.querySelector('.public-DraftStyleDefault-block') ||
                            container.querySelector('[contenteditable="true"]') ||
                            container.querySelector('.lFk180Rt');
                        if (!input) {
                            resolve({ success: false, message: '未找到输入框元素' });
                            return;
                        }
                        // 设置文本
                        let text = arguments[0];
                        input.textContent = text;
                        // 触发输入事件
                        input.dispatchEvent(new InputEvent('input', {
                            bubbles: true,
                            cancelable: true,
                            inputType: 'insertText',
                            data: text
                        }));
                        // 等待内容设置完成
                        setTimeout(() => {
                            resolve({
                                success: true,
                                message: '内容已设置',
                                text: input.textContent,
                                element: input
                            });
                        }, 500);
                    }, 500);
                });
            """, filtered_response)
            # Guard against a script that returned nothing at all.
            set_result = set_result or {}
            if not set_result.get('success'):
                print(f"\n设置评论内容失败: {set_result.get('message', '未知错误')}")
                return

            print("\n评论内容设置成功")
            send_result = self.driver.execute_script("""
                let input = arguments[0];
                // 创建回车键事件
                let enterEvent = new KeyboardEvent('keydown', {
                    key: 'Enter',
                    code: 'Enter',
                    keyCode: 13,
                    which: 13,
                    bubbles: true,
                    cancelable: true
                });
                // 触发回车键事件
                input.dispatchEvent(enterEvent);
                return true;
            """, set_result['element'])
            if send_result:
                print("\n评论已发送")
                time.sleep(1)  # wait for the submission to complete
            else:
                print("\n发送评论失败")
        except requests.exceptions.ConnectionError:
            print("\nAPI连接失败: 请确保API服务正在运行 (http://localhost:5000)")
        except Exception as e:
            print(f"\nAPI调用失败: {str(e)}")
def main():
    """Interactive check: fetch comments for one video and send them to the API.

    Prompts for a Douyin video URL; an empty answer keeps the page the
    browser is currently showing. The comments and the formatted prompt
    are printed before being sent. The crawler is closed after the user
    confirms with Enter.
    """
    crawler = None  # bound up front so `finally` is safe if construction fails
    try:
        crawler = CommentCrawler()
        video_url = input("请输入抖音视频链接(直接回车使用当前页面):").strip()
        if video_url:
            # Open the requested video; otherwise the current page is used.
            crawler.driver.get(video_url)
        comments = crawler.get_comments_by_xpath()
        title = crawler.get_video_title()

        print("\n获取到的评论:")
        print("-" * 50)
        for i, comment in enumerate(comments, 1):
            print(f"{i}. {comment}")
        print("-" * 50)
        print(f"\n共获取到 {len(comments)} 条评论")

        formatted_text = crawler.format_output(title, comments)
        print("\n格式化输出:")
        print(formatted_text)

        # The formatted text already includes the extra instruction text.
        print("\n正在发送到API...")
        crawler.send_to_api(formatted_text)
    except Exception as e:
        print(f"运行出错: {str(e)}")
    finally:
        input("\n按回车键退出...")
        if crawler is not None:
            crawler.close()
# if __name__ == "__main__":
# main()
后端API代码
from flask import Flask, request, jsonify
from douban_chat_client import DoubanChatClient
import threading
import time
# Flask application that exposes the chat client over HTTP.
app = Flask(__name__)
# Module-level state used to track the service status.
service_status = {
    "is_initialized": False,  # set to True once initialize_client() succeeds
    "init_lock": threading.Lock(),  # held by initialize_client() while it runs
    "last_activity": time.time()  # refreshed after each successful /chat request
}
def initialize_client():
    """Initialise the shared chat client once, under the init lock.

    Returns:
        bool: True when the client is (or already was) initialised,
        False when ``DoubanChatClient.initialize()`` reported failure.
    """
    with service_status["init_lock"]:
        already_ready = service_status["is_initialized"]
        if not already_ready and DoubanChatClient.get_instance().initialize():
            service_status["is_initialized"] = True
        return service_status["is_initialized"]
@app.route('/chat', methods=['POST'])
def chat():
    """Handle a chat request: forward ``message`` to the client and return its reply.

    Request body: JSON object with a ``message`` key.

    Returns:
        200 ``{"success": true, "response": ...}`` on success;
        400 when the body is not a JSON object containing ``message``;
        500 when the client cannot be initialised or the chat call raises.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so malformed requests get the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'message' not in data:
            return jsonify({
                'success': False,
                'error': '缺少消息内容'
            }), 400
        message = data['message']

        # Make sure the client is initialised before use.
        if not service_status["is_initialized"] and not initialize_client():
            return jsonify({
                'success': False,
                'error': '客户端初始化失败'
            }), 500

        client = DoubanChatClient.get_instance()
        response = client.chat(message)
        service_status["last_activity"] = time.time()
        return jsonify({
            'success': True,
            'response': response
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@app.route('/status', methods=['GET'])
def status():
    """Report whether the client is initialised and the last recorded activity time."""
    snapshot = {
        'initialized': service_status["is_initialized"],
        'last_activity': service_status["last_activity"],
    }
    return jsonify(snapshot)
def cleanup_service():
    """Close the chat client if it was initialised and mark the service as stopped.

    Safe to call more than once: after the first call the initialised flag
    is cleared, so later calls return immediately. A failure while closing
    is reported on stdout instead of being silently discarded.
    """
    if not service_status["is_initialized"]:
        return
    try:
        DoubanChatClient.get_instance().close()
    except Exception as e:
        # Best-effort cleanup: report the problem but never raise at shutdown.
        print(f"清理服务资源失败: {str(e)}")
    finally:
        service_status["is_initialized"] = False
# Register the cleanup function to run at interpreter exit.
import atexit
atexit.register(cleanup_service)
if __name__ == '__main__':
    try:
        # Initialise the client at startup.
        initialize_client()
        # Start the service.
        # NOTE(review): host='0.0.0.0' listens on all network interfaces and
        # the routes perform no authentication — confirm this exposure is intended.
        app.run(host='0.0.0.0', port=5000)
    finally:
        cleanup_service()
douban_chat_selenium.py
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import time
import json
import os
import sys
import select
import random
import string
import pyperclip
# 根据操作系统导入不同的模块
if sys.platform != 'win32':
import termios
class DouBanChatCrawler:
    """Drive the Doubao web chat through a bundled Chrome controlled by Selenium.

    Provides message sending, reply collection, login-state persistence
    (``cookie.txt``) and a dump of cookies / localStorage / API requests
    (``chat_info.json``).
    """

    # CSS selector of the chat input box.
    INPUT_SELECTOR = "textarea.semi-input-textarea"
    # CSS selector of the elements scanned for reply content.
    REPLY_SELECTOR = "div.paragraph-JOTKXA, li, pre.container-_HmLba"
    # JavaScript returning all localStorage entries as an object.
    _DUMP_LOCAL_STORAGE_JS = """
        let items = {};
        try {
            for (let i = 0; i < localStorage.length; i++) {
                const key = localStorage.key(i);
                items[key] = localStorage.getItem(key);
            }
        } catch(err) {
            console.error('Error:', err);
        }
        return items;
    """

    def __init__(self):
        """Start Chrome from ``chrome-win64/chrome.exe`` next to this file.

        Raises:
            Exception: When chrome.exe or chromedriver.exe is missing, or
                when the browser fails to start (the error is printed first).
        """
        # Directory containing this file.
        root_dir = os.path.dirname(os.path.abspath(__file__))
        self.chrome_options = Options()

        chrome_path = os.path.join(root_dir, "chrome-win64", "chrome.exe")
        if not os.path.exists(chrome_path):
            raise Exception(f"Chrome不存在: {chrome_path}")
        self.chrome_options.binary_location = chrome_path

        for argument in (
            '--disable-gpu',
            '--disable-software-rasterizer',
            '--disable-dev-shm-usage',
            '--no-sandbox',
            '--ignore-certificate-errors',
            '--enable-unsafe-swiftshader',
            '--disable-web-security',
            '--disable-blink-features=AutomationControlled',
        ):
            self.chrome_options.add_argument(argument)
        self.chrome_options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        self.chrome_options.add_experimental_option('useAutomationExtension', False)

        try:
            chromedriver_path = os.path.join(root_dir, "chromedriver.exe")
            if not os.path.exists(chromedriver_path):
                raise Exception(f"ChromeDriver不存在: {chromedriver_path}")
            service = Service(chromedriver_path)
            self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
            # Make navigator.webdriver read as undefined on every new document.
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": """
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => undefined
                    })
                """
            })
            self.driver.maximize_window()
        except Exception as e:
            print(f"浏览器初始化失败: {str(e)}")
            raise

    def wait_for_network_idle(self, timeout=10):
        """Sleep for *timeout* seconds (a simple stand-in for a real idle check)."""
        time.sleep(timeout)

    def get_network_requests(self):
        """Return the page's XHR/fetch performance entries for the chat-related APIs.

        Returns:
            list[dict]: Entries with url, method, type, duration, status,
            timestamp and size, sorted by timestamp.
        """
        return self.driver.execute_script("""
            let items = [];
            try {
                let entries = performance.getEntries() || [];
                items = entries.filter(e => {
                    // 扩展过滤条件以包含更多相关接口
                    const apiPatterns = [
                        '/samantha/', // 主要的API接口
                        '/alice/', // 用户相关接口
                        '/chat/', // 聊天相关接口
                        '/message/', // 消息相关接口
                        '/monitor_browser/' // 监控相关接口
                    ];
                    return apiPatterns.some(pattern => e.name.includes(pattern)) &&
                        e.entryType === 'resource' &&
                        (e.initiatorType === 'xmlhttprequest' || e.initiatorType === 'fetch');
                }).map(e => ({
                    url: e.name,
                    method: e.initiatorType,
                    type: e.initiatorType,
                    duration: e.duration,
                    status: e.responseStatus,
                    timestamp: e.startTime,
                    size: e.transferSize || 0
                }));
                // 按时间戳排序
                items.sort((a, b) => a.timestamp - b.timestamp);
            } catch(err) {
                console.error('Error:', err);
            }
            return items;
        """)

    def generate_chat_id(self):
        """Return a random 5-character identifier made of ASCII letters and digits."""
        chars = string.ascii_letters + string.digits
        return ''.join(random.choice(chars) for _ in range(5))

    @staticmethod
    def _strip_completion_markers(text, chat_id=None):
        """Remove the "reply finished" marker and the chat id from *text*.

        Longer marker variants are removed first so that no partial
        residue (such as a stray "本次") is left behind.
        """
        markers = []
        if chat_id:
            tagged = f"回复完毕[{chat_id}]"
            markers += [",本次" + tagged, "本次" + tagged, tagged, f"[{chat_id}]"]
        markers += [",本次回复完毕", "本次回复完毕"]
        for marker in markers:
            text = text.replace(marker, "")
        return text.strip()

    def _read_code_block(self, element, chat_id=None):
        """Return the text of a <pre> code block.

        Prefers the block's copy button (clipboard content, with the chat
        id removed); falls back to the text of the inner <code> element
        when the button cannot be used.
        """
        try:
            copy_button = element.find_element(
                By.CSS_SELECTOR,
                "button[data-testid='message_action_copy']"
            )
            copy_button.click()
            time.sleep(0.1)
            text = pyperclip.paste()
            if chat_id:
                text = text.replace(f"[{chat_id}]", "")
            return text
        except Exception:
            return element.find_element(By.CSS_SELECTOR, "code").text

    def send_message(self, message):
        """Type *message* into the Doubao input box and submit it.

        A request to announce completion plus a fresh chat id is appended,
        so the end of the reply can be recognised. Up to three attempts
        are made.

        Returns:
            str: The chat id when the message was sent.
            False: When *message* is blank.
            None: When sending failed.
        """
        try:
            if message.strip() == "":
                print("消息内容为空,已取消发送")
                return False
            chat_id = self.generate_chat_id()
            message_with_id = f"{message},回复完毕请告诉我本次回复完毕[{chat_id}]"
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    # Drop the current page focus.
                    self.driver.execute_script("document.activeElement.blur();")
                    time.sleep(0.1)
                    input_box = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, self.INPUT_SELECTOR))
                    )
                    WebDriverWait(self.driver, 10).until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, self.INPUT_SELECTOR))
                    )
                    # Clear the box and re-focus it shortly afterwards.
                    self.driver.execute_script("""
                        var input = arguments[0];
                        input.value = '';
                        input.blur();
                        setTimeout(() => {
                            input.focus();
                            input.click();
                        }, 100);
                    """, input_box)
                    time.sleep(0.2)
                    # Re-locate the box after the reset.
                    input_box = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, self.INPUT_SELECTOR))
                    )
                    input_box.clear()
                    input_box.send_keys(message_with_id)
                    time.sleep(0.1)
                    input_box.send_keys(Keys.ENTER)
                    time.sleep(0.2)
                    # An empty box afterwards means the message went out.
                    try:
                        current_value = input_box.get_attribute('value')
                        if not current_value.strip():
                            print(f"已发送消息: {message}")
                            self.driver.execute_script("""
                                var input = arguments[0];
                                input.value = '';
                                input.blur();
                            """, input_box)
                            return chat_id
                    except Exception:
                        # The value cannot be read; this is treated as "sent".
                        print(f"已发送消息: {message}")
                        return chat_id
                    if attempt < max_retries - 1:
                        print(f"重试发送消息 (尝试 {attempt + 2}/{max_retries})")
                        time.sleep(0.5)
                except Exception:
                    if attempt < max_retries - 1:
                        print(f"发送失败,正在重试 ({attempt + 2}/{max_retries})")
                        time.sleep(0.5)
                    else:
                        raise
            return None
        except Exception as e:
            print(f"发送消息失败: {str(e)}")
            import traceback
            print(traceback.format_exc())
            return None

    def wait_for_response(self, timeout=30, chat_id=None):
        """Wait for Doubao's reply and return it as text.

        Elements present when the wait starts are ignored. The reply is
        considered finished when the completion marker is seen, when a
        complete-looking code block has been unchanged for 3 seconds, or
        when the content has been unchanged for 5 seconds. Pressing Enter
        on the console aborts the wait.

        Args:
            timeout: Maximum number of seconds to wait.
            chat_id: Id returned by ``send_message``; used to recognise
                and strip the completion marker.

        Returns:
            str: The reply (code blocks fenced, list items bulleted),
            "等待被中断" when aborted, or "等待回复超时" on timeout.
            None: When an unexpected error occurred.
        """
        try:
            import threading

            start_time = time.time()
            last_check_time = time.time()
            initial_count = len(self.driver.find_elements(By.CSS_SELECTOR, self.REPLY_SELECTOR))
            last_response_text = ""
            done_marker = f"回复完毕[{chat_id}]" if chat_id else "回复完毕"
            print(f"等待新回复... (当前元素数: {initial_count})")
            print("按回车键可以中断等待")

            interrupt_wait = threading.Event()

            def check_input():
                """Set the interrupt event when Enter is pressed on the console."""
                try:
                    while not interrupt_wait.is_set():
                        if sys.platform == 'win32':
                            import msvcrt
                            if msvcrt.kbhit():
                                if msvcrt.getch() in [b'\r', b'\n']:
                                    interrupt_wait.set()
                                    break
                        else:
                            # select() polls stdin without blocking the thread.
                            if select.select([sys.stdin], [], [], 0.1)[0]:
                                if sys.stdin.read(1) in ['\n', '\r']:
                                    interrupt_wait.set()
                                    break
                        time.sleep(0.1)
                except Exception:
                    pass

            input_thread = threading.Thread(target=check_input)
            input_thread.daemon = True
            input_thread.start()

            while time.time() - start_time < timeout and not interrupt_wait.is_set():
                try:
                    current_time = time.time()
                    current_elements = self.driver.find_elements(By.CSS_SELECTOR, self.REPLY_SELECTOR)
                    new_elements = current_elements[initial_count:]

                    current_response_text = ""
                    code_block_complete = False
                    response_complete = False
                    for element in new_elements:
                        if element.tag_name == 'pre':
                            text = self._read_code_block(element, chat_id)
                            if text.strip():
                                current_response_text += text.strip() + "\n"
                                if "def" in text and "if __name__ == \"__main__\":" in text:
                                    code_block_complete = True
                        else:
                            text = element.text.strip()
                            if text:
                                # Detect the marker on the raw text, before it is
                                # stripped; searching the cleaned text can never match.
                                if done_marker in text:
                                    response_complete = True
                                text = self._strip_completion_markers(text, chat_id)
                                if text:
                                    current_response_text += text + "\n"

                    # Any change in content restarts the idle timer.
                    if current_response_text != last_response_text:
                        last_check_time = current_time
                        last_response_text = current_response_text

                    idle_seconds = current_time - last_check_time
                    if current_response_text and (
                        response_complete or
                        (code_block_complete and idle_seconds > 3) or
                        (idle_seconds > 5)
                    ):
                        response_parts = []
                        for element in new_elements:
                            if element.tag_name == 'pre':
                                text = self._read_code_block(element, chat_id)
                                if text.strip():
                                    response_parts.append("\n```python\n" + text.strip() + "\n```\n")
                            else:
                                text = self._strip_completion_markers(element.text.strip(), chat_id)
                                if text:
                                    if element.tag_name == 'li':
                                        response_parts.append(f"• {text}")
                                    else:
                                        response_parts.append(text)
                        if response_parts:
                            complete_response = "\n".join(response_parts)
                            if response_complete:
                                print(f"收到完整回复 (元素数: {len(current_elements) - initial_count})")
                            elif code_block_complete:
                                print("代码块已完成")
                            else:
                                print("回复似乎已完成")
                            return complete_response
                    time.sleep(0.2)
                except Exception as e:
                    print(f"获取回复时出错: {str(e)}")
                    time.sleep(0.2)
                    continue

            # Record the reason for leaving the loop BEFORE the event is set
            # to stop the listener; otherwise a timeout looks like an interrupt.
            was_interrupted = interrupt_wait.is_set()
            interrupt_wait.set()
            input_thread.join(0.1)

            # Discard pending console input (best effort).
            if sys.platform == 'win32':
                import msvcrt
                while msvcrt.kbhit():
                    msvcrt.getch()
            elif hasattr(sys, 'stdin'):
                try:
                    import termios
                    termios.tcflush(sys.stdin, termios.TCIOFLUSH)
                except Exception:
                    # No termios, or stdin is not a terminal: skip the flush.
                    pass

            if was_interrupted:
                return "等待被中断"
            return "等待回复超时"
        except Exception as e:
            print(f"等待回复失败: {str(e)}")
            import traceback
            print(traceback.format_exc())
            return None

    def chat_session(self):
        """Run an interactive console chat until the user types 'quit' (or stdin ends)."""
        print("\n=== 豆包聊天会话已启动 ===")
        print("输入 'quit' 退出会话")
        # Time of the last handled reply, used for a short cool-down.
        last_response_time = 0
        while True:
            try:
                if time.time() - last_response_time < 0.5:
                    time.sleep(0.5)
                message = input("\n你: ").strip()
                if message.lower() == 'quit':
                    break
                if not message:
                    print("请输入有效的消息内容")
                    continue
                if len(message) < 2:
                    # Single-character messages are allowed after confirmation.
                    print("提示:消息太短可能无法获得好的回复,是否继续?(y/n)")
                    confirm = input().strip().lower()
                    if confirm != 'y':
                        continue
                # Clear any leftover text in the input box.
                try:
                    input_box = self.driver.find_element(By.CSS_SELECTOR, self.INPUT_SELECTOR)
                    if input_box:
                        self.driver.execute_script("arguments[0].value = '';", input_box)
                except Exception:
                    pass
                chat_id = self.send_message(message)
                if chat_id:
                    print("\n豆包正在回复...")
                    response = self.wait_for_response(chat_id=chat_id)
                    if response:
                        print(f"\n豆包: {response}")
                        last_response_time = time.time()
                        time.sleep(0.5)
                    else:
                        print("未能获取到回复")
                        last_response_time = time.time()
            except EOFError:
                # stdin is closed: input() would fail forever, so end the session.
                break
            except Exception as e:
                print(f"聊天过程出错: {str(e)}")
                continue

    def save_login_state(self):
        """Save cookies, localStorage and a timestamp to ``cookie.txt``.

        Returns:
            bool: True on success, False when saving failed.
        """
        try:
            login_state = {
                'cookies': self.driver.get_cookies(),
                'localStorage': self.driver.execute_script(self._DUMP_LOCAL_STORAGE_JS),
                'timestamp': time.time()
            }
            with open('cookie.txt', 'w', encoding='utf-8') as f:
                json.dump(login_state, f, ensure_ascii=False, indent=2)
            print("已保存登录状态到 cookie.txt")
            return True
        except Exception as e:
            print(f"保存登录状态失败: {str(e)}")
            return False

    def load_login_state(self):
        """Restore cookies and localStorage from ``cookie.txt``.

        Returns:
            bool: True when a state younger than 7 days was applied,
            False when the file is missing, expired or unreadable.
        """
        try:
            if not os.path.exists('cookie.txt'):
                print("未找到登录状态文件,需要首次登录")
                return False
            with open('cookie.txt', 'r', encoding='utf-8') as f:
                login_state = json.load(f)
            # Saved states expire after 7 days.
            if time.time() - login_state.get('timestamp', 0) > 7 * 24 * 3600:
                print("登录状态已过期,需要重新登录")
                return False
            # Cookies can only be added once the site is open.
            self.driver.get("https://www.doubao.com")
            time.sleep(1)
            for cookie in login_state.get('cookies', []):
                try:
                    # 'expiry' is removed because it can make add_cookie fail.
                    if 'expiry' in cookie:
                        del cookie['expiry']
                    self.driver.add_cookie(cookie)
                except Exception as e:
                    print(f"添加cookie失败: {str(e)}")
                    continue
            for key, value in login_state.get('localStorage', {}).items():
                try:
                    # Pass key/value as script arguments instead of formatting
                    # them into the source: values containing quotes, backslashes
                    # or newlines would otherwise break the JavaScript.
                    self.driver.execute_script(
                        "window.localStorage.setItem(arguments[0], arguments[1]);",
                        key, value
                    )
                except Exception as e:
                    print(f"添加localStorage失败: {str(e)}")
                    continue
            print("已从 cookie.txt 加载登录状态")
            return True
        except Exception as e:
            print(f"加载登录状态失败: {str(e)}")
            return False

    def check_login_status(self):
        """Return True when the chat input box is present and no login button is shown."""
        try:
            WebDriverWait(self.driver, 5).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            login_indicators = self.driver.execute_script("""
                return {
                    'hasTextarea': !!document.querySelector('textarea.semi-input-textarea'),
                    'hasLoginButton': !!document.querySelector('button[data-testid="login-button"]'),
                    'hasUserInfo': !!document.querySelector('div[data-testid="user-info"]')
                }
            """)
            return login_indicators.get('hasTextarea', False) and not login_indicators.get('hasLoginButton', True)
        except Exception as e:
            print(f"检查登录状态失败: {str(e)}")
            return False

    def _prompt_manual_login(self, headline):
        """Ask the user to log in by hand, wait for Enter, then save the login state."""
        print(headline)
        print("1. 登录豆包账号")
        print("2. 进入任意对话")
        print("3. 等待页面加载完成")
        print("4. 按回车键继续...")
        input()
        self.save_login_state()

    def get_chat_info(self):
        """Log in, run the interactive chat, then dump session data to ``chat_info.json``.

        The dump holds the cookies, localStorage and the API requests seen
        by ``get_network_requests``. The browser is closed at the end,
        after the user presses Enter.
        """
        try:
            print("正在打开豆包网站...")
            if self.load_login_state():
                print("已加载登录状态,尝试自动登录...")
                self.driver.get("https://www.doubao.com/chat/")
                time.sleep(3)
                if self.check_login_status():
                    print("自动登录成功")
                else:
                    print("自动登录失败,需要重新登录")
                    self._prompt_manual_login("\n请在打开的浏览器中完成以下操作:")
            else:
                self.driver.get("https://www.doubao.com/chat/")
                self._prompt_manual_login("\n首次登录,请完成以下操作:")

            self.chat_session()

            try:
                cookies = self.driver.get_cookies()
                cookie_dict = {cookie['name']: cookie['value'] for cookie in cookies}
                local_storage = self.driver.execute_script(self._DUMP_LOCAL_STORAGE_JS)
                network_requests = self.get_network_requests()
                data = {
                    'cookies': cookie_dict,
                    'local_storage': local_storage,
                    'network_requests': network_requests
                }
                with open('chat_info.json', 'w', encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                print("\n成功!所有信息已保存到 chat_info.json 文件中")
                print(f"发现的网络请求数量: {len(network_requests)}")
                print("\n主要API接口:")
                # Group the requests by the same URL patterns used to collect
                # them ('/monitor_browser/', not '/monitor/').
                api_groups = {
                    'Samantha APIs': '/samantha/',
                    'Alice APIs': '/alice/',
                    'Chat APIs': '/chat/',
                    'Message APIs': '/message/',
                    'Monitor APIs': '/monitor_browser/',
                }
                for group_name, pattern in api_groups.items():
                    group_requests = [req for req in network_requests if pattern in req['url']]
                    if group_requests:
                        print(f"\n{group_name}:")
                        for req in group_requests:
                            print(f"- {req['method'].upper()}: {req['url']}")
                            print(f" Duration: {req['duration']:.2f}ms, Size: {req['size']} bytes")
            except Exception as e:
                print(f"收集信息时发生错误: {str(e)}")
        except Exception as e:
            print(f"发生错误: {str(e)}")
        finally:
            input("\n按回车键关闭浏览器...")
            try:
                self.driver.quit()
            except Exception:
                pass
def main():
    """Create the Doubao crawler and run its login / chat / export flow.

    Any error is printed, after which the user is asked to press Enter
    before the program ends.
    """
    try:
        DouBanChatCrawler().get_chat_info()
    except Exception as exc:
        print(f"程序运行失败: {str(exc)}")
        input("按回车键退出...")
# Run the crawler only when this file is executed as a script.
if __name__ == "__main__":
    main()
douban_chat_service.py
from flask import Flask, request, jsonify
from douban_chat_client import DoubanChatClient
import threading
import time
# Flask application that exposes the chat client over HTTP.
app = Flask(__name__)
# Module-level state used to track the service status.
service_status = {
    "is_initialized": False,  # set to True once initialize_client() succeeds
    "init_lock": threading.Lock(),  # held by initialize_client() while it runs
    "last_activity": time.time()  # refreshed after each successful /chat request
}
def initialize_client():
    """Initialise the shared chat client once, under the init lock.

    Returns:
        bool: True when the client is (or already was) initialised,
        False when ``DoubanChatClient.initialize()`` reported failure.
    """
    with service_status["init_lock"]:
        already_ready = service_status["is_initialized"]
        if not already_ready and DoubanChatClient.get_instance().initialize():
            service_status["is_initialized"] = True
        return service_status["is_initialized"]
@app.route('/chat', methods=['POST'])
def chat():
    """Handle a chat request: forward ``message`` to the client and return its reply.

    Request body: JSON object with a ``message`` key.

    Returns:
        200 ``{"success": true, "response": ...}`` on success;
        400 when the body is not a JSON object containing ``message``;
        500 when the client cannot be initialised or the chat call raises.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so malformed requests get the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'message' not in data:
            return jsonify({
                'success': False,
                'error': '缺少消息内容'
            }), 400
        message = data['message']

        # Make sure the client is initialised before use.
        if not service_status["is_initialized"] and not initialize_client():
            return jsonify({
                'success': False,
                'error': '客户端初始化失败'
            }), 500

        client = DoubanChatClient.get_instance()
        response = client.chat(message)
        service_status["last_activity"] = time.time()
        return jsonify({
            'success': True,
            'response': response
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@app.route('/status', methods=['GET'])
def status():
    """Report whether the client is initialised and the last recorded activity time."""
    snapshot = {
        'initialized': service_status["is_initialized"],
        'last_activity': service_status["last_activity"],
    }
    return jsonify(snapshot)
def cleanup_service():
    """Close the chat client if it was initialised and mark the service as stopped.

    Safe to call more than once: after the first call the initialised flag
    is cleared, so later calls return immediately. A failure while closing
    is reported on stdout instead of being silently discarded.
    """
    if not service_status["is_initialized"]:
        return
    try:
        DoubanChatClient.get_instance().close()
    except Exception as e:
        # Best-effort cleanup: report the problem but never raise at shutdown.
        print(f"清理服务资源失败: {str(e)}")
    finally:
        service_status["is_initialized"] = False
# Register the cleanup function to run at interpreter exit.
import atexit
atexit.register(cleanup_service)
if __name__ == '__main__':
    try:
        # Initialise the client at startup.
        initialize_client()
        # Start the service.
        # NOTE(review): host='0.0.0.0' listens on all network interfaces and
        # the routes perform no authentication — confirm this exposure is intended.
        app.run(host='0.0.0.0', port=5000)
    finally:
        cleanup_service()
程序仍在测试和修复 bug,打包工作也在进行中;觉得对你有帮助的朋友,可以到风车自行获取打包好的程序。