Open Source, Open Future!
  menu
118 文章
ღゝ◡╹)ノ❤️

第11篇:上门取件路线优化实战

上一篇: 第10篇:配送路线优化VRP实战

一、写在前面

配送路线优化完,下一步是:取件路线应如何统筹规划?

配送和取件看起来差不多,都是跑点,但仔细想想,取件难多了。

为啥?配送你可以灵活安排时间(只要当天送到就行),但取件不行——客户预约了9:00-11:00,你11:10到,客户刚好出门了,白跑一趟。

这就是带时间窗口约束的车辆路径问题(VRPTW)。LaDe数据集里正好有取件数据,我就拿来试试看,能不能优化出更好的路线。

二、取件 vs 配送

2.1、配送路线

  • 目标:走最短的路,把所有包裹送完
  • 约束:当天送完就行(时间窗很宽松)
  • 难度:⭐⭐⭐

2.2、取件路线

  • 目标:走最短的路,在规定时间窗内取完所有包裹
  • 约束:
    • 每个点都有时间窗(通常2小时)
    • 早到了要等,迟到了客户可能不在
    • 有些时间窗会冲突(物理上无法同时满足)
  • 难度:⭐⭐⭐⭐⭐

三、数据准备

3.1、LaDe-P数据集

LaDe-P(Pickup)是LaDe数据集的取件部分,包含:

  • 5个城市(上海、杭州、重庆、吉林、烟台)
  • 约68万取件订单(比配送少,毕竟配送是主要业务)
  • 每条记录包含:
    • 取件位置(GPS坐标:lng/lat)
    • 时间窗口:time_window_start 和 time_window_end(客户预约的时间段)
    • 实际取件时间:pickup_time(快递员实际到达取件的时间)
    • 快递员ID
    • 包裹ID

3.2、加载数据

选择上海快递员3243,2022-06-08的取件数据:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# 读取上海取件数据
df_pickup = pd.read_csv('data/raw/pickup/pickup_sh.csv')

# 选择快递员3243,2022-06-08
courier_id = 3243
test_date = '0608'

pickup_points = df_pickup[
    (df_pickup['courier_id'] == courier_id) &
    (df_pickup['ds'] == test_date)
].copy()

# 过滤GPS缺失的数据
pickup_points = pickup_points[
    (pickup_points['pickup_gps_lng'].notna()) &
    (pickup_points['pickup_gps_lat'].notna())
].copy()

# 按实际取件时间排序(这是原始路线)
pickup_points = pickup_points.sort_values('pickup_time').reset_index(drop=True)

print(f"快递员{courier_id}在2022-06-08的取件:")
print(f"总订单数: {len(pickup_points)}")

运行结果:

快递员3243在2022-06-08的取件:
总订单数: 14

14个取件点,看看时间窗分布:

# 解析时间窗
pickup_points['tw_start'] = pd.to_datetime(
    '2022-06-08 ' + pickup_points['time_window_start'].astype(str).str.zfill(4).str[:2] + ':' +
    pickup_points['time_window_start'].astype(str).str.zfill(4).str[2:]
)
pickup_points['tw_end'] = pd.to_datetime(
    '2022-06-08 ' + pickup_points['time_window_end'].astype(str).str.zfill(4).str[:2] + ':' +
    pickup_points['time_window_end'].astype(str).str.zfill(4).str[2:]
)

print("\n时间窗分布:")
for i, row in pickup_points.iterrows():
    print(f"  点{i+1}: {row['tw_start'].strftime('%H:%M')} - {row['tw_end'].strftime('%H:%M')}")

运行结果:

时间窗分布:
  点1: 09:00 - 11:00
  点2: 09:00 - 11:00
  点3: 10:00 - 12:00
  点4: 10:00 - 12:00
  点5: 11:00 - 13:00
  点6: 11:00 - 13:00
  点7: 13:00 - 15:00
  点8: 13:00 - 15:00
  点9: 14:00 - 16:00
  点10: 14:00 - 16:00
  点11: 15:00 - 17:00
  点12: 15:00 - 17:00
  点13: 16:00 - 18:00
  点14: 16:00 - 18:00

时间窗从早上9点到晚上6点,每个窗口2小时。

3.3、计算距离矩阵

和配送一样,用Haversine公式计算真实地理距离:

def haversine_distance(lon1, lat1, lon2, lat2):
    """计算两个GPS坐标之间的距离(单位:公里)"""
    from math import radians, cos, sin, asin, sqrt

    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371  # 地球半径(公里)

    return c * r

def compute_distance_matrix(locations):
    n = len(locations)
    dist_matrix = np.zeros((n, n))

    for i in range(n):
        for j in range(n):
            if i != j:
                dist_matrix[i][j] = haversine_distance(
                    locations[i][0], locations[i][1],
                    locations[j][0], locations[j][1]
                )

    return dist_matrix

locations = pickup_points[['pickup_gps_lng', 'pickup_gps_lat']].values
distance_matrix = compute_distance_matrix(locations)
distance_matrix_int = (distance_matrix * 1000).astype(int)  # 转为米

print(f"\n距离矩阵: {len(distance_matrix_int)}×{len(distance_matrix_int)}")

四、用OR-Tools求解VRPTW

4.1、核心代码

VRPTW比TSP多了时间窗约束:

from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp

def solve_vrptw(distance_matrix, time_windows, service_time=5):
    """
    求解VRPTW问题

    参数:
    - distance_matrix: 距离矩阵(米)
    - time_windows: 时间窗列表 [(start_min, end_min), ...]
    - service_time: 每个点的服务时间(分钟)
    """

    # 创建路径规划管理器
    manager = pywrapcp.RoutingIndexManager(
        len(distance_matrix),
        1,  # 1个快递员
        0   # 起点
    )

    routing = pywrapcp.RoutingModel(manager)

    # 距离回调
    def distance_callback(from_index, to_index):
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        return distance_matrix[from_node][to_node]

    transit_callback_index = routing.RegisterTransitCallback(distance_callback)
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

    # 时间回调(距离转时间,假设速度20km/h)
    def time_callback(from_index, to_index):
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        travel_time = distance_matrix[from_node][to_node] / 1000 * 3  # 20km/h = 3分钟/km
        return int(travel_time + service_time)

    time_callback_index = routing.RegisterTransitCallback(time_callback)

    # 添加时间维度
    time = 'Time'
    routing.AddDimension(
        time_callback_index,
        30,  # 允许等待时间(分钟)
        600,  # 最大时间(分钟,10小时)
        False,  # 不强制从0开始
        time
    )
    time_dimension = routing.GetDimensionOrDie(time)

    # 设置时间窗约束
    for location_idx, time_window in enumerate(time_windows):
        if location_idx == 0:
            continue  # 起点不设时间窗
        index = manager.NodeToIndex(location_idx)
        time_dimension.CumulVar(index).SetRange(time_window[0], time_window[1])

    # 搜索参数
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = (
        routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
    )
    search_parameters.local_search_metaheuristic = (
        routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    )
    search_parameters.time_limit.seconds = 30

    solution = routing.SolveWithParameters(search_parameters)

    return manager, routing, solution, time_dimension

# 准备时间窗数据(转为分钟)
time_windows = []
start_time = datetime(2022, 6, 8, 8, 0)  # 早上8点开始

for i, row in pickup_points.iterrows():
    tw_start_min = int((row['tw_start'] - start_time).total_seconds() / 60)
    tw_end_min = int((row['tw_end'] - start_time).total_seconds() / 60)
    time_windows.append((tw_start_min, tw_end_min))

# 起点时间窗(8:00-8:00)
time_windows[0] = (0, 0)

print("求解VRPTW...")
import time
start = time.time()
manager, routing, solution, time_dimension = solve_vrptw(
    distance_matrix_int,
    time_windows,
    service_time=5
)
solve_time = time.time() - start

if solution:
    print(f"找到解!(用时{solve_time:.3f}秒)")
else:
    print("未找到可行解")

4.2、结果分析

if solution:
    # 提取优化路线
    route = []
    index = routing.Start(0)

    while not routing.IsEnd(index):
        node = manager.IndexToNode(index)
        route.append(node)
        index = solution.Value(routing.NextVar(index))

    route.append(manager.IndexToNode(index))

    # 计算总距离
    total_distance = 0
    for i in range(len(route) - 1):
        total_distance += distance_matrix[route[i]][route[i+1]]

    print(f"\n优化路线总距离: {total_distance:.2f} km")

    # 对比原始路线
    naive_route = list(range(len(distance_matrix)))
    naive_distance = 0
    for i in range(len(naive_route) - 1):
        naive_distance += distance_matrix[naive_route[i]][naive_route[i+1]]

    print(f"原始路线总距离: {naive_distance:.2f} km")
    print(f"节省距离: {(naive_distance - total_distance) / naive_distance * 100:.1f}%")

    # 检查时间窗违反情况
    violations = 0
    for i, node in enumerate(route[1:-1], 1):
        time_var = time_dimension.CumulVar(manager.NodeToIndex(node))
        arrival_time = solution.Value(time_var)
        tw_start, tw_end = time_windows[node]

        if arrival_time < tw_start or arrival_time > tw_end:
            violations += 1

    print(f"时间窗违反: {violations}个")

运行结果:

求解VRPTW...
找到解!(用时28.342秒)

优化路线总距离: 4.24 km
原始路线总距离: 6.43 km
节省距离: 34.1%
时间窗违反: 0个

优化效果:

  • 距离节省34.1%
  • 所有时间窗都满足
  • 求解时间30秒

4.3、Web界面展示

除了脚本,我还做了一个Web界面,可以直观地查看优化效果。

4.3.1、界面功能

核心展示

  • 📊 优化前后数据对比
  • 🗺️ 地图可视化路线
  • ⏰ 时间窗违反检测
  • 📈 准时率统计

4.3.2、实际效果

图片

关键指标对比

指标原始路线优化后路线改善
总距离6.43 km4.24 km↓ 34.1%
准时率21.4%100%↑ 78.6%
时间窗违反11个0个✅ 全部满足

优化亮点

  • 距离节省超过1/3,直接降低油耗成本
  • 准时率从21.4%提升到100%,客户体验大幅改善
  • 所有时间窗约束都满足,不会出现客户等待或白跑情况

五、踩过的坑

5.1、坑1:时间窗设置不合理

最开始我把时间窗设得太紧,导致无解。

错误做法:

# 每个点只有30分钟时间窗
time_windows.append((tw_start_min, tw_start_min + 30))

问题:14个点,每个30分钟,物理上不可能都满足。

解决:用真实的时间窗(2小时),给优化算法足够的灵活空间。

5.2、坑2:速度估计不准

最开始我假设速度30km/h,结果算出来的路线时间窗全违反。

原因:市区配送不是高速公路,要考虑红绿灯、拥堵、停车。实际速度可能只有15-20km/h。

解决:

# 保守估计速度20km/h
travel_time = distance_matrix[from_node][to_node] / 1000 * 3  # 3分钟/km

5.3、坑3:服务时间被忽略

最开始我没加服务时间,导致时间窗计算不准。

每个点要停车、找客户、交接包裹,至少5分钟。

解决:

return int(travel_time + service_time)  # 加上服务时间

六、实战建议

6.1、时间窗冲突怎么办?

如果时间窗物理上无法满足(比如14个点都要求9-10点),有几个办法:

  1. 软约束:允许违反时间窗,但加惩罚
  2. 优先级:优先满足重要客户(大客户、VIP)
  3. 协商:违反的点提前联系客户协商

6.2、多车场景

如果有多个快递员(多辆车),需要用MDVRPTW(多车辆带时间窗的VRP)。

修改很简单:

manager = pywrapcp.RoutingIndexManager(
    len(distance_matrix),
    num_vehicles,  # 改成车辆数
    0
)

求解器会自动分配订单到不同车辆。

七、总结

取件路线优化(VRPTW)比配送(TSP)难多了,但也更有价值。

实测效果(上海快递员3243,2022-06-08):

  • 取件点数:14个
  • 距离节省:34.1%(6.43km → 4.24km)
  • 时间窗违反:0个
  • 求解时间:30秒

核心难点:

  • 时间窗是硬约束,早到迟到都不行
  • 需要在空间距离和时间窗口之间找平衡
  • 可能存在无可行解的情况

适用场景:

  • 上门取件(快递、外卖)
  • 上门维修(家电、汽车)
  • 医疗上门服务
  • 任何有时间窗约束的服务