mirror of
https://github.com/OCA/reporting-engine.git
synced 2025-02-16 16:30:38 +02:00
[MIG] report_qweb_signer: Migration to 10.0
OCA Transbot updated translations from Transifex
This commit is contained in:
committed by
Omar (Comunitea)
parent
ba5030a5c1
commit
4ef450ae6b
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# © 2015 Antiun Ingenieria S.L. - Antonio Espinosa
|
||||
# Copyright 2015 Tecnativa - Antonio Espinosa
|
||||
# Copyright 2017 Tecnativa - Pedro M. Baeza
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||
|
||||
import base64
|
||||
@@ -9,9 +10,9 @@ import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
from openerp import models, api, _
|
||||
from openerp.exceptions import Warning, AccessError
|
||||
from openerp.tools.safe_eval import safe_eval
|
||||
from odoo import models, api, _
|
||||
from odoo.exceptions import UserError, AccessError
|
||||
from odoo.tools.safe_eval import safe_eval
|
||||
|
||||
import logging
|
||||
_logger = logging.getLogger(__name__)
|
||||
@@ -30,99 +31,80 @@ def _normalize_filepath(path):
|
||||
class Report(models.Model):
|
||||
_inherit = 'report'
|
||||
|
||||
def _certificate_get(self, cr, uid, ids, report, context=None):
|
||||
def _certificate_get(self, report, docids):
|
||||
"""Obtain the proper certificate for the report and the conditions."""
|
||||
if report.report_type != 'qweb-pdf':
|
||||
_logger.info(
|
||||
"Can only sign qweb-pdf reports, this one is '%s' type",
|
||||
report.report_type)
|
||||
return False
|
||||
m_cert = self.pool['report.certificate']
|
||||
company_id = self.pool['res.users']._get_company(cr, uid)
|
||||
certificate_ids = m_cert.search(cr, uid, [
|
||||
('company_id', '=', company_id),
|
||||
('model_id', '=', report.model)], context=context)
|
||||
if not certificate_ids:
|
||||
_logger.info(
|
||||
"No PDF certificate found for report '%s'",
|
||||
report.report_name)
|
||||
certificates = self.env['report.certificate'].search([
|
||||
('company_id', '=', self.env.user.company_id.id),
|
||||
('model_id', '=', report.model),
|
||||
])
|
||||
if not certificates:
|
||||
return False
|
||||
for cert in m_cert.browse(cr, uid, certificate_ids, context=context):
|
||||
for cert in certificates:
|
||||
# Check allow only one document
|
||||
if cert.allow_only_one and len(ids) > 1:
|
||||
_logger.info(
|
||||
if cert.allow_only_one and len(self) > 1:
|
||||
_logger.debug(
|
||||
"Certificate '%s' allows only one document, "
|
||||
"but printing %d documents",
|
||||
cert.name, len(ids))
|
||||
cert.name, len(docids))
|
||||
continue
|
||||
# Check domain
|
||||
if cert.domain:
|
||||
m_model = self.pool[cert.model_id.model]
|
||||
domain = [('id', 'in', tuple(ids))]
|
||||
domain = [('id', 'in', tuple(docids))]
|
||||
domain = domain + safe_eval(cert.domain)
|
||||
doc_ids = m_model.search(cr, uid, domain, context=context)
|
||||
if not doc_ids:
|
||||
_logger.info(
|
||||
docs = self.env[cert.model_id.model].search(domain)
|
||||
if not docs:
|
||||
_logger.debug(
|
||||
"Certificate '%s' domain not satisfied", cert.name)
|
||||
continue
|
||||
# Certificate match!
|
||||
return cert
|
||||
return False
|
||||
|
||||
def _attach_filename_get(self, cr, uid, ids, certificate, context=None):
|
||||
if len(ids) != 1:
|
||||
def _attach_filename_get(self, docids, certificate):
|
||||
if len(docids) != 1:
|
||||
return False
|
||||
obj = self.pool[certificate.model_id.model].browse(cr, uid, ids[0])
|
||||
filename = safe_eval(certificate.attachment, {
|
||||
'object': obj,
|
||||
doc = self.env[certificate.model_id.model].browse(docids[0])
|
||||
return safe_eval(certificate.attachment, {
|
||||
'object': doc,
|
||||
'time': time
|
||||
})
|
||||
return filename
|
||||
|
||||
def _attach_signed_read(self, cr, uid, ids, certificate, context=None):
|
||||
if len(ids) != 1:
|
||||
def _attach_signed_read(self, docids, certificate):
|
||||
if len(docids) != 1:
|
||||
return False
|
||||
filename = self._attach_filename_get(
|
||||
cr, uid, ids, certificate, context=context)
|
||||
filename = self._attach_filename_get(docids, certificate)
|
||||
if not filename:
|
||||
return False
|
||||
signed = False
|
||||
m_attachment = self.pool['ir.attachment']
|
||||
attach_ids = m_attachment.search(cr, uid, [
|
||||
attachment = self.env['ir.attachment'].search([
|
||||
('datas_fname', '=', filename),
|
||||
('res_model', '=', certificate.model_id.model),
|
||||
('res_id', '=', ids[0])
|
||||
])
|
||||
if attach_ids:
|
||||
signed = m_attachment.browse(cr, uid, attach_ids[0]).datas
|
||||
signed = base64.decodestring(signed)
|
||||
return signed
|
||||
('res_id', '=', docids[0]),
|
||||
], limit=1)
|
||||
if attachment:
|
||||
return base64.decodestring(attachment.datas)
|
||||
return False
|
||||
|
||||
def _attach_signed_write(self, cr, uid, ids, certificate, signed,
|
||||
context=None):
|
||||
if len(ids) != 1:
|
||||
def _attach_signed_write(self, docids, certificate, signed):
|
||||
if len(docids) != 1:
|
||||
return False
|
||||
filename = self._attach_filename_get(
|
||||
cr, uid, ids, certificate, context=context)
|
||||
filename = self._attach_filename_get(docids, certificate)
|
||||
if not filename:
|
||||
return False
|
||||
m_attachment = self.pool['ir.attachment']
|
||||
try:
|
||||
attach_id = m_attachment.create(cr, uid, {
|
||||
attachment = self.env['ir.attachment'].create({
|
||||
'name': filename,
|
||||
'datas': base64.encodestring(signed),
|
||||
'datas_fname': filename,
|
||||
'res_model': certificate.model_id.model,
|
||||
'res_id': ids[0],
|
||||
'res_id': docids[0],
|
||||
})
|
||||
except AccessError:
|
||||
raise Warning(
|
||||
raise UserError(
|
||||
_('Saving signed report (PDF): '
|
||||
'You do not have enought access rights to save attachments'))
|
||||
else:
|
||||
_logger.info(
|
||||
"The signed PDF document '%s' is now saved in the database",
|
||||
filename)
|
||||
return attach_id
|
||||
'You do not have enough access rights to save attachments'))
|
||||
return attachment
|
||||
|
||||
def _signer_bin(self, opts):
|
||||
me = os.path.dirname(__file__)
|
||||
@@ -135,7 +117,7 @@ class Report(models.Model):
|
||||
p12 = _normalize_filepath(certificate.path)
|
||||
passwd = _normalize_filepath(certificate.password_file)
|
||||
if not (p12 and passwd):
|
||||
raise Warning(
|
||||
raise UserError(
|
||||
_('Signing report (PDF): '
|
||||
'Certificate or password file not found'))
|
||||
signer_opts = '"%s" "%s" "%s" "%s"' % (p12, pdf, pdfsigned, passwd)
|
||||
@@ -144,38 +126,37 @@ class Report(models.Model):
|
||||
signer, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
out, err = process.communicate()
|
||||
if process.returncode:
|
||||
raise Warning(
|
||||
raise UserError(
|
||||
_('Signing report (PDF): jPdfSign failed (error code: %s). '
|
||||
'Message: %s. Output: %s') %
|
||||
(process.returncode, err, out))
|
||||
return pdfsigned
|
||||
|
||||
@api.v7
|
||||
def get_pdf(self, cr, uid, ids, report_name, html=None, data=None,
|
||||
context=None):
|
||||
signed_content = False
|
||||
report = self._get_report_from_name(cr, uid, report_name)
|
||||
certificate = self._certificate_get(
|
||||
cr, uid, ids, report, context=context)
|
||||
@api.model
|
||||
def get_pdf(self, docids, report_name, html=None, data=None):
|
||||
report = self._get_report_from_name(report_name)
|
||||
certificate = self._certificate_get(report, docids)
|
||||
if certificate and certificate.attachment:
|
||||
signed_content = self._attach_signed_read(
|
||||
cr, uid, ids, certificate, context=context)
|
||||
signed_content = self._attach_signed_read(docids, certificate)
|
||||
if signed_content:
|
||||
_logger.info("The signed PDF document '%s/%s' was loaded from "
|
||||
"the database", report_name, ids)
|
||||
_logger.debug(
|
||||
"The signed PDF document '%s/%s' was loaded from the "
|
||||
"database", report_name, docids,
|
||||
)
|
||||
return signed_content
|
||||
content = super(Report, self).get_pdf(
|
||||
cr, uid, ids, report_name, html=html, data=data,
|
||||
context=context)
|
||||
docids, report_name, html=html, data=data,
|
||||
)
|
||||
if certificate:
|
||||
# Creating temporary origin PDF
|
||||
pdf_fd, pdf = tempfile.mkstemp(
|
||||
suffix='.pdf', prefix='report.tmp.')
|
||||
with closing(os.fdopen(pdf_fd, 'w')) as pf:
|
||||
pf.write(content)
|
||||
_logger.info(
|
||||
"Signing PDF document '%s/%s' with certificate '%s'",
|
||||
report_name, ids, certificate.name)
|
||||
_logger.debug(
|
||||
"Signing PDF document '%s' for IDs %s with certificate '%s'",
|
||||
report_name, docids, certificate.name,
|
||||
)
|
||||
signed = self.pdf_sign(pdf, certificate)
|
||||
# Read signed PDF
|
||||
if os.path.exists(signed):
|
||||
@@ -188,6 +169,5 @@ class Report(models.Model):
|
||||
except (OSError, IOError):
|
||||
_logger.error('Error when trying to remove file %s', fname)
|
||||
if certificate.attachment:
|
||||
self._attach_signed_write(
|
||||
cr, uid, ids, certificate, content, context=context)
|
||||
self._attach_signed_write(docids, certificate, content)
|
||||
return content
|
||||
|
||||
Reference in New Issue
Block a user