Coverage for roc/dingo/tasks/file_handler.py: 0%

65 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-02 14:29 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4""" Tasks for handling files processed by DINGO plugin.""" 

5 

6import os 

7from pathlib import Path 

8 

9from poppy.core.logger import logger 

10from poppy.core.task import Task 

11 

12__all__ = ['MoveProcessedFiles', 'MoveFailedFiles'] 

13 

14from roc.dingo.tools import safe_move 

15 

16 

class MoveProcessedFiles(Task):
    """Task to move input list of well processed files in a given target directory.

    The list of files (``processed_files``) and the target directory
    (``processed_files_dir``) are read from the pipeline properties.
    The task does nothing when the ``no_move`` flag is set, when the list
    is empty, or when no target directory is defined.
    """

    plugin_name = 'roc.dingo'
    name = 'move_processed_files'

    def setup_inputs(self):
        """Load the task inputs from the pipeline into instance attributes."""
        # See if --no-move keyword is defined
        self.no_move = self.pipeline.get('no_move', default=False, args=True)

        # Get or create processed_files list from pipeline properties
        self.processed_files = self.pipeline.get(
            'processed_files', default=[], create=True)

        # Get or create processed_files_dir from pipeline properties
        # (only the first element of the stored value is used)
        self.processed_files_dir = self.pipeline.get(
            'processed_files_dir', default=[None], create=True)[0]

    def run(self):
        """Move every processed file into ``processed_files_dir``.

        A file that cannot be moved is reported with an error log entry and
        the loop carries on with the next file.
        """
        try:
            self.setup_inputs()
        except Exception:
            # Catch Exception rather than using a bare ``except:`` so that
            # KeyboardInterrupt and SystemExit still propagate to the caller.
            logger.exception(
                'Initializing inputs for task MoveProcessedFiles has failed!')
            self.pipeline.exit()
            return

        if self.no_move:
            logger.debug(
                'Skip MoveProcessedFiles task')
            return

        if not self.processed_files:
            logger.debug(
                'Input list of processed files is empty: skip MoveProcessedFiles task')
            return

        if not self.processed_files_dir:
            logger.debug(
                'processed_files_dir argument not passed, skip task')
            return

        # Create folder if it does not exist
        Path(self.processed_files_dir).mkdir(parents=True, exist_ok=True)

        for current_file in self.processed_files:
            logger.info(f'Moving {current_file} into {self.processed_files_dir}')
            # A falsy result from safe_move is treated as a failed move
            if not safe_move(current_file, self.processed_files_dir):
                logger.error(f'Cannot move {current_file} into {self.processed_files_dir}')

68 

69 

class MoveFailedFiles(Task):
    """Move failed files found into a target directory.

    The list of files (``failed_files``) and the target directory
    (``failed_files_dir``) are read from the pipeline properties.
    The task does nothing when the ``no_move`` flag is set, when the list
    is empty, or when no target directory is defined.
    """

    plugin_name = 'roc.dingo'
    name = 'move_failed_files'

    def setup_inputs(self):
        """Load the task inputs from the pipeline into instance attributes."""
        # See if --no-move keyword is defined
        self.no_move = self.pipeline.get('no_move', default=False, args=True)

        # Get or create failed_files list from pipeline properties
        self.failed_files = self.pipeline.get(
            'failed_files', default=[], create=True)

        # Get or create failed_files_dir list from pipeline properties
        # (only the first element of the stored value is used)
        self.failed_files_dir = self.pipeline.get(
            'failed_files_dir', default=[None], create=True)[0]

    def run(self):
        """Move every failed file into ``failed_files_dir``.

        A listed file that does not exist on disk is first created as an
        empty file, so that there is something to move into the target
        directory. A file that cannot be moved is reported with an error
        log entry and the loop carries on with the next file.
        """
        try:
            self.setup_inputs()
        except Exception:
            # Catch Exception rather than using a bare ``except:`` so that
            # KeyboardInterrupt and SystemExit still propagate to the caller.
            logger.exception(
                'Initializing inputs for task MoveFailedFiles has failed!')
            self.pipeline.exit()
            return

        if self.no_move:
            logger.debug(
                'Skip current task MoveFailedFiles')
            return

        if not self.failed_files:
            logger.debug(
                'Input list of failed files is empty: skip MoveFailedFiles task')
            return

        if not self.failed_files_dir:
            # Unlike the other skip cases this one is logged as a warning:
            # there are failed files but nowhere to put them.
            logger.warning(
                'There are failed files but failed_files_dir value is not defined, '
                'skip MoveFailedFiles task')
            return

        # Create folder if it does not exist
        Path(self.failed_files_dir).mkdir(parents=True, exist_ok=True)

        for current_file in self.failed_files:
            logger.info(f'Moving {current_file} into {self.failed_files_dir}')
            # If failed file does not exist, create an empty file
            if not os.path.isfile(current_file):
                Path(current_file).touch(exist_ok=True)

            # A falsy result from safe_move is treated as a failed move
            if not safe_move(current_file, self.failed_files_dir):
                logger.error(f'Cannot move {current_file} into {self.failed_files_dir}')

124 logger.error(f'Cannot move {current_file} into {self.failed_files_dir}')