#!/usr/bin/env python3
"""
AI-powered matching of Norda companies to ZOPK projects.

Uses Gemini AI to analyze each company's profile and determine
which ZOPK projects are relevant, with relevance scores and
collaboration descriptions.

Usage:
    python3 scripts/match_companies_zopk.py [--dry-run] [--limit N] [--company-id ID]
"""

import sys
import os
import json
import math
import time
import logging
import argparse

# Make the project root importable when the script is run from scripts/.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from dotenv import load_dotenv
load_dotenv()

from database import (
    SessionLocal, Company, ZOPKProject, ZOPKCompanyLink
)
from sqlalchemy import func

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Matches scoring below this threshold are ignored. The same number is spelled
# out literally in MATCHING_PROMPT, so keep the two in sync.
MIN_RELEVANCE_SCORE = 25
# Link types accepted from the model; anything else falls back to the default.
VALID_LINK_TYPES = ('potential_supplier', 'partner', 'investor', 'beneficiary')
DEFAULT_LINK_TYPE = 'potential_supplier'
# Model alias passed to the project's Gemini wrapper.
GEMINI_MODEL = '3-flash'
# Pause between companies (rate limiting for the Gemini free tier).
REQUEST_PAUSE_SECONDS = 2

MATCHING_PROMPT = """Jesteś ekspertem ds. łańcuchów dostaw i współpracy biznesowej w regionie Pomorza.
Przeanalizuj profil firmy i oceń, czy może być powiązana z projektami Zielonego Okręgu Przemysłowego Kaszubia.

FIRMA:
Nazwa: {company_name}
Kategoria: {category}
Kody PKD: {pkd_codes}
Opis: {description}
Usługi: {services}
Branża: {industry}
Tagi: {tags}

PROJEKTY ZOPK:
{projects_text}

Dla KAŻDEGO projektu oceń:
1. Czy firma może być dostawcą, partnerem lub beneficjentem tego projektu?
2. Jeśli tak — jaki typ powiązania (potential_supplier, partner, beneficiary)?
3. Ocena trafności 0-100 (0 = brak powiązania, 100 = idealny dostawca/partner)
4. Krótki opis (1-2 zdania po polsku) DLACZEGO ta firma jest istotna dla tego projektu — co konkretnie może wnieść?

Zwróć JSON:
{{
  "matches": [
    {{
      "project_id": 1,
      "relevant": true,
      "link_type": "potential_supplier",
      "relevance_score": 75,
      "description": "Firma specjalizuje się w spawaniu aluminium, co jest kluczowe przy konstrukcji platform offshore."
    }}
  ]
}}

WAŻNE:
- Zwróć wpisy TYLKO dla projektów z relevance_score >= 25
- Bądź realistyczny — nie przypisuj firm do projektów bez uzasadnienia
- Oceń na podstawie KONKRETNYCH kompetencji firmy, nie ogólnych kategorii
- Opis musi być konkretny — co firma MOŻE WNIEŚĆ do projektu
- Odpowiadaj TYLKO JSON-em, bez dodatkowego tekstu"""


def _load_json_field(raw, expected_type):
    """Return *raw* decoded as *expected_type*, or an empty instance on failure.

    The value may already be decoded or may still be a JSON string; both are
    accepted. A value that decodes to a different container type is treated
    like a missing value instead of blowing up later on ``.get`` or slicing.
    """
    if not raw:
        return expected_type()
    if isinstance(raw, expected_type):
        return raw
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError):  # JSONDecodeError is a ValueError
        return expected_type()
    return parsed if isinstance(parsed, expected_type) else expected_type()


def _join_first(values, limit):
    """Join up to *limit* non-empty entries of a list with ', '."""
    if not isinstance(values, list):
        return ''
    return ', '.join(str(value) for value in values[:limit] if value)


def _to_number(value):
    """Return *value* as a finite int/float, or None when it is not numeric.

    Numeric strings are accepted because the model occasionally quotes
    numbers. Booleans are rejected even though ``bool`` subclasses ``int``.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    if not math.isfinite(number):
        return None
    return int(number) if number.is_integer() else number


def _resolve_project_id(raw_id, known_ids):
    """Map the project id returned by the model onto a known id, or None."""
    if isinstance(raw_id, bool):
        return None
    # Try the numeric form first so "1" and 1.0 resolve to project 1, then the
    # raw value in case project ids are not numeric.
    for candidate in (_to_number(raw_id), raw_id):
        if candidate is None:
            continue
        try:
            if candidate in known_ids:
                return candidate
        except TypeError:  # unhashable value such as a list or dict
            continue
    return None


def _strip_code_fence(response_text):
    """Remove an optional Markdown ``` / ```json fence around the response."""
    text = response_text.strip()
    if text.startswith('```'):
        # Drop the whole opening fence line (which may carry a language tag).
        text = text.split('\n', 1)[1] if '\n' in text else text[3:]
    if text.endswith('```'):
        text = text[:-3]
    if text.startswith('json'):
        text = text[4:]
    return text.strip()


def _upsert_link(db, company_id, project_id, link_type, score, description):
    """Create or refresh the suggested link for one company/project/type."""
    existing = db.query(ZOPKCompanyLink).filter(
        ZOPKCompanyLink.company_id == company_id,
        ZOPKCompanyLink.project_id == project_id,
        ZOPKCompanyLink.link_type == link_type
    ).first()

    if existing:
        existing.relevance_score = score
        existing.collaboration_description = description
        existing.status = 'suggested'
        return

    db.add(ZOPKCompanyLink(
        company_id=company_id,
        project_id=project_id,
        link_type=link_type,
        relevance_score=score,
        collaboration_description=description,
        status='suggested'
    ))


def gather_company_signals(company, db):
    """Gather all available signals for a company.

    Args:
        company: Company row to describe.
        db: Database session. Not used here; kept so existing callers keep
            working.

    Returns:
        dict with the string fields expected by MATCHING_PROMPT:
        company_name, category, pkd_codes, description, services, industry
        and tags. Missing data is replaced by Polish placeholder texts.
    """
    # Basic info
    category = company.category.name if company.category else ''

    # PKD codes: the primary code first, then up to five CEIDG entries.
    pkd_codes = []
    if company.pkd_code:
        pkd_codes.append(f"{company.pkd_code} ({company.pkd_description or ''})")

    # First token of every entry already listed, used to skip duplicate codes.
    seen_codes = {entry.split(' ')[0] for entry in pkd_codes}
    for item in _load_json_field(company.ceidg_pkd_list, list)[:5]:
        if isinstance(item, dict):
            entry = f"{item.get('code', '')} ({item.get('name', '')})"
        elif isinstance(item, str) and item not in seen_codes:
            entry = item
        else:
            continue
        pkd_codes.append(entry)
        seen_codes.add(entry.split(' ')[0])

    # Description: the short text, plus the full one only when it is longer.
    desc_parts = []
    if company.description_short:
        desc_parts.append(company.description_short)
    if company.description_full and len(company.description_full) > len(company.description_short or ''):
        desc_parts.append(company.description_full[:500])
    description = ' '.join(desc_parts)[:800] or 'Brak opisu'

    # Services: free-text field plus up to ten linked service records.
    services_list = []
    if company.services_offered:
        services_list.append(company.services_offered[:300])
    linked_services = getattr(company, 'services', None)
    if linked_services:
        for cs in linked_services[:10]:
            if hasattr(cs, 'service') and cs.service:
                services_list.append(cs.service.name)
            elif hasattr(cs, 'name'):
                services_list.append(cs.name)
    services = ', '.join(services_list)[:500] or 'Brak informacji'

    # AI insights: tags, and a services fallback when nothing else is known.
    industry = company.industry_sector or ''
    insights = _load_json_field(company.ai_insights, dict)
    tags = _join_first(insights.get('industry_tags'), 8)
    if not services_list:
        services = _join_first(insights.get('services_list'), 8) or services

    return {
        'company_name': company.name,
        'category': category,
        'pkd_codes': '; '.join(pkd_codes) or 'Brak',
        'description': description,
        'services': services,
        'industry': industry or 'Brak',
        'tags': tags or 'Brak',
    }


def format_projects(projects):
    """Format ZOPK projects for the prompt.

    Each project becomes a one-line header, an optional description line
    (truncated to 300 characters) and a blank separator line.
    """
    lines = []
    for p in projects:
        lines.append(f"ID: {p.id} | Nazwa: {p.name} | Typ: {p.project_type or 'brak'} | Status: {p.status}")
        if p.description:
            lines.append(f" Opis: {p.description[:300]}")
        lines.append('')
    return '\n'.join(lines)


def match_company(company, projects, gemini_service, db, dry_run=False):
    """Match a single company to ZOPK projects using Gemini AI.

    Args:
        company: Company row to match.
        projects: ZOPK projects offered to the model. Matches that reference
            any other project id are discarded.
        gemini_service: Service object exposing ``generate_text``.
        db: Database session used to create or update ZOPKCompanyLink rows.
        dry_run: When true, matches are only logged and nothing is written.

    Returns:
        List of accepted match dicts (relevant, score at or above
        MIN_RELEVANCE_SCORE, known project) with ``project_id``,
        ``link_type``, ``relevance_score`` and ``description`` normalised.
        An empty list is returned on an empty response, a parse error or any
        other failure; failures are logged, never raised.
    """
    # Read these before any DB work: after a failed flush the instance may be
    # expired and attribute access inside the error handler could itself fail.
    company_id = company.id
    company_name = company.name

    signals = gather_company_signals(company, db)
    prompt = MATCHING_PROMPT.format(
        projects_text=format_projects(projects),
        **signals
    )
    known_project_ids = {p.id for p in projects}

    text = None
    try:
        response_text = gemini_service.generate_text(
            prompt=prompt,
            temperature=0.3,
            feature='zopk_company_matching',
            company_id=company_id,
            model=GEMINI_MODEL,  # Gemini 3 Flash thinking mode - best available
        )

        if not response_text:
            logger.warning(f" Empty response for {company_name}")
            return []

        # Parse JSON from response (Markdown code fences removed if present)
        text = _strip_code_fence(response_text)
        data = json.loads(text)

        raw_matches = data.get('matches', []) if isinstance(data, dict) else None
        if not isinstance(raw_matches, list):
            logger.error(f" Unexpected response structure for {company_name}")
            return []

        accepted = []
        for match in raw_matches:
            if not isinstance(match, dict) or not match.get('relevant', False):
                continue

            score = _to_number(match.get('relevance_score', 0))
            if score is None or score < MIN_RELEVANCE_SCORE:
                continue

            # The model can invent ids; storing one would violate the foreign
            # key and poison the whole transaction.
            project_id = _resolve_project_id(match.get('project_id'), known_project_ids)
            if project_id is None:
                logger.warning(f" Skipping unknown project id {match.get('project_id')!r} for {company_name}")
                continue

            link_type = match.get('link_type', DEFAULT_LINK_TYPE)
            if link_type not in VALID_LINK_TYPES:
                link_type = DEFAULT_LINK_TYPE

            # A JSON null description must not break the slicing below.
            description = str(match.get('description') or '')

            logger.info(f" → Projekt {project_id}: {link_type} (score: {score}) — {description[:80]}")

            if not dry_run:
                _upsert_link(db, company_id, project_id, link_type, score, description)

            accepted.append({
                **match,
                'project_id': project_id,
                'link_type': link_type,
                'relevance_score': score,
                'description': description,
            })

        if not dry_run and accepted:
            db.commit()

        return accepted

    except json.JSONDecodeError as e:
        logger.error(f" JSON parse error for {company_name}: {e}")
        logger.debug(f" Response: {text[:200] if text else 'None'}")
        return []
    except Exception as e:
        # Per-company boundary: one failure must not stop the batch. Roll back
        # so that half-added links are not committed with the next company and
        # the session stays usable after a failed flush/commit.
        db.rollback()
        logger.error(f" Error matching {company_name}: {e}")
        return []


def main():
    """Parse CLI arguments and match active companies to ZOPK projects."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--dry-run', action='store_true')
    parser.add_argument('--limit', type=int, default=150)
    parser.add_argument('--company-id', type=int, help='Match single company by ID')
    args = parser.parse_args()

    # Initialize Gemini (imported here, after sys.path and .env are set up)
    from gemini_service import init_gemini_service, get_gemini_service
    init_gemini_service(model=GEMINI_MODEL)
    gemini = get_gemini_service()
    if not gemini:
        logger.error("Failed to initialize Gemini service")
        sys.exit(1)

    db = SessionLocal()
    try:
        # Get ZOPK projects; isnot(False) also keeps rows where is_active is NULL
        projects = db.query(ZOPKProject).filter(
            ZOPKProject.is_active.isnot(False)
        ).order_by(ZOPKProject.id).all()
        logger.info(f"Loaded {len(projects)} ZOPK projects")

        if not projects:
            # Without projects every request would be a wasted API call.
            logger.warning("No ZOPK projects to match against; nothing to do")
            return

        # Get companies
        query = db.query(Company).filter(
            Company.status == 'active'
        )
        if args.company_id:
            query = query.filter(Company.id == args.company_id)

        companies = query.order_by(Company.name).limit(args.limit).all()
        logger.info(f"Matching {len(companies)} companies to ZOPK projects")

        total_matches = 0
        for i, company in enumerate(companies, 1):
            logger.info(f"[{i}/{len(companies)}] {company.name}")
            matches = match_company(company, projects, gemini, db, dry_run=args.dry_run)
            # match_company returns only the matches it accepted.
            total_matches += len(matches)

            # Rate limiting (Gemini free tier); no need to wait after the last one
            if i < len(companies):
                time.sleep(REQUEST_PAUSE_SECONDS)

        logger.info(f"\nDone! Total matches: {total_matches}")
        if args.dry_run:
            logger.info("DRY RUN — no changes saved")

    finally:
        db.close()


if __name__ == '__main__':
    main()