Files
manufacture/mrp_multi_level/wizards/mrp_multi_level.py
Matt Taylor c850eb0bb5 [FIX] mrp_multi_level: fix kit/phantom planning
fixes #1362

Ignoring qty_available for phantom products prevents double counting the
qty_available of components.

Creating planned orders for phantom products is simpler than recursively
exploding phantom BOMs. This also makes it easier to analyze the planning data
generated by the MRP calculation.
2024-12-04 09:42:26 +05:30

928 lines
35 KiB
Python

# Copyright 2016 Ucamco - Wim Audenaert <wim.audenaert@ucamco.com>
# Copyright 2016-19 ForgeFlow S.L. (https://www.forgeflow.com)
# - Jordi Ballester Alomar <jordi.ballester@forgeflow.com>
# - Lois Rilo <lois.rilo@forgeflow.com>
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl.html).
import logging
from datetime import date, timedelta
from odoo import _, api, exceptions, fields, models
from odoo.tools import float_is_zero, mute_logger
logger = logging.getLogger(__name__)
class MultiLevelMrp(models.TransientModel):
_name = "mrp.multi.level"
_description = "Multi Level MRP"
mrp_area_ids = fields.Many2many(
comodel_name="mrp.area",
string="MRP Areas to run",
help="If empty, all areas will be computed.",
)
@api.model
def _prepare_mrp_move_data_from_stock_move(
self, product_mrp_area, move, direction="in"
):
area = product_mrp_area.mrp_area_id
if direction == "out":
mrp_type = "d"
product_qty = -move.product_qty
else:
mrp_type = "s"
product_qty = move.product_qty
po = po_line = None
mo = origin = order_number = order_origin = parent_product_id = None
if move.purchase_line_id:
po = move.purchase_line_id.order_id
order_number = po.name
order_origin = po.origin
origin = "po"
po = move.purchase_line_id.order_id.id
po_line = move.purchase_line_id.id
elif move.production_id or move.raw_material_production_id:
production = move.production_id or move.raw_material_production_id
order_number = production.name
order_origin = production.origin
origin = "mo"
mo = production.id
elif move.move_dest_ids:
for move_dest_id in move.move_dest_ids.filtered("production_id"):
production = move_dest_id.production_id
order_number = production.name
order_origin = production.origin
origin = "mo"
mo = move_dest_id.production_id.id
parent_product_id = (
move_dest_id.production_id.product_id or move_dest_id.product_id
).id
if not order_number:
source = (move.picking_id or move).origin
order_number = source or (move.picking_id or move).name
origin = "mv"
# The date to display is based on the timezone of the warehouse.
today_tz = area._datetime_to_date_tz()
move_date_tz = area._datetime_to_date_tz(move.date)
if move_date_tz > today_tz:
mrp_date = move_date_tz
else:
mrp_date = today_tz
return {
"product_id": move.product_id.id,
"product_mrp_area_id": product_mrp_area.id,
"production_id": mo,
"purchase_order_id": po,
"purchase_line_id": po_line,
"stock_move_id": move.id,
"mrp_qty": product_qty,
"current_qty": product_qty,
"mrp_date": mrp_date,
"current_date": move.date,
"mrp_type": mrp_type,
"mrp_origin": origin or "",
"mrp_order_number": order_number,
"parent_product_id": parent_product_id,
"name": order_number,
"origin": order_origin,
"state": move.state,
}
@api.model
def _prepare_planned_order_data(
self, product_mrp_area, qty, mrp_date_supply, mrp_action_date, name, values
):
return {
"product_mrp_area_id": product_mrp_area.id,
"mrp_qty": qty,
"due_date": mrp_date_supply,
"order_release_date": mrp_action_date,
"mrp_action": product_mrp_area.supply_method,
"qty_released": 0.0,
"name": "Planned supply for: " + name,
"origin": values.get("origin") or name,
"fixed": False,
}
@api.model
def _prepare_mrp_move_data_bom_explosion(
self,
product,
bomline,
qty,
mrp_date_demand_2,
bom,
name,
planned_order,
values=None,
):
product_mrp_area = self._get_product_mrp_area_from_product_and_area(
bomline.product_id, product.mrp_area_id
)
if not product_mrp_area:
raise exceptions.Warning(_("No MRP product found"))
factor = (
product.product_id.uom_id._compute_quantity(
qty, bomline.bom_id.product_uom_id
)
/ bomline.bom_id.product_qty
)
line_quantity = factor * bomline.product_qty
return {
"mrp_area_id": product_mrp_area.mrp_area_id.id,
"product_id": bomline.product_id.id,
"product_mrp_area_id": product_mrp_area.id,
"production_id": None,
"purchase_order_id": None,
"purchase_line_id": None,
"stock_move_id": None,
"mrp_qty": -line_quantity, # TODO: review with UoM
"current_qty": None,
"mrp_date": mrp_date_demand_2,
"current_date": None,
"mrp_type": "d",
"mrp_origin": "mrp",
"mrp_order_number": None,
"parent_product_id": bom.product_id.id,
"name": (
"Demand Bom Explosion: %s"
% (name or product.product_id.default_code or product.product_id.name)
).replace(
"Demand Bom Explosion: Demand Bom Explosion: ", "Demand Bom Explosion: "
),
"origin": planned_order.origin if planned_order else values.get("origin"),
"bom_id": bom.id,
}
@api.model
def _get_action_and_supply_dates(self, product_mrp_area, mrp_date):
if not isinstance(mrp_date, date):
mrp_date = fields.Date.from_string(mrp_date)
if mrp_date < date.today():
mrp_date_supply = date.today()
else:
mrp_date_supply = mrp_date
calendar = product_mrp_area.mrp_area_id.calendar_id
if calendar and product_mrp_area.mrp_lead_time:
date_str = fields.Date.to_string(mrp_date)
dt = fields.Datetime.from_string(date_str)
# dt is at the beginning of the day (00:00)
res = calendar.plan_days(-1 * product_mrp_area.mrp_lead_time, dt)
mrp_action_date = res.date()
else:
mrp_action_date = mrp_date - timedelta(days=product_mrp_area.mrp_lead_time)
return mrp_action_date, mrp_date_supply
@api.model
def _get_bom_to_explode(self, product_mrp_area_id):
boms = self.env["mrp.bom"]
if product_mrp_area_id.supply_method in ["manufacture", "phantom"]:
boms = product_mrp_area_id.product_id.bom_ids.filtered(
lambda x: x.type in ["normal", "phantom"]
)
if not boms:
return False
return boms[0]
@api.model
def explode_action(
self, product_mrp_area_id, mrp_action_date, name, qty, action, values=None
):
"""Explode requirements."""
mrp_date_demand = mrp_action_date
if mrp_date_demand < date.today():
mrp_date_demand = date.today()
bom = self._get_bom_to_explode(product_mrp_area_id)
if not bom:
return False
pd = self.env["decimal.precision"].precision_get("Product Unit of Measure")
for bomline in bom.bom_line_ids:
if (
float_is_zero(bomline.product_qty, precision_digits=pd)
or bomline.product_id.type != "product"
):
continue
if self.with_context(mrp_explosion=True)._exclude_from_mrp(
bomline.product_id, product_mrp_area_id.mrp_area_id
):
# Stop explosion.
continue
if bomline._skip_bom_line(product_mrp_area_id.product_id):
continue
# TODO: review: mrp_transit_delay, mrp_inspection_delay
mrp_date_demand_2 = mrp_date_demand - timedelta(
days=(
product_mrp_area_id.mrp_transit_delay
+ product_mrp_area_id.mrp_inspection_delay
)
)
move_data = self._prepare_mrp_move_data_bom_explosion(
product_mrp_area_id,
bomline,
qty,
mrp_date_demand_2,
bom,
name,
action,
values,
)
mrpmove_id2 = self.env["mrp.move"].create(move_data)
if hasattr(action, "mrp_move_down_ids"):
action.mrp_move_down_ids = [(4, mrpmove_id2.id)]
return True
@api.model
def create_action(self, product_mrp_area_id, mrp_date, mrp_qty, name, values=None):
if not values:
values = {}
if not isinstance(mrp_date, date):
mrp_date = fields.Date.from_string(mrp_date)
action_date, date_supply = self._get_action_and_supply_dates(
product_mrp_area_id, mrp_date
)
return self.create_planned_order(
product_mrp_area_id, mrp_qty, name, date_supply, action_date, values=values
)
@api.model
def create_planned_order(
self,
product_mrp_area_id,
mrp_qty,
name,
mrp_date_supply,
mrp_action_date,
values=None,
):
self = self.with_context(auditlog_disabled=True)
if self._exclude_from_mrp(
product_mrp_area_id.product_id, product_mrp_area_id.mrp_area_id
):
values["qty_ordered"] = 0.0
return values
qty_ordered = values.get("qty_ordered", 0.0) if values else 0.0
qty_to_order = mrp_qty
while qty_ordered < mrp_qty:
qty = product_mrp_area_id._adjust_qty_to_order(qty_to_order)
qty_to_order -= qty
order_data = self._prepare_planned_order_data(
product_mrp_area_id, qty, mrp_date_supply, mrp_action_date, name, values
)
planned_order = self.env["mrp.planned.order"].create(order_data)
qty_ordered = qty_ordered + qty
if product_mrp_area_id._to_be_exploded():
self.explode_action(
product_mrp_area_id,
mrp_action_date,
name,
qty,
planned_order,
values,
)
values["qty_ordered"] = qty_ordered
log_msg = "[{}] {}: qty_ordered = {}".format(
product_mrp_area_id.mrp_area_id.name,
product_mrp_area_id.product_id.default_code
or product_mrp_area_id.product_id.name,
qty_ordered,
)
logger.debug(log_msg)
return values
@api.model
def _mrp_cleanup(self, mrp_areas):
logger.info("Start MRP Cleanup")
domain = []
if mrp_areas:
domain += [("mrp_area_id", "in", mrp_areas.ids)]
with mute_logger("odoo.models.unlink"):
self.env["mrp.move"].search(domain).unlink()
self.env["mrp.planned.order"].search(
domain + [("fixed", "=", False)]
).unlink()
self.env["mrp.inventory"].search(domain).unlink()
logger.info("End MRP Cleanup")
return True
def _domain_bom_lines_by_llc(self, llc, product_templates):
return [
("product_id.llc", "=", llc),
("bom_id.product_tmpl_id", "in", product_templates.ids),
("bom_id.active", "=", True),
]
def _get_bom_lines_by_llc(self, llc, product_templates):
return self.env["mrp.bom.line"].search(
self._domain_bom_lines_by_llc(llc, product_templates)
)
@api.model
def _low_level_code_calculation(self):
logger.info("Start low level code calculation")
counter = 999999
llc = 0
llc_recursion_limit = (
int(
self.env["ir.config_parameter"]
.sudo()
.get_param("mrp_multi_level.llc_calculation_recursion_limit")
)
or 1000
)
self.env["product.product"].search([]).write({"llc": llc})
products = self.env["product.product"].search([("llc", "=", llc)])
if products:
counter = len(products)
log_msg = "Low level code 0 finished - Nbr. products: %s" % counter
logger.info(log_msg)
while counter:
llc += 1
products = self.env["product.product"].search([("llc", "=", llc - 1)])
p_templates = products.mapped("product_tmpl_id")
bom_lines = self._get_bom_lines_by_llc(llc - 1, p_templates)
products = bom_lines.mapped("product_id")
products.write({"llc": llc})
counter = self.env["product.product"].search_count([("llc", "=", llc)])
log_msg = f"Low level code {llc} finished - Nbr. products: {counter}"
logger.info(log_msg)
if llc > llc_recursion_limit:
logger.error("Recursion limit reached during LLC calculation.")
break
mrp_lowest_llc = llc
logger.info("End low level code calculation")
return mrp_lowest_llc
@api.model
def _adjust_mrp_applicable(self, mrp_areas):
"""This method is meant to modify the products that are applicable
to MRP Multi level calculation
"""
return True
@api.model
def _calculate_mrp_applicable(self, mrp_areas):
logger.info("Start Calculate MRP Applicable")
domain = []
if mrp_areas:
domain += [("mrp_area_id", "in", mrp_areas.ids)]
self.env["product.mrp.area"].search(domain).write({"mrp_applicable": False})
domain += [("product_id.type", "=", "product")]
self.env["product.mrp.area"].search(domain).write({"mrp_applicable": True})
self._adjust_mrp_applicable(mrp_areas)
count_domain = [("mrp_applicable", "=", True)]
if mrp_areas:
count_domain += [("mrp_area_id", "in", mrp_areas.ids)]
counter = self.env["product.mrp.area"].search_count(count_domain)
log_msg = "End Calculate MRP Applicable: %s" % counter
logger.info(log_msg)
return True
@api.model
def _init_mrp_move_from_forecast(self, product_mrp_area):
"""This method is meant to be inherited to add a forecast mechanism."""
return True
@api.model
def _init_mrp_move_from_stock_move(self, product_mrp_area):
move_obj = self.env["stock.move"]
mrp_move_obj = self.env["mrp.move"]
in_domain = product_mrp_area._in_stock_moves_domain()
in_moves = move_obj.search(in_domain)
out_domain = product_mrp_area._out_stock_moves_domain()
out_moves = move_obj.search(out_domain)
move_vals = []
if in_moves:
for move in in_moves:
move_data = self._prepare_mrp_move_data_from_stock_move(
product_mrp_area, move, direction="in"
)
if move_data:
move_vals.append(move_data)
if out_moves:
for move in out_moves:
move_data = self._prepare_mrp_move_data_from_stock_move(
product_mrp_area, move, direction="out"
)
if move_data:
move_vals.append(move_data)
mrp_move_obj.create(move_vals)
return True
@api.model
def _prepare_mrp_move_data_from_purchase_order(self, poline, product_mrp_area):
mrp_date = date.today()
if fields.Date.from_string(poline.date_planned) > date.today():
mrp_date = fields.Date.from_string(poline.date_planned)
return {
"product_id": poline.product_id.id,
"product_mrp_area_id": product_mrp_area.id,
"production_id": None,
"purchase_order_id": poline.order_id.id,
"purchase_line_id": poline.id,
"stock_move_id": None,
"mrp_qty": poline.product_uom_qty,
"current_qty": poline.product_uom_qty,
"mrp_date": mrp_date,
"current_date": poline.date_planned,
"mrp_type": "s",
"mrp_origin": "po",
"mrp_order_number": poline.order_id.name,
"parent_product_id": None,
"name": poline.order_id.name,
"state": poline.order_id.state,
}
@api.model
def _init_mrp_move_from_purchase_order(self, product_mrp_area):
location_ids = product_mrp_area._get_locations()
picking_types = self.env["stock.picking.type"].search(
[("default_location_dest_id", "child_of", location_ids.ids)]
)
picking_type_ids = [ptype.id for ptype in picking_types]
orders = self.env["purchase.order"].search(
[
("picking_type_id", "in", picking_type_ids),
("state", "in", ["draft", "sent", "to approve"]),
]
)
po_lines = self.env["purchase.order.line"].search(
[
("order_id", "in", orders.ids),
("product_qty", ">", 0.0),
("product_id", "=", product_mrp_area.product_id.id),
]
)
mrp_move_vals = []
for line in po_lines:
mrp_move_data = self._prepare_mrp_move_data_from_purchase_order(
line, product_mrp_area
)
mrp_move_vals.append(mrp_move_data)
if mrp_move_vals:
self.env["mrp.move"].create(mrp_move_vals)
@api.model
def _get_product_mrp_area_from_product_and_area(self, product, mrp_area):
return self.env["product.mrp.area"].search(
[("product_id", "=", product.id), ("mrp_area_id", "=", mrp_area.id)],
limit=1,
)
@api.model
def _init_mrp_move(self, product_mrp_area):
self._init_mrp_move_from_forecast(product_mrp_area)
self._init_mrp_move_from_stock_move(product_mrp_area)
self._init_mrp_move_from_purchase_order(product_mrp_area)
@api.model
def _exclude_from_mrp(self, product, mrp_area):
"""To extend with various logic where needed."""
product_mrp_area = self.env["product.mrp.area"].search(
[("product_id", "=", product.id), ("mrp_area_id", "=", mrp_area.id)],
limit=1,
)
if not product_mrp_area:
return True
return product_mrp_area.mrp_exclude
@api.model
def _mrp_initialisation(self, mrp_areas):
logger.info("Start MRP initialisation")
if not mrp_areas:
mrp_areas = self.env["mrp.area"].search([])
product_mrp_areas = self.env["product.mrp.area"].search(
[("mrp_area_id", "in", mrp_areas.ids), ("mrp_applicable", "=", True)]
)
init_counter = 0
for mrp_area in mrp_areas:
for product_mrp_area in product_mrp_areas.filtered(
lambda a, mrp_area=mrp_area: a.mrp_area_id == mrp_area
):
if self._exclude_from_mrp(product_mrp_area.product_id, mrp_area):
continue
init_counter += 1
log_msg = f"MRP Init: {init_counter} - {product_mrp_area.display_name} "
logger.info(log_msg)
self._init_mrp_move(product_mrp_area)
logger.info("End MRP initialisation")
def _get_qty_to_order(self, product_mrp_area, date, move_qty, onhand):
"""Compute the qty to order at a given date, for a product MRP area, given an
mrp.move quantity and an onhand quantity.
This method is an extension point, allowing a new module to change the way this
quantity should be computed.
"""
# The default rule is to resupply to rebuild the safety stock
return product_mrp_area.mrp_minimum_stock - onhand - move_qty
@api.model
def _init_mrp_move_grouped_demand(self, product_mrp_area):
last_date = None
last_qty = 0.00
onhand = (
0.0
if product_mrp_area.supply_method == "phantom"
else product_mrp_area.qty_available
)
grouping_delta = product_mrp_area.mrp_nbr_days
demand_origin = []
if (
product_mrp_area.mrp_move_ids
and onhand < product_mrp_area.mrp_minimum_stock
):
last_date = self._get_safety_stock_target_date(product_mrp_area)
demand_origin.append("Safety Stock")
move = fields.first(product_mrp_area.mrp_move_ids)
if last_date and (
fields.Date.from_string(move.mrp_date)
>= last_date + timedelta(days=grouping_delta)
):
name = _("Safety Stock")
origin = ",".join(list({x for x in demand_origin if x}))
qtytoorder = self._get_qty_to_order(
product_mrp_area, last_date, 0, onhand
)
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=last_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=origin),
)
qty_ordered = cm.get("qty_ordered", 0.0)
onhand = onhand + qty_ordered
last_date = None
last_qty = 0.00
demand_origin = []
for move in product_mrp_area.mrp_move_ids:
if self._exclude_move(move):
continue
if (
last_date
and (
fields.Date.from_string(move.mrp_date)
>= last_date + timedelta(days=grouping_delta)
)
and (
(onhand + last_qty + move.mrp_qty)
< product_mrp_area.mrp_minimum_stock
or (onhand + last_qty) < product_mrp_area.mrp_minimum_stock
)
):
name = _(
"Grouped Demand of %(product_name)s for %(delta_days)d Days"
) % dict(
product_name=product_mrp_area.product_id.display_name,
delta_days=grouping_delta,
)
origin = ",".join(list({x for x in demand_origin if x}))
qtytoorder = self._get_qty_to_order(
product_mrp_area, last_date, last_qty, onhand
)
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=last_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=origin),
)
qty_ordered = cm.get("qty_ordered", 0.0)
onhand = onhand + last_qty + qty_ordered
last_date = None
last_qty = 0.00
demand_origin = []
if (
onhand + last_qty + move.mrp_qty
) < product_mrp_area.mrp_minimum_stock or (
onhand + last_qty
) < product_mrp_area.mrp_minimum_stock:
if not last_date:
last_date = fields.Date.from_string(move.mrp_date)
last_qty = move.mrp_qty
else:
last_qty += move.mrp_qty
else:
last_date = fields.Date.from_string(move.mrp_date)
onhand += move.mrp_qty
if move.mrp_type == "d":
demand_origin.append(move.origin or move.name)
if last_date and last_qty != 0.00:
name = _(
"Grouped Demand of %(product_name)s for %(delta_days)d Days"
) % dict(
product_name=product_mrp_area.product_id.display_name,
delta_days=grouping_delta,
)
origin = ",".join(list({x for x in demand_origin if x}))
qtytoorder = self._get_qty_to_order(
product_mrp_area, last_date, last_qty, onhand
)
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=last_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=origin),
)
qty_ordered = cm.get("qty_ordered", 0.0)
onhand += qty_ordered
last_qty -= qty_ordered
if (onhand + last_qty) < product_mrp_area.mrp_minimum_stock:
mrp_date = self._get_safety_stock_target_date(product_mrp_area)
qtytoorder = self._get_qty_to_order(product_mrp_area, mrp_date, 0, onhand)
name = _("Safety Stock")
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=mrp_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=name),
)
qty_ordered = cm["qty_ordered"]
onhand += qty_ordered
def _get_safety_stock_target_date(self, product_mrp_area):
"""Get the date at which the safety stock rebuild should be targeted
This method is an extension point for modules who need to cusomize that date."""
return date.today()
@api.model
def _init_mrp_move_non_grouped_demand(self, product_mrp_area):
onhand = (
0.0
if product_mrp_area.supply_method == "phantom"
else product_mrp_area.qty_available
)
for move in product_mrp_area.mrp_move_ids:
if self._exclude_move(move):
continue
# This works because mrp moves are ordered by:
# product_mrp_area_id, mrp_date, mrp_type desc, id
if onhand + move.mrp_qty < product_mrp_area.mrp_minimum_stock:
qtytoorder = self._get_qty_to_order(
product_mrp_area,
self._get_safety_stock_target_date(product_mrp_area),
0,
onhand,
)
name = _("Safety Stock")
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=self._get_safety_stock_target_date(product_mrp_area),
mrp_qty=qtytoorder,
name=name,
values=dict(origin=name),
)
qty_ordered = cm["qty_ordered"]
onhand += qty_ordered
qtytoorder = self._get_qty_to_order(
product_mrp_area, move.mrp_date, move.mrp_qty, onhand
)
if qtytoorder > 0.0:
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=move.mrp_date,
mrp_qty=qtytoorder,
name=move.name or "",
values=dict(origin=move.origin or ""),
)
qty_ordered = cm["qty_ordered"]
onhand += move.mrp_qty + qty_ordered
else:
onhand += move.mrp_qty
if onhand < product_mrp_area.mrp_minimum_stock:
mrp_date = self._get_safety_stock_target_date(product_mrp_area)
qtytoorder = self._get_qty_to_order(product_mrp_area, mrp_date, 0, onhand)
name = _("Safety Stock")
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=mrp_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=name),
)
qty_ordered = cm["qty_ordered"]
onhand += qty_ordered
@api.model
def _exclude_move(self, move):
"""Improve extensibility being able to exclude special moves."""
return False
def _get_mrp_initialization_groups_of_params(self, mrp_lowest_llc, mrp_areas):
product_mrp_area_obj = self.env["product.mrp.area"]
groups = {}
for mrp_area in mrp_areas:
llc = 0
while mrp_lowest_llc > llc:
groups[mrp_area, llc] = product_mrp_area_obj.search(
[("product_id.llc", "=", llc), ("mrp_area_id", "=", mrp_area.id)]
)
llc += 1
return groups
@api.model
def _mrp_calculation(self, mrp_lowest_llc, mrp_areas):
logger.info("Start MRP calculation")
if not mrp_areas:
mrp_areas = self.env["mrp.area"].search([])
keyed_groups = self._get_mrp_initialization_groups_of_params(
mrp_lowest_llc, mrp_areas
)
for (mrp_area, llc), product_mrp_areas in keyed_groups.items():
counter = 0
for product_mrp_area in product_mrp_areas:
if product_mrp_area.mrp_nbr_days == 0:
self._init_mrp_move_non_grouped_demand(product_mrp_area)
else:
self._init_mrp_move_grouped_demand(product_mrp_area)
counter += 1
log_msg = (
"MRP Calculation LLC {} at {} Finished - Nbr. products: {}".format(
llc, mrp_area.name, counter
)
)
logger.info(log_msg)
logger.info("End MRP calculation")
@api.model
def _get_demand_groups(self, product_mrp_area):
query = """
SELECT mrp_date, sum(mrp_qty)
FROM mrp_move
WHERE product_mrp_area_id = %(mrp_product)s
AND mrp_type = 'd'
GROUP BY mrp_date
"""
params = {"mrp_product": product_mrp_area.id}
return query, params
@api.model
def _get_supply_groups(self, product_mrp_area):
query = """
SELECT mrp_date, sum(mrp_qty)
FROM mrp_move
WHERE product_mrp_area_id = %(mrp_product)s
AND mrp_type = 's'
GROUP BY mrp_date
"""
params = {"mrp_product": product_mrp_area.id}
return query, params
@api.model
def _get_planned_order_groups(self, product_mrp_area):
query = """
SELECT due_date, sum(mrp_qty)
FROM mrp_planned_order
WHERE product_mrp_area_id = %(mrp_product)s
GROUP BY due_date
"""
params = {"mrp_product": product_mrp_area.id}
return query, params
@api.model
def _prepare_mrp_inventory_data(
self,
product_mrp_area,
mdt,
on_hand_qty,
running_availability,
demand_qty_by_date,
supply_qty_by_date,
planned_qty_by_date,
):
"""Return dict to create mrp.inventory records on MRP Multi Level Scheduler"""
mrp_inventory_data = {"product_mrp_area_id": product_mrp_area.id, "date": mdt}
demand_qty = demand_qty_by_date.get(mdt, 0.0)
mrp_inventory_data["demand_qty"] = abs(demand_qty)
supply_qty = supply_qty_by_date.get(mdt, 0.0)
mrp_inventory_data["supply_qty"] = abs(supply_qty)
mrp_inventory_data["initial_on_hand_qty"] = on_hand_qty
if product_mrp_area.supply_method != "phantom":
on_hand_qty += supply_qty + demand_qty
mrp_inventory_data["final_on_hand_qty"] = on_hand_qty
# Consider that MRP plan is followed exactly:
running_availability += (
supply_qty + demand_qty + planned_qty_by_date.get(mdt, 0.0)
)
mrp_inventory_data["running_availability"] = running_availability
return mrp_inventory_data, running_availability, on_hand_qty
@api.model
def _init_mrp_inventory(self, product_mrp_area):
mrp_move_obj = self.env["mrp.move"]
planned_order_obj = self.env["mrp.planned.order"]
# Read Demand
demand_qty_by_date = {}
query, params = self._get_demand_groups(product_mrp_area)
self.env.cr.execute(query, params)
for mrp_date, qty in self.env.cr.fetchall():
demand_qty_by_date[mrp_date] = qty
# Read Supply
supply_qty_by_date = {}
query, params = self._get_supply_groups(product_mrp_area)
self.env.cr.execute(query, params)
for mrp_date, qty in self.env.cr.fetchall():
supply_qty_by_date[mrp_date] = qty
# Read planned orders:
planned_qty_by_date = {}
query, params = self._get_planned_order_groups(product_mrp_area)
self.env.cr.execute(query, params)
for mrp_date, qty in self.env.cr.fetchall():
planned_qty_by_date[mrp_date] = qty
# Dates
moves_dates = mrp_move_obj.search(
[("product_mrp_area_id", "=", product_mrp_area.id)], order="mrp_date"
).mapped("mrp_date")
action_dates = planned_order_obj.search(
[("product_mrp_area_id", "=", product_mrp_area.id)], order="due_date"
).mapped("due_date")
mrp_dates = set(moves_dates + action_dates)
on_hand_qty = (
0.0
if product_mrp_area.supply_method == "phantom"
else product_mrp_area.qty_available
)
running_availability = on_hand_qty
mrp_inventory_vals = []
for mdt in sorted(mrp_dates):
(
mrp_inventory_data,
running_availability,
on_hand_qty,
) = self._prepare_mrp_inventory_data(
product_mrp_area,
mdt,
on_hand_qty,
running_availability,
demand_qty_by_date,
supply_qty_by_date,
planned_qty_by_date,
)
mrp_inventory_vals.append(mrp_inventory_data)
if mrp_inventory_vals:
mrp_invs = self.env["mrp.inventory"].create(mrp_inventory_vals)
planned_orders = planned_order_obj.search(
[("product_mrp_area_id", "=", product_mrp_area.id)]
)
# attach planned orders to inventory
for po in planned_orders:
invs = mrp_invs.filtered(lambda i, po=po: i.date == po.due_date)
if invs:
po.mrp_inventory_id = invs[0]
def should_build_time_phased_inventory(self, product_mrp_area):
return not (
self._exclude_from_mrp(
product_mrp_area.product_id, product_mrp_area.mrp_area_id
)
or product_mrp_area.supply_method == "phantom"
)
@api.model
def _mrp_final_process(self, mrp_areas):
logger.info("Start MRP final process")
domain = [("product_id.llc", "<", 9999)]
if mrp_areas:
domain += [("mrp_area_id", "in", mrp_areas.ids)]
product_mrp_area_ids = self.env["product.mrp.area"].search(domain)
for product_mrp_area in product_mrp_area_ids:
# Build the time-phased inventory
if not self.should_build_time_phased_inventory(product_mrp_area):
continue
self._init_mrp_inventory(product_mrp_area)
logger.info("End MRP final process")
def run_mrp_multi_level(self):
self._mrp_cleanup(self.mrp_area_ids)
mrp_lowest_llc = self._low_level_code_calculation()
self._calculate_mrp_applicable(self.mrp_area_ids)
self._mrp_initialisation(self.mrp_area_ids)
self._mrp_calculation(mrp_lowest_llc, self.mrp_area_ids)
self._mrp_final_process(self.mrp_area_ids)
# Open MRP inventory screen to show result if manually run:
# Done as sudo to allow non-admin users to read the action.
action = self.env.ref("mrp_multi_level.mrp_inventory_action")
result = action.sudo().read()[0]
return result