nordabiz/app.py.backup.security.20260109_150801
Maciej Pienczyn 8ee5945ccd fix: Handle NULL views_count in forum and classifieds
- Forum topics and classifieds now handle NULL views_count gracefully
- Prevents TypeError when incrementing view counter
2026-01-11 06:03:13 +01:00

6454 lines
221 KiB
Python

#!/usr/bin/env python3
"""
Norda Biznes Hub - Flask Application
====================================
Main Flask application for Norda Biznes company directory with AI chat.
Features:
- User authentication with email confirmation
- Company directory with advanced search
- AI chat assistant powered by Google Gemini
- PostgreSQL database integration
- Analytics dashboard for chat insights
Author: Norda Biznes Development Team
Created: 2025-11-23
"""
import os
import logging
import secrets
import re
import json
from collections import deque
from datetime import datetime, timedelta
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, session, Response
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_wtf.csrf import CSRFProtect
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.security import generate_password_hash, check_password_hash
from dotenv import load_dotenv
# Load environment variables. override=True means values from the file
# replace variables that are already set in the process environment.
# Lookup order (paths are relative to the current working directory):
#   1. .env
#   2. nordabiz_config.txt  (alternative file name for production flexibility)
#   3. neither exists -> load_dotenv() is called without an explicit path
import os
if os.path.exists('.env'):
    load_dotenv('.env', override=True)
elif os.path.exists('nordabiz_config.txt'):
    load_dotenv('nordabiz_config.txt', override=True)
else:
    load_dotenv(override=True)
# Configure logging with in-memory buffer for debug panel
class DebugLogHandler(logging.Handler):
    """Logging handler that keeps the most recent records in memory.

    Each record is stored as a plain dict in ``self.logs``, a bounded
    ``deque``: once ``max_logs`` entries are held, appending a new one
    silently drops the oldest.

    Args:
        max_logs: Maximum number of entries to keep (default 500).
    """

    def __init__(self, max_logs=500):
        super().__init__()
        self.logs = deque(maxlen=max_logs)

    def emit(self, record):
        """Convert *record* to a dict and append it to the buffer.

        A record that cannot be formatted (for example mismatched
        %-arguments) is handed to ``handleError`` instead of raising into
        the code that issued the log call.
        """
        try:
            log_entry = {
                'timestamp': datetime.now().isoformat(),
                'level': record.levelname,
                'logger': record.name,
                'message': self.format(record),
                'module': record.module,
                'funcName': record.funcName,
                'lineno': record.lineno
            }
        except Exception:
            # Same contract as the stdlib handlers: logging must never
            # break the application.
            self.handleError(record)
            return
        self.logs.append(log_entry)
# Create debug handler (keeps at most 500 entries in memory).
# Its formatter emits only the message text; level, logger name and source
# location are stored as separate fields by DebugLogHandler.emit().
debug_handler = DebugLogHandler(max_logs=500)
debug_handler.setFormatter(logging.Formatter('%(message)s'))
# Root logging configuration: INFO level, "time - logger - level - message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Add debug handler to root logger so records from every logger reach it
logging.getLogger().addHandler(debug_handler)
# Module-level logger used throughout this file
logger = logging.getLogger(__name__)
# Import database models
from database import (
init_db,
SessionLocal,
User,
Company,
Category,
Service,
Competency,
CompanyDigitalMaturity,
CompanyWebsiteAnalysis,
CompanyQualityTracking,
CompanyWebsiteContent,
CompanyAIInsights,
CompanyEvent,
CompanySocialMedia,
CompanyContact,
AIChatConversation,
AIChatMessage,
AIChatFeedback,
AIAPICostLog,
ForumTopic,
ForumReply,
NordaEvent,
EventAttendee,
PrivateMessage,
Classified,
UserNotification,
CompanyRecommendation,
MembershipFee,
MembershipFeeConfig
)
# Import services
import gemini_service
from nordabiz_chat import NordaBizChatEngine
from search_service import search_companies
import krs_api_service
# News service for fetching company news.
# Optional dependency: NEWS_SERVICE_AVAILABLE records whether the import
# succeeded so other code can check it before using the service.
try:
    from news_service import NewsService, get_news_service, init_news_service
    NEWS_SERVICE_AVAILABLE = True
except ImportError:
    NEWS_SERVICE_AVAILABLE = False
    logger.warning("News service not available")
# SEO audit components for triggering audits via API.
# The "scripts" directory next to this file is put at the front of sys.path
# (once) before the seo_audit import is attempted.
import sys
_scripts_path = os.path.join(os.path.dirname(__file__), 'scripts')
if _scripts_path not in sys.path:
    sys.path.insert(0, _scripts_path)
try:
    from seo_audit import SEOAuditor, SEO_AUDIT_VERSION
    SEO_AUDIT_AVAILABLE = True
except ImportError as e:
    SEO_AUDIT_AVAILABLE = False
    logger.warning(f"SEO audit service not available: {e}")
# GBP (Google Business Profile) audit service.
# Optional as well; GBP_AUDIT_VERSION is None when the import fails.
try:
    from gbp_audit_service import (
        GBPAuditService,
        audit_company as gbp_audit_company,
        get_company_audit as gbp_get_company_audit,
        fetch_google_business_data as gbp_fetch_google_data
    )
    GBP_AUDIT_AVAILABLE = True
    GBP_AUDIT_VERSION = '1.0'
except ImportError as e:
    GBP_AUDIT_AVAILABLE = False
    GBP_AUDIT_VERSION = None
    logger.warning(f"GBP audit service not available: {e}")
# Initialize Flask app
app = Flask(__name__)
# SECRET_KEY comes from the environment. The hard-coded fallback is public
# knowledge, so it is only acceptable for local development; anywhere else
# its use is reported loudly instead of passing unnoticed.
_DEV_SECRET_KEY = 'dev-secret-key-change-in-production'
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', _DEV_SECRET_KEY)
if app.config['SECRET_KEY'] == _DEV_SECRET_KEY and os.getenv('FLASK_ENV') != 'development':
    logger.warning(
        "SECRET_KEY is not set - falling back to the built-in development key. "
        "Set the SECRET_KEY environment variable before running in production."
    )
# Sessions marked permanent expire after 7 days
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)
# Security configurations
app.config['WTF_CSRF_ENABLED'] = True
app.config['WTF_CSRF_TIME_LIMIT'] = None  # No time limit for CSRF tokens
app.config['SESSION_COOKIE_SECURE'] = os.getenv('FLASK_ENV') != 'development'  # HTTPS only in production
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# Template filters
@app.template_filter('ensure_url')
def ensure_url_filter(url):
    """Return *url* with an ``https://`` prefix when it has no HTTP(S) scheme.

    The scheme check is case-insensitive, so ``HTTP://example.com`` is
    recognised as already having a scheme. Falsy values (``None``, ``''``)
    are returned unchanged.

    Args:
        url: URL string as stored for a company, possibly without a scheme.

    Returns:
        The original value if it is falsy or already starts with
        ``http://`` / ``https://``; otherwise ``https://`` + url.
    """
    if not url:
        return url
    if url.lower().startswith(('http://', 'https://')):
        return url
    return f'https://{url}'
# Initialize CSRF protection
csrf = CSRFProtect(app)
# Initialize rate limiter: clients are keyed by remote address; the default
# limits below apply to routes that do not declare their own.
# NOTE(review): storage_uri="memory://" presumably keeps counters per process,
# so limits would not be shared between workers - confirm against deployment.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="memory://"
)
# Initialize database (executed at import time)
init_db()
# Initialize Login Manager; unauthenticated users are sent to the 'login'
# endpoint with the (Polish) message below.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Zaloguj się, aby uzyskać dostęp do tej strony.'
# Initialize Gemini service. A failure is logged and startup continues.
try:
    gemini_service.init_gemini_service(model='flash-2.0')  # Gemini 2.0 Flash (FREE in preview)
    logger.info("Gemini service initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Gemini service: {e}")
@login_manager.user_loader
def load_user(user_id):
    """Load the user referenced by the session for Flask-Login.

    Args:
        user_id: User identifier as stored in the session (a string).

    Returns:
        The matching ``User`` row, or ``None`` when the identifier is not a
        valid integer or no such user exists. Returning ``None`` instead of
        raising lets Flask-Login treat a corrupted session as anonymous.
    """
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None
    db = SessionLocal()
    try:
        return db.query(User).filter_by(id=numeric_id).first()
    finally:
        db.close()
# ============================================================
# TEMPLATE CONTEXT PROCESSORS
# ============================================================
@app.context_processor
def inject_globals():
    """Expose values that every template can use.

    Returns:
        dict: ``current_year`` - the year of the current local date.
    """
    this_year = datetime.now().year
    return dict(current_year=this_year)
@app.context_processor
def inject_notifications():
    """Expose the unread-notification counter to every template.

    Returns:
        dict: ``unread_notifications_count`` - number of unread
        ``UserNotification`` rows of the logged-in user, or 0 for guests.
    """
    if not current_user.is_authenticated:
        return {'unread_notifications_count': 0}

    db_session = SessionLocal()
    try:
        unread_query = db_session.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False  # noqa: E712 - SQL expression, not a Python comparison
        )
        return {'unread_notifications_count': unread_query.count()}
    finally:
        db_session.close()
# ============================================================
# NOTIFICATION HELPERS
# ============================================================
def create_notification(user_id, title, message, notification_type='info',
                        related_type=None, related_id=None, action_url=None):
    """
    Persist a single notification for one user.

    Args:
        user_id: ID of the user to notify
        title: Notification title
        message: Notification message/body
        notification_type: Type label stored with the notification (default 'info')
        related_type: Type of related entity (company_news, event, message, etc.)
        related_id: ID of the related entity
        action_url: URL to navigate to when the notification is clicked

    Returns:
        The stored UserNotification, or None when saving failed (the error
        is logged and the transaction rolled back).
    """
    fields = {
        'user_id': user_id,
        'title': title,
        'message': message,
        'notification_type': notification_type,
        'related_type': related_type,
        'related_id': related_id,
        'action_url': action_url,
    }
    db = SessionLocal()
    try:
        record = UserNotification(**fields)
        db.add(record)
        db.commit()
        # Reload the row after the commit, before the session is closed.
        db.refresh(record)
        logger.info(f"Created notification for user {user_id}: {title}")
        return record
    except Exception as exc:
        logger.error(f"Error creating notification: {exc}")
        db.rollback()
        return None
    finally:
        db.close()
def create_news_notification(company_id, news_id, news_title):
    """
    Notify every active user of a company that a news item was approved.

    Args:
        company_id: ID of the company
        news_id: ID of the approved news
        news_title: Title of the news
    """
    db = SessionLocal()
    try:
        # Active users linked to this company
        recipients = (
            db.query(User)
            .filter(User.company_id == company_id, User.is_active == True)  # noqa: E712
            .all()
        )
        # The notification content is the same for every recipient.
        body = f"Aktualnosc '{news_title}' zostala zatwierdzona i jest widoczna na profilu firmy."
        target_url = f"/company/{company_id}"
        for recipient in recipients:
            create_notification(
                user_id=recipient.id,
                title="Nowa aktualnosc o Twojej firmie",
                message=body,
                notification_type='news',
                related_type='company_news',
                related_id=news_id,
                action_url=target_url
            )
    finally:
        db.close()
# ============================================================
# SECURITY MIDDLEWARE & HELPERS
# ============================================================
@app.after_request
def set_security_headers(response):
    """Attach the standard security headers to an outgoing response.

    Args:
        response: The response object about to be sent.

    Returns:
        The same response object with the headers set.
    """
    # Content Security Policy, one directive per entry
    csp_directives = (
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
        "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com",
        "img-src 'self' data: https:",
        "font-src 'self' https://cdn.jsdelivr.net https://fonts.gstatic.com",
        "connect-src 'self'",
    )
    security_headers = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'SAMEORIGIN',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        'Content-Security-Policy': "; ".join(csp_directives),
    }
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value
    return response
def validate_email(email):
    """Check that *email* looks like a syntactically plausible address.

    This is a pragmatic ``local@domain.tld`` check, not a full RFC 5322
    parser. The whole string must match, so a trailing newline or any other
    extra character makes the address invalid.

    Args:
        email: Candidate address.

    Returns:
        bool: True when the address is a non-empty string of at most 255
        characters that matches the pattern, False otherwise.
    """
    if not isinstance(email, str) or not email or len(email) > 255:
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch, not match + '$': '$' would also accept "user@host.pl\n".
    return re.fullmatch(pattern, email) is not None
def validate_password(password):
    """
    Validate password strength.

    Requirements:
    - Minimum 8 characters
    - At least one uppercase letter
    - At least one lowercase letter
    - At least one digit

    Letter case is checked with ``str.isupper`` / ``str.islower`` so that
    non-ASCII letters (e.g. Polish "Ł" or "ż") count as letters too.

    Args:
        password: Candidate password.

    Returns:
        tuple: ``(True, "OK")`` when the password is acceptable, otherwise
        ``(False, <message>)`` with a Polish message naming the first
        requirement that failed.
    """
    if not password or len(password) < 8:
        return False, "Hasło musi mieć minimum 8 znaków"
    if not any(ch.isupper() for ch in password):
        return False, "Hasło musi zawierać przynajmniej jedną wielką literę"
    if not any(ch.islower() for ch in password):
        return False, "Hasło musi zawierać przynajmniej jedną małą literę"
    if not re.search(r'\d', password):
        return False, "Hasło musi zawierać przynajmniej jedną cyfrę"
    return True, "OK"
def sanitize_input(text, max_length=1000):
    """Normalise free-text user input.

    Removes NUL bytes, strips surrounding whitespace and limits the result
    to *max_length* characters. Whitespace is stripped before truncating so
    leading blanks do not use up the length budget. No HTML escaping or
    other filtering is performed here.

    Args:
        text: Raw input; falsy values (None, '') yield ''.
        max_length: Maximum length of the returned string.

    Returns:
        str: The cleaned text, at most *max_length* characters long.
    """
    if not text:
        return ""
    cleaned = text.replace('\x00', '').strip()
    # Truncation can expose trailing whitespace again, hence the rstrip.
    return cleaned[:max_length].rstrip()
def get_free_tier_usage():
    """
    Report today's Gemini API usage for free-tier tracking.

    Counts AIAPICostLog rows with provider 'gemini' whose timestamp falls on
    today's date and sums their total_tokens.

    Returns:
        Dict with requests_today and tokens_today (both 0 if the query fails)
    """
    from datetime import date
    from sqlalchemy import func

    db = SessionLocal()
    try:
        request_count = func.count(AIAPICostLog.id).label('requests')
        token_sum = func.coalesce(func.sum(AIAPICostLog.total_tokens), 0).label('tokens')
        row = (
            db.query(request_count, token_sum)
            .filter(
                func.date(AIAPICostLog.timestamp) == date.today(),
                AIAPICostLog.api_provider == 'gemini'
            )
            .first()
        )
        usage = {
            'requests_today': row.requests or 0,
            'tokens_today': int(row.tokens or 0)
        }
        return usage
    except Exception as exc:
        # Best effort: statistics must not break the caller.
        logger.warning(f"Failed to get free tier usage: {exc}")
        return {'requests_today': 0, 'tokens_today': 0}
    finally:
        db.close()
def get_brave_api_usage():
    """
    Report Brave Search API usage for the current month and for today.

    Usage is derived from AIAPICostLog rows with provider 'brave'.
    Brave free tier: 2000 requests/month

    Returns:
        Dict with usage stats and limits; on a query failure all counters
        are reported as 0 with the full quota remaining.
    """
    from datetime import date
    from sqlalchemy import func, extract

    db = SessionLocal()
    try:
        today = date.today()
        call_count = func.count(AIAPICostLog.id).label('requests')

        # Calls made in the current calendar month
        month_row = db.query(call_count).filter(
            extract('month', AIAPICostLog.timestamp) == today.month,
            extract('year', AIAPICostLog.timestamp) == today.year,
            AIAPICostLog.api_provider == 'brave'
        ).first()
        # Calls made today
        day_row = db.query(call_count).filter(
            func.date(AIAPICostLog.timestamp) == today,
            AIAPICostLog.api_provider == 'brave'
        ).first()

        monthly_used = month_row.requests or 0
        daily_used = day_row.requests or 0
        monthly_limit = 2000  # Brave free tier

        if monthly_limit > 0:
            usage_percent = round((monthly_used / monthly_limit) * 100, 1)
        else:
            usage_percent = 0

        return {
            'requests_today': daily_used,
            'requests_this_month': monthly_used,
            'monthly_limit': monthly_limit,
            'remaining': max(0, monthly_limit - monthly_used),
            'usage_percent': usage_percent,
            'tier': 'free',
            'is_limit_reached': monthly_used >= monthly_limit
        }
    except Exception as exc:
        logger.warning(f"Failed to get Brave API usage: {exc}")
        return {
            'requests_today': 0,
            'requests_this_month': 0,
            'monthly_limit': 2000,
            'remaining': 2000,
            'usage_percent': 0,
            'tier': 'free',
            'is_limit_reached': False
        }
    finally:
        db.close()
def log_brave_api_call(user_id=None, feature='news_search', company_name=None):
    """
    Record one Brave API call in AIAPICostLog for usage tracking.

    Failures are logged and rolled back; nothing is raised to the caller.

    Args:
        user_id: User who triggered the call (optional)
        feature: Feature name (news_search, etc.)
        company_name: Company being searched (used only in the debug log line)
    """
    db = SessionLocal()
    try:
        # Brave calls carry no token usage, so all token counters are zero.
        db.add(AIAPICostLog(
            api_provider='brave',
            model_name='search_api',
            feature=feature,
            user_id=user_id,
            input_tokens=0,
            output_tokens=0,
            total_tokens=0
        ))
        db.commit()
        logger.debug(f"Logged Brave API call: {feature} for {company_name}")
    except Exception as exc:
        logger.error(f"Failed to log Brave API call: {exc}")
        db.rollback()
    finally:
        db.close()
# ============================================================
# HEALTH CHECK
# ============================================================
@app.route('/health')
def health():
    """Liveness probe for monitoring: always answers ``{'status': 'ok'}`` with HTTP 200."""
    payload = {'status': 'ok'}
    return payload, 200
# ============================================================
# PUBLIC ROUTES
# ============================================================
@app.route('/')
def index():
    """Homepage.

    Guests get the landing page with two headline counters. Logged-in users
    get the company directory: all active companies ordered by name, all
    categories ordered by ``sort_order``, and the number of categories that
    have at least one company assigned.
    """
    db = SessionLocal()
    try:
        if not current_user.is_authenticated:
            # Landing page for guests
            return render_template(
                'landing.html',
                total_companies=db.query(Company).filter_by(status='active').count(),
                total_categories=db.query(Category).count()
            )

        # Company directory for logged in users
        companies = db.query(Company).filter_by(status='active').order_by(Company.name).all()
        categories = db.query(Category).order_by(Category.sort_order).all()

        # One DISTINCT query instead of one COUNT query per category.
        used_category_ids = {
            category_id for (category_id,) in db.query(Company.category_id).distinct().all()
        }
        total_categories = sum(1 for category in categories if category.id in used_category_ids)

        return render_template(
            'index.html',
            companies=companies,
            categories=categories,
            total_companies=len(companies),
            total_categories=total_categories
        )
    finally:
        db.close()
@app.route('/company/<int:company_id>')
# @login_required  # Public access
def company_detail(company_id):
    """Company detail page (publicly accessible, login is not enforced).

    Collects the company row plus its related data (digital maturity,
    website analysis, quality tracking, latest events, scraped website
    content, AI insights, social media, contacts, approved recommendations)
    and renders ``company_detail.html``. An unknown id flashes an error and
    redirects to the homepage.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            flash('Firma nie znaleziona.', 'error')
            return redirect(url_for('index'))

        def single_row(model):
            # First row of *model* that belongs to this company, or None.
            return db.query(model).filter_by(company_id=company_id).first()

        context = {'company': company}
        context['maturity_data'] = single_row(CompanyDigitalMaturity)
        context['website_analysis'] = single_row(CompanyWebsiteAnalysis)
        context['quality_data'] = single_row(CompanyQualityTracking)

        # Latest 10 company events
        context['events'] = (
            db.query(CompanyEvent)
            .filter_by(company_id=company_id)
            .order_by(CompanyEvent.event_date.desc(), CompanyEvent.created_at.desc())
            .limit(10)
            .all()
        )
        # Most recent website scrape
        context['website_content'] = (
            db.query(CompanyWebsiteContent)
            .filter_by(company_id=company_id)
            .order_by(CompanyWebsiteContent.scraped_at.desc())
            .first()
        )
        context['ai_insights'] = single_row(CompanyAIInsights)
        context['social_media'] = db.query(CompanySocialMedia).filter_by(company_id=company_id).all()
        # Contacts grouped by type, primary entries first within a type
        context['contacts'] = (
            db.query(CompanyContact)
            .filter_by(company_id=company_id)
            .order_by(CompanyContact.contact_type, CompanyContact.is_primary.desc())
            .all()
        )
        # Approved recommendations only, newest first, joined to their author
        context['recommendations'] = (
            db.query(CompanyRecommendation)
            .filter_by(company_id=company_id, status='approved')
            .join(User, CompanyRecommendation.user_id == User.id)
            .order_by(CompanyRecommendation.created_at.desc())
            .all()
        )
        return render_template('company_detail.html', **context)
    finally:
        db.close()
@app.route('/company/<slug>')
# @login_required  # Disabled - public access
def company_detail_by_slug(slug):
    """Resolve a company slug and redirect to the canonical numeric-ID page.

    An unknown slug flashes an error and redirects to the homepage.
    """
    db = SessionLocal()
    try:
        match = db.query(Company).filter_by(slug=slug).first()
        if match:
            # Redirect to canonical int ID route
            return redirect(url_for('company_detail', company_id=match.id))
        flash('Firma nie znaleziona.', 'error')
        return redirect(url_for('index'))
    finally:
        db.close()
@app.route('/company/<slug>/recommend', methods=['GET', 'POST'])
# @login_required  # Disabled - the form itself is publicly viewable
def company_recommend(slug):
    """Show the recommendation form (GET) or store a new recommendation (POST).

    Viewing the form is public, but submitting needs a logged-in user
    because the recommendation is stored with the author's id. Anonymous
    submissions are passed to Flask-Login's standard "unauthorized" handling
    instead of failing on missing user attributes.

    A stored recommendation starts with status 'pending' (awaiting
    moderation). Users cannot recommend their own company or recommend the
    same company twice.
    """
    db = SessionLocal()
    try:
        # Get company
        company = db.query(Company).filter_by(slug=slug).first()
        if not company:
            flash('Firma nie znaleziona.', 'error')
            return redirect(url_for('index'))

        # Handle GET (show form)
        if request.method != 'POST':
            return render_template('company/recommend.html', company=company)

        # Handle POST (form submission) - an author is required from here on.
        if not current_user.is_authenticated:
            return login_manager.unauthorized()

        recommendation_text = request.form.get('recommendation_text', '').strip()
        service_category = sanitize_input(request.form.get('service_category', ''), 200)
        show_contact = request.form.get('show_contact') == '1'

        # Validation
        if not recommendation_text or len(recommendation_text) < 50:
            flash('Rekomendacja musi mieć co najmniej 50 znaków.', 'error')
            return render_template('company/recommend.html', company=company)
        if len(recommendation_text) > 2000:
            flash('Rekomendacja może mieć maksymalnie 2000 znaków.', 'error')
            return render_template('company/recommend.html', company=company)

        # Prevent self-recommendation
        if current_user.company_id == company.id:
            flash('Nie możesz polecać własnej firmy.', 'error')
            return redirect(url_for('company_detail', company_id=company.id))

        # Check for duplicate (user already recommended this company)
        existing = db.query(CompanyRecommendation).filter_by(
            user_id=current_user.id,
            company_id=company.id
        ).first()
        if existing:
            flash('Już poleciłeś tę firmę. Możesz edytować swoją wcześniejszą rekomendację.', 'error')
            return redirect(url_for('company_detail', company_id=company.id))

        # Create recommendation
        recommendation = CompanyRecommendation(
            company_id=company.id,
            user_id=current_user.id,
            recommendation_text=recommendation_text,
            service_category=service_category if service_category else None,
            show_contact=show_contact,
            status='pending'
        )
        db.add(recommendation)
        db.commit()
        flash('Dziękujemy! Twoja rekomendacja została przesłana i oczekuje na moderację.', 'success')
        return redirect(url_for('company_detail', company_id=company.id))
    finally:
        db.close()
@app.route('/search')
@login_required
def search():
    """Search companies (login required).

    Reads ``q`` (search text) and ``category`` (integer id) from the query
    string, delegates to ``search_companies`` with a limit of 50 and renders
    ``search_results.html``. For a non-empty query the number of hits per
    match type is logged.
    """
    query = request.args.get('q', '')
    category_id = request.args.get('category', type=int)

    db = SessionLocal()
    try:
        # SearchService with synonym expansion, NIP/REGON lookup, and fuzzy matching
        hits = search_companies(db, query, category_id, limit=50)
        # Each hit wraps the matched company
        companies = [hit.company for hit in hits]

        # For debugging/analytics - log search stats
        if query:
            match_types = {}
            for hit in hits:
                match_types.setdefault(hit.match_type, 0)
                match_types[hit.match_type] += 1
            logger.info(f"Search '{query}': {len(companies)} results, types: {match_types}")

        template_args = dict(
            companies=companies,
            query=query,
            category_id=category_id,
            result_count=len(companies)
        )
        return render_template('search_results.html', **template_args)
    finally:
        db.close()
@app.route('/aktualnosci')
@login_required
def events():
    """Company events and news - latest updates from member companies.

    Query-string parameters:
        type: Optional event type to filter by.
        company: Optional company id (integer) to filter by.
        page: 1-based page number; missing, non-numeric, zero or negative
            values fall back to the first page so the SQL OFFSET is never
            negative.

    Renders ``events.html`` with 20 events per page, the companies that have
    events (for the filter dropdown) and per-type event counts.
    """
    from sqlalchemy import func

    event_type_filter = request.args.get('type', '')
    company_id = request.args.get('company', type=int)
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        # Build query
        query = db.query(CompanyEvent).join(Company)
        # Apply filters
        if event_type_filter:
            query = query.filter(CompanyEvent.event_type == event_type_filter)
        if company_id:
            query = query.filter(CompanyEvent.company_id == company_id)
        # Order by date (newest first)
        query = query.order_by(
            CompanyEvent.event_date.desc(),
            CompanyEvent.created_at.desc()
        )
        # Pagination
        total_events = query.count()
        page_events = query.limit(per_page).offset((page - 1) * per_page).all()
        # Get companies with events for filter dropdown
        companies_with_events = db.query(Company).join(CompanyEvent).distinct().order_by(Company.name).all()
        # Event type statistics: (event_type, count) pairs
        event_types = db.query(
            CompanyEvent.event_type,
            func.count(CompanyEvent.id)
        ).group_by(CompanyEvent.event_type).all()
        return render_template(
            'events.html',
            events=page_events,
            companies_with_events=companies_with_events,
            event_types=event_types,
            event_type_filter=event_type_filter,
            company_id=company_id,
            page=page,
            per_page=per_page,
            total_events=total_events,
            # Ceiling division
            total_pages=(total_events + per_page - 1) // per_page
        )
    finally:
        db.close()
# ============================================================
# FORUM ROUTES
# ============================================================
@app.route('/forum')
@login_required
def forum_index():
    """Forum - paginated list of topics (20 per page).

    Pinned topics come first, then topics ordered by most recent update.
    The ``page`` query parameter is 1-based; missing, non-numeric, zero or
    negative values fall back to the first page so the SQL OFFSET is never
    negative.
    """
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        # Get topics ordered by pinned first, then by last activity
        query = db.query(ForumTopic).order_by(
            ForumTopic.is_pinned.desc(),
            ForumTopic.updated_at.desc()
        )
        total_topics = query.count()
        topics = query.limit(per_page).offset((page - 1) * per_page).all()
        return render_template(
            'forum/index.html',
            topics=topics,
            page=page,
            per_page=per_page,
            total_topics=total_topics,
            # Ceiling division
            total_pages=(total_topics + per_page - 1) // per_page
        )
    finally:
        db.close()
@app.route('/forum/nowy', methods=['GET', 'POST'])
@login_required
def forum_new_topic():
    """Show the new-topic form (GET) or create a forum topic (POST).

    The title must have at least 5 characters and the content at least 10;
    otherwise the form is shown again with an error message. On success the
    user is redirected to the new topic.
    """
    form_template = 'forum/new_topic.html'
    if request.method != 'POST':
        return render_template(form_template)

    title = sanitize_input(request.form.get('title', ''), 255)
    content = request.form.get('content', '').strip()

    title_ok = bool(title) and len(title) >= 5
    if not title_ok:
        flash('Tytuł musi mieć co najmniej 5 znaków.', 'error')
        return render_template(form_template)
    content_ok = bool(content) and len(content) >= 10
    if not content_ok:
        flash('Treść musi mieć co najmniej 10 znaków.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        new_topic = ForumTopic(title=title, content=content, author_id=current_user.id)
        db.add(new_topic)
        db.commit()
        # Reload the row after the commit so its id can be read.
        db.refresh(new_topic)
        flash('Temat został utworzony.', 'success')
        return redirect(url_for('forum_topic', topic_id=new_topic.id))
    finally:
        db.close()
@app.route('/forum/<int:topic_id>')
@login_required
def forum_topic(topic_id):
    """View a forum topic with its replies and count the visit.

    An unknown topic id flashes an error and redirects to the forum index.
    """
    db = SessionLocal()
    try:
        topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if not topic:
            flash('Temat nie istnieje.', 'error')
            return redirect(url_for('forum_index'))
        # Increment view count. The column may be NULL for existing rows, so
        # treat a missing value as 0 instead of raising TypeError.
        topic.views_count = (topic.views_count or 0) + 1
        db.commit()
        return render_template('forum/topic.html', topic=topic)
    finally:
        db.close()
@app.route('/forum/<int:topic_id>/odpowiedz', methods=['POST'])
@login_required
def forum_reply(topic_id):
    """Add a reply to a forum topic.

    The reply needs at least 3 characters, the topic must exist and must not
    be locked. A successful reply also sets the topic's ``updated_at`` to
    the current time.
    """
    back_to_topic = url_for('forum_topic', topic_id=topic_id)

    content = request.form.get('content', '').strip()
    if len(content) < 3:
        flash('Odpowiedź musi mieć co najmniej 3 znaki.', 'error')
        return redirect(back_to_topic)

    db = SessionLocal()
    try:
        topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if not topic:
            flash('Temat nie istnieje.', 'error')
            return redirect(url_for('forum_index'))
        if topic.is_locked:
            flash('Ten temat jest zamknięty.', 'error')
            return redirect(back_to_topic)

        db.add(ForumReply(topic_id=topic_id, author_id=current_user.id, content=content))
        # Update topic updated_at
        topic.updated_at = datetime.now()
        db.commit()
        flash('Odpowiedź dodana.', 'success')
        return redirect(back_to_topic)
    finally:
        db.close()
# ============================================================
# FORUM ADMIN ROUTES
# ============================================================
@app.route('/admin/forum')
@login_required
def admin_forum():
    """Admin panel for forum moderation (admins only).

    Shows all topics (newest first), the 50 most recent replies and summary
    counters. Non-admin users are redirected to the forum index.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('forum_index'))

    db = SessionLocal()
    try:
        # All topics, newest first
        topics = db.query(ForumTopic).order_by(ForumTopic.created_at.desc()).all()
        # 50 most recent replies
        recent_replies = (
            db.query(ForumReply)
            .order_by(ForumReply.created_at.desc())
            .limit(50)
            .all()
        )
        # Stats
        stats = {
            'total_topics': len(topics),
            'total_replies': db.query(ForumReply).count(),
            'pinned_count': len([t for t in topics if t.is_pinned]),
            'locked_count': len([t for t in topics if t.is_locked]),
        }
        return render_template(
            'admin/forum.html',
            topics=topics,
            recent_replies=recent_replies,
            **stats
        )
    finally:
        db.close()
@app.route('/admin/forum/topic/<int:topic_id>/pin', methods=['POST'])
@login_required
def admin_forum_pin(topic_id):
    """Toggle the pinned flag of a topic (admins only).

    Returns JSON: 403 for non-admins, 404 for an unknown topic, otherwise
    the new ``is_pinned`` state and a confirmation message.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if topic is None:
            return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404

        topic.is_pinned = not topic.is_pinned
        db.commit()
        # Read the stored state back after the commit
        pinned_now = topic.is_pinned

        action = 'pinned' if pinned_now else 'unpinned'
        logger.info(f"Admin {current_user.email} {action} topic #{topic_id}")

        state_label = 'przypięty' if pinned_now else 'odpięty'
        return jsonify({
            'success': True,
            'is_pinned': pinned_now,
            'message': f"Temat {state_label}"
        })
    finally:
        db.close()
@app.route('/admin/forum/topic/<int:topic_id>/lock', methods=['POST'])
@login_required
def admin_forum_lock(topic_id):
    """Toggle the locked flag of a topic (admins only).

    Returns JSON: 403 for non-admins, 404 for an unknown topic, otherwise
    the new ``is_locked`` state and a confirmation message.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if topic is None:
            return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404

        topic.is_locked = not topic.is_locked
        db.commit()
        # Read the stored state back after the commit
        locked_now = topic.is_locked

        action = 'locked' if locked_now else 'unlocked'
        logger.info(f"Admin {current_user.email} {action} topic #{topic_id}")

        state_label = 'zamknięty' if locked_now else 'otwarty'
        return jsonify({
            'success': True,
            'is_locked': locked_now,
            'message': f"Temat {state_label}"
        })
    finally:
        db.close()
@app.route('/admin/forum/topic/<int:topic_id>/delete', methods=['POST'])
@login_required
def admin_forum_delete_topic(topic_id):
    """Delete a topic (admins only).

    Returns JSON: 403 for non-admins, 404 for an unknown topic, otherwise a
    success message. The deletion is written to the application log.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        doomed = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if doomed is None:
            return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404

        # Keep the title for the log line before the row is gone
        removed_title = doomed.title
        db.delete(doomed)  # Cascade deletes replies
        db.commit()
        logger.info(f"Admin {current_user.email} deleted topic #{topic_id}: {removed_title}")
        return jsonify({'success': True, 'message': 'Temat usunięty'})
    finally:
        db.close()
@app.route('/admin/forum/reply/<int:reply_id>/delete', methods=['POST'])
@login_required
def admin_forum_delete_reply(reply_id):
    """Delete a single forum reply (admins only).

    Returns JSON: 403 for non-admins, 404 for an unknown reply, otherwise a
    success message. The deletion is written to the application log.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        doomed = db.query(ForumReply).filter(ForumReply.id == reply_id).first()
        if doomed is None:
            return jsonify({'success': False, 'error': 'Odpowiedź nie istnieje'}), 404

        # Remember the parent topic for the log line before deleting
        parent_topic_id = doomed.topic_id
        db.delete(doomed)
        db.commit()
        logger.info(f"Admin {current_user.email} deleted reply #{reply_id} from topic #{parent_topic_id}")
        return jsonify({'success': True, 'message': 'Odpowiedź usunięta'})
    finally:
        db.close()
# ============================================================
# RECOMMENDATIONS ADMIN ROUTES
# ============================================================
@app.route('/admin/recommendations')
@login_required
def admin_recommendations():
    """Admin panel for moderating company recommendations (admins only).

    Shows all recommendations and the pending ones (both newest first)
    together with counters per status. Non-admin users are redirected to the
    homepage.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('index'))

    db = SessionLocal()
    try:
        newest_first = CompanyRecommendation.created_at.desc()

        # Every recommendation
        recommendations = db.query(CompanyRecommendation).order_by(newest_first).all()
        # Those still waiting for moderation
        pending_recommendations = (
            db.query(CompanyRecommendation)
            .filter(CompanyRecommendation.status == 'pending')
            .order_by(newest_first)
            .all()
        )

        def count_with_status(status):
            # Number of recommendations in the given moderation status
            return db.query(CompanyRecommendation).filter(
                CompanyRecommendation.status == status
            ).count()

        # Stats
        pending_count = len(pending_recommendations)
        approved_count = count_with_status('approved')
        rejected_count = count_with_status('rejected')

        logger.info(f"Admin {current_user.email} accessed recommendations panel - {pending_count} pending")
        return render_template(
            'admin/recommendations.html',
            recommendations=recommendations,
            pending_recommendations=pending_recommendations,
            total_recommendations=len(recommendations),
            pending_count=pending_count,
            approved_count=approved_count,
            rejected_count=rejected_count
        )
    finally:
        db.close()
@app.route('/admin/recommendations/<int:recommendation_id>/approve', methods=['POST'])
@login_required
def admin_recommendation_approve(recommendation_id):
    """Approve a recommendation (admins only).

    Sets the status to 'approved', records the moderating admin and the UTC
    time, and clears any stored rejection reason. Returns JSON: 403 for
    non-admins, 404 for an unknown id, otherwise a success message.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        target = (
            db.query(CompanyRecommendation)
            .filter(CompanyRecommendation.id == recommendation_id)
            .first()
        )
        if target is None:
            return jsonify({'success': False, 'error': 'Rekomendacja nie istnieje'}), 404

        target.status = 'approved'
        target.moderated_by = current_user.id
        target.moderated_at = datetime.utcnow()
        target.rejection_reason = None  # Clear any previous rejection reason
        db.commit()

        logger.info(f"Admin {current_user.email} approved recommendation #{recommendation_id}")
        return jsonify({'success': True, 'message': 'Rekomendacja zatwierdzona'})
    finally:
        db.close()
@app.route('/admin/recommendations/<int:recommendation_id>/reject', methods=['POST'])
@login_required
def admin_recommendation_reject(recommendation_id):
    """Reject a recommendation (admins only).

    An optional ``reason`` is read from the JSON body (when the request is
    JSON and the body is an object) or from the form data. Malformed JSON,
    a non-object body or a non-string reason are tolerated rather than
    causing a server error.

    Returns JSON: 403 for non-admins, 404 for an unknown id, otherwise a
    success message.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
    db = SessionLocal()
    try:
        recommendation = db.query(CompanyRecommendation).filter(
            CompanyRecommendation.id == recommendation_id
        ).first()
        if not recommendation:
            return jsonify({'success': False, 'error': 'Rekomendacja nie istnieje'}), 404

        # Get optional rejection reason from request
        if request.is_json:
            payload = request.get_json(silent=True)
            # A JSON array, scalar or null has no "reason" field
            rejection_reason = payload.get('reason', '') if isinstance(payload, dict) else ''
        else:
            rejection_reason = request.form.get('reason', '')

        recommendation.status = 'rejected'
        recommendation.moderated_by = current_user.id
        recommendation.moderated_at = datetime.utcnow()
        # str() guards against non-string JSON values such as numbers
        recommendation.rejection_reason = str(rejection_reason).strip() if rejection_reason else None
        db.commit()
        logger.info(f"Admin {current_user.email} rejected recommendation #{recommendation_id}")
        return jsonify({
            'success': True,
            'message': 'Rekomendacja odrzucona'
        })
    finally:
        db.close()
# ============================================================
# MEMBERSHIP FEES ADMIN
# ============================================================
# (month number, Polish month name) pairs for January..December.
# Names are spelled without diacritics.
MONTHS_PL = list(enumerate(
    (
        'Styczen', 'Luty', 'Marzec', 'Kwiecien',
        'Maj', 'Czerwiec', 'Lipiec', 'Sierpien',
        'Wrzesien', 'Pazdziernik', 'Listopad', 'Grudzien',
    ),
    start=1,
))
@app.route('/admin/fees')
@login_required
def admin_fees():
    """Admin panel for membership fee management.

    Query parameters:
        year: fee year (defaults to the current year).
        month: optional month number. When given, one row per active
            company is built for that month; otherwise each company gets
            a 12-month grid.
        status: optional filter (``paid`` / ``pending`` / ``overdue``),
            applied only when ``month`` is set.

    Non-admin users are redirected to the index page with a flash message.
    """
    if not current_user.is_admin:
        flash('Brak uprawnien do tej strony.', 'error')
        return redirect(url_for('index'))

    db = SessionLocal()
    try:
        from decimal import Decimal

        # Get filter parameters
        year = request.args.get('year', datetime.now().year, type=int)
        month = request.args.get('month', type=int)
        status_filter = request.args.get('status', '')

        # Get all active companies
        companies = db.query(Company).filter(Company.status == 'active').order_by(Company.name).all()

        # Get fees for selected period, keyed by (company_id, fee_month)
        fee_query = db.query(MembershipFee).filter(MembershipFee.fee_year == year)
        if month:
            fee_query = fee_query.filter(MembershipFee.fee_month == month)
        fees = {(f.company_id, f.fee_month): f for f in fee_query.all()}

        # Build company list with fee status
        companies_fees = []
        for company in companies:
            if month:
                fee = fees.get((company.id, month))
                companies_fees.append({
                    'company': company,
                    'fee': fee,
                    'status': fee.status if fee else 'brak'
                })
            else:
                # Show all months
                companies_fees.append({
                    'company': company,
                    'months': {m: fees.get((company.id, m)) for m in range(1, 13)}
                })

        # Calculate stats BEFORE applying the status filter. They describe
        # the whole period; computing them from the filtered rows made
        # paid_count (and therefore pending_count, which is derived from
        # the unfiltered company total) depend on the selected filter.
        total_companies = len(companies)
        if month:
            stat_fees = [cf['fee'] for cf in companies_fees if cf['fee']]
            paid_count = sum(1 for f in stat_fees if f.status == 'paid')
            pending_count = total_companies - paid_count
        else:
            stat_fees = list(fees.values())
            paid_count = sum(1 for f in stat_fees if f.status == 'paid')
            pending_count = len(stat_fees) - paid_count
        # Decimal(0) remains the value for an empty period, as before.
        total_due = sum(float(f.amount) for f in stat_fees) if stat_fees else Decimal(0)
        total_paid = sum(float(f.amount_paid or 0) for f in stat_fees) if stat_fees else Decimal(0)

        # Apply status filter (single-month view only); 'pending' also
        # covers companies with no fee record ('brak'). Unknown filter
        # values leave the list untouched.
        wanted_statuses = {
            'paid': ('paid',),
            'pending': ('pending', 'brak'),
            'overdue': ('overdue',),
        }.get(status_filter)
        if month and wanted_statuses:
            companies_fees = [cf for cf in companies_fees if cf.get('status') in wanted_statuses]

        # Get default fee amount
        fee_config = db.query(MembershipFeeConfig).filter(
            MembershipFeeConfig.scope == 'global',
            MembershipFeeConfig.valid_until == None
        ).first()
        default_fee = float(fee_config.monthly_amount) if fee_config else 100.00

        return render_template(
            'admin/fees.html',
            companies_fees=companies_fees,
            year=year,
            month=month,
            status_filter=status_filter,
            total_companies=total_companies,
            paid_count=paid_count,
            pending_count=pending_count,
            total_due=total_due,
            total_paid=total_paid,
            default_fee=default_fee,
            years=list(range(2024, datetime.now().year + 2)),
            months=MONTHS_PL
        )
    finally:
        db.close()
@app.route('/admin/fees/generate', methods=['POST'])
@login_required
def admin_fees_generate():
    """Generate fee records for all active companies for a given month.

    Reads ``year`` and ``month`` from the form. Companies that already
    have a record for that period are skipped, so the call can be repeated
    safely.

    Returns:
        JSON with the number of created records; 400 when year/month are
        missing or the month is outside 1-12; 403 for non-admins; 500 on
        an unexpected error (the transaction is rolled back).
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403

    db = SessionLocal()
    try:
        year = request.form.get('year', type=int)
        month = request.form.get('month', type=int)

        if not year or not month:
            return jsonify({'success': False, 'error': 'Brak roku lub miesiaca'}), 400
        if not 1 <= month <= 12:
            return jsonify({'success': False, 'error': 'Nieprawidlowy miesiac'}), 400

        # Get default fee amount
        fee_config = db.query(MembershipFeeConfig).filter(
            MembershipFeeConfig.scope == 'global',
            MembershipFeeConfig.valid_until == None
        ).first()
        default_fee = fee_config.monthly_amount if fee_config else 100.00

        # Get all active companies
        companies = db.query(Company).filter(Company.status == 'active').all()

        # Fetch the companies that already have a record for this period in
        # one query instead of issuing one lookup per company.
        existing_company_ids = {
            company_id
            for (company_id,) in db.query(MembershipFee.company_id).filter(
                MembershipFee.fee_year == year,
                MembershipFee.fee_month == month
            ).all()
        }

        created = 0
        for company in companies:
            if company.id in existing_company_ids:
                continue
            db.add(MembershipFee(
                company_id=company.id,
                fee_year=year,
                fee_month=month,
                amount=default_fee,
                status='pending'
            ))
            created += 1

        db.commit()
        return jsonify({
            'success': True,
            'message': f'Utworzono {created} rekordow skladek'
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error generating fees: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@app.route('/admin/fees/<int:fee_id>/mark-paid', methods=['POST'])
@login_required
def admin_fees_mark_paid(fee_id):
    """Record a payment for a single membership fee.

    Form fields:
        amount_paid: paid amount; when omitted (or unparsable) the full
            fee amount is assumed.
        payment_date: ``YYYY-MM-DD``; defaults to today.
        payment_method: defaults to ``transfer``.
        payment_reference, notes: optional free text.

    The status becomes ``paid`` when the amount covers the fee and
    ``partial`` when it is positive but lower; otherwise it is unchanged.

    Returns:
        JSON success payload; 400 for a negative amount or malformed date;
        403 for non-admins; 404 for an unknown fee; 500 on an unexpected
        error (the transaction is rolled back).
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403

    db = SessionLocal()
    try:
        fee = db.query(MembershipFee).filter(MembershipFee.id == fee_id).first()
        if not fee:
            return jsonify({'success': False, 'error': 'Nie znaleziono skladki'}), 404

        # Get data from request
        amount_paid = request.form.get('amount_paid', type=float)
        payment_date = request.form.get('payment_date')
        payment_method = request.form.get('payment_method', 'transfer')
        payment_reference = request.form.get('payment_reference', '')
        notes = request.form.get('notes', '')

        if amount_paid is not None and amount_paid < 0:
            return jsonify({'success': False, 'error': 'Nieprawidlowa kwota'}), 400

        # Validate the date up front so a typo yields a 400, not a 500.
        if payment_date:
            try:
                paid_on = datetime.strptime(payment_date, '%Y-%m-%d').date()
            except ValueError:
                return jsonify({'success': False, 'error': 'Nieprawidlowy format daty'}), 400
        else:
            paid_on = datetime.now().date()

        # Update fee record. Only a *missing* amount falls back to the full
        # fee; an explicit 0 must not be treated as "paid in full".
        fee.amount_paid = float(fee.amount) if amount_paid is None else amount_paid
        fee.payment_date = paid_on
        fee.payment_method = payment_method
        fee.payment_reference = payment_reference
        fee.notes = notes
        fee.recorded_by = current_user.id
        fee.recorded_at = datetime.now()

        # Set status based on payment amount
        if fee.amount_paid >= float(fee.amount):
            fee.status = 'paid'
        elif fee.amount_paid > 0:
            fee.status = 'partial'

        db.commit()
        return jsonify({
            'success': True,
            'message': 'Skladka zostala zarejestrowana'
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error marking fee as paid: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@app.route('/admin/fees/bulk-mark-paid', methods=['POST'])
@login_required
def admin_fees_bulk_mark_paid():
    """Mark several fees as fully paid in one request.

    Reads the fee ids from the repeated ``fee_ids[]`` form field. Fees that
    are already ``paid`` and ids that do not exist are skipped.

    Returns:
        JSON with the number of updated records; 400 when no ids were
        sent; 403 for non-admins; 500 on an unexpected error (the
        transaction is rolled back).
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403

    db = SessionLocal()
    try:
        fee_ids = request.form.getlist('fee_ids[]', type=int)
        if not fee_ids:
            return jsonify({'success': False, 'error': 'Brak wybranych skladek'}), 400

        # Load every selected fee with a single IN query instead of one
        # query per id. The status check stays in Python so rows with a
        # NULL status are still updated.
        selected_fees = db.query(MembershipFee).filter(
            MembershipFee.id.in_(fee_ids)
        ).all()

        now = datetime.now()
        updated = 0
        for fee in selected_fees:
            if fee.status == 'paid':
                continue
            fee.status = 'paid'
            fee.amount_paid = fee.amount
            fee.payment_date = now.date()
            fee.recorded_by = current_user.id
            fee.recorded_at = now
            updated += 1

        db.commit()
        return jsonify({
            'success': True,
            'message': f'Zaktualizowano {updated} rekordow'
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error in bulk action: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@app.route('/admin/fees/export')
@login_required
def admin_fees_export():
    """Download membership fees as a CSV attachment.

    Admin-only; other users get a flash message and a redirect. ``year``
    defaults to the current year and ``month`` optionally narrows the
    export to a single month. Rows are ordered by company name, then month.
    """
    if not current_user.is_admin:
        flash('Brak uprawnien.', 'error')
        return redirect(url_for('admin_fees'))

    import csv
    from io import StringIO

    db = SessionLocal()
    try:
        year = request.args.get('year', datetime.now().year, type=int)
        month = request.args.get('month', type=int)

        fee_query = db.query(MembershipFee).join(Company).filter(
            MembershipFee.fee_year == year
        )
        if month:
            fee_query = fee_query.filter(MembershipFee.fee_month == month)
        fee_rows = fee_query.order_by(Company.name, MembershipFee.fee_month).all()

        # Build the CSV in memory: header row first, then one row per fee.
        buffer = StringIO()
        csv_writer = csv.writer(buffer)
        csv_writer.writerow([
            'Firma', 'NIP', 'Rok', 'Miesiac', 'Kwota', 'Zaplacono',
            'Status', 'Data platnosci', 'Metoda', 'Referencja', 'Notatki'
        ])
        csv_writer.writerows(
            [
                row.company.name,
                row.company.nip,
                row.fee_year,
                row.fee_month,
                row.amount,
                row.amount_paid,
                row.status,
                row.payment_date,
                row.payment_method,
                row.payment_reference,
                row.notes,
            ]
            for row in fee_rows
        )

        disposition = f'attachment; filename=skladki_{year}_{month or "all"}.csv'
        return Response(
            buffer.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': disposition}
        )
    finally:
        db.close()
# ============================================================
# CALENDAR ROUTES
# ============================================================
@app.route('/kalendarz')
@login_required
def calendar_index():
    """Norda Biznes event calendar.

    Renders all events dated today or later (soonest first) together with
    the five most recent past events.
    """
    from datetime import date

    db = SessionLocal()
    try:
        today = date.today()
        all_events = db.query(NordaEvent)

        # Upcoming events, nearest date first
        upcoming_events = (
            all_events
            .filter(NordaEvent.event_date >= today)
            .order_by(NordaEvent.event_date.asc())
            .all()
        )

        # Past events (latest 5)
        past_events = (
            all_events
            .filter(NordaEvent.event_date < today)
            .order_by(NordaEvent.event_date.desc())
            .limit(5)
            .all()
        )

        return render_template(
            'calendar/index.html',
            upcoming_events=upcoming_events,
            past_events=past_events,
            today=today
        )
    finally:
        db.close()
@app.route('/kalendarz/<int:event_id>')
@login_required
def calendar_event(event_id):
    """Event details page.

    Redirects to the calendar with a flash message when the event does not
    exist; otherwise renders the event along with the current user's
    attendance record (``None`` when not signed up).
    """
    db = SessionLocal()
    try:
        event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
        if event is None:
            flash('Wydarzenie nie istnieje.', 'error')
            return redirect(url_for('calendar_index'))

        # Check whether the current user is signed up for this event
        attendance_query = db.query(EventAttendee).filter(
            EventAttendee.event_id == event_id,
            EventAttendee.user_id == current_user.id
        )
        attendance = attendance_query.first()

        return render_template(
            'calendar/event.html',
            event=event,
            user_attending=attendance
        )
    finally:
        db.close()
@app.route('/kalendarz/<int:event_id>/rsvp', methods=['POST'])
@login_required
def calendar_rsvp(event_id):
    """Toggle the current user's sign-up for an event.

    If the user is already signed up the attendance record is deleted;
    otherwise a ``confirmed`` record is created, unless the event has a
    ``max_attendees`` limit that is already reached.

    Returns:
        JSON with ``action`` (``removed`` / ``added``) and the event's
        attendee count; 404 for an unknown event; 400 when the event is full.
    """
    db = SessionLocal()
    try:
        event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
        if event is None:
            return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404

        # Check whether the user is already signed up
        signup = db.query(EventAttendee).filter(
            EventAttendee.event_id == event_id,
            EventAttendee.user_id == current_user.id
        ).first()

        if signup:
            # Sign out
            db.delete(signup)
            db.commit()
            action, message = 'removed', 'Wypisano z wydarzenia'
        else:
            # Sign up, respecting the optional capacity limit
            if event.max_attendees and event.attendee_count >= event.max_attendees:
                return jsonify({'success': False, 'error': 'Brak wolnych miejsc'}), 400

            db.add(EventAttendee(
                event_id=event_id,
                user_id=current_user.id,
                status='confirmed'
            ))
            db.commit()
            action, message = 'added', 'Zapisano na wydarzenie'

        # The count is read after the commit, as in both original branches.
        return jsonify({
            'success': True,
            'action': action,
            'message': message,
            'attendee_count': event.attendee_count
        })
    finally:
        db.close()
@app.route('/admin/kalendarz')
@login_required
def admin_calendar():
    """Admin panel: list all events, newest date first.

    Non-admin users are redirected to the public calendar.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień.', 'error')
        return redirect(url_for('calendar_index'))

    db = SessionLocal()
    try:
        newest_first = NordaEvent.event_date.desc()
        all_events = db.query(NordaEvent).order_by(newest_first).all()
        return render_template('calendar/admin.html', events=all_events)
    finally:
        db.close()
@app.route('/admin/kalendarz/nowy', methods=['GET', 'POST'])
@login_required
def admin_calendar_new():
    """Create a new event (admin only).

    GET renders the empty form. POST validates the submitted fields and,
    on success, stores the event and redirects to the admin calendar.
    Missing title/date or a malformed date/time re-renders the form with
    a flash message instead of failing with a server error.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień.', 'error')
        return redirect(url_for('calendar_index'))

    if request.method != 'POST':
        return render_template('calendar/admin_new.html')

    title = sanitize_input(request.form.get('title', ''), 255)
    description = request.form.get('description', '').strip()
    event_type = request.form.get('event_type', 'meeting')
    event_date_str = request.form.get('event_date', '')
    time_start_str = request.form.get('time_start', '')
    time_end_str = request.form.get('time_end', '')
    location = sanitize_input(request.form.get('location', ''), 500)
    location_url = request.form.get('location_url', '').strip()
    speaker_name = sanitize_input(request.form.get('speaker_name', ''), 255)
    max_attendees = request.form.get('max_attendees', type=int)

    if not title or not event_date_str:
        flash('Tytuł i data są wymagane.', 'error')
        return render_template('calendar/admin_new.html')

    # Parse date/time before touching the database; strptime raises
    # ValueError on input that does not match the expected format.
    try:
        event_date = datetime.strptime(event_date_str, '%Y-%m-%d').date()
        time_start = datetime.strptime(time_start_str, '%H:%M').time() if time_start_str else None
        time_end = datetime.strptime(time_end_str, '%H:%M').time() if time_end_str else None
    except ValueError:
        flash('Nieprawidłowy format daty lub godziny.', 'error')
        return render_template('calendar/admin_new.html')

    db = SessionLocal()
    try:
        event = NordaEvent(
            title=title,
            description=description,
            event_type=event_type,
            event_date=event_date,
            time_start=time_start,
            time_end=time_end,
            location=location,
            location_url=location_url,
            speaker_name=speaker_name,
            max_attendees=max_attendees,
            created_by=current_user.id
        )
        db.add(event)
        db.commit()

        flash('Wydarzenie utworzone.', 'success')
        return redirect(url_for('admin_calendar'))
    finally:
        db.close()
@app.route('/admin/kalendarz/<int:event_id>/delete', methods=['POST'])
@login_required
def admin_calendar_delete(event_id):
    """Delete an event (admin only).

    Returns:
        JSON success payload; 403 for non-admins; 404 for an unknown event.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        target = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
        if target is None:
            return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404

        db.delete(target)
        db.commit()
        response_body = {'success': True, 'message': 'Wydarzenie usunięte'}
        return jsonify(response_body)
    finally:
        db.close()
# ============================================================
# PRIVATE MESSAGES ROUTES
# ============================================================
@app.route('/wiadomosci')
@login_required
def messages_inbox():
    """Inbox: messages received by the current user, newest first.

    Paginated at 20 messages per page through the ``page`` query
    parameter; values below 1 are treated as page 1.
    """
    # Clamp the page so a zero or negative value cannot produce a negative
    # OFFSET in the query below.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        query = db.query(PrivateMessage).filter(
            PrivateMessage.recipient_id == current_user.id
        ).order_by(PrivateMessage.created_at.desc())

        total = query.count()
        messages = query.limit(per_page).offset((page - 1) * per_page).all()

        unread_count = db.query(PrivateMessage).filter(
            PrivateMessage.recipient_id == current_user.id,
            PrivateMessage.is_read == False
        ).count()

        return render_template(
            'messages/inbox.html',
            messages=messages,
            page=page,
            # Ceiling division: number of pages needed for `total` rows.
            total_pages=(total + per_page - 1) // per_page,
            unread_count=unread_count
        )
    finally:
        db.close()
@app.route('/wiadomosci/wyslane')
@login_required
def messages_sent():
    """Sent folder: messages written by the current user, newest first.

    Paginated at 20 messages per page through the ``page`` query
    parameter; values below 1 are treated as page 1.
    """
    # Clamp the page so a zero or negative value cannot produce a negative
    # OFFSET in the query below.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        query = db.query(PrivateMessage).filter(
            PrivateMessage.sender_id == current_user.id
        ).order_by(PrivateMessage.created_at.desc())

        total = query.count()
        messages = query.limit(per_page).offset((page - 1) * per_page).all()

        return render_template(
            'messages/sent.html',
            messages=messages,
            page=page,
            # Ceiling division: number of pages needed for `total` rows.
            total_pages=(total + per_page - 1) // per_page
        )
    finally:
        db.close()
@app.route('/wiadomosci/nowa')
@login_required
def messages_new():
    """New-message form.

    Lists every active, verified user other than the current one as a
    possible recipient. The optional ``to`` query parameter preselects a
    recipient by user id.
    """
    preselected_id = request.args.get('to', type=int)

    db = SessionLocal()
    try:
        # Users available for selection
        selectable_users = (
            db.query(User)
            .filter(
                User.is_active == True,
                User.is_verified == True,
                User.id != current_user.id
            )
            .order_by(User.name)
            .all()
        )

        if preselected_id:
            preselected = db.query(User).filter(User.id == preselected_id).first()
        else:
            preselected = None

        return render_template(
            'messages/compose.html',
            users=selectable_users,
            recipient=preselected
        )
    finally:
        db.close()
@app.route('/wiadomosci/wyslij', methods=['POST'])
@login_required
def messages_send():
    """Send a private message.

    Requires ``recipient_id`` and ``content`` form fields; ``subject`` is
    optional. On any validation failure the user is sent back to the
    compose form with a flash message; on success to the sent folder.
    """
    form = request.form
    recipient_id = form.get('recipient_id', type=int)
    subject = sanitize_input(form.get('subject', ''), 255)
    content = form.get('content', '').strip()

    if not (recipient_id and content):
        flash('Odbiorca i treść są wymagane.', 'error')
        return redirect(url_for('messages_new'))

    db = SessionLocal()
    try:
        addressee = db.query(User).filter(User.id == recipient_id).first()
        if addressee is None:
            flash('Odbiorca nie istnieje.', 'error')
            return redirect(url_for('messages_new'))

        db.add(PrivateMessage(
            sender_id=current_user.id,
            recipient_id=recipient_id,
            subject=subject,
            content=content
        ))
        db.commit()

        flash('Wiadomość wysłana.', 'success')
        return redirect(url_for('messages_sent'))
    finally:
        db.close()
@app.route('/wiadomosci/<int:message_id>')
@login_required
def messages_view(message_id):
    """Read a single message.

    Only the sender or the recipient may open it. When the recipient opens
    an unread message it is marked as read and the read time is stored.
    """
    db = SessionLocal()
    try:
        message = db.query(PrivateMessage).filter(
            PrivateMessage.id == message_id
        ).first()

        if message is None:
            flash('Wiadomość nie istnieje.', 'error')
            return redirect(url_for('messages_inbox'))

        # Access check: the viewer must be one of the two parties
        viewer_id = current_user.id
        if viewer_id not in (message.recipient_id, message.sender_id):
            flash('Brak dostępu do tej wiadomości.', 'error')
            return redirect(url_for('messages_inbox'))

        # Mark as read on first view by the recipient
        viewer_is_recipient = message.recipient_id == current_user.id
        if viewer_is_recipient and not message.is_read:
            message.is_read = True
            message.read_at = datetime.now()
            db.commit()

        return render_template('messages/view.html', message=message)
    finally:
        db.close()
@app.route('/wiadomosci/<int:message_id>/odpowiedz', methods=['POST'])
@login_required
def messages_reply(message_id):
    """Reply to a message.

    The reply goes to the other party of the original message and is
    linked to it through ``parent_id``. Only the sender or recipient of
    the original message may reply.
    """
    content = request.form.get('content', '').strip()

    if not content:
        flash('Treść jest wymagana.', 'error')
        return redirect(url_for('messages_view', message_id=message_id))

    db = SessionLocal()
    try:
        original = db.query(PrivateMessage).filter(
            PrivateMessage.id == message_id
        ).first()

        if not original:
            flash('Wiadomość nie istnieje.', 'error')
            return redirect(url_for('messages_inbox'))

        # Access check (same rule as messages_view). Without it any
        # logged-in user could reply to an arbitrary message id, copying its
        # subject into their own reply and messaging its sender.
        if original.recipient_id != current_user.id and original.sender_id != current_user.id:
            flash('Brak dostępu do tej wiadomości.', 'error')
            return redirect(url_for('messages_inbox'))

        # Reply to the other party: the original sender, or the original
        # recipient when the current user wrote the original message.
        recipient_id = original.sender_id if original.sender_id != current_user.id else original.recipient_id

        reply = PrivateMessage(
            sender_id=current_user.id,
            recipient_id=recipient_id,
            subject=f"Re: {original.subject}" if original.subject else None,
            content=content,
            parent_id=message_id
        )
        db.add(reply)
        db.commit()

        flash('Odpowiedź wysłana.', 'success')
        return redirect(url_for('messages_view', message_id=message_id))
    finally:
        db.close()
@app.route('/api/messages/unread-count')
@login_required
def api_unread_count():
    """API: number of unread private messages for the current user."""
    db = SessionLocal()
    try:
        unread_messages = db.query(PrivateMessage).filter(
            PrivateMessage.recipient_id == current_user.id,
            PrivateMessage.is_read == False
        )
        payload = {'count': unread_messages.count()}
        return jsonify(payload)
    finally:
        db.close()
# ============================================================
# NOTIFICATIONS API ROUTES
# ============================================================
@app.route('/api/notifications')
@login_required
def api_notifications():
    """API: Get the current user's notifications, newest first.

    Query parameters:
        limit: page size (default 20, clamped to 0-100).
        offset: rows to skip (default 0, negative values become 0).
        unread_only: ``true`` to return only unread notifications.

    Returns:
        JSON with the notification list, ``total`` (rows matching the
        filter) and ``unread_count`` (all unread rows for the user).
    """
    max_limit = 100
    limit = request.args.get('limit', 20, type=int)
    offset = request.args.get('offset', 0, type=int)
    unread_only = request.args.get('unread_only', 'false').lower() == 'true'

    # Clamp client-supplied paging values: negative numbers must not reach
    # LIMIT/OFFSET, and one request must not pull an unbounded row count.
    limit = max(0, min(limit, max_limit))
    offset = max(offset, 0)

    db = SessionLocal()
    try:
        query = db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id
        )

        if unread_only:
            query = query.filter(UserNotification.is_read == False)

        # Order by most recent first
        query = query.order_by(UserNotification.created_at.desc())

        total = query.count()
        notifications = query.limit(limit).offset(offset).all()

        return jsonify({
            'success': True,
            'notifications': [
                {
                    'id': n.id,
                    'title': n.title,
                    'message': n.message,
                    'notification_type': n.notification_type,
                    'related_type': n.related_type,
                    'related_id': n.related_id,
                    'action_url': n.action_url,
                    'is_read': n.is_read,
                    'created_at': n.created_at.isoformat() if n.created_at else None
                }
                for n in notifications
            ],
            'total': total,
            'unread_count': db.query(UserNotification).filter(
                UserNotification.user_id == current_user.id,
                UserNotification.is_read == False
            ).count()
        })
    finally:
        db.close()
@app.route('/api/notifications/<int:notification_id>/read', methods=['POST'])
@login_required
def api_notification_mark_read(notification_id):
    """API: Mark one of the current user's notifications as read.

    The lookup is scoped to the current user, so another user's
    notification id yields the same 404 as a missing one.
    """
    db = SessionLocal()
    try:
        owned_by_user = db.query(UserNotification).filter(
            UserNotification.id == notification_id,
            UserNotification.user_id == current_user.id
        )
        target = owned_by_user.first()

        if target is None:
            return jsonify({'success': False, 'error': 'Powiadomienie nie znalezione'}), 404

        target.mark_as_read()
        db.commit()

        return jsonify({
            'success': True,
            'message': 'Oznaczono jako przeczytane'
        })
    finally:
        db.close()
@app.route('/api/notifications/read-all', methods=['POST'])
@login_required
def api_notifications_mark_all_read():
    """API: Mark every unread notification of the current user as read.

    Uses one bulk UPDATE and reports how many rows it changed.
    """
    db = SessionLocal()
    try:
        unread = db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False
        )
        new_values = {
            UserNotification.is_read: True,
            UserNotification.read_at: datetime.now()
        }
        changed_rows = unread.update(new_values)
        db.commit()

        return jsonify({
            'success': True,
            'message': f'Oznaczono {changed_rows} powiadomien jako przeczytane',
            'count': changed_rows
        })
    finally:
        db.close()
@app.route('/api/notifications/unread-count')
@login_required
def api_notifications_unread_count():
    """API: number of unread notifications for the current user."""
    db = SessionLocal()
    try:
        unread_notifications = db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False
        )
        payload = {'count': unread_notifications.count()}
        return jsonify(payload)
    finally:
        db.close()
# ============================================================
# RECOMMENDATIONS API ROUTES
# ============================================================
@app.route('/api/recommendations/<int:company_id>', methods=['GET'])
@login_required
def api_get_recommendations(company_id):
    """API: Get all approved recommendations for a company, newest first.

    Each entry carries the recommender's name and company; the
    recommender's email and phone are included only when the
    recommendation has ``show_contact`` set.

    Returns:
        JSON with the recommendation list and count; 404 when the company
        does not exist; 500 on an unexpected error.
    """
    db = SessionLocal()
    try:
        # Verify company exists
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        # Load each recommendation with its author and the author's company
        # in one query, instead of two extra lookups per recommendation.
        # The inner join on User means every row has a recommender; the
        # outer join yields None when the author has no company.
        rows = (
            db.query(CompanyRecommendation, User, Company)
            .join(User, CompanyRecommendation.user_id == User.id)
            .outerjoin(Company, User.company_id == Company.id)
            .filter(
                CompanyRecommendation.company_id == company_id,
                CompanyRecommendation.status == 'approved'
            )
            .order_by(CompanyRecommendation.created_at.desc())
            .all()
        )

        # Build response with recommender details
        result = []
        for rec, recommender, recommender_company in rows:
            result.append({
                'id': rec.id,
                'recommendation_text': rec.recommendation_text,
                'service_category': rec.service_category,
                'created_at': rec.created_at.isoformat() if rec.created_at else None,
                'updated_at': rec.updated_at.isoformat() if rec.updated_at else None,
                'recommender': {
                    'name': recommender.full_name,
                    # Contact details are exposed only with the author's consent.
                    'email': recommender.email if rec.show_contact else None,
                    'phone': recommender.phone if rec.show_contact else None,
                    'company_id': recommender_company.id if recommender_company else None,
                    'company_name': recommender_company.name if recommender_company else None,
                    'company_slug': recommender_company.slug if recommender_company else None
                }
            })

        return jsonify({
            'success': True,
            'company_id': company_id,
            'company_name': company.name,
            'recommendations': result,
            'count': len(result)
        })

    except Exception as e:
        logger.error(f"Error fetching recommendations for company {company_id}: {e}")
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas pobierania rekomendacji'
        }), 500
    finally:
        db.close()
@app.route('/api/recommendations/create', methods=['POST'])
@login_required
def api_create_recommendation():
    """API: Create a new recommendation for a company.

    Expects a JSON object with ``company_id``, ``recommendation_text``
    (50-2000 characters) and optional ``service_category`` and
    ``show_contact``. The recommendation starts as ``pending`` and the
    other active users of the recommended company are notified.

    Returns:
        201 with the new recommendation id on success; 400 for invalid
        input, self-recommendation or a duplicate; 403 for unverified
        users; 404 for an unknown/inactive company; 500 on an unexpected
        error (the transaction is rolled back).
    """
    db = SessionLocal()
    try:
        # Get JSON data. silent=True turns a missing or malformed body into
        # None so it is reported as a 400 below rather than being caught by
        # the generic handler and returned as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Brak danych'
            }), 400

        # Normalize the id to int: a JSON string such as "5" would never
        # compare equal to current_user.company_id and would slip past the
        # self-recommendation check below.
        try:
            company_id = int(data.get('company_id'))
        except (TypeError, ValueError):
            company_id = None

        # Non-string values (null, numbers, ...) are treated as missing
        # rather than crashing on .strip().
        raw_text = data.get('recommendation_text')
        recommendation_text = raw_text.strip() if isinstance(raw_text, str) else ''
        raw_category = data.get('service_category')
        service_category = (raw_category.strip() if isinstance(raw_category, str) else '') or None
        show_contact = data.get('show_contact', True)

        # Validate required fields
        if not company_id:
            return jsonify({
                'success': False,
                'error': 'Brak ID firmy'
            }), 400

        if not recommendation_text:
            return jsonify({
                'success': False,
                'error': 'Treść rekomendacji jest wymagana'
            }), 400

        # Validate text length (50-2000 characters)
        if len(recommendation_text) < 50:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja musi mieć co najmniej 50 znaków'
            }), 400

        if len(recommendation_text) > 2000:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja nie może przekraczać 2000 znaków'
            }), 400

        # Check if user is verified
        if not current_user.is_verified:
            return jsonify({
                'success': False,
                'error': 'Tylko zweryfikowani użytkownicy mogą dodawać rekomendacje'
            }), 403

        # Verify company exists
        company = db.query(Company).filter_by(id=company_id, status='active').first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        # Prevent self-recommendation
        if current_user.company_id and current_user.company_id == company_id:
            return jsonify({
                'success': False,
                'error': 'Nie możesz polecać własnej firmy'
            }), 400

        # Check for duplicate recommendation (user can only have one recommendation per company)
        existing_rec = db.query(CompanyRecommendation).filter_by(
            user_id=current_user.id,
            company_id=company_id
        ).first()

        if existing_rec:
            return jsonify({
                'success': False,
                'error': 'Już poleciłeś tę firmę. Możesz edytować swoją istniejącą rekomendację.'
            }), 400

        # Create recommendation
        recommendation = CompanyRecommendation(
            company_id=company_id,
            user_id=current_user.id,
            recommendation_text=recommendation_text,
            service_category=service_category,
            show_contact=show_contact,
            status='pending'  # Start as pending for moderation
        )
        db.add(recommendation)
        db.commit()
        db.refresh(recommendation)

        # Notify the other active users associated with this company
        company_users = db.query(User).filter_by(company_id=company_id, is_active=True).all()
        for company_user in company_users:
            if company_user.id != current_user.id:
                notification = UserNotification(
                    user_id=company_user.id,
                    notification_type='new_recommendation',
                    title='Nowa rekomendacja',
                    message=f'{current_user.name or current_user.email} polecił Twoją firmę: {company.name}',
                    action_url=f'/company/{company.slug}#recommendations',
                    related_id=recommendation.id
                )
                db.add(notification)
        db.commit()

        logger.info(f"Recommendation created: user {current_user.id} -> company {company_id}, ID {recommendation.id}")

        return jsonify({
            'success': True,
            'message': 'Rekomendacja została utworzona i oczekuje na moderację',
            'recommendation_id': recommendation.id,
            'status': recommendation.status
        }), 201

    except Exception as e:
        logger.error(f"Error creating recommendation: {e}")
        db.rollback()
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas tworzenia rekomendacji'
        }), 500
    finally:
        db.close()
@app.route('/api/recommendations/<int:rec_id>/edit', methods=['POST'])
@login_required
def api_edit_recommendation(rec_id):
    """API: Edit an existing recommendation (owner or admin only).

    Expects a JSON object. ``recommendation_text`` is updated only when a
    non-empty string is supplied (50-2000 characters);
    ``service_category`` and ``show_contact`` are updated only when their
    keys are present. A null or blank ``service_category`` clears it.

    Returns:
        JSON success payload; 400 for invalid input; 403 when the caller
        is neither the owner nor an admin; 404 for an unknown id; 500 on
        an unexpected error (the transaction is rolled back).
    """
    db = SessionLocal()
    try:
        # Get the recommendation
        recommendation = db.query(CompanyRecommendation).filter_by(id=rec_id).first()
        if not recommendation:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja nie znaleziona'
            }), 404

        # Check authorization - user must be the owner OR admin
        if recommendation.user_id != current_user.id and not current_user.is_admin:
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień do edycji tej rekomendacji'
            }), 403

        # Get JSON data. silent=True turns a missing or malformed body into
        # None so it is reported as a 400 below, not as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Brak danych'
            }), 400

        # Non-string values (e.g. JSON null sent to clear the category)
        # must not reach .strip().
        raw_text = data.get('recommendation_text')
        recommendation_text = raw_text.strip() if isinstance(raw_text, str) else ''
        raw_category = data.get('service_category')
        service_category = (raw_category.strip() if isinstance(raw_category, str) else '') or None
        show_contact = data.get('show_contact', recommendation.show_contact)

        # Validate text if provided
        if recommendation_text:
            # Validate text length (50-2000 characters)
            if len(recommendation_text) < 50:
                return jsonify({
                    'success': False,
                    'error': 'Rekomendacja musi mieć co najmniej 50 znaków'
                }), 400

            if len(recommendation_text) > 2000:
                return jsonify({
                    'success': False,
                    'error': 'Rekomendacja nie może przekraczać 2000 znaków'
                }), 400

            # NOTE(review): the status is left untouched, so an already
            # approved recommendation keeps its approval after the text
            # changes - confirm whether edits should go back to moderation.
            recommendation.recommendation_text = recommendation_text

        # Update other fields if provided
        if 'service_category' in data:
            recommendation.service_category = service_category
        if 'show_contact' in data:
            recommendation.show_contact = show_contact

        # Update timestamp
        recommendation.updated_at = datetime.now()

        db.commit()

        logger.info(f"Recommendation edited: ID {rec_id} by user {current_user.id}")

        return jsonify({
            'success': True,
            'message': 'Rekomendacja została zaktualizowana',
            'recommendation_id': recommendation.id
        })

    except Exception as e:
        logger.error(f"Error editing recommendation {rec_id}: {e}")
        db.rollback()
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas edycji rekomendacji'
        }), 500
    finally:
        db.close()
@app.route('/api/recommendations/<int:rec_id>/delete', methods=['POST'])
@login_required
def api_delete_recommendation(rec_id):
    """API: Delete a recommendation (owner or admin only).

    Returns:
        JSON success payload; 403 when the caller is neither the owner nor
        an admin; 404 for an unknown id; 500 on an unexpected error (the
        transaction is rolled back).
    """
    db = SessionLocal()
    try:
        # Look up the recommendation
        target = db.query(CompanyRecommendation).filter_by(id=rec_id).first()
        if target is None:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja nie znaleziona'
            }), 404

        # Authorization: only the author or an admin may delete
        is_owner = target.user_id == current_user.id
        if not (is_owner or current_user.is_admin):
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień do usunięcia tej rekomendacji'
            }), 403

        # Capture identifiers before the row is gone, for the log line
        company_id, user_id = target.company_id, target.user_id

        db.delete(target)
        db.commit()

        logger.info(f"Recommendation deleted: ID {rec_id} (company {company_id}, user {user_id}) by user {current_user.id}")

        return jsonify({
            'success': True,
            'message': 'Rekomendacja została usunięta'
        })

    except Exception as e:
        logger.error(f"Error deleting recommendation {rec_id}: {e}")
        db.rollback()
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas usuwania rekomendacji'
        }), 500
    finally:
        db.close()
# ============================================================
# B2B CLASSIFIEDS ROUTES
# ============================================================
@app.route('/tablica')
@login_required
def classifieds_index():
    """B2B classifieds board: active listings, newest first.

    Query parameters:
        type: optional listing type filter.
        category: optional category filter.
        page: page number (20 listings per page); values below 1 are
            treated as page 1.
    """
    listing_type = request.args.get('type', '')
    category = request.args.get('category', '')
    # Clamp the page so a zero or negative value cannot produce a negative
    # OFFSET in the query below.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        query = db.query(Classified).filter(
            Classified.is_active == True
        )

        # Filters
        if listing_type:
            query = query.filter(Classified.listing_type == listing_type)
        if category:
            query = query.filter(Classified.category == category)

        # Sorting - newest first
        query = query.order_by(Classified.created_at.desc())

        total = query.count()
        classifieds = query.limit(per_page).offset((page - 1) * per_page).all()

        # Categories offered in the filter UI: (value, label) pairs
        categories = [
            ('uslugi', 'Usługi'),
            ('produkty', 'Produkty'),
            ('wspolpraca', 'Współpraca'),
            ('praca', 'Praca'),
            ('inne', 'Inne')
        ]

        return render_template(
            'classifieds/index.html',
            classifieds=classifieds,
            categories=categories,
            listing_type=listing_type,
            category_filter=category,
            page=page,
            # Ceiling division: number of pages needed for `total` rows.
            total_pages=(total + per_page - 1) // per_page
        )
    finally:
        db.close()
@app.route('/tablica/nowe', methods=['GET', 'POST'])
@login_required
def classifieds_new():
    """Add a new classified listing.

    GET renders the form. POST requires listing type, category, title and
    description; the listing is stored under the current user (and their
    company) with an expiry 30 days from now.
    """
    if request.method != 'POST':
        return render_template('classifieds/new.html')

    form = request.form
    listing_type = form.get('listing_type', '')
    category = form.get('category', '')
    title = sanitize_input(form.get('title', ''), 255)
    description = form.get('description', '').strip()
    budget_info = sanitize_input(form.get('budget_info', ''), 255)
    location_info = sanitize_input(form.get('location_info', ''), 255)

    if not (listing_type and category and title and description):
        flash('Wszystkie wymagane pola muszą być wypełnione.', 'error')
        return render_template('classifieds/new.html')

    db = SessionLocal()
    try:
        # Listings expire automatically after 30 days
        expiry = datetime.now() + timedelta(days=30)

        db.add(Classified(
            author_id=current_user.id,
            company_id=current_user.company_id,
            listing_type=listing_type,
            category=category,
            title=title,
            description=description,
            budget_info=budget_info,
            location_info=location_info,
            expires_at=expiry
        ))
        db.commit()

        flash('Ogłoszenie dodane.', 'success')
        return redirect(url_for('classifieds_index'))
    finally:
        db.close()
@app.route('/tablica/<int:classified_id>')
@login_required
def classifieds_view(classified_id):
    """Render the detail page of a classified and bump its view counter.

    Args:
        classified_id: Primary key of the classified, taken from the URL.

    Returns:
        The rendered ``classifieds/view.html`` page, or a redirect to the
        classifieds index (with a flashed error) when the id is unknown.
    """
    db = SessionLocal()
    try:
        classified = db.query(Classified).filter(
            Classified.id == classified_id
        ).first()

        if not classified:
            flash('Ogłoszenie nie istnieje.', 'error')
            return redirect(url_for('classifieds_index'))

        # views_count may be None (NULL in the database); ``None + 1`` would
        # raise TypeError, so treat a missing counter as zero.
        classified.views_count = (classified.views_count or 0) + 1
        db.commit()

        return render_template('classifieds/view.html', classified=classified)
    finally:
        db.close()
@app.route('/tablica/<int:classified_id>/zakoncz', methods=['POST'])
@login_required
def classifieds_close(classified_id):
    """Deactivate a classified authored by the current user.

    Returns a JSON success payload, or a JSON error with HTTP 404 when no
    classified with this id authored by the current user is found.
    """
    db = SessionLocal()
    try:
        own_listing = (
            db.query(Classified)
            .filter(
                Classified.id == classified_id,
                Classified.author_id == current_user.id,
            )
            .first()
        )

        if own_listing:
            own_listing.is_active = False
            db.commit()
            return jsonify({'success': True, 'message': 'Ogłoszenie zamknięte'})

        not_found = {'success': False, 'error': 'Ogłoszenie nie istnieje lub brak uprawnień'}
        return jsonify(not_found), 404
    finally:
        db.close()
# ============================================================
# NEW MEMBERS ROUTE
# ============================================================
@app.route('/nowi-czlonkowie')
@login_required
def new_members():
    """List active member companies created within the last ``days`` days.

    Query parameters:
        days: Size of the look-back window in days (default 90). The value is
            clamped to the range 1..3650.

    Returns:
        The rendered ``new_members.html`` page with the matching companies
        (newest first), the effective ``days`` value and the company count.
    """
    max_days = 3650  # ten years is plenty for a "new members" view

    days = request.args.get('days', 90, type=int)
    # The value is user-controlled: a huge number makes timedelta/datetime
    # arithmetic raise OverflowError (HTTP 500) and a negative one would put
    # the cutoff in the future, so clamp it to a sane window.
    days = max(1, min(days, max_days))

    db = SessionLocal()
    try:
        cutoff_date = datetime.now() - timedelta(days=days)

        new_companies = db.query(Company).filter(
            Company.status == 'active',
            Company.created_at >= cutoff_date
        ).order_by(Company.created_at.desc()).all()

        return render_template('new_members.html',
            companies=new_companies,
            days=days,
            total=len(new_companies)
        )
    finally:
        db.close()
# ============================================================
# AUTHENTICATION ROUTES
# ============================================================
@app.route('/register', methods=['GET', 'POST'])
@limiter.limit("5 per hour")  # Limit registration attempts
def register():
    """Register a new user account.

    GET renders the registration form. POST validates the email, password,
    required fields and NIP format (in that order), rejects an already
    registered email, links the account to an active company with the same
    NIP when one exists, stores an unverified user with a verification token
    valid for 24 hours, and tries to send the verification email. Email
    delivery problems are only logged; the registration itself still
    succeeds and the user is redirected to the login page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('auth/register.html')

    form = request.form
    email = sanitize_input(form.get('email', ''), 255)
    password = form.get('password', '')
    name = sanitize_input(form.get('name', ''), 255)
    company_nip = sanitize_input(form.get('company_nip', ''), 10)

    # --- Input validation -------------------------------------------------
    if not validate_email(email):
        flash('Nieprawidłowy format adresu email.', 'error')
        return render_template('auth/register.html')

    password_ok, password_problem = validate_password(password)
    if not password_ok:
        flash(password_problem, 'error')
        return render_template('auth/register.html')

    if not (name and email and company_nip):
        flash('Imię, email i NIP firmy są wymagane.', 'error')
        return render_template('auth/register.html')

    if re.match(r'^\d{10}$', company_nip) is None:
        flash('NIP musi składać się z 10 cyfr.', 'error')
        return render_template('auth/register.html')

    # --- Persistence ------------------------------------------------------
    db = SessionLocal()
    try:
        already_registered = db.query(User).filter_by(email=email).first()
        if already_registered:
            flash('Email już jest zarejestrowany.', 'error')
            return render_template('auth/register.html')

        # The NIP is known to be a non-empty 10-digit string at this point,
        # so the member lookup can run unconditionally.
        member_company = db.query(Company).filter_by(
            nip=company_nip, status='active'
        ).first()
        is_norda_member = bool(member_company)
        company_id = member_company.id if member_company else None

        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now() + timedelta(hours=24)

        new_user = User(
            email=email,
            password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
            name=name,
            company_nip=company_nip,
            company_id=company_id,
            is_norda_member=is_norda_member,
            created_at=datetime.now(),
            is_active=True,
            is_verified=False,  # Requires email verification
            verification_token=verification_token,
            verification_token_expires=verification_expires,
        )
        db.add(new_user)
        db.commit()

        base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
        verification_url = f"{base_url}/verify-email/{verification_token}"

        # Best-effort delivery: whenever the email cannot be sent, the URL is
        # written to the log instead.
        try:
            import email_service
            if not email_service.is_configured():
                logger.warning("Email service not configured")
                logger.info(f"Verification URL (no email service): {verification_url}")
            elif email_service.send_welcome_email(email, name, verification_url):
                logger.info(f"Verification email sent to {email}")
            else:
                logger.warning(f"Failed to send verification email to {email}")
                logger.info(f"Verification URL (email failed): {verification_url}")
        except Exception as send_error:
            logger.error(f"Error sending verification email: {send_error}")
            logger.info(f"Verification URL (exception): {verification_url}")

        logger.info(f"New user registered: {email}")
        flash('Rejestracja udana! Sprawdz email i kliknij link weryfikacyjny.', 'success')
        return redirect(url_for('login'))

    except Exception as registration_error:
        logger.error(f"Registration error: {registration_error}")
        flash('Wystąpił błąd podczas rejestracji. Spróbuj ponownie.', 'error')
        return render_template('auth/register.html')
    finally:
        db.close()
@app.route('/login', methods=['GET', 'POST'])
@limiter.limit("100 per hour")  # Increased for testing
def login():
    """Authenticate a user with email and password.

    GET renders the login form. POST checks the credentials, refuses
    deactivated or unverified accounts, logs the user in, records the login
    time and redirects to the ``next`` URL when it is a safe local path,
    otherwise to the dashboard.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'

        # Basic validation
        if not email or not password:
            flash('Email i hasło są wymagane.', 'error')
            return render_template('auth/login.html')

        db = SessionLocal()
        try:
            user = db.query(User).filter_by(email=email).first()

            if not user or not check_password_hash(user.password_hash, password):
                logger.warning(f"Failed login attempt for: {email}")
                flash('Nieprawidłowy email lub hasło.', 'error')
                return render_template('auth/login.html')

            if not user.is_active:
                flash('Konto zostało dezaktywowane.', 'error')
                return render_template('auth/login.html')

            # Require email verification
            if not user.is_verified:
                flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
                return render_template('auth/login.html')

            login_user(user, remember=remember)
            user.last_login = datetime.now()
            db.commit()

            logger.info(f"User logged in: {email}")

            # Prevent open redirects. A leading "/" alone is not enough:
            # "//host" is a scheme-relative URL and browsers treat "/\host"
            # the same way, so both would leave the site. Control characters
            # are rejected too, because browsers strip tabs/newlines before
            # resolving a URL.
            next_page = request.args.get('next')
            if next_page:
                is_local_path = (
                    next_page.startswith('/')
                    and not next_page.startswith('//')
                    and '\\' not in next_page
                    and not any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in next_page)
                )
                if not is_local_path:
                    next_page = None

            return redirect(next_page or url_for('dashboard'))

        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Wystąpił błąd podczas logowania. Spróbuj ponownie.', 'error')
            return render_template('auth/login.html')
        finally:
            db.close()

    return render_template('auth/login.html')
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send them back to the home page."""
    logout_user()
    flash('Wylogowano pomyślnie.', 'success')
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/forgot-password', methods=['GET', 'POST'])
@limiter.limit("5 per hour")
def forgot_password():
    """Handle a password-reset request.

    GET renders the request form. POST validates the email and, when an
    active account with that email exists, stores a reset token valid for
    one hour and tries to email the reset link (falling back to logging the
    link). The same informational message is flashed whether or not the
    account exists, so the response does not reveal registered emails.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('auth/forgot_password.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidłowy format adresu email.', 'error')
        return render_template('auth/forgot_password.html')

    db = SessionLocal()
    try:
        account = db.query(User).filter_by(email=email, is_active=True).first()

        if account:
            # Issue and persist a fresh one-hour token.
            reset_token = secrets.token_urlsafe(32)
            account.reset_token = reset_token
            account.reset_token_expires = datetime.now() + timedelta(hours=1)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            reset_url = f"{base_url}/reset-password/{reset_token}"

            # Best-effort delivery; the URL is logged whenever sending fails
            # so the reset can still be completed manually.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Reset URL (no email service): {reset_url}")
                elif email_service.send_password_reset_email(email, reset_url):
                    logger.info(f"Password reset email sent to {email}")
                else:
                    logger.warning(f"Failed to send password reset email to {email}")
                    logger.info(f"Reset URL (email failed): {reset_url}")
            except Exception as send_error:
                logger.error(f"Error sending reset email: {send_error}")
                logger.info(f"Reset URL (exception): {reset_url}")

        # Identical message for known and unknown emails (anti-enumeration).
        flash('Jeśli email istnieje w systemie, instrukcje resetowania hasła zostały wysłane.', 'info')
        return redirect(url_for('login'))

    except Exception as reset_error:
        logger.error(f"Password reset error: {reset_error}")
        flash('Wystąpił błąd. Spróbuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template('auth/forgot_password.html')
@app.route('/reset-password/<token>', methods=['GET', 'POST'])
@limiter.limit("10 per hour")
def reset_password(token):
    """Set a new password using a reset token from the URL.

    The token must belong to an active user and must not be expired;
    otherwise the visitor is redirected to the forgot-password page. GET
    renders the new-password form. POST requires both password fields to
    match and the password to pass ``validate_password``; on success the
    new hash is stored and the token is cleared.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    db = SessionLocal()
    try:
        token_owner = (
            db.query(User)
            .filter(
                User.reset_token == token,
                User.reset_token_expires > datetime.now(),
                User.is_active == True
            )
            .first()
        )

        if not token_owner:
            flash('Link resetowania hasła jest nieprawidłowy lub wygasł.', 'error')
            return redirect(url_for('forgot_password'))

        if request.method != 'POST':
            return render_template('auth/reset_password.html', token=token)

        new_password = request.form.get('password', '')
        repeated_password = request.form.get('password_confirm', '')

        if new_password != repeated_password:
            flash('Hasła nie są identyczne.', 'error')
            return render_template('auth/reset_password.html', token=token)

        strong_enough, strength_message = validate_password(new_password)
        if not strong_enough:
            flash(strength_message, 'error')
            return render_template('auth/reset_password.html', token=token)

        # Store the new hash and invalidate the token so it cannot be reused.
        token_owner.password_hash = generate_password_hash(new_password, method='pbkdf2:sha256')
        token_owner.reset_token = None
        token_owner.reset_token_expires = None
        db.commit()

        logger.info(f"Password reset successful for {token_owner.email}")
        flash('Hasło zostało zmienione. Możesz się teraz zalogować.', 'success')
        return redirect(url_for('login'))

    except Exception as reset_error:
        logger.error(f"Reset password error: {reset_error}")
        flash('Wystąpił błąd. Spróbuj ponownie.', 'error')
        return redirect(url_for('forgot_password'))
    finally:
        db.close()
@app.route('/verify-email/<token>')
def verify_email(token):
    """Confirm a user's email address using the token from the URL.

    The token must belong to an active user and must not be expired. On
    success the user is marked verified, the verification time is recorded
    and the token is cleared. Every outcome redirects to the login page
    with a flashed message.
    """
    login_url_endpoint = 'login'
    db = SessionLocal()
    try:
        token_owner = (
            db.query(User)
            .filter(
                User.verification_token == token,
                User.verification_token_expires > datetime.now(),
                User.is_active == True
            )
            .first()
        )

        if not token_owner:
            flash('Link weryfikacyjny jest nieprawidłowy lub wygasł.', 'error')
        elif token_owner.is_verified:
            flash('Email został już zweryfikowany.', 'info')
        else:
            token_owner.is_verified = True
            token_owner.verified_at = datetime.now()
            # Clear the token so the link cannot be used a second time.
            token_owner.verification_token = None
            token_owner.verification_token_expires = None
            db.commit()

            logger.info(f"Email verified for {token_owner.email}")
            flash('Email został zweryfikowany! Możesz się teraz zalogować.', 'success')

        return redirect(url_for(login_url_endpoint))

    except Exception as verify_error:
        logger.error(f"Email verification error: {verify_error}")
        flash('Wystąpił błąd podczas weryfikacji.', 'error')
        return redirect(url_for(login_url_endpoint))
    finally:
        db.close()
@app.route('/resend-verification', methods=['GET', 'POST'])
@limiter.limit("5 per hour")
def resend_verification():
    """Issue a new email-verification link.

    GET renders the request form. POST validates the email and, when an
    active but not yet verified account with that email exists, stores a
    new verification token valid for 24 hours and tries to email the link
    (falling back to logging it). The flashed message is the same whether
    or not such an account exists.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('auth/resend_verification.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidłowy format adresu email.', 'error')
        return render_template('auth/resend_verification.html')

    db = SessionLocal()
    try:
        account = db.query(User).filter_by(email=email, is_active=True).first()

        if account and not account.is_verified:
            # Replace any previous token with a fresh 24-hour one.
            verification_token = secrets.token_urlsafe(32)
            account.verification_token = verification_token
            account.verification_token_expires = datetime.now() + timedelta(hours=24)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            verification_url = f"{base_url}/verify-email/{verification_token}"

            # Best-effort delivery; the URL is logged whenever sending fails.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Verification URL (no email service): {verification_url}")
                elif email_service.send_welcome_email(email, account.name, verification_url):
                    logger.info(f"Verification email resent to {email}")
                else:
                    logger.warning(f"Failed to resend verification email to {email}")
                    logger.info(f"Verification URL (email failed): {verification_url}")
            except Exception as send_error:
                logger.error(f"Error resending verification email: {send_error}")
                logger.info(f"Verification URL (exception): {verification_url}")

        # Identical message in every case (anti-enumeration).
        flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info')
        return redirect(url_for('login'))

    except Exception as resend_error:
        logger.error(f"Resend verification error: {resend_error}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template('auth/resend_verification.html')
# ============================================================
# USER DASHBOARD
# ============================================================
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the current user's dashboard.

    Passes to the template the user's 10 most recently updated AI chat
    conversations together with the total number of their conversations
    and of the messages in those conversations.
    """
    db = SessionLocal()
    try:
        user_id = current_user.id

        # Shared base query; each call below derives a new query from it.
        own_conversations = db.query(AIChatConversation).filter_by(user_id=user_id)

        recent_conversations = (
            own_conversations
            .order_by(AIChatConversation.updated_at.desc())
            .limit(10)
            .all()
        )
        conversation_total = own_conversations.count()
        message_total = (
            db.query(AIChatMessage)
            .join(AIChatConversation)
            .filter(AIChatConversation.user_id == user_id)
            .count()
        )

        return render_template(
            'dashboard.html',
            conversations=recent_conversations,
            total_conversations=conversation_total,
            total_messages=message_total,
        )
    finally:
        db.close()
# ============================================================
# AI CHAT ROUTES
# ============================================================
@app.route('/chat')
@login_required
def chat():
    """Render the AI chat page for a logged-in user."""
    chat_template = 'chat.html'
    return render_template(chat_template)
@app.route('/api/chat/start', methods=['POST'])
@login_required
def chat_start():
    """Start a new AI chat conversation for the current user.

    Request JSON body (optional):
        title: Conversation title. Defaults to "Rozmowa - <timestamp>".

    Returns:
        JSON with ``success``, ``conversation_id`` and ``title``; on an
        unexpected error, JSON with ``success: False`` and HTTP 500.
    """
    try:
        # silent=True yields None instead of raising when the body is
        # missing or is not valid JSON; a body that is not a JSON object is
        # treated the same way, so the default title is used rather than the
        # request failing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        title = data.get('title', f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}")

        chat_engine = NordaBizChatEngine()
        conversation = chat_engine.start_conversation(
            user_id=current_user.id,
            title=title
        )

        return jsonify({
            'success': True,
            'conversation_id': conversation.id,
            'title': conversation.title
        })
    except Exception as e:
        logger.error(f"Error starting chat: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/chat/<int:conversation_id>/message', methods=['POST'])
@login_required
def chat_send_message(conversation_id):
    """Send a user message to the AI chat and return the assistant's reply.

    Args:
        conversation_id: Id of the conversation, taken from the URL. It must
            belong to the current user.

    Request JSON body:
        message: Non-empty message text.

    Returns:
        JSON with the reply content, its id and creation time, and a
        ``tech_info`` section (token counts, latency, theoretical cost and
        free-tier usage). HTTP 400 for a missing/empty message, 404 when the
        conversation is not the user's, 500 on unexpected errors.
    """
    daily_request_limit = 1500  # Gemini free tier: 1500 req/day

    try:
        # silent=True yields None instead of raising on a missing or
        # malformed body, so bad client input is answered with 400, not 500.
        data = request.get_json(silent=True)
        raw_message = data.get('message', '') if isinstance(data, dict) else ''
        message = raw_message.strip() if isinstance(raw_message, str) else ''

        if not message:
            return jsonify({'success': False, 'error': 'Wiadomość nie może być pusta'}), 400

        # Verify conversation belongs to user
        db = SessionLocal()
        try:
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id,
                user_id=current_user.id
            ).first()

            if not conversation:
                return jsonify({'success': False, 'error': 'Conversation not found'}), 404
        finally:
            db.close()

        chat_engine = NordaBizChatEngine()
        response = chat_engine.send_message(
            conversation_id=conversation_id,
            user_message=message,
            user_id=current_user.id
        )

        # Get free tier usage stats for today
        free_tier_stats = get_free_tier_usage()

        # Calculate theoretical cost (Gemini 2.0 Flash pricing)
        tokens_in = response.tokens_input or 0
        tokens_out = response.tokens_output or 0
        theoretical_cost = (tokens_in / 1_000_000) * 0.075 + (tokens_out / 1_000_000) * 0.30

        return jsonify({
            'success': True,
            'message': response.content,
            'message_id': response.id,
            'created_at': response.created_at.isoformat(),
            # Technical metadata
            'tech_info': {
                'model': 'gemini-2.0-flash',
                'data_source': 'PostgreSQL (80 firm Norda Biznes)',
                'architecture': 'Full DB Context (wszystkie firmy w kontekście AI)',
                'tokens_input': tokens_in,
                'tokens_output': tokens_out,
                'tokens_total': tokens_in + tokens_out,
                'latency_ms': response.latency_ms or 0,
                'theoretical_cost_usd': round(theoretical_cost, 6),
                'actual_cost_usd': 0.0,  # Free tier
                'free_tier': {
                    'is_free': True,
                    'daily_limit': daily_request_limit,
                    'requests_today': free_tier_stats['requests_today'],
                    'tokens_today': free_tier_stats['tokens_today'],
                    'remaining': max(0, daily_request_limit - free_tier_stats['requests_today'])
                }
            }
        })
    except Exception as e:
        logger.error(f"Error sending message: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/chat/<int:conversation_id>/history', methods=['GET'])
@login_required
def chat_get_history(conversation_id):
    """Return the message history of one of the current user's conversations.

    Responds with HTTP 404 when the conversation does not exist or belongs
    to another user, and with HTTP 500 on unexpected errors.
    """
    try:
        # Ownership check: only the conversation's owner may read it.
        db = SessionLocal()
        try:
            owned = (
                db.query(AIChatConversation)
                .filter_by(id=conversation_id, user_id=current_user.id)
                .first()
            )
            if not owned:
                return jsonify({'success': False, 'error': 'Conversation not found'}), 404
        finally:
            db.close()

        engine = NordaBizChatEngine()
        messages = engine.get_conversation_history(conversation_id)
        return jsonify({'success': True, 'messages': messages})

    except Exception as history_error:
        logger.error(f"Error getting history: {history_error}")
        return jsonify({'success': False, 'error': str(history_error)}), 500
# ============================================================
# API ROUTES (for frontend)
# ============================================================
@app.route('/api/companies')
def api_companies():
    """API: return all active companies as JSON.

    Each entry carries the company's id, name, category name (or None),
    short description, website, phone and email.
    """
    db = SessionLocal()
    try:
        listing = []
        for company in db.query(Company).filter_by(status='active').all():
            category = company.category
            listing.append({
                'id': company.id,
                'name': company.name,
                'category': category.name if category else None,
                'description': company.description_short,
                'website': company.website,
                'phone': company.phone,
                'email': company.email,
            })

        return jsonify({'success': True, 'companies': listing})
    finally:
        db.close()
def _build_seo_audit_response(company, analysis):
    """Build the SEO audit response dictionary for a company.

    Used by both /api/seo/audit and /api/seo/audit/<slug> endpoints.

    Args:
        company: Object exposing ``id``, ``name`` and ``website``.
        analysis: Website-analysis record exposing the SEO audit attributes
            read below (scores, on-page, technical, social and language
            fields).

    Returns:
        A JSON-serialisable dict with ``success``, the company identity and
        a ``seo_audit`` section. ``seo_audit['issues']`` lists the problems
        derived from the individual checks, followed by any dict entries
        stored in ``analysis.seo_issues``.
    """
    issues = []

    def add_issue(severity, message, category):
        issues.append({
            'severity': severity,
            'message': message,
            'category': category
        })

    # Images without alt text
    if analysis.images_without_alt and analysis.images_without_alt > 0:
        add_issue('warning',
                  f'{analysis.images_without_alt} obrazów nie ma atrybutu alt',
                  'accessibility')

    # Missing meta description
    if not analysis.meta_description:
        add_issue('warning', 'Brak meta description', 'on_page')

    # H1 count (should be exactly 1); unknown (None) is not reported
    if analysis.h1_count is not None:
        if analysis.h1_count == 0:
            add_issue('error', 'Brak nagłówka H1 na stronie', 'on_page')
        elif analysis.h1_count > 1:
            add_issue('warning',
                      f'Strona zawiera {analysis.h1_count} nagłówków H1 (zalecany: 1)',
                      'on_page')

    # Boolean checks. Only an explicit False is reported; None means the
    # check was not performed and must not produce an issue. The order of
    # this table is the order of the resulting issues.
    flag_checks = (
        (analysis.has_ssl, 'error',
         'Strona nie używa HTTPS (brak certyfikatu SSL)', 'security'),
        (analysis.has_robots_txt, 'info',
         'Brak pliku robots.txt', 'technical'),
        (analysis.has_sitemap, 'info',
         'Brak pliku sitemap.xml', 'technical'),
        (analysis.is_indexable, 'error',
         f'Strona nie jest indeksowalna: {analysis.noindex_reason or "nieznana przyczyna"}',
         'technical'),
        (analysis.has_structured_data, 'info',
         'Brak danych strukturalnych (Schema.org)', 'on_page'),
        (analysis.has_og_tags, 'info',
         'Brak tagów Open Graph (ważne dla udostępniania w social media)', 'social'),
        (analysis.is_mobile_friendly, 'warning',
         'Strona nie jest przyjazna dla urządzeń mobilnych', 'technical'),
    )
    for flag, severity, message, category in flag_checks:
        if flag is False:
            add_issue(severity, message, category)

    # Append issues stored on the record; anything that is not a list of
    # dicts is ignored.
    if analysis.seo_issues:
        stored_issues = analysis.seo_issues if isinstance(analysis.seo_issues, list) else []
        issues.extend(issue for issue in stored_issues if isinstance(issue, dict))

    # A Cumulative Layout Shift of 0 is a valid (and the best possible)
    # value, so test against None instead of truthiness.
    layout_shift = analysis.cumulative_layout_shift
    layout_shift = float(layout_shift) if layout_shift is not None else None

    return {
        'success': True,
        'company_id': company.id,
        'company_name': company.name,
        'website': company.website,
        'seo_audit': {
            'audited_at': analysis.seo_audited_at.isoformat() if analysis.seo_audited_at else None,
            'audit_version': analysis.seo_audit_version,
            'overall_score': analysis.seo_overall_score,
            'pagespeed': {
                'seo_score': analysis.pagespeed_seo_score,
                'performance_score': analysis.pagespeed_performance_score,
                'accessibility_score': analysis.pagespeed_accessibility_score,
                'best_practices_score': analysis.pagespeed_best_practices_score
            },
            'on_page': {
                'meta_title': analysis.meta_title,
                'meta_description': analysis.meta_description,
                'h1_count': analysis.h1_count,
                'h1_text': analysis.h1_text,
                'h2_count': analysis.h2_count,
                'h3_count': analysis.h3_count,
                'total_images': analysis.total_images,
                'images_without_alt': analysis.images_without_alt,
                'images_with_alt': analysis.images_with_alt,
                'internal_links_count': analysis.internal_links_count,
                'external_links_count': analysis.external_links_count,
                'has_structured_data': analysis.has_structured_data,
                'structured_data_types': analysis.structured_data_types
            },
            'technical': {
                'has_ssl': analysis.has_ssl,
                'ssl_issuer': analysis.ssl_issuer,
                'ssl_expires_at': analysis.ssl_expires_at.isoformat() if analysis.ssl_expires_at else None,
                'has_sitemap': analysis.has_sitemap,
                'has_robots_txt': analysis.has_robots_txt,
                'has_canonical': analysis.has_canonical,
                'canonical_url': analysis.canonical_url,
                'is_indexable': analysis.is_indexable,
                'noindex_reason': analysis.noindex_reason,
                'is_mobile_friendly': analysis.is_mobile_friendly,
                'viewport_configured': analysis.viewport_configured,
                'load_time_ms': analysis.load_time_ms,
                'http_status_code': analysis.http_status_code
            },
            'core_web_vitals': {
                'largest_contentful_paint_ms': analysis.largest_contentful_paint_ms,
                'first_input_delay_ms': analysis.first_input_delay_ms,
                'cumulative_layout_shift': layout_shift
            },
            'social': {
                'has_og_tags': analysis.has_og_tags,
                'og_title': analysis.og_title,
                'og_description': analysis.og_description,
                'og_image': analysis.og_image,
                'has_twitter_cards': analysis.has_twitter_cards
            },
            'language': {
                'html_lang': analysis.html_lang,
                'has_hreflang': analysis.has_hreflang
            },
            'issues': issues
        }
    }
def _get_seo_audit_for_company(db, company):
    """Look up the SEO audit data of a company.

    The most recently analysed ``CompanyWebsiteAnalysis`` row of the company
    is used. Always returns a ``(response_dict, status_code)`` tuple with
    status 200: the full audit when an SEO audit timestamp is present,
    otherwise a payload with ``seo_audit`` set to None and an explanatory
    message.
    """
    latest_analysis = (
        db.query(CompanyWebsiteAnalysis)
        .filter_by(company_id=company.id)
        .order_by(CompanyWebsiteAnalysis.analyzed_at.desc())
        .first()
    )

    if latest_analysis and latest_analysis.seo_audited_at:
        return _build_seo_audit_response(company, latest_analysis), 200

    # No usable audit: distinguish "no analysis row at all" from "row exists
    # but the SEO audit has not run yet".
    if not latest_analysis:
        explanation = 'Brak danych SEO dla tej firmy. Audyt nie został jeszcze przeprowadzony.'
    else:
        explanation = 'Audyt SEO nie został jeszcze przeprowadzony dla tej firmy.'

    empty_payload = {
        'success': True,
        'company_id': company.id,
        'company_name': company.name,
        'website': company.website,
        'seo_audit': None,
        'message': explanation
    }
    return empty_payload, 200
@app.route('/api/seo/audit')
def api_seo_audit():
    """
    API: Return stored SEO audit results for an active company.

    Query parameters (at least one is required; company_id wins when both
    are given):
    - company_id: Company ID (integer)
    - slug: Company slug (string)

    The JSON response contains:
    - pagespeed scores (seo, performance, accessibility, best_practices)
    - on_page metrics (meta tags, headings, images, links, structured data)
    - technical checks (ssl, sitemap, robots.txt, mobile-friendly)
    - issues list with severity levels

    Responds with HTTP 400 when neither parameter is given and HTTP 404
    when no active company matches.
    """
    company_id = request.args.get('company_id', type=int)
    slug = request.args.get('slug', type=str)

    if not (company_id or slug):
        missing_params = {
            'success': False,
            'error': 'Podaj company_id lub slug firmy'
        }
        return jsonify(missing_params), 400

    db = SessionLocal()
    try:
        lookup = {'id': company_id} if company_id else {'slug': slug}
        company = db.query(Company).filter_by(**lookup, status='active').first()

        if not company:
            not_found = {
                'success': False,
                'error': 'Firma nie znaleziona'
            }
            return jsonify(not_found), 404

        payload, status_code = _get_seo_audit_for_company(db, company)
        return jsonify(payload), status_code
    finally:
        db.close()
@app.route('/api/seo/audit/<slug>')
def api_seo_audit_by_slug(slug):
    """
    API: Return stored SEO audit results for the active company with this slug.

    Convenience variant of /api/seo/audit that reads the slug from the URL
    path, e.g. GET /api/seo/audit/pixlab-sp-z-o-o. Responds with HTTP 404
    when no active company has the slug.
    """
    db = SessionLocal()
    try:
        matching_company = (
            db.query(Company)
            .filter_by(slug=slug, status='active')
            .first()
        )

        if matching_company:
            payload, status_code = _get_seo_audit_for_company(db, matching_company)
            return jsonify(payload), status_code

        return jsonify({
            'success': False,
            'error': 'Firma nie znaleziona'
        }), 404
    finally:
        db.close()
@app.route('/api/seo/audit', methods=['POST'])
@login_required
@limiter.limit("10 per hour")
def api_seo_audit_trigger():
    """
    API: Trigger SEO audit for a company (admin-only).

    This endpoint runs a full SEO audit including:
    - Google PageSpeed Insights analysis
    - On-page SEO analysis (meta tags, headings, images, links)
    - Technical SEO checks (robots.txt, sitemap, canonical URLs)

    Request JSON body (a JSON object):
    - company_id: Company ID (integer, or a string of digits) OR
    - slug: Company slug (string)

    Returns:
    - Success: Full SEO audit results saved to database
    - Error: JSON error message with status code (400 invalid request,
      403 not an admin, 404 unknown company, 422 audit failed,
      500 unexpected error, 503 audit service unavailable)

    Rate limited to 10 requests per hour to prevent API abuse.
    """
    # Admin-only check
    if not current_user.is_admin:
        return jsonify({
            'success': False,
            'error': 'Brak uprawnień. Tylko administrator może uruchamiać audyty SEO.'
        }), 403

    # Check if SEO audit service is available
    if not SEO_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu SEO jest niedostępna. Sprawdź konfigurację serwera.'
        }), 503

    # Parse request data. silent=True returns None for a missing/malformed
    # body; a body that is not a JSON object (e.g. a list) is rejected the
    # same way instead of failing later on ``data.get``.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
        }), 400

    company_id = data.get('company_id')
    slug = data.get('slug')

    if not company_id and not slug:
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy do audytu.'
        }), 400

    # Validate the identifier before it reaches the database, so malformed
    # client input yields a 400 rather than a database error.
    if company_id:
        try:
            if isinstance(company_id, bool) or not isinstance(company_id, (int, str)):
                raise ValueError('unsupported company_id type')
            lookup = {'id': int(company_id)}
        except ValueError:
            return jsonify({
                'success': False,
                'error': 'Nieprawidłowy company_id. Podaj liczbę całkowitą.'
            }), 400
    elif isinstance(slug, str):
        lookup = {'slug': slug}
    else:
        return jsonify({
            'success': False,
            'error': 'Nieprawidłowy slug. Podaj tekstowy identyfikator firmy.'
        }), 400

    db = SessionLocal()
    try:
        # Find company by ID or slug
        company = db.query(Company).filter_by(**lookup, status='active').first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        # Check if company has a website
        if not company.website:
            return jsonify({
                'success': False,
                'error': f'Firma "{company.name}" nie ma zdefiniowanej strony internetowej.',
                'company_id': company.id,
                'company_name': company.name
            }), 400

        logger.info(f"SEO audit triggered by admin {current_user.email} for company: {company.name} (ID: {company.id})")

        # Initialize SEO auditor and run audit
        try:
            auditor = SEOAuditor()

            # Prepare company dict for auditor
            company_dict = {
                'id': company.id,
                'name': company.name,
                'slug': company.slug,
                'website': company.website,
                'address_city': company.address_city
            }

            # Run the audit
            audit_result = auditor.audit_company(company_dict)

            # Treat the audit as failed only when errors were reported and
            # neither the on-page nor the PageSpeed part produced a result.
            if audit_result.get('errors') and not audit_result.get('onpage') and not audit_result.get('pagespeed'):
                return jsonify({
                    'success': False,
                    'error': f'Audyt nie powiódł się: {", ".join(audit_result["errors"])}',
                    'company_id': company.id,
                    'company_name': company.name,
                    'website': company.website
                }), 422

            # Save result to database
            saved = auditor.save_audit_result(audit_result)

            if not saved:
                return jsonify({
                    'success': False,
                    'error': 'Audyt został wykonany, ale nie udało się zapisać wyników do bazy danych.',
                    'company_id': company.id,
                    'company_name': company.name
                }), 500

            # Get the updated analysis record to return
            db.expire_all()  # Refresh the session to get updated data
            analysis = db.query(CompanyWebsiteAnalysis).filter_by(
                company_id=company.id
            ).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first()

            # Build response using the existing helper function
            response = _build_seo_audit_response(company, analysis)

            return jsonify({
                'success': True,
                'message': f'Audyt SEO dla firmy "{company.name}" został zakończony pomyślnie.',
                'audit_version': SEO_AUDIT_VERSION,
                'triggered_by': current_user.email,
                'triggered_at': datetime.now().isoformat(),
                **response
            }), 200

        except Exception as e:
            logger.error(f"SEO audit error for company {company.id}: {e}")
            return jsonify({
                'success': False,
                'error': f'Błąd podczas wykonywania audytu: {str(e)}',
                'company_id': company.id,
                'company_name': company.name
            }), 500

    finally:
        db.close()
# ============================================================
# SEO ADMIN DASHBOARD
# ============================================================
@app.route('/admin/seo')
@login_required
def admin_seo():
    """
    Admin dashboard with an overview of SEO metrics.

    Non-admin users are redirected to their dashboard. The template receives:
    - companies: one row object per joined result (company fields, category
      name, PageSpeed scores and SEO audit date); companies without
      analysis data are included with empty scores
    - stats: counts of good (>= 90), medium (50-89), poor (< 50) and
      not-audited rows plus the rounded average SEO score (None when
      nothing has been audited)
    - categories: sorted unique category names for the filter dropdown
    - now: current time, and filter_company: the optional company slug

    Query Parameters:
    - company: Slug of company to highlight/filter (optional)
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    filter_company_slug = request.args.get('company', '')

    db = SessionLocal()
    try:
        from sqlalchemy import func

        # Outer joins keep companies that have no category or no analysis.
        selected_columns = (
            Company.id,
            Company.name,
            Company.slug,
            Company.website,
            Category.name.label('category_name'),
            CompanyWebsiteAnalysis.pagespeed_seo_score,
            CompanyWebsiteAnalysis.pagespeed_performance_score,
            CompanyWebsiteAnalysis.pagespeed_accessibility_score,
            CompanyWebsiteAnalysis.pagespeed_best_practices_score,
            CompanyWebsiteAnalysis.seo_audited_at,
        )
        result_rows = (
            db.query(*selected_columns)
            .outerjoin(Category, Company.category_id == Category.id)
            .outerjoin(CompanyWebsiteAnalysis, Company.id == CompanyWebsiteAnalysis.company_id)
            .filter(Company.status == 'active')
            .order_by(Company.name)
            .all()
        )

        companies = [
            {
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'seo_score': row.pagespeed_seo_score,
                'performance_score': row.pagespeed_performance_score,
                'accessibility_score': row.pagespeed_accessibility_score,
                'best_practices_score': row.pagespeed_best_practices_score,
                'seo_audited_at': row.seo_audited_at,
            }
            for row in result_rows
        ]

        # Statistics are based on the SEO score; a missing score means the
        # row counts as not audited.
        seo_scores = [
            entry['seo_score'] for entry in companies
            if entry['seo_score'] is not None
        ]
        if seo_scores:
            average_score = round(sum(seo_scores) / len(seo_scores))
        else:
            average_score = None

        stats = {
            'good_count': sum(1 for score in seo_scores if score >= 90),
            'medium_count': sum(1 for score in seo_scores if 50 <= score < 90),
            'poor_count': sum(1 for score in seo_scores if score < 50),
            'not_audited_count': len(companies) - len(seo_scores),
            'avg_score': average_score,
        }

        categories = sorted({
            entry['category'] for entry in companies if entry['category']
        })

        # The template uses attribute access, so wrap each dict in an object.
        class CompanyRow:
            def __init__(self, data):
                vars(self).update(data)

        return render_template('admin_seo_dashboard.html',
            companies=[CompanyRow(entry) for entry in companies],
            stats=stats,
            categories=categories,
            now=datetime.now(),
            filter_company=filter_company_slug
        )
    finally:
        db.close()
# ============================================================
# GBP AUDIT ADMIN DASHBOARD
# ============================================================
@app.route('/admin/gbp-audit')
@login_required
def admin_gbp_audit():
    """
    Admin overview of Google Business Profile (GBP) audits.

    Lists every active company joined with its most recent GBP audit
    (companies without an audit are still listed, with empty audit values)
    and hands the template a set of aggregate statistics:
    - completeness score distribution and average completeness
    - average rating (audited companies with a non-empty rating only)
    - total review count
    - percentage of audited companies that have each profile field

    Non-admin users are redirected to the dashboard with a flash message.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from sqlalchemy import func, distinct
        from database import GBPAudit, Category

        # Profile fields covered by the audit; each one has a has_<field> flag.
        coverage_fields = [
            'name', 'address', 'phone', 'website', 'hours',
            'categories', 'photos', 'description', 'services', 'reviews',
        ]

        # Newest audit date per company, used to select the latest audit row.
        newest_audit = db.query(
            GBPAudit.company_id,
            func.max(GBPAudit.audit_date).label('max_date')
        ).group_by(GBPAudit.company_id).subquery()

        is_latest_audit = (
            (Company.id == GBPAudit.company_id) &
            (GBPAudit.audit_date == newest_audit.c.max_date)
        )

        rows = (
            db.query(
                Company.id,
                Company.name,
                Company.slug,
                Company.website,
                Category.name.label('category_name'),
                GBPAudit.completeness_score,
                GBPAudit.average_rating,
                GBPAudit.review_count,
                GBPAudit.photo_count,
                GBPAudit.has_name,
                GBPAudit.has_address,
                GBPAudit.has_phone,
                GBPAudit.has_website,
                GBPAudit.has_hours,
                GBPAudit.has_categories,
                GBPAudit.has_photos,
                GBPAudit.has_description,
                GBPAudit.has_services,
                GBPAudit.has_reviews,
                GBPAudit.audit_date
            )
            .outerjoin(Category, Company.category_id == Category.id)
            .outerjoin(newest_audit, Company.id == newest_audit.c.company_id)
            .outerjoin(GBPAudit, is_latest_audit)
            .filter(Company.status == 'active')
            .order_by(Company.name)
            .all()
        )

        def to_record(row):
            """Flatten one result row into the dict shape used below."""
            record = {
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'completeness_score': row.completeness_score,
                'average_rating': float(row.average_rating) if row.average_rating else None,
                'review_count': row.review_count or 0,
                'photo_count': row.photo_count or 0,
            }
            for field in coverage_fields:
                record[f'has_{field}'] = getattr(row, f'has_{field}')
            record['audit_date'] = row.audit_date
            return record

        companies = [to_record(row) for row in rows]

        # A company counts as audited once it has a completeness score.
        audited = [c for c in companies if c['completeness_score'] is not None]
        audited_total = len(audited)

        # Score distribution buckets: >= 90, 70-89, < 70.
        excellent_count = sum(1 for c in audited if c['completeness_score'] >= 90)
        good_count = sum(1 for c in audited if 70 <= c['completeness_score'] < 90)
        poor_count = sum(1 for c in audited if c['completeness_score'] < 70)

        if audited:
            avg_completeness = round(
                sum(c['completeness_score'] for c in audited) / audited_total
            )
        else:
            avg_completeness = None

        # Ratings that are missing (or zero) are left out of the average.
        ratings = [c['average_rating'] for c in audited if c['average_rating']]
        avg_rating = round(sum(ratings) / len(ratings), 1) if ratings else None

        if audited:
            field_coverage = {
                field: round(
                    sum(1 for c in audited if c[f'has_{field}']) / audited_total * 100
                )
                for field in coverage_fields
            }
        else:
            field_coverage = {field: 0 for field in coverage_fields}

        stats = {
            'total_companies': len(companies),
            'audited_count': audited_total,
            'excellent_count': excellent_count,
            'good_count': good_count,
            'poor_count': poor_count,
            'not_audited_count': len(companies) - audited_total,
            'avg_completeness': avg_completeness,
            'avg_rating': avg_rating,
            'total_reviews': sum(c['review_count'] for c in companies),
            'field_coverage': field_coverage
        }

        # Distinct, non-empty category names for the filter dropdown.
        categories = sorted({c['category'] for c in companies if c['category']})

        class CompanyRow:
            """Attribute-style wrapper so the template can use dot access."""

            def __init__(self, data):
                vars(self).update(data)

        return render_template(
            'admin/gbp_audit_dashboard.html',
            companies=[CompanyRow(c) for c in companies],
            stats=stats,
            categories=categories,
            now=datetime.now()
        )
    finally:
        db.close()
# ============================================================
# GBP (GOOGLE BUSINESS PROFILE) AUDIT API
# ============================================================
@app.route('/api/gbp/audit/health')
def api_gbp_audit_health():
    """
    API: Health check for the GBP audit service.

    Responds with 200 and the service version when the audit module is
    loaded, or with 503 and an error description when it is not.
    """
    if not GBP_AUDIT_AVAILABLE:
        unavailable = {
            'status': 'unavailable',
            'service': 'gbp_audit',
            'available': False,
            'error': 'GBP audit service not loaded'
        }
        return jsonify(unavailable), 503

    healthy = {
        'status': 'ok',
        'service': 'gbp_audit',
        'version': GBP_AUDIT_VERSION,
        'available': True
    }
    return jsonify(healthy), 200
@app.route('/api/gbp/audit', methods=['GET'])
def api_gbp_audit_get():
    """
    API: Return the latest stored GBP audit for a company.

    Query parameters (at least one is required):
    - company_id: Company ID (integer)
    - slug: Company slug (string)

    Responses:
    - 200: latest audit with completeness score and recommendations
    - 400: neither company_id nor slug was supplied
    - 404: company not found / inactive, or no audit stored for it
    - 500: unexpected error (details are logged, not returned)
    - 503: GBP audit service is not loaded

    Example: GET /api/gbp/audit?company_id=26
    Example: GET /api/gbp/audit?slug=pixlab-sp-z-o-o
    """
    if not GBP_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu GBP jest niedostępna.'
        }), 503

    company_id = request.args.get('company_id', type=int)
    slug = request.args.get('slug')
    if not company_id and not slug:
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy.'
        }), 400

    db = SessionLocal()
    try:
        # company_id takes precedence when both identifiers are given.
        if company_id:
            company = db.query(Company).filter_by(id=company_id, status='active').first()
        else:
            company = db.query(Company).filter_by(slug=slug, status='active').first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        audit = gbp_get_company_audit(db, company.id)
        if not audit:
            return jsonify({
                'success': False,
                'error': f'Brak wyników audytu GBP dla firmy "{company.name}". Uruchom audyt używając POST /api/gbp/audit.',
                'company_id': company.id,
                'company_name': company.name
            }), 404

        return jsonify({
            'success': True,
            'company_id': company.id,
            'company_name': company.name,
            'company_slug': company.slug,
            'audit': {
                'id': audit.id,
                'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
                'completeness_score': audit.completeness_score,
                'score_category': audit.score_category,
                'fields_status': audit.fields_status,
                'recommendations': audit.recommendations,
                'has_name': audit.has_name,
                'has_address': audit.has_address,
                'has_phone': audit.has_phone,
                'has_website': audit.has_website,
                'has_hours': audit.has_hours,
                'has_categories': audit.has_categories,
                'has_photos': audit.has_photos,
                'has_description': audit.has_description,
                'has_services': audit.has_services,
                'has_reviews': audit.has_reviews,
                'photo_count': audit.photo_count,
                'review_count': audit.review_count,
                'average_rating': float(audit.average_rating) if audit.average_rating else None,
                'google_place_id': audit.google_place_id,
                'audit_source': audit.audit_source,
                'audit_version': audit.audit_version
            }
        }), 200
    except Exception:
        # This endpoint requires no login, so internal exception text must
        # not reach the client; the full traceback goes to the log instead.
        logger.exception(
            f"Error fetching GBP audit (company_id={company_id}, slug={slug})"
        )
        return jsonify({
            'success': False,
            'error': 'Błąd podczas pobierania audytu.'
        }), 500
    finally:
        db.close()
@app.route('/api/gbp/audit/<slug>')
def api_gbp_audit_by_slug(slug):
    """
    API: Return the latest stored GBP audit for a company, by URL slug.

    Convenience variant of GET /api/gbp/audit that takes the slug from the
    URL path and returns a reduced set of audit fields.

    Responses:
    - 200: latest audit summary
    - 404: company not found / inactive, or no audit stored for it
    - 500: unexpected error (details are logged, not returned)
    - 503: GBP audit service is not loaded

    Example: GET /api/gbp/audit/pixlab-sp-z-o-o
    """
    if not GBP_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu GBP jest niedostępna.'
        }), 503

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            return jsonify({
                'success': False,
                'error': f'Firma o slug "{slug}" nie znaleziona.'
            }), 404

        audit = gbp_get_company_audit(db, company.id)
        if not audit:
            return jsonify({
                'success': False,
                'error': f'Brak wyników audytu GBP dla firmy "{company.name}".',
                'company_id': company.id,
                'company_name': company.name
            }), 404

        return jsonify({
            'success': True,
            'company_id': company.id,
            'company_name': company.name,
            'company_slug': company.slug,
            'audit': {
                'id': audit.id,
                'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
                'completeness_score': audit.completeness_score,
                'score_category': audit.score_category,
                'fields_status': audit.fields_status,
                'recommendations': audit.recommendations,
                'photo_count': audit.photo_count,
                'review_count': audit.review_count,
                'average_rating': float(audit.average_rating) if audit.average_rating else None
            }
        }), 200
    except Exception:
        # Keep the JSON contract on failure instead of falling through to
        # the framework's HTML error page; details stay in the server log.
        logger.exception(f"Error fetching GBP audit for slug {slug}")
        return jsonify({
            'success': False,
            'error': 'Błąd podczas pobierania audytu.'
        }), 500
    finally:
        db.close()
def _gbp_score_category(score):
    """Map a completeness score to its category label."""
    if score >= 90:
        return 'excellent'
    if score >= 70:
        return 'good'
    if score >= 50:
        return 'needs_work'
    return 'poor'


@app.route('/api/gbp/audit', methods=['POST'])
@login_required
@limiter.limit("20 per hour")
def api_gbp_audit_trigger():
    """
    API: Run a GBP completeness audit for a company.

    Request JSON body (must be a JSON object):
    - company_id: Company ID (integer) OR
    - slug: Company slug (string)
    - save: Whether to save results to database (default: true)
    - fetch_google: Fetch Google Business data before auditing (default: true)
    - force_refresh: Passed through to the Google fetch (default: false)

    Responses:
    - 200: audit results, plus the Google fetch report when a fetch ran
    - 400: missing/invalid JSON body, missing identifier, or a ValueError
      raised while auditing
    - 403: non-admin user auditing a company other than their own
    - 404: company not found or inactive
    - 500: unexpected error during the audit
    - 503: GBP audit service is not loaded

    Access:
    - Members can audit their own company
    - Admins can audit any company

    Rate limited to 20 requests per hour.
    """
    if not GBP_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu GBP jest niedostępna. Sprawdź konfigurację serwera.'
        }), 503

    # silent=True keeps malformed JSON on the JSON error path below; the dict
    # check rejects bodies such as lists or strings that have no .get().
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
        }), 400

    company_id = data.get('company_id')
    slug = data.get('slug')
    save_result = data.get('save', True)

    if not company_id and not slug:
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy do audytu.'
        }), 400

    db = SessionLocal()
    try:
        # company_id takes precedence when both identifiers are given.
        if company_id:
            company = db.query(Company).filter_by(id=company_id, status='active').first()
        else:
            company = db.query(Company).filter_by(slug=slug, status='active').first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        # Admins may audit any company, members only their own.
        if not current_user.is_admin and current_user.company_id != company.id:
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień. Możesz audytować tylko własną firmę.'
            }), 403

        logger.info(f"GBP audit triggered by {current_user.email} for company: {company.name} (ID: {company.id})")

        fetch_google = data.get('fetch_google', True)
        force_refresh = data.get('force_refresh', False)

        try:
            # Step 1: optionally refresh Google Business data first.
            fetch_result = None
            if fetch_google:
                logger.info(f"Fetching Google Business data for company {company.id}...")
                fetch_result = gbp_fetch_google_data(db, company.id, force_refresh=force_refresh)
                fetched_data = fetch_result.get('data') or {}
                if not fetch_result.get('success') and not fetched_data.get('cached'):
                    # A failed fetch is not fatal: the audit still runs.
                    logger.warning(f"Google fetch warning for company {company.id}: {fetch_result.get('error')}")

            # Step 2: run the audit itself.
            result = gbp_audit_company(db, company.id, save=save_result)

            fields_response = {
                field_name: {
                    'status': field_status.status,
                    'value': str(field_status.value) if field_status.value is not None else None,
                    'score': field_status.score,
                    'max_score': field_status.max_score,
                    'recommendation': field_status.recommendation
                }
                for field_name, field_status in result.fields.items()
            }

            response_data = {
                'success': True,
                'message': f'Audyt GBP dla firmy "{company.name}" został zakończony pomyślnie.',
                'company_id': company.id,
                'company_name': company.name,
                'company_slug': company.slug,
                'audit_version': GBP_AUDIT_VERSION,
                'triggered_by': current_user.email,
                'triggered_at': datetime.now().isoformat(),
                'saved': save_result,
                'audit': {
                    'completeness_score': result.completeness_score,
                    'score_category': _gbp_score_category(result.completeness_score),
                    'fields_status': fields_response,
                    'recommendations': result.recommendations,
                    'photo_count': result.photo_count,
                    'logo_present': result.logo_present,
                    'cover_photo_present': result.cover_photo_present,
                    'review_count': result.review_count,
                    'average_rating': float(result.average_rating) if result.average_rating else None,
                    'google_place_id': result.google_place_id
                }
            }

            # Report the Google fetch outcome only when a fetch was attempted.
            if fetch_result:
                response_data['google_fetch'] = {
                    'success': fetch_result.get('success', False),
                    'steps': fetch_result.get('steps', []),
                    'data': fetch_result.get('data', {}),
                    'error': fetch_result.get('error')
                }

            return jsonify(response_data), 200

        except ValueError as e:
            return jsonify({
                'success': False,
                'error': str(e),
                'company_id': company.id
            }), 400
        except Exception as e:
            # logger.exception keeps the traceback for diagnosis.
            logger.exception(f"GBP audit error for company {company.id}: {e}")
            return jsonify({
                'success': False,
                'error': f'Błąd podczas wykonywania audytu: {str(e)}',
                'company_id': company.id,
                'company_name': company.name
            }), 500
    finally:
        db.close()
# ============================================================
# SEO AUDIT USER-FACING DASHBOARD
# ============================================================
@app.route('/audit/seo/<slug>')
@login_required
def seo_audit_dashboard(slug):
    """
    User-facing SEO audit dashboard for a specific company.

    Passes the template the company, its most recent SEO audit scores
    (SEO, Performance, Accessibility, Best Practices) or None when the
    company has not been audited yet, and a flag telling whether the
    current user may run an audit.

    Access control:
    - Admin users can view the audit for any company
    - Regular users can only view the audit for their own company

    Args:
        slug: Company slug identifier

    Returns:
        Rendered seo_audit.html template, or a redirect to the dashboard
        when the company is missing or access is denied.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        # Admins may view any company, members only their own.
        if not current_user.is_admin and current_user.company_id != company.id:
            flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
            return redirect(url_for('dashboard'))

        # PostgreSQL sorts NULLs first in descending order, so without
        # nullslast() a never-audited analysis row would be picked ahead of
        # an audited one and the page would show "no audit".
        analysis = db.query(CompanyWebsiteAnalysis).filter(
            CompanyWebsiteAnalysis.company_id == company.id
        ).order_by(
            CompanyWebsiteAnalysis.seo_audited_at.desc().nullslast()
        ).first()

        # Only rows that carry an audit timestamp count as audited.
        seo_data = None
        if analysis and analysis.seo_audited_at:
            seo_data = {
                'seo_score': analysis.pagespeed_seo_score,
                'performance_score': analysis.pagespeed_performance_score,
                'accessibility_score': analysis.pagespeed_accessibility_score,
                'best_practices_score': analysis.pagespeed_best_practices_score,
                'audited_at': analysis.seo_audited_at,
                'audit_version': analysis.seo_audit_version,
                'url': analysis.website_url
            }

        # Admins and users belonging to the company may run an audit.
        can_audit = current_user.is_admin or current_user.company_id == company.id

        logger.info(f"SEO audit dashboard viewed by {current_user.email} for company: {company.name}")

        return render_template('seo_audit.html',
            company=company,
            seo_data=seo_data,
            can_audit=can_audit
        )
    finally:
        db.close()
# ============================================================
# SOCIAL MEDIA AUDIT USER-FACING DASHBOARD
# ============================================================
@app.route('/audit/social/<slug>')
@login_required
def social_audit_dashboard(slug):
    """
    User-facing Social Media audit dashboard for a specific company.

    Passes the template the stored social media profiles keyed by platform,
    the list of tracked platforms, and a presence score computed as the
    share of tracked platforms for which a profile is stored.

    Access control:
    - Admins can view all companies
    - Regular users can only view their own company

    Args:
        slug: Company URL slug

    Returns:
        Rendered social_audit.html template, or a redirect to the dashboard
        when the company is missing or access is denied.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        # Non-admins are limited to the company they belong to.
        if not current_user.is_admin and current_user.company_id != company.id:
            flash('Brak uprawnień do wyświetlenia audytu social media tej firmy.', 'error')
            return redirect(url_for('dashboard'))

        stored_profiles = (
            db.query(CompanySocialMedia)
            .filter(CompanySocialMedia.company_id == company.id)
            .all()
        )

        # Platforms that count toward the presence score.
        all_platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']

        # Later rows for the same platform replace earlier ones.
        profiles_dict = {
            entry.platform: {
                'url': entry.url,
                'is_valid': entry.is_valid,
                'check_status': entry.check_status,
                'page_name': entry.page_name,
                'followers_count': entry.followers_count,
                'verified_at': entry.verified_at,
                'last_checked_at': entry.last_checked_at
            }
            for entry in stored_profiles
        }

        total_platforms = len(all_platforms)
        platforms_with_profiles = sum(
            1 for platform in all_platforms if platform in profiles_dict
        )
        if total_platforms > 0:
            score = int((platforms_with_profiles / total_platforms) * 100)
        else:
            score = 0

        social_data = {
            'profiles': profiles_dict,
            'all_platforms': all_platforms,
            'platforms_count': platforms_with_profiles,
            'total_platforms': total_platforms,
            'score': score
        }

        # Admins and users belonging to the company may run an audit.
        can_audit = current_user.is_admin or current_user.company_id == company.id

        logger.info(f"Social Media audit dashboard viewed by {current_user.email} for company: {company.name}")

        return render_template(
            'social_audit.html',
            company=company,
            social_data=social_data,
            can_audit=can_audit
        )
    finally:
        db.close()
@app.route('/api/social/audit', methods=['POST'])
@login_required
@limiter.limit("10 per hour")
def api_social_audit_trigger():
    """
    API: Trigger a Social Media audit for a company.

    Runs SocialMediaAuditor (loaded from the scripts directory) for the
    company, saves the result through the auditor and returns a summary.

    Request JSON body (must be a JSON object):
    - company_id: Company ID (integer) OR
    - slug: Company slug (string)

    Responses:
    - 200: audit summary (platforms found, score, Google reviews, errors)
    - 400: missing/invalid JSON body or missing identifier
    - 403: non-admin user auditing a company other than their own
    - 404: company not found or inactive
    - 422: audit produced errors and no usable data
    - 500: unexpected error, or results could not be saved
    - 503: the auditor module could not be imported

    Rate limited to 10 requests per hour.
    """
    # The auditor lives in scripts/, which is not a package; make it importable.
    try:
        import sys
        from pathlib import Path

        scripts_dir = Path(__file__).parent / 'scripts'
        if str(scripts_dir) not in sys.path:
            sys.path.insert(0, str(scripts_dir))

        from social_media_audit import SocialMediaAuditor
    except ImportError as e:
        logger.error(f"Failed to import SocialMediaAuditor: {e}")
        return jsonify({
            'success': False,
            'error': 'Usługa audytu Social Media jest niedostępna. Sprawdź konfigurację serwera.'
        }), 503

    # silent=True keeps malformed JSON on the JSON error path below; the dict
    # check rejects bodies such as lists or strings that have no .get().
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
        }), 400

    company_id = data.get('company_id')
    slug = data.get('slug')

    if not company_id and not slug:
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy do audytu.'
        }), 400

    db = SessionLocal()
    try:
        # company_id takes precedence when both identifiers are given.
        if company_id:
            company = db.query(Company).filter_by(id=company_id, status='active').first()
        else:
            company = db.query(Company).filter_by(slug=slug, status='active').first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        # Admins may audit any company, members only their own.
        if not current_user.is_admin and current_user.company_id != company.id:
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień do audytu social media tej firmy.'
            }), 403

        logger.info(f"Social Media audit triggered by {current_user.email} for company: {company.name} (ID: {company.id})")

        # Plain dict handed to the auditor; the city falls back to 'Wejherowo'.
        company_dict = {
            'id': company.id,
            'name': company.name,
            'slug': company.slug,
            'website': company.website,
            'address_city': company.address_city or 'Wejherowo'
        }

        try:
            auditor = SocialMediaAuditor()
            audit_result = auditor.audit_company(company_dict)

            # Treat the audit as failed only when it reported errors AND
            # produced neither social media nor website data.
            if audit_result.get('errors') and not audit_result.get('social_media') and not audit_result.get('website'):
                return jsonify({
                    'success': False,
                    'error': f'Audyt nie powiódł się: {", ".join(audit_result["errors"][:3])}',
                    'company_id': company.id,
                    'company_name': company.name
                }), 422

            saved = auditor.save_audit_result(audit_result)
            if not saved:
                return jsonify({
                    'success': False,
                    'error': 'Audyt został wykonany, ale nie udało się zapisać wyników do bazy danych.',
                    'company_id': company.id,
                    'company_name': company.name
                }), 500

            # "or {}" also covers a 'social_media' key that is present but None.
            social_media_found = audit_result.get('social_media') or {}
            platforms_count = len(social_media_found)

            all_platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']
            score = int((platforms_count / len(all_platforms)) * 100)

            return jsonify({
                'success': True,
                'message': f'Audyt Social Media zakończony. Znaleziono {platforms_count} profili.',
                'company_id': company.id,
                'company_name': company.name,
                'profiles_found': platforms_count,
                'platforms': list(social_media_found.keys()),
                'score': score,
                'google_reviews': audit_result.get('google_reviews', {}),
                'errors': audit_result.get('errors') or None
            }), 200

        except Exception as e:
            # logger.exception keeps the traceback for diagnosis.
            logger.exception(f"Social Media audit error for company {company.id}: {e}")
            return jsonify({
                'success': False,
                'error': f'Błąd podczas audytu: {str(e)}'
            }), 500

    except Exception as e:
        logger.exception(f"Social Media audit error for company {slug or company_id}: {e}")
        db.rollback()
        return jsonify({
            'success': False,
            'error': f'Błąd podczas audytu: {str(e)}'
        }), 500
    finally:
        db.close()
# ============================================================
# GBP AUDIT USER-FACING DASHBOARD
# ============================================================
@app.route('/audit/gbp/<slug>')
@login_required
def gbp_audit_dashboard(slug):
    """
    User-facing GBP audit dashboard for a specific company.

    Loads the company's latest Google Business Profile audit and renders it.
    The page is rendered even when no audit exists yet; the template
    receives audit=None in that case.

    Access control:
    - Admin users can view the audit for any company
    - Regular users can only view the audit for their own company

    Args:
        slug: Company slug identifier

    Returns:
        Rendered gbp_audit.html template, or a redirect to the dashboard
        when the service is unavailable, the company is missing, or access
        is denied.
    """
    if not GBP_AUDIT_AVAILABLE:
        flash('Usługa audytu Google Business Profile jest tymczasowo niedostępna.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        # Admins may view any company, members only their own.
        if not current_user.is_admin and current_user.company_id != company.id:
            flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
            return redirect(url_for('dashboard'))

        # May be None: the template handles the "no audit yet" state.
        latest_audit = gbp_get_company_audit(db, company.id)

        # Admins and users belonging to the company may run an audit.
        may_run_audit = current_user.is_admin or current_user.company_id == company.id

        logger.info(f"GBP audit dashboard viewed by {current_user.email} for company: {company.name}")

        template_context = dict(
            company=company,
            audit=latest_audit,
            can_audit=may_run_audit,
            gbp_audit_available=GBP_AUDIT_AVAILABLE,
            gbp_audit_version=GBP_AUDIT_VERSION
        )
        return render_template('gbp_audit.html', **template_context)
    finally:
        db.close()
# ============================================================
# IT AUDIT USER-FACING DASHBOARD
# ============================================================
@app.route('/audit/it/<slug>')
@login_required
def it_audit_dashboard(slug):
    """
    User-facing IT infrastructure audit dashboard for a specific company.

    Loads the company's most recent IT audit (by audit date) and passes its
    scores, maturity level with a display label, technology flags, solution
    names, collaboration flags and recommendations to the template.
    audit_data is None when the company has no IT audit.

    Access control:
    - Admin users can view the audit for any company
    - Regular users can only view the audit for their own company

    Args:
        slug: Company slug identifier

    Returns:
        Rendered it_audit.html template, or a redirect to the dashboard
        when the company is missing or access is denied.
    """
    db = SessionLocal()
    try:
        from database import ITAudit

        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        # Admins may view any company, members only their own.
        if not current_user.is_admin and current_user.company_id != company.id:
            flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
            return redirect(url_for('dashboard'))

        audit = (
            db.query(ITAudit)
            .filter(ITAudit.company_id == company.id)
            .order_by(ITAudit.audit_date.desc())
            .first()
        )

        audit_data = None
        if audit:
            # Display labels for the stored maturity level codes.
            maturity_labels = {
                'basic': 'Podstawowy',
                'developing': 'Rozwijający się',
                'established': 'Ugruntowany',
                'advanced': 'Zaawansowany'
            }

            # Attributes copied one-to-one, split around the computed label
            # so the resulting dict keeps its key order.
            score_fields = (
                'id', 'overall_score', 'security_score',
                'collaboration_score', 'completeness_score', 'maturity_level',
            )
            detail_fields = (
                'audit_date', 'audit_source',
                # Technology flags
                'has_azure_ad', 'has_m365', 'has_google_workspace',
                'has_local_ad', 'has_edr', 'has_mfa', 'has_vpn',
                'has_proxmox_pbs', 'has_dr_plan', 'has_mdm',
                # Solutions
                'antivirus_solution', 'backup_solution',
                'monitoring_solution', 'virtualization_platform',
                # Collaboration flags
                'open_to_shared_licensing', 'open_to_backup_replication',
                'open_to_teams_federation', 'open_to_shared_monitoring',
                'open_to_collective_purchasing', 'open_to_knowledge_sharing',
                # Recommendations
                'recommendations',
            )

            audit_data = {field: getattr(audit, field) for field in score_fields}
            audit_data['maturity_label'] = maturity_labels.get(audit.maturity_level, 'Nieznany')
            for field in detail_fields:
                audit_data[field] = getattr(audit, field)

        # Admins and users belonging to the company may edit the audit.
        can_edit = current_user.is_admin or current_user.company_id == company.id

        logger.info(f"IT audit dashboard viewed by {current_user.email} for company: {company.name}")

        return render_template(
            'it_audit.html',
            company=company,
            audit_data=audit_data,
            can_edit=can_edit
        )
    finally:
        db.close()
@app.route('/api/check-email', methods=['POST'])
def api_check_email():
    """
    API: Check whether an email address is still available.

    Request JSON body: {"email": "<address>"}

    Responses:
    - 200: {"available": bool, "email": normalized address}
    - 400: missing/invalid JSON body, or an email that is empty, not a
      string, or rejected by validate_email
    """
    # A missing body, malformed JSON, a non-object body or a non-string
    # email all end up as an empty address and get the 400 below instead
    # of raising AttributeError.
    payload = request.get_json(silent=True)
    raw_email = payload.get('email') if isinstance(payload, dict) else None
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ''

    if not email or not validate_email(email):
        return jsonify({
            'available': False,
            'error': 'Nieprawidłowy format email'
        }), 400

    db = SessionLocal()
    try:
        existing_user = db.query(User).filter_by(email=email).first()
        return jsonify({
            'available': existing_user is None,
            'email': email
        })
    finally:
        db.close()
@app.route('/api/verify-nip', methods=['POST'])
def api_verify_nip():
    """
    API: Verify a NIP and report whether it belongs to an active company.

    Request JSON body: {"nip": "<10 digits>"}

    Responses:
    - 200: {"success": true, "is_member": bool, "company_name", "company_id"}
      (name and id are null when no active company has this NIP)
    - 400: missing/invalid JSON body, or a NIP that is not a 10-digit string
    """
    # A missing body, malformed JSON, a non-object body or a non-string NIP
    # all end up as an empty value and get the 400 below instead of raising
    # AttributeError.
    payload = request.get_json(silent=True)
    raw_nip = payload.get('nip') if isinstance(payload, dict) else None
    nip = raw_nip.strip() if isinstance(raw_nip, str) else ''

    if not nip or not re.match(r'^\d{10}$', nip):
        return jsonify({
            'success': False,
            'error': 'Nieprawidłowy format NIP'
        }), 400

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(nip=nip, status='active').first()
        return jsonify({
            'success': True,
            'is_member': company is not None,
            'company_name': company.name if company else None,
            'company_id': company.id if company else None
        })
    finally:
        db.close()
@app.route('/api/verify-krs', methods=['GET', 'POST'])
def api_verify_krs():
    """
    API: Look up company data in the KRS Open API (prs.ms.gov.pl).

    GET  /api/verify-krs?krs=0000817317
    POST /api/verify-krs with JSON body: {"krs": "0000817317"}

    The KRS number must be a string of 7-10 digits; it is left-padded with
    zeros to 10 digits before the lookup.

    Responses:
    - 200: KRS data, formatted address, and whether a company with this
      KRS number exists in the local database
    - 400: missing or malformed KRS number
    - 404: no entity with this KRS number in the registry
    - 500: unexpected error (details are logged, not returned)
    """
    if request.method == 'GET':
        krs = request.args.get('krs', '').strip()
    else:
        # Non-object bodies and non-string values are treated as missing so
        # they get the 400 below instead of raising AttributeError.
        payload = request.get_json(silent=True)
        raw_krs = payload.get('krs') if isinstance(payload, dict) else None
        krs = raw_krs.strip() if isinstance(raw_krs, str) else ''

    if not krs or not re.match(r'^\d{7,10}$', krs):
        return jsonify({
            'success': False,
            'error': 'Nieprawidłowy format KRS (wymagane 7-10 cyfr)'
        }), 400

    krs_normalized = krs.zfill(10)

    try:
        krs_data = krs_api_service.get_company_from_krs(krs_normalized)

        if krs_data is None:
            return jsonify({
                'success': False,
                'error': f'Nie znaleziono podmiotu o KRS {krs_normalized} w rejestrze',
                'krs': krs_normalized
            }), 404

        # Check whether a company with this KRS number exists locally.
        db = SessionLocal()
        try:
            our_company = db.query(Company).filter_by(krs=krs_normalized).first()
            is_member = our_company is not None
            company_id = our_company.id if our_company else None
        finally:
            db.close()

        return jsonify({
            'success': True,
            'krs': krs_normalized,
            'is_norda_member': is_member,
            'company_id': company_id,
            'data': krs_data.to_dict(),
            'formatted_address': krs_api_service.format_address(krs_data),
            'source': 'KRS Open API (prs.ms.gov.pl)',
            'note': 'Dane osobowe (imiona, nazwiska) są zanonimizowane w Open API'
        })

    except Exception:
        # This endpoint requires no login, so internal exception text must
        # not reach the client; the full traceback goes to the log instead.
        logger.exception(f"KRS lookup failed for KRS {krs_normalized}")
        return jsonify({
            'success': False,
            'error': 'Błąd podczas pobierania danych z KRS.'
        }), 500
@app.route('/api/company/<int:company_id>/refresh-krs', methods=['POST'])
@login_required
def api_refresh_company_krs(company_id):
    """
    API: Refresh company data from KRS Open API.

    Updates the company record (NIP, REGON, address, city) with official KRS
    data and stamps ``krs_verified_at``. Requires login.

    Args:
        company_id: Primary key of the Company row to refresh (from the URL).

    Returns:
        JSON with ``updates`` (every differing value found, including the
        informational ``kapital_zakladowy``), the raw ``krs_data`` and a
        ``message`` that counts only the fields actually written.
        404 when the company or its KRS record is missing, 400 when the
        company has no KRS number, 500 on any other error.

    NOTE(review): any authenticated user can refresh any company here, while
    the IT-audit endpoints in this file restrict non-admins to their own
    company -- confirm this is intended.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        if not company.krs:
            return jsonify({
                'success': False,
                'error': 'Firma nie ma numeru KRS'
            }), 400

        # Fetch data from KRS
        krs_data = krs_api_service.get_company_from_krs(company.krs)
        if krs_data is None:
            return jsonify({
                'success': False,
                'error': f'Nie znaleziono podmiotu o KRS {company.krs} w rejestrze'
            }), 404

        # Update company data (only non-personal data)
        updates = {}

        if krs_data.nip and krs_data.nip != company.nip:
            updates['nip'] = krs_data.nip
            company.nip = krs_data.nip

        if krs_data.regon:
            # Only the first 9 characters of the KRS REGON are stored
            regon_9 = krs_data.regon[:9]
            if regon_9 != company.regon:
                updates['regon'] = regon_9
                company.regon = regon_9

        # Update address if it differs from the formatted KRS address
        new_address = krs_api_service.format_address(krs_data)
        if new_address and new_address != company.address:
            updates['address'] = new_address
            company.address = new_address

        if krs_data.miejscowosc and krs_data.miejscowosc != company.city:
            updates['city'] = krs_data.miejscowosc
            company.city = krs_data.miejscowosc

        # Everything recorded so far was written to the Company row; count it
        # before adding informational-only entries so the message is accurate.
        persisted_count = len(updates)

        if krs_data.kapital_zakladowy:
            # Reported back to the caller but not stored on the Company row.
            # Note: Might need to add this field to Company model
            updates['kapital_zakladowy'] = krs_data.kapital_zakladowy

        # Update verification timestamp
        company.krs_verified_at = datetime.utcnow()
        db.commit()

        return jsonify({
            'success': True,
            'company_id': company_id,
            'updates': updates,
            'krs_data': krs_data.to_dict(),
            'message': f'Zaktualizowano {persisted_count} pól' if persisted_count else 'Dane są aktualne'
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error refreshing KRS data for company {company_id}: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas aktualizacji: {str(e)}'
        }), 500
    finally:
        db.close()
@app.route('/api/model-info', methods=['GET'])
def api_model_info():
    """API: Report the name and provider of the currently configured AI model.

    Responds with HTTP 500 when ``gemini_service.get_gemini_service()``
    yields no service object.
    """
    active_service = gemini_service.get_gemini_service()

    # Guard clause: nothing to report without an initialized service
    if not active_service:
        failure = {
            'success': False,
            'error': 'AI service not initialized'
        }
        return jsonify(failure), 500

    details = {
        'success': True,
        'model': active_service.model_name,
        'provider': 'Google Gemini'
    }
    return jsonify(details)
# ============================================================
# AI CHAT FEEDBACK & ANALYTICS
# ============================================================
@app.route('/api/chat/feedback', methods=['POST'])
@login_required
def chat_feedback():
    """API: Submit feedback for an AI response.

    Expects a JSON object with:
        message_id: id of the AIChatMessage being rated (required)
        rating: 1 = thumbs down, 2 = thumbs up (required)
        comment, is_helpful, is_accurate, found_company,
        original_query, expected_companies: optional details

    The message must belong to a conversation owned by the current user.
    The rating is stored on the message itself; when ``is_helpful`` or
    ``comment`` is supplied, an AIChatFeedback row is created or updated.

    Returns:
        200 ``{'success': True}`` on success; 400 for a missing/non-object
        body or invalid fields; 404 when the message does not exist; 403 when
        it belongs to another user's conversation; 500 on unexpected errors.
    """
    # silent=True: a missing, malformed or non-JSON body yields None instead of
    # raising inside the handler, so bad input is reported as 400 rather than 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'Invalid data'}), 400

    try:
        message_id = data.get('message_id')
        rating = data.get('rating')  # 1 = thumbs down, 2 = thumbs up

        if not message_id or rating not in [1, 2]:
            return jsonify({'success': False, 'error': 'Invalid data'}), 400

        db = SessionLocal()
        try:
            # Verify message exists and belongs to user's conversation
            message = db.query(AIChatMessage).filter_by(id=message_id).first()
            if not message:
                return jsonify({'success': False, 'error': 'Message not found'}), 404

            conversation = db.query(AIChatConversation).filter_by(
                id=message.conversation_id,
                user_id=current_user.id
            ).first()
            if not conversation:
                return jsonify({'success': False, 'error': 'Not authorized'}), 403

            # Update message feedback
            message.feedback_rating = rating
            message.feedback_at = datetime.now()
            message.feedback_comment = data.get('comment', '')

            # Create detailed feedback record if provided
            if data.get('is_helpful') is not None or data.get('comment'):
                existing_feedback = db.query(AIChatFeedback).filter_by(message_id=message_id).first()
                if existing_feedback:
                    existing_feedback.rating = rating
                    existing_feedback.is_helpful = data.get('is_helpful')
                    existing_feedback.is_accurate = data.get('is_accurate')
                    existing_feedback.found_company = data.get('found_company')
                    existing_feedback.comment = data.get('comment')
                else:
                    feedback = AIChatFeedback(
                        message_id=message_id,
                        user_id=current_user.id,
                        rating=rating,
                        is_helpful=data.get('is_helpful'),
                        is_accurate=data.get('is_accurate'),
                        found_company=data.get('found_company'),
                        comment=data.get('comment'),
                        original_query=data.get('original_query'),
                        expected_companies=data.get('expected_companies')
                    )
                    db.add(feedback)

            db.commit()
            logger.info(f"Feedback received: message_id={message_id}, rating={rating}")
            return jsonify({'success': True})
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Error saving feedback: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/admin/chat-analytics')
@login_required
def chat_analytics():
    """Render the admin chat analytics dashboard.

    Non-admin users are redirected to the dashboard with a flash message.
    The template receives conversation/message totals, feedback counts,
    the satisfaction rate (rounded to one decimal), the 20 most recently
    rated messages and the 50 most recent user messages.
    """
    # Only admins can access
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from sqlalchemy import func, desc

        context = {}

        # Basic stats
        context['total_conversations'] = db.query(AIChatConversation).count()
        context['total_messages'] = db.query(AIChatMessage).count()
        context['total_user_messages'] = db.query(AIChatMessage).filter_by(role='user').count()

        # Feedback stats
        rated_total = db.query(AIChatMessage).filter(
            AIChatMessage.feedback_rating.isnot(None)
        ).count()
        thumbs_up = db.query(AIChatMessage).filter_by(feedback_rating=2).count()
        thumbs_down = db.query(AIChatMessage).filter_by(feedback_rating=1).count()
        context['feedback_count'] = rated_total
        context['positive_feedback'] = thumbs_up
        context['negative_feedback'] = thumbs_down

        # Recent conversations with feedback
        rated_messages = db.query(AIChatMessage).filter(
            AIChatMessage.feedback_rating.isnot(None)
        )
        context['recent_feedback'] = rated_messages.order_by(
            desc(AIChatMessage.feedback_at)
        ).limit(20).all()

        # Popular queries (user messages)
        user_messages = db.query(AIChatMessage).filter_by(role='user')
        context['recent_queries'] = user_messages.order_by(
            desc(AIChatMessage.created_at)
        ).limit(50).all()

        # Calculate satisfaction rate (share of positive ratings, in percent)
        if rated_total > 0:
            satisfaction = thumbs_up / rated_total * 100
        else:
            satisfaction = 0
        context['satisfaction_rate'] = round(satisfaction, 1)

        return render_template('admin/chat_analytics.html', **context)
    finally:
        db.close()
@app.route('/api/admin/chat-stats')
@login_required
def api_chat_stats():
    """API: Get chat statistics for dashboard.

    Admin only (403 otherwise). Returns the number of user messages per
    calendar day for the last 7 days as
    ``{'success': True, 'daily_queries': [{'date': ..., 'count': ...}, ...]}``.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    db = SessionLocal()
    try:
        from sqlalchemy import func

        # Stats for last 7 days
        week_ago = datetime.now() - timedelta(days=7)
        daily_stats = db.query(
            func.date(AIChatMessage.created_at).label('date'),
            func.count(AIChatMessage.id).label('count')
        ).filter(
            AIChatMessage.created_at >= week_ago,
            AIChatMessage.role == 'user'
        ).group_by(
            func.date(AIChatMessage.created_at)
        ).order_by('date').all()

        # Unpack rows positionally: attribute access via ``row.count`` can
        # resolve to the row's sequence ``count`` method instead of the
        # labelled column, which would not be JSON serializable.
        daily_queries = [
            {'date': str(day), 'count': message_count}
            for day, message_count in daily_stats
        ]

        return jsonify({
            'success': True,
            'daily_queries': daily_queries
        })
    finally:
        db.close()
# ============================================================
# DEBUG PANEL (Admin only)
# ============================================================
@app.route('/admin/debug')
@login_required
def debug_panel():
    """Serve the real-time debug panel page to admins.

    Anyone else gets a flash message and a redirect to the dashboard.
    """
    if current_user.is_admin:
        return render_template('admin/debug.html')

    flash('Brak uprawnień do tej strony.', 'error')
    return redirect(url_for('dashboard'))
@app.route('/api/admin/logs')
@login_required
def api_get_logs():
    """API: Get recent logs from the in-memory debug buffer.

    Admin only (403 otherwise).

    Query parameters:
        level: keep only entries with this level (DEBUG, INFO, WARNING, ERROR);
               compared case-insensitively.
        since: keep only entries whose ISO timestamp string sorts after this value.
        limit: maximum number of entries to return, clamped to 0-500.
               Missing or non-numeric values fall back to 100.

    Returns:
        JSON with the selected ``logs`` (most recent last) and ``total``,
        the current size of the whole buffer.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    # Get optional filters
    level = request.args.get('level', '')  # DEBUG, INFO, WARNING, ERROR
    since = request.args.get('since', '')  # ISO timestamp

    # type=int makes a non-numeric value fall back to the default instead of
    # raising; the clamp keeps negative values from slicing from the wrong end.
    limit = request.args.get('limit', 100, type=int)
    limit = max(0, min(limit, 500))

    logs = list(debug_handler.logs)

    # Filter by level
    if level:
        wanted_level = level.upper()
        logs = [entry for entry in logs if entry['level'] == wanted_level]

    # Filter by timestamp
    if since:
        logs = [entry for entry in logs if entry['timestamp'] > since]

    # Return most recent; logs[-0:] would be the whole list, so handle 0 explicitly
    logs = logs[-limit:] if limit else []

    return jsonify({
        'success': True,
        'logs': logs,
        'total': len(debug_handler.logs)
    })
@app.route('/api/admin/logs/stream')
@login_required
def api_logs_stream():
    """SSE endpoint for real-time log streaming.

    Admin only (403 otherwise). Every 0.5 s the generator snapshots
    ``debug_handler.logs`` and emits each entry not yet sent as a
    ``data: <json>`` server-sent event. The first pass sends the whole
    current buffer.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    import time

    def generate():
        # Progress is tracked by the identity of the last entry sent, not by
        # buffer length: the length of a bounded buffer stops growing once it
        # is full, and it shrinks when the buffer is cleared, so a length
        # comparison would silently stop emitting in both cases.
        last_sent = None
        while True:
            snapshot = list(debug_handler.logs)

            start = 0
            if last_sent is not None:
                # Search from the newest end; the marker is usually near it.
                # If it is gone (rotated out or cleared), resend the snapshot.
                for idx in range(len(snapshot) - 1, -1, -1):
                    if snapshot[idx] is last_sent:
                        start = idx + 1
                        break

            # Send new logs
            for log in snapshot[start:]:
                yield f"data: {json.dumps(log)}\n\n"

            if snapshot:
                last_sent = snapshot[-1]

            time.sleep(0.5)

    return Response(generate(), mimetype='text/event-stream')
@app.route('/api/admin/logs/clear', methods=['POST'])
@login_required
def api_clear_logs():
    """API: Empty the in-memory log buffer (admin only, 403 otherwise).

    A log line noting the clear is written afterwards.
    """
    if current_user.is_admin:
        debug_handler.logs.clear()
        logger.info("Log buffer cleared by admin")
        return jsonify({'success': True})

    return jsonify({'success': False, 'error': 'Not authorized'}), 403
@app.route('/api/admin/test-log', methods=['POST'])
@login_required
def api_test_log():
    """API: Emit one test log entry at each of DEBUG, INFO, WARNING and ERROR.

    Admin only; other users receive a 403 JSON response.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    # One (logging method, message) pair per level, emitted in rising severity
    samples = (
        (logger.debug, "Test DEBUG message"),
        (logger.info, "Test INFO message"),
        (logger.warning, "Test WARNING message"),
        (logger.error, "Test ERROR message"),
    )
    for emit, text in samples:
        emit(text)

    return jsonify({'success': True, 'message': 'Test logs generated'})
@app.route('/admin/digital-maturity')
@login_required
def digital_maturity_dashboard():
    """Render the admin dashboard of digital maturity assessment results.

    Non-admins are redirected to the dashboard. Only companies that have both
    a maturity row and a website-analysis row, with an overall score above 0,
    are included; rows arrive ordered by overall score, highest first.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from sqlalchemy import func, desc

        # Get all companies with maturity data
        selected_columns = (
            Company.id,
            Company.name,
            Company.slug,
            Company.website,
            CompanyDigitalMaturity.overall_score,
            CompanyDigitalMaturity.online_presence_score,
            CompanyDigitalMaturity.sales_readiness,
            CompanyDigitalMaturity.total_opportunity_value,
            CompanyWebsiteAnalysis.opportunity_score,
            CompanyWebsiteAnalysis.has_blog,
            CompanyWebsiteAnalysis.has_portfolio,
            CompanyWebsiteAnalysis.has_contact_form,
            CompanyWebsiteAnalysis.content_richness_score,
            CompanyDigitalMaturity.critical_gaps,
            CompanyWebsiteAnalysis.missing_features,
        )
        rows = db.query(*selected_columns).join(
            CompanyDigitalMaturity, Company.id == CompanyDigitalMaturity.company_id
        ).join(
            CompanyWebsiteAnalysis, Company.id == CompanyWebsiteAnalysis.company_id
        ).filter(
            CompanyDigitalMaturity.overall_score > 0
        ).order_by(
            desc(CompanyDigitalMaturity.overall_score)
        ).all()

        def opportunity_value(row):
            # Missing opportunity values count as zero
            return float(row.total_opportunity_value or 0)

        # Calculate stats
        analyzed = len(rows)
        if analyzed:
            mean_score = round(sum(row.overall_score for row in rows) / analyzed, 1)
        else:
            mean_score = 0
        opportunity_sum = sum(opportunity_value(row) for row in rows)
        warm_total = len([row for row in rows if row.sales_readiness == 'warm'])
        cold_total = len([row for row in rows if row.sales_readiness == 'cold'])

        # Top 10 and bottom 10 (rows are already sorted best-first)
        best_ten = rows[:10]
        worst_ten = sorted(rows, key=lambda row: row.overall_score)[:10]

        # Top opportunities
        biggest_opportunities = sorted(rows, key=opportunity_value, reverse=True)[:10]

        context = {
            'total_analyzed': analyzed,
            'avg_score': mean_score,
            'total_opportunity': opportunity_sum,
            'warm_leads_count': warm_total,
            'cold_leads_count': cold_total,
            'top_performers': best_ten,
            'bottom_performers': worst_ten,
            'top_opportunities': biggest_opportunities,
            'all_companies': rows,
        }
        return render_template('admin/digital_maturity.html', **context)
    finally:
        db.close()
@app.route('/admin/social-media')
@login_required
def admin_social_media():
    """Render the admin social media analytics dashboard.

    Non-admins are redirected to the dashboard. The template receives
    per-platform counts, the split of companies with and without valid
    profiles, companies grouped by platform combination, single-platform and
    all-major-platform lists, every profile entry with its company, and
    counts of profiles verified within 30 days / not verified for 90 days.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from sqlalchemy import func, case, distinct
        from database import CompanySocialMedia

        # Total counts per platform
        platform_stats = db.query(
            CompanySocialMedia.platform,
            func.count(CompanySocialMedia.id).label('count'),
            func.count(distinct(CompanySocialMedia.company_id)).label('companies')
        ).filter(
            CompanySocialMedia.is_valid == True
        ).group_by(CompanySocialMedia.platform).all()

        # Companies with each platform combination
        company_platforms = db.query(
            Company.id,
            Company.name,
            Company.slug,
            func.array_agg(distinct(CompanySocialMedia.platform)).label('platforms')
        ).outerjoin(
            CompanySocialMedia,
            (Company.id == CompanySocialMedia.company_id) & (CompanySocialMedia.is_valid == True)
        ).group_by(Company.id, Company.name, Company.slug).all()

        # Analysis: a company without valid profiles shows up with an empty
        # platform list or one whose first element is None
        companies_with_sm = []
        companies_without_sm = []
        for row in company_platforms:
            has_profiles = row.platforms and row.platforms[0] is not None
            if has_profiles:
                companies_with_sm.append(row)
            else:
                companies_without_sm.append(row)

        # Platform combinations
        grouped = {}
        for row in companies_with_sm:
            names = sorted([p for p in row.platforms if p]) if row.platforms else []
            label = ', '.join(names) if names else 'Brak'
            grouped.setdefault(label, []).append(
                {'id': row.id, 'name': row.name, 'slug': row.slug}
            )

        # Sort by number of companies (descending)
        ranked = sorted(grouped.items(), key=lambda item: len(item[1]), reverse=True)
        platform_combos = dict(ranked)

        def exclusively_on(platform):
            # Companies whose only platform is the given one
            return [row for row in companies_with_sm if set(row.platforms) == {platform}]

        # Has all major (FB + LI + IG)
        major_platforms = {'facebook', 'linkedin', 'instagram'}
        has_all_major = [
            row for row in companies_with_sm
            if major_platforms.issubset(set(row.platforms or []))
        ]

        # Get all social media entries with company info for detailed view
        all_entries = db.query(
            CompanySocialMedia,
            Company.name.label('company_name'),
            Company.slug.label('company_slug')
        ).join(Company).order_by(
            Company.name, CompanySocialMedia.platform
        ).all()

        # Freshness analysis
        from datetime import datetime, timedelta
        now = datetime.now()
        fresh_cutoff = now - timedelta(days=30)
        fresh_30d = db.query(func.count(CompanySocialMedia.id)).filter(
            CompanySocialMedia.verified_at >= fresh_cutoff
        ).scalar()
        stale_cutoff = now - timedelta(days=90)
        stale_90d = db.query(func.count(CompanySocialMedia.id)).filter(
            CompanySocialMedia.verified_at < stale_cutoff
        ).scalar()

        context = {
            'platform_stats': platform_stats,
            'total_companies': len(company_platforms),
            'companies_with_sm': len(companies_with_sm),
            'companies_without_sm': companies_without_sm,
            'platform_combos': platform_combos,
            'only_facebook': exclusively_on('facebook'),
            'only_linkedin': exclusively_on('linkedin'),
            'only_instagram': exclusively_on('instagram'),
            'has_all_major': has_all_major,
            'all_entries': all_entries,
            'fresh_30d': fresh_30d,
            'stale_90d': stale_90d,
            'now': now,
        }
        return render_template('admin/social_media.html', **context)
    finally:
        db.close()
# ============================================================
# SOCIAL MEDIA AUDIT ADMIN DASHBOARD
# ============================================================
@app.route('/admin/social-audit')
@login_required
def admin_social_audit():
    """
    Admin dashboard for Social Media audit overview.

    Displays:
    - Summary stats (coverage per platform, total profiles)
    - Platform coverage with progress bars
    - Sortable table with platform icons per company
    - Followers aggregate statistics

    Only active companies and profiles flagged ``is_valid`` are considered.
    Non-admins are redirected to the dashboard.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from sqlalchemy import func
        from database import CompanySocialMedia, Category

        # Platform definitions
        platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']

        # Total companies count
        total_companies = db.query(func.count(Company.id)).filter(Company.status == 'active').scalar()

        # Get all companies with their social media profiles
        companies_query = db.query(
            Company.id,
            Company.name,
            Company.slug,
            Company.website,
            Category.name.label('category_name')
        ).outerjoin(
            Category,
            Company.category_id == Category.id
        ).filter(
            Company.status == 'active'
        ).order_by(Company.name).all()

        # Get social media data per company
        social_data = db.query(
            CompanySocialMedia.company_id,
            CompanySocialMedia.platform,
            CompanySocialMedia.url,
            CompanySocialMedia.followers_count,
            CompanySocialMedia.verified_at,
            CompanySocialMedia.is_valid
        ).filter(
            CompanySocialMedia.is_valid == True
        ).all()

        # Group social media by company: {company_id: {platform: details}}
        company_social = {}
        for sm in social_data:
            company_social.setdefault(sm.company_id, {})[sm.platform] = {
                'url': sm.url,
                'followers': sm.followers_count or 0,
                'verified_at': sm.verified_at
            }

        # Build companies list with social media info
        companies = []
        for row in companies_query:
            sm_data = company_social.get(row.id, {})
            total_followers = sum(p.get('followers', 0) for p in sm_data.values())
            platform_count = len(sm_data)

            # Get last verified date across all platforms
            verified_dates = [p.get('verified_at') for p in sm_data.values() if p.get('verified_at')]
            last_verified = max(verified_dates) if verified_dates else None

            companies.append({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'platforms': sm_data,
                'platform_count': platform_count,
                'total_followers': total_followers,
                'last_verified': last_verified,
                'has_facebook': 'facebook' in sm_data,
                'has_instagram': 'instagram' in sm_data,
                'has_linkedin': 'linkedin' in sm_data,
                'has_youtube': 'youtube' in sm_data,
                'has_twitter': 'twitter' in sm_data,
                'has_tiktok': 'tiktok' in sm_data
            })

        # Platform statistics, derived from the active-company list built above.
        # Counting here (rather than over every profile row in the table) keeps
        # the numerator restricted to the same active companies that make up
        # total_companies, and avoids one extra query per platform.
        platform_stats = {}
        for platform in platforms:
            count = sum(1 for c in companies if platform in c['platforms'])
            platform_stats[platform] = {
                'count': count,
                'percent': round(count / total_companies * 100) if total_companies > 0 else 0
            }

        # Summary stats
        companies_with_sm = len([c for c in companies if c['platform_count'] > 0])
        companies_without_sm = total_companies - companies_with_sm
        total_profiles = sum(c['platform_count'] for c in companies)
        total_followers = sum(c['total_followers'] for c in companies)

        # Top followers (top 10 companies by total followers)
        top_followers = sorted([c for c in companies if c['total_followers'] > 0],
                               key=lambda x: x['total_followers'], reverse=True)[:10]

        stats = {
            'total_companies': total_companies,
            'companies_with_sm': companies_with_sm,
            'companies_without_sm': companies_without_sm,
            'total_profiles': total_profiles,
            'total_followers': total_followers,
            'platform_stats': platform_stats
        }

        # Get unique categories
        categories = sorted(set(c['category'] for c in companies if c['category']))

        # Convert to objects for template (attribute access instead of dict keys)
        class CompanyRow:
            def __init__(self, data):
                for key, value in data.items():
                    setattr(self, key, value)

        companies_objects = [CompanyRow(c) for c in companies]
        top_followers_objects = [CompanyRow(c) for c in top_followers]

        return render_template('admin/social_audit_dashboard.html',
            companies=companies_objects,
            stats=stats,
            categories=categories,
            platforms=platforms,
            top_followers=top_followers_objects,
            now=datetime.now()
        )
    finally:
        db.close()
# ============================================================
# IT AUDIT ADMIN DASHBOARD
# ============================================================
@app.route('/admin/it-audit')
@login_required
def admin_it_audit():
    """
    Admin dashboard for IT audit overview.

    Displays:
    - Summary stats (audit count, average scores, maturity distribution)
    - Technology adoption stats (Azure AD, M365, PBS, Zabbix, EDR, DR)
    - Collaboration flags distribution
    - Company table with IT audit data
    - Collaboration matches matrix

    Access: Admin only (others are redirected to the dashboard).
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from sqlalchemy import func, distinct

        # Import IT audit models and service
        from database import ITAudit, ITCollaborationMatch
        from it_audit_service import get_maturity_level_label

        # Get all active companies with their latest IT audit
        # Using subquery to get only the latest audit per company
        latest_audit_subq = db.query(
            ITAudit.company_id,
            func.max(ITAudit.audit_date).label('max_date')
        ).group_by(ITAudit.company_id).subquery()

        companies_query = db.query(
            Company.id,
            Company.name,
            Company.slug,
            ITAudit.id.label('audit_id'),
            ITAudit.overall_score,
            ITAudit.security_score,
            ITAudit.collaboration_score,
            ITAudit.completeness_score,
            ITAudit.maturity_level,
            ITAudit.audit_date,
            ITAudit.has_azure_ad,
            ITAudit.has_m365,
            ITAudit.has_proxmox_pbs,
            ITAudit.monitoring_solution,
            ITAudit.has_edr,
            ITAudit.has_dr_plan
        ).outerjoin(
            latest_audit_subq,
            Company.id == latest_audit_subq.c.company_id
        ).outerjoin(
            ITAudit,
            (Company.id == ITAudit.company_id) &
            (ITAudit.audit_date == latest_audit_subq.c.max_date)
        ).filter(
            Company.status == 'active'
        ).order_by(
            Company.name
        ).all()

        # Build companies list with named attributes for template
        companies = []
        for row in companies_query:
            # Detect Zabbix from monitoring_solution field
            has_zabbix = row.monitoring_solution and 'zabbix' in str(row.monitoring_solution).lower()

            companies.append({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'audit_id': row.audit_id,
                'overall_score': row.overall_score,
                'security_score': row.security_score,
                'collaboration_score': row.collaboration_score,
                'completeness_score': row.completeness_score,
                'maturity_level': row.maturity_level,
                'maturity_label': get_maturity_level_label(row.maturity_level) if row.maturity_level else None,
                'audit_date': row.audit_date,
                'has_azure_ad': row.has_azure_ad,
                'has_m365': row.has_m365,
                'has_proxmox_pbs': row.has_proxmox_pbs,
                'has_zabbix': has_zabbix,
                'has_edr': row.has_edr,
                'has_dr_plan': row.has_dr_plan
            })

        # Calculate statistics: a company counts as audited when it has an overall score
        audited_companies = [c for c in companies if c['overall_score'] is not None]
        not_audited = [c for c in companies if c['overall_score'] is None]

        # Maturity distribution (levels outside these four are not counted)
        maturity_counts = {
            'basic': 0,
            'developing': 0,
            'established': 0,
            'advanced': 0
        }
        for c in audited_companies:
            level = c['maturity_level']
            if level in maturity_counts:
                maturity_counts[level] += 1

        # Calculate average scores (missing security/collaboration scores count as 0)
        if audited_companies:
            audited_total = len(audited_companies)
            avg_overall = round(sum(c['overall_score'] for c in audited_companies) / audited_total)
            avg_security = round(sum(c['security_score'] or 0 for c in audited_companies) / audited_total)
            avg_collaboration = round(sum(c['collaboration_score'] or 0 for c in audited_companies) / audited_total)
        else:
            avg_overall = None
            avg_security = None
            avg_collaboration = None

        # Technology adoption stats
        tech_stats = {
            'azure_ad': len([c for c in audited_companies if c['has_azure_ad']]),
            'm365': len([c for c in audited_companies if c['has_m365']]),
            'proxmox_pbs': len([c for c in audited_companies if c['has_proxmox_pbs']]),
            'zabbix': len([c for c in audited_companies if c['has_zabbix']]),
            'edr': len([c for c in audited_companies if c['has_edr']]),
            'dr_plan': len([c for c in audited_companies if c['has_dr_plan']])
        }

        # Collaboration flags stats from latest audits
        collab_stats = {}
        if audited_companies:
            collab_flags = [
                'open_to_shared_licensing',
                'open_to_backup_replication',
                'open_to_teams_federation',
                'open_to_shared_monitoring',
                'open_to_collective_purchasing',
                'open_to_knowledge_sharing'
            ]
            # The id list is the same for every flag, so build it once
            latest_audit_ids = [c['audit_id'] for c in audited_companies if c['audit_id']]
            for flag in collab_flags:
                count = db.query(func.count(ITAudit.id)).filter(
                    ITAudit.id.in_(latest_audit_ids),
                    getattr(ITAudit, flag) == True
                ).scalar()
                collab_stats[flag] = count

        # Get collaboration matches with both companies' info
        matches = db.query(ITCollaborationMatch).order_by(
            ITCollaborationMatch.match_score.desc()
        ).all()

        # Load every company referenced by a match in a single query instead
        # of issuing two lookups per match.
        match_company_ids = {
            company_ref
            for match in matches
            for company_ref in (match.company_a_id, match.company_b_id)
            if company_ref is not None
        }
        companies_by_id = {}
        if match_company_ids:
            companies_by_id = {
                company.id: company
                for company in db.query(Company).filter(
                    Company.id.in_(list(match_company_ids))
                ).all()
            }

        # Build flat list of collaboration matches with all necessary attributes
        class CollabMatchRow:
            """Helper class for template attribute access"""
            def __init__(self, **kwargs):
                for key, value in kwargs.items():
                    setattr(self, key, value)

        collaboration_matches = []
        for match in matches:
            # Get company A and B info (None when the referenced row is missing)
            company_a = companies_by_id.get(match.company_a_id)
            company_b = companies_by_id.get(match.company_b_id)

            collaboration_matches.append(CollabMatchRow(
                id=match.id,
                match_type=match.match_type,
                company_a_id=match.company_a_id,
                company_a_name=company_a.name if company_a else 'Nieznana',
                company_a_slug=company_a.slug if company_a else '',
                company_b_id=match.company_b_id,
                company_b_name=company_b.name if company_b else 'Nieznana',
                company_b_slug=company_b.slug if company_b else '',
                match_reason=match.match_reason,
                match_score=match.match_score,
                status=match.status,
                created_at=match.created_at
            ))

        stats = {
            # Main stats
            'total_audits': len(audited_companies),
            'total_companies': len(companies),
            'companies_without_audit': len(not_audited),

            # Score averages
            'avg_overall_score': avg_overall,
            'avg_security_score': avg_security,
            'avg_collaboration_score': avg_collaboration,

            # Maturity distribution (flattened for template)
            'maturity_basic': maturity_counts['basic'],
            'maturity_developing': maturity_counts['developing'],
            'maturity_established': maturity_counts['established'],
            'maturity_advanced': maturity_counts['advanced'],

            # Technology adoption stats (matching template naming with has_* prefix)
            'has_azure_ad': tech_stats['azure_ad'],
            'has_m365': tech_stats['m365'],
            'has_proxmox_pbs': tech_stats['proxmox_pbs'],
            'has_zabbix': tech_stats['zabbix'],
            'has_edr': tech_stats['edr'],
            'has_dr_plan': tech_stats['dr_plan'],

            # Collaboration flags
            'open_to_shared_licensing': collab_stats.get('open_to_shared_licensing', 0),
            'open_to_backup_replication': collab_stats.get('open_to_backup_replication', 0),
            'open_to_teams_federation': collab_stats.get('open_to_teams_federation', 0),
            'open_to_shared_monitoring': collab_stats.get('open_to_shared_monitoring', 0),
            'open_to_collective_purchasing': collab_stats.get('open_to_collective_purchasing', 0),
            'open_to_knowledge_sharing': collab_stats.get('open_to_knowledge_sharing', 0),

            # Legacy nested structures (for any templates that still use them)
            'maturity_counts': maturity_counts,
            'tech_stats': tech_stats,
            'collab_stats': collab_stats,
            'total_matches': len(collaboration_matches)
        }

        # Convert companies list to objects with attribute access for template
        class CompanyRow:
            def __init__(self, data):
                for key, value in data.items():
                    setattr(self, key, value)

        companies_objects = [CompanyRow(c) for c in companies]

        return render_template('admin/it_audit_dashboard.html',
            companies=companies_objects,
            stats=stats,
            collaboration_matches=collaboration_matches,
            now=datetime.now()
        )
    finally:
        db.close()
# ============================================================
# IT AUDIT FORM
# ============================================================
@app.route('/it-audit/form')
@login_required
def it_audit_form():
    """
    IT Audit form for data collection.

    Displays a 9-section form for collecting IT infrastructure data:
    - IT Contact
    - Cloud & Identity
    - Server Infrastructure
    - Endpoints
    - Security
    - Backup & DR
    - Monitoring
    - Business Apps
    - Collaboration

    Query parameters:
        company_id (int, optional): Company ID to audit. If not provided,
            defaults to current user's company.

    Access control:
        - Admin users can access form for any company
        - Regular users can only access form for their own company

    Returns:
        Rendered it_audit_form.html template with the company and its most
        recent audit (if any), or a redirect with a flash message when the
        company cannot be determined, is not found/active, or is off limits.
    """
    db = SessionLocal()
    try:
        from database import ITAudit, Company

        # Explicit query parameter wins; otherwise fall back to the user's own company
        company_id = request.args.get('company_id', type=int) or current_user.company_id

        if not company_id:
            if current_user.is_admin:
                # Admin without specific company_id should redirect to admin dashboard
                flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info')
                return redirect(url_for('admin_it_audit'))
            flash('Nie jesteś przypisany do żadnej firmy.', 'error')
            return redirect(url_for('dashboard'))

        # Find company (must be active)
        company_lookup = db.query(Company).filter(
            Company.id == company_id,
            Company.status == 'active'
        )
        company = company_lookup.first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        # Access control: admin can access any company, users only their own
        is_foreign_company = current_user.company_id != company.id
        if not current_user.is_admin and is_foreign_company:
            flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error')
            return redirect(url_for('dashboard'))

        # Get latest audit for this company (for pre-filling the form)
        audits_newest_first = db.query(ITAudit).filter(
            ITAudit.company_id == company.id
        ).order_by(
            ITAudit.audit_date.desc()
        )
        audit = audits_newest_first.first()

        logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}")

        return render_template('it_audit_form.html', company=company, audit=audit)
    finally:
        db.close()
@app.route('/it-audit/save', methods=['POST'])
@login_required
@limiter.limit("30 per hour")
def it_audit_save():
    """
    Save IT audit form data with automatic scoring.

    This endpoint saves IT infrastructure audit data from the form via
    ITAuditService.save_audit and reports the resulting security,
    collaboration, completeness and overall scores.

    Request body (JSON object or form data):
        - company_id: Company ID (integer); falls back to the current
          user's company when omitted
        - All audit fields from the 9-section form

    Returns:
        - Success (200): Audit results with scores and redirect URL
        - Error: JSON error message with status 400 (missing/invalid data),
          403 (not allowed for this company), 404 (company not found or
          inactive) or 500 (unexpected failure)

    Access:
        - Members can save audits for their own company
        - Admins can save audits for any company

    Rate limited to 30 requests per hour.
    """
    from database import ITAudit, Company
    from it_audit_service import ITAuditService

    # Parse request data (supports both JSON and form data)
    if request.is_json:
        # silent=True: malformed JSON yields None and gets the JSON 400 below
        data = request.get_json(silent=True)
    else:
        data = request.form.to_dict(flat=True)

    # A JSON body that is not an object (list, string, number) has no fields to read
    if not data or not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Brak danych w żądaniu.'
        }), 400

    # Get company_id
    company_id = data.get('company_id')
    if company_id:
        try:
            company_id = int(company_id)
        except (ValueError, TypeError):
            return jsonify({
                'success': False,
                'error': 'Nieprawidłowy identyfikator firmy.'
            }), 400
    else:
        # Use current user's company if not specified
        if current_user.company_id:
            company_id = current_user.company_id
        else:
            return jsonify({
                'success': False,
                'error': 'Podaj company_id firmy do audytu.'
            }), 400

    db = SessionLocal()
    try:
        # Find company
        company = db.query(Company).filter(
            Company.id == company_id,
            Company.status == 'active'
        ).first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        # Access control: admin can save for any company, users only their own
        if not current_user.is_admin and current_user.company_id != company.id:
            return jsonify({
                'success': False,
                'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.'
            }), 403

        # Parse form data into audit_data dictionary
        audit_data = _parse_it_audit_form_data(data)
        audit_data['audited_by'] = current_user.id
        audit_data['audit_source'] = 'form'

        # Save audit using service
        service = ITAuditService(db)
        audit = service.save_audit(company_id, audit_data)

        # A missing completeness score is treated as 0% so that the comparisons
        # below cannot fail on None after the audit has already been saved.
        completeness = audit.completeness_score or 0

        # Check if this is a partial submission (completeness < 100)
        is_partial = completeness < 100

        # Count audits for this company (to indicate if history exists)
        audit_history_count = db.query(ITAudit).filter(
            ITAudit.company_id == company_id
        ).count()

        logger.info(
            f"IT audit saved by {current_user.email} for company {company.name}: "
            f"overall={audit.overall_score}, security={audit.security_score}, "
            f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}"
            f"{' (partial)' if is_partial else ''}"
        )

        # Build appropriate success message
        if is_partial:
            if completeness < 30:
                message = f'Audyt IT został zapisany. Formularz wypełniony w {completeness}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.'
            elif completeness < 70:
                message = f'Audyt IT został zapisany. Wypełniono {completeness}% formularza. Rozważ uzupełnienie pozostałych sekcji.'
            else:
                message = f'Audyt IT został zapisany. Formularz prawie kompletny ({completeness}%).'
        else:
            message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.'

        # Return success response with detailed information
        return jsonify({
            'success': True,
            'message': message,
            'company_id': company.id,
            'company_name': company.name,
            'company_slug': company.slug,
            'audit': {
                'id': audit.id,
                'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
                'overall_score': audit.overall_score,
                'security_score': audit.security_score,
                'collaboration_score': audit.collaboration_score,
                'completeness_score': audit.completeness_score,
                'maturity_level': audit.maturity_level,
                'is_partial': is_partial,
            },
            'history_count': audit_history_count,  # Number of audits for this company (including current)
            'redirect_url': url_for('company_detail_by_slug', slug=company.slug)
        }), 200
    except Exception as e:
        db.rollback()
        logger.error(f"Error saving IT audit for company {company_id}: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas zapisywania audytu: {str(e)}'
        }), 500
    finally:
        db.close()
def _parse_it_audit_form_data(data: dict) -> dict:
    """
    Parse raw IT audit form data into a typed ``audit_data`` dictionary.

    Handles:
    - Boolean fields (checkboxes): a missing key means False; strings are True
      only for 'true', '1', 'on', 'yes' (case-insensitive).
    - Array fields (multi-select): lists are passed through, comma-separated
      strings are split and trimmed, any other scalar is wrapped in a list.
    - String and numeric fields: strings are trimmed (blank becomes None);
      int/float values (e.g. from a JSON payload) are converted to their
      string form so they are not silently dropped.
    - ``zabbix_integration``: accepted as a dict, a JSON-object string or a
      plain hostname string; falls back to the ``zabbix_hostname`` field.

    Args:
        data: Raw form data dictionary

    Returns:
        Parsed audit_data dictionary with proper types. Every known field is
        always present in the result.
    """
    # Boolean fields (checkboxes - present means True)
    boolean_fields = [
        'has_it_manager', 'it_outsourced',
        'has_azure_ad', 'has_m365', 'has_google_workspace',
        'has_mdm', 'has_edr', 'has_vpn', 'has_mfa',
        'has_proxmox_pbs', 'has_dr_plan',
        'has_local_ad', 'has_ad_azure_sync',
        'open_to_shared_licensing', 'open_to_backup_replication',
        'open_to_teams_federation', 'open_to_shared_monitoring',
        'open_to_collective_purchasing', 'open_to_knowledge_sharing',
    ]

    # Array fields (multi-select - may come as comma-separated or multiple values)
    array_fields = [
        'm365_plans', 'teams_usage', 'server_types', 'server_os',
        'desktop_os', 'mfa_scope', 'backup_targets',
    ]

    # String fields (several hold counts that clients may send as numbers)
    string_fields = [
        'it_provider_name', 'it_contact_name', 'it_contact_email',
        'azure_tenant_name', 'azure_user_count',
        'server_count', 'virtualization_platform', 'network_firewall_brand',
        'employee_count', 'computer_count', 'mdm_solution',
        'antivirus_solution', 'edr_solution', 'vpn_solution',
        'backup_solution', 'backup_frequency',
        'monitoring_solution', 'ad_domain_name',
        'ticketing_system', 'erp_system', 'crm_system', 'document_management',
    ]

    audit_data = {}

    # Parse boolean fields
    for field in boolean_fields:
        value = data.get(field)
        if value is None:
            audit_data[field] = False
        elif isinstance(value, bool):
            audit_data[field] = value
        elif isinstance(value, str):
            audit_data[field] = value.lower() in ('true', '1', 'on', 'yes')
        else:
            audit_data[field] = bool(value)

    # Parse array fields
    for field in array_fields:
        value = data.get(field)
        if value is None:
            audit_data[field] = []
        elif isinstance(value, list):
            audit_data[field] = value
        elif isinstance(value, str):
            # Handle comma-separated values, dropping empty items
            audit_data[field] = [v.strip() for v in value.split(',') if v.strip()]
        else:
            audit_data[field] = [value]

    # Parse string fields
    for field in string_fields:
        value = data.get(field)
        if isinstance(value, str):
            audit_data[field] = value.strip() or None
        elif isinstance(value, (int, float)) and not isinstance(value, bool):
            # Numeric JSON values: keep them, in the same string form that a
            # form-encoded submission of the same number would produce.
            audit_data[field] = str(value)
        else:
            audit_data[field] = None

    # Parse zabbix_integration as JSON if present
    zabbix_integration = data.get('zabbix_integration')
    if zabbix_integration:
        if isinstance(zabbix_integration, dict):
            audit_data['zabbix_integration'] = zabbix_integration
        elif isinstance(zabbix_integration, str):
            try:
                parsed = json.loads(zabbix_integration)
            except json.JSONDecodeError:
                parsed = None
            # Only a JSON object is a real integration config. A bare hostname
            # that happens to be valid JSON ("12345", "true") must still be
            # stored as a hostname rather than as a scalar.
            if isinstance(parsed, dict):
                audit_data['zabbix_integration'] = parsed
            else:
                audit_data['zabbix_integration'] = {'hostname': zabbix_integration}
        else:
            audit_data['zabbix_integration'] = None
    else:
        # Check for zabbix_hostname field as alternative
        zabbix_hostname = data.get('zabbix_hostname')
        if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip():
            audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()}
        else:
            audit_data['zabbix_integration'] = None

    return audit_data
@app.route('/api/it-audit/matches/<int:company_id>')
@login_required
def api_it_audit_matches(company_id):
    """
    API: List the IT audit collaboration matches that involve a company.

    A match is included when the given company is either side of the pair
    (company_a or company_b); the other side is reported as the partner.

    The endpoint is admin-only: non-admin users receive HTTP 403.

    Args:
        company_id: Company ID to get matches for

    Returns:
        JSON with the list of matches, each including:
        - id, match_type (+ label), match_score, match_reason
        - status (+ label), shared_attributes, created_at
        - partner company info (id, name, slug)
        Responds with 404 when the company does not exist and 500 on any
        unexpected error.
    """
    # Collaboration matches are restricted to administrators
    if not current_user.is_admin:
        return jsonify({
            'success': False,
            'error': 'Brak uprawnień. Tylko administrator może przeglądać dopasowania.'
        }), 403

    db = SessionLocal()
    try:
        from it_audit_service import ITAuditService
        from database import ITCollaborationMatch

        # The company must exist before matches are looked up
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        def _to_payload(match):
            # The partner is whichever side of the pair is not the requested company
            partner = match.company_b if match.company_a_id == company_id else match.company_a
            created = match.created_at
            return {
                'id': match.id,
                'match_type': match.match_type,
                'match_type_label': match.match_type_label,
                'match_score': match.match_score,
                'match_reason': match.match_reason,
                'status': match.status,
                'status_label': match.status_label,
                'shared_attributes': match.shared_attributes,
                'created_at': created.isoformat() if created else None,
                'partner': {
                    attr: (getattr(partner, attr) if partner else None)
                    for attr in ('id', 'name', 'slug')
                },
            }

        service = ITAuditService(db)
        matches_data = [
            _to_payload(match)
            for match in service.get_matches_for_company(company_id)
        ]

        payload = {
            'success': True,
            'company_id': company_id,
            'company_name': company.name,
            'matches_count': len(matches_data),
            'matches': matches_data,
        }
        return jsonify(payload), 200

    except Exception as e:
        logger.error(f"Error fetching IT audit matches for company {company_id}: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas pobierania dopasowań: {str(e)}'
        }), 500

    finally:
        db.close()
@app.route('/api/it-audit/history/<int:company_id>')
@login_required
def api_it_audit_history(company_id):
    """
    API: Return the IT audit history of a company.

    The audits come from ``get_company_audit_history``; the first returned
    item is flagged as the current one.

    Access:
        - Admin: may view the history of any company
        - User: may view only the history of their own company (403 otherwise)

    Args:
        company_id: Company ID to get audit history for

    Query params:
        limit: Maximum number of audits to return (default: 10, clamped to 1-50)

    Returns:
        JSON with the list of audits, each including:
        - id, audit_date, audit_source, scores, maturity_level
        - is_current flag (True for the first item)
        - is_partial flag (completeness below 100 or missing)
        Responds with 404 when the company does not exist and 500 on any
        unexpected error.
    """
    from it_audit_service import get_company_audit_history

    # Access control: admins see everything, other users only their own company
    allowed = current_user.is_admin or current_user.company_id == company_id
    if not allowed:
        return jsonify({
            'success': False,
            'error': 'Brak uprawnień do przeglądania historii audytów tej firmy.'
        }), 403

    # Read the requested page size and clamp it to the 1-50 range
    requested = request.args.get('limit', 10, type=int)
    limit = max(1, min(requested, 50))

    db = SessionLocal()
    try:
        # The company must exist before its history is fetched
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        audits = get_company_audit_history(db, company_id, limit)

        history = [
            {
                'id': audit.id,
                'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
                'audit_source': audit.audit_source,
                'overall_score': audit.overall_score,
                'security_score': audit.security_score,
                'collaboration_score': audit.collaboration_score,
                'completeness_score': audit.completeness_score,
                'maturity_level': audit.maturity_level,
                'is_current': position == 0,  # First item is most recent
                'is_partial': (audit.completeness_score or 0) < 100,
            }
            for position, audit in enumerate(audits)
        ]

        payload = {
            'success': True,
            'company_id': company_id,
            'company_name': company.name,
            'company_slug': company.slug,
            'total_audits': len(history),
            'history': history,
        }
        return jsonify(payload), 200

    except Exception as e:
        logger.error(f"Error fetching IT audit history for company {company_id}: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas pobierania historii audytów: {str(e)}'
        }), 500

    finally:
        db.close()
@app.route('/api/it-audit/export')
@login_required
def api_it_audit_export():
    """
    API: Export IT audit data as CSV.

    Exports one row per company, using that company's latest IT audit
    (highest ``audit_date``), together with company information and scores.

    Admin-only endpoint: non-admin users receive HTTP 403 with a JSON error.

    Returns:
        CSV file (attachment ``it_audit_export.csv``) with IT audit data,
        or a JSON error with status 500 when the export fails.
    """
    if not current_user.is_admin:
        return jsonify({
            'success': False,
            'error': 'Tylko administrator może eksportować dane audytów.'
        }), 403

    db = SessionLocal()
    try:
        from database import ITAudit
        import csv
        from io import StringIO

        def _cell(value):
            # Only a missing value becomes an empty cell. A plain `value or ''`
            # would also blank out a legitimate score of 0.
            return '' if value is None else value

        # Fetch audits newest-first within each company. NULLS LAST keeps an
        # audit without a date from being picked as the "latest" one
        # (PostgreSQL sorts NULLs first for DESC by default).
        audits = db.query(ITAudit, Company).join(
            Company, ITAudit.company_id == Company.id
        ).order_by(
            ITAudit.company_id,
            ITAudit.audit_date.desc().nullslast()
        ).all()

        # Deduplicate to get only latest audit per company (first row seen wins)
        seen_companies = set()
        latest_audits = []
        for audit, company in audits:
            if company.id not in seen_companies:
                seen_companies.add(company.id)
                latest_audits.append((audit, company))

        # Create CSV
        output = StringIO()
        writer = csv.writer(output)

        # Header
        writer.writerow([
            'Firma', 'NIP', 'Kategoria', 'Data audytu',
            'Wynik ogólny', 'Bezpieczeństwo', 'Współpraca', 'Kompletność',
            'Poziom dojrzałości', 'Azure AD', 'M365', 'EDR', 'MFA',
            'Proxmox PBS', 'Monitoring'
        ])

        # Data rows
        for audit, company in latest_audits:
            writer.writerow([
                company.name,
                company.nip or '',
                company.category.name if company.category else '',
                audit.audit_date.strftime('%Y-%m-%d') if audit.audit_date else '',
                _cell(audit.overall_score),
                _cell(audit.security_score),
                _cell(audit.collaboration_score),
                _cell(audit.completeness_score),
                audit.maturity_level or '',
                'Tak' if audit.has_azure_ad else 'Nie',
                'Tak' if audit.has_m365 else 'Nie',
                'Tak' if audit.has_edr else 'Nie',
                'Tak' if audit.has_mfa else 'Nie',
                'Tak' if audit.has_proxmox_pbs else 'Nie',
                audit.monitoring_solution or 'Brak'
            ])

        # Create response (Response is imported at the top of the file)
        return Response(
            output.getvalue(),
            mimetype='text/csv',
            headers={
                'Content-Disposition': 'attachment; filename=it_audit_export.csv',
                'Content-Type': 'text/csv; charset=utf-8'
            }
        )

    except Exception as e:
        logger.error(f"Error exporting IT audits: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas eksportu: {str(e)}'
        }), 500

    finally:
        db.close()
# ============================================================
# RELEASE NOTES
# ============================================================
@app.route('/release-notes')
def release_notes():
    """Render the platform changelog page (newest release first)."""

    def entry(version, date, new, improve=None):
        # Badges mirror which change lists the release carries: every release
        # has a 'new' list, and 'improve' is added only when one is given.
        item = {
            'version': version,
            'date': date,
            'badges': ['new'],
            'new': new,
        }
        if improve is not None:
            item['badges'].append('improve')
            item['improve'] = improve
        return item

    releases = [
        entry('v1.9.0', '9 stycznia 2026', [
            'Panel Audyt GBP - przegląd kompletności profili Google Business',
            'Panel Audyt Social - przegląd pokrycia Social Media wszystkich firm',
        ]),
        entry('v1.8.0', '8 stycznia 2026', [
            'Panel Audyt IT - kompleksowy audyt infrastruktury IT firm',
            'Eksport audytów IT do CSV',
        ], improve=[
            'Poprawki w formularzach edycji audytu IT',
        ]),
        entry('v1.7.0', '6 stycznia 2026', [
            'Panel Audyt SEO - analiza wydajności stron www firm',
            'Integracja z Google PageSpeed Insights API',
        ]),
        entry('v1.6.0', '29 grudnia 2025', [
            'System newsów i wzmianek medialnych o firmach',
            'Panel moderacji newsów dla adminów',
            'Integracja z Brave Search API',
        ]),
        entry('v1.5.0', '15 grudnia 2025', [
            'Panel Social Media - zarządzanie profilami społecznościowymi',
            'Weryfikacja aktywności profili Social Media',
        ], improve=[
            'Ulepszony profil firmy z sekcją Social Media',
        ]),
        entry('v1.4.0', '1 grudnia 2025', [
            'System rekomendacji między firmami',
            'Panel składek członkowskich',
            'Kalendarz wydarzeń Norda Biznes',
        ]),
        entry('v1.3.0', '28 listopada 2025', [
            'Chatbot AI z wiedzą o wszystkich firmach',
            'Wyszukiwarka firm z synonimami i fuzzy matching',
        ], improve=[
            'Ulepszony SearchService z PostgreSQL FTS',
        ]),
        entry('v1.2.0', '25 listopada 2025', [
            'System wiadomości prywatnych między użytkownikami',
            'Powiadomienia o nowych wiadomościach',
        ]),
        entry('v1.1.0', '24 listopada 2025', [
            'Rejestracja i logowanie użytkowników',
            'Profile użytkowników powiązane z firmami',
        ], improve=[
            'Responsywny design na urządzenia mobilne',
        ]),
        entry('v1.0.0', '23 listopada 2025', [
            'Oficjalny start platformy Norda Biznes Hub',
            'Katalog 80 firm członkowskich',
            'Wyszukiwarka firm po nazwie, kategorii, usługach',
            'Profile firm z pełnymi danymi kontaktowymi',
        ]),
    ]

    return render_template('release_notes.html', releases=releases)
# ============================================================
# ERROR HANDLERS
# ============================================================
@app.errorhandler(404)
def not_found(error):
    """Render the custom "page not found" template with HTTP status 404."""
    page = render_template('errors/404.html')
    return page, 404
@app.errorhandler(500)
def internal_error(error):
    """Render the custom "internal server error" template with HTTP status 500."""
    page = render_template('errors/500.html')
    return page, 500
# ============================================================
# MAIN
# ============================================================
if __name__ == '__main__':
    # Server settings come from the environment: PORT (default 5000) selects
    # the listening port, and FLASK_ENV == 'development' turns on debug mode.
    debug = os.getenv('FLASK_ENV') == 'development'
    port = int(os.getenv('PORT', 5000))

    logger.info(f"Starting Norda Biznes Hub on port {port}")
    app.run(host='0.0.0.0', port=port, debug=debug)