Files
manufacture/mrp_multi_level/wizards/mrp_multi_level.py
BernatPForgeFlow c328d9b799 [FIX] mrp_multi_level: Prioritize safety stock with mrp moves today
If I have 0 units, my safety stock is 5 units and today I have a supply for 10 units, the procurement recommendation should be 0 units
2024-02-20 16:47:05 +01:00

909 lines
35 KiB
Python

# Copyright 2016 Ucamco - Wim Audenaert <wim.audenaert@ucamco.com>
# Copyright 2016-19 ForgeFlow S.L. (https://www.forgeflow.com)
# - Jordi Ballester Alomar <jordi.ballester@forgeflow.com>
# - Lois Rilo <lois.rilo@forgeflow.com>
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl.html).
import logging
from datetime import date, timedelta
from odoo import _, api, exceptions, fields, models
from odoo.tools import float_is_zero, mute_logger
# Module-level logger shared by every step of the MRP run in this file.
logger = logging.getLogger(__name__)
# Transient (wizard) model that carries the multi-level MRP run; the selected
# areas are handed to each step called from run_mrp_multi_level.
class MultiLevelMrp(models.TransientModel):
_name = "mrp.multi.level"
_description = "Multi Level MRP"
# Optional restriction of the run; per the field help, an empty selection
# means that all areas are computed.
mrp_area_ids = fields.Many2many(
comodel_name="mrp.area",
string="MRP Areas to run",
help="If empty, all areas will be computed.",
)
@api.model
def _prepare_mrp_move_data_from_stock_move(
self, product_mrp_area, move, direction="in"
):
area = product_mrp_area.mrp_area_id
if direction == "out":
mrp_type = "d"
product_qty = -move.product_qty
else:
mrp_type = "s"
product_qty = move.product_qty
po = po_line = None
mo = origin = order_number = order_origin = parent_product_id = None
if move.purchase_line_id:
po = move.purchase_line_id.order_id
order_number = po.name
order_origin = po.origin
origin = "po"
po = move.purchase_line_id.order_id.id
po_line = move.purchase_line_id.id
elif move.production_id or move.raw_material_production_id:
production = move.production_id or move.raw_material_production_id
order_number = production.name
order_origin = production.origin
origin = "mo"
mo = production.id
elif move.move_dest_ids:
for move_dest_id in move.move_dest_ids.filtered("production_id"):
production = move_dest_id.production_id
order_number = production.name
order_origin = production.origin
origin = "mo"
mo = move_dest_id.production_id.id
parent_product_id = (
move_dest_id.production_id.product_id or move_dest_id.product_id
).id
if not order_number:
source = (move.picking_id or move).origin
order_number = source or (move.picking_id or move).name
origin = "mv"
# The date to display is based on the timezone of the warehouse.
today_tz = area._datetime_to_date_tz()
move_date_tz = area._datetime_to_date_tz(move.date)
if move_date_tz > today_tz:
mrp_date = move_date_tz
else:
mrp_date = today_tz
return {
"product_id": move.product_id.id,
"product_mrp_area_id": product_mrp_area.id,
"production_id": mo,
"purchase_order_id": po,
"purchase_line_id": po_line,
"stock_move_id": move.id,
"mrp_qty": product_qty,
"current_qty": product_qty,
"mrp_date": mrp_date,
"current_date": move.date,
"mrp_type": mrp_type,
"mrp_origin": origin or "",
"mrp_order_number": order_number,
"parent_product_id": parent_product_id,
"name": order_number,
"origin": order_origin,
"state": move.state,
}
@api.model
def _prepare_planned_order_data(
self, product_mrp_area, qty, mrp_date_supply, mrp_action_date, name, values
):
return {
"product_mrp_area_id": product_mrp_area.id,
"mrp_qty": qty,
"due_date": mrp_date_supply,
"order_release_date": mrp_action_date,
"mrp_action": product_mrp_area.supply_method,
"qty_released": 0.0,
"name": "Planned supply for: " + name,
"origin": values.get("origin") or name,
"fixed": False,
}
@api.model
def _prepare_mrp_move_data_bom_explosion(
self,
product,
bomline,
qty,
mrp_date_demand_2,
bom,
name,
planned_order,
values=None,
):
product_mrp_area = self._get_product_mrp_area_from_product_and_area(
bomline.product_id, product.mrp_area_id
)
if not product_mrp_area:
raise exceptions.Warning(_("No MRP product found"))
factor = (
product.product_id.uom_id._compute_quantity(
qty, bomline.bom_id.product_uom_id
)
/ bomline.bom_id.product_qty
)
line_quantity = factor * bomline.product_qty
return {
"mrp_area_id": product.mrp_area_id.id,
"product_id": bomline.product_id.id,
"product_mrp_area_id": product_mrp_area.id,
"production_id": None,
"purchase_order_id": None,
"purchase_line_id": None,
"stock_move_id": None,
"mrp_qty": -line_quantity, # TODO: review with UoM
"current_qty": None,
"mrp_date": mrp_date_demand_2,
"current_date": None,
"mrp_type": "d",
"mrp_origin": "mrp",
"mrp_order_number": None,
"parent_product_id": bom.product_id.id,
"name": (
"Demand Bom Explosion: %s"
% (name or product.product_id.default_code or product.product_id.name)
).replace(
"Demand Bom Explosion: Demand Bom Explosion: ", "Demand Bom Explosion: "
),
"origin": planned_order.origin if planned_order else values.get("origin"),
}
@api.model
def _get_action_and_supply_dates(self, product_mrp_area, mrp_date):
if not isinstance(mrp_date, date):
mrp_date = fields.Date.from_string(mrp_date)
if mrp_date < date.today():
mrp_date_supply = date.today()
else:
mrp_date_supply = mrp_date
calendar = product_mrp_area.mrp_area_id.calendar_id
if calendar and product_mrp_area.mrp_lead_time:
date_str = fields.Date.to_string(mrp_date)
dt = fields.Datetime.from_string(date_str)
# dt is at the beginning of the day (00:00)
res = calendar.plan_days(-1 * product_mrp_area.mrp_lead_time, dt)
mrp_action_date = res.date()
else:
mrp_action_date = mrp_date - timedelta(days=product_mrp_area.mrp_lead_time)
return mrp_action_date, mrp_date_supply
@api.model
def _get_bom_to_explode(self, product_mrp_area_id):
boms = self.env["mrp.bom"]
if product_mrp_area_id.supply_method in ["manufacture", "phantom"]:
boms = product_mrp_area_id.product_id.bom_ids.filtered(
lambda x: x.type in ["normal", "phantom"]
)
if not boms:
return False
return boms[0]
@api.model
def explode_action(
    self, product_mrp_area_id, mrp_action_date, name, qty, action, values=None
):
    """Explode requirements.

    Creates one demand ``mrp.move`` per eligible line of the BoM selected by
    ``_get_bom_to_explode`` and, when ``action`` exposes
    ``mrp_move_down_ids``, links each new move to it.

    :return: False when there is no BoM to explode, True otherwise.
    """
    today = date.today()
    mrp_date_demand = today if mrp_action_date < today else mrp_action_date
    bom = self._get_bom_to_explode(product_mrp_area_id)
    if not bom:
        return False
    precision = self.env["decimal.precision"].precision_get(
        "Product Unit of Measure"
    )
    for bomline in bom.bom_line_ids:
        component = bomline.product_id
        if float_is_zero(bomline.product_qty, precision_digits=precision):
            continue
        if component.type != "product":
            continue
        if self.with_context(mrp_explosion=True)._exclude_from_mrp(
            component, product_mrp_area_id.mrp_area_id
        ):
            # Stop explosion.
            continue
        if bomline._skip_bom_line(product_mrp_area_id.product_id):
            continue
        # TODO: review: mrp_transit_delay, mrp_inspection_delay
        delay_days = (
            product_mrp_area_id.mrp_transit_delay
            + product_mrp_area_id.mrp_inspection_delay
        )
        component_demand_date = mrp_date_demand - timedelta(days=delay_days)
        demand_vals = self._prepare_mrp_move_data_bom_explosion(
            product_mrp_area_id,
            bomline,
            qty,
            component_demand_date,
            bom,
            name,
            action,
            values,
        )
        demand_move = self.env["mrp.move"].create(demand_vals)
        if hasattr(action, "mrp_move_down_ids"):
            action.mrp_move_down_ids = [(4, demand_move.id)]
    return True
@api.model
def create_action(self, product_mrp_area_id, mrp_date, mrp_qty, name, values=None):
    """Create the planned supply covering ``mrp_qty`` needed on ``mrp_date``.

    Resolves the action and supply dates, then delegates to
    ``create_planned_order`` and returns its result. A falsy ``values`` is
    replaced by a new empty dict.
    """
    order_values = values or {}
    requirement_date = (
        mrp_date if isinstance(mrp_date, date) else fields.Date.from_string(mrp_date)
    )
    action_date, date_supply = self._get_action_and_supply_dates(
        product_mrp_area_id, requirement_date
    )
    return self.create_planned_order(
        product_mrp_area_id,
        mrp_qty,
        name,
        date_supply,
        action_date,
        values=order_values,
    )
@api.model
def create_planned_order(
    self,
    product_mrp_area_id,
    mrp_qty,
    name,
    mrp_date_supply,
    mrp_action_date,
    values=None,
):
    """Create the planned orders needed to supply ``mrp_qty``.

    The quantity is split according to ``_adjust_qty_to_order``; one planned
    order is created per chunk (none for ``phantom`` supply) and the BoM is
    exploded for each chunk when the product MRP area is to be exploded.

    :param values: optional dict carrying ``origin`` and a starting
        ``qty_ordered``; it is updated in place. ``None`` is treated as an
        empty dict.
    :return: ``values`` with ``qty_ordered`` set to the total quantity
        ordered (``0.0`` when the product is excluded from MRP).
    """
    # The default is None: normalise it once so the subscripting below and
    # _prepare_planned_order_data never see None.
    if values is None:
        values = {}
    self = self.with_context(auditlog_disabled=True)
    if self._exclude_from_mrp(
        product_mrp_area_id.product_id, product_mrp_area_id.mrp_area_id
    ):
        values["qty_ordered"] = 0.0
        return values
    qty_ordered = values.get("qty_ordered", 0.0)
    qty_to_order = mrp_qty
    while qty_ordered < mrp_qty:
        qty = product_mrp_area_id._adjust_qty_to_order(qty_to_order)
        if qty <= 0:
            # A non-positive chunk can never reach mrp_qty; stop instead of
            # looping forever.
            logger.warning(
                "[%s] %s: adjusted quantity %s is not positive, stop ordering",
                product_mrp_area_id.mrp_area_id.name,
                product_mrp_area_id.product_id.default_code
                or product_mrp_area_id.product_id.name,
                qty,
            )
            break
        qty_to_order -= qty
        order_data = self._prepare_planned_order_data(
            product_mrp_area_id, qty, mrp_date_supply, mrp_action_date, name, values
        )
        # Do not create planned order for products that are Kits
        planned_order = False
        if product_mrp_area_id.supply_method != "phantom":
            planned_order = self.env["mrp.planned.order"].create(order_data)
        qty_ordered = qty_ordered + qty
        if product_mrp_area_id._to_be_exploded():
            self.explode_action(
                product_mrp_area_id,
                mrp_action_date,
                name,
                qty,
                planned_order,
                values,
            )
    values["qty_ordered"] = qty_ordered
    log_msg = "[{}] {}: qty_ordered = {}".format(
        product_mrp_area_id.mrp_area_id.name,
        product_mrp_area_id.product_id.default_code
        or product_mrp_area_id.product_id.name,
        qty_ordered,
    )
    logger.debug(log_msg)
    return values
@api.model
def _mrp_cleanup(self, mrp_areas):
    """Delete the results of the previous run for the given areas.

    Removes the MRP moves, the MRP inventories and the planned orders that
    are not fixed; with no areas given, nothing restricts the deletion to an
    area. Always returns True.
    """
    logger.info("Start MRP Cleanup")
    area_domain = [("mrp_area_id", "in", mrp_areas.ids)] if mrp_areas else []
    with mute_logger("odoo.models.unlink"):
        for model_name in ("mrp.move", "mrp.inventory"):
            self.env[model_name].search(list(area_domain)).unlink()
        not_fixed_domain = area_domain + [("fixed", "=", False)]
        self.env["mrp.planned.order"].search(not_fixed_domain).unlink()
    logger.info("End MRP Cleanup")
    return True
def _domain_bom_lines_by_llc(self, llc, product_templates):
return [
("product_id.llc", "=", llc),
("bom_id.product_tmpl_id", "in", product_templates.ids),
("bom_id.active", "=", True),
]
def _get_bom_lines_by_llc(self, llc, product_templates):
    """Search the BoM lines matching ``_domain_bom_lines_by_llc``."""
    domain = self._domain_bom_lines_by_llc(llc, product_templates)
    return self.env["mrp.bom.line"].search(domain)
@api.model
def _low_level_code_calculation(self):
    """Assign a low level code (``llc``) to every product and return the
    last level reached.

    All products start at level 0; at each pass, the components found on
    the BoM lines returned by ``_get_bom_lines_by_llc`` for the previous
    level are pushed one level down. The loop stops when a level holds no
    product, or once the level exceeds the configured recursion limit
    (``mrp_multi_level.llc_calculation_recursion_limit``, 1000 when unset
    or zero).
    """
    logger.info("Start low level code calculation")
    product_model = self.env["product.product"]
    configured_limit = (
        self.env["ir.config_parameter"]
        .sudo()
        .get_param("mrp_multi_level.llc_calculation_recursion_limit")
    )
    llc_recursion_limit = int(configured_limit) or 1000

    llc = 0
    product_model.search([]).write({"llc": llc})
    level_products = product_model.search([("llc", "=", llc)])
    counter = len(level_products) if level_products else 999999
    logger.info("Low level code 0 finished - Nbr. products: %s" % counter)

    while counter:
        llc += 1
        previous_level = product_model.search([("llc", "=", llc - 1)])
        templates = previous_level.mapped("product_tmpl_id")
        components = self._get_bom_lines_by_llc(llc - 1, templates).mapped(
            "product_id"
        )
        components.write({"llc": llc})
        counter = product_model.search_count([("llc", "=", llc)])
        logger.info(
            "Low level code {} finished - Nbr. products: {}".format(llc, counter)
        )
        if llc > llc_recursion_limit:
            logger.error("Recursion limit reached during LLC calculation.")
            break

    logger.info("End low level code calculation")
    return llc
@api.model
def _adjust_mrp_applicable(self, mrp_areas):
"""This method is meant to modify the products that are applicable
to MRP Multi level calculation
"""
return True
@api.model
def _calculate_mrp_applicable(self, mrp_areas):
    """Recompute the ``mrp_applicable`` flag of the product MRP areas.

    The flag is reset on every product MRP area of the given areas (all of
    them when ``mrp_areas`` is empty), set again where the product type is
    ``product``, then ``_adjust_mrp_applicable`` is given the chance to
    refine the result. Always returns True.
    """
    logger.info("Start Calculate MRP Applicable")
    product_mrp_area_model = self.env["product.mrp.area"]
    area_domain = [("mrp_area_id", "in", mrp_areas.ids)] if mrp_areas else []
    product_mrp_area_model.search(area_domain).write({"mrp_applicable": False})
    storable_domain = area_domain + [("product_id.type", "=", "product")]
    product_mrp_area_model.search(storable_domain).write({"mrp_applicable": True})
    self._adjust_mrp_applicable(mrp_areas)
    # search_count is what the LLC calculation in this file already uses;
    # it replaces the legacy search(..., count=True) form.
    counter = product_mrp_area_model.search_count(
        [("mrp_applicable", "=", True)] + area_domain
    )
    log_msg = "End Calculate MRP Applicable: %s" % counter
    logger.info(log_msg)
    return True
@api.model
def _init_mrp_move_from_forecast(self, product_mrp_area):
"""This method is meant to be inherited to add a forecast mechanism."""
return True
@api.model
def _init_mrp_move_from_stock_move(self, product_mrp_area):
    """Create the MRP moves mirroring the stock moves of a product MRP area.

    Incoming moves become supply and outgoing moves become demand, using
    the domains provided by the product MRP area. Always returns True.
    """
    stock_move_model = self.env["stock.move"]
    mrp_move_model = self.env["mrp.move"]
    incoming = stock_move_model.search(product_mrp_area._in_stock_moves_domain())
    outgoing = stock_move_model.search(product_mrp_area._out_stock_moves_domain())
    vals_list = []
    for direction, stock_moves in (("in", incoming), ("out", outgoing)):
        for stock_move in stock_moves:
            vals = self._prepare_mrp_move_data_from_stock_move(
                product_mrp_area, stock_move, direction=direction
            )
            # Falsy values let an override discard a stock move.
            if vals:
                vals_list.append(vals)
    mrp_move_model.create(vals_list)
    return True
@api.model
def _prepare_mrp_move_data_from_purchase_order(self, poline, product_mrp_area):
    """Return the supply ``mrp.move`` values for a purchase order line.

    The MRP date is the planned date of the line, or today when the planned
    date is not in the future.
    """
    today = date.today()
    planned = fields.Date.from_string(poline.date_planned)
    mrp_date = planned if planned > today else today
    purchase = poline.order_id
    return dict(
        product_id=poline.product_id.id,
        product_mrp_area_id=product_mrp_area.id,
        production_id=None,
        purchase_order_id=purchase.id,
        purchase_line_id=poline.id,
        stock_move_id=None,
        mrp_qty=poline.product_uom_qty,
        current_qty=poline.product_uom_qty,
        mrp_date=mrp_date,
        current_date=poline.date_planned,
        mrp_type="s",
        mrp_origin="po",
        mrp_order_number=purchase.name,
        parent_product_id=None,
        name=purchase.name,
        state=purchase.state,
    )
@api.model
def _init_mrp_move_from_purchase_order(self, product_mrp_area):
    """Create supply MRP moves from purchase orders not yet confirmed.

    Considers orders in state ``draft``, ``sent`` or ``to approve`` whose
    picking type delivers into the locations of the product MRP area, and
    their lines for the product with a positive quantity.
    """
    locations = product_mrp_area._get_locations()
    picking_types = self.env["stock.picking.type"].search(
        [("default_location_dest_id", "child_of", locations.ids)]
    )
    pending_orders = self.env["purchase.order"].search(
        [
            ("picking_type_id", "in", picking_types.ids),
            ("state", "in", ["draft", "sent", "to approve"]),
        ]
    )
    pending_lines = self.env["purchase.order.line"].search(
        [
            ("order_id", "in", pending_orders.ids),
            ("product_qty", ">", 0.0),
            ("product_id", "=", product_mrp_area.product_id.id),
        ]
    )
    vals_list = [
        self._prepare_mrp_move_data_from_purchase_order(line, product_mrp_area)
        for line in pending_lines
    ]
    if vals_list:
        self.env["mrp.move"].create(vals_list)
@api.model
def _get_product_mrp_area_from_product_and_area(self, product, mrp_area):
    """Return the product MRP area of ``product`` in ``mrp_area``.

    At most one record is returned; the result is empty when none exists.
    """
    domain = [("product_id", "=", product.id), ("mrp_area_id", "=", mrp_area.id)]
    return self.env["product.mrp.area"].search(domain, limit=1)
@api.model
def _init_mrp_move(self, product_mrp_area):
self._init_mrp_move_from_forecast(product_mrp_area)
self._init_mrp_move_from_stock_move(product_mrp_area)
self._init_mrp_move_from_purchase_order(product_mrp_area)
@api.model
def _exclude_from_mrp(self, product, mrp_area):
    """Tell whether ``product`` must be left out of MRP in ``mrp_area``.

    A product without a product MRP area in that area is excluded;
    otherwise the ``mrp_exclude`` value of the record decides.
    To extend with various logic where needed.
    """
    domain = [("product_id", "=", product.id), ("mrp_area_id", "=", mrp_area.id)]
    product_mrp_area = self.env["product.mrp.area"].search(domain, limit=1)
    return product_mrp_area.mrp_exclude if product_mrp_area else True
@api.model
def _mrp_initialisation(self, mrp_areas):
    """Create the initial MRP moves of every applicable product MRP area.

    All areas are processed when ``mrp_areas`` is empty. Product MRP areas
    rejected by ``_exclude_from_mrp`` are skipped.
    """
    logger.info("Start MRP initialisation")
    areas = mrp_areas or self.env["mrp.area"].search([])
    applicable = self.env["product.mrp.area"].search(
        [("mrp_area_id", "in", areas.ids), ("mrp_applicable", "=", True)]
    )
    processed = 0
    for area in areas:
        in_area = applicable.filtered(lambda pma, area=area: pma.mrp_area_id == area)
        for product_mrp_area in in_area:
            if self._exclude_from_mrp(product_mrp_area.product_id, area):
                continue
            processed += 1
            logger.info(
                "MRP Init: {} - {} ".format(processed, product_mrp_area.display_name)
            )
            self._init_mrp_move(product_mrp_area)
    logger.info("End MRP initialisation")
def _get_qty_to_order(self, product_mrp_area, date, move_qty, onhand):
"""Compute the qty to order at a given date, for a product MRP area, given an
mrp.move quantity and an onhand quantity.
This method is an extension point, allowing a new module to change the way this
quantity should be computed.
"""
# The default rule is to resupply to rebuild the safety stock
return product_mrp_area.mrp_minimum_stock - onhand - move_qty
@api.model
def _init_mrp_move_grouped_demand(self, product_mrp_area):
# Plans supply for a product MRP area whose demand is grouped by periods of
# `mrp_nbr_days` days: MRP moves are accumulated from a start date
# (`last_date`) and a planned supply is requested through `create_action`
# once a move falls `mrp_nbr_days` days or more after that date.
#
# Working state:
#   onhand        - projected stock, starting from `qty_available`
#   last_date     - date of the group currently open (None when no group)
#   last_qty      - net quantity of the moves accumulated in the open group
#   demand_origin - origins collected for the `origin` of the next order
#
# NOTE(review): leading whitespace is missing in this copy of the file, so
# the nesting of the statements below has to be checked against the
# upstream module before changing any of them.
last_date = None
last_qty = 0.00
onhand = product_mrp_area.qty_available
grouping_delta = product_mrp_area.mrp_nbr_days
demand_origin = []
# Stock already below the minimum while MRP moves exist: take the safety
# stock target date as the reference date.
if (
product_mrp_area.mrp_move_ids
and onhand < product_mrp_area.mrp_minimum_stock
):
last_date = self._get_safety_stock_target_date(product_mrp_area)
demand_origin.append("Safety Stock")
move = fields.first(product_mrp_area.mrp_move_ids)
# The first move is at least one grouping period after the reference date:
# request the safety stock rebuild on its own (move quantity 0) and reset
# the group state.
if last_date and (
fields.Date.from_string(move.mrp_date)
>= last_date + timedelta(days=grouping_delta)
):
name = _("Safety Stock")
origin = ",".join(list({x for x in demand_origin if x}))
qtytoorder = self._get_qty_to_order(
product_mrp_area, last_date, 0, onhand
)
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=last_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=origin),
)
qty_ordered = cm.get("qty_ordered", 0.0)
onhand = onhand + qty_ordered
last_date = None
last_qty = 0.00
demand_origin = []
# Walk the MRP moves of the product MRP area, skipping those rejected by
# the `_exclude_move` hook.
for move in product_mrp_area.mrp_move_ids:
if self._exclude_move(move):
continue
# Close the open group when this move is outside the grouping period and
# the projected stock is below the minimum with or without the move.
if (
last_date
and (
fields.Date.from_string(move.mrp_date)
>= last_date + timedelta(days=grouping_delta)
)
and (
(onhand + last_qty + move.mrp_qty)
< product_mrp_area.mrp_minimum_stock
or (onhand + last_qty) < product_mrp_area.mrp_minimum_stock
)
):
name = _(
"Grouped Demand of %(product_name)s for %(delta_days)d Days"
) % dict(
product_name=product_mrp_area.product_id.display_name,
delta_days=grouping_delta,
)
# Distinct non-empty origins, joined in set iteration order.
origin = ",".join(list({x for x in demand_origin if x}))
qtytoorder = self._get_qty_to_order(
product_mrp_area, last_date, last_qty, onhand
)
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=last_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=origin),
)
qty_ordered = cm.get("qty_ordered", 0.0)
# The grouped moves and the ordered quantity are now part of the stock.
onhand = onhand + last_qty + qty_ordered
last_date = None
last_qty = 0.00
demand_origin = []
# Projected stock short of the minimum (with or without this move): open a
# group at the move date, or add the move to the group already open.
if (
onhand + last_qty + move.mrp_qty
) < product_mrp_area.mrp_minimum_stock or (
onhand + last_qty
) < product_mrp_area.mrp_minimum_stock:
if not last_date:
last_date = fields.Date.from_string(move.mrp_date)
last_qty = move.mrp_qty
else:
last_qty += move.mrp_qty
# Otherwise the move is applied to the projected stock and its date becomes
# the reference date.
else:
last_date = fields.Date.from_string(move.mrp_date)
onhand += move.mrp_qty
# Only demand moves ("d") contribute to the origin of the planned supply.
if move.mrp_type == "d":
demand_origin.append(move.origin or move.name)
# Request supply for the group that is still open.
if last_date and last_qty != 0.00:
name = _(
"Grouped Demand of %(product_name)s for %(delta_days)d Days"
) % dict(
product_name=product_mrp_area.product_id.display_name,
delta_days=grouping_delta,
)
origin = ",".join(list({x for x in demand_origin if x}))
qtytoorder = self._get_qty_to_order(
product_mrp_area, last_date, last_qty, onhand
)
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=last_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=origin),
)
qty_ordered = cm.get("qty_ordered", 0.0)
onhand += qty_ordered
last_qty -= qty_ordered
# Final check: when the projection is still under the minimum, request a
# safety stock rebuild at the safety stock target date.
if (onhand + last_qty) < product_mrp_area.mrp_minimum_stock:
mrp_date = self._get_safety_stock_target_date(product_mrp_area)
qtytoorder = self._get_qty_to_order(product_mrp_area, mrp_date, 0, onhand)
name = _("Safety Stock")
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=mrp_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=name),
)
# Unlike the .get() calls above, this lookup requires the key to be present.
qty_ordered = cm["qty_ordered"]
onhand += qty_ordered
def _get_safety_stock_target_date(self, product_mrp_area):
"""Get the date at which the safety stock rebuild should be targeted
This method is an extension point for modules who need to cusomize that date."""
return date.today()
@api.model
def _init_mrp_move_non_grouped_demand(self, product_mrp_area):
# Plans supply move by move (no grouping) for a product MRP area: `onhand`
# is the projected stock, starting from `qty_available`, and every planned
# supply is requested through `create_action`.
#
# NOTE(review): leading whitespace is missing in this copy of the file, so
# the nesting of the statements below has to be checked against the
# upstream module before changing any of them.
onhand = product_mrp_area.qty_available
for move in product_mrp_area.mrp_move_ids:
# Moves rejected by the `_exclude_move` hook are ignored.
if self._exclude_move(move):
continue
# This works because mrp moves are ordered by:
# product_mrp_area_id, mrp_date, mrp_type desc, id
# When the move would bring the stock under the minimum, first request a
# safety stock rebuild (move quantity 0) at the safety stock target date.
if onhand + move.mrp_qty < product_mrp_area.mrp_minimum_stock:
qtytoorder = self._get_qty_to_order(
product_mrp_area,
self._get_safety_stock_target_date(product_mrp_area),
0,
onhand,
)
name = _("Safety Stock")
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=self._get_safety_stock_target_date(product_mrp_area),
mrp_qty=qtytoorder,
name=name,
values=dict(origin=name),
)
qty_ordered = cm["qty_ordered"]
onhand += qty_ordered
# Quantity still missing once the move itself is taken into account.
qtytoorder = self._get_qty_to_order(
product_mrp_area, move.mrp_date, move.mrp_qty, onhand
)
if qtytoorder > 0.0:
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=move.mrp_date,
mrp_qty=qtytoorder,
name=move.name or "",
values=dict(origin=move.origin or ""),
)
qty_ordered = cm["qty_ordered"]
onhand += move.mrp_qty + qty_ordered
else:
onhand += move.mrp_qty
# Final check: when the projection is still under the minimum, request a
# safety stock rebuild at the safety stock target date.
if onhand < product_mrp_area.mrp_minimum_stock:
mrp_date = self._get_safety_stock_target_date(product_mrp_area)
qtytoorder = self._get_qty_to_order(product_mrp_area, mrp_date, 0, onhand)
name = _("Safety Stock")
cm = self.create_action(
product_mrp_area_id=product_mrp_area,
mrp_date=mrp_date,
mrp_qty=qtytoorder,
name=name,
values=dict(origin=name),
)
qty_ordered = cm["qty_ordered"]
onhand += qty_ordered
@api.model
def _exclude_move(self, move):
"""Improve extensibility being able to exclude special moves."""
return False
@api.model
def _mrp_calculation(self, mrp_lowest_llc, mrp_areas):
    """Plan supply for every product MRP area, level by level.

    For each area (all areas when ``mrp_areas`` is empty) the low level
    codes ``0`` to ``mrp_lowest_llc - 1`` are processed in increasing order.
    Product MRP areas with ``mrp_nbr_days == 0`` are planned move by move,
    the others with grouped demand.

    :param mrp_lowest_llc: value returned by the low level code calculation.
    :param mrp_areas: areas to process; empty means all areas.
    """
    logger.info("Start MRP calculation")
    product_mrp_area_obj = self.env["product.mrp.area"]
    # Running total across levels and areas, reported in the log below.
    counter = 0
    if not mrp_areas:
        mrp_areas = self.env["mrp.area"].search([])
    for mrp_area in mrp_areas:
        for llc in range(mrp_lowest_llc):
            product_mrp_areas = product_mrp_area_obj.search(
                [("product_id.llc", "=", llc), ("mrp_area_id", "=", mrp_area.id)]
            )
            for product_mrp_area in product_mrp_areas:
                if product_mrp_area.mrp_nbr_days == 0:
                    self._init_mrp_move_non_grouped_demand(product_mrp_area)
                else:
                    self._init_mrp_move_grouped_demand(product_mrp_area)
                counter += 1
            log_msg = "MRP Calculation LLC {} Finished - Nbr. products: {}".format(
                llc, counter
            )
            logger.info(log_msg)
    # Message fixed from "Enb" to match the "End ..." logs of the other steps.
    logger.info("End MRP calculation")
@api.model
def _get_demand_groups(self, product_mrp_area):
query = """
SELECT mrp_date, sum(mrp_qty)
FROM mrp_move
WHERE product_mrp_area_id = %(mrp_product)s
AND mrp_type = 'd'
GROUP BY mrp_date
"""
params = {"mrp_product": product_mrp_area.id}
return query, params
@api.model
def _get_supply_groups(self, product_mrp_area):
query = """
SELECT mrp_date, sum(mrp_qty)
FROM mrp_move
WHERE product_mrp_area_id = %(mrp_product)s
AND mrp_type = 's'
GROUP BY mrp_date
"""
params = {"mrp_product": product_mrp_area.id}
return query, params
@api.model
def _get_planned_order_groups(self, product_mrp_area):
query = """
SELECT due_date, sum(mrp_qty)
FROM mrp_planned_order
WHERE product_mrp_area_id = %(mrp_product)s
GROUP BY due_date
"""
params = {"mrp_product": product_mrp_area.id}
return query, params
@api.model
def _prepare_mrp_inventory_data(
self,
product_mrp_area,
mdt,
on_hand_qty,
running_availability,
demand_qty_by_date,
supply_qty_by_date,
planned_qty_by_date,
):
"""Return dict to create mrp.inventory records on MRP Multi Level Scheduler"""
mrp_inventory_data = {"product_mrp_area_id": product_mrp_area.id, "date": mdt}
demand_qty = demand_qty_by_date.get(mdt, 0.0)
mrp_inventory_data["demand_qty"] = abs(demand_qty)
supply_qty = supply_qty_by_date.get(mdt, 0.0)
mrp_inventory_data["supply_qty"] = abs(supply_qty)
mrp_inventory_data["initial_on_hand_qty"] = on_hand_qty
on_hand_qty += supply_qty + demand_qty
mrp_inventory_data["final_on_hand_qty"] = on_hand_qty
# Consider that MRP plan is followed exactly:
running_availability += (
supply_qty + demand_qty + planned_qty_by_date.get(mdt, 0.0)
)
mrp_inventory_data["running_availability"] = running_availability
return mrp_inventory_data, running_availability, on_hand_qty
@api.model
def _init_mrp_inventory(self, product_mrp_area):
    """Create the time-phased ``mrp.inventory`` lines of a product MRP area.

    One line is created per date carrying an MRP move or a planned order,
    in chronological order; each planned order is then attached to the line
    of its due date.
    """
    cursor = self.env.cr
    planned_order_model = self.env["mrp.planned.order"]
    area_domain = [("product_mrp_area_id", "=", product_mrp_area.id)]

    def _totals_by_date(query_builder):
        # Each query returns (date, quantity) rows.
        sql, sql_params = query_builder(product_mrp_area)
        cursor.execute(sql, sql_params)
        return dict(cursor.fetchall())

    demand_qty_by_date = _totals_by_date(self._get_demand_groups)
    supply_qty_by_date = _totals_by_date(self._get_supply_groups)
    planned_qty_by_date = _totals_by_date(self._get_planned_order_groups)

    # Dates
    moves_dates = (
        self.env["mrp.move"]
        .search(list(area_domain), order="mrp_date")
        .mapped("mrp_date")
    )
    action_dates = planned_order_model.search(
        list(area_domain), order="due_date"
    ).mapped("due_date")

    on_hand_qty = product_mrp_area.qty_available
    running_availability = on_hand_qty
    inventory_vals_list = []
    for mdt in sorted(set(moves_dates + action_dates)):
        line_vals, running_availability, on_hand_qty = (
            self._prepare_mrp_inventory_data(
                product_mrp_area,
                mdt,
                on_hand_qty,
                running_availability,
                demand_qty_by_date,
                supply_qty_by_date,
                planned_qty_by_date,
            )
        )
        inventory_vals_list.append(line_vals)
    if not inventory_vals_list:
        return
    inventories = self.env["mrp.inventory"].create(inventory_vals_list)
    # attach planned orders to inventory
    for planned_order in planned_order_model.search(list(area_domain)):
        matching = inventories.filtered(
            lambda inv, order=planned_order: inv.date == order.due_date
        )
        if matching:
            planned_order.mrp_inventory_id = matching[0]
@api.model
def _mrp_final_process(self, mrp_areas):
    """Build the time-phased inventory of the product MRP areas.

    Product MRP areas whose product has a low level code below 9999 are
    considered, restricted to ``mrp_areas`` when given. Those excluded from
    MRP or supplied as ``phantom`` are skipped.
    """
    logger.info("Start MRP final process")
    domain = [("product_id.llc", "<", 9999)]
    if mrp_areas:
        domain.append(("mrp_area_id", "in", mrp_areas.ids))
    for product_mrp_area in self.env["product.mrp.area"].search(domain):
        # Condition "supply_method == phantom" is not added in
        # _exclude_from_mrp because this function is also used to filter
        # the explosion.
        skipped = (
            self._exclude_from_mrp(
                product_mrp_area.product_id, product_mrp_area.mrp_area_id
            )
            or product_mrp_area.supply_method == "phantom"
        )
        if skipped:
            continue
        # Build the time-phased inventory
        self._init_mrp_inventory(product_mrp_area)
    logger.info("End MRP final process")
def run_mrp_multi_level(self):
    """Run the complete multi-level MRP for the selected areas.

    Steps, in order: cleanup, low level code calculation, applicability,
    initialisation, calculation and final process.

    :return: the ``mrp_multi_level.mrp_inventory_action`` window action.
    """
    areas = self.mrp_area_ids
    self._mrp_cleanup(areas)
    lowest_llc = self._low_level_code_calculation()
    self._calculate_mrp_applicable(areas)
    self._mrp_initialisation(areas)
    self._mrp_calculation(lowest_llc, areas)
    self._mrp_final_process(areas)
    # Open MRP inventory screen to show result if manually run.
    # NOTE(review): the action is read through _for_xml_id, presumably so
    # that non-admin users can read it as well - confirm.
    action_model = self.env["ir.actions.act_window"]
    return action_model._for_xml_id("mrp_multi_level.mrp_inventory_action")