added Conditions and Actions

added Pipeline to Entry
separate_condition
alex 2023-08-15 21:37:21 +02:00
parent fd834500e0
commit 187fcc12c2
6 changed files with 218 additions and 36 deletions

View File

@ -1,7 +1,7 @@
import os import os
import shutil
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Tuple, List from typing import Tuple, List
from logger import Logger from logger import Logger
@ -20,11 +20,17 @@ class LoggMessageAction(Action):
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
entry = self.kwargs.get('entry') entry = self.kwargs.get('entry')
Logger.write_message(entry, self.args[0]) Logger.write_message(entry, self.args[0])
messages = args[0]
if isinstance(messages, list):
for m in messages:
Logger.write_message(entry, "\t" + str(m))
else:
Logger.write_message(entry, "\t" + str(messages))
class StubAction(Action): class StubAction(Action):
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
pass return True, None
class ReadAction(Action): class ReadAction(Action):
@ -36,9 +42,55 @@ class ReadAction(Action):
class MoveAction(Action): class MoveAction(Action):
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
pass files = args[0]
destination = self.args[0]
if destination:
if os.path.exists(destination) and not os.path.isdir(destination):
return False, "Not a Directory"
elif not os.path.exists(destination):
os.makedirs(destination)
if isinstance(files, list):
for file in files:
f = os.path.basename(file)
os.rename(file, destination + os.path.sep + f)
else:
f = os.path.basename(files)
os.rename(files, destination + os.path.sep + f)
return True, files
return False, None
class CopyAction(Action):
def execute(self, *args, **kwargs):
files = args[0]
destination = self.args[0]
if destination:
if os.path.exists(destination) and not os.path.isdir(destination):
return False, "Not a Directory"
elif not os.path.exists(destination):
os.makedirs(destination)
if isinstance(files, list):
for file in files:
f = os.path.basename(file)
shutil.copy(file, destination + os.path.sep + f)
else:
f = os.path.basename(files)
shutil.copy(files, destination + os.path.sep + f)
return True, files
return False, None
class RemoveAction(Action):
def execute(self, *args, **kwargs):
files = args[0]
if isinstance(files, list):
for file in files:
os.remove(file)
else:
os.remove(files)
return True, files
# return False, None
class CompressAction(Action): class CompressAction(Action):
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
@ -46,3 +98,12 @@ class CompressAction(Action):
actions = {
"LoggMessage": LoggMessageAction,
"Stub": StubAction,
"Move": MoveAction,
"Copy": CopyAction,
"Remove": RemoveAction,
}

11
app.py
View File

@ -13,14 +13,7 @@ class App:
def start(self): def start(self):
self._config.read_config() self._config.read_config()
self._config.add_entry(Entry('myTest',
IsDirectoryEmptyCondition(path='emptydir', entry='myTest'),
LoggMessageAction('success', entry='myTest'),
LoggMessageAction('fail', entry='myTest')))
for entry in self._config: for entry in self._config:
result = entry.action.execute() entry.execute()
if result:
entry.success.execute()
else:
entry.fail.execute()

View File

@ -1,3 +1,4 @@
import datetime
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import os import os
from logger import Logger from logger import Logger
@ -14,26 +15,101 @@ class Condition(ABC):
pass pass
class IsFileExistsCondition(Condition): class IfFileExistsCondition(Condition):
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
path = self.kwargs.get('path') path = None
if len(self.args) > 0:
path = self.args[0]
entry = self.kwargs.get('entry') entry = self.kwargs.get('entry')
if not path: if not path:
Logger.write_error(entry, 'Need a path') Logger.write_error(entry, 'Need a path')
else: else:
return os.path.exists(path) if os.path.exists(path):
return True, path
return False, None
class IsDirectoryEmptyCondition(Condition): class IfDirectoryEmptyCondition(Condition):
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
path = self.kwargs.get('path') path = None
if len(self.args) > 0:
path = self.args[0]
entry = self.kwargs.get('entry') entry = self.kwargs.get('entry')
if not path: if not path:
Logger.write_error(entry, 'Need a path') Logger.write_error(entry, 'Need a path')
else: else:
files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
if files: if files:
return False return False, files
return True return True, None
class IfDirectoryNotEmptyCondition(Condition):
def execute(self, *args, **kwargs):
path = None
if len(self.args) > 0:
path = self.args[0]
entry = self.kwargs.get('entry')
if not path:
Logger.write_error(entry, 'Need a path')
else:
files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
if files:
return True, files
return False, None
class GetCreationTimeCondition(Condition):
def execute(self, *args, **kwargs):
path = args[0]
result = list()
if isinstance(path, list):
for p in path:
time = datetime.datetime.fromtimestamp(os.path.getctime(p))
time = time.strftime("%Y.%m.%d %H:%M:%S")
result.append(time)
else:
time = datetime.datetime.fromtimestamp(os.path.getctime(path))
time = time.strftime("%Y.%m.%d %H:%M:%S")
result.append(time)
return True, result
class IfCreationTimeCondition(Condition):
def execute(self, *args, **kwargs):
s = self.args[0].split(' ', 1)
if len(s) > 1:
oper = s[0]
time = s[1]
search_time = datetime.datetime.strptime(time, "%Y.%m.%d %H:%M:%S")
path = args[0]
result = list()
if isinstance(path, list):
for p in path:
time = datetime.datetime.fromtimestamp(os.path.getctime(p))
# time = time.strftime("%Y.%m.%d %H:%M:%S")
if oper == '<' and time < search_time:
result.append(p)
elif oper == '=' and time == search_time:
result.append(p)
elif oper == '>' and time > search_time:
result.append(p)
else:
time = datetime.datetime.fromtimestamp(os.path.getctime(path))
# time = time.strftime("%Y.%m.%d %H:%M:%S")
if oper == '<' and time < search_time:
result.append(path)
elif oper == '=' and time == search_time:
result.append(path)
elif oper == '>' and time > search_time:
result.append(path)
return True, result
conditions = {
"IfFileExists": IfFileExistsCondition,
"IfDirectoryEmpty": IfDirectoryEmptyCondition,
"IfDirectoryNotEmpty": IfDirectoryNotEmptyCondition,
"GetCreationTime": GetCreationTimeCondition,
"IfCreationTime": IfCreationTimeCondition,
}

View File

@ -1,5 +1,6 @@
from entry import Entry from entry import Entry
from actions import * from actions import *
from conditions import *
class Config: class Config:
@ -22,13 +23,44 @@ class Config:
def read_config(self, *args, **kwargs): def read_config(self, *args, **kwargs):
with open('config.txt', 'r') as file: with open('config.txt', 'r') as file:
for line in file: for line in file:
self._create_entry(line) if len(line) < 2:
continue
if line.startswith('#'):
continue
entry = self._create_entry(line)
self._entries.append(entry)
def _create_entry(self, line): def _create_entry(self, line):
splitts = line.split(';') splits = line.split(';')
assert(len(splitts) == 4) assert(len(splits) == 4)
name = splitts[0] name = splits[0]
condition, con_param = splitts[1].strip().split(' ') condition_list = self._split_conditions(splits[1], name)
success, suc_param = splitts[2].strip().split(' ', 1) success_list = self._split_actions(splits[2], name)
fail, fail_param = splitts[3].strip().split(' ', 1) fail_list = self._split_actions(splits[3], name)
self._entries.append(Entry(name, condition(con_param), success(suc_param), fail(fail_param))) return Entry(name, condition_list, success_list, fail_list)
def _split_conditions(self, split, entry_name):
splits = list()
split.strip()
for s in split.split(','):
s = s.strip()
ss = s.split(' ', 1)
condition = conditions[ss[0]]
argument = None
if len(ss) > 1:
argument = str(ss[1])
splits.append(condition(argument, entry=entry_name))
return splits
def _split_actions(self, split, entry_name):
splits = list()
split.strip()
for s in split.split(','):
s = s.strip()
ss = s.split(' ', 1)
action = actions[ss[0]]
argument = None
if len(ss) > 1:
argument = str(ss[1])
splits.append(action(argument, entry=entry_name))
return splits

View File

@ -1 +1,7 @@
MyTest; IsDirectoryEmpty C:\Users\a.jurcenko\PycharmProjects\logreader\emptydir; LoggMessage Alles Gut; LoggMessage Alles Schlecht # FirstTest; IfFileExists testdata/log.txt, IsFileExists testdata/second.log, IsFileExists testdata/third.log; LoggMessage Alles OK; LoggMessage Nicht Ok
# MoveTest; IfFileExists testdata/log.txt; Move emptydir; LoggMessage Somthing went wrong
# MoveAllFiles; IsDirectoryEmpty testdata; LoggMessage Nop; Copy emptydir
# RemoveTest; IfFileExists testdata/third.log; Remove; LoggMessage Not Removed
# RemoveFilesDirectoryTest; IfDirectoryEmpty test; Stub; Remove
CreationTimeTest; IfDirectoryNotEmpty testdata, GetCreationTime; LoggMessage Creation Time; Stub
IfCreationTimeTest; IfDirectoryNotEmpty testdata, IfCreationTime > 2023.08.15 20:09:39; LoggMessage IfCreation Time; Stub

View File

@ -1,22 +1,36 @@
from actions import Action, Condition from actions import Action
from conditions import Condition
class Entry: class Entry:
def __init__(self, name: str, condition: Condition, success: Action, fail: Action): def __init__(self, name: str, condition: list[Condition], success: list[Action], fail: list[Action]):
self._name = name self._name = name
self._action = condition self._conditions = condition
self._success = success self._success = success
self._fail = fail self._fail = fail
def execute(self):
result, output = True, None
for con in self._conditions:
res, output = con.execute(output)
if not res:
result = False
break
if result:
for s in self._success:
s.execute(output)
else:
for f in self._fail:
f.execute(output)
@property @property
def name(self): def name(self):
return self._name return self._name
@property @property
def action(self): def condition(self):
return self._action return self._conditions
@property @property
def success(self): def success(self):