Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Sends email to admin when a user with last_login=NULL successfully sets their password via reset link (first-time activation). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1208 lines
45 KiB
Python
1208 lines
45 KiB
Python
"""
|
|
Auth Routes
|
|
============
|
|
|
|
Authentication routes: login, logout, register, password reset, email verification, 2FA.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import logging
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
|
|
from flask import render_template, request, redirect, url_for, flash, session, current_app
|
|
from flask_login import login_required, login_user, logout_user, current_user
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
from . import bp
|
|
from database import SessionLocal, User, Company, UserBlock, UserCompany, SystemRole
|
|
from utils.helpers import sanitize_input, validate_email, validate_password
|
|
from extensions import limiter
|
|
from security_service import log_audit
|
|
|
|
# Module-level logger for this blueprint.
logger = logging.getLogger(__name__)

# Dedicated security logger (same name as in app.py for fail2ban integration).
security_logger = logging.getLogger('security')

# Probe for the 2FA helpers; SECURITY_SERVICE_AVAILABLE records whether the
# import succeeded and is checked before those helpers are used.
try:
    from security_service import (
        generate_totp_secret, get_totp_uri, verify_totp,
        generate_backup_codes, verify_backup_code, log_audit
    )
except ImportError:
    SECURITY_SERVICE_AVAILABLE = False
    logger.warning("Security service not available in auth blueprint")
else:
    SECURITY_SERVICE_AVAILABLE = True
|
|
|
|
|
|
def _send_registration_notification(user_info):
    """Email the admin address about a newly registered user.

    Best-effort: every failure is logged and swallowed so a mail problem
    never propagates to the caller.

    Args:
        user_info: Mapping with optional keys ``name``, ``email``,
            ``company_nip``, ``company_name`` and ``is_norda_member``.

    Returns:
        None.
    """
    try:
        from email_service import send_email, is_configured

        if not is_configured():
            logger.warning("Email service not configured - skipping registration notification")
            return

        notify_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl')
        if not notify_email:
            # The variable can be set to an empty string to switch notifications off.
            return

        reg_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        is_member = "TAK" if user_info.get('is_norda_member') else "NIE"
        # The key may be present with value None (no matching company), in
        # which case a dict.get() default is never used; fall back on any
        # falsy value so the email does not read "Firma: None".
        company_name = user_info.get('company_name') or 'Brak przypisanej firmy'

        subject = f"NordaBiz: Nowa rejestracja - {user_info.get('name', 'Nieznany')}"

        body_text = f"""NOWA REJESTRACJA NA NORDABIZNES.PL
{'='*50}

Czas: {reg_time}
Imie: {user_info.get('name', 'N/A')}
Email: {user_info.get('email', 'N/A')}
NIP: {user_info.get('company_nip', 'N/A')}
Firma: {company_name}
Czlonek NORDA: {is_member}

{'='*50}
"""

        send_email(
            to=[notify_email],
            subject=subject,
            body_text=body_text,
            email_type='registration_notification'
        )
        logger.info(f"Registration notification sent for {user_info.get('email')}")
    except Exception as e:
        logger.error(f"Failed to send registration notification: {e}")
|
|
|
|
|
|
# ============================================================
|
|
# REGISTRATION
|
|
# ============================================================
|
|
|
|
@bp.route('/register', methods=['GET', 'POST'])
@limiter.limit("50 per hour;200 per day")
def register():
    """Render the registration form (GET) or create a new account (POST).

    A POST validates the email, password, full name and company NIP (format
    and checksum), rejects already-registered emails, links the user to an
    active company with the same NIP when one exists, and stores a 24-hour
    verification token. It then tries to send the verification email and the
    admin notification; failures of either are only logged. On success the
    user is redirected to the registration success page; on any validation
    or database failure the form is rendered again with a flashed error.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/register.html'
    if request.method != 'POST':
        return render_template(form_template)

    form = request.form
    email = sanitize_input(form.get('email', ''), 255)
    password = form.get('password', '')
    name = sanitize_input(form.get('name', ''), 255)
    company_nip = sanitize_input(form.get('company_nip', ''), 10)

    # --- Input validation: the first failing rule wins ---------------------
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template(form_template)

    password_ok, password_error = validate_password(password)
    if not password_ok:
        flash(password_error, 'error')
        return render_template(form_template)

    if not (name and email and company_nip):
        flash('Imię, email i NIP firmy są wymagane.', 'error')
        return render_template(form_template)

    # A full name means at least two whitespace-separated words.
    if len(name.split()) < 2:
        flash('Podaj imię i nazwisko (np. Jan Kowalski).', 'error')
        return render_template(form_template)

    if re.match(r'^\d{10}$', company_nip) is None:
        flash('NIP musi skladac sie z 10 cyfr.', 'error')
        return render_template(form_template)

    # Weighted sum of the first nine digits, modulo 11, must equal the tenth.
    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    control_digit = sum(int(digit) * weight for digit, weight in zip(company_nip, weights)) % 11
    if control_digit != int(company_nip[9]):
        flash('Nieprawidlowy NIP - blad sumy kontrolnej. Sprawdz poprawnosc numeru.', 'error')
        return render_template(form_template)

    # --- Persistence -------------------------------------------------------
    db = SessionLocal()
    try:
        if db.query(User).filter_by(email=email).first():
            flash('Email juz jest zarejestrowany.', 'error')
            return render_template(form_template)

        # An active company with this NIP marks the user as a NORDA member.
        # The NIP format has already been validated above.
        company = db.query(Company).filter_by(nip=company_nip, status='active').first()
        is_norda_member = bool(company)
        company_id = company.id if company else None

        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now() + timedelta(hours=24)

        new_user = User(
            email=email,
            password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
            name=name,
            company_nip=company_nip,
            company_id=company_id,
            company_role='NONE',
            is_norda_member=is_norda_member,
            created_at=datetime.now(),
            is_active=True,
            is_verified=False,
            verification_token=verification_token,
            verification_token_expires=verification_expires
        )
        db.add(new_user)
        db.flush()  # populate new_user.id before creating the UserCompany row

        if company_id:
            db.add(UserCompany(user_id=new_user.id, company_id=company_id,
                               role='NONE', is_primary=True))

        db.commit()

        base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
        verification_url = f"{base_url}/verify-email/{verification_token}"

        # Verification email is best-effort; only a token prefix is logged.
        try:
            import email_service
            if not email_service.is_configured():
                logger.warning("Email service not configured")
                logger.info(f"Verification token (no email) for {email}: {verification_token[:8]}...")
            elif email_service.send_welcome_email(email, name, verification_url):
                logger.info(f"Verification email sent to {email}")
            else:
                logger.warning(f"Failed to send verification email to {email}")
                logger.info(f"Verification token (email failed) for {email}: {verification_token[:8]}...")
        except Exception as e:
            logger.error(f"Error sending verification email: {e}")
            logger.info(f"Verification token (exception) for {email}: {verification_token[:8]}...")

        logger.info(f"New user registered: {email}")

        # Admin notification is best-effort as well.
        try:
            company_name = company.name if company_id and company else None
            _send_registration_notification({
                'name': name,
                'email': email,
                'company_nip': company_nip,
                'company_name': company_name,
                'is_norda_member': is_norda_member
            })
        except Exception as e:
            logger.error(f"Failed to send registration notification: {e}")

        # The success page reads the address back from the session.
        session['registered_email'] = email
        return redirect(url_for('auth.registration_success'))

    except Exception as e:
        logger.error(f"Registration error: {e}")
        flash('Wystapil blad podczas rejestracji. Sprobuj ponownie.', 'error')
        return render_template(form_template)
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# REGISTRATION SUCCESS
|
|
# ============================================================
|
|
|
|
@bp.route('/registration-success')
def registration_success():
    """Render the post-registration page with verification instructions.

    The email address is popped from the session (it is stored there by the
    registration view), so the page can be shown only once per registration.
    Without it the visitor is redirected to the registration form.
    """
    registered_email = session.pop('registered_email', None)
    if registered_email:
        return render_template('auth/registration_success.html', email=registered_email)

    # Nothing in the session: send the visitor to the registration form.
    return redirect(url_for('auth.register'))
|
|
|
|
|
|
@bp.route('/check-verification-status')
def check_verification_status():
    """Report as JSON whether the account for ``?email=`` is verified.

    Responses:
        400 ``{'verified': False, 'error': ...}`` when no email is given,
        404 ``{'verified': False, 'error': ...}`` when no such user exists,
        200 ``{'verified': False}`` while the account is unverified,
        200 ``{'verified': True, 'redirect': <dashboard url>}`` once verified.
    """
    # NOTE(review): this route has no login requirement and answers 404 only
    # for unknown addresses, so it reveals which emails are registered -
    # confirm this is acceptable or restrict it.
    from flask import jsonify

    requested_email = request.args.get('email', '')
    if not requested_email:
        return jsonify({'verified': False, 'error': 'No email provided'}), 400

    db = SessionLocal()
    try:
        account = db.query(User).filter(User.email == requested_email).first()
        if not account:
            return jsonify({'verified': False, 'error': 'User not found'}), 404

        if not account.is_verified:
            return jsonify({'verified': False})

        return jsonify({'verified': True, 'redirect': url_for('dashboard')})
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# LOGIN
|
|
# ============================================================
|
|
|
|
def _is_local_redirect_target(target):
    """Return True when *target* is a site-relative path safe to redirect to.

    A leading '/' alone is not sufficient: '//host' and '/\\host' are
    interpreted by browsers as protocol-relative URLs pointing at another
    host, so both are rejected.
    """
    if not target or not target.startswith('/'):
        return False
    return not target.startswith(('//', '/\\'))


@bp.route('/login', methods=['GET', 'POST'])
@limiter.limit("1000 per hour" if os.getenv('FLASK_ENV') == 'development' else "60 per minute;500 per hour")
def login():
    """Render the login form (GET) or authenticate the user (POST).

    POST flow:
      * a temporarily locked account is refused,
      * wrong credentials are audited and counted; the fifth failure locks
        the account for 30 minutes,
      * deactivated or unverified accounts are refused,
      * with 2FA enabled the login is parked in the session and the user is
        redirected to the 2FA verification view,
      * otherwise the user is logged in and redirected to the validated
        ``next`` path or the dashboard.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'

        # Basic validation
        if not email or not password:
            flash('Email i haslo sa wymagane.', 'error')
            return render_template('auth/login.html')

        db = SessionLocal()
        try:
            user = db.query(User).filter_by(email=email).first()

            # Client IP for logging: first entry of X-Forwarded-For if present.
            client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
            if client_ip and ',' in client_ip:
                client_ip = client_ip.split(',')[0].strip()

            # Check if account is locked
            now = datetime.now()
            if user and user.locked_until and user.locked_until > now:
                # total_seconds() rather than .seconds: the latter drops whole
                # days and would under-report a lock longer than 24 hours.
                remaining = int((user.locked_until - now).total_seconds()) // 60 + 1
                security_logger.warning(f"LOCKED_ACCOUNT ip={client_ip} email={email}")
                flash(f'Konto tymczasowo zablokowane. Sprobuj za {remaining} minut.', 'error')
                return render_template('auth/login.html')

            if not user or not check_password_hash(user.password_hash, password):
                logger.warning(f"Failed login attempt for: {email}")
                security_logger.warning(f"FAILED_LOGIN ip={client_ip} email={email}")

                # Log failed login to audit
                log_audit(db, 'login_failed', 'user', entity_name=email,
                          details={'reason': 'invalid_credentials', 'ip': client_ip})
                db.commit()

                # Increment failed attempts if user exists
                if user:
                    user.failed_login_attempts = (user.failed_login_attempts or 0) + 1
                    # Lock account after 5 failed attempts (30 min lockout)
                    if user.failed_login_attempts >= 5:
                        user.locked_until = datetime.now() + timedelta(minutes=30)
                        security_logger.warning(
                            f"ACCOUNT_LOCKED ip={client_ip} email={email} attempts={user.failed_login_attempts}")
                        db.commit()
                        flash('Zbyt wiele nieudanych prob. Konto zablokowane na 30 minut.', 'error')
                        return render_template('auth/login.html')
                    db.commit()

                flash('Nieprawidlowy email lub haslo.', 'error')
                return render_template('auth/login.html')

            if not user.is_active:
                flash('Konto zostalo dezaktywowane.', 'error')
                return render_template('auth/login.html')

            # Require email verification
            if not user.is_verified:
                flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
                return render_template('auth/login.html')

            # Reset failed attempts on successful login
            user.failed_login_attempts = 0
            user.locked_until = None

            # Only a validated same-site path is ever used as redirect target.
            next_page = request.args.get('next')
            if not _is_local_redirect_target(next_page):
                next_page = None

            # Check if user has 2FA enabled
            if user.totp_enabled and SECURITY_SERVICE_AVAILABLE:
                # Store pending login in session for 2FA verification
                session['2fa_pending_user_id'] = user.id
                session['2fa_remember'] = remember
                if next_page:
                    session['2fa_next'] = next_page
                db.commit()
                logger.info(f"2FA required for user: {email}")
                return redirect(url_for('auth.verify_2fa'))

            # No 2FA - login directly
            login_user(user, remember=remember)
            user.last_login = datetime.now()

            # Log successful login to audit
            log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                      details={'method': 'password', 'ip': client_ip})
            db.commit()

            logger.info(f"User logged in: {email}")

            return redirect(next_page or url_for('dashboard'))

        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Wystapil blad podczas logowania. Sprobuj ponownie.', 'error')
            return render_template('auth/login.html')
        finally:
            db.close()

    return render_template('auth/login.html')
|
|
|
|
|
|
# ============================================================
|
|
# LOGOUT
|
|
# ============================================================
|
|
|
|
@bp.route('/logout')
@login_required
def logout():
    """Write a logout audit entry, end the session and redirect to the index page."""
    # The audit entry must be written while current_user is still populated.
    audit_session = SessionLocal()
    try:
        log_audit(
            audit_session,
            'logout',
            'user',
            entity_id=current_user.id,
            entity_name=current_user.email,
        )
        audit_session.commit()
    finally:
        audit_session.close()

    # Drop the 2FA marker together with the login itself.
    session.pop('2fa_verified', None)
    logout_user()

    flash('Wylogowano pomyslnie.', 'success')
    return redirect(url_for('index'))
|
|
|
|
|
|
# ============================================================
|
|
# TWO-FACTOR AUTHENTICATION
|
|
# ============================================================
|
|
|
|
@bp.route('/verify-2fa', methods=['GET', 'POST'])
@limiter.limit("10 per minute")
def verify_2fa():
    """Second login step: check a TOTP or backup code for the pending user.

    Requires ``2fa_pending_user_id`` in the session (stored by the login
    view). A valid code completes the login and redirects to the stored
    ``2fa_next`` path or the dashboard; an invalid one is logged to the
    security log and the form is rendered again.
    """
    pending_user_id = session.get('2fa_pending_user_id')
    if not pending_user_id:
        flash('Sesja wygasla. Zaloguj sie ponownie.', 'error')
        return redirect(url_for('login'))

    form_template = 'auth/verify_2fa.html'
    if request.method != 'POST':
        return render_template(form_template)

    code = request.form.get('code', '').strip()
    use_backup = request.form.get('use_backup', False)

    if not code:
        flash('Wprowadz kod weryfikacyjny.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        user = db.query(User).get(pending_user_id)
        if not user:
            session.pop('2fa_pending_user_id', None)
            flash('Uzytkownik nie istnieje.', 'error')
            return redirect(url_for('login'))

        # Without the security service no code can ever be accepted.
        if not SECURITY_SERVICE_AVAILABLE:
            valid = False
        elif use_backup:
            valid = verify_backup_code(user, code, db)
        else:
            valid = verify_totp(user, code)

        # Client IP for logging: first entry of X-Forwarded-For if present.
        client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        if client_ip and ',' in client_ip:
            client_ip = client_ip.split(',')[0].strip()

        if not valid:
            security_logger.warning(f"INVALID_2FA ip={client_ip} user_id={pending_user_id}")
            flash('Nieprawidlowy kod weryfikacyjny.', 'error')
        else:
            # Consume the pending-login state and complete the login.
            session.pop('2fa_pending_user_id', None)
            remember = session.pop('2fa_remember', False)
            next_page = session.pop('2fa_next', None)

            login_user(user, remember=remember)
            session['2fa_verified'] = True
            user.last_login = datetime.now()

            log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                      details={'method': '2fa', 'ip': client_ip})
            db.commit()

            logger.info(f"User logged in with 2FA: {user.email}")
            flash('Zalogowano pomyslnie.', 'success')

            return redirect(next_page or url_for('dashboard'))

    except Exception as e:
        logger.error(f"2FA verification error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    return render_template(form_template)
|
|
|
|
|
|
@bp.route('/settings/2fa', methods=['GET', 'POST'])
@login_required
def settings_2fa():
    """Show and change the current user's two-factor authentication settings.

    POST ``action`` values:
      * ``setup`` - generate a new TOTP secret and show the setup page
        (refused while 2FA is already enabled),
      * ``verify_setup`` - confirm the setup code, enable 2FA and show fresh
        backup codes,
      * ``disable`` - switch 2FA off; requires a valid current code,
      * ``regenerate_backup`` - issue new backup codes; requires a valid code.

    Any other request renders the settings overview. Unexpected errors are
    logged and redirect to the dashboard.
    """
    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method == 'POST':
            action = request.form.get('action')

            if action == 'setup':
                if user.totp_enabled:
                    # Running setup overwrites the secret and switches 2FA off
                    # without any code, which would bypass the code required
                    # by the 'disable' action below.
                    flash('2FA jest juz wlaczone. Wylacz je najpierw, aby skonfigurowac ponownie.', 'error')
                else:
                    secret = generate_totp_secret() if SECURITY_SERVICE_AVAILABLE else None
                    if secret:
                        user.totp_secret = secret
                        user.totp_enabled = False  # Not enabled until verified
                        db.commit()
                        qr_uri = get_totp_uri(user)
                        return render_template('auth/2fa_setup.html',
                                               qr_uri=qr_uri,
                                               secret=secret)
                    flash('Blad konfiguracji 2FA.', 'error')

            elif action == 'verify_setup':
                # Verify the setup code and enable 2FA
                code = request.form.get('code', '').strip()
                if not SECURITY_SERVICE_AVAILABLE:
                    # The TOTP helpers are not imported in this case; report a
                    # configuration error instead of raising NameError.
                    flash('Blad konfiguracji 2FA.', 'error')
                elif verify_totp(user, code):
                    user.totp_enabled = True
                    # Generate backup codes
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()

                    log_audit(db, '2fa.enabled', 'user', user.id, user.email)
                    db.commit()

                    logger.info(f"2FA enabled for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Sprobuj ponownie.', 'error')
                    qr_uri = get_totp_uri(user)
                    return render_template('auth/2fa_setup.html',
                                           qr_uri=qr_uri,
                                           secret=user.totp_secret)

            elif action == 'disable':
                # Require current code to disable
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    user.totp_enabled = False
                    user.totp_secret = None
                    user.totp_backup_codes = None
                    db.commit()

                    log_audit(db, '2fa.disabled', 'user', user.id, user.email)
                    db.commit()

                    logger.info(f"2FA disabled for user: {user.email}")
                    flash('Uwierzytelnianie dwuskladnikowe zostalo wylaczone.', 'success')
                else:
                    flash('Nieprawidlowy kod. Nie mozna wylaczyc 2FA.', 'error')

            elif action == 'regenerate_backup':
                # Require current code to regenerate backup codes
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()

                    logger.info(f"Backup codes regenerated for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Nie mozna wygenerowac kodow.', 'error')

        return render_template('auth/2fa_settings.html',
                               totp_enabled=user.totp_enabled,
                               backup_codes_count=len(user.totp_backup_codes) if user.totp_backup_codes else 0)

    except Exception as e:
        logger.error(f"2FA settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# PRIVACY SETTINGS
|
|
# ============================================================
|
|
|
|
@bp.route('/settings/privacy', methods=['GET', 'POST'])
@login_required
def settings_privacy():
    """Show (GET) or save (POST) the user's privacy and contact preferences.

    Each checkbox is stored as True only when the form sends ``'on'``; an
    empty contact note is stored as None. Visibility flags that were never
    set (None) are presented as True on the page.
    """
    # (form field, User attribute) pairs, applied in this order.
    checkbox_fields = (
        ('show_phone', 'privacy_show_phone'),
        ('show_email', 'privacy_show_email'),
        ('prefer_email', 'contact_prefer_email'),
        ('prefer_phone', 'contact_prefer_phone'),
        ('prefer_portal', 'contact_prefer_portal'),
    )

    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method == 'POST':
            for field_name, attribute in checkbox_fields:
                setattr(user, attribute, request.form.get(field_name) == 'on')
            user.contact_note = request.form.get('contact_note', '').strip() or None

            db.commit()

            logger.info(f"Privacy settings updated for user: {user.email}")
            flash('Ustawienia prywatnosci zostaly zapisane.', 'success')
            return redirect(url_for('auth.settings_privacy'))

        phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
        email_visible = True if user.privacy_show_email is None else user.privacy_show_email
        return render_template('settings/privacy.html',
                               user=user,
                               show_phone=phone_visible,
                               show_email=email_visible)

    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# BLOCK SETTINGS
|
|
# ============================================================
|
|
|
|
@bp.route('/settings/blocks', methods=['GET'])
@login_required
def settings_blocks():
    """List the users blocked by the current user and the users still available to block."""
    db = SessionLocal()
    try:
        my_id = current_user.id
        blocks = db.query(UserBlock).filter(UserBlock.user_id == my_id).all()

        # Candidates exclude everyone already blocked and the user themself.
        excluded_ids = [entry.blocked_user_id for entry in blocks]
        excluded_ids.append(my_id)

        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )

        return render_template('settings/blocks.html',
                               blocks=blocks,
                               available_users=candidates)
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/settings/blocks/add', methods=['POST'])
@login_required
def settings_blocks_add():
    """Block the user given by the ``user_id`` form field, with an optional reason.

    Always redirects back to the block list; the outcome is reported through
    a flashed message.
    """
    blocks_page = url_for('auth.settings_blocks')

    target_id = request.form.get('user_id', type=int)
    reason = request.form.get('reason', '').strip()

    # A missing or non-integer id, or the user's own id, is rejected.
    if not target_id or target_id == current_user.id:
        flash('Nieprawidlowy uzytkownik.', 'error')
        return redirect(blocks_page)

    db = SessionLocal()
    try:
        already_blocked = db.query(UserBlock).filter(
            UserBlock.user_id == current_user.id,
            UserBlock.blocked_user_id == target_id
        ).first()
        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'warning')
            return redirect(url_for('auth.settings_blocks'))

        db.add(UserBlock(
            user_id=current_user.id,
            blocked_user_id=target_id,
            reason=reason or None
        ))
        db.commit()

        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.settings_blocks'))
|
|
|
|
|
|
@bp.route('/settings/blocks/remove/<int:block_id>', methods=['POST'])
@login_required
def settings_blocks_remove(block_id):
    """Delete one of the current user's blocks and redirect to the block list.

    Args:
        block_id: Primary key of the UserBlock row; only rows owned by the
            current user are matched.
    """
    db = SessionLocal()
    try:
        owned_block = (
            db.query(UserBlock)
            .filter(UserBlock.id == block_id, UserBlock.user_id == current_user.id)
            .first()
        )
        if not owned_block:
            flash('Blokada nie istnieje.', 'error')
            return redirect(url_for('auth.settings_blocks'))

        # Remember the id for the log line before the row is deleted.
        unblocked_id = owned_block.blocked_user_id
        db.delete(owned_block)
        db.commit()

        logger.info(f"User {current_user.id} unblocked user {unblocked_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.settings_blocks'))
|
|
|
|
|
|
# ============================================================
|
|
# MOJE KONTO - User Account Settings
|
|
# ============================================================
|
|
|
|
@bp.route('/konto')
@login_required
def konto_dane():
    """Render the "my account" personal data page for the logged-in user."""
    template_name = 'konto/dane.html'
    return render_template(template_name)
|
|
|
|
|
|
@bp.route('/konto', methods=['POST'])
@login_required
def konto_dane_post():
    """Save the name and phone submitted from the personal data page.

    Empty values are stored as None. Always redirects back to the personal
    data page; the outcome is reported through a flashed message.
    """
    db = SessionLocal()
    try:
        account = db.query(User).filter_by(id=current_user.id).first()
        if account:
            new_name = sanitize_input(request.form.get('name', ''), 255)
            new_phone = sanitize_input(request.form.get('phone', ''), 50)

            account.name = new_name or None
            account.phone = new_phone or None
            db.commit()

            # Mirror the saved values onto the current_user object.
            current_user.name = account.name
            current_user.phone = account.phone

            logger.info(f"Profile updated for user: {account.email}")
            flash('Dane zostaly zapisane.', 'success')
    except Exception as e:
        logger.error(f"Profile update error: {e}")
        flash('Wystapil blad podczas zapisywania.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))
|
|
|
|
|
|
@bp.route('/konto/prywatnosc', methods=['GET', 'POST'])
@login_required
def konto_prywatnosc():
    """Show (GET) or save (POST) privacy settings on the "my account" pages.

    Each checkbox is stored as True only when the form sends ``'on'``.
    Visibility flags that were never set (None) are presented as True.
    """
    # (form field, User attribute) pairs, applied in this order.
    checkbox_fields = (
        ('show_phone', 'privacy_show_phone'),
        ('show_email', 'privacy_show_email'),
        ('prefer_email', 'contact_prefer_email'),
        ('prefer_phone', 'contact_prefer_phone'),
        ('prefer_portal', 'contact_prefer_portal'),
        ('notify_email_messages', 'notify_email_messages'),
    )

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()

        if request.method == 'POST':
            for field_name, attribute in checkbox_fields:
                setattr(user, attribute, request.form.get(field_name) == 'on')
            db.commit()

            logger.info(f"Privacy settings updated for user: {user.email}")
            flash('Ustawienia prywatności zostały zapisane.', 'success')
            return redirect(url_for('auth.konto_prywatnosc'))

        phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
        email_visible = True if user.privacy_show_email is None else user.privacy_show_email
        return render_template('konto/prywatnosc.html',
                               user=user,
                               show_phone=phone_visible,
                               show_email=email_visible)
    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/konto/bezpieczenstwo')
@login_required
def konto_bezpieczenstwo():
    """Render the "my account" security page (2FA, password)."""
    template_name = 'konto/bezpieczenstwo.html'
    return render_template(template_name)
|
|
|
|
|
|
@bp.route('/konto/blokady')
@login_required
def konto_blokady():
    """Render the "my account" block list with the users still available to block.

    On error a message is flashed and the user is redirected to the personal
    data page.
    """
    db = SessionLocal()
    try:
        my_id = current_user.id
        blocks = db.query(UserBlock).filter_by(user_id=my_id).all()

        # Candidates exclude everyone already blocked and the user themself.
        excluded_ids = [entry.blocked_user_id for entry in blocks]
        excluded_ids.append(my_id)

        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )

        return render_template('konto/blokady.html',
                               blocks=blocks,
                               available_users=candidates)
    except Exception as e:
        logger.error(f"Blocks page error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/konto/blokady/dodaj', methods=['POST'])
@login_required
def konto_blokady_dodaj():
    """Block the user given by the ``user_id`` form field ("my account" variant).

    Always redirects back to the block list; the outcome is reported through
    a flashed message.
    """
    db = SessionLocal()
    try:
        target_id = request.form.get('user_id', type=int)

        # A missing or non-integer id, or the user's own id, is rejected.
        if not target_id or target_id == current_user.id:
            flash('Nieprawidlowy uzytkownik.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        already_blocked = (
            db.query(UserBlock)
            .filter_by(user_id=current_user.id, blocked_user_id=target_id)
            .first()
        )
        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'info')
            return redirect(url_for('auth.konto_blokady'))

        db.add(UserBlock(user_id=current_user.id, blocked_user_id=target_id))
        db.commit()

        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
|
|
|
|
|
|
@bp.route('/konto/integracje')
@login_required
def konto_integracje():
    """OAuth integrations page for MANAGER+.

    Allows connecting Google/Meta accounts for enriched audit data. Users
    without the MANAGER role, without an assigned company, or whose company
    no longer exists are redirected to the personal data page with a flashed
    message.
    """
    account_page = 'auth.konto_dane'

    if not current_user.has_role(SystemRole.MANAGER):
        flash('Ta strona wymaga uprawnień kadry zarządzającej.', 'error')
        return redirect(url_for(account_page))

    if not current_user.company_id:
        flash('Musisz byc przypisany do firmy, aby korzystac z integracji.', 'info')
        return redirect(url_for(account_page))

    db = SessionLocal()
    try:
        company = db.query(Company).filter(Company.id == current_user.company_id).first()
        if not company:
            flash('Firma nie istnieje.', 'error')
            return redirect(url_for(account_page))

        from oauth_service import OAuthService
        oauth = OAuthService()
        connections = oauth.get_connected_services(db, current_user.company_id)

        # A provider counts as available when its client/app id is set.
        oauth_available = {
            'google': bool(oauth.google_client_id),
            'meta': bool(oauth.meta_app_id),
        }

        return render_template(
            'admin/company_settings.html',
            company=company,
            connections=connections,
            oauth_available=oauth_available,
            is_user_view=True,
        )
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/konto/blokady/usun/<int:block_id>', methods=['POST'])
@login_required
def konto_blokady_usun(block_id):
    """Delete one of the current user's blocks ("my account" variant).

    Args:
        block_id: Primary key of the UserBlock row; only rows owned by the
            current user are matched.
    """
    db = SessionLocal()
    try:
        owned_block = (
            db.query(UserBlock)
            .filter(UserBlock.id == block_id, UserBlock.user_id == current_user.id)
            .first()
        )
        if not owned_block:
            flash('Blokada nie istnieje.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        # Remember the id for the log line before the row is deleted.
        unblocked_id = owned_block.blocked_user_id
        db.delete(owned_block)
        db.commit()

        logger.info(f"User {current_user.id} unblocked user {unblocked_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
|
|
|
|
|
|
# ============================================================
|
|
# PASSWORD RESET
|
|
# ============================================================
|
|
|
|
@bp.route('/forgot-password', methods=['GET', 'POST'])
@limiter.limit("20 per hour")
def forgot_password():
    """Render the password-reset request form and process its submission.

    GET renders the form. POST validates the submitted email and, when an
    active user with that email exists, stores a fresh reset token with an
    expiry one hour from now and tries to email the reset link.

    The same flashed message is shown whether or not a matching user was
    found, so the response does not reveal which emails are registered.

    Returns:
        A redirect to ``index`` for already-authenticated users, a redirect
        to ``login`` after a processed submission, or the rendered form
        (GET, invalid email format, or an unexpected error).
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/forgot_password.html'

    if request.method != 'POST':
        return render_template(form_template)

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(email=email, is_active=True).first()

        if user:
            # Persist the token first so the link is valid by the time the
            # email is sent.
            reset_token = secrets.token_urlsafe(32)
            reset_expires = datetime.now() + timedelta(hours=1)
            user.reset_token = reset_token
            user.reset_token_expires = reset_expires
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            reset_url = f"{base_url}/reset-password/{reset_token}"

            # Mail delivery is best effort: any failure is logged (with the
            # first eight characters of the token) and otherwise ignored.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Reset token (no email) for {email}: {reset_token[:8]}...")
                elif email_service.send_password_reset_email(email, reset_url):
                    logger.info(f"Password reset email sent to {email}")
                else:
                    logger.warning(f"Failed to send password reset email to {email}")
                    logger.info(f"Reset token (email failed) for {email}: {reset_token[:8]}...")
            except Exception as e:
                logger.error(f"Error sending reset email: {e}")
                logger.info(f"Reset token (exception) for {email}: {reset_token[:8]}...")

        # Identical response for known and unknown addresses.
        flash('Jesli email istnieje w systemie, instrukcje resetowania hasla zostaly wyslane.', 'info')
        return redirect(url_for('login'))

    except Exception as e:
        logger.error(f"Password reset error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template(form_template)
|
|
|
|
|
|
@bp.route('/reset-password/<token>', methods=['GET', 'POST'])
@limiter.limit("30 per hour")
def reset_password(token):
    """Let a user choose a new password using a reset token from the URL.

    The token must belong to an active user and must not be expired. GET
    renders the form; POST validates the two password fields, stores the new
    hash, clears the reset token and marks the email as verified if it was
    not already. When the user has never logged in (``last_login`` is
    ``None``) an activation notice is emailed to the admin address.

    Args:
        token: Reset token taken from the URL path.

    Returns:
        A redirect to ``index`` for authenticated users, a redirect to
        ``auth.forgot_password`` for an invalid/expired token or an
        unexpected error, the rendered form for GET or failed validation,
        or a redirect to ``login`` after a successful change.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    db = SessionLocal()
    try:
        # Token, expiry and account status are all checked in one query.
        user = (
            db.query(User)
            .filter(
                User.reset_token == token,
                User.reset_token_expires > datetime.now(),
                User.is_active == True,
            )
            .first()
        )

        if not user:
            flash('Link resetowania hasla jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('auth.forgot_password'))

        if request.method != 'POST':
            return render_template('auth/reset_password.html', token=token)

        password = request.form.get('password', '')
        password_confirm = request.form.get('password_confirm', '')

        if password != password_confirm:
            flash('Hasla nie sa identyczne.', 'error')
            return render_template('auth/reset_password.html', token=token)

        password_valid, password_message = validate_password(password)
        if not password_valid:
            flash(password_message, 'error')
            return render_template('auth/reset_password.html', token=token)

        # Store the new hash and invalidate the token so the link is single use.
        user.password_hash = generate_password_hash(password, method='pbkdf2:sha256')
        user.reset_token = None
        user.reset_token_expires = None

        # Using the emailed link proves access to the inbox, so the address
        # is treated as verified from here on.
        if not user.is_verified:
            user.is_verified = True
            user.verified_at = datetime.now()
            user.verification_token = None
            user.verification_token_expires = None
            logger.info(f"Email auto-verified via password reset for {user.email}")

        # A user who never logged in is setting a password for the first time.
        first_activation = user.last_login is None

        db.commit()

        logger.info(f"Password reset successful for {user.email}")

        if first_activation:
            # Admin notification is best effort; a failure must not undo
            # the already-committed password change.
            try:
                import email_service
                email_service.send_email(
                    to=['maciej.pienczyn@inpi.pl'],
                    subject=f'Aktywacja konta: {user.name}',
                    body_text=f'{user.name} ({user.email}) właśnie ustawił(a) hasło i aktywował(a) swoje konto w portalu Norda Biznes Partner.',
                    email_type='admin_notification',
                    bcc=[]
                )
            except Exception as notify_err:
                logger.warning(f"Failed to send activation notification: {notify_err}")

        flash('Haslo zostalo zmienione. Mozesz sie teraz zalogowac.', 'success')
        return redirect(url_for('login'))

    except Exception as e:
        logger.error(f"Reset password error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
        return redirect(url_for('auth.forgot_password'))
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# EMAIL VERIFICATION
|
|
# ============================================================
|
|
|
|
@bp.route('/verify-email/<token>')
def verify_email(token):
    """Confirm an email address from a verification link and log the user in.

    The token must match an active user and must not be expired. A user who
    is already verified is logged in (if nobody is logged in yet) and sent to
    the dashboard. Otherwise the user is marked verified, the token is
    cleared, the user is logged in, and two audit entries
    (``email_verified`` and ``login``) are written before the commit.

    Args:
        token: Verification token taken from the URL path.

    Returns:
        A redirect to ``dashboard`` on success or when already verified, or
        a redirect to ``login`` for an invalid/expired token or an
        unexpected error.
    """
    db = SessionLocal()
    try:
        user = (
            db.query(User)
            .filter(
                User.verification_token == token,
                User.verification_token_expires > datetime.now(),
                User.is_active == True,
            )
            .first()
        )

        if not user:
            flash('Link weryfikacyjny jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('login'))

        if user.is_verified:
            if current_user.is_authenticated:
                flash('Email zostal juz zweryfikowany.', 'info')
            else:
                # Nobody is logged in: the still-valid link doubles as a login.
                login_user(user, remember=True)
                user.last_login = datetime.now()
                db.commit()
                flash('Witamy ponownie! Zostales automatycznie zalogowany.', 'info')
            return redirect(url_for('dashboard'))

        # First successful use of the link: mark verified and drop the token.
        user.is_verified = True
        user.verified_at = datetime.now()
        user.verification_token = None
        user.verification_token_expires = None

        login_user(user, remember=True)
        user.last_login = datetime.now()

        # Prefer the first entry of X-Forwarded-For when the header holds a
        # comma-separated list; fall back to the socket address otherwise.
        client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        if client_ip and ',' in client_ip:
            client_ip = client_ip.split(',')[0].strip()

        log_audit(
            db, 'email_verified', 'user',
            entity_id=user.id, entity_name=user.email,
            details={'ip': client_ip},
        )
        log_audit(
            db, 'login', 'user',
            entity_id=user.id, entity_name=user.email,
            details={'method': 'email_verification', 'ip': client_ip},
        )

        db.commit()

        logger.info(f"Email verified and auto-logged in: {user.email}")
        flash('Email zweryfikowany! Witamy w Norda Biznes Partner.', 'success')
        return redirect(url_for('dashboard'))

    except Exception as e:
        logger.error(f"Email verification error: {e}")
        flash('Wystapil blad podczas weryfikacji.', 'error')
        return redirect(url_for('login'))
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/resend-verification', methods=['GET', 'POST'])
@limiter.limit("15 per hour")
def resend_verification():
    """Render the resend-verification form and process its submission.

    GET renders the form. POST validates the submitted email and, when an
    active and not-yet-verified user with that email exists, stores a new
    verification token with an expiry 24 hours from now and tries to email
    the verification link.

    The same flashed message is shown whether or not a matching unverified
    user was found, so the response does not reveal account state.

    Returns:
        A redirect to ``index`` for already-authenticated users, a redirect
        to ``login`` after a processed submission, or the rendered form
        (GET, invalid email format, or an unexpected error).
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/resend_verification.html'

    if request.method != 'POST':
        return render_template(form_template)

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(email=email, is_active=True).first()

        if user and not user.is_verified:
            # Replace any previous token; persist before sending the link.
            verification_token = secrets.token_urlsafe(32)
            verification_expires = datetime.now() + timedelta(hours=24)
            user.verification_token = verification_token
            user.verification_token_expires = verification_expires
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            verification_url = f"{base_url}/verify-email/{verification_token}"

            # Mail delivery is best effort: any failure is logged (with the
            # first eight characters of the token) and otherwise ignored.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Resend verification token (no email) for {email}: {verification_token[:8]}...")
                elif email_service.send_welcome_email(email, user.name, verification_url):
                    logger.info(f"Verification email resent to {email}")
                else:
                    logger.warning(f"Failed to resend verification email to {email}")
                    logger.info(f"Resend verification token (email failed) for {email}: {verification_token[:8]}...")
            except Exception as e:
                logger.error(f"Error resending verification email: {e}")
                logger.info(f"Resend verification token (exception) for {email}: {verification_token[:8]}...")

        # Identical response regardless of whether anything was sent.
        flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info')
        return redirect(url_for('login'))

    except Exception as e:
        logger.error(f"Resend verification error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template(form_template)
|