As etapas a seguir mostram como criar um modelo personalizado para instanciar agentes que podem ser implantados na Agent Platform:
- Exemplo básico
- Opcional: respostas de stream
- Opcional: registrar métodos personalizados
- Opcional: Fornecer anotações de tipo
- Opcional: enviar traces para o Cloud Trace
- Opcional: trabalhar com variáveis de ambiente
- Opcional: integrar ao Secret Manager
- Opcional: processar credenciais
- Opcional: processar erros
Exemplo básico
Para dar um exemplo básico, a classe Python a seguir é um modelo para instanciar agentes que podem ser implantados na Agent Platform (é possível atribuir um valor como MyAgent à variável CLASS_NAME):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Considerações sobre implantação
Ao escrever a classe Python, os três métodos a seguir são importantes:
__init__():- Use esse método apenas para parâmetros de configuração do agente. Por exemplo, é possível usar esse método para coletar os parâmetros do modelo e os atributos de segurança como argumentos de entrada dos usuários. Também é possível usar esse método para coletar parâmetros como o ID do projeto, a região, as credenciais de aplicativo e as chaves de API.
- O construtor retorna um objeto que precisa ser "serializável" para que possa ser implantado no Agent Runtime. Portanto, inicialize clientes de serviço e estabeleça conexões com bancos de dados no método
.set_upno lugar do método__init__. - Esse método é opcional. Se não for especificado, o Agent Runtime usará o construtor padrão do Python para a classe.
set_up():- Use esse método para definir a lógica de inicialização do agente. Por exemplo, esse método é usado para estabelecer conexões com bancos de dados ou serviços dependentes, importar pacotes dependentes ou pré-computar dados usados para atender a consultas.
- Esse método é opcional. Se não for especificado, o Agent Runtime vai presumir que o agente não precisa chamar um método
.set_upantes de atender às consultas do usuário.
query()/stream_query():- Use
query()para retornar a resposta completa como um único resultado. - Use
stream_query()para retornar a resposta em partes à medida que ela fica disponível, permitindo uma experiência de streaming. O métodostream_queryprecisa retornar um objeto iterável (por exemplo, um gerador) para ativar o streaming. - É possível implementar os dois métodos se você quiser oferecer suporte a interações de resposta única e de streaming com o agente.
- Forneça a esse método uma docstring clara que defina o que ele faz, documente os atributos e forneça anotações de tipo para as entradas.
Evite argumentos variáveis nos métodos
queryestream_query.
- Use
Instanciar o agente localmente
É possível criar uma instância local do agente usando o código a seguir:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testar o método query
Para testar o agente, envie consultas para a instância local:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
A resposta é um dicionário semelhante a este:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Consultar de forma assíncrona
Para responder a consultas de forma assíncrona, defina um método (como async_query)
que retorne uma corrotina Python. Como exemplo, o modelo a seguir estende o exemplo básico para responder de forma assíncrona e pode ser implantado na Agent Platform:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testar o método async_query
É possível testar o agente localmente chamando o método async_query. Veja um exemplo:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
A resposta é um dicionário semelhante a este:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respostas de streaming
Para transmitir respostas a consultas, defina um método chamado stream_query que gere respostas. Como exemplo, o modelo a seguir estende o exemplo básico para transmitir respostas e pode ser implantado na Agent Platform:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Confira alguns pontos importantes ao usar a API de streaming:
- Tempo limite máximo: o tempo limite máximo para respostas de streaming é de 10 minutos. Se o agente exigir tempos de processamento mais longos, divida a tarefa em partes menores.
- Modelos e cadeias de streaming: a interface Runnable do LangChain oferece suporte a streaming, assim, é possível transmitir respostas não apenas de agentes, mas também de modelos e cadeias.
- Compatibilidade com o LangChain: no momento, não há suporte para métodos assíncronos, como o método
do LangChain
astream_event. - Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor pode processá-los), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar estouros de buffer e garantir uma experiência de streaming tranquila.
Testar o método stream_query
É possível testar a consulta de streaming localmente chamando o método stream_query e iterando os resultados. Veja um exemplo:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Esse código imprime cada parte da resposta à medida que ela é gerada. A saída pode ser semelhante a esta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Neste exemplo, cada parte contém informações diferentes sobre a resposta, como as ações realizadas pelo agente, as mensagens trocadas e a saída final.
Respostas de streaming de forma assíncrona
Para transmitir respostas de forma assíncrona, defina um método (como
async_stream_query) que retorne um gerador
assíncrono. Como exemplo, o modelo a seguir estende o exemplo básico para transmitir respostas de forma assíncrona e pode ser implantado na Agent Platform:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testar o método async_stream_query
Semelhante ao código para testar consultas de streaming, é possível
testar o agente localmente chamando o método async_stream_query e iterando
os resultados. Veja um exemplo:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Esse código imprime cada parte da resposta à medida que ela é gerada. A saída pode ser semelhante a esta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Registrar métodos personalizados
Por padrão, os métodos query e stream_query são registrados como operações
no agente implantado. É possível substituir o comportamento padrão e definir o conjunto de operações a serem registradas usando o método register_operations.
As operações podem ser registradas como modos de execução padrão (representados por uma string vazia
"") ou de streaming ("stream") de execução.
Para registrar várias operações, defina um método chamado register_operations que liste os métodos a serem disponibilizados aos usuários quando o agente for implantado. No exemplo de código a seguir, o register_operations
método resulta no agente implantado que registra query e get_state como
operações executadas de forma síncrona, e stream_query e get_state_history como
operações que transmitem as respostas:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
É possível testar os métodos personalizados chamando-os diretamente na instância local
do agente, de maneira semelhante a como você testaria os query e
stream_query métodos.
Fornecer anotações de tipo
É possível usar anotações de tipo para especificar os tipos de entrada e saída esperados dos métodos do agente. Quando o agente é implantado, apenas os tipos serializáveis em JSON são aceitos na entrada e saída das operações com suporte do agente. Os esquemas das entradas e saídas podem ser anotados usando modelos TypedDict ou Pydantic.
No exemplo a seguir, anotamos a entrada como um TypedDict e convertemos a saída bruta de .get_state (que é um NamedTuple) em um dicionário serializável usando o método ._asdict():
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Enviar traces para o Cloud Trace
Para enviar traces para o Cloud Trace com bibliotecas de instrumentação que oferecem suporte ao OpenTelemetry, importe e inicialize-as no método .set_up. Para
frameworks de agentes comuns, é possível usar a integração do OpenTelemetry Google Cloud em combinação com um framework de instrumentação
, como OpenInference ou
OpenLLMetry.
Como exemplo, o modelo a seguir é uma modificação do exemplo básico para exportar traces para o Cloud Trace:
OpenInference
Primeiro, instale o pacote necessário
usando pip executando
pip install openinference-instrumentation-langchain==0.1.34Em seguida, importe e inicialize o instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Primeiro, instale o pacote necessário
usando pip executando
pip install opentelemetry-instrumentation-langchain==0.38.10Em seguida, importe e inicialize o instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Trabalhar com variáveis de ambiente
Para definir variáveis de ambiente, verifique se elas estão disponíveis em
os.environ durante o desenvolvimento e siga as instruções em Definir
variáveis de ambiente
ao implantar o agente.
Integrar ao Secret Manager
Para integrar ao Secret Manager:
Instale a biblioteca de cliente executando
pip install google-cloud-secret-managerSiga as instruções em Conceder papéis para um agente implantado para conceder à conta de serviço o papel "Acessador de secrets do Secret Manager" (
roles/secretmanager.secretAccessor) pelo Google Cloud console.Importe e inicialize o cliente no método
.set_upe receba o secret correspondente quando necessário. Como exemplo, o modelo a seguir é uma modificação do exemplo básico para usar uma chave de API paraChatAnthropicque foi armazenada no Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Processar credenciais
Quando o agente é implantado, ele pode precisar processar diferentes tipos de credenciais:
- Credenciais padrão do aplicativo (ADC, na sigla em inglês) que geralmente surgem de contas de serviço,
- OAuth que geralmente surgem de contas de usuário e
- Provedores de identidade para credenciais de contas externas (federação de identidade da carga de trabalho).
Application Default Credentials
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Elas podem ser usadas no código da seguinte maneira:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Para mais detalhes, acesse Como o Application Default Credentials funciona.
OAuth
As credenciais do usuário são normalmente recebidas usando o OAuth 2.0.
Se você tiver um token de acesso (como de
oauthlib), crie uma instância
google.oauth2.credentials.Credentials. Além disso, se você receber um token de atualização, também poderá especificar o token de atualização e o URI do token para permitir que as credenciais sejam atualizadas automaticamente:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Aqui, o TOKEN_URI, CLIENT_ID e
CLIENT_SECRET são baseados em Criar uma credencial de cliente OAuth.
Se você não tiver um token de acesso, use google_auth_oauthlib.flow para
executar o fluxo de concessão de autorização do OAuth 2.0 para
receber uma instância google.oauth2.credentials.Credentials correspondente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Para mais detalhes, acesse a documentação do google_auth_oauthlib.flow
módulo.
Provedor de identidade
Se você quiser autenticar usuários por e-mail/senha, número de telefone, provedores sociais, como Google, Facebook ou GitHub, ou um mecanismo de autenticação personalizado, use o Identity Platform ou o Firebase Authentication, ou qualquer provedor de identidade que ofereça suporte ao OpenID Connect (OIDC).
Para mais detalhes, acesse Como acessar recursos de um provedor de identidade OIDC.
Tratamento de erros
Para garantir que os erros da API sejam retornados em um formato JSON estruturado, recomendamos implementar o tratamento de erros no código do agente usando um bloco try...except, que pode ser abstraído em um decorador.
Embora a Agent Platform possa processar vários códigos de status internamente, o Python não tem uma maneira padronizada de representar erros com códigos de status HTTP associados em todos os tipos de exceção. Tentar mapear todas as exceções possíveis do Python para status HTTP no serviço subjacente seria complexo e difícil de manter.
Uma abordagem mais escalonável é capturar explicitamente exceções relevantes nos métodos do agente ou usando um decorador reutilizável, como error_wrapper. Em seguida, é possível
associar códigos de status apropriados (por exemplo, adicionando code e
error atributos a exceções personalizadas ou processando exceções padrão
especificamente) e formatar o erro como um dicionário JSON para o valor de retorno.
Isso exige uma mudança mínima de código nos próprios métodos do agente, geralmente apenas a adição do decorador.
Confira a seguir um exemplo de como implementar o tratamento de erros no agente:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
O código anterior resulta na seguinte saída:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
A seguir
Implantar agentes
Aprenda as cinco maneiras de implantar um agente no Agent Platform Runtime com base nas suas necessidades de desenvolvimento.
Avaliar seus agentes
Crie e implante um agente básico e use o serviço de avaliação de IA generativa para avaliar o agente.
Resolver problemas de criação de agentes
Aprenda a resolver erros comuns ao criar agentes personalizados.