import subprocess import logging import json import argparse import sys from datetime import datetime # Configuration du logging log_file = "/var/log/nextcloud_admin_sync.log" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(log_file), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger("admin_sync") def run_command(command): """Exécute une commande shell et retourne le résultat""" logger.debug(f"Exécution de la commande: {command}") try: result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) return result.stdout.strip() except subprocess.CalledProcessError as e: logger.error(f"Erreur lors de l'exécution de la commande: {e}") logger.error(f"Sortie d'erreur: {e.stderr}") return None def get_nextcloud_groups(container_name): """Récupère tous les groupes Nextcloud avec leurs membres""" cmd = f"docker exec -u 33 {container_name} php occ group:list --output=json" output = run_command(cmd) if output: try: return json.loads(output) except json.JSONDecodeError: logger.error("Impossible de décoder la sortie JSON des groupes") return {} return {} def get_group_members(groups_data, group_name): """Récupère les membres d'un groupe spécifique à partir des données de group:list""" if group_name in groups_data: return groups_data[group_name] return [] def get_admin_groups(container_name, groups): """Identifie les groupes admin""" admin_groups = [] for group_name in groups: # Vérifier si le nom du groupe est 'admin' if group_name == 'admin': admin_groups.append(group_name) else: # Vérifier si le displayName du groupe est 'admin' cmd = f"docker exec -u 33 {container_name} php occ group:info {group_name} --output=json" output = run_command(cmd) if output: try: group_info = json.loads(output) if 'displayName' in group_info and group_info['displayName'] == 'admin': admin_groups.append(group_name) except json.JSONDecodeError: logger.error(f"Impossible de décoder les infos du groupe {group_name}") return admin_groups def get_current_admins(container_name): """Récupère la liste actuelle des administrateurs en utilisant la commande group:list pour le groupe admin""" cmd = f"docker exec -u 33 {container_name} php occ group:list --output=json" output = run_command(cmd) if output: try: groups_data = json.loads(output) # Le groupe "admin" contient les administrateurs Nextcloud if "admin" in groups_data: return groups_data["admin"] else: logger.warning("Groupe 'admin' non trouvé dans Nextcloud") return [] except json.JSONDecodeError: logger.error("Impossible de décoder la sortie JSON des groupes") return [] def set_admin_status(container_name, username, is_admin=True): """Définit ou révoque le statut d'administrateur pour un utilisateur en ajoutant/retirant l'utilisateur du groupe admin""" if is_admin: cmd = f"docker exec -u 33 {container_name} php occ group:adduser admin {username}" status = "accordé" else: cmd = f"docker exec -u 33 {container_name} php occ group:removeuser admin {username}" status = "révoqué" output = run_command(cmd) if output is not None: logger.info(f"Statut d'administrateur {status} pour {username}") return True else: logger.error(f"Impossible de {status} le statut d'administrateur pour {username}") return False def main(container_name): logger.info("=== Début de la synchronisation des administrateurs ===") # Récupération de tous les groupes avec leurs membres groups_data = get_nextcloud_groups(container_name) if not groups_data: logger.error("Impossible de récupérer les groupes Nextcloud") return False groups = list(groups_data.keys()) logger.info(f"Groupes trouvés: {len(groups)}") # Identification des groupes admin admin_groups = get_admin_groups(container_name, groups) logger.info(f"Groupes admin identifiés: {admin_groups}") # Récupération des utilisateurs dans les groupes admin should_be_admins = set() for group in admin_groups: members = get_group_members(groups_data, group) logger.info(f"Groupe {group}: {len(members)} membres") should_be_admins.update(members) logger.info(f"Utilisateurs qui devraient être administrateurs: {len(should_be_admins)}") # Récupération des administrateurs actuels current_admins = get_current_admins(container_name) logger.info(f"Administrateurs actuels: {len(current_admins)}") # Attribution des droits admin aux nouveaux admins for username in should_be_admins: if username not in current_admins: logger.info(f"Attribution des droits admin à {username}") set_admin_status(container_name, username, True) # Révocation des droits admin pour ceux qui ne sont plus dans un groupe admin for username in current_admins: if username not in should_be_admins: logger.info(f"Révocation des droits admin pour {username}") set_admin_status(container_name, username, False) # Résumé new_admins = get_current_admins(container_name) logger.info("=== Résumé de la synchronisation ===") logger.info(f"Groupes admin: {', '.join(admin_groups)}") logger.info(f"Utilisateurs dans des groupes admin: {len(should_be_admins)}") logger.info(f"Administrateurs avant synchronisation: {len(current_admins)}") logger.info(f"Administrateurs après synchronisation: {len(new_admins)}") logger.info(f"Ajouts: {len(should_be_admins - set(current_admins))}") logger.info(f"Suppressions: {len(set(current_admins) - should_be_admins)}") logger.info("=== Fin de la synchronisation ===") return True if __name__ == "__main__": parser = argparse.ArgumentParser(description='Synchronise les administrateurs Nextcloud avec Keycloak') parser.add_argument('--container', default='neah-nextcloud', help='Nom du conteneur Nextcloud (défaut: neah-nextcloud)') args = parser.parse_args() main(args.container)