Skip to content

Commit 22911ea

Browse files
authored
Merged PR 1685054: Add more logs and a wait_futures function for easier post-analysis (#1438)
- Add a wait_futures function for easier post-analysis - Use logger instead of print ---- #### AI description (iteration 1) #### PR Classification A code enhancement for debugging asynchronous mlflow logging and improving post-run analysis. #### PR Summary This PR adds detailed debug logging to the mlflow integration and introduces a new `wait_futures` function to streamline the collection of asynchronous task results for improved analysis. - `flaml/fabric/mlflow.py`: Added debug log statements around starting and ending mlflow runs to trace run IDs and execution flow. - `flaml/automl/automl.py`: Implemented the `wait_futures` function to handle asynchronous task results and replaced a print call with `logger.info` for consistent logging. <!-- GitOpsUserAgent=GitOps.Apps.Server.pullrequestcopilot --> Related work items: #4029592
1 parent 12183e5 commit 22911ea

2 files changed

Lines changed: 20 additions & 1 deletion

File tree

flaml/automl/automl.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1732,7 +1732,7 @@ def cv_score_agg_func(val_loss_folds, log_metrics_folds):
17321732
if not (mlflow.active_run() is not None or is_autolog_enabled()):
17331733
self.mlflow_integration.only_history = True
17341734
except KeyError:
1735-
print("Not in Fabric, Skipped")
1735+
logger.info("Not in Fabric, Skipped")
17361736
task.validate_data(
17371737
self,
17381738
self._state,
@@ -2756,6 +2756,9 @@ def _search(self):
27562756
)
27572757
else:
27582758
logger.warning("not retraining because the time budget is too small.")
2759+
self.wait_futures()
2760+
2761+
def wait_futures(self):
27592762
if self.mlflow_integration is not None:
27602763
logger.debug("Collecting results from submitted record_state tasks")
27612764
t1 = time.perf_counter()
@@ -2775,6 +2778,8 @@ def _search(self):
27752778
logger.warning(f"Exception for log_model task {_task}: {e}")
27762779
t2 = time.perf_counter()
27772780
logger.debug(f"Collecting results from tasks submitted to executors costs {t2-t1} seconds.")
2781+
else:
2782+
logger.debug("No futures to wait for.")
27782783

27792784
def __del__(self):
27802785
if (

flaml/fabric/mlflow.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,14 +516,19 @@ def log_model(self, model, estimator, signature=None, run_id=None):
516516
)
517517
run = mlflow.active_run()
518518
if run and run.info.run_id == self.parent_run_id:
519+
logger.debug(
520+
f"Current active run_id {run.info.run_id} == parent_run_id {self.parent_run_id}, Starting run_id {run_id}"
521+
)
519522
mlflow.start_run(run_id=run_id, nested=True)
520523
elif run and run.info.run_id != run_id:
521524
ret_message = (
522525
f"Error: Should log_model {estimator} to run_id {run_id}, but logged to run_id {run.info.run_id}"
523526
)
524527
logger.error(ret_message)
525528
else:
529+
logger.debug(f"No active run, start run_id {run_id}")
526530
mlflow.start_run(run_id=run_id)
531+
logger.debug(f"logged model {estimator} to run_id {mlflow.active_run().info.run_id}")
527532
if estimator.endswith("_spark"):
528533
# mlflow.spark.log_model(model, estimator, signature=signature)
529534
mlflow.spark.log_model(model, "model", signature=signature)
@@ -550,6 +555,7 @@ def log_model(self, model, estimator, signature=None, run_id=None):
550555
)
551556
self.futures[future] = f"run_{run_id}_requirements_updated"
552557
if not run or run.info.run_id == self.parent_run_id:
558+
logger.debug(f"Ending current run_id {mlflow.active_run().info.run_id}")
553559
mlflow.end_run()
554560
return ret_message
555561

@@ -575,12 +581,19 @@ def _log_pipeline(self, pipeline, flavor_name, pipeline_name, signature, run_id,
575581
)
576582
run = mlflow.active_run()
577583
if run and run.info.run_id == self.parent_run_id:
584+
logger.debug(
585+
f"Current active run_id {run.info.run_id} == parent_run_id {self.parent_run_id}, Starting run_id {run_id}"
586+
)
578587
mlflow.start_run(run_id=run_id, nested=True)
579588
elif run and run.info.run_id != run_id:
580589
ret_message = f"Error: Should _log_pipeline {flavor_name}:{pipeline_name}:{estimator} model to run_id {run_id}, but logged to run_id {run.info.run_id}"
581590
logger.error(ret_message)
582591
else:
592+
logger.debug(f"No active run, start run_id {run_id}")
583593
mlflow.start_run(run_id=run_id)
594+
logger.debug(
595+
f"logging pipeline {flavor_name}:{pipeline_name}:{estimator} to run_id {mlflow.active_run().info.run_id}"
596+
)
584597
if flavor_name == "sklearn":
585598
mlflow.sklearn.log_model(pipeline, pipeline_name, signature=signature)
586599
elif flavor_name == "spark":
@@ -596,6 +609,7 @@ def _log_pipeline(self, pipeline, flavor_name, pipeline_name, signature, run_id,
596609
)
597610
self.futures[future] = f"run_{run_id}_requirements_updated"
598611
if not run or run.info.run_id == self.parent_run_id:
612+
logger.debug(f"Ending current run_id {mlflow.active_run().info.run_id}")
599613
mlflow.end_run()
600614
return ret_message
601615

0 commit comments

Comments
 (0)