Optimization in Java: Choosing Between Sequential and Parallel Streams

In Java applications, performance optimization is essential, particularly when processing large datasets. One common use case is replacing specific values in a dataset. This article walks through optimizing that process using Java parallel streams together with a functional index in PostgreSQL, improving both lookup performance and processing speed. Because the code in question lives in a legacy project where architectural changes are infrequent, it is all the more important to take a cautious, measured approach when introducing parallelism.

Scenario: Replacing Values in Large Datasets

We have a large dataset of 500K records, each with a value that may need to be replaced. The original-to-replacement mappings are stored in a separate table (value_mappings), and we need to replace values in the dataset according to this mapping.

The challenge is to do this efficiently, both in terms of memory and processing time. To address this challenge, we'll apply two optimizations:

  1. Parallel Stream Processing: We’ll use Java’s parallel streams to speed up the processing of large datasets.
  2. Database Indexing: We'll use a functional index in PostgreSQL to optimize the lookup of replacements.

The Data Model

We have two generic tables:

  1. main_data: Contains the large dataset records with values that need to be replaced. Let’s assume 500K records.
  2. value_mappings: Contains mappings of original values to their replacements. Let’s assume 1K records.

Our task is to efficiently look up values in the value_mappings table and replace them in the main_data table.

Schema of the value_mappings table:

CREATE TABLE value_mappings (
    id INT PRIMARY KEY,
    original_value VARCHAR(100) NOT NULL,
    replacement_value VARCHAR(100)
);
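The Java code later in the article refers to a ValueMapping entity with getOriginalValue() and getReplacementValue() accessors but never shows the class itself, so here is a minimal JPA sketch matching the schema above. The field names and annotations are assumptions, and depending on your Spring/JPA version the imports may be javax.persistence rather than jakarta.persistence:

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;

// Assumed entity mapping for the value_mappings table shown above
@Entity
@Table(name = "value_mappings")
public class ValueMapping {

    @Id
    private Integer id;

    @Column(name = "original_value", nullable = false, length = 100)
    private String originalValue;

    @Column(name = "replacement_value", length = 100)
    private String replacementValue;

    protected ValueMapping() { } // no-arg constructor required by JPA

    public String getOriginalValue() { return originalValue; }

    public String getReplacementValue() { return replacementValue; }
}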

Old Java Code: Sequential Processing

In the original implementation, we processed records sequentially, looking up values from the database for each record. Here’s how it looked:

public String processRecords() {
    List<Object[]> records = entityManager
            .createNativeQuery("SELECT * FROM main_data WHERE original_value IS NOT NULL")
            .getResultList();

    List<DataRecord> resultList = new ArrayList<>();
    for (Object[] record : records) {
        String value = record[0].toString();
        String modifiedValue = valueReplacer.replaceValue(value);
        record[0] = modifiedValue;
        // Add to the list to persist later
        resultList.add(createDataRecord(record));
    }
    return insertIntoDatabase(resultList);
}

Old Value Replacer Code:

@Component
public class ValueReplacer {

    private final ValueMappingRepository valueMappingRepository;

    @Autowired
    public ValueReplacer(ValueMappingRepository valueMappingRepository) {
        this.valueMappingRepository = valueMappingRepository;
    }

    public String replaceValue(String value) {
        String trimmedValue = value.trim().toLowerCase();
        // Loads every mapping and scans them linearly on each call
        ValueMapping mapping = valueMappingRepository.findAll()
                .stream()
                .filter(m -> m.getOriginalValue().trim().equalsIgnoreCase(trimmedValue))
                .findFirst()
                .orElse(null);
        return mapping != null ? mapping.getReplacementValue() : value;
    }
}

Challenges with the Old Approach:

  • Sequential Processing: The loop processes records one at a time, so only a single CPU core does the work and a 500K-record run is slow.
  • Database Lookup: replaceValue() calls findAll() for every single record, loading all ~1K mappings and scanning them linearly each time — on the order of 500K repository calls over the whole run.

New Java Code: Parallel Processing with Optimized Lookups

We can significantly improve performance by processing records in parallel using Java’s parallelStream(). Additionally, we’ll optimize the database lookups by caching the mappings in memory and using a functional index in the database.

Updated Code with Parallel Stream and Optimized Caching:

public String processRecords() {
    List<Object[]> records = entityManager
            .createNativeQuery("SELECT * FROM main_data WHERE original_value IS NOT NULL")
            .getResultList();

    // Use a parallel stream to process records concurrently
    List<DataRecord> resultList = records.parallelStream()
            .map(record -> {
                String value = record[0].toString();
                String modifiedValue = valueReplacer.replaceValue(value);
                record[0] = modifiedValue;
                return createDataRecord(record); // Map to a new DataRecord object
            })
            .collect(Collectors.toList()); // Collect into a list for batch insertion

    return insertIntoDatabase(resultList);
}

Improvements:

  1. Parallel Stream Processing: parallelStream() processes the records concurrently across multiple CPU cores, which can substantially cut the wall-clock time for large datasets. Note that this does not change the algorithmic complexity — it divides the same work across threads (see the pool-isolation sketch after this list).
  2. Optimized Value Lookup: We cache the value mappings in memory and perform lookups against this cache rather than querying the database repeatedly.
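One caveat before the updated replacer: parallelStream() runs on the JVM-wide common ForkJoinPool by default, so a long-running batch job can starve other parallel work in the same application. A widely used workaround, sketched below, is to submit the pipeline to a dedicated pool. Note that this pool-confinement trick relies on observed ForkJoinPool behavior rather than a documented guarantee, and the records / createDataRecord / insertIntoDatabase names are simply reused from the example above:

import java.util.concurrent.ForkJoinPool;

// Sketch: run the parallel stream in its own pool so the common
// ForkJoinPool stays free for other work in the JVM.
ForkJoinPool customPool = new ForkJoinPool(4); // size to your cores/workload
try {
    List<DataRecord> resultList = customPool.submit(() ->
            records.parallelStream()
                    .map(record -> {
                        record[0] = valueReplacer.replaceValue(record[0].toString());
                        return createDataRecord(record);
                    })
                    .collect(Collectors.toList())
    ).join();
    return insertIntoDatabase(resultList);
} finally {
    customPool.shutdown();
}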

Updated Value Replacer (with Caching):

@Component
public class ValueReplacer {

    private final Map<String, String> valueMappings = new HashMap<>();
    private final ValueMappingRepository valueMappingRepository;

    @Autowired
    public ValueReplacer(ValueMappingRepository valueMappingRepository) {
        this.valueMappingRepository = valueMappingRepository;
        loadMappings();
    }

    // Load all mappings once at startup, keyed by the normalized original value
    private void loadMappings() {
        List<ValueMapping> mappings = valueMappingRepository.findAll();
        for (ValueMapping mapping : mappings) {
            valueMappings.put(mapping.getOriginalValue().trim().toLowerCase(),
                    mapping.getReplacementValue());
        }
    }

    public String replaceValue(String value) {
        String trimmedValue = value.trim().toLowerCase();
        return valueMappings.getOrDefault(trimmedValue, value);
    }
}
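A note on thread safety: the plain HashMap above is safe to read from a parallel stream only because it is fully populated in the constructor and never modified afterwards. If you need to refresh the mappings while the application is running, a ConcurrentHashMap avoids visibility problems. A minimal sketch of that variant — refreshMappings() is a hypothetical method, not part of the original code:

    // Sketch: refresh-safe variant of the cache
    private final Map<String, String> valueMappings = new ConcurrentHashMap<>();

    public void refreshMappings() {
        Map<String, String> fresh = new ConcurrentHashMap<>();
        for (ValueMapping mapping : valueMappingRepository.findAll()) {
            fresh.put(mapping.getOriginalValue().trim().toLowerCase(),
                    mapping.getReplacementValue());
        }
        // Brief window where a lookup can miss; acceptable here because
        // replaceValue() falls back to returning the input value unchanged.
        valueMappings.clear();
        valueMappings.putAll(fresh);
    }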

Optimizing Database with Functional Indexing

To further improve performance, especially when querying the database for value mappings, we apply a functional index in PostgreSQL to handle case-insensitive lookups efficiently. Here’s how you can create the functional index:

CREATE INDEX idx_original_value_lower ON value_mappings(LOWER(original_value));        

This index ensures that case-insensitive lookups can use an index scan rather than a full table scan — provided the query wraps the column in LOWER(...) so that the expression matches the index.
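If some lookups still go to the database (for example, when the mapping table grows too large to cache), the query must apply LOWER(...) to the column for PostgreSQL to use the index above. Here is a sketch of a Spring Data method that does this — the ValueMappingRepository name comes from the article, while the query method itself is an assumption:

import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

public interface ValueMappingRepository extends JpaRepository<ValueMapping, Integer> {

    // LOWER(original_value) matches the expression in idx_original_value_lower,
    // so PostgreSQL can answer this with an index scan instead of a full scan.
    @Query(value = "SELECT * FROM value_mappings "
            + "WHERE LOWER(original_value) = LOWER(:value) LIMIT 1",
            nativeQuery = true)
    Optional<ValueMapping> findByOriginalValueIgnoreCase(@Param("value") String value);
}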

Performance Comparison: Old vs New Code

  • Old Code (Sequential Processing): Each record is processed one at a time in a loop, and a repository lookup is made for every record, which is slow for large datasets.
  • New Code (Parallel Processing): Records are processed in parallel across multiple CPU cores, lookups hit an in-memory cache of the value mappings, and any remaining database queries are backed by the functional index for fast case-insensitive matching. A rough timing sketch follows.
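To make the comparison concrete, here is a rough, self-contained timing sketch on synthetic data. It is illustrative only — JVM warm-up, GC, and core count all affect the numbers, so use a harness such as JMH for real measurements:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StreamTimingDemo {
    public static void main(String[] args) {
        // 500K synthetic values, mirroring the dataset size in this article
        List<String> values = IntStream.range(0, 500_000)
                .mapToObj(i -> "value-" + (i % 1_000))
                .collect(Collectors.toList());

        long t0 = System.nanoTime();
        values.stream().map(String::toUpperCase).collect(Collectors.toList());
        long sequentialMs = (System.nanoTime() - t0) / 1_000_000;

        long t1 = System.nanoTime();
        values.parallelStream().map(String::toUpperCase).collect(Collectors.toList());
        long parallelMs = (System.nanoTime() - t1) / 1_000_000;

        System.out.println("sequential: " + sequentialMs + " ms, parallel: " + parallelMs + " ms");
    }
}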

Parallel Stream Processing: Theoretical Overview

Parallel streams in Java enable concurrent processing of large collections. This technique is beneficial for CPU-intensive tasks like data transformations, computations, and aggregations. By splitting the dataset into smaller chunks and processing them in parallel, parallel streams can significantly reduce wall-clock time (though not the algorithmic complexity). A sequential stream should be turned into a parallel one only when there is a specific performance need: measure first, then evaluate parallelism as one optimization option.

Example:

// Sequential version
List<Integer> numbersList = Arrays.asList(1, 2, 3, 4);
numbersList.stream().forEach(num ->
    System.out.println(num + " " + Thread.currentThread().getName())
);

// Parallel version
numbersList.parallelStream().forEach(num ->
    System.out.println(num + " " + Thread.currentThread().getName())
); // Be careful: the order of the output is not guaranteed
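The warning in the comment above is real: with parallelStream(), forEach makes no ordering guarantee, so the numbers can print in any interleaving across worker threads. If the encounter order matters, forEachOrdered restores it, at the cost of some parallel speedup:

// Parallel processing, but output printed in encounter order (1 2 3 4)
numbersList.parallelStream().forEachOrdered(num ->
    System.out.println(num + " " + Thread.currentThread().getName())
);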

Conclusion

By applying parallel stream processing and functional indexing, we were able to optimize the value replacement process, significantly reducing the processing time for large datasets.

If you’re working with large datasets and need to perform transformations like replacing values, I highly recommend:

  • Using parallel streams for concurrent processing.
  • Caching data in memory to avoid repeated database lookups.
  • Indexing your database on frequently queried columns to speed up lookups.

With these optimizations, your application will be able to scale efficiently as your dataset grows.

Let me know if you have applied similar techniques in your projects, or if you see further improvements to the code samples above!

#Java #JavaForDevelopers #JavaOptimization #JavaApplicationDevelopment
