nordabiz/company_matcher.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

500 lines
17 KiB
Python

"""
Company Matcher for NordaGPT
==============================
Pre-selects companies from the database BEFORE the AI sees them,
eliminating hallucinations by ensuring only real companies with
rich profiles are passed as context.
Multi-layer matching:
1. Category matching (query keywords -> category names)
2. Full-text search on search_vector (tsvector GIN index)
3. ILIKE on services_offered, description_full, description_short
4. AI Insights tags (industry_tags, services_list from company_ai_insights)
5. PKD codes matching
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-03-28
"""
import re
import logging
from typing import Optional
from sqlalchemy import text
from sqlalchemy.orm import joinedload
from database import (
SessionLocal, Company, Category, CompanyAIInsights,
GBPAudit, CompanyService, CompanyCompetency,
Service, Competency,
)
logger = logging.getLogger(__name__)
# ============================================================
# CATEGORY KEYWORD MAPPING
# ============================================================
# Maps database category names -> Polish keyword *stems*.
# Matching is plain substring containment against the lowercased
# query (see _match_categories), so stems intentionally omit
# inflectional endings (e.g. 'budow' hits 'budowa', 'budowlany',
# 'budownictwo'). A category matches if ANY of its stems occurs.
CATEGORY_KEYWORDS = {
    'Budownictwo ogólne': [
        'budow', 'budynk', 'konstrukc', 'dekars', 'remontow',
        'stawianie obiektów',
    ],
    # NOTE: 'budow'/'budynk' also appear above, so a construction query
    # matches both category rows — both are scored in match_companies.
    'Budownictwo': ['budow', 'budynk'],
    'IT i Technologie': [
        'it', 'informatyk', 'oprogramow', 'software', 'system',
        'serwer', 'technologi',
    ],
    'IT i Telekomunikacja': [
        'telekomunikac', 'internet', 'światłowod', 'sieć', 'łącznoś',
    ],
    'Energia i OZE': [
        'energ', 'elektryk', 'prąd', 'oze', 'fotowolt', 'zasilani',
    ],
    'HVAC i Instalacje': [
        'hvac', 'klimatyz', 'chłodz', 'wentylac', 'instalac', 'grzewcz',
    ],
    'Bezpieczeństwo i Ochrona': [
        'bezpiecz', 'ochron', 'monitoring', 'alarm',
    ],
    'Nieruchomości': [
        'nieruchomoś', 'dewelop', 'gruntów', 'działk',
    ],
    'Usługi prawne': ['prawn', 'adwokat', 'radca', 'kancelari'],
    'Usługi finansowe': [
        'finans', 'rachunk', 'księgow', 'doradztw',
    ],
    'Marketing i Reklama': [
        'marketing', 'reklam', 'promocj', 'media',
    ],
    'Handel i Hurtownie': [
        'handel', 'hurtowni', 'sprzedaż', 'dystrybuc',
    ],
    'Produkcja ogólna': [
        'produkcj', 'produku', 'wytwarz', 'fabryk',
    ],
    'Hotelarstwo i Turystyka': [
        'hotel', 'turyst', 'nocleg', 'gastrono',
    ],
    'Motoryzacja': ['motoryz', 'samochod', 'warsztat'],
}
# ============================================================
# PKD CODE MAPPING
# ============================================================
# Maps query keyword stems -> 2-digit PKD (Polish activity
# classification) division prefixes. _pkd_search turns each prefix
# into a LIKE 'NN%' pattern against companies.pkd_code.
PKD_KEYWORDS = {
    'budow': ['41', '42', '43'],  # Construction
    'it': ['62', '63'],  # IT / Software
    'energ': ['35'],  # Energy
    'transport': ['49', '50', '51'],  # Transport / logistics
    'handel': ['45', '46', '47'],  # Trade
    'produkcj': ['10', '20', '22', '24', '25', '28'],  # Manufacturing
    'hotel': ['55', '56'],  # Hospitality
    'finans': ['64', '66'],  # Finance
    'prawn': ['69'],  # Legal
    'marketing': ['73'],  # Advertising
    'nieruchomoś': ['68'],  # Real estate
}
# ============================================================
# SCORING WEIGHTS
# ============================================================
# Points added per matching layer; a company hit by several layers
# accumulates their sum (see _add in match_companies). Category is
# weighted highest, PKD (coarsest signal) lowest.
SCORE_CATEGORY = 10
SCORE_FTS = 8
SCORE_ILIKE = 5
SCORE_AI_INSIGHTS = 7
SCORE_PKD = 3
# ============================================================
# MAIN ENTRY POINT
# ============================================================
def match_companies(
    query: str,
    user_context: Optional[dict] = None,
    max_results: int = 15,
) -> list[dict]:
    """
    Search the database for companies matching the user's query.

    Runs five independent matching layers (category keywords, full-text
    search, ILIKE, AI-insight arrays, PKD codes), accumulates a weighted
    score per company, then loads rich profiles for the top scorers.
    Each layer is best-effort: a failing layer contributes nothing
    (the helpers swallow their own errors), and any unexpected error
    here returns an empty list rather than propagating.

    Args:
        query: User's search query in Polish or English.
        user_context: Optional dict with caller info (unused for now).
        max_results: Maximum number of companies to return.

    Returns:
        List of dicts with rich company profiles, sorted by
        relevance_score desc. Empty list on blank query, no matches,
        or internal error.
    """
    if not query or not query.strip():
        return []
    query = query.strip()
    query_lower = query.lower()
    db = SessionLocal()
    try:
        # Accumulator: company_id -> {score, reasons}
        scores: dict[int, dict] = {}

        def _add(company_id: int, points: int, reason: str):
            # Closure over `scores`: lazily create the entry, then
            # add this layer's points and a human-readable match reason.
            if company_id not in scores:
                scores[company_id] = {'score': 0, 'reasons': []}
            scores[company_id]['score'] += points
            scores[company_id]['reasons'].append(reason)
        # ----------------------------------------------------------
        # Layer 1: Category keyword matching
        # ----------------------------------------------------------
        matched_categories = _match_categories(query_lower)
        if matched_categories:
            # Only active companies in any of the matched categories.
            cat_rows = (
                db.query(Company.id)
                .join(Category, Company.category_id == Category.id)
                .filter(
                    Category.name.in_(matched_categories),
                    Company.status == 'active',
                )
                .all()
            )
            for (cid,) in cat_rows:
                _add(cid, SCORE_CATEGORY, f"category: {', '.join(matched_categories)}")
            logger.debug(
                "Layer 1 (category): matched %d companies for categories %s",
                len(cat_rows), matched_categories,
            )
        # ----------------------------------------------------------
        # Layer 2: PostgreSQL Full-Text Search (search_vector)
        # ----------------------------------------------------------
        # Uses the original (case-preserved) query; FTS normalizes itself.
        fts_ids = _fts_search(db, query)
        for cid in fts_ids:
            _add(cid, SCORE_FTS, f"fts: {query}")
        logger.debug("Layer 2 (FTS): matched %d companies", len(fts_ids))
        # ----------------------------------------------------------
        # Layer 3: ILIKE on text fields
        # ----------------------------------------------------------
        ilike_ids = _ilike_search(db, query_lower)
        for cid in ilike_ids:
            _add(cid, SCORE_ILIKE, f"ilike: {query}")
        logger.debug("Layer 3 (ILIKE): matched %d companies", len(ilike_ids))
        # ----------------------------------------------------------
        # Layer 4: AI Insights (services_list, industry_tags, USPs)
        # ----------------------------------------------------------
        ai_ids = _ai_insights_search(db, query_lower)
        for cid in ai_ids:
            _add(cid, SCORE_AI_INSIGHTS, f"ai_insights: {query}")
        logger.debug("Layer 4 (AI Insights): matched %d companies", len(ai_ids))
        # ----------------------------------------------------------
        # Layer 5: PKD codes
        # ----------------------------------------------------------
        pkd_ids = _pkd_search(db, query_lower)
        for cid in pkd_ids:
            _add(cid, SCORE_PKD, f"pkd: {query}")
        logger.debug("Layer 5 (PKD): matched %d companies", len(pkd_ids))
        # ----------------------------------------------------------
        # Sort by score and pick top N
        # ----------------------------------------------------------
        if not scores:
            logger.info("company_matcher: no matches for query=%r", query)
            return []
        ranked = sorted(scores.items(), key=lambda x: x[1]['score'], reverse=True)
        top_ids = [cid for cid, _ in ranked[:max_results]]
        # ----------------------------------------------------------
        # Build rich profiles
        # ----------------------------------------------------------
        # Eager-load everything _build_rich_profile touches so the
        # loop below issues no extra per-company queries.
        companies = (
            db.query(Company)
            .options(
                joinedload(Company.category),
                joinedload(Company.services).joinedload(CompanyService.service),
                joinedload(Company.competencies).joinedload(CompanyCompetency.competency),
                joinedload(Company.ai_insights),
            )
            .filter(Company.id.in_(top_ids), Company.status == 'active')
            .all()
        )
        company_map = {c.id: c for c in companies}
        # Fetch latest GBP audit per company (one query)
        gbp_map = _fetch_gbp_audits(db, top_ids)
        results = []
        # Iterate top_ids (not `companies`) to preserve score order;
        # ids that vanished (e.g. deactivated between queries) are skipped.
        for cid in top_ids:
            company = company_map.get(cid)
            if not company:
                continue
            info = scores[cid]
            profile = _build_rich_profile(company, info, gbp_map.get(cid))
            results.append(profile)
        logger.info(
            "company_matcher: query=%r -> %d results (top score=%d)",
            query, len(results), results[0]['relevance_score'] if results else 0,
        )
        return results
    except Exception:
        # Matching is an enhancement, never a hard failure for the caller.
        logger.exception("company_matcher error for query=%r", query)
        return []
    finally:
        db.close()
# ============================================================
# LAYER IMPLEMENTATIONS
# ============================================================
def _match_categories(query_lower: str) -> list[str]:
    """Return the category names with at least one keyword stem found in the query.

    Matching is plain substring containment against the lowercased query;
    result order follows CATEGORY_KEYWORDS insertion order.
    """
    return [
        category
        for category, stems in CATEGORY_KEYWORDS.items()
        if any(stem in query_lower for stem in stems)
    ]
def _fts_search(db, query: str) -> list[int]:
    """Match active companies via full-text search on the search_vector column.

    Tokenizes the query into word characters (Polish diacritics included),
    drops single-character tokens, and ORs them together as prefix terms
    (``word:*``). Returns up to 50 ids ranked by ts_rank; returns [] on any
    DB error (e.g. the column/index does not exist) after rolling back.
    """
    tokens = [t for t in re.findall(r'[\w]+', query, flags=re.UNICODE) if len(t) > 1]
    if not tokens:
        return []
    # Prefix-match every token; '|' makes the tsquery an OR of terms.
    ts_expr = ' | '.join(f"{t}:*" for t in tokens)
    stmt = text("""
        SELECT c.id
        FROM companies c
        WHERE c.status = 'active'
          AND c.search_vector @@ to_tsquery('simple', :tsquery)
        ORDER BY ts_rank(c.search_vector, to_tsquery('simple', :tsquery)) DESC
        LIMIT 50
    """)
    try:
        return [row[0] for row in db.execute(stmt, {'tsquery': ts_expr}).fetchall()]
    except Exception:
        logger.debug("FTS search failed (search_vector may not exist), skipping")
        db.rollback()
        return []
def _ilike_search(db, query_lower: str) -> list[int]:
    """Case-insensitive substring match over the main free-text columns.

    Splits the lowercased query into words, keeps only those longer than
    two characters, wraps each in '%...%', and matches any of them against
    five text columns. Returns up to 50 distinct active-company ids;
    returns [] (after rollback) if the query errors.
    """
    terms = [t for t in re.findall(r'[\w]+', query_lower, flags=re.UNICODE) if len(t) > 2]
    if not terms:
        return []
    like_patterns = [f'%{t}%' for t in terms]
    stmt = text("""
        SELECT DISTINCT c.id
        FROM companies c
        WHERE c.status = 'active'
          AND (
            c.services_offered ILIKE ANY(:patterns)
            OR c.description_full ILIKE ANY(:patterns)
            OR c.description_short ILIKE ANY(:patterns)
            OR c.founding_history ILIKE ANY(:patterns)
            OR c.technologies_used ILIKE ANY(:patterns)
          )
        LIMIT 50
    """)
    try:
        return [row[0] for row in db.execute(stmt, {'patterns': like_patterns}).fetchall()]
    except Exception:
        logger.debug("ILIKE search failed, skipping")
        db.rollback()
        return []
def _ai_insights_search(db, query_lower: str) -> list[int]:
    """Match companies whose AI-insight arrays contain any query word.

    Searches the PostgreSQL ARRAY columns services_list, industry_tags and
    unique_selling_points by unnesting each and ILIKE-matching the elements
    against '%word%' patterns (words shorter than 3 chars are dropped).
    Only active companies are considered; up to 50 distinct ids returned.
    Returns [] (after rollback) on DB error.
    """
    terms = [t for t in re.findall(r'[\w]+', query_lower, flags=re.UNICODE) if len(t) > 2]
    if not terms:
        return []
    like_patterns = [f'%{t}%' for t in terms]
    # unnest() flattens each array so its elements can be ILIKE-tested.
    stmt = text("""
        SELECT DISTINCT ai.company_id
        FROM company_ai_insights ai
        JOIN companies c ON c.id = ai.company_id AND c.status = 'active'
        WHERE EXISTS (
            SELECT 1 FROM unnest(ai.services_list) s WHERE s ILIKE ANY(:patterns)
        )
        OR EXISTS (
            SELECT 1 FROM unnest(ai.industry_tags) t WHERE t ILIKE ANY(:patterns)
        )
        OR EXISTS (
            SELECT 1 FROM unnest(ai.unique_selling_points) u WHERE u ILIKE ANY(:patterns)
        )
        LIMIT 50
    """)
    try:
        return [row[0] for row in db.execute(stmt, {'patterns': like_patterns}).fetchall()]
    except Exception:
        logger.debug("AI Insights search failed, skipping")
        db.rollback()
        return []
def _pkd_search(db, query_lower: str) -> list[int]:
    """Match companies by PKD division prefixes derived from the query.

    Every PKD_KEYWORDS stem found in the lowercased query contributes its
    2-digit division codes; companies whose pkd_code starts with any of
    those codes are returned (up to 50 distinct active ids). Returns []
    when no stem matches or the query errors (after rollback).
    """
    prefixes: set = set()
    for stem, divisions in PKD_KEYWORDS.items():
        if stem in query_lower:
            prefixes.update(divisions)
    if not prefixes:
        return []
    # 'NN%' matches any full pkd_code within the 2-digit division.
    prefix_patterns = [f'{p}%' for p in prefixes]
    stmt = text("""
        SELECT DISTINCT c.id
        FROM companies c
        WHERE c.status = 'active'
          AND c.pkd_code IS NOT NULL
          AND c.pkd_code LIKE ANY(:patterns)
        LIMIT 50
    """)
    try:
        return [row[0] for row in db.execute(stmt, {'patterns': prefix_patterns}).fetchall()]
    except Exception:
        logger.debug("PKD search failed, skipping")
        db.rollback()
        return []
# ============================================================
# GBP AUDIT HELPER
# ============================================================
def _fetch_gbp_audits(db, company_ids: list[int]) -> dict:
    """Fetch the latest GBP audit for each company. Returns {company_id: GBPAudit}.

    First resolves, via DISTINCT ON ordered by audit_date DESC, the id of
    the newest audit row per company, then loads those rows as ORM objects.
    Best-effort: returns {} (after rollback) on any DB error.
    """
    if not company_ids:
        return {}
    # DISTINCT ON keeps the first row per company_id under this ordering,
    # i.e. the most recent audit.
    stmt = text("""
        SELECT DISTINCT ON (company_id) id, company_id
        FROM gbp_audits
        WHERE company_id = ANY(:ids)
        ORDER BY company_id, audit_date DESC
    """)
    try:
        latest_ids = [row[0] for row in db.execute(stmt, {'ids': company_ids}).fetchall()]
        if not latest_ids:
            return {}
        return {
            audit.company_id: audit
            for audit in db.query(GBPAudit).filter(GBPAudit.id.in_(latest_ids)).all()
        }
    except Exception:
        logger.debug("GBP audit fetch failed, skipping")
        db.rollback()
        return {}
# ============================================================
# RICH PROFILE BUILDER
# ============================================================
def _build_rich_profile(
    company: Company,
    score_info: dict,
    gbp_audit: Optional[GBPAudit],
) -> dict:
    """Build a rich company profile dict for AI context injection.

    Flattens ORM relationships (services, competencies, AI insights),
    attaches Google Business Profile rating data when an audit exists,
    truncates founding_history to 300 chars, and carries the matcher's
    score and reasons through as relevance metadata.
    """
    # Names of structured services / competencies, skipping broken links
    # (missing related row or empty name).
    structured_services = []
    if company.services:
        structured_services = [
            link.service.name
            for link in company.services
            if link.service and link.service.name
        ]
    competency_list = []
    if company.competencies:
        competency_list = [
            link.competency.name
            for link in company.competencies
            if link.competency and link.competency.name
        ]
    # AI Insights: normalize missing arrays/strings to empty values.
    ai_data = {}
    insights = company.ai_insights
    if insights:
        ai_data = {
            'services_list': insights.services_list or [],
            'unique_selling_points': insights.unique_selling_points or [],
            'target_market': insights.target_market or '',
            'industry_tags': insights.industry_tags or [],
            'certifications': insights.certifications or [],
        }
    # Google Business Profile data from the latest audit, if any.
    google_rating = None
    google_reviews = None
    if gbp_audit:
        if gbp_audit.average_rating:
            google_rating = float(gbp_audit.average_rating)
        google_reviews = gbp_audit.review_count
    category_name = company.category.name if company.category else None
    # Founding history capped at 300 chars with an ellipsis marker.
    founding = None
    history = company.founding_history
    if history:
        founding = history[:300] + ('...' if len(history) > 300 else '')
    return {
        'name': company.name,
        'slug': company.slug,
        'link': f'/company/{company.slug}',
        'category': category_name,
        'description': company.description_full or company.description_short or '',
        'services': company.services_offered or '',
        'structured_services': structured_services,
        'competencies': competency_list,
        'ai_insights': ai_data,
        'founding_history': founding,
        'city': company.address_city,
        'website': company.website,
        'phone': company.phone,
        'email': company.email,
        'year_established': company.year_established,
        'employee_range': company.employee_count_range,
        'google_rating': google_rating,
        'google_reviews': google_reviews,
        'relevance_score': score_info['score'],
        'match_reasons': score_info['reasons'],
    }