166 lines
6.6 KiB
Python
166 lines
6.6 KiB
Python
import subprocess
|
|
import logging
|
|
import json
|
|
import argparse
|
|
import sys
|
|
from datetime import datetime
|
|
|
|
# Configuration du logging
|
|
log_file = "/var/log/nextcloud_admin_sync.log"
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
handlers=[
|
|
logging.FileHandler(log_file),
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
logger = logging.getLogger("admin_sync")
|
|
|
|
def run_command(command):
|
|
"""Exécute une commande shell et retourne le résultat"""
|
|
logger.debug(f"Exécution de la commande: {command}")
|
|
try:
|
|
result = subprocess.run(command, shell=True, check=True,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
text=True)
|
|
return result.stdout.strip()
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error(f"Erreur lors de l'exécution de la commande: {e}")
|
|
logger.error(f"Sortie d'erreur: {e.stderr}")
|
|
return None
|
|
|
|
def get_nextcloud_groups(container_name):
|
|
"""Récupère tous les groupes Nextcloud avec leurs membres"""
|
|
cmd = f"docker exec -u 33 {container_name} php occ group:list --output=json"
|
|
output = run_command(cmd)
|
|
if output:
|
|
try:
|
|
return json.loads(output)
|
|
except json.JSONDecodeError:
|
|
logger.error("Impossible de décoder la sortie JSON des groupes")
|
|
return {}
|
|
return {}
|
|
|
|
def get_group_members(groups_data, group_name):
|
|
"""Récupère les membres d'un groupe spécifique à partir des données de group:list"""
|
|
if group_name in groups_data:
|
|
return groups_data[group_name]
|
|
return []
|
|
|
|
def get_admin_groups(container_name, groups):
|
|
"""Identifie les groupes admin"""
|
|
admin_groups = []
|
|
for group_name in groups:
|
|
# Vérifier si le nom du groupe est 'admin'
|
|
if group_name == 'admin':
|
|
admin_groups.append(group_name)
|
|
else:
|
|
# Vérifier si le displayName du groupe est 'admin'
|
|
cmd = f"docker exec -u 33 {container_name} php occ group:info {group_name} --output=json"
|
|
output = run_command(cmd)
|
|
if output:
|
|
try:
|
|
group_info = json.loads(output)
|
|
if 'displayName' in group_info and group_info['displayName'] == 'admin':
|
|
admin_groups.append(group_name)
|
|
except json.JSONDecodeError:
|
|
logger.error(f"Impossible de décoder les infos du groupe {group_name}")
|
|
|
|
return admin_groups
|
|
|
|
def get_current_admins(container_name):
|
|
"""Récupère la liste actuelle des administrateurs en utilisant la commande group:list pour le groupe admin"""
|
|
cmd = f"docker exec -u 33 {container_name} php occ group:list --output=json"
|
|
output = run_command(cmd)
|
|
if output:
|
|
try:
|
|
groups_data = json.loads(output)
|
|
# Le groupe "admin" contient les administrateurs Nextcloud
|
|
if "admin" in groups_data:
|
|
return groups_data["admin"]
|
|
else:
|
|
logger.warning("Groupe 'admin' non trouvé dans Nextcloud")
|
|
return []
|
|
except json.JSONDecodeError:
|
|
logger.error("Impossible de décoder la sortie JSON des groupes")
|
|
return []
|
|
|
|
def set_admin_status(container_name, username, is_admin=True):
|
|
"""Définit ou révoque le statut d'administrateur pour un utilisateur en ajoutant/retirant l'utilisateur du groupe admin"""
|
|
if is_admin:
|
|
cmd = f"docker exec -u 33 {container_name} php occ group:adduser admin {username}"
|
|
status = "accordé"
|
|
else:
|
|
cmd = f"docker exec -u 33 {container_name} php occ group:removeuser admin {username}"
|
|
status = "révoqué"
|
|
|
|
output = run_command(cmd)
|
|
if output is not None:
|
|
logger.info(f"Statut d'administrateur {status} pour {username}")
|
|
return True
|
|
else:
|
|
logger.error(f"Impossible de {status} le statut d'administrateur pour {username}")
|
|
return False
|
|
|
|
def main(container_name):
|
|
logger.info("=== Début de la synchronisation des administrateurs ===")
|
|
|
|
# Récupération de tous les groupes avec leurs membres
|
|
groups_data = get_nextcloud_groups(container_name)
|
|
if not groups_data:
|
|
logger.error("Impossible de récupérer les groupes Nextcloud")
|
|
return False
|
|
|
|
groups = list(groups_data.keys())
|
|
logger.info(f"Groupes trouvés: {len(groups)}")
|
|
|
|
# Identification des groupes admin
|
|
admin_groups = get_admin_groups(container_name, groups)
|
|
logger.info(f"Groupes admin identifiés: {admin_groups}")
|
|
|
|
# Récupération des utilisateurs dans les groupes admin
|
|
should_be_admins = set()
|
|
for group in admin_groups:
|
|
members = get_group_members(groups_data, group)
|
|
logger.info(f"Groupe {group}: {len(members)} membres")
|
|
should_be_admins.update(members)
|
|
|
|
logger.info(f"Utilisateurs qui devraient être administrateurs: {len(should_be_admins)}")
|
|
|
|
# Récupération des administrateurs actuels
|
|
current_admins = get_current_admins(container_name)
|
|
logger.info(f"Administrateurs actuels: {len(current_admins)}")
|
|
|
|
# Attribution des droits admin aux nouveaux admins
|
|
for username in should_be_admins:
|
|
if username not in current_admins:
|
|
logger.info(f"Attribution des droits admin à {username}")
|
|
set_admin_status(container_name, username, True)
|
|
|
|
# Révocation des droits admin pour ceux qui ne sont plus dans un groupe admin
|
|
for username in current_admins:
|
|
if username not in should_be_admins:
|
|
logger.info(f"Révocation des droits admin pour {username}")
|
|
set_admin_status(container_name, username, False)
|
|
|
|
# Résumé
|
|
new_admins = get_current_admins(container_name)
|
|
logger.info("=== Résumé de la synchronisation ===")
|
|
logger.info(f"Groupes admin: {', '.join(admin_groups)}")
|
|
logger.info(f"Utilisateurs dans des groupes admin: {len(should_be_admins)}")
|
|
logger.info(f"Administrateurs avant synchronisation: {len(current_admins)}")
|
|
logger.info(f"Administrateurs après synchronisation: {len(new_admins)}")
|
|
logger.info(f"Ajouts: {len(should_be_admins - set(current_admins))}")
|
|
logger.info(f"Suppressions: {len(set(current_admins) - should_be_admins)}")
|
|
logger.info("=== Fin de la synchronisation ===")
|
|
|
|
return True
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description='Synchronise les administrateurs Nextcloud avec Keycloak')
|
|
parser.add_argument('--container', default='neah-nextcloud',
|
|
help='Nom du conteneur Nextcloud (défaut: neah-nextcloud)')
|
|
args = parser.parse_args()
|
|
|
|
main(args.container) |