为 AutoGPT 代理服务器做贡献:创建和测试区块¶
本指南将以 WikipediaSummaryBlock 为例,引导您完成为 AutoGPT 代理服务器创建和测试新区块的过程。
理解区块与测试¶
区块是可重用的组件,可以连接起来形成表示代理行为的图。每个区块都有输入、输出和一个特定功能。适当的测试对于确保区块正确且一致地工作至关重要。
创建和测试新区块¶
按照以下步骤创建和测试新区块
-
在
autogpt_platform/backend/backend/blocks
目录中为您的区块创建一个新的 Python 文件。使用描述性名称并采用 snake_case 命名约定。例如:get_wikipedia_summary.py
。 -
导入必要的模块并创建一个继承自
Block
的类。确保包含您的区块所需的所有必要导入。每个区块应包含以下内容
from backend.data.block import Block, BlockSchema, BlockOutput
维基百科摘要区块示例
from backend.data.block import Block, BlockSchema, BlockOutput from backend.utils.get_request import GetRequest import requests class WikipediaSummaryBlock(Block, GetRequest): # Block implementation will go here
-
使用
BlockSchema
定义输入和输出模式。这些模式指定了区块期望接收(输入)和生成(输出)的数据结构。 -
输入模式定义了区块将处理的数据结构。模式中的每个字段都代表一个必需的输入数据。
-
输出模式定义了区块处理后将返回的数据结构。模式中的每个字段都代表一个输出数据。
示例
class Input(BlockSchema): topic: str # The topic to get the Wikipedia summary for class Output(BlockSchema): summary: str # The summary of the topic from Wikipedia error: str # Any error message if the request fails, error field needs to be named `error`.
-
实现
__init__
方法,包括测试数据和模拟重要提示
为每个新区块的
id
使用 UUID 生成器(例如 https://www.uuidgenerator.net/),并且不要自己随意创建。或者,您可以运行此 Python 代码生成 UUID:print(__import__('uuid').uuid4())
def __init__(self): super().__init__( # Unique ID for the block, used across users for templates # If you are an AI leave it as is or change to "generate-proper-uuid" id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", input_schema=WikipediaSummaryBlock.Input, # Assign input schema output_schema=WikipediaSummaryBlock.Output, # Assign output schema # Provide sample input, output and test mock for testing the block test_input={"topic": "Artificial Intelligence"}, test_output=("summary", "summary content"), test_mock={"get_request": lambda url, json: {"extract": "summary content"}}, )
-
id
:区块的唯一标识符。 -
input_schema
和output_schema
:定义输入和输出数据的结构。
让我们分解一下测试组件
-
test_input
:这是一个用于测试区块的示例输入。它应根据您的输入模式是一个有效输入。 -
test_output
:这是使用test_input
运行区块时期望的输出。它应与您的输出模式匹配。对于非确定性输出或当您只想断言类型时,可以使用 Python 类型代替特定值。在此示例中,("summary", str)
断言输出键为 "summary" 且其值为字符串。 -
test_mock
:这对于进行网络调用的区块至关重要。它提供了一个模拟函数,用于在测试期间替换实际的网络调用。
在此情况下,我们模拟了
get_request
方法,使其始终返回一个带有 'extract' 键的字典,模拟成功的 API 响应。这使我们能够在不进行实际网络请求的情况下测试区块的逻辑,因为实际网络请求可能会慢、不可靠或受到速率限制。 -
-
实现带有错误处理的
run
方法。 这应该包含区块的主要逻辑
def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
topic = input_data.topic
url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{topic}"
response = self.get_request(url, json=True)
yield "summary", response['extract']
except requests.exceptions.HTTPError as http_err:
raise RuntimeError(f"HTTP error occurred: {http_err}")
- Try 块:包含获取和处理维基百科摘要的主要逻辑。
- API 请求:向维基百科 API 发送 GET 请求。
- 错误处理:处理在 API 请求和数据处理过程中可能发生的各种异常。我们不需要捕获所有异常,只需要捕获我们期望并能处理的异常。未捕获的异常将自动在输出中作为
error
产生。任何引发异常(或产生error
输出)的区块都将被标记为失败。优先于产生error
抛出异常,因为它会立即停止执行。 - 产生 (Yield):使用
yield
来输出结果。优先一次输出一个结果对象。如果您调用的函数返回一个列表,您可以单独产生列表中的每个项。您也可以产生整个列表,但请两者都做,而不是只产生列表。例如:如果您正在编写一个输出电子邮件的区块,您会以单独的结果对象产生每封电子邮件,但也可以将整个列表作为一个额外的单一结果对象产生。产生名为error
的输出将立即中断执行并将区块执行标记为失败。 - kwargs:
kwargs
参数用于向区块传递附加参数。在上面的示例中未使用它,但区块可以使用它。您也可以在 run 方法中将 args 作为内联签名,例如def run(self, input_data: Input, *, user_id: str, **kwargs) -> BlockOutput:
。可用的 kwargs 有user_id
:运行区块的用户 ID。graph_id
:正在执行区块的代理 ID。这对于代理的每个版本都是相同的graph_exec_id
:代理执行的 ID。代理每次有新的“运行”时,此 ID 都会更改node_exec_id
:节点执行的 ID。节点每次执行时,此 ID 都会更改node_id
:正在执行的节点 ID。图的每个版本此 ID 都会更改,但节点每次执行时不会更改。
字段类型¶
oneOf 字段¶
oneOf
允许您指定一个字段必须是几个可能选项中的一个。当您希望区块接受互斥的不同类型输入时,这很有用。
示例
attachment: Union[Media, DeepLink, Poll, Place, Quote] = SchemaField(
discriminator='discriminator',
description="Attach either media, deep link, poll, place or quote - only one can be used"
)
discriminator
参数告诉 AutoGPT 在输入中查看哪个字段来确定其类型。
在每个模型中,您需要定义 discriminator 值
class Media(BaseModel):
discriminator: Literal['media']
media_ids: List[str]
class DeepLink(BaseModel):
discriminator: Literal['deep_link']
direct_message_deep_link: str
OptionalOneOf 字段¶
OptionalOneOf
类似于 oneOf
,但允许字段是可选的 (None)。这意味着该字段可以是指定的类型之一,也可以是 None。
示例
attachment: Union[Media, DeepLink, Poll, Place, Quote] | None = SchemaField(
discriminator='discriminator',
description="Optional attachment - can be media, deep link, poll, place, quote or None"
)
关键区别在于 | None
,它使整个字段成为可选。
带有身份验证的区块¶
我们的系统支持 API 密钥和 OAuth2 授权流程的认证卸载。添加带有 API 密钥认证的区块非常简单,添加我们已经支持 OAuth2 的服务的区块也一样简单。
实现区块本身相对简单。除了上述说明之外,您还需要向 Input
模型和 run
方法添加一个 credentials
参数
from backend.data.model import (
APIKeyCredentials,
OAuth2Credentials,
Credentials,
)
from backend.data.block import Block, BlockOutput, BlockSchema
from backend.data.model import CredentialsField
from backend.integrations.providers import ProviderName
# API Key auth:
class BlockWithAPIKeyAuth(Block):
class Input(BlockSchema):
# Note that the type hint below is require or you will get a type error.
# The first argument is the provider name, the second is the credential type.
credentials: CredentialsMetaInput[
Literal[ProviderName.GITHUB], Literal["api_key"]
] = CredentialsField(
description="The GitHub integration can be used with "
"any API key with sufficient permissions for the blocks it is used on.",
)
# ...
def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
**kwargs,
) -> BlockOutput:
...
# OAuth:
class BlockWithOAuth(Block):
class Input(BlockSchema):
# Note that the type hint below is require or you will get a type error.
# The first argument is the provider name, the second is the credential type.
credentials: CredentialsMetaInput[
Literal[ProviderName.GITHUB], Literal["oauth2"]
] = CredentialsField(
required_scopes={"repo"},
description="The GitHub integration can be used with OAuth.",
)
# ...
def run(
self,
input_data: Input,
*,
credentials: OAuth2Credentials,
**kwargs,
) -> BlockOutput:
...
# API Key auth + OAuth:
class BlockWithAPIKeyAndOAuth(Block):
class Input(BlockSchema):
# Note that the type hint below is require or you will get a type error.
# The first argument is the provider name, the second is the credential type.
credentials: CredentialsMetaInput[
Literal[ProviderName.GITHUB], Literal["api_key", "oauth2"]
] = CredentialsField(
required_scopes={"repo"},
description="The GitHub integration can be used with OAuth, "
"or any API key with sufficient permissions for the blocks it is used on.",
)
# ...
def run(
self,
input_data: Input,
*,
credentials: Credentials,
**kwargs,
) -> BlockOutput:
...
凭据将由后端执行器自动注入。
APIKeyCredentials
和 OAuth2Credentials
模型定义在此处。例如,要在 API 请求中使用它们,您可以直接访问令牌
# credentials: APIKeyCredentials
response = requests.post(
url,
headers={
"Authorization": f"Bearer {credentials.api_key.get_secret_value()})",
},
)
# credentials: OAuth2Credentials
response = requests.post(
url,
headers={
"Authorization": f"Bearer {credentials.access_token.get_secret_value()})",
},
)
或使用快捷方式 credentials.auth_header()
# credentials: APIKeyCredentials | OAuth2Credentials
response = requests.post(
url,
headers={"Authorization": credentials.auth_header()},
)
ProviderName
枚举是我们系统中存在哪些提供者的唯一真相来源。自然地,要为一个新提供者添加一个经过身份验证的区块,您也必须将其添加到此处。
ProviderName
定义
class ProviderName(str, Enum):
ANTHROPIC = "anthropic"
APOLLO = "apollo"
COMPASS = "compass"
DISCORD = "discord"
D_ID = "d_id"
E2B = "e2b"
EXA = "exa"
FAL = "fal"
GENERIC_WEBHOOK = "generic_webhook"
GITHUB = "github"
GOOGLE = "google"
GOOGLE_MAPS = "google_maps"
GROQ = "groq"
HUBSPOT = "hubspot"
IDEOGRAM = "ideogram"
JINA = "jina"
LINEAR = "linear"
LLAMA_API = "llama_api"
MEDIUM = "medium"
MEM0 = "mem0"
NOTION = "notion"
NVIDIA = "nvidia"
OLLAMA = "ollama"
OPENAI = "openai"
OPENWEATHERMAP = "openweathermap"
OPEN_ROUTER = "open_router"
PINECONE = "pinecone"
REDDIT = "reddit"
REPLICATE = "replicate"
REVID = "revid"
SCREENSHOTONE = "screenshotone"
SLANT3D = "slant3d"
SMARTLEAD = "smartlead"
SMTP = "smtp"
TWITTER = "twitter"
TODOIST = "todoist"
UNREAL_SPEECH = "unreal_speech"
ZEROBOUNCE = "zerobounce"
多个凭据输入¶
支持多个凭据输入,但需满足以下条件: - 每个凭据输入字段的名称必须以 _credentials
结尾。 - 凭据输入字段的名称必须与区块 run(..)
方法上相应参数的名称匹配。 - 如果需要多个凭据参数,test_credentials
是一个 dict[str, Credentials]
,其中每个必需的凭据输入的参数名称作为键,合适的测试凭据作为值。
添加 OAuth2 服务集成¶
要为一个新的 OAuth2 认证服务添加支持,您需要添加一个 OAuthHandler
。我们所有现有的处理程序和基类都可以在此处找到。
每个处理程序必须实现 [BaseOAuthHandler
] 接口的以下部分
PROVIDER_NAME: ClassVar[ProviderName]
DEFAULT_SCOPES: ClassVar[list[str]] = []
def __init__(self, client_id: str, client_secret: str, redirect_uri: str): ...
def get_login_url(
self, scopes: list[str], state: str, code_challenge: Optional[str]
) -> str:
def exchange_code_for_tokens(
self, code: str, scopes: list[str], code_verifier: Optional[str]
) -> OAuth2Credentials:
def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials:
def revoke_tokens(self, credentials: OAuth2Credentials) -> bool:
如您所见,这是根据标准的 OAuth2 流程建模的。
除了实现 OAuthHandler
本身之外,向系统中添加处理程序还需要另外两件事
- 将处理程序类添加到
integrations/oauth/__init__.py
下的HANDLERS_BY_NAME
中
HANDLERS_BY_NAME: dict["ProviderName", type["BaseOAuthHandler"]] = {
handler.PROVIDER_NAME: handler
for handler in [
GitHubOAuthHandler,
GoogleOAuthHandler,
NotionOAuthHandler,
TwitterOAuthHandler,
LinearOAuthHandler,
TodoistOAuthHandler,
]
}
- 将
{provider}_client_id
和{provider}_client_secret
添加到util/settings.py
下应用程序的Secrets
中
github_client_id: str = Field(default="", description="GitHub OAuth client ID")
github_client_secret: str = Field(
default="", description="GitHub OAuth client secret"
)
添加到前端¶
您需要在 frontend/src/components/integrations/credentials-input.tsx
中的 CredentialsInput
组件中添加提供者(api 或 oauth)。
export const providerIcons: Record<
CredentialsProviderName,
React.FC<{ className?: string }>
> = {
anthropic: fallbackIcon,
apollo: fallbackIcon,
e2b: fallbackIcon,
github: FaGithub,
google: FaGoogle,
groq: fallbackIcon,
notion: NotionLogoIcon,
nvidia: fallbackIcon,
discord: FaDiscord,
d_id: fallbackIcon,
google_maps: FaGoogle,
jina: fallbackIcon,
ideogram: fallbackIcon,
linear: fallbackIcon,
medium: FaMedium,
mem0: fallbackIcon,
ollama: fallbackIcon,
openai: fallbackIcon,
openweathermap: fallbackIcon,
open_router: fallbackIcon,
llama_api: fallbackIcon,
pinecone: fallbackIcon,
slant3d: fallbackIcon,
screenshotone: fallbackIcon,
smtp: fallbackIcon,
replicate: fallbackIcon,
reddit: fallbackIcon,
fal: fallbackIcon,
revid: fallbackIcon,
twitter: FaTwitter,
unreal_speech: fallbackIcon,
exa: fallbackIcon,
hubspot: FaHubspot,
smartlead: fallbackIcon,
todoist: fallbackIcon,
zerobounce: fallbackIcon,
};
您还需要将提供者添加到 frontend/src/components/integrations/credentials-provider.tsx
中的 CredentialsProvider
组件中。
const providerDisplayNames: Record<CredentialsProviderName, string> = {
anthropic: "Anthropic",
apollo: "Apollo",
discord: "Discord",
d_id: "D-ID",
e2b: "E2B",
exa: "Exa",
fal: "FAL",
github: "GitHub",
google: "Google",
google_maps: "Google Maps",
groq: "Groq",
hubspot: "Hubspot",
ideogram: "Ideogram",
jina: "Jina",
linear: "Linear",
medium: "Medium",
mem0: "Mem0",
notion: "Notion",
nvidia: "Nvidia",
ollama: "Ollama",
openai: "OpenAI",
openweathermap: "OpenWeatherMap",
open_router: "Open Router",
llama_api: "Llama API",
pinecone: "Pinecone",
screenshotone: "ScreenshotOne",
slant3d: "Slant3D",
smartlead: "SmartLead",
smtp: "SMTP",
reddit: "Reddit",
replicate: "Replicate",
revid: "Rev.ID",
twitter: "Twitter",
todoist: "Todoist",
unreal_speech: "Unreal Speech",
zerobounce: "ZeroBounce",
} as const;
最后,您需要将提供者添加到 frontend/src/lib/autogpt-server-api/types.ts
中的 CredentialsType
枚举中。
export const PROVIDER_NAMES = {
ANTHROPIC: "anthropic",
APOLLO: "apollo",
D_ID: "d_id",
DISCORD: "discord",
E2B: "e2b",
EXA: "exa",
FAL: "fal",
GITHUB: "github",
GOOGLE: "google",
GOOGLE_MAPS: "google_maps",
GROQ: "groq",
HUBSPOT: "hubspot",
IDEOGRAM: "ideogram",
JINA: "jina",
LINEAR: "linear",
MEDIUM: "medium",
MEM0: "mem0",
NOTION: "notion",
NVIDIA: "nvidia",
OLLAMA: "ollama",
OPENAI: "openai",
OPENWEATHERMAP: "openweathermap",
OPEN_ROUTER: "open_router",
LLAMA_API: "llama_api",
PINECONE: "pinecone",
SCREENSHOTONE: "screenshotone",
SLANT3D: "slant3d",
SMARTLEAD: "smartlead",
SMTP: "smtp",
TWITTER: "twitter",
REPLICATE: "replicate",
REDDIT: "reddit",
REVID: "revid",
UNREAL_SPEECH: "unreal_speech",
TODOIST: "todoist",
ZEROBOUNCE: "zerobounce",
} as const;
示例:GitHub 集成¶
- 支持 API 密钥 + OAuth2 的 GitHub 区块:
blocks/github
class GithubCommentBlock(Block):
class Input(BlockSchema):
credentials: GithubCredentialsInput = GithubCredentialsField("repo")
issue_url: str = SchemaField(
description="URL of the GitHub issue or pull request",
placeholder="https://github.com/owner/repo/issues/1",
)
comment: str = SchemaField(
description="Comment to post on the issue or pull request",
placeholder="Enter your comment",
)
class Output(BlockSchema):
id: int = SchemaField(description="ID of the created comment")
url: str = SchemaField(description="URL to the comment on GitHub")
error: str = SchemaField(
description="Error message if the comment posting failed"
)
def __init__(self):
super().__init__(
id="a8db4d8d-db1c-4a25-a1b0-416a8c33602b",
description="This block posts a comment on a specified GitHub issue or pull request.",
categories={BlockCategory.DEVELOPER_TOOLS},
input_schema=GithubCommentBlock.Input,
output_schema=GithubCommentBlock.Output,
test_input=[
{
"issue_url": "https://github.com/owner/repo/issues/1",
"comment": "This is a test comment.",
"credentials": TEST_CREDENTIALS_INPUT,
},
{
"issue_url": "https://github.com/owner/repo/pull/1",
"comment": "This is a test comment.",
"credentials": TEST_CREDENTIALS_INPUT,
},
],
test_credentials=TEST_CREDENTIALS,
test_output=[
("id", 1337),
("url", "https://github.com/owner/repo/issues/1#issuecomment-1337"),
("id", 1337),
(
"url",
"https://github.com/owner/repo/issues/1#issuecomment-1337",
),
],
test_mock={
"post_comment": lambda *args, **kwargs: (
1337,
"https://github.com/owner/repo/issues/1#issuecomment-1337",
)
},
)
@staticmethod
def post_comment(
credentials: GithubCredentials, issue_url: str, body_text: str
) -> tuple[int, str]:
api = get_api(credentials)
data = {"body": body_text}
if "pull" in issue_url:
issue_url = issue_url.replace("pull", "issues")
comments_url = issue_url + "/comments"
response = api.post(comments_url, json=data)
comment = response.json()
return comment["id"], comment["html_url"]
def run(
self,
input_data: Input,
*,
credentials: GithubCredentials,
**kwargs,
) -> BlockOutput:
id, url = self.post_comment(
credentials,
input_data.issue_url,
input_data.comment,
)
yield "id", id
yield "url", url
- GitHub OAuth2 处理程序:
integrations/oauth/github.py
class GitHubOAuthHandler(BaseOAuthHandler):
"""
Based on the documentation at:
- [Authorizing OAuth apps - GitHub Docs](https://githubdocs.cn/en/apps/oauth-apps/building-oauth-apps/authorizing-oauth-apps)
- [Refreshing user access tokens - GitHub Docs](https://githubdocs.cn/en/apps/creating-github-apps/authenticating-with-a-github-app/refreshing-user-access-tokens)
Notes:
- By default, token expiration is disabled on GitHub Apps. This means the access
token doesn't expire and no refresh token is returned by the authorization flow.
- When token expiration gets enabled, any existing tokens will remain non-expiring.
- When token expiration gets disabled, token refreshes will return a non-expiring
access token *with no refresh token*.
""" # noqa
PROVIDER_NAME = ProviderName.GITHUB
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.auth_base_url = "https://github.com/login/oauth/authorize"
self.token_url = "https://github.com/login/oauth/access_token"
self.revoke_url = "https://api.github.com/applications/{client_id}/token"
def get_login_url(
self, scopes: list[str], state: str, code_challenge: Optional[str]
) -> str:
params = {
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": " ".join(scopes),
"state": state,
}
return f"{self.auth_base_url}?{urlencode(params)}"
def exchange_code_for_tokens(
self, code: str, scopes: list[str], code_verifier: Optional[str]
) -> OAuth2Credentials:
return self._request_tokens({"code": code, "redirect_uri": self.redirect_uri})
def revoke_tokens(self, credentials: OAuth2Credentials) -> bool:
if not credentials.access_token:
raise ValueError("No access token to revoke")
headers = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
}
requests.delete(
url=self.revoke_url.format(client_id=self.client_id),
auth=(self.client_id, self.client_secret),
headers=headers,
json={"access_token": credentials.access_token.get_secret_value()},
)
return True
def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials:
if not credentials.refresh_token:
return credentials
return self._request_tokens(
{
"refresh_token": credentials.refresh_token.get_secret_value(),
"grant_type": "refresh_token",
}
)
def _request_tokens(
self,
params: dict[str, str],
current_credentials: Optional[OAuth2Credentials] = None,
) -> OAuth2Credentials:
request_body = {
"client_id": self.client_id,
"client_secret": self.client_secret,
**params,
}
headers = {"Accept": "application/json"}
response = requests.post(self.token_url, data=request_body, headers=headers)
token_data: dict = response.json()
username = self._request_username(token_data["access_token"])
now = int(time.time())
new_credentials = OAuth2Credentials(
provider=self.PROVIDER_NAME,
title=current_credentials.title if current_credentials else None,
username=username,
access_token=token_data["access_token"],
# Token refresh responses have an empty `scope` property (see docs),
# so we have to get the scope from the existing credentials object.
scopes=(
token_data.get("scope", "").split(",")
or (current_credentials.scopes if current_credentials else [])
),
# Refresh token and expiration intervals are only given if token expiration
# is enabled in the GitHub App's settings.
refresh_token=token_data.get("refresh_token"),
access_token_expires_at=(
now + expires_in
if (expires_in := token_data.get("expires_in", None))
else None
),
refresh_token_expires_at=(
now + expires_in
if (expires_in := token_data.get("refresh_token_expires_in", None))
else None
),
)
if current_credentials:
new_credentials.id = current_credentials.id
return new_credentials
def _request_username(self, access_token: str) -> str | None:
url = "https://api.github.com/user"
headers = {
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {access_token}",
"X-GitHub-Api-Version": "2022-11-28",
}
response = requests.get(url, headers=headers)
if not response.ok:
return None
# Get the login (username)
return response.json().get("login")
示例:Google 集成¶
- Google OAuth2 处理程序:
integrations/oauth/google.py
class GoogleOAuthHandler(BaseOAuthHandler):
"""
Based on the documentation at https://developers.google.com/identity/protocols/oauth2/web-server
""" # noqa
PROVIDER_NAME = ProviderName.GOOGLE
EMAIL_ENDPOINT = "https://www.googleapis.com/oauth2/v2/userinfo"
DEFAULT_SCOPES = [
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
"openid",
]
您可以看到 Google 定义了一个 DEFAULT_SCOPES
变量,这用于设置无论用户请求什么都需要的范围 (scopes)。
secrets = Secrets()
GOOGLE_OAUTH_IS_CONFIGURED = bool(
secrets.google_client_id and secrets.google_client_secret
)
您还可以看到 GOOGLE_OAUTH_IS_CONFIGURED
用于在 OAuth 未配置时禁用需要 OAuth 的区块。这位于每个区块的 __init__
方法中。这是因为 Google 区块没有 API 密钥回退,所以我们需要确保在允许用户使用这些区块之前,OAuth 已经配置好。
Webhook 触发的区块¶
Webhook 触发的区块允许您的代理实时响应外部事件。这些区块由来自第三方服务的传入 Webhook 触发,而不是手动执行。
创建和运行 Webhook 触发的区块涉及三个主要组件
- 区块本身,它指定了
- 供用户选择资源和要订阅事件的输入
- 一个
credentials
输入,包含管理 webhooks 所需的范围 (scopes) - 将 webhook 有效载荷转换为 webhook 区块输出的逻辑
- 相应 webhook 服务提供者的
WebhooksManager
,它处理- 向提供者注册/注销 webhooks
- 解析和验证传入的 webhook 有效载荷
- 相应服务提供者的凭据系统,其中可能包含一个
OAuthHandler
底层还有更多正在进行的工作,例如存储和检索 webhooks 及其与节点的链接,但要添加一个 webhook 触发的区块,您应该不需要修改系统的这些部分。
创建 Webhook 触发的区块¶
要创建一个 webhook 触发的区块,请在基本区块创建过程之外遵循以下附加步骤
-
在您的区块的
__init__
方法中定义webhook_config
。示例:
GitHubPullRequestTriggerBlock
backend/blocks/github/triggers.pywebhook_config=BlockWebhookConfig( provider=ProviderName.GITHUB, webhook_type=GithubWebhookType.REPO, resource_format="{repo}", event_filter_input="events", event_format="pull_request.{event}", ),
BlockWebhookConfig
定义backend/data/block.pyclass BlockManualWebhookConfig(BaseModel): """ Configuration model for webhook-triggered blocks on which the user has to manually set up the webhook at the provider. """ provider: ProviderName """The service provider that the webhook connects to""" webhook_type: str """ Identifier for the webhook type. E.g. GitHub has repo and organization level hooks. Only for use in the corresponding `WebhooksManager`. """ event_filter_input: str = "" """ Name of the block's event filter input. Leave empty if the corresponding webhook doesn't have distinct event/payload types. """ event_format: str = "{event}" """ Template string for the event(s) that a block instance subscribes to. Applied individually to each event selected in the event filter input. Example: `"pull_request.{event}"` -> `"pull_request.opened"` """ class BlockWebhookConfig(BlockManualWebhookConfig): """ Configuration model for webhook-triggered blocks for which the webhook can be automatically set up through the provider's API. """ resource_format: str """ Template string for the resource that a block instance subscribes to. Fields will be filled from the block's inputs (except `payload`). Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented) Only for use in the corresponding `WebhooksManager`. """
-
在您的区块的输入模式中定义事件过滤器输入。这允许用户选择哪些特定类型的事件将在其代理中触发该区块。
示例:
GitHubPullRequestTriggerBlock
backend/blocks/github/triggers.pyclass Input(GitHubTriggerBase.Input): class EventsFilter(BaseModel): """ https://githubdocs.cn/en/webhooks/webhook-events-and-payloads#pull_request """ opened: bool = False edited: bool = False closed: bool = False reopened: bool = False synchronize: bool = False assigned: bool = False unassigned: bool = False labeled: bool = False unlabeled: bool = False converted_to_draft: bool = False locked: bool = False unlocked: bool = False enqueued: bool = False dequeued: bool = False milestoned: bool = False demilestoned: bool = False ready_for_review: bool = False review_requested: bool = False review_request_removed: bool = False auto_merge_enabled: bool = False auto_merge_disabled: bool = False events: EventsFilter = SchemaField( title="Events", description="The events to subscribe to" )
- 输入字段的名称(在本例中为
events
)必须与webhook_config.event_filter_input
匹配。 - 事件过滤器本身必须是一个只有布尔字段的 Pydantic 模型。
- 输入字段的名称(在本例中为
-
在您的区块的输入模式中包含 payload 字段。
示例:
GitHubTriggerBase
backend/blocks/github/triggers.pypayload: dict = SchemaField(hidden=True, default_factory=dict)
-
在您的区块的输入模式中定义
credentials
输入。- 其范围 (scopes) 必须足以通过提供者的 API 管理用户的 webhooks
- 有关更多详细信息,请参阅带有身份验证的区块
-
在您的区块的
run
方法中处理 webhook 有效载荷并输出其相关部分。示例:
GitHubPullRequestTriggerBlock
def run(self, input_data: Input, **kwargs) -> BlockOutput: yield "payload", input_data.payload yield "sender", input_data.payload["sender"] yield "event", input_data.payload["action"] yield "number", input_data.payload["number"] yield "pull_request", input_data.payload["pull_request"]
请注意,如果凭据在区块运行时未使用(如示例中所示),可以省略
credentials
参数。
添加 Webhooks Manager¶
要为一个新的 webhook 提供者添加支持,您需要创建一个实现 BaseWebhooksManager
接口的 WebhooksManager
PROVIDER_NAME: ClassVar[ProviderName]
@abstractmethod
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: WT,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> tuple[str, dict]:
"""
Registers a new webhook with the provider.
Params:
credentials: The credentials with which to create the webhook
webhook_type: The provider-specific webhook type to create
resource: The resource to receive events for
events: The events to subscribe to
ingress_url: The ingress URL for webhook payloads
secret: Secret used to verify webhook payloads
Returns:
str: Webhook ID assigned by the provider
config: Provider-specific configuration for the webhook
"""
...
@classmethod
@abstractmethod
async def validate_payload(
cls, webhook: integrations.Webhook, request: Request
) -> tuple[dict, str]:
"""
Validates an incoming webhook request and returns its payload and type.
Params:
webhook: Object representing the configured webhook and its properties in our system.
request: Incoming FastAPI `Request`
Returns:
dict: The validated payload
str: The event type associated with the payload
"""
@abstractmethod
async def _deregister_webhook(
self, webhook: integrations.Webhook, credentials: Credentials
) -> None: ...
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
"""
Triggers a ping to the given webhook.
Raises:
NotImplementedError: if the provider doesn't support pinging
"""
并将您的 WebhooksManager
类引用添加到 load_webhook_managers
中
def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]:
if _WEBHOOK_MANAGERS:
return _WEBHOOK_MANAGERS
from .compass import CompassWebhookManager
from .generic import GenericWebhooksManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
_WEBHOOK_MANAGERS.update(
{
handler.PROVIDER_NAME: handler
for handler in [
CompassWebhookManager,
GithubWebhooksManager,
Slant3DWebhooksManager,
GenericWebhooksManager,
]
}
)
return _WEBHOOK_MANAGERS
示例:GitHub Webhook 集成¶
GitHub Webhook 触发器:blocks/github/triggers.py
class GitHubTriggerBase:
class Input(BlockSchema):
credentials: GithubCredentialsInput = GithubCredentialsField("repo")
repo: str = SchemaField(
description=(
"Repository to subscribe to.\n\n"
"**Note:** Make sure your GitHub credentials have permissions "
"to create webhooks on this repo."
),
placeholder="{owner}/{repo}",
)
payload: dict = SchemaField(hidden=True, default_factory=dict)
class Output(BlockSchema):
payload: dict = SchemaField(
description="The complete webhook payload that was received from GitHub. "
"Includes information about the affected resource (e.g. pull request), "
"the event, and the user who triggered the event."
)
triggered_by_user: dict = SchemaField(
description="Object representing the GitHub user who triggered the event"
)
error: str = SchemaField(
description="Error message if the payload could not be processed"
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
yield "payload", input_data.payload
yield "triggered_by_user", input_data.payload["sender"]
class GithubPullRequestTriggerBlock(GitHubTriggerBase, Block):
EXAMPLE_PAYLOAD_FILE = (
Path(__file__).parent / "example_payloads" / "pull_request.synchronize.json"
)
class Input(GitHubTriggerBase.Input):
class EventsFilter(BaseModel):
"""
https://githubdocs.cn/en/webhooks/webhook-events-and-payloads#pull_request
"""
opened: bool = False
edited: bool = False
closed: bool = False
reopened: bool = False
synchronize: bool = False
assigned: bool = False
unassigned: bool = False
labeled: bool = False
unlabeled: bool = False
converted_to_draft: bool = False
locked: bool = False
unlocked: bool = False
enqueued: bool = False
dequeued: bool = False
milestoned: bool = False
demilestoned: bool = False
ready_for_review: bool = False
review_requested: bool = False
review_request_removed: bool = False
auto_merge_enabled: bool = False
auto_merge_disabled: bool = False
events: EventsFilter = SchemaField(
title="Events", description="The events to subscribe to"
)
class Output(GitHubTriggerBase.Output):
event: str = SchemaField(
description="The PR event that triggered the webhook (e.g. 'opened')"
)
number: int = SchemaField(description="The number of the affected pull request")
pull_request: dict = SchemaField(
description="Object representing the affected pull request"
)
pull_request_url: str = SchemaField(
description="The URL of the affected pull request"
)
def __init__(self):
from backend.integrations.webhooks.github import GithubWebhookType
example_payload = json.loads(
self.EXAMPLE_PAYLOAD_FILE.read_text(encoding="utf-8")
)
super().__init__(
id="6c60ec01-8128-419e-988f-96a063ee2fea",
description="This block triggers on pull request events and outputs the event type and payload.",
categories={BlockCategory.DEVELOPER_TOOLS, BlockCategory.INPUT},
input_schema=GithubPullRequestTriggerBlock.Input,
output_schema=GithubPullRequestTriggerBlock.Output,
webhook_config=BlockWebhookConfig(
provider=ProviderName.GITHUB,
webhook_type=GithubWebhookType.REPO,
resource_format="{repo}",
event_filter_input="events",
event_format="pull_request.{event}",
),
test_input={
"repo": "Significant-Gravitas/AutoGPT",
"events": {"opened": True, "synchronize": True},
"credentials": TEST_CREDENTIALS_INPUT,
"payload": example_payload,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("payload", example_payload),
("triggered_by_user", example_payload["sender"]),
("event", example_payload["action"]),
("number", example_payload["number"]),
("pull_request", example_payload["pull_request"]),
("pull_request_url", example_payload["pull_request"]["html_url"]),
],
)
def run(self, input_data: Input, **kwargs) -> BlockOutput: # type: ignore
yield from super().run(input_data, **kwargs)
yield "event", input_data.payload["action"]
yield "number", input_data.payload["number"]
yield "pull_request", input_data.payload["pull_request"]
yield "pull_request_url", input_data.payload["pull_request"]["html_url"]
GitHub Webhooks Manager:integrations/webhooks/github.py
class GithubWebhookType(StrEnum):
REPO = "repo"
class GithubWebhooksManager(BaseWebhooksManager):
PROVIDER_NAME = ProviderName.GITHUB
WebhookType = GithubWebhookType
GITHUB_API_URL = "https://api.github.com"
GITHUB_API_DEFAULT_HEADERS = {"Accept": "application/vnd.github.v3+json"}
@classmethod
async def validate_payload(
cls, webhook: integrations.Webhook, request: Request
) -> tuple[dict, str]:
if not (event_type := request.headers.get("X-GitHub-Event")):
raise HTTPException(
status_code=400, detail="X-GitHub-Event header is missing!"
)
if not (signature_header := request.headers.get("X-Hub-Signature-256")):
raise HTTPException(
status_code=403, detail="X-Hub-Signature-256 header is missing!"
)
payload_body = await request.body()
hash_object = hmac.new(
webhook.secret.encode("utf-8"), msg=payload_body, digestmod=hashlib.sha256
)
expected_signature = "sha256=" + hash_object.hexdigest()
if not hmac.compare_digest(expected_signature, signature_header):
raise HTTPException(
status_code=403, detail="Request signatures didn't match!"
)
payload = await request.json()
if action := payload.get("action"):
event_type += f".{action}"
return payload, event_type
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
if not credentials:
raise ValueError("Credentials are required but were not passed")
headers = {
**self.GITHUB_API_DEFAULT_HEADERS,
"Authorization": credentials.auth_header(),
}
repo, github_hook_id = webhook.resource, webhook.provider_webhook_id
ping_url = f"{self.GITHUB_API_URL}/repos/{repo}/hooks/{github_hook_id}/pings"
response = requests.post(ping_url, headers=headers)
if response.status_code != 204:
error_msg = extract_github_error_msg(response)
raise ValueError(f"Failed to ping GitHub webhook: {error_msg}")
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: GithubWebhookType,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> tuple[str, dict]:
if webhook_type == self.WebhookType.REPO and resource.count("/") > 1:
raise ValueError("Invalid repo format: expected 'owner/repo'")
# Extract main event, e.g. `pull_request.opened` -> `pull_request`
github_events = list({event.split(".")[0] for event in events})
headers = {
**self.GITHUB_API_DEFAULT_HEADERS,
"Authorization": credentials.auth_header(),
}
webhook_data = {
"name": "web",
"active": True,
"events": github_events,
"config": {
"url": ingress_url,
"content_type": "json",
"insecure_ssl": "0",
"secret": secret,
},
}
response = requests.post(
f"{self.GITHUB_API_URL}/repos/{resource}/hooks",
headers=headers,
json=webhook_data,
)
if response.status_code != 201:
error_msg = extract_github_error_msg(response)
if "not found" in error_msg.lower():
error_msg = (
f"{error_msg} "
"(Make sure the GitHub account or API key has 'repo' or "
f"webhook create permissions to '{resource}')"
)
raise ValueError(f"Failed to create GitHub webhook: {error_msg}")
webhook_id = response.json()["id"]
config = response.json()["config"]
return str(webhook_id), config
async def _deregister_webhook(
self, webhook: integrations.Webhook, credentials: Credentials
) -> None:
webhook_type = self.WebhookType(webhook.webhook_type)
if webhook.credentials_id != credentials.id:
raise ValueError(
f"Webhook #{webhook.id} does not belong to credentials {credentials.id}"
)
headers = {
**self.GITHUB_API_DEFAULT_HEADERS,
"Authorization": credentials.auth_header(),
}
if webhook_type == self.WebhookType.REPO:
repo = webhook.resource
delete_url = f"{self.GITHUB_API_URL}/repos/{repo}/hooks/{webhook.provider_webhook_id}" # noqa
else:
raise NotImplementedError(
f"Unsupported webhook type '{webhook.webhook_type}'"
)
response = requests.delete(delete_url, headers=headers)
if response.status_code not in [204, 404]:
# 204 means successful deletion, 404 means the webhook was already deleted
error_msg = extract_github_error_msg(response)
raise ValueError(f"Failed to delete GitHub webhook: {error_msg}")
# If we reach here, the webhook was successfully deleted or didn't exist
要记住的要点¶
- 唯一 ID:在 init 方法中为您的区块指定一个唯一的 ID。
- 输入和输出模式:定义清晰的输入和输出模式。
- 错误处理:在
run
方法中实现错误处理。 - 输出结果:在
run
方法中使用yield
输出结果。 - 测试:在 init 方法中提供测试输入和输出,以便进行自动测试。
理解测试过程¶
区块的测试由 test_block.py
处理,它执行以下操作
- 它使用提供的
test_input
调用该区块。如果区块有credentials
字段,也会传入test_credentials
。 - 如果提供了
test_mock
,它会暂时用模拟函数替换指定的方法。 - 然后它断言输出与
test_output
匹配。
对于 WikipediaSummaryBlock
- 测试将以主题“人工智能”调用该区块。
- 它将使用模拟函数,而不是进行真实的 API 调用,该模拟函数返回
{"extract": "summary content"}
。 - 然后它会检查输出键是否为“summary”及其值是否为字符串。
这种方法允许我们全面测试区块的逻辑,而无需依赖外部服务,同时也适应非确定性输出。
SSRF 防护安全最佳实践¶
创建处理外部 URL 输入或进行网络请求的区块时,使用平台内置的 SSRF 防护机制至关重要。backend.util.request
模块提供了一个安全的 Requests
封装器类,应将其用于所有 HTTP 请求。
使用安全请求封装器¶
from backend.util.request import requests
class MyNetworkBlock(Block):
def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
# The requests wrapper automatically validates URLs and blocks dangerous requests
response = requests.get(input_data.url)
yield "result", response.text
except ValueError as e:
# URL validation failed
raise RuntimeError(f"Invalid URL provided: {e}")
except requests.exceptions.RequestException as e:
# Request failed
raise RuntimeError(f"Request failed: {e}")
Requests
封装器提供以下安全特性
-
URL 验证:
- 阻止对私有 IP 范围 (RFC 1918) 的请求
- 验证 URL 格式和协议
- 解析 DNS 并检查 IP 地址
- 支持可信源白名单
-
安全默认设置:
- 默认禁用重定向
- 对非 200 状态码抛出异常
- 支持自定义头部和验证器
-
受保护的 IP 范围:该封装器拒绝访问这些网络的请求
backend/util/request.py# IPv4 Ranges ipaddress.ip_network("0.0.0.0/8"), # "This" Network ipaddress.ip_network("10.0.0.0/8"), # Private-Use ipaddress.ip_network("127.0.0.0/8"), # Loopback ipaddress.ip_network("169.254.0.0/16"), # Link Local ipaddress.ip_network("172.16.0.0/12"), # Private-Use ipaddress.ip_network("192.168.0.0/16"), # Private-Use ipaddress.ip_network("224.0.0.0/4"), # Multicast ipaddress.ip_network("240.0.0.0/4"), # Reserved for Future Use # IPv6 Ranges ipaddress.ip_network("::1/128"), # Loopback ipaddress.ip_network("fc00::/7"), # Unique local addresses (ULA) ipaddress.ip_network("fe80::/10"), # Link-local ipaddress.ip_network("ff00::/8"), # Multicast
自定义请求配置¶
如果您需要自定义请求行为
from backend.util.request import Requests
# Create a custom requests instance with specific trusted origins
custom_requests = Requests(
trusted_origins=["api.trusted-service.com"],
raise_for_status=True,
extra_headers={"User-Agent": "MyBlock/1.0"}
)
有效区块测试技巧¶
-
提供真实的 test_input:确保您的测试输入涵盖典型用例。
-
定义适当的 test_output:
- 对于确定性输出,使用特定的期望值。
- 对于非确定性输出或仅类型重要的输出,使用 Python 类型(例如,
str
、int
、dict
)。 - 您可以混合使用特定值和类型,例如,
("key1", str), ("key2", 42)
。
-
对网络调用使用 test_mock:这可以防止测试因网络问题或 API 更改而失败。
-
考虑对没有外部依赖的区块省略 test_mock:如果您的区块不进行网络调用或使用外部资源,您可能不需要模拟。
-
考虑边缘情况:在您的
run
方法中包含针对潜在错误条件的测试。 -
在更改区块行为时更新测试:如果您修改区块,请确保相应地更新测试。
通过遵循这些步骤,您可以创建新的区块来扩展 AutoGPT 代理服务器的功能。
我们希望看到的区块¶
以下是我们希望在 AutoGPT 代理服务器中实现的区块列表。如果您有兴趣贡献,请随意选择其中一个区块或选择您自己的区块。
如果您想实现其中一个区块,请提交拉取请求,我们将开始审查过程。
消费者服务/平台¶
- Google Sheets -
读取/追加 - 电子邮件 - 使用
Gmail、Outlook、Yahoo、Proton 等读取/发送 - 日历 - 使用 Google Calendar、Outlook Calendar 等读取/写入
- Home Assistant - 调用服务、获取状态
- 达美乐 - 订购披萨、追踪订单
- 优步 - 预订乘车、追踪乘车
- Notion - 创建/读取页面、创建/追加/读取数据库
- Google Drive - 读取/写入/覆盖文件/文件夹
社交媒体¶
- Twitter - 发布、回复、获取回复、获取评论、获取关注者、获取正在关注的用户、获取推文、获取提及
- Instagram - 发布、回复、获取评论、获取关注者、获取正在关注的用户、获取帖子、获取提及、获取热门帖子
- TikTok - 发布、回复、获取评论、获取关注者、获取正在关注的用户、获取视频、获取提及、获取热门视频
- LinkedIn - 发布、回复、获取评论、获取关注者、获取正在关注的用户、获取帖子、获取提及、获取热门帖子
- YouTube - 转录视频/短视频、发布视频/短视频、读取/回复/回应评论、更新缩略图、更新描述、更新标签、更新标题、获取观看次数、获取点赞、获取点踩、获取订阅者、获取评论、获取分享、获取观看时长、获取收入、获取热门视频、获取置顶视频、获取置顶频道
- Reddit - 发布、回复、获取评论、获取关注者、获取正在关注的用户、获取帖子、获取提及、获取热门帖子
- Treatwell(及相关平台)- 预订、取消、评论、获取推荐
- Substack - 读取/订阅/取消订阅、发布/回复、获取推荐
- Discord - 读取/发布/回复、审核操作
- GoodReads - 读取/发布/回复、获取推荐
电子商务¶
- Airbnb - 预订、取消、评论、获取推荐
- 亚马逊 - 订购、追踪订单、退货、评论、获取推荐
- eBay - 订购、追踪订单、退货、评论、获取推荐
- Upwork - 发布招聘、雇佣自由职业者、评论自由职业者、解雇自由职业者
商业工具¶
- 外部代理 - 调用类似于 AutoGPT 的其他代理
- Trello - 创建/读取/更新/删除卡片、列表、看板
- Jira - 创建/读取/更新/删除问题、项目、看板
- Linear - 创建/读取/更新/删除问题、项目、看板
- Excel - 读取/写入/更新/删除行、列、工作表
- Slack - 读取/发布/回复消息、创建频道、邀请用户
- ERPNext - 创建/读取/更新/删除发票、订单、客户、产品
- Salesforce - 创建/读取/更新/删除潜在客户、商机、账户
- HubSpot - 创建/读取/更新/删除联系人、交易、公司
- Zendesk - 创建/读取/更新/删除工单、用户、组织
- Odoo - 创建/读取/更新/删除销售订单、发票、客户
- Shopify - 创建/读取/更新/删除产品、订单、客户
- WooCommerce - 创建/读取/更新/删除产品、订单、客户
- Squarespace - 创建/读取/更新/删除页面、产品、订单
我们希望看到的代理模板¶
数据/信息¶
- 通过 Apple News 或其他大型媒体(BBC、TechCrunch、hackernews 等)总结今日、本周、本月头条新闻
- 创建、阅读和总结 substack 通讯或任何通讯(博客写手 vs 博客读者)
- 获取/阅读/总结今日、本周、本月最热门的 Twitter、Instagram、TikTok(一般社交媒体账户)
- 获取/阅读任何提及 AI 代理的 LinkedIn 帖子或个人资料
- 阅读/总结 Discord(可能无法做到,因为您需要访问权限)
- 从 GoodReads 或亚马逊图书等获取/阅读给定月份、年份等阅读量最高的图书
- 获取所有流媒体服务中特定节目的日期
- 建议/推荐/获取所有流媒体平台中给定月份、年份等观看量最高的节目
- xlsx 数据集的数据分析
- 通过 Excel 或 Google Sheets 收集数据 > 随机采样数据(采样区块获取顶部 X 个、底部 X 个、随机等)> 将其传递给 LLM 区块生成用于分析完整数据的脚本 > Python 区块运行脚本 > 出错时通过 LLM Fix 区块循环返回 > 创建图表/可视化(可能在代码块中?)> 将图像显示为输出(这可能需要前端更改才能显示)
- TikTok 视频搜索和下载
营销¶
- 作品集网站设计和增强