Zum Inhalt springen

Automatische Datenablage auf Dateisystem - Python


Empfohlene Beiträge

Hallo zusammen,

ich habe mir mit dem schon bestehenden PowerShell Skript und ein wenig ChatGPT eine Vorlage für ein Python Skript zusammengestellt. Dieses Skript sorgt dafür, dass die verschiedenen Datentypen Downloaden lässt. An sich erstmal wie das PowerShell Skript. Ich habe nur noch ein paar Funktionen hinzugefügt.

1. Für jeden Datentyp kann ausgewählt werden, ob ein Unterordner erzeugt werden soll, den man mit Bausteinen benennen kann.

2. Man kann festlegen ob überhaupt überprüft wird, ob der jeweilige Datensatz schon als Exportiert Markiert wurde, auch wenn man ihn neu markiert. So kann man exportieren und trotzdem neu Markieren.

3. Bausteinreferenzen funktionieren mit einfacher Syntax: {Bausteinname}

4. Auch Inhalte von Wiederholungsgruppen, lassen sich zu Inhalten im Dateinamen oder im Pfad machen per Referenz.

 

Ich hoffe irgendjemandem hilft dieses Skript.

 

import requests
import os
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

# Konfiguration für die API und Pfade
config = {
    "smapId": "- Platzhalter für Smap-ID -",  # Die ID der Smap, die in der API verwendet wird.
    "token": "- Platzhalter für API-Token -",  # Das API-Token für die Authentifizierung bei den API-Anfragen.
    "exportJSON": True,  # Ob JSON exportiert werden soll. Wenn True, wird der JSON-Export aktiviert.
    "exportJSONPath": "- Platzhalter für JSON-Pfad -",  # Fester Pfad für den JSON-Export.
    "exportJSONName": "JSON_{record_id}.json",  # Dateiname für den JSON-Export, bei dem Platzhalter verwendet werden können.
    "exportJSONUseSubfolder": False,  # Wenn True, wird ein Unterordner für den JSON-Export erstellt.
    "exportJSONSubfolderName": "- Platzhalter für den JSON-Unterordner -",  # Name des Unterordners für JSON-Exporte.
    "exportPDF": True,  # Ob PDF exportiert werden soll. Wenn True, wird der PDF-Export aktiviert.
    "exportPDFPath": "- Platzhalter für PDF-Pfad -",  # Fester Pfad für den PDF-Export.
    "exportPDFNameUsePlattform": False,  # Ob der PDF-Dateiname von der Plattform verwendet wird.
    "exportPDFName": "{Name}.pdf",  # Benutzerdefinierter PDF-Name mit Platzhaltern.
    "exportPDFUseSubfolder": False,  # Wenn True, wird ein Unterordner für den PDF-Export erstellt.
    "exportPDFSubfolderName": "- Platzhalter für PDF Unterordner -",  # Name des Unterordners für PDF-Exporte.
    "exportAssets": True,  # Ob Assets exportiert werden sollen. Wenn True, wird der Asset-Export aktiviert.
    "exportAssetsPath": "- Platzhalter für Asset-Pfad -",  # Fester Pfad für den Asset-Export.
    "exportAssetsUseSubfolder": True,  # Wenn True, wird ein Unterordner für den Asset-Export erstellt.
    "exportAssetsSubfolderName": "{Name}",  # Name des Unterordners für Asset-Exporte.
    "deleteAfterExport": False,  # Ob die Datensätze nach dem Export gelöscht werden sollen.
    "markAsExported": False,  # Ob die Datensätze als exportiert markiert werden sollen.
    "checkIfExported": False,  # Wenn True, wird überprüft, ob der Datensatz bereits exportiert wurde (basierend auf 'lastExportDate').
    "logFilePath": "- Platzhalter für Logfile-Pfad -",  # Pfad für das Logfile.
    "logFileMaxSize": 5 * 1024 * 1024,  # Max. Größe des Logfiles in Bytes (hier 5 MB).
    "logFileBackupCount": 3,  # Anzahl der Backup-Logfiles.
    "repeatingGroups": [  # Liste der zu verwendenden Wiederholungsgruppen. Mehrere Gruppen können definiert werden.
    ],
}

# Logging-Konfiguration mit RotatingFileHandler
log_directory = os.path.dirname(config["logFilePath"])
os.makedirs(log_directory, exist_ok=True)
log_handler = RotatingFileHandler(
    config["logFilePath"],
    maxBytes=config["logFileMaxSize"],
    backupCount=config["logFileBackupCount"]
)
logging.basicConfig(
    handlers=[log_handler],
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

def log(message):
    """Protokolliert eine Nachricht sowohl in der Konsole als auch in der Log-Datei."""
    print(message)
    logging.info(message)

def download_file(url, path, filename):
    """Lade eine Datei von der angegebenen URL herunter und speichere sie."""
    # Trimmen von Leerzeichen aus Pfaden und Dateinamen
    path = path.strip()
    filename = filename.strip()
    
    os.makedirs(path, exist_ok=True)
    file_path = os.path.join(path, filename)
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(file_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
        log(f"Datei erfolgreich heruntergeladen: {file_path}")
    except requests.RequestException as e:
        log(f"Fehler beim Herunterladen der Datei: {e}")

def format_filename(template, record):
    """Ersetze Platzhalter im Dateinamen basierend auf den Daten im Datensatz."""
    formatted_name = template.strip()

    # Ersetze einfache Platzhalter, z.B. {Project}
    for key, value in record["data"].items():
        if not isinstance(value, list):  # Nur einfache Werte
            formatted_name = formatted_name.replace(f"{{{key}}}", str(value))

    # Ersetze Platzhalter für Wiederholungsgruppen
    for group in config["repeatingGroups"]:
        group_name = group["name"]
        group_field = group["field"]
        use_first_only = group["useFirstOnly"]

        if group_name in record["data"]:
            group_data = record["data"][group_name]
            if use_first_only:
                first_value = group_data[0].get(group_field, "") if group_data else ""
                formatted_name = formatted_name.replace(f"{{{group_name}_{group_field}}}", first_value)
            else:
                values = [entry.get(group_field, "") for entry in group_data]
                value_string = "_".join(values)
                formatted_name = formatted_name.replace(f"{{{group_name}_{group_field}}}", value_string)

    # Füge die ID als Standardplatzhalter ein
    formatted_name = formatted_name.replace("{id}", record["id"])

    return formatted_name

def should_skip_record(record):
    """Überprüft, ob der Datensatz bereits exportiert wurde, wenn 'checkIfExported' True ist."""
    if config["checkIfExported"]:
        last_export_date = record.get("lastExportDate", None)
        return last_export_date is not None
    return False

def delete_record(record_id, version):
    """Lösche einen Datensatz über die API, wenn 'deleteAfterExport' True ist."""
    if config["deleteAfterExport"]:
        delete_url = f"https://platform.smapone.com/backend/intern/Smaps/{config['smapId']}/Versions/{version}/Data/{record_id}?accesstoken={config['token']}"
        try:
            response = requests.delete(delete_url)
            if response.status_code == 204:
                log(f"Datensatz {record_id} wurde erfolgreich gelöscht.")
            else:
                log(f"Fehler beim Löschen des Datensatzes {record_id}: {response.status_code}")
        except requests.RequestException as e:
            log(f"Fehler bei der Anfrage zum Löschen des Datensatzes {record_id}: {e}")

def get_final_export_path(base_path, use_subfolder, subfolder_template, record):
    """Bestimmt den endgültigen Exportpfad, abhängig davon, ob ein Unterordner verwendet wird."""
    if use_subfolder:
        # Formatieren des Ordnernamens mit Platzhaltern
        subfolder_name = format_filename(subfolder_template, record)
        final_path = os.path.join(base_path, subfolder_name.strip())
    else:
        final_path = base_path.strip()
    os.makedirs(final_path, exist_ok=True)
    return final_path

def process_record(record):
    """Verarbeite den einzelnen Datensatz."""
    
    # Exportiere JSON, wenn aktiviert
    if config["exportJSON"]:
        json_path = get_final_export_path(config["exportJSONPath"], config["exportJSONUseSubfolder"], config["exportJSONSubfolderName"], record)
        json_url = (
            f"https://platform.smapone.com/backend/intern/Smaps/{record['smapId']}/Versions/{record['version']}"
            f"/Data/{record['id']}.json?markAsExported={str(config['markAsExported']).lower()}&useDefault=false&accesstoken={config['token']}"
        )
        json_filename = format_filename(config["exportJSONName"], record)
        download_file(json_url, json_path, json_filename)

    # PDF-Export
    if config["exportPDF"]:
        pdf_path = get_final_export_path(config["exportPDFPath"], config["exportPDFUseSubfolder"], config["exportPDFSubfolderName"], record)
        pdf_url = (
            f"https://platform.smapone.com/backend/intern/Smaps/{record['smapId']}/Versions/{record['version']}"
            f"/Data/{record['id']}.Pdf?markAsExported={str(config['markAsExported']).lower()}&useDefault=false&accesstoken={config['token']}"
        )
        pdf_filename_template = config["exportPDFName"]
        pdf_filename = format_filename(pdf_filename_template, record)
        download_file(pdf_url, pdf_path, pdf_filename)

    # Exportiere Assets, wenn aktiviert
    if config["exportAssets"]:
        assets_path = get_final_export_path(config["exportAssetsPath"], config["exportAssetsUseSubfolder"], config["exportAssetsSubfolderName"], record)
        assets_url = f"https://platform.smapone.com/backend/intern/Smaps/{record['smapId']}/Versions/{record['version']}/Data/{record['id']}/Files?accesstoken={config['token']}"
        response = requests.get(assets_url)
        if response.status_code == 200:
            assets = response.json()
            for asset in assets:
                asset_url = (
                    f"https://platform.smapone.com/backend/intern/Smaps/{config['smapId']}/Versions/{record['version']}"
                    f"/Data/{record['id']}/Files/{asset['fileId']}?accesstoken={config['token']}"
                )
                asset_filename = asset["fileName"].strip()
                download_file(asset_url, assets_path, asset_filename)

    # Lösche den Datensatz, falls aktiviert
    delete_record(record["id"], record["version"])

def fetch_records():
    """Rufe die Datensätze von der API ab."""
    url = f"https://platform.smapone.com/backend/intern/Smaps/{config['smapId']}/Data?markAsExported=false&format=Json&accessToken={config['token']}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        records = response.json()
        if not records:
            log("Die Antwort enthält keine Datensätze.")
            return []
        log(f"Anzahl der Datensätze gefunden: {len(records)}")
        return records
    except requests.RequestException as e:
        log(f"Fehler bei der Webanfrage: {e}")
        return []

def main():
    """Hauptfunktion zum Ausführen des Skripts."""
    log("Start Script")
    records = fetch_records()
    if records:
        for record in records:
            process_record(record)
    log("Script beendet")

if __name__ == "__main__":
    main()

 

  • Danke 1
Link zu diesem Kommentar
Auf anderen Seiten teilen

Bitte melde Dich an, um einen Kommentar zu hinterlassen

Du kannst nach der Anmeldung einen Kommentar hinterlassen



Jetzt anmelden
×
×
  • Neu erstellen...