Exploring Fluid Dynamics Using Python: A Numerical Approach with Navier-Stokes Equations

Exploring Fluid Dynamics Using Python: A Numerical Approach with Navier-Stokes Equations


Fluid dynamics, a fascinating field of study, describes the behavior of fluids in motion. The incompressible Navier-Stokes equations stand as the backbone of understanding fluid flow. In this article, we'll delve into the mathematical model and use Python to implement a numerical solution to simulate fluid behavior within a closed box.


Mathematical Model - Incompressible Navier-Stokes Equations

The equations governing fluid flow in an incompressible space are defined as follows:

Momentum Equation:

The momentum equation for an incompressible fluid in a 2D space:

?u/?t+(u.?)u=-1/ρ ?p+ ν?^2 u+f

Where:

  • u is the fluid velocity vector.
  • ?u/?t represents the time derivative of velocity.
  • (u.?)u denotes convective acceleration, capturing how the fluid motion affects itself.
  • -1/ρ ?p accounts for pressure gradients within the fluid.
  • ν?^2 u describes the viscous forces acting on the fluid.
  • f signifies any external forcing applied to the fluid.

This equation encapsulates how various forces, including pressure gradients, viscosity, and external forces, influence the fluid's velocity evolution.


Incompressibility Condition:

The incompressibility condition ensures that the fluid remains incompressible. It's expressed as: ??u=0

Here:

  • ??u represents the divergence of the velocity field.
  • This condition enforces that the divergence of the velocity vector (u) is zero, signifying no net change in volume for fluid elements.


Solution Strategy

Initial state: Zero velocity everywhere: u= [0, 0]

Steps of the Solution Strategy (*):

  • Add forces: Calculate ω1=u+Δt.f , where f represents forces.
  • Self-advection: Update ω2 by backtracing values along the streamline: ω2=ω1 (p(x, -Δt)). (This step is unconditionally stable.)
  • Implicit diffusion: Solve a linear system matrix-free using Conjugate Gradient: (I -ν /Δt ?^2 ) ω3= ω2. (This step is unconditionally stable.)
  • Pressure correction: Solve a linear system matrix-free using Conjugate Gradient: ?^2 p= ? . ω3. Correct velocities for incompressibility: ω4 = ω3? - ?p.
  • Advance to next time step: Update u=ω4.
  • Boundary Conditions: Prescribed indirectly using discrete differential operators. Considerations: The solver is unconditionally stable, allowing arbitrary parameter choices, but excessively high timesteps can lead to significant inaccuracies in the advection step.


Implementing the Model Using Python

Let's bring this theoretical understanding to life using Python. The following code implements a numerical solution for the Navier-Stokes equations within a closed box. It utilizes a method called "Stable Fluids" to simulate the behavior of an incompressible fluid. After this section, I'll provide a detailed explanation of each part of the code.

import numpy as np
import scipy.sparse.linalg as splinalg
from scipy import interpolate
import matplotlib.pyplot as plt

# Optional
import cmasher as cmr
from tqdm import tqdm

DOMAIN_SIZE = 1.0
N_POINTS = 41
N_TIME_STEPS = 100
TIME_STEP_LENGTH = 0.1

KINEMATIC_VISCOSITY = 0.0001

MAX_ITER_CG = None


def forcing_function(time, point):
    time_decay = np.maximum(
        2.0 - 0.5 * time,
        0.0,
    )

    forced_value = (
            time_decay
            *
            np.where(
                (
                        (point[0] > 0.4)
                        &
                        (point[0] < 0.6)
                        &
                        (point[1] > 0.1)
                        &
                        (point[1] < 0.3)
                ),
                np.array([0.0, 1.0]),
                np.array([0.0, 0.0]),
            )
    )

    return forced_value


def main():
    element_length = DOMAIN_SIZE / (N_POINTS - 1)
    scalar_shape = (N_POINTS, N_POINTS)
    scalar_dof = N_POINTS ** 2
    vector_shape = (N_POINTS, N_POINTS, 2)
    vector_dof = N_POINTS ** 2 * 2

    x = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
    y = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)

    # Using "ij" indexing makes the differential operators more logical.
    
    X, Y = np.meshgrid(x, y, indexing="ij")

    coordinates = np.concatenate(
        (
            X[..., np.newaxis],
            Y[..., np.newaxis],
        ),
        axis=-1,
    )

    forcing_function_vectorized = np.vectorize(
        pyfunc=forcing_function,
        signature="(),(d)->(d)",
    )

    def partial_derivative_x(field):
        diff = np.zeros_like(field)

        diff[1:-1, 1:-1] = (
                (
                        field[2:, 1:-1]
                        -
                        field[0:-2, 1:-1]
                ) / (
                        2 * element_length
                )
        )

        return diff

    def partial_derivative_y(field):
        diff = np.zeros_like(field)

        diff[1:-1, 1:-1] = (
                (
                        field[1:-1, 2:]
                        -
                        field[1:-1, 0:-2]
                ) / (
                        2 * element_length
                )
        )

        return diff

    def laplace(field):
        diff = np.zeros_like(field)

        diff[1:-1, 1:-1] = (
                (
                        field[0:-2, 1:-1]
                        +
                        field[1:-1, 0:-2]
                        - 4 *
                        field[1:-1, 1:-1]
                        +
                        field[2:, 1:-1]
                        +
                        field[1:-1, 2:]
                ) / (
                        element_length ** 2
                )
        )

        return diff

    def divergence(vector_field):
        divergence_applied = (
                partial_derivative_x(vector_field[..., 0])
                +
                partial_derivative_y(vector_field[..., 1])
        )

        return divergence_applied

    def gradient(field):
        gradient_applied = np.concatenate(
            (
                partial_derivative_x(field)[..., np.newaxis],
                partial_derivative_y(field)[..., np.newaxis],
            ),
            axis=-1,
        )

        return gradient_applied

    def curl_2d(vector_field):
        curl_applied = (
                partial_derivative_x(vector_field[..., 1])
                -
                partial_derivative_y(vector_field[..., 0])
        )

        return curl_applied

    def advect(field, vector_field):
        backtraced_positions = np.clip(
            (
                    coordinates
                    -
                    TIME_STEP_LENGTH
                    *
                    vector_field
            ),
            0.0,
            DOMAIN_SIZE,
        )

        advected_field = interpolate.interpn(
            points=(x, y),
            values=field,
            xi=backtraced_positions,
        )

        return advected_field

    def diffusion_operator(vector_field_flattened):
        vector_field = vector_field_flattened.reshape(vector_shape)

        diffusion_applied = (
                vector_field
                -
                KINEMATIC_VISCOSITY
                *
                TIME_STEP_LENGTH
                *
                laplace(vector_field)
        )

        return diffusion_applied.flatten()

    def poisson_operator(field_flattened):
        field = field_flattened.reshape(scalar_shape)

        poisson_applied = laplace(field)

        return poisson_applied.flatten()

    plt.style.use("dark_background")
    plt.figure(figsize=(5, 5), dpi=160)

    velocities_prev = np.zeros(vector_shape)

    time_current = 0.0
    for i in tqdm(range(N_TIME_STEPS)):
        time_current += TIME_STEP_LENGTH

        forces = forcing_function_vectorized(
            time_current,
            coordinates,
        )

        # (1) Apply Forces
        velocities_forces_applied = (
                velocities_prev
                +
                TIME_STEP_LENGTH
                *
                forces
        )

        # (2) Nonlinear convection (=self-advection)
        velocities_advected = advect(
            field=velocities_forces_applied,
            vector_field=velocities_forces_applied,
        )

        # (3) Diffuse
        velocities_diffused = splinalg.cg(
            A=splinalg.LinearOperator(
                shape=(vector_dof, vector_dof),
                matvec=diffusion_operator,
            ),
            b=velocities_advected.flatten(),
            maxiter=MAX_ITER_CG,
        )[0].reshape(vector_shape)

        # (4.1) Compute a pressure correction
        pressure = splinalg.cg(
            A=splinalg.LinearOperator(
                shape=(scalar_dof, scalar_dof),
                matvec=poisson_operator,
            ),
            b=divergence(velocities_diffused).flatten(),
            maxiter=MAX_ITER_CG,
        )[0].reshape(scalar_shape)

        # (4.2) Correct the velocities to be incompressible
        velocities_projected = (
                velocities_diffused
                -
                gradient(pressure)
        )

        # Advance to next time step
        velocities_prev = velocities_projected

        # Plot
        curl = curl_2d(velocities_projected)
        plt.contourf(
            X,
            Y,
            curl,
            cmap=cmr.redshift,
            levels=100,
        )
        plt.quiver(
            X,
            Y,
            velocities_projected[..., 0],
            velocities_projected[..., 1],
            color="dimgray",
        )
        plt.draw()
        plt.pause(0.0001)
        plt.clf()

    plt.show()


if __name__ == "__main__":
    main()
        

Code Analysis:

  • Imports: Import necessary libraries - NumPy, SciPy, Matplotlib, and optional ones for visualization (cmasher, tqdm).

import numpy as np
import scipy.sparse.linalg as splinalg
from scipy import interpolate
import matplotlib.pyplot as plt

# Optional
import cmasher as cmr
from tqdm import tqdm        

  • Constants and Parameters: Define various constants like domain size, number of points, time steps, time step length, viscosity, and maximum iteration count for certain operations.

DOMAIN_SIZE = 1.0
N_POINTS = 41
N_TIME_STEPS = 100
TIME_STEP_LENGTH = 0.1
KINEMATIC_VISCOSITY = 0.0001
MAX_ITER_CG = None        

  • Forcing Function: The forcing_function calculates the external forces applied to the fluid at a given time and point within the simulation domain.

def forcing_function(time, point):
    time_decay = np.maximum(
        2.0 - 0.5 * time,
        0.0,
    )

    forced_value = (
            time_decay
            *
            np.where(
                (
                        (point[0] > 0.4)
                        &
                        (point[0] < 0.6)
                        &
                        (point[1] > 0.1)
                        &
                        (point[1] < 0.3)
                ),
                np.array([0.0, 1.0]),
                np.array([0.0, 0.0]),
            )
    )

    return forced_value        

  • Main Function:

def main():
    element_length = DOMAIN_SIZE / (N_POINTS - 1)
    scalar_shape = (N_POINTS, N_POINTS)
    scalar_dof = N_POINTS ** 2
    vector_shape = (N_POINTS, N_POINTS, 2)
    vector_dof = N_POINTS ** 2 * 2

    x = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
    y = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)

    # Using "ij" indexing makes the differential operators more logical.
    
    X, Y = np.meshgrid(x, y, indexing="ij")

    coordinates = np.concatenate(
        (
            X[..., np.newaxis],
            Y[..., np.newaxis],
        ),
        axis=-1,
    )

    forcing_function_vectorized = np.vectorize(
        pyfunc=forcing_function,
        signature="(),(d)->(d)",        

Grid Setup and Parameters:

Determines element length and shapes for scalar and vector fields based on domain size and point count.

Computes x and y coordinates using np.linspace for a grid within the domain size.

Constructs a mesh grid (X, Y) for logical indexing and differential operations.

Concatenates X and Y to form a coordinate grid representing points in the simulation domain.


Vectorization of Forcing Function:

np.vectorize creates an efficient version of the forcing function for bulk computation.

Signature specification defines the input and output format for the vectorized function.


  • Functions: Functions are defined for operations like partial derivatives, Laplacian, divergence, gradient, curl, advection, etc. Partial derivative:

def partial_derivative_x(field):
        diff = np.zeros_like(field)

        diff[1:-1, 1:-1] = (
                (
                        field[2:, 1:-1]
                        -
                        field[0:-2, 1:-1]
                ) / (
                        2 * element_length
                )
        )

        return diff        

(same applies for y)


Laplacian Operator:

def laplace(field):
        diff = np.zeros_like(field)

        diff[1:-1, 1:-1] = (
                (
                        field[0:-2, 1:-1]
                        +
                        field[1:-1, 0:-2]
                        - 4 *
                        field[1:-1, 1:-1]
                        +
                        field[2:, 1:-1]
                        +
                        field[1:-1, 2:]
                ) / (
                        element_length ** 2
                )
        )

        return diff        


Divergence Operator:

def divergence(vector_field):
        divergence_applied = (
                partial_derivative_x(vector_field[..., 0])
                +
                partial_derivative_y(vector_field[..., 1])
        )

        return divergence_applied        


Gradient Operator:

def gradient(field):
        gradient_applied = np.concatenate(
            (
                partial_derivative_x(field)[..., np.newaxis],
                partial_derivative_y(field)[..., np.newaxis],
            ),
            axis=-1,
        )

        return gradient_applied         


Curl Operator:

def curl_2d(vector_field):
        curl_applied = (
                partial_derivative_x(vector_field[..., 1])
                -
                partial_derivative_y(vector_field[..., 0])
        )

        return curl_applied        


Advection:

def advect(field, vector_field):
        backtraced_positions = np.clip(
            (
                    coordinates
                    -
                    TIME_STEP_LENGTH
                    *
                    vector_field
            ),
            0.0,
            DOMAIN_SIZE,
        )

        advected_field = interpolate.interpn(
            points=(x, y),
            values=field,
            xi=backtraced_positions,
        )

        return advected_field
        


Diffusion Operator:

def diffusion_operator(vector_field_flattened):
        vector_field = vector_field_flattened.reshape(vector_shape)

        diffusion_applied = (
                vector_field
                -
                KINEMATIC_VISCOSITY
                *
                TIME_STEP_LENGTH
                *
                laplace(vector_field)
        )

        return diffusion_applied.flatten()        


Poisson Operator:

def poisson_operator(field_flattened):
        field = field_flattened.reshape(scalar_shape)

        poisson_applied = laplace(field)

        return poisson_applied.flatten()        

  • Visualization: Plots the simulated fluid behavior using Matplotlib.

 plt.style.use("dark_background")
    plt.figure(figsize=(5, 5), dpi=160)

    velocities_prev = np.zeros(vector_shape)

    time_current = 0.0
    for i in tqdm(range(N_TIME_STEPS)):
        time_current += TIME_STEP_LENGTH

        forces = forcing_function_vectorized(
            time_current,
            coordinates,
        )

        # (1) Apply Forces
        velocities_forces_applied = (
                velocities_prev
                +
                TIME_STEP_LENGTH
                *
                forces
        )

        # (2) Nonlinear convection (=self-advection)
        velocities_advected = advect(
            field=velocities_forces_applied,
            vector_field=velocities_forces_applied,
        )

        # (3) Diffuse
        velocities_diffused = splinalg.cg(
            A=splinalg.LinearOperator(
                shape=(vector_dof, vector_dof),
                matvec=diffusion_operator,
            ),
            b=velocities_advected.flatten(),
            maxiter=MAX_ITER_CG,
        )[0].reshape(vector_shape)

        # (4.1) Compute a pressure correction
        pressure = splinalg.cg(
            A=splinalg.LinearOperator(
                shape=(scalar_dof, scalar_dof),
                matvec=poisson_operator,
            ),
            b=divergence(velocities_diffused).flatten(),
            maxiter=MAX_ITER_CG,
        )[0].reshape(scalar_shape)

        # (4.2) Correct the velocities to be incompressible
        velocities_projected = (
                velocities_diffused
                -
                gradient(pressure)
        )

        # Advance to next time step
        velocities_prev = velocities_projected

        # Plot
        curl = curl_2d(velocities_projected)
        plt.contourf(
            X,
            Y,
            curl,
            cmap=cmr.redshift,
            levels=100,
        )
        plt.quiver(
            X,
            Y,
            velocities_projected[..., 0],
            velocities_projected[..., 1],
            color="dimgray",
        )
        plt.draw()
        plt.pause(0.0001)
        plt.clf()

    plt.show()

if __name__ == "__main__":
    main()        

The conditional statement if __name__ == "__main__": is a common Python idiom used to check if the current script is being run as the main program. If it is, it executes the main() function.

Plots Visualization

The individual plots, depicting various time steps of the simulated fluid behavior, were compiled into a video. This video format allows for a continuous sequence of frames, creating a fluid animation that captures the evolving dynamics within the simulated closed box.

Unfortunately I can't upload a video into a LinkedIn article, so here's the link: https://youtu.be/0AX-SHyE_uk

Red and Blue Colors :

  • Color Representation: Red and blue colors indicate areas of positive and negative vorticity or rotational motion in the fluid. Red signifies one direction of rotation, while blue represents the opposite. Intense red or blue regions suggest higher vorticity in those areas.

Little Arrows :

  • Shape and Movement: The small arrows depict the velocity and direction of the fluid at different points in the simulation. These arrows form patterns that show the evolving flow dynamics of the simulated fluid. The movement and shape changes in the arrows represent alterations in fluid velocity and flow direction over time.


Conclusion

Python, with its numerical and scientific libraries like NumPy and SciPy, offers a powerful platform to solve complex mathematical models like the Navier-Stokes equations. The code provided demonstrates how these equations can be discretized and solved numerically to simulate the behavior of fluids within a confined space. This approach allows us to explore and understand fluid dynamics in various scenarios, aiding in the study of real-world phenomena and engineering applications.


Reference

(*) Stam, Jos. (2001). Stable Fluids. ACM SIGGRAPH 99. 1999.10.1145 / 311535 . 311548. https://www.researchgate.net/publication/2486965_Stable_Fluids




要查看或添加评论,请登录

Zakaria Temouch的更多文章

社区洞察