liminfo

MongoDB Aggregation Pipeline

A practical guide to performing complex data aggregation, joins, and transformations using MongoDB's Aggregation Pipeline, with index utilization and performance optimization


Problem

An e-commerce platform needs to generate various business analysis reports from order, product, and user data stored in MongoDB: monthly revenue aggregation, popular-product rankings by category, and purchase-pattern analysis by customer segment. Simple find() queries cannot handle these needs: cross-collection joins are required, and near real-time response times must be maintained on large datasets. You need to build efficient data analysis queries using the Aggregation Pipeline.

Required Tools

MongoDB Shell (mongosh)

The official MongoDB shell for directly executing and testing Aggregation Pipelines.

MongoDB Compass

A GUI tool for visually building Aggregation Pipelines and previewing results at each stage.

Mongoose

A MongoDB ODM for Node.js. Executes pipelines via the aggregate() method; a result type can be supplied as a generic parameter for type safety.

explain()

A tool for analyzing pipeline execution plans to determine index usage and identify performance bottlenecks.

Solution Steps

1

Basic Aggregation with $match and $group

$match is equivalent to SQL's WHERE clause, and $group corresponds to GROUP BY. Placing $match early in the pipeline allows index utilization, significantly improving performance. Specify grouping criteria with $group's _id field and aggregate with accumulator operators ($sum, $avg, $max, $min, $count).

// Monthly revenue aggregation
db.orders.aggregate([
  // 1. Filter only completed orders (uses index)
  { $match: {
    status: "completed",
    createdAt: {
      $gte: ISODate("2024-01-01"),
      $lt: ISODate("2025-01-01")
    }
  }},
  // 2. Group by month and aggregate
  { $group: {
    _id: {
      year: { $year: "$createdAt" },
      month: { $month: "$createdAt" }
    },
    totalRevenue: { $sum: "$totalAmount" },
    orderCount: { $count: {} },
    avgOrderValue: { $avg: "$totalAmount" },
    maxOrder: { $max: "$totalAmount" },
    uniqueCustomers: { $addToSet: "$userId" }
  }},
  // 3. Calculate unique customer count
  { $addFields: {
    customerCount: { $size: "$uniqueCustomers" }
  }},
  // 4. Remove unnecessary fields and sort
  { $project: { uniqueCustomers: 0 } },
  { $sort: { "_id.year": 1, "_id.month": 1 } }
])

// Example result:
// { _id: { year: 2024, month: 1 }, totalRevenue: 15420000,
//   orderCount: 342, avgOrderValue: 45087, customerCount: 218 }
2

Cross-Collection JOINs with $lookup

$lookup is equivalent to SQL's LEFT OUTER JOIN and is used to combine data from other collections. There are two forms: basic (equality match) and pipeline (subquery), with the pipeline form being more flexible. $lookup results are returned as arrays, so use $unwind to flatten or $arrayElemAt to extract a single value.

// Join product and user info to order details
db.orders.aggregate([
  { $match: { status: "completed" } },

  // JOIN user info
  { $lookup: {
    from: "users",
    localField: "userId",
    foreignField: "_id",
    as: "user"
  }},
  { $unwind: "$user" },  // array -> single object

  // JOIN order product details (pipeline form - conditional join)
  { $lookup: {
    from: "products",
    let: { productIds: "$items.productId" },
    pipeline: [
      { $match: {
        $expr: { $in: ["$_id", "$$productIds"] }
      }},
      { $project: { name: 1, category: 1, price: 1 } }
    ],
    as: "productDetails"
  }},

  // Extract only needed fields
  { $project: {
    orderNumber: 1,
    totalAmount: 1,
    createdAt: 1,
    "user.name": 1,
    "user.email": 1,
    items: 1,
    productDetails: 1
  }},

  { $limit: 100 }
])
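As noted above, $lookup returns an array; when at most one match is expected, $arrayElemAt can replace $unwind. A minimal sketch reusing the collections from the example above, written as a pipeline array to pass to db.orders.aggregate():

```javascript
// Join users, then collapse the one-element "user" array with
// $arrayElemAt instead of $unwind.
const userJoin = [
  { $lookup: {
    from: "users",
    localField: "userId",
    foreignField: "_id",
    as: "user"
  }},
  // First element of the array; null when there was no match,
  // so unmatched orders are kept (unlike a bare $unwind).
  { $addFields: { user: { $arrayElemAt: ["$user", 0] } } }
];
// In mongosh: db.orders.aggregate(userJoin)
```

Unlike a bare $unwind, this keeps orders whose userId has no matching user document, which is closer to a true LEFT JOIN.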

// Popular products by category (multi-collection join + aggregation)
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $unwind: "$items" },  // Separate into individual products per order
  { $group: {
    _id: "$items.productId",
    totalSold: { $sum: "$items.quantity" },
    totalRevenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } }
  }},
  { $lookup: {
    from: "products",
    localField: "_id",
    foreignField: "_id",
    as: "product"
  }},
  { $unwind: "$product" },
  { $sort: { totalSold: -1 } },
  { $limit: 20 },
  { $project: {
    productName: "$product.name",
    category: "$product.category",
    totalSold: 1,
    totalRevenue: 1
  }}
])
3

Multiple Aggregations Simultaneously with $facet

$facet runs multiple sub-pipelines over the same set of input documents within a single stage. It is useful for fetching multiple statistics in a single query for dashboards, returning each sub-pipeline's results as a separate field. Note that stages inside $facet cannot use indexes, so filter with $match before the $facet stage. $bucket and $bucketAuto enable histogram-style distribution analysis.

// Fetch multiple dashboard statistics in one query
db.orders.aggregate([
  { $match: {
    createdAt: { $gte: ISODate("2024-01-01") },
    status: "completed"
  }},
  { $facet: {
    // Pipeline 1: Monthly revenue trend
    monthlyTrend: [
      { $group: {
        _id: { $dateToString: { format: "%Y-%m", date: "$createdAt" } },
        revenue: { $sum: "$totalAmount" },
        count: { $count: {} }
      }},
      { $sort: { _id: 1 } }
    ],

    // Pipeline 2: Payment method breakdown
    paymentMethods: [
      { $group: {
        _id: "$paymentMethod",
        count: { $count: {} },
        total: { $sum: "$totalAmount" }
      }},
      { $sort: { count: -1 } }
    ],

    // Pipeline 3: Order amount distribution (histogram)
    priceDistribution: [
      { $bucket: {
        groupBy: "$totalAmount",
        boundaries: [0, 10000, 30000, 50000, 100000, 500000],
        default: "500000+",
        output: {
          count: { $count: {} },
          avgAmount: { $avg: "$totalAmount" }
        }
      }}
    ],

    // Pipeline 4: Overall summary statistics
    summary: [
      { $group: {
        _id: null,
        totalRevenue: { $sum: "$totalAmount" },
        totalOrders: { $count: {} },
        avgOrderValue: { $avg: "$totalAmount" }
      }}
    ]
  }}
])
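The paragraph above also mentions $bucketAuto, which computes bucket boundaries automatically so documents are spread roughly evenly across buckets. A minimal sketch using the same orders fields, written as a pipeline array for db.orders.aggregate():

```javascript
// $bucketAuto: same histogram idea as $bucket above, but MongoDB
// chooses the boundaries to balance document counts per bucket.
const autoDistribution = [
  { $match: { status: "completed" } },
  { $bucketAuto: {
    groupBy: "$totalAmount",
    buckets: 5,                    // number of buckets, not boundaries
    output: {
      count: { $count: {} },
      avgAmount: { $avg: "$totalAmount" }
    }
  }}
];
// In mongosh: db.orders.aggregate(autoDistribution)
```

This is convenient when you do not know the value distribution in advance and cannot pick sensible boundaries by hand.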
4

Window Functions with $setWindowFields (Running Totals, Moving Averages, Rankings)

$setWindowFields, supported since MongoDB 5.0, is equivalent to SQL window functions. It can calculate running totals, moving averages, ranks, and more within partitions. It is extremely useful for time-series data analysis and ranking.

// Cumulative spend and purchase ranking per customer
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $setWindowFields: {
    partitionBy: "$userId",
    sortBy: { createdAt: 1 },
    output: {
      // Cumulative spend per customer
      cumulativeSpend: {
        $sum: "$totalAmount",
        window: { documents: ["unbounded", "current"] }
      },
      // Order sequence number per customer
      orderSequence: {
        $documentNumber: {}
      },
      // Moving average of last 3 orders
      movingAvg: {
        $avg: "$totalAmount",
        window: { documents: [-2, "current"] }
      }
    }
  }},
  { $project: {
    userId: 1, totalAmount: 1, createdAt: 1,
    cumulativeSpend: 1, orderSequence: 1, movingAvg: { $round: ["$movingAvg", 0] }
  }}
])

// Product revenue ranking within category
db.products.aggregate([
  { $setWindowFields: {
    partitionBy: "$category",
    sortBy: { totalSales: -1 },
    output: {
      categoryRank: { $rank: {} },
      categoryDenseRank: { $denseRank: {} }
    }
  }},
  { $match: { categoryRank: { $lte: 5 } } }  // Top 5 per category
])
5

Index Strategy and Performance Optimization with explain()

Aggregation Pipeline performance depends heavily on whether the first $match and $sort stages utilize indexes. Analyzing execution plans with explain("executionStats") reveals whether it performs a COLLSCAN (full scan) or IXSCAN (index scan). The key is placing $match as early as possible in the pipeline and creating appropriate compound indexes.

// Create indexes (for Aggregation performance optimization)
db.orders.createIndex({ status: 1, createdAt: -1 })
db.orders.createIndex({ userId: 1, createdAt: -1 })
db.orders.createIndex({ "items.productId": 1 })

// Analyze execution plan with explain()
db.orders.explain("executionStats").aggregate([
  { $match: { status: "completed", createdAt: { $gte: ISODate("2024-01-01") } } },
  { $group: { _id: "$userId", total: { $sum: "$totalAmount" } } },
  { $sort: { total: -1 } },
  { $limit: 10 }
])

// Key metrics to check in results:
// - queryPlanner.winningPlan (nested under stages[0].$cursor in older
//   versions): "IXSCAN" (index scan) vs "COLLSCAN" (full collection scan)
// - executionStats.totalDocsExamined: number of documents examined
// - executionStats.executionTimeMillis: execution time

// Performance optimization tips
// 1. Place $match as early as possible (for index utilization)
// 2. Remove unnecessary fields early with $project (saves memory)
// 3. $sort followed by $limit is optimized into a top-k sort,
//    keeping only `limit` documents in memory instead of the full set
// 4. allowDiskUse: true lets sort/group stages spill to disk
//    when they exceed the 100MB per-stage memory limit

db.orders.aggregate([
  { $match: { status: "completed" } },            // Can use index
  { $project: { userId: 1, totalAmount: 1 } },    // Keep only needed fields
  { $group: { _id: "$userId", total: { $sum: "$totalAmount" } } },
  { $sort: { total: -1 } },
  { $limit: 100 }
], { allowDiskUse: true })  // Bypass 100MB memory limit
6

Using Aggregation Pipeline in Node.js (Mongoose)

Execute pipelines from application code using Mongoose's aggregate() method. This covers practical patterns including dynamic pipeline construction and error handling. Building reusable pipeline builders reduces code duplication.

// Node.js + Mongoose Aggregation practical code
import mongoose from 'mongoose';
// `Order` (the Mongoose model for orders) and `app` (an Express
// instance) are assumed to be defined/imported elsewhere.

// Dynamic pipeline builder
interface ReportFilters {
  startDate?: Date;
  endDate?: Date;
  status?: string;
  category?: string;
}

async function generateSalesReport(filters: ReportFilters) {
  const pipeline: any[] = [];

  // Dynamic $match construction
  const matchStage: any = {};
  if (filters.status) matchStage.status = filters.status;
  if (filters.startDate || filters.endDate) {
    matchStage.createdAt = {};
    if (filters.startDate) matchStage.createdAt.$gte = filters.startDate;
    if (filters.endDate) matchStage.createdAt.$lt = filters.endDate;
  }
  if (Object.keys(matchStage).length > 0) {
    pipeline.push({ $match: matchStage });
  }

  // Join product info
  pipeline.push(
    { $unwind: "$items" },
    { $lookup: {
      from: "products",
      localField: "items.productId",
      foreignField: "_id",
      as: "product"
    }},
    { $unwind: "$product" }
  );

  // Category filter (post-join filtering)
  if (filters.category) {
    pipeline.push({ $match: { "product.category": filters.category } });
  }

  // Aggregation
  pipeline.push(
    { $group: {
      _id: "$product.category",
      totalRevenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
      totalQuantity: { $sum: "$items.quantity" },
      uniqueProducts: { $addToSet: "$items.productId" }
    }},
    { $addFields: { productCount: { $size: "$uniqueProducts" } } },
    { $project: { uniqueProducts: 0 } },
    { $sort: { totalRevenue: -1 } }
  );

  const result = await Order.aggregate(pipeline).allowDiskUse(true);
  return result;
}

// Usage in API router
app.get('/api/reports/sales', async (req, res) => {
  try {
    const report = await generateSalesReport({
      startDate: new Date(req.query.start as string),
      endDate: new Date(req.query.end as string),
      category: req.query.category as string,
    });
    res.json(report);
  } catch (error) {
    res.status(500).json({ error: 'Report generation failed' });
  }
});

Core Code

The core five-stage pattern of the MongoDB Aggregation Pipeline: $match (filter) -> $unwind (decompose arrays) -> $lookup (join) -> $group (aggregate) -> $sort (sort). Placing $match first is the key to performance.

// MongoDB Aggregation Pipeline Core Pattern
db.orders.aggregate([
  // Stage 1: Filter ($match - uses index, must be placed first)
  { $match: { status: "completed", createdAt: { $gte: ISODate("2024-01-01") } } },

  // Stage 2: Flatten array fields ($unwind)
  { $unwind: "$items" },

  // Stage 3: Join other collections ($lookup)
  { $lookup: {
    from: "products", localField: "items.productId",
    foreignField: "_id", as: "product"
  }},
  { $unwind: "$product" },

  // Stage 4: Group and aggregate ($group)
  { $group: {
    _id: "$product.category",
    revenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
    count: { $count: {} }
  }},

  // Stage 5: Sort and limit
  { $sort: { revenue: -1 } },
  { $limit: 10 }
], { allowDiskUse: true })

Common Mistakes

Placing $match late in the pipeline, causing a full collection scan

$match must be placed at the very beginning of the pipeline to utilize indexes. Placing it after $lookup or $unwind means all documents are processed before filtering, causing severe performance degradation. Use explain() to verify IXSCAN usage.
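A before/after sketch of this mistake, reusing the collections from the examples above (run via db.orders.aggregate in mongosh):

```javascript
// Anti-pattern: every order is joined before the filter runs.
const slow = [
  { $lookup: { from: "users", localField: "userId",
               foreignField: "_id", as: "user" } },
  { $match: { status: "completed" } }
];

// Fix: the same $match first, so an index on { status: 1 } applies.
const fast = [
  { $match: { status: "completed" } },
  { $lookup: { from: "users", localField: "userId",
               foreignField: "_id", as: "user" } }
];
// Verify in mongosh: db.orders.explain("executionStats").aggregate(fast)
```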

Out of memory (OOM) from $unwind on large arrays

Applying $unwind to documents with arrays of hundreds of elements causes document count to explode. Reduce document count with $match first, and use the preserveNullAndEmptyArrays option appropriately. Always set allowDiskUse: true for large pipelines.
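A minimal sketch of the safe ordering, with preserveNullAndEmptyArrays shown explicitly (field names follow the examples above):

```javascript
// Shrink the input with $match before $unwind multiplies documents.
const safeUnwind = [
  { $match: { status: "completed" } },
  { $unwind: {
    path: "$items",
    // Keep orders whose items array is missing or empty,
    // instead of silently dropping them.
    preserveNullAndEmptyArrays: true
  }}
];
// In mongosh: db.orders.aggregate(safeUnwind, { allowDiskUse: true })
```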

Extremely slow join performance from using an unindexed foreignField in $lookup

Always create an index on the field corresponding to $lookup's foreignField. For example, with from: "products" and foreignField: "_id", the default _id index is used and is fast, but custom fields require separate indexes.
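For instance, joining on a hypothetical custom key sku (not a field from the earlier examples) would need its own index on the foreign collection; a sketch:

```javascript
// In mongosh, first: db.products.createIndex({ sku: 1 })
// Without that index, this $lookup scans products once per order.
const skuJoin = [
  { $lookup: {
    from: "products",
    localField: "items.sku",   // hypothetical field
    foreignField: "sku",       // must be indexed on products
    as: "productInfo"
  }}
];
// In mongosh: db.orders.aggregate(skuJoin)
```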
