この記事では以下の内容を学びます:
- – 会話内容をデータベースに永続化する本番環境対応AIエージェントの構築方法
- インテリジェントなデータ抽出とエンティティ追跡の実装方法
- 自動回復機能を備えた堅牢なエラー処理の構築方法
- Bright DataのリアルタイムWebデータでエージェントを強化する方法
それでは始めましょう!
ステートレスAI会話の課題
現在のAIエージェントは通常、ステートレスシステムとして動作します。各会話を独立したイベントとして扱うため、履歴コンテキストが欠如し、ユーザーは情報を繰り返し伝える必要があります。その結果、業務効率の低下やユーザーの不満を招きます。さらに、企業は長期データを活用したパーソナライゼーションやサービス改善の機会を逃しています。
データ永続型AIは、すべての対話を構造化されたデータベースに記録することでこの問題を解決します。継続的な記録を保持することで、これらのシステムは過去の文脈を記憶し、特定のエンティティを時間軸で追跡し、過去の対話パターンを活用して一貫性とパーソナライズされたユーザー体験を提供できます。
構築対象:データベース連携型AIエージェントシステム
LangChainとGPT-4を用いてメッセージを処理する、本番環境対応のAIエージェントを構築します。各会話をPostgreSQLに保存し、エンティティとインサイトをリアルタイムで抽出します。セッションを跨いだ完全な会話履歴を保持し、自動再試行システムでエラーを管理します。ログ記録によるモニタリング機能を提供します。
本システムが対応する事項:
- 適切なリレーションシップとインデックスを備えたデータベーススキーマ
- カスタムデータベースツールを備えたLangChainエージェント
- 自動会話永続化とエンティティ抽出
- データ収集のためのバックグラウンド処理パイプライン
- トランザクション管理を伴うエラー処理
- 履歴データを取得するためのクエリインターフェース
- ウェブインテリジェンスのためのBright DataとのRAG統合
前提条件
開発環境のセットアップ:
- Python 3.10 以上。最新の非同期機能と型ヒントに必須
- PostgreSQL 14+またはSQLite 3.35+。データ永続化用データベース
- OpenAI APIキー。GPT-4アクセス用。OpenAI Platformから取得

- LangChain。AIエージェント構築用フレームワーク。ドキュメント参照
- Python仮想環境。依存関係を分離します。
venvドキュメントを参照
環境設定
プロジェクトディレクトリを作成し、依存関係をインストールします:
mkdir database-agent
cd database-agent
python -m venv venv
# macOS/Linux: source venv/bin/activate
# Windows: venv\Scripts\activate
pip install langchain langchain-openai sqlalchemy psycopg2-binary python-dotenv pydantic
agent.pyという新しいファイルを作成し、以下のインポートを追加:
import os
import json
import logging
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from queue import Queue
from threading import Thread
# SQLAlchemy インポート
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Float, JSON, ForeignKey, text
from sqlalchemy.orm import sessionmaker, relationship, Session, declarative_base
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError
# LangChain インポート
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.schema import HumanMessage, AIMessage, SystemMessage
# RAG インポート
from langchain_community.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
import requests
# 環境設定
from dotenv import load_dotenv
load_dotenv()
# ログ設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
認証情報を記載した.envファイルを作成:
# データベース設定
DATABASE_URL="postgresql://username:password@localhost:5432/agent_db"
# SQLiteの場合: DATABASE_URL="sqlite:///./agent_data.db"
# APIキー
OPENAI_API_KEY="your-openai-api-key"
# オプション: Bright Data (ステップ7用)
BRIGHT_DATA_API_KEY="your-bright-data-api-key"
# アプリケーション設定
AGENT_MODEL="gpt-4-turbo-preview"
CONNECTION_POOL_SIZE=5
MAX_RETRIES=3
必要なもの:
- データベースURL: PostgreSQLまたはSQLiteの接続文字列
- OpenAI APIキー: GPT-4によるエージェント知能化用
- Bright Data APIキー: オプション(ステップ7のリアルタイムWebデータ取得用)

データベース接続型AIエージェントの構築
ステップ1: データベーススキーマの設計
ユーザー、会話、メッセージ、抽出エンティティ用のテーブルを設計します。スキーマは外部キーとリレーションシップを使用してデータ整合性を維持します。
Base = declarative_base()
class User(Base):
"""ユーザープロファイルテーブル - ユーザー情報と設定を保存"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
user_id = Column(String(255), unique=True, nullable=False, index=True)
name = Column(String(255))
email = Column(String(255))
preferences = Column(JSON, default={})
created_at = Column(DateTime, default=datetime.utcnow)
last_active = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# リレーションシップ
conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")
def __repr__(self):
return f"<User(user_id='{self.user_id}', name='{self.name}')>"
class Conversation(Base):
"""会話セッションテーブル - 個々の会話セッションを追跡します。"""
__tablename__ = 'conversations'
id = Column(Integer, primary_key=True)
conversation_id = Column(String(255), unique=True, nullable=False, index=True)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
title = Column(String(500))
summary = Column(Text)
status = Column(String(50), default='active') # active, archived, deleted
meta_data = Column(JSON, default={})
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# リレーションシップ
user = relationship("User", back_populates="conversations")
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
entities = relationship("Entity", back_populates="conversation", cascade="all, delete-orphan")
def __repr__(self):
return f"<Conversation(id='{self.conversation_id}', user='{self.user_id}')>"
class Message(Base):
"""個々のメッセージテーブル - 会話内の各メッセージを格納"""
__tablename__ = 'messages'
id = Column(Integer, primary_key=True)
conversation_id = Column(Integer, ForeignKey('conversations.id'), nullable=False, index=True)
role = Column(String(50), nullable=False) # user, assistant, system
content = Column(Text, nullable=False)
tokens = Column(Integer)
model = Column(String(100))
meta_data = Column(JSON, default={})
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
conversation = relationship("Conversation", back_populates="messages")
def __repr__(self):
return f"<Message(role='{self.role}', conversation='{self.conversation_id}')>"
class Entity(Base):
"""会話から抽出された固有表現を格納するテーブル"""
__tablename__ = 'entities'
id = Column(Integer, primary_key=True)
conversation_id = Column(Integer, ForeignKey('conversations.id'), nullable=False, index=True)
entity_type = Column(String(100), nullable=False, index=True) # 人物、組織、場所など
entity_value = Column(String(500), nullable=False)
context = Column(Text)
confidence = Column(Float, default=0.0)
meta_data = Column(JSON, default={})
extracted_at = Column(DateTime, default=datetime.utcnow)
# リレーションシップ
conversation = relationship("Conversation", back_populates="entities")
def __repr__(self):
return f"<Entity(type='{self.entity_type}', value='{self.entity_value}')>"
class AgentLog(Base):
"""エージェント操作ログテーブル - 監視用の操作ログを保存します。"""
__tablename__ = 'agent_logs'
id = Column(Integer, primary_key=True)
conversation_id = Column(String(255), index=True)
level = Column(String(50), nullable=False) # INFO, WARNING, ERROR
operation = Column(String(255), nullable=False)
message = Column(Text, nullable=False)
error_details = Column(JSON)
execution_time = Column(Float) # 秒単位
created_at = Column(DateTime, default=datetime.utcnow)
def __repr__(self):
return f"<AgentLog(level='{self.level}', operation='{self.operation}')>"
スキーマは5つのコアテーブルを定義する。Userは柔軟なデータのためのJSON設定付きプロファイルを保存する。Conversationはステータス追跡付きセッションを追跡する。Messageはユーザーとアシスタントのメッセージを区別する役割指標付き個々のやり取りを保持する。Entityは信頼度スコア付き抽出情報を捕捉する。AgentLogは監視のための操作追跡を提供する。外部キーは参照整合性を維持する。頻繁にクエリされるフィールドのインデックスはパフォーマンスを最適化する。cascade="all, delete-orphan"設定は親レコード削除時に関連レコードをクリーンアップする。
ステップ2: データベース接続層の設定
SQLAlchemyでデータベース接続マネージャーを設定します。マネージャーは接続プーリング、ヘルスチェック、信頼性向上のための自動再試行ロジックを処理します。
class DatabaseManager:
"""
データベース接続と操作を管理します。
機能:
- 効率的なリソース使用のための接続プール
- データベース接続性を検証するヘルスチェック
- 自動テーブル作成
"""
def __init__(self, database_url: str, pool_size: int = 5, max_retries: int = 3):
"""
データベースマネージャーを初期化します。
引数:
database_url: データベース接続文字列 (例: 'sqlite:///./agent_data.db')
pool_size: プール内で維持する接続数
max_retries: 失敗した操作に対する最大再試行回数
"""
self.database_url = database_url
self.max_retries = max_retries
# 接続プール機能付きエンジンを作成
self.engine = create_engine(
database_url,
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=10,
pool_pre_ping=True, # 使用前に接続を検証
echo=False # SQLデバッグ時はTrueに設定
)
# セッションファクトリを作成
self.SessionLocal = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False
)
logger.info(f"✓ {pool_size}接続プールでデータベースエンジンを作成")
def initialize_database(self):
"""データベース内の全テーブルを作成"""
try:
Base.metadata.create_all(bind=self.engine)
logger.info("✓ データベーステーブル作成成功")
except Exception as e:
logger.error(f"❌ データベーステーブル作成失敗: {e}")
raise
def get_session(self) -> Session:
"""操作実行用の新規データベースセッションを取得します。"""
return self.SessionLocal()
def health_check(self) -> bool:
"""
データベース接続性を確認します。
戻り値:
bool: データベースが正常な場合True、それ以外の場合False
"""
try:
with self.engine.connect() as conn:
conn.execute(text("SELECT 1"))
logger.info("✓ データベース健全性チェックに合格")
return True
except Exception as e:
logger.error(f"❌ データベース健全性チェックに失敗: {e}")
return False
DatabaseManagerはSQLAlchemyの接続プールを使用して接続を確立します。pool_size=5を設定することで、効率化のため5つの永続接続を維持します。pool_pre_pingオプションは使用前に接続を検証します。これにより古い接続エラーを防ぎます。リトライ機構は指数関数的バックオフで最大3回まで失敗した操作を再試行します。一時的なネットワーク問題を処理します。
ステップ3: LangChainエージェントコアの構築
LangChain を使用して、データベースとやり取りするカスタムツールを備えた AI エージェントを作成します。エージェントは関数呼び出しを使用して情報を保存し、会話履歴を取得します。
class DataPersistentAgent:
"""
データベース永続化機能を備えたAIエージェント。
このエージェントは:
- セッションを跨いだ会話を記憶
- ユーザー情報の保存と取得
- 重要なエンティティの抽出と保存
- 履歴に基づくパーソナライズされた応答を提供
"""
def __init__(
self,
db_manager: DatabaseManager,
model_name: str = "gpt-4-turbo-preview",
temperature: float = 0.7
):
"""
データ永続化エージェントを初期化します。
引数:
db_manager: データベース管理インスタンス
model_name: 使用するLLMモデル (デフォルト: gpt-4-turbo-preview)
temperature: 応答生成時のモデル温度
"""
self.db_manager = db_manager
self.model_name = model_name
# LLMの初期化
self.llm = ChatOpenAI(
model=model_name,
temperature=temperature,
openai_api_key=os.getenv("OPENAI_API_KEY")
)
# エージェント用ツールの作成
self.tools = self._create_agent_tools()
# エージェントプロンプトの作成
self.prompt = self._create_agent_prompt()
# メモリの初期化
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# エージェントの作成
self.agent = create_openai_functions_agent(
llm=self.llm,
tools=self.tools,
prompt=self.prompt
)
# エージェント実行器の作成
self.agent_executor = AgentExecutor(
agent=self.agent,
tools=self.tools,
memory=self.memory,
verbose=True,
handle_parsing_errors=True,
max_iterations=5
)
logger.info(f"✓ データ永続化エージェントを {model_name} で初期化")
def _create_agent_tools(self) -> List[Tool]:
"""データベース操作用のカスタムツールを作成"""
def save_user_info(user_data: str) -> str:
"""ユーザー情報をデータベースに保存します。"""
try:
data = json.loads(user_data)
session = self.db_manager.get_session()
user = session.query(User).filter_by(user_id=data['user_id']).first()
if not user:
user = User(**data)
session.add(user)
else:
for key, value in data.items():
setattr(user, key, value)
session.commit()
session.close()
return f"✓ ユーザー情報の保存に成功しました"
except Exception as e:
logger.error(f"ユーザー情報の保存に失敗しました: {e}")
return f"❌ ユーザー情報の保存エラー: {str(e)}"
def retrieve_user_history(user_id: str) -> str:
"""ユーザーの会話履歴を取得します。"""
try:
session = self.db_manager.get_session()
user = session.query(User).filter_by(user_id=user_id).first()
if not user:
return "ユーザーが見つかりません"
conversations = session.query(Conversation).filter_by(user_id=user.id).order_by(Conversation.created_at.desc()).limit(5).all()
history = []
for conv in conversations:
messages = session.query(Message).filter_by(conversation_id=conv.id).all()
history.append({
'conversation_id': conv.conversation_id,
'created_at': conv.created_at.isoformat(),
'message_count': len(messages),
'summary': conv.summary
})
session.close()
return json.dumps(history, indent=2)
except Exception as e:
logger.error(f"履歴取得に失敗しました: {e}")
return f"❌ 履歴取得エラー: {str(e)}"
def extract_entities(text: str) -> str:
"""テキストからエンティティを抽出し、データベースに保存する。"""
try:
entities = []
# 簡易キーワード抽出(適切なNER処理に置き換える)
keywords = ['important', 'key', 'critical']
for keyword in keywords:
if keyword in text.lower():
entities.append({
'entity_type': 'keyword',
'entity_value': keyword,
'confidence': 0.8
})
return json.dumps(entities, indent=2)
except Exception as e:
logger.error(f"エンティティ抽出に失敗しました: {e}")
return f"❌ エンティティ抽出エラー: {str(e)}"
tools = [
Tool(
name="SaveUserInfo",
func=save_user_info,
description="ユーザー情報をデータベースに保存します。入力はユーザー詳細を含むJSON文字列である必要があります。"
),
Tool(
name="RetrieveUserHistory",
func=retrieve_user_history,
description="データベースからユーザーの会話履歴を取得します。入力はuser_idである必要があります。"
),
Tool(
name="ExtractEntities",
func=extract_entities,
description="テキストから重要なエンティティを抽出しデータベースに保存します。入力は解析対象のテキストである必要があります。"
)
]
return tools
def _create_agent_prompt(self) -> ChatPromptTemplate:
"""エージェントプロンプトテンプレートを作成します。"""
system_message = """あなたは会話内容を記憶し学習できるAIアシスタントです。
以下のツールを利用できます:
- SaveUserInfo: 将来の会話のために記憶すべきユーザー情報を保存
- RetrieveUserHistory: ユーザーとの過去の会話を参照
- ExtractEntities: 会話から重要な情報を抽出して保存
これらのツールを活用し、パーソナライズされた文脈に応じた応答を提供してください。応答前には必ずユーザーとの過去の会話履歴を確認してください。
将来の会話のために重要な情報を積極的に保存してください。"""
prompt = ChatPromptTemplate.from_messages([
("system", system_message),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
return prompt
def chat(self, user_id: str, message: str, conversation_id: Optional[str] = None) -> Dict[str, Any]:
"""
チャットメッセージを処理し、データベースに永続化します。
このメソッドが扱う処理:
1. 会話の作成または取得
2. ユーザーメッセージのデータベースへの保存
3. エージェント応答の生成
4. エージェント応答のデータベースへの保存
5. 監視のための操作のログ記録
引数:
user_id: ユーザーの一意の識別子
message: ユーザーのメッセージテキスト
conversation_id: 既存の会話を引き継ぐためのオプションの会話ID
戻り値:
dict: conversation_id, response, execution_time を含む
"""
start_time = datetime.utcnow()
try:
# 会話の取得または作成
session = self.db_manager.get_session()
if conversation_id:
conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
else:
# 新規会話の作成
user = session.query(User).filter_by(user_id=user_id).first()
if not user:
user = User(user_id=user_id, name=user_id)
session.add(user)
session.commit()
conversation = Conversation(
conversation_id=f"conv_{user_id}_{datetime.utcnow().timestamp()}",
user_id=user.id,
title=message[:100]
)
session.add(conversation)
session.commit()
# ユーザーメッセージを保存
user_message = Message(
conversation_id=conversation.id,
role="user",
content=message,
model=self.model_name
)
session.add(user_message)
session.commit()
# エージェント応答を取得
response = self.agent_executor.invoke({
"input": f"[User ID: {user_id}] {message}"
})
# アシスタントメッセージを保存
assistant_message = Message(
conversation_id=conversation.id,
role="assistant",
content=response['output'],
model=self.model_name
)
session.add(assistant_message)
session.commit()()
# 操作のログ記録
execution_time = (datetime.utcnow() - start_time).total_seconds()
log_entry = AgentLog(
conversation_id=conversation.conversation_id,
level="INFO",
operation="chat",
message="チャット処理成功",
execution_time=execution_time
)
session.add(log_entry)
session.commit()
# セッション終了前に conversation_id を抽出
conversation_id_result = conversation.conversation_id
session.close()
logger.info(f"✓ ユーザー {user_id} のチャット処理を {execution_time:.2f}秒で完了")
return {
'conversation_id': conversation_id_result,
'response': response['output'],
'execution_time': execution_time
}
except Exception as e:
logger.error(f"❌ チャット処理エラー: {e}")
# エラーをログに記録
session = self.db_manager.get_session()
error_log = AgentLog(
conversation_id=conversation_id or "unknown",
level="ERROR",
operation="chat",
message=str(e),
error_details={'exception_type': type(e).__name__}
)
session.add(error_log)
session.commit()
session.close()
raise
DataPersistentAgentは、LangChainの関数呼び出しエージェントをデータベースツールでラップします。SaveUserInfoツールはUserレコードの作成または更新によりユーザーデータを永続化します。RetrieveHistoryツールは過去の会話をクエリしコンテキストを提供します。システムプロンプトはエージェントに対し、情報の保存と履歴確認を積極的に行うよう指示します。ConversationBufferMemoryはセッション内の短期コンテキストを維持します。データベースストレージはセッションを跨いだ長期的な永続性を提供します。

ステップ3.5: データ収集モジュールの作成
会話からデータを抽出し構造化するツールを構築します。コレクターはLLMを用いて要約を生成し、嗜好を抽出し、エンティティを識別します。
class DataCollector:
"""
エージェント会話からデータを収集・構造化します。
このモジュールは:
- 会話要約を生成
- 会話履歴からユーザー嗜好を抽出
- 固有名詞を識別・保存
"""
def __init__(self, db_manager: DatabaseManager, llm: ChatOpenAI):
"""
データコレクターを初期化します。
引数:
db_manager: データベース管理インスタンス
llm: テキスト分析用言語モデル
"""
self.db_manager = db_manager
self.llm = llm
logger.info("✓ データコレクター初期化完了")
def extract_conversation_summary(self, conversation_id: str) -> str:
"""
LLMを使用して会話の要約を生成・保存する。
引数:
conversation_id: 要約対象の会話ID
戻り値:
str: 生成された要約テキスト
"""
try:
session = self.db_manager.get_session()
conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
if not conversation:
return "会話が見つかりません"
messages = session.query(Message).filter_by(conversation_id=conversation.id).all()
# 会話テキストの構築
conv_text = "n".join([
f"{msg.role}: {msg.content}" for msg in messages
])
# LLMによる要約生成
summary_prompt = f"""以下の会話を2~3文で要約し、主要なトピックと結果を捉えてください:
{conv_text}
要約:"""
summary_response = self.llm.invoke([HumanMessage(content=summary_prompt)])
summary = summary_response.content
# 要約で会話更新
conversation.summary = summary
session.commit()
session.close()
logger.info(f"✓ 会話 {conversation_id} の要約を生成しました")
return summary
except Exception as e:
logger.error(f"要約生成に失敗しました: {e}")
return ""
def extract_user_preferences(self, user_id: str) -> Dict[str, Any]:
"""
会話履歴からユーザー設定を抽出して保存する。
引数:
user_id: 分析対象ユーザーのID
戻り値:
dict: 抽出された設定
"""
try:
session = self.db_manager.get_session()
user = session.query(User).filter_by(user_id=user_id).first()
if not user:
return {}
# 最近の会話取得
conversations = session.query(Conversation).filter_by(user_id=user.id).order_by(Conversation.created_at.desc()).limit(10).all()
all_messages = []
for conv in conversations:
messages = session.query(Message).filter_by(conversation_id=conv.id).all()
all_messages.extend([msg.content for msg in messages if msg.role == "user"])
if not all_messages:
return {}
# LLMを用いた嗜好分析
analysis_prompt = f"""ユーザーの以下のメッセージを分析し、嗜好、興味、コミュニケーションスタイルを抽出してください。
メッセージ:
{chr(10).join(all_messages[:20])}
以下の構造を持つJSONオブジェクトを返してください:
{{
"interests": ["interest1", "interest2"],
"communication_style": "description",
"preferred_topics": ["topic1", "topic2"],
"language_preference": "language"
}}"""
response = self.llm.invoke([HumanMessage(content=analysis_prompt)])
try:
# レスポンスからJSONを抽出
content = response.content
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
preferences = json.loads(content)
# ユーザー設定を更新
user.preferences = preferences
session.commit()
logger.info(f"✓ ユーザー {user_id} の設定を抽出しました")
return preferences
except json.JSONDecodeError:
logger.warning("設定のJSONパースに失敗しました")
return {}
finally:
session.close()
except Exception as e:
logger.error(f"設定の抽出に失敗しました: {e}")
return {}
def extract_entities_with_llm(self, conversation_id: str) -> List[Dict[str, Any]]:
"""
LLMを使用して固有表現を抽出します。
引数:
conversation_id: 分析対象の会話のID
戻り値:
list: 抽出されたエンティティのリスト
"""
try:
session = self.db_manager.get_session()
conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
if not conversation:
return []
messages = session.query(Message).filter_by(conversation_id=conversation.id).all()
conv_text = "n".join([msg.content for msg in messages])
# LLMを使用してエンティティを抽出
entity_prompt = f"""以下の会話から固有名詞を抽出してください。特定対象:
- 人物 (PERSON) (PERSON)
- 組織 (ORG)
- 場所 (LOC)
- 日付 (DATE)
- 製品 (PRODUCT)
- 技術 (TECH)
会話:
{conv_text}
以下の形式でエンティティのJSON配列を返す:
[
{{"type": "PERSON", "value": "John Doe", "context": "チームリーダーとして言及"}},
{{"type": "ORG", "value": "Acme Corp", "context": "顧客企業"}}
]"""
response = self.llm.invoke([HumanMessage(content=entity_prompt)])
try:
content = response.content
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
entities_data = json.loads(content)
# エンティティをデータベースに保存
saved_entities = []
for entity_data in entities_data:
entity = Entity(
conversation_id=conversation.id,
entity_type=entity_data['type'],
entity_value=entity_data['value'],
context=entity_data.get('context', ''),
confidence=0.9 # LLM抽出は信頼度が高い
)
session.add(entity)
saved_entities.append(entity_data)
session.commit()
session.close()
logger.info(f"✓ 会話 {conversation_id} から {len(saved_entities)} 個のエンティティを抽出しました")
return saved_entities
except json.JSONDecodeError:
logger.warning("エンティティのJSONパースに失敗しました")
return []
except Exception as e:
logger.error(f"エンティティ抽出に失敗: {e}")
return []
DataCollectorはLLMを用いて会話を分析します。extract_conversation_summaryメソッドは会話の簡潔な要約を作成します。extract_user_preferencesメソッドはメッセージパターンを分析し、ユーザーの興味やコミュニケーションスタイルを特定します。extract_entities_with_llmメソッドは構造化されたプロンプトを用いて、人物、組織、技術などの固有名詞エンティティを抽出します。抽出された全データは将来参照のためデータベースに保存されます。
ステップ4: スマートデータ処理パイプラインの構築
エージェントをブロックせずにデータ収集を処理するバックグラウンド処理を実装します。このパイプラインはワーカースレッドとキューを使用して要約とエンティティを処理します。
class DataProcessingPipeline:
"""
非同期データ処理パイプライン。
このパイプラインは:
- バックグラウンドで会話を処理
- 要約を生成
- メインフローをブロックせずにエンティティを抽出
- ユーザー設定を定期的に更新
"""
def __init__(self, db_manager: DatabaseManager, collector: DataCollector, batch_size: int = 10):
"""
処理パイプラインを初期化します。
引数:
db_manager: データベース管理インスタンス
collector: 処理操作用のデータコレクター
batch_size: 各バッチで処理するアイテム数
"""
self.db_manager = db_manager
self.collector = collector
self.batch_size = batch_size
# 処理キュー
self.summary_queue = Queue()
self.entity_queue = Queue()
self.preference_queue = Queue()
# ワーカースレッド
self.workers = []
self.running = False
logger.info("✓ データ処理パイプライン初期化完了")
def start(self):
"""バックグラウンド処理ワーカーを開始"""
self.running = True
# ワーカースレッドの作成
summary_worker = Thread(target=self._process_summaries, daemon=True)
entity_worker = Thread(target=self._process_entities, daemon=True)
preference_worker = Thread(target=self._process_preferences, daemon=True)
summary_worker.start()
entity_worker.start()
preference_worker.start()
self.workers = [summary_worker, entity_worker, preference_worker]
logger.info("✓ 3つのバックグラウンド処理ワーカーを開始しました")
def stop(self):
"""バックグラウンド処理ワーカーを停止します。"""
self.running = False
for worker in self.workers:
worker.join(timeout=5)
logger.info("✓ バックグラウンド処理ワーカーを停止しました")
def queue_conversation_for_processing(self, conversation_id: str, user_id: str):
"""
会話を処理キューに追加します。
引数:
conversation_id: 処理対象の会話ID
user_id: 嗜好抽出対象のユーザーID
"""
self.summary_queue.put(conversation_id)
self.entity_queue.put(conversation_id)
self.preference_queue.put(user_id)
logger.info(f"✓ 処理待ち会話 {conversation_id} をキューに追加")
def _process_summaries(self):
"""会話サマリーの処理を行うワーカー"""
while self.running:
try:
if not self.summary_queue.empty():
conversation_id = self.summary_queue.get()
self.collector.extract_conversation_summary(conversation_id)
self.summary_queue.task_done()
else:
time.sleep(1)
except Exception as e:
logger.error(f"要約ワーカーでエラー: {e}")
def _process_entities(self):
"""エンティティ抽出処理ワーカー。"""
while self.running:
try:
if not self.entity_queue.empty():
conversation_id = self.entity_queue.get()
self.collector.extract_entities_with_llm(conversation_id)
self.entity_queue.task_done()
else:
time.sleep(1)
except Exception as e:
logger.error(f"エンティティワーカーでエラー: {e}")
def _process_preferences(self):
"""ユーザープリファレンスの処理を行うワーカー。"""
while self.running:
try:
if not self.preference_queue.empty():
user_id = self.preference_queue.get()
self.collector.extract_user_preferences(user_id)
self.preference_queue.task_done()
else:
time.sleep(1)
except Exception as e:
logger.error(f"プリファレンスワーカーでエラー: {e}")
def get_queue_status(self) -> Dict[str, int]:
"""
現在のキューサイズを取得します。
戻り値:
dict: 各処理タイプのキューサイズ
"""
return {
'summary_queue': self.summary_queue.qsize(),
'entity_queue': self.entity_queue.qsize(),
'preference_queue': self.preference_queue.qsize()
}
ProcessingPipelineはデータ収集とメッセージ処理を分離します。会話が完了すると、すぐに処理されるのではなくキューに追加されます。別々のワーカースレッドがこれらのキューからアイテムを取得し、バックグラウンドで処理します。これにより、データ収集がエージェントの応答を妨げるのを防ぎます。daemon=Trueの設定により、メインプログラム終了時にワーカーが確実に終了します。キュー状態の監視は処理のバックログを追跡するのに役立ちます。

ステップ5: リアルタイム監視とロギングの追加
エージェントのパフォーマンス追跡、エラー検出、レポート生成を行う監視システムを構築します。モニターはログを分析し、運用上の洞察を提供します。
class AgentMonitor:
"""
リアルタイム監視とメトリクス収集。
このモジュールは:
- パフォーマンスメトリクスを追跡
- システムの健全性を監視
- 分析レポートを生成
"""
def __init__(self, db_manager: DatabaseManager):
"""
エージェントモニターを初期化。
引数:
db_manager: データベースマネージャーインスタンス
"""
self.db_manager = db_manager
logger.info("✓ エージェントモニター初期化完了")
def get_performance_metrics(self, hours: int = 24) -> Dict[str, Any]:
"""
指定期間のパフォーマンスメトリクスを取得。
引数:
hours: 遡る時間数 (時間単位)
戻り値:
dict: 操作回数やエラー率を含むパフォーマンスメトリクス
"""
try:
session = self.db_manager.get_session()
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
# ログのクエリ
logs = session.query(AgentLog).filter(
AgentLog.created_at >= cutoff_time
).all()
# メトリクスを計算
total_operations = len(logs)
error_count = len([log for log in logs if log.level == "ERROR"])
avg_execution_time = sum([log.execution_time or 0 for log in logs]) / max(total_operations, 1)
# 会話数の取得
conversations = session.query(Conversation).filter(
Conversation.created_at >= cutoff_time
).count()
messages = session.query(Message).join(Conversation).filter(
Message.created_at >= cutoff_time
).count()
session.close()
metrics = {
'time_period_hours': hours,
'total_operations': total_operations,
'error_count': error_count,
'error_rate': error_count / max(total_operations, 1),
'avg_execution_time': avg_execution_time,
'conversations_created': conversations,
'messages_processed': messages
}
logger.info(f"✓ 過去 {hours} 時間のパフォーマンスメトリクスを生成しました")
return metrics
except Exception as e:
logger.error(f"パフォーマンスメトリクスの取得に失敗しました: {e}")
return {}
def health_check(self) -> Dict[str, Any]:
"""
ヘルスチェックを実行します。
戻り値:
dict: データベース接続状態やエラー率を含むヘルスステータス
"""
try:
# データベース接続状態を確認
db_healthy = self.db_manager.health_check()
# 直近のエラー率を確認
metrics = self.get_performance_metrics(hours=1)
recent_errors = metrics.get('error_count', 0)
# 全体的な健全性を判定
is_healthy = db_healthy and recent_errors < 10
health_status = {
'status': 'healthy' if is_healthy else 'degraded',
'database_connected': db_healthy,
'recent_errors': recent_errors,
'timestamp': datetime.utcnow().isoformat()
}
logger.info(f"✓ Health check: {health_status['status']}")
return health_status
except Exception as e:
logger.error(f"ヘルスチェック失敗: {e}")
return {
'status': 'unhealthy',
'error': str(e),
'timestamp': datetime.utcnow().isoformat()
}
AgentMonitorはシステム運用状況の可観測性を提供します。AgentLogテーブルをクエリすることで、総操作数、エラー率、平均実行時間などのメトリクスを追跡します。get_metricsメソッドは設定可能な時間枠で統計を算出します。get_error_reportメソッドはデバッグ用の詳細なエラー情報を取得します。この監視により、問題の事前検知が可能になります。高いエラー率はユーザーに影響が出る前に調査をトリガーします。
ステップ6: クエリインターフェースの構築
保存されたデータを取得・分析するクエリ機能を構築します。このインターフェースは、会話の検索、エンティティの追跡、分析生成のためのメソッドを提供します。
class DataQueryInterface:
"""
保存されたエージェントデータをクエリするためのインターフェース。
このモジュールは以下のメソッドを提供します:
- ユーザー分析データのクエリ
- 会話履歴の取得
- 特定情報の検索
"""
def __init__(self, db_manager: DatabaseManager):
"""
クエリインターフェースを初期化します。
引数:
db_manager: データベースマネージャインスタンス
"""
self.db_manager = db_manager
logger.info("✓ クエリインターフェース初期化完了")
def get_user_analytics(self, user_id: str) -> Dict[str, Any]:
"""
特定のユーザーの分析データを取得します。
引数:
user_id: 分析対象ユーザーのID
戻り値:
dict: 会話数や嗜好を含むユーザー分析データ
"""
try:
session = self.db_manager.get_session()
user = session.query(User).filter_by(user_id=user_id).first()
if not user:
return {}
# 会話数取得
conversation_count = session.query(Conversation).filter_by(user_id=user.id).count()
# メッセージ数取得
message_count = session.query(Message).join(Conversation).filter(
Conversation.user_id == user.id
).count()
# エンティティ数を取得
entity_count = session.query(Entity).join(Conversation).filter(
Conversation.user_id == user.id
).count()
# 時間範囲を取得
first_conversation = session.query(Conversation).filter_by(
user_id=user.id
).order_by(Conversation.created_at).first()
last_conversation = session.query(Conversation).filter_by(
user_id=user.id
).order_by(Conversation.created_at.desc()).first()
session.close()
analytics = {
'user_id': user_id,
'name': user.name,
'conversation_count': conversation_count,
'message_count': message_count,
'entity_count': entity_count,
'preferences': user.preferences,
'first_interaction': first_conversation.created_at.isoformat() if first_conversation else None,
'last_interaction': last_conversation.created_at.isoformat() if last_conversation else None,
'avg_messages_per_conversation': message_count / max(conversation_count, 1)
}
logger.info(f"✓ ユーザー {user_id} の分析データを生成しました")
return analytics
except Exception as e:
logger.error(f"ユーザー分析データの取得に失敗しました: {e}")
return {}
QueryInterfaceは保存データにアクセスするメソッドを提供します。get_user_conversationsメソッドは、オプションでメッセージを含めるフル会話履歴を取得します。search_conversationsメソッドは SQLのILIKE演算子を用いてメッセージ内容全体を全文検索します。get_entity_mentionsメソッドは特定のエンティティが言及された全会話を検索します。get_user_analyticsメソッドはユーザー活動に関する統計を生成します。これらのクエリによりダッシュボード構築、レポート生成、パーソナライズされた体験の創出が可能となります。
ステップ7: Bright DataのリアルタイムWebデータを活用したRAG構築
Bright DataのリアルタイムWebインテリジェンスによるRAG機能で、データベース接続型エージェントを強化します。この統合により、会話履歴と最新のWebデータを組み合わせ、より優れた応答を実現します。
class BrightDataRAGEnhancer:
"""
Bright Dataのウェブインテリジェンスでデータ永続型エージェントを強化します。
このモジュールは:
- Bright Dataからリアルタイムウェブデータを取得
- RAG用にウェブデータをベクターストアに取り込み
- ウェブ拡張知識でエージェントを強化
"""
def __init__(self, api_key: str, db_manager: DatabaseManager):
"""
Bright DataでRAGエンハンサーを初期化します。
引数:
api_key: Bright Data APIキー
db_manager: データベースマネージャーインスタンス
"""
self.api_key = api_key
self.db_manager = db_manager
self.base_url = "https://api.brightdata.com"
# RAG用ベクトルストアを初期化
self.embeddings = OpenAIEmbeddings()
self.vector_store = Chroma(
embedding_function=self.embeddings,
persist_directory="./chroma_db"
)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
logger.info("✓ Bright Data RAG エンハンサーの初期化完了")
def fetch_dataset_data(
self,
dataset_id: str,
filters: Optional[Dict[str, Any]] = None,
limit: int = 1000
) -> List[Dict[str, Any]]:
"""
Bright Data Dataset Marketplace からデータを取得します。
引数:
dataset_id: 取得するデータセットのID
filters: データのオプションフィルター
limit: 取得するレコードの最大数
戻り値:
list: 取得したデータセットのレコード
"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
endpoint = f"{self.base_url}/データセット/v3/snapshot/{データセットID}"
params = {
"format": "json",
"limit": limit
}
if filters:
params["filter"] = json.dumps(filters)
try:
response = requests.get(endpoint, headers=headers, params=params)
response.raise_for_status()
data = response.json()
logger.info(f"✓ Bright Data データセット {dataset_id} から {len(data)} 件のレコードを取得しました")
return data
except Exception as e:
logger.error(f"Bright Dataデータセットの取得に失敗しました: {e}")
return []
def ingest_web_data_to_rag(
self,
dataset_records: List[Dict[str, Any]],
text_fields: List[str],
metadata_fields: Optional[List[str]] = None
) -> int:
"""
ウェブデータをRAGベクトルストアに取り込む。
引数:
dataset_records: Bright Dataからのレコード
text_fields: テキストコンテンツとして使用するフィールド
metadata_fields: メタデータに含めるフィールド
戻り値:
int: インジェストされたドキュメントチャンクの総数
"""
try:
documents = []
for record in dataset_records:
# テキストフィールドを結合
text_content = " ".join([
str(record.get(field, ""))
for field in text_fields
if record.get(field)
])
if not text_content.strip():
continue
# メタデータ構築
metadata = {
"source": "bright_data",
"record_id": record.get("id", "unknown"),
"timestamp": datetime.utcnow().isoformat()
}
if metadata_fields:
for field in metadata_fields:
if field in record:
metadata[field] = record[field]
# テキストをチャンクに分割
chunks = self.text_splitter.split_text(text_content)
for chunk in chunks:
documents.append({
"content": chunk,
"metadata": metadata
})
# ベクトルストアに追加
if documents:
texts = [doc["content"] for doc in documents]
metadatas = [doc["metadata"] for doc in documents]
self.vector_store.add_texts(
texts=texts,
metadatas=metadatas
)
logger.info(f"✓ {len(documents)} 個の文書チャンクをRAGに取り込み完了")
return len(documents)
except Exception as e:
logger.error(f"RAGへのウェブデータ取り込みに失敗しました: {e}")
return 0
def create_rag_enhanced_agent(
self,
base_agent: DataPersistentAgent
) -> DataPersistentAgent:
"""
既存エージェントをRAG機能で強化します。
引数:
base_agent: 強化対象のベースエージェント
戻り値:
DataPersistentAgent: RAGツールを付与した強化エージェント
"""
def rag_search(query: str) -> str:
"""会話履歴とウェブデータの両方を検索します。"""
try:
# 会話履歴から取得
session = self.db_manager.get_session()
messages = session.query(Message).filter(
Message.content.ilike(f'%{query}%')
).order_by(Message.created_at.desc()).limit(5).all()
results = []
for msg in messages:
results.append({
'content': msg.content,
'source': 'conversation_history',
'relevance': 0.8
})
session.close()
# ベクターストア(ウェブデータ)から取得
try:
vector_results = self.vector_store.similarity_search_with_score(query, k=5)
for doc, score in vector_results:
results.append({
'content': doc.page_content,
'source': 'web_data',
'relevance': 1 - score
})
except Exception as e:
logger.error(f"ベクターストアからの取得に失敗しました: {e}")
if not results:
return "関連する情報は見つかりませんでした。"
# コンテキストのフォーマット
context_text = "nn".join([
f"[{item['source']}] {item['content'][:200]}..."
for item in results[:5]
])
return f"取得したコンテキスト:n{context_text}"
except Exception as e:
logger.error(f"RAG検索失敗: {e}")
return f"検索実行エラー: {str(e)}"
# エージェントにRAGツールを追加
rag_tool = Tool(
name="SearchKnowledgeBase",
func=rag_search,
description="会話履歴とリアルタイムWebデータの両方から関連情報を検索します。 入力は検索クエリである必要があります。"
)
base_agent.tools.append(rag_tool)
# 新しいツールでエージェントを再作成
base_agent.agent = create_openai_functions_agent(
llm=base_agent.llm,
tools=base_agent.tools,
prompt=base_agent.prompt
)
base_agent.agent_executor = AgentExecutor(
agent=base_agent.agent,
tools=base_agent.tools,
memory=base_agent.memory,
verbose=True,
handle_parsing_errors=True,
max_iterations=5
)
logger.info("✓ RAG機能を備えた強化エージェント")
return base_agent
BrightDataEnhancerはリアルタイムWebデータをエージェントに統合します。fetch_datasetメソッドはBright Dataマーケットプレイスから構造化データを取得します。ingest_to_ragメソッドはこのデータを処理・分割し、意味検索用にChromaベクトルデータベースに保存します。retrieve_contextメソッドはハイブリッド検索を実行し、データベース履歴とベクトル類似性検索を組み合わせます。create_rag_toolメソッドはこの機能をLangChainツールとしてパッケージ化し、エージェントが利用できるようにします。enhance_agentメソッドは既存エージェントにこのRAG機能を追加します。これによりエージェントは内部会話履歴と最新の外部データの両方を使用して質問に回答できるようになります。
完全なデータ永続化エージェントシステムの実行
すべてのコンポーネントを統合して機能するシステムを作成します。
def main():
"""全コンポーネント連携を示すメイン実行フロー"""
print("=" * 60)
print("データ永続化AIエージェントシステム - 初期化")
print("=" * 60)
# ステップ1: データベース初期化
print("n[Step 1] データベース接続設定中...")
db_manager = DatabaseManager(
database_url=os.getenv("DATABASE_URL"),
pool_size=5,
max_retries=3
)
db_manager.initialize_database()
# ステップ2: コアエージェントの初期化
print("n[Step 2] AIエージェントコアの構築中...")
agent = DataPersistentAgent(
db_manager=db_manager,
model_name=os.getenv("AGENT_MODEL", "gpt-4-turbo-preview")
)
# Step 3: データコレクターの初期化
print("n[Step 3] データ収集モジュールの作成中...")
collector = DataCollector(db_manager, agent.llm)
# Step 4: 処理パイプラインの初期化
print("n[Step 4] データ処理パイプラインの実装中...")
pipeline = DataProcessingPipeline(db_manager, collector)
pipeline.start()
# ステップ5: モニタリングの初期化
print("n[Step 5] モニタリングとロギングを追加中...")
monitor = AgentMonitor(db_manager)
# ステップ6: クエリインターフェースの初期化
print("n[Step 6] クエリインターフェースを構築中...")
query_interface = DataQueryInterface(db_manager)
# ステップ7: オプションのBright Data RAG強化
print("n[Step 7] RAG強化 (オプション)...")
bright_data_key = os.getenv("BRIGHT_DATA_API_KEY")
if bright_data_key and bright_data_key != "your-bright-data-api-key":
print("Bright DataからリアルタイムWebデータを取得中...")
enhancer = BrightDataRAGEnhancer(bright_data_key, db_manager)
# 例: Webデータの取得と取り込み
web_data = enhancer.fetch_dataset_data(
dataset_id="example_dataset_id",
limit=100
)
if web_data:
enhancer.ingest_web_data_to_rag(
dataset_records=web_data,
text_fields=["title", "content", "description"],
metadata_fields=["url", "published_date"]
)
# RAGでエージェントを強化
agent = enhancer.create_rag_enhanced_agent(agent)
print("✓ Bright Data RAG機能でエージェントを強化")
else:
print("⚠️ Bright Data APIキーが見つかりません - Webデータ統合をスキップ")
print("n" + "=" * 60)
print("デモ会話")
print("=" * 60)
# デモユーザーインタラクション
test_user = "demo_user_001"
# 最初の会話
print("n📝 会話 1:")
response1 = agent.chat(
user_id=test_user,
message="こんにちは! 機械学習について学びたいです。"
)
print(f"エージェント: {response1['response']}n")
# 処理待ちキューへ追加
pipeline.queue_conversation_for_processing(
response1['conversation_id'],
test_user
)
# 2回目の会話
print("📝 会話2:")
response2 = agent.chat(
user_id=test_user,
message="ニューラルネットワークについて教えてくれますか?",
conversation_id=response1['conversation_id']
)
print(f"エージェント: {response2['response']}n")
# バックグラウンド処理を待機
print("⏳ バックグラウンドでデータを処理中...")
time.sleep(5)
print("n" + "=" * 60)
print("分析と監視")
print("=" * 60)
# パフォーマンス指標を取得
metrics = monitor.get_performance_metrics(hours=1)
print(f"n📊 パフォーマンス指標:")
print(f" - 総操作数: {metrics.get('total_operations', 0)}")
print(f" - エラー率: {metrics.get('error_rate', 0):.2%}")
print(f" - 平均実行時間: {metrics.get('avg_execution_time', 0):.2f}秒")
print(f" - 作成された会話: {metrics.get('conversations_created', 0)}件")
print(f" - 処理済みメッセージ数: {metrics.get('messages_processed', 0)}")
# ユーザー分析データを取得
analytics = query_interface.get_user_analytics(test_user)
print(f"n👤 ユーザー分析データ:")
print(f" - 会話数: {analytics.get('conversation_count', 0)}")
print(f" - メッセージ数: {analytics.get('message_count', 0)}")
print(f" - エンティティ数: {analytics.get('entity_count', 0)}")
print(f" - 平均メッセージ数/会話: {analytics.get('avg_messages_per_conversation', 0):.1f}")
# ヘルスチェック
health = monitor.health_check()
print(f"n🏥 システム状態: {health['status']}")
# キュー状態
queue_status = pipeline.get_queue_status()
print(f"n📋 処理キュー:")
print(f" - サマリーキュー: {queue_status['summary_queue']}")
print(f" - エンティティキュー: {queue_status['entity_queue']}")
print(f" - プリファレンスキュー: {queue_status['preference_queue']}")
# パイプラインを停止
pipeline.stop()
print("n" + "=" * 60)
print("データ永続化エージェントシステム - 完了")
print("=" * 60)
print("n✓ 全データがデータベースに永続化されました")
print("✓ バックグラウンド処理が完了しました")
print("✓ システムは本番運用準備完了です")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("nn⚠️ 正常にシャットダウン中...")
except Exception as e:
logger.error(f"システムエラー: {e}")
import traceback
traceback.print_exc()
データベース接続エージェントシステムを実行:
python agent.py
システムは完全なワークフローを実行します。データベースを初期化し、全てのテーブルを作成します。データベースツールを用いてLangChainエージェントを設定します。処理用のバックグラウンドワーカーを起動します。デモ会話を処理しデータベースに保存します。エンティティを抽出し、バックグラウンドで要約を生成します。リアルタイム分析とメトリクスを表示します。
各コンポーネントが初期化されデータを処理する過程で詳細なログを確認できます。エージェントは全メッセージを保存し、洞察を抽出し、会話の完全な文脈を維持します。

実用的なユースケース
1. 完全履歴付きカスタマーサポート
# エージェントが過去のやり取りを取得
support_agent = DataPersistentAgent(db_manager)
response = support_agent.chat(
user_id="customer_123",
message="接続の問題がまだ発生しています")
# エージェントが過去の接続問題に関する会話を参照
2. 学習機能付きパーソナルAIアシスタント
# エージェントが時間の経過とともに好みを学習
query_interface = QueryInterface(db_manager)
analytics = query_interface.get_user_analytics("user_456")
# 対話パターン、好み、共通トピックを表示
3. ナレッジベース搭載リサーチアシスタント
# 会話履歴とウェブデータを統合
enhancer = BrightDataEnhancer(api_key, db_manager)
enhancer.ingest_to_rag(research_data, ["title", "abstract", "content"])
agent = enhancer.enhance_agent(agent)
# エージェントは過去の議論と最新研究の両方を参照
メリット概要
| 機能 | データベースなし | データベースあり永続性 |
|---|---|---|
| メモリ | 再起動時に消失 | 永続ストレージ |
| パーソナライズ | なし | 完全な履歴に基づく |
| 分析 | 不可 | 完全なインタラクションデータ |
| エラー回復 | 手動介入 | 自動再試行とログ記録 |
| スケーラビリティ | シングルインスタンス | 状態を共有するマルチインスタンス |
| インサイト | セッション終了後に消失 | 抽出および追跡 |
まとめ
データベースへ会話を永続化する本番環境対応のAIエージェントシステムが完成しました。本システムは全インタラクションを保存し、エンティティとインサイトを抽出、完全な会話履歴を維持し、自動エラー回復機能を備えた監視を提供します。
セキュリティ強化のためのユーザー認証追加、分析可視化ダッシュボード構築、意味検索のための埋め込み実装、統合用APIエンドポイント作成、スケーラビリティのためのDockerデプロイなどで機能を拡張できます。モジュール設計により、特定のニーズに合わせたカスタマイズが容易です。
高度なAIエージェントパターンやBright DataのWebインテリジェンスプラットフォームを活用し、さらなる機能を探求してください。
記憶し学習するインテリジェントシステム構築を始めるには、無料アカウントを作成してください。