Skip to content

Commit de21794

Browse files
author
Aki Ariga
authored
Support td-client perform timeout (#151)
* Support td-client perform timeout. Follow up treasure-data/td-client-python#132 * Fix dependent td-client version
1 parent e3bd49c commit de21794

3 files changed

Lines changed: 34 additions & 3 deletions

File tree

pytd/tests/test_writer.py

Lines changed: 19 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -446,7 +446,25 @@ def test_perform_wait_callback_parameter(self):
446446

447447
# Check that perform was called with the wait_callback parameter
448448
mock_bulk_import.perform.assert_called_with(
449-
wait=True, wait_callback=callback_func
449+
wait=True, timeout=None, wait_callback=callback_func
450+
)
451+
452+
def test_perform_timeout_parameter(self):
453+
"""Test that perform_timeout parameter is passed correctly"""
454+
df = pd.DataFrame([[1, 2], [3, 4]])
455+
timeout_value = 300 # 5 minutes
456+
457+
# Mock the bulk_import.perform method to check if timeout is passed
458+
mock_bulk_import = self.table.client.api_client.create_bulk_import.return_value
459+
mock_bulk_import.perform = MagicMock()
460+
461+
self.writer.write_dataframe(
462+
df, self.table, "overwrite", perform_timeout=timeout_value
463+
)
464+
465+
# Check that perform was called with the timeout parameter
466+
mock_bulk_import.perform.assert_called_with(
467+
wait=True, timeout=timeout_value, wait_callback=None
450468
)
451469

452470

pytd/writer.py

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -335,6 +335,7 @@ def write_dataframe(
335335
show_progress=False,
336336
bulk_import_name=None,
337337
commit_timeout=None,
338+
perform_timeout=None,
338339
perform_wait_callback=None,
339340
):
340341
"""Write a given DataFrame to a Treasure Data table.
@@ -453,6 +454,10 @@ def write_dataframe(
453454
Timeout in seconds for the bulk import commit operation. If None,
454455
no timeout is applied.
455456
457+
perform_timeout : int, optional, default: None
458+
Timeout in seconds for the bulk import perform operation. If None,
459+
no timeout is applied.
460+
456461
perform_wait_callback : callable, optional, default: None
457462
A callable to be called on every tick of wait interval during
458463
bulk import job execution.
@@ -539,6 +544,7 @@ def write_dataframe(
539544
show_progress=show_progress,
540545
bulk_import_name=bulk_import_name,
541546
commit_timeout=commit_timeout,
547+
perform_timeout=perform_timeout,
542548
perform_wait_callback=perform_wait_callback,
543549
)
544550
stack.close()
@@ -553,6 +559,7 @@ def _bulk_import(
553559
show_progress=False,
554560
bulk_import_name=None,
555561
commit_timeout=None,
562+
perform_timeout=None,
556563
perform_wait_callback=None,
557564
):
558565
"""Write a specified CSV file to a Treasure Data table.
@@ -594,6 +601,10 @@ def _bulk_import(
594601
Timeout in seconds for the bulk import commit operation. If None,
595602
no timeout is applied.
596603
604+
perform_timeout : int, optional, default: None
605+
Timeout in seconds for the bulk import perform operation. If None,
606+
no timeout is applied.
607+
597608
perform_wait_callback : callable, optional, default: None
598609
A callable to be called on every tick of wait interval during
599610
bulk import job execution.
@@ -659,7 +670,9 @@ def _bulk_import(
659670
logger.debug(f"uploaded data in {time.time() - s_time:.2f} sec")
660671

661672
logger.info("performing a bulk import job")
662-
job = bulk_import.perform(wait=True, wait_callback=perform_wait_callback)
673+
job = bulk_import.perform(
674+
wait=True, timeout=perform_timeout, wait_callback=perform_wait_callback
675+
)
663676

664677
if 0 < bulk_import.error_records:
665678
logger.warning(

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ install_requires =
3333
trino>=0.334.0
3434
pandas>=2.1.0
3535
numpy>=1.25.2
36-
td-client>=1.1.0
36+
td-client>=1.5.0
3737
pytz>=2018.5
3838
tqdm>=4.60.0
3939

0 commit comments

Comments (0)