Asynchronous API Design: Server-Sent-Event(SSE) for Real-time Communication

Asynchronous API Design: Server-Sent-Event(SSE) for Real-time Communication

In the realm of modern application development, real-time communication is no longer a luxury but a necessity. Asynchronous API design is the key to achieving this, enabling applications to provide timely updates and notifications without the constraints of traditional request-response patterns.

In this article, we’ll explore four powerful techniques for asynchronous API design: Callbacks, WebSockets, Message Queues, and Server-Sent Events (SSE). These methods offer unique advantages, making them important for creating responsive, real-time applications.

Why Asynchronous API Design Matters:

Traditional request-response patterns in API design have their limitations. When a client sends a request to a server, it often has to wait for a response, which can result in delays and reduced user experience, especially in scenarios where real-time updates are crucial.

Asynchronous API design breaks free from these constraints by allowing the server to handle time-consuming tasks asynchronously and respond immediately with an acknowledgment. This enables clients to continue their operations without waiting and receive updates as soon as the task is completed.

Basic Asynchronous API Workflow:

In asynchronous API design,

  1. Client Requests Server Through A Rest Endpoint.
  2. The server processes the Time Consuming Task asynchronously and immediately sends a response to acknowledge the task, “I am doing the task”, Server may send a unique ID with the immediate response, so that client can periodically call with this ID to get task status.

3. After the task is completed, the server informs clients with a response message using various mechanisms. The choice of mechanism often depends on the specific requirements of the application and the communication protocols being used.

What if we could push data to the API client?

The ideal situation is to have our servers inform the API client when new data or events are available. However, we can’t do this with a traditional request-response interaction style common with HTTP. We have to find a way to allow servers to push data to the client. Enter async APIs.

How Server to Client Real Time Notification Response Works?

Approach 1: Polling:

Clients repeatedly send requests to the server, asking for updates. The server responds when it has new information or results. While simple to implement, polling can lead to increased network traffic and delays.

Approach 2: WebSockets:

WebSockets provide full-duplex communication channels, allowing the server to push messages to clients as soon as new data is available. WebSockets are ideal for applications requiring low-latency, real-time communication.

Approach 3: Server-Sent Events (SSE):

Server needs to push updates to clients in a unidirectional manner. It uses a single HTTP connection, reducing overhead compared to opening multiple connections.

SSE is unidirectional, meaning clients can only receive updates from the server, not send data back. SSE is best suited for scenarios where one-way communication suffices.

Approach 4: Message Queues:

Servers can use message queues (e.g., RabbitMQ, Apache Kafka) to publish messages. Clients subscribe to specific topics or queues and receive messages asynchronously as they arrive.

Approach 5: Callback URLs:

Efficient for scenarios where the server needs to notify the client of long-running operations. They minimize the need for clients to poll or maintain persistent connections.

  • Drawbacks: Clients must expose publicly accessible Callback URLs, which can introduce security and privacy concerns. Additionally, managing Callback URLs and handling retries in case of failures can be challenging.

Async APIs using server-sent events

Server-sent events (SSE) represent a powerful mechanism for enabling asynchronous communication between a server and clients, particularly in the context of APIs. SSE is based on the EventSource browser interface standardized as part of HTML5 by the World Wide Web Consortium (W3C). It introduces a method for using HTTP to establish longer-lived connections, allowing servers to proactively push data to clients. This data is typically structured as events and can include associated payload information.

Originally, SSE was conceived to facilitate data delivery to web applications, but it has found increasing relevance in the world of APIs. SSE offers a compelling alternative to traditional polling mechanisms, addressing some of the inherent challenges associated with client-server communication.

How does SSE work?

SSE uses a standard HTTP connection, but holds onto the connection for a longer period of time rather than disconnecting immediately. This connection allows servers to push data back to the client when it becomes available:


The specification outlines a few options for the format of the data coming back, allowing for event names, comments, single or multi-line text-based data, and event identifiers.


Use Case: E-Commerce Bulk Product Update API

In this use case, an e-commerce website allows clients to upload a CSV file containing a large number of product listings.

The server processes the CSV file asynchronously and immediately sends a response to acknowledge the upload. After the parsing and validation of the CSV file are completed, the server sends the processed product data to the client using Server-Sent Events (SSE).

Client to Server (CSV Upload and Asynchronous Processing):

1. Client Initiates CSV Upload:

The client interacts with the e-commerce website and initiates the CSV file upload through the user interface.

2. Client Sends CSV File:

The client selects a CSV file containing product data and uploads it to the server via a POST request to the /api/upload/csv endpoint.

3. Server Validates File and Generates Transaction ID:

The server receives the CSV file and validates it. If the file is valid, the server acknowledges the upload by responding immediately with an HTTP 202 (Accepted) status code.

  • The server generates a unique transaction ID for this upload, which is used to track the progress of the processing.

4. Asynchronous Processing Starts:

The server starts asynchronous processing of the CSV file. This processing includes CSV parsing, data validation, and the creation of product listings.

  • The processing may occur in the background, allowing the server to handle other requests while the CSV is being processed.

5. Progress Updates Sent via SSE:

As the CSV file is processed and product data is generated, the server sends real-time progress updates to the client using Server-Sent Events (SSE). The SSE endpoint (/sse) is established and connected with the client using the transaction ID.

6. Server to Client (Progress Updates and Completion):

  1. Client Listens to SSE Endpoint- Real Time Progress Update: On the client side, JavaScript is used to listen to the SSE endpoint (/sse) associated with the unique transaction ID. The client establishes a persistent connection to the server through SSE, enabling real-time updates.

2. Server Sends Progress Updates:

  • While processing the CSV file, the server sends partial product data updates and progress messages to the SSE endpoint, which is actively monitored by the client.
  • These updates are pushed from the server to the client in real-time, providing user with feedback on the progress of the CSV processing.

// Inside your CSV processing logic
String transactionId = "TXN-123"; // Replace with the actual transaction ID
String progressMessage = "Processing 50% complete"; // Replace with your progress message

// Send an SSE update to the client
sseController.sendSseUpdate(transactionId, progressMessage);        

3. Completion Message Sent via SSE:

  • When the entire CSV file has been successfully processed, the server sends a final completion message via SSE to inform the client that the product data is ready for retrieval.
  • The client receives this message and can proceed to retrieve the processed product data from the server.

4. Error Handling:

  • If there is an issue with the uploaded CSV file (e.g., invalid format or data) or if errors occur during processing, the server sends an error response to the client.
  • The client is informed of the error and can rectify the issue.

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
@RequestMapping("/api/upload")
public class CsvUploadController {

    private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();

    @PostMapping("/csv")
    public ResponseEntity<String> uploadCsv(@RequestParam("file") MultipartFile file) {
        if (file.isEmpty()) {
            return ResponseEntity.badRequest().body("Please select a CSV file to upload.");
        }

        if (!isCsvValid(file)) {
            return ResponseEntity.badRequest().body("Invalid CSV file format or data.");
        }

        String transactionId = "TXN-" + System.currentTimeMillis();

        //1. Start asynchronous processing and return a CompletableFuture
        CompletableFuture<Void> processingFuture = CompletableFuture.runAsync(() -> {
            asyncProcessCsv(file, transactionId);
        });

        //2. Return a 202 (Accepted) response with the transaction ID
        return ResponseEntity.status(HttpStatus.ACCEPTED).body(transactionId);
    }



    private boolean isCsvValid(MultipartFile file) {
        // Add your CSV validation logic here
        // Return true if the CSV is valid; otherwise, return false
        return true;
    }


    private void asyncProcessCsv(MultipartFile file, String transactionId) {
        CompletableFuture<Void> processingFuture = CompletableFuture.runAsync(() -> {
            // Your CSV processing logic here
            try (CSVReader csvReader = new CSVReader(
                    new InputStreamReader(file.getInputStream()))) {

                // Process CSV rows here
                // ...

                // Send progress updates via SSE
                for (int i = 1; i <= totalRows; i++) {
                    String progressMessage = "Processing row " + i;
                    sendProgressUpdate(transactionId, progressMessage);
                }

                // Send completion message via SSE
                sendCompletionMessage(transactionId, "CSV processing completed.");
            } catch (Exception e) {
                // Handle exceptions during processing
                sendErrorMessage(transactionId, "Error during processing: " + e.getMessage());
            } finally {
                sseEmitters.remove(transactionId);
            }
        });

        // Handle any exceptions that occur during processing
        processingFuture.exceptionally(ex -> {
            sendErrorMessage(transactionId, "Error during processing: " + ex.getMessage());
            return null;
        });
    }


    @GetMapping("/sse/{transactionId}")
    public SseEmitter getSseEmitter(@PathVariable String transactionId) {
        SseEmitter sseEmitter = new SseEmitter();
        sseEmitters.put(transactionId, sseEmitter);
        return sseEmitter;
    }


    private void sendProgressUpdate(String transactionId, String message) {
        SseEmitter sseEmitter = sseEmitters.get(transactionId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(SseEmitter.event().name("progress").data(message));
            } catch (IOException e) {
                // Handle exceptions when sending SSE updates
                e.printStackTrace();
            }
        }
    }


    private void sendCompletionMessage(String transactionId, String message) {
        SseEmitter sseEmitter = sseEmitters.get(transactionId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(SseEmitter.event().name("complete").data(message));
                sseEmitter.complete(); // Close the SSE connection
            } catch (IOException e) {
                // Handle exceptions when sending SSE updates
                e.printStackTrace();
            }
        }
    }


    private void sendErrorMessage(String transactionId, String message) {
        SseEmitter sseEmitter = sseEmitters.get(transactionId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(SseEmitter.event().name("error").data(message));
                sseEmitter.completeWithError(new RuntimeException(message)); // Complete with an error
            } catch (IOException e) {
                // Handle exceptions when sending SSE updates
                e.printStackTrace();
            }
        }
    }
}        

Key Points:

  • Upon receiving request, server immediately responds with TXN ID
  • Client upon receiving TXN ID, registers to SSE for real-time progress update, error handling message and task completion notification
  • Server Asynchronously process the task and upon completion, notification sent by SSE to client

Client-Side Implementation:

On the client side (typically a web page), you need to use JavaScript to listen to the SSE endpoint (/sse/stream) and handle incoming updates. Here's a simplified example of how you can do this in JavaScript:

<!DOCTYPE html>
<html>
<head>
    <title>Asynchronous Order Processing</title>
</head>
<body>
    <h1>Asynchronous Order Processing</h1>
    <button onclick="processOrder()">Process Order</button>
    <div id="result"></div>

    <script>
        let eventSource = null;

        async function processOrder() {
            const orderRequest = {
                csvFilePath: "Path"
            };

            try {
                const response = await fetch('/api/upload/csv', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify(orderRequest)
                });

                if (response.status === 202) {
                    document.getElementById('result').textContent = 'Order processing initiated. Waiting for completion...';
                     const transactionId =response.result.transactionId;
                    // Connect to the SSE endpoint for this order
        const eventSource = new EventSource(`/sse/stream?transactionId=${transactionId}`);                    
                    eventSource.onmessage = (event) => {
                        document.getElementById('result').textContent = event.data;
                    };
                    
                    eventSource.onerror = (error) => {
                        console.error('SSE Error:', error);
                    };
                }
            } catch (error) {
                console.error('Error:', error);
            }
        }

        function closeEventSource() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
            }
        }

        // Close the SSE connection when leaving the page
        window.addEventListener('beforeunload', closeEventSource);
    </script>
</body>
</html>        

Conclusion:

In the era of modern application development, responsiveness and real-time communication are essential. Asynchronous API design, with its array of techniques such as Callbacks, WebSockets, Message Queues, and Server-Sent Events (SSE), empowers developers to build applications that deliver timely updates and notifications to users. In our real-life e-commerce use case, SSE proves to be a game-changer, providing clients with real-time progress updates and completion notifications while optimizing performance and user experience.

As you navigate the landscape of asynchronous API design, consider the unique requirements of your application and choose the method that best aligns with your goals. Whether it’s keeping users informed of product updates or enabling real-time collaboration in a collaborative platform, mastering these asynchronous techniques will set your applications apart in today’s dynamic digital world.

要查看或添加评论,请登录

社区洞察

其他会员也浏览了