Exploring Fluid Dynamics Using Python: A Numerical Approach with Navier-Stokes Equations
Fluid dynamics, a fascinating field of study, describes the behavior of fluids in motion. The incompressible Navier-Stokes equations stand as the backbone of understanding fluid flow. In this article, we'll delve into the mathematical model and use Python to implement a numerical solution to simulate fluid behavior within a closed box.
Mathematical Model - Incompressible Navier-Stokes Equations
The equations governing fluid flow in an incompressible space are defined as follows:
Momentum Equation:
The momentum equation for an incompressible fluid in a 2D space:
?u/?t+(u.?)u=-1/ρ ?p+ ν?^2 u+f
Where:
This equation encapsulates how various forces, including pressure gradients, viscosity, and external forces, influence the fluid's velocity evolution.
Incompressibility Condition:
The incompressibility condition ensures that the fluid remains incompressible. It's expressed as: ??u=0
Here:
Solution Strategy
Initial state: Zero velocity everywhere: u= [0, 0]
Steps of the Solution Strategy (*):
Implementing the Model Using Python
Let's bring this theoretical understanding to life using Python. The following code implements a numerical solution for the Navier-Stokes equations within a closed box. It utilizes a method called "Stable Fluids" to simulate the behavior of an incompressible fluid. After this section, I'll provide a detailed explanation of each part of the code.
import numpy as np
import scipy.sparse.linalg as splinalg
from scipy import interpolate
import matplotlib.pyplot as plt
# Optional
import cmasher as cmr
from tqdm import tqdm
DOMAIN_SIZE = 1.0
N_POINTS = 41
N_TIME_STEPS = 100
TIME_STEP_LENGTH = 0.1
KINEMATIC_VISCOSITY = 0.0001
MAX_ITER_CG = None
def forcing_function(time, point):
time_decay = np.maximum(
2.0 - 0.5 * time,
0.0,
)
forced_value = (
time_decay
*
np.where(
(
(point[0] > 0.4)
&
(point[0] < 0.6)
&
(point[1] > 0.1)
&
(point[1] < 0.3)
),
np.array([0.0, 1.0]),
np.array([0.0, 0.0]),
)
)
return forced_value
def main():
element_length = DOMAIN_SIZE / (N_POINTS - 1)
scalar_shape = (N_POINTS, N_POINTS)
scalar_dof = N_POINTS ** 2
vector_shape = (N_POINTS, N_POINTS, 2)
vector_dof = N_POINTS ** 2 * 2
x = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
y = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
# Using "ij" indexing makes the differential operators more logical.
X, Y = np.meshgrid(x, y, indexing="ij")
coordinates = np.concatenate(
(
X[..., np.newaxis],
Y[..., np.newaxis],
),
axis=-1,
)
forcing_function_vectorized = np.vectorize(
pyfunc=forcing_function,
signature="(),(d)->(d)",
)
def partial_derivative_x(field):
diff = np.zeros_like(field)
diff[1:-1, 1:-1] = (
(
field[2:, 1:-1]
-
field[0:-2, 1:-1]
) / (
2 * element_length
)
)
return diff
def partial_derivative_y(field):
diff = np.zeros_like(field)
diff[1:-1, 1:-1] = (
(
field[1:-1, 2:]
-
field[1:-1, 0:-2]
) / (
2 * element_length
)
)
return diff
def laplace(field):
diff = np.zeros_like(field)
diff[1:-1, 1:-1] = (
(
field[0:-2, 1:-1]
+
field[1:-1, 0:-2]
- 4 *
field[1:-1, 1:-1]
+
field[2:, 1:-1]
+
field[1:-1, 2:]
) / (
element_length ** 2
)
)
return diff
def divergence(vector_field):
divergence_applied = (
partial_derivative_x(vector_field[..., 0])
+
partial_derivative_y(vector_field[..., 1])
)
return divergence_applied
def gradient(field):
gradient_applied = np.concatenate(
(
partial_derivative_x(field)[..., np.newaxis],
partial_derivative_y(field)[..., np.newaxis],
),
axis=-1,
)
return gradient_applied
def curl_2d(vector_field):
curl_applied = (
partial_derivative_x(vector_field[..., 1])
-
partial_derivative_y(vector_field[..., 0])
)
return curl_applied
def advect(field, vector_field):
backtraced_positions = np.clip(
(
coordinates
-
TIME_STEP_LENGTH
*
vector_field
),
0.0,
DOMAIN_SIZE,
)
advected_field = interpolate.interpn(
points=(x, y),
values=field,
xi=backtraced_positions,
)
return advected_field
def diffusion_operator(vector_field_flattened):
vector_field = vector_field_flattened.reshape(vector_shape)
diffusion_applied = (
vector_field
-
KINEMATIC_VISCOSITY
*
TIME_STEP_LENGTH
*
laplace(vector_field)
)
return diffusion_applied.flatten()
def poisson_operator(field_flattened):
field = field_flattened.reshape(scalar_shape)
poisson_applied = laplace(field)
return poisson_applied.flatten()
plt.style.use("dark_background")
plt.figure(figsize=(5, 5), dpi=160)
velocities_prev = np.zeros(vector_shape)
time_current = 0.0
for i in tqdm(range(N_TIME_STEPS)):
time_current += TIME_STEP_LENGTH
forces = forcing_function_vectorized(
time_current,
coordinates,
)
# (1) Apply Forces
velocities_forces_applied = (
velocities_prev
+
TIME_STEP_LENGTH
*
forces
)
# (2) Nonlinear convection (=self-advection)
velocities_advected = advect(
field=velocities_forces_applied,
vector_field=velocities_forces_applied,
)
# (3) Diffuse
velocities_diffused = splinalg.cg(
A=splinalg.LinearOperator(
shape=(vector_dof, vector_dof),
matvec=diffusion_operator,
),
b=velocities_advected.flatten(),
maxiter=MAX_ITER_CG,
)[0].reshape(vector_shape)
# (4.1) Compute a pressure correction
pressure = splinalg.cg(
A=splinalg.LinearOperator(
shape=(scalar_dof, scalar_dof),
matvec=poisson_operator,
),
b=divergence(velocities_diffused).flatten(),
maxiter=MAX_ITER_CG,
)[0].reshape(scalar_shape)
# (4.2) Correct the velocities to be incompressible
velocities_projected = (
velocities_diffused
-
gradient(pressure)
)
# Advance to next time step
velocities_prev = velocities_projected
# Plot
curl = curl_2d(velocities_projected)
plt.contourf(
X,
Y,
curl,
cmap=cmr.redshift,
levels=100,
)
plt.quiver(
X,
Y,
velocities_projected[..., 0],
velocities_projected[..., 1],
color="dimgray",
)
plt.draw()
plt.pause(0.0001)
plt.clf()
plt.show()
if __name__ == "__main__":
main()
Code Analysis:
import numpy as np
import scipy.sparse.linalg as splinalg
from scipy import interpolate
import matplotlib.pyplot as plt
# Optional
import cmasher as cmr
from tqdm import tqdm
DOMAIN_SIZE = 1.0
N_POINTS = 41
N_TIME_STEPS = 100
TIME_STEP_LENGTH = 0.1
KINEMATIC_VISCOSITY = 0.0001
MAX_ITER_CG = None
def forcing_function(time, point):
time_decay = np.maximum(
2.0 - 0.5 * time,
0.0,
)
forced_value = (
time_decay
*
np.where(
(
(point[0] > 0.4)
&
(point[0] < 0.6)
&
(point[1] > 0.1)
&
(point[1] < 0.3)
),
np.array([0.0, 1.0]),
np.array([0.0, 0.0]),
)
)
return forced_value
def main():
element_length = DOMAIN_SIZE / (N_POINTS - 1)
scalar_shape = (N_POINTS, N_POINTS)
scalar_dof = N_POINTS ** 2
vector_shape = (N_POINTS, N_POINTS, 2)
vector_dof = N_POINTS ** 2 * 2
x = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
y = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
# Using "ij" indexing makes the differential operators more logical.
X, Y = np.meshgrid(x, y, indexing="ij")
coordinates = np.concatenate(
(
X[..., np.newaxis],
Y[..., np.newaxis],
),
axis=-1,
)
forcing_function_vectorized = np.vectorize(
pyfunc=forcing_function,
signature="(),(d)->(d)",
Grid Setup and Parameters:
Determines element length and shapes for scalar and vector fields based on domain size and point count.
Computes x and y coordinates using np.linspace for a grid within the domain size.
Constructs a mesh grid (X, Y) for logical indexing and differential operations.
Concatenates X and Y to form a coordinate grid representing points in the simulation domain.
Vectorization of Forcing Function:
np.vectorize creates an efficient version of the forcing function for bulk computation.
Signature specification defines the input and output format for the vectorized function.
def partial_derivative_x(field):
diff = np.zeros_like(field)
diff[1:-1, 1:-1] = (
(
field[2:, 1:-1]
-
field[0:-2, 1:-1]
) / (
2 * element_length
)
)
return diff
(same applies for y)
Laplacian Operator:
def laplace(field):
diff = np.zeros_like(field)
diff[1:-1, 1:-1] = (
(
field[0:-2, 1:-1]
+
field[1:-1, 0:-2]
- 4 *
field[1:-1, 1:-1]
+
field[2:, 1:-1]
+
field[1:-1, 2:]
) / (
element_length ** 2
)
)
return diff
Divergence Operator:
def divergence(vector_field):
divergence_applied = (
partial_derivative_x(vector_field[..., 0])
+
partial_derivative_y(vector_field[..., 1])
)
return divergence_applied
Gradient Operator:
def gradient(field):
gradient_applied = np.concatenate(
(
partial_derivative_x(field)[..., np.newaxis],
partial_derivative_y(field)[..., np.newaxis],
),
axis=-1,
)
return gradient_applied
Curl Operator:
def curl_2d(vector_field):
curl_applied = (
partial_derivative_x(vector_field[..., 1])
-
partial_derivative_y(vector_field[..., 0])
)
return curl_applied
Advection:
def advect(field, vector_field):
backtraced_positions = np.clip(
(
coordinates
-
TIME_STEP_LENGTH
*
vector_field
),
0.0,
DOMAIN_SIZE,
)
advected_field = interpolate.interpn(
points=(x, y),
values=field,
xi=backtraced_positions,
)
return advected_field
Diffusion Operator:
def diffusion_operator(vector_field_flattened):
vector_field = vector_field_flattened.reshape(vector_shape)
diffusion_applied = (
vector_field
-
KINEMATIC_VISCOSITY
*
TIME_STEP_LENGTH
*
laplace(vector_field)
)
return diffusion_applied.flatten()
Poisson Operator:
def poisson_operator(field_flattened):
field = field_flattened.reshape(scalar_shape)
poisson_applied = laplace(field)
return poisson_applied.flatten()
plt.style.use("dark_background")
plt.figure(figsize=(5, 5), dpi=160)
velocities_prev = np.zeros(vector_shape)
time_current = 0.0
for i in tqdm(range(N_TIME_STEPS)):
time_current += TIME_STEP_LENGTH
forces = forcing_function_vectorized(
time_current,
coordinates,
)
# (1) Apply Forces
velocities_forces_applied = (
velocities_prev
+
TIME_STEP_LENGTH
*
forces
)
# (2) Nonlinear convection (=self-advection)
velocities_advected = advect(
field=velocities_forces_applied,
vector_field=velocities_forces_applied,
)
# (3) Diffuse
velocities_diffused = splinalg.cg(
A=splinalg.LinearOperator(
shape=(vector_dof, vector_dof),
matvec=diffusion_operator,
),
b=velocities_advected.flatten(),
maxiter=MAX_ITER_CG,
)[0].reshape(vector_shape)
# (4.1) Compute a pressure correction
pressure = splinalg.cg(
A=splinalg.LinearOperator(
shape=(scalar_dof, scalar_dof),
matvec=poisson_operator,
),
b=divergence(velocities_diffused).flatten(),
maxiter=MAX_ITER_CG,
)[0].reshape(scalar_shape)
# (4.2) Correct the velocities to be incompressible
velocities_projected = (
velocities_diffused
-
gradient(pressure)
)
# Advance to next time step
velocities_prev = velocities_projected
# Plot
curl = curl_2d(velocities_projected)
plt.contourf(
X,
Y,
curl,
cmap=cmr.redshift,
levels=100,
)
plt.quiver(
X,
Y,
velocities_projected[..., 0],
velocities_projected[..., 1],
color="dimgray",
)
plt.draw()
plt.pause(0.0001)
plt.clf()
plt.show()
if __name__ == "__main__":
main()
The conditional statement if __name__ == "__main__": is a common Python idiom used to check if the current script is being run as the main program. If it is, it executes the main() function.
Plots Visualization
The individual plots, depicting various time steps of the simulated fluid behavior, were compiled into a video. This video format allows for a continuous sequence of frames, creating a fluid animation that captures the evolving dynamics within the simulated closed box.
Unfortunately I can't upload a video into a LinkedIn article, so here's the link: https://youtu.be/0AX-SHyE_uk
Red and Blue Colors :
Little Arrows :
Conclusion
Python, with its numerical and scientific libraries like NumPy and SciPy, offers a powerful platform to solve complex mathematical models like the Navier-Stokes equations. The code provided demonstrates how these equations can be discretized and solved numerically to simulate the behavior of fluids within a confined space. This approach allows us to explore and understand fluid dynamics in various scenarios, aiding in the study of real-world phenomena and engineering applications.
Reference
(*) Stam, Jos. (2001). Stable Fluids. ACM SIGGRAPH 99. 1999.10.1145 / 311535 . 311548. https://www.researchgate.net/publication/2486965_Stable_Fluids