import os import sys import argparse import json import logging from datetime import datetime, timezone import jsonschema import google.auth from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from google.cloud import resourcemanager_v3, service_usage_v1, api_keys_v2 from google.api_core import exceptions as google_exceptions from google.auth.transport import requests # --- CONFIGURATION --- CREDENTIALS_DIR = "credentials" EMAILS_FILE = "emails.txt" CLIENT_SECRETS_FILE = "credentials.json" API_KEYS_DATABASE_FILE = "api_keys_database.json" API_KEYS_SCHEMA_FILE = os.path.join("schemas", "v1", "api_keys_database.schema.json") # --------------------- # --- LOGGING SETUP --- logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", stream=sys.stdout, ) # --------------------- SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", ] def load_schema(filename): """Loads a JSON schema from a file.""" if not os.path.exists(filename): logging.error(f"Schema file not found at '{filename}'") sys.exit(1) with open(filename, "r") as f: try: return json.load(f) except json.JSONDecodeError: logging.error(f"Could not decode JSON schema from {filename}.") sys.exit(1) def load_emails_from_file(filename): """Loads a list of emails from a text file, ignoring comments.""" if not os.path.exists(filename): logging.error(f"Email file not found at '{filename}'") logging.info("Please create it and add one email address per line.") return [] with open(filename, "r") as f: # Ignore empty lines and lines starting with # return [line.strip() for line in f if line.strip() and not line.startswith("#")] def load_keys_database(filename, schema): """Loads and validates the JSON database of API keys.""" if not os.path.exists(filename): return { "schema_version": "1.0.0", "accounts": [] } with open(filename, "r") as f: try: data = json.load(f) jsonschema.validate(instance=data, schema=schema) return data except json.JSONDecodeError: logging.warning(f"Could not decode JSON from {filename}. Starting fresh.") except jsonschema.ValidationError as e: logging.warning(f"Database file '{filename}' is not valid. {e.message}. Starting fresh.") return { "schema_version": "1.0.0", "accounts": [] } def save_keys_to_json(data, filename, schema): """Validates and saves the API key data to a single JSON file.""" now = datetime.now(timezone.utc).isoformat() data["generation_timestamp_utc"] = data.get("generation_timestamp_utc", now) data["last_modified_utc"] = now try: jsonschema.validate(instance=data, schema=schema) with open(filename, "w") as f: json.dump(data, f, indent=2) logging.info(f"--- Database saved to {filename} ---") except jsonschema.ValidationError as e: logging.error(f"Data to be saved is invalid. Could not write to '{filename}'.") logging.error(f"Validation Error: {e.message}") sys.exit(1) def main(): """Main function to orchestrate API key creation or deletion.""" parser = argparse.ArgumentParser(description="Manage Gemini API keys in Google Cloud projects.") parser.add_argument("action", choices=['create', 'delete'], help="The action to perform: 'create' or 'delete' API keys.") parser.add_argument("--email", help="Specify a single email address to process. Required for 'delete'. If not provided for 'create', emails will be read from emails.txt.") parser.add_argument("--dry-run", action="store_true", help="Simulate the run without making any actual changes to Google Cloud resources.") args = parser.parse_args() if args.action == 'delete' and not args.email: parser.error("the --email argument is required for the 'delete' action") if not os.path.exists(CLIENT_SECRETS_FILE): logging.error(f"OAuth client secrets file not found at '{CLIENT_SECRETS_FILE}'") logging.error("Please follow the setup instructions in README.md to create it.") sys.exit(1) if not os.path.exists(CREDENTIALS_DIR): os.makedirs(CREDENTIALS_DIR) schema = load_schema(API_KEYS_SCHEMA_FILE) api_keys_data = load_keys_database(API_KEYS_DATABASE_FILE, schema) emails_to_process = [] if args.email: emails_to_process.append(args.email) elif args.action == 'delete': logging.error("The 'delete' action requires the --email argument to specify which account's keys to delete.") sys.exit(1) else: # action is 'create' and no email provided emails_to_process = load_emails_from_file(EMAILS_FILE) if not emails_to_process: logging.info("No emails found in emails.txt. Exiting.") sys.exit(1) for email in emails_to_process: process_account(email, args.action, api_keys_data, args.dry_run) if not args.dry_run: save_keys_to_json(api_keys_data, API_KEYS_DATABASE_FILE, schema) def process_account(email, action, api_keys_data, dry_run=False): """Processes a single account for the given action.""" logging.info(f"--- Processing account: {email} for action: {action} ---") if dry_run: logging.info("*** DRY RUN MODE ENABLED ***") creds = get_credentials_for_email(email) if not creds: logging.warning(f"Could not get credentials for {email}. Skipping.") return account_entry = next((acc for acc in api_keys_data["accounts"] if acc.get("account_details", {}).get("email") == email), None) if not account_entry: account_entry = { "account_details": { "email": email, "authentication_details": { "token_file": os.path.join(CREDENTIALS_DIR, f"{email}.json"), "scopes": SCOPES } }, "projects": [] } api_keys_data["accounts"].append(account_entry) try: resource_manager = resourcemanager_v3.ProjectsClient(credentials=creds) projects = list(resource_manager.search_projects()) if not projects: logging.info(f"No projects found for {email}.") return logging.info(f"Found {len(projects)} projects. Processing...") for project in projects: project_id = project.project_id logging.info(f"- Project: {project_id} ({project.display_name})") if action == 'create': if project_has_gemini_key(project_id, creds): logging.info(" 'Gemini API Key' already exists in this project. Skipping creation.") continue if enable_api(project_id, creds, dry_run=dry_run): key_object = create_api_key(project_id, creds, dry_run=dry_run) if key_object: add_key_to_database(account_entry, project, key_object) elif action == 'delete': deleted_keys_uids = delete_api_keys(project_id, creds, dry_run=dry_run) if deleted_keys_uids: remove_keys_from_database(account_entry, project_id, deleted_keys_uids) except google_exceptions.PermissionDenied as err: logging.error(f"Permission denied for account {email}. Check IAM roles.") logging.error(f" Error: {err}") except google_exceptions.GoogleAPICallError as err: logging.error(f"An API error occurred while processing account {email}: {err}") def add_key_to_database(account_entry, project, key_object): """Adds a new API key's details to the data structure.""" project_id = project.project_id project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) if not project_entry: project_entry = { "project_info": { "project_id": project_id, "project_name": project.display_name, "project_number": project.name.split('/')[-1], "state": str(project.state) }, "api_keys": [] } account_entry["projects"].append(project_entry) api_targets = [] if key_object.restrictions and key_object.restrictions.api_targets: for target in key_object.restrictions.api_targets: api_targets.append({"service": target.service, "methods": []}) new_key_entry = { "key_details": { "key_string": key_object.key_string, "key_id": key_object.uid, "key_name": key_object.name, "display_name": key_object.display_name, "creation_timestamp_utc": key_object.create_time.isoformat(), "last_updated_timestamp_utc": key_object.update_time.isoformat(), }, "restrictions": { "api_targets": api_targets }, "state": "ACTIVE" } existing_key = next((k for k in project_entry["api_keys"] if k.get("key_details", {}).get("key_id") == key_object.uid), None) if not existing_key: project_entry["api_keys"].append(new_key_entry) logging.info(f" Added key {key_object.uid} to local database for project {project_id}") else: logging.warning(f" Key {key_object.uid} already exists in local database for project {project_id}") def remove_keys_from_database(account_entry, project_id, deleted_keys_uids): """Removes deleted API keys from the data structure.""" project_entry = next((p for p in account_entry["projects"] if p.get("project_info", {}).get("project_id") == project_id), None) if not project_entry: return initial_key_count = len(project_entry["api_keys"]) project_entry["api_keys"] = [ key for key in project_entry["api_keys"] if key.get("key_details", {}).get("key_id") not in deleted_keys_uids ] final_key_count = len(project_entry["api_keys"]) num_removed = initial_key_count - final_key_count if num_removed > 0: logging.info(f" Removed {num_removed} key(s) from local database for project {project_id}") def project_has_gemini_key(project_id, credentials): """Checks if a project already has a key named 'Gemini API Key'.""" try: api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) parent = f"projects/{project_id}/locations/global" keys = api_keys_client.list_keys(parent=parent) for key in keys: if key.display_name == "Gemini API Key": return True return False except google_exceptions.GoogleAPICallError as err: logging.error(f" Could not list keys in project {project_id}. Error: {err}") return False def get_credentials_for_email(email): """Handles the OAuth2 flow for a given email.""" token_file = os.path.join(CREDENTIALS_DIR, f"{email}.json") creds = None try: if os.path.exists(token_file): creds = Credentials.from_authorized_user_file(token_file, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: logging.info("Refreshing credentials...") creds.refresh(google.auth.transport.requests.Request()) else: logging.info(f"Please authenticate with: {email}") flow = InstalledAppFlow.from_client_secrets_file( CLIENT_SECRETS_FILE, SCOPES ) creds = flow.run_local_server(port=0, login_hint=email) with open(token_file, "w") as token: token.write(creds.to_json()) return creds except Exception as e: logging.error(f"An unexpected error occurred during authentication: {e}") return None def enable_api(project_id, credentials, dry_run=False): """Enables the Generative Language API if it's not already enabled.""" service_name = "generativelanguage.googleapis.com" service_path = f"projects/{project_id}/services/{service_name}" service_usage_client = service_usage_v1.ServiceUsageClient(credentials=credentials) try: service_request = service_usage_v1.GetServiceRequest(name=service_path) service = service_usage_client.get_service(request=service_request) if service.state == service_usage_v1.State.ENABLED: logging.info(f" Generative Language API is already enabled for project {project_id}") return True logging.info(f" API is not enabled. Attempting to enable...") if dry_run: logging.info(f" [DRY RUN] Would enable API for project {project_id}") return True enable_request = service_usage_v1.EnableServiceRequest(name=service_path) operation = service_usage_client.enable_service(request=enable_request) logging.info(" Waiting for API enablement to complete...") operation.result() logging.info(f" Successfully enabled Generative Language API for project {project_id}") return True except google_exceptions.PermissionDenied: logging.warning(f" Permission denied to check or enable API for project {project_id}. Skipping.") return False except google_exceptions.GoogleAPICallError as err: logging.error(f" Error checking or enabling API for project {project_id}: {err}") return False def create_api_key(project_id, credentials, dry_run=False): """Creates a new, restricted API key.""" if dry_run: logging.info(f" [DRY RUN] Would create API key for project {project_id}") # Return a mock key object for dry run return api_keys_v2.Key( name=f"projects/{project_id}/locations/global/keys/mock-key-id", uid="mock-key-id", display_name="Gemini API Key", key_string="mock-key-string-for-dry-run", create_time=datetime.now(timezone.utc), update_time=datetime.now(timezone.utc), restrictions=api_keys_v2.Restrictions( api_targets=[ api_keys_v2.ApiTarget(service="generativelanguage.googleapis.com") ] ), ) try: api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) api_target = api_keys_v2.ApiTarget(service="generativelanguage.googleapis.com") key = api_keys_v2.Key( display_name="Gemini API Key", restrictions=api_keys_v2.Restrictions(api_targets=[api_target]), ) request = api_keys_v2.CreateKeyRequest( parent=f"projects/{project_id}/locations/global", key=key, ) logging.info(" Creating API key...") operation = api_keys_client.create_key(request=request) result = operation.result() logging.info(f" Successfully created restricted API key for project {project_id}") return result except google_exceptions.PermissionDenied: logging.warning(f" Permission denied to create API key for project {project_id}. Skipping.") return None except google_exceptions.GoogleAPICallError as err: logging.error(f" Error creating API key for project {project_id}: {err}") return None def delete_api_keys(project_id, credentials, dry_run=False): """Deletes all API keys with the display name 'Gemini API Key' and returns their UIDs.""" deleted_keys_uids = [] try: api_keys_client = api_keys_v2.ApiKeysClient(credentials=credentials) parent = f"projects/{project_id}/locations/global" keys = api_keys_client.list_keys(parent=parent) keys_to_delete = [key for key in keys if key.display_name == "Gemini API Key"] if not keys_to_delete: logging.info(f" No 'Gemini API Key' found to delete.") return [] logging.info(f" Found {len(keys_to_delete)} key(s) with display name 'Gemini API Key'. Deleting...") for key in keys_to_delete: if dry_run: logging.info(f" [DRY RUN] Would delete key: {key.uid}") deleted_keys_uids.append(key.uid) continue try: request = api_keys_v2.DeleteKeyRequest(name=key.name) operation = api_keys_client.delete_key(request=request) operation.result() logging.info(f" Successfully deleted key: {key.uid}") deleted_keys_uids.append(key.uid) except google_exceptions.GoogleAPICallError as err: logging.error(f" Error deleting key {key.uid}: {err}") return deleted_keys_uids except google_exceptions.PermissionDenied: logging.warning(f" Permission denied to list or delete API keys for project {project_id}. Skipping.") except google_exceptions.GoogleAPICallError as err: logging.error(f" An API error occurred while deleting keys for project {project_id}: {err}") return [] if __name__ == "__main__": main()