From b08284e1c7939fdba2ae93be7a58f020d190d46c Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 18 Sep 2013 13:45:07 +0200 Subject: [PATCH] [IMP] asynchrone update, management of error, bypass orm option --- .../model/move_line_importer.py | 132 ++++++++++++++++-- 1 file changed, 122 insertions(+), 10 deletions(-) diff --git a/async_move_line_importer/model/move_line_importer.py b/async_move_line_importer/model/move_line_importer.py index 2d7d78128..eff2433b2 100644 --- a/async_move_line_importer/model/move_line_importer.py +++ b/async_move_line_importer/model/move_line_importer.py @@ -19,9 +19,15 @@ # ############################################################################## import base64 +import threading import csv import tempfile + +import openerp.pooler as pooler from openerp.osv import orm, fields +from openerp.tools.translate import _ + +USE_THREAD = False class move_line_importer(orm.Model): @@ -29,12 +35,26 @@ class move_line_importer(orm.Model): _name = "move.line.importer" _inherit = ['mail.thread'] - # _track = { - # 'state': { - # 'import.success': lambda self, cr, uid, obj, ctx=None: obj['state'] == 'done', - # 'import.error': lambda self, cr, uid, obj, ctx=None: obj['state'] == 'error', - # }, - # } + + def copy(self, cr, uid, id, default=None, context=None): + if default is None: + default = {} + default.update(state='draft', report=False) + return super(move_line_importer, self).copy(cr, uid, id, default=default, + context=context) + + def track_success(sef, cr, uid, obj, context=None): + return obj['state'] == 'done' + + def track_error(sef, cr, uid, obj, context=None): + return obj['state'] == 'error' + + _track = { + 'state': { + 'async_move_line_importer.mvl_imported': track_success, + 'async_move_line_importer.mvl_error': track_error, + }, + } _columns = {'name': fields.datetime('Name', required=True, @@ -54,10 +74,21 @@ class move_line_importer(orm.Model): string="CSV delimiter", required=True), 'company_id': fields.many2one('res.company', - 'Company') + 'Company'), + 'bypass_orm': fields.boolean('Fast import (use with caution)', + help="When enabled import will be faster but" + " it will not use orm and may" + " not support all CSV canvas. \n" + "Entry posted option will be skipped. \n" + "AA lines will only be crated when" + " moves are posted. \n" + "Tax lines computation will be skipped. \n" + "This option should be used with caution" + " and in conjonction with provided canvas."), } - _defaults = {'delimiter': ','} + _defaults = {'delimiter': ',', + 'bypass_orm': False} def _get_current_company(self, cr, uid, context=None, model="move.line.importer"): return self.pool.get('res.company')._company_default_get(cr, uid, model, @@ -68,6 +99,9 @@ class move_line_importer(orm.Model): 'company_id': _get_current_company} def _parse_csv(self, cr, uid, imp_id): + """Parse stored CSV file in order to be usable by load method. + Manage base 64 decoding. + It will return head (list of first row) and data list of list""" with tempfile.TemporaryFile() as src: imp = self.read(cr, uid, imp_id, ['file', 'delimiter']) content = imp['file'] @@ -80,13 +114,91 @@ class move_line_importer(orm.Model): return self._prepare_csv_data(decoded, delimiter) def _prepare_csv_data(self, csv_file, delimiter=","): + """Parse and decoded CSV file and return head list + and data list""" data = csv.reader(csv_file, delimiter=str(delimiter)) head = data.next() - # generator does not work in load - values = [x for x in data] + head = [x.replace(' ', '') for x in head] + # Generator does not work with orm.BaseModel.load + values = [tuple(x) for x in data if x] return (head, values) + def format_messages(self, messages): + res = [] + for msg in messages: + rows = msg.get('rows', {}) + res.append(_("%s. -- Field: %s -- rows %s to %s") % (msg.get('message', 'N/A'), + msg.get('field', 'N/A'), + rows.get('from', 'N/A'), + rows.get('to', 'N/A'))) + return "\n \n".join(res) + + def _manage_load_results(self, cr, uid, imp_id, result, context=None): + if not result['messages']: + msg = _("%s lines imported" % len(result['ids'] or [])) + self.write(cr, uid, [imp_id], {'state': 'done', + 'report': msg}) + else: + cr.rollback() + msg = self.format_messages(result['messages']) + self.write(cr, uid, [imp_id], {'state': 'error', + 'report': msg}) + cr.commit() + return imp_id + + def _load_data(self, cr, uid, imp_id, head, data, mode, context=None): + """Function that does the load management, exception and load report""" + valid_modes = ('threaded', 'direct') + if mode not in valid_modes: + raise ValueError('%s is not in valid mode %s ' % (mode, valid_modes)) + try: + res = self.pool['account.move'].load(cr, uid, head, data, context=context) + self._manage_load_results(cr, uid, imp_id, res, context=context) + except Exception as exc: + cr.rollback() + self.write(cr, uid, [imp_id], {'state': 'error'}) + if mode != "threaded": + raise + msg = _("Unexpected exception not related to CSV file.\n %s" % repr(exc)) + self.write(cr, uid, [imp_id], {'report': msg}) + + finally: + if mode == 'threaded': + cr.commit() + cr.close() + return imp_id + + def _allows_thread(self, imp_id): + """Check if there is a async import of this file running""" + for th in threading.enumerate(): + if th.getName() == 'async_move_line_import_%s' % imp_id: + raise orm.except_orm(_('An import of this file is already running'), + _('Please try latter')) + def import_file(self, cr, uid, imp_id, context=None): if isinstance(imp_id, list): imp_id = imp_id[0] + if context is None: + context = {} + current = self.read(cr, uid, imp_id, ['bypass_orm', 'company_id'], + load='_classic_write') + context['company_id'] = current['company_id'] + bypass_orm = current['bypass_orm'] + if bypass_orm: + # Tells create funtion to bypass orm + context['async_bypass_create'] = True head, data = self._parse_csv(cr, uid, imp_id) + self.write(cr, uid, [imp_id], {'state': 'running'}) + if USE_THREAD: + self._allows_thread(imp_id) + db_name = cr.dbname + local_cr = pooler.get_db(db_name).cursor() + thread = threading.Thread(target=self._load_data, + name='async_move_line_import_%s' % imp_id, + args=(local_cr, uid, imp_id, head, data, + 'threaded', context.copy())) + thread.start() + else: + self._load_data(cr, uid, imp_id, head, data, 'direct', + context=context) + return {}