nordabiz/blueprints/auth/routes.py
Maciej Pienczyn 825c79c399
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: add user engagement tracking (login_count, last_active_at, page_views_count)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 19:08:45 +01:00

1356 lines
50 KiB
Python

"""
Auth Routes
============
Authentication routes: login, logout, register, password reset, email verification, 2FA.
"""
import os
import re
import logging
import secrets
from datetime import datetime, timedelta
from flask import render_template, request, redirect, url_for, flash, session, current_app
from flask_login import login_required, login_user, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from . import bp
from database import SessionLocal, User, Company, UserBlock, UserCompany, SystemRole
from utils.helpers import sanitize_input, validate_email, validate_password
from extensions import limiter
from security_service import log_audit
# Module-level logger for this blueprint (named after the module).
logger = logging.getLogger(__name__)
# Security logger (same name as in app.py for fail2ban integration)
security_logger = logging.getLogger('security')
# Security service availability check: probe the optional 2FA helpers once at
# import time and record the outcome in SECURITY_SERVICE_AVAILABLE, which the
# 2FA routes consult before calling any of these names.
# NOTE(review): log_audit is also imported unconditionally just above this
# block, so a missing security_service would already fail there before this
# fallback is reached - confirm whether the guard is still meaningful.
try:
from security_service import (
generate_totp_secret, get_totp_uri, verify_totp,
generate_backup_codes, verify_backup_code, log_audit
)
SECURITY_SERVICE_AVAILABLE = True
except ImportError:
# The helper names above stay undefined in this branch; callers must check
# the flag first.
SECURITY_SERVICE_AVAILABLE = False
logger.warning("Security service not available in auth blueprint")
def _auto_link_person(db, user):
    """Auto-link a User to a Person record by name match (if not already linked).

    The first whitespace-separated token of ``user.name`` is used as the first
    name and the last token as the surname. A Person matches when its
    ``nazwisko`` equals the surname and its ``imiona`` starts with the first
    name, both compared in upper case. On a match ``user.person_id`` is set on
    the passed object; this function does not commit.

    Args:
        db: Session used for the Person lookup.
        user: User object exposing ``id``, ``name`` and ``person_id``.

    Returns:
        None. Any exception is logged and swallowed so that the calling login
        flow is never interrupted by the linking step.
    """
    if user.person_id or not user.name:
        return
    try:
        name_parts = user.name.strip().split()
        if len(name_parts) < 2:
            return

        from database import Person
        from sqlalchemy import func

        # Try exact match: first name + last name (case-insensitive)
        first_name = name_parts[0].upper()
        last_name = name_parts[-1].upper()

        person = db.query(Person).filter(
            func.upper(Person.nazwisko) == last_name,
            # autoescape keeps any '%' or '_' typed into the display name
            # literal instead of letting it act as a LIKE wildcard.
            func.upper(Person.imiona).startswith(first_name, autoescape=True),
        ).first()
        if person:
            user.person_id = person.id
            logger.info(f"Auto-linked user {user.id} ({user.name}) to person {person.id}")
    except Exception as e:
        logger.error(f"Auto-link person failed for user {user.id}: {e}")
def _send_registration_notification(user_info):
    """Send an email notification when a new user registers.

    Args:
        user_info: Mapping with the keys ``name``, ``email``, ``company_nip``,
            ``company_name`` and ``is_norda_member``. Missing or empty values
            are replaced with placeholder text in the message.

    Returns:
        None. The notification is best-effort: if the email service is not
        configured, no recipient is set, or sending raises, the problem is
        logged and the function returns normally.
    """
    try:
        from email_service import send_email, is_configured

        if not is_configured():
            logger.warning("Email service not configured - skipping registration notification")
            return

        notify_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl')
        if not notify_email:
            # The variable was explicitly set to an empty value: notifications are off.
            return

        reg_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        is_member = "TAK" if user_info.get('is_norda_member') else "NIE"
        # The registration route passes company_name=None when no company
        # matched, so a .get() default would never apply; fall back on any
        # falsy value to avoid printing "None" in the email.
        company_name = user_info.get('company_name') or 'Brak przypisanej firmy'

        subject = f"NordaBiz: Nowa rejestracja - {user_info.get('name', 'Nieznany')}"
        separator = '=' * 50
        body_text = "\n".join([
            "NOWA REJESTRACJA NA NORDABIZNES.PL",
            separator,
            f"Czas: {reg_time}",
            f"Imie: {user_info.get('name', 'N/A')}",
            f"Email: {user_info.get('email', 'N/A')}",
            f"NIP: {user_info.get('company_nip', 'N/A')}",
            f"Firma: {company_name}",
            f"Czlonek NORDA: {is_member}",
            separator,
            "",
        ])

        send_email(
            to=[notify_email],
            subject=subject,
            body_text=body_text,
            email_type='registration_notification'
        )
        logger.info(f"Registration notification sent for {user_info.get('email')}")
    except Exception as e:
        logger.error(f"Failed to send registration notification: {e}")
# ============================================================
# REGISTRATION
# ============================================================
def _nip_checksum_ok(nip):
    """Return True when a 10-digit NIP string carries a valid check digit.

    The first nine digits are multiplied by the weights 6, 5, 7, 2, 3, 4, 5,
    6, 7, summed and reduced modulo 11; the result must equal the tenth digit.
    A remainder of 10 can never equal a single digit, so such numbers fail.
    The caller must already have verified that ``nip`` is exactly ten digits.
    """
    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    checksum = sum(int(digit) * weight for digit, weight in zip(nip, weights)) % 11
    return checksum == int(nip[9])


@bp.route('/register', methods=['GET', 'POST'])
@limiter.limit("50 per hour;200 per day")
def register():
    """Handle user registration.

    GET renders the registration form. POST validates the submitted email,
    password, full name and company NIP, creates an unverified user with a
    24-hour verification token, links the user to an active company with the
    same NIP when one exists, attempts to send the verification email and an
    admin notification, and redirects to the registration-success page.

    Validation failures re-render the form with a flashed message. Unexpected
    errors are logged, the transaction is rolled back and the form is shown
    again.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('auth/register.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    password = request.form.get('password', '')
    name = sanitize_input(request.form.get('name', ''), 255)
    company_nip = sanitize_input(request.form.get('company_nip', ''), 10)

    # Validate email
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template('auth/register.html')

    # Validate password
    password_valid, password_message = validate_password(password)
    if not password_valid:
        flash(password_message, 'error')
        return render_template('auth/register.html')

    # Validate required fields
    if not name or not email or not company_nip:
        flash('Imię, email i NIP firmy są wymagane.', 'error')
        return render_template('auth/register.html')

    # Validate full name (first + last name)
    if len(name.split()) < 2:
        flash('Podaj imię i nazwisko (np. Jan Kowalski).', 'error')
        return render_template('auth/register.html')

    # Validate NIP format and checksum. fullmatch with an explicit ASCII class
    # rejects a trailing newline and non-ASCII digits, which '^\d{10}$' lets
    # through and which could never match a stored company NIP.
    if not re.fullmatch(r'[0-9]{10}', company_nip):
        flash('NIP musi skladac sie z 10 cyfr.', 'error')
        return render_template('auth/register.html')
    if not _nip_checksum_ok(company_nip):
        flash('Nieprawidlowy NIP - blad sumy kontrolnej. Sprawdz poprawnosc numeru.', 'error')
        return render_template('auth/register.html')

    db = SessionLocal()
    try:
        # Check if user exists
        if db.query(User).filter_by(email=email).first():
            flash('Email juz jest zarejestrowany.', 'error')
            return render_template('auth/register.html')

        # Check if company is NORDA member (NIP format was validated above)
        company = db.query(Company).filter_by(nip=company_nip, status='active').first()
        is_norda_member = company is not None
        company_id = company.id if company else None

        # Generate verification token
        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now() + timedelta(hours=24)

        # Create user
        user = User(
            email=email,
            password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
            name=name,
            company_nip=company_nip,
            company_id=company_id,
            company_role='NONE',
            is_norda_member=is_norda_member,
            created_at=datetime.now(),
            is_active=True,
            is_verified=False,
            verification_token=verification_token,
            verification_token_expires=verification_expires
        )
        db.add(user)
        db.flush()  # Get user.id before creating UserCompany

        # Create user_companies record if company matched
        if company_id:
            uc = UserCompany(user_id=user.id, company_id=company_id, role='NONE', is_primary=True)
            db.add(uc)
        db.commit()

        # Build verification URL
        base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
        verification_url = f"{base_url}/verify-email/{verification_token}"

        # Try to send verification email; a failure here must not undo the
        # registration, so it is only logged.
        try:
            import email_service
            if email_service.is_configured():
                success = email_service.send_welcome_email(email, name, verification_url)
                if success:
                    logger.info(f"Verification email sent to {email}")
                else:
                    logger.warning(f"Failed to send verification email to {email}")
                    logger.info(f"Verification token (email failed) for {email}: {verification_token[:8]}...")
            else:
                logger.warning("Email service not configured")
                logger.info(f"Verification token (no email) for {email}: {verification_token[:8]}...")
        except Exception as e:
            logger.error(f"Error sending verification email: {e}")
            logger.info(f"Verification token (exception) for {email}: {verification_token[:8]}...")

        logger.info(f"New user registered: {email}")

        # Send notification to admin about new registration
        try:
            company_name = company.name if company else None
            _send_registration_notification({
                'name': name,
                'email': email,
                'company_nip': company_nip,
                'company_name': company_name,
                'is_norda_member': is_norda_member
            })
        except Exception as e:
            logger.error(f"Failed to send registration notification: {e}")

        # Store email in session for success page
        session['registered_email'] = email
        return redirect(url_for('auth.registration_success'))
    except Exception as e:
        # Make the discard of a half-built user explicit instead of relying
        # on close() alone.
        db.rollback()
        logger.error(f"Registration error: {e}")
        flash('Wystapil blad podczas rejestracji. Sprobuj ponownie.', 'error')
        return render_template('auth/register.html')
    finally:
        db.close()
# ============================================================
# REGISTRATION SUCCESS
# ============================================================
@bp.route('/registration-success')
def registration_success():
    """Render the post-registration page telling the user to verify their email.

    The address is taken (and removed) from the session key
    ``registered_email`` written by the registration route. When the key is
    absent the visitor is redirected to the registration form.
    """
    registered_email = session.pop('registered_email', None)
    if registered_email:
        return render_template('auth/registration_success.html', email=registered_email)
    # Nothing stored in the session - send the visitor back to the form.
    return redirect(url_for('auth.register'))
@bp.route('/check-verification-status')
def check_verification_status():
    """JSON endpoint reporting whether the given email address is verified.

    Reads the ``email`` query parameter. Responds with 400 when it is missing,
    404 when no user has that address, ``{'verified': True, 'redirect': ...}``
    for a verified user and ``{'verified': False}`` otherwise.

    NOTE(review): this route has no login requirement, so the 404/200 split
    reveals which addresses are registered - confirm this is acceptable.
    """
    from flask import jsonify

    address = request.args.get('email', '')
    if not address:
        return jsonify({'verified': False, 'error': 'No email provided'}), 400

    db = SessionLocal()
    try:
        account = db.query(User).filter(User.email == address).first()
        if not account:
            return jsonify({'verified': False, 'error': 'User not found'}), 404
        if not account.is_verified:
            return jsonify({'verified': False})
        return jsonify({'verified': True, 'redirect': url_for('dashboard')})
    finally:
        db.close()
# ============================================================
# LOGIN
# ============================================================
def _is_safe_next_url(target):
    """Return True when ``target`` is a same-site, path-only redirect target.

    A plain ``startswith('/')`` test is not enough: ``//host`` is a
    protocol-relative URL, and browsers treat a backslash like a forward
    slash, so ``/\\host`` also leaves the site.
    """
    if not target or not target.startswith('/'):
        return False
    if target.startswith('//') or '\\' in target:
        return False
    # Browsers drop tab/CR/LF inside URLs, which could turn '/<tab>/host'
    # into '//host'; reject control characters outright.
    return not any(ord(ch) < 32 for ch in target)


@bp.route('/login', methods=['GET', 'POST'])
@limiter.limit("1000 per hour" if os.getenv('FLASK_ENV') == 'development' else "60 per minute;500 per hour")
def login():
    """Handle user login with email and password.

    GET renders the login form. POST checks, in order: required fields, a
    temporary account lock, the password, the active flag and the
    email-verified flag. Failed password attempts are counted; the fifth
    failure locks the account for 30 minutes. Users with 2FA enabled are sent
    to the 2FA step with the pending login kept in the session; everyone else
    is logged in directly and redirected to a validated ``next`` path or the
    dashboard.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'

        # Basic validation
        if not email or not password:
            flash('Email i haslo sa wymagane.', 'error')
            return render_template('auth/login.html')

        db = SessionLocal()
        try:
            user = db.query(User).filter_by(email=email).first()

            # Get client IP for logging (first entry of X-Forwarded-For if present)
            client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
            if client_ip and ',' in client_ip:
                client_ip = client_ip.split(',')[0].strip()

            # Check if account is locked
            if user and user.locked_until and user.locked_until > datetime.now():
                remaining = (user.locked_until - datetime.now()).seconds // 60 + 1
                security_logger.warning(f"LOCKED_ACCOUNT ip={client_ip} email={email}")
                flash(f'Konto tymczasowo zablokowane. Sprobuj za {remaining} minut.', 'error')
                return render_template('auth/login.html')

            if not user or not check_password_hash(user.password_hash, password):
                logger.warning(f"Failed login attempt for: {email}")
                security_logger.warning(f"FAILED_LOGIN ip={client_ip} email={email}")

                # Log failed login to audit
                log_audit(db, 'login_failed', 'user', entity_name=email,
                          details={'reason': 'invalid_credentials', 'ip': client_ip})
                db.commit()

                # Increment failed attempts if user exists
                if user:
                    user.failed_login_attempts = (user.failed_login_attempts or 0) + 1
                    # Lock account after 5 failed attempts (30 min lockout)
                    if user.failed_login_attempts >= 5:
                        user.locked_until = datetime.now() + timedelta(minutes=30)
                        security_logger.warning(f"ACCOUNT_LOCKED ip={client_ip} email={email} attempts={user.failed_login_attempts}")
                        db.commit()
                        flash('Zbyt wiele nieudanych prob. Konto zablokowane na 30 minut.', 'error')
                        return render_template('auth/login.html')
                    db.commit()

                flash('Nieprawidlowy email lub haslo.', 'error')
                return render_template('auth/login.html')

            if not user.is_active:
                flash('Konto zostalo dezaktywowane.', 'error')
                return render_template('auth/login.html')

            # Require email verification
            if not user.is_verified:
                flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
                return render_template('auth/login.html')

            # Reset failed attempts on successful login
            user.failed_login_attempts = 0
            user.locked_until = None

            # Check if user has 2FA enabled
            if user.totp_enabled and SECURITY_SERVICE_AVAILABLE:
                # Store pending login in session for 2FA verification
                session['2fa_pending_user_id'] = user.id
                session['2fa_remember'] = remember
                next_page = request.args.get('next')
                if _is_safe_next_url(next_page):
                    session['2fa_next'] = next_page
                else:
                    # Do not let a target left over from an earlier,
                    # abandoned login attempt be reused.
                    session.pop('2fa_next', None)
                db.commit()
                logger.info(f"2FA required for user: {email}")
                return redirect(url_for('auth.verify_2fa'))

            # No 2FA - login directly
            login_user(user, remember=remember)
            user.last_login = datetime.now()
            user.login_count = (user.login_count or 0) + 1
            _auto_link_person(db, user)

            # Log successful login to audit
            log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                      details={'method': 'password', 'ip': client_ip})
            db.commit()
            logger.info(f"User logged in: {email}")

            # Prevent open redirect vulnerability
            next_page = request.args.get('next')
            if not _is_safe_next_url(next_page):
                next_page = None
            return redirect(next_page or url_for('dashboard'))
        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Wystapil blad podczas logowania. Sprobuj ponownie.', 'error')
            return render_template('auth/login.html')
        finally:
            db.close()

    return render_template('auth/login.html')
# ============================================================
# LOGOUT
# ============================================================
@bp.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the index page.

    An audit entry is written first. Writing it is best-effort: if it fails,
    the error is logged and the logout still proceeds, so a problem in the
    audit path can never leave a user unable to end their session.
    """
    db = SessionLocal()
    try:
        log_audit(db, 'logout', 'user', entity_id=current_user.id,
                  entity_name=current_user.email)
        db.commit()
    except Exception as e:
        logger.error(f"Logout audit failed: {e}")
    finally:
        db.close()

    # Clear 2FA session flag
    session.pop('2fa_verified', None)
    logout_user()
    flash('Wylogowano pomyslnie.', 'success')
    return redirect(url_for('index'))
# ============================================================
# TWO-FACTOR AUTHENTICATION
# ============================================================
@bp.route('/verify-2fa', methods=['GET', 'POST'])
@limiter.limit("10 per minute")
def verify_2fa():
    """Verify the 2FA code that completes a pending password login.

    Requires ``2fa_pending_user_id`` in the session (stored by the login
    route); without it the visitor is sent back to the login page. GET renders
    the code form. POST checks the submitted TOTP code, or a backup code when
    the ``use_backup`` form field is present, and on success logs the user in
    and redirects to the stored ``next`` path or the dashboard. Invalid codes
    and unexpected errors re-render the form with a flashed message.
    """
    # Check if there's a pending 2FA login
    pending_user_id = session.get('2fa_pending_user_id')
    if not pending_user_id:
        flash('Sesja wygasla. Zaloguj sie ponownie.', 'error')
        return redirect(url_for('login'))

    if request.method == 'POST':
        code = request.form.get('code', '').strip()
        use_backup = request.form.get('use_backup', False)

        if not code:
            flash('Wprowadz kod weryfikacyjny.', 'error')
            return render_template('auth/verify_2fa.html')

        # Client IP for audit/security logging (first X-Forwarded-For entry if present)
        client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        if client_ip and ',' in client_ip:
            client_ip = client_ip.split(',')[0].strip()

        db = SessionLocal()
        try:
            user = db.query(User).get(pending_user_id)
            if not user:
                # Drop every piece of the stale pending login, not just the id.
                session.pop('2fa_pending_user_id', None)
                session.pop('2fa_remember', None)
                session.pop('2fa_next', None)
                flash('Uzytkownik nie istnieje.', 'error')
                return redirect(url_for('login'))

            # Verify code
            if SECURITY_SERVICE_AVAILABLE:
                if use_backup:
                    valid = verify_backup_code(user, code, db)
                else:
                    valid = verify_totp(user, code)
            else:
                valid = False

            if valid:
                # Clear pending login and log in
                session.pop('2fa_pending_user_id', None)
                remember = session.pop('2fa_remember', False)
                next_page = session.pop('2fa_next', None)
                # Re-validate at the point of use: '//host' and '/\host' start
                # with a slash but still leave the site.
                if next_page and (
                    not next_page.startswith('/')
                    or next_page.startswith('//')
                    or '\\' in next_page
                ):
                    next_page = None

                login_user(user, remember=remember)
                session['2fa_verified'] = True
                user.last_login = datetime.now()
                user.login_count = (user.login_count or 0) + 1
                _auto_link_person(db, user)

                # Log successful 2FA login to audit
                log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                          details={'method': '2fa', 'ip': client_ip})
                db.commit()
                logger.info(f"User logged in with 2FA: {user.email}")
                flash('Zalogowano pomyslnie.', 'success')
                return redirect(next_page or url_for('dashboard'))

            security_logger.warning(f"INVALID_2FA ip={client_ip} user_id={pending_user_id}")
            flash('Nieprawidlowy kod weryfikacyjny.', 'error')
        except Exception as e:
            logger.error(f"2FA verification error: {e}")
            flash('Wystapil blad. Sprobuj ponownie.', 'error')
        finally:
            db.close()

    return render_template('auth/verify_2fa.html')
@bp.route('/settings/2fa', methods=['GET', 'POST'])
@login_required
def settings_2fa():
    """2FA settings page: set up, confirm, disable, or regenerate backup codes.

    GET renders the settings overview. POST dispatches on the ``action`` form
    field:

    * ``setup`` - generate and store a new TOTP secret and show the setup
      page. Refused while 2FA is already enabled, because replacing the secret
      would switch protection off without proof of possession of the current
      device.
    * ``verify_setup`` - check a code against the stored secret, enable 2FA
      and show freshly generated backup codes.
    * ``disable`` - check a current code, then clear the secret and codes.
    * ``regenerate_backup`` - check a current code, then issue new codes.

    Unexpected errors are logged and redirect to the dashboard.
    """
    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method == 'POST':
            action = request.form.get('action')

            if action == 'setup':
                if user.totp_enabled:
                    # Without this guard a single POST would disable 2FA
                    # without asking for a code.
                    flash('Uwierzytelnianie dwuskladnikowe jest juz wlaczone. '
                          'Wylacz je przed ponowna konfiguracja.', 'error')
                else:
                    # Generate new secret
                    if SECURITY_SERVICE_AVAILABLE:
                        secret = generate_totp_secret()
                        if secret:
                            user.totp_secret = secret
                            user.totp_enabled = False  # Not enabled until verified
                            db.commit()
                            qr_uri = get_totp_uri(user)
                            return render_template('auth/2fa_setup.html',
                                                   qr_uri=qr_uri,
                                                   secret=secret)
                    flash('Blad konfiguracji 2FA.', 'error')

            elif action == 'verify_setup':
                # Verify the setup code and enable 2FA
                code = request.form.get('code', '').strip()
                if not SECURITY_SERVICE_AVAILABLE or not user.totp_secret:
                    # Nothing to verify against; also avoids calling the
                    # security helpers when they failed to import.
                    flash('Blad konfiguracji 2FA.', 'error')
                elif verify_totp(user, code):
                    user.totp_enabled = True
                    # Generate backup codes
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()
                    # Audit entry is committed separately so a failure here
                    # cannot undo the enabling above.
                    log_audit(db, '2fa.enabled', 'user', user.id, user.email)
                    db.commit()
                    logger.info(f"2FA enabled for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Sprobuj ponownie.', 'error')
                    qr_uri = get_totp_uri(user)
                    return render_template('auth/2fa_setup.html',
                                           qr_uri=qr_uri,
                                           secret=user.totp_secret)

            elif action == 'disable':
                # Require current code to disable
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    user.totp_enabled = False
                    user.totp_secret = None
                    user.totp_backup_codes = None
                    db.commit()
                    log_audit(db, '2fa.disabled', 'user', user.id, user.email)
                    db.commit()
                    logger.info(f"2FA disabled for user: {user.email}")
                    flash('Uwierzytelnianie dwuskladnikowe zostalo wylaczone.', 'success')
                else:
                    flash('Nieprawidlowy kod. Nie mozna wylaczyc 2FA.', 'error')

            elif action == 'regenerate_backup':
                # Require current code to regenerate backup codes
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()
                    logger.info(f"Backup codes regenerated for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Nie mozna wygenerowac kodow.', 'error')

        return render_template('auth/2fa_settings.html',
                               totp_enabled=user.totp_enabled,
                               backup_codes_count=len(user.totp_backup_codes) if user.totp_backup_codes else 0)
    except Exception as e:
        logger.error(f"2FA settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# PRIVACY SETTINGS
# ============================================================
@bp.route('/settings/privacy', methods=['GET', 'POST'])
@login_required
def settings_privacy():
    """Privacy settings page: phone/email visibility and contact preferences.

    GET renders the form; unset visibility flags are presented as enabled.
    POST stores the checkbox states and the optional contact note, then
    redirects back to this page. Errors are logged and redirect to the
    dashboard.
    """
    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method != 'POST':
            phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
            email_visible = True if user.privacy_show_email is None else user.privacy_show_email
            return render_template('settings/privacy.html',
                                   user=user,
                                   show_phone=phone_visible,
                                   show_email=email_visible)

        form = request.form
        # Checkboxes arrive as 'on' when ticked and are absent otherwise.
        # Visibility flags
        user.privacy_show_phone = form.get('show_phone') == 'on'
        user.privacy_show_email = form.get('show_email') == 'on'
        # Preferred contact channels
        user.contact_prefer_email = form.get('prefer_email') == 'on'
        user.contact_prefer_phone = form.get('prefer_phone') == 'on'
        user.contact_prefer_portal = form.get('prefer_portal') == 'on'
        # An empty note is stored as NULL rather than an empty string.
        note = form.get('contact_note', '').strip()
        user.contact_note = note or None
        db.commit()

        logger.info(f"Privacy settings updated for user: {user.email}")
        flash('Ustawienia prywatnosci zostaly zapisane.', 'success')
        return redirect(url_for('auth.settings_privacy'))
    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# BLOCK SETTINGS
# ============================================================
@bp.route('/settings/blocks', methods=['GET'])
@login_required
def settings_blocks():
    """Render the blocked-users page.

    The template receives the current user's block records and the list of
    active users that can still be blocked (everyone except the current user
    and users already blocked), ordered by name.
    """
    db = SessionLocal()
    try:
        my_blocks = db.query(UserBlock).filter(
            UserBlock.user_id == current_user.id
        ).all()

        # Ids that must not be offered for blocking: already blocked + self.
        excluded_ids = [entry.blocked_user_id for entry in my_blocks] + [current_user.id]

        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )
        return render_template('settings/blocks.html',
                               blocks=my_blocks,
                               available_users=candidates)
    finally:
        db.close()
@bp.route('/settings/blocks/add', methods=['POST'])
@login_required
def settings_blocks_add():
    """Block another user on behalf of the current user.

    Reads ``user_id`` (integer) and an optional ``reason`` from the form.
    A missing id or the user's own id is rejected. Every outcome redirects
    back to the blocked-users page with a flashed message.
    """
    target_id = request.form.get('user_id', type=int)
    note = request.form.get('reason', '').strip()
    blocks_page = url_for('auth.settings_blocks')

    if not target_id or target_id == current_user.id:
        flash('Nieprawidlowy uzytkownik.', 'error')
        return redirect(blocks_page)

    db = SessionLocal()
    try:
        already_blocked = db.query(UserBlock).filter(
            UserBlock.user_id == current_user.id,
            UserBlock.blocked_user_id == target_id
        ).first()

        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'warning')
        else:
            db.add(UserBlock(
                user_id=current_user.id,
                blocked_user_id=target_id,
                reason=note or None
            ))
            db.commit()
            logger.info(f"User {current_user.id} blocked user {target_id}")
            flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(blocks_page)
@bp.route('/settings/blocks/remove/<int:block_id>', methods=['POST'])
@login_required
def settings_blocks_remove(block_id):
    """Remove one of the current user's block records.

    The record is looked up by ``block_id`` and the current user's id, so a
    user can only remove their own blocks. Every outcome redirects back to
    the blocked-users page with a flashed message.
    """
    db = SessionLocal()
    try:
        record = db.query(UserBlock).filter(
            UserBlock.id == block_id,
            UserBlock.user_id == current_user.id
        ).first()

        if record:
            # Remember the id for the log line before the row is deleted.
            unblocked_id = record.blocked_user_id
            db.delete(record)
            db.commit()
            logger.info(f"User {current_user.id} unblocked user {unblocked_id}")
            flash('Uzytkownik zostal odblokowany.', 'success')
        else:
            flash('Blokada nie istnieje.', 'error')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.settings_blocks'))
# ============================================================
# MOJE KONTO - User Account Settings
# ============================================================
@bp.route('/konto')
@login_required
def konto_dane():
    """Render the account page where the signed-in user edits personal data."""
    template_name = 'konto/dane.html'
    return render_template(template_name)
@bp.route('/konto', methods=['POST'])
@login_required
def konto_dane_post():
    """Save the name and phone submitted from the account page.

    Empty values are stored as NULL. The in-memory ``current_user`` object is
    updated as well so the change is reflected on it immediately. Always
    redirects back to the account page.
    """
    db = SessionLocal()
    try:
        account = db.query(User).filter_by(id=current_user.id).first()
        if account:
            new_name = sanitize_input(request.form.get('name', ''), 255)
            new_phone = sanitize_input(request.form.get('phone', ''), 50)

            account.name = new_name or None
            account.phone = new_phone or None
            db.commit()

            # Mirror the saved values onto the logged-in user object.
            current_user.name = account.name
            current_user.phone = account.phone

            logger.info(f"Profile updated for user: {account.email}")
            flash('Dane zostaly zapisane.', 'success')
    except Exception as e:
        logger.error(f"Profile update error: {e}")
        flash('Wystapil blad podczas zapisywania.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/avatar', methods=['POST'])
@login_required
def konto_avatar_upload():
    """Upload or replace the current user's avatar photo.

    The uploaded file is validated with ``FileUploadService.validate_file``,
    rotated according to its EXIF orientation, centre-cropped to a square,
    resized to 300x300 and written under
    ``static/uploads/avatars/<year>/<month>/`` with a random file name. The
    path relative to ``static`` is stored on the user.

    The previous avatar file is removed only after the database commit
    succeeds, and a newly written file is removed again if the commit fails,
    so the stored path and the files on disk cannot drift apart. Always
    redirects back to the account page.
    """
    from file_upload_service import FileUploadService, _BASE_DIR
    from PIL import Image, ImageOps
    import uuid

    file = request.files.get('avatar')
    if not file or file.filename == '':
        flash('Nie wybrano pliku.', 'error')
        return redirect(url_for('auth.konto_dane'))

    # Validate using existing service
    is_valid, error = FileUploadService.validate_file(file)
    if not is_valid:
        flash(error, 'error')
        return redirect(url_for('auth.konto_dane'))

    static_root = os.path.join(_BASE_DIR, 'static')
    file_path = None      # set once the new file has been written
    committed = False     # True once the DB points at the new file

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()
        if not user:
            flash('Nie znaleziono użytkownika.', 'error')
            return redirect(url_for('auth.konto_dane'))

        # Generate filename and path
        ext = file.filename.rsplit('.', 1)[-1].lower()
        if ext == 'jpeg':
            ext = 'jpg'
        stored_filename = f"{uuid.uuid4()}.{ext}"

        now = datetime.now()
        avatar_dir = os.path.join(static_root, 'uploads', 'avatars', str(now.year), f"{now.month:02d}")
        os.makedirs(avatar_dir, exist_ok=True)
        target_path = os.path.join(avatar_dir, stored_filename)

        img = Image.open(file)
        # Re-saving drops the EXIF block, including the orientation tag, so
        # bake the rotation into the pixels first; otherwise photos taken on
        # a rotated phone end up sideways.
        img = ImageOps.exif_transpose(img) or img
        if img.mode in ('RGBA', 'LA', 'P') and ext == 'jpg':
            img = img.convert('RGB')
        elif img.mode not in ('RGB', 'RGBA'):
            img = img.convert('RGB')

        # Center crop to square, then resize to 300x300
        w, h = img.size
        side = min(w, h)
        left = (w - side) // 2
        top = (h - side) // 2
        img = img.crop((left, top, left + side, top + side))
        img = img.resize((300, 300), Image.LANCZOS)

        # Save
        save_kwargs = {'optimize': True}
        if ext == 'jpg':
            save_kwargs['quality'] = 85
        file_path = target_path
        img.save(file_path, **save_kwargs)

        # Point the user at the new file first ...
        previous_avatar = user.avatar_path
        user.avatar_path = os.path.relpath(file_path, static_root)
        db.commit()
        committed = True
        current_user.avatar_path = user.avatar_path

        # ... and only then drop the old file (best-effort).
        if previous_avatar:
            old_path = os.path.join(static_root, previous_avatar)
            if os.path.exists(old_path):
                try:
                    os.remove(old_path)
                except OSError as e:
                    logger.warning(f"Could not remove old avatar {old_path}: {e}")

        logger.info(f"Avatar uploaded for user {user.id}: {user.avatar_path}")
        flash('Zdjęcie profilowe zostało zapisane.', 'success')
    except Exception as e:
        db.rollback()
        # Do not leave an unreferenced file behind when the save did not stick.
        if file_path and not committed and os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError:
                logger.warning(f"Could not remove orphaned avatar file {file_path}")
        logger.error(f"Avatar upload error: {e}")
        flash('Wystąpił błąd podczas zapisywania zdjęcia.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/avatar/delete', methods=['POST'])
@login_required
def konto_avatar_delete():
    """Delete the current user's avatar photo.

    The stored path is cleared and committed first; the file is removed from
    disk only afterwards, so a failed commit never leaves the database
    pointing at a file that no longer exists. A file that cannot be removed
    is logged and otherwise ignored. Always redirects back to the account
    page.
    """
    from file_upload_service import _BASE_DIR

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()
        if user and user.avatar_path:
            old_path = os.path.join(_BASE_DIR, 'static', user.avatar_path)

            user.avatar_path = None
            db.commit()
            current_user.avatar_path = None

            # Delete file (best-effort, after the DB no longer references it)
            if os.path.exists(old_path):
                try:
                    os.remove(old_path)
                except OSError as e:
                    logger.warning(f"Could not remove avatar file {old_path}: {e}")

            logger.info(f"Avatar deleted for user {user.id}")
        flash('Zdjęcie profilowe zostało usunięte.', 'success')
    except Exception as e:
        db.rollback()
        logger.error(f"Avatar delete error: {e}")
        flash('Wystąpił błąd podczas usuwania zdjęcia.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/prywatnosc', methods=['GET', 'POST'])
@login_required
def konto_prywatnosc():
    """Account privacy page: visibility, contact channels and email notices.

    GET renders the form; unset visibility flags are presented as enabled.
    POST stores the six checkbox states and redirects back to this page.
    Errors are logged and redirect to the account data page.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()

        if request.method != 'POST':
            phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
            email_visible = True if user.privacy_show_email is None else user.privacy_show_email
            return render_template('konto/prywatnosc.html',
                                   user=user,
                                   show_phone=phone_visible,
                                   show_email=email_visible)

        form = request.form
        # Checkboxes arrive as 'on' when ticked and are absent otherwise.
        user.privacy_show_phone = form.get('show_phone') == 'on'
        user.privacy_show_email = form.get('show_email') == 'on'
        user.contact_prefer_email = form.get('prefer_email') == 'on'
        user.contact_prefer_phone = form.get('prefer_phone') == 'on'
        user.contact_prefer_portal = form.get('prefer_portal') == 'on'
        user.notify_email_messages = form.get('notify_email_messages') == 'on'
        db.commit()

        logger.info(f"Privacy settings updated for user: {user.email}")
        flash('Ustawienia prywatności zostały zapisane.', 'success')
        return redirect(url_for('auth.konto_prywatnosc'))
    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
@bp.route('/konto/bezpieczenstwo')
@login_required
def konto_bezpieczenstwo():
    """Render the account security page (2FA and password settings)."""
    template_name = 'konto/bezpieczenstwo.html'
    return render_template(template_name)
@bp.route('/konto/blokady')
@login_required
def konto_blokady():
    """Render the account page for managing blocked users.

    The template receives the current user's block records and the active
    users that can still be blocked (everyone except the current user and
    users already blocked), ordered by name. Errors are logged and redirect
    to the account data page.
    """
    db = SessionLocal()
    try:
        my_blocks = db.query(UserBlock).filter_by(user_id=current_user.id).all()

        # Ids that must not be offered for blocking: already blocked + self.
        excluded_ids = [entry.blocked_user_id for entry in my_blocks] + [current_user.id]

        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )
        return render_template('konto/blokady.html',
                               blocks=my_blocks,
                               available_users=candidates)
    except Exception as e:
        logger.error(f"Blocks page error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
@bp.route('/konto/blokady/dodaj', methods=['POST'])
@login_required
def konto_blokady_dodaj():
    """Block another user from the account blocks page.

    Reads ``user_id`` (integer) from the form. A missing id or the user's own
    id is rejected; an existing block is reported as information. Every
    outcome redirects back to the blocks page with a flashed message.
    """
    db = SessionLocal()
    try:
        target_id = request.form.get('user_id', type=int)

        if not target_id or target_id == current_user.id:
            flash('Nieprawidlowy uzytkownik.', 'error')
        elif db.query(UserBlock).filter_by(
            user_id=current_user.id,
            blocked_user_id=target_id
        ).first():
            flash('Ten uzytkownik jest juz zablokowany.', 'info')
        else:
            db.add(UserBlock(user_id=current_user.id, blocked_user_id=target_id))
            db.commit()
            logger.info(f"User {current_user.id} blocked user {target_id}")
            flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
@bp.route('/konto/integracje')
@login_required
def konto_integracje():
    """OAuth integrations page for users with at least the MANAGER role.

    Users without the role, without an assigned company, or whose company
    record cannot be found are redirected to the account data page with a
    flashed message. Otherwise the company settings template is rendered in
    user view with the company's connected services and flags showing which
    providers (Google, Meta) have credentials configured.
    """
    account_page = url_for('auth.konto_dane')

    if not current_user.has_role(SystemRole.MANAGER):
        flash('Ta strona wymaga uprawnień kadry zarządzającej.', 'error')
        return redirect(account_page)

    if not current_user.company_id:
        flash('Musisz byc przypisany do firmy, aby korzystac z integracji.', 'info')
        return redirect(account_page)

    db = SessionLocal()
    try:
        company = db.query(Company).filter(Company.id == current_user.company_id).first()
        if not company:
            flash('Firma nie istnieje.', 'error')
            return redirect(account_page)

        from oauth_service import OAuthService
        service = OAuthService()
        connected = service.get_connected_services(db, current_user.company_id)

        # A provider counts as available when its client/app id is set.
        providers = {
            'google': bool(service.google_client_id),
            'meta': bool(service.meta_app_id),
        }

        return render_template(
            'admin/company_settings.html',
            company=company,
            connections=connected,
            oauth_available=providers,
            is_user_view=True,
        )
    finally:
        db.close()
@bp.route('/konto/blokady/usun/<int:block_id>', methods=['POST'])
@login_required
def konto_blokady_usun(block_id):
    """Remove one of the current user's block records (account pages).

    The record is looked up by ``block_id`` and the current user's id, so a
    user can only remove their own blocks. Every outcome redirects back to
    the account blocks page with a flashed message.
    """
    db = SessionLocal()
    try:
        record = db.query(UserBlock).filter(
            UserBlock.id == block_id,
            UserBlock.user_id == current_user.id
        ).first()

        if record:
            # Remember the id for the log line before the row is deleted.
            unblocked_id = record.blocked_user_id
            db.delete(record)
            db.commit()
            logger.info(f"User {current_user.id} unblocked user {unblocked_id}")
            flash('Uzytkownik zostal odblokowany.', 'success')
        else:
            flash('Blokada nie istnieje.', 'error')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
# ============================================================
# PASSWORD RESET
# ============================================================
@bp.route('/forgot-password', methods=['GET', 'POST'])
@limiter.limit("20 per hour")
def forgot_password():
    """Handle the "forgot password" form.

    GET renders the form. POST validates the submitted email and, when it
    matches an active user, stores a reset token valid for one hour on that
    user and tries to email a reset link. The same flash message is shown
    whether or not a matching account exists, so the response does not
    reveal which emails are registered.

    Returns:
        A redirect to ``index`` for already-authenticated users, a redirect
        to ``login`` after a processed POST, or the rendered
        ``auth/forgot_password.html`` form (GET, invalid email, or an
        unexpected error while processing).
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('auth/forgot_password.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template('auth/forgot_password.html')

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(email=email, is_active=True).first()
        if user:
            # Generate the token and persist it before attempting delivery.
            reset_token = secrets.token_urlsafe(32)
            user.reset_token = reset_token
            user.reset_token_expires = datetime.now() + timedelta(hours=1)
            db.commit()

            # Use the default when APP_URL is unset *or* empty (an empty value
            # would otherwise produce a relative link in the email), and drop
            # trailing slashes so the link never contains "//reset-password".
            base_url = (os.getenv('APP_URL') or 'https://nordabiznes.pl').rstrip('/')
            reset_url = f"{base_url}/reset-password/{reset_token}"

            # Delivery is best-effort: a failure here must not change the
            # response, otherwise it would reveal that the account exists.
            try:
                import email_service
                if email_service.is_configured():
                    success = email_service.send_password_reset_email(email, reset_url)
                    if success:
                        logger.info(f"Password reset email sent to {email}")
                    else:
                        logger.warning(f"Failed to send password reset email to {email}")
                        # Only a short prefix of the token is logged.
                        logger.info(f"Reset token (email failed) for {email}: {reset_token[:8]}...")
                else:
                    logger.warning("Email service not configured")
                    logger.info(f"Reset token (no email) for {email}: {reset_token[:8]}...")
            except Exception as e:
                logger.error(f"Error sending reset email: {e}")
                logger.info(f"Reset token (exception) for {email}: {reset_token[:8]}...")

        # Always show same message to prevent email enumeration
        flash('Jesli email istnieje w systemie, instrukcje resetowania hasla zostaly wyslane.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Password reset error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template('auth/forgot_password.html')
@bp.route('/reset-password/<token>', methods=['GET', 'POST'])
@limiter.limit("30 per hour")
def reset_password(token):
    """Set a new password using the token from a reset link.

    The token must match an active user and must not be expired; otherwise
    the user is sent back to ``auth.forgot_password``. A non-POST request
    with a valid token renders the form. A POST checks that both password
    fields match and that ``validate_password`` accepts the value, then
    stores the new hash, clears the reset token and marks the email as
    verified if it was not already. When the user has never logged in
    (``last_login`` is ``None``), an activation notification email is sent
    on a best-effort basis.

    Args:
        token: Reset token taken from the URL.

    Returns:
        A redirect (``index``, ``login`` or ``auth.forgot_password``) or the
        rendered ``auth/reset_password.html`` form.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/reset_password.html'
    session_db = SessionLocal()
    try:
        # Find user with a matching, unexpired token on an active account.
        user = (
            session_db.query(User)
            .filter(
                User.reset_token == token,
                User.reset_token_expires > datetime.now(),
                User.is_active == True,  # noqa: E712 - SQLAlchemy column expression
            )
            .first()
        )
        if not user:
            flash('Link resetowania hasla jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('auth.forgot_password'))

        # Anything other than a form submission just displays the form.
        if request.method != 'POST':
            return render_template(form_template, token=token)

        new_password = request.form.get('password', '')
        repeated_password = request.form.get('password_confirm', '')

        if new_password != repeated_password:
            flash('Hasla nie sa identyczne.', 'error')
            return render_template(form_template, token=token)

        is_acceptable, rejection_message = validate_password(new_password)
        if not is_acceptable:
            flash(rejection_message, 'error')
            return render_template(form_template, token=token)

        # Store the new hash and invalidate the reset token.
        user.password_hash = generate_password_hash(new_password, method='pbkdf2:sha256')
        user.reset_token = None
        user.reset_token_expires = None

        # Using the reset link proves access to the inbox, so treat the
        # email as verified and drop any pending verification token.
        if not user.is_verified:
            user.is_verified = True
            user.verified_at = datetime.now()
            user.verification_token = None
            user.verification_token_expires = None
            logger.info(f"Email auto-verified via password reset for {user.email}")

        # Captured before the commit; decides whether to notify the admin.
        first_activation = user.last_login is None

        session_db.commit()
        logger.info(f"Password reset successful for {user.email}")

        if first_activation:
            # Best-effort notification: a mail failure must not undo the reset.
            try:
                import email_service
                email_service.send_email(
                    to=['maciej.pienczyn@inpi.pl'],
                    subject=f'Aktywacja konta: {user.name}',
                    body_text=f'{user.name} ({user.email}) właśnie ustawił(a) hasło i aktywował(a) swoje konto w portalu Norda Biznes Partner.',
                    email_type='admin_notification',
                    bcc=[]
                )
            except Exception as notify_err:
                logger.warning(f"Failed to send activation notification: {notify_err}")

        flash('Haslo zostalo zmienione. Mozesz sie teraz zalogowac.', 'success')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Reset password error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
        return redirect(url_for('auth.forgot_password'))
    finally:
        session_db.close()
# ============================================================
# EMAIL VERIFICATION
# ============================================================
@bp.route('/verify-email/<token>')
def verify_email(token):
    """Verify an email address from a verification link and log the user in.

    The token must match an active user and must not be expired; otherwise
    the user is redirected to ``login`` with an error message. The token is
    not cleared after a successful verification, so a repeated click on the
    same link reaches the "already verified" branch instead of failing.

    Args:
        token: Verification token taken from the URL.

    Returns:
        A redirect to ``dashboard`` on success (or when already verified),
        or to ``login`` when the token is invalid or an error occurs.
    """
    session_db = SessionLocal()
    try:
        user = (
            session_db.query(User)
            .filter(
                User.verification_token == token,
                User.verification_token_expires > datetime.now(),
                User.is_active == True,  # noqa: E712 - SQLAlchemy column expression
            )
            .first()
        )
        if not user:
            flash('Link weryfikacyjny jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('login'))

        if user.is_verified:
            # Repeat visit with a still-valid token.
            if current_user.is_authenticated:
                flash('Email zostal juz zweryfikowany.', 'info')
            else:
                # Not logged in yet: log in and record the login.
                login_user(user, remember=True)
                user.last_login = datetime.now()
                user.login_count = (user.login_count or 0) + 1
                session_db.commit()
                flash('Witamy ponownie! Zostales automatycznie zalogowany.', 'info')
            return redirect(url_for('dashboard'))

        # First successful use of the link: mark verified (the token is
        # left in place until it expires, see docstring).
        user.is_verified = True
        user.verified_at = datetime.now()

        # Auto-login the user after verification.
        login_user(user, remember=True)
        user.last_login = datetime.now()
        user.login_count = (user.login_count or 0) + 1

        # Prefer the first entry of X-Forwarded-For, else the socket address.
        client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        if client_ip and ',' in client_ip:
            client_ip = client_ip.partition(',')[0].strip()

        # Two audit entries: the verification itself and the implied login.
        audit_entries = (
            ('email_verified', {'ip': client_ip}),
            ('login', {'method': 'email_verification', 'ip': client_ip}),
        )
        for action, details in audit_entries:
            log_audit(session_db, action, 'user', entity_id=user.id, entity_name=user.email,
                      details=details)

        session_db.commit()
        logger.info(f"Email verified and auto-logged in: {user.email}")
        flash('Email zweryfikowany! Witamy w Norda Biznes Partner.', 'success')
        return redirect(url_for('dashboard'))
    except Exception as e:
        logger.error(f"Email verification error: {e}")
        flash('Wystapil blad podczas weryfikacji.', 'error')
        return redirect(url_for('login'))
    finally:
        session_db.close()
@bp.route('/resend-verification', methods=['GET', 'POST'])
@limiter.limit("15 per hour")
def resend_verification():
    """Handle the "resend verification email" form.

    GET renders the form. POST validates the submitted email and, when it
    matches an active user who is not yet verified, stores a new
    verification token valid for 24 hours and tries to email the link. The
    same flash message is shown regardless of whether such an account
    exists, so the response does not reveal which emails are registered.

    Returns:
        A redirect to ``index`` for already-authenticated users, a redirect
        to ``login`` after a processed POST, or the rendered
        ``auth/resend_verification.html`` form (GET, invalid email, or an
        unexpected error while processing).
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('auth/resend_verification.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template('auth/resend_verification.html')

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(email=email, is_active=True).first()
        if user and not user.is_verified:
            # Generate the new token and persist it before attempting delivery.
            verification_token = secrets.token_urlsafe(32)
            user.verification_token = verification_token
            user.verification_token_expires = datetime.now() + timedelta(hours=24)
            db.commit()

            # Use the default when APP_URL is unset *or* empty (an empty value
            # would otherwise produce a relative link in the email), and drop
            # trailing slashes so the link never contains "//verify-email".
            base_url = (os.getenv('APP_URL') or 'https://nordabiznes.pl').rstrip('/')
            verification_url = f"{base_url}/verify-email/{verification_token}"

            # Delivery is best-effort: a failure here must not change the
            # response, otherwise it would reveal that the account exists.
            try:
                import email_service
                if email_service.is_configured():
                    success = email_service.send_welcome_email(email, user.name, verification_url)
                    if success:
                        logger.info(f"Verification email resent to {email}")
                    else:
                        logger.warning(f"Failed to resend verification email to {email}")
                        # Only a short prefix of the token is logged.
                        logger.info(f"Resend verification token (email failed) for {email}: {verification_token[:8]}...")
                else:
                    logger.warning("Email service not configured")
                    logger.info(f"Resend verification token (no email) for {email}: {verification_token[:8]}...")
            except Exception as e:
                logger.error(f"Error resending verification email: {e}")
                logger.info(f"Resend verification token (exception) for {email}: {verification_token[:8]}...")

        # Always show same message to prevent email enumeration
        flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Resend verification error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template('auth/resend_verification.html')