From 00c7c9f455339dec1fbb61c9e6a81ab48c3d5ea0 Mon Sep 17 00:00:00 2001 From: not-lucky Date: Sat, 23 Aug 2025 13:01:30 +0530 Subject: [PATCH] modularized the program; oops --- .gitignore | 5 +- gemini_key_manager/__init__.py | 1 + gemini_key_manager/actions.py | 203 +++++++++ gemini_key_manager/auth.py | 71 ++++ gemini_key_manager/config.py | 23 ++ gemini_key_manager/database.py | 123 ++++++ gemini_key_manager/gcp_api.py | 192 +++++++++ gemini_key_manager/main.py | 90 ++++ gemini_key_manager/utils.py | 87 ++++ main.py | 733 +-------------------------------- 10 files changed, 801 insertions(+), 727 deletions(-) create mode 100644 gemini_key_manager/__init__.py create mode 100644 gemini_key_manager/actions.py create mode 100644 gemini_key_manager/auth.py create mode 100644 gemini_key_manager/config.py create mode 100644 gemini_key_manager/database.py create mode 100644 gemini_key_manager/gcp_api.py create mode 100644 gemini_key_manager/main.py create mode 100644 gemini_key_manager/utils.py diff --git a/.gitignore b/.gitignore index 1a59d40..0ebf97c 100644 --- a/.gitignore +++ b/.gitignore @@ -7,9 +7,10 @@ __pycache__/ # Credentials and Secrets credentials/ -credentials.json +credentials* *.key emails.txt *.keys.txt api_keys_database.json -logs/ \ No newline at end of file +logs/ +gemini_key_manager copy/ \ No newline at end of file diff --git a/gemini_key_manager/__init__.py b/gemini_key_manager/__init__.py new file mode 100644 index 0000000..6166aa8 --- /dev/null +++ b/gemini_key_manager/__init__.py @@ -0,0 +1 @@ +"""Initializes the gemini_key_manager package.""" diff --git a/gemini_key_manager/actions.py b/gemini_key_manager/actions.py new file mode 100644 index 0000000..afbd85e --- /dev/null +++ b/gemini_key_manager/actions.py @@ -0,0 +1,203 @@ +"""This module contains the core functions that perform actions on GCP projects.""" +import os +import logging +import threading +import concurrent.futures +from datetime import datetime, timezone +from google.api_core import exceptions as google_exceptions +from google.cloud import resourcemanager_v3, api_keys_v2 +from . import config, gcp_api, database + +class TempKey: + """A temporary container for key data to ensure compatibility with database functions.""" + def __init__(self, cloud_key, key_string): + self.key_string = key_string + self.uid = cloud_key.uid + self.name = cloud_key.name + self.display_name = cloud_key.display_name + self.create_time = cloud_key.create_time + self.update_time = cloud_key.update_time + self.restrictions = cloud_key.restrictions + +def reconcile_project_keys(project, creds, dry_run, db_lock, account_entry): + """ + Compares the API keys in a GCP project with the local database and syncs them. + + This function will: + 1. Fetch all keys from the GCP project. + 2. Fetch all keys for the project from the local database. + 3. Add keys that only exist in GCP to the local database. + 4. Mark keys as INACTIVE in the local database if they no longer exist in GCP. + + Returns: + bool: True if a Gemini-specific API key already exists in the project, False otherwise. + """ + project_id = project.project_id + logging.info(f" Reconciling keys for project {project_id}") + gemini_key_exists = False + + try: + api_keys_client = api_keys_v2.ApiKeysClient(credentials=creds) + parent = f"projects/{project_id}/locations/global" + + cloud_keys_list = list(api_keys_client.list_keys(parent=parent)) + for key in cloud_keys_list: + if key.display_name in [config.GEMINI_API_KEY_DISPLAY_NAME, config.GENERATIVE_LANGUAGE_API_KEY_DISPLAY_NAME]: + gemini_key_exists = True + + cloud_keys = {key.uid: key for key in cloud_keys_list} + + project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) + + if not project_entry: + # If the project is not yet in our database, create a new entry for it. + project_entry = { + "project_info": { + "project_id": project.project_id, + "project_name": project.display_name, + "project_number": project.name.split('/')[-1], + "state": str(project.state) + }, + "api_keys": [] + } + with db_lock: + account_entry["projects"].append(project_entry) + + local_keys = {key['key_details']['key_id']: key for key in project_entry.get('api_keys', [])} + + cloud_uids = set(cloud_keys.keys()) + local_uids = set(local_keys.keys()) + + synced_uids = cloud_uids.intersection(local_uids) + cloud_only_uids = cloud_uids - local_uids + local_only_uids = local_uids - cloud_uids + + for uid in synced_uids: + logging.info(f" Key {uid} is synchronized.") + + for uid in cloud_only_uids: + key_object = cloud_keys[uid] + logging.info(f" Key {uid} ({key_object.display_name}) found in cloud only. Adding to local database.") + if dry_run: + logging.info(f" [DRY RUN] Would fetch key string for {uid} and add to database.") + continue + + try: + # The key object from the list_keys method does not include the key string. + # A separate API call is required to fetch the unencrypted key string. + key_string_response = api_keys_client.get_key_string(name=key_object.name) + + hydrated_key = TempKey(key_object, key_string_response.key_string) + + with db_lock: + database.add_key_to_database(account_entry, project, hydrated_key) + + except google_exceptions.PermissionDenied: + logging.warning(f" Permission denied to get key string for {uid}. Skipping.") + except google_exceptions.GoogleAPICallError as err: + logging.error(f" Error getting key string for {uid}: {err}") + + for uid in local_only_uids: + logging.info(f" Key {uid} found in local database only. Marking as INACTIVE.") + if dry_run: + logging.info(f" [DRY RUN] Would mark key {uid} as INACTIVE.") + continue + + with db_lock: + local_keys[uid]['state'] = 'INACTIVE' + local_keys[uid]['key_details']['last_updated_timestamp_utc'] = datetime.now(timezone.utc).isoformat() + + return gemini_key_exists + + except google_exceptions.PermissionDenied: + logging.warning(f" Permission denied to list keys for project {project_id}. Skipping reconciliation.") + return False + except google_exceptions.GoogleAPICallError as err: + logging.error(f" An API error occurred while reconciling keys for project {project_id}: {err}") + return False + +def process_project_for_action(project, creds, action, dry_run, db_lock, account_entry): + """Coordinates the sequence of operations for a single project based on the specified action.""" + project_id = project.project_id + logging.info(f"- Starting to process project: {project_id} ({project.display_name})") + + if action == 'create': + gemini_key_exists = reconcile_project_keys(project, creds, dry_run, db_lock, account_entry) + if gemini_key_exists: + logging.info(f" '{config.GEMINI_API_KEY_DISPLAY_NAME}' already exists in project {project_id}. Skipping creation.") + return + + if gcp_api.enable_api(project_id, creds, dry_run=dry_run): + key_object = gcp_api.create_api_key(project_id, creds, dry_run=dry_run) + if key_object: + with db_lock: + database.add_key_to_database(account_entry, project, key_object) + elif action == 'delete': + deleted_keys_uids = gcp_api.delete_api_keys(project_id, creds, dry_run=dry_run) + if deleted_keys_uids: + with db_lock: + database.remove_keys_from_database(account_entry, project_id, deleted_keys_uids) + logging.info(f"- Finished processing project: {project_id}") + +def process_account(email, creds, action, api_keys_data, dry_run=False, max_workers=5): + """ + Orchestrates the entire process for a single user account. + + This includes finding all accessible projects and then running the specified + action ('create' or 'delete') on each project concurrently. + """ + logging.info(f"--- Processing account: {email} for action: {action} ---") + if dry_run: + logging.info("*** DRY RUN MODE ENABLED ***") + + if not creds: + logging.warning(f"Could not get credentials for {email}. Skipping.") + return + + account_entry = next((acc for acc in api_keys_data["accounts"] if acc.get("account_details", {}).get("email") == email), None) + if not account_entry: + account_entry = { + "account_details": { + "email": email, + "authentication_details": { + "token_file": os.path.join(config.CREDENTIALS_DIR, f"{email}.json"), + "scopes": config.SCOPES + } + }, + "projects": [] + } + api_keys_data["accounts"].append(account_entry) + + try: + resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) + projects = list(resource_manager.search_projects()) + + if action == 'create': + new_projects = gcp_api.create_projects_if_needed(projects, creds, dry_run) + projects.extend(new_projects) + + if not projects: + logging.info(f"No projects found for {email}.") + return + + logging.info(f"Found {len(projects)} projects. Processing with up to {max_workers} workers...") + + db_lock = threading.Lock() + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_project = { + executor.submit(process_project_for_action, project, creds, action, dry_run, db_lock, account_entry): project + for project in projects + } + for future in concurrent.futures.as_completed(future_to_project): + project = future_to_project[future] + try: + future.result() + except Exception as exc: + logging.error(f"Project {project.project_id} generated an exception: {exc}", exc_info=True) + + except google_exceptions.PermissionDenied as err: + logging.error(f"Permission denied for account {email}. Check IAM roles.") + logging.error(f" Error: {err}") + except google_exceptions.GoogleAPICallError as err: + logging.error(f"An API error occurred while processing account {email}: {err}") diff --git a/gemini_key_manager/auth.py b/gemini_key_manager/auth.py new file mode 100644 index 0000000..16fb897 --- /dev/null +++ b/gemini_key_manager/auth.py @@ -0,0 +1,71 @@ +"""Handles Google Cloud authentication, including token refresh and interactive OAuth2 flows.""" +import os +import json +import logging +import time +import google.auth +from google.oauth2.credentials import Credentials +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport import requests +from . import config + +def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): + """ + Attempts to load credentials from a token file and refresh them if they are expired. + This function operates non-interactively and will not prompt the user to log in. + """ + token_file = os.path.join(config.CREDENTIALS_DIR, f"{email}.json") + creds = None + if os.path.exists(token_file): + try: + creds = Credentials.from_authorized_user_file(token_file, config.SCOPES) + except (ValueError, json.JSONDecodeError): + logging.warning(f"Could not decode token file for {email}. Re-authentication will be required.") + return None + + if creds and creds.valid: + return creds + + if creds and creds.expired and creds.refresh_token: + for attempt in range(max_retries): + try: + logging.info(f"Refreshing credentials for {email} (attempt {attempt + 1}/{max_retries})...") + creds.refresh(google.auth.transport.requests.Request()) + with open(token_file, "w") as token: + token.write(creds.to_json()) + logging.info(f"Successfully refreshed credentials for {email}.") + return creds + except Exception as e: + logging.warning(f"Failed to refresh credentials for {email} on attempt {attempt + 1}: {e}") + if attempt < max_retries - 1: + time.sleep(retry_delay) + + logging.error(f"Failed to refresh credentials for {email} after {max_retries} attempts.") + return None + + return None + +def run_interactive_auth(email, max_retries=3, retry_delay=5): + """ + Initiates an interactive, browser-based OAuth2 flow to get new credentials for a user. + The new credentials are then saved to a token file for future non-interactive use. + """ + for attempt in range(max_retries): + try: + logging.info(f"Please authenticate with: {email} (attempt {attempt + 1}/{max_retries})") + flow = InstalledAppFlow.from_client_secrets_file( + config.CLIENT_SECRETS_FILE, config.SCOPES + ) + creds = flow.run_local_server(port=0) + token_file = os.path.join(config.CREDENTIALS_DIR, f"{email}.json") + with open(token_file, "w") as token: + token.write(creds.to_json()) + return creds + except Exception as e: + logging.error(f"An unexpected error occurred during authentication for {email} on attempt {attempt + 1}: {e}") + if attempt < max_retries - 1: + logging.info(f"Retrying authentication in {retry_delay} seconds...") + time.sleep(retry_delay) + + logging.error(f"Failed to authenticate {email} after {max_retries} attempts.") + return None diff --git a/gemini_key_manager/config.py b/gemini_key_manager/config.py new file mode 100644 index 0000000..496bf71 --- /dev/null +++ b/gemini_key_manager/config.py @@ -0,0 +1,23 @@ +"""This module defines configuration constants used throughout the application.""" +import os + +# --- Directory Paths --- +CREDENTIALS_DIR = "credentials" +LOG_DIR = "logs" +SCHEMA_DIR = "schemas" + +# --- File Names --- +EMAILS_FILE = "emails.txt" +CLIENT_SECRETS_FILE = "credentials.json" +API_KEYS_DATABASE_FILE = "api_keys_database.json" + +# --- Schema Configuration --- +API_KEYS_SCHEMA_FILE = os.path.join(SCHEMA_DIR, "v1", "api_keys_database.schema.json") + +# --- Google API Settings --- +SCOPES = [ + "https://www.googleapis.com/auth/cloud-platform", +] +GENERATIVE_LANGUAGE_API = "generativelanguage.googleapis.com" +GEMINI_API_KEY_DISPLAY_NAME = "Gemini API Key" +GENERATIVE_LANGUAGE_API_KEY_DISPLAY_NAME = "Generative Language API Key" diff --git a/gemini_key_manager/database.py b/gemini_key_manager/database.py new file mode 100644 index 0000000..34f34a8 --- /dev/null +++ b/gemini_key_manager/database.py @@ -0,0 +1,123 @@ +"""This module handles all interactions with the local JSON database.""" +import os +import json +import logging +import sys +from datetime import datetime, timezone +import jsonschema +from . import config + +def load_schema(filename): + """Loads and parses a JSON schema file.""" + if not os.path.exists(filename): + logging.error(f"Schema file not found at '{filename}'") + sys.exit(1) + with open(filename, "r") as f: + try: + return json.load(f) + except json.JSONDecodeError: + logging.error(f"Could not decode JSON schema from {filename}.") + sys.exit(1) + +def load_keys_database(filename, schema): + """ + Loads the API keys database from a JSON file. + If the file doesn't exist, is empty, or invalid, it returns a new, empty database structure. + """ + if not os.path.exists(filename): + return { + "schema_version": "1.0.0", + "accounts": [] + } + with open(filename, "r") as f: + try: + data = json.load(f) + jsonschema.validate(instance=data, schema=schema) + return data + except json.JSONDecodeError: + logging.warning(f"Could not decode JSON from {filename}. Starting fresh.") + except jsonschema.ValidationError as e: + logging.warning(f"Database file '{filename}' is not valid. {e.message}. Starting fresh.") + + return { + "schema_version": "1.0.0", + "accounts": [] + } + +def save_keys_to_json(data, filename, schema): + """Saves the provided data structure to a JSON file after validating it against the schema.""" + now = datetime.now(timezone.utc).isoformat() + data["generation_timestamp_utc"] = data.get("generation_timestamp_utc", now) + data["last_modified_utc"] = now + try: + jsonschema.validate(instance=data, schema=schema) + with open(filename, "w") as f: + json.dump(data, f, indent=2) + logging.info(f"--- Database saved to {filename} ---") + except jsonschema.ValidationError as e: + logging.error(f"Data to be saved is invalid. Could not write to '{filename}'.") + logging.error(f"Validation Error: {e.message}") + sys.exit(1) + +def add_key_to_database(account_entry, project, key_object): + """ + Adds a new API key to the database under the appropriate account and project. + If the project does not exist for the account, it will be created. + """ + project_id = project.project_id + + project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) + if not project_entry: + project_entry = { + "project_info": { + "project_id": project_id, + "project_name": project.display_name, + "project_number": project.name.split('/')[-1], + "state": str(project.state) + }, + "api_keys": [] + } + account_entry["projects"].append(project_entry) + + api_targets = [] + if key_object.restrictions and key_object.restrictions.api_targets: + for target in key_object.restrictions.api_targets: + api_targets.append({"service": target.service, "methods": []}) + + new_key_entry = { + "key_details": { + "key_string": key_object.key_string, + "key_id": key_object.uid, + "key_name": key_object.name, + "display_name": key_object.display_name, + "creation_timestamp_utc": key_object.create_time.isoformat(), + "last_updated_timestamp_utc": key_object.update_time.isoformat(), + }, + "restrictions": { + "api_targets": api_targets + }, + "state": "ACTIVE" + } + + existing_key = next((k for k in project_entry["api_keys"] if k.get("key_details", {}).get("key_id") == key_object.uid), None) + if not existing_key: + project_entry["api_keys"].append(new_key_entry) + logging.info(f" Added key {key_object.uid} to local database for project {project_id}") + else: + logging.warning(f" Key {key_object.uid} already exists in local database for project {project_id}") + +def remove_keys_from_database(account_entry, project_id, deleted_keys_uids): + """Removes a list of API keys from a project's entry in the database.""" + project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) + if not project_entry: + return + + initial_key_count = len(project_entry["api_keys"]) + project_entry["api_keys"] = [ + key for key in project_entry["api_keys"] + if key.get("key_details", {}).get("key_id") not in deleted_keys_uids + ] + final_key_count = len(project_entry["api_keys"]) + num_removed = initial_key_count - final_key_count + if num_removed > 0: + logging.info(f" Removed {num_removed} key(s) from local database for project {project_id}") diff --git a/gemini_key_manager/gcp_api.py b/gemini_key_manager/gcp_api.py new file mode 100644 index 0000000..5275aea --- /dev/null +++ b/gemini_key_manager/gcp_api.py @@ -0,0 +1,192 @@ +"""This module contains functions for interacting with various Google Cloud Platform APIs.""" +import logging +import time +import concurrent.futures +from datetime import datetime, timezone +from google.cloud import resourcemanager_v3, service_usage_v1, api_keys_v2 +from google.api_core import exceptions as google_exceptions +from . import config, utils + +def enable_api(project_id, credentials, dry_run=False): + """Enables the Generative Language API for a given project.""" + service_name = config.GENERATIVE_LANGUAGE_API + service_path = f"projects/{project_id}/services/{service_name}" + service_usage_client = service_usage_v1.ServiceUsageClient(credentials=credentials) + + try: + logging.info(f" Attempting to enable Generative Language API for project {project_id}...") + if dry_run: + logging.info(f" [DRY RUN] Would enable API for project {project_id}") + return True + + enable_request = service_usage_v1.EnableServiceRequest(name=service_path) + operation = service_usage_client.enable_service(request=enable_request) + # This is a long-running operation, so we wait for it to complete. + operation.result() + logging.info(f" Successfully enabled Generative Language API for project {project_id}") + return True + + except google_exceptions.PermissionDenied: + logging.warning(f" Permission denied to enable API for project {project_id}. Skipping.") + return False + except google_exceptions.GoogleAPICallError as err: + logging.error(f" Error enabling API for project {project_id}: {err}") + return False + +def create_api_key(project_id, credentials, dry_run=False): + """ + Creates a new API key in the specified project. + The key is restricted to only allow access to the Generative Language API. + """ + if dry_run: + logging.info(f" [DRY RUN] Would create API key for project {project_id}") + # In a dry run, return a mock key object to allow the rest of the logic to proceed. + return api_keys_v2.Key( + name=f"projects/{project_id}/locations/global/keys/mock-key-id", + uid="mock-key-id", + display_name=config.GEMINI_API_KEY_DISPLAY_NAME, + key_string="mock-key-string-for-dry-run", + create_time=datetime.now(timezone.utc), + update_time=datetime.now(timezone.utc), + restrictions=api_keys_v2.Restrictions( + api_targets=[ + api_keys_v2.ApiTarget(service=config.GENERATIVE_LANGUAGE_API) + ] + ), + ) + + try: + api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) + api_target = api_keys_v2.ApiTarget(service=config.GENERATIVE_LANGUAGE_API) + key = api_keys_v2.Key( + display_name=config.GEMINI_API_KEY_DISPLAY_NAME, + restrictions=api_keys_v2.Restrictions(api_targets=[api_target]), + ) + request = api_keys_v2.CreateKeyRequest( + parent=f"projects/{project_id}/locations/global", + key=key, + ) + logging.info(" Creating API key...") + operation = api_keys_client.create_key(request=request) + result = operation.result() + logging.info(f" Successfully created restricted API key for project {project_id}") + return result + except google_exceptions.PermissionDenied: + logging.warning(f" Permission denied to create API key for project {project_id}. Skipping.") + return None + except google_exceptions.GoogleAPICallError as err: + logging.error(f" Error creating API key for project {project_id}: {err}") + return None + +def delete_api_keys(project_id, credentials, dry_run=False): + """Deletes all API keys with the configured display name from a project.""" + deleted_keys_uids = [] + try: + api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) + parent = f"projects/{project_id}/locations/global" + + keys = api_keys_client.list_keys(parent=parent) + keys_to_delete = [key for key in keys if key.display_name == config.GEMINI_API_KEY_DISPLAY_NAME] + + if not keys_to_delete: + logging.info(f" No '{config.GEMINI_API_KEY_DISPLAY_NAME}' found to delete.") + return [] + + logging.info(f" Found {len(keys_to_delete)} key(s) with display name '{config.GEMINI_API_KEY_DISPLAY_NAME}'. Deleting...") + for key in keys_to_delete: + if dry_run: + logging.info(f" [DRY RUN] Would delete key: {key.uid}") + deleted_keys_uids.append(key.uid) + continue + try: + request = api_keys_v2.DeleteKeyRequest(name=key.name) + operation = api_keys_client.delete_key(request=request) + operation.result() + logging.info(f" Successfully deleted key: {key.uid}") + deleted_keys_uids.append(key.uid) + except google_exceptions.GoogleAPICallError as err: + logging.error(f" Error deleting key {key.uid}: {err}") + return deleted_keys_uids + except google_exceptions.PermissionDenied: + logging.warning(f" Permission denied to list or delete API keys for project {project_id}. Skipping.") + except google_exceptions.GoogleAPICallError as err: + logging.error(f" An API error occurred while deleting keys for project {project_id}: {err}") + return [] + + + +def _create_single_project(project_number, creds, dry_run, timeout_seconds=300, initial_delay=5): + """ + Creates a new GCP project and waits for it to be ready. + Readiness is determined by successfully enabling the Generative Language API. + """ + random_string = utils.generate_random_string() + project_id = f"project{project_number}-{random_string}" + display_name = f"Project{project_number}" + + logging.info(f"Attempting to create project: ID='{project_id}', Name='{display_name}'") + + if dry_run: + logging.info(f"[DRY RUN] Would create project '{display_name}' with ID '{project_id}'.") + return None + + try: + resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) + project_to_create = resourcemanager_v3.Project( + project_id=project_id, + display_name=display_name + ) + operation = resource_manager.create_project(project=project_to_create) + logging.info(f"Waiting for project creation operation for '{display_name}' to complete...") + created_project = operation.result() + logging.info(f"Successfully initiated creation for project '{display_name}'.") + + # After creation, there can be a delay before the project is fully available + # for API enablement. This loop polls until the API can be enabled. + start_time = time.time() + delay = initial_delay + while time.time() - start_time < timeout_seconds: + if enable_api(project_id, creds): + logging.info(f"Generative AI API enabled for project '{display_name}' ({project_id}). Project is ready.") + return created_project + else: + logging.info(f"Waiting for project '{display_name}' ({project_id}) to become ready... Retrying in {delay} seconds.") + time.sleep(delay) + delay = min(delay * 2, 30) + + logging.error(f"Timed out waiting for project '{display_name}' ({project_id}) to become ready after {timeout_seconds} seconds.") + return None + + except Exception as e: + logging.error(f"Failed to create project '{display_name}': {e}") + return None + +def create_projects_if_needed(projects, creds, dry_run=False, max_workers=5): + """Creates new GCP projects in parallel until the account has at least 12 projects.""" + existing_project_count = len(projects) + logging.info(f"Found {existing_project_count} existing projects.") + newly_created_projects = [] + + if existing_project_count >= 12: + logging.info("Account already has 12 or more projects. No new projects will be created.") + return newly_created_projects + + projects_to_create_count = 12 - existing_project_count + logging.info(f"Need to create {projects_to_create_count} more projects.") + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_project_number = { + executor.submit(_create_single_project, str(i + 1).zfill(2), creds, dry_run): i + for i in range(existing_project_count, 12) + } + + for future in concurrent.futures.as_completed(future_to_project_number): + try: + created_project = future.result() + if created_project: + newly_created_projects.append(created_project) + except Exception as exc: + project_number = future_to_project_number[future] + logging.error(f"Project number {project_number} generated an exception: {exc}", exc_info=True) + + return newly_created_projects diff --git a/gemini_key_manager/main.py b/gemini_key_manager/main.py new file mode 100644 index 0000000..5398500 --- /dev/null +++ b/gemini_key_manager/main.py @@ -0,0 +1,90 @@ +"""This is the main entry point for the Gemini Key Management command-line tool.""" +import argparse +import logging +import sys +import os +import concurrent.futures +from . import utils, config, auth, database, actions + +def main(): + """Parses command-line arguments and orchestrates the key management process.""" + parser = argparse.ArgumentParser(description="Manage Gemini API keys in Google Cloud projects.") + parser.add_argument("action", choices=['create', 'delete'], help="The action to perform: 'create' or 'delete' API keys.") + parser.add_argument("--email", help="Specify a single email address to process. Required for 'delete'. If not provided for 'create', emails will be read from emails.txt.") + parser.add_argument("--dry-run", action="store_true", help="Simulate the run without making any actual changes to Google Cloud resources.") + parser.add_argument("--max-workers", type=int, default=5, help="The maximum number of concurrent projects to process.") + parser.add_argument("--auth-retries", type=int, default=3, help="Number of retries for a failed authentication attempt.") + parser.add_argument("--auth-retry-delay", type=int, default=5, help="Delay in seconds between authentication retries.") + args = parser.parse_args() + + utils.setup_logging() + logging.info(f"Program arguments: {vars(args)}") + + if args.action == 'delete' and not args.email: + parser.error("the --email argument is required for the 'delete' action") + + if not os.path.exists(config.CLIENT_SECRETS_FILE): + logging.error(f"OAuth client secrets file not found at '{config.CLIENT_SECRETS_FILE}'") + logging.error("Please follow the setup instructions in README.md to create it.") + sys.exit(1) + + if not os.path.exists(config.CREDENTIALS_DIR): + os.makedirs(config.CREDENTIALS_DIR) + + schema = database.load_schema(config.API_KEYS_SCHEMA_FILE) + api_keys_data = database.load_keys_database(config.API_KEYS_DATABASE_FILE, schema) + + emails_to_process = [] + if args.email: + emails_to_process.append(args.email) + elif args.action == 'delete': + logging.error("The 'delete' action requires the --email argument to specify which account's keys to delete.") + sys.exit(1) + else: # 'create' action without a specific email + emails_to_process = utils.load_emails_from_file(config.EMAILS_FILE) + if not emails_to_process: + logging.info("No emails found in emails.txt. Exiting.") + sys.exit(1) + + creds_map = {} + emails_needing_interactive_auth = [] + + logging.info("Checking credentials and refreshing tokens for all accounts...") + + with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor: + future_to_email = {executor.submit(auth.get_and_refresh_credentials, email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay): email for email in emails_to_process} + + for future in concurrent.futures.as_completed(future_to_email): + email = future_to_email[future] + try: + creds = future.result() + if creds: + creds_map[email] = creds + else: + emails_needing_interactive_auth.append(email) + except Exception as exc: + logging.error(f"Credential check for {email} generated an exception: {exc}", exc_info=True) + emails_needing_interactive_auth.append(email) + + if emails_needing_interactive_auth: + logging.info(f"\n--- INTERACTIVE AUTHENTICATION REQUIRED ---") + logging.info(f"The following accounts require manual authentication: {', '.join(sorted(emails_needing_interactive_auth))}") + + for email in sorted(emails_needing_interactive_auth): + creds = auth.run_interactive_auth(email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay) + if creds: + logging.info(f"Successfully authenticated {email}.") + creds_map[email] = creds + else: + logging.warning(f"Authentication failed or was cancelled for {email}. This account will be skipped.") + + logging.info("\n--- Credential checking complete ---") + + for email in emails_to_process: + if email in creds_map: + actions.process_account(email, creds_map[email], args.action, api_keys_data, dry_run=args.dry_run, max_workers=args.max_workers) + else: + logging.warning(f"Skipping account {email} because authentication was not successful.") + + if not args.dry_run: + database.save_keys_to_json(api_keys_data, config.API_KEYS_DATABASE_FILE, schema) diff --git a/gemini_key_manager/utils.py b/gemini_key_manager/utils.py new file mode 100644 index 0000000..9121ee1 --- /dev/null +++ b/gemini_key_manager/utils.py @@ -0,0 +1,87 @@ +"""This module provides utility functions for logging, file handling, and string generation.""" +import logging +import os +import sys +import random +import string +from datetime import datetime, timezone +from colorama import Fore, Style, init +from . import config + +class ColoredFormatter(logging.Formatter): + """A logging formatter that adds color to console output for different log levels.""" + + LOG_COLORS = { + logging.DEBUG: Fore.CYAN, + logging.INFO: Fore.GREEN, + logging.WARNING: Fore.YELLOW, + logging.ERROR: Fore.RED, + logging.CRITICAL: Fore.RED + Style.BRIGHT, + } + + def format(self, record): + """Applies color to the formatted log message.""" + color = self.LOG_COLORS.get(record.levelno) + message = super().format(record) + if color: + # For better readability, only color the message part of the log string. + parts = message.split(" - ", 2) + if len(parts) > 2: + parts[2] = color + parts[2] + Style.RESET_ALL + message = " - ".join(parts) + else: + message = color + message + Style.RESET_ALL + return message + +def setup_logging(): + """ + Configures the root logger to output to both a timestamped file and the console. + Console output is colorized for readability. + """ + init(autoreset=True) # Required for colorama on Windows + + if not os.path.exists(config.LOG_DIR): + os.makedirs(config.LOG_DIR) + + log_filename = f"gemini_key_management_{datetime.now(timezone.utc).strftime('%Y-%m-%dT%H-%M-%S')}.log" + log_filepath = os.path.join(config.LOG_DIR, log_filename) + + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + # Avoids duplicate log messages if the function is called multiple times. + if logger.hasHandlers(): + logger.handlers.clear() + + # The file handler logs detailed, non-colored messages. + file_handler = logging.FileHandler(log_filepath, encoding='utf-8') + file_formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - [%(name)s:%(module)s:%(lineno)d] - %(message)s" + ) + file_handler.setFormatter(file_formatter) + logger.addHandler(file_handler) + + # The console handler logs concise, colored messages. + console_handler = logging.StreamHandler(sys.stdout) + console_formatter = ColoredFormatter("%(asctime)s - %(levelname)s - %(message)s") + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) + + logging.info(f"Logging initialized. Log file: {log_filepath}") + +def load_emails_from_file(filename): + """ + Reads a list of email addresses from a text file. + It ignores empty lines and lines that start with a '#' comment character. + """ + if not os.path.exists(filename): + logging.error(f"Email file not found at '{filename}'") + logging.info("Please create it and add one email address per line.") + return [] + with open(filename, "r") as f: + return [line.strip() for line in f if line.strip() and not line.startswith("#")] + +def generate_random_string(length=10): + """Generates a random alphanumeric string for creating unique project IDs.""" + letters_and_digits = string.ascii_lowercase + string.digits + return ''.join(random.choice(letters_and_digits) for i in range(length)) diff --git a/main.py b/main.py index 4d616df..507dff0 100644 --- a/main.py +++ b/main.py @@ -1,729 +1,12 @@ -import os -import sys -import argparse -import json -import logging -import threading -import concurrent.futures -from datetime import datetime, timezone -import jsonschema -import google.auth -from colorama import Fore, Style, init -from google.oauth2.credentials import Credentials -from google_auth_oauthlib.flow import InstalledAppFlow -from google.cloud import resourcemanager_v3, service_usage_v1, api_keys_v2 -from google.api_core import exceptions as google_exceptions -from google.auth.transport import requests -import random -import string -import time +""" +This script is the entry point for the Gemini Key Management application. -# --- CONFIGURATION --- -CREDENTIALS_DIR = "credentials" -EMAILS_FILE = "emails.txt" -CLIENT_SECRETS_FILE = "credentials.json" -API_KEYS_DATABASE_FILE = "api_keys_database.json" -API_KEYS_SCHEMA_FILE = os.path.join("schemas", "v1", "api_keys_database.schema.json") -LOG_DIR = "logs" -# --------------------- +It imports the main function from the `gemini_key_manager` package and executes it. +This allows the application to be run as a script, while the core logic +is organized within the `gemini_key_manager` package. +""" -# --- LOGGING SETUP --- -class ColoredFormatter(logging.Formatter): - """A custom logging formatter that adds color to console output.""" - - LOG_COLORS = { - logging.DEBUG: Fore.CYAN, - logging.INFO: Fore.GREEN, - logging.WARNING: Fore.YELLOW, - logging.ERROR: Fore.RED, - logging.CRITICAL: Fore.RED + Style.BRIGHT, - } - - def format(self, record): - """Formats the log record with appropriate colors.""" - color = self.LOG_COLORS.get(record.levelno) - message = super().format(record) - if color: - # Only color the message part for readability - parts = message.split(" - ", 2) - if len(parts) > 2: - parts[2] = color + parts[2] + Style.RESET_ALL - message = " - ".join(parts) - else: - message = color + message + Style.RESET_ALL - return message - -def setup_logging(): - """Sets up logging to both console and a file, with colors for the console.""" - init(autoreset=True) # Initialize Colorama - - if not os.path.exists(LOG_DIR): - os.makedirs(LOG_DIR) - - log_filename = f"gemini_key_management_{datetime.now(timezone.utc).strftime('%Y-%m-%dT%H-%M-%S')}.log" - log_filepath = os.path.join(LOG_DIR, log_filename) - - logger = logging.getLogger() - logger.setLevel(logging.INFO) - - # Clear existing handlers to avoid duplicate logs - if logger.hasHandlers(): - logger.handlers.clear() - - # File handler for detailed, non-colored logging - file_handler = logging.FileHandler(log_filepath, encoding='utf-8') - file_formatter = logging.Formatter( - "%(asctime)s - %(levelname)s - [%(name)s:%(module)s:%(lineno)d] - %(message)s" - ) - file_handler.setFormatter(file_formatter) - logger.addHandler(file_handler) - - # Console handler for concise, colored logging - console_handler = logging.StreamHandler(sys.stdout) - console_formatter = ColoredFormatter("%(asctime)s - %(levelname)s - %(message)s") - console_handler.setFormatter(console_formatter) - logger.addHandler(console_handler) - - logging.info(f"Logging initialized. Log file: {log_filepath}") - -setup_logging() -# --------------------- - - -SCOPES = [ - "https://www.googleapis.com/auth/cloud-platform", -] - -def load_schema(filename): - """Loads a JSON schema from a file.""" - if not os.path.exists(filename): - logging.error(f"Schema file not found at '{filename}'") - sys.exit(1) - with open(filename, "r") as f: - try: - return json.load(f) - except json.JSONDecodeError: - logging.error(f"Could not decode JSON schema from {filename}.") - sys.exit(1) - -def load_emails_from_file(filename): - """Loads a list of emails from a text file, ignoring comments.""" - if not os.path.exists(filename): - logging.error(f"Email file not found at '{filename}'") - logging.info("Please create it and add one email address per line.") - return [] - with open(filename, "r") as f: - # Ignore empty lines and lines starting with # - return [line.strip() for line in f if line.strip() and not line.startswith("#")] - -def load_keys_database(filename, schema): - """Loads and validates the JSON database of API keys.""" - if not os.path.exists(filename): - return { - "schema_version": "1.0.0", - "accounts": [] - } - with open(filename, "r") as f: - try: - data = json.load(f) - jsonschema.validate(instance=data, schema=schema) - return data - except json.JSONDecodeError: - logging.warning(f"Could not decode JSON from {filename}. Starting fresh.") - except jsonschema.ValidationError as e: - logging.warning(f"Database file '{filename}' is not valid. {e.message}. Starting fresh.") - - return { - "schema_version": "1.0.0", - "accounts": [] - } - - -def save_keys_to_json(data, filename, schema): - """Validates and saves the API key data to a single JSON file.""" - now = datetime.now(timezone.utc).isoformat() - data["generation_timestamp_utc"] = data.get("generation_timestamp_utc", now) - data["last_modified_utc"] = now - try: - jsonschema.validate(instance=data, schema=schema) - with open(filename, "w") as f: - json.dump(data, f, indent=2) - logging.info(f"--- Database saved to {filename} ---") - except jsonschema.ValidationError as e: - logging.error(f"Data to be saved is invalid. Could not write to '{filename}'.") - logging.error(f"Validation Error: {e.message}") - sys.exit(1) - - -def main(): - """Main function to orchestrate API key creation or deletion.""" - parser = argparse.ArgumentParser(description="Manage Gemini API keys in Google Cloud projects.") - parser.add_argument("action", choices=['create', 'delete'], help="The action to perform: 'create' or 'delete' API keys.") - parser.add_argument("--email", help="Specify a single email address to process. Required for 'delete'. If not provided for 'create', emails will be read from emails.txt.") - parser.add_argument("--dry-run", action="store_true", help="Simulate the run without making any actual changes to Google Cloud resources.") - parser.add_argument("--max-workers", type=int, default=5, help="The maximum number of concurrent projects to process.") - parser.add_argument("--auth-retries", type=int, default=3, help="Number of retries for a failed authentication attempt.") - parser.add_argument("--auth-retry-delay", type=int, default=5, help="Delay in seconds between authentication retries.") - args = parser.parse_args() - - logging.info(f"Program arguments: {vars(args)}") - - if args.action == 'delete' and not args.email: - parser.error("the --email argument is required for the 'delete' action") - - if not os.path.exists(CLIENT_SECRETS_FILE): - logging.error(f"OAuth client secrets file not found at '{CLIENT_SECRETS_FILE}'") - logging.error("Please follow the setup instructions in README.md to create it.") - sys.exit(1) - - if not os.path.exists(CREDENTIALS_DIR): - os.makedirs(CREDENTIALS_DIR) - - schema = load_schema(API_KEYS_SCHEMA_FILE) - api_keys_data = load_keys_database(API_KEYS_DATABASE_FILE, schema) - - emails_to_process = [] - if args.email: - emails_to_process.append(args.email) - elif args.action == 'delete': - logging.error("The 'delete' action requires the --email argument to specify which account's keys to delete.") - sys.exit(1) - else: # action is 'create' and no email provided - emails_to_process = load_emails_from_file(EMAILS_FILE) - if not emails_to_process: - logging.info("No emails found in emails.txt. Exiting.") - sys.exit(1) - - # --- New Authentication Logic --- - creds_map = {} - emails_needing_interactive_auth = [] - - logging.info("Checking credentials and refreshing tokens for all accounts...") - - with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor: - future_to_email = {executor.submit(get_and_refresh_credentials, email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay): email for email in emails_to_process} - - for future in concurrent.futures.as_completed(future_to_email): - email = future_to_email[future] - try: - creds = future.result() - if creds: - creds_map[email] = creds - else: - emails_needing_interactive_auth.append(email) - except Exception as exc: - logging.error(f"Credential check for {email} generated an exception: {exc}", exc_info=True) - emails_needing_interactive_auth.append(email) - - if emails_needing_interactive_auth: - logging.info(f"\n--- INTERACTIVE AUTHENTICATION REQUIRED ---") - logging.info(f"The following accounts require manual authentication: {', '.join(sorted(emails_needing_interactive_auth))}") - - for email in sorted(emails_needing_interactive_auth): - creds = run_interactive_auth(email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay) - if creds: - logging.info(f"Successfully authenticated {email}.") - creds_map[email] = creds - else: - logging.warning(f"Authentication failed or was cancelled for {email}. This account will be skipped.") - - logging.info("\n--- Credential checking complete ---") - - for email in emails_to_process: - if email in creds_map: - process_account(email, creds_map[email], args.action, api_keys_data, dry_run=args.dry_run, max_workers=args.max_workers) - else: - logging.warning(f"Skipping account {email} because authentication was not successful.") - - if not args.dry_run: - save_keys_to_json(api_keys_data, API_KEYS_DATABASE_FILE, schema) - - - -def sync_project_keys(project, creds, dry_run, db_lock, account_entry): - """Synchronizes API keys between Google Cloud and the local database for a single project. - Returns True if a Gemini API key exists in the project, False otherwise.""" - - # Helper class to create a mock key object compatible with add_key_to_database - class TempKey: - def __init__(self, cloud_key, key_string): - self.key_string = key_string - self.uid = cloud_key.uid - self.name = cloud_key.name - self.display_name = cloud_key.display_name - self.create_time = cloud_key.create_time - self.update_time = cloud_key.update_time - self.restrictions = cloud_key.restrictions - - project_id = project.project_id - logging.info(f" Synchronizing keys for project {project_id}") - gemini_key_exists = False - - try: - api_keys_client = api_keys_v2.ApiKeysClient(credentials=creds) - parent = f"projects/{project_id}/locations/global" - - # 1. Fetch cloud keys - cloud_keys_list = list(api_keys_client.list_keys(parent=parent)) - for key in cloud_keys_list: - if key.display_name in ["Gemini API Key", "Generative Language API Key"]: - gemini_key_exists = True - - cloud_keys = {key.uid: key for key in cloud_keys_list} - - # 2. Fetch local keys - project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) - - if not project_entry: - # If project is not in DB, create it. - project_entry = { - "project_info": { - "project_id": project.project_id, - "project_name": project.display_name, - "project_number": project.name.split('/')[-1], - "state": str(project.state) - }, - "api_keys": [] - } - with db_lock: - account_entry["projects"].append(project_entry) - - local_keys = {key['key_details']['key_id']: key for key in project_entry.get('api_keys', [])} - - # 3. Reconcile - cloud_uids = set(cloud_keys.keys()) - local_uids = set(local_keys.keys()) - - synced_uids = cloud_uids.intersection(local_uids) - cloud_only_uids = cloud_uids - local_uids - local_only_uids = local_uids - cloud_uids - - # 4. Process - for uid in synced_uids: - logging.info(f" Key {uid} is synchronized.") - - for uid in cloud_only_uids: - key_object = cloud_keys[uid] - logging.info(f" Key {uid} ({key_object.display_name}) found in cloud only. Adding to local database.") - if dry_run: - logging.info(f" [DRY RUN] Would fetch key string for {uid} and add to database.") - continue - - try: - # The Key object from list_keys doesn't have key_string, so we fetch it. - key_string_response = api_keys_client.get_key_string(name=key_object.name) - - hydrated_key = TempKey(key_object, key_string_response.key_string) - - with db_lock: - add_key_to_database(account_entry, project, hydrated_key) - - except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to get key string for {uid}. Skipping.") - except google_exceptions.GoogleAPICallError as err: - logging.error(f" Error getting key string for {uid}: {err}") - - for uid in local_only_uids: - logging.info(f" Key {uid} found in local database only. Marking as INACTIVE.") - if dry_run: - logging.info(f" [DRY RUN] Would mark key {uid} as INACTIVE.") - continue - - with db_lock: - local_keys[uid]['state'] = 'INACTIVE' - local_keys[uid]['key_details']['last_updated_timestamp_utc'] = datetime.now(timezone.utc).isoformat() - - return gemini_key_exists - - except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to list keys for project {project_id}. Skipping sync.") - return False - except google_exceptions.GoogleAPICallError as err: - logging.error(f" An API error occurred while syncing keys for project {project_id}: {err}") - return False - -def process_project_for_action(project, creds, action, dry_run, db_lock, account_entry): - """Processes a single project for the given action in a thread-safe manner.""" - project_id = project.project_id - logging.info(f"- Starting to process project: {project_id} ({project.display_name})") - - if action == 'create': - gemini_key_exists = sync_project_keys(project, creds, dry_run, db_lock, account_entry) - if gemini_key_exists: - logging.info(f" 'Gemini API Key' already exists in project {project_id}. Skipping creation.") - return - - if enable_api(project_id, creds, dry_run=dry_run): - key_object = create_api_key(project_id, creds, dry_run=dry_run) - if key_object: - with db_lock: - add_key_to_database(account_entry, project, key_object) - elif action == 'delete': - deleted_keys_uids = delete_api_keys(project_id, creds, dry_run=dry_run) - if deleted_keys_uids: - with db_lock: - remove_keys_from_database(account_entry, project_id, deleted_keys_uids) - logging.info(f"- Finished processing project: {project_id}") - - -def generate_random_string(length=10): - """Generates a random alphanumeric string of a given length.""" - letters_and_digits = string.ascii_lowercase + string.digits - return ''.join(random.choice(letters_and_digits) for i in range(length)) - - -def wait_for_project_ready(project_id, creds, timeout_seconds=300, initial_delay=5): - """Waits for a newly created project to become fully active.""" - logging.info(f" Waiting for project {project_id} to become fully active...") - resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) - start_time = time.time() - delay = initial_delay - - while time.time() - start_time < timeout_seconds: - try: - resource_manager.get_project(name=f"projects/{project_id}") - logging.info(f" Project {project_id} is now active.") - return True - except google_exceptions.NotFound: - logging.info(f" Project {project_id} not found yet. Retrying in {delay} seconds...") - except google_exceptions.PermissionDenied: - logging.info(f" Project {project_id} not accessible yet. Retrying in {delay} seconds...") - except google_exceptions.GoogleAPICallError as e: - logging.warning(f" An API error occurred while waiting for project {project_id}: {e}. Retrying in {delay} seconds...") - - time.sleep(delay) - delay = min(delay * 2, 30) - - logging.error(f" Timed out waiting for project {project_id} to become active after {timeout_seconds} seconds.") - return False - - -def create_projects_if_needed(projects, creds, dry_run=False): - """Creates new projects if the account has fewer than 12 projects.""" - existing_project_count = len(projects) - logging.info(f"Found {existing_project_count} existing projects.") - newly_created_projects = [] - - if existing_project_count >= 12: - logging.info("Account already has 12 or more projects. No new projects will be created.") - return newly_created_projects - - for i in range(existing_project_count, 12): - project_number = str(i + 1).zfill(2) - random_string = generate_random_string() - project_id = f"project{project_number}-{random_string}" - display_name = f"Project{project_number}" - - logging.info(f"Attempting to create project: ID='{project_id}', Name='{display_name}'") - - if dry_run: - logging.info(f"[DRY RUN] Would create project '{display_name}' with ID '{project_id}'.") - continue - - try: - resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) - project_to_create = resourcemanager_v3.Project( - project_id=project_id, - display_name=display_name - ) - operation = resource_manager.create_project(project=project_to_create) - logging.info(f"Waiting for project creation operation for '{display_name}' to complete...") - created_project = operation.result() - logging.info(f"Successfully created project '{display_name}'.") - - if wait_for_project_ready(project_id, creds): - newly_created_projects.append(created_project) - else: - logging.error(f"Could not confirm project '{display_name}' ({project_id}) became active. It will be skipped.") - - except Exception as e: - logging.error(f"Failed to create project '{display_name}': {e}") - - return newly_created_projects - - -def process_account(email, creds, action, api_keys_data, dry_run=False, max_workers=5): - """Processes a single account for the given action.""" - logging.info(f"--- Processing account: {email} for action: {action} ---") - if dry_run: - logging.info("*** DRY RUN MODE ENABLED ***") - - if not creds: - logging.warning(f"Could not get credentials for {email}. Skipping.") - return - - account_entry = next((acc for acc in api_keys_data["accounts"] if acc.get("account_details", {}).get("email") == email), None) - if not account_entry: - account_entry = { - "account_details": { - "email": email, - "authentication_details": { - "token_file": os.path.join(CREDENTIALS_DIR, f"{email}.json"), - "scopes": SCOPES - } - }, - "projects": [] - } - api_keys_data["accounts"].append(account_entry) - - try: - resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) - projects = list(resource_manager.search_projects()) - - if action == 'create': - new_projects = create_projects_if_needed(projects, creds, dry_run) - projects.extend(new_projects) - - if not projects: - logging.info(f"No projects found for {email}.") - return - - logging.info(f"Found {len(projects)} projects. Processing with up to {max_workers} workers...") - - db_lock = threading.Lock() - - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_project = { - executor.submit(process_project_for_action, project, creds, action, dry_run, db_lock, account_entry): project - for project in projects - } - for future in concurrent.futures.as_completed(future_to_project): - project = future_to_project[future] - try: - future.result() - except Exception as exc: - logging.error(f"Project {project.project_id} generated an exception: {exc}", exc_info=True) - - except google_exceptions.PermissionDenied as err: - logging.error(f"Permission denied for account {email}. Check IAM roles.") - logging.error(f" Error: {err}") - except google_exceptions.GoogleAPICallError as err: - logging.error(f"An API error occurred while processing account {email}: {err}") - -def add_key_to_database(account_entry, project, key_object): - """Adds a new API key's details to the data structure.""" - project_id = project.project_id - - project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) - if not project_entry: - project_entry = { - "project_info": { - "project_id": project_id, - "project_name": project.display_name, - "project_number": project.name.split('/')[-1], - "state": str(project.state) - }, - "api_keys": [] - } - account_entry["projects"].append(project_entry) - - api_targets = [] - if key_object.restrictions and key_object.restrictions.api_targets: - for target in key_object.restrictions.api_targets: - api_targets.append({"service": target.service, "methods": []}) - - new_key_entry = { - "key_details": { - "key_string": key_object.key_string, - "key_id": key_object.uid, - "key_name": key_object.name, - "display_name": key_object.display_name, - "creation_timestamp_utc": key_object.create_time.isoformat(), - "last_updated_timestamp_utc": key_object.update_time.isoformat(), - }, - "restrictions": { - "api_targets": api_targets - }, - "state": "ACTIVE" - } - - existing_key = next((k for k in project_entry["api_keys"] if k.get("key_details", {}).get("key_id") == key_object.uid), None) - if not existing_key: - project_entry["api_keys"].append(new_key_entry) - logging.info(f" Added key {key_object.uid} to local database for project {project_id}") - else: - logging.warning(f" Key {key_object.uid} already exists in local database for project {project_id}") - -def remove_keys_from_database(account_entry, project_id, deleted_keys_uids): - """Removes deleted API keys from the data structure.""" - project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) - if not project_entry: - return - - initial_key_count = len(project_entry["api_keys"]) - project_entry["api_keys"] = [ - key for key in project_entry["api_keys"] - if key.get("key_details", {}).get("key_id") not in deleted_keys_uids - ] - final_key_count = len(project_entry["api_keys"]) - num_removed = initial_key_count - final_key_count - if num_removed > 0: - logging.info(f" Removed {num_removed} key(s) from local database for project {project_id}") - -def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): - """Tries to load and refresh credentials for an email with retries. Does not start interactive flow.""" - token_file = os.path.join(CREDENTIALS_DIR, f"{email}.json") - creds = None - if os.path.exists(token_file): - try: - creds = Credentials.from_authorized_user_file(token_file, SCOPES) - except (ValueError, json.JSONDecodeError): - logging.warning(f"Could not decode token file for {email}. Re-authentication will be required.") - return None - - if creds and creds.valid: - return creds - - if creds and creds.expired and creds.refresh_token: - for attempt in range(max_retries): - try: - logging.info(f"Refreshing credentials for {email} (attempt {attempt + 1}/{max_retries})...") - creds.refresh(google.auth.transport.requests.Request()) - with open(token_file, "w") as token: - token.write(creds.to_json()) - logging.info(f"Successfully refreshed credentials for {email}.") - return creds - except Exception as e: - logging.warning(f"Failed to refresh credentials for {email} on attempt {attempt + 1}: {e}") - if attempt < max_retries - 1: - time.sleep(retry_delay) - - logging.error(f"Failed to refresh credentials for {email} after {max_retries} attempts.") - return None - - return None - -def run_interactive_auth(email, max_retries=3, retry_delay=5): - """Runs the interactive OAuth2 flow for a given email with retries.""" - for attempt in range(max_retries): - try: - logging.info(f"Please authenticate with: {email} (attempt {attempt + 1}/{max_retries})") - flow = InstalledAppFlow.from_client_secrets_file( - CLIENT_SECRETS_FILE, SCOPES - ) - creds = flow.run_local_server(port=0, login_hint=email) - token_file = os.path.join(CREDENTIALS_DIR, f"{email}.json") - with open(token_file, "w") as token: - token.write(creds.to_json()) - return creds - except Exception as e: - logging.error(f"An unexpected error occurred during authentication for {email} on attempt {attempt + 1}: {e}") - if attempt < max_retries - 1: - logging.info(f"Retrying authentication in {retry_delay} seconds...") - time.sleep(retry_delay) - - logging.error(f"Failed to authenticate {email} after {max_retries} attempts.") - return None - -def enable_api(project_id, credentials, dry_run=False): - """Enables the Generative Language API if it's not already enabled.""" - service_name = "generativelanguage.googleapis.com" - service_path = f"projects/{project_id}/services/{service_name}" - service_usage_client = service_usage_v1.ServiceUsageClient(credentials=credentials) - - try: - service_request = service_usage_v1.GetServiceRequest(name=service_path) - service = service_usage_client.get_service(request=service_request) - - if service.state == service_usage_v1.State.ENABLED: - logging.info(f" Generative Language API is already enabled for project {project_id}") - return True - - logging.info(f" API is not enabled. Attempting to enable...") - if dry_run: - logging.info(f" [DRY RUN] Would enable API for project {project_id}") - return True - - enable_request = service_usage_v1.EnableServiceRequest(name=service_path) - operation = service_usage_client.enable_service(request=enable_request) - logging.info(" Waiting for API enablement to complete...") - operation.result() - logging.info(f" Successfully enabled Generative Language API for project {project_id}") - return True - - except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to check or enable API for project {project_id}. Skipping.") - return False - except google_exceptions.GoogleAPICallError as err: - logging.error(f" Error checking or enabling API for project {project_id}: {err}") - return False - -def create_api_key(project_id, credentials, dry_run=False): - """Creates a new, restricted API key.""" - if dry_run: - logging.info(f" [DRY RUN] Would create API key for project {project_id}") - # Return a mock key object for dry run - return api_keys_v2.Key( - name=f"projects/{project_id}/locations/global/keys/mock-key-id", - uid="mock-key-id", - display_name="Gemini API Key", - key_string="mock-key-string-for-dry-run", - create_time=datetime.now(timezone.utc), - update_time=datetime.now(timezone.utc), - restrictions=api_keys_v2.Restrictions( - api_targets=[ - api_keys_v2.ApiTarget(service="generativelanguage.googleapis.com") - ] - ), - ) - - try: - api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) - api_target = api_keys_v2.ApiTarget(service="generativelanguage.googleapis.com") - key = api_keys_v2.Key( - display_name="Gemini API Key", - restrictions=api_keys_v2.Restrictions(api_targets=[api_target]), - ) - request = api_keys_v2.CreateKeyRequest( - parent=f"projects/{project_id}/locations/global", - key=key, - ) - logging.info(" Creating API key...") - operation = api_keys_client.create_key(request=request) - result = operation.result() - logging.info(f" Successfully created restricted API key for project {project_id}") - return result - except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to create API key for project {project_id}. Skipping.") - return None - except google_exceptions.GoogleAPICallError as err: - logging.error(f" Error creating API key for project {project_id}: {err}") - return None - -def delete_api_keys(project_id, credentials, dry_run=False): - """Deletes all API keys with the display name 'Gemini API Key' and returns their UIDs.""" - deleted_keys_uids = [] - try: - api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) - parent = f"projects/{project_id}/locations/global" - - keys = api_keys_client.list_keys(parent=parent) - keys_to_delete = [key for key in keys if key.display_name == "Gemini API Key"] - - if not keys_to_delete: - logging.info(f" No 'Gemini API Key' found to delete.") - return [] - - logging.info(f" Found {len(keys_to_delete)} key(s) with display name 'Gemini API Key'. Deleting...") - for key in keys_to_delete: - if dry_run: - logging.info(f" [DRY RUN] Would delete key: {key.uid}") - deleted_keys_uids.append(key.uid) - continue - try: - request = api_keys_v2.DeleteKeyRequest(name=key.name) - operation = api_keys_client.delete_key(request=request) - operation.result() - logging.info(f" Successfully deleted key: {key.uid}") - deleted_keys_uids.append(key.uid) - except google_exceptions.GoogleAPICallError as err: - logging.error(f" Error deleting key {key.uid}: {err}") - return deleted_keys_uids - except google_exceptions.PermissionDenied: - logging.warning(f" Permission denied to list or delete API keys for project {project_id}. Skipping.") - except google_exceptions.GoogleAPICallError as err: - logging.error(f" An API error occurred while deleting keys for project {project_id}: {err}") - return [] +from gemini_key_manager.main import main if __name__ == "__main__": - main() \ No newline at end of file + main()