nordabiz/services/admission_workflow.py
Maciej Pienczyn 90203676e5
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: Post-Rada Workflow Engine + redesign /nowi-czlonkowie
- AdmissionWorkflowLog model + migration
- services/admission_workflow.py — auto-extract admitted companies from
  board meeting proceedings, match to existing companies, create
  placeholders, notify Office Managers (in-app + email)
- Hook in meeting_publish_protocol() — triggers workflow in background
- Dashboard /rada/posiedzenia/ID/przyjecia for Office Manager
- /nowi-czlonkowie redesigned: grouped by board meetings, fixed Polish
  characters, removed days-based filter

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:51:38 +02:00

396 lines
14 KiB
Python

"""
Post-Rada Admission Workflow Engine
====================================
Automatically processes board meeting protocols to:
1. Extract admitted companies from proceedings
2. Match them to existing companies in DB
3. Create placeholder profiles for new companies
4. Notify Office Managers about companies needing attention
"""
import re
import logging
from datetime import datetime
from database import SessionLocal, BoardMeeting, Company, AdmissionWorkflowLog, User
from sqlalchemy import func, text
logger = logging.getLogger(__name__)
def run_admission_workflow(meeting_id: int, executed_by_user_id: int) -> dict:
"""Main entry point. Called in background thread after protocol publish."""
db = SessionLocal()
try:
meeting = db.query(BoardMeeting).filter_by(id=meeting_id).first()
if not meeting or not meeting.proceedings:
logger.warning(f"Admission workflow: meeting {meeting_id} not found or no proceedings")
return {'status': 'skipped', 'reason': 'no proceedings'}
# Idempotency: check if already processed
existing_log = db.query(AdmissionWorkflowLog).filter_by(meeting_id=meeting_id).first()
# Extract companies from proceedings
extracted = extract_admitted_companies(meeting.proceedings)
if not extracted:
logger.info(f"Admission workflow: no admissions found in meeting {meeting_id}")
if not existing_log:
log = AdmissionWorkflowLog(
meeting_id=meeting_id,
executed_by=executed_by_user_id,
extracted_companies=[],
status='completed'
)
db.add(log)
db.commit()
return {'status': 'completed', 'extracted': 0}
matched = []
created = []
skipped = []
for item in extracted:
name = item['extracted_name']
# Skip if already linked to this meeting
existing_company = db.query(Company).filter(
Company.admitted_at_meeting_id == meeting_id,
func.lower(Company.name) == func.lower(name)
).first()
if existing_company:
skipped.append({'name': name, 'reason': 'already_linked', 'company_id': existing_company.id})
continue
# Try to match existing company
company, confidence = match_company_by_name(db, name)
if company and confidence >= 0.5:
# Link existing company
link_existing_company(db, company, meeting_id, meeting.meeting_date)
matched.append({
'extracted_name': name,
'matched_id': company.id,
'matched_name': company.name,
'confidence': confidence
})
else:
# Create placeholder
new_company = create_placeholder_company(db, name, meeting_id, meeting.meeting_date)
created.append({
'name': new_company.name,
'id': new_company.id,
'slug': new_company.slug
})
db.flush()
# Send notifications
notif_count, email_count = notify_office_managers(db, meeting, {
'extracted': extracted,
'matched': matched,
'created': created,
'skipped': skipped
})
# Log the workflow run
if existing_log:
# Update existing log
existing_log.extracted_companies = [e for e in extracted]
existing_log.matched_companies = matched
existing_log.created_companies = created
existing_log.skipped = skipped
existing_log.notifications_sent = notif_count
existing_log.emails_sent = email_count
existing_log.executed_at = datetime.now()
existing_log.executed_by = executed_by_user_id
else:
log = AdmissionWorkflowLog(
meeting_id=meeting_id,
executed_by=executed_by_user_id,
extracted_companies=[e for e in extracted],
matched_companies=matched,
created_companies=created,
skipped=skipped,
notifications_sent=notif_count,
emails_sent=email_count,
status='completed'
)
db.add(log)
db.commit()
logger.info(
f"Admission workflow completed for meeting {meeting_id}: "
f"{len(extracted)} extracted, {len(matched)} matched, "
f"{len(created)} created, {len(skipped)} skipped"
)
return {
'status': 'completed',
'extracted': len(extracted),
'matched': len(matched),
'created': len(created),
'skipped': len(skipped)
}
except Exception as e:
logger.error(f"Admission workflow failed for meeting {meeting_id}: {e}", exc_info=True)
try:
db.rollback()
error_log = AdmissionWorkflowLog(
meeting_id=meeting_id,
executed_by=executed_by_user_id,
status='failed',
error_message=str(e)
)
db.add(error_log)
db.commit()
except Exception:
pass
return {'status': 'failed', 'error': str(e)}
finally:
db.close()
def extract_admitted_companies(proceedings: list) -> list:
"""
Parse proceedings JSONB to find admission decisions.
Looks for proceedings where:
- title matches "Prezentacja firmy X -- kandydat" pattern
- decisions contain "Przyjeta/Przyjety jednoglosnie" (not "przeniesiona")
"""
results = []
for i, proc in enumerate(proceedings):
title = proc.get('title', '')
decisions = proc.get('decisions', [])
if isinstance(decisions, str):
decisions = [decisions]
# Check if this is an admission proceeding
is_admission = False
decision_text = ''
for d in decisions:
d_lower = d.lower()
if ('przyjęt' in d_lower and 'jednogłośnie' in d_lower
and 'przeniesion' not in d_lower
and 'program' not in d_lower
and 'protokół' not in d_lower
and 'protokol' not in d_lower):
is_admission = True
decision_text = d
break
if not is_admission:
continue
# Extract company name from title
# Pattern 1: "Prezentacja firmy X -- kandydat na czlonka Izby"
# Pattern 2: "Prezentacja firmy X - kandydat na czlonka Izby"
# Pattern 3: "Prezentacja: X -- coach/mentoring (kandydatka na czlonka Izby)"
# Pattern 4: "Prezentacja i glosowanie nad kandydatami..." (bulk - extract from decisions)
company_name = None
# Try title patterns
for pattern in [
r'[Pp]rezentacja\s+firmy\s+(.+?)\s*[—–\-]\s*kandydat',
r'[Pp]rezentacja:\s+(.+?)\s*[—–\-]\s*',
r'[Pp]rezentacja\s+firmy\s+(.+?)$',
]:
match = re.search(pattern, title)
if match:
company_name = match.group(1).strip()
break
# If no match from title, try to extract from decision text
# Pattern: "Przyjeto jednoglosnie firme X jako nowego czlonka Izby"
if not company_name:
match = re.search(r'[Pp]rzyjęt[oa]\s+jednogłośnie\s+firmę\s+(.+?)\s+jako', decision_text)
if match:
company_name = match.group(1).strip()
if company_name:
# Clean up: remove trailing dots, Sp. z o.o. standardization
company_name = company_name.rstrip('.')
results.append({
'title': title,
'extracted_name': company_name,
'decision_text': decision_text,
'proceeding_index': i
})
return results
def match_company_by_name(db, name: str) -> tuple:
"""
Try to find existing company by name.
Returns (Company or None, confidence float).
"""
# 1. Exact case-insensitive match
exact = db.query(Company).filter(
func.lower(Company.name) == func.lower(name)
).first()
if exact:
return (exact, 1.0)
# 2. ILIKE contains match
ilike = db.query(Company).filter(
Company.name.ilike(f'%{name}%')
).first()
if ilike:
return (ilike, 0.8)
# 3. Reverse ILIKE (DB name contained in extracted name)
# e.g. extracted "Prospoland" matches DB "Pros Poland"
all_companies = db.query(Company.id, Company.name).all()
for c_id, c_name in all_companies:
if c_name and (c_name.lower() in name.lower() or name.lower() in c_name.lower()):
company = db.query(Company).filter_by(id=c_id).first()
return (company, 0.7)
# 4. pg_trgm similarity (if extension available)
try:
result = db.execute(
text("SELECT id, name, similarity(name, :name) as sim FROM companies WHERE similarity(name, :name) > 0.3 ORDER BY sim DESC LIMIT 1"),
{'name': name}
).first()
if result:
company = db.query(Company).filter_by(id=result[0]).first()
return (company, float(result[2]))
except Exception:
pass
return (None, 0.0)
def create_placeholder_company(db, name: str, meeting_id: int, meeting_date) -> Company:
"""Create a minimal placeholder company."""
import unicodedata
# Generate slug
slug = name.lower().strip()
slug = unicodedata.normalize('NFKD', slug).encode('ascii', 'ignore').decode('ascii')
slug = re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
# Ensure unique
base_slug = slug
counter = 1
while db.query(Company).filter_by(slug=slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
company = Company(
name=name,
slug=slug,
status='pending',
data_quality='basic',
admitted_at_meeting_id=meeting_id,
member_since=meeting_date,
)
db.add(company)
db.flush() # Get ID
logger.info(f"Created placeholder company: {name} (ID {company.id}, slug {slug})")
return company
def link_existing_company(db, company, meeting_id: int, meeting_date):
"""Link existing company to a board meeting admission."""
if not company.admitted_at_meeting_id:
company.admitted_at_meeting_id = meeting_id
if not company.member_since:
company.member_since = meeting_date
logger.info(f"Linked company {company.name} (ID {company.id}) to meeting {meeting_id}")
def notify_office_managers(db, meeting, results: dict) -> tuple:
"""Send in-app notifications and emails to Office Managers."""
notif_count = 0
email_count = 0
total = len(results.get('matched', [])) + len(results.get('created', []))
needs_attention = len(results.get('created', []))
if total == 0:
return (0, 0)
# Find Office Managers and Admins
managers = db.query(User).filter(
User.role.in_(['ADMIN', 'OFFICE_MANAGER']),
User.is_active == True # noqa: E712
).all()
meeting_id_str = f"{meeting.meeting_number}/{meeting.year}"
action_url = f"/rada/posiedzenia/{meeting.id}/przyjecia"
title = f"Rada {meeting_id_str} -- nowi czlonkowie"
if needs_attention > 0:
message = f"Przyjeto {total} firm. {needs_attention} wymaga uzupelnienia profilu."
else:
message = f"Przyjeto {total} firm. Wszystkie profile sa juz uzupelnione."
for manager in managers:
try:
from utils.notifications import create_notification
create_notification(
user_id=manager.id,
title=title,
message=message,
notification_type='system',
related_type='board_meeting',
related_id=meeting.id,
action_url=action_url
)
notif_count += 1
except Exception as e:
logger.error(f"Failed to notify user {manager.id}: {e}")
# Send email to managers
try:
from email_service import send_email
# Build HTML table
rows = []
for m in results.get('matched', []):
rows.append(f"<tr><td>{m['matched_name']}</td><td style='color:green;'>Profil istnieje</td></tr>")
for c in results.get('created', []):
rows.append(f"<tr><td>{c['name']}</td><td style='color:orange;'>Wymaga uzupelnienia</td></tr>")
table_html = f"""
<table style="width:100%; border-collapse:collapse; margin:16px 0;">
<thead><tr style="background:#f3f4f6;"><th style="text-align:left;padding:8px;">Firma</th><th style="text-align:left;padding:8px;">Status</th></tr></thead>
<tbody>{''.join(rows)}</tbody>
</table>
"""
body_html = f"""
<p>Na posiedzeniu Rady <strong>{meeting_id_str}</strong> ({meeting.meeting_date.strftime('%d.%m.%Y')})
przyjeto <strong>{total}</strong> nowych czlonkow.</p>
{table_html}
{'<p><strong>Uwaga:</strong> ' + str(needs_attention) + ' firm wymaga uzupelnienia profilu na portalu.</p>' if needs_attention else ''}
<p><a href="https://nordabiznes.pl{action_url}" style="display:inline-block;padding:10px 20px;background:#2563eb;color:white;border-radius:8px;text-decoration:none;font-weight:600;">Przejdz do dashboardu przyjec</a></p>
"""
body_text = f"Na posiedzeniu Rady {meeting_id_str} przyjeto {total} nowych czlonkow. {needs_attention} wymaga uzupelnienia profilu."
for manager in managers:
if manager.email:
try:
send_email(
to=manager.email,
subject=f"[NordaBiz] Rada {meeting_id_str} -- przyjeto {total} nowych czlonkow",
body_text=body_text,
body_html=body_html,
email_type='system',
recipient_name=manager.name
)
email_count += 1
except Exception as e:
logger.error(f"Failed to email {manager.email}: {e}")
except Exception as e:
logger.error(f"Failed to send admission emails: {e}")
return (notif_count, email_count)