Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
WatchError,
)
from redis.lock import Lock
from redis.local_cache import LocalCache
from redis.maint_notifications import (
MaintNotificationsConfig,
OSSMaintNotificationsHandler,
Expand Down Expand Up @@ -290,6 +291,8 @@ def __init__(
oss_cluster_maint_notifications_handler: Optional[
OSSMaintNotificationsHandler
] = None,
local_cache_ttl: int = 0,
local_cache_max_size: int = 0,
) -> None:
"""
Initialize a new Redis client.
Expand Down Expand Up @@ -342,6 +345,16 @@ def __init__(
`redis.maint_notifications.OSSMaintNotificationsHandler` for details.
Only supported with RESP3
Argument is ignored when connection_pool is provided.
local_cache_ttl:
**Experimental.** Local memory cache TTL in milliseconds.
If both `local_cache_ttl` and `local_cache_max_size` are greater than 0,
the local memory cache layer will be activated for GET commands.
This is an experimental feature for reducing hot key read latency.
local_cache_max_size:
**Experimental.** Maximum number of keys to store in local memory cache.
If both `local_cache_ttl` and `local_cache_max_size` are greater than 0,
the local memory cache layer will be activated.
LRU eviction is used when cache exceeds this size.
"""
if event_dispatcher is None:
self._event_dispatcher = EventDispatcher()
Expand Down Expand Up @@ -491,6 +504,10 @@ def __init__(
else:
self.response_callbacks.update(_RedisCallbacksRESP2)

self._local_cache: Optional[LocalCache] = None
if local_cache_ttl > 0 and local_cache_max_size > 0:
self._local_cache = LocalCache(local_cache_ttl, local_cache_max_size)

def __repr__(self) -> str:
return (
f"<{type(self).__module__}.{type(self).__name__}"
Expand Down Expand Up @@ -774,6 +791,31 @@ def _execute_command(self, *args, **options):
"""Execute a command and return a parsed response"""
pool = self.connection_pool
command_name = args[0]
conn = None

if self._local_cache is not None:
if isinstance(command_name, bytes):
try:
command_upper = command_name.decode('utf-8').upper()
except UnicodeDecodeError:
command_upper = command_name.decode('latin-1').upper()
else:
command_upper = command_name.upper()

if command_upper == "GET" and len(args) >= 2:
key = args[1]
cached_value = self._local_cache.get(key)
if cached_value is not None:
return cached_value

elif command_upper in ("SET", "SETEX", "DEL") and len(args) >= 2:
if command_upper == "DEL":
for key in args[1:]:
self._local_cache.delete(key)
else:
key = args[1]
self._local_cache.delete(key)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many write commands don't invalidate the local cache

High Severity

Cache invalidation only covers SET, SETEX, and DEL, but many other write commands (INCR, DECRBY, APPEND, GETSET, GETDEL, MSET, SETRANGE, INCRBYFLOAT, etc.) all flow through execute_command and modify key values without invalidating the local cache. A sequence like r.get("counter") followed by r.incr("counter") then r.get("counter") will silently return the stale cached value, producing incorrect results with no indication of error.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8d310b6. Configure here.


conn = self.connection or pool.get_connection()

# Start timing for observability
Expand Down Expand Up @@ -805,6 +847,19 @@ def failure_callback(error, failure_count):
server_port=getattr(conn, "port", None),
db_namespace=str(conn.db),
)

if self._local_cache is not None:
if isinstance(command_name, bytes):
try:
command_upper = command_name.decode('utf-8').upper()
except UnicodeDecodeError:
command_upper = command_name.decode('latin-1').upper()
else:
command_upper = command_name.upper()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant command_upper computation after command execution

Low Severity

The command_upper variable is computed identically twice — first at the pre-execution cache lookup block, then again in the post-execution cache store block — under the same self._local_cache is not None guard. Since reaching the second block guarantees the first block already ran (the only early return is line 809, which exits the function entirely), command_upper is already in scope and can be reused directly.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8d310b6. Configure here.

if command_upper == "GET" and len(args) >= 2:
key = args[1]
self._local_cache.set(key, result)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cached None values waste space, never serve hits

High Severity

LocalCache.get() returns None for both cache misses and cached None values, but the caller at _execute_command uses if cached_value is not None to detect cache hits. When Redis GET returns None (key doesn't exist), it's stored in the cache via self._local_cache.set(key, result), but subsequent lookups always treat it as a miss. These None entries consume cache capacity and evict valid entries through LRU, degrading cache effectiveness. A sentinel object distinct from None is needed to differentiate "not in cache" from "cached None."

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8d310b6. Configure here.


return result
except Exception as e:
record_error_count(
Expand All @@ -824,7 +879,7 @@ def failure_callback(error, failure_count):
conn.connect()
if self._single_connection_client:
self.single_connection_lock.release()
if not self.connection:
if not self.connection and conn:
pool.release(conn)

def parse_response(self, connection, command_name, **options):
Expand Down
139 changes: 139 additions & 0 deletions redis/local_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import threading
import time
from collections import OrderedDict
from typing import Any, Optional


# Sentinel used to distinguish "key absent from cache" from a cached None value.
_MISSING = object()


class LocalCache:
    """
    Thread-safe in-process cache with TTL expiry and LRU eviction.

    Intended to reduce read latency for hot keys by serving repeated GETs
    from local memory instead of the remote Redis server.

    Attributes:
        ttl_ms: Time-to-live for each entry, in milliseconds.
        max_size: Maximum number of entries held at once.
        _cache: Internal storage; an OrderedDict provides LRU ordering.
        _lock: Re-entrant lock guarding all cache mutations.
    """

    def __init__(self, ttl_ms: int, max_size: int):
        """
        Initialize the local cache.

        Args:
            ttl_ms: Entry time-to-live in milliseconds; must be > 0.
            max_size: Maximum number of entries; must be > 0.

        Raises:
            ValueError: If ttl_ms or max_size is not greater than 0.
        """
        if ttl_ms <= 0:
            raise ValueError("ttl_ms must be greater than 0")
        if max_size <= 0:
            raise ValueError("max_size must be greater than 0")

        self.ttl_ms = ttl_ms
        self.max_size = max_size
        # Maps key -> (value, timestamp_ms). Insertion order doubles as
        # LRU order: most recently used entries live at the right end.
        self._cache: OrderedDict = OrderedDict()
        self._lock = threading.RLock()

    @staticmethod
    def _now_ms() -> float:
        # Monotonic clock: immune to wall-clock adjustments (NTP steps,
        # manual changes) that would otherwise corrupt TTL arithmetic.
        return time.monotonic() * 1000

    def get(self, key: Any, default: Any = None) -> Optional[Any]:
        """
        Return the cached value for ``key``.

        On a hit that has not expired, the entry is promoted to
        most-recently-used and its value is returned. On a miss or an
        expired entry, ``default`` is returned (the expired entry is
        removed).

        Args:
            key: Cache key.
            default: Value returned on a miss or expiry. Pass a private
                sentinel here to distinguish a miss from a cached ``None``.

        Returns:
            The cached value, or ``default``.
        """
        with self._lock:
            entry = self._cache.get(key, _MISSING)
            if entry is _MISSING:
                return default

            value, stored_at = entry
            if self._now_ms() - stored_at > self.ttl_ms:
                # Lazily expire: drop the stale entry on access.
                del self._cache[key]
                return default

            self._cache.move_to_end(key)
            return value

    def set(self, key: Any, value: Any) -> None:
        """
        Store ``value`` under ``key``, resetting its TTL.

        If the cache is at capacity, the least-recently-used entry is
        evicted first.

        Args:
            key: Cache key.
            value: Value to cache (``None`` is a valid cached value).
        """
        with self._lock:
            if key in self._cache:
                # Re-insert so the entry moves to the MRU position.
                del self._cache[key]
            elif len(self._cache) >= self.max_size:
                # Evict the LRU entry (leftmost in the OrderedDict).
                self._cache.popitem(last=False)

            self._cache[key] = (value, self._now_ms())

    def delete(self, key: Any) -> bool:
        """
        Remove the entry for ``key``, if present.

        Args:
            key: Cache key.

        Returns:
            True if the key existed and was removed, False otherwise.
        """
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False

    def clear(self) -> int:
        """
        Remove all entries.

        Returns:
            The number of entries that were removed.
        """
        with self._lock:
            count = len(self._cache)
            self._cache.clear()
            return count

    def __len__(self) -> int:
        """
        Return the number of stored entries.

        Note: expired entries are not purged here; this is the raw count
        of entries currently held.

        Returns:
            Number of entries in the cache.
        """
        with self._lock:
            return len(self._cache)

    def __contains__(self, key: Any) -> bool:
        """
        Return True if ``key`` is present and not expired.

        Uses a sentinel default so that a legitimately cached ``None``
        value still counts as present.

        Args:
            key: Cache key.

        Returns:
            True if the key exists and has not expired, False otherwise.
        """
        return self.get(key, _MISSING) is not _MISSING