From 9dfcbe37b6bab16b8a0d81a443533cd636de266c Mon Sep 17 00:00:00 2001 From: not-lucky Date: Fri, 22 Aug 2025 10:31:59 +0530 Subject: [PATCH] add concurrency support for authentication and enhance project key processing --- TODO.md | 1 + main.py | 112 ++++++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 93 insertions(+), 20 deletions(-) create mode 100644 TODO.md diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..30f0158 --- /dev/null +++ b/TODO.md @@ -0,0 +1 @@ +During project creation readiness check, it might take some time. The time in which we could perform other tasks. \ No newline at end of file diff --git a/main.py b/main.py index be12623..4d616df 100644 --- a/main.py +++ b/main.py @@ -161,6 +161,9 @@ def main(): parser.add_argument("action", choices=['create', 'delete'], help="The action to perform: 'create' or 'delete' API keys.") parser.add_argument("--email", help="Specify a single email address to process. Required for 'delete'. If not provided for 'create', emails will be read from emails.txt.") parser.add_argument("--dry-run", action="store_true", help="Simulate the run without making any actual changes to Google Cloud resources.") + parser.add_argument("--max-workers", type=int, default=5, help="The maximum number of concurrent projects to process.") + parser.add_argument("--auth-retries", type=int, default=3, help="Number of retries for a failed authentication attempt.") + parser.add_argument("--auth-retry-delay", type=int, default=5, help="Delay in seconds between authentication retries.") args = parser.parse_args() logging.info(f"Program arguments: {vars(args)}") @@ -191,12 +194,52 @@ def main(): logging.info("No emails found in emails.txt. Exiting.") sys.exit(1) + # --- New Authentication Logic --- + creds_map = {} + emails_needing_interactive_auth = [] + + logging.info("Checking credentials and refreshing tokens for all accounts...") + + with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor: + future_to_email = {executor.submit(get_and_refresh_credentials, email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay): email for email in emails_to_process} + + for future in concurrent.futures.as_completed(future_to_email): + email = future_to_email[future] + try: + creds = future.result() + if creds: + creds_map[email] = creds + else: + emails_needing_interactive_auth.append(email) + except Exception as exc: + logging.error(f"Credential check for {email} generated an exception: {exc}", exc_info=True) + emails_needing_interactive_auth.append(email) + + if emails_needing_interactive_auth: + logging.info(f"\n--- INTERACTIVE AUTHENTICATION REQUIRED ---") + logging.info(f"The following accounts require manual authentication: {', '.join(sorted(emails_needing_interactive_auth))}") + + for email in sorted(emails_needing_interactive_auth): + creds = run_interactive_auth(email, max_retries=args.auth_retries, retry_delay=args.auth_retry_delay) + if creds: + logging.info(f"Successfully authenticated {email}.") + creds_map[email] = creds + else: + logging.warning(f"Authentication failed or was cancelled for {email}. This account will be skipped.") + + logging.info("\n--- Credential checking complete ---") + for email in emails_to_process: - process_account(email, args.action, api_keys_data, args.dry_run) + if email in creds_map: + process_account(email, creds_map[email], args.action, api_keys_data, dry_run=args.dry_run, max_workers=args.max_workers) + else: + logging.warning(f"Skipping account {email} because authentication was not successful.") if not args.dry_run: save_keys_to_json(api_keys_data, API_KEYS_DATABASE_FILE, schema) + + def sync_project_keys(project, creds, dry_run, db_lock, account_entry): """Synchronizes API keys between Google Cloud and the local database for a single project. Returns True if a Gemini API key exists in the project, False otherwise.""" @@ -399,13 +442,12 @@ def create_projects_if_needed(projects, creds, dry_run=False): return newly_created_projects -def process_account(email, action, api_keys_data, dry_run=False, max_workers=5): +def process_account(email, creds, action, api_keys_data, dry_run=False, max_workers=5): """Processes a single account for the given action.""" logging.info(f"--- Processing account: {email} for action: {action} ---") if dry_run: logging.info("*** DRY RUN MODE ENABLED ***") - creds = get_credentials_for_email(email) if not creds: logging.warning(f"Could not get credentials for {email}. Skipping.") return @@ -518,30 +560,60 @@ def remove_keys_from_database(account_entry, project_id, deleted_keys_uids): if num_removed > 0: logging.info(f" Removed {num_removed} key(s) from local database for project {project_id}") -def get_credentials_for_email(email): - """Handles the OAuth2 flow for a given email.""" +def get_and_refresh_credentials(email, max_retries=3, retry_delay=5): + """Tries to load and refresh credentials for an email with retries. Does not start interactive flow.""" token_file = os.path.join(CREDENTIALS_DIR, f"{email}.json") creds = None - try: - if os.path.exists(token_file): + if os.path.exists(token_file): + try: creds = Credentials.from_authorized_user_file(token_file, SCOPES) + except (ValueError, json.JSONDecodeError): + logging.warning(f"Could not decode token file for {email}. Re-authentication will be required.") + return None - if not creds or not creds.valid: - if creds and creds.expired and creds.refresh_token: - logging.info("Refreshing credentials...") + if creds and creds.valid: + return creds + + if creds and creds.expired and creds.refresh_token: + for attempt in range(max_retries): + try: + logging.info(f"Refreshing credentials for {email} (attempt {attempt + 1}/{max_retries})...") creds.refresh(google.auth.transport.requests.Request()) - else: - logging.info(f"Please authenticate with: {email}") - flow = InstalledAppFlow.from_client_secrets_file( - CLIENT_SECRETS_FILE, SCOPES - ) - creds = flow.run_local_server(port=0, login_hint=email) + with open(token_file, "w") as token: + token.write(creds.to_json()) + logging.info(f"Successfully refreshed credentials for {email}.") + return creds + except Exception as e: + logging.warning(f"Failed to refresh credentials for {email} on attempt {attempt + 1}: {e}") + if attempt < max_retries - 1: + time.sleep(retry_delay) + + logging.error(f"Failed to refresh credentials for {email} after {max_retries} attempts.") + return None + + return None + +def run_interactive_auth(email, max_retries=3, retry_delay=5): + """Runs the interactive OAuth2 flow for a given email with retries.""" + for attempt in range(max_retries): + try: + logging.info(f"Please authenticate with: {email} (attempt {attempt + 1}/{max_retries})") + flow = InstalledAppFlow.from_client_secrets_file( + CLIENT_SECRETS_FILE, SCOPES + ) + creds = flow.run_local_server(port=0, login_hint=email) + token_file = os.path.join(CREDENTIALS_DIR, f"{email}.json") with open(token_file, "w") as token: token.write(creds.to_json()) - return creds - except Exception as e: - logging.error(f"An unexpected error occurred during authentication: {e}") - return None + return creds + except Exception as e: + logging.error(f"An unexpected error occurred during authentication for {email} on attempt {attempt + 1}: {e}") + if attempt < max_retries - 1: + logging.info(f"Retrying authentication in {retry_delay} seconds...") + time.sleep(retry_delay) + + logging.error(f"Failed to authenticate {email} after {max_retries} attempts.") + return None def enable_api(project_id, credentials, dry_run=False): """Enables the Generative Language API if it's not already enabled."""