[IMP] contract_queue_job: jobify renew method for crons

This commit is contained in:
nans
2021-06-08 20:50:41 +02:00
committed by OCA-git-bot
parent 994017e4d4
commit 0b428bab73
6 changed files with 58 additions and 1 deletions

View File

@@ -6,7 +6,7 @@
"summary": """
This addon make contract invoicing cron plan each contract in a job
instead of creating all invoices in one transaction""",
"version": "12.0.1.0.0",
"version": "12.0.1.1.0",
"license": "AGPL-3",
"author": "ACSONE SA/NV,"
"Odoo Community Association (OCA)",

View File

@@ -18,6 +18,11 @@ msgstr ""
msgid "Contract"
msgstr ""
#. module: contract_queue_job
#: model:ir.model,name:contract_queue_job.model_contract_line
msgid "Contract Line"
msgstr ""
#. module: contract_queue_job
#: model:ir.model,name:contract_queue_job.model_contract_manually_create_invoice
msgid "Contract Manually Create Invoice Wizard"

View File

@@ -1 +1,2 @@
from . import contract_contract
from . import contract_line

View File

@@ -0,0 +1,21 @@
# Copyright 2021 ACSONE SA/NV
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
from odoo.addons.queue_job.job import job
QUEUE_CHANNEL = "root.CONTRACT_LINE_RENEW"
class ContractLine(models.Model):
_inherit = "contract.line"
@job(default_channel=QUEUE_CHANNEL)
def renew(self):
if len(self) > 1:
for rec in self:
rec.with_delay().renew()
return self.env["contract.line"]
return super().renew()

View File

@@ -1 +1,2 @@
from . import test_contract_queue_job
from . import test_contract_line_queue_job

View File

@@ -0,0 +1,29 @@
# Copyright 2020 ACSONE SA/NV
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import fields
from odoo.addons.contract.tests.test_contract import TestContractBase
from odoo.addons.queue_job.tests.common import JobMixin
class TestContractLineQueueJob(TestContractBase, JobMixin):
@classmethod
def setUpClass(cls):
super(TestContractLineQueueJob, cls).setUpClass()
cls.contract3 = cls.contract2.copy()
def test_contract_renew_queue_job_1(self):
"""Only one line, task is run without delay"""
line = self.contract2.mapped("contract_line_ids")
line.date_end = fields.Date.today()
res = line.renew()
self.assertTrue(line.date_end < res.date_start)
def test_contract_renew_queue_job_2(self):
"""Two lines, two jobs are created."""
contracts = self.contract2 | self.contract3
lines = contracts.mapped("contract_line_ids")
job_counter = self.job_counter()
lines.renew()
self.assertEqual(job_counter.count_created(), len(lines))