AI

Bright DataとMage AIでAmazonパイプラインを構築する方法

Bright DataとMage AIを活用し、商品データを収集、AIによるレビュー分析を実施、ライブダッシュボードで結果を提供する完全なAmazonデータパイプラインを構築する。
4 分読
Mage AI with Bright Data blog image

Amazon商品データパイプラインをノートPCで動作させるのは一つの課題です。プロキシ、CAPTCHA、レイアウト変更、IPブロックを伴う本番環境での継続的な稼働は別の課題です。スクレイピング自体を解決しても、スケジューリング、再試行、エラー処理、収集したデータの可視化手段が必要です。

ここではそれら全てを構築します。Bright DataのウェブスクレイピングAPIと Mage AIを活用し、Amazon商品とレビューを収集→Gemini感情分析を実行→PostgreSQLとStreamlitダッシュボードへ全データをプッシュするパイプラインを構築します。フルパイプラインはDockerと単一APIキー(AI分析用Geminiキーはオプション)で動作します

要約:スクレイピングインフラを構築せずに実現するAmazon商品インテリジェンス。

  • 得られるもの:キーワードで商品を発見し、Gemini AIでレビューを分析し、ライブStreamlitダッシュボードを提供するパイプライン
  • 費用:従量課金制(レコード単位課金・価格ページ参照)、エンドツーエンド処理時間5~8分
  • 仕組み:Bright Dataがプロキシ・CAPTCHA・パース処理を担当、Mage AIがスケジューリング・再試行・分岐処理を担当
  • 開始方法docker compose up– 全コードはGitHubリポジトリに公開

構築内容:Bright DataとMage AIの連携パイプライン

Bright DataのウェブスクレイピングAPIがスクレイピング層を処理。キーワードまたは商品URLを送信すると、構造化されたJSON(タイトル、価格、評価、レビュー、販売者情報)が解析済みで返却されます。プロキシインフラの管理やHTMLのパースは不要。Amazonがサイトを変更しても、Bright Dataが通常はパーサーを更新。コードは変更不要。

Mage AIを初めて利用する場合:Airflowのような無料のオープンソースデータパイプラインツールですが、定型コードが不要です。ノートブック形式のエディターでPythonを記述し、各ブロックは独自のテストと出力プレビューを備えた再利用可能な単位となります。 重要な点:Mage AIは分岐パイプライン(並列パスを持つ有向非循環グラフ)をサポート。ブロック単位の再試行ロジックと、UIから変更可能なパイプライン変数を内蔵しており、コード編集は不要です。

このパイプラインは2つの並列分岐に6つのブロックを配置:

Mage AI pipeline integrating Bright Data API for Amazon product data collection and review analysis
Mage AIの分岐パイプライン。左分岐は製品を即時エクスポート、右分岐はレビューを収集・分析

パイプラインはBright Data経由でキーワードから商品を発見し、価格帯と評価で情報を補完した後、分岐します。一方の経路は商品をPostgreSQLへ即時エクスポートし、もう一方の経路は上位商品のレビューを収集、Geminiで感情分析を実行し、それらもエクスポートします。

ここでMage AIを使用しているのは、パイプラインが分岐する(線形スクリプトではなくDAGであるため、レビュー収集に失敗しても製品データは既に安全)ためです。ただしBright Data API呼び出しは単なるHTTPリクエストであり、Airflow、Prefect、Dagster、あるいは単純なPythonスクリプトでも同様に動作します。

クイックスタート

リポジトリをクローンし、APIキーを追加して実行してください。すべてDocker内で動作するため、ローカルにPythonをインストールする必要はありません。

前提条件

以下のものが必要です:

  • Docker および Docker ComposeDocker を入手
  • APIトークン付きのBright Dataアカウント
  • Google GeminiAPIキー(無料利用枠あり、制限あり。詳細は後述のGeminiセクションを参照)
  • PythonとDockerの基本的な知識。スクレイピング経験は不要です。それがポイントです

ステップ1: クローンと設定

リポジトリをクローンし設定ファイルを作成:

git clone https://github.com/triposat/mage-brightdata-demo.git
cd mage-brightdata-demo
cp .env.example .env

次に、.envにAPIキーを追加します:

BRIGHT_DATA_API_TOKEN=your_api_token_here
GEMINI_API_KEY=your_gemini_api_key_here

Bright Data APIトークンの取得方法:[Bright Data](https://brightdata.com) でサインアップ(無料トライアル、クレジットカード不要)後、アカウント設定からAPIキーを作成します。本パイプラインでは2つのウェブスクレイピング APIスクレイパー(商品発見用とレビュー用)を使用し、レコード単位で課金される従量課金制です。最新料金は価格ページをご確認ください

Gemini APIキーの取得方法Google AI Studioにアクセスし、ログイン後「APIキーを作成」をクリック。無料プランでクレジットカード不要。本パイプラインはキーなしでも動作し、評価ベースの感情分析にフォールバックします。

ステップ2: サービスの起動

docker compose up -d

キーの読み込み状態を確認したい場合:

docker compose exec mage python -c "import os; t=os.getenv('BRIGHT_DATA_API_TOKEN',''); assert t and t!='your_api_token_here', 'Token not set'; print('OK')"

これにより3つのコンテナが起動します:

サービス URL 目的
Mage AI http://localhost:6789 パイプラインエディターおよびスケジューラー
Streamlit ダッシュボード http://localhost:8501 ライブデータ可視化 + チャット
PostgreSQL localhost:5432 データストレージ

初回実行時はイメージの取得と依存関係のインストールが行われ、接続環境により約3~5分かかります。docker compose stop/startによる再起動は数秒で完了します。docker compose down/upはpipパッケージの再インストールを伴い、約1分かかります。

Docker containers running Mage AI, Streamlit, and PostgreSQL
3つのサービスすべてが稼働中

ステップ3: パイプラインの実行

http://localhost:6789 を開き、[Pipelines] に移動してamazon_product_intelligence をクリックし、左サイドバーの[Triggers]をクリックして[Run@once] を押します。

Mage AI dashboard with amazon_product_intelligence pipeline
Mage AIダッシュボード

パイプラインの全体所要時間は約5~8分です。大半の時間はBright Data APIがAmazonからデータを収集する工程に費やされます。エンリッチメントとデータベースエクスポートは数秒で完了し、Gemini分析はバッチサイズとレート制限に依存します。6つのブロック全てが緑色に変わったら、http://localhost:8501でダッシュボードを確認してください。

Mage AI pipeline with all 6 blocks completed – Bright Data API collection, enrichment, Gemini analysis, and PostgreSQL export
全6ブロックが緑色に。パイプライン完了

Mage AIデータパイプラインの仕組み

コードの流れを解説します。Bright Dataの統合とGemini分析に焦点を当てます。

Bright DataのウェブスクレイピングAPIをMage AIに接続

Amazon Products APIにキーワードを送信し、構造化データを取得します。Bright Dataはこれを「ディスカバリー」スクレイパーと呼び、キーワードやカテゴリで製品を検索します。レビューブロックでは別途「レビュー」スクレイパーを使用し、製品URLを入力として受け取ります。APIは非同期パターンを採用:収集をトリガー→スナップショットID取得→結果準備完了までポーリング。

DATASET_ID = "gd_l7q7dkf244hwjntr0"  # Amazon Products(最新IDはリポジトリで確認)
API_BASE = "https://api.brightdata.com/data sets/v3"

# 収集をトリガー(/scrapeを使用 – 1分超で自動非同期化;本番環境では/triggerを検討)
response = requests.post(
    f"{API_BASE}/scrape",
    headers={"Authorization": f"Bearer {api_token}",
             "Content-Type": "application/json"},
    params={"dataset_id": DATASET_ID,
            "type": "discover_new",
            "discover_by": "keyword",
            "limit_per_input": kwargs.get('limit_per_keyword', 5)},
    json={"input": [{"keyword": kw} for kw in keywords]})

snapshot_id = response.json()["snapshot_id"]

# 結果が準備できるまでポーリング
data = requests.get(
    f"{API_BASE}/snapshot/{snapshot_id}",
    headers={"Authorization": f"Bearer {api_token}"},
    params={"format": "json"}
).json()

Bright Dataから返される内容は以下の通りです:

{
  "title": "BESIGN LS03 Aluminum Laptop Stand",
  "asin": "B07YFY5MM8",          // Amazonの固有商品ID
  "url": "https://www.amazon.com/dp/B07YFY5MM8",
  "initial_price": 19.99,
  "final_price": 16.99,
  "currency": "USD",
  "rating": 4.8,
  "reviews_count": 22776,
  "seller_name": "BESIGN",
  "categories": ["Office Products", "Office & School Supplies"],
  "image_url": "https://m.media-amazon.com/images/I/..."
}

kwargs.get('limit_per_keyword', 5)は Mage AI パイプライン変数から取得するため、UI から調整可能です。

2つ目のAPI呼び出しの追加:Amazonレビュー収集

レビュー収集ツールは上流ブロックから処理済み製品を受け取り、レビュー数でソートします。上位 N 件を選択し、それらのAmazon URLを2つ目のBright Data APIに渡します:

REVIEWS_DATASET_ID = "gd_le8e811kzy4ggddlq"  # Amazonレビュー

# 上流からの上位商品(Mage AIから自動渡される)
top_products = data.sort_values('reviews_count', ascending=False).head(top_n)
product_urls = top_products['url'].dropna().tolist()

# レビューAPIにURLを送信(/scrapeパターンと同様)
response = requests.post(
    f"{API_BASE}/scrape",
    headers={"Authorization": f"Bearer {api_token}",
             "Content-Type": "application/json"},
    params={"dataset_id": REVIEWS_DATASET_ID},
    json={"input": [{"url": url} for url in product_urls]})

# 製品APIと同様の非同期ポーリングパターン...

両方のAPIブロックには、デモのmetadata.yamlにリトライ設定があります:呼び出しが失敗した場合、パイプラインは30秒間隔で3回リトライします。このデモの各ブロックには、実行後に実行される@test関数もあります。これが失敗すると、下流のブロックは実行されず、不良データがデータベースに保存されることはありません。

AI分析の追加: Gemini感情分析パイプラインブロック

キーワードマッチング(例:「not cheap, great quality!」を「cheap」という単語のためネガティブと判定)の代わりに、Geminiで文脈を理解します。ブロックは無料利用枠の制限内に収めるため、3モデルローテーションでレビューをバッチ処理します:

GEMINI_MODELS = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-2.5-pro"]  # 最新モデルはリポジトリで確認

prompt = f"""これらのレビューを分析してください。各レビューに対して、以下のJSONを返します:
- "sentiment": "Positive", "Neutral", or "Negative"
- "issues": 言及された具体的な製品の問題点
- "themes": 1~3つのトピックタグ
- "summary": 一文の要約
JSON形式でのみ返すこと。nn{reviews_text}"""

for model in models:
    try:
        response = client.models.generate_content(model=model, contents=prompt)
        return json.loads(response.text.strip())
    except Exception as e:
        if '429' in str(e):
            continue  # レート制限 -- 次のモデルへローテーション

ローテーションはflash-lite(最安・最速)から開始し、flash→proの順でフォールバック。3モデル全てが枯渇した場合、レビューは評価ベースの感情分析に切り替わる。 無料プランのクォータは定期的に変更されますが、3モデルローテーションにより大半のレート制限は自動で処理されます。Geminiは各レビューに対し、感情分析結果、具体的な問題点(「凹凸のある路面で揺れる」「ヒンジが経年劣化で緩む」など)、1~3つのテーマタグを返します。各レビューには1文の要約も付随します。

残りのブロック(価格帯と割引計算用のトランスフォーマー、およびアップサートロジックを備えた2つのデータベースエクスポーター)は単純明快です。詳細を確認したい場合はGitHubリポジトリにあります。

パイプライン出力:結果とStreamlitダッシュボード

デフォルトキーワード「laptop stand」と「wireless earbuds」でパイプラインを1回実行した結果です。Amazonの現在の商品リストによって結果は異なります。

今回の実行結果:10商品検出、Geminiによる20件のレビュー分析。イヤホンのレビューからは、4.3点という平均評価には表れない不満が浮上しました。「音質」「バッテリー持続時間」「接続性」といったテーマと具体的な問題点が指摘されています。

パイプラインが生成データに付加する価値

フィールド 追加元
best_price $16.99 変換率(計算値)
discount_percent 15.0% トランスフォーマー(計算済み)
価格帯 予算(25ドル未満) トランスフォーマー(強化版)
評価カテゴリ 優秀 (4.5-5) トランスフォーマー(強化版)
感情 ネガティブ ジェミニAI
問題 [“Bluetoothが頻繁に接続を落とす”] ジェミニAI
テーマ [“接続性”, “バッテリー寿命”] Gemini AI
ai_summary 「8時間持続と謳っているのに、バッテリーはわずか2時間しか持たない」 Gemini AI

実際の表示例 – 拡張フィールドが表示された全10商品:

Amazon product data collected by Bright Data API with enriched price tiers, ratings, and review counts
強化フィールド付き全10商品。2つの異なる商品カテゴリからの価格帯、評価、レビュー数

ダッシュボード

Streamlitダッシュボードはhttp://localhost:8501で表示。サイドバーの「Refresh Data」をクリックするとPostgreSQLから最新データを取得します。

Amazon product intelligence dashboard showing price comparison, price tiers, and filtering controls
製品インテリジェンスダッシュボード ― 価格比較、価格帯、フィルタリング機能

サイドバーでは価格帯・評価・感情でフィルタリング可能。感情ビューでは全レビューの肯定/否定内訳と、Geminiが抽出した具体的な問題点(「Bluetooth接続が切れる」「ヒンジが経年劣化で緩む」など)を表示。星評価では埋もれる詳細情報を可視化。

Amazon review sentiment analysis results showing positive and negative breakdown with AI-detected issues
感情分析の内訳とAI検出の製品問題点。Geminiが抽出した実際の苦情(キーワードマッチングではない)

ダッシュボードには「データと対話」機能も搭載。平易な英語で質問すると、Geminiが実際のスクレイピングデータを文脈として用いて回答します。別の実行例(より多くの製品を含む)を以下に示します:

Chat with Your Data feature – ask questions about scraped Amazon data in plain English
スクレイピングしたデータについて平易な英語で質問

パイプラインのスケーリング

デモは2つのキーワードと10商品で実行されます。

パイプライン変数

Mage AI UIまたはmetadata.yamlからすべて設定可能:

変数 制御対象 デフォルト
キーワード Amazon検索用語 ["ノートパソコンスタンド", "ワイヤレスイヤホン"]
キーワードごとの制限 Bright Dataからのキーワードごとの製品数 5
上位n商品 上位製品のレビュー収集数 2
reviews_per_product 商品ごとの最大レビュー数 10
sort_by レビュー選択のための商品ランク付け方法 レビュー数

キーワードを ["phone case", "USB-C hub"]に変更するだけで、全く異なるデータセットが得られます。コードの変更は不要です。

Configurable pipeline variables in the Mage AI UI for Amazon search keywords and limits
Mage AI UI内のパイプライン変数

スケジュール設定

スケジュールで実行するには、Mage AI サイドバーの「トリガー」に移動し、「+ 新規トリガー」をクリック、「スケジュール」を選択し、頻度(1 回、毎時、毎日、毎週、毎月、またはカスタム cron)を選択します。

各実行はASIN単位でアップサート(上書き更新)します。つまり、同一商品のデータは上書きされますが、他のキーワードの結果は保持されます。タイムスタンプ付きのCSVバックアップも保存され、履歴比較が可能です。

数回の実行データが蓄積されたら、星評価では捕捉できない苦情を抽出するため、PostgreSQLを直接クエリできます:

-- ネガティブな感情が強い商品を探す
SELECT asin, product_name,
    AVG(CASE WHEN sentiment = 'Negative' THEN 1 ELSE 0 END) as negative_rate
FROM amazon_reviews
GROUP BY asin, product_name
HAVING AVG(CASE WHEN sentiment = 'Negative' THEN 1 ELSE 0 END) > 0.2;

検索キーワードではなく自社製品を監視するには、typediscover_bylimit_per_inputパラメータを削除し、製品URLを直接[{"url": "https://www.amazon.com/dp/YOUR_ASIN"} ]として渡します。

ダッシュボードやアラートを自作せずに利用したい場合、Bright Insightsは小売データ向けに追加設定なしでこれを実現します。

スケーリングについて。このデモは単一マシン上のDockerで実行されますが、Mage AIは本番環境向けにKubernetesエグゼキュータをサポートし、Bright DataのAPIはバッチリクエストのレート制限により並行処理を処理します。スケーリングとは、データ収集コードを変更するのではなく、Mage AIの容量を追加することです。

他のBright Dataスクレイパーとの統合

同じパイプラインパターンは、100以上のウェブサイトに対応するBrightDataの既製スクレイパーすべてで機能します。例として、Google Maps ScraperLinkedIn ScraperCrunchbase Scraperのリポジトリを参照してください。Amazonから別のプラットフォームに切り替えるには、データローダーブロック内のDATASET_IDを交換し、新しいスクレイパーのスキーマに合わせて入力パラメータを調整します。

適切なIDと入力フィールドを見つけるには、ダッシュボードのスクレイパーライブラリを参照するか、/dataセット/listエンドポイントを呼び出してください。ダッシュボードのAPIリクエストビルダーでは、各スクレイパーが要求する内容を正確に確認できます。Geminiの分析とパイプライン構造はそのまま引き継がれます。エンリッチメントブロックとエクスポートブロックは、新しいスクレイパーのレスポンスフィールドがAmazonと異なる場合、列名の調整が必要になる可能性があります。

トラブルシューティング

設定または実行中に問題が発生した場合、最も一般的な解決策は以下の通りです:

  • ポート6789または8501が既に使用中。他のサービスがポートを占有しています。該当サービスを停止するか、docker-compose.ymlを編集してポートを再マッピングしてください(例:6789:67896790:6789に変更)。
  • Bright Data APIが401 Unauthorizedを返す。APIトークンが欠落または不正な形式です。アカウント設定で完全なトークンをコピーし、.envファイルに末尾の空白がないことを確認してください。トークンは長い16進数文字列(64文字)です。コピーした文字列が短い、またはUUIDのようなダッシュを含む場合は、誤ったフィールドをコピーした可能性があります。
  • Geminiが全モデルで429(レート制限)を返す。無料プランには分単位の制限があり、定期的に変更される。パイプラインは3モデルをローテーションすることで対応するが、全てが上限に達した場合、レビューは評価ベースの感情分析にフォールバックする。回避策:パイプライン変数のreviews_per_productを減少させる、Geminiブロック内のバッチ間にtime.sleep(60)を追加する、またはGoogle AIプロジェクトで課金機能を有効化しクォータを増やす。 現在のクォータはGoogleのレート制限ページで確認してください。
  • パイプラインブロックが赤色(失敗)で表示される場合パイプラインのログページ(左サイドバーからアクセス可能)でエラーを確認してください。ブロック名やログレベルでフィルタリングできます。主な原因:APIトークンの期限切れ、Bright Data APIのネットワークタイムアウト(ブロック内のmax_wait_secondsを増やす)、Geminiの応答が有効なJSONでない場合(ブロックの@test関数がこれを捕捉します)。
  • Apple Silicon環境でDocker Composeの動作が遅い/失敗する場合。Mage AIイメージはマルチアーキテクチャ対応でARMでも動作しますが、初回プルに時間がかかることがあります。メモリエラーでビルドが失敗する場合は、Docker Desktopの設定→リソースでメモリ割り当てを4GB以上に増やしてください。

次のステップ

Amazon商品データを収集し、AIによるレビュー分析を実行し、すべてをPostgreSQLに保存する動作するパイプラインが完成しました。プロキシもパーサーも、触るのが怖いcronジョブも不要です。

手順に沿って進めた方は、これを自分用にカスタマイズしてください。メタデータ.yaml内のキーワードリストを別の商品カテゴリに置き換えるだけで、コード変更は不要です。さらに詳細なカスタマイズには、特定のASINを指定するか、別のBright Dataスクレイパーに完全に切り替えてください。

初めての方?[Bright Dataの無料トライアルから始める]()(クレジットカード不要)デモリポジトリをクローン → `docker compose up`を実行。

よくある質問

この設定に関するよくある質問:

PythonでAmazon商品データをスクレイピングするには?

requestsとBeautifulSoupで独自スクレイパーを構築する方法(Amazonのレイアウト変更で動作しなくなる)か、単一API呼び出しで構造化JSONを返すBrightDataのAmazonスクレイパーを利用する方法があります。スタンドアロンのPython例はAmazon Scraperリポジトリを参照。詳細な解説はBright Dataの完全なAmazonスクレイピングガイドをご覧ください

Bright DataでAmazonをスクレイピングする費用は?

ウェブスクレイピングAPIは従量課金制で、収集したレコード1,000件ごとに課金されます。Geminiの無料プランではAI分析が対象です。新規アカウントは無料トライアルを利用できます。現在の料金は価格ページをご確認ください。

このパイプラインでウォルマート、eBay、その他のeコマースサイトもスクレイピングできますか?

データローダーブロック内のDATASET_IDを交換し、入力パラメータを新しいスクレイパーのスキーマに合わせて調整してください。Geminiの分析とパイプライン構造は引き継がれますが、エンリッチメントブロックとエクスポートブロックでは列名の調整が必要になる場合があります。

Amazonがページレイアウトを変更した場合、どうなりますか?

ユーザー側での対応は不要です。Bright Dataがパーサーを管理するため、AmazonがHTMLを更新しても、API呼び出しと応答形式は通常変更されません。

Geminiが必要ですか?他のLLMは使えますか?

Geminiなしでもパイプラインは動作します(評価ベースの感情分析にフォールバック)。別のLLM(OpenAI、Claude、Llama)に切り替えるには、Geminiブロック内のanalyze_reviews関数を修正してください。プロンプト形式は変更不要で、API呼び出しのみ変更します。