/>
S
Shineos Tech Blog

目次

Claude CodeとMCP Serverによる次世代開発環境の構築:実践的アーキテクチャと実装

Claude CodeとMCP Serverによる次世代開発環境の構築:実践的アーキテクチャと実装

| Shineos Dev Team
Share:

この記事の要点

  • MCP (Model Context Protocol) の基本概念と、AIアシスタントが開発ツールと安全に連携するためのアーキテクチャ
  • TypeScriptにおけるAsyncLocalStorageとPrisma Client Extensionsを活用した高パフォーマンスなMCPサーバーの実装手法
  • PythonとSQLAlchemy 2.0記法を用いた、データベースリソースを安全に公開するMCPサーバーの構築
  • 既存のプラグイン方式やRAGと比較したMCPの優位性と、ビジネスユースケースでの適用範囲

はじめに

現代のソフトウェア開発において、AIアシスタントは単なるコード補完ツールを超え、設計やデバッグのパートナーとして不可欠な存在となっています。しかし、多くのエンジニアが直面する課題があります。それは、AIがプロジェクトの内部構造やデータベーススキーマ、社内ドキュメントなどの「コンテキスト」を十分に理解できていないという点です。

従来の解決策として、プロンプトにコードを貼り付ける方法や、RAG(検索拡張生成)によるドキュメント検索がありましたが、これらはリアルタイムのデータアクセスやツールの実行において柔軟性に欠けていました。この問題を解決するために登場したのが、Anthropicが提唱する MCP (Model Context Protocol) です。

本記事では、Claude CodeとMCP Serverを活用し、AIが開発環境のリソースやツールを直接かつ安全に操作できる仕組みを技術的に解説します。実務で即座に応用できる実装コードを中心に、深い技術的な洞察を提供します。

MCP (Model Context Protocol) とは?

MCPは、AIアシスタント(クライアント)とローカルまたはクラウドのリソース・ツール(サーバー)との間で標準化された通信を行うためのオープンプロトコルです。具体的には、LLMがデータベースをクエリしたり、ファイルシステムにアクセスしたり、APIを実行したりするための「共通言語」を提供します。

このプロトコルの核心は、「リソース (Resources)」「ツール (Tools)」 という2つの概念です。リソースは読み取り専用のデータ(設定ファイル、DBスキーマなど)を指し、ツールは実行可能な操作(データの作成、更新、APIリクエストなど)を指します。これらをJSON-RPC 2.0ベースのプロトコルで標準化することで、AI側は特定の実装詳細を知らなくても、統一されたインターフェースを通じてシステムと対話できます。

従来のカスタムAPI連携と異なり、MCPはトランスポート層(stdioやSSEなど)を抽象化しており、開発者はプロトコルの実装よりもビジネスロジックの実装に集中できます。

アーキテクチャと内部動作

MCPのアーキテクチャは、クライアント(Claude CodeやIDE)、ホスト(MCPクライアントライブラリ)、サーバー(MCP Server)の3つの主要なコンポーネントで構成されます。

クライアントがツールの実行を要求すると、ホストを介してJSON-RPCリクエストがサーバーに送信されます。サーバーはリクエストを解析し、バリデーションを行った後、実際の処理(DBクエリやファイル操作)を実行します。その結果をJSON-RPCレスポンスとして返却し、クライアントがそれを解釈してユーザーに提示や次のアクションの提案を行います。

通信は双方向であり、サーバーからクライアントへ通知を送ることも可能です。これにより、ログのストリーミングや長時間実行されるタスクの進捗報告など、動的なインタラクションが実現されています。

sequenceDiagram
    participant User as ユーザー
    participant Claude as Claude Code (Client)
    participant Host as MCP Host
    participant Server as MCP Server
    participant DB as Database

    User->>Claude: "ユーザー一覧を取得して"
    Claude->>Host: tools/list (利用可能ツールの確認)
    Host-->>Claude: tools (user_search)
    Claude->>Host: tools/call (user_search実行リクエスト)
    Host->>Server: JSON-RPC Request (tool call)
    Server->>DB: SELECT * FROM users
    DB-->>Server: Query Result
    Server-->>Host: JSON-RPC Response (Result)
    Host-->>Claude: Response Data
    Claude-->>User: "以下のユーザーが見つかりました..."

実装例 1: TypeScriptによるPrisma活用サーバー

最初の実装例では、TypeScriptを使用してデータベースアクセスを行うMCPサーバーを構築します。要件にある通り、最新のPrisma Client Extensionsを使用し、リクエストごとのトレーシングのために AsyncLocalStorage を活用します。これにより、どのAIリクエストがどのDBクエリを発行したかを特定しやすくし、デバッグや監査を容易にします。

このサーバーは user_lookup というツールを提供し、メールアドレスを基にユーザー情報を検索します。

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { PrismaClient } from "@prisma/client";
import { AsyncLocalStorage } from "async_hooks";

// リクエストコンテキストの型定義
interface RequestContext {
  requestId: string;
  timestamp: number;
}

// AsyncLocalStorageでリクエストスコープを管理
const asyncLocalStorage = new AsyncLocalStorage<RequestContext>();

// Prisma Clientの初期化
const prisma = new PrismaClient();

// Prisma Client Extensionsを用いたロギング拡張
const xprisma = prisma.$extends({
  query: {
    $allOperations({ operation, model, args, query }) {
      const context = asyncLocalStorage.getStore();
      const requestId = context?.requestId || "unknown";
      
      console.log(`[DB Log] Req: ${requestId} | Model: ${model} | Op: ${operation}`);
      
      return query(args);
    },
  },
});

// MCPサーバーのインスタンス化
const server = new Server(
  {
    name: "user-db-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// 利用可能なツールの定義
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "user_lookup",
        description: "Retrieve user details by email address",
        inputSchema: {
          type: "object",
          properties: {
            email: {
              type: "string",
              format: "email",
              description: "The email address of the user to lookup",
            },
          },
          required: ["email"],
        },
      },
    ],
  };
});

// ツール実行のハンドリング
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const context = {
    requestId: crypto.randomUUID(),
    timestamp: Date.now(),
  };

  return asyncLocalStorage.run(context, async () => {
    try {
      if (request.params.name === "user_lookup") {
        const args = request.params.arguments as { email: string };
        
        // バリデーション
        if (!args.email || !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(args.email)) {
          throw new Error("Invalid email format provided.");
        }

        // 拡張したPrisma Clientを使用
        const user = await xprisma.user.findUnique({
          where: { email: args.email },
          select: {
            id: true,
            name: true,
            email: true,
            role: true,
            createdAt: true,
          },
        });

        if (!user) {
          return {
            content: [
              {
                type: "text",
                text: JSON.stringify({ error: "User not found" }, null, 2),
              },
            ],
          };
        }

        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(user, null, 2),
            },
          ],
        };
      }
      
      throw new Error(`Tool ${request.params.name} not found`);
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      console.error(`[Error] Req: ${context.requestId} | ${errorMessage}`);
      
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({ error: errorMessage }, null, 2),
          },
        ],
        isError: true,
      };
    }
  });
});

// サーバーの起動
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("User DB MCP Server running on stdio");
}

main().catch((error) => {
  console.error("Fatal error in main():", error);
  process.exit(1);
});

実装例 2: Pythonによるログ解析サーバー

次にPythonの実装例です。ここでは、アプリケーションのログファイルをAIが解析できるようにする「リソース」ベースのサーバーを作成します。PythonのMCP SDKを使用し、SQLAlchemy 2.0以降のスタイルでログデータベースへのアクセスも併せて実装します。

このコードでは、ログファイルを動的に読み取るリソースと、エラーログをフィルタリングするツールの2つを提供します。

import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Any

from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# ロギングの設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("log-server")

# SQLAlchemy 2.0 非同期設定
# (仮定: Logモデルは別途定義されているとします)
DATABASE_URL = "sqlite+aiosqlite:///./logs.db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# サーバーのインスタンス化
server = Server("log-analysis-server")

# リソース定義: 最新のエラーログファイルを提供
@server.list_resources()
async def handle_list_resources() -> list[Resource]:
    """
    利用可能なリソースのリストを返却
    """
    return [
        Resource(
            uri="file:///var/log/app/error.log",
            name="Application Error Log",
            description="Real-time error log stream",
            mimeType="text/plain",
        )
    ]

@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    """
    リソースの内容を読み取る
    """
    if uri == "file:///var/log/app/error.log":
        try:
            # 非同期でファイルを読み込む(例: aiofilesを使用するのが望ましいが、ここでは擬似的に実装)
            with open("/var/log/app/error.log", "r") as f:
                # 最新の50行のみを返す
                lines = f.readlines()
                return "".join(lines[-50:])
        except FileNotFoundError:
            logger.error(f"Log file not found at {uri}")
            return "Error log file not found."
        except Exception as e:
            logger.error(f"Failed to read log: {e}")
            return f"Error reading log: {str(e)}"
    
    raise ValueError(f"Resource not found: {uri}")

# ツール定義: 過去24時間のエラー集計
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    return [
        Tool(
            name="get_error_summary",
            description="Get a summary of errors from the database in the last 24 hours",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "number",
                        "description": "Maximum number of errors to return",
                        "default": 10,
                    },
                },
            },
        )
    ]

@server.call_tool()
async def handle_call_tool(name: str, arguments: Any) -> list[TextContent | ImageContent | EmbeddedResource]:
    """
    ツールの実行ロジック
    """
    if name == "get_error_summary":
        limit = arguments.get("limit", 10)
        
        async with async_session() as session:
            try:
                # SQLAlchemy 2.0 スタイルのクエリ
                # 過去24時間のレコードを検索
                since = datetime.now() - timedelta(days=1)
                stmt = (
                    select(Log)
                    .where(Log.timestamp >= since, Log.level == "ERROR")
                    .order_by(Log.timestamp.desc())
                    .limit(limit)
                )
                
                result = await session.execute(stmt)
                errors = result.scalars().all()
                
                # 結果をJSONシリアライズ可能な形式に変換
                output_data = [
                    {
                        "id": err.id,
                        "message": err.message,
                        "timestamp": err.timestamp.isoformat(),
                        "context": err.context,
                    }
                    for err in errors
                ]
                
                return [TextContent(
                    type="text",
                    text=json.dumps(output_data, indent=2)
                )]
                
            except Exception as e:
                logger.exception("Database query failed")
                return [TextContent(
                    type="text",
                    text=json.dumps({"error": str(e)})
                )]
    
    raise ValueError(f"Unknown tool: {name}")

async def main():
    # サーバーの起動
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="log-analysis-server",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    asyncio.run(main())

実装例 3: TypeScriptによるファイルシステム操作とリソース監視

最後の実装例は、プロジェクトのソースコード構造をAIが理解するための支援を行うサーバーです。単なるファイル読み取りではなく、特定のパターンに一致するファイルを検索したり、依存関係を分析したりする機能を提供します。

ここでは、ファイルシステムへのアクセスをカプセル化し、セキュリティのために許可されたディレクトリ外へのアクセスを防ぐガード機能を実装します。

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";

// 設定: アクセスを許可するルートディレクトリ
const ALLOWED_ROOT = process.env.PROJECT_ROOT || process.cwd();

// セキュリティ: パスがルート内にあるか検証
function validatePath(inputPath: string): string {
  const resolvedPath = path.resolve(ALLOWED_ROOT, inputPath);
  if (!resolvedPath.startsWith(ALLOWED_ROOT)) {
    throw new Error(`Access denied: Path ${inputPath} is outside allowed root`);
  }
  return resolvedPath;
}

const server = new Server(
  {
    name: "project-structure-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      resources: {},
    },
  }
);

// ツール: ファイル検索(grep的な機能)
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "search_files",
        description: "Search for files containing a specific string within the project",
        inputSchema: {
          type: "object",
          properties: {
            pattern: {
              type: "string",
              description: "String or regex pattern to search for",
            },
            fileExtension: {
              type: "string",
              description: "Filter by file extension (e.g., 'ts', 'tsx')",
              default: "",
            },
          },
          required: ["pattern"],
        },
      },
      {
        name: "list_dependencies",
        description: "List dependencies used in TypeScript/JavaScript files",
        inputSchema: {
          type: "object",
          properties: {
            targetPath: {
              type: "string",
              description: "Relative path to the file or directory to analyze",
            },
          },
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    if (name === "search_files") {
      const { pattern, fileExtension } = args as { pattern: string; fileExtension?: string };
      const results: string[] = [];
      
      // 再帰的なディレクトリ探索(簡易版)
      async function searchDir(dir: string) {
        const entries = await fs.readdir(dir, { withFileTypes: true });
        for (const entry of entries) {
          const fullPath = path.join(dir, entry.name);
          if (entry.isDirectory()) {
            // node_modulesなどは除外
            if (entry.name !== "node_modules" && entry.name !== ".git") {
              await searchDir(fullPath);
            }
          } else if (entry.isFile()) {
            if (fileExtension && !fullPath.endsWith(`.${fileExtension}`)) continue;
            
            const content = await fs.readFile(fullPath, "utf-8");
            if (content.includes(pattern)) {
              results.push(fullPath.replace(ALLOWED_ROOT, ""));
            }
          }
        }
      }

      await searchDir(ALLOWED_ROOT);
      
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({ matches: results }, null, 2),
          },
        ],
      };
    }

    if (name === "list_dependencies") {
      const { targetPath } = args as { targetPath?: string };
      const target = validatePath(targetPath || ".");
      
      // package.jsonがあれば読み込む
      const pkgPath = path.join(target, "package.json");
      try {
        const pkgContent = await fs.readFile(pkgPath, "utf-8");
        const pkgJson = JSON.parse(pkgContent);
        const deps = {
          ...pkgJson.dependencies,
          ...pkgJson.devDependencies,
        };
        
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify({ path: target, dependencies: Object.keys(deps) }, null, 2),
            },
          ],
        };
      } catch (e) {
        // package.jsonがない場合、TSファイルからimport文を抽出するロジックなどが考えられるが、
        // ここでは簡易的にエラーを返す
        throw new Error("package.json not found in target path");
      }
    }

    throw new Error(`Tool ${name} not found`);
  } catch (error) {
    return {
      content: [
        {
          type: "text",
          text: JSON.stringify({ error: error instanceof Error ? error.message : String(error) }),
        },
      ],
      isError: true,
    };
  }
});

// リソース: 特定の設定ファイルを読み取る
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "file:///project/package.json",
        name: "Project Package Config",
        mimeType: "application/json",
      },
      {
        uri: "file:///project/tsconfig.json",
        name: "TypeScript Config",
        mimeType: "application/json",
      },
    ],
  };
});

server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const uri = request.params.uri;
  let filename = "";
  
  if (uri.includes("package.json")) filename = "package.json";
  else if (uri.includes("tsconfig.json")) filename = "tsconfig.json";
  else throw new Error("Unknown resource URI");

  const filePath = validatePath(filename);
  const content = await fs.readFile(filePath, "utf-8");
  
  return {
    contents: [
      {
        uri,
        mimeType: "application/json",
        text: content,
      },
    ],
  };
});

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Project Structure MCP Server running");
}

main().catch((error) => {
  console.error("Server error:", error);
  process.exit(1);
});

技術的な選択肢の比較

MCPを導入する際、既存の手法や他の統合アプローチと比較検討することが重要です。以下に、MCP、カスタムAPIラッパー、および従来のRAG(検索拡張生成)を比較した表を示します。

アプローチメリットデメリット適したケース
MCP Server標準化されたプロトコルによりクライアント依存性が低い。ツールとリソースの明確な分離。双方向通信が可能。プロトコルの学習コストがわずかに発生する。サーバー側の実装が必要。AIエージェントと開発ツールの深い統合が必要な場合。複数のAIツールで同じサーバーを再利用したい場合。
カスタムAPI Wrapper既存のバックエンドAPIをそのまま利用可能。実装が最も単純。AIがAPI仕様を理解するためのプロンプトエンジニアリングが必要。セキュリティ制御(権限管理)が複雑になりがち。既存のREST APIをAIに利用させるだけの簡易的な場合。プロトコルの標準化が不要な社内ツール。
RAG (検索拡張)静的なドキュメントや知識ベースの検索に最適。導入が容易。リアルタイムデータの反映が難しい。データの更新(書き込み)操作が苦手。検索精度に依存する。社内Wikiやドキュメントの検索、過去のチケット履歴の参照など、読み取り専用の知識提供を行う場合。

ビジネスユースケース:レガシーシステムのマイグレーション支援

具体的なビジネスユースケースとして、大規模なレガシーシステムのモダナイゼーションプロジェクトにおけるMCPの活用が挙げられます。

多くの企業において、10年以上前に構築されたモノリシックなアプリケーションが依然として稼働しており、その保守・改修は困難を極めます。これらのシステムでは、ドキュメントが不足していたり、コードと実際のDBスキーマが乖離していたりすることが一般的です。

MCP Serverを導入することで、以下のようなプロセスを自動化・効率化できます。

  1. スキーマの動的解析: MCP Serverを通じて、レガシーデータベースの現在のスキーマ情報をAIが直接クエリし、最新の状態を把握します。
  2. コードの関連付け: ソースコード内の特定の関数がどのテーブルを参照しているかを、静的解析と実行時ログの両方からAIがクロスチェックします。
  3. 移行計画の生成: 現在のデータ構造とビジネスロジックに基づき、マイクロサービス化への移行プランや、新しいORM(Prismaなど)用のスキーマ定義ファイルをAIが自動生成します。

これにより、エンジニアは「翻訳作業」から解放され、アーキテクチャの設計決定など、より高付加価値な業務に集中できるようになります。

よくある質問

MCP Serverのホスティング方法はどうなりますか?クラウド上で実行することも可能ですか?

はい、可能です。MCPはトランスポート層が抽象化されているため、ローカルマシンでのstdio通信だけでなく、SSE (Server-Sent Events) を用いたHTTP通信もサポートしています。これにより、社内の安全なVPC内にMCP Serverをホストし、ファイアウォール内のデータベースやAPIにアクセスさせるアーキテクチャが構築可能です。クラウド上で実行する場合は、認証機構(API KeyやOAuth)をサーバー側に実装し、不正アクセスを防ぐ設定が推奨されます。

既存のLangChainやAutoGPTなどのエージェントフレームワークとMCPは競合しますか?

いいえ、競合しません。むしろ補完関係にあります。LangChainなどは「AIがどのようにツールを組み合わせてタスクを達成するか(推論)」を担当するフレームワークです。対してMCPは「AIとツールがどのように通信するか(インターフェース)」を規定するプロトコルです。将来的にはLangChainがMCPクライアントとして機能し、MCP Serverで定義されたツールを利用するような統合も期待されています。現在はClaude Codeなどの特定のクライアントでの利用が主流ですが、エコシステムは拡大しています。

MCP Serverの開発における主なデバッグ方法は何ですか?

MCP Serverは基本的にstdio(標準入出力)を通じてJSON-RPCメッセージをやり取りするため、通常のHTTP APIのようなcurlコマンドでのデバッグは直感的ではありません。開発時には、MCP_DEBUG 環境変数を設定してログレベルを上げる、あるいはMCP Inspectorと呼ばれる公式のデバッグツールを使用するのが一般的です。これにより、送受信されるJSONメッセージのペイロードをリアルタイムで確認でき、通信エラーやスキーマの不整合を迅速に特定できます。また、サーバー側の実装ロジックについては、通常のユニットテストや統合テストを併用することが重要です。

おわりに

Claude CodeとMCP Serverを活用することで、AIアシスタントは単なるチャットボットから、開発環境の一部として深く統合された「インテリジェントエージェント」へと進化します。本記事で解説した実装例は、その第一歩に過ぎません。データベースへのアクセス、ファイルシステムの操作、ログ解析といった日々の開発タスクをAIに委譲することで、エンジニアの生産性は飛躍的に向上するでしょう。

Shineos Dev Teamでは、AIと開発プロセスの統合に関するコンサルティングや開発支援を行っています。貴社の開発環境に最適なMCP Serverの設計・構築をお手伝いしますので、お気軽にお問い合わせください。

参考リンク

[1] Model Context Protocol Specification [2] Anthropic Claude Code Documentation [3] Prisma Client Extensions Guide [4] SQLAlchemy 2.0 Documentation