diff --git a/.gitignore b/.gitignore index 0ebf97c..1104f41 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,3 @@ emails.txt *.keys.txt api_keys_database.json logs/ -gemini_key_manager copy/ \ No newline at end of file diff --git a/gemini_key_manager/__init__.py b/gemini_key_manager/__init__.py index 6166aa8..e69de29 100644 --- a/gemini_key_manager/__init__.py +++ b/gemini_key_manager/__init__.py @@ -1 +0,0 @@ -"""Initializes the gemini_key_manager package.""" diff --git a/gemini_key_manager/actions.py b/gemini_key_manager/actions.py index afbd85e..65d2bbc 100644 --- a/gemini_key_manager/actions.py +++ b/gemini_key_manager/actions.py @@ -1,15 +1,18 @@ -"""This module contains the core functions that perform actions on GCP projects.""" -import os +""" +Core action functions for the Gemini Key Management script. +""" import logging import threading +import time import concurrent.futures from datetime import datetime, timezone from google.api_core import exceptions as google_exceptions from google.cloud import resourcemanager_v3, api_keys_v2 -from . import config, gcp_api, database +from . import config, gcp_api, database, utils +from .exceptions import TermsOfServiceNotAcceptedError +# Helper class to create a mock key object compatible with add_key_to_database class TempKey: - """A temporary container for key data to ensure compatibility with database functions.""" def __init__(self, cloud_key, key_string): self.key_string = key_string self.uid = cloud_key.uid @@ -19,19 +22,37 @@ class TempKey: self.update_time = cloud_key.update_time self.restrictions = cloud_key.restrictions -def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): - """ - Compares the API keys in a GCP project with the local database and syncs them. - - This function will: - 1. Fetch all keys from the GCP project. - 2. Fetch all keys for the project from the local database. - 3. Add keys that only exist in GCP to the local database. - 4. Mark keys as INACTIVE in the local database if they no longer exist in GCP. +class TosAcceptanceHelper: + """Helper class to manage the interactive ToS acceptance process using an Event.""" + def __init__(self): + self.lock = threading.Lock() + self.prompted_event = threading.Event() + self.prompt_in_progress = False - Returns: - bool: True if a Gemini-specific API key already exists in the project, False otherwise. - """ +def _enable_api_with_interactive_retry(project_id, creds, dry_run, tos_helper): + """Calls the enable_api function with a retry loop that handles ToS exceptions.""" + while True: + try: + if gcp_api.enable_api(project_id, creds, dry_run=dry_run): + return True + else: + return False + except TermsOfServiceNotAcceptedError as err: + with tos_helper.lock: + if not tos_helper.prompt_in_progress: + tos_helper.prompt_in_progress = True + logging.error(err.message) + logging.error(f"Please accept the terms by visiting this URL: {err.url}") + input("Press Enter to continue after accepting the Terms of Service...") + tos_helper.prompted_event.set() + + tos_helper.prompted_event.wait() + except Exception as e: + logging.error(f"An unexpected error occurred while trying to enable API for project {project_id}: {e}", exc_info=True) + return False + +def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): + """Reconciles API keys between Google Cloud and the local database for a single project.""" project_id = project.project_id logging.info(f" Reconciling keys for project {project_id}") gemini_key_exists = False @@ -50,7 +71,6 @@ def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) if not project_entry: - # If the project is not yet in our database, create a new entry for it. project_entry = { "project_info": { "project_id": project.project_id, @@ -83,15 +103,10 @@ def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): continue try: - # The key object from the list_keys method does not include the key string. - # A separate API call is required to fetch the unencrypted key string. key_string_response = api_keys_client.get_key_string(name=key_object.name) - hydrated_key = TempKey(key_object, key_string_response.key_string) - with db_lock: database.add_key_to_database(account_entry, project, hydrated_key) - except google_exceptions.PermissionDenied: logging.warning(f" Permission denied to get key string for {uid}. Skipping.") except google_exceptions.GoogleAPICallError as err: @@ -116,8 +131,40 @@ def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): logging.error(f" An API error occurred while reconciling keys for project {project_id}: {err}") return False -def process_project_for_action(project, creds, action, dry_run, db_lock, account_entry): - """Coordinates the sequence of operations for a single project based on the specified action.""" +def _create_and_process_new_project(project_number, creds, dry_run, db_lock, account_entry, tos_helper): + """Creates a single project, waits for API enablement, and creates the key.""" + random_string = utils.generate_random_string() + project_id = f"project{project_number}-{random_string}" + display_name = f"Project{project_number}" + + logging.info(f"Attempting to create project: ID='{project_id}', Name='{display_name}'") + + if dry_run: + logging.info(f"[DRY RUN] Would create project '{display_name}' with ID '{project_id}'.") + return + + try: + resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) + project_to_create = resourcemanager_v3.Project(project_id=project_id, display_name=display_name) + operation = resource_manager.create_project(project=project_to_create) + logging.info(f"Waiting for project creation operation for '{display_name}' to complete...") + created_project = operation.result() + logging.info(f"Successfully initiated creation for project '{display_name}'.") + + if _enable_api_with_interactive_retry(project_id, creds, dry_run, tos_helper): + logging.info(f"Generative AI API enabled for project '{display_name}' ({project_id}). Project is ready.") + key_object = gcp_api.create_api_key(project_id, creds, dry_run=dry_run) + if key_object: + with db_lock: + database.add_key_to_database(account_entry, created_project, key_object) + else: + logging.error(f"Failed to enable API for new project '{display_name}' ({project_id}). Skipping key creation.") + + except Exception as e: + logging.error(f"Failed to create project '{display_name}': {e}", exc_info=True) + +def process_project_for_action(project, creds, action, dry_run, db_lock, account_entry, tos_helper): + """Processes a single existing project for the given action in a thread-safe manner.""" project_id = project.project_id logging.info(f"- Starting to process project: {project_id} ({project.display_name})") @@ -127,7 +174,7 @@ def process_project_for_action(project, creds, action, dry_run, db_lock, account logging.info(f" '{config.GEMINI_API_KEY_DISPLAY_NAME}' already exists in project {project_id}. Skipping creation.") return - if gcp_api.enable_api(project_id, creds, dry_run=dry_run): + if _enable_api_with_interactive_retry(project_id, creds, dry_run, tos_helper): key_object = gcp_api.create_api_key(project_id, creds, dry_run=dry_run) if key_object: with db_lock: @@ -137,15 +184,11 @@ def process_project_for_action(project, creds, action, dry_run, db_lock, account if deleted_keys_uids: with db_lock: database.remove_keys_from_database(account_entry, project_id, deleted_keys_uids) + logging.info(f"- Finished processing project: {project_id}") def process_account(email, creds, action, api_keys_data, dry_run=False, max_workers=5): - """ - Orchestrates the entire process for a single user account. - - This includes finding all accessible projects and then running the specified - action ('create' or 'delete') on each project concurrently. - """ + """Processes a single account for the given action.""" logging.info(f"--- Processing account: {email} for action: {action} ---") if dry_run: logging.info("*** DRY RUN MODE ENABLED ***") @@ -160,7 +203,7 @@ def process_account(email, creds, action, api_keys_data, dry_run=False, max_work "account_details": { "email": email, "authentication_details": { - "token_file": os.path.join(config.CREDENTIALS_DIR, f"{email}.json"), + "token_file": f"{config.CREDENTIALS_DIR}/{email}.json", "scopes": config.SCOPES } }, @@ -170,31 +213,39 @@ def process_account(email, creds, action, api_keys_data, dry_run=False, max_work try: resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) - projects = list(resource_manager.search_projects()) - - if action == 'create': - new_projects = gcp_api.create_projects_if_needed(projects, creds, dry_run) - projects.extend(new_projects) - - if not projects: - logging.info(f"No projects found for {email}.") - return - - logging.info(f"Found {len(projects)} projects. Processing with up to {max_workers} workers...") + existing_projects = list(resource_manager.search_projects()) + if not existing_projects and action == 'create': + logging.warning(f"No projects found for {email}. This could be due to several reasons:") + logging.warning(" 1. The account truly has no projects.") + logging.warning(" 2. The Cloud Resource Manager API Terms of Service have not been accepted.") + logging.warning(f"Please ensure the ToS are accepted by visiting: https://console.cloud.google.com/iam-admin/settings?user={email}") + + projects_to_create_count = 0 + if action == 'create': + if len(existing_projects) < 12: + projects_to_create_count = 12 - len(existing_projects) + + tos_helper = TosAcceptanceHelper() db_lock = threading.Lock() with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_project = { - executor.submit(process_project_for_action, project, creds, action, dry_run, db_lock, account_entry): project - for project in projects - } - for future in concurrent.futures.as_completed(future_to_project): - project = future_to_project[future] + futures = [] + # Submit tasks for existing projects + for project in existing_projects: + futures.append(executor.submit(process_project_for_action, project, creds, action, dry_run, db_lock, account_entry, tos_helper)) + + # Submit tasks for new projects + if action == 'create' and projects_to_create_count > 0: + for i in range(len(existing_projects), 12): + project_number = str(i + 1).zfill(2) + futures.append(executor.submit(_create_and_process_new_project, project_number, creds, dry_run, db_lock, account_entry, tos_helper)) + + for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as exc: - logging.error(f"Project {project.project_id} generated an exception: {exc}", exc_info=True) + logging.error(f"A task in the thread pool generated an exception: {exc}", exc_info=True) except google_exceptions.PermissionDenied as err: logging.error(f"Permission denied for account {email}. Check IAM roles.") diff --git a/gemini_key_manager/auth.py b/gemini_key_manager/auth.py index 16fb897..8b48444 100644 --- a/gemini_key_manager/auth.py +++ b/gemini_key_manager/auth.py @@ -1,19 +1,22 @@ -"""Handles Google Cloud authentication, including token refresh and interactive OAuth2 flows.""" +""" +Functions for handling Google Cloud authentication. +""" import os import json import logging import time import google.auth from google.oauth2.credentials import Credentials +import google_auth_oauthlib.flow from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport import requests from . import config +logger = logging.getLogger(__name__) + + def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): - """ - Attempts to load credentials from a token file and refresh them if they are expired. - This function operates non-interactively and will not prompt the user to log in. - """ + """Tries to load and refresh credentials for an email with retries. Does not start interactive flow.""" token_file = os.path.join(config.CREDENTIALS_DIR, f"{email}.json") creds = None if os.path.exists(token_file): @@ -46,14 +49,11 @@ def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): return None def run_interactive_auth(email, max_retries=3, retry_delay=5): - """ - Initiates an interactive, browser-based OAuth2 flow to get new credentials for a user. - The new credentials are then saved to a token file for future non-interactive use. - """ + """Runs the interactive OAuth2 flow for a given email with retries.""" for attempt in range(max_retries): try: logging.info(f"Please authenticate with: {email} (attempt {attempt + 1}/{max_retries})") - flow = InstalledAppFlow.from_client_secrets_file( + flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file( config.CLIENT_SECRETS_FILE, config.SCOPES ) creds = flow.run_local_server(port=0) diff --git a/gemini_key_manager/config.py b/gemini_key_manager/config.py index 496bf71..680b77e 100644 --- a/gemini_key_manager/config.py +++ b/gemini_key_manager/config.py @@ -1,20 +1,22 @@ -"""This module defines configuration constants used throughout the application.""" +""" +Configuration constants for the Gemini Key Management script. +""" import os -# --- Directory Paths --- +# --- DIRECTORIES --- CREDENTIALS_DIR = "credentials" LOG_DIR = "logs" SCHEMA_DIR = "schemas" -# --- File Names --- +# --- FILENAMES --- EMAILS_FILE = "emails.txt" CLIENT_SECRETS_FILE = "credentials.json" API_KEYS_DATABASE_FILE = "api_keys_database.json" -# --- Schema Configuration --- +# --- SCHEMA --- API_KEYS_SCHEMA_FILE = os.path.join(SCHEMA_DIR, "v1", "api_keys_database.schema.json") -# --- Google API Settings --- +# --- GOOGLE API --- SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", ] diff --git a/gemini_key_manager/database.py b/gemini_key_manager/database.py index 34f34a8..95323c4 100644 --- a/gemini_key_manager/database.py +++ b/gemini_key_manager/database.py @@ -1,4 +1,6 @@ -"""This module handles all interactions with the local JSON database.""" +""" +Functions for managing the JSON database of API keys. +""" import os import json import logging @@ -8,7 +10,7 @@ import jsonschema from . import config def load_schema(filename): - """Loads and parses a JSON schema file.""" + """Loads a JSON schema from a file.""" if not os.path.exists(filename): logging.error(f"Schema file not found at '{filename}'") sys.exit(1) @@ -20,10 +22,7 @@ def load_schema(filename): sys.exit(1) def load_keys_database(filename, schema): - """ - Loads the API keys database from a JSON file. - If the file doesn't exist, is empty, or invalid, it returns a new, empty database structure. - """ + """Loads and validates the JSON database of API keys.""" if not os.path.exists(filename): return { "schema_version": "1.0.0", @@ -45,7 +44,7 @@ def load_keys_database(filename, schema): } def save_keys_to_json(data, filename, schema): - """Saves the provided data structure to a JSON file after validating it against the schema.""" + """Validates and saves the API key data to a single JSON file.""" now = datetime.now(timezone.utc).isoformat() data["generation_timestamp_utc"] = data.get("generation_timestamp_utc", now) data["last_modified_utc"] = now @@ -60,10 +59,7 @@ def save_keys_to_json(data, filename, schema): sys.exit(1) def add_key_to_database(account_entry, project, key_object): - """ - Adds a new API key to the database under the appropriate account and project. - If the project does not exist for the account, it will be created. - """ + """Adds a new API key's details to the data structure.""" project_id = project.project_id project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) @@ -107,7 +103,7 @@ def add_key_to_database(account_entry, project, key_object): logging.warning(f" Key {key_object.uid} already exists in local database for project {project_id}") def remove_keys_from_database(account_entry, project_id, deleted_keys_uids): - """Removes a list of API keys from a project's entry in the database.""" + """Removes deleted API keys from the data structure.""" project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) if not project_entry: return @@ -120,4 +116,4 @@ def remove_keys_from_database(account_entry, project_id, deleted_keys_uids): final_key_count = len(project_entry["api_keys"]) num_removed = initial_key_count - final_key_count if num_removed > 0: - logging.info(f" Removed {num_removed} key(s) from local database for project {project_id}") + logging.info(f" Removed {num_removed} key(s) from local database for project {project_id}") \ No newline at end of file diff --git a/gemini_key_manager/exceptions.py b/gemini_key_manager/exceptions.py new file mode 100644 index 0000000..3bb45c2 --- /dev/null +++ b/gemini_key_manager/exceptions.py @@ -0,0 +1,10 @@ +""" +Custom exceptions for the Gemini Key Management script. +""" + +class TermsOfServiceNotAcceptedError(Exception): + """Raised when the Terms of Service for a required API have not been accepted.""" + def __init__(self, message, url): + self.message = message + self.url = url + super().__init__(self.message) diff --git a/gemini_key_manager/gcp_api.py b/gemini_key_manager/gcp_api.py index 5275aea..0e08337 100644 --- a/gemini_key_manager/gcp_api.py +++ b/gemini_key_manager/gcp_api.py @@ -1,14 +1,16 @@ -"""This module contains functions for interacting with various Google Cloud Platform APIs.""" +""" +Functions for interacting with Google Cloud Platform APIs. +""" import logging import time import concurrent.futures from datetime import datetime, timezone from google.cloud import resourcemanager_v3, service_usage_v1, api_keys_v2 from google.api_core import exceptions as google_exceptions -from . import config, utils +from . import config, utils, exceptions def enable_api(project_id, credentials, dry_run=False): - """Enables the Generative Language API for a given project.""" + """Enables the Generative Language API.""" service_name = config.GENERATIVE_LANGUAGE_API service_path = f"projects/{project_id}/services/{service_name}" service_usage_client = service_usage_v1.ServiceUsageClient(credentials=credentials) @@ -21,7 +23,7 @@ def enable_api(project_id, credentials, dry_run=False): enable_request = service_usage_v1.EnableServiceRequest(name=service_path) operation = service_usage_client.enable_service(request=enable_request) - # This is a long-running operation, so we wait for it to complete. + # Wait for the operation to complete. operation.result() logging.info(f" Successfully enabled Generative Language API for project {project_id}") return True @@ -30,17 +32,20 @@ def enable_api(project_id, credentials, dry_run=False): logging.warning(f" Permission denied to enable API for project {project_id}. Skipping.") return False except google_exceptions.GoogleAPICallError as err: + if 'UREQ_TOS_NOT_ACCEPTED' in str(err): + tos_url = "https://console.developers.google.com/terms/generative-language-api" + raise exceptions.TermsOfServiceNotAcceptedError( + f"Terms of Service for the Generative Language API have not been accepted for project {project_id}.", + url=tos_url + ) logging.error(f" Error enabling API for project {project_id}: {err}") return False def create_api_key(project_id, credentials, dry_run=False): - """ - Creates a new API key in the specified project. - The key is restricted to only allow access to the Generative Language API. - """ + """Creates a new, restricted API key.""" if dry_run: logging.info(f" [DRY RUN] Would create API key for project {project_id}") - # In a dry run, return a mock key object to allow the rest of the logic to proceed. + # Return a mock key object for dry run return api_keys_v2.Key( name=f"projects/{project_id}/locations/global/keys/mock-key-id", uid="mock-key-id", @@ -79,7 +84,7 @@ def create_api_key(project_id, credentials, dry_run=False): return None def delete_api_keys(project_id, credentials, dry_run=False): - """Deletes all API keys with the configured display name from a project.""" + """Deletes all API keys with the display name 'Gemini API Key' and returns their UIDs.""" deleted_keys_uids = [] try: api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) @@ -113,80 +118,3 @@ def delete_api_keys(project_id, credentials, dry_run=False): logging.error(f" An API error occurred while deleting keys for project {project_id}: {err}") return [] - - -def _create_single_project(project_number, creds, dry_run, timeout_seconds=300, initial_delay=5): - """ - Creates a new GCP project and waits for it to be ready. - Readiness is determined by successfully enabling the Generative Language API. - """ - random_string = utils.generate_random_string() - project_id = f"project{project_number}-{random_string}" - display_name = f"Project{project_number}" - - logging.info(f"Attempting to create project: ID='{project_id}', Name='{display_name}'") - - if dry_run: - logging.info(f"[DRY RUN] Would create project '{display_name}' with ID '{project_id}'.") - return None - - try: - resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) - project_to_create = resourcemanager_v3.Project( - project_id=project_id, - display_name=display_name - ) - operation = resource_manager.create_project(project=project_to_create) - logging.info(f"Waiting for project creation operation for '{display_name}' to complete...") - created_project = operation.result() - logging.info(f"Successfully initiated creation for project '{display_name}'.") - - # After creation, there can be a delay before the project is fully available - # for API enablement. This loop polls until the API can be enabled. - start_time = time.time() - delay = initial_delay - while time.time() - start_time < timeout_seconds: - if enable_api(project_id, creds): - logging.info(f"Generative AI API enabled for project '{display_name}' ({project_id}). Project is ready.") - return created_project - else: - logging.info(f"Waiting for project '{display_name}' ({project_id}) to become ready... Retrying in {delay} seconds.") - time.sleep(delay) - delay = min(delay * 2, 30) - - logging.error(f"Timed out waiting for project '{display_name}' ({project_id}) to become ready after {timeout_seconds} seconds.") - return None - - except Exception as e: - logging.error(f"Failed to create project '{display_name}': {e}") - return None - -def create_projects_if_needed(projects, creds, dry_run=False, max_workers=5): - """Creates new GCP projects in parallel until the account has at least 12 projects.""" - existing_project_count = len(projects) - logging.info(f"Found {existing_project_count} existing projects.") - newly_created_projects = [] - - if existing_project_count >= 12: - logging.info("Account already has 12 or more projects. No new projects will be created.") - return newly_created_projects - - projects_to_create_count = 12 - existing_project_count - logging.info(f"Need to create {projects_to_create_count} more projects.") - - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_project_number = { - executor.submit(_create_single_project, str(i + 1).zfill(2), creds, dry_run): i - for i in range(existing_project_count, 12) - } - - for future in concurrent.futures.as_completed(future_to_project_number): - try: - created_project = future.result() - if created_project: - newly_created_projects.append(created_project) - except Exception as exc: - project_number = future_to_project_number[future] - logging.error(f"Project number {project_number} generated an exception: {exc}", exc_info=True) - - return newly_created_projects diff --git a/gemini_key_manager/main.py b/gemini_key_manager/main.py index 5398500..53a4110 100644 --- a/gemini_key_manager/main.py +++ b/gemini_key_manager/main.py @@ -1,4 +1,6 @@ -"""This is the main entry point for the Gemini Key Management command-line tool.""" +""" +Main entry point for the Gemini Key Management script. +""" import argparse import logging import sys @@ -7,7 +9,7 @@ import concurrent.futures from . import utils, config, auth, database, actions def main(): - """Parses command-line arguments and orchestrates the key management process.""" + """Main function to orchestrate API key creation or deletion.""" parser = argparse.ArgumentParser(description="Manage Gemini API keys in Google Cloud projects.") parser.add_argument("action", choices=['create', 'delete'], help="The action to perform: 'create' or 'delete' API keys.") parser.add_argument("--email", help="Specify a single email address to process. Required for 'delete'. If not provided for 'create', emails will be read from emails.txt.") @@ -40,7 +42,7 @@ def main(): elif args.action == 'delete': logging.error("The 'delete' action requires the --email argument to specify which account's keys to delete.") sys.exit(1) - else: # 'create' action without a specific email + else: # action is 'create' and no email provided emails_to_process = utils.load_emails_from_file(config.EMAILS_FILE) if not emails_to_process: logging.info("No emails found in emails.txt. Exiting.") diff --git a/gemini_key_manager/utils.py b/gemini_key_manager/utils.py index 9121ee1..a9062b6 100644 --- a/gemini_key_manager/utils.py +++ b/gemini_key_manager/utils.py @@ -1,4 +1,6 @@ -"""This module provides utility functions for logging, file handling, and string generation.""" +""" +Utility functions for the Gemini Key Management script. +""" import logging import os import sys @@ -9,7 +11,7 @@ from colorama import Fore, Style, init from . import config class ColoredFormatter(logging.Formatter): - """A logging formatter that adds color to console output for different log levels.""" + """A custom logging formatter that adds color to console output.""" LOG_COLORS = { logging.DEBUG: Fore.CYAN, @@ -20,11 +22,11 @@ class ColoredFormatter(logging.Formatter): } def format(self, record): - """Applies color to the formatted log message.""" + """Formats the log record with appropriate colors.""" color = self.LOG_COLORS.get(record.levelno) message = super().format(record) if color: - # For better readability, only color the message part of the log string. + # Only color the message part for readability parts = message.split(" - ", 2) if len(parts) > 2: parts[2] = color + parts[2] + Style.RESET_ALL @@ -34,11 +36,8 @@ class ColoredFormatter(logging.Formatter): return message def setup_logging(): - """ - Configures the root logger to output to both a timestamped file and the console. - Console output is colorized for readability. - """ - init(autoreset=True) # Required for colorama on Windows + """Sets up logging to both console and a file, with colors for the console.""" + init(autoreset=True) # Initialize Colorama if not os.path.exists(config.LOG_DIR): os.makedirs(config.LOG_DIR) @@ -49,11 +48,11 @@ def setup_logging(): logger = logging.getLogger() logger.setLevel(logging.INFO) - # Avoids duplicate log messages if the function is called multiple times. + # Clear existing handlers to avoid duplicate logs if logger.hasHandlers(): logger.handlers.clear() - # The file handler logs detailed, non-colored messages. + # File handler for detailed, non-colored logging file_handler = logging.FileHandler(log_filepath, encoding='utf-8') file_formatter = logging.Formatter( "%(asctime)s - %(levelname)s - [%(name)s:%(module)s:%(lineno)d] - %(message)s" @@ -61,7 +60,7 @@ def setup_logging(): file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) - # The console handler logs concise, colored messages. + # Console handler for concise, colored logging console_handler = logging.StreamHandler(sys.stdout) console_formatter = ColoredFormatter("%(asctime)s - %(levelname)s - %(message)s") console_handler.setFormatter(console_formatter) @@ -70,18 +69,16 @@ def setup_logging(): logging.info(f"Logging initialized. Log file: {log_filepath}") def load_emails_from_file(filename): - """ - Reads a list of email addresses from a text file. - It ignores empty lines and lines that start with a '#' comment character. - """ + """Loads a list of emails from a text file, ignoring comments.""" if not os.path.exists(filename): logging.error(f"Email file not found at '{filename}'") logging.info("Please create it and add one email address per line.") return [] with open(filename, "r") as f: + # Ignore empty lines and lines starting with # return [line.strip() for line in f if line.strip() and not line.startswith("#")] def generate_random_string(length=10): - """Generates a random alphanumeric string for creating unique project IDs.""" + """Generates a random alphanumeric string of a given length.""" letters_and_digits = string.ascii_lowercase + string.digits return ''.join(random.choice(letters_and_digits) for i in range(length))