Optimization in Java: Choosing Between Sequential and Parallel Streams
Manish K Singh
Sr. Software Developer (Java, Spring Boot, Oracle, PostgreSQL) | Data Engineering
In Java applications, performance optimization is essential, particularly when processing large datasets. One common use case is replacing specific values in a dataset. This article walks through optimizing that process using parallel streams in Java and functional indexing in PostgreSQL, improving both lookup performance and processing speed. (Because this is a legacy project where architecture changes are infrequent, introducing parallelism calls for extra caution.)
Scenario: Replacing Values in Large Datasets
We have a large dataset containing 500K records, and each record has a value that may need to be replaced. These mappings are stored in a separate table (value_mappings), and we need to replace values based on this mapping.
The challenge is to do this efficiently, both in terms of memory and processing time. To address it, we'll apply two optimizations: parallel stream processing in Java, and an in-memory mapping cache backed by a functional index in PostgreSQL.
The Data Model
We have two generic tables: main_data, which holds the records to process, and value_mappings, which holds the original-to-replacement value pairs.
Our task is to efficiently look up values in the value_mappings table and replace them in the main_data table.
Schema of the value_mappings table:
CREATE TABLE value_mappings (
    id INT PRIMARY KEY,
    original_value VARCHAR(100) NOT NULL,
    replacement_value VARCHAR(100)
);
Old Java Code: Sequential Processing
In the original implementation, we processed records sequentially, looking up values from the database for each record. Here’s how it looked:
public String processRecords() {
    List<Object[]> records = entityManager
            .createNativeQuery("SELECT * FROM main_data WHERE original_value IS NOT NULL")
            .getResultList();
    List<DataRecord> resultList = new ArrayList<>();
    for (Object[] record : records) {
        String value = record[0].toString();
        String modifiedValue = valueReplacer.replaceValue(value);
        record[0] = modifiedValue;
        // Add to the list to persist later
        resultList.add(createDataRecord(record));
    }
    return insertIntoDatabase(resultList);
}
Old Value Replacer Code:
@Component
public class ValueReplacer {

    private final ValueMappingRepository valueMappingRepository;

    @Autowired
    public ValueReplacer(ValueMappingRepository valueMappingRepository) {
        this.valueMappingRepository = valueMappingRepository;
    }

    public String replaceValue(String value) {
        String trimmedValue = value.trim().toLowerCase();
        ValueMapping mapping = valueMappingRepository.findAll()
                .stream()
                .filter(m -> m.getOriginalValue().trim().equalsIgnoreCase(trimmedValue))
                .findFirst()
                .orElse(null);
        return mapping != null ? mapping.getReplacementValue() : value;
    }
}
Challenges with the Old Approach:
- replaceValue() calls findAll() for every record, so the entire value_mappings table is loaded from the database once per lookup, 500K times in total.
- Each lookup is then a linear scan (O(n)) over the loaded mappings.
- Records are processed one at a time on a single thread, leaving CPU cores idle.
New Java Code: Parallel Processing with Optimized Lookups
We can significantly improve performance by processing records in parallel using Java’s parallelStream(). Additionally, we’ll optimize the database lookups by caching the mappings in memory and using a functional index in the database.
Updated Code with Parallel Stream and Optimized Caching:
public String processRecords() {
    List<Object[]> records = entityManager
            .createNativeQuery("SELECT * FROM main_data WHERE original_value IS NOT NULL")
            .getResultList();

    // Use a parallel stream to process records concurrently
    List<DataRecord> resultList = records.parallelStream()
            .map(record -> {
                String value = record[0].toString();
                String modifiedValue = valueReplacer.replaceValue(value);
                record[0] = modifiedValue;
                return createDataRecord(record); // Map to a new DataRecord object
            })
            .collect(Collectors.toList()); // Collect into a list for batch insertion

    return insertIntoDatabase(resultList);
}
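One caveat worth noting: parallelStream() runs on the JVM-wide common ForkJoinPool, which is shared with every other parallel stream and CompletableFuture in the application. If that is a concern in a legacy system, the stream can be run inside a dedicated pool instead. The sketch below relies on the widely used (though not formally specified) behavior that a parallel stream submitted as a ForkJoinPool task uses that pool; the class and method names are illustrative, and String::toUpperCase stands in for the real valueReplacer.replaceValue call.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class CustomPoolDemo {

    // Runs the parallel transformation in a dedicated pool instead of the
    // common ForkJoinPool, so record processing cannot starve other
    // parallel work in the application.
    static List<String> transformInPool(List<String> values, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() ->
                    values.parallelStream()
                          .map(String::toUpperCase) // stand-in for replaceValue()
                          .collect(Collectors.toList())
            ).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(transformInPool(List.of("a", "b"), 4));
    }
}
```

This also caps the degree of parallelism explicitly, which is useful when the same host runs other CPU-bound work.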
Improvements:
- The mappings are loaded from the database once and cached in memory, so each lookup becomes an O(1) HashMap access instead of a query.
- Records are transformed concurrently across multiple threads via parallelStream().
- collect(Collectors.toList()) preserves encounter order, so the results stay aligned with the input records.
Updated Value Replacer (with Caching):
@Component
public class ValueReplacer {

    private final Map<String, String> valueMappings = new HashMap<>();
    private final ValueMappingRepository valueMappingRepository;

    @Autowired
    public ValueReplacer(ValueMappingRepository valueMappingRepository) {
        this.valueMappingRepository = valueMappingRepository;
        loadMappings();
    }

    private void loadMappings() {
        List<ValueMapping> mappings = valueMappingRepository.findAll();
        for (ValueMapping mapping : mappings) {
            valueMappings.put(mapping.getOriginalValue().trim().toLowerCase(), mapping.getReplacementValue());
        }
    }

    public String replaceValue(String value) {
        String trimmedValue = value.trim().toLowerCase();
        return valueMappings.getOrDefault(trimmedValue, value);
    }
}
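The core of this class is the normalize-once, look-up-many pattern: keys are trimmed and lowercased at load time so that replaceValue() can apply the same normalization and hit the map in O(1). The map is populated in the constructor and only read afterwards, which is why a plain HashMap is safe to share across the parallel stream's threads. A minimal, framework-free sketch of that logic (the class name and constructor argument are hypothetical, standing in for the repository):

```java
import java.util.HashMap;
import java.util.Map;

public class ValueReplacerSketch {

    private final Map<String, String> valueMappings = new HashMap<>();

    public ValueReplacerSketch(Map<String, String> rawMappings) {
        // Normalize keys once at load time, mirroring loadMappings()
        rawMappings.forEach((k, v) -> valueMappings.put(k.trim().toLowerCase(), v));
    }

    public String replaceValue(String value) {
        // O(1) lookup; falls back to the original value when no mapping exists
        return valueMappings.getOrDefault(value.trim().toLowerCase(), value);
    }
}
```

Note that replaceValue("FOO") and replaceValue("  foo ") resolve to the same mapping, because both sides of the lookup use the same trim-and-lowercase normalization.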
Optimizing Database with Functional Indexing
To further improve performance, especially when querying the database for value mappings, we apply a functional index in PostgreSQL to handle case-insensitive lookups efficiently. Here’s how you can create the functional index:
CREATE INDEX idx_original_value_lower ON value_mappings(LOWER(original_value));
This index ensures that queries for value replacements (with case-insensitivity) can leverage the database index, rather than performing a full table scan.
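For the index to be used, the query's WHERE clause must contain the same expression that was indexed. Assuming the schema above, a lookup would be written like this (the literal 'SomeValue' is a placeholder):

```sql
-- LOWER(original_value) matches the indexed expression, so PostgreSQL
-- can use idx_original_value_lower instead of a sequential scan.
SELECT replacement_value
FROM value_mappings
WHERE LOWER(original_value) = LOWER('SomeValue');
```

A query written as `WHERE original_value = 'SomeValue'` would not match the index expression and would fall back to a full scan.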
Performance Comparison: Old vs New Code
Parallel Stream Processing: Theoretical Overview
Parallel streams in Java enable concurrent processing of large collections. The technique is most beneficial for CPU-intensive work such as data transformations, computations, and aggregations: the stream splits the dataset into chunks and processes them on multiple threads, which can significantly reduce wall-clock processing time. A sequential stream should only be switched to a parallel one when there is a concrete performance requirement; measure first, then evaluate parallelism as one possible optimization.
Example:
// Sequential version: prints 1..4 in order, all on the calling thread
List<Integer> numbersList = Arrays.asList(1, 2, 3, 4);
numbersList.stream().forEach(num ->
    System.out.println(num + " " + Thread.currentThread().getName())
);

// Parallel version: work is spread across ForkJoinPool worker threads
numbersList.parallelStream().forEach(num ->
    System.out.println(num + " " + Thread.currentThread().getName())
); // Be careful: the order of the output is not guaranteed
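The ordering caveat can be made concrete: forEach on a parallel stream gives no ordering guarantee, while forEachOrdered and collect preserve the stream's encounter order. A small self-contained demonstration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OrderingDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

        // forEach on a parallel stream may print in any order:
        numbers.parallelStream().forEach(System.out::println);

        // forEachOrdered restores encounter order (at some cost to parallelism):
        numbers.parallelStream().forEachOrdered(System.out::println);

        // collect also preserves encounter order, which is why collecting
        // parallel-stream results into a list, as in processRecords(), is safe:
        List<Integer> doubled = numbers.parallelStream()
                .map(n -> n * 2)
                .collect(Collectors.toList());
        System.out.println(doubled); // always [2, 4, 6, 8]
    }
}
```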
Conclusion
By applying parallel stream processing and functional indexing, we were able to optimize the value replacement process, significantly reducing the processing time for large datasets.
If you’re working with large datasets and need to perform transformations like replacing values, I highly recommend: caching lookup data in memory instead of querying per record, parallelizing CPU-bound transformations with parallelStream() after measuring, and adding a functional index for case-insensitive database lookups.
With these optimizations, your application will be able to scale efficiently as your dataset grows.
Let me know if you have applied similar techniques in your projects, or if you can suggest further improvements to the code samples above!
#Java #JavaForDevelopers #JavaOptimization #JavaApplicationDevelopment