Files
MES_Core/shiftflow/services/bom_explosion.py
ackFromRedmi e88b861f68
All checks were successful
Deploy MES Core / deploy (push) Successful in 11s
Огромная замена логики
2026-04-06 08:06:37 +03:00

265 lines
8.4 KiB
Python

from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from django.db import transaction
from django.db.models import Sum
from django.db.models.functions import Coalesce
from manufacturing.models import BOM, ProductEntity
from shiftflow.models import Deal, DealItem, MaterialRequirement, ProductionTask
from warehouse.models import Location, StockItem
@dataclass(frozen=True)
class ExplosionStats:
"""
Сводка результата BOM Explosion.
tasks_*:
- сколько ProductionTask создано/обновлено (по leaf-деталям)
req_*:
- сколько MaterialRequirement создано/обновлено (по сырью)
"""
tasks_created: int
tasks_updated: int
req_created: int
req_updated: int
def _category_kind(material_category_name: str) -> str:
"""
Определение типа материала по названию категории.
Возвращает:
- 'sheet' для листовых материалов
- 'linear' для профилей/труб/круга
- 'unknown' если не удалось определить
"""
s = (material_category_name or "").strip().lower()
if "лист" in s:
return "sheet"
if any(k in s for k in ["труба", "проф", "круг", "швел", "угол", "балк", "квадрат"]):
return "linear"
return "unknown"
def _norm_and_unit(entity: ProductEntity) -> tuple[float | None, str]:
"""
Возвращает норму расхода и единицу измерения для MaterialRequirement.
Логика:
- для листа берём blank_area_m2 (м²/шт)
- для линейного берём blank_length_mm (мм/шт)
Если категория не распознана, но одна из норм задана — используем заданную.
"""
if not entity.planned_material_id or not getattr(entity.planned_material, "category_id", None):
if entity.blank_area_m2:
return float(entity.blank_area_m2), "m2"
if entity.blank_length_mm:
return float(entity.blank_length_mm), "mm"
return None, "pcs"
kind = _category_kind(entity.planned_material.category.name)
if kind == "sheet":
return (float(entity.blank_area_m2) if entity.blank_area_m2 else None), "m2"
if kind == "linear":
return (float(entity.blank_length_mm) if entity.blank_length_mm else None), "mm"
if entity.blank_area_m2:
return float(entity.blank_area_m2), "m2"
if entity.blank_length_mm:
return float(entity.blank_length_mm), "mm"
return None, "pcs"
def _build_bom_graph(root_entity_ids: set[int]) -> dict[int, list[tuple[int, int]]]:
"""
Строит граф BOM в памяти для заданного множества root entity.
Возвращает:
adjacency[parent_id] = [(child_id, qty), ...]
"""
adjacency: dict[int, list[tuple[int, int]]] = defaultdict(list)
frontier = set(root_entity_ids)
seen = set()
while frontier:
batch = frontier - seen
if not batch:
break
seen |= batch
rows = BOM.objects.filter(parent_id__in=batch).values_list("parent_id", "child_id", "quantity")
next_frontier = set()
for parent_id, child_id, qty in rows:
q = int(qty or 0)
if q <= 0:
continue
adjacency[int(parent_id)].append((int(child_id), q))
next_frontier.add(int(child_id))
frontier |= next_frontier
return adjacency
def _explode_to_leaves(
entity_id: int,
adjacency: dict[int, list[tuple[int, int]]],
memo: dict[int, dict[int, int]],
visiting: set[int],
) -> dict[int, int]:
"""
Возвращает разложение entity_id в leaf-детали в виде:
{ leaf_entity_id: multiplier_for_one_unit }
"""
if entity_id in memo:
return memo[entity_id]
if entity_id in visiting:
raise RuntimeError("Цикл в BOM: спецификация зациклена.")
visiting.add(entity_id)
children = adjacency.get(entity_id) or []
if not children:
memo[entity_id] = {entity_id: 1}
visiting.remove(entity_id)
return memo[entity_id]
out: dict[int, int] = defaultdict(int)
for child_id, qty in children:
child_map = _explode_to_leaves(child_id, adjacency, memo, visiting)
for leaf_id, leaf_qty in child_map.items():
out[leaf_id] += int(qty) * int(leaf_qty)
memo[entity_id] = dict(out)
visiting.remove(entity_id)
return memo[entity_id]
@transaction.atomic
def explode_deal(
deal_id: int,
*,
central_location_name: str = "Центральный склад",
) -> ExplosionStats:
"""
BOM Explosion:
- берём состав сделки (DealItem)
- рекурсивно обходим BOM
- считаем суммарное количество leaf-деталей
- создаём/обновляем ProductionTask (deal + entity)
- создаём/обновляем MaterialRequirement по нормам расхода и остаткам на центральном складе
"""
deal = Deal.objects.select_for_update().get(pk=deal_id)
deal_items = list(DealItem.objects.select_related("entity").filter(deal=deal))
if not deal_items:
return ExplosionStats(0, 0, 0, 0)
root_ids = {di.entity_id for di in deal_items}
adjacency = _build_bom_graph(root_ids)
memo: dict[int, dict[int, int]] = {}
required_leaves: dict[int, int] = defaultdict(int)
for di in deal_items:
leaf_map = _explode_to_leaves(di.entity_id, adjacency, memo, set())
for leaf_id, qty_per_unit in leaf_map.items():
required_leaves[leaf_id] += int(di.quantity) * int(qty_per_unit)
leaf_entities = {
e.id: e
for e in ProductEntity.objects.select_related("planned_material", "planned_material__category")
.filter(id__in=list(required_leaves.keys()))
}
tasks_created = 0
tasks_updated = 0
for entity_id, qty in required_leaves.items():
entity = leaf_entities.get(entity_id)
if not entity:
continue
pt, created = ProductionTask.objects.get_or_create(
deal=deal,
entity=entity,
defaults={
"drawing_name": entity.name or "Б/ч",
"size_value": 0,
"material": entity.planned_material,
"quantity_ordered": int(qty),
"is_bend": False,
},
)
if created:
tasks_created += 1
else:
changed = False
if pt.quantity_ordered != int(qty):
pt.quantity_ordered = int(qty)
changed = True
if not pt.material_id and entity.planned_material_id:
pt.material = entity.planned_material
changed = True
if changed:
pt.save(update_fields=["quantity_ordered", "material"])
tasks_updated += 1
central, _ = Location.objects.get_or_create(
name=central_location_name,
defaults={"is_production_area": False},
)
req_created = 0
req_updated = 0
for entity_id, qty_parts in required_leaves.items():
entity = leaf_entities.get(entity_id)
if not entity or not entity.planned_material_id:
continue
per_unit, unit = _norm_and_unit(entity)
if not per_unit:
continue
required_qty = float(qty_parts) * float(per_unit)
available = (
StockItem.objects.filter(location=central, material=entity.planned_material)
.aggregate(v=Coalesce(Sum("quantity"), 0.0))["v"]
)
to_buy = max(0.0, required_qty - float(available or 0.0))
if to_buy <= 0:
continue
mr, created = MaterialRequirement.objects.get_or_create(
deal=deal,
material=entity.planned_material,
unit=unit,
defaults={"required_qty": to_buy, "status": "needed"},
)
if created:
req_created += 1
else:
mr.required_qty = to_buy
mr.status = "needed"
mr.save(update_fields=["required_qty", "status"])
req_updated += 1
return ExplosionStats(tasks_created, tasks_updated, req_created, req_updated)