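"""Backup manager widget for the document database.

Exports documents, their version history, folders and tags to a timestamped
ZIP archive, and imports such an archive back into the database.
"""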
import os
import json
import shutil
import zipfile
from datetime import datetime

from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QPushButton,
                               QProgressDialog, QFileDialog, QMessageBox)
from PySide6.QtCore import Qt

from database import get_session
from models import Document, DocumentVersion, Folder, Tag


class BackupManager(QWidget):
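    """Widget with "Exportar Documentos" / "Importar Documentos" buttons that
    back up the document database to a ZIP file and restore it from one."""
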
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setup_ui()

    def setup_ui(self):
        layout = QVBoxLayout(self)

        # Export button
        self.export_btn = QPushButton("Exportar Documentos")
        self.export_btn.clicked.connect(self.export_documents)
        layout.addWidget(self.export_btn)

        # Import button
        self.import_btn = QPushButton("Importar Documentos")
        self.import_btn.clicked.connect(self.import_documents)
        layout.addWidget(self.import_btn)

    def export_documents(self):
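        """Export every document, its versions, folders and tags to a ZIP.

        The archive contains one doc_<id>/ directory per document (current
        file plus a versions/ subdirectory) and a metadata.json describing
        documents, folders and tags.
        """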
        export_dir = QFileDialog.getExistingDirectory(
            self,
            "Selecionar Pasta para Exportação"
        )

        if not export_dir:
            return

        # Create the session before the try block so the finally clause can
        # close it safely even if get_session() itself fails.
        session = get_session()
        try:
            # Create export directory structure
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            export_path = os.path.join(export_dir, f"doc_export_{timestamp}")
            os.makedirs(export_path)

            # Create progress dialog
            documents = session.query(Document).all()
            progress = QProgressDialog(
                "Exportando documentos...",
                "Cancelar",
                0,
                len(documents),
                self
            )
            progress.setWindowModality(Qt.WindowModal)

            # Export metadata
            metadata = {
                'documents': [],
                'folders': [],
                'tags': []
            }

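            # These lists are filled in below and written to metadata.json:
            #   'documents' - per-document fields, tag names and version info
            #   'folders'   - id, name and parent_id of every folder
            #   'tags'      - name and color of every tag
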
            # Export documents and their versions
            for i, doc in enumerate(documents):
                if progress.wasCanceled():
                    break

                # Create document directory
                doc_dir = os.path.join(export_path, f"doc_{doc.id}")
                os.makedirs(doc_dir)

                # Copy main document
                if os.path.exists(doc.file_path):
                    shutil.copy2(doc.file_path,
                                 os.path.join(doc_dir, os.path.basename(doc.file_path)))

                # Copy versions
                versions_dir = os.path.join(doc_dir, "versions")
                os.makedirs(versions_dir)
                for version in doc.versions:
                    if os.path.exists(version.file_path):
                        shutil.copy2(
                            version.file_path,
                            os.path.join(versions_dir, os.path.basename(version.file_path))
                        )

                # Add document metadata
                doc_data = {
                    'id': doc.id,
                    'file_name': doc.file_name,
                    'marca': doc.marca,
                    'modelo': doc.modelo,
                    'ano': doc.ano,
                    'cilindrada': doc.cilindrada,
                    'codigo_motor': doc.codigo_motor,
                    'tipo_documento': doc.tipo_documento,
                    'variante': doc.variante,
                    'observacoes': doc.observacoes,
                    'folder_id': doc.folder_id,
                    'tags': [tag.name for tag in doc.tags],
                    'versions': [{
                        'version_number': v.version_number,
                        'file_name': os.path.basename(v.file_path),
                        'changes': v.changes,
                        'created_at': v.created_at.isoformat()
                    } for v in doc.versions]
                }
                metadata['documents'].append(doc_data)

                progress.setValue(i + 1)

            # Export folders
            folders = session.query(Folder).all()
            for folder in folders:
                folder_data = {
                    'id': folder.id,
                    'name': folder.name,
                    'parent_id': folder.parent_id
                }
                metadata['folders'].append(folder_data)

            # Export tags
            tags = session.query(Tag).all()
            for tag in tags:
                tag_data = {
                    'name': tag.name,
                    'color': tag.color
                }
                metadata['tags'].append(tag_data)

            # Save metadata
            with open(os.path.join(export_path, 'metadata.json'), 'w',
                      encoding='utf-8') as f:
                json.dump(metadata, f, ensure_ascii=False, indent=2)

            # Create ZIP archive
            zip_path = f"{export_path}.zip"
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, _, files in os.walk(export_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        arcname = os.path.relpath(file_path, export_path)
                        zipf.write(file_path, arcname)

            # Clean up temporary directory
            shutil.rmtree(export_path)

            QMessageBox.information(
                self,
                "Sucesso",
                f"Exportação concluída com sucesso!\nArquivo: {zip_path}"
            )

        except Exception as e:
            QMessageBox.critical(self, "Erro",
                                 f"Erro durante a exportação: {str(e)}")
        finally:
            session.close()

    def import_documents(self):
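        """Import documents from a ZIP archive created by export_documents.

        Missing tags and folders are recreated; documents that already exist
        (same file_name and folder_id) are skipped.
        """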
        zip_path, _ = QFileDialog.getOpenFileName(
            self,
            "Selecionar Arquivo de Importação",
            "",
            "Arquivos ZIP (*.zip)"
        )

        if not zip_path:
            return

        # Create the session before the try block so the rollback/close in the
        # except/finally clauses are safe even if get_session() itself fails.
        session = get_session()
        try:
            # Create temporary directory for extraction
            temp_dir = os.path.join(os.path.dirname(zip_path), "temp_import")
            os.makedirs(temp_dir, exist_ok=True)

            # Extract ZIP file
            with zipfile.ZipFile(zip_path, 'r') as zipf:
                zipf.extractall(temp_dir)

            # Read metadata
            with open(os.path.join(temp_dir, 'metadata.json'), 'r',
                      encoding='utf-8') as f:
                metadata = json.load(f)

            # Import tags
            for tag_data in metadata['tags']:
                if not session.query(Tag)\
                        .filter(Tag.name == tag_data['name']).first():
                    tag = Tag(
                        name=tag_data['name'],
                        color=tag_data['color']
                    )
                    session.add(tag)

            # Import folders
            for folder_data in metadata['folders']:
                if not session.query(Folder)\
                        .filter(Folder.id == folder_data['id']).first():
                    folder = Folder(
                        id=folder_data['id'],
                        name=folder_data['name'],
                        parent_id=folder_data['parent_id']
                    )
                    session.add(folder)

            # Create progress dialog
            progress = QProgressDialog(
                "Importando documentos...",
                "Cancelar",
                0,
                len(metadata['documents']),
                self
            )
            progress.setWindowModality(Qt.WindowModal)

            # Import documents
            for i, doc_data in enumerate(metadata['documents']):
                if progress.wasCanceled():
                    break

                # Check if document already exists
                existing_doc = session.query(Document)\
                    .filter(Document.file_name == doc_data['file_name'])\
                    .filter(Document.folder_id == doc_data['folder_id'])\
                    .first()

                if not existing_doc:
                    # Copy document file
                    doc_dir = os.path.join(temp_dir, f"doc_{doc_data['id']}")
                    original_file = os.path.join(doc_dir, doc_data['file_name'])

                    if os.path.exists(original_file):
                        # Create new document
                        new_doc = Document(
                            file_name=doc_data['file_name'],
                            file_path=original_file,
                            marca=doc_data['marca'],
                            modelo=doc_data['modelo'],
                            ano=doc_data['ano'],
                            cilindrada=doc_data['cilindrada'],
                            codigo_motor=doc_data['codigo_motor'],
                            tipo_documento=doc_data['tipo_documento'],
                            variante=doc_data['variante'],
                            observacoes=doc_data['observacoes'],
                            folder_id=doc_data['folder_id']
                        )

                        # Add tags
                        for tag_name in doc_data['tags']:
                            tag = session.query(Tag)\
                                .filter(Tag.name == tag_name).first()
                            if tag:
                                new_doc.tags.append(tag)

                        session.add(new_doc)
                        session.flush()  # Get new_doc.id

                        # Import versions
                        versions_dir = os.path.join(doc_dir, "versions")
                        if os.path.exists(versions_dir):
                            for version_data in doc_data['versions']:
                                version_file = os.path.join(
                                    versions_dir,
                                    version_data['file_name']
                                )
                                if os.path.exists(version_file):
                                    new_version = DocumentVersion(
                                        document_id=new_doc.id,
                                        version_number=version_data['version_number'],
                                        file_path=version_file,
                                        changes=version_data['changes'],
                                        created_at=datetime.fromisoformat(
                                            version_data['created_at'])
                                    )
                                    session.add(new_version)

                progress.setValue(i + 1)

            session.commit()

            # Clean up
            shutil.rmtree(temp_dir)

            QMessageBox.information(self, "Sucesso",
                                    "Importação concluída com sucesso!")

        except Exception as e:
            session.rollback()
            QMessageBox.critical(self, "Erro",
                                 f"Erro durante a importação: {str(e)}")
        finally:
            session.close()