All checks were successful
Deploy MES Core / deploy (push) Successful in 11s
265 lines
8.4 KiB
Python
265 lines
8.4 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
|
|
from django.db import transaction
|
|
from django.db.models import Sum
|
|
from django.db.models.functions import Coalesce
|
|
|
|
from manufacturing.models import BOM, ProductEntity
|
|
from shiftflow.models import Deal, DealItem, MaterialRequirement, ProductionTask
|
|
from warehouse.models import Location, StockItem
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ExplosionStats:
|
|
"""
|
|
Сводка результата BOM Explosion.
|
|
|
|
tasks_*:
|
|
- сколько ProductionTask создано/обновлено (по leaf-деталям)
|
|
|
|
req_*:
|
|
- сколько MaterialRequirement создано/обновлено (по сырью)
|
|
"""
|
|
|
|
tasks_created: int
|
|
tasks_updated: int
|
|
req_created: int
|
|
req_updated: int
|
|
|
|
|
|
def _category_kind(material_category_name: str) -> str:
|
|
"""
|
|
Определение типа материала по названию категории.
|
|
|
|
Возвращает:
|
|
- 'sheet' для листовых материалов
|
|
- 'linear' для профилей/труб/круга
|
|
- 'unknown' если не удалось определить
|
|
"""
|
|
s = (material_category_name or "").strip().lower()
|
|
|
|
if "лист" in s:
|
|
return "sheet"
|
|
|
|
if any(k in s for k in ["труба", "проф", "круг", "швел", "угол", "балк", "квадрат"]):
|
|
return "linear"
|
|
|
|
return "unknown"
|
|
|
|
|
|
def _norm_and_unit(entity: ProductEntity) -> tuple[float | None, str]:
|
|
"""
|
|
Возвращает норму расхода и единицу измерения для MaterialRequirement.
|
|
|
|
Логика:
|
|
- для листа берём blank_area_m2 (м²/шт)
|
|
- для линейного берём blank_length_mm (мм/шт)
|
|
|
|
Если категория не распознана, но одна из норм задана — используем заданную.
|
|
"""
|
|
if not entity.planned_material_id or not getattr(entity.planned_material, "category_id", None):
|
|
if entity.blank_area_m2:
|
|
return float(entity.blank_area_m2), "m2"
|
|
if entity.blank_length_mm:
|
|
return float(entity.blank_length_mm), "mm"
|
|
return None, "pcs"
|
|
|
|
kind = _category_kind(entity.planned_material.category.name)
|
|
|
|
if kind == "sheet":
|
|
return (float(entity.blank_area_m2) if entity.blank_area_m2 else None), "m2"
|
|
|
|
if kind == "linear":
|
|
return (float(entity.blank_length_mm) if entity.blank_length_mm else None), "mm"
|
|
|
|
if entity.blank_area_m2:
|
|
return float(entity.blank_area_m2), "m2"
|
|
if entity.blank_length_mm:
|
|
return float(entity.blank_length_mm), "mm"
|
|
|
|
return None, "pcs"
|
|
|
|
|
|
def _build_bom_graph(root_entity_ids: set[int]) -> dict[int, list[tuple[int, int]]]:
|
|
"""
|
|
Строит граф BOM в памяти для заданного множества root entity.
|
|
|
|
Возвращает:
|
|
adjacency[parent_id] = [(child_id, qty), ...]
|
|
"""
|
|
adjacency: dict[int, list[tuple[int, int]]] = defaultdict(list)
|
|
|
|
frontier = set(root_entity_ids)
|
|
seen = set()
|
|
|
|
while frontier:
|
|
batch = frontier - seen
|
|
if not batch:
|
|
break
|
|
seen |= batch
|
|
|
|
rows = BOM.objects.filter(parent_id__in=batch).values_list("parent_id", "child_id", "quantity")
|
|
next_frontier = set()
|
|
|
|
for parent_id, child_id, qty in rows:
|
|
q = int(qty or 0)
|
|
if q <= 0:
|
|
continue
|
|
adjacency[int(parent_id)].append((int(child_id), q))
|
|
next_frontier.add(int(child_id))
|
|
|
|
frontier |= next_frontier
|
|
|
|
return adjacency
|
|
|
|
|
|
def _explode_to_leaves(
|
|
entity_id: int,
|
|
adjacency: dict[int, list[tuple[int, int]]],
|
|
memo: dict[int, dict[int, int]],
|
|
visiting: set[int],
|
|
) -> dict[int, int]:
|
|
"""
|
|
Возвращает разложение entity_id в leaf-детали в виде:
|
|
{ leaf_entity_id: multiplier_for_one_unit }
|
|
"""
|
|
if entity_id in memo:
|
|
return memo[entity_id]
|
|
|
|
if entity_id in visiting:
|
|
raise RuntimeError("Цикл в BOM: спецификация зациклена.")
|
|
|
|
visiting.add(entity_id)
|
|
|
|
children = adjacency.get(entity_id) or []
|
|
if not children:
|
|
memo[entity_id] = {entity_id: 1}
|
|
visiting.remove(entity_id)
|
|
return memo[entity_id]
|
|
|
|
out: dict[int, int] = defaultdict(int)
|
|
for child_id, qty in children:
|
|
child_map = _explode_to_leaves(child_id, adjacency, memo, visiting)
|
|
for leaf_id, leaf_qty in child_map.items():
|
|
out[leaf_id] += int(qty) * int(leaf_qty)
|
|
|
|
memo[entity_id] = dict(out)
|
|
visiting.remove(entity_id)
|
|
return memo[entity_id]
|
|
|
|
|
|
@transaction.atomic
|
|
def explode_deal(
|
|
deal_id: int,
|
|
*,
|
|
central_location_name: str = "Центральный склад",
|
|
) -> ExplosionStats:
|
|
"""
|
|
BOM Explosion:
|
|
- берём состав сделки (DealItem)
|
|
- рекурсивно обходим BOM
|
|
- считаем суммарное количество leaf-деталей
|
|
- создаём/обновляем ProductionTask (deal + entity)
|
|
- создаём/обновляем MaterialRequirement по нормам расхода и остаткам на центральном складе
|
|
"""
|
|
deal = Deal.objects.select_for_update().get(pk=deal_id)
|
|
|
|
deal_items = list(DealItem.objects.select_related("entity").filter(deal=deal))
|
|
if not deal_items:
|
|
return ExplosionStats(0, 0, 0, 0)
|
|
|
|
root_ids = {di.entity_id for di in deal_items}
|
|
adjacency = _build_bom_graph(root_ids)
|
|
|
|
memo: dict[int, dict[int, int]] = {}
|
|
required_leaves: dict[int, int] = defaultdict(int)
|
|
|
|
for di in deal_items:
|
|
leaf_map = _explode_to_leaves(di.entity_id, adjacency, memo, set())
|
|
for leaf_id, qty_per_unit in leaf_map.items():
|
|
required_leaves[leaf_id] += int(di.quantity) * int(qty_per_unit)
|
|
|
|
leaf_entities = {
|
|
e.id: e
|
|
for e in ProductEntity.objects.select_related("planned_material", "planned_material__category")
|
|
.filter(id__in=list(required_leaves.keys()))
|
|
}
|
|
|
|
tasks_created = 0
|
|
tasks_updated = 0
|
|
|
|
for entity_id, qty in required_leaves.items():
|
|
entity = leaf_entities.get(entity_id)
|
|
if not entity:
|
|
continue
|
|
|
|
pt, created = ProductionTask.objects.get_or_create(
|
|
deal=deal,
|
|
entity=entity,
|
|
defaults={
|
|
"drawing_name": entity.name or "Б/ч",
|
|
"size_value": 0,
|
|
"material": entity.planned_material,
|
|
"quantity_ordered": int(qty),
|
|
"is_bend": False,
|
|
},
|
|
)
|
|
if created:
|
|
tasks_created += 1
|
|
else:
|
|
changed = False
|
|
if pt.quantity_ordered != int(qty):
|
|
pt.quantity_ordered = int(qty)
|
|
changed = True
|
|
if not pt.material_id and entity.planned_material_id:
|
|
pt.material = entity.planned_material
|
|
changed = True
|
|
if changed:
|
|
pt.save(update_fields=["quantity_ordered", "material"])
|
|
tasks_updated += 1
|
|
|
|
central, _ = Location.objects.get_or_create(
|
|
name=central_location_name,
|
|
defaults={"is_production_area": False},
|
|
)
|
|
|
|
req_created = 0
|
|
req_updated = 0
|
|
|
|
for entity_id, qty_parts in required_leaves.items():
|
|
entity = leaf_entities.get(entity_id)
|
|
if not entity or not entity.planned_material_id:
|
|
continue
|
|
|
|
per_unit, unit = _norm_and_unit(entity)
|
|
if not per_unit:
|
|
continue
|
|
|
|
required_qty = float(qty_parts) * float(per_unit)
|
|
|
|
available = (
|
|
StockItem.objects.filter(location=central, material=entity.planned_material)
|
|
.aggregate(v=Coalesce(Sum("quantity"), 0.0))["v"]
|
|
)
|
|
to_buy = max(0.0, required_qty - float(available or 0.0))
|
|
if to_buy <= 0:
|
|
continue
|
|
|
|
mr, created = MaterialRequirement.objects.get_or_create(
|
|
deal=deal,
|
|
material=entity.planned_material,
|
|
unit=unit,
|
|
defaults={"required_qty": to_buy, "status": "needed"},
|
|
)
|
|
if created:
|
|
req_created += 1
|
|
else:
|
|
mr.required_qty = to_buy
|
|
mr.status = "needed"
|
|
mr.save(update_fields=["required_qty", "status"])
|
|
req_updated += 1
|
|
|
|
return ExplosionStats(tasks_created, tasks_updated, req_created, req_updated) |