How Big Techs Manage High Volume of Real-Time Data Streaming with Node.js
João Victor França Dias
Senior Fullstack Software Engineer | Typescript | Node | React | Nextjs | Python | Golang | AWS
In the current technology landscape, where data is generated and consumed at unprecedented volumes and speeds, big tech companies face significant scalability challenges. An effective solution to this problem is the combination of message queues with the efficient architecture of Node.js. This article explores how technologies like RabbitMQ and Apache Kafka, integrated with Node.js, are used to efficiently manage large volumes of real-time data.
RabbitMQ and Apache Kafka are message queue systems that enable the processing of continuous and high-volume data streams, essential for applications requiring real-time analysis and response. RabbitMQ is known for its low latency and ability to send thousands of messages per second, making it ideal for tasks that require quick point-to-point message delivery. On the other hand, Kafka is designed to handle extremely high throughput, transmitting millions of messages per second, making it suitable for processing large volumes of real-time data.
The architecture of Node.js, with its non-blocking I/O operations and event-driven model, complements these queue systems perfectly. This allows Node.js applications to efficiently consume data from Kafka or RabbitMQ while maintaining the ability to scale and respond quickly to real-time data demands. The integration of Node.js with these queues facilitates the construction of resilient and scalable systems that can handle the demands of modern data streaming applications, such as those used by big tech companies to monitor and react to events in real-time.
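As a tiny, stdlib-only illustration of that event-driven model (the names here are invented for the example), a consumer registers a handler and the runtime invokes it once per message instead of blocking on a read call:

```typescript
import { EventEmitter } from 'events';

// A stand-in for a queue consumer: handlers run per message via the event
// loop, so the process is never blocked waiting on a read.
const bus = new EventEmitter();
const received: string[] = [];

bus.on('message', (msg: string) => received.push(msg));

// Simulate three messages arriving; over a real broker connection these
// would be delivered asynchronously by the socket.
['1', '2', '3'].forEach((n) => bus.emit('message', n));

console.log(received); // ['1', '2', '3']
```

This is the same emitter mechanism the `Queue` class later in the article builds on.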
Building a Real-Time Data Consumption Architecture with Node.js and RabbitMQ
In this practical example, we will build a simple application that serves as a proof of concept for how big tech companies handle high volumes of real-time data using Node.js and RabbitMQ. The application consumes random numbers that another application pushes into a queue and displays them in real time on the frontend of users connected via WebSocket, as shown in the diagram below:
The architecture consists of a backend in Node.js that connects to RabbitMQ to consume data from the queue and transmit it to connected clients via WebSockets. This approach allows data to be processed and distributed efficiently and in real-time, ensuring that users receive the information without delays, even in high-demand scenarios. We will explore how this architecture works and how it can be implemented to solve scalability and real-time data processing issues.
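The producer side (the "other application" that feeds the queue) is not shown in the article; a minimal sketch using amqplib might look like the following. The connection URL, queue name, and interval are assumptions for illustration, and running it requires a live RabbitMQ broker:

```typescript
import amqp from 'amqplib';

// Hypothetical producer. URL and queue name are assumptions — match them
// to your own RabbitMQ setup and the consumer's constants.
const QUEUE_URL = 'amqp://guest:guest@localhost:5672';
const QUEUE_NAME = 'numbers';

async function produce() {
  const connection = await amqp.connect(QUEUE_URL);
  const channel = await connection.createChannel();
  await channel.assertQueue(QUEUE_NAME, { durable: true });

  // Push a random number into the queue every second.
  setInterval(() => {
    const n = Math.floor(Math.random() * 100);
    channel.sendToQueue(QUEUE_NAME, Buffer.from(n.toString()));
  }, 1000);
}

produce().catch(console.error);
```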
Connecting Node.js to a RabbitMQ Queue: The First Step
To start integrating Node.js with RabbitMQ, we will use a class called Queue, which encapsulates all the necessary logic to connect to a RabbitMQ queue, consume messages, and emit events to other parts of the application. This class is built on top of the amqplib library, which is one of the most popular libraries for working with RabbitMQ in Node.js.
Code:
import amqp from 'amqplib'
import { EventEmitter } from 'events' // EventEmitter lives in 'events', not 'stream'
import { QUEUE_HOST, QUEUE_NAME, QUEUE_PASS, QUEUE_PORT, QUEUE_USER, QUEUE_VHOST } from './constants'

export class Queue extends EventEmitter {
  private url: string
  private channel: amqp.Channel | null
  private static instance: Queue

  constructor() {
    super()
    this.url = `amqp://${QUEUE_USER}:${QUEUE_PASS}@${QUEUE_HOST}:${QUEUE_PORT}${QUEUE_VHOST}`
    this.channel = null
  }

  // Singleton accessor: every caller shares the same broker connection
  static getInstance() {
    if (!this.instance) {
      this.instance = new Queue()
    }
    return this.instance
  }

  addEvent(event: string, cb: (...args: any[]) => void): void {
    this.on(event, cb)
  }

  // Consume messages from `queueName`, re-emitting each one as `event`
  async consumeQueue(queueName: string, event: string) {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }
    await this.channel.consume(queueName, (msg) => {
      if (!msg) {
        return
      }
      const content = msg.content.toString()
      this.emit(event, content)
      this.channel!.ack(msg) // acknowledge only after the event has been emitted
    })
  }

  async connect() {
    try {
      const connection = await amqp.connect(this.url)
      const channel = await connection.createChannel()
      this.channel = channel
      await this.assertQueue(channel, QUEUE_NAME)
      console.log('Connected to RabbitMQ')
    } catch (err) {
      console.error(err)
    }
  }

  // Declare the queue idempotently; durable so messages survive broker restarts
  private async assertQueue(channel: amqp.Channel, queue: string, option?: amqp.Options.AssertQueue) {
    const defaultOption: amqp.Options.AssertQueue = {
      exclusive: false,
      durable: true,
      autoDelete: false,
      arguments: null
    }
    await channel.assertQueue(queue, { ...defaultOption, ...option })
  }

  async sendToQueue(data: string | number) {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }
    this.channel.sendToQueue(QUEUE_NAME, Buffer.from(data.toString()))
  }
}
Code Explanation:
The Queue class is designed to be a singleton, ensuring that only one instance of the connection to RabbitMQ is created during the application's lifecycle. This is important to avoid multiple unnecessary connections, which could overload the message server.
With this Queue class, we establish a solid foundation for connecting our Node.js application to a RabbitMQ queue. This allows us to consume and process real-time data efficiently and at scale. In the next step, we will integrate this queue with the rest of the application, enabling random numbers to be consumed and displayed in real-time on the frontend of users connected via WebSocket.
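The singleton mechanics can be seen in isolation with a minimal stdlib-only sketch. The `Registry` class here is invented for the example (it uses a private constructor, a stricter variant than `Queue`, whose constructor stays public), but `Queue.getInstance()` behaves the same way:

```typescript
// Minimal illustration of the singleton pattern used by Queue:
// getInstance() always hands back the same object, so every part of
// the app shares one broker connection.
class Registry {
  private static instance: Registry;

  // Private constructor makes the pattern airtight; the article's Queue
  // keeps its constructor public for simplicity.
  private constructor() {}

  static getInstance(): Registry {
    if (!Registry.instance) {
      Registry.instance = new Registry();
    }
    return Registry.instance;
  }
}

const a = Registry.getInstance();
const b = Registry.getInstance();
console.log(a === b); // true — one shared instance
```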
Step two: Implementing the Real-Time Data Streaming Service
Code:
import { PassThrough, Readable } from "stream"
import { Queue } from "./Queue"
import { QUEUE_NAME } from "./constants"

export class StreamService {
  private dataStream: Readable

  constructor(private queueService: Queue) {
    // Push-based source stream: data is pushed in from queue events,
    // so the _read implementation is a no-op
    this.dataStream = new Readable({
      read() { }
    })
    const event = 'dataStream'
    this.queueService.consumeQueue(QUEUE_NAME, event).catch(console.error)
    this.queueService.on(event, (data) => {
      console.log('data', data)
      this.dataStream.push(data)
    })
  }

  // Give each client its own PassThrough and pipe the source into it
  public getData() {
    const clientStream = this.createClientStream()
    this.dataStream.pipe(clientStream)
    return {
      clientStream
    }
  }

  private createClientStream() {
    return new PassThrough()
  }
}
Code Explanation:
The StreamService class is a central component in the real-time data streaming architecture, responsible for consuming messages from the RabbitMQ queue and transforming them into a continuous data stream that can be transmitted to connected clients. This class utilizes Node.js's Streams API, which is highly efficient for handling real-time data.
The StreamService class is essential for transforming the messages consumed from the RabbitMQ queue into a continuous data stream that can be easily transmitted to clients. By utilizing Node.js's Streams API, StreamService ensures that data is processed efficiently and in real-time, allowing the application to handle large volumes of data without compromising performance. In the next step, we will see how to integrate this class with the controller that manages WebSocket connections, enabling the data to be displayed in real-time on the users' frontend.
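The fan-out mechanics can be reproduced with nothing but Node's stdlib. Below is a sketch (all names invented for the example) of one push-based source `Readable` feeding two per-client `PassThrough` streams, mirroring what `StreamService` does:

```typescript
import { PassThrough, Readable } from 'stream';

// One push-based source, as in StreamService: _read is a no-op because
// data is pushed in from the outside (queue events in the real service).
const source = new Readable({ read() {} });

// Each "client" gets its own PassThrough, as in createClientStream()
const clientA = new PassThrough();
const clientB = new PassThrough();
source.pipe(clientA);
source.pipe(clientB);

const seenByA: string[] = [];
const seenByB: string[] = [];
clientA.on('data', (chunk) => seenByA.push(chunk.toString()));
clientB.on('data', (chunk) => seenByB.push(chunk.toString()));

// A message consumed from the queue would be pushed here; both client
// streams receive it asynchronously.
source.push('42');
```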
Step Three: Controlling Data Flow with the StreamController
Code:
import { Writable } from "stream"
import { Socket } from "socket.io"
import { StreamService } from "./service"

export class StreamController {
  constructor(private streamService: StreamService) { }

  // Pipe the per-client stream into a Writable that forwards each chunk
  // to the client over its WebSocket channel
  public getData(socket: Socket, channel: string) {
    const { clientStream } = this.streamService.getData()
    const writeStream = this.writeStreamOnObj((chunk) => {
      socket.emit(channel, chunk)
    })
    clientStream.pipe(writeStream)
    return {
      onClose: () => {
        console.info(`closing connection of ${socket.id}`)
      }
    }
  }

  // Wrap a callback in an object-mode Writable so it can be a pipe target
  private writeStreamOnObj(func: (chunk: any) => void) {
    return new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        func(chunk)
        callback()
      }
    })
  }
}
The StreamController is the final piece of the architecture that connects the data streaming service (StreamService) to clients that connect via WebSocket. It is responsible for managing client connections, ensuring that real-time data is transmitted efficiently and continuously to each connected client.
The StreamController plays a crucial role in the real-time data streaming architecture, acting as the intermediary between the StreamService and the clients connected via WebSocket. It ensures that data is transmitted continuously and efficiently, managing client connections so that each one receives real-time data without interruptions. With this structure, the application can handle multiple simultaneous clients while maintaining the scalability and efficiency required for modern data streaming applications.
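The bridge from stream to socket can also be exercised without a real WebSocket. In the sketch below, `fakeSocket` (invented for the example) stands in for a socket.io socket, and the Writable has the same shape as `writeStreamOnObj`:

```typescript
import { PassThrough, Writable } from 'stream';

// Stand-in for a socket.io socket: records what would be sent to the client
const sent: Array<[string, string]> = [];
const fakeSocket = {
  emit: (channel: string, chunk: Buffer) => sent.push([channel, chunk.toString()]),
};

// Same pattern as writeStreamOnObj: an object-mode Writable that forwards
// each chunk to the socket and signals completion via the callback.
const writeStream = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    fakeSocket.emit('stream_data', chunk);
    callback();
  },
});

const clientStream = new PassThrough();
clientStream.pipe(writeStream);
clientStream.write('7'); // each chunk becomes one 'stream_data' socket event
```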
Step Four: Managing WebSocket Connections for Real-Time Data Streaming
Code:
import { Server } from "socket.io"
import { StreamController } from "./controller"
import { StreamService } from "./service"
import { Queue } from "./Queue"

export const mountRouter = async (io: Server) => {
  // Reuse the shared Queue instance so the app holds a single broker connection
  const queue = Queue.getInstance()
  await queue.connect()
  const streamService = new StreamService(queue)
  const streamController = new StreamController(streamService)

  io.on("connection", async (socket) => {
    console.log(`user connected: ${socket.id}`)
    const { onClose } = streamController.getData(socket, 'stream_data')

    socket.on("disconnect", () => {
      console.log(`user disconnected: ${socket.id}`)
      onClose()
    })
  })
}
The provided code is responsible for managing WebSocket connections and integrating the real-time data stream with connected clients. It begins by establishing a connection to RabbitMQ through the Queue class, followed by the creation of instances of StreamService and StreamController, which are responsible for consuming and transmitting the data. When a client connects via WebSocket, the StreamController initiates the real-time data transmission to the client through the stream_data channel. The code also handles client disconnections, ensuring that resources are properly released by calling the onClose function when the client disconnects. This ensures that the system continues to operate efficiently, even with multiple connections and disconnections.
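For completeness, a possible entry point tying the pieces together is sketched below. The module name, port, CORS setting, and the name of the mount function are assumptions about how the project is laid out, and running it requires socket.io and a reachable RabbitMQ broker:

```typescript
import { createServer } from 'http';
import { Server } from 'socket.io';
import { mountRouter } from './router'; // assumed module name for the code above

const httpServer = createServer();
const io = new Server(httpServer, {
  cors: { origin: '*' }, // relaxed for the demo; tighten in production
});

mountRouter(io)
  .then(() => {
    httpServer.listen(3000, () => console.log('listening on :3000'));
  })
  .catch(console.error);
```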
Step Five: Demonstrating the Real-Time Results
Now that we have explored the architecture and implementation of the real-time data streaming system, it's time to see it all in action. In the following video, available at this LINK, you will see a practical demonstration of how the application consumes random numbers from a RabbitMQ queue and transmits them in real-time to clients connected via WebSocket. The demonstration illustrates the efficiency and smoothness of the system, showing how the data is processed and instantly displayed on the frontend, providing a responsive and dynamic user experience.
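On the client side, a minimal listener might look like the sketch below. The endpoint and element id are assumptions; only the `stream_data` channel name comes from the backend code above, and running it requires socket.io-client in a browser bundle:

```typescript
import { io } from 'socket.io-client';

// Connect to the backend and render each number as it arrives.
const socket = io('http://localhost:3000'); // assumed backend address

socket.on('stream_data', (chunk: { toString(): string }) => {
  const el = document.getElementById('numbers'); // assumed element id
  if (el) el.textContent += ` ${chunk.toString()}`;
});
```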
Conclusion
In this article, we explored how big tech companies manage high volumes of real-time data using Node.js and RabbitMQ. Through a proof of concept, we demonstrated how a message queue-based architecture can be implemented to efficiently and scalably consume, process, and distribute data. From setting up the queue to transmitting data to clients via WebSocket, each component plays a crucial role in building a robust and responsive system. With this approach, it is possible to ensure that data is processed in real-time, providing a smooth and reliable user experience, even in high-demand scenarios. For those interested in accessing the code and exploring the implementation further, the repository for this project can be found at https://github.com/joao99sb/nodejs-queue.