mirror of
https://github.com/OCA/manufacture.git
synced 2025-01-28 16:37:15 +02:00
Introduce a safe and configurable LLC depth limit to avoid infinite recursion. Also, improve extensibility of BoM finding and fix it to not consider archived BoMs.
835 lines
32 KiB
Python
835 lines
32 KiB
Python
# Copyright 2016 Ucamco - Wim Audenaert <wim.audenaert@ucamco.com>
|
|
# Copyright 2016-19 ForgeFlow S.L. (https://www.forgeflow.com)
|
|
# - Jordi Ballester Alomar <jordi.ballester@forgeflow.com>
|
|
# - Lois Rilo <lois.rilo@forgeflow.com>
|
|
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl.html).
|
|
|
|
import logging
|
|
from datetime import date, timedelta
|
|
|
|
from odoo import _, api, exceptions, fields, models
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MultiLevelMrp(models.TransientModel):
|
|
_name = "mrp.multi.level"
|
|
_description = "Multi Level MRP"
|
|
|
|
mrp_area_ids = fields.Many2many(
|
|
comodel_name="mrp.area",
|
|
string="MRP Areas to run",
|
|
help="If empty, all areas will be computed.",
|
|
)
|
|
|
|
@api.model
|
|
def _prepare_product_mrp_area_data(self, product_mrp_area):
|
|
qty_available = 0.0
|
|
product_obj = self.env["product.product"]
|
|
location_ids = product_mrp_area.mrp_area_id._get_locations()
|
|
for location in location_ids:
|
|
product_l = product_obj.with_context({"location": location.id}).browse(
|
|
product_mrp_area.product_id.id
|
|
)
|
|
qty_available += product_l.qty_available
|
|
|
|
return {
|
|
"product_mrp_area_id": product_mrp_area.id,
|
|
"mrp_qty_available": qty_available,
|
|
"mrp_llc": product_mrp_area.product_id.llc,
|
|
}
|
|
|
|
@api.model
|
|
def _prepare_mrp_move_data_from_stock_move(
|
|
self, product_mrp_area, move, direction="in"
|
|
):
|
|
area = product_mrp_area.mrp_area_id
|
|
if direction == "out":
|
|
mrp_type = "d"
|
|
product_qty = -move.product_qty
|
|
else:
|
|
mrp_type = "s"
|
|
product_qty = move.product_qty
|
|
po = po_line = None
|
|
mo = origin = order_number = order_origin = parent_product_id = None
|
|
if move.purchase_line_id:
|
|
po = move.purchase_line_id.order_id
|
|
order_number = po.name
|
|
order_origin = po.origin
|
|
origin = "po"
|
|
po = move.purchase_line_id.order_id.id
|
|
po_line = move.purchase_line_id.id
|
|
elif move.production_id or move.raw_material_production_id:
|
|
production = move.production_id or move.raw_material_production_id
|
|
order_number = production.name
|
|
order_origin = production.origin
|
|
origin = "mo"
|
|
mo = production.id
|
|
elif move.move_dest_ids:
|
|
for move_dest_id in move.move_dest_ids.filtered("production_id"):
|
|
production = move_dest_id.production_id
|
|
order_number = production.name
|
|
order_origin = production.origin
|
|
origin = "mo"
|
|
mo = move_dest_id.production_id.id
|
|
parent_product_id = (
|
|
move_dest_id.production_id.product_id or move_dest_id.product_id
|
|
).id
|
|
if not order_number:
|
|
source = (move.picking_id or move).origin
|
|
order_number = source or (move.picking_id or move).name
|
|
origin = "mv"
|
|
# The date to display is based on the timezone of the warehouse.
|
|
today_tz = area._datetime_to_date_tz()
|
|
move_date_tz = area._datetime_to_date_tz(move.date)
|
|
if move_date_tz > today_tz:
|
|
mrp_date = move_date_tz
|
|
else:
|
|
mrp_date = today_tz
|
|
return {
|
|
"product_id": move.product_id.id,
|
|
"product_mrp_area_id": product_mrp_area.id,
|
|
"production_id": mo,
|
|
"purchase_order_id": po,
|
|
"purchase_line_id": po_line,
|
|
"stock_move_id": move.id,
|
|
"mrp_qty": product_qty,
|
|
"current_qty": product_qty,
|
|
"mrp_date": mrp_date,
|
|
"current_date": move.date,
|
|
"mrp_type": mrp_type,
|
|
"mrp_origin": origin or "",
|
|
"mrp_order_number": order_number,
|
|
"parent_product_id": parent_product_id,
|
|
"name": order_number,
|
|
"origin": order_origin,
|
|
"state": move.state,
|
|
}
|
|
|
|
@api.model
|
|
def _prepare_planned_order_data(
|
|
self, product_mrp_area, qty, mrp_date_supply, mrp_action_date, name, values
|
|
):
|
|
return {
|
|
"product_mrp_area_id": product_mrp_area.id,
|
|
"mrp_qty": qty,
|
|
"due_date": mrp_date_supply,
|
|
"order_release_date": mrp_action_date,
|
|
"mrp_action": product_mrp_area.supply_method,
|
|
"qty_released": 0.0,
|
|
"name": "Planned supply for: " + name,
|
|
"origin": values.get("origin") or name,
|
|
"fixed": False,
|
|
}
|
|
|
|
@api.model
|
|
def _prepare_mrp_move_data_bom_explosion(
|
|
self,
|
|
product,
|
|
bomline,
|
|
qty,
|
|
mrp_date_demand_2,
|
|
bom,
|
|
name,
|
|
planned_order,
|
|
values=None,
|
|
):
|
|
product_mrp_area = self._get_product_mrp_area_from_product_and_area(
|
|
bomline.product_id, product.mrp_area_id
|
|
)
|
|
if not product_mrp_area:
|
|
raise exceptions.Warning(_("No MRP product found"))
|
|
factor = (
|
|
product.product_id.uom_id._compute_quantity(
|
|
qty, bomline.bom_id.product_uom_id
|
|
)
|
|
/ bomline.bom_id.product_qty
|
|
)
|
|
line_quantity = factor * bomline.product_qty
|
|
return {
|
|
"mrp_area_id": product.mrp_area_id.id,
|
|
"product_id": bomline.product_id.id,
|
|
"product_mrp_area_id": product_mrp_area.id,
|
|
"production_id": None,
|
|
"purchase_order_id": None,
|
|
"purchase_line_id": None,
|
|
"stock_move_id": None,
|
|
"mrp_qty": -line_quantity, # TODO: review with UoM
|
|
"current_qty": None,
|
|
"mrp_date": mrp_date_demand_2,
|
|
"current_date": None,
|
|
"mrp_type": "d",
|
|
"mrp_origin": "mrp",
|
|
"mrp_order_number": None,
|
|
"parent_product_id": bom.product_id.id,
|
|
"name": (
|
|
"Demand Bom Explosion: %s"
|
|
% (name or product.product_id.default_code or product.product_id.name)
|
|
).replace(
|
|
"Demand Bom Explosion: Demand Bom Explosion: ", "Demand Bom Explosion: "
|
|
),
|
|
"origin": planned_order.origin if planned_order else values.get("origin"),
|
|
}
|
|
|
|
@api.model
|
|
def _get_action_and_supply_dates(self, product_mrp_area, mrp_date):
|
|
if not isinstance(mrp_date, date):
|
|
mrp_date = fields.Date.from_string(mrp_date)
|
|
|
|
if mrp_date < date.today():
|
|
mrp_date_supply = date.today()
|
|
else:
|
|
mrp_date_supply = mrp_date
|
|
|
|
calendar = product_mrp_area.mrp_area_id.calendar_id
|
|
if calendar and product_mrp_area.mrp_lead_time:
|
|
date_str = fields.Date.to_string(mrp_date)
|
|
dt = fields.Datetime.from_string(date_str)
|
|
# dt is at the beginning of the day (00:00)
|
|
res = calendar.plan_days(-1 * product_mrp_area.mrp_lead_time, dt)
|
|
mrp_action_date = res.date()
|
|
else:
|
|
mrp_action_date = mrp_date - timedelta(days=product_mrp_area.mrp_lead_time)
|
|
return mrp_action_date, mrp_date_supply
|
|
|
|
@api.model
|
|
def explode_action(
|
|
self, product_mrp_area_id, mrp_action_date, name, qty, action, values=None
|
|
):
|
|
"""Explode requirements."""
|
|
mrp_date_demand = mrp_action_date
|
|
if mrp_date_demand < date.today():
|
|
mrp_date_demand = date.today()
|
|
if not product_mrp_area_id.product_id.bom_ids:
|
|
return False
|
|
bomcount = 0
|
|
for bom in product_mrp_area_id.product_id.bom_ids:
|
|
if not bom.active or not bom.bom_line_ids:
|
|
continue
|
|
bomcount += 1
|
|
if bomcount != 1:
|
|
continue
|
|
for bomline in bom.bom_line_ids:
|
|
if bomline.product_qty <= 0.00 or bomline.product_id.type != "product":
|
|
continue
|
|
if self.with_context(mrp_explosion=True)._exclude_from_mrp(
|
|
bomline.product_id, product_mrp_area_id.mrp_area_id
|
|
):
|
|
# Stop explosion.
|
|
continue
|
|
if bomline._skip_bom_line(product_mrp_area_id.product_id):
|
|
continue
|
|
# TODO: review: mrp_transit_delay, mrp_inspection_delay
|
|
mrp_date_demand_2 = mrp_date_demand - timedelta(
|
|
days=(
|
|
product_mrp_area_id.mrp_transit_delay
|
|
+ product_mrp_area_id.mrp_inspection_delay
|
|
)
|
|
)
|
|
move_data = self._prepare_mrp_move_data_bom_explosion(
|
|
product_mrp_area_id,
|
|
bomline,
|
|
qty,
|
|
mrp_date_demand_2,
|
|
bom,
|
|
name,
|
|
action,
|
|
values,
|
|
)
|
|
mrpmove_id2 = self.env["mrp.move"].create(move_data)
|
|
if hasattr(action, "mrp_move_down_ids"):
|
|
action.mrp_move_down_ids = [(4, mrpmove_id2.id)]
|
|
return True
|
|
|
|
@api.model
|
|
def create_action(self, product_mrp_area_id, mrp_date, mrp_qty, name, values=None):
|
|
if not values:
|
|
values = {}
|
|
if not isinstance(mrp_date, date):
|
|
mrp_date = fields.Date.from_string(mrp_date)
|
|
action_date, date_supply = self._get_action_and_supply_dates(
|
|
product_mrp_area_id, mrp_date
|
|
)
|
|
return self.create_planned_order(
|
|
product_mrp_area_id, mrp_qty, name, date_supply, action_date, values=values
|
|
)
|
|
|
|
@api.model
|
|
def create_planned_order(
|
|
self,
|
|
product_mrp_area_id,
|
|
mrp_qty,
|
|
name,
|
|
mrp_date_supply,
|
|
mrp_action_date,
|
|
values=None,
|
|
):
|
|
self = self.with_context(auditlog_disabled=True)
|
|
if self._exclude_from_mrp(
|
|
product_mrp_area_id.product_id, product_mrp_area_id.mrp_area_id
|
|
):
|
|
values["qty_ordered"] = 0.0
|
|
return values
|
|
|
|
qty_ordered = values.get("qty_ordered", 0.0) if values else 0.0
|
|
qty_to_order = mrp_qty
|
|
while qty_ordered < mrp_qty:
|
|
qty = product_mrp_area_id._adjust_qty_to_order(qty_to_order)
|
|
qty_to_order -= qty
|
|
order_data = self._prepare_planned_order_data(
|
|
product_mrp_area_id, qty, mrp_date_supply, mrp_action_date, name, values
|
|
)
|
|
# Do not create planned order for products that are Kits
|
|
planned_order = False
|
|
if not product_mrp_area_id.supply_method == "phantom":
|
|
planned_order = self.env["mrp.planned.order"].create(order_data)
|
|
qty_ordered = qty_ordered + qty
|
|
|
|
if product_mrp_area_id._to_be_exploded():
|
|
self.explode_action(
|
|
product_mrp_area_id,
|
|
mrp_action_date,
|
|
name,
|
|
qty,
|
|
planned_order,
|
|
values,
|
|
)
|
|
|
|
values["qty_ordered"] = qty_ordered
|
|
log_msg = "[{}] {}: qty_ordered = {}".format(
|
|
product_mrp_area_id.mrp_area_id.name,
|
|
product_mrp_area_id.product_id.default_code
|
|
or product_mrp_area_id.product_id.name,
|
|
qty_ordered,
|
|
)
|
|
logger.debug(log_msg)
|
|
return values
|
|
|
|
@api.model
|
|
def _mrp_cleanup(self, mrp_areas):
|
|
logger.info("Start MRP Cleanup")
|
|
domain = []
|
|
if mrp_areas:
|
|
domain += [("mrp_area_id", "in", mrp_areas.ids)]
|
|
self.env["mrp.move"].search(domain).unlink()
|
|
self.env["mrp.inventory"].search(domain).unlink()
|
|
domain += [("fixed", "=", False)]
|
|
self.env["mrp.planned.order"].search(domain).unlink()
|
|
logger.info("End MRP Cleanup")
|
|
return True
|
|
|
|
def _domain_bom_lines_by_llc(self, llc, product_templates):
|
|
return [
|
|
("product_id.llc", "=", llc),
|
|
("bom_id.product_tmpl_id", "in", product_templates.ids),
|
|
("bom_id.active", "=", True),
|
|
]
|
|
|
|
def _get_bom_lines_by_llc(self, llc, product_templates):
|
|
return self.env["mrp.bom.line"].search(
|
|
self._domain_bom_lines_by_llc(llc, product_templates)
|
|
)
|
|
|
|
@api.model
|
|
def _low_level_code_calculation(self):
|
|
logger.info("Start low level code calculation")
|
|
counter = 999999
|
|
llc = 0
|
|
llc_recursion_limit = (
|
|
int(
|
|
self.env["ir.config_parameter"]
|
|
.sudo()
|
|
.get_param("mrp_multi_level.llc_calculation_recursion_limit")
|
|
)
|
|
or 1000
|
|
)
|
|
self.env["product.product"].search([]).write({"llc": llc})
|
|
products = self.env["product.product"].search([("llc", "=", llc)])
|
|
if products:
|
|
counter = len(products)
|
|
log_msg = "Low level code 0 finished - Nbr. products: %s" % counter
|
|
logger.info(log_msg)
|
|
|
|
while counter:
|
|
llc += 1
|
|
products = self.env["product.product"].search([("llc", "=", llc - 1)])
|
|
p_templates = products.mapped("product_tmpl_id")
|
|
bom_lines = self._get_bom_lines_by_llc(llc - 1, p_templates)
|
|
products = bom_lines.mapped("product_id")
|
|
products.write({"llc": llc})
|
|
counter = self.env["product.product"].search_count([("llc", "=", llc)])
|
|
log_msg = "Low level code {} finished - Nbr. products: {}".format(
|
|
llc, counter
|
|
)
|
|
logger.info(log_msg)
|
|
if llc > llc_recursion_limit:
|
|
logger.error("Recursion limit reached during LLC calculation.")
|
|
break
|
|
|
|
mrp_lowest_llc = llc
|
|
logger.info("End low level code calculation")
|
|
return mrp_lowest_llc
|
|
|
|
@api.model
|
|
def _adjust_mrp_applicable(self, mrp_areas):
|
|
"""This method is meant to modify the products that are applicable
|
|
to MRP Multi level calculation
|
|
"""
|
|
return True
|
|
|
|
@api.model
|
|
def _calculate_mrp_applicable(self, mrp_areas):
|
|
logger.info("Start Calculate MRP Applicable")
|
|
domain = []
|
|
if mrp_areas:
|
|
domain += [("mrp_area_id", "in", mrp_areas.ids)]
|
|
self.env["product.mrp.area"].search(domain).write({"mrp_applicable": False})
|
|
domain += [("product_id.type", "=", "product")]
|
|
self.env["product.mrp.area"].search(domain).write({"mrp_applicable": True})
|
|
self._adjust_mrp_applicable(mrp_areas)
|
|
count_domain = [("mrp_applicable", "=", True)]
|
|
if mrp_areas:
|
|
count_domain += [("mrp_area_id", "in", mrp_areas.ids)]
|
|
counter = self.env["product.mrp.area"].search(count_domain, count=True)
|
|
log_msg = "End Calculate MRP Applicable: %s" % counter
|
|
logger.info(log_msg)
|
|
return True
|
|
|
|
@api.model
|
|
def _init_mrp_move_from_forecast(self, product_mrp_area):
|
|
"""This method is meant to be inherited to add a forecast mechanism."""
|
|
return True
|
|
|
|
@api.model
|
|
def _init_mrp_move_from_stock_move(self, product_mrp_area):
|
|
move_obj = self.env["stock.move"]
|
|
mrp_move_obj = self.env["mrp.move"]
|
|
in_domain = product_mrp_area._in_stock_moves_domain()
|
|
in_moves = move_obj.search(in_domain)
|
|
out_domain = product_mrp_area._out_stock_moves_domain()
|
|
out_moves = move_obj.search(out_domain)
|
|
move_vals = []
|
|
if in_moves:
|
|
for move in in_moves:
|
|
move_data = self._prepare_mrp_move_data_from_stock_move(
|
|
product_mrp_area, move, direction="in"
|
|
)
|
|
move_vals.append(move_data)
|
|
if out_moves:
|
|
for move in out_moves:
|
|
move_data = self._prepare_mrp_move_data_from_stock_move(
|
|
product_mrp_area, move, direction="out"
|
|
)
|
|
move_vals.append(move_data)
|
|
mrp_move_obj.create(move_vals)
|
|
return True
|
|
|
|
@api.model
|
|
def _prepare_mrp_move_data_from_purchase_order(self, poline, product_mrp_area):
|
|
mrp_date = date.today()
|
|
if fields.Date.from_string(poline.date_planned) > date.today():
|
|
mrp_date = fields.Date.from_string(poline.date_planned)
|
|
return {
|
|
"product_id": poline.product_id.id,
|
|
"product_mrp_area_id": product_mrp_area.id,
|
|
"production_id": None,
|
|
"purchase_order_id": poline.order_id.id,
|
|
"purchase_line_id": poline.id,
|
|
"stock_move_id": None,
|
|
"mrp_qty": poline.product_uom_qty,
|
|
"current_qty": poline.product_uom_qty,
|
|
"mrp_date": mrp_date,
|
|
"current_date": poline.date_planned,
|
|
"mrp_type": "s",
|
|
"mrp_origin": "po",
|
|
"mrp_order_number": poline.order_id.name,
|
|
"parent_product_id": None,
|
|
"name": poline.order_id.name,
|
|
"state": poline.order_id.state,
|
|
}
|
|
|
|
@api.model
|
|
def _init_mrp_move_from_purchase_order(self, product_mrp_area):
|
|
location_ids = product_mrp_area.mrp_area_id._get_locations()
|
|
picking_types = self.env["stock.picking.type"].search(
|
|
[("default_location_dest_id", "in", location_ids.ids)]
|
|
)
|
|
picking_type_ids = [ptype.id for ptype in picking_types]
|
|
orders = self.env["purchase.order"].search(
|
|
[
|
|
("picking_type_id", "in", picking_type_ids),
|
|
("state", "in", ["draft", "sent", "to approve"]),
|
|
]
|
|
)
|
|
po_lines = self.env["purchase.order.line"].search(
|
|
[
|
|
("order_id", "in", orders.ids),
|
|
("product_qty", ">", 0.0),
|
|
("product_id", "=", product_mrp_area.product_id.id),
|
|
]
|
|
)
|
|
|
|
mrp_move_vals = []
|
|
for line in po_lines:
|
|
mrp_move_data = self._prepare_mrp_move_data_from_purchase_order(
|
|
line, product_mrp_area
|
|
)
|
|
mrp_move_vals.append(mrp_move_data)
|
|
if mrp_move_vals:
|
|
self.env["mrp.move"].create(mrp_move_vals)
|
|
|
|
@api.model
|
|
def _get_product_mrp_area_from_product_and_area(self, product, mrp_area):
|
|
return self.env["product.mrp.area"].search(
|
|
[("product_id", "=", product.id), ("mrp_area_id", "=", mrp_area.id)],
|
|
limit=1,
|
|
)
|
|
|
|
@api.model
|
|
def _init_mrp_move(self, product_mrp_area):
|
|
self._init_mrp_move_from_forecast(product_mrp_area)
|
|
self._init_mrp_move_from_stock_move(product_mrp_area)
|
|
self._init_mrp_move_from_purchase_order(product_mrp_area)
|
|
|
|
@api.model
|
|
def _exclude_from_mrp(self, product, mrp_area):
|
|
"""To extend with various logic where needed."""
|
|
product_mrp_area = self.env["product.mrp.area"].search(
|
|
[("product_id", "=", product.id), ("mrp_area_id", "=", mrp_area.id)],
|
|
limit=1,
|
|
)
|
|
if not product_mrp_area:
|
|
return True
|
|
return product_mrp_area.mrp_exclude
|
|
|
|
@api.model
|
|
def _mrp_initialisation(self, mrp_areas):
|
|
logger.info("Start MRP initialisation")
|
|
if not mrp_areas:
|
|
mrp_areas = self.env["mrp.area"].search([])
|
|
product_mrp_areas = self.env["product.mrp.area"].search(
|
|
[("mrp_area_id", "in", mrp_areas.ids), ("mrp_applicable", "=", True)]
|
|
)
|
|
init_counter = 0
|
|
for mrp_area in mrp_areas:
|
|
for product_mrp_area in product_mrp_areas.filtered(
|
|
lambda a: a.mrp_area_id == mrp_area
|
|
):
|
|
if self._exclude_from_mrp(product_mrp_area.product_id, mrp_area):
|
|
continue
|
|
init_counter += 1
|
|
log_msg = "MRP Init: {} - {} ".format(
|
|
init_counter, product_mrp_area.display_name
|
|
)
|
|
logger.info(log_msg)
|
|
self._init_mrp_move(product_mrp_area)
|
|
logger.info("End MRP initialisation")
|
|
|
|
@api.model
|
|
def _init_mrp_move_grouped_demand(self, nbr_create, product_mrp_area):
|
|
last_date = None
|
|
last_qty = 0.00
|
|
onhand = product_mrp_area.qty_available
|
|
grouping_delta = product_mrp_area.mrp_nbr_days
|
|
demand_origin = []
|
|
for move in product_mrp_area.mrp_move_ids:
|
|
if self._exclude_move(move):
|
|
continue
|
|
if (
|
|
last_date
|
|
and (
|
|
fields.Date.from_string(move.mrp_date)
|
|
>= last_date + timedelta(days=grouping_delta)
|
|
)
|
|
and (
|
|
(onhand + last_qty + move.mrp_qty)
|
|
< product_mrp_area.mrp_minimum_stock
|
|
or (onhand + last_qty) < product_mrp_area.mrp_minimum_stock
|
|
)
|
|
):
|
|
name = _(
|
|
"Grouped Demand of %(product_name)s for %(delta_days)d Days"
|
|
) % dict(
|
|
product_name=product_mrp_area.product_id.display_name,
|
|
delta_days=grouping_delta,
|
|
)
|
|
origin = ",".join(list({x for x in demand_origin if x}))
|
|
qtytoorder = product_mrp_area.mrp_minimum_stock - onhand - last_qty
|
|
cm = self.create_action(
|
|
product_mrp_area_id=product_mrp_area,
|
|
mrp_date=last_date,
|
|
mrp_qty=qtytoorder,
|
|
name=name,
|
|
values=dict(origin=origin),
|
|
)
|
|
qty_ordered = cm.get("qty_ordered", 0.0)
|
|
onhand = onhand + last_qty + qty_ordered
|
|
last_date = None
|
|
last_qty = 0.00
|
|
nbr_create += 1
|
|
demand_origin = []
|
|
if (
|
|
onhand + last_qty + move.mrp_qty
|
|
) < product_mrp_area.mrp_minimum_stock or (
|
|
onhand + last_qty
|
|
) < product_mrp_area.mrp_minimum_stock:
|
|
if not last_date or last_qty == 0.0:
|
|
last_date = fields.Date.from_string(move.mrp_date)
|
|
last_qty = move.mrp_qty
|
|
else:
|
|
last_qty += move.mrp_qty
|
|
else:
|
|
last_date = fields.Date.from_string(move.mrp_date)
|
|
onhand += move.mrp_qty
|
|
if move.mrp_type == "d":
|
|
demand_origin.append(move.origin or move.name)
|
|
|
|
if last_date and last_qty != 0.00:
|
|
name = _(
|
|
"Grouped Demand of %(product_name)s for %(delta_days)d Days"
|
|
) % dict(
|
|
product_name=product_mrp_area.product_id.display_name,
|
|
delta_days=grouping_delta,
|
|
)
|
|
origin = ",".join(list({x for x in demand_origin if x}))
|
|
qtytoorder = product_mrp_area.mrp_minimum_stock - onhand - last_qty
|
|
cm = self.create_action(
|
|
product_mrp_area_id=product_mrp_area,
|
|
mrp_date=last_date,
|
|
mrp_qty=qtytoorder,
|
|
name=name,
|
|
values=dict(origin=origin),
|
|
)
|
|
qty_ordered = cm.get("qty_ordered", 0.0)
|
|
onhand += qty_ordered
|
|
nbr_create += 1
|
|
return nbr_create
|
|
|
|
@api.model
|
|
def _exclude_move(self, move):
|
|
"""Improve extensibility being able to exclude special moves."""
|
|
return False
|
|
|
|
@api.model
|
|
def _mrp_calculation(self, mrp_lowest_llc, mrp_areas):
|
|
logger.info("Start MRP calculation")
|
|
product_mrp_area_obj = self.env["product.mrp.area"]
|
|
counter = 0
|
|
if not mrp_areas:
|
|
mrp_areas = self.env["mrp.area"].search([])
|
|
for mrp_area in mrp_areas:
|
|
llc = 0
|
|
while mrp_lowest_llc > llc:
|
|
product_mrp_areas = product_mrp_area_obj.search(
|
|
[("product_id.llc", "=", llc), ("mrp_area_id", "=", mrp_area.id)]
|
|
)
|
|
llc += 1
|
|
|
|
for product_mrp_area in product_mrp_areas:
|
|
nbr_create = 0
|
|
onhand = product_mrp_area.qty_available
|
|
if product_mrp_area.mrp_nbr_days == 0:
|
|
for move in product_mrp_area.mrp_move_ids:
|
|
if self._exclude_move(move):
|
|
continue
|
|
qtytoorder = (
|
|
product_mrp_area.mrp_minimum_stock
|
|
- onhand
|
|
- move.mrp_qty
|
|
)
|
|
if qtytoorder > 0.0:
|
|
cm = self.create_action(
|
|
product_mrp_area_id=product_mrp_area,
|
|
mrp_date=move.mrp_date,
|
|
mrp_qty=qtytoorder,
|
|
name=move.name or "",
|
|
values=dict(origin=move.origin or ""),
|
|
)
|
|
qty_ordered = cm["qty_ordered"]
|
|
onhand += move.mrp_qty + qty_ordered
|
|
nbr_create += 1
|
|
else:
|
|
onhand += move.mrp_qty
|
|
else:
|
|
nbr_create = self._init_mrp_move_grouped_demand(
|
|
nbr_create, product_mrp_area
|
|
)
|
|
|
|
if onhand < product_mrp_area.mrp_minimum_stock and nbr_create == 0:
|
|
qtytoorder = product_mrp_area.mrp_minimum_stock - onhand
|
|
name = _("Safety Stock")
|
|
cm = self.create_action(
|
|
product_mrp_area_id=product_mrp_area,
|
|
mrp_date=date.today(),
|
|
mrp_qty=qtytoorder,
|
|
name=name,
|
|
values=dict(origin=name),
|
|
)
|
|
qty_ordered = cm["qty_ordered"]
|
|
onhand += qty_ordered
|
|
counter += 1
|
|
|
|
log_msg = "MRP Calculation LLC {} Finished - Nbr. products: {}".format(
|
|
llc - 1, counter
|
|
)
|
|
logger.info(log_msg)
|
|
|
|
logger.info("Enb MRP calculation")
|
|
|
|
@api.model
|
|
def _get_demand_groups(self, product_mrp_area):
|
|
query = """
|
|
SELECT mrp_date, sum(mrp_qty)
|
|
FROM mrp_move
|
|
WHERE product_mrp_area_id = %(mrp_product)s
|
|
AND mrp_type = 'd'
|
|
GROUP BY mrp_date
|
|
"""
|
|
params = {"mrp_product": product_mrp_area.id}
|
|
return query, params
|
|
|
|
@api.model
|
|
def _get_supply_groups(self, product_mrp_area):
|
|
query = """
|
|
SELECT mrp_date, sum(mrp_qty)
|
|
FROM mrp_move
|
|
WHERE product_mrp_area_id = %(mrp_product)s
|
|
AND mrp_type = 's'
|
|
GROUP BY mrp_date
|
|
"""
|
|
params = {"mrp_product": product_mrp_area.id}
|
|
return query, params
|
|
|
|
@api.model
|
|
def _get_planned_order_groups(self, product_mrp_area):
|
|
query = """
|
|
SELECT due_date, sum(mrp_qty)
|
|
FROM mrp_planned_order
|
|
WHERE product_mrp_area_id = %(mrp_product)s
|
|
GROUP BY due_date
|
|
"""
|
|
params = {"mrp_product": product_mrp_area.id}
|
|
return query, params
|
|
|
|
@api.model
|
|
def _prepare_mrp_inventory_data(
|
|
self,
|
|
product_mrp_area,
|
|
mdt,
|
|
on_hand_qty,
|
|
running_availability,
|
|
demand_qty_by_date,
|
|
supply_qty_by_date,
|
|
planned_qty_by_date,
|
|
):
|
|
"""Return dict to create mrp.inventory records on MRP Multi Level Scheduler"""
|
|
mrp_inventory_data = {"product_mrp_area_id": product_mrp_area.id, "date": mdt}
|
|
demand_qty = demand_qty_by_date.get(mdt, 0.0)
|
|
mrp_inventory_data["demand_qty"] = abs(demand_qty)
|
|
supply_qty = supply_qty_by_date.get(mdt, 0.0)
|
|
mrp_inventory_data["supply_qty"] = abs(supply_qty)
|
|
mrp_inventory_data["initial_on_hand_qty"] = on_hand_qty
|
|
on_hand_qty += supply_qty + demand_qty
|
|
mrp_inventory_data["final_on_hand_qty"] = on_hand_qty
|
|
# Consider that MRP plan is followed exactly:
|
|
running_availability += (
|
|
supply_qty + demand_qty + planned_qty_by_date.get(mdt, 0.0)
|
|
)
|
|
mrp_inventory_data["running_availability"] = running_availability
|
|
return mrp_inventory_data, running_availability, on_hand_qty
|
|
|
|
@api.model
|
|
def _init_mrp_inventory(self, product_mrp_area):
|
|
mrp_move_obj = self.env["mrp.move"]
|
|
planned_order_obj = self.env["mrp.planned.order"]
|
|
# Read Demand
|
|
demand_qty_by_date = {}
|
|
query, params = self._get_demand_groups(product_mrp_area)
|
|
self.env.cr.execute(query, params)
|
|
for mrp_date, qty in self.env.cr.fetchall():
|
|
demand_qty_by_date[mrp_date] = qty
|
|
# Read Supply
|
|
supply_qty_by_date = {}
|
|
query, params = self._get_supply_groups(product_mrp_area)
|
|
self.env.cr.execute(query, params)
|
|
for mrp_date, qty in self.env.cr.fetchall():
|
|
supply_qty_by_date[mrp_date] = qty
|
|
# Read planned orders:
|
|
planned_qty_by_date = {}
|
|
query, params = self._get_planned_order_groups(product_mrp_area)
|
|
self.env.cr.execute(query, params)
|
|
for mrp_date, qty in self.env.cr.fetchall():
|
|
planned_qty_by_date[mrp_date] = qty
|
|
# Dates
|
|
moves_dates = mrp_move_obj.search(
|
|
[("product_mrp_area_id", "=", product_mrp_area.id)], order="mrp_date"
|
|
).mapped("mrp_date")
|
|
action_dates = planned_order_obj.search(
|
|
[("product_mrp_area_id", "=", product_mrp_area.id)], order="due_date"
|
|
).mapped("due_date")
|
|
mrp_dates = set(moves_dates + action_dates)
|
|
on_hand_qty = product_mrp_area.product_id.with_context(
|
|
location=product_mrp_area.mrp_area_id.location_id.id
|
|
)._product_available()[product_mrp_area.product_id.id]["qty_available"]
|
|
running_availability = on_hand_qty
|
|
mrp_inventory_vals = []
|
|
for mdt in sorted(mrp_dates):
|
|
(
|
|
mrp_inventory_data,
|
|
running_availability,
|
|
on_hand_qty,
|
|
) = self._prepare_mrp_inventory_data(
|
|
product_mrp_area,
|
|
mdt,
|
|
on_hand_qty,
|
|
running_availability,
|
|
demand_qty_by_date,
|
|
supply_qty_by_date,
|
|
planned_qty_by_date,
|
|
)
|
|
mrp_inventory_vals.append(mrp_inventory_data)
|
|
if mrp_inventory_vals:
|
|
mrp_invs = self.env["mrp.inventory"].create(mrp_inventory_vals)
|
|
planned_orders = planned_order_obj.search(
|
|
[("product_mrp_area_id", "=", product_mrp_area.id)]
|
|
)
|
|
# attach planned orders to inventory
|
|
for po in planned_orders:
|
|
invs = mrp_invs.filtered(lambda i: i.date == po.due_date)
|
|
if invs:
|
|
po.mrp_inventory_id = invs[0]
|
|
|
|
@api.model
|
|
def _mrp_final_process(self, mrp_areas):
|
|
logger.info("Start MRP final process")
|
|
domain = [("product_id.llc", "<", 9999)]
|
|
if mrp_areas:
|
|
domain += [("mrp_area_id", "in", mrp_areas.ids)]
|
|
product_mrp_area_ids = self.env["product.mrp.area"].search(domain)
|
|
|
|
for product_mrp_area in product_mrp_area_ids:
|
|
# Build the time-phased inventory
|
|
# Condition "supply_method == phantom" is not added in _exclude_from_mrp
|
|
# because this function is also used to filter the explosion.
|
|
if (
|
|
self._exclude_from_mrp(
|
|
product_mrp_area.product_id, product_mrp_area.mrp_area_id
|
|
)
|
|
or product_mrp_area.supply_method == "phantom"
|
|
):
|
|
continue
|
|
self._init_mrp_inventory(product_mrp_area)
|
|
logger.info("End MRP final process")
|
|
|
|
def run_mrp_multi_level(self):
|
|
self._mrp_cleanup(self.mrp_area_ids)
|
|
mrp_lowest_llc = self._low_level_code_calculation()
|
|
self._calculate_mrp_applicable(self.mrp_area_ids)
|
|
self._mrp_initialisation(self.mrp_area_ids)
|
|
self._mrp_calculation(mrp_lowest_llc, self.mrp_area_ids)
|
|
self._mrp_final_process(self.mrp_area_ids)
|
|
# Open MRP inventory screen to show result if manually run:
|
|
# Done as sudo to allow non-admin users to read the action.
|
|
xmlid = "mrp_multi_level.mrp_inventory_action"
|
|
return self.env["ir.actions.act_window"]._for_xml_id(xmlid)
|