Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions pkg/handler/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (ch *ConversationsHandler) ConversationsAddMessageHandler(ctx context.Conte
}
ch.logger.Debug("Fetched conversation history", zap.Int("message_count", len(history.Messages)))

messages := ch.convertMessagesFromHistory(history.Messages, historyParams.ChannelID, false)
messages := ch.convertMessagesFromHistory(ctx, history.Messages, historyParams.ChannelID, false)
return marshalMessagesToCSV(messages)
}

Expand Down Expand Up @@ -539,7 +539,7 @@ func (ch *ConversationsHandler) ConversationsHistoryHandler(ctx context.Context,

ch.logger.Debug("Fetched conversation history", zap.Int("message_count", len(history.Messages)))

messages := ch.convertMessagesFromHistory(history.Messages, params.channel, params.activity)
messages := ch.convertMessagesFromHistory(ctx, history.Messages, params.channel, params.activity)

if len(messages) > 0 && history.HasMore {
messages[len(messages)-1].Cursor = history.ResponseMetaData.NextCursor
Expand Down Expand Up @@ -578,7 +578,7 @@ func (ch *ConversationsHandler) ConversationsRepliesHandler(ctx context.Context,
}
ch.logger.Debug("Fetched conversation replies", zap.Int("count", len(replies)))

messages := ch.convertMessagesFromHistory(replies, params.channel, params.activity)
messages := ch.convertMessagesFromHistory(ctx, replies, params.channel, params.activity)
if len(messages) > 0 && hasMore {
messages[len(messages)-1].Cursor = nextCursor
}
Expand All @@ -588,7 +588,7 @@ func (ch *ConversationsHandler) ConversationsRepliesHandler(ctx context.Context,
func (ch *ConversationsHandler) ConversationsSearchHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
ch.logger.Debug("ConversationsSearchHandler called", zap.Any("params", request.Params))

params, err := ch.parseParamsToolSearch(request)
params, err := ch.parseParamsToolSearch(ctx, request)
if err != nil {
ch.logger.Error("Failed to parse search params", zap.Error(err))
return nil, err
Expand All @@ -609,7 +609,7 @@ func (ch *ConversationsHandler) ConversationsSearchHandler(ctx context.Context,
}
ch.logger.Debug("Search completed", zap.Int("matches", len(messagesRes.Matches)))

messages := ch.convertMessagesFromSearch(messagesRes.Matches)
messages := ch.convertMessagesFromSearch(ctx, messagesRes.Matches)
if len(messages) > 0 && messagesRes.Pagination.Page < messagesRes.Pagination.PageCount {
nextCursor := fmt.Sprintf("page:%d", messagesRes.Pagination.Page+1)
messages[len(messages)-1].Cursor = base64.StdEncoding.EncodeToString([]byte(nextCursor))
Expand Down Expand Up @@ -897,7 +897,7 @@ func (ch *ConversationsHandler) processClientCountsResponse(ctx context.Context,
unreadChannels[i].UnreadCount = len(history.Messages)

// Convert messages
channelMessages := ch.convertMessagesFromHistory(history.Messages, unreadChannels[i].ChannelName, false)
channelMessages := ch.convertMessagesFromHistory(ctx, history.Messages, unreadChannels[i].ChannelName, false)
allMessages = append(allMessages, channelMessages...)
}

Expand Down Expand Up @@ -1023,7 +1023,7 @@ func (ch *ConversationsHandler) getUnreadsViaConversationsInfo(ctx context.Conte
continue
}

channelMessages := ch.convertMessagesFromHistory(history.Messages, uc.ChannelName, false)
channelMessages := ch.convertMessagesFromHistory(ctx, history.Messages, uc.ChannelName, false)
allMessages = append(allMessages, channelMessages...)
}

Expand Down Expand Up @@ -1463,8 +1463,8 @@ func (ch *ConversationsHandler) resolveChannelID(ctx context.Context, channel st
return channelsMaps.Channels[chn].ID, nil
}

func (ch *ConversationsHandler) convertMessagesFromHistory(slackMessages []slack.Message, channel string, includeActivity bool) []Message {
usersMap := ch.apiProvider.ProvideUsersMap()
func (ch *ConversationsHandler) convertMessagesFromHistory(ctx context.Context, slackMessages []slack.Message, channel string, includeActivity bool) []Message {
resolver := ch.newUserResolver(ctx)
var messages []Message
warn := false

Expand All @@ -1473,7 +1473,7 @@ func (ch *ConversationsHandler) convertMessagesFromHistory(slackMessages []slack
continue
}

userName, realName, ok := getUserInfo(msg.User, usersMap.Users)
userName, realName, ok := resolver.resolve(msg.User)

if !ok && msg.SubType == "bot_message" {
userName, realName, ok = getBotInfo(msg.Username)
Expand Down Expand Up @@ -1539,17 +1539,19 @@ func (ch *ConversationsHandler) convertMessagesFromHistory(slackMessages []slack
return messages
}

func (ch *ConversationsHandler) convertMessagesFromSearch(slackMessages []slack.SearchMessage) []Message {
usersMap := ch.apiProvider.ProvideUsersMap()
func (ch *ConversationsHandler) convertMessagesFromSearch(ctx context.Context, slackMessages []slack.SearchMessage) []Message {
resolver := ch.newUserResolver(ctx)
var messages []Message
warn := false

for _, msg := range slackMessages {
userName, realName, ok := getUserInfo(msg.User, usersMap.Users)
userName, realName, ok := resolver.resolve(msg.User)

if !ok && msg.User == "" && msg.Username != "" {
userName, realName, ok = getBotInfo(msg.Username)
} else if !ok {
}

if !ok {
warn = true
}

Expand Down Expand Up @@ -1867,7 +1869,7 @@ func (ch *ConversationsHandler) parseParamsToolMark(request mcp.CallToolRequest)
ts: ts,
}, nil
}
func (ch *ConversationsHandler) parseParamsToolSearch(req mcp.CallToolRequest) (*searchParams, error) {
func (ch *ConversationsHandler) parseParamsToolSearch(ctx context.Context, req mcp.CallToolRequest) (*searchParams, error) {
rawQuery := strings.TrimSpace(req.GetString("search_query", ""))
freeText, filters := splitQuery(rawQuery)

Expand All @@ -1882,23 +1884,23 @@ func (ch *ConversationsHandler) parseParamsToolSearch(req mcp.CallToolRequest) (
}
addFilter(filters, "in", f)
} else if im := req.GetString("filter_in_im_or_mpim", ""); im != "" {
f, err := ch.paramFormatUser(im)
f, err := ch.paramFormatUser(ctx, im)
if err != nil {
ch.logger.Error("Invalid IM/MPIM filter", zap.String("filter", im), zap.Error(err))
return nil, err
}
addFilter(filters, "in", f)
}
if with := req.GetString("filter_users_with", ""); with != "" {
f, err := ch.paramFormatUser(with)
f, err := ch.paramFormatUser(ctx, with)
if err != nil {
ch.logger.Error("Invalid with-user filter", zap.String("filter", with), zap.Error(err))
return nil, err
}
addFilter(filters, "with", f)
}
if from := req.GetString("filter_users_from", ""); from != "" {
f, err := ch.paramFormatUser(from)
f, err := ch.paramFormatUser(ctx, from)
if err != nil {
ch.logger.Error("Invalid from-user filter", zap.String("filter", from), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -1965,13 +1967,20 @@ func isSlackUserIDPrefix(s string) bool {
return strings.HasPrefix(s, "U") || strings.HasPrefix(s, "W")
}

func (ch *ConversationsHandler) paramFormatUser(raw string) (string, error) {
func (ch *ConversationsHandler) paramFormatUser(ctx context.Context, raw string) (string, error) {
users := ch.apiProvider.ProvideUsersMap()
raw = strings.TrimSpace(raw)
if isSlackUserIDPrefix(raw) {
u, ok := users.Users[raw]
if !ok {
return "", fmt.Errorf("user %q not found", raw)
// Targeted fetch: single users.info call instead of full cache rebuild
patched, err := ch.apiProvider.PatchUser(ctx, raw)
if err != nil {
ch.logger.Debug("Targeted user fetch failed, user not found",
zap.String("user_id", raw), zap.Error(err))
return "", fmt.Errorf("user %q not found", raw)
}
return fmt.Sprintf("<@%s>", patched.ID), nil
}
return fmt.Sprintf("<@%s>", u.ID), nil
}
Expand Down Expand Up @@ -2022,6 +2031,42 @@ func getUserInfo(userID string, usersMap map[string]slack.User) (userName, realN
return userID, userID, false
}

// userResolver resolves user IDs to names, fetching unknown users from the
// Slack API on demand. It caches the snapshot locally and remembers which IDs
// it already tried to fetch, so a user that doesn't exist in Slack is only
// looked up once per batch rather than once per message.
type userResolver struct {
apiProvider *provider.ApiProvider
ctx context.Context
usersMap *provider.UsersCache
attemptedIDs map[string]bool
}

func (ch *ConversationsHandler) newUserResolver(ctx context.Context) *userResolver {
return &userResolver{
apiProvider: ch.apiProvider,
ctx: ctx,
usersMap: ch.apiProvider.ProvideUsersMap(),
attemptedIDs: make(map[string]bool),
}
}

func (r *userResolver) resolve(userID string) (userName, realName string, ok bool) {
if u, ok := r.usersMap.Users[userID]; ok {
return u.Name, u.RealName, true
}
if userID == "" || r.attemptedIDs[userID] {
return userID, userID, false
}
r.attemptedIDs[userID] = true
patched, err := r.apiProvider.PatchUser(r.ctx, userID)
if err != nil {
return userID, userID, false
}
r.usersMap = r.apiProvider.ProvideUsersMap()
return patched.Name, patched.RealName, true
}

func getBotInfo(botID string) (userName, realName string, ok bool) {
return botID, botID, true
}
Expand Down
49 changes: 44 additions & 5 deletions pkg/provider/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,11 @@ type MCPSlackClient struct {
authResponse *slack.AuthTestResponse
authProvider auth.Provider

isEnterprise bool
isOAuth bool
isBotToken bool
edgeFailed bool // set when edge API fails; subsequent calls skip straight to standard API
teamEndpoint string
isEnterprise bool
isOAuth bool
isBotToken bool
edgeFailed bool // set when edge API fails; subsequent calls skip straight to standard API
teamEndpoint string
}

type ApiProvider struct {
Expand Down Expand Up @@ -760,6 +760,45 @@ func (ap *ApiProvider) ForceRefreshUsers(ctx context.Context) error {
return ap.refreshUsersInternal(ctx, true)
}

// PatchUser fetches a single user by ID from the Slack API and adds them to
// the in-memory users snapshot. This is much cheaper than a full cache rebuild
// for a single cache miss (O(1) API call vs O(all users)).
// Disk persistence is skipped — the next full refresh will persist the entry.
func (ap *ApiProvider) PatchUser(ctx context.Context, userID string) (*slack.User, error) {
usersInfo, err := ap.client.GetUsersInfo(userID)
if err != nil {
ap.logger.Warn("Failed to fetch user for cache patch", zap.String("user_id", userID), zap.Error(err))
return nil, err
}
if usersInfo == nil || len(*usersInfo) == 0 {
ap.logger.Debug("User not found via API", zap.String("user_id", userID))
return nil, errors.New("user not found")
}

user := (*usersInfo)[0]
current := ap.usersSnapshot.Load()

newSnapshot := &UsersCache{
Users: make(map[string]slack.User, len(current.Users)+1),
UsersInv: make(map[string]string, len(current.UsersInv)+1),
}
for k, v := range current.Users {
newSnapshot.Users[k] = v
}
for k, v := range current.UsersInv {
newSnapshot.UsersInv[k] = v
}
newSnapshot.Users[user.ID] = user
newSnapshot.UsersInv[user.Name] = user.ID

ap.usersSnapshot.Store(newSnapshot)
ap.logger.Debug("Patched user into cache",
zap.String("user_id", user.ID),
zap.String("user_name", user.Name))

return &user, nil
}

func (ap *ApiProvider) refreshUsersInternal(ctx context.Context, force bool) error {
ap.usersMu.Lock()
defer ap.usersMu.Unlock()
Expand Down
Loading