mirror of https://github.com/apache/cassandra
192 lines
7.0 KiB
Python
192 lines
7.0 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import unittest
|
|
import io
|
|
import os
|
|
import sys
|
|
import pytest
|
|
|
|
from cassandra.auth import PlainTextAuthProvider
|
|
from cqlshlib.authproviderhandling import load_auth_provider
|
|
|
|
|
|
def construct_config_path(config_file_name):
|
|
return os.path.join(os.path.dirname(__file__),
|
|
'test_authproviderhandling_config',
|
|
config_file_name)
|
|
|
|
|
|
# Simple class to help verify AuthProviders that don't need arguments.
|
|
class NoUserNamePlainTextAuthProvider(PlainTextAuthProvider):
|
|
def __init__(self):
|
|
super(NoUserNamePlainTextAuthProvider, self).__init__('', '')
|
|
|
|
|
|
class ComplexTextAuthProvider(PlainTextAuthProvider):
|
|
def __init__(self, username, password='default_pass', extra_flag=None):
|
|
super(ComplexTextAuthProvider, self).__init__(username, password)
|
|
self.extra_flag = extra_flag
|
|
|
|
|
|
def _assert_auth_provider_matches(actual, klass, expected_props):
|
|
"""
|
|
Assert that the provider matches class and properties
|
|
* actual ..........: Thing to compare with it
|
|
* klass ...........: Class to ensure this matches to (ie PlainTextAuthProvider)
|
|
* expected_props ..: Dict of var properties to match
|
|
"""
|
|
assert isinstance(actual, klass)
|
|
assert expected_props == vars(actual)
|
|
|
|
|
|
class CustomAuthProviderTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self._captured_std_err = io.StringIO()
|
|
sys.stderr = self._captured_std_err
|
|
|
|
def tearDown(self):
|
|
self._captured_std_err.close()
|
|
sys.stdout = sys.__stderr__
|
|
|
|
def test_no_warning_insecure_if_no_pass(self):
|
|
load_auth_provider(construct_config_path('plain_text_partial_example'))
|
|
err_msg = self._captured_std_err.getvalue()
|
|
assert err_msg == ''
|
|
|
|
def test_insecure_creds(self):
|
|
load_auth_provider(construct_config_path('full_plain_text_example'))
|
|
err_msg = self._captured_std_err.getvalue()
|
|
assert "Notice:" in err_msg
|
|
assert "Warning:" in err_msg
|
|
|
|
def test_creds_not_checked_for_non_plaintext(self):
|
|
load_auth_provider(construct_config_path('complex_auth_provider_with_pass'))
|
|
err_msg = self._captured_std_err.getvalue()
|
|
assert err_msg == ''
|
|
|
|
def test_partial_property_example(self):
|
|
actual = load_auth_provider(construct_config_path('partial_example'))
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
NoUserNamePlainTextAuthProvider,
|
|
{"username": '',
|
|
"password": ''})
|
|
|
|
def test_full_property_example(self):
|
|
actual = load_auth_provider(construct_config_path('full_plain_text_example'))
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
PlainTextAuthProvider,
|
|
{"username": 'user1',
|
|
"password": 'pass1'})
|
|
|
|
def test_empty_example(self):
|
|
actual = load_auth_provider(construct_config_path('empty_example'))
|
|
assert actual is None
|
|
|
|
def test_plaintextauth_when_not_defined(self):
|
|
creds_file = construct_config_path('plain_text_full_creds')
|
|
actual = load_auth_provider(cred_file=creds_file)
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
PlainTextAuthProvider,
|
|
{"username": 'user2',
|
|
"password": 'pass2'})
|
|
|
|
def test_no_cqlshrc_file(self):
|
|
actual = load_auth_provider()
|
|
assert actual is None
|
|
|
|
def test_no_classname_example(self):
|
|
actual = load_auth_provider(construct_config_path('no_classname_example'))
|
|
assert actual is None
|
|
|
|
def test_improper_config_example(self):
|
|
with pytest.raises(ModuleNotFoundError) as error:
|
|
load_auth_provider(construct_config_path('illegal_example'))
|
|
assert error is not None
|
|
|
|
def test_username_password_passed_from_commandline(self):
|
|
creds_file = construct_config_path('complex_auth_provider_creds')
|
|
cqlshrc = construct_config_path('complex_auth_provider')
|
|
|
|
actual = load_auth_provider(cqlshrc, creds_file, 'user-from-legacy', 'pass-from-legacy')
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
ComplexTextAuthProvider,
|
|
{"username": 'user-from-legacy',
|
|
"password": 'pass-from-legacy',
|
|
"extra_flag": 'flag2'})
|
|
|
|
def test_creds_example(self):
|
|
creds_file = construct_config_path('complex_auth_provider_creds')
|
|
cqlshrc = construct_config_path('complex_auth_provider')
|
|
|
|
actual = load_auth_provider(cqlshrc, creds_file)
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
ComplexTextAuthProvider,
|
|
{"username": 'user1',
|
|
"password": 'pass2',
|
|
"extra_flag": 'flag2'})
|
|
|
|
def test_legacy_example_use_passed_username(self):
|
|
creds_file = construct_config_path('plain_text_partial_creds')
|
|
cqlshrc = construct_config_path('plain_text_partial_example')
|
|
|
|
actual = load_auth_provider(cqlshrc, creds_file, 'user3')
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
PlainTextAuthProvider,
|
|
{"username": 'user3',
|
|
"password": 'pass2'})
|
|
|
|
def test_legacy_example_no_auth_provider_given(self):
|
|
cqlshrc = construct_config_path('empty_example')
|
|
creds_file = construct_config_path('complex_auth_provider_creds')
|
|
|
|
actual = load_auth_provider(cqlshrc, creds_file, 'user3', 'pass3')
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
PlainTextAuthProvider,
|
|
{"username": 'user3',
|
|
"password": 'pass3'})
|
|
|
|
def test_shouldnt_pass_no_password_when_alt_auth_provider(self):
|
|
cqlshrc = construct_config_path('complex_auth_provider')
|
|
creds_file = None
|
|
|
|
actual = load_auth_provider(cqlshrc, creds_file, 'user3')
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
ComplexTextAuthProvider,
|
|
{"username": 'user3',
|
|
"password": 'default_pass',
|
|
"extra_flag": 'flag1'})
|
|
|
|
def test_legacy_example_no_password(self):
|
|
cqlshrc = construct_config_path('plain_text_partial_example')
|
|
creds_file = None
|
|
|
|
actual = load_auth_provider(cqlshrc, creds_file, 'user3')
|
|
_assert_auth_provider_matches(
|
|
actual,
|
|
PlainTextAuthProvider,
|
|
{"username": 'user3',
|
|
"password": None})
|