diff --git a/gemini_key_manager/actions.py b/gemini_key_manager/actions.py index 7178c9f..8aba7cd 100644 --- a/gemini_key_manager/actions.py +++ b/gemini_key_manager/actions.py @@ -6,9 +6,9 @@ This module handles: - Thread-safe database interactions - Interactive Terms of Service acceptance workflows """ + import logging import threading -import time import concurrent.futures from datetime import datetime, timezone from google.api_core import exceptions as google_exceptions @@ -16,9 +16,10 @@ from google.cloud import resourcemanager_v3, api_keys_v2 from . import config, gcp_api, database, utils from .exceptions import TermsOfServiceNotAcceptedError + class TempKey: """Mock key object compatible with database operations. - + Provides a temporary representation of an API key for database insertion when direct API key string retrieval is not possible. @@ -31,6 +32,7 @@ class TempKey: update_time (datetime): Last update timestamp restrictions (api_keys_v2.Restrictions): Key usage restrictions """ + def __init__(self, cloud_key, key_string): self.key_string = key_string self.uid = cloud_key.uid @@ -40,9 +42,10 @@ class TempKey: self.update_time = cloud_key.update_time self.restrictions = cloud_key.restrictions + class TosAcceptanceHelper: """Manages Terms of Service acceptance workflow with thread synchronization. - + Coordinates interactive ToS acceptance across multiple threads to prevent duplicate prompts and ensure proper sequencing. @@ -51,14 +54,16 @@ class TosAcceptanceHelper: prompted_event (threading.Event): Signals ToS acceptance completion prompt_in_progress (bool): Indicates active prompt display status """ + def __init__(self): self.lock = threading.Lock() self.prompted_event = threading.Event() self.prompt_in_progress = False + def _enable_api_with_interactive_retry(project_id, creds, dry_run, tos_helper): """Attempts to enable API with retry logic for ToS acceptance. - + Args: project_id (str): Target GCP project ID creds (Credentials: Authenticated Google credentials @@ -67,7 +72,7 @@ def _enable_api_with_interactive_retry(project_id, creds, dry_run, tos_helper): Returns: bool: True if API enabled successfully - + Raises: GoogleAPICallError: For non-ToS related API failures """ @@ -89,16 +94,17 @@ def _enable_api_with_interactive_retry(project_id, creds, dry_run, tos_helper): logging.error(f"API enablement error for {project_id}: {e}", exc_info=True) return False + def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): """Reconciles cloud and local database API key states. - + Args: project (Project): GCP Project resource creds (Credentials): Authenticated credentials dry_run (bool): Simulation mode flag db_lock (threading.Lock): Database access lock account_entry (dict): Account data structure - + Returns: bool: True if Gemini key exists, False otherwise """ @@ -109,30 +115,43 @@ def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): try: api_keys_client = api_keys_v2.ApiKeysClient(credentials=creds) parent = f"projects/{project_id}/locations/global" - + cloud_keys_list = list(api_keys_client.list_keys(parent=parent)) for key in cloud_keys_list: - if key.display_name in [config.GEMINI_API_KEY_DISPLAY_NAME, config.GENERATIVE_LANGUAGE_API_KEY_DISPLAY_NAME]: + if key.display_name in [ + config.GEMINI_API_KEY_DISPLAY_NAME, + config.GENERATIVE_LANGUAGE_API_KEY_DISPLAY_NAME, + ]: gemini_key_exists = True - + cloud_keys = {key.uid: key for key in cloud_keys_list} - - project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) - + + project_entry = next( + ( + p + for p in account_entry["projects"] + if p.get("project_info", {}).get("project_id") == project_id + ), + None, + ) + if not project_entry: project_entry = { "project_info": { "project_id": project.project_id, "project_name": project.display_name, - "project_number": project.name.split('/')[-1], - "state": str(project.state) + "project_number": project.name.split("/")[-1], + "state": str(project.state), }, - "api_keys": [] + "api_keys": [], } with db_lock: account_entry["projects"].append(project_entry) - - local_keys = {key['key_details']['key_id']: key for key in project_entry.get('api_keys', [])} + + local_keys = { + key["key_details"]["key_id"]: key + for key in project_entry.get("api_keys", []) + } cloud_uids = set(cloud_keys.keys()) local_uids = set(local_keys.keys()) @@ -150,9 +169,11 @@ def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): if dry_run: logging.info(f"[DRY RUN] Would fetch key string for {uid}") continue - + try: - key_string_response = api_keys_client.get_key_string(name=key_object.name) + key_string_response = api_keys_client.get_key_string( + name=key_object.name + ) hydrated_key = TempKey(key_object, key_string_response.key_string) with db_lock: database.add_key_to_database(account_entry, project, hydrated_key) @@ -166,11 +187,13 @@ def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): if dry_run: logging.info(f"[DRY RUN] Would deactivate {uid}") continue - + with db_lock: - local_keys[uid]['state'] = 'INACTIVE' - local_keys[uid]['key_details']['last_updated_timestamp_utc'] = datetime.now(timezone.utc).isoformat() - + local_keys[uid]["state"] = "INACTIVE" + local_keys[uid]["key_details"]["last_updated_timestamp_utc"] = ( + datetime.now(timezone.utc).isoformat() + ) + return gemini_key_exists except google_exceptions.PermissionDenied: @@ -180,9 +203,12 @@ def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): logging.error(f"API error during reconciliation: {err}") return False -def _create_and_process_new_project(project_number, creds, dry_run, db_lock, account_entry, tos_helper): + +def _create_and_process_new_project( + project_number, creds, dry_run, db_lock, account_entry, tos_helper +): """Creates and initializes new GCP project with API key. - + Args: project_number (str): Sequential project identifier creds (Credentials): Authenticated credentials @@ -194,7 +220,7 @@ def _create_and_process_new_project(project_number, creds, dry_run, db_lock, acc random_string = utils.generate_random_string() project_id = f"project{project_number}-{random_string}" display_name = f"Project{project_number}" - + logging.info(f"Creating project: {display_name} ({project_id})") if dry_run: @@ -203,7 +229,9 @@ def _create_and_process_new_project(project_number, creds, dry_run, db_lock, acc try: resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) - project_to_create = resourcemanager_v3.Project(project_id=project_id, display_name=display_name) + project_to_create = resourcemanager_v3.Project( + project_id=project_id, display_name=display_name + ) operation = resource_manager.create_project(project=project_to_create) logging.info(f"Awaiting project creation: {display_name}") created_project = operation.result() @@ -214,16 +242,21 @@ def _create_and_process_new_project(project_number, creds, dry_run, db_lock, acc key_object = gcp_api.create_api_key(project_id, creds, dry_run=dry_run) if key_object: with db_lock: - database.add_key_to_database(account_entry, created_project, key_object) + database.add_key_to_database( + account_entry, created_project, key_object + ) else: logging.error(f"API enablement failed for {display_name}") except Exception as e: logging.error(f"Project creation failed: {e}", exc_info=True) -def process_project_for_action(project, creds, action, dry_run, db_lock, account_entry, tos_helper): + +def process_project_for_action( + project, creds, action, dry_run, db_lock, account_entry, tos_helper +): """Executes specified action on a single GCP project. - + Args: project (Project): Target GCP project creds (Credentials): Authenticated credentials @@ -236,8 +269,10 @@ def process_project_for_action(project, creds, action, dry_run, db_lock, account project_id = project.project_id logging.info(f"Processing {project_id} ({project.display_name})") - if action == 'create': - gemini_key_exists = reconcile_project_keys(project, creds, dry_run, db_lock, account_entry) + if action == "create": + gemini_key_exists = reconcile_project_keys( + project, creds, dry_run, db_lock, account_entry + ) if gemini_key_exists: logging.info(f"Existing Gemini key in {project_id}") return @@ -247,17 +282,22 @@ def process_project_for_action(project, creds, action, dry_run, db_lock, account if key_object: with db_lock: database.add_key_to_database(account_entry, project, key_object) - elif action == 'delete': + elif action == "delete": deleted_keys_uids = gcp_api.delete_api_keys(project_id, creds, dry_run=dry_run) if deleted_keys_uids: with db_lock: - database.remove_keys_from_database(account_entry, project_id, deleted_keys_uids) - + database.remove_keys_from_database( + account_entry, project_id, deleted_keys_uids + ) + logging.info(f"Completed processing {project_id}") -def process_account(email, creds, action, api_keys_data, schema, dry_run=False, max_workers=5): + +def process_account( + email, creds, action, api_keys_data, schema, dry_run=False, max_workers=5 +): """Orchestrates account-level key management operations. - + Args: email (str): Service account email creds (Credentials): Authenticated credentials @@ -275,31 +315,40 @@ def process_account(email, creds, action, api_keys_data, schema, dry_run=False, logging.warning(f"Invalid credentials for {email}") return - account_entry = next((acc for acc in api_keys_data["accounts"] if acc.get("account_details", {}).get("email") == email), None) + account_entry = next( + ( + acc + for acc in api_keys_data["accounts"] + if acc.get("account_details", {}).get("email") == email + ), + None, + ) if not account_entry: account_entry = { "account_details": { "email": email, "authentication_details": { "token_file": f"{config.CREDENTIALS_DIR}/{email}.json", - "scopes": config.SCOPES - } + "scopes": config.SCOPES, + }, }, - "projects": [] + "projects": [], } api_keys_data["accounts"].append(account_entry) try: resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) existing_projects = list(resource_manager.search_projects()) - - if not existing_projects and action == 'create': + + if not existing_projects and action == "create": logging.warning(f"No projects found for {email}") logging.warning("Possible reasons: No projects or unaccepted ToS") - logging.warning(f"Verify ToS: https://console.cloud.google.com/iam-admin/settings?user={email}") + logging.warning( + f"Verify ToS: https://console.cloud.google.com/iam-admin/settings?user={email}" + ) projects_to_create_count = 0 - if action == 'create': + if action == "create": if len(existing_projects) < 12: projects_to_create_count = 12 - len(existing_projects) @@ -309,13 +358,34 @@ def process_account(email, creds, action, api_keys_data, schema, dry_run=False, with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] for project in existing_projects: - futures.append(executor.submit(process_project_for_action, project, creds, action, dry_run, db_lock, account_entry, tos_helper)) + futures.append( + executor.submit( + process_project_for_action, + project, + creds, + action, + dry_run, + db_lock, + account_entry, + tos_helper, + ) + ) - if action == 'create' and projects_to_create_count > 0: + if action == "create" and projects_to_create_count > 0: for i in range(len(existing_projects), 12): project_number = str(i + 1).zfill(2) - futures.append(executor.submit(_create_and_process_new_project, project_number, creds, dry_run, db_lock, account_entry, tos_helper)) - + futures.append( + executor.submit( + _create_and_process_new_project, + project_number, + creds, + dry_run, + db_lock, + account_entry, + tos_helper, + ) + ) + for future in concurrent.futures.as_completed(futures): try: future.result() diff --git a/gemini_key_manager/auth.py b/gemini_key_manager/auth.py index 88620e0..1ec8246 100644 --- a/gemini_key_manager/auth.py +++ b/gemini_key_manager/auth.py @@ -5,15 +5,14 @@ Handles OAuth2 credential management including: - Interactive authentication flows - Credential storage/retrieval """ + import os import json import logging import time -import google.auth from google.oauth2.credentials import Credentials -import google_auth_oauthlib.flow from google_auth_oauthlib.flow import InstalledAppFlow -from google.auth.transport import requests +from google.auth.transport.requests import Request from . import config logger = logging.getLogger(__name__) @@ -21,12 +20,12 @@ logger = logging.getLogger(__name__) def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): """Manages credential lifecycle with automated refresh and retry. - + Args: email (str): Service account email address max_retries (int): Maximum authentication retry attempts retry_delay (int): Seconds between retry attempts - + Returns: Credentials: Valid credentials or None if unrecoverable """ @@ -36,7 +35,9 @@ def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): try: creds = Credentials.from_authorized_user_file(token_file, config.SCOPES) except (ValueError, json.JSONDecodeError): - logging.warning(f"Could not decode token file for {email}. Re-authentication will be required.") + logging.warning( + f"Could not decode token file for {email}. Re-authentication will be required." + ) return None if creds and creds.valid: @@ -45,37 +46,46 @@ def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): if creds and creds.expired and creds.refresh_token: for attempt in range(max_retries): try: - logging.info(f"Refreshing credentials for {email} (attempt {attempt + 1}/{max_retries})...") - creds.refresh(google.auth.transport.requests.Request()) + logging.info( + f"Refreshing credentials for {email} (attempt {attempt + 1}/{max_retries})..." + ) + creds.refresh(Request()) with open(token_file, "w") as token: token.write(creds.to_json()) logging.info(f"Successfully refreshed credentials for {email}.") return creds except Exception as e: - logging.warning(f"Failed to refresh credentials for {email} on attempt {attempt + 1}: {e}") + logging.warning( + f"Failed to refresh credentials for {email} on attempt {attempt + 1}: {e}" + ) if attempt < max_retries - 1: time.sleep(retry_delay) - - logging.error(f"Failed to refresh credentials for {email} after {max_retries} attempts.") + + logging.error( + f"Failed to refresh credentials for {email} after {max_retries} attempts." + ) return None - + return None + def run_interactive_auth(email, max_retries=3, retry_delay=5): """Executes interactive OAuth2 flow with error handling. - + Args: email (str): Target service account email max_retries (int): Allowed authentication attempts retry_delay (int): Pause between failed attempts - + Returns: Credentials: On successful authentication """ for attempt in range(max_retries): try: - logging.info(f"Please authenticate with: {email} (attempt {attempt + 1}/{max_retries})") - flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file( + logging.info( + f"Please authenticate with: {email} (attempt {attempt + 1}/{max_retries})" + ) + flow = InstalledAppFlow.from_client_secrets_file( config.CLIENT_SECRETS_FILE, config.SCOPES ) creds = flow.run_local_server(port=0) @@ -84,7 +94,9 @@ def run_interactive_auth(email, max_retries=3, retry_delay=5): token.write(creds.to_json()) return creds except Exception as e: - logging.error(f"An unexpected error occurred during authentication for {email} on attempt {attempt + 1}: {e}") + logging.error( + f"An unexpected error occurred during authentication for {email} on attempt {attempt + 1}: {e}" + ) if attempt < max_retries - 1: logging.info(f"Retrying authentication in {retry_delay} seconds...") time.sleep(retry_delay) diff --git a/gemini_key_manager/config.py b/gemini_key_manager/config.py index 1350801..7f6a7f1 100644 --- a/gemini_key_manager/config.py +++ b/gemini_key_manager/config.py @@ -5,6 +5,7 @@ Contains: - API endpoint configurations - Security scopes and schema locations """ + import os # --- DIRECTORIES --- diff --git a/gemini_key_manager/database.py b/gemini_key_manager/database.py index c3c20e2..ae8af25 100644 --- a/gemini_key_manager/database.py +++ b/gemini_key_manager/database.py @@ -6,23 +6,24 @@ Implements: - Key lifecycle tracking - Data versioning and backup """ + import os import json import logging import sys from datetime import datetime, timezone import jsonschema -from . import config + def load_schema(filename): """Validates and loads JSON schema definition. - + Args: filename (str): Path to schema file - + Returns: dict: Parsed schema document - + Exits: SystemExit: On invalid schema file """ @@ -36,13 +37,11 @@ def load_schema(filename): logging.error(f"Could not decode JSON schema from {filename}.") sys.exit(1) + def load_keys_database(filename, schema): """Loads and validates the JSON database of API keys.""" if not os.path.exists(filename): - return { - "schema_version": "1.0.0", - "accounts": [] - } + return {"schema_version": "1.0.0", "accounts": []} with open(filename, "r") as f: try: data = json.load(f) @@ -51,12 +50,12 @@ def load_keys_database(filename, schema): except json.JSONDecodeError: logging.warning(f"Could not decode JSON from {filename}. Starting fresh.") except jsonschema.ValidationError as e: - logging.warning(f"Database file '{filename}' is not valid. {e.message}. Starting fresh.") - - return { - "schema_version": "1.0.0", - "accounts": [] - } + logging.warning( + f"Database file '{filename}' is not valid. {e.message}. Starting fresh." + ) + + return {"schema_version": "1.0.0", "accounts": []} + def save_keys_to_json(data, filename, schema): """Validates and saves the API key data to a single JSON file.""" @@ -73,20 +72,28 @@ def save_keys_to_json(data, filename, schema): logging.error(f"Validation Error: {e.message}") sys.exit(1) + def add_key_to_database(account_entry, project, key_object): """Adds a new API key's details to the data structure.""" project_id = project.project_id - project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) + project_entry = next( + ( + p + for p in account_entry["projects"] + if p.get("project_info", {}).get("project_id") == project_id + ), + None, + ) if not project_entry: project_entry = { "project_info": { "project_id": project_id, "project_name": project.display_name, - "project_number": project.name.split('/')[-1], - "state": str(project.state) + "project_number": project.name.split("/")[-1], + "state": str(project.state), }, - "api_keys": [] + "api_keys": [], } account_entry["projects"].append(project_entry) @@ -104,31 +111,51 @@ def add_key_to_database(account_entry, project, key_object): "creation_timestamp_utc": key_object.create_time.isoformat(), "last_updated_timestamp_utc": key_object.update_time.isoformat(), }, - "restrictions": { - "api_targets": api_targets - }, - "state": "ACTIVE" + "restrictions": {"api_targets": api_targets}, + "state": "ACTIVE", } - existing_key = next((k for k in project_entry["api_keys"] if k.get("key_details", {}).get("key_id") == key_object.uid), None) + existing_key = next( + ( + k + for k in project_entry["api_keys"] + if k.get("key_details", {}).get("key_id") == key_object.uid + ), + None, + ) if not existing_key: project_entry["api_keys"].append(new_key_entry) - logging.info(f" Added key {key_object.uid} to local database for project {project_id}") + logging.info( + f" Added key {key_object.uid} to local database for project {project_id}" + ) else: - logging.warning(f" Key {key_object.uid} already exists in local database for project {project_id}") + logging.warning( + f" Key {key_object.uid} already exists in local database for project {project_id}" + ) + def remove_keys_from_database(account_entry, project_id, deleted_keys_uids): """Removes deleted API keys from the data structure.""" - project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) + project_entry = next( + ( + p + for p in account_entry["projects"] + if p.get("project_info", {}).get("project_id") == project_id + ), + None, + ) if not project_entry: return initial_key_count = len(project_entry["api_keys"]) project_entry["api_keys"] = [ - key for key in project_entry["api_keys"] + key + for key in project_entry["api_keys"] if key.get("key_details", {}).get("key_id") not in deleted_keys_uids ] final_key_count = len(project_entry["api_keys"]) num_removed = initial_key_count - final_key_count if num_removed > 0: - logging.info(f" Removed {num_removed} key(s) from local database for project {project_id}") \ No newline at end of file + logging.info( + f" Removed {num_removed} key(s) from local database for project {project_id}" + ) diff --git a/gemini_key_manager/exceptions.py b/gemini_key_manager/exceptions.py index 55b796a..7e3fdda 100644 --- a/gemini_key_manager/exceptions.py +++ b/gemini_key_manager/exceptions.py @@ -6,13 +6,15 @@ Defines domain-specific exceptions for: - API operation constraints """ + class TermsOfServiceNotAcceptedError(Exception): """Indicates unaccepted Terms of Service for critical API operations. - + Attributes: message (str): Human-readable error description url (str): URL for Terms of Service acceptance portal """ + def __init__(self, message, url): self.message = message self.url = url diff --git a/gemini_key_manager/gcp_api.py b/gemini_key_manager/gcp_api.py index d6b7f81..f81679d 100644 --- a/gemini_key_manager/gcp_api.py +++ b/gemini_key_manager/gcp_api.py @@ -1,25 +1,25 @@ """ Functions for interacting with Google Cloud Platform APIs. """ + import logging -import time -import concurrent.futures from datetime import datetime, timezone -from google.cloud import resourcemanager_v3, service_usage_v1, api_keys_v2 +from google.cloud import service_usage_v1, api_keys_v2 from google.api_core import exceptions as google_exceptions -from . import config, utils, exceptions +from . import config, exceptions + def enable_api(project_id, credentials, dry_run=False): """Manages Generative Language API enablement with error handling. - + Args: project_id (str): Target GCP project ID credentials (Credentials): Authenticated credentials dry_run (bool): Simulation mode flag - + Returns: bool: True if enabled successfully - + Raises: TermsOfServiceNotAcceptedError: When required ToS not accepted """ @@ -28,7 +28,9 @@ def enable_api(project_id, credentials, dry_run=False): service_usage_client = service_usage_v1.ServiceUsageClient(credentials=credentials) try: - logging.info(f" Attempting to enable Generative Language API for project {project_id}...") + logging.info( + f" Attempting to enable Generative Language API for project {project_id}..." + ) if dry_run: logging.info(f" [DRY RUN] Would enable API for project {project_id}") return True @@ -37,33 +39,40 @@ def enable_api(project_id, credentials, dry_run=False): operation = service_usage_client.enable_service(request=enable_request) # Wait for the operation to complete. operation.result() - logging.info(f" Successfully enabled Generative Language API for project {project_id}") + logging.info( + f" Successfully enabled Generative Language API for project {project_id}" + ) return True except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to enable API for project {project_id}. Skipping.") + logging.warning( + f" Permission denied to enable API for project {project_id}. Skipping." + ) return False except google_exceptions.GoogleAPICallError as err: - if 'UREQ_TOS_NOT_ACCEPTED' in str(err): - tos_url = "https://console.developers.google.com/terms/generative-language-api" + if "UREQ_TOS_NOT_ACCEPTED" in str(err): + tos_url = ( + "https://console.developers.google.com/terms/generative-language-api" + ) raise exceptions.TermsOfServiceNotAcceptedError( f"Terms of Service for the Generative Language API have not been accepted for project {project_id}.", - url=tos_url + url=tos_url, ) logging.error(f" Error enabling API for project {project_id}: {err}") return False + def create_api_key(project_id, credentials, dry_run=False): """Generates restricted API key with security constraints. - + Args: project_id (str): Target GCP project ID credentials (Credentials): Authenticated credentials dry_run (bool): Simulation mode flag - + Returns: api_keys_v2.Key: Created key object or None on failure - + Raises: PermissionDenied: For insufficient credentials """ @@ -98,15 +107,20 @@ def create_api_key(project_id, credentials, dry_run=False): logging.info(" Creating API key...") operation = api_keys_client.create_key(request=request) result = operation.result() - logging.info(f" Successfully created restricted API key for project {project_id}") + logging.info( + f" Successfully created restricted API key for project {project_id}" + ) return result except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to create API key for project {project_id}. Skipping.") + logging.warning( + f" Permission denied to create API key for project {project_id}. Skipping." + ) return None except google_exceptions.GoogleAPICallError as err: logging.error(f" Error creating API key for project {project_id}: {err}") return None + def delete_api_keys(project_id, credentials, dry_run=False): """Deletes all API keys with the display name 'Gemini API Key' and returns their UIDs.""" deleted_keys_uids = [] @@ -115,13 +129,21 @@ def delete_api_keys(project_id, credentials, dry_run=False): parent = f"projects/{project_id}/locations/global" keys = api_keys_client.list_keys(parent=parent) - keys_to_delete = [key for key in keys if key.display_name == config.GEMINI_API_KEY_DISPLAY_NAME] + keys_to_delete = [ + key + for key in keys + if key.display_name == config.GEMINI_API_KEY_DISPLAY_NAME + ] if not keys_to_delete: - logging.info(f" No '{config.GEMINI_API_KEY_DISPLAY_NAME}' found to delete.") + logging.info( + f" No '{config.GEMINI_API_KEY_DISPLAY_NAME}' found to delete." + ) return [] - logging.info(f" Found {len(keys_to_delete)} key(s) with display name '{config.GEMINI_API_KEY_DISPLAY_NAME}'. Deleting...") + logging.info( + f" Found {len(keys_to_delete)} key(s) with display name '{config.GEMINI_API_KEY_DISPLAY_NAME}'. Deleting..." + ) for key in keys_to_delete: if dry_run: logging.info(f" [DRY RUN] Would delete key: {key.uid}") @@ -137,8 +159,11 @@ def delete_api_keys(project_id, credentials, dry_run=False): logging.error(f" Error deleting key {key.uid}: {err}") return deleted_keys_uids except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to list or delete API keys for project {project_id}. Skipping.") + logging.warning( + f" Permission denied to list or delete API keys for project {project_id}. Skipping." + ) except google_exceptions.GoogleAPICallError as err: - logging.error(f" An API error occurred while deleting keys for project {project_id}: {err}") + logging.error( + f" An API error occurred while deleting keys for project {project_id}: {err}" + ) return [] - diff --git a/gemini_key_manager/main.py b/gemini_key_manager/main.py index b005413..2af2cf7 100644 --- a/gemini_key_manager/main.py +++ b/gemini_key_manager/main.py @@ -1,6 +1,7 @@ """ Main entry point for the Gemini Key Management script. """ + import argparse import logging import sys @@ -8,32 +9,63 @@ import os import concurrent.futures from . import utils, config, auth, database, actions + def main(): """Orchestrates API key lifecycle management workflow. - + Handles: - Command line argument parsing - Credential management - Multi-account processing - Thread pool execution """ - parser = argparse.ArgumentParser(description="Manage Gemini API keys in Google Cloud projects.") - parser.add_argument("action", choices=['create', 'delete'], help="The action to perform: 'create' or 'delete' API keys.") - parser.add_argument("--email", help="Specify a single email address to process. Required for 'delete'. If not provided for 'create', emails will be read from emails.txt.") - parser.add_argument("--dry-run", action="store_true", help="Simulate the run without making any actual changes to Google Cloud resources.") - parser.add_argument("--max-workers", type=int, default=5, help="The maximum number of concurrent projects to process.") - parser.add_argument("--auth-retries", type=int, default=3, help="Number of retries for a failed authentication attempt.") - parser.add_argument("--auth-retry-delay", type=int, default=5, help="Delay in seconds between authentication retries.") + parser = argparse.ArgumentParser( + description="Manage Gemini API keys in Google Cloud projects." + ) + parser.add_argument( + "action", + choices=["create", "delete"], + help="The action to perform: 'create' or 'delete' API keys.", + ) + parser.add_argument( + "--email", + help="Specify a single email address to process. Required for 'delete'. If not provided for 'create', emails will be read from emails.txt.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Simulate the run without making any actual changes to Google Cloud resources.", + ) + parser.add_argument( + "--max-workers", + type=int, + default=5, + help="The maximum number of concurrent projects to process.", + ) + parser.add_argument( + "--auth-retries", + type=int, + default=3, + help="Number of retries for a failed authentication attempt.", + ) + parser.add_argument( + "--auth-retry-delay", + type=int, + default=5, + help="Delay in seconds between authentication retries.", + ) args = parser.parse_args() utils.setup_logging() logging.info(f"Program arguments: {vars(args)}") - if args.action == 'delete' and not args.email: + if args.action == "delete" and not args.email: parser.error("the --email argument is required for the 'delete' action") if not os.path.exists(config.CLIENT_SECRETS_FILE): - logging.error(f"OAuth client secrets file not found at '{config.CLIENT_SECRETS_FILE}'") + logging.error( + f"OAuth client secrets file not found at '{config.CLIENT_SECRETS_FILE}'" + ) logging.error("Please follow the setup instructions in README.md to create it.") sys.exit(1) @@ -46,10 +78,12 @@ def main(): emails_to_process = [] if args.email: emails_to_process.append(args.email) - elif args.action == 'delete': - logging.error("The 'delete' action requires the --email argument to specify which account's keys to delete.") + elif args.action == "delete": + logging.error( + "The 'delete' action requires the --email argument to specify which account's keys to delete." + ) sys.exit(1) - else: # action is 'create' and no email provided + else: # action is 'create' and no email provided emails_to_process = utils.load_emails_from_file(config.EMAILS_FILE) if not emails_to_process: logging.info("No emails found in emails.txt. Exiting.") @@ -59,10 +93,20 @@ def main(): emails_needing_interactive_auth = [] logging.info("Checking credentials and refreshing tokens for all accounts...") - - with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor: - future_to_email = {executor.submit(auth.get_and_refresh_credentials, email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay): email for email in emails_to_process} - + + with concurrent.futures.ThreadPoolExecutor( + max_workers=args.max_workers + ) as executor: + future_to_email = { + executor.submit( + auth.get_and_refresh_credentials, + email, + max_retries=args.auth_retries, + retry_delay=args.auth_retry_delay, + ): email + for email in emails_to_process + } + for future in concurrent.futures.as_completed(future_to_email): email = future_to_email[future] try: @@ -72,25 +116,44 @@ def main(): else: emails_needing_interactive_auth.append(email) except Exception as exc: - logging.error(f"Credential check for {email} generated an exception: {exc}", exc_info=True) + logging.error( + f"Credential check for {email} generated an exception: {exc}", + exc_info=True, + ) emails_needing_interactive_auth.append(email) if emails_needing_interactive_auth: - logging.info(f"\n--- INTERACTIVE AUTHENTICATION REQUIRED ---") - logging.info(f"The following accounts require manual authentication: {', '.join(sorted(emails_needing_interactive_auth))}") - + logging.info("\n--- INTERACTIVE AUTHENTICATION REQUIRED ---") + logging.info( + f"The following accounts require manual authentication: {', '.join(sorted(emails_needing_interactive_auth))}" + ) + for email in sorted(emails_needing_interactive_auth): - creds = auth.run_interactive_auth(email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay) + creds = auth.run_interactive_auth( + email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay + ) if creds: logging.info(f"Successfully authenticated {email}.") creds_map[email] = creds else: - logging.warning(f"Authentication failed or was cancelled for {email}. This account will be skipped.") - + logging.warning( + f"Authentication failed or was cancelled for {email}. This account will be skipped." + ) + logging.info("\n--- Credential checking complete ---") for email in emails_to_process: if email in creds_map: - actions.process_account(email, creds_map[email], args.action, api_keys_data, schema, dry_run=args.dry_run, max_workers=args.max_workers) + actions.process_account( + email, + creds_map[email], + args.action, + api_keys_data, + schema, + dry_run=args.dry_run, + max_workers=args.max_workers, + ) else: - logging.warning(f"Skipping account {email} because authentication was not successful.") + logging.warning( + f"Skipping account {email} because authentication was not successful." + )