nordabiz/services/admission_workflow.py
Maciej Pienczyn 5faa089ce7
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(workflow): extract bulk admissions from decisions, raise pg_trgm threshold to 0.5
Meeting 2 had 5 companies in one agenda item with separate decisions.
Old code only extracted from title. Now also parses each decision for
"Przyjęto firmę X jako" pattern. Raised similarity threshold from 0.3
to 0.5 to avoid false positives (e.g. "Konkol Sp. z o.o." matching "INPI Sp. z o.o.").

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:59:03 +02:00

393 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Post-Rada Admission Workflow Engine
====================================
Automatically processes board meeting protocols to:
1. Extract admitted companies from proceedings
2. Match them to existing companies in DB
3. Create placeholder profiles for new companies
4. Notify Office Managers about companies needing attention
"""
import re
import logging
from datetime import datetime
from database import SessionLocal, BoardMeeting, Company, AdmissionWorkflowLog, User
from sqlalchemy import func, text
logger = logging.getLogger(__name__)
def run_admission_workflow(meeting_id: int, executed_by_user_id: int) -> dict:
"""Main entry point. Called in background thread after protocol publish."""
db = SessionLocal()
try:
meeting = db.query(BoardMeeting).filter_by(id=meeting_id).first()
if not meeting or not meeting.proceedings:
logger.warning(f"Admission workflow: meeting {meeting_id} not found or no proceedings")
return {'status': 'skipped', 'reason': 'no proceedings'}
# Idempotency: check if already processed
existing_log = db.query(AdmissionWorkflowLog).filter_by(meeting_id=meeting_id).first()
# Extract companies from proceedings
extracted = extract_admitted_companies(meeting.proceedings)
if not extracted:
logger.info(f"Admission workflow: no admissions found in meeting {meeting_id}")
if not existing_log:
log = AdmissionWorkflowLog(
meeting_id=meeting_id,
executed_by=executed_by_user_id,
extracted_companies=[],
status='completed'
)
db.add(log)
db.commit()
return {'status': 'completed', 'extracted': 0}
matched = []
created = []
skipped = []
for item in extracted:
name = item['extracted_name']
# Skip if already linked to this meeting
existing_company = db.query(Company).filter(
Company.admitted_at_meeting_id == meeting_id,
func.lower(Company.name) == func.lower(name)
).first()
if existing_company:
skipped.append({'name': name, 'reason': 'already_linked', 'company_id': existing_company.id})
continue
# Try to match existing company
company, confidence = match_company_by_name(db, name)
if company and confidence >= 0.5:
# Link existing company
link_existing_company(db, company, meeting_id, meeting.meeting_date)
matched.append({
'extracted_name': name,
'matched_id': company.id,
'matched_name': company.name,
'confidence': confidence
})
else:
# Create placeholder
new_company = create_placeholder_company(db, name, meeting_id, meeting.meeting_date)
created.append({
'name': new_company.name,
'id': new_company.id,
'slug': new_company.slug
})
db.flush()
# Send notifications
notif_count, email_count = notify_office_managers(db, meeting, {
'extracted': extracted,
'matched': matched,
'created': created,
'skipped': skipped
})
# Log the workflow run
if existing_log:
# Update existing log
existing_log.extracted_companies = [e for e in extracted]
existing_log.matched_companies = matched
existing_log.created_companies = created
existing_log.skipped = skipped
existing_log.notifications_sent = notif_count
existing_log.emails_sent = email_count
existing_log.executed_at = datetime.now()
existing_log.executed_by = executed_by_user_id
else:
log = AdmissionWorkflowLog(
meeting_id=meeting_id,
executed_by=executed_by_user_id,
extracted_companies=[e for e in extracted],
matched_companies=matched,
created_companies=created,
skipped=skipped,
notifications_sent=notif_count,
emails_sent=email_count,
status='completed'
)
db.add(log)
db.commit()
logger.info(
f"Admission workflow completed for meeting {meeting_id}: "
f"{len(extracted)} extracted, {len(matched)} matched, "
f"{len(created)} created, {len(skipped)} skipped"
)
return {
'status': 'completed',
'extracted': len(extracted),
'matched': len(matched),
'created': len(created),
'skipped': len(skipped)
}
except Exception as e:
logger.error(f"Admission workflow failed for meeting {meeting_id}: {e}", exc_info=True)
try:
db.rollback()
error_log = AdmissionWorkflowLog(
meeting_id=meeting_id,
executed_by=executed_by_user_id,
status='failed',
error_message=str(e)
)
db.add(error_log)
db.commit()
except Exception:
pass
return {'status': 'failed', 'error': str(e)}
finally:
db.close()
def extract_admitted_companies(proceedings: list) -> list:
"""
Parse proceedings JSONB to find admission decisions.
Looks for proceedings where:
- title matches "Prezentacja firmy X -- kandydat" pattern
- decisions contain "Przyjeta/Przyjety jednoglosnie" (not "przeniesiona")
"""
results = []
for i, proc in enumerate(proceedings):
title = proc.get('title', '')
decisions = proc.get('decisions', [])
if isinstance(decisions, str):
decisions = [decisions]
# Collect all admission decisions from this proceeding
admission_decisions = []
for d in decisions:
d_lower = d.lower()
if ('przyjęt' in d_lower and 'jednogłośnie' in d_lower
and 'przeniesion' not in d_lower
and 'program' not in d_lower
and 'protokół' not in d_lower
and 'protokol' not in d_lower):
admission_decisions.append(d)
if not admission_decisions:
continue
# Try to extract company name from title first
# Pattern 1: "Prezentacja firmy X — kandydat na członka Izby"
# Pattern 2: "Prezentacja: X coach/mentoring (kandydatka na członka Izby)"
title_name = None
for pattern in [
r'[Pp]rezentacja\s+firmy\s+(.+?)\s*[—–\-]\s*kandydat',
r'[Pp]rezentacja:\s+(.+?)\s*[—–\-]\s*',
r'[Pp]rezentacja\s+firmy\s+(.+?)$',
]:
match = re.search(pattern, title)
if match:
title_name = match.group(1).strip().rstrip('.')
break
if title_name:
# Single company from title — use first admission decision
results.append({
'title': title,
'extracted_name': title_name,
'decision_text': admission_decisions[0],
'proceeding_index': i
})
else:
# Bulk admission — extract company names from each decision
# Pattern: "Przyjęto jednogłośnie firmę X jako nowego członka Izby"
for d in admission_decisions:
match = re.search(r'[Pp]rzyjęt[oa]\s+jednogłośnie\s+firmę\s+(.+?)\s+jako', d)
if match:
company_name = match.group(1).strip().rstrip('.')
results.append({
'title': title,
'extracted_name': company_name,
'decision_text': d,
'proceeding_index': i
})
return results
def match_company_by_name(db, name: str) -> tuple:
"""
Try to find existing company by name.
Returns (Company or None, confidence float).
"""
# 1. Exact case-insensitive match
exact = db.query(Company).filter(
func.lower(Company.name) == func.lower(name)
).first()
if exact:
return (exact, 1.0)
# 2. ILIKE contains match
ilike = db.query(Company).filter(
Company.name.ilike(f'%{name}%')
).first()
if ilike:
return (ilike, 0.8)
# 3. Reverse ILIKE (DB name contained in extracted name)
# e.g. extracted "Prospoland" matches DB "Pros Poland"
all_companies = db.query(Company.id, Company.name).all()
for c_id, c_name in all_companies:
if c_name and (c_name.lower() in name.lower() or name.lower() in c_name.lower()):
company = db.query(Company).filter_by(id=c_id).first()
return (company, 0.7)
# 4. pg_trgm similarity (if extension available)
try:
result = db.execute(
text("SELECT id, name, similarity(name, :name) as sim FROM companies WHERE similarity(name, :name) > 0.5 ORDER BY sim DESC LIMIT 1"),
{'name': name}
).first()
if result:
company = db.query(Company).filter_by(id=result[0]).first()
return (company, float(result[2]))
except Exception:
pass
return (None, 0.0)
def create_placeholder_company(db, name: str, meeting_id: int, meeting_date) -> Company:
"""Create a minimal placeholder company."""
import unicodedata
# Generate slug
slug = name.lower().strip()
slug = unicodedata.normalize('NFKD', slug).encode('ascii', 'ignore').decode('ascii')
slug = re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
# Ensure unique
base_slug = slug
counter = 1
while db.query(Company).filter_by(slug=slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
company = Company(
name=name,
slug=slug,
status='pending',
data_quality='basic',
admitted_at_meeting_id=meeting_id,
member_since=meeting_date,
)
db.add(company)
db.flush() # Get ID
logger.info(f"Created placeholder company: {name} (ID {company.id}, slug {slug})")
return company
def link_existing_company(db, company, meeting_id: int, meeting_date):
"""Link existing company to a board meeting admission."""
if not company.admitted_at_meeting_id:
company.admitted_at_meeting_id = meeting_id
if not company.member_since:
company.member_since = meeting_date
logger.info(f"Linked company {company.name} (ID {company.id}) to meeting {meeting_id}")
def notify_office_managers(db, meeting, results: dict) -> tuple:
"""Send in-app notifications and emails to Office Managers."""
notif_count = 0
email_count = 0
total = len(results.get('matched', [])) + len(results.get('created', []))
needs_attention = len(results.get('created', []))
if total == 0:
return (0, 0)
# Find Office Managers and Admins
managers = db.query(User).filter(
User.role.in_(['ADMIN', 'OFFICE_MANAGER']),
User.is_active == True # noqa: E712
).all()
meeting_id_str = f"{meeting.meeting_number}/{meeting.year}"
action_url = f"/rada/posiedzenia/{meeting.id}/przyjecia"
title = f"Rada {meeting_id_str} -- nowi czlonkowie"
if needs_attention > 0:
message = f"Przyjeto {total} firm. {needs_attention} wymaga uzupelnienia profilu."
else:
message = f"Przyjeto {total} firm. Wszystkie profile sa juz uzupelnione."
for manager in managers:
try:
from utils.notifications import create_notification
create_notification(
user_id=manager.id,
title=title,
message=message,
notification_type='system',
related_type='board_meeting',
related_id=meeting.id,
action_url=action_url
)
notif_count += 1
except Exception as e:
logger.error(f"Failed to notify user {manager.id}: {e}")
# Send email to managers
try:
from email_service import send_email
# Build HTML table
rows = []
for m in results.get('matched', []):
rows.append(f"<tr><td>{m['matched_name']}</td><td style='color:green;'>Profil istnieje</td></tr>")
for c in results.get('created', []):
rows.append(f"<tr><td>{c['name']}</td><td style='color:orange;'>Wymaga uzupelnienia</td></tr>")
table_html = f"""
<table style="width:100%; border-collapse:collapse; margin:16px 0;">
<thead><tr style="background:#f3f4f6;"><th style="text-align:left;padding:8px;">Firma</th><th style="text-align:left;padding:8px;">Status</th></tr></thead>
<tbody>{''.join(rows)}</tbody>
</table>
"""
body_html = f"""
<p>Na posiedzeniu Rady <strong>{meeting_id_str}</strong> ({meeting.meeting_date.strftime('%d.%m.%Y')})
przyjeto <strong>{total}</strong> nowych czlonkow.</p>
{table_html}
{'<p><strong>Uwaga:</strong> ' + str(needs_attention) + ' firm wymaga uzupelnienia profilu na portalu.</p>' if needs_attention else ''}
<p><a href="https://nordabiznes.pl{action_url}" style="display:inline-block;padding:10px 20px;background:#2563eb;color:white;border-radius:8px;text-decoration:none;font-weight:600;">Przejdz do dashboardu przyjec</a></p>
"""
body_text = f"Na posiedzeniu Rady {meeting_id_str} przyjeto {total} nowych czlonkow. {needs_attention} wymaga uzupelnienia profilu."
for manager in managers:
if manager.email:
try:
send_email(
to=manager.email,
subject=f"[NordaBiz] Rada {meeting_id_str} -- przyjeto {total} nowych czlonkow",
body_text=body_text,
body_html=body_html,
email_type='system',
recipient_name=manager.name
)
email_count += 1
except Exception as e:
logger.error(f"Failed to email {manager.email}: {e}")
except Exception as e:
logger.error(f"Failed to send admission emails: {e}")
return (notif_count, email_count)