Introducing strict-stream - composable streams for AsyncIterable

Introducing strict-stream - composable streams for AsyncIterable

strict-stream?- a library for strictly-typed streams in TypeScript to manage & compose?AsyncIterable<T>

Asynchronous processing and stream-based data handling are becoming increasingly popular in modern software development.

However, handling streams can be challenging, particularly in strongly-typed languages such as TypeScript. I mentioned that in the previous post?Why AsyncIterable?Matters in Data Pipelines TypeScript .

To address this challenge, I would like to announce the release of?strict-stream, a new TypeScript library that provides strongly-typed, composable streams and operators for functional reactive programming based on?AsyncIterable.

strict-stream?allows you to create, transform, and consume streams in a strictly-typed, composable and reactive way. It leverages TypeScript's advanced type system to provide type safety and intuitive code completion, making it easier to handle streams without sacrificing code quality.

Let's take a look at some of the key features of?strict-stream

Type Safety and Composability

strict-stream?is built with TypeScript and provides strong type safety and composability out of the box.

With?strict-stream, you can create streams that emit and process strongly-typed data, and chain operators to compose complex transformations. This helps reduce the risk of type errors and makes it easier to build robust and maintainable applications.

Reactive Programming

strict-stream?is based on the principles of reactive programming, which allows you to handle streams of data in a declarative, composable, and efficient way. You can use operators like?map,?filter,?reduce, and more to transform, filter, or aggregate stream data, all while maintaining a reactive, data-driven approach to your application logic.

Backpressure Handling

strict-stream?provides built-in backpressure handling to ensure that streams are processed efficiently without overflowing. You can use operators like?buffer,?batchTimed, and?scale?to control the flow of data and prevent data loss or processing delays.

Node.js Integration

strict-stream?provides seamless integration with Node.js streams, allowing you to easily convert between?StrictStream?and Node.js streams using the?nodeReadable,?nodeWritable, and?nodeTransform?functions.

Code Examples

Here are a few examples of?strict-stream?in action:

import {run, of} from "strict-stream";
import {map} from "strict-stream/map";
import {tap} from "strict-stream/tap";

async function example() {
  // this example uses array as a source, but could be any kind of I
  // it makes: AsyncIterable<number>
  const stream = of([1, 2, 3])
    .pipe(map((value) => value * 2));

    for await (const value of stream) {
      // console.log(value);
    }

    // 2   
    // 4  
    // 6 
}

await example();
import {of} from "strict-stream";
import {sequence} from "strict-stream/generators";
import {tap} from "strict-stream/tap";

// some logic to fetch the user here sync or async  
async function fetchUserById(id: number) {
  // do some backend call here
  return {id, userName: `User ${id}`};
}

// example of AsyncIterable sequence, imaginary user ids :)
async function getUserIds() {
  return sequence(3);
}

async function example() {
  const usersStream = of(await getUserIds())
    .pipe(map(fetchUserById))
    .pipe(tap((user) => console.log(user)));

  await run(stream);
  // {id: 0, userName: 'User 0'}
  // {id: 1, userName: 'User 1'}
  // {id: 2, userName: 'User 2'}
}        

Benefits

strict-stream?provides a number of benefits:

  • Declarative programming:?Strict Stream provides a declarative programming model that allows you to express complex stream processing pipelines in a concise and easy-to-understand way.
  • Simplicity:?The API is straightforward and easy to use, and the library has minimal dependencies.
  • Efficiency:?Strict Stream is designed to be efficient and fast, with a low overhead.
  • Type safety:?Strict Stream is written in TypeScript, and provides type definitions for all of its APIs. This makes it easy to catch errors and improve code quality.
  • Flexibility:?Strict Stream works both in Node.js and in the browser, and can be used with a variety of data sources, such as arrays, object streams, and event streams.
  • Interoperability:?Strict Stream is compatible with other popular stream libraries, such as Node.js'?stream?module.

More examples

Creating a stream from an array

import {from} from 'strict-stream';

const myArray = [1, 2, 3];
const myStream = from(myArray);        

Transforming a stream

import {from} from 'strict-stream';
import {map} from 'strict-stream/map';

const myArray = [1, 2, 3];
const myStream = from(myArray).pipe(map((value) => value * 2));        

Reacting to events on a stream

import {from} from 'strict-stream';
import {tap} from 'strict-stream/tap';

const myArray = [1, 2, 3];
const myStream = from(myArray)
  .pipe(tap((value) => console.log(value)));

for await (const value of myStream) {
  console.log(value);
}        

Running a stream

There is a helper to loop over that stream called?run

import {from, run} from 'strict-stream';

const myArray = [1, 2, 3];
const myStream = from(myArray);

await run(myStream);        

Combining multiple streams

import {from, run} from 'strict-stream';
import {concatenate} from 'strict-stream/concatenate';
import {map} from 'strict-stream/map';

const myArray1 = [1, 2];
const myArray2 = [3, 4];

const myStream = from(concatenate(from(myArray1), from(myArray2)))
  .pipe(map((value) => value * 2));

await run(myStream); // Outputs: 2, 4, 6, 8        

Filtering a stream

import {from} from 'strict-stream';
import {filter} from 'strict-stream/filter';

const myArray = [1, 2, 3, 4, 5];
const myStream = from(myArray).pipe(filter((value) => value % 2 === 0));        

Advanced Stream Operations

strict-stream?includes several advanced stream operations for handling more complex use cases. These include:

  • batch: groups items in the stream into batches of a specific size.
  • concatenate: concatenates multiple streams into a single stream while maintaining the ordering of the output stream.
  • interval: creates a stream that emits a sequence of integers at regular intervals.
  • merge: merges multiple streams into a single stream.
  • scale: maps the stream with?max?concurrently.
  • scaleSync: maps the stream with?max?concurrently while maintaining the ordering of the output stream.
  • buffer: adds a bit of buffer to have more room for reader / upstream.
  • objectReader: simplifies reading source of objects.
  • batchTimed: emits batches by?size?or?maxTimeout.

Conclusion

StrictStream?provides a powerful and flexible way to work with streams in TypeScript, making it easy to build complex data processing pipelines. With its comprehensive set of built-in operators and utilities,?StrictStream?makes it easy to work with a wide variety of data sources and formats, while ensuring that your code is correct and error-free at compile-time. If you're looking for a simple and elegant way to work with streams in TypeScript,?StrictStream?is definitely worth checking out!

There is NPM:?https://www.npmjs.com/package/strict-stream

Happy stream processing! With ?? from Ukraine ????

要查看或添加评论,请登录

社区洞察

其他会员也浏览了