liminfo

Claude API Practical Examples Collection

Seven advanced patterns you can apply immediately in production with the Claude API: structured JSON output, multi-turn conversation management, streaming responses, image analysis (Vision), prompt caching, the Batch API, and error-handling patterns, with Python & TypeScript code.

Claude API examples, Claude structured output, Claude streaming, Claude Vision image analysis, Claude prompt caching, Claude Batch API, Anthropic API advanced, Claude multi-turn conversation

Problem

Seven practical challenges commonly encountered in Claude API projects:

1. API responses are difficult to parse — reliable structured output such as JSON is needed
2. The chatbot forgets previous turns — multi-turn conversation management is needed
3. Users wait too long for lengthy responses — real-time streaming output is needed
4. Text or other content inside images must be analyzed — Vision (image input) is required
5. Sending the same system prompt repeatedly spikes costs — prompt caching is needed for savings
6. Thousands of requests must be processed at once — the Batch API is required
7. 429 rate limits, network errors, etc. must be handled in production — retry patterns are needed

Model pricing (input/output per MTok): Opus 4.6 ($5/$25), Sonnet 4.5 ($3/$15), Haiku 4.5 ($1/$5)

Tool Use (function calling) is covered in a separate guide, so this article focuses on the other advanced patterns.

Required Tools

Anthropic Python SDK

pip install anthropic — Requires Python 3.8+. Supports all advanced features including streaming and Batch API.

Anthropic TypeScript SDK

npm install @anthropic-ai/sdk — Requires Node.js 18+. Provides the same features as the Python SDK.

Python / TypeScript

All examples are provided in both languages. Setting the ANTHROPIC_API_KEY environment variable is required.

Solution Steps

1

Structured Output — Extracting Data in JSON Mode

By requesting responses from Claude in JSON format, you get structured data that is easy to parse. Specifying JSON output in both the system prompt and the user prompt increases reliability.

Key points:
- Explicitly state "respond only in JSON format" in the system prompt
- Providing an example of the desired JSON schema significantly improves accuracy
- Parse the response with json.loads() / JSON.parse()
- Prefill technique: starting the assistant message with "{" forces JSON output

Practical uses: extracting title/summary/keywords/sentiment from news articles, parsing resumes, structuring log analysis results

import anthropic
import json

client = anthropic.Anthropic()

# ============================================
# Structured JSON output extraction
# ============================================
def extract_structured_data(text: str) -> dict:
    """Extract structured data from unstructured text."""
    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=1024,
        system="""You are a data extraction specialist.
Extract information from the given text and respond ONLY in the following JSON format.
Do not include any text outside of JSON.

{
  "title": "Title",
  "summary": "Summary in 2 lines or less",
  "keywords": ["keyword1", "keyword2", "keyword3"],
  "sentiment": "positive | negative | neutral",
  "confidence": 0.0 ~ 1.0
}""",
        messages=[
            {"role": "user", "content": f"Analyze the following text:\n\n{text}"},
            # Prefill: force the assistant response to start with "{"
            {"role": "assistant", "content": "{"},
        ],
    )

    # Prepend "{" since we used it as prefill
    raw = "{" + response.content[0].text
    return json.loads(raw)

# Test
article = """Apple reported record Q2 earnings with revenue of $94.8 billion,
up 5% year over year. iPhone sales reached $46.8 billion, driven by strong
demand for the iPhone 16 Pro series. The Services segment also hit an
all-time high of $24.2 billion, with AI-powered features driving adoption."""

result = extract_structured_data(article)
print(json.dumps(result, indent=2))
# {
#   "title": "Apple Reports Record Q2 Earnings",
#   "summary": "Apple Q2 revenue $94.8B, up 5% YoY...",
#   "keywords": ["Apple", "earnings", "iPhone"],
#   "sentiment": "positive",
#   "confidence": 0.95
# }

# ============================================
# TypeScript version
# ============================================
# import Anthropic from "@anthropic-ai/sdk";
# const client = new Anthropic();
#
# const response = await client.messages.create({
#   model: "claude-sonnet-4-5-20250514",
#   max_tokens: 1024,
#   system: "Respond only in JSON format...",
#   messages: [
#     { role: "user", content: `Analyze: ${text}` },
#     { role: "assistant", content: "{" },
#   ],
# });
# const data = JSON.parse("{" + response.content[0].text);
2

Multi-Turn Conversation Management — Implementing Chat History

The Claude API is stateless: to maintain conversation context, you must include the full conversation history in the messages array with each request.

Key points:
- Include all previous messages in the messages array (user -> assistant -> user -> assistant -> ...)
- As conversations grow longer, token costs increase, so context window management is important
- Sliding window: keep only the most recent N turns to reduce costs
- Summarization: summarize older conversation and include it in the system prompt

Practical tips:
- Monitor current token usage via response.usage.input_tokens
- When approaching the model's context window (200K tokens), prune old messages

import anthropic

client = anthropic.Anthropic()

# ============================================
# Multi-turn conversation management class
# ============================================
class Conversation:
    def __init__(
        self,
        model: str = "claude-sonnet-4-5-20250514",
        system: str = "You are a helpful AI assistant.",
        max_history: int = 20,  # Max message pairs to keep
    ):
        self.model = model
        self.system = system
        self.max_history = max_history
        self.messages: list[dict] = []
        self.total_tokens = 0

    def chat(self, user_message: str) -> str:
        """Send a user message and get a response."""
        self.messages.append({"role": "user", "content": user_message})

        # Sliding window: keep only the most recent N pairs
        if len(self.messages) > self.max_history * 2:
            self.messages = self.messages[-(self.max_history * 2):]

        response = client.messages.create(
            model=self.model,
            max_tokens=4096,
            system=self.system,
            messages=self.messages,
        )

        assistant_text = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_text})
        self.total_tokens += response.usage.input_tokens + response.usage.output_tokens

        return assistant_text

    def get_token_usage(self) -> int:
        return self.total_tokens

    def clear(self):
        self.messages = []

# ============================================
# Usage example
# ============================================
conv = Conversation(system="You are a Python programming tutor. Explain concepts clearly.")

print(conv.chat("What is the difference between a list and a tuple in Python?"))
# -> A list is mutable while a tuple is immutable...

print(conv.chat("When should I use a tuple instead?"))
# -> (Remembering previous context) Tuples can be used as dictionary keys...

print(conv.chat("Show me a code example of what you just described"))
# -> (Maintaining context) Provides code examples

print(f"Total tokens used: {conv.get_token_usage()}")
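The fixed-turn sliding window above can also be driven by an approximate size budget, so many short turns survive while a few verbose ones get trimmed sooner. A minimal sketch — the character-based estimate and the `trim_to_budget` helper name are illustrative, not SDK features:

```python
def trim_to_budget(messages: list[dict], max_chars: int = 40_000) -> list[dict]:
    """Drop the oldest user/assistant pairs until the history fits the budget.

    Character count is only a rough proxy for tokens (~4 chars/token for
    English text); use the API's token counting for exact numbers.
    """
    trimmed = list(messages)
    while trimmed and sum(len(str(m["content"])) for m in trimmed) > max_chars:
        # Drop a whole pair so the list still starts with a "user" turn
        trimmed = trimmed[2:]
    return trimmed
```

You would call this on `self.messages` right before `client.messages.create(...)`, in place of (or in addition to) the fixed `max_history` cutoff.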
3

Streaming Responses — Real-Time Output

Instead of waiting for the entire response, you can receive tokens in real time and display them to the user progressively. Streaming dramatically improves UX and minimizes TTFT (Time To First Token).

Key points:
- Python: use the client.messages.stream() context manager
- TypeScript: use the client.messages.stream() method
- Event types: text (text chunks), message_start, content_block_start, content_block_stop, etc.
- Use stream.text_stream to iterate conveniently over text chunks only

Practical uses:
- Implement ChatGPT-like typing effects in web apps
- Deliver chunks to clients via SSE (Server-Sent Events) from the server
- Show long analysis results partially as they become available

import anthropic

client = anthropic.Anthropic()

# ============================================
# Python streaming - simple approach
# ============================================
def stream_response(prompt: str):
    """Stream the response in real-time."""
    with client.messages.stream(
        model="claude-sonnet-4-5-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        print()  # newline

        # Access the final message object while the stream is still open
        final = stream.get_final_message()

    print(f"\nTokens: input {final.usage.input_tokens}, output {final.usage.output_tokens}")

stream_response("Explain Python's asyncio in under 500 words.")

# ============================================
# Python streaming - event-based (for SSE servers)
# ============================================
def stream_events(prompt: str):
    """Process streaming at the event level."""
    with client.messages.stream(
        model="claude-sonnet-4-5-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if hasattr(event.delta, "text"):
                    yield event.delta.text  # Return as generator

# Use in Flask/FastAPI SSE endpoint:
# for chunk in stream_events("Explain this"):
#     yield f"data: {json.dumps({'text': chunk})}\n\n"
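The SSE comment above glosses over frame formatting. A small helper keeps the protocol details in one place (the helper name is illustrative; JSON-encoding each chunk prevents embedded newlines from breaking the "data: ...\n\n" framing):

```python
import json

def to_sse(chunk: str) -> str:
    """Format one text chunk as a Server-Sent Events frame."""
    # An SSE frame is a "data:" line terminated by a blank line.
    # json.dumps escapes any newlines inside the chunk itself.
    return f"data: {json.dumps({'text': chunk})}\n\n"
```

In a FastAPI endpoint you would return `StreamingResponse((to_sse(c) for c in stream_events(prompt)), media_type="text/event-stream")`.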

# ============================================
# TypeScript streaming
# ============================================
# import Anthropic from "@anthropic-ai/sdk";
# const client = new Anthropic();
#
# const stream = client.messages.stream({
#   model: "claude-sonnet-4-5-20250514",
#   max_tokens: 2048,
#   messages: [{ role: "user", content: "Explain asyncio" }],
# });
#
# // Method 1: Event listener
# stream.on("text", (text) => process.stdout.write(text));
#
# // Method 2: Async iterator
# for await (const event of stream) {
#   if (event.type === "content_block_delta" &&
#       event.delta.type === "text_delta") {
#     process.stdout.write(event.delta.text);
#   }
# }
#
# const finalMessage = await stream.finalMessage();
# console.log(finalMessage.usage);
4

Image Analysis (Vision) — Image Input + Text Analysis

Claude can accept images as input and analyze their content. Images are passed via base64 encoding or a URL, and can be sent alongside text.

- Supported formats: JPEG, PNG, GIF, WebP
- Maximum size: approximately 5MB per image
- Multiple images can be included in a single message

Key points:
- Include both image and text blocks in the content array
- image block source.type: "base64" (file) or "url" (web image)
- media_type: "image/jpeg", "image/png", "image/gif", "image/webp"

Practical uses: OCR (text extraction), chart/graph analysis, product image classification, UI screenshot review, preliminary medical imaging analysis

import anthropic
import base64

client = anthropic.Anthropic()

# ============================================
# Method 1: Analyze local image file (base64)
# ============================================
def analyze_image_file(image_path: str, question: str) -> str:
    """Analyze a local image file."""
    with open(image_path, "rb") as f:
        image_data = base64.standard_b64encode(f.read()).decode("utf-8")

    # Determine media_type from extension
    ext = image_path.rsplit(".", 1)[-1].lower()
    media_types = {"jpg": "image/jpeg", "jpeg": "image/jpeg",
                   "png": "image/png", "gif": "image/gif", "webp": "image/webp"}
    media_type = media_types.get(ext, "image/jpeg")

    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": media_type,
                        "data": image_data,
                    },
                },
                {"type": "text", "text": question},
            ],
        }],
    )
    return response.content[0].text

# Usage
# result = analyze_image_file("receipt.jpg", "Extract the total and all items as JSON.")

# ============================================
# Method 2: Analyze image from URL
# ============================================
def analyze_image_url(image_url: str, question: str) -> str:
    """Analyze an image from a web URL."""
    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {"type": "url", "url": image_url},
                },
                {"type": "text", "text": question},
            ],
        }],
    )
    return response.content[0].text

# ============================================
# Method 3: Compare multiple images
# ============================================
def compare_images(image_paths: list[str], question: str) -> str:
    """Compare and analyze multiple images."""
    content = []
    for path in image_paths:
        with open(path, "rb") as f:
            data = base64.standard_b64encode(f.read()).decode("utf-8")
        content.append({
            "type": "image",
            "source": {"type": "base64", "media_type": "image/jpeg", "data": data},
        })
    content.append({"type": "text", "text": question})

    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": content}],
    )
    return response.content[0].text

# compare_images(["before.jpg", "after.jpg"], "Describe the differences between these images.")
5

Prompt Caching — Cost Reduction Patterns

Prompt caching stores repeatedly used long system prompts or documents, cutting their input cost by up to 90%.

Key points:
- Add cache_control: {"type": "ephemeral"} to the block you want to cache
- Minimum cacheable length: 1024 tokens (Sonnet/Opus), 2048 tokens (Haiku)
- Cache TTL: 5 minutes, refreshed on each hit (a hit occurs when the same prefix arrives within the TTL)
- On cache hit: cached tokens are billed at 10% of the base input price (90% discount)
- On cache write: 25% surcharge on the cached tokens

When to use:
- Sending a long system prompt repeatedly with every request
- Asking multiple questions about the same document in RAG
- Repeatedly analyzing the same codebase for code reviews

import anthropic

client = anthropic.Anthropic()

# ============================================
# Prompt caching: caching a long system prompt
# ============================================

# Example: including a 10,000-token manual in the system prompt
long_manual = "Full product manual content... (thousands of lines)" * 100  # Assume long text

response = client.messages.create(
    model="claude-sonnet-4-5-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are a customer support specialist. Answer based on the product manual below.",
        },
        {
            "type": "text",
            "text": long_manual,
            "cache_control": {"type": "ephemeral"},  # Cache this block
        },
    ],
    messages=[
        {"role": "user", "content": "How do I replace the battery?"}
    ],
)

# Check cache usage
usage = response.usage
print(f"Input tokens: {usage.input_tokens}")
print(f"Cache creation tokens: {getattr(usage, 'cache_creation_input_tokens', 0)}")
print(f"Cache read tokens: {getattr(usage, 'cache_read_input_tokens', 0)}")
# First call: cache_creation_input_tokens = ~10000 (cache write, 25% extra)
# Subsequent calls: cache_read_input_tokens = ~10000 (cache hit, 90% discount!)

# ============================================
# Prompt caching: RAG document caching
# ============================================
def ask_with_cached_context(context: str, questions: list[str]) -> list[str]:
    """Use caching when asking multiple questions about the same document."""
    answers = []
    for q in questions:
        response = client.messages.create(
            model="claude-sonnet-4-5-20250514",
            max_tokens=1024,
            system=[
                {
                    "type": "text",
                    "text": "Answer questions based on the document below.",
                },
                {
                    "type": "text",
                    "text": context,
                    "cache_control": {"type": "ephemeral"},
                },
            ],
            messages=[{"role": "user", "content": q}],
        )
        answers.append(response.content[0].text)
    return answers
    # Only the first question incurs cache write cost;
    # remaining questions get 90% savings via cache hits
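The discount arithmetic above is easy to sanity-check. Under the stated pricing rules (25% surcharge on the cache write, 90% off on cache reads), breakeven already arrives at the second request. This sketch plugs in Sonnet's $3/MTok input price and is purely illustrative:

```python
def caching_cost(cached_tokens: int, n_requests: int,
                 price_per_mtok: float = 3.0) -> tuple[float, float]:
    """Compare input cost for the cached block without vs. with caching.

    Assumes the first request writes the cache (1.25x base price) and all
    subsequent requests hit it (0.10x base price).
    """
    base = cached_tokens / 1_000_000 * price_per_mtok
    without = base * n_requests
    with_cache = base * 1.25 + base * 0.10 * (n_requests - 1)
    return without, with_cache

# 10,000 cached tokens, 20 requests:
#   without caching: 20 * $0.03            = $0.60
#   with caching:    $0.0375 + 19 * $0.003 = ~$0.0945 (~84% savings)
```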
6

Batch API — Bulk Processing Patterns

The Message Batches API lets you submit thousands of requests at once and receive results within 24 hours at a 50% discount.

Key points:
- Submit up to 100,000 requests in a single batch
- 50% cost discount (in exchange for asynchronous processing)
- Results delivered within 24 hours (typically minutes to hours)
- Each request carries a custom_id for matching results

When to use:
- Analyzing thousands of resumes or documents
- Large-scale classification/tagging of datasets
- Bulk processing where real-time responses are not needed

import anthropic
import time

client = anthropic.Anthropic()

# ============================================
# Batch API: bulk request processing
# ============================================

# 1. Create batch requests
batch_requests = []
articles = [
    "Apple reports record quarterly earnings driven by AI...",
    "Tesla faces challenges in European EV market...",
    "Microsoft Azure sees 30% cloud revenue growth...",
    # ... thousands more
]

for i, article in enumerate(articles):
    batch_requests.append({
        "custom_id": f"article-{i}",
        "params": {
            "model": "claude-sonnet-4-5-20250514",
            "max_tokens": 512,
            "messages": [
                {
                    "role": "user",
                    "content": f"Classify the sentiment of this article as positive/negative/neutral "
                               f"and respond in JSON:\n\n{article}",
                }
            ],
        },
    })

# 2. Submit batch
batch = client.messages.batches.create(requests=batch_requests)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")  # "in_progress"

# 3. Poll for status
while True:
    status = client.messages.batches.retrieve(batch.id)
    print(f"Status: {status.processing_status}")

    if status.processing_status == "ended":
        break
    time.sleep(30)  # Check every 30 seconds

# 4. Retrieve results
results = {}
for result in client.messages.batches.results(batch.id):
    custom_id = result.custom_id
    if result.result.type == "succeeded":
        text = result.result.message.content[0].text
        results[custom_id] = text
    else:
        results[custom_id] = f"Error: {result.result.type}"

print(f"Completed: {len(results)} items")
# Cost: 50% discount compared to regular API!
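Because a single batch is capped at 100,000 requests, larger jobs have to be split across multiple batches. A minimal chunking helper (the name is illustrative, not part of the SDK):

```python
def chunk_requests(requests: list[dict], limit: int = 100_000) -> list[list[dict]]:
    """Split a request list into batch-sized chunks preserving order."""
    return [requests[i:i + limit] for i in range(0, len(requests), limit)]

# for chunk in chunk_requests(batch_requests):
#     batch = client.messages.batches.create(requests=chunk)
```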
7

Error Handling & Retry Patterns — Production Essentials

In production environments, robust retry logic for the various error types is essential.

Major error types:
- 429 RateLimitError: request limit exceeded — retry with exponential backoff
- 529 OverloadedError: server overloaded — wait and retry
- 500 InternalServerError: server error — retryable
- 401 AuthenticationError: API key error — not retryable
- 400 BadRequestError: invalid request — not retryable; fix the input

The Anthropic SDK has built-in auto-retry (max_retries), and custom retry logic is also easy to implement.

Recommendations:
- Use the SDK's built-in max_retries (default: 2)
- Honor the Retry-After header on 429 errors
- Consider a circuit breaker pattern for production

import anthropic
import time

# ============================================
# Method 1: Using SDK built-in auto-retry
# ============================================
client = anthropic.Anthropic(
    max_retries=3,  # Auto-retry on 429, 5xx errors (default: 2)
    timeout=60.0,   # 60-second timeout
)

# ============================================
# Method 2: Custom retry logic (fine-grained control)
# ============================================
def call_with_retry(
    messages: list,
    max_retries: int = 5,
    base_delay: float = 1.0,
    model: str = "claude-sonnet-4-5-20250514",
) -> anthropic.types.Message:
    """API call with exponential backoff retry."""
    client_no_retry = anthropic.Anthropic(max_retries=0)

    for attempt in range(max_retries):
        try:
            return client_no_retry.messages.create(
                model=model,
                max_tokens=4096,
                messages=messages,
            )

        except anthropic.RateLimitError as e:
            # Use Retry-After header if available
            retry_after = getattr(e.response, "headers", {}).get("retry-after")
            delay = float(retry_after) if retry_after else base_delay * (2 ** attempt)
            print(f"Rate limit (429). Waiting {delay:.1f}s before retry ({attempt + 1}/{max_retries})")
            time.sleep(delay)

        except anthropic.InternalServerError:
            delay = base_delay * (2 ** attempt)
            print(f"Server error (500). Waiting {delay:.1f}s before retry ({attempt + 1}/{max_retries})")
            time.sleep(delay)

        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # Overloaded
                delay = base_delay * (2 ** attempt) + 5
                print(f"Server overloaded (529). Waiting {delay:.1f}s before retry")
                time.sleep(delay)
            else:
                # 400, 401, 403, etc. — not retryable
                raise

        except anthropic.APIConnectionError:
            delay = base_delay * (2 ** attempt)
            print(f"Connection failed. Waiting {delay:.1f}s before retry ({attempt + 1}/{max_retries})")
            time.sleep(delay)

    raise RuntimeError(f"Failed after {max_retries} retries")

# ============================================
# TypeScript version
# ============================================
# import Anthropic from "@anthropic-ai/sdk";
#
# // SDK built-in retry
# const client = new Anthropic({ maxRetries: 3, timeout: 60_000 });
#
# // Custom retry
# async function callWithRetry(messages, maxRetries = 5) {
#   const client = new Anthropic({ maxRetries: 0 });
#   for (let i = 0; i < maxRetries; i++) {
#     try {
#       return await client.messages.create({
#         model: "claude-sonnet-4-5-20250514",
#         max_tokens: 4096,
#         messages,
#       });
#     } catch (e) {
#       if (e instanceof Anthropic.RateLimitError) {
#         await new Promise(r => setTimeout(r, 1000 * 2 ** i));
#       } else if (e instanceof Anthropic.InternalServerError) {
#         await new Promise(r => setTimeout(r, 1000 * 2 ** i));
#       } else {
#         throw e;
#       }
#     }
#   }
#   throw new Error("Max retries exceeded");
# }
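The circuit-breaker recommendation above can be sketched in a few lines: after N consecutive failures the breaker "opens" and fails fast for a cooldown period instead of hammering a struggling API. The class name and thresholds are illustrative, not part of the SDK:

```python
import time

class CircuitBreaker:
    """Fail fast after repeated errors instead of retrying indefinitely."""

    def __init__(self, failure_threshold: int = 5, cooldown: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = 0.0

    def call(self, fn, *args, **kwargs):
        if self.failures >= self.failure_threshold:
            if time.monotonic() - self.opened_at < self.cooldown:
                raise RuntimeError("Circuit open: skipping call")
            self.failures = 0  # Cooldown elapsed: half-open, allow a probe
        try:
            result = fn(*args, **kwargs)
            self.failures = 0  # Success closes the circuit
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
```

Usage would look like `breaker.call(client.messages.create, model=..., max_tokens=..., messages=...)`, typically wrapped around the retry function above.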

Core Code

Summary of 4 core Claude API patterns: (1) force JSON output with prefill, (2) real-time responses with stream(), (3) Vision analysis with image blocks, (4) 90% cost savings with cache_control.

import anthropic, json, base64

client = anthropic.Anthropic()

# 1. Structured output: force JSON with prefill
resp = client.messages.create(
    model="claude-sonnet-4-5-20250514", max_tokens=1024,
    system="Respond only in JSON format.",
    messages=[
        {"role": "user", "content": "Analyze this text..."},
        {"role": "assistant", "content": "{"},
    ],
)
data = json.loads("{" + resp.content[0].text)

# 2. Streaming: real-time text output
with client.messages.stream(
    model="claude-sonnet-4-5-20250514", max_tokens=2048,
    messages=[{"role": "user", "content": "Explain this"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

# 3. Vision: image + text analysis
with open("image.jpg", "rb") as f:
    img = base64.standard_b64encode(f.read()).decode()
resp = client.messages.create(
    model="claude-sonnet-4-5-20250514", max_tokens=1024,
    messages=[{"role": "user", "content": [
        {"type": "image", "source": {"type": "base64",
         "media_type": "image/jpeg", "data": img}},
        {"type": "text", "text": "Analyze this image"},
    ]}],
)

# 4. Prompt caching: 90% cost reduction on repeated requests
resp = client.messages.create(
    model="claude-sonnet-4-5-20250514", max_tokens=1024,
    system=[
        {"type": "text", "text": "Base instructions"},
        {"type": "text", "text": long_context,
         "cache_control": {"type": "ephemeral"}},
    ],
    messages=[{"role": "user", "content": "Question"}],
)

Common Mistakes

Requesting JSON output without prefill, leading to unstable parsing

Add {"role": "assistant", "content": "{"} to force the response to start as JSON. When parsing, prepend the "{" you used as prefill. Also wrap the parsing in try/except to handle JSON parse errors.
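A defensive parse helper along these lines (the name and fallback strategy are illustrative) keeps occasional stray text after the JSON from crashing the pipeline:

```python
import json

def parse_prefilled_json(response_text: str) -> dict:
    """Re-attach the "{" prefill and parse, tolerating trailing text."""
    raw = "{" + response_text
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span, e.g. when the model
        # appended an explanation after the closing brace
        end = raw.rfind("}")
        if end == -1:
            raise
        return json.loads(raw[:end + 1])
```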

Not managing the messages array size in multi-turn conversations, causing token cost spikes

Use a sliding window to keep only the most recent N turns, or summarize older conversations and include them in the system prompt. Monitor response.usage.input_tokens to track costs.

Not flushing output during streaming, causing text to appear all at once

In print(text, end="", flush=True), flush=True is essential. For web servers, each chunk must be sent immediately in SSE format.

Specifying wrong media_type when sending images, causing API errors

Specify the correct MIME type matching the file extension: .jpg/.jpeg is "image/jpeg", .png is "image/png", .webp is "image/webp". An incorrect media_type causes a 400 error.
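Rather than maintaining a hand-written extension map, Python's standard library can guess the MIME type; a sketch (restricting to the supported formats and falling back to JPEG are assumptions of this example):

```python
import mimetypes

SUPPORTED = {"image/jpeg", "image/png", "image/gif", "image/webp"}

def media_type_for(path: str) -> str:
    """Guess the media_type from the filename, limited to supported formats."""
    guessed, _ = mimetypes.guess_type(path)
    return guessed if guessed in SUPPORTED else "image/jpeg"
```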

Applying prompt caching to text below the minimum token threshold, so no cache is ever created

Caching has minimum token thresholds (1024 for Sonnet/Opus, 2048 for Haiku). Blocks below the minimum are processed without caching, so cache_control on short text yields no savings: every request pays the full input price.
