fix: use file-based job state for bulk discovery progress
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions

The in-memory _bulk_jobs dict was per-worker under gunicorn (4 workers),
so poll requests handled by a different worker than the one running the
job could not see its state. Job state now lives in JSON files under
/tmp, visible to all workers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-02-21 10:05:58 +01:00
parent ec8d419fad
commit 0ace87e346

View File

@ -5,7 +5,9 @@ Admin Website Discovery Routes
Endpoints for discovering and managing website candidates for companies.
"""
import json
import logging
import os
import threading
from datetime import datetime
@ -20,8 +22,24 @@ from services.website_discovery_service import WebsiteDiscoveryService
logger = logging.getLogger(__name__)
# Store bulk job progress
_bulk_jobs = {}
# File-based job state (shared across gunicorn workers)
# File-based job state (shared across gunicorn workers).
JOB_DIR = '/tmp/nordabiz_discovery_jobs'


def _save_job(job_id, data):
    """Persist bulk-job progress as JSON under JOB_DIR.

    The file is written to a temp path and moved into place with
    os.replace, which is atomic on POSIX — a concurrent _load_job from
    another worker sees either the previous state or the new one, never
    a half-written file (which would otherwise surface as a spurious
    JSONDecodeError / 404 on the poll endpoint).
    """
    os.makedirs(JOB_DIR, exist_ok=True)
    path = os.path.join(JOB_DIR, f'{job_id}.json')
    tmp_path = f'{path}.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(data, f)
    os.replace(tmp_path, path)


def _load_job(job_id):
    """Return the stored job dict for job_id, or None if unknown.

    job_id arrives from request args on the status endpoint, so anything
    that is not a simple [A-Za-z0-9_-] token is rejected up front to
    prevent path traversal outside JOB_DIR. Legitimate ids (first 8
    chars of a uuid4 string) always pass this check.
    """
    if not job_id or not all(c.isalnum() or c in '-_' for c in job_id):
        return None
    path = os.path.join(JOB_DIR, f'{job_id}.json')
    try:
        with open(path) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Missing file = unknown job; decode error = file vanished or
        # corrupt — both are reported to the caller as "not found".
        return None
@bp.route('/discover-website/<int:company_id>', methods=['POST'])
@ -58,7 +76,8 @@ def discover_websites_bulk():
job_id = str(uuid.uuid4())[:8]
def run_bulk(job_id):
_bulk_jobs[job_id] = {'status': 'running', 'processed': 0, 'total': 0, 'log': []}
job = {'status': 'running', 'processed': 0, 'total': 0, 'log': []}
_save_job(job_id, job)
db = SessionLocal()
try:
# Skip companies that already have a pending/accepted candidate
@ -74,7 +93,9 @@ def discover_websites_bulk():
~Company.id.in_(already_have) if already_have else True,
).order_by(Company.name).limit(50).all()
_bulk_jobs[job_id]['total'] = len(companies)
job['total'] = len(companies)
_save_job(job_id, job)
service = WebsiteDiscoveryService(db=db)
import time
@ -89,18 +110,20 @@ def discover_websites_bulk():
else:
status_text += result.get('error', 'brak wyników')
# Update processed AFTER building status text — atomic from poll perspective
_bulk_jobs[job_id]['log'].append(status_text)
_bulk_jobs[job_id]['processed'] += 1
job['log'].append(status_text)
job['processed'] += 1
_save_job(job_id, job)
if _bulk_jobs[job_id]['processed'] < _bulk_jobs[job_id]['total']:
if job['processed'] < job['total']:
time.sleep(5)
_bulk_jobs[job_id]['status'] = 'completed'
job['status'] = 'completed'
_save_job(job_id, job)
except Exception as e:
logger.error(f"Bulk discovery error: {e}")
_bulk_jobs[job_id]['status'] = 'error'
_bulk_jobs[job_id]['log'].append(f"Błąd: {e}")
job['status'] = 'error'
job['log'].append(f"Błąd: {e}")
_save_job(job_id, job)
finally:
db.close()
@ -116,10 +139,13 @@ def discover_websites_bulk():
def discover_websites_status():
"""Poll bulk discovery progress."""
job_id = request.args.get('job_id')
if not job_id or job_id not in _bulk_jobs:
if not job_id:
return jsonify({'error': 'Job not found'}), 404
job = _load_job(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
job = _bulk_jobs[job_id]
log_offset = request.args.get('log_offset', 0, type=int)
new_entries = job['log'][log_offset:]