Compare commits
2 Commits
master
...
separate_c
| Author | SHA1 | Date |
|---|---|---|
|
|
187fcc12c2 | |
|
|
fd834500e0 |
80
action.py
80
action.py
|
|
@ -1,80 +0,0 @@
|
||||||
import abc
|
|
||||||
import os
|
|
||||||
from abc import ABC
|
|
||||||
from logger import Logger
|
|
||||||
|
|
||||||
|
|
||||||
class Action(ABC):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.args = args
|
|
||||||
self.kwargs = kwargs
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class Condition(ABC):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.args = args
|
|
||||||
self.kwargs = kwargs
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class LoggMessageAction(Action):
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
entry = self.kwargs.get('entry')
|
|
||||||
Logger.write_message(entry, self.args[0])
|
|
||||||
|
|
||||||
|
|
||||||
class StubAction(Action):
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ReadAction(Action):
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class MoveAction(Action):
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class CompressAction(Action):
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class IsFileExistsCondition(Condition):
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
path = self.kwargs.get('path')
|
|
||||||
entry = self.kwargs.get('entry')
|
|
||||||
if not path:
|
|
||||||
Logger.write_error(entry, 'Need a path')
|
|
||||||
else:
|
|
||||||
return os.path.exists(path)
|
|
||||||
|
|
||||||
|
|
||||||
class IsDirectoryEmptyCondition(Condition):
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
path = self.kwargs.get('path')
|
|
||||||
entry = self.kwargs.get('entry')
|
|
||||||
if not path:
|
|
||||||
Logger.write_error(entry, 'Need a path')
|
|
||||||
else:
|
|
||||||
files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
|
|
||||||
if files:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,109 @@
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import Tuple, List
|
||||||
|
from logger import Logger
|
||||||
|
|
||||||
|
|
||||||
|
class Action(ABC):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.args = args
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class LoggMessageAction(Action):
|
||||||
|
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
entry = self.kwargs.get('entry')
|
||||||
|
Logger.write_message(entry, self.args[0])
|
||||||
|
messages = args[0]
|
||||||
|
if isinstance(messages, list):
|
||||||
|
for m in messages:
|
||||||
|
Logger.write_message(entry, "\t" + str(m))
|
||||||
|
else:
|
||||||
|
Logger.write_message(entry, "\t" + str(messages))
|
||||||
|
|
||||||
|
|
||||||
|
class StubAction(Action):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
|
||||||
|
class ReadAction(Action):
|
||||||
|
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MoveAction(Action):
|
||||||
|
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
files = args[0]
|
||||||
|
destination = self.args[0]
|
||||||
|
if destination:
|
||||||
|
if os.path.exists(destination) and not os.path.isdir(destination):
|
||||||
|
return False, "Not a Directory"
|
||||||
|
elif not os.path.exists(destination):
|
||||||
|
os.makedirs(destination)
|
||||||
|
if isinstance(files, list):
|
||||||
|
for file in files:
|
||||||
|
f = os.path.basename(file)
|
||||||
|
os.rename(file, destination + os.path.sep + f)
|
||||||
|
else:
|
||||||
|
f = os.path.basename(files)
|
||||||
|
os.rename(files, destination + os.path.sep + f)
|
||||||
|
return True, files
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
|
||||||
|
class CopyAction(Action):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
files = args[0]
|
||||||
|
destination = self.args[0]
|
||||||
|
if destination:
|
||||||
|
if os.path.exists(destination) and not os.path.isdir(destination):
|
||||||
|
return False, "Not a Directory"
|
||||||
|
elif not os.path.exists(destination):
|
||||||
|
os.makedirs(destination)
|
||||||
|
if isinstance(files, list):
|
||||||
|
for file in files:
|
||||||
|
f = os.path.basename(file)
|
||||||
|
shutil.copy(file, destination + os.path.sep + f)
|
||||||
|
else:
|
||||||
|
f = os.path.basename(files)
|
||||||
|
shutil.copy(files, destination + os.path.sep + f)
|
||||||
|
return True, files
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
|
||||||
|
class RemoveAction(Action):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
files = args[0]
|
||||||
|
if isinstance(files, list):
|
||||||
|
for file in files:
|
||||||
|
os.remove(file)
|
||||||
|
else:
|
||||||
|
os.remove(files)
|
||||||
|
return True, files
|
||||||
|
# return False, None
|
||||||
|
|
||||||
|
class CompressAction(Action):
|
||||||
|
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
actions = {
|
||||||
|
"LoggMessage": LoggMessageAction,
|
||||||
|
"Stub": StubAction,
|
||||||
|
"Move": MoveAction,
|
||||||
|
"Copy": CopyAction,
|
||||||
|
"Remove": RemoveAction,
|
||||||
|
}
|
||||||
13
app.py
13
app.py
|
|
@ -1,7 +1,7 @@
|
||||||
import os
|
import os
|
||||||
from config import Config
|
from config import Config
|
||||||
from entry import Entry
|
from entry import Entry
|
||||||
from action import *
|
from actions import *
|
||||||
from logger import Logger
|
from logger import Logger
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -13,14 +13,7 @@ class App:
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self._config.read_config()
|
self._config.read_config()
|
||||||
self._config.add_entry(Entry('myTest',
|
|
||||||
IsDirectoryEmptyCondition(path='emptydir', entry='myTest'),
|
|
||||||
LoggMessageAction('success', entry='myTest'),
|
|
||||||
LoggMessageAction('fail', entry='myTest')))
|
|
||||||
|
|
||||||
for entry in self._config:
|
for entry in self._config:
|
||||||
result = entry.action.execute()
|
entry.execute()
|
||||||
if result:
|
|
||||||
entry.success.execute()
|
|
||||||
else:
|
|
||||||
entry.fail.execute()
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,115 @@
|
||||||
|
import datetime
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
import os
|
||||||
|
from logger import Logger
|
||||||
|
from typing import Tuple, List
|
||||||
|
|
||||||
|
|
||||||
|
class Condition(ABC):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.args = args
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def execute(self, *args, **kwargs) -> Tuple[bool, List]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class IfFileExistsCondition(Condition):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
path = None
|
||||||
|
if len(self.args) > 0:
|
||||||
|
path = self.args[0]
|
||||||
|
entry = self.kwargs.get('entry')
|
||||||
|
if not path:
|
||||||
|
Logger.write_error(entry, 'Need a path')
|
||||||
|
else:
|
||||||
|
if os.path.exists(path):
|
||||||
|
return True, path
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
|
||||||
|
class IfDirectoryEmptyCondition(Condition):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
path = None
|
||||||
|
if len(self.args) > 0:
|
||||||
|
path = self.args[0]
|
||||||
|
entry = self.kwargs.get('entry')
|
||||||
|
if not path:
|
||||||
|
Logger.write_error(entry, 'Need a path')
|
||||||
|
else:
|
||||||
|
files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
|
||||||
|
if files:
|
||||||
|
return False, files
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
|
||||||
|
class IfDirectoryNotEmptyCondition(Condition):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
path = None
|
||||||
|
if len(self.args) > 0:
|
||||||
|
path = self.args[0]
|
||||||
|
entry = self.kwargs.get('entry')
|
||||||
|
if not path:
|
||||||
|
Logger.write_error(entry, 'Need a path')
|
||||||
|
else:
|
||||||
|
files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
|
||||||
|
if files:
|
||||||
|
return True, files
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
|
||||||
|
class GetCreationTimeCondition(Condition):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
path = args[0]
|
||||||
|
result = list()
|
||||||
|
if isinstance(path, list):
|
||||||
|
for p in path:
|
||||||
|
time = datetime.datetime.fromtimestamp(os.path.getctime(p))
|
||||||
|
time = time.strftime("%Y.%m.%d %H:%M:%S")
|
||||||
|
result.append(time)
|
||||||
|
else:
|
||||||
|
time = datetime.datetime.fromtimestamp(os.path.getctime(path))
|
||||||
|
time = time.strftime("%Y.%m.%d %H:%M:%S")
|
||||||
|
result.append(time)
|
||||||
|
return True, result
|
||||||
|
|
||||||
|
|
||||||
|
class IfCreationTimeCondition(Condition):
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
s = self.args[0].split(' ', 1)
|
||||||
|
if len(s) > 1:
|
||||||
|
oper = s[0]
|
||||||
|
time = s[1]
|
||||||
|
search_time = datetime.datetime.strptime(time, "%Y.%m.%d %H:%M:%S")
|
||||||
|
path = args[0]
|
||||||
|
result = list()
|
||||||
|
if isinstance(path, list):
|
||||||
|
for p in path:
|
||||||
|
time = datetime.datetime.fromtimestamp(os.path.getctime(p))
|
||||||
|
# time = time.strftime("%Y.%m.%d %H:%M:%S")
|
||||||
|
if oper == '<' and time < search_time:
|
||||||
|
result.append(p)
|
||||||
|
elif oper == '=' and time == search_time:
|
||||||
|
result.append(p)
|
||||||
|
elif oper == '>' and time > search_time:
|
||||||
|
result.append(p)
|
||||||
|
else:
|
||||||
|
time = datetime.datetime.fromtimestamp(os.path.getctime(path))
|
||||||
|
# time = time.strftime("%Y.%m.%d %H:%M:%S")
|
||||||
|
if oper == '<' and time < search_time:
|
||||||
|
result.append(path)
|
||||||
|
elif oper == '=' and time == search_time:
|
||||||
|
result.append(path)
|
||||||
|
elif oper == '>' and time > search_time:
|
||||||
|
result.append(path)
|
||||||
|
return True, result
|
||||||
|
|
||||||
|
|
||||||
|
conditions = {
|
||||||
|
"IfFileExists": IfFileExistsCondition,
|
||||||
|
"IfDirectoryEmpty": IfDirectoryEmptyCondition,
|
||||||
|
"IfDirectoryNotEmpty": IfDirectoryNotEmptyCondition,
|
||||||
|
"GetCreationTime": GetCreationTimeCondition,
|
||||||
|
"IfCreationTime": IfCreationTimeCondition,
|
||||||
|
}
|
||||||
48
config.py
48
config.py
|
|
@ -1,4 +1,6 @@
|
||||||
from entry import Entry
|
from entry import Entry
|
||||||
|
from actions import *
|
||||||
|
from conditions import *
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
|
|
@ -21,12 +23,44 @@ class Config:
|
||||||
def read_config(self, *args, **kwargs):
|
def read_config(self, *args, **kwargs):
|
||||||
with open('config.txt', 'r') as file:
|
with open('config.txt', 'r') as file:
|
||||||
for line in file:
|
for line in file:
|
||||||
self._create_entry(line)
|
if len(line) < 2:
|
||||||
|
continue
|
||||||
|
if line.startswith('#'):
|
||||||
|
continue
|
||||||
|
entry = self._create_entry(line)
|
||||||
|
self._entries.append(entry)
|
||||||
|
|
||||||
def _create_entry(self, line):
|
def _create_entry(self, line):
|
||||||
splitts = line.split(';')
|
splits = line.split(';')
|
||||||
assert(len(splitts) == 4)
|
assert(len(splits) == 4)
|
||||||
name = splitts[0]
|
name = splits[0]
|
||||||
action, act_param = splitts[1].strip().split(' ')
|
condition_list = self._split_conditions(splits[1], name)
|
||||||
success, suc_param = splitts[2].strip().split(' ', 1)
|
success_list = self._split_actions(splits[2], name)
|
||||||
fail, fail_param = splitts[3].strip().split(' ', 1)
|
fail_list = self._split_actions(splits[3], name)
|
||||||
|
return Entry(name, condition_list, success_list, fail_list)
|
||||||
|
|
||||||
|
def _split_conditions(self, split, entry_name):
|
||||||
|
splits = list()
|
||||||
|
split.strip()
|
||||||
|
for s in split.split(','):
|
||||||
|
s = s.strip()
|
||||||
|
ss = s.split(' ', 1)
|
||||||
|
condition = conditions[ss[0]]
|
||||||
|
argument = None
|
||||||
|
if len(ss) > 1:
|
||||||
|
argument = str(ss[1])
|
||||||
|
splits.append(condition(argument, entry=entry_name))
|
||||||
|
return splits
|
||||||
|
|
||||||
|
def _split_actions(self, split, entry_name):
|
||||||
|
splits = list()
|
||||||
|
split.strip()
|
||||||
|
for s in split.split(','):
|
||||||
|
s = s.strip()
|
||||||
|
ss = s.split(' ', 1)
|
||||||
|
action = actions[ss[0]]
|
||||||
|
argument = None
|
||||||
|
if len(ss) > 1:
|
||||||
|
argument = str(ss[1])
|
||||||
|
splits.append(action(argument, entry=entry_name))
|
||||||
|
return splits
|
||||||
|
|
|
||||||
|
|
@ -1 +1,7 @@
|
||||||
MyTest; IsDirectoryEmpty C:\Users\a.jurcenko\PycharmProjects\logreader\emptydir; LoggMessage Alles Gut; LoggMessage Alles Schlecht
|
# FirstTest; IfFileExists testdata/log.txt, IsFileExists testdata/second.log, IsFileExists testdata/third.log; LoggMessage Alles OK; LoggMessage Nicht Ok
|
||||||
|
# MoveTest; IfFileExists testdata/log.txt; Move emptydir; LoggMessage Somthing went wrong
|
||||||
|
# MoveAllFiles; IsDirectoryEmpty testdata; LoggMessage Nop; Copy emptydir
|
||||||
|
# RemoveTest; IfFileExists testdata/third.log; Remove; LoggMessage Not Removed
|
||||||
|
# RemoveFilesDirectoryTest; IfDirectoryEmpty test; Stub; Remove
|
||||||
|
CreationTimeTest; IfDirectoryNotEmpty testdata, GetCreationTime; LoggMessage Creation Time; Stub
|
||||||
|
IfCreationTimeTest; IfDirectoryNotEmpty testdata, IfCreationTime > 2023.08.15 20:09:39; LoggMessage IfCreation Time; Stub
|
||||||
24
entry.py
24
entry.py
|
|
@ -1,22 +1,36 @@
|
||||||
from action import Action, Condition
|
from actions import Action
|
||||||
|
from conditions import Condition
|
||||||
|
|
||||||
|
|
||||||
class Entry:
|
class Entry:
|
||||||
|
|
||||||
def __init__(self, name: str, condition: Condition, success: Action, fail: Action):
|
def __init__(self, name: str, condition: list[Condition], success: list[Action], fail: list[Action]):
|
||||||
self._name = name
|
self._name = name
|
||||||
self._action = condition
|
self._conditions = condition
|
||||||
self._success = success
|
self._success = success
|
||||||
self._fail = fail
|
self._fail = fail
|
||||||
|
|
||||||
|
def execute(self):
|
||||||
|
result, output = True, None
|
||||||
|
for con in self._conditions:
|
||||||
|
res, output = con.execute(output)
|
||||||
|
if not res:
|
||||||
|
result = False
|
||||||
|
break
|
||||||
|
if result:
|
||||||
|
for s in self._success:
|
||||||
|
s.execute(output)
|
||||||
|
else:
|
||||||
|
for f in self._fail:
|
||||||
|
f.execute(output)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self):
|
def name(self):
|
||||||
return self._name
|
return self._name
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def action(self):
|
def condition(self):
|
||||||
return self._action
|
return self._conditions
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def success(self):
|
def success(self):
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue