launcher/task.py

86 lines
2.2 KiB
Python

# Copyright (c) 20.09.25, 14:01. a.jurcenko@inproduct.de
import abc
import subprocess
import requests
class Task(abc.ABC):
def __init__(self, name=None, description=None, destination=None, arguments=None, on_success=None, on_failure=None):
self._name = name
self._description = description
self._destination = destination
self._arguments = arguments
self._on_success = on_success
self._on_failure = on_failure
def get_name(self):
return self._name
def get_description(self):
return self._description
def get_destination(self):
return self._destination
def get_arguments(self):
return self._arguments
def get_on_success(self):
return self._on_success
def get_on_failure(self):
return self._on_failure
def set_name(self, name):
self._name = name
def set_description(self, description):
self._description = description
def set_destination(self, destination):
self._destination = destination
def set_arguments(self, arguments):
self._arguments = arguments
def set_on_success(self, on_success):
self._on_success = on_success
def set_on_failure(self, on_failure):
self._on_failure = on_failure
@abc.abstractmethod
def execute(self):
print(self._name)
class ConsolePrintTask(Task):
def execute(self):
print(self._name, 'success')
class SystemTask(Task):
def execute(self):
super().execute()
res = subprocess.run([self._destination, self._arguments], text=True)
if res.returncode == 0:
if self._on_success is not None:
self._on_success()
else:
if self._on_failure is not None:
self._on_failure()
class GETTask(Task):
#todo: HTTP Task mit möglichkeit schema -> get/post/put/head auszusuchen
def execute(self):
super().execute()
res = requests.get(self._destination)
if res.status_code == 200:
if self._on_success is not None:
self._on_success()
else:
if self._on_failure is not None:
self._on_failure()