pre-commit

This commit is contained in:
Andrea
2020-09-09 12:52:32 +02:00
parent 2d17441547
commit fb5ba3bce4
17 changed files with 1368 additions and 1093 deletions

View File

@@ -3,6 +3,7 @@
import base64
import json
import pydot
from psycopg2.extensions import AsIs
@@ -11,163 +12,160 @@ from odoo.exceptions import UserError, ValidationError
class BveView(models.Model):
_name = 'bve.view'
_description = 'BI View Editor'
_name = "bve.view"
_description = "BI View Editor"
@api.depends('group_ids', 'group_ids.users')
@api.depends("group_ids", "group_ids.users")
def _compute_users(self):
for bve_view in self.sudo():
if bve_view.group_ids:
bve_view.user_ids = bve_view.group_ids.mapped('users')
bve_view.user_ids = bve_view.group_ids.mapped("users")
else:
bve_view.user_ids = self.env['res.users'].sudo().search([])
bve_view.user_ids = self.env["res.users"].sudo().search([])
@api.depends('name')
@api.depends("name")
def _compute_model_name(self):
for bve_view in self:
name = [x for x in bve_view.name.lower() if x.isalnum()]
model_name = ''.join(name).replace('_', '.').replace(' ', '.')
bve_view.model_name = 'x_bve.' + model_name
model_name = "".join(name).replace("_", ".").replace(" ", ".")
bve_view.model_name = "x_bve." + model_name
def _compute_serialized_data(self):
for bve_view in self:
serialized_data = []
for line in bve_view.line_ids.sorted(key=lambda r: r.sequence):
serialized_data.append({
'sequence': line.sequence,
'model_id': line.model_id.id,
'id': line.field_id.id,
'name': line.name,
'model_name': line.model_id.name,
'model': line.model_id.model,
'type': line.ttype,
'table_alias': line.table_alias,
'description': line.description,
'row': line.row,
'column': line.column,
'measure': line.measure,
'list': line.in_list,
'join_node': line.join_node,
'relation': line.relation,
})
serialized_data.append(
{
"sequence": line.sequence,
"model_id": line.model_id.id,
"id": line.field_id.id,
"name": line.name,
"model_name": line.model_id.name,
"model": line.model_id.model,
"type": line.ttype,
"table_alias": line.table_alias,
"description": line.description,
"row": line.row,
"column": line.column,
"measure": line.measure,
"list": line.in_list,
"join_node": line.join_node,
"relation": line.relation,
}
)
bve_view.data = json.dumps(serialized_data)
def _inverse_serialized_data(self):
for bve_view in self:
line_ids = self._sync_lines_and_data(bve_view.data)
bve_view.write({'line_ids': line_ids})
bve_view.write({"line_ids": line_ids})
name = fields.Char(required=True, copy=False)
model_name = fields.Char(compute='_compute_model_name', store=True)
note = fields.Text(string='Notes')
state = fields.Selection([
('draft', 'Draft'),
('created', 'Created')
], default='draft', copy=False)
model_name = fields.Char(compute="_compute_model_name", store=True)
note = fields.Text(string="Notes")
state = fields.Selection(
[("draft", "Draft"), ("created", "Created")], default="draft", copy=False
)
data = fields.Char(
compute='_compute_serialized_data',
inverse='_inverse_serialized_data',
compute="_compute_serialized_data",
inverse="_inverse_serialized_data",
help="Use the special query builder to define the query "
"to generate your report dataset. "
"NOTE: To be edited, the query should be in 'Draft' status.")
line_ids = fields.One2many(
'bve.view.line',
'bve_view_id',
string='Lines')
"to generate your report dataset. "
"NOTE: To be edited, the query should be in 'Draft' status.",
)
line_ids = fields.One2many("bve.view.line", "bve_view_id", string="Lines")
field_ids = fields.One2many(
'bve.view.line',
'bve_view_id',
domain=['|', ('join_node', '=', -1), ('join_node', '=', False)],
string='Fields')
"bve.view.line",
"bve_view_id",
domain=["|", ("join_node", "=", -1), ("join_node", "=", False)],
string="Fields",
)
relation_ids = fields.One2many(
'bve.view.line',
'bve_view_id',
domain=[('join_node', '!=', -1), ('join_node', '!=', False)],
string='Relations')
action_id = fields.Many2one('ir.actions.act_window', string='Action')
view_id = fields.Many2one('ir.ui.view', string='View')
"bve.view.line",
"bve_view_id",
domain=[("join_node", "!=", -1), ("join_node", "!=", False)],
string="Relations",
)
action_id = fields.Many2one("ir.actions.act_window", string="Action")
view_id = fields.Many2one("ir.ui.view", string="View")
group_ids = fields.Many2many(
'res.groups',
string='Groups',
"res.groups",
string="Groups",
help="User groups allowed to see the generated report; "
"if NO groups are specified the report will be public "
"for everyone.")
"if NO groups are specified the report will be public "
"for everyone.",
)
user_ids = fields.Many2many(
'res.users',
string='Users',
compute='_compute_users',
store=True)
query = fields.Text(compute='_compute_sql_query')
"res.users", string="Users", compute="_compute_users", store=True
)
query = fields.Text(compute="_compute_sql_query")
over_condition = fields.Text(
states={
'draft': [
('readonly', False),
],
},
states={"draft": [("readonly", False),],},
readonly=True,
help="Condition to be inserted in the OVER part "
"of the ID's row_number function.\n"
"For instance, 'ORDER BY t1.id' would create "
"IDs ordered in the same way as t1's IDs; otherwise "
"IDs are assigned with no specific order.",
"of the ID's row_number function.\n"
"For instance, 'ORDER BY t1.id' would create "
"IDs ordered in the same way as t1's IDs; otherwise "
"IDs are assigned with no specific order.",
)
er_diagram_image = fields.Binary(compute='_compute_er_diagram_image')
er_diagram_image = fields.Binary(compute="_compute_er_diagram_image")
_sql_constraints = [
('name_uniq',
'unique(name)',
_('Custom BI View names must be unique!')),
("name_uniq", "unique(name)", _("Custom BI View names must be unique!")),
]
@api.depends('line_ids')
@api.depends("line_ids")
def _compute_er_diagram_image(self):
for bve_view in self:
graph = pydot.Dot(graph_type='graph')
graph = pydot.Dot(graph_type="graph")
table_model_map = {}
for line in bve_view.field_ids:
if line.table_alias not in table_model_map:
table_alias_node = pydot.Node(
line.model_id.name + ' ' + line.table_alias,
line.model_id.name + " " + line.table_alias,
style="filled",
shape='box',
fillcolor="#DDDDDD"
shape="box",
fillcolor="#DDDDDD",
)
table_model_map[line.table_alias] = table_alias_node
graph.add_node(table_model_map[line.table_alias])
field_node = pydot.Node(
line.table_alias + '.' + line.field_id.field_description,
line.table_alias + "." + line.field_id.field_description,
label=line.description,
style="filled",
fillcolor="green"
fillcolor="green",
)
graph.add_node(field_node)
graph.add_edge(pydot.Edge(
table_model_map[line.table_alias],
field_node
))
graph.add_edge(
pydot.Edge(table_model_map[line.table_alias], field_node)
)
for line in bve_view.relation_ids:
field_description = line.field_id.field_description
table_alias = line.table_alias
diamond_node = pydot.Node(
line.ttype + ' ' + table_alias + '.' + field_description,
label=table_alias + '.' + field_description,
line.ttype + " " + table_alias + "." + field_description,
label=table_alias + "." + field_description,
style="filled",
shape='diamond',
fillcolor="#D2D2FF"
shape="diamond",
fillcolor="#D2D2FF",
)
graph.add_node(diamond_node)
graph.add_edge(pydot.Edge(
table_model_map[table_alias],
diamond_node,
labelfontcolor="#D2D2FF",
color="blue"
))
graph.add_edge(pydot.Edge(
diamond_node,
table_model_map[line.join_node],
labelfontcolor="black",
color="blue"
))
graph.add_edge(
pydot.Edge(
table_model_map[table_alias],
diamond_node,
labelfontcolor="#D2D2FF",
color="blue",
)
)
graph.add_edge(
pydot.Edge(
diamond_node,
table_model_map[line.join_node],
labelfontcolor="black",
color="blue",
)
)
try:
png_base64_image = base64.b64encode(graph.create_png())
bve_view.er_diagram_image = png_base64_image
@@ -179,9 +177,9 @@ class BveView(models.Model):
def _get_field_def(line):
field_type = line.view_field_type
return '<field name="%s" type="%s" />' % (line.name, field_type)
return '<field name="{}" type="{}" />'.format(line.name, field_type)
bve_field_lines = self.field_ids.filtered('view_field_type')
bve_field_lines = self.field_ids.filtered("view_field_type")
return list(map(_get_field_def, bve_field_lines))
def _create_tree_view_arch(self):
@@ -189,134 +187,161 @@ class BveView(models.Model):
def _get_field_attrs(line):
attr = line.list_attr
res = attr and '%s="%s"' % (attr, line.description) or ''
return '<field name="%s" %s />' % (line.name, res)
res = attr and '{}="{}"'.format(attr, line.description) or ""
return '<field name="{}" {} />'.format(line.name, res)
bve_field_lines = self.field_ids.filtered(lambda l: l.in_list)
return list(map(_get_field_attrs, bve_field_lines))
def _create_bve_view(self):
self.ensure_one()
View = self.env['ir.ui.view'].sudo()
View = self.env["ir.ui.view"].sudo()
# delete old views
View.search([('model', '=', self.model_name)]).unlink()
View.search([("model", "=", self.model_name)]).unlink()
# create views
View.create([{
'name': 'Pivot Analysis',
'type': 'pivot',
'model': self.model_name,
'priority': 16,
'arch': """<?xml version="1.0"?>
View.create(
[
{
"name": "Pivot Analysis",
"type": "pivot",
"model": self.model_name,
"priority": 16,
"arch": """<?xml version="1.0"?>
<pivot string="Pivot Analysis">
{}
</pivot>
""".format("".join(self._create_view_arch()))
}, {
'name': 'Graph Analysis',
'type': 'graph',
'model': self.model_name,
'priority': 16,
'arch': """<?xml version="1.0"?>
""".format(
"".join(self._create_view_arch())
),
},
{
"name": "Graph Analysis",
"type": "graph",
"model": self.model_name,
"priority": 16,
"arch": """<?xml version="1.0"?>
<graph string="Graph Analysis"
type="bar" stacked="True">
{}
</graph>
""".format("".join(self._create_view_arch()))
}, {
'name': 'Search BI View',
'type': 'search',
'model': self.model_name,
'priority': 16,
'arch': """<?xml version="1.0"?>
""".format(
"".join(self._create_view_arch())
),
},
{
"name": "Search BI View",
"type": "search",
"model": self.model_name,
"priority": 16,
"arch": """<?xml version="1.0"?>
<search string="Search BI View">
{}
</search>
""".format("".join(self._create_view_arch()))
}])
""".format(
"".join(self._create_view_arch())
),
},
]
)
# create Tree view
tree_view = View.create({
'name': 'Tree Analysis',
'type': 'tree',
'model': self.model_name,
'priority': 16,
'arch': """<?xml version="1.0"?>
tree_view = View.create(
{
"name": "Tree Analysis",
"type": "tree",
"model": self.model_name,
"priority": 16,
"arch": """<?xml version="1.0"?>
<tree string="List Analysis" create="false">
{}
</tree>
""".format("".join(self._create_tree_view_arch()))
})
""".format(
"".join(self._create_tree_view_arch())
),
}
)
# set the Tree view as the default one
action = self.env['ir.actions.act_window'].sudo().create({
'name': self.name,
'res_model': self.model_name,
'type': 'ir.actions.act_window',
'view_type': 'form',
'view_mode': 'tree,graph,pivot',
'view_id': tree_view.id,
'context': "{'service_name': '%s'}" % self.name,
})
action = (
self.env["ir.actions.act_window"]
.sudo()
.create(
{
"name": self.name,
"res_model": self.model_name,
"type": "ir.actions.act_window",
"view_type": "form",
"view_mode": "tree,graph,pivot",
"view_id": tree_view.id,
"context": "{'service_name': '%s'}" % self.name,
}
)
)
self.write({
'action_id': action.id,
'view_id': tree_view.id,
'state': 'created'
})
self.write(
{"action_id": action.id, "view_id": tree_view.id, "state": "created"}
)
def _build_access_rules(self, model):
self.ensure_one()
if not self.group_ids:
self.env['ir.model.access'].sudo().create({
'name': 'read access to ' + self.model_name,
'model_id': model.id,
'perm_read': True,
})
self.env["ir.model.access"].sudo().create(
{
"name": "read access to " + self.model_name,
"model_id": model.id,
"perm_read": True,
}
)
else:
# read access only to model
access_vals = [{
'name': 'read access to ' + self.model_name,
'model_id': model.id,
'group_id': group.id,
'perm_read': True
} for group in self.group_ids]
self.env['ir.model.access'].sudo().create(access_vals)
access_vals = [
{
"name": "read access to " + self.model_name,
"model_id": model.id,
"group_id": group.id,
"perm_read": True,
}
for group in self.group_ids
]
self.env["ir.model.access"].sudo().create(access_vals)
def _create_sql_view(self):
self.ensure_one()
view_name = self.model_name.replace('.', '_')
query = self.query and self.query.replace('\n', ' ')
view_name = self.model_name.replace(".", "_")
query = self.query and self.query.replace("\n", " ")
# robustness in case something went wrong
self._cr.execute('DROP TABLE IF EXISTS %s', (AsIs(view_name), ))
self._cr.execute("DROP TABLE IF EXISTS %s", (AsIs(view_name),))
# create postgres view
try:
with self.env.cr.savepoint():
self.env.cr.execute('CREATE or REPLACE VIEW %s as (%s)', (
AsIs(view_name), AsIs(query), ))
self.env.cr.execute(
"CREATE or REPLACE VIEW %s as (%s)", (AsIs(view_name), AsIs(query),)
)
except Exception as e:
raise UserError(
_("Error creating the view '{query}':\n{error}")
.format(
query=query,
error=e))
_("Error creating the view '{query}':\n{error}").format(
query=query, error=e
)
)
@api.depends('line_ids', 'state', 'over_condition')
@api.depends("line_ids", "state", "over_condition")
def _compute_sql_query(self):
for bve_view in self:
tables_map = {}
select_str = '\n CAST(row_number() OVER ({}) as integer) AS id' \
.format(bve_view.over_condition or '')
select_str = "\n CAST(row_number() OVER ({}) as integer) AS id".format(
bve_view.over_condition or ""
)
for line in bve_view.field_ids:
table = line.table_alias
select = line.field_id.name
as_name = line.name
select_str += ',\n {}.{} AS {}'.format(table, select, as_name)
select_str += ",\n {}.{} AS {}".format(table, select, as_name)
if line.table_alias not in tables_map:
table = self.env[line.field_id.model_id.model]._table
@@ -339,45 +364,53 @@ class BveView(models.Model):
from_str += " LEFT" if line.left_join else ""
from_str += " JOIN {} ON {}.id = {}.{}".format(
table_format,
line.join_node, line.table_alias, line.field_id.name
line.join_node,
line.table_alias,
line.field_id.name,
)
if line.join_node not in seen:
from_str += "\n"
seen.add(line.join_node)
from_str += " LEFT" if line.left_join else ""
from_str += " JOIN {} AS {} ON {}.{} = {}.id".format(
tables_map[line.join_node], line.join_node,
line.table_alias, line.field_id.name, line.join_node
tables_map[line.join_node],
line.join_node,
line.table_alias,
line.field_id.name,
line.join_node,
)
bve_view.query = """SELECT %s\n\nFROM %s
""" % (AsIs(select_str), AsIs(from_str),)
""" % (
AsIs(select_str),
AsIs(from_str),
)
def action_translations(self):
self.ensure_one()
if self.state != 'created':
if self.state != "created":
return
self = self.sudo()
model = self.env['ir.model'].search([('model', '=', self.model_name)])
IrTranslation = self.env['ir.translation']
IrTranslation.translate_fields('ir.model', model.id)
model = self.env["ir.model"].search([("model", "=", self.model_name)])
IrTranslation = self.env["ir.translation"]
IrTranslation.translate_fields("ir.model", model.id)
for field in model.field_id:
IrTranslation.translate_fields('ir.model.fields', field.id)
IrTranslation.translate_fields("ir.model.fields", field.id)
return {
'name': 'Translations',
'res_model': 'ir.translation',
'type': 'ir.actions.act_window',
'view_mode': 'tree',
'view_id': self.env.ref('base.view_translation_dialog_tree').id,
'target': 'current',
'flags': {'search_view': True, 'action_buttons': True},
'domain': [
'|',
'&',
('res_id', 'in', model.field_id.ids),
('name', '=', 'ir.model.fields,field_description'),
'&',
('res_id', '=', model.id),
('name', '=', 'ir.model,name')
"name": "Translations",
"res_model": "ir.translation",
"type": "ir.actions.act_window",
"view_mode": "tree",
"view_id": self.env.ref("base.view_translation_dialog_tree").id,
"target": "current",
"flags": {"search_view": True, "action_buttons": True},
"domain": [
"|",
"&",
("res_id", "in", model.field_id.ids),
("name", "=", "ir.model.fields,field_description"),
"&",
("res_id", "=", model.id),
("name", "=", "ir.model,name"),
],
}
@@ -396,12 +429,19 @@ class BveView(models.Model):
# create model and fields
bve_fields = self.line_ids.filtered(lambda l: not l.join_node)
model = self.env['ir.model'].sudo().with_context(bve=True).create({
'name': self.name,
'model': self.model_name,
'state': 'manual',
'field_id': [(0, 0, f) for f in bve_fields._prepare_field_vals()],
})
model = (
self.env["ir.model"]
.sudo()
.with_context(bve=True)
.create(
{
"name": self.name,
"model": self.model_name,
"state": "manual",
"field_id": [(0, 0, f) for f in bve_fields._prepare_field_vals()],
}
)
)
# give access rights
self._build_access_rules(model)
@@ -415,57 +455,69 @@ class BveView(models.Model):
if not self.group_ids:
return
for line_model in self.line_ids.mapped('model_id'):
res_count = self.env['ir.model.access'].sudo().search([
('model_id', '=', line_model.id),
('perm_read', '=', True),
'|',
('group_id', '=', False),
('group_id', 'in', self.group_ids.ids),
], limit=1)
for line_model in self.line_ids.mapped("model_id"):
res_count = (
self.env["ir.model.access"]
.sudo()
.search(
[
("model_id", "=", line_model.id),
("perm_read", "=", True),
"|",
("group_id", "=", False),
("group_id", "in", self.group_ids.ids),
],
limit=1,
)
)
if not res_count:
access_records = self.env['ir.model.access'].sudo().search([
('model_id', '=', line_model.id),
('perm_read', '=', True),
])
group_list = ''
for group in access_records.mapped('group_id'):
group_list += ' * %s\n' % (group.full_name, )
access_records = (
self.env["ir.model.access"]
.sudo()
.search(
[("model_id", "=", line_model.id), ("perm_read", "=", True),]
)
)
group_list = ""
for group in access_records.mapped("group_id"):
group_list += " * {}\n".format(group.full_name)
msg_title = _(
'The model "%s" cannot be accessed by users with the '
'selected groups only.' % (line_model.name, ))
msg_details = _(
'At least one of the following groups must be added:')
raise UserError(_(
'%s\n\n%s\n%s' % (msg_title, msg_details, group_list,)
))
"selected groups only." % (line_model.name,)
)
msg_details = _("At least one of the following groups must be added:")
raise UserError(
_("{}\n\n{}\n{}".format(msg_title, msg_details, group_list))
)
def _check_invalid_lines(self):
self.ensure_one()
if not self.line_ids:
raise ValidationError(_('No data to process.'))
raise ValidationError(_("No data to process."))
if any(not line.model_id for line in self.line_ids):
invalid_lines = self.line_ids.filtered(lambda l: not l.model_id)
missing_models = set(invalid_lines.mapped('model_name'))
missing_models = ', '.join(missing_models)
raise ValidationError(_(
'Following models are missing: %s.\n'
'Probably some modules were uninstalled.' % (missing_models,)
))
missing_models = set(invalid_lines.mapped("model_name"))
missing_models = ", ".join(missing_models)
raise ValidationError(
_(
"Following models are missing: %s.\n"
"Probably some modules were uninstalled." % (missing_models,)
)
)
if any(not line.field_id for line in self.line_ids):
invalid_lines = self.line_ids.filtered(lambda l: not l.field_id)
missing_fields = set(invalid_lines.mapped('field_name'))
missing_fields = ', '.join(missing_fields)
raise ValidationError(_(
'Following fields are missing: %s.' % (missing_fields,)
))
missing_fields = set(invalid_lines.mapped("field_name"))
missing_fields = ", ".join(missing_fields)
raise ValidationError(
_("Following fields are missing: {}.".format(missing_fields))
)
def open_view(self):
self.ensure_one()
self._check_invalid_lines()
[action] = self.action_id.read()
action['display_name'] = _('BI View')
action["display_name"] = _("BI View")
return action
@api.multi
@@ -479,10 +531,8 @@ class BveView(models.Model):
has_menus = False
if self.action_id:
action = 'ir.actions.act_window,%d' % (self.action_id.id,)
menus = self.env['ir.ui.menu'].search([
('action', '=', action)
])
action = "ir.actions.act_window,%d" % (self.action_id.id,)
menus = self.env["ir.ui.menu"].search([("action", "=", action)])
has_menus = True if menus else False
menus.unlink()
@@ -490,26 +540,26 @@ class BveView(models.Model):
self.sudo().action_id.view_id.unlink()
self.sudo().action_id.unlink()
self.env['ir.ui.view'].sudo().search(
[('model', '=', self.model_name)]).unlink()
models_to_delete = self.env['ir.model'].sudo().search([
('model', '=', self.model_name)])
self.env["ir.ui.view"].sudo().search([("model", "=", self.model_name)]).unlink()
models_to_delete = (
self.env["ir.model"].sudo().search([("model", "=", self.model_name)])
)
if models_to_delete:
models_to_delete.unlink()
table_name = self.model_name.replace('.', '_')
table_name = self.model_name.replace(".", "_")
tools.drop_view_if_exists(self.env.cr, table_name)
self.state = 'draft'
self.state = "draft"
if has_menus:
return {'type': 'ir.actions.client', 'tag': 'reload'}
return {"type": "ir.actions.client", "tag": "reload"}
def unlink(self):
if self.filtered(lambda v: v.state == 'created'):
if self.filtered(lambda v: v.state == "created"):
raise UserError(
_('You cannot delete a created view! '
'Reset the view to draft first.'))
_("You cannot delete a created view! " "Reset the view to draft first.")
)
return super().unlink()
@api.model
@@ -521,63 +571,69 @@ class BveView(models.Model):
table_model_map = {}
for item in fields_info:
if item.get('join_node', -1) == -1:
table_model_map[item['table_alias']] = item['model_id']
if item.get("join_node", -1) == -1:
table_model_map[item["table_alias"]] = item["model_id"]
for sequence, field_info in enumerate(fields_info, start=1):
join_model_id = False
join_node = field_info.get('join_node', -1)
join_node = field_info.get("join_node", -1)
if join_node != -1 and table_model_map.get(join_node):
join_model_id = int(table_model_map[join_node])
line_ids += [(0, False, {
'sequence': sequence,
'model_id': field_info['model_id'],
'table_alias': field_info['table_alias'],
'description': field_info['description'],
'field_id': field_info['id'],
'ttype': field_info['type'],
'row': field_info['row'],
'column': field_info['column'],
'measure': field_info['measure'],
'in_list': field_info['list'],
'relation': field_info.get('relation'),
'join_node': field_info.get('join_node'),
'join_model_id': join_model_id,
})]
line_ids += [
(
0,
False,
{
"sequence": sequence,
"model_id": field_info["model_id"],
"table_alias": field_info["table_alias"],
"description": field_info["description"],
"field_id": field_info["id"],
"ttype": field_info["type"],
"row": field_info["row"],
"column": field_info["column"],
"measure": field_info["measure"],
"in_list": field_info["list"],
"relation": field_info.get("relation"),
"join_node": field_info.get("join_node"),
"join_model_id": join_model_id,
},
)
]
return line_ids
@api.constrains('line_ids')
@api.constrains("line_ids")
def _constraint_line_ids(self):
models_with_tables = self.env.registry.models.keys()
for view in self:
nodes = view.line_ids.filtered(lambda n: n.join_node)
nodes_models = nodes.mapped('table_alias')
nodes_models += nodes.mapped('join_node')
nodes_models = nodes.mapped("table_alias")
nodes_models += nodes.mapped("join_node")
not_nodes = view.line_ids.filtered(lambda n: not n.join_node)
not_nodes_models = not_nodes.mapped('table_alias')
err_msg = _('Inconsistent lines.')
not_nodes_models = not_nodes.mapped("table_alias")
err_msg = _("Inconsistent lines.")
if set(nodes_models) - set(not_nodes_models):
raise ValidationError(err_msg)
if len(set(not_nodes_models) - set(nodes_models)) > 1:
raise ValidationError(err_msg)
models = view.line_ids.mapped('model_id')
models = view.line_ids.mapped("model_id")
if models.filtered(lambda m: m.model not in models_with_tables):
raise ValidationError(_('Abstract models not supported.'))
raise ValidationError(_("Abstract models not supported."))
@api.model
def get_clean_list(self, data_dict):
serialized_data = json.loads(data_dict)
table_alias_list = set()
for item in serialized_data:
if item.get('join_node', -1) in [-1, False]:
table_alias_list.add(item['table_alias'])
if item.get("join_node", -1) in [-1, False]:
table_alias_list.add(item["table_alias"])
for item in serialized_data:
if item.get('join_node', -1) not in [-1, False]:
if item['table_alias'] not in table_alias_list:
if item.get("join_node", -1) not in [-1, False]:
if item["table_alias"] not in table_alias_list:
serialized_data.remove(item)
elif item['join_node'] not in table_alias_list:
elif item["join_node"] not in table_alias_list:
serialized_data.remove(item)
return json.dumps(serialized_data)

View File

@@ -6,19 +6,19 @@ from odoo.exceptions import ValidationError
class BveViewLine(models.Model):
_name = 'bve.view.line'
_description = 'BI View Editor Lines'
_name = "bve.view.line"
_description = "BI View Editor Lines"
name = fields.Char(compute='_compute_name')
name = fields.Char(compute="_compute_name")
sequence = fields.Integer(default=1)
bve_view_id = fields.Many2one('bve.view', ondelete='cascade')
model_id = fields.Many2one('ir.model', string='Model')
model_name = fields.Char(compute='_compute_model_name', store=True)
bve_view_id = fields.Many2one("bve.view", ondelete="cascade")
model_id = fields.Many2one("ir.model", string="Model")
model_name = fields.Char(compute="_compute_model_name", store=True)
table_alias = fields.Char()
join_model_id = fields.Many2one('ir.model', string='Join Model')
field_id = fields.Many2one('ir.model.fields', string='Field')
field_name = fields.Char(compute='_compute_model_field_name', store=True)
ttype = fields.Char(string='Type')
join_model_id = fields.Many2one("ir.model", string="Join Model")
field_id = fields.Many2one("ir.model.fields", string="Field")
field_name = fields.Char(compute="_compute_model_field_name", store=True)
ttype = fields.Char(string="Type")
description = fields.Char(translate=True)
relation = fields.Char()
join_node = fields.Char()
@@ -28,88 +28,87 @@ class BveViewLine(models.Model):
column = fields.Boolean()
measure = fields.Boolean()
in_list = fields.Boolean()
list_attr = fields.Selection([
('sum', 'Sum'),
('avg', 'Average'),
], string='List Attribute', default='sum')
view_field_type = fields.Char(compute='_compute_view_field_type')
list_attr = fields.Selection(
[("sum", "Sum"), ("avg", "Average"),], string="List Attribute", default="sum"
)
view_field_type = fields.Char(compute="_compute_view_field_type")
@api.depends('row', 'column', 'measure')
@api.depends("row", "column", "measure")
def _compute_view_field_type(self):
for line in self:
row = line.row and 'row'
column = line.column and 'col'
measure = line.measure and 'measure'
row = line.row and "row"
column = line.column and "col"
measure = line.measure and "measure"
line.view_field_type = row or column or measure
@api.constrains('row', 'column', 'measure')
@api.constrains("row", "column", "measure")
def _constrains_options_check(self):
measure_types = ['float', 'integer', 'monetary']
measure_types = ["float", "integer", "monetary"]
for line in self.filtered(lambda l: l.row or l.column):
if line.join_model_id or line.ttype in measure_types:
err_msg = _('This field cannot be a row or a column.')
err_msg = _("This field cannot be a row or a column.")
raise ValidationError(err_msg)
for line in self.filtered(lambda l: l.measure):
if line.join_model_id or line.ttype not in measure_types:
err_msg = _('This field cannot be a measure.')
err_msg = _("This field cannot be a measure.")
raise ValidationError(err_msg)
@api.constrains('table_alias', 'field_id')
@api.constrains("table_alias", "field_id")
def _constrains_unique_fields_check(self):
seen = set()
for line in self.mapped('bve_view_id.field_ids'):
if (line.table_alias, line.field_id.id, ) not in seen:
seen.add((line.table_alias, line.field_id.id, ))
for line in self.mapped("bve_view_id.field_ids"):
if (line.table_alias, line.field_id.id,) not in seen:
seen.add((line.table_alias, line.field_id.id,))
else:
raise ValidationError(_('Field %s/%s is duplicated.\n'
'Please remove the duplications.') % (
line.field_id.model, line.field_id.name
))
raise ValidationError(
_("Field %s/%s is duplicated.\n" "Please remove the duplications.")
% (line.field_id.model, line.field_id.name)
)
@api.depends('field_id', 'sequence')
@api.depends("field_id", "sequence")
def _compute_name(self):
for line in self.filtered(lambda l: l.field_id):
field_name = line.field_id.name
line.name = 'x_bve_%s_%s' % (line.table_alias, field_name,)
line.name = "x_bve_{}_{}".format(line.table_alias, field_name)
@api.depends('model_id')
@api.depends("model_id")
def _compute_model_name(self):
for line in self.filtered(lambda l: l.model_id):
line.model_name = line.model_id.model
@api.depends('field_id')
@api.depends("field_id")
def _compute_model_field_name(self):
for line in self.filtered(lambda l: l.field_id):
field_name = line.description
model_name = line.model_name
line.field_name = '%s (%s)' % (field_name, model_name, )
line.field_name = "{} ({})".format(field_name, model_name)
def _prepare_field_vals(self):
vals_list = []
for line in self:
field = line.field_id
vals = {
'name': line.name,
'complete_name': field.complete_name,
'model': line.bve_view_id.model_name,
'relation': field.relation,
'field_description': line.description,
'ttype': field.ttype,
'selection': field.selection,
'size': field.size,
'state': 'manual',
'readonly': True,
'groups': [(6, 0, field.groups.ids)],
"name": line.name,
"complete_name": field.complete_name,
"model": line.bve_view_id.model_name,
"relation": field.relation,
"field_description": line.description,
"ttype": field.ttype,
"selection": field.selection,
"size": field.size,
"state": "manual",
"readonly": True,
"groups": [(6, 0, field.groups.ids)],
}
if vals['ttype'] == 'monetary':
vals.update({'ttype': 'float'})
if field.ttype == 'selection' and not field.selection:
if vals["ttype"] == "monetary":
vals.update({"ttype": "float"})
if field.ttype == "selection" and not field.selection:
model_obj = self.env[field.model_id.model]
selection = model_obj._fields[field.name].selection
if callable(selection):
selection_domain = selection(model_obj)
else:
selection_domain = selection
vals.update({'selection': str(selection_domain)})
vals.update({"selection": str(selection_domain)})
vals_list.append(vals)
return vals_list

View File

@@ -5,98 +5,88 @@ from collections import defaultdict
from odoo import api, models, registry
NO_BI_MODELS = [
'fetchmail.server'
]
NO_BI_MODELS = ["fetchmail.server"]
NO_BI_TTYPES = [
'many2many',
'one2many',
'html',
'binary',
'reference'
]
NO_BI_TTYPES = ["many2many", "one2many", "html", "binary", "reference"]
def dict_for_field(field):
return {
'id': field.id,
'name': field.name,
'description': field.field_description,
'type': field.ttype,
'relation': field.relation,
'custom': False,
'model_id': field.model_id.id,
'model': field.model_id.model,
'model_name': field.model_id.name
"id": field.id,
"name": field.name,
"description": field.field_description,
"type": field.ttype,
"relation": field.relation,
"custom": False,
"model_id": field.model_id.id,
"model": field.model_id.model,
"model_name": field.model_id.name,
}
def dict_for_model(model):
return {
'id': model.id,
'name': model.name,
'model': model.model
}
return {"id": model.id, "name": model.name, "model": model.model}
class IrModel(models.Model):
_inherit = 'ir.model'
_inherit = "ir.model"
@api.model
def _filter_bi_models(self, model):
def _check_name(model_model):
if model_model in NO_BI_MODELS:
return 1
return 0
def _check_startswith(model_model):
if model_model.startswith('workflow') or \
model_model.startswith('ir.') or \
model_model.startswith('base_'):
if (
model_model.startswith("workflow")
or model_model.startswith("ir.")
or model_model.startswith("base_")
):
return 1
return 0
def _check_contains(model_model):
if 'mail' in model_model or \
'report' in model_model or \
'edi.' in model_model:
if (
"mail" in model_model
or "report" in model_model
or "edi." in model_model
):
return 1
return 0
def _check_unknown(model_name):
if model_name == 'Unknown' or '.' in model_name:
if model_name == "Unknown" or "." in model_name:
return 1
return 0
model_model = model['model']
model_name = model['name']
model_model = model["model"]
model_name = model["name"]
count_check = 0
count_check += _check_name(model_model)
count_check += _check_startswith(model_model)
count_check += _check_contains(model_model)
count_check += _check_unknown(model_name)
if not count_check:
return self.env['ir.model.access'].check(
model['model'], 'read', False)
return self.env["ir.model.access"].check(model["model"], "read", False)
return False
def get_model_list(self, model_table_map):
if not model_table_map:
return []
domain = [('model_id', 'in', list(model_table_map.keys())),
('store', '=', True),
('ttype', '=', 'many2one')]
fields = self.env['ir.model.fields'].sudo().search(domain)
domain = [
("model_id", "in", list(model_table_map.keys())),
("store", "=", True),
("ttype", "=", "many2one"),
]
fields = self.env["ir.model.fields"].sudo().search(domain)
model_list = []
for field in fields:
for table_alias in model_table_map[field.model_id.id]:
model_list.append(dict(
dict_for_field(field),
table_alias=table_alias,
join_node=-1,
))
model_list.append(
dict(dict_for_field(field), table_alias=table_alias, join_node=-1,)
)
return model_list
def get_relation_list(self, model_table_map):
@@ -106,32 +96,31 @@ class IrModel(models.Model):
for model in self.sudo().browse(model_table_map.keys()):
model_names.update({model.model: model.id})
domain = [('relation', 'in', list(model_names.keys())),
('store', '=', True),
('ttype', '=', 'many2one')]
fields = self.env['ir.model.fields'].sudo().search(domain)
domain = [
("relation", "in", list(model_names.keys())),
("store", "=", True),
("ttype", "=", "many2one"),
]
fields = self.env["ir.model.fields"].sudo().search(domain)
relation_list = []
for field in fields:
model_id = model_names[field.relation]
for join_node in model_table_map[model_id]:
relation_list.append(dict(
dict_for_field(field),
join_node=join_node,
table_alias=-1
))
relation_list.append(
dict(dict_for_field(field), join_node=join_node, table_alias=-1)
)
return relation_list
@api.model
def _get_related_models_domain(self, model_table_map):
domain = [('transient', '=', False)]
domain = [("transient", "=", False)]
if model_table_map:
model_list = self.get_model_list(model_table_map)
relation_list = self.get_relation_list(model_table_map)
model_ids = [f['model_id'] for f in relation_list + model_list]
model_ids = [f["model_id"] for f in relation_list + model_list]
model_ids += list(model_table_map.keys())
relations = [f['relation'] for f in model_list]
domain += [
'|', ('id', 'in', model_ids), ('model', 'in', relations)]
relations = [f["relation"] for f in model_list]
domain += ["|", ("id", "in", model_ids), ("model", "in", relations)]
return domain
@api.model
@@ -140,7 +129,7 @@ class IrModel(models.Model):
joined with the already selected models.
"""
domain = self._get_related_models_domain(model_table_map)
return self.sudo().search(domain, order='name asc')
return self.sudo().search(domain, order="name asc")
@api.model
def get_models(self, table_model_map=None):
@@ -166,6 +155,7 @@ class IrModel(models.Model):
Return all possible join nodes to add new_field to the query
containing model_ids.
"""
def remove_duplicate_nodes(join_nodes):
seen = set()
nodes_list = []
@@ -181,24 +171,24 @@ class IrModel(models.Model):
keys = []
model_table_map = defaultdict(list)
for field in field_data:
model_table_map[field['model_id']].append(field['table_alias'])
if field.get('join_node', -1) != -1:
keys.append((field['table_alias'], field['id']))
model_table_map[field["model_id"]].append(field["table_alias"])
if field.get("join_node", -1) != -1:
keys.append((field["table_alias"], field["id"]))
# nodes in current model
existing_aliases = model_table_map[new_field['model_id']]
join_nodes = [{'table_alias': alias} for alias in existing_aliases]
existing_aliases = model_table_map[new_field["model_id"]]
join_nodes = [{"table_alias": alias} for alias in existing_aliases]
# nodes in past selected models
for field in self.get_model_list(model_table_map):
if new_field['model'] == field['relation']:
if (field['table_alias'], field['id']) not in keys:
if new_field["model"] == field["relation"]:
if (field["table_alias"], field["id"]) not in keys:
join_nodes.append(field)
# nodes in new model
for field in self.get_relation_list(model_table_map):
if new_field['model_id'] == field['model_id']:
if (field['table_alias'], field['id']) not in keys:
if new_field["model_id"] == field["model_id"]:
if (field["table_alias"], field["id"]) not in keys:
join_nodes.append(field)
return remove_duplicate_nodes(join_nodes)
@@ -207,29 +197,36 @@ class IrModel(models.Model):
def get_fields(self, model_id):
self = self.with_context(lang=self.env.user.lang)
fields = self.env['ir.model.fields'].sudo().search([
('model_id', '=', model_id),
('store', '=', True),
('name', 'not in', models.MAGIC_COLUMNS),
('ttype', 'not in', NO_BI_TTYPES)
], order='field_description desc')
fields = (
self.env["ir.model.fields"]
.sudo()
.search(
[
("model_id", "=", model_id),
("store", "=", True),
("name", "not in", models.MAGIC_COLUMNS),
("ttype", "not in", NO_BI_TTYPES),
],
order="field_description desc",
)
)
fields_dict = list(map(dict_for_field, fields))
return fields_dict
@api.model
def create(self, vals):
if self.env.context and self.env.context.get('bve'):
vals['state'] = 'base'
if self.env.context and self.env.context.get("bve"):
vals["state"] = "base"
res = super().create(vals)
# this sql update is necessary since a write method here would
# be not working (an orm constraint is restricting the modification
# of the state field while updating ir.model)
q = "UPDATE ir_model SET state = 'manual' WHERE id = %s"
self.env.cr.execute(q, (res.id, ))
self.env.cr.execute(q, (res.id,))
# # update registry
if self.env.context.get('bve'):
if self.env.context.get("bve"):
# setup models; this reloads custom models in registry
self.pool.setup_models(self._cr)

View File

@@ -11,7 +11,7 @@ _logger = logging.getLogger(__name__)
@api.model
def _bi_view(_name):
return _name.startswith('x_bve.')
return _name.startswith("x_bve.")
_auto_init_orig = models.BaseModel._auto_init
@@ -36,7 +36,7 @@ models.BaseModel._auto_init = _auto_init
class Base(models.AbstractModel):
_inherit = 'base'
_inherit = "base"
@api.model
def _setup_complete(self):
@@ -50,10 +50,9 @@ class Base(models.AbstractModel):
if not _bi_view(self._name):
return super()._read_group_process_groupby(gb, query)
split = gb.split(':')
split = gb.split(":")
if split[0] not in self._fields:
raise UserError(
_('No data to be displayed.'))
raise UserError(_("No data to be displayed."))
return super()._read_group_process_groupby(gb, query)
@api.model