目录

fastapimysql实现问卷调查系统

目录

fastapi+mysql实现问卷调查系统

说明:

我计划用fastapi+python,实现多表查询,并写成接口,在postman里面请求访问

step1:


-- 1. 问卷表
CREATE TABLE survey (
                        id INT PRIMARY KEY AUTO_INCREMENT,
                        title VARCHAR(255) NOT NULL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 2. 问题表(包含题型)
CREATE TABLE question (
                          id INT PRIMARY KEY AUTO_INCREMENT,
                          survey_id INT NOT NULL,
                          content TEXT NOT NULL,
                          type ENUM('single_choice', 'multiple_choice', 'text') NOT NULL,
                          FOREIGN KEY (survey_id) REFERENCES survey(id)
);
-- 3. 选项表(用于单选/多选题)
CREATE TABLE `option` (
                          id INT PRIMARY KEY AUTO_INCREMENT,
                          question_id INT NOT NULL,
                          content VARCHAR(255) NOT NULL,
                          FOREIGN KEY (question_id) REFERENCES question(id)
);
-- 4. 用户表
CREATE TABLE user (
                      id INT PRIMARY KEY AUTO_INCREMENT,
                      username VARCHAR(50) NOT NULL
);
-- 5. 答案表(统一存储所有类型答案)
CREATE TABLE answer (
                        id INT PRIMARY KEY AUTO_INCREMENT,
                        user_id INT NOT NULL,
                        question_id INT NOT NULL,
                        option_id INT DEFAULT NULL,
                        answer_text TEXT DEFAULT NULL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (user_id) REFERENCES user(id),
                        FOREIGN KEY (question_id) REFERENCES question(id),
                        FOREIGN KEY (option_id) REFERENCES `option`(id)
);

-- 插入问卷
INSERT INTO survey (title) VALUES
                               ('消费者满意度调查'),
                               ('产品使用习惯调查');
-- 插入问题(前3题属于问卷1)
INSERT INTO question (survey_id, content, type) VALUES
                                                    (1, '您对产品的满意度如何?', 'single_choice'),
                                                    (1, '您通过哪些渠道了解我们?', 'multiple_choice'),
                                                    (1, '请提出改进建议', 'text'),
                                                    (2, '您每天使用产品的频率?', 'single_choice');
-- 插入选项(问题1有4个选项,问题2有4个选项,问题4有3个选项)
INSERT INTO `option` (question_id, content) VALUES
                                                (1, '非常满意'), (1, '满意'), (1, '一般'), (1, '不满意'),
                                                (2, '互联网广告'), (2, '朋友推荐'), (2, '社交媒体'), (2, '其他'),
                                                (4, '1-3次'), (4, '4-6次'), (4, '6次以上');
-- 插入用户
INSERT INTO user (username) VALUES
                                ('张三'), ('李四'), ('王五');
-- 插入答案(模拟不同用户的回答)
-- 用户1的回答
INSERT INTO answer (user_id, question_id, option_id, answer_text) VALUES
                                                                      (1, 1, 1, NULL),
                                                                      (1, 2, 5, NULL), (1, 2, 6, NULL),
                                                                      (1, 3, NULL, '增加更多功能');

INSERT INTO answer (user_id, question_id, option_id) VALUES
    (1, 4, 5);
-- 用户2的回答
INSERT INTO answer (user_id, question_id, option_id) VALUES
                                                         (2, 1, 2), (2, 2, 5), (2, 2, 7);
-- 用户3的回答
INSERT INTO answer (user_id, question_id, option_id, answer_text) VALUES
                                                                      (3, 1, 1, NULL),
                                                                      (3, 2, 6, NULL), (3, 2, 8, NULL),
                                                                      (3, 3, NULL, '服务态度很好');

# 1. 查询单选问题选项统计
SELECT
    o.content AS '选项内容',
    COUNT(a.id) AS '选择次数'
FROM question q
         JOIN `option` o ON q.id = o.question_id
         LEFT JOIN answer a ON o.id = a.option_id AND q.id = a.question_id
WHERE q.id = 1
GROUP BY o.id
ORDER BY COUNT(a.id) DESC;


# 2. 查询多选题选项统计
SELECT
    o.content AS '选项内容',
    COUNT(a.id) AS '被选次数'
FROM question q
         JOIN `option` o ON q.id = o.question_id
         LEFT JOIN answer a ON o.id = a.option_id AND q.id = a.question_id
WHERE q.id = 2  -- 查询问题ID=2的多选题
GROUP BY o.id
ORDER BY COUNT(a.id) DESC;

# 3. 查询文本题反馈内容
SELECT
    u.username AS '用户',
    a.answer_text AS '反馈内容'
FROM answer a
         JOIN user u ON a.user_id = u.id
WHERE a.question_id = 3;

# 4. 按问卷统计整体回答率


SELECT
    s.title AS '问卷标题',
    COUNT(DISTINCT CONCAT(a.user_id, '-', q.id)) AS '实际回答数',
    COUNT(DISTINCT q.id) * COUNT(DISTINCT u.id) AS '理论最大回答数',
    ROUND(
            COUNT(DISTINCT CONCAT(a.user_id, '-', q.id)) /
            (COUNT(DISTINCT q.id) * COUNT(DISTINCT u.id)) * 100,
            2
    ) AS '回答率(%)'
FROM survey s
         LEFT JOIN question q ON s.id = q.survey_id
         CROSS JOIN user u
         LEFT JOIN answer a ON q.id = a.question_id AND u.id = a.user_id
GROUP BY s.id;





# 5. 按用户统计答题记录
SELECT
    u.username AS '用户名',
    s.title AS '问卷标题',
    COUNT(DISTINCT a.question_id) AS '已答问题数',
    COUNT(DISTINCT q.id) AS '总问题数',
    ROUND(
            COUNT(DISTINCT a.question_id) /
            COUNT(DISTINCT q.id) * 100,
            2
    ) AS '完成率(%)'
FROM user u
         CROSS JOIN survey s
         LEFT JOIN question q ON s.id = q.survey_id
         LEFT JOIN answer a ON u.id = a.user_id AND q.id = a.question_id
GROUP BY u.id, s.id;

step2:C:\Users\Administrator\PycharmProjects\FastAPIProject\main.py

from fastapi import FastAPI, HTTPException
import pymysql.cursors
app = FastAPI()
# 数据库连接配置
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': '123456',
    'db': 'db_spring',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}
# 查询数据库的函数
def query_database(query: str, params=None):
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            cursor.execute(query, params)
            result = cursor.fetchall()
        connection.close()
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# 1. 查询单选问题选项统计
@app.get("/single_choice_stats/{question_id}")
async def get_single_choice_stats(question_id: int):
    query = """
    SELECT
        o.content AS option_content,
        COUNT(a.id) AS selection_count
    FROM question q
             JOIN `option` o ON q.id = o.question_id
             LEFT JOIN answer a ON o.id = a.option_id AND q.id = a.question_id
    WHERE q.id = %s
    GROUP BY o.id
    ORDER BY COUNT(a.id) DESC;
    """
    try:
        data = query_database(query, (question_id,))
        return {"status": "success", "data": data}
    except HTTPException as e:
        return {"status": "error", "message": e.detail}
# 2. 查询多选题选项统计
@app.get("/multiple_choice_stats/{question_id}")
async def get_multiple_choice_stats(question_id: int):
    query = """
    SELECT
        o.content AS option_content,
        COUNT(a.id) AS selected_count
    FROM question q
             JOIN `option` o ON q.id = o.question_id
             LEFT JOIN answer a ON o.id = a.option_id AND q.id = a.question_id
    WHERE q.id = %s
    GROUP BY o.id
    ORDER BY COUNT(a.id) DESC;
    """
    try:
        data = query_database(query, (question_id,))
        return {"status": "success", "data": data}
    except HTTPException as e:
        return {"status": "error", "message": e.detail}
# 3. 查询文本题反馈内容
@app.get("/text_feedback/{question_id}")
async def get_text_feedback(question_id: int):
    query = """
    SELECT
        u.username AS user,
        a.answer_text AS feedback_content
    FROM answer a
             JOIN user u ON a.user_id = u.id
    WHERE a.question_id = %s;
    """
    try:
        data = query_database(query, (question_id,))
        return {"status": "success", "data": data}
    except HTTPException as e:
        return {"status": "error", "message": e.detail}


 # 4. 按问卷统计整体回答率
@app.get("/survey_response_rate/{survey_id}")
async def get_survey_response_rate(survey_id: int):
    query = """
    SELECT
        s.title AS survey_title,
        COUNT(DISTINCT CONCAT(a.user_id, '-', q.id)) AS actual_responses,
        COUNT(DISTINCT q.id) * COUNT(DISTINCT u.id) AS theoretical_max_responses,
        ROUND(
            COUNT(DISTINCT CONCAT(a.user_id, '-', q.id)) /
            (COUNT(DISTINCT q.id) * COUNT(DISTINCT u.id)) * 100,
            2
        ) AS response_rate
    FROM survey s
    LEFT JOIN question q ON s.id = q.survey_id
    CROSS JOIN user u
    LEFT JOIN answer a ON q.id = a.question_id AND u.id = a.user_id
    WHERE s.id = %s
    GROUP BY s.id;
    """
    try:
        data = query_database(query, (survey_id,))
        return {"status": "success", "data": data[0] if data else {}}
    except HTTPException as e:
        return {"status": "error", "message": e.detail}
# 5. 按用户统计答题记录
@app.get("/user_response_stats/{user_id}")
async def get_user_response_stats(user_id: int):
    query = """
    SELECT
        s.title AS survey_title,
        COUNT(DISTINCT a.question_id) AS answered_questions,
        COUNT(DISTINCT q.id) AS total_questions,
        ROUND(
            COUNT(DISTINCT a.question_id) /
            COUNT(DISTINCT q.id) * 100,
            2
        ) AS completion_rate
    FROM user u
    CROSS JOIN survey s
    LEFT JOIN question q ON s.id = q.survey_id
    LEFT JOIN answer a ON u.id = a.user_id AND q.id = a.question_id
    WHERE u.id = %s
    GROUP BY u.id, s.id;
    """
    try:
        data = query_database(query, (user_id,))
        return {"status": "success", "data": data}
    except HTTPException as e:
        return {"status": "error", "message": e.detail}
# 启动应用
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

step3:

http://localhost:8000/single_choice_stats/1
{
    "status": "success",
    "data": [
        {
            "option_content": "非常满意",
            "selection_count": 2
        },
        {
            "option_content": "满意",
            "selection_count": 1
        },
        {
            "option_content": "一般",
            "selection_count": 0
        },
        {
            "option_content": "不满意",
            "selection_count": 0
        }
    ]
}

step4:

http://localhost:8000/multiple_choice_stats/1
{
    "status": "success",
    "data": [
        {
            "option_content": "非常满意",
            "selected_count": 2
        },
        {
            "option_content": "满意",
            "selected_count": 1
        },
        {
            "option_content": "一般",
            "selected_count": 0
        },
        {
            "option_content": "不满意",
            "selected_count": 0
        }
    ]
}

step5:

http://localhost:8000/text_feedback/1
{
    "status": "success",
    "data": [
        {
            "user": "张三",
            "feedback_content": null
        },
        {
            "user": "李四",
            "feedback_content": null
        },
        {
            "user": "王五",
            "feedback_content": null
        }
    ]
}

step6:

http://localhost:8000/survey_response_rate/1
{
    "status": "success",
    "data": {
        "survey_title": "消费者满意度调查",
        "actual_responses": 8,
        "theoretical_max_responses": 9,
        "response_rate": 88.89
    }
}

step7:

http://localhost:8000/user_response_stats/1
{
    "status": "success",
    "data": [
        {
            "survey_title": "消费者满意度调查",
            "answered_questions": 3,
            "total_questions": 3,
            "completion_rate": 100.0
        },
        {
            "survey_title": "产品使用习惯调查",
            "answered_questions": 1,
            "total_questions": 1,
            "completion_rate": 100.0
        }
    ]
}

end