""" Company Matcher for NordaGPT ============================== Pre-selects companies from the database BEFORE the AI sees them, eliminating hallucinations by ensuring only real companies with rich profiles are passed as context. Multi-layer matching: 1. Category matching (query keywords -> category names) 2. Full-text search on search_vector (tsvector GIN index) 3. ILIKE on services_offered, description_full, description_short 4. AI Insights tags (industry_tags, services_list from company_ai_insights) 5. PKD codes matching Author: Maciej Pienczyn, InPi sp. z o.o. Created: 2026-03-28 """ import re import logging from typing import Optional from sqlalchemy import text from sqlalchemy.orm import joinedload from database import ( SessionLocal, Company, Category, CompanyAIInsights, GBPAudit, CompanyService, CompanyCompetency, Service, Competency, ) logger = logging.getLogger(__name__) # ============================================================ # CATEGORY KEYWORD MAPPING # ============================================================ CATEGORY_KEYWORDS = { 'Budownictwo ogólne': [ 'budow', 'budynk', 'konstrukc', 'dekars', 'remontow', 'stawianie obiektów', ], 'Budownictwo': ['budow', 'budynk'], 'IT i Technologie': [ 'it', 'informatyk', 'oprogramow', 'software', 'system', 'serwer', 'technologi', ], 'IT i Telekomunikacja': [ 'telekomunikac', 'internet', 'światłowod', 'sieć', 'łącznoś', ], 'Energia i OZE': [ 'energ', 'elektryk', 'prąd', 'oze', 'fotowolt', 'zasilani', ], 'HVAC i Instalacje': [ 'hvac', 'klimatyz', 'chłodz', 'wentylac', 'instalac', 'grzewcz', ], 'Bezpieczeństwo i Ochrona': [ 'bezpiecz', 'ochron', 'monitoring', 'alarm', ], 'Nieruchomości': [ 'nieruchomoś', 'dewelop', 'gruntów', 'działk', ], 'Usługi prawne': ['prawn', 'adwokat', 'radca', 'kancelari'], 'Usługi finansowe': [ 'finans', 'rachunk', 'księgow', 'doradztw', ], 'Marketing i Reklama': [ 'marketing', 'reklam', 'promocj', 'media', ], 'Handel i Hurtownie': [ 'handel', 'hurtowni', 'sprzedaż', 'dystrybuc', ], 'Produkcja ogólna': [ 'produkcj', 'produku', 'wytwarz', 'fabryk', ], 'Hotelarstwo i Turystyka': [ 'hotel', 'turyst', 'nocleg', 'gastrono', ], 'Motoryzacja': ['motoryz', 'samochod', 'warsztat'], } # ============================================================ # PKD CODE MAPPING # ============================================================ PKD_KEYWORDS = { 'budow': ['41', '42', '43'], # Construction 'it': ['62', '63'], # IT / Software 'energ': ['35'], # Energy 'transport': ['49', '50', '51'], # Transport / logistics 'handel': ['45', '46', '47'], # Trade 'produkcj': ['10', '20', '22', '24', '25', '28'], # Manufacturing 'hotel': ['55', '56'], # Hospitality 'finans': ['64', '66'], # Finance 'prawn': ['69'], # Legal 'marketing': ['73'], # Advertising 'nieruchomoś': ['68'], # Real estate } # ============================================================ # SCORING WEIGHTS # ============================================================ SCORE_CATEGORY = 10 SCORE_FTS = 8 SCORE_ILIKE = 5 SCORE_AI_INSIGHTS = 7 SCORE_PKD = 3 # ============================================================ # MAIN ENTRY POINT # ============================================================ def match_companies( query: str, user_context: Optional[dict] = None, max_results: int = 15, ) -> list[dict]: """ Search the database for companies matching the user's query. Returns rich company profiles ready for AI context injection. Args: query: User's search query in Polish or English. user_context: Optional dict with caller info (unused for now). max_results: Maximum number of companies to return. Returns: List of dicts with rich company profiles, sorted by relevance_score desc. """ if not query or not query.strip(): return [] query = query.strip() query_lower = query.lower() db = SessionLocal() try: # Accumulator: company_id -> {score, reasons} scores: dict[int, dict] = {} def _add(company_id: int, points: int, reason: str): if company_id not in scores: scores[company_id] = {'score': 0, 'reasons': []} scores[company_id]['score'] += points scores[company_id]['reasons'].append(reason) # ---------------------------------------------------------- # Layer 1: Category keyword matching # ---------------------------------------------------------- matched_categories = _match_categories(query_lower) if matched_categories: cat_rows = ( db.query(Company.id) .join(Category, Company.category_id == Category.id) .filter( Category.name.in_(matched_categories), Company.status == 'active', ) .all() ) for (cid,) in cat_rows: _add(cid, SCORE_CATEGORY, f"category: {', '.join(matched_categories)}") logger.debug( "Layer 1 (category): matched %d companies for categories %s", len(cat_rows), matched_categories, ) # ---------------------------------------------------------- # Layer 2: PostgreSQL Full-Text Search (search_vector) # ---------------------------------------------------------- fts_ids = _fts_search(db, query) for cid in fts_ids: _add(cid, SCORE_FTS, f"fts: {query}") logger.debug("Layer 2 (FTS): matched %d companies", len(fts_ids)) # ---------------------------------------------------------- # Layer 3: ILIKE on text fields # ---------------------------------------------------------- ilike_ids = _ilike_search(db, query_lower) for cid in ilike_ids: _add(cid, SCORE_ILIKE, f"ilike: {query}") logger.debug("Layer 3 (ILIKE): matched %d companies", len(ilike_ids)) # ---------------------------------------------------------- # Layer 4: AI Insights (services_list, industry_tags, USPs) # ---------------------------------------------------------- ai_ids = _ai_insights_search(db, query_lower) for cid in ai_ids: _add(cid, SCORE_AI_INSIGHTS, f"ai_insights: {query}") logger.debug("Layer 4 (AI Insights): matched %d companies", len(ai_ids)) # ---------------------------------------------------------- # Layer 5: PKD codes # ---------------------------------------------------------- pkd_ids = _pkd_search(db, query_lower) for cid in pkd_ids: _add(cid, SCORE_PKD, f"pkd: {query}") logger.debug("Layer 5 (PKD): matched %d companies", len(pkd_ids)) # ---------------------------------------------------------- # Sort by score and pick top N # ---------------------------------------------------------- if not scores: logger.info("company_matcher: no matches for query=%r", query) return [] ranked = sorted(scores.items(), key=lambda x: x[1]['score'], reverse=True) top_ids = [cid for cid, _ in ranked[:max_results]] # ---------------------------------------------------------- # Build rich profiles # ---------------------------------------------------------- companies = ( db.query(Company) .options( joinedload(Company.category), joinedload(Company.services).joinedload(CompanyService.service), joinedload(Company.competencies).joinedload(CompanyCompetency.competency), joinedload(Company.ai_insights), ) .filter(Company.id.in_(top_ids), Company.status == 'active') .all() ) company_map = {c.id: c for c in companies} # Fetch latest GBP audit per company (one query) gbp_map = _fetch_gbp_audits(db, top_ids) results = [] for cid in top_ids: company = company_map.get(cid) if not company: continue info = scores[cid] profile = _build_rich_profile(company, info, gbp_map.get(cid)) results.append(profile) logger.info( "company_matcher: query=%r -> %d results (top score=%d)", query, len(results), results[0]['relevance_score'] if results else 0, ) return results except Exception: logger.exception("company_matcher error for query=%r", query) return [] finally: db.close() # ============================================================ # LAYER IMPLEMENTATIONS # ============================================================ def _match_categories(query_lower: str) -> list[str]: """Return category names whose keywords appear in the query.""" matched = [] for cat_name, keywords in CATEGORY_KEYWORDS.items(): for kw in keywords: if kw in query_lower: matched.append(cat_name) break return matched def _fts_search(db, query: str) -> list[int]: """Full-text search using the search_vector tsvector column.""" # Sanitize: keep word characters (including Polish diacritics) words = re.findall(r'[\w]+', query, flags=re.UNICODE) words = [w for w in words if len(w) > 1] if not words: return [] tsquery = ' | '.join(f"{w}:*" for w in words) sql = text(""" SELECT c.id FROM companies c WHERE c.status = 'active' AND c.search_vector @@ to_tsquery('simple', :tsquery) ORDER BY ts_rank(c.search_vector, to_tsquery('simple', :tsquery)) DESC LIMIT 50 """) try: rows = db.execute(sql, {'tsquery': tsquery}).fetchall() return [r[0] for r in rows] except Exception: logger.debug("FTS search failed (search_vector may not exist), skipping") db.rollback() return [] def _ilike_search(db, query_lower: str) -> list[int]: """ILIKE search across text columns.""" words = re.findall(r'[\w]+', query_lower, flags=re.UNICODE) words = [w for w in words if len(w) > 2] if not words: return [] patterns = [f'%{w}%' for w in words] sql = text(""" SELECT DISTINCT c.id FROM companies c WHERE c.status = 'active' AND ( c.services_offered ILIKE ANY(:patterns) OR c.description_full ILIKE ANY(:patterns) OR c.description_short ILIKE ANY(:patterns) OR c.founding_history ILIKE ANY(:patterns) OR c.technologies_used ILIKE ANY(:patterns) ) LIMIT 50 """) try: rows = db.execute(sql, {'patterns': patterns}).fetchall() return [r[0] for r in rows] except Exception: logger.debug("ILIKE search failed, skipping") db.rollback() return [] def _ai_insights_search(db, query_lower: str) -> list[int]: """Search AI Insights arrays: services_list, industry_tags, unique_selling_points.""" words = re.findall(r'[\w]+', query_lower, flags=re.UNICODE) words = [w for w in words if len(w) > 2] if not words: return [] # Build an OR of array element ILIKE checks # Using unnest to search inside PostgreSQL ARRAY columns patterns = [f'%{w}%' for w in words] sql = text(""" SELECT DISTINCT ai.company_id FROM company_ai_insights ai JOIN companies c ON c.id = ai.company_id AND c.status = 'active' WHERE EXISTS ( SELECT 1 FROM unnest(ai.services_list) s WHERE s ILIKE ANY(:patterns) ) OR EXISTS ( SELECT 1 FROM unnest(ai.industry_tags) t WHERE t ILIKE ANY(:patterns) ) OR EXISTS ( SELECT 1 FROM unnest(ai.unique_selling_points) u WHERE u ILIKE ANY(:patterns) ) LIMIT 50 """) try: rows = db.execute(sql, {'patterns': patterns}).fetchall() return [r[0] for r in rows] except Exception: logger.debug("AI Insights search failed, skipping") db.rollback() return [] def _pkd_search(db, query_lower: str) -> list[int]: """Match companies by PKD codes derived from query keywords.""" pkd_prefixes = set() for keyword_stem, codes in PKD_KEYWORDS.items(): if keyword_stem in query_lower: pkd_prefixes.update(codes) if not pkd_prefixes: return [] # Match pkd_code starting with any of the 2-digit prefixes like_patterns = [f'{p}%' for p in pkd_prefixes] sql = text(""" SELECT DISTINCT c.id FROM companies c WHERE c.status = 'active' AND c.pkd_code IS NOT NULL AND c.pkd_code LIKE ANY(:patterns) LIMIT 50 """) try: rows = db.execute(sql, {'patterns': like_patterns}).fetchall() return [r[0] for r in rows] except Exception: logger.debug("PKD search failed, skipping") db.rollback() return [] # ============================================================ # GBP AUDIT HELPER # ============================================================ def _fetch_gbp_audits(db, company_ids: list[int]) -> dict: """Fetch the latest GBP audit for each company. Returns {company_id: GBPAudit}.""" if not company_ids: return {} # Use DISTINCT ON to get the latest audit per company sql = text(""" SELECT DISTINCT ON (company_id) id, company_id FROM gbp_audits WHERE company_id = ANY(:ids) ORDER BY company_id, audit_date DESC """) try: rows = db.execute(sql, {'ids': company_ids}).fetchall() audit_ids = [r[0] for r in rows] if not audit_ids: return {} audits = db.query(GBPAudit).filter(GBPAudit.id.in_(audit_ids)).all() return {a.company_id: a for a in audits} except Exception: logger.debug("GBP audit fetch failed, skipping") db.rollback() return {} # ============================================================ # RICH PROFILE BUILDER # ============================================================ def _build_rich_profile( company: Company, score_info: dict, gbp_audit: Optional[GBPAudit], ) -> dict: """Build a rich company profile dict for AI context injection.""" # Structured services from relationship structured_services = [] if company.services: for cs in company.services: if cs.service and cs.service.name: structured_services.append(cs.service.name) # Competencies from relationship competency_list = [] if company.competencies: for cc in company.competencies: if cc.competency and cc.competency.name: competency_list.append(cc.competency.name) # AI Insights ai_data = {} if company.ai_insights: ai = company.ai_insights ai_data = { 'services_list': ai.services_list or [], 'unique_selling_points': ai.unique_selling_points or [], 'target_market': ai.target_market or '', 'industry_tags': ai.industry_tags or [], 'certifications': ai.certifications or [], } # GBP data google_rating = None google_reviews = None if gbp_audit: google_rating = float(gbp_audit.average_rating) if gbp_audit.average_rating else None google_reviews = gbp_audit.review_count # Category name category_name = company.category.name if company.category else None # Truncate founding_history founding = None if company.founding_history: founding = company.founding_history[:300] if len(company.founding_history) > 300: founding += '...' return { 'name': company.name, 'slug': company.slug, 'link': f'/company/{company.slug}', 'category': category_name, 'description': company.description_full or company.description_short or '', 'services': company.services_offered or '', 'structured_services': structured_services, 'competencies': competency_list, 'ai_insights': ai_data, 'founding_history': founding, 'city': company.address_city, 'website': company.website, 'phone': company.phone, 'email': company.email, 'year_established': company.year_established, 'employee_range': company.employee_count_range, 'google_rating': google_rating, 'google_reviews': google_reviews, 'relevance_score': score_info['score'], 'match_reasons': score_info['reasons'], }