Skip to content

DataStore memory usage exceeds pandas for analytics operations on large datasets #552

@wudidapaopao

Description

@wudidapaopao

Summary

When running typical pandas-style analytics (load, filter, groupby, join, concat, etc.) on 10M-row Parquet files, DataStore sometimes uses more peak memory than plain pandas.

Benchmark results

Each test runs in an isolated subprocess. Peak memory measured via VmHWM from /proc/self/status.

Test Operation pandas peak DataStore peak DS / pd
T01 Load parquet (10M rows) 1918 MB 1953 MB 1.02x
T02 Filter + column select 2699 MB 2699 MB 1.00x
T03 GroupBy single key 2153 MB 2167 MB 1.01x
T05 Sort + top-N 2776 MB 3753 MB 1.35x
T06 Derived columns (assign) 2308 MB 3821 MB 1.66x
T07 Two-table merge/join 2487 MB 4506 MB 1.81x
T08 Three-table join + agg 2685 MB 4655 MB 1.73x
T12 Filter→join→groupby→sort 2729 MB 4399 MB 1.61x
T14 fillna + conditional + groupby 2175 MB 3299 MB 1.52x
T15 concat + groupby 3000 MB 4367 MB 1.46x

Especially for join/merge (T07, T08, T12) and concat (T15), DataStore peak memory is significantly higher than pandas.

Reproduction

Step 1: Generate test data (~205 MB on disk, ~1.5 GB in memory)

# generate_data.py
# Builds the benchmark dataset: a 100K-row products table and a 10M-row
# orders table, written as Parquet (~205 MB on disk, ~1.5 GB in memory).
import numpy as np, pandas as pd, os
# Fixed seed so every benchmark run sees identical data.
# NOTE: the dataset depends on the exact order of np.random.* calls below —
# reordering any of them changes the generated files.
np.random.seed(42)
DATA_DIR = "./bench_data"
os.makedirs(DATA_DIR, exist_ok=True)

# Products (100K rows)
n_products = 100_000
products = pd.DataFrame({
    'product_id': np.arange(1, n_products + 1),
    'product_name': [f"Product_{i:06d}" for i in range(1, n_products + 1)],
    'category': np.random.choice(['Electronics','Clothing','Home','Sports','Books',
                                   'Toys','Food','Health','Auto','Office'], n_products),
    # Log-normal gives a realistic right-skewed price distribution; clip bounds it.
    'price': np.round(np.random.lognormal(3.5, 1.2, n_products), 2).clip(0.99, 9999.99),
})
products.to_parquet(f"{DATA_DIR}/products.parquet", index=False)

# Orders (10M rows)
n_orders = 10_000_000
orders = pd.DataFrame({
    'order_id': np.arange(1, n_orders + 1),
    # customer_id / product_id are foreign keys used by the join benchmarks.
    'customer_id': np.random.randint(1, 1_000_001, n_orders),
    'product_id': np.random.randint(1, n_products + 1, n_orders),
    # Uniform dates over a three-year window starting 2022-01-01.
    'order_date': pd.to_datetime('2022-01-01') + pd.to_timedelta(
        np.random.randint(0, 365*3, n_orders), unit='D'),
    'quantity': np.random.randint(1, 20, n_orders),
    'unit_price': np.round(np.random.lognormal(3.5, 1.2, n_orders), 2).clip(0.99, 9999.99),
    # Repeated zeros in the choice pool make 0% discount four times as likely.
    'discount_pct': np.random.choice([0,0,0,0,5,10,15,20,25,30], n_orders).astype(float),
    # Low-cardinality string columns exercise the groupby benchmarks.
    'status': np.random.choice(['completed','shipped','pending','cancelled','returned'],
                                n_orders, p=[0.60,0.15,0.10,0.10,0.05]),
    'channel': np.random.choice(['web','mobile','app','store','phone'],
                                 n_orders, p=[0.35,0.25,0.20,0.15,0.05]),
    'shipping_cost': np.round(np.random.exponential(8.0, n_orders), 2),
})
orders.to_parquet(f"{DATA_DIR}/orders.parquet", index=False)
print("Done. Files:", os.listdir(DATA_DIR))

Step 2: Run benchmark

# bench_memory.py
"""Compare peak memory: DataStore vs pandas for common operations."""
# NOTE(review): `os` and `json` appear unused in this script (the child
# script embedded in RUNNER does its own `import os, sys`) — confirm before
# removing, since only part of the file may be shown here.
import subprocess, sys, os, json, textwrap

DATA_DIR = "./bench_data"

# Each value is a code snippet run inside an isolated child process (see
# RUNNER). The snippets reference `lib` (pandas or the DataStore shim) and
# `data` (the data directory), both defined by the RUNNER preamble. Braces
# inside these f-strings are safe because str.format does not re-process
# substituted values.
TESTS = {
    "T01_load": """
df = lib.read_parquet(f"{data}/orders.parquet")
n = len(df)
""",
    "T02_filter": """
df = lib.read_parquet(f"{data}/orders.parquet")
result = df[df['status'] == 'completed'][['order_id','customer_id','unit_price','quantity']]
n = len(result)
""",
    "T03_groupby": """
df = lib.read_parquet(f"{data}/orders.parquet")
df = df.assign(revenue=df['unit_price'] * df['quantity'])
result = df.groupby('status')['revenue'].sum()
n = len(result)
""",
    "T06_assign": """
df = lib.read_parquet(f"{data}/orders.parquet")
df = df.assign(
    revenue=df['unit_price'] * df['quantity'],
    net_revenue=df['unit_price'] * df['quantity'] * (1 - df['discount_pct'] / 100),
    total_cost=df['unit_price'] * df['quantity'] + df['shipping_cost'],
)
n = len(df)
""",
    "T07_merge": """
orders = lib.read_parquet(f"{data}/orders.parquet")
products = lib.read_parquet(f"{data}/products.parquet")
result = orders.merge(products, on='product_id', how='left')
n = len(result)
""",
    "T15_concat_groupby": """
df = lib.read_parquet(f"{data}/orders.parquet")
sample = df.head(5_000_000)
big = lib.concat([sample, sample], ignore_index=True)
result = big.groupby(['status', 'channel'])['unit_price'].sum()
n = len(result)
""",
}

# Template for the child-process script. `{data}` and `{code}` are filled by
# str.format; doubled braces ({{...}}) survive formatting as literal braces so
# the child's own f-string still works. The pandas branch's
# `lib.concat = lib.concat` is a deliberate no-op keeping both branches
# shaped alike; the DataStore branch adapts its API to the pandas-like `lib`
# interface the TESTS snippets expect. Peak RSS is read from Linux's VmHWM
# (process high-water mark), which is why each test runs in a fresh process.
RUNNER = textwrap.dedent("""\
import os, sys
data = "{data}"
lib_name = sys.argv[1]
if lib_name == "pandas":
    import pandas as lib
    lib.concat = lib.concat
else:
    from datastore import DataStore as _DS
    class _Lib:
        def read_parquet(self, p): return _DS.from_file(p)
        def concat(self, objs, **kw): return objs[0].concat(objs, **kw)
    lib = _Lib()
{code}
# Read VmHWM
with open("/proc/self/status") as f:
    for line in f:
        if line.startswith("VmHWM"):
            peak_kb = int(line.split()[1])
            print(f"PEAK_MB={{peak_kb // 1024}}")
            break
""")

# Run every TESTS snippet under both libraries, each in its own subprocess so
# VmHWM reflects that single operation's peak, then print a comparison table.
results = {}
for test_name, code in TESTS.items():
    results[test_name] = {}
    for lib_name in ["pandas", "datastore"]:
        script = RUNNER.format(data=DATA_DIR, code=code)
        try:
            proc = subprocess.run(
                [sys.executable, "-c", script, lib_name],
                capture_output=True, text=True, timeout=300,
            )
            stdout, stderr, returncode = proc.stdout, proc.stderr, proc.returncode
        except subprocess.TimeoutExpired as exc:
            # Fix: previously an uncaught TimeoutExpired aborted the whole
            # sweep; record the hang as a FAIL and keep benchmarking.
            stdout = exc.stdout if isinstance(exc.stdout, str) else ""
            stderr = "timed out after 300s"
            returncode = -1
        peak = 0
        for line in stdout.splitlines():
            if line.startswith("PEAK_MB="):
                peak = int(line.split("=")[1])
        results[test_name][lib_name] = peak
        status = "OK" if returncode == 0 else "FAIL"
        print(f"  [{lib_name:10s}] {test_name:30s} | peak={peak}MB | {status}")
        if returncode != 0 and stderr:
            # Surface the last stderr line (usually the exception) so a FAIL
            # is diagnosable without re-running by hand.
            print(f"    stderr: {stderr.strip().splitlines()[-1]}")

# Summary table: peak MB per library and the DataStore/pandas ratio.
print("\n" + "=" * 70)
print(f"{'Test':<25s} {'pandas':>10s} {'datastore':>10s} {'ratio':>8s}")
print("-" * 70)
for t, r in results.items():
    pd_mb = r.get("pandas", 0)
    ds_mb = r.get("datastore", 0)
    # Guard against division by zero when a pandas run failed outright.
    ratio = f"{ds_mb/pd_mb:.2f}x" if pd_mb > 0 else "N/A"
    flag = " ⚠" if ds_mb > pd_mb else ""
    print(f"{t:<25s} {pd_mb:>8d}MB {ds_mb:>8d}MB {ratio:>8s}{flag}")

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions