Skip to content

Commit d1d0509

Browse files
authored
Clear retried executions from successful periodic tasks (#188)
Previously, if you had a periodic task that retries up to N times for valid reasons, we would never clear the executions, so N would accumulate over the lifetime of a periodic task (forever). This commit doesn't completely fix the issue of tracking executions for previously failed periods, but at least it prevents executions from accumulating if the periodic task completes successfully after retries.
1 parent 1bb92cc commit d1d0509

3 files changed

Lines changed: 112 additions & 4 deletions

File tree

tasktiger/task.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,16 +274,23 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None):
274274

275275
if not to_state: # Remove the task if necessary
276276
if self.unique:
277-
# Only delete if it's not in any other queue
278-
check_states = {ACTIVE, QUEUED, ERROR, SCHEDULED}
279-
check_states.remove(from_state)
280277
# TODO: Do the following two in one call.
278+
279+
# Delete executions if there were no errors.
280+
if from_state == ERROR:
281+
check_states = {}
282+
else:
283+
check_states = {ERROR}
281284
scripts.delete_if_not_in_zsets(
282285
_key('task', self.id, 'executions'),
283286
self.id,
284287
[_key(state, queue) for state in check_states],
285288
client=pipeline,
286289
)
290+
291+
# Only delete task if it's not in any other queue
292+
check_states = {ACTIVE, QUEUED, ERROR, SCHEDULED}
293+
check_states.remove(from_state)
287294
scripts.delete_if_not_in_zsets(
288295
_key('task', self.id),
289296
self.id,

tests/tasks_periodic.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,18 @@
1010
tiger = get_tiger()
1111

1212

13-
@tiger.task(schedule=periodic(seconds=1), queue='periodic')
13+
@tiger.task(
14+
schedule=periodic(seconds=1), queue='periodic', retry_on=(ValueError,)
15+
)
1416
def periodic_task():
1517
"""Periodic task."""
1618
conn = redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True)
1719
conn.incr('period_count', 1)
20+
fail = conn.get('fail-periodic-task')
21+
if fail == 'retriable':
22+
raise ValueError('retriable failure')
23+
elif fail == 'permanent':
24+
raise Exception('permanent failure')
1825

1926

2027
@tiger.task(schedule=periodic(seconds=1), queue='periodic_ignore')

tests/test_periodic.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,97 @@ def test_periodic_execution_unique_ids_self_correct(self):
229229
# pull task out of the queue by the self-corrected id
230230
task = Task.from_id(tiger, 'periodic', SCHEDULED, correct_unique_id)
231231
assert task is not None
232+
233+
def test_successful_execution_clears_executions_from_retries(self):
234+
"""
235+
Ensure previous executions from retries are cleared after a successful
236+
execution.
237+
"""
238+
sleep_until_next_second()
239+
240+
# Queue the periodic task.
241+
self._ensure_queues()
242+
Worker(tiger).run(once=True)
243+
244+
# Prepare to execute the periodic task (as retriable failure).
245+
tiger.connection.set('fail-periodic-task', 'retriable')
246+
n_total, tasks = Task.tasks_from_queue(tiger, 'periodic', SCHEDULED)
247+
task_id = tasks[0].id
248+
time.sleep(1)
249+
250+
# Queue the periodic task.
251+
self._ensure_queues(scheduled={'periodic': 1})
252+
Worker(tiger).run(once=True)
253+
254+
# Run the failing periodic task.
255+
self._ensure_queues(queued={'periodic': 1})
256+
Worker(tiger).run(once=True)
257+
258+
task = Task.from_id(
259+
tiger, 'periodic', SCHEDULED, task_id, load_executions=10
260+
)
261+
assert len(task.executions) == 1
262+
263+
tiger.connection.delete('fail-periodic-task')
264+
time.sleep(1)
265+
266+
# Queue the periodic task.
267+
self._ensure_queues(scheduled={'periodic': 1})
268+
Worker(tiger).run(once=True)
269+
270+
# Run the successful periodic task.
271+
self._ensure_queues(queued={'periodic': 1})
272+
Worker(tiger).run(once=True)
273+
274+
# Ensure we cleared any previous executions.
275+
task = Task.from_id(
276+
tiger, 'periodic', SCHEDULED, task_id, load_executions=10
277+
)
278+
assert len(task.executions) == 0
279+
280+
def test_successful_execution_doesnt_clear_previous_errors(self):
281+
"""
282+
Ensure previous executions are not cleared if we have had non-retriable
283+
errors.
284+
"""
285+
sleep_until_next_second()
286+
287+
# Queue the periodic task.
288+
self._ensure_queues()
289+
Worker(tiger).run(once=True)
290+
291+
# Prepare to execute the periodic task (as permanent failure).
292+
tiger.connection.set('fail-periodic-task', 'permanent')
293+
n_total, tasks = Task.tasks_from_queue(tiger, 'periodic', SCHEDULED)
294+
task_id = tasks[0].id
295+
time.sleep(1)
296+
297+
# Queue the periodic task.
298+
self._ensure_queues(scheduled={'periodic': 1})
299+
Worker(tiger).run(once=True)
300+
301+
# Run the failing periodic task.
302+
self._ensure_queues(queued={'periodic': 1})
303+
Worker(tiger).run(once=True)
304+
305+
task = Task.from_id(
306+
tiger, 'periodic', SCHEDULED, task_id, load_executions=10
307+
)
308+
assert len(task.executions) == 1
309+
310+
tiger.connection.delete('fail-periodic-task')
311+
time.sleep(1)
312+
313+
# Queue the periodic task.
314+
self._ensure_queues(scheduled={'periodic': 1}, error={'periodic': 1})
315+
Worker(tiger).run(once=True)
316+
317+
# Run the successful periodic task.
318+
self._ensure_queues(queued={'periodic': 1}, error={'periodic': 1})
319+
Worker(tiger).run(once=True)
320+
321+
# Ensure we didn't clear previous executions.
322+
task = Task.from_id(
323+
tiger, 'periodic', SCHEDULED, task_id, load_executions=10
324+
)
325+
assert len(task.executions) == 1

0 commit comments

Comments
 (0)