打造出色的数据科学作品集:全面指南
了解如何创建一个有影响力的数据科学作品集,展示你的技能并吸引潜在雇主
https://ydong029.medium.com/?source=post_page---byline--6dabd0ec7059--------------------------------https://towardsdatascience.com/?source=post_page---byline--6dabd0ec7059-------------------------------- Yu Dong
·发表于Towards Data Science ·9 分钟阅读·2024 年 7 月 10 日
–
背景
我在 2018 年刚从学校毕业时开始了我的数据科学作品集网站。毫不奇怪,我创建它的目的就是希望它能帮助我的求职和职业发展。六年后,我为自己的进步和持续更新感到自豪。我的作品集已经成为我职业历程、项目和见解的丰富宝库。
拥有一个数据科学作品集可以帮助你记录学习成果、反思职业生涯,并与数据科学社区互动。它在求职过程中也非常宝贵,比传统的简历更深入地展示了你的技能和项目。
在这篇文章中,我将讨论如何建立数据科学作品集、其内容策略以及什么样的作品集是好的。如果你曾考虑过这个想法,但不知道从哪里开始,那么这篇文章就是为你准备的。
建立你的作品集
有多种方式可以建立你数据科学作品集。以下是五种最常见的选择及其优缺点。
从零开始构建产品经理的用户洞察收集工具
向付费的客户洞察中心告别吧!学习如何将五个开源 AI 模型结合起来,自动化从用户访谈中收集洞察。
https://medium.com/@zaninihugo?source=post_page---byline--a6459dc1c3f6--------------------------------https://towardsdatascience.com/?source=post_page---byline--a6459dc1c3f6-------------------------------- Hugo Zanini
·发表于 Towards Data Science ·阅读时间 11 分钟 ·2024 年 8 月 12 日
–
图片来自 Daria Nepriakhina 于 Unsplash
作为一个数据平台的技术产品经理,我经常进行用户访谈,以识别与数据开发过程相关的挑战。
然而,当我与用户一起探索一个新的问题领域时,我很容易被与组织内各个个体的众多对话所压倒。
随着时间的推移,我采用了一种系统化的方法来应对这一挑战。我专注于在每次访谈中做详细的笔记,然后再回顾这些笔记。这让我能够巩固理解并识别出用户讨论的模式。
然而,在做笔记和积极倾听之间分心往往会影响我对话的质量。我注意到,当别人为我做笔记时,我的访谈效果显著提升。这使我能够完全投入到与受访者的交流中,专注于他们所说的内容,从而进行更有意义和富有成效的互动。
为了提高效率,我从在会议中做笔记转变为每当有功能时,就进行录音并转录。这大大减少了我需要进行的访谈数量,因为我可以从较少的对话中获得更多的洞察。然而,这一变化要求我花时间回顾转录内容并观看视频。下图展示了我在绘制新的产品开发机会时所遵循的简化流程。
由于会议转录的体积和分类用户洞察的困难,整合与分析变得具有挑战性。
此外,现有的会议转录工具仅限于英语,而我大多数的对话是葡萄牙语。因此,我决定在市场上寻找一个能够帮助我应对这些挑战的解决方案。
我找到的解决了大多数痛点的工具是 Dovetail、Marvin、Condens 和 Reduct。它们定位自己为客户洞察中心,主要产品通常是客户访谈的转录。
基本上,你可以在这里上传访谈视频,并获得一份转录,指明每一句话的发言人,并在每句话上附有指向原视频的超链接。在文本上,你可以添加高亮、标签和评论,还可以请求对话的总结。这些功能将解决我的问题;然而,这些工具价格昂贵,特别是考虑到我住在巴西,而他们收取美元费用。
这些工具没有任何革命性创新,所以我决定实现一个开源替代方案,可以在 Colab 笔记本上免费运行。
需求
作为一名优秀的产品经理,我做的第一件事是根据用户(我的)需求识别产品的必备功能。以下是我所列出的高层次需求:
成本与可访问性
-
免费;
-
无需编程经验即可使用;
数据隐私与安全
- 保持数据私密——与外部服务无连接;
性能
-
执行速度必须快于视频时长;
-
高精度的多语言转录;
功能
-
发言人识别;
-
易于在转录内容中搜索;
-
轻松突出显示转录内容;
-
容易创建正在进行的研究的存储库;
集成
-
与我公司现有工具(Google Workspace)集成;
-
集成 LLM 模型以接收任务提示,基于转录内容执行;
解决方案
根据需求,我设计了我的解决方案应具备的功能:
然后,我设计了预期的输入和用户界面来定义这些功能:
用户将把他们的面试上传到 YouTube,并设置为未列出的视频,然后创建一个 Google Drive 文件夹来存储转录文件。接着,他们可以访问 Google Colab 笔记本,提供面试的基本信息、粘贴视频网址,并可以选择为大型语言模型(LLM)定义任务。输出结果将是 Google Docs,用户可以在其中整合洞察。
以下是产品架构。这个解决方案结合了五个不同的机器学习模型和一些 Python 库。接下来的部分将提供每个构建模块的概述;但是,如果你更感兴趣的是尝试这个产品,请跳到“I got it”部分。
图片由作者提供
面试设置与视频上传
为了创建一个用户友好的界面,用于设置面试并提供视频链接,我使用了Google Colab 的表单功能。它允许创建文本框、滑动条、下拉框等。代码被隐藏在表单后面,非常适合非技术用户使用。
面试选择表格 — 图片由作者提供
音频下载与转换
我使用了 yt-dlp 库来仅下载 YouTube 视频的音频,并将其转换为 mp3 格式。这个工具非常简单易用,你可以在这里查看它的文档。
图片由yt-dlp提供
音频转录
为了转录会议内容,我使用了Open AI 的 Whisper。这是一个开源的语音识别模型,训练数据来自超过 68 万小时的多语言数据。
该模型运行非常迅速;一段一小时的音频大约需要 6 分钟就能在 16GB T4 GPU(Google Colab 提供的免费 GPU)上完成转录,并且它支持99 种不同语言。
由于隐私是这个解决方案的一个要求,模型的权重被下载,所有推理操作都在 Colab 实例内部进行。我还在笔记本中添加了一个模型选择表单,用户可以根据他们所需要的精度选择不同的模型。
图片由作者提供
讲话者识别
讲话者识别是通过一种叫做讲话者分段技术(Speakers Diarization)来完成的。其原理是将音频划分为不同的语音段,每个段落对应一个特定的讲话者。通过这种方式,我们可以识别出谁在什么时候发言。
由于从 YouTube 上传的视频没有元数据来标识谁在说话,讲话者将被分为讲话者 1、讲话者 2 等……稍后,用户可以在 Google Docs 中查找并替换这些名字,以添加讲话者的身份标识。
图片由作者提供
对于说话者分离,我们将使用一个名为多尺度说话者分离解码器(MSDD)的模型,该模型由 Nvidia 研究人员开发。这是一种先进的说话者分离方法,利用多尺度分析和动态加权来实现高精度和灵活性。
该模型以在识别并正确分类多个讲者交替发言的时刻而闻名——这是访谈中常见的现象。
这个模型可以通过NVIDIA NeMo 框架使用。它让我能够获取 MSDD 检查点,并直接在 colab 笔记本中运行说话者分离,只需几行代码。
查看 MSDD 的说话者分离结果时,我注意到标点符号很差,长句子中某些像“嗯”和“是的”的插入语被误认为是说话者的中断——这使得文本难以阅读。
因此,我决定在管道中添加一个标点模型,以提高转录文本的可读性并便于人工分析。于是,我从Hugging Face 获取了 punctuate-all 模型,这是一个非常精确且快速的解决方案,支持以下语言:英语、德语、法语、西班牙语、保加利亚语、意大利语、波兰语、荷兰语、捷克语、葡萄牙语、斯洛伐克语和斯洛文尼亚语。
视频同步
从我所对比的行业解决方案来看,一个强烈的需求是每个短语都应该与采访中讲述的时刻相关联。
Whisper 转录包含指示短语说出时间戳的元数据;然而,这些元数据并不十分精确。
因此,我使用了一个名为Wav2Vec2的模型来更准确地进行这种匹配。基本上,该解决方案是一个神经网络,旨在学习音频表示并执行语音识别对齐。该过程包括在音频信号中找到每个段落说出的确切时间戳,并相应地对齐文本。
通过准确地匹配转录文本与时间戳,我通过简单的 Python 代码创建了超链接,指向视频中开始说出短语的时刻。
LLM 模型
这个管道步骤有一个准备好在本地运行并分析文本的大型语言模型,提供关于访谈的洞见。默认情况下,我添加了 Gemma 模型 1.1b,并设置了一个提示来总结文本。如果用户选择进行总结,它将以项目符号列表的形式显示在文档顶部。
图片由作者提供
此外,通过点击显示代码,用户可以更改提示并要求模型执行不同的任务。
用于标签、重点和评论的文档生成
解决方案执行的最后一项任务是生成带有访谈转录和超链接的 Google Docs。这是通过Google API Python 客户端库完成的。
我知道了
由于该产品在我日常工作中变得非常有用,我决定给它起个名字,以便更容易引用。我将它称为Insights Gathering Open-source Tool,简称 iGot。
在首次使用该解决方案时,需要进行一些初始设置。让我通过一个实际的例子来指导您开始使用。
打开 iGot 笔记本并安装所需的库
点击此链接打开笔记本并运行第一个单元格以安装所需的库。大约需要 5 分钟。
作者提供的图片
如果系统提示您重启笔记本,请直接取消。无需重启。
作者提供的图片
如果一切按预期运行,您将看到“所有库已安装!”的消息。
作者提供的图片
获取 Hugging 用户访问令牌和模型访问权限
(此步骤仅在首次执行笔记本时需要)
为了运行 Gemma 和 punctuate-all 模型,我们将从 Hugging Face 下载权重文件。为此,您必须申请一个用户令牌并获得模型访问权限。
为此,您需要创建一个 Hugging Face 账户,并按照以下步骤获取具有阅读权限的令牌。
作者提供的图片
一旦您获得了令牌,复制它并返回到实验笔记本。进入“Secrets”选项卡,然后点击“Add new secret”。
作者提供的图片
将您的令牌命名为HF_TOKEN,并粘贴您从 Hugging Face 获得的密钥。
作者提供的图片
接下来,点击此链接打开 Hugging Face 上的 Gemma 模型。然后点击“确认许可”以获得模型访问权限。
作者提供的图片
发送访谈
要将访谈发送到 iGot,您需要先将其作为未列出的 YouTube 视频上传。为了本教程的目的,我从 Andrej Karpathy 与 Lex Fridman 的访谈中截取了一部分,并将其上传到我的账户。这是 Andrej 给机器学习初学者的一些建议部分。
然后,您需要获取视频的 URL,将其粘贴到Interview Selection笔记本单元格的video_url字段中,定义一个名称,并注明视频中的语言。
一旦你运行了该单元,你将收到一条消息,指示音频文件已生成。
作者提供的图片
模型选择与执行
在下一个单元中,你可以选择你想要用于转录的 Whisper 模型的大小。模型越大,转录精度越高。
默认情况下,选择的是最大的模型。做出选择后,运行该单元。
作者提供的图片
然后,运行模型执行单元,执行上一部分中显示的模型流程。如果一切顺利,你应该在最后看到消息“标点符号处理完毕!”。
作者提供的图片
如果你收到提示消息,询问是否允许访问 Hugging Face 令牌,请授予访问权限。
作者提供的图片
配置转录输出
最后一步是将转录保存到 Google Docs 文件中。为此,你需要指定文件路径,提供采访名称,并指示是否希望 Gemma 总结会议内容。
第一次执行该单元时,你可能会收到一条提示消息,询问是否允许访问你的 Google Drive。点击“允许”。
作者提供的图片
然后,给 Colab 完全访问你的 Google Drive 工作区。
作者提供的图片
如果一切顺利,你将会看到一个指向 Google Docs 文件的链接。只需点击它,你就可以访问你的采访转录。
作者提供的图片
从生成的文档中提取见解
最终文档将包含转录内容,每个短语都与视频中开始的相应时刻相关联。由于 YouTube 不提供讲者元数据,建议使用 Google Docs 的查找和替换工具,将“Speaker 0”、“Speaker 1”等替换为实际的讲者名字。
作者提供的图片
有了这个,你可以处理高亮、笔记、反应等内容。如最初设想的那样:
作者提供的图片
最后的思考
该工具仍处于第一版本,我计划将其发展为一个更用户友好的解决方案。也许会搭建一个网站,让用户不需要直接与笔记本交互,或者开发一个插件,用于在 Google Meets 和 Zoom 中使用。
我的这个项目的主要目标是创建一个高质量的会议转录工具,它不仅对他人有益,还能展示现有的开源工具如何与商业解决方案的能力相匹配。
希望你觉得它有用!如果你有任何反馈或对 iGot 的发展有兴趣,欢迎随时通过LinkedIn 联系我。
为工业应用构建视觉检查 CNN
使用 PyTorch 的逐步方法
https://medium.com/@ingo.nowitzky?source=post_page---byline--138936d7a34a--------------------------------https://towardsdatascience.com/?source=post_page---byline--138936d7a34a-------------------------------- Ingo Nowitzky
·发表于 Towards Data Science ·28 分钟阅读·2024 年 11 月 21 日
–
在本文中,我们为汽车电子行业的视觉检查分类任务开发并编码了一个卷积神经网络(CNN)。在此过程中,我们深入研究了卷积层的概念和数学,并分析了 CNN 实际“看到”的内容以及哪些图像部分引导它们做出决策。
目录
第一部分:概念背景
第二部分:定义和编码 CNN
第三部分:在生产中使用训练好的模型
第四部分:CNN 在“决策”中考虑了什么?
第一部分:概念背景
1.1 任务:将工业部件分类为良品或废品
在自动化装配线的一个工位上,带有两个突出金属销的线圈需要精确地放置在外壳中。金属销插入小的插座中。在某些情况下,销钉略微弯曲,因此无法通过机器连接。视觉检查的任务是识别这些线圈,以便它们可以被自动筛选出来。
对于检查,每个线圈会单独拾起,并放置在屏幕前。在这个位置,摄像机会拍摄一张灰度图像。然后,这张图像会被 CNN 检查并分类为良品或废品。
现在,我们想要定义一个卷积神经网络,能够处理图像并从预先分类的标签中学习。
1.2 什么是卷积神经网络(CNN)?
卷积神经网络是 卷积滤波器 和 全连接神经网络(NN)的组合。CNN 常用于 图像处理,如面部识别或视觉检测任务,像我们这个案例中的任务。卷积滤波器 是滑过图像并重新计算每个像素的矩阵操作。我们将在本文后面研究卷积滤波器。滤波器的权重是 不是预设的(例如 Photoshop 中的锐化功能),而是从数据中学习得到的。
1.3 卷积神经网络的架构
让我们检查一下 CNN 架构的一个例子。为了方便起见,我们选择 稍后将实现的模型。
图 3:我们的视觉检测 CNN 架构 | 图片来自作者
我们希望将大小为 400 像素高和 700 像素宽的检测图像输入到 CNN 中。由于图像是灰度图像,相关的 PyTorch 张量大小为 1x400x700。如果使用彩色图像,则会有 3 个输入通道:一个用于红色,一个用于绿色,一个用于蓝色(RGB)。在这种情况下,张量的大小将是 3x400x700。
第一个 卷积滤波器 有 6 个大小为 5x5 的 卷积核,它们滑过图像并生成 6 个独立的新图像,这些图像被称为 特征图,大小略微减小(6x396x696)。ReLU 激活函数 在图 3 中没有明确显示。它不会改变张量的维度,但会将所有负值设置为零。ReLU 后面是 MaxPooling 层,卷积核大小为 2x2。它会将每个图像的宽度和高度减半。
这三层——卷积层、ReLU 层和 MaxPooling 层——会再次实现。这最终给我们带来 16 个特征图,图像的高度为 97 像素,宽度为 172 像素。接下来,所有矩阵值会被展平,并输入到同样大小的全连接神经网络的第一层。它的第二层已经缩减为 120 个神经元。第三层和输出层只有 2 个神经元:一个表示标签“OK”,另一个表示标签“not OK” 或“scrap”。
如果你还不清楚维度变化的情况,请耐心等待。 我们将在接下来的章节中详细研究不同类型的层——卷积层、ReLU 层和 MaxPooling 层——如何工作,并如何影响张量维度。
1.4 卷积滤波器层
卷积滤波器的任务是寻找图像中的典型结构/模式。常用的卷积核尺寸是 3x3 或 5x5。卷积核的 9 个或 25 个权重并不是预先指定的,而是在训练过程中学习的(这里假设只有一个输入通道;否则,权重的数量会乘以输入通道数)。卷积核会以定义的步幅在图像的矩阵表示上滑动(每个输入通道都有自己的卷积核),在水平和垂直方向上进行卷积。卷积核与矩阵中对应的值相乘并求和。每个滑动位置的求和结果形成新的图像,我们称之为特征图。在一个卷积层中,我们可以指定多个卷积核。在这种情况下,我们会得到多个特征图作为结果。卷积核从左到右、从上到下滑动。因此,图 4 显示了卷积核在第五个滑动位置(不包括“…”)的状态。我们可以看到三个输入通道,分别表示红色、绿色和蓝色(RGB)。每个通道只有一个卷积核。在实际应用中,我们通常为每个输入通道定义多个卷积核。
卷积核 1 在红色输入通道上执行其操作。在当前显示的位置,我们计算该位置在特征图中的新值,计算过程为 (-0.7)0 + (-0.9)(-0.2) + (-0.6)0.5 + (-0.6)0.6 + 0.6(-0.3) + 0.7(-1) + 00.7 + (-0.1)(-0.1) + (-0.2)(-0.1) = (-1.33).* 对应的绿色通道(卷积核 2)的计算结果为 -0.14, 蓝色通道(卷积核 3)的结果为 0.69。为了得到该滑动位置的最终特征图值,我们将三个通道的值相加并加上偏置(偏置和所有卷积核权重在 CNN 训练过程中定义): (-1.33) + (-0.14) + 0.69 + 0.2 = -0.58。该值被放置在特征图中黄色高亮的对应位置。
最后,如果我们将输入矩阵的大小与特征图的大小进行比较,会发现通过卷积操作后,我们丢失了两行和两列。
1.5 ReLU 激活层
卷积之后,特征图会通过激活层。激活是必需的,目的是赋予网络非线性能力。最常用的激活方法是Sigmoid和ReLU(修正线性单元)。ReLU 激活将所有负值设为零,而正值保持不变。
在图 5 中,我们可以看到特征图的值逐元素通过了 ReLU 激活。
ReLU 激活对特征图的维度没有影响。
1.6 最大池化层
池化层的主要任务是减少特征图的大小,同时保留对分类重要的信息。通常,我们可以通过计算卷积核区域的平均值或返回最大值来进行池化。在大多数应用中,MaxPooling 更有益,因为它减少了数据中的噪声。池化的典型卷积核大小为 2x2 或 3x3。
图 6:使用 2x2 卷积核的 MaxPooling 和 AvgPooling | 图像来自作者
在图 6 中,我们看到使用 2x2 卷积核的 MaxPooling 和 AvgPooling 的示例。特征图被划分为与卷积核大小相同的区域,在这些区域内,我们选择最大值(→ MaxPooling)或平均值(→ AvgPooling)。
通过使用 2x2 的卷积核进行池化,我们将特征图的高度和宽度减半。
1.7 卷积神经网络中的张量维度
现在我们已经研究了卷积滤波器、ReLU 激活函数和池化,我们可以回顾一下图 3 及张量的维度。我们从一个大小为 400x700 的图像开始。由于它是灰度图像,所以只有 1 个通道,相应的张量大小为 1x400x700。我们对图像应用 6 个大小为 5x5、步长为 1x1 的卷积滤波器。每个滤波器返回自己的特征图,因此我们获得 6 个特征图。由于与图 4 中使用的卷积核相比(5x5 代替了 3x3),这次卷积会丢失 4 列和 4 行。因此,返回的张量大小为 6x396x696。
在下一步中,我们对特征图应用 2x2 卷积核的 MaxPooling(每个图都有自己的池化核)。正如我们所学,这将特征图的维度缩小一倍。因此,张量现在的大小为 6x198x348。
现在我们应用 16 个大小为 5x5 的卷积滤波器。每个滤波器的内核深度为 6,这意味着每个滤波器为输入张量的 6 个通道提供一个单独的层。每个卷积核层在 6 个输入通道中的一个上滑动,如图 4 所示,6 个返回的特征图相加合成一个。因此,到目前为止,我们只考虑了一个卷积滤波器,但实际上我们有 16 个滤波器。这就是为什么我们得到 16 个新的特征图,每个比输入小 4 列和 4 行。此时张量的大小是 16x194x344。
再次,我们应用 2x2 卷积核的 MaxPooling。由于这会将特征图的大小减半,所以现在我们得到的张量大小为 16x97x172。
最后,张量被展平,这意味着我们将所有1697172 = 266,944个值排列成一行,并将它们输入到一个相应大小的全连接神经网络中。
第二部分:定义和编码 CNN
从概念上讲,我们已经具备了所需的一切。接下来,让我们进入第 1.1 章中描述的工业应用案例。
2.1 加载所需的库
matplotlib.pyplotPIL
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import WeightedRandomSampler
from torch.utils.data import random_split
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import os
import warnings
warnings.filterwarnings("ignore")
2.2 配置设备并指定超参数
device‘cuda’‘cpu’minibatch_sizelearning_rateepochs
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using {device} device")
# Specify hyperparameters
minibatch_size = 10
learning_rate = 0.01
epochs = 60
2.3 自定义加载器函数
custom_loaderdata/Coil_Vision/01_train_val_test
# Define loader function
def custom_loader(path):
with open(path, 'rb') as f:
img = Image.open(f)
img = img.crop((50, 60, 750, 460)) #Size: 700x400 px
img.load()
return img
# Path of images (local to accelerate loading)
path = "data/Coil_Vision/01_train_val_test"
2.4 定义数据集
datasets.ImageFolder()
# Transform function for loading transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5), (0.5))]) # Create dataset out of folder structure dataset = datasets.ImageFolder(path, transform=transform, loader=custom_loader) train_set, val_set, test_set = random_split(dataset, [round(0.5*len(dataset)), round(0.3*len(dataset)), round(0.2*len(dataset))])
2.5 平衡数据集
WeightedRandomSamplerlblsnp.bincount()bc[0]bc[1]p_nOKp_OKlst_trainWeightedRandomSamplertrain_sampler
# Define a sampler to balance the classes
# training dataset
lbls = [dataset[idx][1] for idx in train_set.indices]
bc = np.bincount(lbls)
p_nOK = bc.sum()/bc[0]
p_OK = bc.sum()/bc[1]
lst_train = [p_nOK if lbl==0 else p_OK for lbl in lbls]
train_sampler = WeightedRandomSampler(weights=lst_train, num_samples=len(lbls))
2.6 定义数据加载器
最后,我们为训练、验证和测试数据定义了三个数据加载器。数据加载器按批次将数据集输入到神经网络中,每个批次包括图像数据和标签。
train_loaderval_loadertest_loader
# Define loader with batchsize
train_loader = DataLoader(dataset=train_set, batch_size=minibatch_size, sampler=train_sampler)
val_loader = DataLoader(dataset=val_set, batch_size=minibatch_size, shuffle=True)
test_loader = DataLoader(dataset=test_set, shuffle=True)
2.7 检查数据:绘制 5 个合格品和 5 个废品
matplotlibtrain_loadercount_OKcount_nOK
# Figure and axes object
fig, axs = plt.subplots(nrows=2, ncols=5, figsize=(20,7), sharey=True, sharex=True)
count_OK = 0
count_nOK = 0
# Loop over loader batches
for (batch_data, batch_lbls) in train_loader:
# Loop over batch_lbls
for i, lbl in enumerate(batch_lbls):
# If label is 0 (nOK) plot image in row 1
if (lbl.item() == 0) and (count_nOK < 5):
axs[1, count_nOK].imshow(batch_data[i][0], cmap='gray')
axs[1, count_nOK].set_title(f"nOK Part#: {str(count_nOK)}", fontsize=14)
count_nOK += 1
# If label is 1 (OK) plot image in row 0
elif (lbl.item() == 1) and (count_OK < 5):
axs[0, count_OK].imshow(batch_data[i][0], cmap='gray')
axs[0, count_OK].set_title(f"OK Part#: {str(count_OK)}", fontsize=14)
count_OK += 1
# If both counters are >=5 stop looping
if (count_OK >=5) and (count_nOK >=5):
break
# Config the plot canvas
fig.suptitle("Sample plot of OK and nonOK Parts", fontsize=24)
plt.setp(axs, xticks=[], yticks=[])
plt.show()
图 7:OK(上排)和非 OK(下排)部分的示例 | 图片由作者提供
在图 7 中,我们看到大多数 nOK 样本明显弯曲,但有少数样本肉眼难以区分(例如,右下角的样本)。
2.8 定义 CNN 模型
__init__()
forward()x
class CNN(nn.Module):
def __init__(self):
super().__init__()
# Define model layers
self.model_layers = nn.Sequential(
nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Flatten(),
nn.Linear(16*97*172, 120),
nn.ReLU(),
nn.Linear(120, 2)
)
def forward(self, x):
out = self.model_layers(x)
return out
2.9 实例化模型并定义损失函数和优化器
model
# Define model on cpu or gpu
model = CNN().to(device)
# Loss and optimizer
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
2.10 检查模型的大小
model.parameters()num_paramnum_param_trainable
# Count number of parameters / thereof trainable
num_param = sum([p.numel() for p in model.parameters()])
num_param_trainable = sum([p.numel() for p in model.parameters() if p.requires_grad == True])
print(f"Our model has {num_param:,} parameters. Thereof trainable are {num_param_trainable:,}!")
打印输出告诉我们,模型有超过 3200 万个参数,其中所有参数都是可训练的。
2.11 定义用于验证和测试的函数
val_test()dataloadermodeltorch.no_grad()dataloadermodeloutput.argmax(1)
我们统计并汇总正确的预测结果,同时保存图像数据、预测类别和错误预测的标签。最后,我们计算准确率,并将其与误分类的图像一并作为函数的输出返回。
def val_test(dataloader, model):
# Get dataset size
dataset_size = len(dataloader.dataset)
# Turn off gradient calculation for validation
with torch.no_grad():
# Loop over dataset
correct = 0
wrong_preds = []
for (images, labels) in dataloader:
images, labels = images.to(device), labels.to(device)
# Get raw values from model
output = model(images)
# Derive prediction
y_pred = output.argmax(1)
# Count correct classifications over all batches
correct += (y_pred == labels).type(torch.float32).sum().item()
# Save wrong predictions (image, pred_lbl, true_lbl)
for i, _ in enumerate(labels):
if y_pred[i] != labels[i]:
wrong_preds.append((images[i], y_pred[i], labels[i]))
# Calculate accuracy
acc = correct / dataset_size
return acc, wrong_preds
2.12 模型训练
epochstrain_loaderimagesoutputsoutputslabelsloptimizer.stepoutputs
n_correctn_true_OKn_samplesval_test()“model.pth”
acc_train = {}
acc_val = {}
# Iterate over epochs
for epoch in range(epochs):
n_correct=0; n_samples=0; n_true_OK=0
for idx, (images, labels) in enumerate(train_loader):
model.train()
# Push data to gpu if available
images, labels = images.to(device), labels.to(device)
# Forward pass
outputs = model(images)
l = loss(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
l.backward()
optimizer.step()
# Get prediced labels (.max returns (value,index))
_, y_pred = torch.max(outputs.data, 1)
# Count correct classifications
n_correct += (y_pred == labels).sum().item()
n_true_OK += (labels == 1).sum().item()
n_samples += labels.size(0)
# At end of epoch: Eval accuracy and print information
if (epoch+1) % 2 == 0:
model.eval()
# Calculate accuracy
acc_train[epoch+1] = n_correct / n_samples
true_OK = n_true_OK / n_samples
acc_val[epoch+1] = val_test(val_loader, model)[0]
# Print info
print (f"Epoch [{epoch+1}/{epochs}], Loss: {l.item():.4f}")
print(f" Training accuracy: {acc_train[epoch+1]*100:.2f}%")
print(f" True OK: {true_OK*100:.3f}%")
print(f" Validation accuracy: {acc_val[epoch+1]*100:.2f}%")
# Save model and state_dict
torch.save(model, "model.pth")
训练在我笔记本的 GPU 上只需几分钟。强烈建议从本地驱动加载图像,否则训练时间可能会增加几个数量级!
训练的输出表明损失已经显著减少,验证准确率——即模型未用于更新参数的数据上的准确率——已达到 98.4%。
如果我们绘制训练过程中每个 epoch 的训练和验证准确率,将能更好地了解训练进展。由于我们每隔一个 epoch 就保存一次值,因此可以轻松地做到这一点。
matplotlibplt.subplots()
# Instantiate figure and axe object
fig, ax = plt.subplots(figsize=(10,6))
plt.plot(list(acc_train.keys()), list(acc_train.values()), label="training accuracy")
plt.plot(list(acc_val.keys()), list(acc_val.values()), label="validation accuracy")
plt.title("Accuracies", fontsize=24)
plt.ylabel("%", fontsize=14)
plt.xlabel("Epochs", fontsize=14)
plt.setp(ax.get_xticklabels(), fontsize=14)
plt.legend(loc='best', fontsize=14)
plt.show()
图 8:模型训练期间的训练和验证准确率 | 图片由作者提供
2.13 加载训练好的模型
如果你想将模型用于生产而不仅仅是用于学习,强烈建议保存并加载包含所有参数的模型。保存已经是训练代码的一部分,从你的驱动器加载模型同样简单。
# Read model from file
model = torch.load("model.pth")
model.eval()
2.14 使用测试数据再次检查模型准确度
test_loaderval_test()
print(f"test accuracy: {val_test(test_loader,model)[0]*100:0.1f}%")
在这个特定的例子中,我们达到了 99.2%的测试准确度,但这很大程度上取决于随机性(记住:图像随机分配到训练、验证和测试数据中)。
2.15 可视化错误分类的图像
val_test()tup[0]tup[1]tup[1][0]tup[1][1]tup[1][2]tup[1]
%matplotlib inline
# Call test function
tup = val_test(test_loader, model)
# Check if wrong predictions occur
if len(tup[1])>=1:
# Loop over wrongly predicted images
for i, t in enumerate(tup[1]):
plt.figure(figsize=(7,5))
img, y_pred, y_true = t
img = img.to("cpu").reshape(400, 700)
plt.imshow(img, cmap="gray")
plt.title(f"Image {i+1} - Predicted: {y_pred}, True: {y_true}", fontsize=24)
plt.axis("off")
plt.show()
plt.close()
else:
print("No wrong predictions!")
在我们的例子中,只有一张错误分类的图像,它占测试数据集的 0.8%(我们有 125 张测试图像)。这张图像被分类为合格,但标签为 nOK。坦白说,我也会错误分类它 😃.
图 9:错误分类的图像 | 图片来自作者
第三部分:在生产中使用训练好的模型
3.1 加载模型、所需库和参数
在生产阶段,我们假设 CNN 模型已经训练好,并且参数可以加载。我们的目标是将新图像加载到模型中,让模型判断相应的电子组件是否适合用于组装(见章节 1.1 任务:将工业组件分类为合格或废料)。
‘cuda’‘cpu’CNNtorch.load()CNN
# Load the required libraries
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
from PIL import Image
import os
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Define the CNN model exactly as in chapter 2.8
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()
# Define model layers
self.model_layers = nn.Sequential(
nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Flatten(),
nn.Linear(16*97*172, 120),
nn.ReLU(),
nn.Linear(120, 2),
#nn.LogSoftmax(dim=1)
)
def forward(self, x):
out = self.model_layers(x)
return out
# Load the model's parameters
model = torch.load("model.pth")
model.eval()
通过运行这段代码,我们已经将 CNN 模型加载并在计算机内存中进行了参数化。
3.2 将图像加载到数据集中
对于训练阶段,我们需要准备图像以供 CNN 模型处理。我们从指定文件夹中加载图像,裁剪出内部 700x400 像素,并将图像数据转换为 PyTorch 张量。
# Define custom dataset
class Predict_Set(Dataset):
def __init__(self, img_folder, transform):
self.img_folder = img_folder
self.transform = transform
self.img_lst = os.listdir(self.img_folder)
def __len__(self):
return len(self.img_lst)
def __getitem__(self, idx):
img_path = os.path.join(self.img_folder, self.img_lst[idx])
img = Image.open(img_path)
img = img.crop((50, 60, 750, 460)) #Size: 700x400
img.load()
img_tensor = self.transform(img)
return img_tensor, self.img_lst[idx]
Predict_Set()__init__()transformself.img_lst__len__()__getitem__()transform
3.3 路径、变换函数和数据加载器
pathtransformPredict_Set()predict_setpredict_loaderpredict_loader
# Path to images (preferably local to accelerate loading)
path = "data/Coil_Vision/02_predict"
# Transform function for loading
transform = transforms.Compose([transforms.ToTensor(),
transforms.Normalize((0.5), (0.5))])
# Create dataset as instance of custom dataset
predict_set = Predict_Set(path, transform=transform)
# Define loader
predict_loader = DataLoader(dataset=predict_set)
3.4 分类的自定义函数
predict()
def predict(dataloader, model):
# Turn off gradient calculation
with torch.no_grad():
img_lst = []; y_pred_lst = []; name_lst = []
# Loop over data loader
for image, name in dataloader:
img_lst.append(image)
image = image.to(device)
# Get raw values from model
output = model(image)
# Derive prediction
y_pred = output.argmax(1)
y_pred_lst.append(y_pred.item())
name_lst.append(name[0])
return img_lst, y_pred_lst, name_lst
predict()output.argmax(1)
3.5 预测标签并绘制图像
“data/Coil_Vision/02_predict”
predict()
# Predict labels for images
imgs, lbls, names = predict(predict_loader, model)
# Iterate over classified images
for idx, image in enumerate(imgs):
plt.figure(figsize=(8,6))
plt.imshow(image.squeeze(), cmap="gray")
plt.title(f"\nFile: {names[idx]}, Predicted label: {lbls[idx]}", fontsize=18)
plt.axis("off")
plt.show()
plt.close()
我们可以看到,左侧的两张图片被分类为合格(标签1),右侧的两张则被分类为废品(标签0)。由于我们的训练数据,模型非常敏感,即使是针脚的微小弯曲也会导致它们被分类为废品。
第四部分:CNN 在其“决策”中考虑了什么?
到目前为止,我们已经深入探讨了卷积神经网络(CNN)及其在工业应用中的使用案例。这似乎是一个很好的机会,可以进一步了解 CNN 模型在处理图像数据时“看到了”什么。为此,我们首先研究卷积层,然后检查图像的哪些部分对分类尤为重要。
4.1 研究卷积滤波器的维度
为了更好地理解卷积滤波器的工作原理及其对图像的影响,让我们更详细地检查我们工业示例中的层。
model.children()all_layersconv_weightsall_layers
# Empty lists to store the layers and the weights
all_layers = []; conv_weights = []
# Iterate over the model's structure
# (First level nn.Sequential)
for _, layer in enumerate(list(model.children())[0]):
if type(layer) == nn.Conv2d:
all_layers.append(layer)
conv_weights.append(layer.weight)
elif type(layer) in [nn.ReLU, nn.MaxPool2d]:
all_layers.append(layer)
conv_weights.append("*")
# Print layers and dimensions of weights
for idx, layer in enumerate(all_layers):
print(f"{idx+1}. Layer: {layer}")
if type(layer) == nn.Conv2d:
print(f" weights: {conv_weights[idx].shape}")
else:
print(f" weights: {conv_weights[idx]}")
print()
图 11:层及权重的维度
torch.Size([6, 1, 5, 5])torch.Size([16, 6, 5, 5])
4.2 可视化卷积滤波器的权重
现在,我们已经知道了卷积滤波器的维度。接下来,我们想查看它们在训练过程中获得的权重。由于我们有很多不同的滤波器(第一层有 6 个,第二层有 16 个),我们在这两种情况下都选择第一个输入通道(索引0)。
import itertools
# Iterate through all layers
for idx_out, layer in enumerate(all_layers):
# If layer is a convolutional filter
if type(layer) == nn.Conv2d:
# Print layer name
print(f"\n{idx_out+1}. Layer: {layer} \n")
# Prepare plot and weights
plt.figure(figsize=(25,6))
weights = conv_weights[idx_out][:,0,:,:] # only first input channel
weights = weights.detach().to('cpu')
# Enumerate over filter weights (only first input channel)
for idx_in, f in enumerate(weights):
plt.subplot(2,8, idx_in+1)
plt.imshow(f, cmap="gray")
plt.title(f"Filter {idx_in+1}")
# Print texts
for i, j in itertools.product(range(f.shape[0]), range(f.shape[1])):
if f[i,j] > f.mean():
color = 'black'
else:
color = 'white'
plt.text(j, i, format(f[i, j], '.2f'), horizontalalignment='center', verticalalignment='center', color=color)
plt.axis("off")
plt.show()
plt.close()
all_layersnn.Conv2dplt.imshow()
图 12 展示了第 1 层的六个卷积滤波器内核和第 4 层的 16 个内核(对于输入通道0)。右上角的模型示意图显示了带红色轮廓的滤波器。我们看到大部分值接近 0,部分值在正负 0.20–0.25 范围内。这些数字表示在图 4 中演示的卷积操作中使用的值。这些给出了特征图,接下来我们将检查这些特征图。
4.3 检查特征图
test_loader
# Test loader has a batch size of 1
img = next(iter(test_loader))[0].to(device)
print(f"\nImage has shape: {img.shape}\n")
# Plot image
img_copy = img.to('cpu')
plt.imshow(img_copy.reshape(400,700), cmap="gray")
plt.axis("off")
plt.show()
imgall_layers[0]resultsall_layersresults
# Pass the image through the first layer
results = all_layers[0]
# Pass the results of the previous layer to the next layer
for idx in range(1, len(all_layers)): # Start at 1, first layer already passed!
results.append(all_layersidx) # Pass the last result to the layer
最后,我们绘制了原始图像、经过第一层(卷积)、第二层(ReLU)、第三层(MaxPooling)、第四层(第二次卷积)、第五层(第二次 ReLU)和第六层(第二次 MaxPooling)处理后的特征图。
我们看到卷积核(对比图 12)重新计算了图像的每个像素。这在特征图中表现为灰度值的变化。与原始图像相比,一些特征图被锐化,或者具有更强的黑白对比,而其他特征图则显得更模糊。
ReLU 操作将深灰色转为黑色,因为负值被设置为零。
MaxPooling 保持图像几乎不变,同时在两个维度上将图像大小减半。
4.4 可视化对分类影响最大的图像区域
在我们完成之前,让我们分析一下图像中哪些区域对分类为废料(索引0)或良品(索引1)至关重要。为此,我们使用梯度加权类别激活映射(gradCAM)。该技术计算训练好的模型相对于预测类别的梯度(梯度显示输入——图像像素——如何影响预测)。每个特征图(即卷积层的输出通道)梯度的平均值构成了计算热图时,用于与特征图相乘的权重。
但让我们逐步看一下。
def gradCAM(x):
# Run model and predict
logits = model(x)
pred = logits.max(-1)[-1] # Returns index of max value (0 or 1)
# Fetch activations at final conv layer
last_conv = model.model_layers[:5]
activations = last_conv(x)
# Compute gradients with respect to model's prediction
model.zero_grad()
logits[0,pred].backward(retain_graph=True)
# Compute average gradient per output channel of last conv layer
pooled_grads = model.model_layers[3].weight.grad.mean((1,2,3))
# Multiply each output channel with its corresponding average gradient
for i in range(activations.shape[1]):
activations[:,i,:,:] *= pooled_grads[i]
# Compute heatmap as average over all weighted output channels
heatmap = torch.mean(activations, dim=1)[0].cpu().detach()
return heatmap
gradCAMxheatmap
xmodellogitspred
last_convxactivations
logits[0,pred]model.zero_grad()
pooled_grads
pooled_gradsactivations
heatmapactivationsgradCAM
upsampleHeatmap()cv2
import cv2
def upsampleHeatmap(map, img):
m,M = map.min(), map.max()
i,I = img.min(), img.max()
map = 255 * ((map-m) / (M-m))
img = 255 * ((img-i) / (I-i))
map = np.uint8(map)
img = np.uint8(img)
map = cv2.resize(map, (700,400))
map = cv2.applyColorMap(255-map, cv2.COLORMAP_JET)
map = np.uint8(map)
img = cv2.applyColorMap(255-img, cv2.COLORMAP_JET)
img = np.uint8(img)
map = np.uint8(map*0.7 + img*0.3)
return map
predict_loadergradCAM()upsampleHeatmap()matplotlib.pyplot
# Iterate over dataloader
for idx, (image, name) in enumerate(predict_loader):
# Compute heatmap
image = image.to(device)
heatmap = gradCAM(image)
image = image.cpu().squeeze(0).permute(1,2,0)
heatmap = upsampleHeatmap(heatmap, image)
# Plot images and heatmaps
fig = plt.figure(figsize=(14,5))
fig.suptitle(f"\nFile: {names[idx]}, Predicted label: {lbls[idx]}\n", fontsize=24)
plt.subplot(1, 2, 1)
plt.imshow(image, cmap="gray")
plt.title(f"Image", fontsize=14)
plt.axis("off")
plt.subplot(1, 2, 2)
plt.imshow(heatmap)
plt.title(f"Heatmap", fontsize=14)
plt.tight_layout()
plt.axis("off")
plt.show()
plt.close()
图 15:图像与热力图(输出的内两行)| 图像由作者提供
热力图中的蓝色区域对模型决策的影响较小,而黄色和红色区域则非常重要。我们看到,在我们的使用案例中,主要是电子元件的轮廓(尤其是金属引脚)对分类为废料或好件至关重要。当然,这一点非常合理,因为我们的用例主要处理的是弯曲引脚。
结论
卷积神经网络(CNN)如今已成为工业环境中常见且广泛使用的视觉检测工具。在我们的使用案例中,通过相对少量的代码行,我们成功定义了一个模型,能够高精度地将电子元件分类为好件或废料。与传统的视觉检测方法相比,最大的优势在于不需要过程工程师在图像中指定视觉标记来进行分类。相反,CNN 通过标签化的示例进行学习,并能够将这种知识复制到其他图像。在我们的具体用例中,626 张标签化的图像足以进行训练和验证。在更复杂的情况下,训练数据的需求可能会显著增加。
像 gradCAM(梯度加权类激活映射)这样的算法在理解图像中哪些区域对模型的决策特别相关方面具有重要帮助。通过这种方式,它们通过增强对模型功能的信任,支持卷积神经网络(CNN)在工业环境中的广泛应用。
在本文中,我们探讨了卷积神经网络的许多内部工作细节。希望您喜欢这次旅程,并且深入理解了 CNN 的工作原理。
使用 IBM Watsonx 和 Langchain 构建代理型检索增强生成(RAG)系统
快速入门教程
https://medium.com/@lakshmi.sunil5486?source=post_page---byline--a0182c9f5b01--------------------------------https://towardsdatascience.com/?source=post_page---byline--a0182c9f5b01-------------------------------- Lakshmi Narayanan
·发布于 Towards Data Science ·阅读时间 5 分钟·2024 年 8 月 23 日
–
AI 生成图像(由 GPT-4o 生成)
人工智能(AI)领域,特别是在生成式 AI 方面,最近取得了显著进展。大型语言模型(LLMs)在这方面具有革命性意义。构建 LLM 应用程序的一种流行方法是检索增强生成(RAG),它结合了利用组织数据的能力和这些 LLM 的生成能力。代理是一种流行且有用的方式,可以将自主行为引入 LLM 应用程序中。
什么是代理型 RAG?
代理型 RAG代表了 AI 系统的一个高级演进,在这种系统中,自主代理利用 RAG 技术来增强决策和响应能力。与传统的 RAG 模型不同,后者通常依赖用户输入来触发行动,代理型 RAG 系统采取了主动的方法。这些代理主动寻找相关信息,分析并利用它生成响应或采取具体行动。代理被配备了一套工具,并能够谨慎地选择并使用适当的工具来解决特定问题。
这种主动行为在许多应用场景中尤为宝贵,如客户服务、研究协助和复杂问题处理等…
使用 DSPy 构建 AI 助手
一种编程和调整提示无关的 LLM 代理流水线的方法
https://lakshmanok.medium.com/?source=post_page---byline--2e1e749a1a95--------------------------------https://towardsdatascience.com/?source=post_page---byline--2e1e749a1a95-------------------------------- Lak Lakshmanan
·发表于Towards Data Science ·阅读时间 9 分钟·2024 年 3 月 7 日
–
我讨厌提示工程。有一方面,我不想在 LLM 面前俯首称臣(“你是世界上最棒的文案写手……”)、贿赂它(“如果你……,我会给你 10 美元小费”)或烦扰它(“确保……”)。另一方面,提示是脆弱的——对提示语做些许更改就能导致输出发生重大变化。这使得使用 LLM 开发可重复的功能变得困难。
不幸的是,今天开发基于 LLM 的应用程序涉及调整和修改提示语。从编写计算机精确执行的编程语言代码,到编写自然语言指令(计算机并不完全遵循)似乎并没有进步。这就是我发现使用 LLM 进行工作的原因感到沮丧——我更喜欢编写和调试我可以实际推理的计算机程序。
那么,如果你能使用一个高级编程框架在 LLM 之上进行编程,并让框架为你编写和调整提示语呢?那岂不是太好了?这——能够在不处理提示的情况下编程构建代理流水线,并且以数据驱动和与 LLM 无关的方式调整这些流水线——正是 DSPy背后的关键前提。
一个 AI 助手
为了说明 DSPy 是如何工作的,我将构建一个 AI 助手。
什么是 AI 助手?它是一个为人类执行任务提供帮助的计算机程序。理想的 AI 助手会主动地代表用户工作(聊天机器人可以作为功能备份,用于查找产品中不易找到的功能或为终端用户提供客户支持,但不应是应用程序中主要/唯一的 AI 助手)。因此,设计 AI 助手的过程包括思考工作流程,并确定如何通过 AI 来简化它。
一个典型的 AI 助手通过以下方式简化工作流程:(1)检索与任务相关的公司政策等信息,(2)从客户发送的文档中提取信息,(3)根据对政策和文档的文本分析填写表格或检查单,(4)收集参数并代表用户调用函数,(5)识别潜在错误并突出风险。
我将使用一个桥牌作为例子来说明 AI 助手的用例。尽管我正在为桥牌叫牌构建 AI 助手,但你并不需要了解桥牌就能理解这里的概念。我选择桥牌的原因是,桥牌中有大量术语、涉及相当多的人类判断,并且有多个外部工具供顾问使用。这些是你可能想为其构建 AI 助手的行业问题和后台流程的关键特征。但因为它是一个游戏,所以其中不涉及机密信息。
代理框架
当被问到类似“什么是 Stayman?”的问题时,助手会使用多个后台服务来执行其任务。这些后台服务通过代理进行调用,而这些代理本身是基于语言模型构建的。与软件工程中的微服务类似,使用代理和后台服务允许解耦和专业化——AI 助手不需要知道事情是如何完成的,只需要知道需要完成什么,而每个代理只需了解如何做自己的事情。
一个代理框架。图片由作者提供。图片中的草图是使用 Gemini 生成的。
在代理框架中,代理通常是较小的语言模型(LMs),这些模型需要准确,但不具备世界知识。代理能够进行“推理”(通过思维链)、搜索(通过检索增强生成)和执行非文本工作(通过提取参数传递给后台函数)。代理框架的前端是一个非常流利且连贯的大型语言模型(LLM)。这个 LLM 知道它需要处理的意图,以及如何路由这些意图。它还需要具备世界知识。通常,会有一个单独的政策或监管 LLM 作为过滤器。当用户发起查询时(聊天机器人用例)或发生触发事件时(主动助手用例),AI 助手会被调用。
使用 DSPy 的零样本提示
要构建上述整个架构,我将使用 DSPy。整个代码可以在 GitHub 上找到;从该目录下的bidding_advisor.py开始,跟着一起操作。
在 DSPy 中,发送提示给 LLM 并获取响应的过程如下:
class ZeroShot(dspy.Module):
"""
Provide answer to question
"""
def __init__(self):
super().__init__()
self.prog = dspy.Predict("question -> answer")
def forward(self, question):
return self.prog(question="In the game of bridge, " + question)
上面的代码段中发生了四个事情:
-
编写一个 dspy.Module 的子类
-
在 init 方法中,设置一个 LM 模块。最简单的方式是使用 dspy.Predict,它是一个单一的调用。
-
Predict 构造函数接受一个签名。这里,我表示有一个输入(问题)和一个输出(答案)。
-
编写一个 forward()方法,接受指定的输入(这里是:问题),并返回签名中承诺的内容(这里是:答案)。它通过调用在 init 方法中创建的 dspy.Predict 对象来实现。
我本可以直接传递问题,但为了展示我可以在某种程度上影响提示,我添加了一些上下文。
注意,上面的代码完全与 LLM 无关,且提示中没有任何谄媚、贿赂等内容。
要调用上述模块,首先初始化 dspy 并配置一个 LLM:
gemini = dspy.Google("models/gemini-1.0-pro",
api_key=api_key,
temperature=temperature)
dspy.settings.configure(lm=gemini, max_tokens=1024)
然后,调用你的模块:
module = ZeroShot()
response = module("What is Stayman?")
print(response)
当我这么做时,得到了:
Prediction(
answer='Question: In the game of bridge, What is Stayman?\nAnswer: A conventional bid of 2♣ by responder after a 1NT opening bid, asking opener to bid a four-card major suit if he has one, or to pass if he does not.'
)
想要使用不同的 LLM?将设置配置行更改为:
gpt35 = dspy.OpenAI(model="gpt-3.5-turbo",
api_key=api_key,
temperature=temperature)
dspy.settings.configure(lm=gpt35, max_tokens=1024)
文本提取
如果 DSPy 只是让调用 LLMs 更容易并且将 LLM 进行抽象化,那么人们也不会对 DSPy 如此兴奋。让我们继续构建 AI 助手,并在过程中展示一些其他的优势。
假设我们想使用 LLM 进行实体提取。我们可以通过指示 LLM 识别我们要提取的内容(日期、产品 SKU 等)来实现。在这里,我们会要求它找出桥牌术语:
class Terms(dspy.Signature):
"""
List of extracted entities
"""
prompt = dspy.InputField()
terms = dspy.OutputField(format=list)
class FindTerms(dspy.Module):
"""
Extract bridge terms from a question
"""
def __init__(self):
super().__init__()
self.entity_extractor = dspy.Predict(Terms)
def forward(self, question):
max_num_terms = max(1, len(question.split())//4)
instruction = f"Identify up to {max_num_terms} terms in the following question that are jargon in the card game bridge."
prediction = self.entity_extractor(
prompt=f"{instruction}\n{question}"
)
return prediction.terms
虽然我们本可以将模块的签名表示为“提示 -> 条件”,但我们也可以将签名表示为一个 Python 类。
在一个语句上调用此模块:
module = FindTerms()
response = module("Playing Stayman and Transfers, what do you bid with 5-4 in the majors?")
print(response)
我们将得到:
['Stayman', 'Transfers']
注意,这段代码是多么简洁和易读。
RAG
DSPy 内置了多个检索器。但这些本质上只是函数,你可以将现有的检索代码封装到 dspy.Retriever 中。它支持多个流行的检索器,包括 ChromaDB:
from chromadb.utils import embedding_functions
default_ef = embedding_functions.DefaultEmbeddingFunction()
bidding_rag = ChromadbRM(CHROMA_COLLECTION_NAME, CHROMADB_DIR, default_ef, k=3)
当然,我得先获取一本关于桥牌叫牌的文档,将其拆分并加载到 ChromaDB 中。如果你感兴趣,代码在仓库里,但由于与本文无关,我将略过不提。
编排
现在你已经实现了所有的代理,每个代理都是一个独立的 dspy.Module。接下来,构建编排 LLM,它接收命令或触发器并以某种方式调用代理模块。
模块的编排也发生在一个 dspy.Module 中:
class AdvisorSignature(dspy.Signature):
definitions = dspy.InputField(format=str) # function to call on input to make it a string
bidding_system = dspy.InputField(format=str) # function to call on input to make it a string
question = dspy.InputField()
answer = dspy.OutputField()
class BridgeBiddingAdvisor(dspy.Module):
"""
Functions as the orchestrator. All questions are sent to this module.
"""
def __init__(self):
super().__init__()
self.find_terms = FindTerms()
self.definitions = Definitions()
self.prog = dspy.ChainOfThought(AdvisorSignature, n=3)
def forward(self, question):
terms = self.find_terms(question)
definitions = [self.definitions(term) for term in terms]
bidding_system = bidding_rag(question)
prediction = self.prog(definitions=definitions,
bidding_system=bidding_system,
question="In the game of bridge, " + question,
max_tokens=-1024)
return prediction.answer
我没有使用 dspy.Predict 作为最终步骤,而是使用了 ChainOfThought(COT=3)。
优化器
现在我们已经设置好了整个链条,我们当然可以直接调用调度模块来进行测试。但更重要的是,我们可以让 dspy 根据示例数据自动调整提示。
要加载这些示例并让 dspy 进行调整(这叫做提示器,但这个名称将改为优化器,这更准确地描述了它的功能),我这样做:
traindata = json.load(open("trainingdata.json", "r"))['examples']
trainset = [dspy.Example(question=e['question'], answer=e['answer']) for e in traindata]
# train
teleprompter = teleprompt.LabeledFewShot()
optimized_advisor = teleprompter.compile(student=BridgeBiddingAdvisor(), trainset=trainset)
# use optimized advisor just like the original orchestrator
response = optimized_advisor("What is Stayman?")
print(response)
我在上面的示例中只用了 3 个示例,但显然,你会使用成百上千个示例来获得一个经过适当调优的提示集。值得注意的是,调整是针对整个管道进行的;你不需要一个一个模块地调整。
优化后的管道更好吗?
原始管道对这个问题的返回结果如下(也显示了中间输出,并且“两颗梅花”是错误的):
a: Playing Stayman and Transfers, what do you bid with 5-4 in the majors?
b: ['Stayman', 'Transfers']
c: ['Stayman convention | Stayman is a bidding convention in the card game contract bridge. It is used by a partnership to find a 4-4 or 5-3 trump fit in a major suit after making a one notrump (1NT) opening bid and it has been adapted for use after a 2NT opening, a 1NT overcall, and many other natural notrump bids.', "Jacoby transfer | The Jacoby transfer, or simply transfers, in the card game contract bridge, is a convention initiated by responder following partner's notrump opening bid that forces opener to rebid in the suit ranked just above that bid by responder. For example, a response in diamonds forces a rebid in hearts and a response in hearts forces a rebid in spades. Transfers are used to show a weak hand with a long major suit, and to ensure that opener declare the hand if the final contract is in the suit transferred to, preventing the opponents from seeing the cards of the stronger hand."]
d: ['stayman ( possibly a weak ... 1602', '( scrambling for a two - ... 1601', '( i ) two hearts is weak ... 1596']
Two spades.
优化后的管道返回了正确答案“Smolen”:
a: Playing Stayman and Transfers, what do you bid with 5-4 in the majors?
b: ['Stayman', 'Transfers']
c: ['Stayman convention | Stayman is a bidding convention in the card game contract bridge. It is used by a partnership to find a 4-4 or 5-3 trump fit in a major suit after making a one notrump (1NT) opening bid and it has been adapted for use after a 2NT opening, a 1NT overcall, and many other natural notrump bids.', "Jacoby transfer | The Jacoby transfer, or simply transfers, in the card game contract bridge, is a convention initiated by responder following partner's notrump opening bid that forces opener to rebid in the suit ranked just above that bid by responder. For example, a response in diamonds forces a rebid in hearts and a response in hearts forces a rebid in spades. Transfers are used to show a weak hand with a long major suit, and to ensure that opener declare the hand if the final contract is in the suit transferred to, preventing the opponents from seeing the cards of the stronger hand."]
d: ['stayman ( possibly a weak ... 1602', '( scrambling for a two - ... 1601', '( i ) two hearts is weak ... 1596']
After a 1NT opening, Smolen allows responder to show 5-4 in the majors with game-forcing values.
原因在于 dspy 创建的提示。例如,对于问题“什么是 Stayman?”,请注意,它已经根据术语定义和 RAG 中的多个匹配项构建了一个推理过程:
该提示由 dspy.ChainOfThought 根据术语定义、RAG 等创建。
再次强调,我并没有编写上面调整过的提示。这些都是为我编写的。你也可以看到这未来的发展方向——你可能能够微调整个管道,使其在更小的语言模型上运行。
祝你愉快!
下一步
构建一个 AI 驱动的业务管理系统
使用 DALL·E 创建
将 AI 代理与 SQL 数据库连接的逐步指南——系列文章的第二部分
https://medium.com/@lukas.kowejsza?source=post_page---byline--e2a31a2fe984--------------------------------https://towardsdatascience.com/?source=post_page---byline--e2a31a2fe984-------------------------------- Lukasz Kowejsza
·发布于 Towards Data Science ·29 分钟阅读·2024 年 4 月 23 日
–
想象一下通过一个简单易用的手机界面来简化整个业务管理。虽然同时使用多个应用程序是常见做法,但未来的趋势是将所有互动整合到一个基于聊天的平台中,这个平台由大型语言模型(LLM)驱动。
对于小型企业来说,这种方法具有显著优势。通过将数据管理任务集中在统一的聊天界面中,企业主可以节省时间,减少复杂性,并最小化对不同软件工具的依赖。最终的结果是资源分配更为高效,能够将更多精力集中在核心业务增长活动上。
然而,这一潜力不仅限于小型企业。本教程中详细介绍的概念和技术同样适用于个人使用案例。从管理待办事项和跟踪开支到整理收藏,基于聊天的界面为与数据交互提供了一种直观且高效的方式。
本文是一个系列文章中的第二篇,旨在引导你完成从最初的概念到实际实现的整个软件开发过程。在上一篇文章中介绍的组件基础上,我们将建立我们应用程序的基础元素,包括:
-
设置数据库架构
-
定义核心应用功能
-
结构化项目仓库
-
创建能够使用自然语言命令与多个 SQL 数据库表交互的工具
到本教程结束时,你将清楚地理解如何设计一个基于聊天界面的架构,利用大语言模型(LLM)简化数据管理任务。无论你是希望优化运营的小企业主,还是寻求个人组织优化的个体,这里讲解的原则将为你的项目提供一个坚实的起点。
让我们先简要回顾一下上一篇文章的关键要点,为我们当前的目标设定背景。
回顾
gpt-3.5-turbo
为了实现这一目标,我们实施了两个关键变更:
-
删除了工具模式中的必需参数
-
在执行所需功能之前增加了参数验证步骤
通过将所有工具参数设为可选,并手动检查缺失的参数,我们消除了代理/大语言模型生成缺失值的幻觉冲动。
在上一篇文章中介绍的关键对象是:
OpenAiAgentToolToolResultStepResult
这些组件构成了我们代理系统的基础,使其能够处理用户请求,选择合适的工具,并生成响应。
如果你想要更详细的解释或了解特定设计选择背后的原因,可以查看上一篇文章:利用 OpenAI 工具调用:从零开始构建可靠的 AI 代理
记住这些回顾内容后,让我们进入项目的下一阶段——集成数据库功能以存储和管理业务数据。
为什么为小企业数据管理提供聊天界面
小企业在数据维护方面经常面临独特的挑战。与大公司一样,它们需要定期更新和维护各种类型的数据,如会计记录、时间跟踪、发票等。然而,现代 ERP(企业资源规划)系统的复杂性和成本对小企业而言可能是一个障碍。因此,许多小企业不得不依赖一系列 Excel 电子表格来捕捉和维护关键数据。
这种方法的问题在于,小企业主通常并非完全专注于行政任务,无法投入大量时间和精力进行复杂的行政管理和控制流程。关键在于定义精简的流程,并在数据出现时及时更新,最小化数据管理的开销。
通过利用大型语言模型的强大功能并创建聊天界面,我们旨在简化和优化小型企业的数据管理。该聊天机器人将充当统一接口,允许用户输入数据、检索信息,并通过自然语言命令执行各种任务。这消除了需要在多个电子表格之间切换或开发具有多个表单和仪表盘的复杂 web 应用程序的需求。
在这一系列教程中,我们将逐步增强聊天机器人的功能,添加诸如基于角色的访问控制、先进的查询与评估、多模态支持,以及与流行的通讯平台(如 WhatsApp)的集成等功能。到系列结束时,您将拥有一个强大而灵活的工具,能够根据您的具体需求进行调整,无论您是经营一家小型企业,还是仅仅希望更高效地组织个人生活。
让我们开始吧!
1. 项目结构
为了确保项目井井有条并易于维护,我们已经有系统地构建了我们的代码库,封装了不同的功能和组件。以下是代码库结构的概述:
project-root/
│
├── database/
│ ├── db.py # Database connection and setup
│ ├── models.py # Database models/schemas
| └── utils.py # Database utilities
│
├── tools/
│ ├── base.py # Base class for tools
│ ├── add.py # Tool for adding data to the database
│ ├── query.py # Tool for querying data from the database
| └── utils.py # Tool utilities
│
├── agents/
│ ├── base.py # Main AI agent logic
│ ├── routing.py # Specialized agent for routing tasks
│ ├── task.py # Tool wrapper for OpenAI subagents
| └── utils.py # agent utilities
│
└── utils.py # Utility functions and classes
这种结构使得关注点分离更加清晰,简化了应用程序的开发、维护和扩展。
2. 设置数据库
选择合适的数据库和 ORM(对象关系映射)库对我们的应用程序至关重要。对于这个项目,我们选择了以下框架:
-
SQLAlchemy:一个强大的 SQL 工具包和 Python 的对象关系映射(ORM)库。它提供了一套与数据库交互的工具,通过 Python 对象和类进行操作。
-
SQLModel:一个构建在 SQLAlchemy 和 Pydantic 之上的库,提供了一种简单直观的方式来定义数据库模型并执行数据库操作。
Tool
为确保我们应用程序的安全性和稳健性,我们实施了以下措施:
-
基于角色的访问控制:可执行的操作与用户角色绑定,确保用户只能执行他们被授权的操作。这为系统增加了额外的安全层,防止未经授权访问敏感数据。
-
防止 SQL 注入攻击:通过利用 ChatGPT 的自然语言理解能力,我们可以验证和清理用户输入,从而减轻 SQL 注入漏洞的风险。SQLModel 与 Pydantic 的集成帮助我们强制执行严格的数据验证规则。
在确定了我们的技术栈后,让我们开始设置数据库并定义我们的模型。
2.1 数据库模型
为了开始构建我们的原型应用程序,我们将定义基本的数据库表和相应的 SQLModel 定义。对于本教程,我们将重点介绍三个核心表:
-
支出
-
收入
-
客户
这些表将作为我们应用程序的基础,允许我们演示关键功能和交互。
databasemodels.py
# database\models.py
from typing import Optional
from pydantic import BeforeValidator, model_validator
from sqlmodel import SQLModel, Field
from datetime import time, datetime
from typing_extensions import Annotated
def validate_date(v):
if isinstance(v, datetime):
return v
for f in ["%Y-%m-%d", "%Y-%m-%d %H:%M:%S"]:
try:
return datetime.strptime(v, f)
except ValueError:
pass
raise ValueError("Invalid date format")
def numeric_validator(v):
if isinstance(v, int):
return float(v)
elif isinstance(v, float):
return v
raise ValueError("Value must be a number")
DateFormat = Annotated[datetime, BeforeValidator(validate_date)]
Numeric = Annotated[float, BeforeValidator(numeric_validator)]
class Customer(SQLModel, table=True):
id: Optional[int] = Field(primary_key=True, default=None)
company: str
first_name: str
last_name: str
phone: str
address: str
city: str
zip: str
country: str
class Revenue(SQLModel, table=True):
id: Optional[int] = Field(primary_key=True, default=None)
description: str
net_amount: Numeric
gross_amount: Numeric
tax_rate: Numeric
date: DateFormat
class Expense(SQLModel, table=True):
id: Optional[int] = Field(primary_key=True, default=None)
description: str
net_amount: Numeric = Field(description="The net amount of the expense")
gross_amount: Numeric
tax_rate: Numeric
date: DateFormat
DateFormatTimeFormatNumericBeforeValidatorvalidate_datedatetime
2.2 数据库引擎
databasedb.py
# database/db.py
from database.models import *
from sqlmodel import SQLModel, create_engine
import os
# local stored database
DATABASE_URL = "sqlite:///app.db"
engine = create_engine(DATABASE_URL, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
create_db_and_tables()
DATABASE_URLapp.dbcreate_engineengineDATABASE_URLecho=True
create_db_and_tablesSQLModel.metadata.create_all
Tool
3. 工具类
ToolTool
UnionType[SQLModel]modelToolBaseModelSQLModel
exclude_keyslist[str]["id"]idSqlModelid
Toolparse_model
validate_input()exclude_keysid
openai_tool_schemaopenai_tool_schemarequired
from pydantic.v1 import BaseModelfrom pydantic import BaseModelSQLModel
Tool
# tools/base.py
from typing import Type, Callable, Union
from tools.convert import convert_to_openai_tool
from pydantic import BaseModel, ConfigDict
from sqlmodel import SQLModel
class ToolResult(BaseModel):
content: str
success: bool
class Tool(BaseModel):
name: str
model: Union[Type[BaseModel], Type[SQLModel], None]
function: Callable
validate_missing: bool = True
parse_model: bool = False
exclude_keys: list[str] = ["id"]
model_config = ConfigDict(arbitrary_types_allowed=True)
def run(self, **kwargs) -> ToolResult:
if self.validate_missing and model is not None:
missing_values = self.validate_input(**kwargs)
if missing_values:
content = f"Missing values: {', '.join(missing_values)}"
return ToolResult(content=content, success=False)
if self.parse_model:
if hasattr(self.model, "model_validate"):
input_ = self.model.model_validate(kwargs)
else:
input_ = self.model(**kwargs)
result = self.function(input_)
else:
result = self.function(**kwargs)
return ToolResult(content=str(result), success=True)
def validate_input(self, **kwargs):
if not self.validate_missing or not self.model:
return []
model_keys = set(self.model.__annotations__.keys()) - set(self.exclude_keys)
input_keys = set(kwargs.keys())
missing_values = model_keys - input_keys
return list(missing_values)
@property
def openai_tool_schema(self):
schema = convert_to_openai_tool(self.model)
# set function name
schema["function"]["name"] = self.name
# remove required field
if schema["function"]["parameters"].get("required"):
del schema["function"]["parameters"]["required"]
# remove exclude keys
if self.exclude_keys:
for key in self.exclude_keys:
if key in schema["function"]["parameters"]["properties"]:
del schema["function"]["parameters"]["properties"][key]
return schema
Tool
3.1 自定义工具模式转换
Toolconvert_to_openai_toolconvert.py
# tools/convert.py
from langchain_core.utils.function_calling import _rm_titles
from typing import Type, Optional
from langchain_core.utils.json_schema import dereference_refs
from pydantic import BaseModel
def convert_to_openai_tool(
model: Type[BaseModel],
*,
name: Optional[str] = None,
description: Optional[str] = None,
) -> dict:
"""Converts a Pydantic model to a function description for the OpenAI API."""
function = convert_pydantic_to_openai_function(
model, name=name, description=description
)
return {"type": "function", "function": function}
def convert_pydantic_to_openai_function(
model: Type[BaseModel],
*,
name: Optional[str] = None,
description: Optional[str] = None,
rm_titles: bool = True,
) -> dict:
"""Converts a Pydantic model to a function description for the OpenAI API."""
model_schema = model.model_json_schema() if hasattr(model, "model_json_schema") else model.schema()
schema = dereference_refs(model_schema)
schema.pop("definitions", None)
title = schema.pop("title", "")
default_description = schema.pop("description", "")
return {
"name": name or title,
"description": description or default_description,
"parameters": _rm_titles(schema) if rm_titles else schema,
}
Tool
tools/base.pyconvert_to_openai_tool
# tools/base.py
from typing import Type, Callable, Union
from tools.convert import convert_to_openai_tool
from pydantic import BaseModel
from sqlmodel import SQLModel
#...rest of the code ...
Tool
convert.py*_rm_titles**dereference_refs*
通过调整工具模式转换过程,我们确保了我们的应用能够与 SQLModel 和 Pydantic v2 无缝协作,使我们能够在保持与 OpenAI API 兼容的同时,利用这些库的优势。
4. 定义 SQL 工具
在本节中,我们将创建函数和工具,以便使用 SQL 与我们的数据库表进行交互。
4.1 添加数据工具
add_row_to_table
# tools/add.py
from sqlmodel import SQLModel, Session, select
def add_row_to_table(model_instance: SQLModel):
with Session(engine) as session:
session.add(model_instance)
session.commit()
session.refresh(model_instance)
return f"Successfully added {model_instance} to the table"
add_expense_to_table
# tools/add.py
# ...
def add_expense_to_table(**kwargs):
model_instance = Expense.model_validate(kwargs)
return add_row_to_table(model_instance)
add_expense_to_tablemodel_validate()
为了避免为每个表或 SQLModel 编写单独的函数,我们可以动态生成这些函数:
# example usage
def add_entry_to_table(sql_model: Type[SQLModel]):
# return a Callable that takes a SQLModel instance and adds it to the table
return lambda **data: add_row_to_table(model_instance=sql_model.model_validate(data))
add_expense_to_table = add_entry_to_table(Expense)
这种方法产生相同的结果,并且可以用来动态生成所有其他模型的函数。
Tool
add_expense_tool = Tool(
name="add_expense_tool",
description="useful for adding expenses to database",
function=add_entry_to_table(Expense),
model=Expense,
validate_missing=True
)
add_revenue_tool = Tool(
name="add_revenue_tool",
description="useful for adding revenue to database",
function=add_entry_to_table(Revenue),
model=Revenue,
validate_missing=True
)
4.2 查询工具
虽然我们需要为每个表创建一个 add_xxx_tool,因为输入模式不同,但我们只需要一个查询工具来查询所有表。为了消除 SQL 注入的风险,我们将使用 SQLAlchemy 和 SQLModel 提供的 SQL 清理功能。这意味着我们将通过标准的 Python 类和对象来查询数据库,而不是直接解析 SQL 语句。
对于我们想在表上执行的查询,我们需要以下逻辑:
SELECT * FROM table_namecolumnstable_nameWHERE column_name = valuecolumnoperatorvalue
Expense
result = database.execute(
select(Expense).where(Expense.description == "Coffee")
)
将其抽象为一个 pydantic 模型:
# tools/query.py
from typing import Union, Literal
from pydantic import BaseModel
class WhereStatement(BaseModel):
column: str
operator: Literal["eq", "gt", "lt", "gte", "lte", "ne", "ct"]
value: str
class QueryConfig(BaseModel):
table_name: str
columns: list[str]
where: list[Union[WhereStatement, None]]
QueryConfigtable_namecolumnswherewhereWhereStatementWhereStatementLiteral
QueryConfig
# tools/query.py
# ...
from database.models import Expense, Revenue, Customer
TABLES = {
"expense": Expense,
"revenue": Revenue,
"customer": Customer
}
def query_data_function(**kwargs) -> ToolResult:
"""Query the database via natural language."""
query_config = QueryConfig.model_validate(kwargs)
if query_config.table_name not in TABLES:
return ToolResult(content=f"Table name {query_config.table_name} not found in database models", success=False)
sql_model = TABLES[query_config.table_name]
# query_config = validate_query_config(query_config, sql_model)
data = sql_query_from_config(query_config, sql_model)
return ToolResult(content=f"Query results: {data}", success=True)
def sql_query_from_config(
query_config: QueryConfig,
sql_model: Type[SQLModel]):
with Session(engine) as session:
selection = []
for column in query_config.select_columns:
if column not in sql_model.__annotations__:
return f"Column {column} not found in model {sql_model.__name__}"
selection.append(getattr(sql_model, column))
statement = select(*selection)
wheres = query_config.where
if wheres:
for where in wheres:
if where.column not in sql_model.__annotations__: # noqa
return (f"Column {where['column']} not found "
"in model {sql_model.__name__}")
elif where.operator == "eq":
statement = statement.where(
getattr(sql_model, where.column) == where.value)
elif where.operator == "gt":
statement = statement.where(
getattr(sql_model, where.column) > where.value)
elif where.operator == "lt":
statement = statement.where(
getattr(sql_model, where.column) < where.value)
elif where.operator == "gte":
statement = statement.where(
getattr(sql_model, where.column) >= where.value)
elif where.operator == "lte":
statement = statement.where(
getattr(sql_model, where.column) <= where.value)
elif where.operator == "ne":
statement = statement.where(
getattr(sql_model, where.column) != where.value)
elif where.operator == "ct":
statement = statement.where(
getattr(sql_model, where.column).contains(where.value))
result = session.exec(statement)
data = result.all()
try:
data = [repr(d) for d in data]
except:
pass
return data
query_data_functionTABLESsql_query_from_configQueryConfig
QueryConfigtable_namestable_nameQueryConfig
最后,我们可以定义我们的查询工具:
query_data_tool = Tool(
name="query_data_tool",
description = "useful to perform queries on a database table",
model=QueryConfig,
function=query_data_function,
)
有了这些工具,我们的 OpenAIAgent 现在能够使用自然语言命令向数据库表中添加和查询数据。
5. 配置代理
为了使我们之前定义的工具能够成功使用,前一篇文章中的 Agent 需要更多的上下文信息,尤其是用于查询工具时。Agent 的提示需要包括可用表格及其架构信息。由于目前我们只使用两个表格,我们可以在系统提示或用户提示中包含 ORM 架构和表格名称。这两种方式都可以,但我更倾向于在用户提示中包含像这样的变量信息。通过这样做,我们可以创建少量示例,演示基于上下文的工具使用。
为了使我们的 Agent 能够处理系统提示和用户提示中的变量上下文,我们可以按以下方式更新我们的 Agent 类:
import colorama
from colorama import Fore
from openai import OpenAI
from pydantic import BaseModel
from tools.base import Tool, ToolResult
from agents.utils import parse_function_args, run_tool_from_response
class StepResult(BaseModel):
event: str
content: str
success: bool
SYSTEM_MESSAGE = """You are tasked with completing specific objectives and must report the outcomes. At your disposal, you have a variety of tools, each specialized in performing a distinct type of task.
For successful task completion:
Thought: Consider the task at hand and determine which tool is best suited based on its capabilities and the nature of the work. If you can complete the task or answer a question, soley by the information provided you can use the report_tool directly.
Use the report_tool with an instruction detailing the results of your work or to answer a user question.
If you encounter an issue and cannot complete the task:
Use the report_tool to communicate the challenge or reason for the task's incompletion.
You will receive feedback based on the outcomes of each tool's task execution or explanations for any tasks that couldn't be completed. This feedback loop is crucial for addressing and resolving any issues by strategically deploying the available tools.
Return only one tool call at a time.
{context}
"""
class OpenAIAgent:
def __init__(
self,
tools: list[Tool],
client: OpenAI = OpenAI(),
system_message: str = SYSTEM_MESSAGE,
model_name: str = "gpt-3.5-turbo-0125",
max_steps: int = 5,
verbose: bool = True,
examples: list[dict] = None,
context: str = None,
user_context: str = None
):
self.tools = tools
self.client = client
self.model_name = model_name
self.system_message = system_message
self.step_history = []
self.max_steps = max_steps
self.verbose = verbose
self.examples = examples or []
self.context = context or ""
self.user_context = user_context
def to_console(self, tag: str, message: str, color: str = "green"):
if self.verbose:
color_prefix = Fore.__dict__[color.upper()]
print(color_prefix + f"{tag}: {message}{colorama.Style.RESET_ALL}")
def run(self, user_input: str, context: str = None):
openai_tools = [tool.openai_tool_schema for tool in self.tools]
system_message = self.system_message.format(context=context)
if self.user_context:
context = f"{self.user_context}\n{context}" if context else self.user_context
if context:
user_input = f"{context}\n---\n\nUser Message: {user_input}"
self.to_console("START", f"Starting Agent with Input:\n'''{user_input}'''")
self.step_history = [
{"role": "system", "content": system_message},
*self.examples,
{"role": "user", "content": user_input}
]
step_result = None
i = 0
while i < self.max_steps:
step_result = self.run_step(self.step_history, openai_tools)
if step_result.event == "finish":
break
elif step_result.event == "error":
self.to_console(step_result.event, step_result.content, "red")
else:
self.to_console(step_result.event, step_result.content, "yellow")
i += 1
self.to_console("Final Result", step_result.content, "green")
return step_result.content
def run_step(self, messages: list[dict], tools):
# plan the next step
response = self.client.chat.completions.create(
model=self.model_name,
messages=messages,
tools=tools
)
# check for multiple tool calls
if response.choices[0].message.tool_calls and len(response.choices[0].message.tool_calls) > 1:
messages = [
*self.step_history,
{"role": "user", "content": "Error: Please return only one tool call at a time."}
]
return self.run_step(messages, tools)
# add message to history
self.step_history.append(response.choices[0].message)
# check if tool call is present
if not response.choices[0].message.tool_calls:
msg = response.choices[0].message.content
step_result = StepResult(event="Error", content=f"No tool calls were returned.\nMessage: {msg}", success=False)
return step_result
tool_name = response.choices[0].message.tool_calls[0].function.name
tool_kwargs = parse_function_args(response)
# execute the tool call
self.to_console("Tool Call", f"Name: {tool_name}\nArgs: {tool_kwargs}", "magenta")
tool_result = run_tool_from_response(response, tools=self.tools)
tool_result_msg = self.tool_call_message(response, tool_result)
self.step_history.append(tool_result_msg)
if tool_name == "report_tool":
try:
step_result = StepResult(
event="finish",
content=tool_result.content,
success=True
)
except:
print(tool_result)
raise ValueError("Report Tool failed to run.")
return step_result
elif tool_result.success:
step_result = StepResult(
event="tool_result",
content=tool_result.content,
success=True)
else:
step_result = StepResult(
event="error",
content=tool_result.content,
success=False
)
return step_result
def tool_call_message(self, response, tool_result: ToolResult):
tool_call = response.choices[0].message.tool_calls[0]
return {
"tool_call_id": tool_call.id,
"role": "tool",
"name": tool_call.function.name,
"content": tool_result.content,
}
相较于我们之前的版本,主要的变化如下:
contextuser_context__init__()contextrun()run()context__init__()examplesrun()
contextuser_context
5.1 向 Agent 提供上下文
user_context
经过几次尝试和错误,以下函数可以完成这个任务:
# utils.py
from typing import Type
import types
import typing
import sqlalchemy
from pydantic import BaseModel
def orm_model_to_string(input_model_cls: Type[BaseModel]):
"""Get the ORM model string from the input model"""
def process_field(key, value):
if key.startswith("__"):
return None
if isinstance(value, typing._GenericAlias):
if value.__origin__ == sqlalchemy.orm.base.Mapped:
return None
if isinstance(value, typing._AnnotatedAlias): # noqa
return key, value.__origin__
elif isinstance(value, typing._UnionGenericAlias) or isinstance(value, types.UnionType):
return key, value.__args__[0]
return key, value
fields = dict(filter(None, (process_field(k, v) for k, v in input_model_cls.__annotations__.items())))
return ", ".join([f"{k} = <{v.__name__}>" for k, v in fields.items()])
def generate_context(*table_models) -> str:
context_str = "You can access the following tables in database:\n"
for table in table_models:
context_str += f" - {table.__name__}: {orm_model_to_string(table)}\n"
return context_str
ExpenseRevenuegenerate_context()
我们希望 Agent 知道当前的日期和星期几,以便我们可以引用正确的日期。因此,接下来我们将在工具类中添加一些日期解析函数:
# utils.py
from datetime import datetime
#... rest of utils.py ...
def weekday_by_date(date: datetime):
days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
return days[date.weekday()]
def date_to_string(date: datetime):
return f"{weekday_by_date(date)} {parse_date(date)}"
def parse_date(date: datetime):
return date.strftime("%Y-%m-%d")
现在让我们为查询代理创建上下文
# utils.py
# ...
def generate_query_context(*table_models) -> str:
today = f"Today is {date_to_string(datetime.now())}"
context_str = "You can access the following tables in database:\n"
for table in table_models:
context_str += f" - {table.__name__}: {orm_model_to_string(table)}\n"
return f"{today}\n{context_str}"
from database.models import Expense, Revenue
print(generate_query_context(Expense, Revenue))
Today is Sunday 2024-04-21
You can access the following tables in database:
- Expense: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
- Revenue: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
5.2 路由 Agent
随着我们添加更多的工具,我们的设置复杂性可能会开始限制像“gpt-3.5-turbo”这样的便宜模型的可用性。在下一篇文章中,我们可能会考虑切换到 Anthropic Claude,因为它们新发布的工具使用 API 功能似乎在同时处理多个工具时很有前景,甚至适用于更便宜的 HAIKU 模型。然而,目前我们将继续使用 OpenAI 的 GPT 模型。
gpt-3.5-turbo
gpt-3.5-turbo
通过引入路由代理,我们可以将问题分解成更小、更易管理的部分。路由代理将负责理解用户的意图,并将查询引导到相关的任务代理。这种方法不仅简化了各个代理的职责,还使系统更加模块化,更易于维护。
此外,分离执行逻辑和复杂性将为未来实现基于角色的访问控制铺平道路。每个任务代理可以被分配特定的权限和访问级别,确保敏感操作仅由授权代理执行。
虽然路由代理在流程中增加了一个额外的步骤,但它最终会导致一个更强大、更具可扩展性的系统。通过优化较小的模型并专注于清晰、简洁的提示,我们可以创建一个坚实的基础,在切换到更强大的模型如 Claude Opus 或 GPT-4 时,系统的性能会更好。
让我们来看看路由代理的实现
# agents/routing.py
from openai import OpenAI
import colorama
from agents.task_agent import TaskAgent
from agents.utils import parse_function_args
SYSTEM_MESSAGE = """You are a helpful assistant.
Role: You are an AI Assistant designed to serve as the primary point of contact for users interacting through a chat interface.
Your primary role is to understand users' requests related to database operations and route these requests to the appropriate tool.
Capabilities:
You have access to a variety of tools designed for Create, Read operations on a set of predefined tables in a database.
Tables:
{table_names}
"""
NOTES = """Important Notes:
Always confirm the completion of the requested operation with the user.
Maintain user privacy and data security throughout the interaction.
If a request is ambiguous or lacks specific details, ask follow-up questions to clarify the user's needs."""
class RoutingAgent:
def __init__(
self,
tools: list[TaskAgent] = None,
client: OpenAI = OpenAI(),
system_message: str = SYSTEM_MESSAGE,
model_name: str = "gpt-3.5-turbo-0125",
max_steps: int = 5,
verbose: bool = True,
prompt_extra: dict = None,
examples: list[dict] = None,
context: str = None
):
self.tools = tools or ROUTING_AGENTS
self.client = client
self.model_name = model_name
self.system_message = system_message
self.memory = []
self.step_history = []
self.max_steps = max_steps
self.verbose = verbose
self.prompt_extra = prompt_extra or PROMPT_EXTRA
self.examples = self.load_examples(examples)
self.context = context or ""
def load_examples(self, examples: list[dict] = None):
examples = examples or []
for agent in self.tools:
examples.extend(agent.routing_example)
return examples
def run(self, user_input: str, employee_id: int = None, **kwargs):
context = create_routing_agent_context(employee_id)
if context:
user_input_with_context = f"{context}\n---\n\nUser Message: {user_input}"
else:
user_input_with_context = user_input
self.to_console("START", f"Starting Task Agent with Input:\n'''{user_input_with_context}'''")
partial_variables = {**self.prompt_extra, "context": context}
system_message = self.system_message.format(**partial_variables)
messages = [
{"role": "system", "content": system_message},
*self.examples,
{"role": "user", "content": user_input}
]
tools = [tool.openai_tool_schema for tool in self.tools]
response = self.client.chat.completions.create(
model=self.model_name,
messages=messages,
tools=tools
)
self.step_history.append(response.choices[0].message)
self.to_console("RESPONSE", response.choices[0].message.content, color="blue")
tool_kwargs = parse_function_args(response)
tool_name = response.choices[0].message.tool_calls[0].function.name
self.to_console("Tool Name", tool_name)
self.to_console("Tool Args", tool_kwargs)
agent = self.prepare_agent(tool_name, tool_kwargs)
return agent.run(user_input)
def prepare_agent(self, tool_name, tool_kwargs):
for agent in self.tools:
if agent.name == tool_name:
input_kwargs = agent.arg_model.model_validate(tool_kwargs)
return agent.load_agent(**input_kwargs.dict())
raise ValueError(f"Agent {tool_name} not found")
def to_console(self, tag: str, message: str, color: str = "green"):
if self.verbose:
color_prefix = colorama.Fore.__dict__[color.upper()]
print(color_prefix + f"{tag}: {message}{colorama.Style.RESET_ALL}")
OpenAIAgent
OpenAIAgent
5.3 代理作为工具——任务代理
OpenAIAgentTaskAgent
TaskAgentToolarg_model
from typing import Type, Callable, Optional
from agents.base import OpenAIAgent
from tools.base import Tool
from tools.report_tool import report_tool
from pydantic import BaseModel, ConfigDict, Field
from tools.utils import convert_to_openai_tool
SYSTEM_MESSAGE = """You are tasked with completing specific objectives and must report the outcomes. At your disposal, you have a variety of tools, each specialized in performing a distinct type of task.
For successful task completion:
Thought: Consider the task at hand and determine which tool is best suited based on its capabilities and the nature of the work.
If you can complete the task or answer a question, soley by the information provided you can use the report_tool directly.
Use the report_tool with an instruction detailing the results of your work or to answer a user question.
If you encounter an issue and cannot complete the task:
Use the report_tool to communicate the challenge or reason for the task's incompletion.
You will receive feedback based on the outcomes of each tool's task execution or explanations for any tasks that couldn't be completed. This feedback loop is crucial for addressing and resolving any issues by strategically deploying the available tools.
On error: If information are missing consider if you can deduce or calculate the missing information and repeat the tool call with more arguments.
Use the information provided by the user to deduct the correct tool arguments.
Before using a tool think about the arguments and explain each input argument used in the tool.
Return only one tool call at a time! Explain your thoughts!
{context}
"""
class EmptyArgModel(BaseModel):
pass
class TaskAgent(BaseModel):
name: str
description: str
arg_model: Type[BaseModel] = EmptyArgModel
create_context: Callable = None
create_user_context: Callable = None
tool_loader: Callable = None
system_message: str = SYSTEM_MESSAGE
tools: list[Tool]
examples: list[dict] = None
routing_example: list[dict] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
def load_agent(self, **kwargs) -> OpenAIAgent:
input_kwargs = self.arg_model(**kwargs)
kwargs = input_kwargs.dict()
context = self.create_context(**kwargs) if self.create_context else None
user_context = self.create_user_context(**kwargs) if self.create_user_context else None
if self.tool_loader:
self.tools.extend(self.tool_loader(**kwargs))
if report_tool not in self.tools:
self.tools.append(report_tool)
return OpenAIAgent(
tools=self.tools,
context=context,
user_context=user_context,
system_message=self.system_message,
examples=self.examples,
)
@property
def openai_tool_schema(self):
return convert_to_openai_tool(self.arg_model, name=self.name, description=self.description)
TaskAgentOpenAIAgent
create_contextcreate_user_contexttool_loadersystem_messagetoolsexamplesrouting_example
EmptyArgModelTaskAgentarg_model
作者创建:mermaid
让我们看看它是否都能协同工作!
运行代理
现在,是时候测试我们的路由和子代理是否能够很好地协作。由于我们引入了作为参数的示例,我们可以通过多次测试运行来检查执行中的主要缺陷,并为每个子代理定义示例用法。
让我们先定义我们的子代理:
from database.models import Expense, Revenue, Customer
from agents.task import TaskAgent
from utils import generate_query_context
from tools.base import Tool
from tools.query import query_data_tool
from tools.add import add_entry_to_table
query_task_agent = TaskAgent(
name="query_agent",
description="An agent that can perform queries on multiple data sources",
create_user_context=lambda: generate_query_context(Expense, Revenue, Customer),
tools=[query_data_tool]
)
add_expense_agent = TaskAgent(
name="add_expense_agent",
description="An agent that can add an expense to the database",
create_user_context=lambda: generate_query_context(Expense) + "\nRemarks: The tax rate is 0.19\. The user provide the net amount you need to calculate the gross amount.",
tools=[
Tool(
name="add_expense",
description="Add an expense to the database",
function=add_entry_to_table(Expense),
model=Expense
)
]
)
add_revenue_agent = TaskAgent(
name="add_revenue_agent",
description="An agent that can add a revenue entry to the database",
create_user_context=lambda: generate_query_context(Revenue) + "\nRemarks: The tax rate is 0.19\. The user provide the gross_amount you should use the tax rate to calculate the net_amount.",
tools=[
Tool(
name="add_revenue",
description="Add a revenue entry to the database",
function=add_entry_to_table(Revenue),
model=Revenue
)
]
)
add_customer_agent = TaskAgent(
name="add_customer_agent",
description="An agent that can add a customer to the database",
create_user_context=lambda: generate_query_context(Customer),
tools=[
Tool(
name="add_customer",
description="Add a customer to the database",
function=add_entry_to_table(Customer),
model=Customer
)
]
)
create_user_context
from agents.routing import RoutingAgent
routing_agent = RoutingAgent(
tools=[
query_task_agent,
add_expense_agent,
add_revenue_agent,
add_customer_agent
]
)
routing_agent.run("I have spent 5 € on a office stuff. Last Thursday")
START: Starting Routing Agent with Input:
I have spent 5 € on a office stuff. Last Thursday
Tool Name: add_expense_agent
Tool Args: {}
START: Starting Task Agent with Input:
"""Today is Sunday 2024-04-21
You can access the following tables in database:
- expense: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
Remarks: The tax rate is 0.19\. The user provide the net amount you need to calculate the gross amount.
---
User Message: I have spent 5 € on a office stuff. Last Thursday"""
Tool Call: Name: add_expense
Args: {'description': 'office stuff', 'net_amount': 5, 'tax_rate': 0.19, 'date': '2024-04-18'}
Message: None
error: Missing values: gross_amount
Tool Call: Name: add_expense
Args: {'description': 'office stuff', 'net_amount': 5, 'tax_rate': 0.19, 'date': '2024-04-18', 'gross_amount': 5.95}
Message: None
tool_result: Successfully added net_amount=5.0 id=2 gross_amount=5.95 description='office stuff' date=datetime.datetime(2024, 4, 18, 0, 0) tax_rate=0.19 to the table
Error: No tool calls were returned.
Message: I have successfully added the expense for office stuff with a net amount of 5€, calculated the gross amount, and recorded it in the database.
Tool Call: Name: report_tool
Args: {'report': 'Expense for office stuff with a net amount of 5€ has been successfully added. Gross amount calculated as 5.95€.'}
Message: None
Final Result: Expense for office stuff with a net amount of 5€ has been successfully added. Gross amount calculated as 5.95€.
现在让我们添加一笔收入:
routing_agent.run("Two weeks ago on Saturday we had a revenue of 1000 € in the shop")
START: Starting Routing Agent with Input:
Two weeks ago on Saturday we had a revenue of 1000 € in the shop
Tool Name: add_revenue_agent
Tool Args: {}
START: Starting Task Agent with Input:
"""Today is Sunday 2024-04-21
You can access the following tables in database:
- revenue: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
Remarks: The tax rate is 0.19\. The user provide the gross_amount you should use the tax rate to calculate the net_amount.
---
User Message: Two weeks ago on Saturday we had a revenue of 1000 € in the shop"""
Tool Call: Name: add_revenue
Args: {'description': 'Revenue from the shop', 'gross_amount': 1000, 'tax_rate': 0.19, 'date': '2024-04-06'}
Message: None
error: Missing values: net_amount
Tool Call: Name: add_revenue
Args: {'description': 'Revenue from the shop', 'gross_amount': 1000, 'tax_rate': 0.19, 'date': '2024-04-06', 'net_amount': 840.34}
Message: None
tool_result: Successfully added net_amount=840.34 gross_amount=1000.0 tax_rate=0.19 description='Revenue from the shop' id=1 date=datetime.datetime(2024, 4, 6, 0, 0) to the table
Error: No tool calls were returned.
Message: The revenue entry for the shop on April 6, 2024, with a gross amount of 1000€ has been successfully added to the database. The calculated net amount after applying the tax rate is 840.34€.
Tool Call: Name: report_tool
Args: {'report': 'completed'}
Message: None
Final Result: completed
对于最后的测试,让我们尝试查询从数据库中创建的收入:
routing_agent.run("How much revenue did we made this month?")
START: Starting Routing Agent with Input:
How much revenue did we made this month?
Tool Name: query_agent
Tool Args: {}
START: Starting Agent with Input:
"""Today is Sunday 2024-04-21
You can access the following tables in database:
- expense: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
- revenue: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
- customer: id = <int>, company_name = <str>, first_name = <str>, last_name = <str>, phone = <str>, address = <str>, city = <str>, zip = <str>, country = <str>
---
User Message: How much revenue did we made this month?"""
Tool Call: Name: query_data_tool
Args: {'table_name': 'revenue', 'select_columns': ['gross_amount'], 'where': [{'column': 'date', 'operator': 'gte', 'value': '2024-04-01'}, {'column': 'date', 'operator': 'lte', 'value': '2024-04-30'}]}
Message: None
tool_result: content="Query results: ['1000.0']" success=True
Error: No tool calls were returned.
Message: The revenue made this month is $1000.00.
Tool Call: Name: report_tool
Args: {'report': 'The revenue made this month is $1000.00.'}
Message: None
Final Result: The revenue made this month is $1000.00.
所有工具都按预期工作。路由代理运行得非常完美。对于任务代理,我不得不多次更新提示。
我建议在没有使用像 gpt-4 这样的最新模型时,为每个任务代理添加一些示例工具调用。一般来说,我建议通过示例和更直观的设计来解决缺陷,而不是使用提示工程。重复出现的缺陷是设计不够直观的信号。例如,当代理在计算毛额或净额时遇到困难,只需添加一个‘calculate_gross_amount_tool’或‘calculate_net_amount_tool’。另一方面,GPT-4 会毫不犹豫地处理这种用例。
结论
在本文中,我们在创建一个全面的基于聊天的界面以管理小型企业的过程中迈出了重要的一步,利用大型语言模型(Large Language Models)。
通过设置我们的数据库模式、定义核心功能和构建项目仓库,我们为应用程序的开发奠定了坚实的基础。
我们从使用 SQLModel 设计数据库模型开始,这使我们能够无缝地与 Pydantic 和 SQLAlchemy 集成。该方法确保了高效的数据验证和数据库操作,同时最大限度地减少了 SQL 注入攻击的风险。
Tool
我们配置了 OpenAIAgent,以通过更新代理类来处理系统提示和用户提示中的可变上下文,从而提供上下文感知的工具使用。这使得我们的代理能够理解可用的表及其模式,从而实现更准确和高效的工具使用。虽然我们已经取得了显著进展,但仍有很多内容需要探索和实现。
为了进一步增强我们的聊天机器人,我们引入了 TaskAgent 类,它具有类似 Tool 类的功能。TaskAgent 允许我们为每个代理定义名称、描述和输入模型,自动化初始化过程。
最后,我们通过为查询数据、添加支出、增加收入定义子代理来测试我们的路由和子代理。我们展示了代理如何处理税率并自动计算净额或毛额,展示了我们子代理的推理能力。
下一步
在本系列的下一部分,我们将重点通过添加对更多工具的支持,并可能将 Claude 测试作为新的默认语言模型,来增强我们代理的能力。我们还将探索将我们的应用程序与流行的通讯平台(如 WhatsApp)集成,使其更加易于访问和用户友好。
随着我们不断完善和扩展应用程序,可能性是无穷无尽的。通过利用大语言模型的力量并创建直观的基于聊天的界面,我们可以彻底改变小型企业管理数据和简化操作的方式。敬请期待本系列的下一部分!
源代码
github.com/elokus/ArticleDemo2