[IMP] base_external_dbsource: Refactor & Split by source

* Heavily refactor code for reusability
* Split all sources into independent modules
* Add more test coverage
* Add CRUD methods
* Add iterator execute return to roadmap
This commit is contained in:
Dave Lasley
2016-12-07 18:28:41 -08:00
committed by Sergio Teruel
parent 73c41ac9d9
commit 27417c7e8b
9 changed files with 584 additions and 217 deletions

View File

@@ -19,18 +19,11 @@ Configuration
Database sources can be configured in Settings > Configuration -> Data sources.
Depending on the database, you need:
* to install unixodbc and python-pyodbc packages to use ODBC connections.
* to install FreeTDS driver (tdsodbc package) and configure it through ODBC to connect to Microsoft SQL Server.
* to install and configure Oracle Instant Client and cx_Oracle python library to connect to Oracle.
* to install fdb package to connect in Firebird.
Usage
=====
To use this module:
-------------------
* Go to Settings > Database Structure > Database Sources
* Click on Create to enter the following information:
@@ -38,24 +31,35 @@ To use this module:
* Data source name 
* Password
* Connector: Choose the database to which you want to connect
* Connection string : Specify how to connect to database
* Connection string: Specify how to connect to database
To extend this module:
----------------------
.. image:: https://odoo-community.org/website/image/ir.attachment/5784_f2813bd/datas
:alt: Try me on Runbot
:target: https://runbot.odoo-community.org/runbot/149/9.0 for server-tools
:target: https://runbot.odoo-community.org/runbot/149/10.0 for server-tools
Known issues / Roadmap
======================
* Find a way to remove or default the CA certs dir
* Add concept of multiple connection strings for one source (multiple nodes)
* Add a ConnectionEnvironment that allows for the reuse of connections
* Message box should be displayed instead of error in ``connection_test``
* Remove old api compatibility layers (v11)
* Instead of returning list of results, we should return iterators. This will support
larger datasets in a more efficient manner.
* Implement better CRUD handling
Bug Tracker
===========
Bugs are tracked on `GitHub Issues <https://github.com/OCA/server-tools/issues>`_.
In case of trouble, please check there if your issue has already been reported.
If you spotted it first, help us smashing it by providing a detailed and welcomed feedback `here <https://github.com/OCA/
server-tools/issues/new?body=module:%20
base_external_dbsource%0Aversion:%20
9.0%0A%0A**Steps%20to%20reproduce**%0A-%20...%0A%0A**Current%20behavior**%0A%0A**Expected%20behavior**>`_.
If you spotted it first, help us smashing it by providing a detailed and welcomed feedback.
Credits
=======
@@ -66,7 +70,7 @@ Contributors
* Daniel Reis <dreis.pt@hotmail.com>
* Maxime Chambreuil <maxime.chambreuil@savoirfairelinux.com>
* Gervais Naoussi <gervaisnaoussi@gmail.com>
* Michell Stuttgart <michellstut@gmail.com>
* Dave Lasley <dave@laslabs.com>
Maintainer
----------

View File

@@ -1,2 +1,3 @@
# -*- coding: utf-8 -*-
from . import models

View File

@@ -4,9 +4,11 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
{
'name': 'External Database Sources',
'version': '10.0.1.0.1',
'version': '10.0.2.0.0',
'category': 'Tools',
'author': "Daniel Reis,Odoo Community Association (OCA)",
'author': "Daniel Reis, "
"LasLabs, "
"Odoo Community Association (OCA)",
'website': 'https://github.com/OCA/server-tools',
'license': 'LGPL-3',
'images': [

View File

@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# Copyright 2016 LasLabs Inc.
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from odoo.exceptions import UserError
class ConnectionFailedError(UserError):
pass
class ConnectionSuccessError(UserError):
pass

View File

@@ -1,2 +1,3 @@
# -*- coding: utf-8 -*-
from . import base_external_dbsource

View File

@@ -1,70 +1,50 @@
# -*- coding: utf-8 -*-
# Copyright 2011 Daniel Reis
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
# Copyright 2016 LasLabs Inc.
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html).
import os
import logging
import psycopg2
from odoo import models, fields, api, _
from odoo.exceptions import Warning as UserError
import odoo.tools as tools
from contextlib import contextmanager
from odoo import _, api, fields, models, tools
from ..exceptions import ConnectionFailedError, ConnectionSuccessError
_logger = logging.getLogger(__name__)
CONNECTORS = []
try:
import sqlalchemy
CONNECTORS.append(('sqlite', 'SQLite'))
try:
import pymssql
CONNECTORS.append(('mssql', 'Microsoft SQL Server'))
assert pymssql
except (ImportError, AssertionError):
_logger.info('MS SQL Server not available. Please install "pymssql"\
python package.')
try:
import MySQLdb
CONNECTORS.append(('mysql', 'MySQL'))
assert MySQLdb
except (ImportError, AssertionError):
_logger.info('MySQL not available. Please install "mysqldb"\
python package.')
except:
_logger.info('SQL Alchemy not available. Please install "slqalchemy"\
python package.')
try:
import pyodbc
CONNECTORS.append(('pyodbc', 'ODBC'))
except:
_logger.info('ODBC libraries not available. Please install "unixodbc"\
and "python-pyodbc" packages.')
try:
import cx_Oracle
CONNECTORS.append(('cx_Oracle', 'Oracle'))
except:
_logger.info('Oracle libraries not available. Please install "cx_Oracle"\
python package.')
try:
import fdb
CONNECTORS.append(('fdb', 'Firebird'))
except:
_logger.info('Firebird libraries not available. Please install "fdb"\
python package.')
CONNECTORS.append(('postgresql', 'PostgreSQL'))
class BaseExternalDbsource(models.Model):
""" It provides logic for connection to an external data source
Classes implementing this interface must provide the following methods
suffixed with the adapter type. See the method definitions and examples
for more information:
* ``connection_open_*``
* ``connection_close_*``
* ``execute_*``
Optional methods for adapters to implement:
* ``remote_browse_*``
* ``remote_create_*``
* ``remote_delete_*``
* ``remote_search_*``
* ``remote_update_*``
"""
_name = "base.external.dbsource"
_description = 'External Database Sources'
name = fields.Char('Datasource name', required=True, size=64)
CONNECTORS = [
('postgresql', 'PostgreSQL'),
]
# This is appended to the conn string if pass declared but not detected.
# Children should declare PWD_STRING_CONNECTOR (such as PWD_STRING_FBD)
# to allow for override.
PWD_STRING = 'PWD=%s;'
name = fields.Char('Datasource name', required=True, size=64)
conn_string = fields.Text('Connection string', help="""
Sample connection strings:
- Microsoft SQL Server:
@@ -72,64 +52,99 @@ class BaseExternalDbsource(models.Model):
- MySQL: mysql://user:%s@server:port/dbname
- ODBC: DRIVER={FreeTDS};SERVER=server.address;Database=mydb;UID=sa
- ORACLE: username/%s@//server.address:port/instance
- FireBird: host=localhost;database=mydatabase.gdb;user=sysdba;password=%s;
port=3050;charset=utf8
- PostgreSQL:
dbname='template1' user='dbuser' host='localhost' port='5432' \
password=%s
- SQLite: sqlite:///test.db
- Elasticsearch: https://user:%s@localhost:9200
""")
conn_string_full = fields.Text(
readonly=True,
compute='_compute_conn_string_full',
)
password = fields.Char('Password', size=40)
client_cert = fields.Text()
client_key = fields.Text()
ca_certs = fields.Char(
help='Path to CA Certs file on server.',
)
connector = fields.Selection(
CONNECTORS, 'Connector', required=True,
help="If a connector is missing from the list, check the server "
"log to confirm that the required components were detected.",
)
connector = fields.Selection(CONNECTORS, 'Connector', required=True,
help="If a connector is missing from the\
list, check the server log to confirm\
that the required components were\
detected.")
current_table = None
@api.multi
def conn_open(self):
"""The connection is open here."""
@api.depends('conn_string', 'password')
def _compute_conn_string_full(self):
for record in self:
if record.password:
if '%s' not in record.conn_string:
pwd_string = getattr(
record,
'PWD_STRING_%s' % record.connector.upper(),
record.PWD_STRING,
)
record.conn_string += pwd_string
record.conn_string_full = record.conn_string % record.password
else:
record.conn_string_full = record.conn_string
self.ensure_one()
# Get dbsource record
# Build the full connection string
connStr = self.conn_string
if self.password:
if '%s' not in self.conn_string:
connStr += ';PWD=%s'
connStr = connStr % self.password
# Try to connect
if self.connector == 'cx_Oracle':
os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.UTF8'
conn = cx_Oracle.connect(connStr)
elif self.connector == 'pyodbc':
conn = pyodbc.connect(connStr)
elif self.connector in ('sqlite', 'mysql', 'mssql'):
conn = sqlalchemy.create_engine(connStr).connect()
elif self.connector == 'fdb':
kwargs = dict([x.split('=') for x in connStr.split(';')])
conn = fdb.connect(**kwargs)
elif self.connector == 'postgresql':
conn = psycopg2.connect(connStr)
return conn
# Interface
@api.multi
def execute(self, sqlquery, sqlparams=None, metadata=False,
context=None):
"""Executes SQL and returns a list of rows.
def change_table(self, name):
""" Change the table that is used for CRUD operations """
self.current_table = name
"sqlparams" can be a dict of values, that can be referenced in
the SQL statement using "%(key)s" or, in the case of Oracle,
@api.multi
def connection_close(self, connection):
""" It closes the connection to the data source.
This method calls adapter method of this same name, suffixed with
the adapter type.
"""
method = self._get_adapter_method('connection_close')
return method(connection)
@api.multi
@contextmanager
def connection_open(self):
""" It provides a context manager for the data source.
This method calls adapter method of this same name, suffixed with
the adapter type.
"""
method = self._get_adapter_method('connection_open')
try:
connection = method()
yield connection
finally:
try:
self.connection_close(connection)
except:
_logger.exception('Connection close failure.')
@api.multi
def execute(
self, query=None, execute_params=None, metadata=False, **kwargs
):
""" Executes a query and returns a list of rows.
"execute_params" can be a dict of values, that can be referenced
in the SQL statement using "%(key)s" or, in the case of Oracle,
":key".
Example:
sqlquery = "select * from mytable where city = %(city)s and
query = "SELECT * FROM mytable WHERE city = %(city)s AND
date > %(dt)s"
params = {'city': 'Lisbon',
'dt': datetime.datetime(2000, 12, 31)}
execute_params = {
'city': 'Lisbon',
'dt': datetime.datetime(2000, 12, 31),
}
If metadata=True, it will instead return a dict containing the
rows list and the columns list, in the format:
@@ -137,33 +152,21 @@ class BaseExternalDbsource(models.Model):
, 'rows': [ (a0, b0, ...), (a1, b1, ...), ...] }
"""
rows, cols = list(), list()
for obj in self:
conn = obj.conn_open()
if obj.connector in ["sqlite", "mysql", "mssql"]:
# using sqlalchemy
cur = conn.execute(sqlquery, sqlparams)
if metadata:
cols = cur.keys()
rows = [r for r in cur]
# Old API compatibility
if not query:
try:
query = kwargs['sqlquery']
except KeyError:
raise TypeError(_('query is a required argument'))
if not execute_params:
try:
execute_params = kwargs['sqlparams']
except KeyError:
pass
elif obj.connector in ["fdb"]:
# using other db connectors
cur = conn.cursor()
for key in sqlparams:
sqlquery = sqlquery.replace('%%(%s)s' % key,
str(sqlparams[key]))
method = self._get_adapter_method('execute')
rows, cols = method(query, execute_params, metadata)
cur.execute(sqlquery)
rows = cur.fetchall()
else:
# using other db connectors
cur = conn.cursor()
cur.execute(sqlquery, sqlparams)
if metadata:
cols = [d[0] for d in cur.description]
rows = cur.fetchall()
conn.close()
if metadata:
return {'cols': cols, 'rows': rows}
else:
@@ -171,18 +174,180 @@ class BaseExternalDbsource(models.Model):
@api.multi
def connection_test(self):
"""Test of connection."""
self.ensure_one()
conn = False
try:
conn = self.conn_open()
except Exception as e:
raise UserError(_("Connection test failed: \
Here is what we got instead:\n %s") % tools.ustr(e))
finally:
if conn:
conn.close()
""" It tests the connection
# TODO: if OK a (wizard) message box should be displayed
raise UserError(_("Connection test succeeded: \
Everything seems properly set up!"))
Raises:
ConnectionSuccessError: On connection success
ConnectionFailedError: On connection failed
"""
for obj in self:
try:
with self.connection_open():
pass
except Exception as e:
raise ConnectionFailedError(_(
"Connection test failed:\n"
"Here is what we got instead:\n%s"
) % tools.ustr(e))
raise ConnectionSuccessError(_(
"Connection test succeeded:\n"
"Everything seems properly set up!",
))
@api.multi
def remote_browse(self, record_ids, *args, **kwargs):
""" It browses for and returns the records from remote by ID
This method calls adapter method of this same name, suffixed with
the adapter type.
Args:
record_ids: (list) List of remote IDs to browse.
*args: Positional arguments to be passed to adapter method.
**kwargs: Keyword arguments to be passed to adapter method.
Returns:
(iter) Iterator of record mappings that match the ID.
"""
assert self.current_table
method = self._get_adapter_method('remote_browse')
return method(record_ids, *args, **kwargs)
@api.multi
def remote_create(self, vals, *args, **kwargs):
""" It creates a record on the remote data source.
This method calls adapter method of this same name, suffixed with
the adapter type.
Args:
vals: (dict) Values to use for creation.
*args: Positional arguments to be passed to adapter method.
**kwargs: Keyword arguments to be passed to adapter method.
Returns:
(mapping) A mapping of the record that was created.
"""
assert self.current_table
method = self._get_adapter_method('remote_create')
return method(vals, *args, **kwargs)
@api.multi
def remote_delete(self, record_ids, *args, **kwargs):
""" It deletes records by ID on remote
This method calls adapter method of this same name, suffixed with
the adapter type.
Args:
record_ids: (list) List of remote IDs to delete.
*args: Positional arguments to be passed to adapter method.
**kwargs: Keyword arguments to be passed to adapter method.
Returns:
(iter) Iterator of bools indicating delete status.
"""
assert self.current_table
method = self._get_adapter_method('remote_delete')
return method(record_ids, *args, **kwargs)
@api.multi
def remote_search(self, query, *args, **kwargs):
""" It searches the remote for the query.
This method calls adapter method of this same name, suffixed with
the adapter type.
Args:
query: (mixed) Query domain as required by the adapter.
*args: Positional arguments to be passed to adapter method.
**kwargs: Keyword arguments to be passed to adapter method.
Returns:
(iter) Iterator of record mappings that match query.
"""
assert self.current_table
method = self._get_adapter_method('remote_search')
return method(query, *args, **kwargs)
@api.multi
def remote_update(self, record_ids, vals, *args, **kwargs):
""" It updates the remote records with the vals
This method calls adapter method of this same name, suffixed with
the adapter type.
Args:
record_ids: (list) List of remote IDs to delete.
*args: Positional arguments to be passed to adapter method.
**kwargs: Keyword arguments to be passed to adapter method.
Returns:
(iter) Iterator of record mappings that were updated.
"""
assert self.current_table
method = self._get_adapter_method('remote_update')
return method(record_ids, vals, *args, **kwargs)
# Adapters
def connection_close_postgresql(self, connection):
return connection.close()
def connection_open_postgresql(self):
return psycopg2.connect(self.conn_string)
def execute_postgresql(self, query, params, metadata):
return self._execute_generic(query, params, metadata)
def _execute_generic(self, query, params, metadata):
with self.connection_open() as connection:
cur = connection.cursor()
cur.execute(query, params)
cols = []
if metadata:
cols = [d[0] for d in cur.description]
rows = cur.fetchall()
return rows, cols
# Compatibility & Private
@api.multi
def conn_open(self):
""" It opens and returns a connection to the remote data source.
This method calls adapter method of this same name, suffixed with
the adapter type.
Deprecate:
This method has been replaced with ``connection_open``.
"""
with self.connection_open() as connection:
return connection
def _get_adapter_method(self, method_prefix):
""" It returns the connector adapter method for ``method_prefix``.
Args:
method_prefix: (str) Prefix of adapter method (such as
``connection_open``).
Raises:
NotImplementedError: When the method is not found
Returns:
(instancemethod)
"""
self.ensure_one()
method = '%s_%s' % (method_prefix, self.connector)
try:
return getattr(self, method)
except AttributeError:
raise NotImplementedError(_(
'"%s" method not found, check that all assets are installed '
'for the %s connector type.'
)) % (
method, self.connector,
)

View File

@@ -1,3 +1,3 @@
# -*- encoding: utf-8 -*-
from . import test_create_dbsource
from . import test_base_external_dbsource

View File

@@ -0,0 +1,247 @@
# -*- coding: utf-8 -*-
# Copyright 2016 LasLabs Inc.
import mock
from odoo.tests import common
from ..exceptions import ConnectionFailedError, ConnectionSuccessError
class TestBaseExternalDbsource(common.TransactionCase):
def setUp(self):
super(TestBaseExternalDbsource, self).setUp()
self.dbsource = self.env.ref('base_external_dbsource.demo_postgre')
def _test_adapter_method(
self, method_name, side_effect=None, return_value=None,
create=False, args=None, kwargs=None,
):
if args is None:
args = []
if kwargs is None:
kwargs = {}
adapter = '%s_postgresql' % method_name
with mock.patch.object(self.dbsource,
adapter, create=create) as adapter:
if side_effect is not None:
adapter.side_effect = side_effect
elif return_value is not None:
adapter.return_value = return_value
res = getattr(self.dbsource, method_name)(*args, **kwargs)
return res, adapter
def test_conn_string_full(self):
""" It should add password if string interpolation not detected """
self.dbsource.conn_string = 'User=Derp;'
self.dbsource.password = 'password'
expect = self.dbsource.conn_string + 'PWD=%s;' % self.dbsource.password
self.assertEqual(
self.dbsource.conn_string_full, expect,
)
# Interface
def test_connection_success(self):
""" It should raise for successful connection """
with self.assertRaises(ConnectionSuccessError):
self.dbsource.connection_test()
def test_connection_fail(self):
""" It should raise for failed/invalid connection """
with mock.patch.object(self.dbsource, 'connection_open') as conn:
conn.side_effect = Exception
with self.assertRaises(ConnectionFailedError):
self.dbsource.connection_test()
def test_connection_open_calls_close(self):
""" It should close connection after context ends """
with mock.patch.object(
self.dbsource, 'connection_close',
) as close:
with self.dbsource.connection_open():
pass
close.assert_called_once()
def test_connection_close(self):
""" It should call adapter's close method """
args = [mock.MagicMock()]
res, adapter = self._test_adapter_method(
'connection_close', args=args,
)
adapter.assert_called_once_with(args[0])
def test_execute_asserts_query_arg(self):
""" It should raise a TypeError if query and sqlquery not in args """
with self.assertRaises(TypeError):
self.dbsource.execute()
def test_execute_calls_adapter(self):
""" It should call the adapter methods with proper args """
expect = ('query', 'execute', 'metadata')
return_value = 'rows', 'cols'
res, adapter = self._test_adapter_method(
'execute', args=expect, return_value=return_value,
)
adapter.assert_called_once_with(*expect)
def test_execute_return(self):
""" It should return rows if not metadata """
expect = (True, True, False)
return_value = 'rows', 'cols'
res, adapter = self._test_adapter_method(
'execute', args=expect, return_value=return_value,
)
self.assertEqual(res, return_value[0])
def test_execute_return_metadata(self):
""" It should return rows and cols if metadata """
expect = (True, True, True)
return_value = 'rows', 'cols'
res, adapter = self._test_adapter_method(
'execute', args=expect, return_value=return_value,
)
self.assertEqual(
res,
{'rows': return_value[0],
'cols': return_value[1]},
)
def test_remote_browse(self):
""" It should call the adapter method with proper args """
args = [1], 'args'
kwargs = {'kwargs': True}
self.dbsource.current_table = 'table'
res, adapter = self._test_adapter_method(
'remote_browse', create=True, args=args, kwargs=kwargs,
)
adapter.assert_called_once_with(*args, **kwargs)
self.assertEqual(res, adapter())
def test_remote_browse_asserts_current_table(self):
""" It should raise AssertionError if a table not selected """
args = [1], 'args'
kwargs = {'kwargs': True}
with self.assertRaises(AssertionError):
res, adapter = self._test_adapter_method(
'remote_browse', create=True, args=args, kwargs=kwargs,
)
def test_remote_create(self):
""" It should call the adapter method with proper args """
args = {'val': 'Value'}, 'args'
kwargs = {'kwargs': True}
self.dbsource.current_table = 'table'
res, adapter = self._test_adapter_method(
'remote_create', create=True, args=args, kwargs=kwargs,
)
adapter.assert_called_once_with(*args, **kwargs)
self.assertEqual(res, adapter())
def test_remote_create_asserts_current_table(self):
""" It should raise AssertionError if a table not selected """
args = [1], 'args'
kwargs = {'kwargs': True}
with self.assertRaises(AssertionError):
res, adapter = self._test_adapter_method(
'remote_create', create=True, args=args, kwargs=kwargs,
)
def test_remote_delete(self):
""" It should call the adapter method with proper args """
args = [1], 'args'
kwargs = {'kwargs': True}
self.dbsource.current_table = 'table'
res, adapter = self._test_adapter_method(
'remote_delete', create=True, args=args, kwargs=kwargs,
)
adapter.assert_called_once_with(*args, **kwargs)
self.assertEqual(res, adapter())
def test_remote_delete_asserts_current_table(self):
""" It should raise AssertionError if a table not selected """
args = [1], 'args'
kwargs = {'kwargs': True}
with self.assertRaises(AssertionError):
res, adapter = self._test_adapter_method(
'remote_delete', create=True, args=args, kwargs=kwargs,
)
def test_remote_search(self):
""" It should call the adapter method with proper args """
args = {'search': 'query'}, 'args'
kwargs = {'kwargs': True}
self.dbsource.current_table = 'table'
res, adapter = self._test_adapter_method(
'remote_search', create=True, args=args, kwargs=kwargs,
)
adapter.assert_called_once_with(*args, **kwargs)
self.assertEqual(res, adapter())
def test_remote_search_asserts_current_table(self):
""" It should raise AssertionError if a table not selected """
args = [1], 'args'
kwargs = {'kwargs': True}
with self.assertRaises(AssertionError):
res, adapter = self._test_adapter_method(
'remote_search', create=True, args=args, kwargs=kwargs,
)
def test_remote_update(self):
""" It should call the adapter method with proper args """
args = [1], {'vals': 'Value'}, 'args'
kwargs = {'kwargs': True}
self.dbsource.current_table = 'table'
res, adapter = self._test_adapter_method(
'remote_update', create=True, args=args, kwargs=kwargs,
)
adapter.assert_called_once_with(*args, **kwargs)
self.assertEqual(res, adapter())
def test_remote_update_asserts_current_table(self):
""" It should raise AssertionError if a table not selected """
args = [1], 'args'
kwargs = {'kwargs': True}
with self.assertRaises(AssertionError):
res, adapter = self._test_adapter_method(
'remote_update', create=True, args=args, kwargs=kwargs,
)
# Postgres
def test_execute_postgresql(self):
""" It should call generic executor with proper args """
expect = ('query', 'execute', 'metadata')
with mock.patch.object(
self.dbsource, '_execute_generic', autospec=True,
) as execute:
execute.return_value = 'rows', 'cols'
self.dbsource.execute(*expect)
execute.assert_called_once_with(*expect)
# Old API Compat
def test_execute_calls_adapter_old_api(self):
""" It should call the adapter correctly if old kwargs provided """
expect = [None, None, 'metadata']
with mock.patch.object(
self.dbsource, 'execute_postgresql', autospec=True,
) as psql:
psql.return_value = 'rows', 'cols'
self.dbsource.execute(
*expect, sqlparams='params', sqlquery='query'
)
expect[0], expect[1] = 'query', 'params'
psql.assert_called_once_with(*expect)
def test_conn_open(self):
""" It should return open connection for use """
with mock.patch.object(
self.dbsource, 'connection_open', autospec=True,
) as connection:
res = self.dbsource.conn_open()
self.assertEqual(
res,
connection().__enter__(),
)

View File

@@ -1,66 +0,0 @@
# -*- coding: utf-8 -*-
from odoo.exceptions import Warning as UserError
from odoo.tests import common
import logging
class TestCreateDbsource(common.TransactionCase):
"""Test class for base_external_dbsource."""
def test_create_dbsource(self):
"""source creation should succeed."""
dbsource = self.env.ref('base_external_dbsource.demo_postgre')
try:
dbsource.connection_test()
except UserError as e:
logging.warning("Log = " + str(e))
self.assertTrue(u'Everything seems properly set up!' in str(e))
def test_create_dbsource_failed(self):
"""source creation without connection string should failed."""
dbsource = self.env.ref('base_external_dbsource.demo_postgre')
# Connection without connection_string
dbsource.conn_string = ""
try:
dbsource.connection_test()
except UserError as e:
logging.warning("Log = " + str(e))
self.assertTrue(u'Here is what we got instead:' in str(e))
def test_create_dbsource_without_connector_failed(self):
"""source creation with other connector should failed."""
dbsource = self.env.ref('base_external_dbsource.demo_postgre')
# Connection to mysql
try:
dbsource.connector = "mysql"
dbsource.connection_test()
except ValueError as e:
logging.warning("Log = " + str(e))
self.assertTrue(u'Wrong value for' in str(e))
# Connection to mysql
try:
dbsource.connector = "pyodbc"
dbsource.connection_test()
except ValueError as e:
logging.warning("Log = " + str(e))
self.assertTrue(u'Wrong value for' in str(e))
# Connection to oracle
try:
dbsource.connector = "cx_Oracle"
dbsource.connection_test()
except ValueError as e:
logging.warning("Log = " + str(e))
self.assertTrue(u'Wrong value for' in str(e))
# Connection to firebird
try:
dbsource.connector = "fdb"
dbsource.connection_test()
except Exception as e:
logging.warning("Log = " + str(e))
self.assertTrue(u'Wrong value for' in str(e))