Blob Flow - 2025-12-15
Analysis of blob flow through validators, builders, and relays on Ethereum mainnet.
import pandas as pd
import plotly.graph_objects as go
import plotly.colors as pc
from loaders import load_parquet
MIN_BLOCKS = 10  # Minimum blocks for entity filtering
# Margin for Sankey node positioning (prevents cutoff at edges)
Y_MARGIN = 0.02
def blob_label(bc) -> str:
    """Label for a blob-count node, correctly pluralized ("1 blob", "4 blobs")."""
    n = int(bc)
    return f"{n} blob{'' if n == 1 else 's'}"
def calculate_y_positions(node_weights: list[float], pad: float = 0.02) -> list[float]:
"""Calculate y positions for nodes based on their weights (flow values).
Positions nodes so they don't overlap, accounting for their heights which
are proportional to their weights.
"""
if len(node_weights) == 0:
return []
if len(node_weights) == 1:
return [0.5]
total_weight = sum(node_weights)
n = len(node_weights)
total_pad = pad * (n - 1)
available_space = 1.0 - 2 * Y_MARGIN - total_pad
positions = []
current_y = Y_MARGIN
for i, weight in enumerate(node_weights):
# Node height is proportional to its weight
node_height = (weight / total_weight) * available_space
# Position is at center of node
pos = current_y + node_height / 2
positions.append(pos)
# Move to next position (after this node + padding)
current_y += node_height + pad
return positions
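# Quick check with illustrative weights (not real data): nodes weighted
# 6/3/1 stack top to bottom, heights proportional to weight, separated by
# `pad` and inset by Y_MARGIN at both ends.
print([round(p, 3) for p in calculate_y_positions([6, 3, 1])])
# -> [0.296, 0.73, 0.934]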
# Sankey domain to leave room at edges
SANKEY_DOMAIN = dict(x=[0, 1], y=[0.01, 0.99])
# Load blob flow data. target_date is the notebook's date parameter;
# assumed here to be an ISO date string matching the report date.
target_date = "2025-12-15"
df_proposer_blobs = load_parquet("proposer_blobs", target_date)
# Fill missing values
df_proposer_blobs["proposer_entity"] = df_proposer_blobs["proposer_entity"].fillna("Unknown")
df_proposer_blobs["winning_relay"] = df_proposer_blobs["winning_relay"].fillna("Local/Unknown")
print(f"Total blocks: {len(df_proposer_blobs)}")
print(f"Unique proposer entities: {df_proposer_blobs['proposer_entity'].nunique()}")
print(f"Unique relays: {df_proposer_blobs['winning_relay'].nunique()}")
Proposer Entity -> Blob Count
Sankey diagram showing how staking entities (pools, solo stakers) distribute their blocks across blob counts. Wider flows indicate more blocks. Entities that proposed fewer than MIN_BLOCKS (10) blocks are filtered out.
# Calculate block counts per entity
entity_block_counts = df_proposer_blobs.groupby("proposer_entity").size()
# Get entities that meet the threshold
valid_entities = entity_block_counts[entity_block_counts >= MIN_BLOCKS].index
# Filter the dataframe
df_filtered = df_proposer_blobs[df_proposer_blobs["proposer_entity"].isin(valid_entities)]
entity_blob_flow = (
df_filtered.groupby(["proposer_entity", "blob_count"])
.size()
.reset_index(name="block_count")
)
# Sort entities by total block count (descending)
entity_totals = entity_blob_flow.groupby("proposer_entity")["block_count"].sum()
entities = entity_totals.sort_values(ascending=False).index.tolist()
blob_counts = sorted(entity_blob_flow["blob_count"].unique(), reverse=True) # Descending
# Create node labels: entities + blob counts (blob counts sorted descending)
entity_nodes = [f"E:{e}" for e in entities]
blob_nodes = [f"{int(bc)} blobs" for bc in blob_counts]
all_nodes = entity_nodes + blob_nodes
# Create mapping from name to index
node_map = {name: idx for idx, name in enumerate(all_nodes)}
# Calculate node weights (total flow through each node)
entity_weights = [entity_totals[e] for e in entities]
blob_totals = entity_blob_flow.groupby("blob_count")["block_count"].sum()
blob_weights = [blob_totals.get(bc, 0) for bc in blob_counts]
n_entities = len(entity_nodes)
n_blobs = len(blob_nodes)
# Create color gradient for blob nodes (higher blob count = darker)
max_blob = max(blob_counts)
min_blob = min(blob_counts)
blob_colors = [
pc.sample_colorscale("Amp", (bc - min_blob) / (max_blob - min_blob) if max_blob > min_blob else 0.5)[0]
for bc in blob_counts
]
entity_colors = [pc.qualitative.Plotly[i % len(pc.qualitative.Plotly)] for i in range(n_entities)]
# Calculate y positions based on node weights to prevent overlap
entity_y = calculate_y_positions(entity_weights)
blob_y = calculate_y_positions(blob_weights)
x_pos = [0.01] * n_entities + [0.99] * n_blobs
y_pos = entity_y + blob_y
sources = []
targets = []
values = []
for _, row in entity_blob_flow.iterrows():
e_node = f"E:{row['proposer_entity']}"
bc_node = f"{int(row['blob_count'])} blobs"
if e_node in node_map and bc_node in node_map:
sources.append(node_map[e_node])
targets.append(node_map[bc_node])
values.append(row["block_count"])
fig = go.Figure(
data=[
go.Sankey(
arrangement="snap",
domain=SANKEY_DOMAIN,
node=dict(
pad=15,
thickness=20,
line=dict(color="black", width=0.5),
label=all_nodes,
x=x_pos,
y=y_pos,
color=entity_colors + blob_colors,
),
link=dict(source=sources, target=targets, value=values),
)
]
)
fig.update_layout(
title="Blob flow: Proposer Entity -> Blob Count",
font_size=12,
width=800,
height=6000,
margin=dict(t=50, b=50, l=10, r=10),
)
fig.show()
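The iterrows() loops used to build links are fine at this scale; for larger frames the same links can be built without an explicit loop. A sketch (an aside, not part of the original pipeline) that produces identical sources, targets, and values using the same node_map naming scheme:
# Map each flow row to node indices; missing nodes become None and are dropped
src = entity_blob_flow["proposer_entity"].map(lambda e: node_map.get(f"E:{e}"))
tgt = entity_blob_flow["blob_count"].map(lambda b: node_map.get(blob_label(b)))
mask = src.notna() & tgt.notna()
sources = src[mask].astype(int).tolist()
targets = tgt[mask].astype(int).tolist()
values = entity_blob_flow.loc[mask, "block_count"].tolist()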
Relay -> Blob Count
Shows how blocks delivered via each MEV-boost relay distribute across blob counts, revealing whether certain relays tend to deliver blocks with more or fewer blobs. Unlike the entity views, no minimum-block filter is applied here.
relay_blob_flow = (
df_proposer_blobs.groupby(["winning_relay", "blob_count"])
.size()
.reset_index(name="block_count")
)
# Sort relays by total block count (descending)
relay_totals = relay_blob_flow.groupby("winning_relay")["block_count"].sum()
relays = relay_totals.sort_values(ascending=False).index.tolist()
blob_counts = sorted(relay_blob_flow["blob_count"].unique(), reverse=True) # Descending
# Create node labels: relays + blob counts (blob counts sorted descending)
relay_nodes = [f"R:{r}" for r in relays]
blob_nodes = [f"{int(bc)} blobs" for bc in blob_counts]
all_nodes = relay_nodes + blob_nodes
# Create mapping from name to index
node_map = {name: idx for idx, name in enumerate(all_nodes)}
# Calculate node weights
relay_weights = [relay_totals[r] for r in relays]
blob_totals = relay_blob_flow.groupby("blob_count")["block_count"].sum()
blob_weights = [blob_totals.get(bc, 0) for bc in blob_counts]
n_relays = len(relay_nodes)
n_blobs = len(blob_nodes)
# Create color gradient for blob nodes (higher blob count = darker)
max_blob = max(blob_counts)
min_blob = min(blob_counts)
blob_colors = [
pc.sample_colorscale("Amp", (bc - min_blob) / (max_blob - min_blob) if max_blob > min_blob else 0.5)[0]
for bc in blob_counts
]
relay_colors = [pc.qualitative.Pastel[i % len(pc.qualitative.Pastel)] for i in range(n_relays)]
# Calculate y positions based on node weights
relay_y = calculate_y_positions(relay_weights)
blob_y = calculate_y_positions(blob_weights)
x_pos = [0.01] * n_relays + [0.99] * n_blobs
y_pos = relay_y + blob_y
sources = []
targets = []
values = []
for _, row in relay_blob_flow.iterrows():
r_node = f"R:{row['winning_relay']}"
bc_node = f"{int(row['blob_count'])} blobs"
if r_node in node_map and bc_node in node_map:
sources.append(node_map[r_node])
targets.append(node_map[bc_node])
values.append(row["block_count"])
fig = go.Figure(
data=[
go.Sankey(
arrangement="snap",
domain=SANKEY_DOMAIN,
node=dict(
pad=15,
thickness=20,
line=dict(color="black", width=0.5),
label=all_nodes,
x=x_pos,
y=y_pos,
color=relay_colors + blob_colors,
),
link=dict(source=sources, target=targets, value=values),
)
]
)
fig.update_layout(
title="Blob flow: Relay -> Blob Count",
font_size=12,
height=1200,
margin=dict(t=50, b=50, l=10, r=10),
)
fig.show()
Proposer Entity -> Relay
Maps which staking entities send their blocks through which relays, showing the relationship between validators and the MEV-boost relay infrastructure they rely on. Entities below the MIN_BLOCKS threshold are again filtered out.
# Calculate block counts per entity
entity_block_counts = df_proposer_blobs.groupby("proposer_entity").size()
valid_entities = entity_block_counts[entity_block_counts >= MIN_BLOCKS].index
# Filter the dataframe
df_filtered = df_proposer_blobs[df_proposer_blobs["proposer_entity"].isin(valid_entities)]
proposer_relay_flow = (
df_filtered.groupby(["proposer_entity", "winning_relay"])
.size()
.reset_index(name="block_count")
)
# Sort entities by total block count (descending)
entity_totals = proposer_relay_flow.groupby("proposer_entity")["block_count"].sum()
entities = entity_totals.sort_values(ascending=False).index.tolist()
# Sort relays by total block count (descending)
relay_totals = proposer_relay_flow.groupby("winning_relay")["block_count"].sum()
relays = relay_totals.sort_values(ascending=False).index.tolist()
# Create node labels: entities + relays
entity_nodes = [f"E:{e}" for e in entities]
relay_nodes = [f"R:{r}" for r in relays]
all_nodes = entity_nodes + relay_nodes
# Create mapping from name to index
node_map = {name: idx for idx, name in enumerate(all_nodes)}
# Calculate node weights
entity_weights = [entity_totals[e] for e in entities]
relay_weights = [relay_totals[r] for r in relays]
n_entities = len(entity_nodes)
n_relays = len(relay_nodes)
entity_colors = [pc.qualitative.Plotly[i % len(pc.qualitative.Plotly)] for i in range(n_entities)]
relay_colors = [pc.qualitative.Pastel[i % len(pc.qualitative.Pastel)] for i in range(n_relays)]
# Calculate y positions based on node weights
entity_y = calculate_y_positions(entity_weights)
relay_y = calculate_y_positions(relay_weights)
x_pos = [0.01] * n_entities + [0.99] * n_relays
y_pos = entity_y + relay_y
sources = []
targets = []
values = []
for _, row in proposer_relay_flow.iterrows():
e_node = f"E:{row['proposer_entity']}"
r_node = f"R:{row['winning_relay']}"
if e_node in node_map and r_node in node_map:
sources.append(node_map[e_node])
targets.append(node_map[r_node])
values.append(row["block_count"])
fig = go.Figure(
data=[
go.Sankey(
arrangement="snap",
domain=SANKEY_DOMAIN,
node=dict(
pad=15,
thickness=20,
line=dict(color="black", width=0.5),
label=all_nodes,
x=x_pos,
y=y_pos,
color=entity_colors + relay_colors,
),
link=dict(source=sources, target=targets, value=values),
)
]
)
fig.update_layout(
title="Blob flow: Proposer Entity -> Relay",
font_size=12,
width=800,
height=6000,
margin=dict(t=50, b=50, l=10, r=10),
)
fig.show()
Proposer Entity -> Relay -> Blob Count
Complete three-stage flow from staking entities through relays to final blob counts, showing the full pipeline of how blobs move through Ethereum block production. The two stages are aggregated independently (entity -> relay and relay -> blob count) over the same filtered data, so each relay's inflow equals its outflow; the sanity check in the code verifies this.
# Calculate block counts per entity
entity_block_counts = df_proposer_blobs.groupby("proposer_entity").size()
valid_entities = entity_block_counts[entity_block_counts >= MIN_BLOCKS].index
# Filter the dataframe
df_filtered = df_proposer_blobs[df_proposer_blobs["proposer_entity"].isin(valid_entities)]
# Aggregate flows: entity -> relay
entity_relay_flow = (
df_filtered.groupby(["proposer_entity", "winning_relay"])
.size()
.reset_index(name="block_count")
)
# Aggregate flows: relay -> blob_count
relay_blob_flow = (
df_filtered.groupby(["winning_relay", "blob_count"])
.size()
.reset_index(name="block_count")
)
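# Sanity check (an added aside): both aggregations come from the same
# df_filtered, so each relay's inflow (entity -> relay) equals its
# outflow (relay -> blob count); this is what balances the middle column.
_inflow = entity_relay_flow.groupby("winning_relay")["block_count"].sum()
_outflow = relay_blob_flow.groupby("winning_relay")["block_count"].sum()
assert _inflow.sort_index().equals(_outflow.sort_index())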
# Sort entities by total block count (descending)
entity_totals = entity_relay_flow.groupby("proposer_entity")["block_count"].sum()
entities = entity_totals.sort_values(ascending=False).index.tolist()
# Sort relays by total block count (descending)
relay_totals = relay_blob_flow.groupby("winning_relay")["block_count"].sum()
relays = relay_totals.sort_values(ascending=False).index.tolist()
blob_counts = sorted(df_filtered["blob_count"].unique(), reverse=True) # Descending
# Create node labels: entities + relays + blob counts
entity_nodes = [f"E:{e}" for e in entities]
relay_nodes = [f"R:{r}" for r in relays]
blob_nodes = [f"{int(bc)} blobs" for bc in blob_counts]
all_nodes = entity_nodes + relay_nodes + blob_nodes
# Create mapping from name to index
node_map = {name: idx for idx, name in enumerate(all_nodes)}
n_entities = len(entity_nodes)
n_relays = len(relay_nodes)
n_blobs = len(blob_nodes)
# Calculate node weights
entity_weights = [entity_totals[e] for e in entities]
relay_weights = [relay_totals[r] for r in relays]
blob_totals = relay_blob_flow.groupby("blob_count")["block_count"].sum()
blob_weights = [blob_totals.get(bc, 0) for bc in blob_counts]
entity_colors = [pc.qualitative.Plotly[i % len(pc.qualitative.Plotly)] for i in range(n_entities)]
relay_colors = [pc.qualitative.Pastel[i % len(pc.qualitative.Pastel)] for i in range(n_relays)]
max_blob = max(blob_counts)
min_blob = min(blob_counts)
blob_colors = [
pc.sample_colorscale("Amp", (bc - min_blob) / (max_blob - min_blob) if max_blob > min_blob else 0.5)[0]
for bc in blob_counts
]
# Calculate y positions based on node weights
entity_y = calculate_y_positions(entity_weights)
relay_y = calculate_y_positions(relay_weights)
blob_y = calculate_y_positions(blob_weights)
x_pos = [0.01] * n_entities + [0.5] * n_relays + [0.99] * n_blobs
y_pos = entity_y + relay_y + blob_y
sources = []
targets = []
values = []
# Entity -> Relay links
for _, row in entity_relay_flow.iterrows():
e_node = f"E:{row['proposer_entity']}"
r_node = f"R:{row['winning_relay']}"
if e_node in node_map and r_node in node_map:
sources.append(node_map[e_node])
targets.append(node_map[r_node])
values.append(row["block_count"])
# Relay -> Blob count links
for _, row in relay_blob_flow.iterrows():
r_node = f"R:{row['winning_relay']}"
bc_node = f"{int(row['blob_count'])} blobs"
if r_node in node_map and bc_node in node_map:
sources.append(node_map[r_node])
targets.append(node_map[bc_node])
values.append(row["block_count"])
fig = go.Figure(
data=[
go.Sankey(
arrangement="snap",
domain=SANKEY_DOMAIN,
node=dict(
pad=15,
thickness=30,
line=dict(color="black", width=0.5),
label=all_nodes,
x=x_pos,
y=y_pos,
color=entity_colors + relay_colors + blob_colors,
),
link=dict(source=sources, target=targets, value=values),
)
]
)
fig.update_layout(
title=f"Blob flow: Proposer Entity -> Relay -> Blob Count (min {MIN_BLOCKS} blocks)",
font_size=12,
width=800,
height=6000,
margin=dict(t=50, b=50, l=10, r=10),
)
fig.show()