Fix missing encoding with open() (#53593)
* Fix missing encoding with open() * Fix tests * Improve open - frontend
This commit is contained in:
parent
1c20eb3263
commit
10bfc78365
21 changed files with 49 additions and 39 deletions
|
@ -146,8 +146,8 @@ def daemonize() -> None:
|
|||
|
||||
# redirect standard file descriptors to devnull
|
||||
# pylint: disable=consider-using-with
|
||||
infd = open(os.devnull)
|
||||
outfd = open(os.devnull, "a+")
|
||||
infd = open(os.devnull, encoding="utf8")
|
||||
outfd = open(os.devnull, "a+", encoding="utf8")
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
os.dup2(infd.fileno(), sys.stdin.fileno())
|
||||
|
@ -159,7 +159,7 @@ def check_pid(pid_file: str) -> None:
|
|||
"""Check that Home Assistant is not already running."""
|
||||
# Check pid file
|
||||
try:
|
||||
with open(pid_file) as file:
|
||||
with open(pid_file, encoding="utf8") as file:
|
||||
pid = int(file.readline())
|
||||
except OSError:
|
||||
# PID File does not exist
|
||||
|
@ -182,7 +182,7 @@ def write_pid(pid_file: str) -> None:
|
|||
"""Create a PID File."""
|
||||
pid = os.getpid()
|
||||
try:
|
||||
with open(pid_file, "w") as file:
|
||||
with open(pid_file, "w", encoding="utf8") as file:
|
||||
file.write(str(pid))
|
||||
except OSError:
|
||||
print(f"Fatal Error: Unable to write pid file {pid_file}")
|
||||
|
|
|
@ -184,7 +184,7 @@ class ApnsNotificationService(BaseNotificationService):
|
|||
|
||||
def write_devices(self):
|
||||
"""Write all known devices to file."""
|
||||
with open(self.yaml_path, "w+") as out:
|
||||
with open(self.yaml_path, "w+", encoding="utf8") as out:
|
||||
for device in self.devices.values():
|
||||
_write_device(out, device)
|
||||
|
||||
|
@ -202,7 +202,7 @@ class ApnsNotificationService(BaseNotificationService):
|
|||
|
||||
if current_device is None:
|
||||
self.devices[push_id] = device
|
||||
with open(self.yaml_path, "a") as out:
|
||||
with open(self.yaml_path, "a", encoding="utf8") as out:
|
||||
_write_device(out, device)
|
||||
return True
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ HOST_SCHEMA = vol.Schema(
|
|||
def write_tls_asset(hass: core.HomeAssistant, filename: str, asset: bytes) -> None:
|
||||
"""Write the tls assets to disk."""
|
||||
makedirs(hass.config.path(DOMAIN), exist_ok=True)
|
||||
with open(hass.config.path(DOMAIN, filename), "w") as file_handle:
|
||||
with open(hass.config.path(DOMAIN, filename), "w", encoding="utf8") as file_handle:
|
||||
file_handle.write(asset.decode("utf-8"))
|
||||
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ class ZWaveLogView(HomeAssistantView):
|
|||
def _get_log(self, hass, lines):
|
||||
"""Retrieve the logfile content."""
|
||||
logfilepath = hass.config.path(OZW_LOG_FILENAME)
|
||||
with open(logfilepath) as logfile:
|
||||
with open(logfilepath, encoding="utf8") as logfile:
|
||||
data = (line.rstrip() for line in logfile)
|
||||
if lines == 0:
|
||||
loglines = list(data)
|
||||
|
|
|
@ -898,7 +898,7 @@ async def async_load_config(
|
|||
|
||||
def update_config(path: str, dev_id: str, device: Device) -> None:
|
||||
"""Add device to YAML configuration file."""
|
||||
with open(path, "a") as out:
|
||||
with open(path, "a", encoding="utf8") as out:
|
||||
device_config = {
|
||||
device.dev_id: {
|
||||
ATTR_NAME: device.name,
|
||||
|
|
|
@ -41,7 +41,7 @@ class FileNotificationService(BaseNotificationService):
|
|||
|
||||
def send_message(self, message="", **kwargs):
|
||||
"""Send a message to a file."""
|
||||
with open(self.filepath, "a") as file:
|
||||
with open(self.filepath, "a", encoding="utf8") as file:
|
||||
if os.stat(self.filepath).st_size == 0:
|
||||
title = f"{kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)} notifications (Log started: {dt_util.utcnow().isoformat()})\n{'-' * 80}\n"
|
||||
file.write(title)
|
||||
|
|
|
@ -569,7 +569,9 @@ class IndexView(web_urldispatcher.AbstractResource):
|
|||
"""Get template."""
|
||||
tpl = self._template_cache
|
||||
if tpl is None:
|
||||
with open(str(_frontend_root(self.repo_path) / "index.html")) as file:
|
||||
with (_frontend_root(self.repo_path) / "index.html").open(
|
||||
encoding="utf8"
|
||||
) as file:
|
||||
tpl = jinja2.Template(file.read())
|
||||
|
||||
# Cache template if not running from repository
|
||||
|
|
|
@ -244,7 +244,7 @@ def setup(hass, config):
|
|||
|
||||
def check_correct_scopes(token_file, config):
|
||||
"""Check for the correct scopes in file."""
|
||||
with open(token_file) as tokenfile:
|
||||
with open(token_file, encoding="utf8") as tokenfile:
|
||||
contents = tokenfile.read()
|
||||
|
||||
# Check for quoted scope as our scopes can be subsets of other scopes
|
||||
|
@ -408,7 +408,7 @@ def load_config(path):
|
|||
"""Load the google_calendar_devices.yaml."""
|
||||
calendars = {}
|
||||
try:
|
||||
with open(path) as file:
|
||||
with open(path, encoding="utf8") as file:
|
||||
data = yaml.safe_load(file)
|
||||
for calendar in data:
|
||||
try:
|
||||
|
@ -425,6 +425,6 @@ def load_config(path):
|
|||
|
||||
def update_config(path, calendar):
|
||||
"""Write the google_calendar_devices.yaml."""
|
||||
with open(path, "a") as out:
|
||||
with open(path, "a", encoding="utf8") as out:
|
||||
out.write("\n")
|
||||
yaml.dump([calendar], out, default_flow_style=False)
|
||||
|
|
|
@ -35,7 +35,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
|
|||
tokenfile = hass.config.path(".greenwave")
|
||||
if config.get(CONF_VERSION) == 3:
|
||||
if os.path.exists(tokenfile):
|
||||
with open(tokenfile) as tokenfile:
|
||||
with open(tokenfile, encoding="utf8") as tokenfile:
|
||||
token = tokenfile.read()
|
||||
else:
|
||||
try:
|
||||
|
@ -43,7 +43,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
|
|||
except PermissionError:
|
||||
_LOGGER.error("The Gateway Is Not In Sync Mode")
|
||||
raise
|
||||
with open(tokenfile, "w+") as tokenfile:
|
||||
with open(tokenfile, "w+", encoding="utf8") as tokenfile:
|
||||
tokenfile.write(token)
|
||||
else:
|
||||
token = None
|
||||
|
|
|
@ -217,7 +217,7 @@ async def async_load_ip_bans_config(hass: HomeAssistant, path: str) -> list[IpBa
|
|||
|
||||
def update_ip_bans_config(path: str, ip_ban: IpBan) -> None:
|
||||
"""Update config file with new banned IP address."""
|
||||
with open(path, "a") as out:
|
||||
with open(path, "a", encoding="utf8") as out:
|
||||
ip_ = {str(ip_ban.ip_address): {ATTR_BANNED_AT: ip_ban.banned_at.isoformat()}}
|
||||
out.write("\n")
|
||||
out.write(yaml.dump(ip_))
|
||||
|
|
|
@ -78,7 +78,7 @@ def load_codes(path):
|
|||
"""Load KIRA codes from specified file."""
|
||||
codes = []
|
||||
if os.path.exists(path):
|
||||
with open(path) as code_file:
|
||||
with open(path, encoding="utf8") as code_file:
|
||||
data = yaml.safe_load(code_file) or []
|
||||
for code in data:
|
||||
try:
|
||||
|
@ -87,7 +87,7 @@ def load_codes(path):
|
|||
# keep going
|
||||
_LOGGER.warning("KIRA code invalid data: %s", exception)
|
||||
else:
|
||||
with open(path, "w") as code_file:
|
||||
with open(path, "w", encoding="utf8") as code_file:
|
||||
code_file.write("")
|
||||
return codes
|
||||
|
||||
|
|
|
@ -586,7 +586,7 @@ class Profiles:
|
|||
for profile_path in profile_paths:
|
||||
if not os.path.isfile(profile_path):
|
||||
continue
|
||||
with open(profile_path) as inp:
|
||||
with open(profile_path, encoding="utf8") as inp:
|
||||
reader = csv.reader(inp)
|
||||
|
||||
# Skip the header
|
||||
|
|
|
@ -137,7 +137,9 @@ class LutronCasetaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
def _write_tls_assets(self, assets):
|
||||
"""Write the tls assets to disk."""
|
||||
for asset_key, conf_key in FILE_MAPPING.items():
|
||||
with open(self.hass.config.path(self.data[conf_key]), "w") as file_handle:
|
||||
with open(
|
||||
self.hass.config.path(self.data[conf_key]), "w", encoding="utf8"
|
||||
) as file_handle:
|
||||
file_handle.write(assets[asset_key])
|
||||
|
||||
def _tls_assets_exist(self):
|
||||
|
|
|
@ -522,7 +522,7 @@ async def async_setup_entry(hass, entry):
|
|||
unsub = await async_subscribe(hass, call.data["topic"], collect_msg)
|
||||
|
||||
def write_dump():
|
||||
with open(hass.config.path("mqtt_dump.txt"), "wt") as fp:
|
||||
with open(hass.config.path("mqtt_dump.txt"), "wt", encoding="utf8") as fp:
|
||||
for msg in messages:
|
||||
fp.write(",".join(msg) + "\n")
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@ def execute_script(hass, name, data=None):
|
|||
"""Execute a script."""
|
||||
filename = f"{name}.py"
|
||||
raise_if_invalid_filename(filename)
|
||||
with open(hass.config.path(FOLDER, filename)) as fil:
|
||||
with open(hass.config.path(FOLDER, filename), encoding="utf8") as fil:
|
||||
source = fil.read()
|
||||
execute(hass, filename, source, data)
|
||||
|
||||
|
|
|
@ -163,7 +163,7 @@ class RememberTheMilkConfiguration:
|
|||
return
|
||||
try:
|
||||
_LOGGER.debug("Loading configuration from file: %s", self._config_file_path)
|
||||
with open(self._config_file_path) as config_file:
|
||||
with open(self._config_file_path, encoding="utf8") as config_file:
|
||||
self._config = json.load(config_file)
|
||||
except ValueError:
|
||||
_LOGGER.error(
|
||||
|
@ -174,7 +174,7 @@ class RememberTheMilkConfiguration:
|
|||
|
||||
def save_config(self):
|
||||
"""Write the configuration to a file."""
|
||||
with open(self._config_file_path, "w") as config_file:
|
||||
with open(self._config_file_path, "w", encoding="utf8") as config_file:
|
||||
json.dump(self._config, config_file)
|
||||
|
||||
def get_token(self, profile_name):
|
||||
|
|
|
@ -112,7 +112,7 @@ def convert_client_keys(config_file):
|
|||
return
|
||||
|
||||
# Try to parse the file as being JSON
|
||||
with open(config_file) as json_file:
|
||||
with open(config_file, encoding="utf8") as json_file:
|
||||
try:
|
||||
json_conf = json.load(json_file)
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
|
|
|
@ -290,30 +290,30 @@ def _write_default_config(config_dir: str) -> bool:
|
|||
# Writing files with YAML does not create the most human readable results
|
||||
# So we're hard coding a YAML template.
|
||||
try:
|
||||
with open(config_path, "wt") as config_file:
|
||||
with open(config_path, "wt", encoding="utf8") as config_file:
|
||||
config_file.write(DEFAULT_CONFIG)
|
||||
|
||||
if not os.path.isfile(secret_path):
|
||||
with open(secret_path, "wt") as secret_file:
|
||||
with open(secret_path, "wt", encoding="utf8") as secret_file:
|
||||
secret_file.write(DEFAULT_SECRETS)
|
||||
|
||||
with open(version_path, "wt") as version_file:
|
||||
with open(version_path, "wt", encoding="utf8") as version_file:
|
||||
version_file.write(__version__)
|
||||
|
||||
if not os.path.isfile(group_yaml_path):
|
||||
with open(group_yaml_path, "wt"):
|
||||
with open(group_yaml_path, "wt", encoding="utf8"):
|
||||
pass
|
||||
|
||||
if not os.path.isfile(automation_yaml_path):
|
||||
with open(automation_yaml_path, "wt") as automation_file:
|
||||
with open(automation_yaml_path, "wt", encoding="utf8") as automation_file:
|
||||
automation_file.write("[]")
|
||||
|
||||
if not os.path.isfile(script_yaml_path):
|
||||
with open(script_yaml_path, "wt"):
|
||||
with open(script_yaml_path, "wt", encoding="utf8"):
|
||||
pass
|
||||
|
||||
if not os.path.isfile(scene_yaml_path):
|
||||
with open(scene_yaml_path, "wt"):
|
||||
with open(scene_yaml_path, "wt", encoding="utf8"):
|
||||
pass
|
||||
|
||||
return True
|
||||
|
@ -379,7 +379,7 @@ def process_ha_config_upgrade(hass: HomeAssistant) -> None:
|
|||
version_path = hass.config.path(VERSION_FILE)
|
||||
|
||||
try:
|
||||
with open(version_path) as inp:
|
||||
with open(version_path, encoding="utf8") as inp:
|
||||
conf_version = inp.readline().strip()
|
||||
except FileNotFoundError:
|
||||
# Last version to not have this file
|
||||
|
@ -423,7 +423,7 @@ def process_ha_config_upgrade(hass: HomeAssistant) -> None:
|
|||
if os.path.isdir(lib_path):
|
||||
shutil.rmtree(lib_path)
|
||||
|
||||
with open(version_path, "wt") as outp:
|
||||
with open(version_path, "wt", encoding="utf8") as outp:
|
||||
outp.write(__version__)
|
||||
|
||||
|
||||
|
|
|
@ -617,9 +617,13 @@ async def test_tls_assets_writer(hass):
|
|||
}
|
||||
with patch("os.mkdir"), patch("builtins.open", mock_open()) as mocked_file:
|
||||
write_tls_asset(hass, CONF_SHC_CERT, assets["cert"])
|
||||
mocked_file.assert_called_with(hass.config.path(DOMAIN, CONF_SHC_CERT), "w")
|
||||
mocked_file.assert_called_with(
|
||||
hass.config.path(DOMAIN, CONF_SHC_CERT), "w", encoding="utf8"
|
||||
)
|
||||
mocked_file().write.assert_called_with("content_cert")
|
||||
|
||||
write_tls_asset(hass, CONF_SHC_KEY, assets["key"])
|
||||
mocked_file.assert_called_with(hass.config.path(DOMAIN, CONF_SHC_KEY), "w")
|
||||
mocked_file.assert_called_with(
|
||||
hass.config.path(DOMAIN, CONF_SHC_KEY), "w", encoding="utf8"
|
||||
)
|
||||
mocked_file().write.assert_called_with("content_key")
|
||||
|
|
|
@ -63,7 +63,7 @@ async def test_notify_file(hass, timestamp):
|
|||
|
||||
full_filename = os.path.join(hass.config.path(), filename)
|
||||
assert m_open.call_count == 1
|
||||
assert m_open.call_args == call(full_filename, "a")
|
||||
assert m_open.call_args == call(full_filename, "a", encoding="utf8")
|
||||
|
||||
assert m_open.return_value.write.call_count == 2
|
||||
if not timestamp:
|
||||
|
|
|
@ -166,7 +166,9 @@ async def test_ip_bans_file_creation(hass, aiohttp_client):
|
|||
resp = await client.get("/")
|
||||
assert resp.status == 401
|
||||
assert len(app[KEY_BANNED_IPS]) == len(BANNED_IPS) + 1
|
||||
m_open.assert_called_once_with(hass.config.path(IP_BANS_FILE), "a")
|
||||
m_open.assert_called_once_with(
|
||||
hass.config.path(IP_BANS_FILE), "a", encoding="utf8"
|
||||
)
|
||||
|
||||
resp = await client.get("/")
|
||||
assert resp.status == HTTP_FORBIDDEN
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue