mirror of
https://github.com/OCA/reporting-engine.git
synced 2025-02-16 16:30:38 +02:00
[IMP] report_qweb_signer: Black python code
This commit is contained in:
committed by
Omar (Comunitea)
parent
e9665dcfb3
commit
b2b51f0e4c
@@ -3,41 +3,43 @@
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||
|
||||
import base64
|
||||
from contextlib import closing
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
from contextlib import closing
|
||||
|
||||
from odoo import models, api, _
|
||||
from odoo.exceptions import UserError, AccessError
|
||||
from odoo import _, api, models
|
||||
from odoo.exceptions import AccessError, UserError
|
||||
from odoo.tools.safe_eval import safe_eval
|
||||
|
||||
import logging
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _normalize_filepath(path):
|
||||
path = path or ''
|
||||
path = path or ""
|
||||
path = path.strip()
|
||||
if not os.path.isabs(path):
|
||||
me = os.path.dirname(__file__)
|
||||
path = '{}/../static/certificate/'.format(me) + path
|
||||
path = "{}/../static/certificate/".format(me) + path
|
||||
path = os.path.normpath(path)
|
||||
return path if os.path.exists(path) else False
|
||||
|
||||
|
||||
class IrActionsReport(models.Model):
|
||||
_inherit = 'ir.actions.report'
|
||||
_inherit = "ir.actions.report"
|
||||
|
||||
def _certificate_get(self, res_ids):
|
||||
"""Obtain the proper certificate for the report and the conditions."""
|
||||
if self.report_type != 'qweb-pdf':
|
||||
if self.report_type != "qweb-pdf":
|
||||
return False
|
||||
certificates = self.env['report.certificate'].search([
|
||||
('company_id', '=', self.env.user.company_id.id),
|
||||
('model_id', '=', self.model),
|
||||
])
|
||||
certificates = self.env["report.certificate"].search(
|
||||
[
|
||||
("company_id", "=", self.env.user.company_id.id),
|
||||
("model_id", "=", self.model),
|
||||
]
|
||||
)
|
||||
if not certificates:
|
||||
return False
|
||||
for cert in certificates:
|
||||
@@ -46,16 +48,19 @@ class IrActionsReport(models.Model):
|
||||
_logger.debug(
|
||||
"Certificate '%s' allows only one document, "
|
||||
"but printing %d documents",
|
||||
cert.name, len(res_ids))
|
||||
cert.name,
|
||||
len(res_ids),
|
||||
)
|
||||
continue
|
||||
# Check domain
|
||||
if cert.domain:
|
||||
domain = [('id', 'in', tuple(res_ids))]
|
||||
domain = [("id", "in", tuple(res_ids))]
|
||||
domain = domain + safe_eval(cert.domain)
|
||||
docs = self.env[cert.model_id.model].search(domain)
|
||||
if not docs:
|
||||
_logger.debug(
|
||||
"Certificate '%s' domain not satisfied", cert.name)
|
||||
"Certificate '%s' domain not satisfied", cert.name
|
||||
)
|
||||
continue
|
||||
# Certificate match!
|
||||
return cert
|
||||
@@ -65,10 +70,7 @@ class IrActionsReport(models.Model):
|
||||
if len(res_ids) != 1:
|
||||
return False
|
||||
doc = self.env[certificate.model_id.model].browse(res_ids[0])
|
||||
return safe_eval(certificate.attachment, {
|
||||
'object': doc,
|
||||
'time': time
|
||||
})
|
||||
return safe_eval(certificate.attachment, {"object": doc, "time": time})
|
||||
|
||||
def _attach_signed_read(self, res_ids, certificate):
|
||||
if len(res_ids) != 1:
|
||||
@@ -76,11 +78,14 @@ class IrActionsReport(models.Model):
|
||||
filename = self._attach_filename_get(res_ids, certificate)
|
||||
if not filename:
|
||||
return False
|
||||
attachment = self.env['ir.attachment'].search([
|
||||
('datas_fname', '=', filename),
|
||||
('res_model', '=', certificate.model_id.model),
|
||||
('res_id', '=', res_ids[0]),
|
||||
], limit=1)
|
||||
attachment = self.env["ir.attachment"].search(
|
||||
[
|
||||
("datas_fname", "=", filename),
|
||||
("res_model", "=", certificate.model_id.model),
|
||||
("res_id", "=", res_ids[0]),
|
||||
],
|
||||
limit=1,
|
||||
)
|
||||
if attachment:
|
||||
return base64.decodestring(attachment.datas)
|
||||
return False
|
||||
@@ -92,45 +97,57 @@ class IrActionsReport(models.Model):
|
||||
if not filename:
|
||||
return False
|
||||
try:
|
||||
attachment = self.env['ir.attachment'].create({
|
||||
'name': filename,
|
||||
'datas': base64.encodestring(signed),
|
||||
'datas_fname': filename,
|
||||
'res_model': certificate.model_id.model,
|
||||
'res_id': res_ids[0],
|
||||
})
|
||||
attachment = self.env["ir.attachment"].create(
|
||||
{
|
||||
"name": filename,
|
||||
"datas": base64.encodestring(signed),
|
||||
"datas_fname": filename,
|
||||
"res_model": certificate.model_id.model,
|
||||
"res_id": res_ids[0],
|
||||
}
|
||||
)
|
||||
except AccessError:
|
||||
raise UserError(
|
||||
_('Saving signed report (PDF): '
|
||||
'You do not have enough access rights to save attachments'))
|
||||
_(
|
||||
"Saving signed report (PDF): "
|
||||
"You do not have enough access rights to save attachments"
|
||||
)
|
||||
)
|
||||
return attachment
|
||||
|
||||
def _signer_bin(self, opts):
|
||||
me = os.path.dirname(__file__)
|
||||
irc_param = self.env['ir.config_parameter'].sudo()
|
||||
java_bin = 'java -jar'
|
||||
java_param = irc_param.get_param('report_qweb_signer.java_parameters')
|
||||
jar = '{}/../static/jar/jPdfSign.jar'.format(me)
|
||||
return '%s %s %s %s' % (java_bin, java_param, jar, opts)
|
||||
irc_param = self.env["ir.config_parameter"].sudo()
|
||||
java_bin = "java -jar"
|
||||
java_param = irc_param.get_param("report_qweb_signer.java_parameters")
|
||||
jar = "{}/../static/jar/jPdfSign.jar".format(me)
|
||||
return "{} {} {} {}".format(java_bin, java_param, jar, opts)
|
||||
|
||||
def pdf_sign(self, pdf, certificate):
|
||||
pdfsigned = pdf + '.signed.pdf'
|
||||
pdfsigned = pdf + ".signed.pdf"
|
||||
p12 = _normalize_filepath(certificate.path)
|
||||
passwd = _normalize_filepath(certificate.password_file)
|
||||
if not (p12 and passwd):
|
||||
raise UserError(
|
||||
_('Signing report (PDF): '
|
||||
'Certificate or password file not found'))
|
||||
signer_opts = '"%s" "%s" "%s" "%s"' % (p12, pdf, pdfsigned, passwd)
|
||||
_(
|
||||
"Signing report (PDF): "
|
||||
"Certificate or password file not found"
|
||||
)
|
||||
)
|
||||
signer_opts = '"{}" "{}" "{}" "{}"'.format(p12, pdf, pdfsigned, passwd)
|
||||
signer = self._signer_bin(signer_opts)
|
||||
process = subprocess.Popen(
|
||||
signer, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
signer, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
|
||||
)
|
||||
out, err = process.communicate()
|
||||
if process.returncode:
|
||||
raise UserError(
|
||||
_('Signing report (PDF): jPdfSign failed (error code: %s). '
|
||||
'Message: %s. Output: %s') %
|
||||
(process.returncode, err, out))
|
||||
_(
|
||||
"Signing report (PDF): jPdfSign failed (error code: %s). "
|
||||
"Message: %s. Output: %s"
|
||||
)
|
||||
% (process.returncode, err, out)
|
||||
)
|
||||
return pdfsigned
|
||||
|
||||
@api.multi
|
||||
@@ -141,32 +158,36 @@ class IrActionsReport(models.Model):
|
||||
if signed_content:
|
||||
_logger.debug(
|
||||
"The signed PDF document '%s/%s' was loaded from the "
|
||||
"database", self.report_name, res_ids,
|
||||
"database",
|
||||
self.report_name,
|
||||
res_ids,
|
||||
)
|
||||
return signed_content, 'pdf'
|
||||
content, ext = super(IrActionsReport, self).render_qweb_pdf(res_ids,
|
||||
data)
|
||||
return signed_content, "pdf"
|
||||
content, ext = super(IrActionsReport, self).render_qweb_pdf(
|
||||
res_ids, data
|
||||
)
|
||||
if certificate:
|
||||
# Creating temporary origin PDF
|
||||
pdf_fd, pdf = tempfile.mkstemp(
|
||||
suffix='.pdf', prefix='report.tmp.')
|
||||
with closing(os.fdopen(pdf_fd, 'wb')) as pf:
|
||||
pdf_fd, pdf = tempfile.mkstemp(suffix=".pdf", prefix="report.tmp.")
|
||||
with closing(os.fdopen(pdf_fd, "wb")) as pf:
|
||||
pf.write(content)
|
||||
_logger.debug(
|
||||
"Signing PDF document '%s' for IDs %s with certificate '%s'",
|
||||
self.report_name, res_ids, certificate.name,
|
||||
self.report_name,
|
||||
res_ids,
|
||||
certificate.name,
|
||||
)
|
||||
signed = self.pdf_sign(pdf, certificate)
|
||||
# Read signed PDF
|
||||
if os.path.exists(signed):
|
||||
with open(signed, 'rb') as pf:
|
||||
with open(signed, "rb") as pf:
|
||||
content = pf.read()
|
||||
# Manual cleanup of the temporary files
|
||||
for fname in (pdf, signed):
|
||||
try:
|
||||
os.unlink(fname)
|
||||
except (OSError, IOError):
|
||||
_logger.error('Error when trying to remove file %s', fname)
|
||||
_logger.error("Error when trying to remove file %s", fname)
|
||||
if certificate.attachment:
|
||||
self._attach_signed_write(res_ids, certificate, content)
|
||||
return content, ext
|
||||
|
||||
@@ -5,38 +5,51 @@ from odoo import api, fields, models
|
||||
|
||||
|
||||
class ReportCertificate(models.Model):
|
||||
_name = 'report.certificate'
|
||||
_description = 'Report Certificate'
|
||||
_order = 'sequence,id'
|
||||
_name = "report.certificate"
|
||||
_description = "Report Certificate"
|
||||
_order = "sequence,id"
|
||||
|
||||
@api.model
|
||||
def _default_company(self):
|
||||
m_company = self.env['res.company']
|
||||
return m_company._company_default_get('report.certificate')
|
||||
m_company = self.env["res.company"]
|
||||
return m_company._company_default_get("report.certificate")
|
||||
|
||||
sequence = fields.Integer(default=10)
|
||||
name = fields.Char(required=True)
|
||||
path = fields.Char(
|
||||
string="Certificate file path", required=True,
|
||||
help="Path to PKCS#12 certificate file")
|
||||
string="Certificate file path",
|
||||
required=True,
|
||||
help="Path to PKCS#12 certificate file",
|
||||
)
|
||||
password_file = fields.Char(
|
||||
string="Password file path", required=True,
|
||||
help="Path to certificate password file")
|
||||
string="Password file path",
|
||||
required=True,
|
||||
help="Path to certificate password file",
|
||||
)
|
||||
model_id = fields.Many2one(
|
||||
string="Model", required=True,
|
||||
comodel_name='ir.model',
|
||||
help="Model where apply this certificate")
|
||||
string="Model",
|
||||
required=True,
|
||||
comodel_name="ir.model",
|
||||
help="Model where apply this certificate",
|
||||
)
|
||||
domain = fields.Char(
|
||||
string="Domain",
|
||||
help="Domain for filtering if sign or not the document")
|
||||
help="Domain for filtering if sign or not the document",
|
||||
)
|
||||
allow_only_one = fields.Boolean(
|
||||
string="Allow only one document", default=True,
|
||||
string="Allow only one document",
|
||||
default=True,
|
||||
help="If True, this certificate can not be useb to sign "
|
||||
"a PDF from several documents.")
|
||||
"a PDF from several documents.",
|
||||
)
|
||||
attachment = fields.Char(
|
||||
string="Save as attachment",
|
||||
help="Filename used to store signed document as attachment. "
|
||||
"Keep empty to not save signed document.")
|
||||
"Keep empty to not save signed document.",
|
||||
)
|
||||
company_id = fields.Many2one(
|
||||
string='Company', comodel_name='res.company',
|
||||
required=True, default=_default_company)
|
||||
string="Company",
|
||||
comodel_name="res.company",
|
||||
required=True,
|
||||
default=_default_company,
|
||||
)
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
# Copyright 2015 Tecnativa - Antonio Espinosa
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||
|
||||
from odoo import models, fields
|
||||
from odoo import fields, models
|
||||
|
||||
|
||||
class ResCompany(models.Model):
|
||||
_inherit = 'res.company'
|
||||
_inherit = "res.company"
|
||||
|
||||
report_certificate_ids = fields.One2many(
|
||||
string="PDF report certificates",
|
||||
comodel_name='report.certificate',
|
||||
inverse_name='company_id')
|
||||
comodel_name="report.certificate",
|
||||
inverse_name="company_id",
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user