Coverage for roc/dingo/tasks/file_handler.py: 0%
65 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-02 14:29 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-02 14:29 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4""" Tasks for handling files processed by DINGO plugin."""
6import os
7from pathlib import Path
9from poppy.core.logger import logger
10from poppy.core.task import Task
12__all__ = ['MoveProcessedFiles', 'MoveFailedFiles']
14from roc.dingo.tools import safe_move
17class MoveProcessedFiles(Task):
18 """Task to move input list of well processed files in a given target directory."""
19 plugin_name = 'roc.dingo'
20 name = 'move_processed_files'
22 def setup_inputs(self):
24 # See if --no-move keyword is defined
25 self.no_move = self.pipeline.get('no_move', default=False, args=True)
27 # Get or create processed_files list from pipeline properties
28 self.processed_files = self.pipeline.get(
29 'processed_files', default=[], create=True)
31 # Get or create processed_files_dir from pipeline properties
32 self.processed_files_dir = self.pipeline.get(
33 'processed_files_dir', default=[None], create=True)[0]
35 def run(self):
37 try:
38 self.setup_inputs()
39 except:
40 logger.exception(
41 'Initializing inputs for task MoveProcessedFiles has failed!')
42 self.pipeline.exit()
43 return
45 if self.no_move:
46 logger.debug(
47 'Skip MoveProcessedFiles task')
48 return
50 if not self.processed_files:
51 logger.debug(
52 'Input list of processed files is empty: skip MoveProcessedFiles task')
53 return
54 else:
56 if not self.processed_files_dir:
57 logger.debug(
58 'processed_files_dir argument not passed, skip task')
59 return
61 # Create folder if it does not exist
62 Path(self.processed_files_dir).mkdir(parents=True, exist_ok=True)
64 for current_file in self.processed_files:
65 logger.info(f'Moving {current_file} into {self.processed_files_dir}')
66 if not safe_move(current_file, self.processed_files_dir):
67 logger.error(f'Cannot move {current_file} into {self.processed_files_dir}')
70class MoveFailedFiles(Task):
71 """Move failed files found
72 into a target directory."""
73 plugin_name = 'roc.dingo'
74 name = 'move_failed_files'
76 def setup_inputs(self):
78 # See if --no-move keyword is defined
79 self.no_move = self.pipeline.get('no_move', default=False, args=True)
81 # Get or create failed_files list from pipeline properties
82 self.failed_files = self.pipeline.get(
83 'failed_files', default=[], create=True)
85 # Get or create failed_files_dir list from pipeline properties
86 self.failed_files_dir = self.pipeline.get(
87 'failed_files_dir', default=[None], create=True)[0]
89 def run(self):
91 try:
92 self.setup_inputs()
93 except:
94 logger.exception(
95 'Initializing inputs for task MoveFailedFiles has failed!')
96 self.pipeline.exit()
97 return
99 if self.no_move:
100 logger.debug(
101 'Skip current task MoveFailedFiles')
102 return
104 if not self.failed_files:
105 logger.debug(
106 'Input list of failed files is empty: skip MoveFailedFiles task')
107 return
108 else:
110 if not self.failed_files_dir:
111 logger.warning(f'There are failed files but failed_files_dir value is not defined, skip MoveFailedFiles task')
112 return
114 # Create folder if it does not exist
115 Path(self.failed_files_dir).mkdir(parents=True, exist_ok=True)
117 for current_file in self.failed_files:
118 logger.info(f'Moving {current_file} into {self.failed_files_dir}')
119 # If failed file does not exist, create an empty file
120 if not os.path.isfile(current_file):
121 Path(current_file).touch(exist_ok=True)
123 if not safe_move(current_file, self.failed_files_dir):
124 logger.error(f'Cannot move {current_file} into {self.failed_files_dir}')