Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
- HIGH: Fix SQL injection in ZOPK knowledge service (3 functions) — replace f-strings with parameterized queries - MEDIUM: Sanitize tsquery/LIKE input in SearchService to prevent injection - MEDIUM: Add @login_required + @role_required(ADMIN) to /health/full endpoint - MEDIUM: Add @role_required(ADMIN) to ZOPK knowledge search API - MEDIUM: Add bleach HTML sanitization on write for announcements, events, board proceedings (stored XSS via |safe) - MEDIUM: Remove partial API key from Gemini service logs - MEDIUM: Remove @csrf.exempt from chat endpoints, add X-CSRFToken headers in JS - MEDIUM: Add missing CSRF tokens to 3 POST forms (data_request, benefits_form, benefits_list) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
423 lines
15 KiB
Python
423 lines
15 KiB
Python
"""
|
|
Search Service for Norda Biznes Partner
|
|
|
|
Unified search functionality with:
|
|
- NIP/REGON direct lookup
|
|
- Synonym expansion
|
|
- SQLite scoring fallback
|
|
- PostgreSQL FTS with fuzzy matching (when available)
|
|
"""
|
|
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional, Tuple
|
|
from sqlalchemy import text
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import Company, Service, Competency, CompanyService, CompanyCompetency
|
|
|
|
|
|
@dataclass
class SearchResult:
    """One company returned by a search, with its relevance score.

    Instances are created by SearchService; ``score`` is only comparable
    between results produced by the same search strategy.
    """

    # The matched Company ORM object.
    company: Company
    # Relevance score; higher is better. Direct NIP/REGON hits use 100.0 and
    # the unfiltered listing uses 0.0.
    score: float
    # How the company was matched. Values produced in this module:
    # 'nip', 'regon', 'exact', 'keyword', 'fts', 'fuzzy', 'history', 'all'.
    match_type: str
|
|
|
|
|
|
# Synonym dictionary - reused from nordabiz_chat.py
# Maps a lowercase (Polish) keyword to related terms that are added to the
# search when the keyword appears in a query.
KEYWORD_SYNONYMS = {
    # IT / Web
    'strony': [
        'www', 'web', 'internet', 'witryny', 'seo',
        'e-commerce', 'ecommerce', 'sklep', 'portal',
    ],
    'internetowe': [
        'www', 'web', 'online', 'cyfrowe', 'seo', 'marketing',
    ],
    'aplikacje': [
        'software', 'programowanie', 'systemy', 'crm', 'erp', 'app',
    ],
    'it': [
        'informatyka', 'komputery', 'software', 'systemy', 'serwis',
    ],
    'programowanie': [
        'software', 'kod', 'developer', 'aplikacje',
    ],

    # Construction
    'budowa': [
        'budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja',
    ],
    'dom': [
        'budynek', 'mieszkanie', 'nieruchomości', 'budownictwo',
    ],
    'remont': [
        'wykończenie', 'naprawa', 'renowacja', 'modernizacja',
    ],
    'hala': [
        'magazyn', 'produkcja', 'przemysłowa', 'stalowa', 'konstrukcja',
    ],

    # Transport / Logistics
    'transport': [
        'przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier',
    ],
    'samochód': [
        'auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa',
    ],

    # Services
    'księgowość': [
        'rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry',
    ],
    'księgowa': [
        'rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe',
    ],
    'prawo': [
        'prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz',
    ],
    'prawnik': [
        'prawo', 'adwokat', 'radca', 'kancelaria', 'prawny',
    ],
    'marketing': [
        'reklama', 'promocja', 'seo', 'social media', 'branding',
    ],

    # Manufacturing
    'produkcja': [
        'wytwarzanie', 'fabryka', 'zakład', 'przemysł',
    ],
    'metal': [
        'stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo',
    ],
    'drewno': [
        'stolarka', 'meble', 'tartak', 'carpentry',
    ],
    'aluminium': [
        'alu', 'lekkie', 'profile', 'spawanie',
    ],
}
|
|
|
|
|
|
class SearchService:
    """Unified search service for companies.

    A query is routed, in order, to a direct NIP lookup, a direct REGON
    lookup, and finally to either PostgreSQL full-text search or an
    in-Python keyword-scoring fallback, depending on the session's dialect.
    """

    def __init__(self, db: Session):
        """Store the session and detect once whether it targets PostgreSQL.

        Args:
            db: SQLAlchemy session used for every query issued by the service
        """
        self.db = db
        self._is_postgres = self._detect_postgres()

    def _detect_postgres(self) -> bool:
        """Return True when the session's bind reports the 'postgresql' dialect."""
        try:
            return self.db.bind.dialect.name == 'postgresql'
        except Exception:
            # E.g. a session without a bind: treat it as "not PostgreSQL".
            # (Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit.)
            return False

    def search(
        self,
        query: str,
        category_id: Optional[int] = None,
        limit: int = 50
    ) -> List[SearchResult]:
        """
        Main search method

        Args:
            query: Search query string (None is treated like an empty query)
            category_id: Optional category filter (not applied to direct
                NIP/REGON hits)
            limit: Max results to return

        Returns:
            List of SearchResult objects sorted by relevance
        """
        query = (query or '').strip()
        if not query:
            return self._get_all_companies(category_id, limit)

        # 1. Check for NIP (10 digits)
        if self._is_nip(query):
            result = self._search_by_nip(query)
            if result:
                return [result]

        # 2. Check for REGON (9 or 14 digits)
        if self._is_regon(query):
            result = self._search_by_regon(query)
            if result:
                return [result]

        # 3. Full-text search with synonyms
        if self._is_postgres:
            return self._search_postgres_fts(query, category_id, limit)
        return self._search_sqlite_fallback(query, category_id, limit)

    @staticmethod
    def _strip_separators(query: str) -> str:
        """Remove whitespace and hyphens, as used when writing NIP/REGON numbers."""
        return re.sub(r'[\s\-]', '', query)

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape LIKE metacharacters so ``value`` is matched literally.

        A single backslash (PostgreSQL's default LIKE escape character) is
        placed before each backslash, percent sign and underscore.
        """
        return (
            value.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )

    def _is_nip(self, query: str) -> bool:
        """Check if query looks like NIP (10 digits, separators ignored)"""
        return re.fullmatch(r'\d{10}', self._strip_separators(query)) is not None

    def _is_regon(self, query: str) -> bool:
        """Check if query looks like REGON (9 or 14 digits, separators ignored)"""
        return re.fullmatch(r'\d{9}|\d{14}', self._strip_separators(query)) is not None

    def _search_by_nip(self, query: str) -> Optional[SearchResult]:
        """Direct NIP lookup among active companies; None when not found"""
        clean_nip = self._strip_separators(query)
        company = self.db.query(Company).filter(
            Company.nip == clean_nip,
            Company.status == 'active'
        ).first()
        if company:
            return SearchResult(company=company, score=100.0, match_type='nip')
        return None

    def _search_by_regon(self, query: str) -> Optional[SearchResult]:
        """Direct REGON lookup among active companies; None when not found"""
        clean_regon = self._strip_separators(query)
        company = self.db.query(Company).filter(
            Company.regon == clean_regon,
            Company.status == 'active'
        ).first()
        if company:
            return SearchResult(company=company, score=100.0, match_type='regon')
        return None

    def _expand_keywords(self, query: str) -> List[str]:
        """Expand query with synonyms

        The query is lowercased and split on whitespace; each word is
        stripped of '.,?!' and kept only if more than one character remains.
        Every KEYWORD_SYNONYMS group whose key or synonyms contain a word as
        a substring is added to the result.

        Returns:
            De-duplicated keywords in no particular order
        """
        # Strip punctuation BEFORE the length check: otherwise a token such as
        # "?!" becomes '' and, being a substring of every key, pulls in the
        # whole synonym table.
        stripped = (raw.strip('.,?!') for raw in query.lower().split())
        words = [w for w in stripped if len(w) > 1]

        expanded = set(words)
        for word in words:
            # Direct synonym lookup
            if word in KEYWORD_SYNONYMS:
                expanded.update(KEYWORD_SYNONYMS[word])
            # Check if word is part of any key or synonym (substring match)
            for key, synonyms in KEYWORD_SYNONYMS.items():
                if word in key or any(word in syn for syn in synonyms):
                    expanded.add(key)
                    expanded.update(synonyms)

        return list(expanded)

    def _search_sqlite_fallback(
        self,
        query: str,
        category_id: Optional[int],
        limit: int
    ) -> List[SearchResult]:
        """
        SQLite search using keyword scoring

        Loads every active company (optionally filtered by category) and
        scores it in Python against the expanded keywords.

        Scoring:
        - Company name match: +10 (+5 more, and match_type 'exact', when the
          whole query is contained in the name)
        - Description match: +5
        - Service match: +8
        - Competency match: +7
        - City match: +3
        - Founding history match: +12
        - Full description match: +4

        Returns:
            Up to ``limit`` results with a positive score, best first
        """
        keywords = self._expand_keywords(query)
        query_lower = query.lower()

        def mentions(value: Optional[str]) -> bool:
            """True when any keyword is a substring of the (non-empty) value."""
            if not value:
                return False
            lowered = value.lower()
            return any(kw in lowered for kw in keywords)

        # Get all active companies
        companies_query = self.db.query(Company).filter(Company.status == 'active')
        if category_id:
            companies_query = companies_query.filter(Company.category_id == category_id)

        # Score each company
        scored: List[Tuple[float, Company, str]] = []
        for company in companies_query.all():
            score = 0.0
            match_type = 'keyword'

            # Name match (highest weight), with a bonus for the literal query
            if mentions(company.name):
                score += 10
                if query_lower in company.name.lower():
                    score += 5
                    match_type = 'exact'

            # Description match
            if mentions(company.description_short):
                score += 5

            # Services match - counted once per company
            if any(cs.service and mentions(cs.service.name)
                   for cs in company.services or ()):
                score += 8

            # Competencies match - counted once per company
            if any(cc.competency and mentions(cc.competency.name)
                   for cc in company.competencies or ()):
                score += 7

            # City match
            if mentions(company.address_city):
                score += 3

            # Founding history match (owners, history) - high weight for people search
            if mentions(company.founding_history):
                score += 12

            # Full description match
            if mentions(company.description_full):
                score += 4

            if score > 0:
                scored.append((score, company, match_type))

        # Sort by score descending (stable, so ties keep query order)
        scored.sort(key=lambda item: item[0], reverse=True)

        return [
            SearchResult(company=c, score=s, match_type=m)
            for s, c, m in scored[:limit]
        ]

    def _has_pg_trgm(self) -> bool:
        """Return True when the pg_trgm extension is installed in the database."""
        try:
            row = self.db.execute(
                text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'")
            ).first()
        except Exception:
            self.db.rollback()
            return False
        # The probe succeeds even when the extension is missing; only a
        # returned row proves that it is installed.
        return row is not None

    def _search_postgres_fts(
        self,
        query: str,
        category_id: Optional[int],
        limit: int
    ) -> List[SearchResult]:
        """
        PostgreSQL full-text search with fuzzy matching

        Uses:
        - tsvector for full-text search
        - pg_trgm for fuzzy matching (typos), when the extension is installed
        - ILIKE on name, descriptions and founding history

        Falls back to the keyword-scoring search when the query returns no
        rows or raises.

        Returns:
            Results in database order (best score first); each score is the
            largest of the fts/fuzzy/history components times 100 and
            match_type names that component.
        """
        keywords = self._expand_keywords(query)

        # Sanitize keywords for tsquery - keep only word characters
        # (alphanumeric + polish chars + underscore).
        # NOTE(review): multi-word synonyms are merged into a single token
        # here ('social media' -> 'socialmedia') - confirm this is intended.
        sanitized_keywords = [re.sub(r'[^\w]', '', kw, flags=re.UNICODE) for kw in keywords]
        sanitized_keywords = [kw for kw in sanitized_keywords if kw]

        # Build an OR-ed prefix tsquery from the sanitized keywords
        tsquery = ' | '.join(f"{kw}:*" for kw in sanitized_keywords)

        has_trgm = self._has_pg_trgm()

        # Build ILIKE patterns for each keyword (for multi-word searches).
        # LIKE wildcards are escaped before wrapping with %.
        like_patterns = [
            '%' + self._escape_like(kw) + '%'
            for kw in sanitized_keywords
            if len(kw) > 2
        ]

        # Static SQL fragments only - user input always travels as bound parameters.
        fts_score = "COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0)"
        history_score = (
            "CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END"
        )
        if has_trgm:
            fuzzy_score = "COALESCE(similarity(c.name, :query), 0)"
            fuzzy_filter = "OR similarity(c.name, :query) > 0.2"
        else:
            # Without pg_trgm there is no similarity(); the fuzzy part is constant.
            fuzzy_score = "0"
            fuzzy_filter = ""
        category_filter = "AND c.category_id = :category_id" if category_id else ""

        # ORDER BY repeats the full expressions: PostgreSQL does not allow
        # output-column aliases inside an ORDER BY expression.
        sql = text(f"""
            SELECT c.id,
                   {fts_score} AS fts_score,
                   {fuzzy_score} AS fuzzy_score,
                   {history_score} AS history_score
            FROM companies c
            WHERE c.status = 'active'
              AND (
                    c.search_vector @@ to_tsquery('simple', :tsquery)
                    {fuzzy_filter}
                    OR c.name ILIKE ANY(:like_patterns)
                    OR c.description_short ILIKE ANY(:like_patterns)
                    OR c.founding_history ILIKE ANY(:like_patterns)
                    OR c.description_full ILIKE ANY(:like_patterns)
              )
              {category_filter}
            ORDER BY GREATEST({fts_score}, {fuzzy_score}, {history_score}) DESC
            LIMIT :limit
        """)

        params = {
            'tsquery': tsquery,
            # NOTE(review): '%%' matches every non-NULL value, so a query with
            # no keyword longer than 2 characters matches all active
            # companies - confirm this is intended.
            'like_patterns': like_patterns if like_patterns else ['%%'],
            'limit': limit
        }
        if has_trgm:
            params['query'] = query
        if category_id:
            params['category_id'] = category_id

        try:
            rows = self.db.execute(sql, params).fetchall()

            company_ids = [row[0] for row in rows]
            if not company_ids:
                # Fallback to SQLite method if FTS returns nothing
                return self._search_sqlite_fallback(query, category_id, limit)

            # Fetch full company objects in one query, keyed by id
            companies_map = {
                c.id: c for c in self.db.query(Company).filter(Company.id.in_(company_ids)).all()
            }

            results = []
            for company_id, fts_value, fuzzy_value, history_value in rows:
                if company_id not in companies_map:
                    continue
                # The strongest component decides both score and match type
                scores = {
                    'fts': fts_value or 0,
                    'fuzzy': fuzzy_value or 0,
                    'history': history_value or 0
                }
                best_match = max(scores, key=scores.get)
                results.append(SearchResult(
                    company=companies_map[company_id],
                    score=scores[best_match] * 100,
                    match_type=best_match
                ))

            return results

        except Exception as e:
            # If FTS fails, rollback and fall back to SQLite method
            print(f"PostgreSQL FTS error: {e}, falling back to keyword search")
            self.db.rollback()
            return self._search_sqlite_fallback(query, category_id, limit)

    def _get_all_companies(
        self,
        category_id: Optional[int],
        limit: int
    ) -> List[SearchResult]:
        """Return active companies ordered by name when no query is given.

        Every result has score 0.0 and match_type 'all'.
        """
        query = self.db.query(Company).filter(Company.status == 'active')
        if category_id:
            query = query.filter(Company.category_id == category_id)

        companies = query.order_by(Company.name).limit(limit).all()
        return [
            SearchResult(company=c, score=0.0, match_type='all')
            for c in companies
        ]
|
|
|
|
|
|
# Convenience function for use in routes
def search_companies(
    db: Session,
    query: str,
    category_id: Optional[int] = None,
    limit: int = 50
) -> List[SearchResult]:
    """
    Run a company search with a SearchService created for this call.

    Args:
        db: Database session
        query: Search query
        category_id: Optional category filter
        limit: Max results

    Returns:
        List of SearchResult objects, as returned by SearchService.search
    """
    return SearchService(db).search(query, category_id, limit)
|