プログラミング実践

【Python】Pythonと格子ボルツマン法(LBM)でカルマン渦を可視化する

記事内に商品プロモーションを含む場合があります

どうも、コンです。

私は工学系出身のプログラマなのですが、正直なところ「物理方程式をプログラムで解く」という経験がなく、興味だけずっと持ち続けていた人でした。

せっかくならよく出てくる流体まわりをやってみたい。でも教科書を開くと偏微分方程式がずらっと並んでいて、読むだけでお腹いっぱいになります。

そんなときふと「プログラミングで物理方程式を解いて、結果を可視化したら、流体ってもっと感覚的にわかるんじゃないか」と思い立ち、Python で カルマン渦列を可視化するシミュレーションを書いて動かしてみました。

これがその出力結果です。

背景の色(オレンジ〜紫のヒートマップ):そのマスにおける流体の速さ (|u|) を表しています。明るい(オレンジ・黄色)ほど速く、暗い(紫・黒)ほど止まりかけ。

白い小さな点(トレーサー粒子):左から流し続けている小さな目印です。

円柱のうしろにきれいな渦が交互に剥がれていく、いわゆるカルマン渦列です。

コードは200行弱、依存ライブラリは numpymatplotlib(GIF 化に Pillow)だけです。

そもそも「カルマン渦列」って何?

まず「渦」とは何かから整理しておくと、流体のなかでぐるぐる回転している部分のことを渦と呼びます。お風呂の栓を抜いたときに水が回りながら吸い込まれる、あの渦巻きをイメージしてもらえれば近いです。

カルマン渦列は、この渦が特定の条件下で規則的に発生する現象のことを言います。具体的には、棒や円柱のような「丸っこい物体」に一定の流れ(風や水)を当て続けると、後ろ側で次のようなことが起きます。

  1. 流れが物体の前にぶつかって、左右に分かれて回り込む
  2. 物体の後ろ側では左右の流れが合流できずに、ぐるぐる巻きの「淀み」が生まれる
  3. その巻きがある瞬間にプリッと剥がれて下流に流される
  4. 剥がれた直後に、今度は反対側でぐるぐる巻きが生まれて、またプリッと剥がれる
  5. これが規則的に交互に繰り返される

結果として、物体の後ろには「右回りの渦・左回りの渦・右回りの渦…」が交互にずらっと並びます。これがカルマン渦列です。冒頭の GIF で円柱の右側にぽつぽつ見える、規則正しく並んだ暗いパターンがまさにそれにあたります。

Wikipedia の定義もシンプルなので引用しておきます。

カルマン渦(カルマンうず、Kármán vortex street)あるいはカルマン渦列とは、ある一定の条件下で、流れの中に置かれた物体の後方に交互にできる渦の列のことである。

カルマン渦 – Wikipedia

流体の支配方程式:ナビエ・ストークス

流体の動きを記述する基本方程式は、ナビエ・ストークス方程式と呼ばれるものです。非圧縮流体(密度が一定)の場合、こんな式になります。

\[ \rho \left( \frac{\partial \mathbf{u}}{\partial t} + (\mathbf{u} \cdot \nabla) \mathbf{u} \right) = -\nabla p + \mu \nabla^2 \mathbf{u} \]

記号がいっぱい出てきて怖いですが、ざっくり訳すと各項はこういう意味らしいです。

  • 左辺:流体の塊の加速度(時間的な変化+自分自身の流れに乗った変化)
  • 右辺第1項 \(-\nabla p\):圧力の高いほうから低いほうへ押される力
  • 右辺第2項 \(\mu \nabla^2 \mathbf{u}\):粘性で「隣のお水に引きずられる」力

つまり「F=ma を流体バージョンに書き直した式」と言えるみたいです。原理的にはこの式を解けば流体の動きはわかる、はず。ただし非線形の偏微分方程式で解析的に解けるケースはほとんどないので、コンピュータで近似的に解く(数値流体力学、CFD)必要があります。

レイノルズ数 Re

流体の挙動を特徴づける無次元の数として、レイノルズ数 \(Re\) があります。

\[ Re = \frac{\rho U L}{\mu} = \frac{U L}{\nu} \]

  • \(U\):代表速度(今回は流入速度)
  • \(L\):代表長さ(今回は円柱の直径)
  • \(\nu\):動粘性係数(粘りけのあらわれ)

ざっくり言うと 「慣性の力」 ÷ 「粘りの力」 の比です。\(Re\) が大きいほど勢いが粘りに勝つ、というイメージ。

  • \(Re\) が小さい(粘性が支配的)→ 流れはぬるっと層状になり、渦は出ない
  • \(Re \approx 40〜200\) → きれいな周期的なカルマン渦列が出る ← 今回ここ
  • \(Re\) がもっと大きい → 乱流に突入する

コードでは RE = 200 と設定しています。これはまさに「きれいなカルマン渦が出るスイートスポット」を狙った値、ということになります。Wikipedia のレイノルズ数の項にも、円柱まわりの流れが \(Re\) によってどう変わるかの図があり、見ていて結構面白いです。

LBM(格子ボルツマン法)

ナビエ・ストークス方程式を直接離散化して解く方法(FDM/FVM/FEM 系)が一般的らしいのですが、今回のコードは格子ボルツマン法(Lattice Boltzmann Method, LBM)という別アプローチを使っています。

LBM の考え方は驚くほどシンプルでした。

流体を「格子上の小さな粒子の集団」と見なし、各マスで 衝突(混ざる)→ 移動(隣のマスへ進む) の2ステップだけ繰り返す。それだけ。


「え、それだけで本当にナビエ・ストークスと同じ流体になるの?」というのが初見の正直な感想でした。これでもチャップマン・エンスコグ展開という手続きで「マクロに見るとナビエ・ストークスと一致する」ことが示されているらしいです。

2次元・9方向の D2Q9 というモデルでは、各マスが「9方向への移動量」を持ちます。図にするとこんな感じ。

   6  2  5
    \ | /
   3 - 0 - 1
    / | \
   7  4  8

各方向 \(i\) について粒子の分布関数 \(f_i(\mathbf{x}, t)\) を持ち、こう更新します。

衝突(BGK 近似):

\[ f_i(\mathbf{x}, t+1) = f_i(\mathbf{x}, t) – \frac{1}{\tau} \left( f_i(\mathbf{x}, t) – f_i^{\mathrm{eq}}(\mathbf{x}, t) \right) \]

平衡分布 \(f_i^{\mathrm{eq}}\) に向かって緩和時間 \(\tau\) で近づける、というだけ。「混ざって落ち着く」イメージです。

ストリーミング(移動):

\[ f_i(\mathbf{x} + \mathbf{c}_i, t+1) = f_i(\mathbf{x}, t+1) \]

衝突で更新した値を、各方向のベクトル \(\mathbf{c}_i\) に沿って隣のマスへコピーする。それだけです。

たったこれだけで、流体らしい動きが立ち上がってくる。いいアルゴリズムだなと感じました。

コードの中身(重要4ブロック)

シミュレーションの心臓部にあたる4ブロックを、折りたたみで貼っていきます。クリックで展開できます。

① 平衡分布関数 f^eq の計算
def equilibrium(rho, ux, uy):
    """マクスウェル分布(feq)。"""
    cu = 3.0 * (CXS[:, None, None] * ux + CYS[:, None, None] * uy)
    usq = 1.5 * (ux ** 2 + uy ** 2)
    return W[:, None, None] * rho * (1.0 + cu + 0.5 * cu ** 2 - usq)

各マスの密度 \(\rho\) と速度 \((u_x, u_y)\) から、9方向それぞれの「ありたい平衡分布」を計算しています。BGK 衝突はこの \(f^{\mathrm{eq}}\) に向かって寄っていくだけ、という関数。

② 衝突ステップ(BGK)
# 3) 衝突 (BGK)
feq = equilibrium(rho, ux, uy)
f = f - OMEGA * (f - feq)

たった2行。\(f\) を平衡分布のほうへ \(\omega = 1/\tau\) の割合だけ寄せる、というのを格子全体で一気にやっています。NumPy のブロードキャストが効いていて、ループを書く必要がないのが気持ちいいところ。

③ ストリーミング(隣のマスへの移動)
# 6) ストリーミング(各方向にロール)
for i in range(9):
    f[i] = np.roll(f[i], (CYS[i], CXS[i]), axis=(0, 1))

9方向それぞれの分布を、その方向のベクトル分だけ np.roll でずらすだけ。これが「移動」のステップ。難しい数値計算は何もしていません。本当にこれだけで流体になるのが、何度見ても不思議です。

④ 障害物の bounce-back(円柱の境界条件)
# 5) 障害物の bounce-back(衝突後 → ストリーミング前)
f_obs = f[:, obstacle].copy()
f[:, obstacle] = f_obs[OPP, :]

円柱マスクの中にあるセルは、各方向の分布を「逆方向」に入れ替えるだけ。ぶつかった粒子をそのまま跳ね返す、というすごく直感的な境界条件です。これで「壁」が表現できる。

動かしてみた感想

実行時間は私の手元の Mac(M1, NumPy のみ・GPU 不使用)で 約2分でした。具体的な内訳はこんな感じ。

  • 格子サイズ 400 × 100、12,000 ステップ計算
  • 40 ステップごとにスナップショットを保存(→ 300 枚で GIF 化)
  • 白いトレーサー粒子 250 個も同時に動かしている

NumPy のブロードキャストとロールだけでここまで動くんだ、というのは、書いていて気持ちのいい体験でした。個人で物理シミュレーションを遊ぶには十分すぎるパフォーマンスがあるなと思います。

そして当初の動機だった「可視化したら物理が腹落ちするか?」という問いについては、自分の中では答えが出た気がしました。

「\(Re\) が大きいと乱流」とただ覚えるよりも、自分で RE の値を切り替えて、本当に渦の出方が変わる様子を見るほうが、教科書を10回読むより記憶に残ります。物理わからん勢にとって、可視化アプローチはかなり相性がいいなと思いました。次は \(Re\) を変えて遊んでみたり、円柱の代わりに翼の形にして揚力みたいな話に進めたら面白そうだなと考えています。

参考にした書籍・資料

まとめ

  • カルマン渦列は、円柱の後ろに渦が交互に剥がれる現象。一定流入なのに勝手に周期が立ち上がるのが面白い
  • 支配方程式はナビエ・ストークス、挙動はレイノルズ数 \(Re\) で決まる。今回は \(Re=200\) のスイートスポットを狙った
  • LBM(格子ボルツマン法)は「衝突 + 移動」の2ステップだけで流体を再現する、プログラマと相性のいいアルゴリズム
  • Python(NumPy + matplotlib)200 行弱、手元の Mac で約2分。物理わからん勢でも可視化で腹落ちする

流体の方程式は記号が多くて重そうですが、可視化を絡めると一気に親しみが湧きます。「数式を解くのではなく、シミュレーションを動かして観察する」という入り口は、物理に苦手意識のある自分のようなプログラマにはなかなかおすすめできる学習法だと感じました。

ここまで読んでいただきありがとうございました。それでは、また。

全コード

最後に、上記のシミュレーションのフルコードを置いておきます。numpymatplotlib(GIF 化に Pillow)だけで動きます。

karman.py 全コード
"""
2D Lattice Boltzmann Method (D2Q9) で円柱周りの流れを解く。
カルマン渦列が自然に立ち上がる。

可視化:
  - 背景: 速度の大きさ |u|(カラーバー付き、inferno)
  - 白いトレーサー粒子: インレットから流して動きを直感的に見せる

依存: numpy, matplotlib (GIF 保存に Pillow)
実行: python3 karman.py
"""
from pathlib import Path
import time
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation

# ---------- 計算条件 ----------
NX, NY = 400, 100              # 格子サイズ
N_STEPS = 12000                # 総タイムステップ
SAVE_EVERY = 40                # スナップショット間隔
RE = 200                       # レイノルズ数(カルマン渦列が出る範囲)
U_IN = 0.05                    # 流入速度(lattice units, 通常 < 0.1)
CXC, CYC, CR = NX // 5, NY // 2 + 2, 10  # 円柱の中心(x,y)と半径

NU = U_IN * (2 * CR) / RE      # 動粘性係数
TAU = 3.0 * NU + 0.5           # 緩和時間
OMEGA = 1.0 / TAU              # BGK の緩和係数(< 2 で安定)

# ---------- D2Q9 格子 ----------
# 0:静止 / 1:E 2:N 3:W 4:S / 5:NE 6:NW 7:SW 8:SE
CXS = np.array([0,  1, 0, -1,  0,  1, -1, -1,  1])
CYS = np.array([0,  0, 1,  0, -1,  1,  1, -1, -1])
W   = np.array()
OPP = np.array([0,  3, 4,  1,  2,  7,  8,  5,  6])  # 反対方向(bounce-back用)

# ---------- 障害物(円柱)マスク ----------
xs, ys = np.meshgrid(np.arange(NX), np.arange(NY))
obstacle = (xs - CXC) ** 2 + (ys - CYC) ** 2 < CR ** 2
# 上側に小さなコブを足して幾何的に対称性を破る(=シェディングを強制)
obstacle |= (xs - (CXC + 2)) ** 2 + (ys - (CYC + CR + 1)) ** 2 < 9


def equilibrium(rho, ux, uy):
    """マクスウェル分布(feq)。"""
    cu = 3.0 * (CXS[:, None, None] * ux + CYS[:, None, None] * uy)
    usq = 1.5 * (ux ** 2 + uy ** 2)
    return W[:, None, None] * rho * (1.0 + cu + 0.5 * cu ** 2 - usq)


def bilinear_sample(field, xp, yp):
    """field (NY,NX) を浮動小数点位置 (xp, yp) で双線形補間して返す。"""
    xi = np.clip(xp.astype(int), 0, NX - 2)
    yi = np.clip(yp.astype(int), 0, NY - 2)
    fx = xp - xi
    fy = yp - yi
    return (field[yi, xi]     * (1 - fx) * (1 - fy) +
            field[yi, xi + 1] * fx       * (1 - fy) +
            field[yi + 1, xi] * (1 - fx) * fy       +
            field[yi + 1, xi + 1] * fx   * fy)


# ---------- 初期化 ----------
rng = np.random.RandomState(0)
rho = np.ones((NY, NX))
ux = np.full((NY, NX), U_IN) * (1 + 0.05 * (rng.rand(NY, NX) - 0.5))
uy = 0.05 * U_IN * (rng.rand(NY, NX) - 0.5)
f = equilibrium(rho, ux, uy)

# 流入境界に持続的に与える微小サイン波擾乱(対称性を破り続ける)
y_arr = np.arange(NY)
INFLOW_UY_PERT = 0.001 * np.sin(2 * np.pi * y_arr / NY)

# ---------- トレーサー粒子 ----------
N_TRACERS = 250
rng_t = np.random.RandomState(1)
tracer_x = rng_t.uniform(1.0, 5.0, N_TRACERS)
tracer_y = rng_t.uniform(2.0, NY - 2.0, N_TRACERS)


def reset_tracers(mask):
    """mask が True の粒子を流入側に再投入する。"""
    n = int(mask.sum())
    if n == 0:
        return
    tracer_x[mask] = rng_t.uniform(1.0, 5.0, n)
    tracer_y[mask] = rng_t.uniform(2.0, NY - 2.0, n)


# ---------- 保存用バッファ ----------
speed_snaps: list[np.ndarray] = []
tracer_snaps: list[tuple[np.ndarray, np.ndarray]] = []

print(f"開始: {NX}×{NY}, Re={RE}, U_IN={U_IN}, ω={OMEGA:.3f}")
print(f"  (ω は < 2 が安定条件)")

t0 = time.time()
for step in range(N_STEPS):
    # 1) マクロ量
    rho = f.sum(axis=0)
    ux = (CXS[:, None, None] * f).sum(axis=0) / rho
    uy = (CYS[:, None, None] * f).sum(axis=0) / rho

    # 2) 流入境界(左端)の速度を固定 + 持続擾乱(サイン + 確率的ノイズ)
    ux[:, 0] = U_IN
    uy[:, 0] = INFLOW_UY_PERT + 0.005 * U_IN * rng.randn(NY)
    rho[:, 0] = 1.0

    # 3) 衝突 (BGK)
    feq = equilibrium(rho, ux, uy)
    f = f - OMEGA * (f - feq)

    # 4) 流入境界の f を feq に置換(簡易 BC)
    f[:, :, 0] = feq[:, :, 0]

    # 5) 障害物の bounce-back(衝突後 → ストリーミング前)
    f_obs = f[:, obstacle].copy()
    f[:, obstacle] = f_obs[OPP, :]

    # 6) ストリーミング(各方向にロール)
    for i in range(9):
        f[i] = np.roll(f[i], (CYS[i], CXS[i]), axis=(0, 1))

    # 7) 出口境界(右端): 一つ手前列をコピー
    f[:, :, -1] = f[:, :, -2]

    # 8) トレーサー前進(双線形補間で速度をサンプリング)
    ut = bilinear_sample(ux, tracer_x, tracer_y)
    vt = bilinear_sample(uy, tracer_x, tracer_y)
    tracer_x += ut
    tracer_y += vt
    # 領域外に出たら再投入
    out_mask = ((tracer_x > NX - 2) | (tracer_x < 0) |
                (tracer_y < 1) | (tracer_y > NY - 2))
    reset_tracers(out_mask)
    # 障害物に入ったら再投入
    tx_i = np.clip(np.round(tracer_x).astype(int), 0, NX - 1)
    ty_i = np.clip(np.round(tracer_y).astype(int), 0, NY - 1)
    in_obs = obstacle[ty_i, tx_i]
    reset_tracers(in_obs)

    # 9) スナップショット
    if step % SAVE_EVERY == 0:
        speed = np.sqrt(ux ** 2 + uy ** 2)
        speed[obstacle] = np.nan
        speed_snaps.append(speed.astype(np.float32))
        tracer_snaps.append((tracer_x.copy(), tracer_y.copy()))

    if step % 500 == 0:
        elapsed = time.time() - t0
        print(f"  step {step:5d}/{N_STEPS}  max|u|={np.nanmax(speed):.4f}  ({elapsed:.1f}s)")

print(f"計算完了 ({time.time()-t0:.1f}s)、スナップショット {len(speed_snaps)} 枚")

# ---------- アニメーション ----------
out_path = Path(__file__).parent / "karman.gif"

fig, ax = plt.subplots(figsize=(11, 3.4), dpi=100)
fig.subplots_adjust(left=0.02, right=0.92, top=0.92, bottom=0.04)

vmax = U_IN * 2.5
im = ax.imshow(speed_snaps[0], cmap='inferno', vmin=0, vmax=vmax,
               origin='lower', interpolation='bilinear',
               extent=[0, NX, 0, NY], zorder=1)

tx0, ty0 = tracer_snaps[0]
sc = ax.scatter(tx0, ty0, s=10, c='white', alpha=0.95,
                edgecolors='black', linewidths=0.3, zorder=3)

# 円柱(コブ含む)を実マスクからオーバーレイ
ax.imshow(np.where(obstacle, 1.0, np.nan), cmap='gray', vmin=0, vmax=1,
          origin='lower', extent=[0, NX, 0, NY], zorder=4)

ax.set_xticks([]); ax.set_yticks([])
title = ax.set_title(f"2D LBM (D2Q9), Re={RE}, step 0", fontsize=11)

cb = fig.colorbar(im, ax=ax, fraction=0.025, pad=0.012)
cb.set_label("speed |u|  (lattice units)", fontsize=9)
cb.ax.tick_params(labelsize=8)


def update(idx):
    im.set_data(speed_snaps[idx])
    tx, ty = tracer_snaps[idx]
    sc.set_offsets(np.column_stack([tx, ty]))
    title.set_text(f"2D LBM (D2Q9), Re={RE}, step {idx * SAVE_EVERY}")
    return [im, sc, title]


anim = animation.FuncAnimation(fig, update, frames=len(speed_snaps),
                                interval=70, blit=False)
print(f"GIF を書き込み中: {out_path}")
anim.save(out_path, writer=animation.PillowWriter(fps=15))
plt.close(fig)
print("完了")

COMMENT

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA