Skip to content

Commit cc9d5ad

Browse files
authored
Merge pull request #1532 from kuu-26/fix/replyto-fk-violation
fix: FK violation when replying to deleted message (fk_replyto)
2 parents c1db854 + 76ba3fb commit cc9d5ad

1 file changed

Lines changed: 105 additions & 96 deletions

File tree

Lines changed: 105 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
using System.Collections.Concurrent;
1+
using System.Collections.Concurrent;
22

33
namespace Valour.Server.Workers
44
{
5-
public class PlanetMessageWorker : IHostedService, IDisposable
6-
{
7-
private readonly IServiceProvider _serviceProvider;
8-
private readonly ILogger<PlanetMessageWorker> _logger;
9-
5+
public class PlanetMessageWorker : IHostedService, IDisposable
6+
{
7+
private readonly IServiceProvider _serviceProvider;
8+
private readonly ILogger<PlanetMessageWorker> _logger;
9+
1010
// A queue of all messages that need to be processed
1111
private static readonly BlockingCollection<Message> MessageQueue = new(new ConcurrentQueue<Message>());
1212

@@ -21,29 +21,29 @@ public class PlanetMessageWorker : IHostedService, IDisposable
2121

2222
// Prevents deleted messages from being staged
2323
private static readonly ConcurrentDictionary<long, byte> BlockSet = new();
24-
25-
/// <summary>
26-
/// Holds the long-running queue task
24+
25+
/// <summary>
26+
/// Holds the long-running queue task
2727
/// </summary>
2828
private static Task _queueTask;
2929

3030
// Timer for executing timed tasks
3131
private Timer _timer;
3232
private int _isFlushing;
33-
34-
public PlanetMessageWorker(ILogger<PlanetMessageWorker> logger,
35-
IServiceProvider serviceProvider)
36-
{
37-
_logger = logger;
38-
_serviceProvider = serviceProvider;
39-
}
40-
41-
public static void AddToQueue(Message message)
42-
{
43-
if (message.PlanetId is null)
44-
{
45-
Console.WriteLine("[!!!] Tried to add a message to the planet message queue queue without a planet id");
46-
return;
33+
34+
public PlanetMessageWorker(ILogger<PlanetMessageWorker> logger,
35+
IServiceProvider serviceProvider)
36+
{
37+
_logger = logger;
38+
_serviceProvider = serviceProvider;
39+
}
40+
41+
public static void AddToQueue(Message message)
42+
{
43+
if (message.PlanetId is null)
44+
{
45+
Console.WriteLine("[!!!] Tried to add a message to the planet message queue queue without a planet id");
46+
return;
4747
}
4848

4949
QueuedMessages[message.Id] = message;
@@ -72,7 +72,7 @@ public static void RemoveFromQueue(Message message)
7272
BlockSet.TryRemove(message.Id, out _);
7373
}
7474
}
75-
75+
7676
public static Message GetStagedMessage(long messageId)
7777
{
7878
StagedMessages.TryGetValue(messageId, out var staged);
@@ -84,28 +84,28 @@ public static Message GetQueuedMessage(long messageId)
8484
QueuedMessages.TryGetValue(messageId, out var queued);
8585
return queued;
8686
}
87-
87+
8888
public static List<Message> GetStagedMessages(long channelId)
8989
{
9090
if (StagedChannelMessages.TryGetValue(channelId, out var stagedQueue))
9191
return stagedQueue.ToList();
9292

9393
return new List<Message>();
9494
}
95-
96-
public Task StartAsync(CancellationToken stoppingToken)
97-
{
98-
_logger.LogInformation("Starting Message Worker");
99-
100-
// Start the queue task
101-
_queueTask = Task.Run(ConsumeMessageQueue, stoppingToken);
102-
103-
_timer = new Timer(DoWork, null, TimeSpan.Zero,
104-
TimeSpan.FromSeconds(20));
105-
106-
return Task.CompletedTask;
107-
}
108-
95+
96+
public Task StartAsync(CancellationToken stoppingToken)
97+
{
98+
_logger.LogInformation("Starting Message Worker");
99+
100+
// Start the queue task
101+
_queueTask = Task.Run(ConsumeMessageQueue, stoppingToken);
102+
103+
_timer = new Timer(DoWork, null, TimeSpan.Zero,
104+
TimeSpan.FromSeconds(20));
105+
106+
return Task.CompletedTask;
107+
}
108+
109109
private async void DoWork(object state)
110110
{
111111
if (Interlocked.Exchange(ref _isFlushing, 1) == 1)
@@ -118,8 +118,8 @@ private async void DoWork(object state)
118118
{
119119
// If not, restart it
120120
_queueTask = Task.Run(ConsumeMessageQueue);
121-
122-
_logger.LogInformation("Planet Message Worker queue task stopped at: {Time}, Restarting queue task", DateTime.UtcNow);
121+
122+
_logger.LogInformation("Planet Message Worker queue task stopped at: {Time}, Restarting queue task", DateTime.UtcNow);
123123
}
124124

125125
// Don't work if there's no staged messages
@@ -129,7 +129,7 @@ private async void DoWork(object state)
129129
/* Get required services in new scope */
130130
await using var scope = _serviceProvider.CreateAsyncScope();
131131
await using var db = scope.ServiceProvider.GetRequiredService<ValourDb>();
132-
132+
133133
_logger.LogInformation(@"Planet Message Worker running at: {Time}
134134
Queue size: {QueueSize}
135135
Saving {StagedCount} messages to DB", DateTimeOffset.Now, MessageQueue.Count, StagedMessages.Count);
@@ -148,14 +148,14 @@ private async void DoWork(object state)
148148
{
149149
await db.Messages.AddRangeAsync(messages);
150150
await db.SaveChangesAsync();
151-
}
152-
catch (Exception e)
153-
{
154-
_logger.LogError(e, "Failed to save messages to database. Falling back to per-message save.");
155-
156-
// If we fail to save all messages at once, we can try to save them one by one
157-
// We will just dump any messages that fail to save
158-
151+
}
152+
catch (Exception e)
153+
{
154+
_logger.LogError(e, "Failed to save messages to database. Falling back to per-message save.");
155+
156+
// If we fail to save all messages at once, we can try to save them one by one
157+
// We will just dump any messages that fail to save
158+
159159
foreach (var message in messages)
160160
{
161161
try
@@ -169,8 +169,8 @@ private async void DoWork(object state)
169169
{
170170
_logger.LogError(ex, "Failed to save message {MessageId} to database. Skipping.", message.Id);
171171
}
172-
}
173-
}
172+
}
173+
}
174174
foreach (var staged in stagedSnapshot)
175175
{
176176
StagedMessages.TryRemove(staged.Id, out _);
@@ -183,21 +183,21 @@ private async void DoWork(object state)
183183
Volatile.Write(ref _isFlushing, 0);
184184
}
185185
}
186-
187-
/// <summary>
188-
/// This task should run forever and consume messages from
189-
/// the queue.
190-
/// </summary>
191-
private async Task ConsumeMessageQueue()
192-
{
193-
// This scope is long-living, which is usually bad. But it's only used to send events,
194-
// and does not insert into the database, so it should be fine.
195-
await using var scope = _serviceProvider.CreateAsyncScope();
196-
var hubService = scope.ServiceProvider.GetRequiredService<CoreHubService>();
197-
198-
// This is ONLY READ FROM
199-
var dbService = scope.ServiceProvider.GetRequiredService<ValourDb>();
200-
186+
187+
/// <summary>
188+
/// This task should run forever and consume messages from
189+
/// the queue.
190+
/// </summary>
191+
private async Task ConsumeMessageQueue()
192+
{
193+
// This scope is long-living, which is usually bad. But it's only used to send events,
194+
// and does not insert into the database, so it should be fine.
195+
await using var scope = _serviceProvider.CreateAsyncScope();
196+
var hubService = scope.ServiceProvider.GetRequiredService<CoreHubService>();
197+
198+
// This is ONLY READ FROM
199+
var dbService = scope.ServiceProvider.GetRequiredService<ValourDb>();
200+
201201
// This is a stream and will run forever
202202
foreach (var message in MessageQueue.GetConsumingEnumerable())
203203
{
@@ -208,19 +208,28 @@ private async Task ConsumeMessageQueue()
208208
BlockSet.TryRemove(message.Id, out _);
209209
continue;
210210
}
211-
212-
message.TimeSent = DateTime.UtcNow;
213-
214-
hubService.NotifyChannelStateUpdate(message.PlanetId!.Value, message.ChannelId, message.TimeSent);
215-
216-
if (message.ReplyToId is not null && message.ReplyTo is null)
217-
{
218-
var replyTo = (await dbService.Messages.FindAsync(message.ReplyToId)).ToModel();
219-
message.ReplyTo = replyTo;
220-
}
221-
222-
hubService.RelayMessage(message);
223-
211+
212+
message.TimeSent = DateTime.UtcNow;
213+
214+
hubService.NotifyChannelStateUpdate(message.PlanetId!.Value, message.ChannelId, message.TimeSent);
215+
216+
if (message.ReplyToId is not null && message.ReplyTo is null)
217+
{
218+
var replyToDb = await dbService.Messages.FindAsync(message.ReplyToId);
219+
if (replyToDb is not null)
220+
{
221+
message.ReplyTo = replyToDb.ToModel();
222+
}
223+
else
224+
{
225+
// The referenced reply-to message no longer exists (deleted).
226+
// Clear ReplyToId to avoid FK constraint violation (fk_replyto) on insert.
227+
message.ReplyToId = null;
228+
}
229+
}
230+
231+
hubService.RelayMessage(message);
232+
224233
// Add message to message staging
225234
StagedMessages[message.Id] = message;
226235

@@ -248,19 +257,19 @@ private static void RemoveStagedMessageFromChannel(long channelId, long messageI
248257
StagedChannelMessages[channelId] = new ConcurrentQueue<Message>(remaining);
249258
}
250259
}
251-
252-
public Task StopAsync(CancellationToken stoppingToken)
253-
{
254-
_logger.LogInformation("Message Worker is Stopping");
255-
256-
_timer?.Change(Timeout.Infinite, 0);
257-
258-
return Task.CompletedTask;
259-
}
260-
261-
public void Dispose()
262-
{
263-
_timer?.Dispose();
264-
}
265-
}
266-
}
260+
261+
public Task StopAsync(CancellationToken stoppingToken)
262+
{
263+
_logger.LogInformation("Message Worker is Stopping");
264+
265+
_timer?.Change(Timeout.Infinite, 0);
266+
267+
return Task.CompletedTask;
268+
}
269+
270+
public void Dispose()
271+
{
272+
_timer?.Dispose();
273+
}
274+
}
275+
}

0 commit comments

Comments
 (0)