Building Resilient E-commerce Microservices with NestJS and Apache Kafka
Learn how to build a fault-tolerant e-commerce system using NestJS microservices and Apache Kafka. We'll implement order processing, inventory management, email notifications, and order fulfillment with a focus on reliability and scalability.
#ApacheKafka #NestJS #Microservices #EventDrivenArchitecture #Node.js #TypeScript #E-commerce #SystemDesign
Introduction to Event-Driven Microservices
In this guide, we'll build a robust e-commerce system using NestJS microservices and Apache Kafka. Our architecture will handle order processing, inventory management, notifications, and fulfillment while ensuring high availability and fault tolerance.
System Architecture Overview
Our system consists of five main microservices communicating through Kafka topics, each handling specific business domains:
Kafka Topic Design
// kafka-topics.ts
/**
 * Central registry of the Kafka topic names used by the services,
 * grouped by the business domain that owns each topic.
 *
 * Declared `as const` so that every entry keeps its string-literal type
 * (e.g. `'orders.new'` rather than `string`) and the object is read-only
 * at compile time, which prevents accidental reassignment of a topic name.
 */
export const KAFKA_TOPICS = {
  ORDERS: {
    NEW_ORDER: 'orders.new',
    ORDER_CONFIRMED: 'orders.confirmed',
    ORDER_CANCELLED: 'orders.cancelled',
  },
  INVENTORY: {
    STOCK_CHECK: 'inventory.stock.check',
    STOCK_RESERVED: 'inventory.stock.reserved',
    STOCK_RELEASED: 'inventory.stock.released',
  },
  NOTIFICATIONS: {
    EMAIL_ORDER_CONFIRMATION: 'notifications.email.order.confirmation',
    EMAIL_ORDER_SHIPPED: 'notifications.email.order.shipped',
  },
  FULFILLMENT: {
    ORDER_READY: 'fulfillment.order.ready',
    ORDER_SHIPPED: 'fulfillment.order.shipped',
  },
} as const;
Setting up Kafka Configuration
// kafka.config.ts
import { KafkaOptions, Transport } from '@nestjs/microservices';
/** Broker list used when `KAFKA_BROKERS` is unset or empty (local development). */
const DEFAULT_KAFKA_BROKERS = ['localhost:9092', 'localhost:9093', 'localhost:9094'];

/**
 * Parses a comma-separated broker list such as `kafka1:9092,kafka2:9093`.
 * Blank entries are dropped; falls back to the local defaults when nothing
 * usable remains.
 */
function resolveKafkaBrokers(raw: string | undefined): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : [...DEFAULT_KAFKA_BROKERS];
}

/**
 * Kafka transport options shared by the services.
 *
 * Every deployment-specific value can be overridden through the environment;
 * when a variable is not set the previous hard-coded value is used, so
 * existing setups keep working unchanged:
 *
 * - `KAFKA_BROKERS`          comma-separated `host:port` list
 * - `KAFKA_CLIENT_ID`        client id reported to the brokers
 * - `KAFKA_GROUP_ID`         consumer group id
 * - `KAFKA_TRANSACTIONAL_ID` producer transactional id
 */
export const kafkaConfig: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      // Several brokers are listed so the client can still bootstrap when one is down.
      brokers: resolveKafkaBrokers(process.env.KAFKA_BROKERS),
      clientId: process.env.KAFKA_CLIENT_ID ?? 'e-commerce-app',
    },
    consumer: {
      // Within one consumer group each message is delivered to a single member.
      // Services that must each see the same event (for example two different
      // consumers of `orders.confirmed`) therefore need their own group id:
      // set KAFKA_GROUP_ID per service.
      groupId: process.env.KAFKA_GROUP_ID ?? 'e-commerce-consumer-group',
      allowAutoTopicCreation: false,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      retry: {
        initialRetryTime: 100,
        retries: 5,
      },
    },
    producer: {
      allowAutoTopicCreation: false,
      // Lets the broker discard duplicates caused by producer retries. This is
      // not end-to-end exactly-once processing: consumers can still see a
      // message more than once and should handle events idempotently.
      idempotent: true,
      // A transactional id must be unique per running producer instance;
      // instances sharing one id fence each other off. Override it per
      // instance via KAFKA_TRANSACTIONAL_ID when a service is scaled out.
      transactionalId: process.env.KAFKA_TRANSACTIONAL_ID ?? 'e-commerce-producer-tx',
    },
  },
};
Implementing the Order Service
// order.service.ts
/**
 * Creates orders and confirms them once inventory has been reserved.
 *
 * NOTE(review): NestJS documents `@MessagePattern` handlers as belonging to
 * controller classes — confirm this class is registered in a way that lets
 * the Kafka server discover `handleStockReserved`.
 *
 * NOTE(review): `ClientKafka.emit` returns an Observable in NestJS, so the
 * `await` in front of it presumably does not wait for the broker
 * acknowledgement — confirm, and convert to a promise if delivery must be
 * awaited.
 */
@Injectable()
export class OrderService {
  constructor(
    @Inject('KAFKA_CLIENT')
    private readonly kafkaClient: ClientKafka,
    private readonly orderRepository: OrderRepository,
  ) {}

  /**
   * Persists a new order and announces it on `orders.new`.
   *
   * @param orderData - Data used to create the order.
   * @returns The order as returned by the repository.
   */
  async createOrder(orderData: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepository.create(orderData);

    // Emit new order event
    await this.kafkaClient.emit(KAFKA_TOPICS.ORDERS.NEW_ORDER, {
      orderId: order.id,
      items: order.items,
      timestamp: new Date().toISOString(),
    });

    return order;
  }

  /**
   * Marks the order as `CONFIRMED` after stock was reserved and publishes
   * `orders.confirmed`.
   *
   * @param payload - Stock-reserved event; only `orderId` is read.
   * @throws Error when no order exists for `payload.orderId`.
   */
  @MessagePattern(KAFKA_TOPICS.INVENTORY.STOCK_RESERVED)
  async handleStockReserved(payload: StockReservedEvent): Promise<void> {
    const order = await this.orderRepository.findById(payload.orderId);
    if (!order) {
      // Fail with a descriptive error instead of a TypeError on `order.id`.
      throw new Error(`Cannot confirm order ${payload.orderId}: order not found`);
    }

    await this.orderRepository.update(order.id, { status: 'CONFIRMED' });

    // Emit order confirmed event. The email consumer of this topic reads
    // `customerEmail` and `items` from the payload, so they are included here.
    await this.kafkaClient.emit(KAFKA_TOPICS.ORDERS.ORDER_CONFIRMED, {
      orderId: order.id,
      // assumes the order entity exposes `customerEmail` — TODO confirm
      customerEmail: order.customerEmail,
      items: order.items,
      timestamp: new Date().toISOString(),
    });
  }
}
Implementing the Inventory Service
// inventory.service.ts
/**
 * Reserves stock for newly created orders and reports the outcome.
 *
 * NOTE(review): NestJS documents `@MessagePattern` handlers as belonging to
 * controller classes — confirm this class is registered accordingly.
 */
@Injectable()
export class InventoryService {
  constructor(
    @Inject('KAFKA_CLIENT')
    private readonly kafkaClient: ClientKafka,
    private readonly inventoryRepository: InventoryRepository,
  ) {}

  /**
   * Reserves stock for every item of a new order inside one repository
   * transaction.
   *
   * On success `inventory.stock.reserved` is published. If starting the
   * transaction, reserving, or committing fails, the transaction is rolled
   * back and `orders.cancelled` is published instead.
   *
   * @param payload - New-order event; `orderId` and `items` are read.
   * @throws Whatever `rollbackTransaction` throws; the cancellation event is
   *         still published in that case.
   */
  @MessagePattern(KAFKA_TOPICS.ORDERS.NEW_ORDER)
  async handleNewOrder(payload: NewOrderEvent): Promise<void> {
    const { orderId, items } = payload;

    try {
      // Start a database transaction
      await this.inventoryRepository.startTransaction();

      // Check and reserve stock for all items
      for (const item of items) {
        await this.inventoryRepository.reserveStock(item.productId, item.quantity);
      }

      await this.inventoryRepository.commitTransaction();
    } catch (error: unknown) {
      try {
        await this.inventoryRepository.rollbackTransaction();
      } finally {
        // Publish the cancellation even if the rollback itself failed, so the
        // order does not stay pending; a rollback error still propagates.
        await this.kafkaClient.emit(KAFKA_TOPICS.ORDERS.ORDER_CANCELLED, {
          orderId,
          reason: 'INSUFFICIENT_STOCK',
          // The reason code is kept for existing consumers; the underlying
          // message is added because any failure ends up in this branch.
          detail: error instanceof Error ? error.message : String(error),
          timestamp: new Date().toISOString(),
        });
      }
      return;
    }

    // Emit stock reserved event. Done outside the try block on purpose: a
    // failure here happens after the commit and must not trigger a rollback
    // or a cancellation of an order whose stock is already reserved.
    await this.kafkaClient.emit(KAFKA_TOPICS.INVENTORY.STOCK_RESERVED, {
      orderId,
      success: true,
      timestamp: new Date().toISOString(),
    });
  }
}
Implementing the Email Service
// email.service.ts
/**
 * Sends order-confirmation e-mails and reports the result on
 * `notifications.email.order.confirmation`.
 *
 * NOTE(review): NestJS documents `@MessagePattern` handlers as belonging to
 * controller classes — confirm this class is registered accordingly.
 */
@Injectable()
export class EmailService {
  constructor(
    private readonly mailerService: MailerService,
    @Inject('KAFKA_CLIENT')
    private readonly kafkaClient: ClientKafka,
  ) {}

  /**
   * Sends the confirmation e-mail for a confirmed order.
   *
   * Publishes a notification event with `success: true` after the mail was
   * handed to the mailer, or delegates to `handleEmailError` when sending
   * throws.
   *
   * @param payload - Order-confirmed event; `customerEmail`, `orderId` and
   *                  `items` are read.
   */
  @MessagePattern(KAFKA_TOPICS.ORDERS.ORDER_CONFIRMED)
  async sendOrderConfirmation(payload: OrderConfirmedEvent): Promise<void> {
    try {
      await this.mailerService.sendMail({
        to: payload.customerEmail,
        subject: 'Order Confirmation',
        template: 'order-confirmation',
        context: {
          orderId: payload.orderId,
          items: payload.items,
        },
      });
    } catch (error: unknown) {
      await this.handleEmailError(payload, error);
      return;
    }

    // Published outside the try block so that a failure to publish is not
    // mistaken for a failure to send the e-mail.
    await this.kafkaClient.emit(KAFKA_TOPICS.NOTIFICATIONS.EMAIL_ORDER_CONFIRMATION, {
      orderId: payload.orderId,
      success: true,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Reports a failed confirmation e-mail.
   *
   * Publishes the same notification event as the success path, with
   * `success: false` and the error message. No retry is attempted here;
   * a retry or backoff policy can be built on top of this event.
   *
   * @param payload - The event whose e-mail could not be sent.
   * @param error - The value thrown by the mailer.
   */
  private async handleEmailError(payload: OrderConfirmedEvent, error: unknown): Promise<void> {
    await this.kafkaClient.emit(KAFKA_TOPICS.NOTIFICATIONS.EMAIL_ORDER_CONFIRMATION, {
      orderId: payload.orderId,
      success: false,
      error: error instanceof Error ? error.message : String(error),
      timestamp: new Date().toISOString(),
    });
  }
}
Implementing the Fulfillment Service
// fulfillment.service.ts
@Injectable()
export class FulfillmentService {
constructor(
@Inject('KAFKA_CLIENT')
private readonly kafkaClient: ClientKafka,
private readonly fulfillmentRepository: FulfillmentRepository,
) {}
@MessagePattern(KAFKA_TOPICS.ORDERS.ORDER_CONFIRMED)
async handleOrderFulfillment(payload: OrderConfirmedEvent) {
const fulfillment = await this.fulfillmentRepository.create({
orderId: payload.orderId,
status: 'PROCESSING'
});
// Emit fulfillment started event
await this.kafkaClient.emit(KAFKA_TOPICS.FULFILLMENT.ORDER_READY, {
fulfillmentId: fulfillment.id,
orderId: payload.orderId,
timestamp: new Date().toISOString()
});
}
async markOrderShipped(fulfillmentId: string) {
const fulfillment = await this.fulfillmentRepository.update(fulfillmentId, {
status: 'SHIPPED',
shippedAt: new Date()
});
// Emit order shipped event
await this.kafkaClient.emit(KAFKA_TOPICS.FULFILLMENT.ORDER_SHIPPED, {
fulfillmentId: fulfillment.id,
orderId: fulfillment.orderId,
timestamp: new Date().toISOString()
});
}
}
Fault Tolerance and High Availability
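To ensure system reliability and fault tolerance, we implement several key strategies:
- Multiple Kafka brokers, so clients can still connect when a single broker is unavailable.
- An idempotent producer and bounded consumer retries.
- Database transactions with rollback around stock reservation.
- A dead-letter queue for messages that could not be processed.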
Monitoring and Error Handling
// monitoring.service.ts
@Injectable()
export class MonitoringService {
constructor(
private readonly prometheusService: PrometheusService,
private readonly logger: Logger,
) {}
recordKafkaLatency(topic: string, duration: number) {
this.prometheusService.recordHistogram({
name: 'kafka_message_processing_duration',
help: 'Duration of processing Kafka messages',
labelNames: ['topic'],
buckets: [0.1, 0.5, 1, 2, 5]
}, duration, { topic });
}
async handleDeadLetter(topic: string, message: any, error: Error) {
await this.logger.error(`Dead letter in topic ${topic}`, {
message,
error,
timestamp: new Date().toISOString()
});
// Store in DLQ for later processing
await this.deadLetterQueue.store(topic, message, error);
}
}
Best Practices and Considerations
This microservices architecture provides a scalable and resilient foundation for e-commerce operations. The combination of NestJS and Apache Kafka enables reliable event-driven communication between services while maintaining system consistency and fault tolerance. Remember to adjust configurations based on your specific requirements and load patterns.
Microservices Architecture Deep Dive
Each service in our system is built as an independent NestJS application, with its own database and business logic. This separation allows for independent scaling, deployment, and maintenance of each component.
// app.module.ts for each microservice
/**
 * Root module template used by each microservice: configuration, the shared
 * Kafka client, a PostgreSQL connection, plus health and metrics modules.
 */
@Module({
  imports: [
    ConfigModule.forRoot(),
    ClientsModule.register([
      {
        name: 'KAFKA_CLIENT',
        transport: Transport.KAFKA,
        options: kafkaConfig.options,
      },
    ]),
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: process.env.DB_HOST,
      // Explicit radix, and a fallback to PostgreSQL's standard port so an
      // unset DB_PORT does not turn into NaN.
      port: Number.parseInt(process.env.DB_PORT ?? '5432', 10),
      username: process.env.DB_USERNAME,
      password: process.env.DB_PASSWORD,
      database: process.env.DB_NAME,
      autoLoadEntities: true,
      // Schema changes are not applied automatically.
      synchronize: false,
    }),
    HealthModule,
    MetricsModule,
    // Add the service's own feature modules here (the original listing used
    // the non-compiling placeholder `ServiceSpecificModules...`).
  ],
})
export class AppModule {}
Service Communication Flow
Message Flow and Topic Relationships
Service Independence and Scalability
Each microservice is containerized and can be independently scaled based on its specific requirements. Here's how each service maintains its independence:
# docker-compose.yml example for service independence
#
# Each application service gets its own build context and its own
# configuration. The databases (order-db, inventory-db, fulfillment-db) and
# the brokers (kafka1, kafka2) named under depends_on are not defined in this
# excerpt.
version: '3.8'

services:
  # Order service with its own database.
  order-service:
    build: ./order-service
    environment:
      DB_HOST: order-db
      KAFKA_BROKERS: kafka1:9092,kafka2:9093
    depends_on:
      - order-db
      - kafka1
      - kafka2

  # Inventory service with its own database.
  inventory-service:
    build: ./inventory-service
    environment:
      DB_HOST: inventory-db
      KAFKA_BROKERS: kafka1:9092,kafka2:9093
    depends_on:
      - inventory-db
      - kafka1
      - kafka2

  # Email service: no database, only SMTP and Kafka.
  email-service:
    build: ./email-service
    environment:
      SMTP_HOST: smtp.example.com
      KAFKA_BROKERS: kafka1:9092,kafka2:9093
    depends_on:
      - kafka1
      - kafka2

  # Fulfillment service with its own database.
  fulfillment-service:
    build: ./fulfillment-service
    environment:
      DB_HOST: fulfillment-db
      KAFKA_BROKERS: kafka1:9092,kafka2:9093
    depends_on:
      - fulfillment-db
      - kafka1
      - kafka2
Event Flow Sequence