Merge branch 'mig/15.0/sale_planner' into '15.0'

mig/15.0/sale_planner into 15.0

See merge request hibou-io/hibou-odoo/suite!1098
This commit is contained in:
Jared Kipe
2021-10-06 17:12:49 +00:00
20 changed files with 1734 additions and 0 deletions

2
sale_planner/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from . import wizard
from . import models

View File

@@ -0,0 +1,43 @@
{
'name': 'Sale Order Planner',
'summary': 'Plans order dates and warehouses.',
'version': '15.0.1.0.0',
'author': "Hibou Corp.",
'category': 'Sale',
'license': 'AGPL-3',
'complexity': 'expert',
'images': [],
'website': "https://hibou.io",
'description': """
Sale Order Planner
==================
Plans sales order dates based on available warehouses and shipping methods.
Adds shipping calendar to warehouse to plan delivery orders based on availability
of the warehouse or warehouse staff.
Adds shipping calendar to individual shipping methods to estimate delivery based
on the specific method's characteristics. (e.g. Do they deliver on Saturday?)
""",
'depends': [
'sale_sourced_by_line',
'base_geolocalize',
'delivery',
'resource',
'stock',
],
'demo': [],
'data': [
'security/ir.model.access.csv',
'wizard/order_planner_views.xml',
'views/sale.xml',
'views/stock.xml',
'views/delivery.xml',
'views/product.xml',
],
'auto_install': False,
'installable': True,
}

View File

@@ -0,0 +1,7 @@
from . import delivery
from . import partner
from . import planning
from . import product
from . import resource
from . import sale
from . import stock

View File

@@ -0,0 +1,106 @@
from datetime import timedelta
from odoo import api, fields, models
class DeliveryCarrier(models.Model):
_inherit = 'delivery.carrier'
delivery_calendar_id = fields.Many2one(
'resource.calendar', 'Delivery Calendar',
help="This calendar represents days that the carrier will deliver the package.")
# -------------------------- #
# API for external providers #
# -------------------------- #
def get_shipping_price_for_plan(self, orders, date_planned):
''' For every sale order, compute the price of the shipment
:param orders: A recordset of sale orders
:param date_planned: Date to say that the shipment is leaving.
:return list: A list of floats, containing the estimated price for the shipping of the sale order
'''
self.ensure_one()
if hasattr(self, '%s_get_shipping_price_for_plan' % self.delivery_type):
return getattr(self, '%s_get_shipping_price_for_plan' % self.delivery_type)(orders, date_planned)
def rate_shipment_date_planned(self, order, date_planned=None):
"""
For every sale order, compute the price of the shipment and potentially when it will arrive.
:param order: `sale.order`
:param date_planned: The date the shipment is expected to leave/be picked up by carrier.
:return: rate in the same form the normal `rate_shipment` method does BUT
-- Additional keys
- transit_days: int
- date_delivered: string
"""
self.ensure_one()
if hasattr(self, '%s_rate_shipment_date_planned' % self.delivery_type):
# New API Odoo 11 - Carrier specific override.
return getattr(self, '%s_rate_shipment_date_planned' % self.delivery_type)(order, date_planned)
rate = self.with_context(date_planned=date_planned).rate_shipment(order)
if rate and date_planned:
if rate.get('date_delivered'):
date_delivered = rate['date_delivered']
transit_days = self.calculate_transit_days(date_planned, date_delivered)
if not rate.get('transit_days') or transit_days < rate.get('transit_days'):
rate['transit_days'] = transit_days
elif rate.get('transit_days'):
rate['date_delivered'] = self.calculate_date_delivered(date_planned, rate.get('transit_days'))
elif rate:
date_delivered = rate.get('date_delivered')
if date_delivered and not rate.get('transit_days'):
# we could have a date delivered based on shipping it "now"
# so we can still calculate the transit days
rate['transit_days'] = self.calculate_transit_days(fields.Datetime.now(), date_delivered)
rate.pop('date_delivered') # because we don't have a date_planned, we cannot have a guarenteed delivery
return rate
def calculate_transit_days(self, date_planned, date_delivered):
self.ensure_one()
if isinstance(date_planned, str):
date_planned = fields.Datetime.from_string(date_planned)
if isinstance(date_delivered, str):
date_delivered = fields.Datetime.from_string(date_delivered)
transit_days = 0
while date_planned < date_delivered:
if transit_days > 10:
break
current_date_planned = self.delivery_calendar_id.plan_days(1, date_planned, compute_leaves=True)
if not current_date_planned:
return self._calculate_transit_days_naive(date_planned, date_delivered)
if current_date_planned == date_planned:
date_planned += timedelta(days=1)
else:
date_planned = current_date_planned
transit_days += 1
if transit_days > 1:
transit_days -= 1
return transit_days
def _calculate_transit_days_naive(self, date_planned, date_delivered):
return abs((date_delivered - date_planned).days)
def calculate_date_delivered(self, date_planned, transit_days):
self.ensure_one()
if isinstance(date_planned, str):
date_planned = fields.Datetime.from_string(date_planned)
# date calculations needs an extra day
effective_transit_days = transit_days + 1
last_day = self.delivery_calendar_id.plan_days_end(effective_transit_days, date_planned, compute_leaves=True)
if not last_day:
return self._calculate_date_delivered_naive(date_planned, transit_days)
return last_day
def _calculate_date_delivered_naive(self, date_planned, transit_days):
return date_planned + timedelta(days=transit_days)

View File

@@ -0,0 +1,31 @@
from odoo import api, fields, models
try:
from uszipcode import SearchEngine
except ImportError:
SearchEngine = None
class Partner(models.Model):
_inherit = 'res.partner'
def geo_localize(self):
# We need country names in English below
for partner in self.with_context(lang='en_US'):
try:
if SearchEngine and partner.zip:
with SearchEngine() as search:
zipcode = search.by_zipcode(str(self.zip).split('-')[0])
if zipcode and zipcode.lat:
partner.write({
'partner_latitude': zipcode.lat,
'partner_longitude': zipcode.lng,
'date_localization': fields.Date.context_today(partner),
})
else:
super(Partner, partner).geo_localize()
else:
super(Partner, partner).geo_localize()
except:
pass
return True

View File

@@ -0,0 +1,16 @@
from odoo import api, fields, models
class PlanningPolicy(models.Model):
_name = 'sale.order.planning.policy'
name = fields.Char(string='Name')
carrier_filter_id = fields.Many2one(
'ir.filters',
string='Delivery Carrier Filter',
)
warehouse_filter_id = fields.Many2one(
'ir.filters',
string='Warehouse Filter',
)
always_closest_warehouse = fields.Boolean(string='Always Plan Closest Warehouse')

View File

@@ -0,0 +1,21 @@
from odoo import api, fields, models
class ProductTemplate(models.Model):
_inherit = 'product.template'
property_planning_policy_id = fields.Many2one('sale.order.planning.policy', company_dependent=True,
string="Order Planner Policy",
help="The Order Planner Policy to use when making a sale order planner.")
def get_planning_policy(self):
self.ensure_one()
return self.property_planning_policy_id or self.categ_id.property_planning_policy_categ_id
class ProductCategory(models.Model):
_inherit = 'product.category'
property_planning_policy_categ_id = fields.Many2one('sale.order.planning.policy', company_dependent=True,
string="Order Planner Policy",
help="The Order Planner Policy to use when making a sale order planner.")

View File

@@ -0,0 +1,46 @@
from functools import partial
from datetime import timedelta
from odoo import api, models
from odoo.addons.resource.models.resource import make_aware
class ResourceCalendar(models.Model):
_inherit = 'resource.calendar'
def plan_days_end(self, days, day_dt, compute_leaves=False, domain=None):
"""
Override to `plan_days` that allows you to get the nearest 'end' including today.
"""
day_dt, revert = make_aware(day_dt)
# which method to use for retrieving intervals
if compute_leaves:
get_intervals = partial(self._work_intervals_batch, domain=domain)
else:
get_intervals = self._attendance_intervals_batch
if days >= 0:
found = set()
delta = timedelta(days=14)
for n in range(100):
dt = day_dt + delta * n
for start, stop, meta in get_intervals(dt, dt + delta)[False]:
found.add(start.date())
if len(found) >= days:
return revert(stop)
return False
elif days < 0:
days = abs(days)
found = set()
delta = timedelta(days=14)
for n in range(100):
dt = day_dt - delta * n
for start, stop, meta in reversed(get_intervals(dt - delta, dt))[False]:
found.add(start.date())
if len(found) == days:
return revert(start)
return False
else:
return revert(day_dt)

View File

@@ -0,0 +1,15 @@
from odoo import api, fields, models
class SaleOrder(models.Model):
_inherit = 'sale.order'
def action_planorder(self):
plan_obj = self.env['sale.order.make.plan']
for order in self:
plan = plan_obj.create({
'order_id': order.id,
})
action = self.env.ref('sale_planner.action_plan_sale_order').read()[0]
action['res_id'] = plan.id
return action

View File

@@ -0,0 +1,9 @@
from odoo import api, fields, models
class Warehouse(models.Model):
_inherit = 'stock.warehouse'
shipping_calendar_id = fields.Many2one(
'resource.calendar', 'Shipping Calendar',
help="This calendar represents shipping availability from the warehouse.")

View File

@@ -0,0 +1,4 @@
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
access_sale_order_make_plan,access_sale_order_make_plan,model_sale_order_make_plan,base.group_user,1,1,1,1
access_sale_order_planning_option,access_sale_order_planning_option,model_sale_order_planning_option,base.group_user,1,1,1,1
access_sale_order_planning_policy,access_sale_order_planning_policy,model_sale_order_planning_policy,base.group_user,1,1,1,1
1 id name model_id:id group_id:id perm_read perm_write perm_create perm_unlink
2 access_sale_order_make_plan access_sale_order_make_plan model_sale_order_make_plan base.group_user 1 1 1 1
3 access_sale_order_planning_option access_sale_order_planning_option model_sale_order_planning_option base.group_user 1 1 1 1
4 access_sale_order_planning_policy access_sale_order_planning_policy model_sale_order_planning_policy base.group_user 1 1 1 1

View File

@@ -0,0 +1 @@
from . import test_planner

View File

@@ -0,0 +1,479 @@
from odoo.tests import common
from datetime import datetime, timedelta
from json import loads as json_decode
from logging import getLogger
_logger = getLogger(__name__)
class TestPlanner(common.TransactionCase):
# @todo Test date planning!
def setUp(self):
super(TestPlanner, self).setUp()
self.today = datetime.today()
self.tomorrow = datetime.today() + timedelta(days=1)
# This partner has a parent
self.country_usa = self.env['res.country'].search([('name', '=', 'United States')], limit=1)
self.state_wa = self.env['res.country.state'].search([('name', '=', 'Washington')], limit=1)
self.state_co = self.env['res.country.state'].search([('name', '=', 'Colorado')], limit=1)
self.partner_wa = self.env['res.partner'].create({
'name': 'Jared',
'street': '1234 Test Street',
'city': 'Marysville',
'state_id': self.state_wa.id,
'zip': '98270',
'country_id': self.country_usa.id,
'partner_latitude': 48.05636,
'partner_longitude': -122.14922,
})
self.warehouse_partner_1 = self.env['res.partner'].create({
'name': 'WH1',
'street': '1234 Test Street',
'city': 'Lynnwood',
'state_id': self.state_wa.id,
'zip': '98036',
'country_id': self.country_usa.id,
'partner_latitude': 47.82093,
'partner_longitude': -122.31513,
})
self.warehouse_partner_2 = self.env['res.partner'].create({
'name': 'WH2',
'street': '1234 Test Street',
'city': 'Craig',
'state_id': self.state_co.id,
'zip': '81625',
'country_id': self.country_usa.id,
'partner_latitude': 40.51525,
'partner_longitude': -107.54645,
})
hour_from = (self.today.hour - 1) % 24
hour_to = (self.today.hour + 1) % 24
if hour_to < hour_from:
hour_to, hour_from = hour_from, hour_to
self.warehouse_calendar_1 = self.env['resource.calendar'].create({
'name': 'Washington Warehouse Hours',
'tz': 'UTC',
'attendance_ids': [
(0, 0, {'name': 'today',
'dayofweek': str(self.today.weekday()),
'hour_from': hour_from,
'hour_to': hour_to,
'day_period': 'morning'}),
(0, 0, {'name': 'tomorrow',
'dayofweek': str(self.tomorrow.weekday()),
'hour_from': hour_from,
'hour_to': hour_to,
'day_period': 'morning'}),
]
})
self.warehouse_calendar_2 = self.env['resource.calendar'].create({
'name': 'Colorado Warehouse Hours',
'tz': 'UTC',
'attendance_ids': [
(0, 0, {'name': 'tomorrow',
'dayofweek': str(self.tomorrow.weekday()),
'hour_from': hour_from,
'hour_to': hour_to,
'day_period': 'morning'}),
]
})
self.warehouse_1 = self.env['stock.warehouse'].create({
'name': 'Washington Warehouse',
'partner_id': self.warehouse_partner_1.id,
'code': 'TWH1',
'shipping_calendar_id': self.warehouse_calendar_1.id,
})
self.warehouse_2 = self.env['stock.warehouse'].create({
'name': 'Colorado Warehouse',
'partner_id': self.warehouse_partner_2.id,
'code': 'TWH2',
'shipping_calendar_id': self.warehouse_calendar_2.id,
})
self.so = self.env['sale.order'].create({
'partner_id': self.partner_wa.id,
'warehouse_id': self.warehouse_1.id,
})
self.product_1 = self.env['product.template'].create({
'name': 'Product for WH1',
'type': 'product',
'standard_price': 1.0,
})
self.product_12 = self.env['product.template'].create({
'name': 'Product for WH1 Second',
'type': 'product',
'standard_price': 1.0,
})
self.product_1 = self.product_1.product_variant_id
self.product_12 = self.product_12.product_variant_id
self.product_2 = self.env['product.template'].create({
'name': 'Product for WH2',
'type': 'product',
'standard_price': 1.0,
})
self.product_22 = self.env['product.template'].create({
'name': 'Product for WH2 Second',
'type': 'product',
'standard_price': 1.0,
})
self.product_2 = self.product_2.product_variant_id
self.product_22 = self.product_22.product_variant_id
self.product_both = self.env['product.template'].create({
'name': 'Product for Both',
'type': 'product',
'standard_price': 1.0,
})
self.product_both = self.product_both.product_variant_id
self.env['stock.quant'].create({
'location_id': self.warehouse_1.lot_stock_id.id,
'product_id': self.product_1.id,
'quantity': 100,
})
self.env['stock.quant'].create({
'location_id': self.warehouse_1.lot_stock_id.id,
'product_id': self.product_12.id,
'quantity': 100,
})
self.env['stock.quant'].create({
'location_id': self.warehouse_1.lot_stock_id.id,
'product_id': self.product_both.id,
'quantity': 100,
})
self.env['stock.quant'].create({
'location_id': self.warehouse_2.lot_stock_id.id,
'product_id': self.product_2.id,
'quantity': 100,
})
self.env['stock.quant'].create({
'location_id': self.warehouse_2.lot_stock_id.id,
'product_id': self.product_22.id,
'quantity': 100,
})
self.env['stock.quant'].create({
'location_id': self.warehouse_2.lot_stock_id.id,
'product_id': self.product_both.id,
'quantity': 100,
})
self.policy_closest = self.env['sale.order.planning.policy'].create({
'always_closest_warehouse': True,
})
self.policy_other = self.env['sale.order.planning.policy'].create({})
self.wh_filter_1 = self.env['ir.filters'].create({
'name': 'TWH1 Only',
'domain': "[('id', '=', %d)]" % (self.warehouse_1.id, ),
'model_id': 'stock.warehouse',
})
self.wh_filter_2 = self.env['ir.filters'].create({
'name': 'TWH2 Only',
'domain': "[('id', '=', %d)]" % (self.warehouse_2.id,),
'model_id': 'stock.warehouse',
})
self.policy_wh_1 = self.env['sale.order.planning.policy'].create({
'warehouse_filter_id': self.wh_filter_1.id,
})
self.policy_wh_2 = self.env['sale.order.planning.policy'].create({
'warehouse_filter_id': self.wh_filter_2.id,
})
def both_wh_ids(self):
return [self.warehouse_1.id, self.warehouse_2.id]
def test_10_planner_creation_internals(self):
"""
Tests certain internal representations and that we can create a basic plan.
"""
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_1.id,
'name': 'demo',
})
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)], skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertEqual(set(both_wh_ids), set(planner.get_warehouses().ids))
fake_order = planner._fake_order(self.so)
base_option = planner.generate_base_option(fake_order)
self.assertTrue(base_option, 'Must have base option.')
self.assertEqual(self.warehouse_1.id, base_option['warehouse_id'])
def test_21_planner_creation(self):
"""
Scenario where only one warehouse has inventory on the order line.
This is "the closest" warehouse.
"""
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_1.id,
'name': 'demo',
})
self.assertEqual(self.product_1.with_context(warehouse=self.warehouse_2.id).qty_available, 0)
self.product_1.invalidate_cache(fnames=['qty_available'], ids=self.product_1.ids)
self.assertEqual(self.product_1.with_context(warehouse=self.warehouse_1.id).qty_available, 100)
self.product_1.invalidate_cache(fnames=['qty_available'], ids=self.product_1.ids)
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)], skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_1)
self.assertFalse(planner.planning_option_ids[0].sub_options)
def test_22_planner_creation(self):
"""
Scenario where only one warehouse has inventory on the order line.
This is "the further" warehouse.
"""
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_2.id,
'name': 'demo',
})
self.assertEqual(self.product_2.with_context(warehouse=self.warehouse_1.id).qty_available, 0)
self.product_2.invalidate_cache(fnames=['qty_available'], ids=self.product_2.ids)
self.assertEqual(self.product_2.with_context(warehouse=self.warehouse_2.id).qty_available, 100)
self.product_2.invalidate_cache(fnames=['qty_available'], ids=self.product_2.ids)
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)], skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_2)
self.assertFalse(planner.planning_option_ids[0].sub_options)
def test_31_planner_creation_split(self):
"""
Scenario where only one warehouse has inventory on each of the order line.
This will cause two pickings to be created, one for each warehouse.
"""
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_1.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_2.id,
'name': 'demo2',
})
self.assertEqual(self.product_1.with_context(warehouse=self.warehouse_1.id).qty_available, 100)
self.assertEqual(self.product_2.with_context(warehouse=self.warehouse_2.id).qty_available, 100)
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)], skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertTrue(planner.planning_option_ids[0].sub_options)
def test_32_planner_creation_no_split(self):
"""
Scenario where only "the further" warehouse has inventory on whole order, but
the "closest" warehouse only has inventory on one item.
This will simply plan out of the "the further" warehouse.
"""
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_both.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_2.id,
'name': 'demo',
})
self.assertEqual(self.product_both.with_context(warehouse=self.warehouse_2.id).qty_available, 100)
self.assertEqual(self.product_2.with_context(warehouse=self.warehouse_2.id).qty_available, 100)
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)], skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_2)
self.assertFalse(planner.planning_option_ids[0].sub_options)
def test_42_policy_force_closest(self):
"""
Scenario where an item may not be in stock at "the closest" warehouse, but an item is only allowed
to come from "the closest" warehouse.
"""
self.product_2.property_planning_policy_id = self.policy_closest
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_both.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_2.id,
'name': 'demo',
})
self.assertEqual(self.product_both.with_context(warehouse=self.warehouse_1.id).qty_available, 100)
# Close warehouse doesn't have product.
self.assertEqual(self.product_2.with_context(warehouse=self.warehouse_1.id).qty_available, 0)
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)],
skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_1)
self.assertFalse(planner.planning_option_ids[0].sub_options)
def test_43_policy_merge(self):
"""
Scenario that will make a complicated scenario specifically:
- 3 policy groups
- 2 base options with sub_options (all base options with same warehouse)
"""
self.product_both.property_planning_policy_id = self.policy_closest
self.product_12.property_planning_policy_id = self.policy_other
self.product_22.property_planning_policy_id = self.policy_other
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_both.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_1.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_2.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_12.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_22.id,
'name': 'demo',
})
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)],
skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_1)
self.assertTrue(planner.planning_option_ids.sub_options)
sub_options = json_decode(planner.planning_option_ids.sub_options)
_logger.error(sub_options)
wh_1_ids = sorted([self.product_both.id, self.product_1.id, self.product_12.id])
wh_2_ids = sorted([self.product_2.id, self.product_22.id])
self.assertEqual(sorted(sub_options[str(self.warehouse_1.id)]['product_ids']), wh_1_ids)
self.assertEqual(sorted(sub_options[str(self.warehouse_2.id)]['product_ids']), wh_2_ids)
def test_44_policy_merge_2(self):
"""
Scenario that will make a complicated scenario specifically:
- 3 policy groups
- 2 base options from different warehouses
"""
self.product_both.property_planning_policy_id = self.policy_other
self.product_12.property_planning_policy_id = self.policy_closest
self.product_22.property_planning_policy_id = self.policy_other
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_both.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_1.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_2.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_12.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_22.id,
'name': 'demo',
})
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)],
skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_2)
self.assertTrue(planner.planning_option_ids.sub_options)
sub_options = json_decode(planner.planning_option_ids.sub_options)
_logger.error(sub_options)
wh_1_ids = sorted([self.product_1.id, self.product_12.id])
wh_2_ids = sorted([self.product_both.id, self.product_2.id, self.product_22.id])
self.assertEqual(sorted(sub_options[str(self.warehouse_1.id)]['product_ids']), wh_1_ids)
self.assertEqual(sorted(sub_options[str(self.warehouse_2.id)]['product_ids']), wh_2_ids)
def test_45_policy_merge_3(self):
"""
Different order of products for test_44
- 3 policy groups
- 2 base options from different warehouses
"""
self.product_both.property_planning_policy_id = self.policy_other
self.product_12.property_planning_policy_id = self.policy_closest
self.product_22.property_planning_policy_id = self.policy_other
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_1.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_2.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_12.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_22.id,
'name': 'demo',
})
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_both.id,
'name': 'demo',
})
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)],
skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_1)
self.assertTrue(planner.planning_option_ids.sub_options)
sub_options = json_decode(planner.planning_option_ids.sub_options)
_logger.error(sub_options)
wh_1_ids = sorted([self.product_1.id, self.product_12.id])
wh_2_ids = sorted([self.product_both.id, self.product_2.id, self.product_22.id])
self.assertEqual(sorted(sub_options[str(self.warehouse_1.id)]['product_ids']), wh_1_ids)
self.assertEqual(sorted(sub_options[str(self.warehouse_2.id)]['product_ids']), wh_2_ids)
def test_51_policy_specific_warehouse(self):
"""
Force one item to TWH2.
"""
self.product_both.property_planning_policy_id = self.policy_wh_2
self.env['sale.order.line'].create({
'order_id': self.so.id,
'product_id': self.product_both.id,
'name': 'demo',
})
both_wh_ids = self.both_wh_ids()
planner = self.env['sale.order.make.plan'].with_context(warehouse_domain=[('id', 'in', both_wh_ids)],
skip_plan_shipping=True).create({'order_id': self.so.id})
self.assertTrue(planner.planning_option_ids, 'Must have one or more plans.')
self.assertEqual(planner.planning_option_ids.warehouse_id, self.warehouse_2)

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<odoo>
<record id="view_delivery_carrier_form_calendar" model="ir.ui.view">
<field name="name">delivery.carrier.form.calendar</field>
<field name="model">delivery.carrier</field>
<field name="inherit_id" ref="delivery.view_delivery_carrier_form" />
<field name="arch" type="xml">
<xpath expr="//field[@name='integration_level']" position="after">
<field name="delivery_calendar_id" />
</xpath>
</field>
</record>
</odoo>

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<odoo>
<record id="product_template_form_view_inherit" model="ir.ui.view">
<field name="name">product.template.common.form.inherit</field>
<field name="model">product.template</field>
<field name="inherit_id" ref="product.product_template_form_view" />
<field name="arch" type="xml">
<xpath expr="//field[@name='categ_id']" position="after">
<field name="property_planning_policy_id"/>
</xpath>
</field>
</record>
<record id="product_category_form_view_inherit" model="ir.ui.view">
<field name="name">product.category.form.inherit</field>
<field name="model">product.category</field>
<field name="inherit_id" ref="product.product_category_form_view" />
<field name="arch" type="xml">
<xpath expr="//field[@name='parent_id']" position="after">
<field name="property_planning_policy_categ_id"/>
</xpath>
</field>
</record>
</odoo>

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<odoo>
<record id="view_order_form_planner" model="ir.ui.view">
<field name="name">sale.order.form.planner</field>
<field name="model">sale.order</field>
<field name="inherit_id" ref="sale.view_order_form" />
<field name="arch" type="xml">
<xpath expr="//header/button[@name='action_confirm']" position="before">
<button name="action_planorder"
type="object"
attrs="{'invisible': [('state', 'not in', ('draft'))]}"
string="Plan"
class="oe_highlight"/>
</xpath>
</field>
</record>
</odoo>

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<odoo>
<record id="view_warehouse_shipping_calendar" model="ir.ui.view">
<field name="name">stock.warehouse.shipping.calendar</field>
<field name="model">stock.warehouse</field>
<field name="inherit_id" ref="stock.view_warehouse" />
<field name="arch" type="xml">
<xpath expr="//field[@name='partner_id']" position="after">
<field name="shipping_calendar_id" />
</xpath>
</field>
</record>
</odoo>

View File

@@ -0,0 +1 @@
from . import order_planner

View File

@@ -0,0 +1,848 @@
from math import sin, cos, sqrt, atan2, radians
from json import dumps, loads
from copy import deepcopy
from datetime import datetime, timedelta
from collections import defaultdict
from logging import getLogger
_logger = getLogger(__name__)
try:
from uszipcode import SearchEngine
except ImportError:
_logger.warn('module "uszipcode" cannot be loaded, falling back to Google API')
SearchEngine = None
from odoo import api, fields, models
from odoo.tools.safe_eval import safe_eval
class FakeCollection():
def __init__(self, vals):
self.vals = vals
def __iter__(self):
for v in self.vals:
yield v
def filtered(self, f):
return filter(f, self.vals)
def sudo(self, *args, **kwargs):
return self
class FakePartner(FakeCollection):
def __init__(self, **kwargs):
"""
'delivery.carrier'.verify_carrier(contact) ->
country_id,
state_id,
zip
company
city,
`distance calculations` ->
date_localization,
partner_latitude,
partner_longitude
computes them when accessed
"""
self.partner_latitude = 0.0
self.partner_longitude = 0.0
self.is_company = False
self._date_localization = kwargs.pop('date_localization', False)
if not kwargs.pop('PARENT', False):
self.parent_id = FakePartner(PARENT=True)
for attr, value in kwargs.items():
setattr(self, attr, value)
@property
def date_localization(self):
if not self._date_localization:
try:
self._date_localization = 'TODAY!'
# The fast way.
if SearchEngine and self.zip:
with SearchEngine() as search:
zipcode = search.by_zipcode(str(self.zip).split('-')[0])
if zipcode and zipcode.lat:
self.partner_latitude = zipcode.lat
self.partner_longitude = zipcode.lng
return self._date_localization
# The slow way.
geo_obj = self.env['base.geocoder']
search = geo_obj.geo_query_address(city=self.city, state=self.state_id.name, country=self.country_id.name)
result = geo_obj.geo_find(search, force_country=self.country_id.id)
if result:
self.partner_latitude = result[0]
self.partner_longitude = result[1]
except:
self._date_localization = 'ERROR'
return self._date_localization
def __getattr__(self, item):
return False
def __getitem__(self, item):
if item == '__last_update':
return str(datetime.now())
return getattr(self, item)
class FakeOrderLine(FakeCollection):
def __init__(self, **kwargs):
"""
'delivery.carrier'.get_price_available(order) ->
state,
is_delivery,
product_uom._compute_quantity,
product_uom_qty,
product_id
price_total
"""
self.state = 'draft'
self.is_delivery = False
self.product_uom = self
for attr, value in kwargs.items():
setattr(self, attr, value)
def _compute_quantity(self, qty=1, uom=None):
"""
This is a non-implementation for when someone wants to call product_uom._compute_quantity
:param qty:
:param uom:
:return:
"""
return qty
def __getattr__(self, item):
return False
def __getitem__(self, item):
if item == '__last_update':
return str(datetime.now())
return getattr(self, item)
class FakeSaleOrder(FakeCollection):
"""
partner_id :: used in shipping
partner_shipping_id :: is used in several places
order_line :: can be a FakeCollection of FakeOrderLine's or Odoo 'sale.order.line'
carrier_id :: can be empty, will be overwritten when walking through carriers
'delivery.carrier'.get_shipping_price_from_so(orders) ->
id, (int)
name, (String)
currency_id, (Odoo 'res.currency')
company_id, (Odoo 'res.company')
warehouse_id, (Odoo 'stock.warehouse')
carrier_id, (Odoo 'delivery.carrier')
SaleOrderMakePlan.generate_shipping_options() ->
pricelist_id, (Odoo 'product.pricelist')
"""
def __init__(self, **kwargs):
self.carrier_id = None
self.id = 0
self.name = 'Quote'
self.team_id = None
self.analytic_account_id = None
self.amount_total = 0.0
self.date_order = fields.Date.today()
self.shipping_account_id = False # from delivery_hibou
self.ups_service_type = False # Added in 12
for attr, value in kwargs.items():
setattr(self, attr, value)
def __iter__(self):
"""
Emulate a recordset of a single order.
"""
yield self
def _compute_amount_total_without_delivery(self):
return self.amount_total
def __getattr__(self, item):
return False
def __getitem__(self, item):
if item == '__last_update':
return str(datetime.now())
return getattr(self, item)
def distance(lat_1, lon_1, lat_2, lon_2):
R = 6373.0
lat1 = radians(lat_1)
lon1 = radians(lon_1)
lat2 = radians(lat_2)
lon2 = radians(lon_2)
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return R * c
class SaleOrderMakePlan(models.TransientModel):
_name = 'sale.order.make.plan'
_description = 'Plan Order'
order_id = fields.Many2one(
'sale.order', 'Sale Order',
)
planning_option_ids = fields.One2many('sale.order.planning.option', 'plan_id', 'Options')
@api.model
def plan_order(self, vals):
pass
def select_option(self, option):
for plan in self:
self.plan_order_option(plan.order_id, option)
def _order_fields_for_option(self, option):
return {
'warehouse_id': option.warehouse_id.id,
'requested_date': option.requested_date,
'date_planned': option.date_planned,
'carrier_id': option.carrier_id.id,
}
@api.model
def plan_order_option(self, order, option):
if option.sub_options:
sub_options = option.sub_options
if isinstance(sub_options, str):
sub_options = loads(sub_options)
if not isinstance(sub_options, dict):
_logger.warn('Cannot apply option with corrupt sub_options')
return False
order_lines = order.order_line
for wh_id, wh_vals in sub_options.items():
wh_id = int(wh_id)
if wh_id == option.warehouse_id.id:
continue
order_lines.filtered(lambda line: line.product_id.id in wh_vals['product_ids']).write({
'warehouse_id': wh_id,
'date_planned': wh_vals.get('date_planned'),
})
order_fields = self._order_fields_for_option(option)
order.write(order_fields)
if option.carrier_id:
order._create_delivery_line(option.carrier_id, option.shipping_price)
@api.model
def create(self, values):
planner = super(SaleOrderMakePlan, self).create(values)
for option_vals in self.generate_order_options(planner.order_id):
if type(option_vals) != dict:
continue
option_vals['plan_id'] = planner.id
planner.planning_option_ids |= self.env['sale.order.planning.option'].create(option_vals)
return planner
def _fake_order(self, order):
return FakeSaleOrder(**{
'id': order.id,
'name': order.name,
'partner_id': order.partner_id,
'partner_shipping_id': order.partner_shipping_id,
'order_line': order.order_line,
'currency_id': order.currency_id,
'company_id': order.company_id,
'warehouse_id': order.warehouse_id,
'amount_total': order.amount_total,
'pricelist_id': order.pricelist_id,
'env': self.env,
})
@api.model
def generate_order_options(self, order, plan_shipping=True):
fake_order = self._fake_order(order)
base_option = self.generate_base_option(fake_order)
# do we need shipping?
# we need to collect it because we want multi-warehouse shipping amounts.
if order.carrier_id:
base_option['carrier_id'] = order.carrier_id.id
if plan_shipping and not self.env.context.get('skip_plan_shipping'):
if base_option.get('date_planned'):
fake_order.date_order = base_option['date_planned']
options = self.generate_shipping_options(base_option, fake_order)
else:
options = [base_option]
return options
def get_warehouses(self, warehouse_id=None, domain=None):
warehouse = self.env['stock.warehouse'].sudo()
if warehouse_id:
return warehouse.browse(warehouse_id)
if domain:
if not isinstance(domain, (list, tuple)):
domain = safe_eval(domain)
else:
domain = []
if 'allowed_company_ids' in self.env.context:
domain.append(('company_id', 'in', self.env.context['allowed_company_ids']))
if self.env.context.get('warehouse_domain'):
domain.extend(self.env.context.get('warehouse_domain'))
irconfig_parameter = self.env['ir.config_parameter'].sudo()
if irconfig_parameter.get_param('sale.order.planner.warehouse_domain'):
domain.extend(safe_eval(irconfig_parameter.get_param('sale.order.planner.warehouse_domain')))
return warehouse.search(domain)
def get_shipping_carriers(self, carrier_id=None, domain=None):
Carrier = self.env['delivery.carrier'].sudo()
if carrier_id:
return Carrier.browse(carrier_id)
if domain:
if not isinstance(domain, (list, tuple)):
domain = safe_eval(domain)
else:
domain = []
if self.env.context.get('carrier_domain'):
# potential bug here if this is textual
domain.extend(self.env.context.get('carrier_domain'))
irconfig_parameter = self.env['ir.config_parameter'].sudo()
if irconfig_parameter.get_param('sale.order.planner.carrier_domain'):
domain.extend(safe_eval(irconfig_parameter.get_param('sale.order.planner.carrier_domain')))
return Carrier.search(domain)
def _generate_base_option(self, order_fake, policy_group):
flag_force_closest = False
warehouse_domain = False
if 'policy' in policy_group:
policy = policy_group['policy']
flag_force_closest = policy.always_closest_warehouse
warehouse_domain = policy.warehouse_filter_id.domain
# Need to look at warehouse filter.
# Eventually need to look at shipping filter....
warehouses = self.get_warehouses(domain=warehouse_domain)
if flag_force_closest:
warehouses = self._find_closest_warehouse_by_partner(warehouses, order_fake.partner_shipping_id)
product_stock = self._fetch_product_stock(warehouses, policy_group['products'])
sub_options = {}
wh_date_planning = {}
p_len = len(policy_group['products'])
full_candidates = set()
partial_candidates = set()
for wh_id, stock in product_stock.items():
available = sum(1 for p_id, p_vals in stock.items() if self._is_in_stock(p_vals, policy_group['buy_qty'][p_id]))
if available == p_len:
full_candidates.add(wh_id)
elif available > 0:
partial_candidates.add(wh_id)
if full_candidates:
if len(full_candidates) == 1:
warehouse = warehouses.filtered(lambda wh: wh.id in full_candidates)
else:
warehouse = self._find_closest_warehouse_by_partner(
warehouses.filtered(lambda wh: wh.id in full_candidates), order_fake.partner_shipping_id)
date_planned = self._next_warehouse_shipping_date(warehouse)
#order_fake.warehouse_id = warehouse
return {'warehouse_id': warehouse.id, 'date_planned': date_planned}
_logger.error(' partial_candidates: ' + str(partial_candidates))
if partial_candidates:
_logger.error(' using...')
if len(partial_candidates) == 1:
warehouse = warehouses.filtered(lambda wh: wh.id in partial_candidates)
#order_fake.warehouse_id = warehouse
return {'warehouse_id': warehouse.id}
sorted_warehouses = self._sort_warehouses_by_partner(
warehouses.filtered(lambda wh: wh.id in partial_candidates), order_fake.partner_shipping_id)
_logger.error(' sorted_warehouses: ' + str(sorted_warehouses) + ' warehouses: ' + str(warehouses))
primary_wh = sorted_warehouses[0] # partial_candidates means there is at least one warehouse
primary_wh_date_planned = self._next_warehouse_shipping_date(primary_wh)
wh_date_planning[primary_wh.id] = primary_wh_date_planned
for wh in sorted_warehouses:
_logger.error(' wh: ' + str(wh) + ' buy_qty: ' + str(policy_group['buy_qty']))
if not policy_group['buy_qty']:
continue
stock = product_stock[wh.id]
for p_id, p_vals in stock.items():
_logger.error(' p_id: ' + str(p_id) + ' p_vals: ' + str(p_vals))
if p_id in policy_group['buy_qty'] and self._is_in_stock(p_vals, policy_group['buy_qty'][p_id]):
if wh.id not in sub_options:
sub_options[wh.id] = {
'date_planned': self._next_warehouse_shipping_date(wh),
'product_ids': [],
'product_skus': [],
}
sub_options[wh.id]['product_ids'].append(p_id)
sub_options[wh.id]['product_skus'].append(p_vals['sku'])
_logger.error(' removing: ' + str(p_id))
del policy_group['buy_qty'][p_id]
if not policy_group['buy_qty']:
# item_details can fulfil all items.
# this is good!!
#order_fake.warehouse_id = primary_wh
return {'warehouse_id': primary_wh.id, 'date_planned': primary_wh_date_planned,
'sub_options': sub_options}
# warehouses cannot fulfil all requested items!!
#order_fake.warehouse_id = primary_wh
return {'warehouse_id': primary_wh.id}
# nobody has stock!
primary_wh = self._find_closest_warehouse_by_partner(warehouses, order_fake.partner_shipping_id)
return {'warehouse_id': primary_wh.id}
def generate_base_option(self, order_fake):
_logger.error('generate_base_option:')
__start_date = datetime.now() - timedelta(days=30)
product_lines = list(filter(lambda line: line.product_id.type == 'product', order_fake.order_line))
if not product_lines:
return {}
buy_qty = defaultdict(int)
for line in product_lines:
buy_qty[line.product_id.id] += line.product_uom_qty
products = self.env['product.product']
for line in product_lines:
products |= line.product_id
policy_groups = defaultdict(lambda: {'products': [], 'buy_qty': {}})
for p in products:
policy = p.product_tmpl_id.get_planning_policy()
if policy:
policy_groups[policy.id]['products'].append(p)
policy_groups[policy.id]['buy_qty'][p.id] = buy_qty[p.id]
policy_groups[policy.id]['policy'] = policy
else:
policy_groups[0]['products'].append(p)
policy_groups[0]['buy_qty'][p.id] = buy_qty[p.id]
for _, policy_group in policy_groups.items():
product_set = self.env['product.product'].browse()
for p in policy_group['products']:
product_set += p
policy_group['products'] = product_set
policy_group['base_option'] = self._generate_base_option(order_fake, policy_group)
option_policy_groups = defaultdict(lambda: {'products': self.env['product.product'].browse(), 'policies': self.env['sale.order.planning.policy'].browse(), 'date_planned': __start_date, 'sub_options': [],})
for policy_id, policy_group in policy_groups.items():
base_option = policy_group['base_option']
_logger.error(' base_option: ' + str(base_option))
b_wh_id = base_option['warehouse_id']
if 'policy' in policy_group:
option_policy_groups[b_wh_id]['policies'] += policy_group['policy']
if option_policy_groups[b_wh_id].get('date_planned'):
# The first base_option without a date clears it
if base_option.get('date_planned'):
if base_option['date_planned'] > option_policy_groups[b_wh_id]['date_planned']:
option_policy_groups[b_wh_id]['date_planned'] = base_option['date_planned']
else:
# One of our options has no plan date. Remove it.
del option_policy_groups[b_wh_id]['date_planned']
if 'sub_options' in base_option:
option_policy_groups[b_wh_id]['sub_options'].append(base_option['sub_options'])
option_policy_groups[b_wh_id]['products'] += policy_group['products']
option_policy_groups[b_wh_id]['warehouse_id'] = b_wh_id
# clean up unused sub_options and collapse used ones
for o_wh_id, option_group in option_policy_groups.items():
if not option_group['sub_options']:
del option_group['sub_options']
else:
sub_options = defaultdict(lambda: {'date_planned': __start_date, 'product_ids': [], 'product_skus': []})
remaining_products = option_group['products']
for options in option_group['sub_options']:
for wh_id, option in options.items():
if sub_options[wh_id].get('date_planned'):
# The first option without a date clears it
if option.get('date_planned'):
if option['date_planned'] > sub_options[wh_id]['date_planned']:
sub_options[wh_id]['date_planned'] = option['date_planned']
else:
del sub_options[wh_id]['date_planned']
sub_options[wh_id]['product_ids'] += option['product_ids']
sub_options[wh_id]['product_skus'] += option['product_skus']
remaining_products = remaining_products.filtered(lambda p: p.id not in sub_options[wh_id]['product_ids'])
option_group['sub_options'] = sub_options
if remaining_products:
option_group['sub_options'][o_wh_id]['product_ids'] += remaining_products.ids
option_group['sub_options'][o_wh_id]['product_skus'] += remaining_products.mapped('default_code')
# At this point we should have all of the policy options collapsed.
# Collapse warehouse options.
base_option = {'date_planned': __start_date, 'products': self.env['product.product'].browse()}
for wh_id, intermediate_option in option_policy_groups.items():
_logger.error(' base_option: ' + str(base_option))
_logger.error(' intermediate_option: ' + str(intermediate_option))
if 'warehouse_id' not in base_option:
base_option['warehouse_id'] = wh_id
b_wh_id = base_option['warehouse_id']
if base_option.get('date_planned'):
if intermediate_option.get('date_planned'):
if intermediate_option['date_planned'] > base_option['date_planned']:
base_option['date_planned'] = intermediate_option['date_planned']
else:
del base_option['date_planned']
if 'sub_options' in base_option:
for _, option in base_option['sub_options'].items():
del option['date_planned']
if b_wh_id == wh_id:
if 'sub_options' in intermediate_option and 'sub_options' not in base_option:
# Base option will get new sub_options
intermediate_option['sub_options'][wh_id]['product_ids'] += base_option['products'].ids
intermediate_option['sub_options'][wh_id]['product_skus'] += base_option['products'].mapped('default_code')
base_option['sub_options'] = intermediate_option['sub_options']
elif 'sub_options' in intermediate_option and 'sub_options' in base_option:
# Both have sub_options, merge
for o_wh_id, option in intermediate_option['sub_options'].items():
if o_wh_id not in base_option['sub_options']:
base_option['sub_options'][o_wh_id] = option
else:
base_option['sub_options'][o_wh_id]['product_ids'] += option['product_ids']
base_option['sub_options'][o_wh_id]['product_skus'] += option['product_skus']
if base_option.get('date_planned'):
if option['date_planned'] > base_option['sub_options'][o_wh_id]['date_planned']:
base_option['sub_options'][o_wh_id]['date_planned'] = intermediate_option['date_planned']
elif 'sub_options' in base_option:
# merge products from intermediate into base_option's sub_options
base_option['sub_options'][wh_id]['product_ids'] += intermediate_option['products'].ids
base_option['sub_options'][wh_id]['product_skus'] += intermediate_option['products'].mapped('default_code')
base_option['products'] += intermediate_option['products']
else:
# Promote
if 'sub_options' not in intermediate_option and 'sub_options' not in base_option:
base_option['sub_options'] = {
wh_id: {
'product_ids': intermediate_option['products'].ids,
'product_skus': intermediate_option['products'].mapped('default_code'),
},
b_wh_id: {
'product_ids': base_option['products'].ids,
'product_skus': base_option['products'].mapped('default_code'),
},
}
if base_option.get('date_planned'):
base_option['sub_options'][wh_id]['date_planned'] = intermediate_option['date_planned']
base_option['sub_options'][b_wh_id]['date_planned'] = base_option['date_planned']
elif 'sub_options' in base_option and 'sub_options' not in intermediate_option:
if wh_id not in base_option['sub_options']:
base_option['sub_options'][wh_id] = {
'product_ids': intermediate_option['products'].ids,
'product_skus': intermediate_option['products'].mapped('default_code'),
}
if base_option.get('date_planned'):
base_option['sub_options'][wh_id]['date_planned'] = intermediate_option['date_planned']
else:
base_option['sub_options'][wh_id]['product_ids'] += intermediate_option['products'].ids
base_option['sub_options'][wh_id]['product_skus'] += intermediate_option['products'].mapped('default_code')
if base_option.get('date_planned'):
if intermediate_option['date_planned'] > base_option['sub_options'][wh_id]['date_planned']:
base_option['sub_options'][wh_id]['date_planned'] = intermediate_option['date_planned']
elif 'sub_options' in intermediate_option and 'sub_options' in base_option:
# Both have sub_options, merge
for o_wh_id, option in intermediate_option['sub_options'].items():
if o_wh_id not in base_option['sub_options']:
base_option['sub_options'][o_wh_id] = option
else:
base_option['sub_options'][o_wh_id]['product_ids'] += option['product_ids']
base_option['sub_options'][o_wh_id]['product_skus'] += option['product_skus']
if base_option.get('date_planned'):
if option['date_planned'] > base_option['sub_options'][o_wh_id]['date_planned']:
base_option['sub_options'][o_wh_id]['date_planned'] = intermediate_option['date_planned']
else:
# intermediate_option has sub_options but base_option doesn't
base_option['sub_options'] = {
b_wh_id: {
'product_ids': base_option['products'].ids,
'product_skus': base_option['products'].mapped('default_code'),
}
}
if base_option.get('date_planned'):
base_option['sub_options'][b_wh_id]['date_planned'] = base_option['date_planned']
for o_wh_id, option in intermediate_option['sub_options'].items():
if o_wh_id not in base_option['sub_options']:
base_option['sub_options'][o_wh_id] = option
else:
base_option['sub_options'][o_wh_id]['product_ids'] += option['product_ids']
base_option['sub_options'][o_wh_id]['product_skus'] += option['product_skus']
if base_option.get('date_planned'):
if option['date_planned'] > base_option['sub_options'][o_wh_id]['date_planned']:
base_option['sub_options'][o_wh_id]['date_planned'] = intermediate_option['date_planned']
del base_option['products']
_logger.error(' returning: ' + str(base_option))
order_fake.warehouse_id = self.get_warehouses(warehouse_id=base_option['warehouse_id'])
return base_option
def _is_in_stock(self, p_stock, buy_qty):
return p_stock['type'] == 'consu' or p_stock['qty_available'] >= buy_qty
def _find_closest_warehouse_by_partner(self, warehouses, partner):
if not partner.date_localization:
partner.geo_localize()
return self._find_closest_warehouse(warehouses, partner.partner_latitude, partner.partner_longitude)
def _find_closest_warehouse(self, warehouses, latitude, longitude):
distances = {distance(latitude, longitude, wh.partner_id.partner_latitude, wh.partner_id.partner_longitude): wh.id for wh in warehouses}
wh_id = distances[min(distances)]
return warehouses.filtered(lambda wh: wh.id == wh_id)
def _sort_warehouses_by_partner(self, warehouses, partner):
if not partner.date_localization:
partner.geo_localize()
return self._sort_warehouses(warehouses, partner.partner_latitude, partner.partner_longitude)
def _sort_warehouses(self, warehouses, latitude, longitude):
distances = {distance(latitude, longitude, wh.partner_id.partner_latitude, wh.partner_id.partner_longitude): wh.id for wh in warehouses}
wh_distances = sorted(distances)
return [warehouses.filtered(lambda wh: wh.id == distances[d]) for d in wh_distances]
def _next_warehouse_shipping_date(self, warehouse):
if warehouse.shipping_calendar_id:
return warehouse.shipping_calendar_id.plan_days_end(0, fields.Datetime.now(), compute_leaves=True)
return False
@api.model
def _fetch_product_stock(self, warehouses, products):
output = {}
for wh in warehouses:
products.invalidate_cache(fnames=['qty_available', 'virtual_available', 'incoming_qty', 'outgoing_qty'],
ids=products.ids)
products = products.with_context({'location': wh.lot_stock_id.id})
output[wh.id] = {
p.id: {
'qty_available': p.qty_available,
'virtual_available': p.virtual_available,
'incoming_qty': p.incoming_qty,
'outgoing_qty': p.outgoing_qty,
'sku': p.default_code or str(p.id),
'type': p.type,
} for p in products}
return output
def generate_shipping_options(self, base_option, order_fake):
# generate a carrier_id, amount, requested_date (promise date)
# if base_option['carrier_id'] then that is the only carrier we want to collect rates for.
product_lines = list(filter(lambda line: line.product_id.type in ('product', 'consu'), order_fake.order_line))
domain = []
for line in product_lines:
policy = line.product_id.product_tmpl_id.get_planning_policy()
if policy and policy.carrier_filter_id:
domain.extend(safe_eval(policy.carrier_filter_id.domain))
carriers = self.get_shipping_carriers(base_option.get('carrier_id'), domain=domain)
_logger.info('generate_shipping_options:: base_option: ' + str(base_option) + ' order_fake: ' + str(order_fake) + ' carriers: ' + str(carriers))
if not carriers:
return base_option
if not base_option.get('sub_options'):
options = []
# this locic comes from "delivery.models.sale_order.SaleOrder"
for carrier in carriers:
option = self._generate_shipping_carrier_option(base_option, order_fake, carrier)
if option:
options.append(option)
if options:
return options
return [base_option]
else:
warehouses = self.get_warehouses()
original_order_fake_warehouse_id = order_fake.warehouse_id
original_order_fake_order_line = order_fake.order_line
options = []
for carrier in carriers:
new_base_option = deepcopy(base_option)
has_error = False
for wh_id, wh_vals in base_option['sub_options'].items():
if has_error:
continue
order_fake.warehouse_id = warehouses.filtered(lambda wh: wh.id == wh_id)
order_fake.order_line = FakeCollection(filter(lambda line: line.product_id.id in wh_vals['product_ids'], original_order_fake_order_line))
wh_option = self._generate_shipping_carrier_option(wh_vals, order_fake, carrier)
if not wh_option:
has_error = True
else:
new_base_option['sub_options'][wh_id] = wh_option
if has_error:
continue
# now that we've collected, we can roll up some details.
new_base_option['carrier_id'] = carrier.id
new_base_option['shipping_price'] = self._get_shipping_price_for_options(new_base_option['sub_options'])
new_base_option['requested_date'] = self._get_max_requested_date(new_base_option['sub_options'])
new_base_option['transit_days'] = self._get_max_transit_days(new_base_option['sub_options'])
options.append(new_base_option)
#restore values in case more processing occurs
order_fake.warehouse_id = original_order_fake_warehouse_id
order_fake.order_line = original_order_fake_order_line
if not options:
options.append(base_option)
return options
def _get_shipping_price_for_options(self, sub_options):
return sum(wh_option.get('shipping_price', 0.0) for wh_option in sub_options.values())
def _get_max_requested_date(self, sub_options):
max_requested_date = None
for option in sub_options.values():
requested_date = option.get('requested_date')
if requested_date and not max_requested_date:
max_requested_date = requested_date
elif requested_date:
if requested_date > max_requested_date:
max_requested_date = requested_date
return max_requested_date
def _get_max_transit_days(self, sub_options):
return max(wh_option.get('transit_days', 0) or 0 for wh_option in sub_options.values())
def _generate_shipping_carrier_option(self, base_option, order_fake, carrier):
# some carriers look at the order carrier_id
order_fake.carrier_id = carrier
# this logic comes from "delivery.models.sale_order.SaleOrder"
try:
result = None
date_delivered = None
transit_days = 0
if carrier.delivery_type not in ['fixed', 'base_on_rule']:
if hasattr(carrier, 'rate_shipment_date_planned'):
# New API
result = carrier.rate_shipment_date_planned(order_fake, base_option.get('date_planned'))
if result:
if not result.get('success'):
return None
price_unit, transit_days, date_delivered = result['price'], result.get('transit_days'), result.get('date_delivered')
elif hasattr(carrier, 'get_shipping_price_for_plan'):
# Old API
result = carrier.get_shipping_price_for_plan(order_fake, base_option.get('date_planned'))
if result and isinstance(result, list):
price_unit, transit_days, date_delivered = result[0]
elif not result:
rate = carrier.rate_shipment(order_fake)
if rate and rate.get('success'):
price_unit = rate['price']
if rate.get('transit_days'):
transit_days = rate.get('transit_days')
if rate.get('date_delivered'):
date_delivered = rate.get('date_delivered')
else:
_logger.warn('returning None because carrier: ' + str(carrier))
return None
else:
carrier = carrier.available_carriers(order_fake.partner_shipping_id)
if not carrier:
return None
res = carrier.rate_shipment(order_fake)
price_unit = res['price']
if order_fake.company_id.currency_id.id != order_fake.pricelist_id.currency_id.id:
price_unit = order_fake.company_id.currency_id.with_context(date=order_fake.date_order).compute(price_unit, order_fake.pricelist_id.currency_id)
final_price = float(price_unit) * (1.0 + (float(carrier.margin) / 100.0))
option = deepcopy(base_option)
option['carrier_id'] = carrier.id
option['shipping_price'] = final_price
option['requested_date'] = date_delivered
option['transit_days'] = transit_days
return option
except Exception as e:
_logger.info("Exception collecting carrier rates: " + str(e))
# Want to see more?
# _logger.exception(e)
return None
class SaleOrderPlanningOption(models.TransientModel):
_name = 'sale.order.planning.option'
_description = 'Order Planning Option'
def create(self, values):
def datetime_converter(o):
if isinstance(o, datetime):
return str(o)
if 'sub_options' in values and not isinstance(values['sub_options'], str):
for wh_id, option in values['sub_options'].items():
if option.get('date_planned'):
option['date_planned'] = str(option['date_planned'])
values['sub_options'] = dumps(values['sub_options'], default=datetime_converter)
return super(SaleOrderPlanningOption, self).create(values)
def _compute_sub_options_text(self):
for option in self:
sub_options = option.sub_options
if sub_options and not isinstance(sub_options, dict):
sub_options = loads(sub_options)
if not isinstance(sub_options, dict):
option.sub_options_text = ''
continue
line = ''
for wh_id, wh_option in sub_options.items():
product_skus = (str(s) for s in wh_option.get('product_skus', []))
date_planned = wh_option.get('date_planned') or ''
product_skus = ', '.join(product_skus)
requested_date = wh_option.get('requested_date', '') or ''
shipping_price = float(wh_option.get('shipping_price', 0.0) or 0)
transit_days = int(wh_option.get('transit_days', 0) or 0)
line += """WH %d :: %s
Date Planned: %s
Requested Date: %s
Transit Days: %d
Shipping Price: %.2f
""" % (int(wh_id), product_skus, date_planned, requested_date, transit_days, shipping_price)
option.sub_options_text = line
plan_id = fields.Many2one('sale.order.make.plan', 'Plan', ondelete='cascade')
warehouse_id = fields.Many2one('stock.warehouse', string='Warehouse')
date_planned = fields.Datetime('Planned Date')
requested_date = fields.Datetime('Requested Date')
carrier_id = fields.Many2one('delivery.carrier', string='Carrier')
transit_days = fields.Integer('Transit Days')
shipping_price = fields.Float('Shipping Price')
sub_options = fields.Text('Sub Options JSON')
sub_options_text = fields.Text('Sub Options', compute=_compute_sub_options_text)
def select_plan(self):
for option in self:
option.plan_id.select_option(option)
return

View File

@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<odoo>
<record id="view_plan_sale_order" model="ir.ui.view">
<field name="name">view.plan.sale.order</field>
<field name="model">sale.order.make.plan</field>
<field name="type">form</field>
<field name="arch" type="xml">
<form>
<field name="planning_option_ids" readonly="1" force_save="1">
<tree>
<field name="warehouse_id" />
<field name="date_planned" />
<field name="requested_date" />
<field name="transit_days" />
<field name="carrier_id" />
<field name="shipping_price" />
<field name="sub_options_text" />
<button class="eo_highlight"
name="select_plan"
string="Select"
type="object" />
</tree>
</field>
<footer>
<button class="oe_link"
special="cancel"
string="Cancel" />
</footer>
</form>
</field>
</record>
<record id="action_plan_sale_order" model="ir.actions.act_window">
<field name="name">Plan Sale Order</field>
<field name="res_model">sale.order.make.plan</field>
<field name="view_mode">form</field>
<field name="view_id" ref="view_plan_sale_order" />
<field name="target">new</field>
</record>
</odoo>