IA para distribuidoras: previsão de demanda, otimização de estoque e roteirização

Como distribuidoras usam IA para prever demanda com precisão, otimizar níveis de estoque e reduzir custos de logística com roteirização inteligente.

Distribuidoras operam em um equilíbrio delicado: estoque demais imobiliza capital e gera perdas (produtos perecíveis, obsolescência). Estoque de menos gera ruptura, vendas perdidas e clientes insatisfeitos.

Os três principais pontos de dor de distribuidoras são precisamente onde IA entrega maior impacto:

  • Previsão de demanda imprecisa → produtos errados no lugar errado
  • Gestão de estoque manual → excesso ou falta
  • Logística sub-otimizada → custos altos de entrega

O custo real da ineficiência em distribuição

Cenário típico: distribuidora de alimentos e bebidas

Faturamento: R$ 80M/ano SKUs ativos: 2.500 produtos Clientes: 800 pontos de venda Margem bruta: 12-18%

Custos de ineficiência operacional:

ProblemaCusto anual típico
Ruptura de estoque (vendas perdidas)R$ 3,2M (4% do faturamento)
Excesso de estoque (capital parado + perdas)R$ 2,4M (custo de oportunidade)
Produtos vencidos/avariadosR$ 800k (1% do faturamento)
Logística sub-otimizada (combustível, tempo)R$ 1,8M
Total desperdiçadoR$ 8,2M/ano

Reduzir esses custos em 40-60% com IA significa R$ 3,3M-4,9M de ganho anual.

Previsão de demanda com IA: reduzindo erro de forecasting

Distribuidoras tradicionalmente preveem demanda com:

  • Média móvel dos últimos 3-6 meses
  • Ajuste manual por “feeling” do time comercial
  • Excel com fórmulas simples

Problema: não captura sazonalidades, tendências, eventos externos.

Erro típico de forecasting: 25-35% (MAPE - Mean Absolute Percentage Error)

Como IA melhora previsão de demanda

Modelos de ML consideram múltiplas variáveis simultaneamente:

Variáveis históricas:

  • Vendas passadas (por SKU, por cliente, por região)
  • Sazonalidade (mensal, semanal, dia da semana)
  • Tendências de longo prazo

Variáveis contextuais:

  • Promoções planejadas
  • Feriados e eventos
  • Clima (impacta bebidas, sorvetes, etc.)
  • Lançamentos de produtos

Variáveis externas:

  • Indicadores econômicos
  • Calendário escolar (impacta snacks, material escolar)
  • Eventos esportivos (impacta bebidas alcoólicas)

Implementação de forecasting com IA

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from prophet import Prophet  # modelo de forecasting do Facebook
import holidays

class DemandForecaster:
    def __init__(self):
        self.models = {}  # um modelo por SKU ou categoria

    def preparar_features(self, df: pd.DataFrame, sku: str) -> pd.DataFrame:
        """
        Cria features para previsão de demanda de um SKU
        """
        # Filtra SKU específico
        df_sku = df[df['sku'] == sku].copy()
        df_sku = df_sku.sort_values('data')

        # Features temporais
        df_sku['dia_semana'] = df_sku['data'].dt.dayofweek
        df_sku['dia_mes'] = df_sku['data'].dt.day
        df_sku['mes'] = df_sku['data'].dt.month
        df_sku['semana_ano'] = df_sku['data'].dt.isocalendar().week

        # Lag features (vendas passadas)
        for lag in [7, 14, 30, 90]:
            df_sku[f'vendas_lag_{lag}d'] = df_sku['quantidade'].shift(lag)

        # Rolling statistics
        for window in [7, 30]:
            df_sku[f'vendas_media_{window}d'] = (
                df_sku['quantidade'].rolling(window=window).mean()
            )
            df_sku[f'vendas_std_{window}d'] = (
                df_sku['quantidade'].rolling(window=window).std()
            )

        # Features de tendência
        df_sku['vendas_variacao_7d'] = (
            df_sku['quantidade'] - df_sku['vendas_lag_7d']
        ) / df_sku['vendas_lag_7d']

        # Features de sazonalidade
        df_sku['eh_feriado'] = df_sku['data'].apply(
            lambda x: x in holidays.Brazil()
        )
        df_sku['dias_ate_feriado'] = df_sku['data'].apply(
            self._dias_ate_proximo_feriado
        )

        # Features externas (se disponível)
        if 'temperatura' in df.columns:
            df_sku['temperatura'] = df['temperatura']

        if 'promocao' in df.columns:
            df_sku['em_promocao'] = df['promocao']

        return df_sku.dropna()

    def treinar_modelo(self, dados_historicos: pd.DataFrame, sku: str):
        """
        Treina modelo de previsão para um SKU
        """
        df_features = self.preparar_features(dados_historicos, sku)

        # Features de entrada
        feature_cols = [col for col in df_features.columns
                       if col not in ['data', 'sku', 'quantidade']]

        X = df_features[feature_cols]
        y = df_features['quantidade']

        # Treina modelo
        model = RandomForestRegressor(
            n_estimators=100,
            max_depth=10,
            random_state=42,
            n_jobs=-1
        )
        model.fit(X, y)

        self.models[sku] = {
            'model': model,
            'feature_cols': feature_cols,
            'ultima_data_treino': df_features['data'].max()
        }

        return model

    def prever_demanda(
        self,
        sku: str,
        dias_futuro: int = 30,
        dados_atuais: pd.DataFrame = None
    ) -> pd.DataFrame:
        """
        Prevê demanda para os próximos N dias
        """
        if sku not in self.models:
            raise ValueError(f"Modelo não treinado para SKU {sku}")

        model_info = self.models[sku]
        model = model_info['model']
        feature_cols = model_info['feature_cols']

        # Prepara features futuras
        ultima_data = model_info['ultima_data_treino']
        datas_futuras = pd.date_range(
            start=ultima_data + pd.Timedelta(days=1),
            periods=dias_futuro,
            freq='D'
        )

        previsoes = []

        for data in datas_futuras:
            # Monta features para esta data futura
            features_futuras = self._construir_features_futuras(
                data,
                sku,
                dados_atuais,
                feature_cols
            )

            # Prevê
            demanda_prevista = model.predict([features_futuras])[0]

            previsoes.append({
                'data': data,
                'sku': sku,
                'demanda_prevista': max(0, round(demanda_prevista)),
                'confianca': self._calcular_confianca(model, features_futuras)
            })

        return pd.DataFrame(previsoes)

    def _dias_ate_proximo_feriado(self, data: pd.Timestamp) -> int:
        """Calcula dias até próximo feriado"""
        br_holidays = holidays.Brazil()
        dias = 0
        data_temp = data
        while dias < 60:
            data_temp += pd.Timedelta(days=1)
            if data_temp in br_holidays:
                return dias + 1
            dias += 1
        return 60

    def _calcular_confianca(self, model, features) -> str:
        """
        Estima confiança da previsão baseado em variância das árvores
        """
        # Em RandomForest, podemos usar variância entre árvores
        previsoes_arvores = [tree.predict([features])[0]
                            for tree in model.estimators_]
        std = np.std(previsoes_arvores)
        media = np.mean(previsoes_arvores)

        coef_variacao = std / media if media > 0 else 999

        if coef_variacao < 0.1:
            return "ALTA"
        elif coef_variacao < 0.25:
            return "MÉDIA"
        else:
            return "BAIXA"

# Uso
forecaster = DemandForecaster()

# Dados históricos
vendas_historicas = carregar_vendas_historicas()  # últimos 2 anos

# Treina modelo para top 100 SKUs (por faturamento)
top_skus = vendas_historicas.groupby('sku')['receita'].sum().nlargest(100).index

for sku in top_skus:
    print(f"Treinando modelo para SKU {sku}...")
    forecaster.treinar_modelo(vendas_historicas, sku)

# Prevê próximos 30 dias
previsoes = {}
for sku in top_skus:
    prev = forecaster.prever_demanda(sku, dias_futuro=30)
    previsoes[sku] = prev

    # Salva no sistema
    salvar_previsao_demanda(sku, prev)

# Exemplo de output
sku_cerveja = 'CERVEJA-PILSEN-350ML'
print(f"\nPrevisão de demanda - {sku_cerveja}")
print(previsoes[sku_cerveja].head(10))

Output exemplo:

Previsão de demanda - CERVEJA-PILSEN-350ML

data        | demanda_prevista | confianca
------------|------------------|----------
2026-05-01  | 1.245 un        | ALTA
2026-05-02  | 1.189 un        | ALTA
2026-05-03  | 982 un          | MÉDIA (sábado)
2026-05-04  | 756 un          | MÉDIA (domingo)
2026-05-05  | 1.312 un        | ALTA
...
2026-05-30  | 2.890 un        | ALTA (véspera de feriado)

Melhoria vs. forecasting tradicional

Antes (média móvel manual):

  • MAPE (erro médio): 28%
  • Ruptura: 8% dos SKUs/mês
  • Excesso: 15% dos SKUs acima de 60 dias de cobertura

Depois (modelo de ML):

  • MAPE: 14% (-50% de erro)
  • Ruptura: 3% dos SKUs/mês
  • Excesso: 6% dos SKUs acima de 60 dias

Impacto financeiro:

  • Redução de vendas perdidas por ruptura: R$ 1,8M/ano
  • Redução de capital parado em estoque: R$ 1,2M
  • Total: R$ 3M/ano

Otimização de estoque: quanto ter de cada produto

Com demanda prevista, próximo passo é calcular níveis ótimos de estoque.

Cálculo de estoque de segurança com IA

class EstoqueOtimizador:
    def calcular_parametros_estoque(
        self,
        sku: str,
        previsao_demanda: pd.DataFrame,
        dados_sku: dict
    ) -> dict:
        """
        Calcula ponto de pedido, estoque de segurança e lote de compra
        """
        # Demanda média diária prevista
        demanda_media_dia = previsao_demanda['demanda_prevista'].mean()

        # Variabilidade da demanda
        std_demanda = previsao_demanda['demanda_prevista'].std()

        # Lead time de fornecedor (dias)
        lead_time = dados_sku['lead_time_dias']
        std_lead_time = dados_sku.get('std_lead_time', lead_time * 0.1)

        # Nível de serviço desejado (% de não ruptura)
        nivel_servico = dados_sku.get('nivel_servico_target', 0.95)

        # Z-score para nível de serviço (95% = 1.65, 98% = 2.05, 99% = 2.33)
        from scipy import stats
        z_score = stats.norm.ppf(nivel_servico)

        # Estoque de segurança (considerando variabilidade de demanda e lead time)
        estoque_seguranca = z_score * np.sqrt(
            (lead_time * std_demanda**2) +
            (demanda_media_dia**2 * std_lead_time**2)
        )

        # Ponto de pedido
        ponto_pedido = (demanda_media_dia * lead_time) + estoque_seguranca

        # Lote econômico de compra (EOQ - Economic Order Quantity)
        custo_pedido = dados_sku.get('custo_pedido', 200)  # R$ por pedido
        custo_unitario = dados_sku['custo_unitario']
        taxa_manutencao = dados_sku.get('taxa_manutencao_anual', 0.20)  # 20% ao ano

        demanda_anual = demanda_media_dia * 365

        eoq = np.sqrt(
            (2 * demanda_anual * custo_pedido) /
            (custo_unitario * taxa_manutencao)
        )

        # Estoque máximo
        estoque_maximo = ponto_pedido + eoq

        # Cobertura em dias
        cobertura_dias_seguranca = estoque_seguranca / demanda_media_dia
        cobertura_dias_maximo = estoque_maximo / demanda_media_dia

        return {
            'sku': sku,
            'demanda_media_dia': round(demanda_media_dia, 1),
            'estoque_seguranca': round(estoque_seguranca),
            'ponto_pedido': round(ponto_pedido),
            'lote_compra': round(eoq),
            'estoque_maximo': round(estoque_maximo),
            'cobertura_seguranca_dias': round(cobertura_dias_seguranca, 1),
            'cobertura_maxima_dias': round(cobertura_dias_maximo, 1),
            'nivel_servico_target': f"{nivel_servico:.0%}"
        }

    def gerar_sugestoes_compra(self, estoque_atual: pd.DataFrame) -> pd.DataFrame:
        """
        Gera sugestões de compra baseado em estoque atual vs. parâmetros ótimos
        """
        sugestoes = []

        for _, item in estoque_atual.iterrows():
            sku = item['sku']
            qtd_atual = item['quantidade_estoque']

            # Busca parâmetros calculados
            params = self.parametros_estoque.get(sku)
            if not params:
                continue

            # Verifica se precisa comprar
            if qtd_atual <= params['ponto_pedido']:
                qtd_sugerida = params['estoque_maximo'] - qtd_atual

                # Ajusta para múltiplo do lote mínimo se fornecedor tiver
                lote_minimo = item.get('lote_minimo_fornecedor', 1)
                qtd_sugerida = np.ceil(qtd_sugerida / lote_minimo) * lote_minimo

                urgencia = 'URGENTE' if qtd_atual < params['estoque_seguranca'] else 'NORMAL'

                sugestoes.append({
                    'sku': sku,
                    'descricao': item['descricao'],
                    'estoque_atual': qtd_atual,
                    'ponto_pedido': params['ponto_pedido'],
                    'quantidade_sugerida': qtd_sugerida,
                    'custo_total': qtd_sugerida * item['custo_unitario'],
                    'urgencia': urgencia,
                    'fornecedor': item['fornecedor_principal']
                })

        df_sugestoes = pd.DataFrame(sugestoes)

        # Ordena por urgência e valor
        df_sugestoes = df_sugestoes.sort_values(
            ['urgencia', 'custo_total'],
            ascending=[False, False]
        )

        return df_sugestoes

# Uso
otimizador = EstoqueOtimizador()

# Calcula parâmetros para todos os SKUs
for sku in top_skus:
    previsao = previsoes[sku]
    dados_sku = carregar_dados_sku(sku)

    params = otimizador.calcular_parametros_estoque(
        sku,
        previsao,
        dados_sku
    )

    otimizador.parametros_estoque[sku] = params
    salvar_parametros_estoque(sku, params)

# Gera sugestões de compra para hoje
estoque_atual = carregar_estoque_atual()
sugestoes_compra = otimizador.gerar_sugestoes_compra(estoque_atual)

print("\n🛒 SUGESTÕES DE COMPRA")
print(sugestoes_compra[['sku', 'estoque_atual', 'quantidade_sugerida', 'custo_total', 'urgencia']])

Output:

🛒 SUGESTÕES DE COMPRA

sku                    | estoque_atual | qtd_sugerida | custo_total | urgencia
-----------------------|---------------|--------------|-------------|----------
ARROZ-TIPO1-5KG       | 145 un        | 850 un       | R$ 22.100   | URGENTE
FEIJAO-PRETO-1KG      | 89 un         | 620 un       | R$ 4.340    | URGENTE
CERVEJA-PILSEN-350ML  | 1.205 un      | 2.800 un     | R$ 4.760    | NORMAL
REFRIGERANTE-COLA-2L  | 678 un        | 1.200 un     | R$ 3.960    | NORMAL

Roteirização inteligente: reduzindo custo de entrega

Distribuidoras geralmente atendem dezenas a centenas de clientes por dia, cada um em localização diferente.

Problema de roteirização = VRP (Vehicle Routing Problem):

  • Minimizar distância total percorrida
  • Respeitar capacidade do veículo
  • Respeitar janelas de entrega dos clientes
  • Respeitar jornada de trabalho do motorista

É um problema NP-difícil (computacionalmente complexo), mas IA (algoritmos de otimização) resolve aproximadamente bem.

Implementação de roteirização

from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp
import googlemaps

class Roteirizador:
    def __init__(self, google_maps_api_key: str):
        self.gmaps = googlemaps.Client(key=google_maps_api_key)

    def calcular_matriz_distancias(self, enderecos: list) -> np.ndarray:
        """
        Calcula matriz de distâncias entre todos os pontos usando Google Maps
        """
        n = len(enderecos)
        matriz = np.zeros((n, n))

        # Cache para evitar chamadas repetidas
        cache = {}

        for i in range(n):
            for j in range(i+1, n):
                key = (enderecos[i], enderecos[j])
                if key in cache:
                    distancia = cache[key]
                else:
                    resultado = self.gmaps.distance_matrix(
                        origins=[enderecos[i]],
                        destinations=[enderecos[j]],
                        mode='driving'
                    )

                    # Distância em km
                    distancia = resultado['rows'][0]['elements'][0]['distance']['value'] / 1000
                    cache[key] = distancia

                matriz[i][j] = distancia
                matriz[j][i] = distancia  # simétrica

        return matriz

    def otimizar_rota(
        self,
        deposito: str,
        entregas: list,
        capacidade_veiculo: float
    ) -> dict:
        """
        Otimiza rota de entregas usando OR-Tools

        entregas: lista de dicts com {cliente, endereco, peso_kg}
        """
        # Monta lista de endereços (depósito + clientes)
        enderecos = [deposito] + [e['endereco'] for e in entregas]

        # Calcula matriz de distâncias
        matriz_distancias = self.calcular_matriz_distancias(enderecos)

        # Demanda de cada ponto (índice 0 = depósito com demanda 0)
        demandas = [0] + [e['peso_kg'] for e in entregas]

        # Cria modelo de roteamento
        manager = pywrapcp.RoutingIndexManager(
            len(enderecos),  # número de locais
            1,  # número de veículos
            0  # índice do depósito
        )

        routing = pywrapcp.RoutingModel(manager)

        # Callback de distância
        def distance_callback(from_index, to_index):
            from_node = manager.IndexToNode(from_index)
            to_node = manager.IndexToNode(to_index)
            return int(matriz_distancias[from_node][to_node] * 1000)  # em metros

        transit_callback_index = routing.RegisterTransitCallback(distance_callback)
        routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

        # Constraint de capacidade
        def demand_callback(from_index):
            from_node = manager.IndexToNode(from_index)
            return demandas[from_node]

        demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)

        routing.AddDimensionWithVehicleCapacity(
            demand_callback_index,
            0,  # slack (folga)
            [int(capacidade_veiculo)],  # capacidade do veículo
            True,  # start cumul to zero
            'Capacity'
        )

        # Parâmetros de busca
        search_parameters = pywrapcp.DefaultRoutingSearchParameters()
        search_parameters.first_solution_strategy = (
            routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
        )
        search_parameters.local_search_metaheuristic = (
            routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
        )
        search_parameters.time_limit.seconds = 30

        # Resolve
        solution = routing.SolveWithParameters(search_parameters)

        if solution:
            return self._extrair_rota(manager, routing, solution, entregas, enderecos)
        else:
            return {'erro': 'Não foi possível encontrar solução'}

    def _extrair_rota(self, manager, routing, solution, entregas, enderecos):
        """Extrai rota otimizada da solução"""
        index = routing.Start(0)
        rota = []
        distancia_total = 0
        carga_total = 0

        ordem = 0
        while not routing.IsEnd(index):
            node_index = manager.IndexToNode(index)

            if node_index > 0:  # não é o depósito
                entrega = entregas[node_index - 1]
                ordem += 1

                rota.append({
                    'ordem': ordem,
                    'cliente': entrega['cliente'],
                    'endereco': entrega['endereco'],
                    'peso_kg': entrega['peso_kg'],
                    'carga_acumulada': carga_total + entrega['peso_kg']
                })

                carga_total += entrega['peso_kg']

            previous_index = index
            index = solution.Value(routing.NextVar(index))

            if not routing.IsEnd(index):
                from_node = manager.IndexToNode(previous_index)
                to_node = manager.IndexToNode(index)
                distancia_total += matriz_distancias[from_node][to_node]

        # Volta ao depósito
        last_node = manager.IndexToNode(previous_index)
        distancia_total += matriz_distancias[last_node][0]

        return {
            'rota': rota,
            'distancia_total_km': round(distancia_total, 1),
            'numero_entregas': len(rota),
            'carga_total_kg': carga_total,
            'tempo_estimado_min': self._estimar_tempo(distancia_total, len(rota))
        }

    def _estimar_tempo(self, distancia_km: float, num_entregas: int) -> int:
        """
        Estima tempo total da rota
        - 40 km/h velocidade média (considerando trânsito urbano)
        - 15 min por entrega (descarga + burocracia)
        """
        tempo_direcao = (distancia_km / 40) * 60  # minutos
        tempo_entregas = num_entregas * 15  # minutos
        return round(tempo_direcao + tempo_entregas)

# Uso
roteirizador = Roteirizador(google_maps_api_key='SUA_API_KEY')

# Entregas do dia
deposito = "Rua do Depósito 123, São Paulo, SP"

entregas_hoje = [
    {'cliente': 'Mercado ABC', 'endereco': 'Av. Paulista 1000, São Paulo', 'peso_kg': 120},
    {'cliente': 'Bar do João', 'endereco': 'Rua Augusta 500, São Paulo', 'peso_kg': 85},
    {'cliente': 'Restaurante XYZ', 'endereco': 'Rua da Consolação 2000, São Paulo', 'peso_kg': 150},
    # ... mais 20 entregas
]

# Otimiza rota
resultado = roteirizador.otimizar_rota(
    deposito=deposito,
    entregas=entregas_hoje,
    capacidade_veiculo=1500  # kg
)

print(f"\n🚚 ROTA OTIMIZADA")
print(f"Distância total: {resultado['distancia_total_km']} km")
print(f"Tempo estimado: {resultado['tempo_estimado_min']} min ({resultado['tempo_estimado_min']/60:.1f}h)")
print(f"Número de entregas: {resultado['numero_entregas']}")
print(f"\nSequência de entrega:")
for parada in resultado['rota']:
    print(f"{parada['ordem']}. {parada['cliente']} - {parada['peso_kg']} kg")

Ganho com roteirização otimizada

Antes (rota manual / “no feeling”):

  • Distância média por rota: 180 km
  • Tempo médio por rota: 7,5 horas
  • Combustível: 16 litros/rota (11 km/l)
  • 5 rotas/dia em média

Depois (otimização com IA):

  • Distância média por rota: 125 km (-30%)
  • Tempo médio por rota: 6 horas (-20%)
  • Combustível: 11,4 litros/rota
  • 6 rotas/dia (mesma frota, mais produtiva)

Economia mensal (22 dias úteis):

  • Combustível economizado: 505 litros/mês × R$ 6,50 = R$ 3.280/mês
  • Manutenção reduzida (menos km): R$ 1.800/mês
  • Produtividade (+20% entregas): R$ 18.000/mês em receita adicional
  • Total: R$ 23.000/mês (R$ 276.000/ano)

ROI consolidado para distribuidoras

Cenário: Distribuidora com R$ 80M faturamento/ano

Investimento em IA:

  • Implementação de forecasting + otimização de estoque: R$ 120k
  • Sistema de roteirização: R$ 45k
  • Integração com ERP existente: R$ 35k
  • Total implementação: R$ 200k
  • Custo operacional: R$ 5k/mês (R$ 60k/ano)

Ganhos anuais:

  • Redução de ruptura de estoque: R$ 1,8M
  • Redução de capital parado: R$ 1,2M
  • Redução de perdas (vencidos/avariados): R$ 480k
  • Economia logística: R$ 276k
  • Total anual: R$ 3,756M

ROI primeiro ano: 1.344% Payback: 0,8 meses (< 1 mês)


Se você opera uma distribuidora e quer implementar previsão de demanda, otimização de estoque ou roteirização inteligente, agende um diagnóstico. Analisamos seu perfil operacional e identificamos as oportunidades de maior impacto no seu contexto específico.

Pronto para sair do manual?

Agende o diagnóstico gratuito. Vamos mapear o gargalo, estimar o impacto e definir o primeiro resultado mensurável.

Você sai com clareza — não com um pitch de vendas.