1717 remove_duplicated_files ,
1818 transform_images ,
1919 start_df ,
20+ shuffle_csv ,
2021 Checkpoint ,
2122 Stages ,
2223)
4243GEN_IMAGES_TASK_ID = "gen_images"
4344REMOVE_DUPLICATES_TASK_ID = "remove_duplicates"
4445TRANSFORM_TASK_ID = "transform_images"
46+ SHUFFLE_DATASET_ID = "shuffle_df"
4547
4648
4749def next_step (checkpoint : Checkpoint ) -> str :
@@ -55,6 +57,9 @@ def next_step(checkpoint: Checkpoint) -> str:
5557 if checkpoint .stage == Stages .GEN_IMAGES :
5658 return GEN_IMAGES_TASK_ID
5759
60+ if checkpoint .stage == Stages .SHUFFLE :
61+ return SHUFFLE_DATASET_ID
62+
5863 if checkpoint .stage == Stages .DUPLICATES :
5964 return REMOVE_DUPLICATES_TASK_ID
6065
@@ -131,14 +136,35 @@ def update_checkpoint(checkpoint: Checkpoint, stage: Stages):
131136 Qiskit framework.
132137 """
133138
134- transtion_gen_to_remove = PythonOperator (
135- task_id = "gen_to_remove" ,
139+ transtion_gen_to_shuffle = PythonOperator (
140+ task_id = "gen_to_shuffle" ,
141+ python_callable = update_checkpoint ,
142+ op_args = [checkpoint , Stages .SHUFFLE ],
143+ )
144+
145+ transtion_gen_to_shuffle .doc_md = """
146+ Update checkpoint to start shuffling rows.
147+ """
148+
149+ shuffle = PythonOperator (
150+ task_id = SHUFFLE_DATASET_ID ,
151+ python_callable = shuffle_csv ,
152+ op_args = [folder ],
153+ trigger_rule = TriggerRule .NONE_FAILED_MIN_ONE_SUCCESS
154+ )
155+
156+ shuffle .doc_md = """
157+ Shuffle dataset rows.
158+ """
159+
160+ transtion_shuffle_to_remove = PythonOperator (
161+ task_id = "shuffle_to_remove" ,
136162 python_callable = update_checkpoint ,
137163 op_args = [checkpoint , Stages .DUPLICATES ],
138164 )
139165
140- transtion_gen_to_remove .doc_md = """
141- Update checkpoint to start removing duplicated files .
166+ transtion_shuffle_to_remove .doc_md = """
167+ Update checkpoint to start deleting duplicated rows .
142168 """
143169
144170 remove_duplicates = PythonOperator (
@@ -228,11 +254,14 @@ def update_checkpoint(checkpoint: Checkpoint, stage: Stages):
228254 gen_df >> branch_checkpoint
229255
230256 branch_checkpoint >> gen_images
257+ branch_checkpoint >> shuffle
231258 branch_checkpoint >> remove_duplicates
232259 branch_checkpoint >> transform_img
233260
234- gen_images >> transtion_gen_to_remove
235- transtion_gen_to_remove >> remove_duplicates
261+ gen_images >> transtion_gen_to_shuffle
262+ transtion_gen_to_shuffle >> shuffle
263+ shuffle >> transtion_shuffle_to_remove
264+ transtion_shuffle_to_remove >> remove_duplicates
236265 remove_duplicates >> transition_remove_to_transform
237266 transition_remove_to_transform >> transform_img
238267 transform_img >> reset_checkpoint
0 commit comments