nordabiz/services/admission_workflow.py
Maciej Pienczyn 0a66905b1c
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(workflow): test mode emails to maciej.pienczyn@inpi.pl, fix Polish chars
Email notifications limited to test address during development.
Fixed missing Polish characters in notification titles and body.
Commented out production email loop (TODO marker for re-enable).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 08:27:04 +02:00

409 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Post-Rada Admission Workflow Engine
====================================
Automatically processes board meeting protocols to:
1. Extract admitted companies from proceedings
2. Match them to existing companies in DB
3. Create placeholder profiles for new companies
4. Notify Office Managers about companies needing attention
"""
import re
import logging
from datetime import datetime
from database import SessionLocal, BoardMeeting, Company, AdmissionWorkflowLog, User
from sqlalchemy import func, text
logger = logging.getLogger(__name__)
def run_admission_workflow(meeting_id: int, executed_by_user_id: int) -> dict:
    """Process a board meeting's proceedings and register admitted companies.

    Main entry point of the admission workflow (intended to be run in a
    background thread after a protocol is published).  For every admission
    found in the meeting's proceedings the function either links an existing
    company to the meeting or creates a placeholder company, then notifies
    managers and records the run in ``AdmissionWorkflowLog``.

    Re-running for the same meeting is supported: companies already linked to
    the meeting are skipped, and the first existing log row for the meeting is
    updated instead of a new one being inserted.

    Args:
        meeting_id: ID of the ``BoardMeeting`` to process.
        executed_by_user_id: ID of the user stored as the executor of the run.

    Returns:
        A dict with a ``'status'`` key:
        ``'skipped'`` (plus ``'reason'``) when the meeting is missing or has no
        proceedings; ``'completed'`` (plus counters ``'extracted'`` and, when
        anything was extracted, ``'matched'``, ``'created'``, ``'skipped'``);
        or ``'failed'`` (plus ``'error'``) when any exception was raised.
        Exceptions are never propagated to the caller.
    """
    db = SessionLocal()
    try:
        meeting = db.query(BoardMeeting).filter_by(id=meeting_id).first()
        if not meeting or not meeting.proceedings:
            logger.warning(f"Admission workflow: meeting {meeting_id} not found or no proceedings")
            return {'status': 'skipped', 'reason': 'no proceedings'}

        # Idempotency: a log row from an earlier run is reused, not duplicated.
        existing_log = db.query(AdmissionWorkflowLog).filter_by(meeting_id=meeting_id).first()

        # Extract companies from proceedings
        extracted = extract_admitted_companies(meeting.proceedings)
        if not extracted:
            logger.info(f"Admission workflow: no admissions found in meeting {meeting_id}")
            if not existing_log:
                log = AdmissionWorkflowLog(
                    meeting_id=meeting_id,
                    executed_by=executed_by_user_id,
                    extracted_companies=[],
                    status='completed'
                )
                db.add(log)
                db.commit()
            return {'status': 'completed', 'extracted': 0}

        matched = []
        created = []
        skipped = []
        for item in extracted:
            name = item['extracted_name']

            # Skip if already linked to this meeting (e.g. by a previous run)
            existing_company = db.query(Company).filter(
                Company.admitted_at_meeting_id == meeting_id,
                func.lower(Company.name) == func.lower(name)
            ).first()
            if existing_company:
                skipped.append({'name': name, 'reason': 'already_linked', 'company_id': existing_company.id})
                continue

            # Try to match existing company; below 0.5 confidence a new
            # placeholder is created instead of trusting the match.
            company, confidence = match_company_by_name(db, name)
            if company and confidence >= 0.5:
                link_existing_company(db, company, meeting_id, meeting.meeting_date)
                matched.append({
                    'extracted_name': name,
                    'matched_id': company.id,
                    'matched_name': company.name,
                    'confidence': confidence
                })
            else:
                new_company = create_placeholder_company(db, name, meeting_id, meeting.meeting_date)
                created.append({
                    'name': new_company.name,
                    'id': new_company.id,
                    'slug': new_company.slug
                })
        db.flush()

        # Send notifications
        notif_count, email_count = notify_office_managers(db, meeting, {
            'extracted': extracted,
            'matched': matched,
            'created': created,
            'skipped': skipped
        })

        # Log the workflow run
        if existing_log:
            existing_log.extracted_companies = list(extracted)
            existing_log.matched_companies = matched
            existing_log.created_companies = created
            existing_log.skipped = skipped
            existing_log.notifications_sent = notif_count
            existing_log.emails_sent = email_count
            existing_log.executed_at = datetime.now()
            existing_log.executed_by = executed_by_user_id
            # The reused row may come from an earlier failed run; without this
            # reset it would keep reporting 'failed' after a successful re-run.
            existing_log.status = 'completed'
            existing_log.error_message = None
        else:
            log = AdmissionWorkflowLog(
                meeting_id=meeting_id,
                executed_by=executed_by_user_id,
                extracted_companies=list(extracted),
                matched_companies=matched,
                created_companies=created,
                skipped=skipped,
                notifications_sent=notif_count,
                emails_sent=email_count,
                status='completed'
            )
            db.add(log)
        db.commit()

        logger.info(
            f"Admission workflow completed for meeting {meeting_id}: "
            f"{len(extracted)} extracted, {len(matched)} matched, "
            f"{len(created)} created, {len(skipped)} skipped"
        )
        return {
            'status': 'completed',
            'extracted': len(extracted),
            'matched': len(matched),
            'created': len(created),
            'skipped': len(skipped)
        }
    except Exception as e:
        logger.error(f"Admission workflow failed for meeting {meeting_id}: {e}", exc_info=True)
        # Best effort: discard the partial work and persist a 'failed' log row.
        try:
            db.rollback()
            error_log = AdmissionWorkflowLog(
                meeting_id=meeting_id,
                executed_by=executed_by_user_id,
                status='failed',
                error_message=str(e)
            )
            db.add(error_log)
            db.commit()
        except Exception as log_error:
            # Still best effort, but leave a trace instead of failing silently.
            logger.error(
                f"Admission workflow: could not record failure for meeting {meeting_id}: {log_error}"
            )
        return {'status': 'failed', 'error': str(e)}
    finally:
        db.close()
def extract_admitted_companies(proceedings: list) -> list:
    """Parse meeting proceedings and return the admission decisions found.

    A decision counts as an admission when, compared case-insensitively, it
    contains both 'przyjęt' and 'jednogłośnie' and none of the markers
    'przeniesion', 'program', 'protokół' or 'protokol'.

    The company name is taken from the proceeding title when it matches one of
    the "Prezentacja ..." patterns; in that case one result is produced, using
    the first admission decision.  Otherwise every admission decision of the
    form "Przyjęto/Przyjęta jednogłośnie firmę X jako ..." yields one result
    (bulk admission).

    Args:
        proceedings: List of proceeding dicts with optional 'title' (str) and
            'decisions' (str or list of str).  ``None``, non-dict entries,
            ``None`` titles/decisions and non-string decisions are ignored.

    Returns:
        List of dicts with keys 'title', 'extracted_name', 'decision_text'
        and 'proceeding_index' (position of the proceeding in the input).
    """
    # Tried in order; the first pattern that matches the title wins.
    title_patterns = (
        # "Prezentacja firmy X — kandydat na członka Izby"
        r'[Pp]rezentacja\s+firmy\s+(.+?)\s*[—–\-]\s*kandydat',
        # "Prezentacja: X — ..."; the name ends at the first dash/hyphen.
        r'[Pp]rezentacja:\s+(.+?)\s*[—–\-]\s*',
        # "Prezentacja firmy X" (name runs to the end of the title)
        r'[Pp]rezentacja\s+firmy\s+(.+?)$',
    )
    # "Przyjęto jednogłośnie firmę X jako nowego członka Izby"
    bulk_pattern = r'[Pp]rzyjęt[oa]\s+jednogłośnie\s+firmę\s+(.+?)\s+jako'
    # Unanimous votes about something other than admitting a company.
    excluded_markers = ('przeniesion', 'program', 'protokół', 'protokol')

    results = []
    for i, proc in enumerate(proceedings or []):
        if not isinstance(proc, dict):
            continue

        # Keys may be present with a None value, so `.get(key, default)` is
        # not enough to guarantee a usable value.
        title = proc.get('title') or ''
        if not isinstance(title, str):
            title = ''
        decisions = proc.get('decisions') or []
        if isinstance(decisions, str):
            decisions = [decisions]

        # Collect all admission decisions from this proceeding
        admission_decisions = []
        for d in decisions:
            if not isinstance(d, str):
                continue
            d_lower = d.lower()
            if ('przyjęt' in d_lower and 'jednogłośnie' in d_lower
                    and not any(marker in d_lower for marker in excluded_markers)):
                admission_decisions.append(d)
        if not admission_decisions:
            continue

        # Try to extract the company name from the title first
        title_name = None
        for pattern in title_patterns:
            match = re.search(pattern, title)
            if match:
                title_name = match.group(1).strip().rstrip('.')
                break

        if title_name:
            # Single company from title — use first admission decision
            results.append({
                'title': title,
                'extracted_name': title_name,
                'decision_text': admission_decisions[0],
                'proceeding_index': i
            })
        else:
            # Bulk admission — extract a company name from each decision
            for d in admission_decisions:
                match = re.search(bulk_pattern, d)
                if match:
                    results.append({
                        'title': title,
                        'extracted_name': match.group(1).strip().rstrip('.'),
                        'decision_text': d,
                        'proceeding_index': i
                    })
    return results
def match_company_by_name(db, name: str) -> tuple:
    """Try to find an existing company whose name corresponds to *name*.

    Strategies are tried in order and the first hit is returned:

    1. exact case-insensitive equality            -> confidence 1.0
    2. DB name contains *name* (ILIKE)            -> confidence 0.8
    3. either name is a substring of the other,
       compared in Python                         -> confidence 0.7
    4. pg_trgm ``similarity()`` above 0.5         -> confidence = similarity

    Args:
        db: SQLAlchemy session used for the lookups.
        name: Company name extracted from the proceedings.

    Returns:
        ``(Company, confidence)`` for the first match, or ``(None, 0.0)`` when
        nothing matched or *name* is empty/blank.
    """
    # An empty name would match every row in steps 2 and 3.
    if not name or not name.strip():
        return (None, 0.0)

    # 1. Exact case-insensitive match
    exact = db.query(Company).filter(
        func.lower(Company.name) == func.lower(name)
    ).first()
    if exact:
        return (exact, 1.0)

    # 2. ILIKE contains match. '%' and '_' in the name are escaped so they are
    #    matched literally instead of acting as wildcards.
    escaped = name.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    ilike = db.query(Company).filter(
        Company.name.ilike(f'%{escaped}%', escape='\\')
    ).first()
    if ilike:
        return (ilike, 0.8)

    # 3. Substring match in either direction, e.g. extracted
    #    "Pros Poland Sp. z o.o." matches DB "Pros Poland".
    #    NOTE(review): a plain substring test cannot bridge spacing differences
    #    such as "Prospoland" vs "Pros Poland"; those rely on step 4.
    name_lower = name.lower()
    for c_id, c_name in db.query(Company.id, Company.name).all():
        if not c_name:
            continue
        c_lower = c_name.lower()
        if c_lower in name_lower or name_lower in c_lower:
            company = db.query(Company).filter_by(id=c_id).first()
            return (company, 0.7)

    # 4. pg_trgm similarity (if extension available). The query runs inside a
    #    SAVEPOINT: on PostgreSQL a failed statement aborts the enclosing
    #    transaction, which would break every later statement of the caller.
    result = None
    try:
        with db.begin_nested():
            result = db.execute(
                text("SELECT id, name, similarity(name, :name) as sim FROM companies WHERE similarity(name, :name) > 0.5 ORDER BY sim DESC LIMIT 1"),
                {'name': name}
            ).first()
    except Exception as e:
        # Best effort: the extension may simply not be installed.
        logger.debug(f"pg_trgm similarity lookup skipped for {name!r}: {e}")
    if result:
        company = db.query(Company).filter_by(id=result[0]).first()
        return (company, float(result[2]))

    return (None, 0.0)
def _slugify_company_name(name: str) -> str:
    """Return a lowercase ASCII, hyphen-separated slug for *name* ('' if nothing survives)."""
    import unicodedata

    # 'ł' has no Unicode decomposition, so NFKD + ASCII-ignore would silently
    # drop it ("Łódź" -> "odz"); map it to 'l' explicitly first.
    slug = name.lower().strip().replace('ł', 'l')
    slug = unicodedata.normalize('NFKD', slug).encode('ascii', 'ignore').decode('ascii')
    return re.sub(r'[^a-z0-9]+', '-', slug).strip('-')


def create_placeholder_company(db, name: str, meeting_id: int, meeting_date) -> Company:
    """Create a minimal placeholder company admitted at the given meeting.

    The company is added to the session and flushed (not committed) so that
    its ID is available to the caller.

    Args:
        db: SQLAlchemy session.
        name: Company name as extracted from the proceedings.
        meeting_id: ID of the meeting stored in ``admitted_at_meeting_id``.
        meeting_date: Meeting date stored in ``member_since``.

    Returns:
        The new ``Company`` with status 'pending' and data_quality 'basic'.
    """
    # A name without any ASCII letters/digits would give an empty slug.
    base_slug = _slugify_company_name(name) or 'firma'

    # Ensure unique: append -1, -2, ... until no company uses the slug.
    slug = base_slug
    counter = 1
    while db.query(Company).filter_by(slug=slug).first():
        slug = f"{base_slug}-{counter}"
        counter += 1

    company = Company(
        name=name,
        slug=slug,
        status='pending',
        data_quality='basic',
        admitted_at_meeting_id=meeting_id,
        member_since=meeting_date,
    )
    db.add(company)
    db.flush()  # Get ID
    logger.info(f"Created placeholder company: {name} (ID {company.id}, slug {slug})")
    return company
def link_existing_company(db, company, meeting_id: int, meeting_date):
    """Attach admission data from a board meeting to an existing company.

    Only fields that are still empty are filled in, so values set earlier
    (for example by a previous meeting) are never overwritten.  The change is
    made on the ORM object only; nothing is flushed or committed here.
    """
    admission_defaults = (
        ('admitted_at_meeting_id', meeting_id),
        ('member_since', meeting_date),
    )
    for attr_name, value in admission_defaults:
        if not getattr(company, attr_name):
            setattr(company, attr_name, value)
    logger.info(f"Linked company {company.name} (ID {company.id}) to meeting {meeting_id}")
def notify_office_managers(db, meeting, results: dict) -> tuple:
    """Send in-app notifications and a summary email about admitted companies.

    In-app notifications go to every active user with role ADMIN or
    OFFICE_MANAGER.  The email is currently in TEST MODE: it is sent to a
    single hard-coded test address rather than to the managers.

    Failures are logged and never raised; a failed notification or email only
    lowers the returned counters.

    Args:
        db: SQLAlchemy session used to look up the recipients.
        meeting: The board meeting (``id``, ``meeting_number``, ``year`` and
            ``meeting_date`` are read).
        results: Workflow results; the 'matched' and 'created' lists are used
            (items need 'matched_name' and 'name' respectively).

    Returns:
        ``(notifications_created, emails_sent)``; ``(0, 0)`` when there are no
        matched or created companies.
    """
    from html import escape

    notif_count = 0
    email_count = 0
    matched = results.get('matched', [])
    created = results.get('created', [])
    total = len(matched) + len(created)
    needs_attention = len(created)
    if total == 0:
        return (0, 0)

    # Find Office Managers and Admins
    managers = db.query(User).filter(
        User.role.in_(['ADMIN', 'OFFICE_MANAGER']),
        User.is_active == True  # noqa: E712
    ).all()

    meeting_id_str = f"{meeting.meeting_number}/{meeting.year}"
    action_url = f"/rada/posiedzenia/{meeting.id}/przyjecia"
    title = f"Rada {meeting_id_str} — nowi członkowie"
    if needs_attention > 0:
        message = f"Przyjęto {total} firm. {needs_attention} wymaga uzupełnienia profilu."
    else:
        message = f"Przyjęto {total} firm. Wszystkie profile są już uzupełnione."

    for manager in managers:
        try:
            # Imported inside the try so a missing module is logged per
            # recipient instead of aborting the email step below.
            from utils.notifications import create_notification
            create_notification(
                user_id=manager.id,
                title=title,
                message=message,
                notification_type='system',
                related_type='board_meeting',
                related_id=meeting.id,
                action_url=action_url
            )
            notif_count += 1
        except Exception as e:
            logger.error(f"Failed to notify user {manager.id}: {e}")

    # Send email
    try:
        from email_service import send_email

        # Build HTML table. Names come from free-text protocols and the DB, so
        # they are escaped: "R&D" or "<" must not break the email markup.
        rows = []
        for m in matched:
            rows.append(f"<tr><td>{escape(str(m['matched_name']))}</td><td style='color:green;'>✓ Profil istnieje</td></tr>")
        for c in created:
            rows.append(f"<tr><td>{escape(str(c['name']))}</td><td style='color:orange;'>⏳ Wymaga uzupełnienia</td></tr>")
        table_html = f"""
        <table style="width:100%; border-collapse:collapse; margin:16px 0;">
        <thead><tr style="background:#f3f4f6;"><th style="text-align:left;padding:8px;">Firma</th><th style="text-align:left;padding:8px;">Status</th></tr></thead>
        <tbody>{''.join(rows)}</tbody>
        </table>
        """

        # A meeting without a date must not abort the whole email.
        if meeting.meeting_date:
            date_suffix = f" ({meeting.meeting_date.strftime('%d.%m.%Y')})"
        else:
            date_suffix = ""
        if needs_attention:
            attention_html = ('<p><strong>Uwaga:</strong> ' + str(needs_attention)
                              + ' firm wymaga uzupełnienia profilu na portalu.</p>')
        else:
            attention_html = ''
        body_html = f"""
        <p>Na posiedzeniu Rady <strong>{meeting_id_str}</strong>{date_suffix}
        przyjęto <strong>{total}</strong> nowych członków.</p>
        {table_html}
        {attention_html}
        <p><a href="https://nordabiznes.pl{action_url}" style="display:inline-block;padding:10px 20px;background:#2563eb;color:white;border-radius:8px;text-decoration:none;font-weight:600;">Przejdź do dashboardu przyjęć</a></p>
        """
        body_text = f"Na posiedzeniu Rady {meeting_id_str} przyjęto {total} nowych członków. {needs_attention} wymaga uzupełnienia profilu."

        # TEST MODE: send only to maciej.pienczyn@inpi.pl during testing
        test_email = 'maciej.pienczyn@inpi.pl'
        try:
            send_email(
                to=test_email,
                subject=f"[NordaBiz] Rada {meeting_id_str} — przyjęto {total} nowych członków",
                body_text=body_text,
                body_html=body_html,
                email_type='system',
                recipient_name='Maciej Pienczyn'
            )
            email_count = 1
        except Exception as e:
            logger.error(f"Failed to send test email: {e}")

        # TODO: once testing is finished, uncomment the loop below and remove test_email
        # for manager in managers:
        #     if manager.email:
        #         try:
        #             send_email(
        #                 to=manager.email,
        #                 subject=f"[NordaBiz] Rada {meeting_id_str} — przyjęto {total} nowych członków",
        #                 body_text=body_text,
        #                 body_html=body_html,
        #                 email_type='system',
        #                 recipient_name=manager.name
        #             )
        #             email_count += 1
        #         except Exception as e:
        #             logger.error(f"Failed to email {manager.email}: {e}")
    except Exception as e:
        logger.error(f"Failed to send admission emails: {e}")

    return (notif_count, email_count)