Ichinomiya wealth-exchange model

Autoplay, sharp plots, original layout
seed42
N200
k4
J′0.2
σ1.0
p0
α0
steps300
Waiting to run…
Network (node size ∝ wealth)
Wealth distribution
Cumulative Wealth Distribution
packages = ["numpy", "networkx", "matplotlib"] from js import document, window from pyscript import display from pyodide.ffi import create_proxy import numpy as np import networkx as nx import matplotlib.pyplot as plt WEALTH_HISTORY = [] GLOB_G = None GLOB_POS = None GLOB_LOGHIST = False PLAY_HANDLE = None PLAY_CB = None def clear_target(target_id: str): el = document.getElementById(target_id) if el is not None: el.innerHTML = "" def euler_maruyama_step(x, A, J, sigma, Dt, rng, pow_val, alpha_val): """Performs one Euler-Maruyama step of the wealth exchange model.""" N = x.shape[0] Ax = A.dot(x) kmax = int(A.sum(axis=1).max()) if N > 0 else 0 # Use new parameters pow = pow_val alpha = alpha_val # Deterministic part det = J * Dt * ((x**pow) * Ax - kmax * x) # Stochastic part (correlated noise) u0 = rng.normal(size=N) v = rng.normal(size=N) delta = alpha / np.sqrt(1 + alpha**2) xi = delta * np.abs(u0) + np.sqrt(1 - delta**2) * v sto = np.sqrt(2 * Dt) * x * sigma * xi x_new = x + det + sto x_new = np.maximum(x_new, 0.0) return x_new def make_layout(G, layout, seed=0): if layout == "circular": return nx.circular_layout(G) elif layout == "spectral": return nx.spectral_layout(G) else: return nx.spring_layout(G, seed=seed) def get_play_interval_ms(): sp_el = document.getElementById("speed-slider") fps = int(sp_el.value) if fps < 1: fps = 1 return int(1000 / fps) def get_current_frame_index(): slider = document.getElementById("timestep-slider") return int(slider.value) def replot_current_frame(event=None): """Re-plots the current frame when the checkbox state changes.""" idx = get_current_frame_index() show_frame_from_index(idx) def run_sim(event=None): global WEALTH_HISTORY, GLOB_G, GLOB_POS, GLOB_LOGHIST stop_play(event) status_el = document.getElementById("status") status_el.innerText = "Running…" # Read all parameters seed = int(document.getElementById("seed").value) N = int(document.getElementById("nodes").value) k = int(document.getElementById("k").value) Jprime = float(document.getElementById("jprime").value) sigma = float(document.getElementById("sigma").value) # Read new parameters pow_val = float(document.getElementById("pow").value) alpha_val = float(document.getElementById("alpha").value) timesteps = int(document.getElementById("timesteps").value) layout = document.getElementById("layout").value normalize = document.getElementById("normalize").checked loghist = document.getElementById("loghist").checked GLOB_LOGHIST = loghist rng = np.random.default_rng(seed) if k >= N: status_el.innerText = "Error: k must be smaller than N." return J = Jprime * k # Recalculate time step based on J and sigma dt = 0.01 if J > 0 and sigma > 0: dt = 0.01 * min(1.0 / J, 1.0 / (sigma**2)) elif J > 0: dt = 0.01 * (1.0 / J) elif sigma > 0: dt = 0.01 * (1.0 / (sigma**2)) G = nx.random_regular_graph(k, N, seed=seed) A = nx.to_numpy_array(G) pos = make_layout(G, layout, seed=seed) x = np.full(N, 1.0 / N) WEALTH_HISTORY = [] for t in range(timesteps): # Pass new parameters to the step function x = euler_maruyama_step(x, A, J, sigma, dt, rng, pow_val, alpha_val) if normalize: s = x.sum() if s > 0: x = x / s WEALTH_HISTORY.append(x.copy()) GLOB_G = G GLOB_POS = pos # show frame controls fc = document.getElementById("frame-controls") fc.style.display = "block" slider = document.getElementById("timestep-slider") slider.max = str(len(WEALTH_HISTORY) - 1) slider.oninput = create_proxy(replot_current_frame) # Attach listener to slider # start at frame 1 if possible start_frame = 1 if len(WEALTH_HISTORY) > 1 else 0 slider.value = str(start_frame) document.getElementById("current-step").innerText = str(start_frame) document.getElementById("total-steps").innerText = str(len(WEALTH_HISTORY) - 1) show_frame_from_index(start_frame) status_el.innerText = f"Done. N={N}, k={k}, J={J:.3f}, σ={sigma}, pow={pow_val:.1f}, α={alpha_val:.2f}, steps={timesteps}" # auto-start start_play() def plot_cumulative_wealth(x, idx): """Generates and displays the cumulative wealth distribution (Lorenz Curve).""" clear_target("cumul_plot") # Read the checkbox state order_wealth = document.getElementById("order_wealth").checked # Prepare data if order_wealth: x_data = np.sort(x) # Sort ascending for Lorenz curve title = f"Lorenz Curve (Step {idx})" xlabel = "Cumulative share of population (ordered by wealth)" else: x_data = x # Use original order title = f"Cumulative Wealth (Step {idx})" xlabel = "Node index (original order)" # Calculate cumulative distribution # Normalize cumulative wealth (sum should be 1 if normalized) cum_wealth = np.cumsum(x_data) if cum_wealth[-1] > 0: cum_wealth /= cum_wealth[-1] # Normalized node indices N = len(x) nodes_normalized = np.arange(1, N + 1) / N fig_c, ax_c = plt.subplots(figsize=(3.3, 2.6), dpi=200) fig_c.patch.set_facecolor("#0f172a") ax_c.set_facecolor("#0f172a") # Plot the data ax_c.plot(nodes_normalized, cum_wealth, color="#f471b5", linewidth=2.5, zorder=2) # Plot the line of equality ax_c.plot([0, 1], [0, 1], color="#475569", linestyle='--', linewidth=0.8, alpha=0.7, zorder=1) # Styling ax_c.set_xlim(0, 1) ax_c.set_ylim(0, 1) ax_c.set_title(title, color="#e2e8f0", fontsize=9) ax_c.set_xlabel(xlabel, color="#e2e8f0", fontsize=8) ax_c.set_ylabel("Cumulative share of wealth", color="#e2e8f0", fontsize=8) ax_c.tick_params(colors="#e2e8f0", labelsize=7) ax_c.grid(linestyle=':', alpha=0.2) fig_c.tight_layout() display(fig_c, target="cumul_plot") plt.close(fig_c) def show_frame_from_index(idx: int): global WEALTH_HISTORY, GLOB_G, GLOB_POS, GLOB_LOGHIST if not WEALTH_HISTORY: return if idx < 0: idx = 0 if idx >= len(WEALTH_HISTORY): idx = len(WEALTH_HISTORY) - 1 x = WEALTH_HISTORY[idx] clear_target("network_plot") clear_target("hist_plot") # clear_target("cumul_plot") # Cleared inside plot_cumulative_wealth for efficiency # Update step display document.getElementById("current-step").innerText = str(idx) # --- Network plot --- fig_net, ax_net = plt.subplots(figsize=(4, 4), dpi=200) fig_net.patch.set_facecolor("#0f172a") ax_net.set_facecolor("#0f172a") ax_net.set_axis_off() G = GLOB_G pos = GLOB_POS maxx = max(x.max(), 1e-12) sizes = (x / maxx) * 750 + 1 for u, v in G.edges(): xu, yu = pos[u] xv, yv = pos[v] ax_net.plot([xu, xv], [yu, yv], lw=0.35, color="#475569", alpha=0.35, zorder=1) xs = [pos[i][0] for i in range(len(x))] ys = [pos[i][1] for i in range(len(x))] sc = ax_net.scatter(xs, ys, s=sizes, c=x, cmap="plasma", edgecolors="#0f172a", linewidths=0.35, zorder=2) ax_net.set_title(f"Network, step {idx}", fontsize=10, color="#e2e8f0", pad=4) cbar = fig_net.colorbar(sc, ax=ax_net, fraction=0.046, pad=0.03) cbar.ax.set_ylabel("wealth", color="#e2e8f0", fontsize=8) for tick in cbar.ax.get_yticklabels(): tick.set_color("#e2e8f0") display(fig_net, target="network_plot") plt.close(fig_net) # --- Histogram --- fig_h, ax_h = plt.subplots(figsize=(3.3, 2.6), dpi=200) fig_h.patch.set_facecolor("#0f172a") ax_h.set_facecolor("#0f172a") if GLOB_LOGHIST and np.sum(x > 0) > 0: x_nz = x[x > 0] x_min = max(x_nz.min() * 0.9, 1e-6) x_max = x.max() * 1.1 bins = np.logspace(np.log10(x_min), np.log10(x_max), 30) ax_h.hist(x, bins=bins, log=True, color="#38bdf8", alpha=0.85, edgecolor="#0f172a", linewidth=0.4) ax_h.set_xscale("log") ax_h.set_title("Wealth (lin-log)", color="#e2e8f0", fontsize=9) else: ax_h.hist(x, bins=30, color="#38bdf8", alpha=0.85, edgecolor="#0f172a", linewidth=0.4) ax_h.set_title("Wealth distribution", color="#e2e8f0", fontsize=9) ax_h.set_xlabel("wealth x", color="#e2e8f0", fontsize=8) ax_h.set_ylabel("count", color="#e2e8f0", fontsize=8) ax_h.tick_params(colors="#e2e8f0", labelsize=7) ax_h.grid(linestyle=':', alpha=0.2) fig_h.tight_layout() display(fig_h, target="hist_plot") plt.close(fig_h) # --- Cumulative Wealth Plot (NEW) --- plot_cumulative_wealth(x, idx) def play_step(*args, **kwargs): slider = document.getElementById("timestep-slider") maxv = int(slider.max) cur = int(slider.value) nxt = cur + 1 if nxt > maxv: nxt = 0 slider.value = str(nxt) document.getElementById("current-step").innerText = str(nxt) show_frame_from_index(nxt) def start_play(event=None): global PLAY_HANDLE, PLAY_CB if not WEALTH_HISTORY: return if PLAY_HANDLE is not None: return interval_ms = get_play_interval_ms() PLAY_CB = create_proxy(play_step) PLAY_HANDLE = window.setInterval(PLAY_CB, interval_ms) def stop_play(event=None): global PLAY_HANDLE, PLAY_CB if PLAY_HANDLE is not None: window.clearInterval(PLAY_HANDLE) PLAY_HANDLE = None if PLAY_CB is not None: PLAY_CB.destroy() PLAY_CB = None