一个基于 Selenium 的抖音自动评论系统,支持自动获取视频评论并通过 AI 生成回复。

B站视频演示

主要功能

  1. 自动浏览抖音视频
  2. 获取视频标题和评论
  3. 通过 AI 接口生成智能回复
  4. 自动发送评论
  5. 支持直播检测和处理
  6. 图形化操作界面

文件说明

主要文件

  • run.py: 主程序入口,提供图形界面

    • 服务控制(启动/停止)
    • 脚本控制(启动/暂停/停止)
    • 配置管理(等待时间、API提示词)
    • 窗口管理(置顶/取消置顶)
    • 控制台输出
  • douyin/main.py: 核心运行脚本

    • 循环处理视频
    • 直播检测
    • 评论获取和发送
    • 自动切换视频

功能模块

  • douyin/douyinrespond.py: 浏览器管理模块

    • 浏览器初始化
    • Cookie 管理
    • 登录状态维护
    • 单例模式实现
  • douyin/getcomment.py: 评论获取模块

    • 视频标题获取
    • 评论内容获取
    • 评论过滤
    • API 调用
  • 抖音智能评论机器人:全自动 AI 互动神器!使用python+cursor+豆包+抖音开发一套全自动智能精准评论系统;douyin/inputdef.py: 评论输入模块

    • 评论框定位
    • 文本输入
    • 评论发送
    • 坐标定位方案
  • douyin/testiflive.py: 直播测试模块

    • 直播页面检测
    • 直播特征识别
    • 测试工具

API 服务

  • douban_chat_selenium.py: 豆包聊天爬虫

    • 浏览器自动化
    • 登录状态管理
    • 网络请求获取
  • douban_chat_client.py: 豆包客户端

    • 单例模式
    • 聊天功能封装
    • 会话管理
  • douban_chat_service.py: API 服务

    • Flask Web 服务
    • 聊天请求处理
    • 状态管理

配置文件

  • douyin/config.txt: 视频切换等待时间配置
  • douyin/commandtodoubao.txt: API 追加提示词配置
  • douyin/browser_info.json: 浏览器信息
  • douyin/login_state.json: 登录状态信息

环境要求

  • Python 3.7+
  • Chrome 浏览器
  • ChromeDriver(与 Chrome 版本匹配)

依赖安装

bash
pip install -r requirements.txt

使用说明

  1. 确保 Chrome 和 ChromeDriver 已正确安装
  2. 将 Chrome 和 ChromeDriver 放在项目根目录
  3. 安装所需依赖
  4. 运行 run.py 启动图形界面
  5. 先启动服务,再启动脚本
  6. 根据需要调整配置

注意事项

  1. 首次运行需要手动登录抖音
  2. 确保网络连接稳定
  3. 不要手动关闭浏览器窗口
  4. 建议使用管理员权限运行
  5. 遇到问题可查看控制台输出

开发说明

  • 使用 PyQt5 构建图形界面
  • 采用单例模式管理浏览器实例
  • 支持热更新配置
  • 提供完整的错误处理
  • 包含详细的日志记录

更新日志

  • 2024.03: 添加坐标定位方案
  • 2024.02: 优化直播检测
  • 2024.01: 实现图形界面

部分前端重要代码:
run.py

main.py

from getcomment import CommentCrawler
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import time
import sys


def simulate_x_key(crawler):
    """模拟按X键的方法"""
    try:
        # 调用 getcomment.py 中的方法
        crawler._simulate_x_key()
        print("已模拟按X键")
    except Exception as e:
        print(f"模拟按X键失败: {str(e)}")

#检测直播方法
def check_is_live(crawler):
    """
    检查当前页面是否为直播页面

    Args:
        crawler: 爬虫实例,需要包含driver属性

    Returns:
        bool: True表示是直播页面,False表示是普通视频页面
    """
    is_live = crawler.driver.execute_script("""
        // 优先检查是否存在评论提示文字(普通视频特有)
        function checkCommentText() {
            let spans = document.getElementsByTagName('span');
            for (let span of spans) {
                // 检查span是否有class属性且文本内容完全匹配
                if (span.className && 
                    span.textContent === '留下你的精彩评论吧') {
                    console.log("找到了评论提示,是普通视频");
                    return true;  // 找到评论提示,说明是普通视频
                }
            }
            return false;  // 没找到评论提示
        }

        // 检查是否存在直播间提示文字
        function checkLiveText() {
            let divs = document.getElementsByTagName('div');
            for (let div of divs) {
                if (div.children.length === 1 && 
                    div.children[0].tagName === 'DIV' && 
                    div.textContent && 
                    div.textContent.includes('进入直播间')) {

                    if (div.className && div.children[0].className) {
                        console.log("找到了直播间提示");
                        return true;  // 找到直播间提示,说明是直播
                    }
                }
            }
            return false;  // 没找到直播间提示
        }

        // 先检查是否是普通视频
        if (checkCommentText()) {
            return false;  // 是普通视频,返回not live
        }
        // 如果不是普通视频,再检查是否是直播
        return checkLiveText();
    """)

    return is_live

def main():
    """主函数:循环执行评论获取和发送"""
    try:
        # 初始化爬取器(只执行一次)
        crawler = CommentCrawler()
        print("\n初始化完成,开始循环执行...")
        
        # 初始化循环计数和上一次标题
        runacount = 0
        last_title = ""
        # 添加连续未获取评论计数
        no_comments_count = 0
        
        # 模拟按X键
        crawler._simulate_x_key()
        time.sleep(2)  # 等待按键响应

        while True:  # 添加循环,用于重新检测
            # 检查是否为直播页面
            is_live_page = check_is_live(crawler)
            if is_live_page:
                print("检测到直播页面,开始处理直播...")
                crawler._handle_live_video()
                print("直播处理完成,继续检测当前视频是否是直播...")
                continue  # 继续循环,重新检测
            else:
                print("这是普通视频,继续正常处理...")
                break  # 如果是普通视频,退出循环继续后续处理

        # 循环执行
        while True:
            try:
                # 更新循环计数
                runacount += 1
                print(f"\n当前是第 {runacount} 次循环")
                print("="*50)
                print("开始新一轮评论处理")
                print("="*50)

                while True:  # 添加循环,用于重新检测
                    # 检查是否为直播页面
                    is_live_page = check_is_live(crawler)
                    if is_live_page:
                        print("检测到直播页面,开始处理直播...")
                        crawler._handle_live_video()
                        print("直播处理完成,继续检测当前视频是否是直播...")
                        continue  # 继续循环,重新检测
                    else:
                        print("这是普通视频,继续正常处理...")
                        break  # 如果是普通视频,退出循环继续后续处理

                # 获取评论和标题
                comments = crawler.get_comments_by_xpath()
                current_title = crawler.get_video_title()
                
                # 检查标题是否与上次相同
                if current_title == last_title:
                    print("检测到重复标题,设置为空")
                    title = ""
                else:
                    title = current_title
                    last_title = current_title  # 更新上次标题
                
                if comments:
                    # 重置未获取评论计数
                    no_comments_count = 0
                    # 格式化输出
                    formatted_text = crawler.format_output(title, comments)
                    
                    # 发送到API并自动评论
                    crawler.send_to_api(formatted_text)
                else:
                    print("\n未获取到有效评论")
                    no_comments_count += 1
                    
                    # 检查是否连续两次未获取到评论
                    if no_comments_count >= 5:
                        print("\n警告:连续5次未获取到评论!")
                        print("正在重新打开抖音首页...")
                        crawler.driver.get("https://www.douyin.com/")
                        time.sleep(3)  # 等待页面加载
                        # 模拟按X键
                        crawler._simulate_x_key()
                        time.sleep(2)  # 等待按键响应
                        print("已重新打开抖音首页")
                        no_comments_count = 0  # 重置计数
                        continue  # 跳过后续操作,重新开始循环
                
                # 等待一段时间
                try:
                    with open('config.txt', 'r') as f:
                        wait_time = int(f.readline().strip())
                except:
                    wait_time = 5  # 如果读取失败则使用默认值
                
                print(f"\n等待{wait_time}秒后切换到下一个视频...")
                time.sleep(wait_time)

                # 切换到下一个视频
                crawler._handle_live_video()

                # 等待新视频加载
                print("\n等待新视频加载...")
                time.sleep(3)
                print("\n新视频加载后跳回检测起点...")
                
            except Exception as e:
                print(f"\n本轮处理出错: {str(e)}")
                print("等待10秒后继续下一轮...")
                time.sleep(10)
                continue
            
    except KeyboardInterrupt:
        print(f"\n\n检测到Ctrl+C,正在退出程序...")
        print(f"本次共执行了 {runacount} 次循环")
    except Exception as e:
        print(f"\n程序发生错误: {str(e)}")
        print(f"本次共执行了 {runacount} 次循环")
    finally:
        try:
            # 清理资源
            crawler.close()
        except:
            pass
        print("\n程序已退出")

if __name__ == "__main__":
    main() 
from douyinrespond import DouyinBrowser
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
import time
import re
import requests
import sys
from inputdef import CommentInput
import os


class CommentCrawler:
    def __init__(self):
        """初始化评论爬取器"""
        self.browser = DouyinBrowser.get_instance()
        self.driver = None
        self.initialize()
        
        # 获取项目根目录路径
        self.root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    def initialize(self):
        """初始化浏览器和驱动"""
        if not self.browser.is_initialized:
            self.browser.initialize()
        self.driver = self.browser.driver

        # 等待页面加载完成
        time.sleep(2)

    def _simulate_x_key(self):
        """模拟按X键"""
        try:
            if not self.driver:
                return

            # 等待页面加载完成
            time.sleep(2)

            # 确保焦点在页面上
            body = self.driver.find_element(By.TAG_NAME, "body")
            body.click()

            # 模拟按X键
            ActionChains(self.driver).send_keys('x').perform()
            time.sleep(2)  # 等待按键响应
        except Exception as e:
            print(f"模拟按X键失败: {str(e)}")

    def _filter_comment(self, text):
        """过滤评论内容"""
        # 去除首尾空白
        text = text.strip()

        # 如果是空文本,返回None
        if not text:
            return None

        # 需要过滤的关键词
        filter_words = ['作者回复过', '回复', '展开', '收起', '复制链接']

        # 如果文本包含任何过滤关键词,返回None
        if any(word in text for word in filter_words):
            return None

        # 过滤纯数字(包括带空格的)
        if re.match(r'^\s*\d+\s*$', text):
            return None

        # 过滤日期格式
        date_patterns = [
            r'\d{1,2}-\d{1,2}',  # 匹配 MM-DD
            r'\d{4}-\d{1,2}-\d{1,2}',  # 匹配 YYYY-MM-DD
            r'\d{1,2}月\d{1,2}日',  # 匹配 MM月DD日
            r'\d+天前',  # 匹配 X天前
            r'\d+分钟前',  # 匹配 X分钟前
            r'\d+小时前'  # 匹配 X小时前
        ]
        if any(re.match(pattern, text) for pattern in date_patterns):
            return None

        # 计算中文字符数量
        chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
        if len(chinese_chars) < 10:
            return None

        return text

    def get_video_title(self):
        """获取视频标题"""
        try:
            # 等待页面加载完成
            time.sleep(2)

            # 使用JavaScript获取当前视频标题
            title = self.driver.execute_script("""
                // 查找所有可能的标题元素
                function findTitle() {
                    // 尝试多种选择器
                    let selectors = [
                        '.video-info-detail .title.notBideoTags span.HGWtaPTo',  // 主标题
                        '.video-info-detail .title span.arnSiSbK',  // 备选标题
                        '.video-info-detail .title span',  // 通用标题
                        '.video-info-detail .title',  // 标题容器
                        '.video-info-detail .desc'  // 描述文本
                    ];

                    // 遍历选择器
                    for (let selector of selectors) {
                        let element = document.querySelector(selector);
                        if (element && element.textContent.trim()) {
                            return element.textContent.trim();
                        }
                    }

                    return null;
                }

                // 等待标题加载
                let maxAttempts = 5;
                let attempt = 0;
                let title = null;

                while (attempt < maxAttempts) {
                    title = findTitle();
                    if (title) break;
                    attempt++;
                }

                return title;
            """)
            print('原始标题内容:' + title)
            if title:
                # 清理标题文本
                title = re.sub(r'#[\w\u4e00-\u9fff]+', '', title)  # 移除话题标签
                title = re.sub(r'@[\w\u4e00-\u9fff]+', '', title)  # 移除@标签
                title = title.strip()

                # print(f"\n获取到视频标题: {title}")
                return title
            else:
                # print("\n未找到视频标题")
                return "未知标题"

        except Exception as e:
            print(f"获取视频标题失败: {str(e)}")
            return "获取标题失败"

    def _handle_live_video(self):
        """切换到下一个视频"""
        try:
           
            
            # 模拟按下方向键
            ActionChains(self.driver).send_keys(Keys.ARROW_DOWN).perform()
            time.sleep(5)  # 等待页面切换
            print("已切换到下一个视频")
             # 模拟按空格键
            ActionChains(self.driver).send_keys(Keys.SPACE).perform()
            time.sleep(0.5)  # 等待空格键响应

        except Exception as e:
            print(f"切换视频失败: {str(e)}")

    # def get_comments(self, url=None, max_comments=50):
    #     """获取评论内容"""
    #     try:
    #         if url:
    #             self.driver.get(url)
    #             time.sleep(3)
    #
    #         print("\n开始获取评论...")
    #         comments = []
    #         last_height = self.driver.execute_script("return document.documentElement.scrollHeight")
    #         no_new_comments_count = 0
    #
    #         while len(comments) < max_comments and no_new_comments_count < 3:
    #             initial_comment_count = len(comments)
    #
    #             # 查找所有回复元素
    #             reply_elements = WebDriverWait(self.driver, 10).until(
    #                 EC.presence_of_all_elements_located((By.XPATH, "//span[text()='回复']"))
    #             )
    #
    #             # 获取每个回复元素的上方评论
    #             for reply in reply_elements:
    #                 try:
    #                     comment_element = reply.find_element(By.XPATH,
    #                                                          "./ancestor::div[contains(@class, 'comment')]//span[not(text()='回复')]")
    #                     comment_text = self._filter_comment(comment_element.text)
    #                     if comment_text and comment_text not in comments:
    #                         comments.append(comment_text)
    #                         print(f"找到评论 [{len(comments)}]: {comment_text}")
    #                 except:
    #                     continue
    #
    #             # 滚动到页面底部
    #             self.driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
    #             time.sleep(2)
    #
    #             # 检查是否有新评论
    #             if len(comments) == initial_comment_count:
    #                 no_new_comments_count += 1
    #             else:
    #                 no_new_comments_count = 0
    #
    #             # 检查是否到达底部
    #             new_height = self.driver.execute_script("return document.documentElement.scrollHeight")
    #             if new_height == last_height:
    #                 no_new_comments_count += 1
    #             last_height = new_height
    #
    #         return comments
    #
    #     except Exception as e:
    #         print(f"获取评论失败: {str(e)}")
    #         return []

    def get_comments_by_xpath(self, max_comments=50):
        """使用XPath精确定位评论"""
        try:
            # 首先获取视频标题
            time.sleep(3)
            title = self.get_video_title()
            if title:
                print(f"\n视频标题: {title}")
            else:
                print("\n未能获取到视频标题")

            print("\n开始获取评论...")
            comments = []
            last_height = self.driver.execute_script("return document.documentElement.scrollHeight")
            no_new_comments_count = 0

            while len(comments) < max_comments and no_new_comments_count < 3:
                initial_comment_count = len(comments)

                comment_elements = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located((
                        By.XPATH,
                        "//span[text()='回复']/ancestor::div[contains(@class, 'comment')]//span[not(contains(@class, 'action')) and not(text()='回复')]"
                    ))
                )

                for element in comment_elements:
                    comment_text = self._filter_comment(element.text)
                    if comment_text and comment_text not in comments:
                        comments.append(comment_text)
                        print(f"找到评论 [{len(comments)}]: {comment_text}")

                # 滚动到页面底部
                self.driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
                time.sleep(2)

                # 检查是否有新评论
                if len(comments) == initial_comment_count:
                    no_new_comments_count += 1
                else:
                    no_new_comments_count = 0

                # 检查是否到达底部
                new_height = self.driver.execute_script("return document.documentElement.scrollHeight")
                if new_height == last_height:
                    no_new_comments_count += 1
                last_height = new_height

            return comments

        except Exception as e:
            print(f"获取评论失败: {str(e)}")
            return []

    def close(self):
        """关闭爬取器"""
        try:
            self.browser.close()
            sys.exit(0)  # 确保脚本完全停止
        except:
            pass

    def format_output(self, title, comments):
        """格式化输出标题和评论"""
        try:
            # 处理标题
            title_text = f"标题:{title if title else '未获取到标题'}"

            # 处理评论
            comment_texts = []
            for i, comment in enumerate(comments, 1):
                comment_texts.append(f"{i}、{comment}")

            # 读取commandtodoubao.txt的内容
            command_text = ""
            try:
                with open('commandtodoubao.txt', 'r', encoding='utf-8') as f:
                    command_text = " " + f.read().strip()  # 用空格代替换行符
                    time.sleep(1)
            except Exception as e:
                print(f"\n读取commandtodoubao.txt失败: {str(e)}")

            # 组合评论文本
            comment_text = "评论:" + "".join(comment_texts)

            # 组合最终输出(标题 + 评论 + 命令)
            final_text = title_text + " " + comment_text
            if command_text:
                final_text += command_text

            return final_text

        except Exception as e:
            print(f"格式化输出失败: {str(e)}")
            return f"标题:{title if title else '未获取到标题'} 评论:获取失败"

    def send_to_api(self, formatted_text):
        """发送格式化文本到API并自动填写评论"""
        self.mycomment = ""
        try:
            # 发送POST请求到API
            response = requests.post(
                'http://localhost:5000/chat',
                json={'message': formatted_text}
            )

            # 获取并处理响应
            result = response.json()

            if result.get('success'):
                # 获取响应文本
                response_text = result.get('response', '无响应内容')
                
                # 检查是否包含关键词
                if '具体需求' in response_text or '消息发送失败' in response_text:
                    filtered_response = "666"
                else:
                    # 过滤响应内容
                    filtered_lines = []
                    for line in response_text.split('\n'):
                        # 跳过包含"回复完毕"的行
                        if '回复完毕' in line:
                            continue
                        
                        # 基本清理
                        line = line.strip()
                        
                        # 计算中文字符数量
                        chinese_chars = len([c for c in line if '\u4e00' <= c <= '\u9fff'])
                        
                        # 保留有效内容
                        if chinese_chars >= 4:
                            filtered_lines.append(line)

                    # 合并所有有效行
                    filtered_response = '\n'.join(filtered_lines)

                # 存储过滤后的响应
                self.mycomment = filtered_response

                # 打印API响应
                print("\nAPI响应:")
                print("-" * 50)
                print(filtered_response)
                print("-" * 50)

                # 检查当前URL
                current_url = self.driver.current_url
                if 'search' in current_url.lower():
                    print("\n检测到search在网址里,采用inputdef.py方法发送评论")
                    comment_input = CommentInput(self.driver)
                    if comment_input.post_comment(response_text):
                        print("评论已发送")
                    else:
                        print("评论发送失败")
                else:
                    print("\n使用默认方法发送评论")
                    # 原有的评论发送逻辑
                    # 尝试定位并操作输入框
                    result = self.driver.execute_script("""
                        // 查找输入框容器
                        let container = document.querySelector('.comment-input-inner-container');
                        if (!container) {
                            return { success: false, message: '未找到评论容器' };
                        }
                        
                        // 点击容器以激活输入
                        container.click();
                        
                        // 等待输入框出现并设置内容
                        return new Promise((resolve) => {
                            setTimeout(() => {
                                // 查找实际的输入框
                                let input = container.querySelector('.public-DraftStyleDefault-block') ||
                                          container.querySelector('[contenteditable="true"]') ||
                                          container.querySelector('.lFk180Rt');
                                          
                                if (!input) {
                                    resolve({ success: false, message: '未找到输入框元素' });
                                    return;
                                }
                                
                                // 设置文本
                                let text = arguments[0];
                                input.textContent = text;
                                
                                // 触发输入事件
                                input.dispatchEvent(new InputEvent('input', {
                                    bubbles: true,
                                    cancelable: true,
                                    inputType: 'insertText',
                                    data: text
                                }));
                                
                                // 等待内容设置完成
                                setTimeout(() => {
                                    resolve({ 
                                        success: true, 
                                        message: '内容已设置',
                                        text: input.textContent,
                                        element: input
                                    });
                                }, 500);
                            }, 500);
                        });
                    """, filtered_response)
                    
                    if result and result.get('success'):
                        print("\n评论内容设置成功")
                        
                        # 发送评论
                        send_result = self.driver.execute_script("""
                            let input = arguments[0];
                            
                            // 创建回车键事件
                            let enterEvent = new KeyboardEvent('keydown', {
                                key: 'Enter',
                                code: 'Enter',
                                keyCode: 13,
                                which: 13,
                                bubbles: true,
                                cancelable: true
                            });
                            
                            // 触发回车键事件
                            input.dispatchEvent(enterEvent);
                            
                            return true;
                        """, result['element'])
                        
                        if send_result:
                            print("\n评论已发送")
                            time.sleep(1)  # 等待发送完成
                        else:
                            print("\n发送评论失败")
                    else:
                        print(f"\n设置评论内容失败: {result.get('message', '未知错误')}")

            else:
                print(f"错误: {result.get('error', '未知错误')}")
            print("-" * 50)

        except requests.exceptions.ConnectionError:
            print("\nAPI连接失败: 请确保API服务正在运行 (http://localhost:5000)")
        except Exception as e:
            print(f"\nAPI调用失败: {str(e)}")


def main():
    try:
        crawler = CommentCrawler()

        video_url = input("请输入抖音视频链接(直接回车使用当前页面):").strip()

        if video_url:
            comments = crawler.get_comments_by_xpath()

        # 获取标题
        title = crawler.get_video_title()

        # 显示详细输出
        print("\n获取到的评论:")
        print("-" * 50)
        for i, comment in enumerate(comments, 1):
            print(f"{i}. {comment}")
        print("-" * 50)
        print(f"\n共获取到 {len(comments)} 条评论")

        # 格式化输出并发送到API
        formatted_text = crawler.format_output(title, comments)

        # 显示格式化输出
        print("\n格式化输出:")
        print(formatted_text)

        # 发送完整的格式化文本到API
        print("\n正在发送到API...")
        crawler.send_to_api(formatted_text)  # 发送包含命令文本的完整内容

    except Exception as e:
        print(f"运行出错: {str(e)}")
    finally:
        input("\n按回车键退出...")
        crawler.close()

# if __name__ == "__main__":
#     main()

后端API代码

from flask import Flask, request, jsonify
from douban_chat_client import DoubanChatClient
import threading
import time

app = Flask(__name__)

# 全局变量用于追踪服务状态
service_status = {
    "is_initialized": False,
    "init_lock": threading.Lock(),
    "last_activity": time.time()
}

def initialize_client():
    """初始化聊天客户端"""
    with service_status["init_lock"]:
        if not service_status["is_initialized"]:
            client = DoubanChatClient.get_instance()
            if client.initialize():
                service_status["is_initialized"] = True
                return True
        return service_status["is_initialized"]

@app.route('/chat', methods=['POST'])
def chat():
    """处理聊天请求"""
    try:
        # 检查请求数据
        data = request.get_json()
        if not data or 'message' not in data:
            return jsonify({
                'success': False,
                'error': '缺少消息内容'
            }), 400
            
        message = data['message']
        
        # 确保客户端已初始化
        if not service_status["is_initialized"]:
            if not initialize_client():
                return jsonify({
                    'success': False,
                    'error': '客户端初始化失败'
                }), 500
        
        # 发送消息并获取回复
        client = DoubanChatClient.get_instance()
        response = client.chat(message)
        
        # 更新最后活动时间
        service_status["last_activity"] = time.time()
        
        return jsonify({
            'success': True,
            'response': response
        })
        
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

@app.route('/status', methods=['GET'])
def status():
    """获取服务状态"""
    return jsonify({
        'initialized': service_status["is_initialized"],
        'last_activity': service_status["last_activity"]
    })

def cleanup_service():
    """清理服务资源"""
    if service_status["is_initialized"]:
        try:
            client = DoubanChatClient.get_instance()
            client.close()
        except:
            pass
        finally:
            service_status["is_initialized"] = False

# 注册退出时的清理函数
import atexit
atexit.register(cleanup_service)

if __name__ == '__main__':
    try:
        # 启动时初始化客户端
        initialize_client()
        # 启动服务
        app.run(host='0.0.0.0', port=5000)
    finally:
        cleanup_service() 

se

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import time
import json
import os
import sys
import select
import random
import string
import pyperclip

# 根据操作系统导入不同的模块
if sys.platform != 'win32':
    import termios

class DouBanChatCrawler:
    def __init__(self):
        # 获取项目根目录路径
        root_dir = os.path.dirname(os.path.abspath(__file__))
        
        # 配置Chrome选项
        self.chrome_options = Options()
        
        # 设置Chrome二进制文件路径
        chrome_path = os.path.join(root_dir, "chrome-win64", "chrome.exe")
        if not os.path.exists(chrome_path):
            raise Exception(f"Chrome不存在: {chrome_path}")
        self.chrome_options.binary_location = chrome_path
        
        # 添加参数
        self.chrome_options.add_argument('--disable-gpu')
        self.chrome_options.add_argument('--disable-software-rasterizer')
        self.chrome_options.add_argument('--disable-dev-shm-usage')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        self.chrome_options.add_argument('--enable-unsafe-swiftshader')
        self.chrome_options.add_argument('--disable-web-security')
        self.chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        
        # 添加实验性选项
        self.chrome_options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        self.chrome_options.add_experimental_option('useAutomationExtension', False)
        
        try:
            # 设置ChromeDriver路径
            chromedriver_path = os.path.join(root_dir, "chromedriver.exe")
            if not os.path.exists(chromedriver_path):
                raise Exception(f"ChromeDriver不存在: {chromedriver_path}")
            service = Service(chromedriver_path)
            
            # 初始化浏览器
            self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
            
            # 修改 window.navigator.webdriver 标记
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": """
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => undefined
                    })
                """
            })
            
            # 最大化窗口
            self.driver.maximize_window()
            
        except Exception as e:
            print(f"浏览器初始化失败: {str(e)}")
            raise

    def wait_for_network_idle(self, timeout=10):
        """等待网络请求完成"""
        time.sleep(timeout)  # 简单的等待方式

    def get_network_requests(self):
        """获取网络请求信息,优化过滤逻辑"""
        return self.driver.execute_script("""
            let items = [];
            try {
                let entries = performance.getEntries() || [];
                items = entries.filter(e => {
                    // 扩展过滤条件以包含更多相关接口
                    const apiPatterns = [
                        '/samantha/',  // 主要的API接口
                        '/alice/',     // 用户相关接口
                        '/chat/',      // 聊天相关接口
                        '/message/',   // 消息相关接口
                        '/monitor_browser/' // 监控相关接口
                    ];
                    
                    return apiPatterns.some(pattern => e.name.includes(pattern)) && 
                           e.entryType === 'resource' &&
                           (e.initiatorType === 'xmlhttprequest' || e.initiatorType === 'fetch');
                           
                }).map(e => ({
                    url: e.name,
                    method: e.initiatorType,
                    type: e.initiatorType,
                    duration: e.duration,
                    status: e.responseStatus,
                    timestamp: e.startTime,
                    size: e.transferSize || 0
                }));
                
                // 按时间戳排序
                items.sort((a, b) => a.timestamp - b.timestamp);
                
            } catch(err) {
                console.error('Error:', err);
            }
            return items;
        """)

    def generate_chat_id(self):
        """生成5位数字和字母的唯一对话标识"""
        chars = string.ascii_letters + string.digits
        return ''.join(random.choice(chars) for _ in range(5))

    def send_message(self, message):
        """发送消息到豆包对话框"""
        try:
            # 检查消息内容
            if message.strip() == "":
                print("消息内容为空,已取消发送")
                return False
            
            # 生成唯一对话标识
            chat_id = self.generate_chat_id()
            
            # 在消息后添加完成提示请求和唯一标识
            message_with_id = f"{message},回复完毕请告诉我本次回复完毕[{chat_id}]"
            
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    # 重置页面焦点
                    self.driver.execute_script("document.activeElement.blur();")
                    time.sleep(0.1)
                    
                    # 重新定位输入框
                    input_box = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, "textarea.semi-input-textarea"))
                    )
                    
                    # 确保输入框可交互
                    WebDriverWait(self.driver, 10).until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, "textarea.semi-input-textarea"))
                    )
                    
                    # 重置输入框状态
                    self.driver.execute_script("""
                        var input = arguments[0];
                        input.value = '';
                        input.blur();
                        setTimeout(() => {
                            input.focus();
                            input.click();
                        }, 100);
                    """, input_box)
                    time.sleep(0.2)
                    
                    # 再次检查输入框状态
                    input_box = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, "textarea.semi-input-textarea"))
                    )
                    
                    # 直接输入消息
                    input_box.clear()
                    input_box.send_keys(message_with_id)
                    time.sleep(0.1)
                    
                    # 发送消息
                    input_box.send_keys(Keys.ENTER)
                    time.sleep(0.2)
                    
                    # 验证消息是否发送成功
                    try:
                        current_value = input_box.get_attribute('value')
                        if not current_value.strip():
                            print(f"已发送消息: {message}")
                            
                            # 发送成功后重置输入框状态
                            self.driver.execute_script("""
                                var input = arguments[0];
                                input.value = '';
                                input.blur();
                            """, input_box)
                            
                            return chat_id
                    except:
                        # 如果无法获取值,可能是因为消息已发送
                        print(f"已发送消息: {message}")
                        return chat_id
                    
                    if attempt < max_retries - 1:
                        print(f"重试发送消息 (尝试 {attempt + 2}/{max_retries})")
                        time.sleep(0.5)
                        continue
                    
                except Exception as e:
                    if attempt < max_retries - 1:
                        print(f"发送失败,正在重试 ({attempt + 2}/{max_retries})")
                        time.sleep(0.5)
                        continue
                    else:
                        raise e
            
            return None
            
        except Exception as e:
            print(f"发送消息失败: {str(e)}")
            import traceback
            print(traceback.format_exc())
            return None

    def wait_for_response(self, timeout=30, chat_id=None):
        """等待并获取豆包的回复"""
        try:
            start_time = time.time()
            last_check_time = time.time()
            
            # 获取初始状态
            initial_elements = self.driver.find_elements(
                By.CSS_SELECTOR, 
                "div.paragraph-JOTKXA, li, pre.container-_HmLba"
            )
            initial_count = len(initial_elements)
            last_response_text = ""
            
            print(f"等待新回复... (当前元素数: {initial_count})")
            print("按回车键可以中断等待")
            
            import threading
            interrupt_wait = threading.Event()
            
            def check_input():
                try:
                    # 使用 select 来检测输入,避免阻塞
                    while not interrupt_wait.is_set():
                        if sys.platform == 'win32':
                            import msvcrt
                            if msvcrt.kbhit():
                                if msvcrt.getch() in [b'\r', b'\n']:
                                    interrupt_wait.set()
                                    break
                        else:
                            if select.select([sys.stdin], [], [], 0.1)[0]:
                                if sys.stdin.read(1) in ['\n', '\r']:
                                    interrupt_wait.set()
                                    break
                        time.sleep(0.1)
                except:
                    pass
            
            # 启动输入监听线程
            input_thread = threading.Thread(target=check_input)
            input_thread.daemon = True
            input_thread.start()
            
            # 等待回复的主循环
            while time.time() - start_time < timeout and not interrupt_wait.is_set():
                try:
                    current_time = time.time()
                    
                    # 获取所有回复元素
                    current_elements = self.driver.find_elements(
                        By.CSS_SELECTOR,
                        "div.paragraph-JOTKXA, li, pre.container-_HmLba"
                    )
                    
                    # 获取当前所有回复文本
                    current_response_text = ""
                    code_block_complete = False
                    response_complete = False
                    
                    for element in current_elements[initial_count:]:
                        if element.tag_name == 'pre':
                            try:
                                # 查找代码块中的复制按钮
                                copy_button = element.find_element(
                                    By.CSS_SELECTOR, 
                                    "button[data-testid='message_action_copy']"
                                )
                                # 点击复制按钮
                                copy_button.click()
                                time.sleep(0.1)
                                
                                # 从剪贴板获取代码
                                text = pyperclip.paste()
                                
                                if text.strip():
                                    # 检查代码块中是否包含对话标识,如果有则移除
                                    if chat_id:
                                        text = text.replace(f"[{chat_id}]", "")
                                    current_response_text += text.strip() + "\n"
                                    # 检查代码块是否完整
                                    if "def" in text and "if __name__ == \"__main__\":" in text:
                                        code_block_complete = True
                            except Exception as e:
                                # 如果复制按钮不可用,回退到原来的方法
                                code_element = element.find_element(By.CSS_SELECTOR, "code")
                                text = code_element.text
                                if text.strip():
                                    current_response_text += text.strip() + "\n"
                        else:
                            text = element.text.strip()
                            if text:
                                # 检查是否包含完成标记
                                if chat_id and f"回复完毕[{chat_id}]" in text:
                                    response_complete = True
                                
                                # 移除标记和标识
                                if chat_id:
                                    text = text.replace(f"回复完毕[{chat_id}]", "")
                                    text = text.replace(f"[{chat_id}]", "")
                                text = text.replace(",本次回复完毕", "").replace("本次回复完毕", "")
                                text = text.strip()
                                
                                if text:
                                    current_response_text += text + "\n"
                    
                    # 检查是否有新内容
                    if current_response_text != last_response_text:
                        last_check_time = current_time
                        last_response_text = current_response_text
                    
                    # 检查回复是否完成
                    if chat_id:
                        response_complete = f"回复完毕[{chat_id}]" in current_response_text
                    else:
                        response_complete = "回复完毕" in current_response_text
                    
                    # 如果有回复内容,且满足完成条件
                    if current_response_text and (
                        response_complete or
                        (code_block_complete and current_time - last_check_time > 3) or
                        (current_time - last_check_time > 5)
                    ):
                        # 处理回复文本
                        response_parts = []
                        for element in current_elements[initial_count:]:
                            if element.tag_name == 'pre':
                                try:
                                    # 查找并点击复制按钮
                                    copy_button = element.find_element(
                                        By.CSS_SELECTOR, 
                                        "button[data-testid='message_action_copy']"
                                    )
                                    copy_button.click()
                                    time.sleep(0.1)
                                    
                                    # 从剪贴板获取代码并清理
                                    text = pyperclip.paste()
                                    if text.strip():
                                        # 移除代码中的标识
                                        if chat_id:
                                            text = text.replace(f"[{chat_id}]", "")
                                        response_parts.append("\n```python\n" + text.strip() + "\n```\n")
                                except:
                                    # 如果复制按钮不可用,回退到原来的方法
                                    code_element = element.find_element(By.CSS_SELECTOR, "code")
                                    text = code_element.text
                                    if text.strip():
                                        response_parts.append("\n```python\n" + text.strip() + "\n```\n")
                            else:
                                text = element.text.strip()
                                if text:
                                    # 移除所有标记和标识
                                    if chat_id:
                                        text = text.replace(f"回复完毕[{chat_id}]", "")
                                        text = text.replace(f"[{chat_id}]", "")
                                    text = text.replace(",本次回复完毕", "").replace("本次回复完毕", "")
                                    text = text.strip()
                                    if text:
                                        if element.tag_name == 'li':
                                            response_parts.append(f"• {text}")
                                        else:
                                            response_parts.append(text)
                        
                        if response_parts:
                            complete_response = "\n".join(response_parts)
                            if response_complete:
                                print(f"收到完整回复 (元素数: {len(current_elements) - initial_count})")
                            elif code_block_complete:
                                print("代码块已完成")
                            else:
                                print("回复似乎已完成")
                            return complete_response
                    
                    time.sleep(0.2)
                    
                except Exception as e:
                    print(f"获取回复时出错: {str(e)}")
                    time.sleep(0.2)
                    continue
            
            # 等待输入线程完成
            interrupt_wait.set()  # 确保输入线程退出
            input_thread.join(0.1)
            
            # 清空输入缓冲区
            if sys.platform == 'win32':
                import msvcrt
                while msvcrt.kbhit():
                    msvcrt.getch()
            elif hasattr(sys, 'stdin'):  # 对于其他系统
                try:
                    import termios
                    termios.tcflush(sys.stdin, termios.TCIOFLUSH)
                except (ImportError, AttributeError):
                    pass  # 如果无法使用 termios,就跳过清理
            
            if interrupt_wait.is_set():
                return "等待被中断"
            return "等待回复超时"
            
        except Exception as e:
            print(f"等待回复失败: {str(e)}")
            import traceback
            print(traceback.format_exc())
            return None

    def chat_session(self):
        """启动交互式聊天会话"""
        print("\n=== 豆包聊天会话已启动 ===")
        print("输入 'quit' 退出会话")
        
        # 用于跟踪上一次响应的状态
        last_response_time = 0
        
        while True:
            try:
                # 确保在获取新输入前有足够的冷却时间
                current_time = time.time()
                if current_time - last_response_time < 0.5:  # 减少冷却时间
                    time.sleep(0.5)
                
                # 使用 input() 获取用户输入
                raw_message = input("\n你: ")
                
                # 处理消息内容
                message = raw_message.strip()
                
                # 检查是否为退出命令
                if message.lower() == 'quit':
                    break
                
                # 检查消息是否为空或太短
                if not message:
                    print("请输入有效的消息内容")
                    continue
                    
                if len(message) < 2:  # 允许单个字符的消息,但给出提示
                    print("提示:消息太短可能无法获得好的回复,是否继续?(y/n)")
                    confirm = input().strip().lower()
                    if confirm != 'y':
                        continue
                
                # 清除可能存在的旧输入框内容
                try:
                    input_box = self.driver.find_element(By.CSS_SELECTOR, "textarea.semi-input-textarea")
                    if input_box:
                        self.driver.execute_script("arguments[0].value = '';", input_box)
                except:
                    pass
                
                # 发送消息并获取对话标识
                chat_id = self.send_message(message)
                if chat_id:
                    print("\n豆包正在回复...")
                    response = self.wait_for_response(chat_id=chat_id)
                    if response:
                        print(f"\n豆包: {response}")
                        last_response_time = time.time()
                        time.sleep(0.5)
                    else:
                        print("未能获取到回复")
                        last_response_time = time.time()
                
            except Exception as e:
                print(f"聊天过程出错: {str(e)}")
                continue

    def save_login_state(self):
        """保存完整的登录状态到 cookie.txt"""
        try:
            # 获取 cookies
            cookies = self.driver.get_cookies()
            
            # 获取 localStorage
            local_storage = self.driver.execute_script("""
                let items = {};
                try {
                    for (let i = 0; i < localStorage.length; i++) {
                        const key = localStorage.key(i);
                        items[key] = localStorage.getItem(key);
                    }
                } catch(err) {
                    console.error('Error:', err);
                }
                return items;
            """)
            
            # 保存登录状态
            login_state = {
                'cookies': cookies,
                'localStorage': local_storage,
                'timestamp': time.time()
            }
            
            # 保存到 cookie.txt
            with open('cookie.txt', 'w', encoding='utf-8') as f:
                json.dump(login_state, f, ensure_ascii=False, indent=2)
            
            print("已保存登录状态到 cookie.txt")
            return True
            
        except Exception as e:
            print(f"保存登录状态失败: {str(e)}")
            return False

    def load_login_state(self):
        """从 cookie.txt 加载登录状态"""
        try:
            if not os.path.exists('cookie.txt'):
                print("未找到登录状态文件,需要首次登录")
                return False
            
            with open('cookie.txt', 'r', encoding='utf-8') as f:
                login_state = json.load(f)
            
            # 检查登录状态是否过期(7天)
            if time.time() - login_state.get('timestamp', 0) > 7 * 24 * 3600:
                print("登录状态已过期,需要重新登录")
                return False
            
            # 先访问网站
            self.driver.get("https://www.doubao.com")
            time.sleep(1)
            
            # 添加 cookies
            for cookie in login_state.get('cookies', []):
                try:
                    # 移除可能导致问题的属性
                    if 'expiry' in cookie:
                        del cookie['expiry']
                    self.driver.add_cookie(cookie)
                except Exception as e:
                    print(f"添加cookie失败: {str(e)}")
                    continue
            
            # 添加 localStorage
            for key, value in login_state.get('localStorage', {}).items():
                try:
                    self.driver.execute_script(f"window.localStorage.setItem('{key}', '{value}')")
                except Exception as e:
                    print(f"添加localStorage失败: {str(e)}")
                    continue
            
            print("已从 cookie.txt 加载登录状态")
            return True
            
        except Exception as e:
            print(f"加载登录状态失败: {str(e)}")
            return False

    def check_login_status(self):
        """检查登录状态"""
        try:
            # 等待页面加载
            WebDriverWait(self.driver, 5).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            
            # 检查是否存在登录相关元素
            login_indicators = self.driver.execute_script("""
                return {
                    'hasTextarea': !!document.querySelector('textarea.semi-input-textarea'),
                    'hasLoginButton': !!document.querySelector('button[data-testid="login-button"]'),
                    'hasUserInfo': !!document.querySelector('div[data-testid="user-info"]')
                }
            """)
            
            return login_indicators.get('hasTextarea', False) and not login_indicators.get('hasLoginButton', True)
            
        except Exception as e:
            print(f"检查登录状态失败: {str(e)}")
            return False

    def get_chat_info(self):
        try:
            print("正在打开豆包网站...")
            
            # 尝试从 cookie.txt 加载登录状态
            if self.load_login_state():
                print("已加载登录状态,尝试自动登录...")
                self.driver.get("https://www.doubao.com/chat/")
                time.sleep(3)
                
                # 检查登录状态
                if self.check_login_status():
                    print("自动登录成功")
                else:
                    print("自动登录失败,需要重新登录")
                    print("\n请在打开的浏览器中完成以下操作:")
                    print("1. 登录豆包账号")
                    print("2. 进入任意对话")
                    print("3. 等待页面加载完成")
                    print("4. 按回车键继续...")
                    input()
                    # 保存新的登录状态到 cookie.txt
                    self.save_login_state()
            else:
                self.driver.get("https://www.doubao.com/chat/")
                print("\n首次登录,请完成以下操作:")
                print("1. 登录豆包账号")
                print("2. 进入任意对话")
                print("3. 等待页面加载完成")
                print("4. 按回车键继续...")
                input()
                # 保存登录状态到 cookie.txt
                self.save_login_state()
            
            # 启动聊天会话
            self.chat_session()
            
            try:
                # 获取cookies
                cookies = self.driver.get_cookies()
                cookie_dict = {cookie['name']: cookie['value'] for cookie in cookies}
                
                # 获取localStorage
                local_storage = self.driver.execute_script("""
                    let items = {};
                    try {
                        for (let i = 0; i < localStorage.length; i++) {
                            const key = localStorage.key(i);
                            items[key] = localStorage.getItem(key);
                        }
                    } catch(err) {
                        console.error('Error:', err);
                    }
                    return items;
                """)
                
                # 获取网络请求
                network_requests = self.get_network_requests()
                
                # 保存信息到文件
                data = {
                    'cookies': cookie_dict,
                    'local_storage': local_storage,
                    'network_requests': network_requests
                }
                
                with open('chat_info.json', 'w', encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                
                print("\n成功!所有信息已保存到 chat_info.json 文件中")
                print(f"发现的网络请求数量: {len(network_requests)}")
                print("\n主要API接口:")
                
                # 按类型分组显示API接口
                api_groups = {
                    'Samantha APIs': [req for req in network_requests if '/samantha/' in req['url']],
                    'Alice APIs': [req for req in network_requests if '/alice/' in req['url']],
                    'Chat APIs': [req for req in network_requests if '/chat/' in req['url']],
                    'Message APIs': [req for req in network_requests if '/message/' in req['url']],
                    'Monitor APIs': [req for req in network_requests if '/monitor/' in req['url']]
                }
                
                for group_name, requests in api_groups.items():
                    if requests:
                        print(f"\n{group_name}:")
                        for req in requests:
                            print(f"- {req['method'].upper()}: {req['url']}")
                            print(f"  Duration: {req['duration']:.2f}ms, Size: {req['size']} bytes")
                
            except Exception as e:
                print(f"收集信息时发生错误: {str(e)}")
                
        except Exception as e:
            print(f"发生错误: {str(e)}")
        finally:
            input("\n按回车键关闭浏览器...")
            try:
                self.driver.quit()
            except:
                pass

def main():
    try:
        crawler = DouBanChatCrawler()
        crawler.get_chat_info()
    except Exception as e:
        print(f"程序运行失败: {str(e)}")
        input("按回车键退出...")

if __name__ == "__main__":
    main() 

service`

from flask import Flask, request, jsonify
from douban_chat_client import DoubanChatClient
import threading
import time

app = Flask(__name__)

# 全局变量用于追踪服务状态
service_status = {
    "is_initialized": False,
    "init_lock": threading.Lock(),
    "last_activity": time.time()
}

def initialize_client():
    """初始化聊天客户端"""
    with service_status["init_lock"]:
        if not service_status["is_initialized"]:
            client = DoubanChatClient.get_instance()
            if client.initialize():
                service_status["is_initialized"] = True
                return True
        return service_status["is_initialized"]

@app.route('/chat', methods=['POST'])
def chat():
    """处理聊天请求"""
    try:
        # 检查请求数据
        data = request.get_json()
        if not data or 'message' not in data:
            return jsonify({
                'success': False,
                'error': '缺少消息内容'
            }), 400
            
        message = data['message']
        
        # 确保客户端已初始化
        if not service_status["is_initialized"]:
            if not initialize_client():
                return jsonify({
                    'success': False,
                    'error': '客户端初始化失败'
                }), 500
        
        # 发送消息并获取回复
        client = DoubanChatClient.get_instance()
        response = client.chat(message)
        
        # 更新最后活动时间
        service_status["last_activity"] = time.time()
        
        return jsonify({
            'success': True,
            'response': response
        })
        
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

@app.route('/status', methods=['GET'])
def status():
    """获取服务状态"""
    return jsonify({
        'initialized': service_status["is_initialized"],
        'last_activity': service_status["last_activity"]
    })

def cleanup_service():
    """清理服务资源"""
    if service_status["is_initialized"]:
        try:
            client = DoubanChatClient.get_instance()
            client.close()
        except:
            pass
        finally:
            service_status["is_initialized"] = False

# 注册退出时的清理函数
import atexit
atexit.register(cleanup_service)

if __name__ == '__main__':
    try:
        # 启动时初始化客户端
        initialize_client()
        # 启动服务
        app.run(host='0.0.0.0', port=5000)
    finally:
        cleanup_service() 

程序还在测试优化bug,正在打包程序,觉得对你有帮助的朋友可以到风车自主获取打包好的程序