mirror of
https://gitee.com/Lyon1998/pikapython.git
synced 2025-01-22 17:12:55 +08:00
219 lines
8.4 KiB
Python
219 lines
8.4 KiB
Python
|
# SPDX-FileCopyrightText: 2003 Python Software Foundation
|
||
|
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
|
||
|
# SPDX-FileCopyrightText: Copyright (c) 2021 Alec Delaney
|
||
|
#
|
||
|
# SPDX-License-Identifier: MIT
|
||
|
# SPDX-License-Identifier: PSF-2.0
|
||
|
# SPDX-License-Identifier: 0BSD
|
||
|
|
||
|
"""
|
||
|
|
||
|
`pikapython_csv`
|
||
|
================================================================================
|
||
|
PikaPython helper library for working with CSV files
|
||
|
* Author(s): Alec Delaney
|
||
|
* Author(s): Lyon
|
||
|
|
||
|
"""
|
||
|
|
||
|
import re
|
||
|
|
||
|
|
||
|
class reader:
|
||
|
"""Basic CSV reader class that behaves like CPython's ``csv.reader()``
|
||
|
:param csvfile: The open file to read from
|
||
|
:type csvfile: io.TextIOWrapper
|
||
|
:param str delimiter: (Optional) The CSV delimiter, default is comma (,)
|
||
|
:param str quotechar: (Optional) The CSV quote character for encapsulating special characters
|
||
|
including the delimiter, default is double quotation mark (")
|
||
|
"""
|
||
|
|
||
|
def __init__(self, csvfile, delimiter=",", quotechar='"') -> None:
|
||
|
|
||
|
self.file_interator = csvfile
|
||
|
self.delimiter = delimiter
|
||
|
self.quotechar = quotechar
|
||
|
self._re_exp = "(\\{0}.+?\\{0}),|([^{1}]+)".format(quotechar, delimiter)
|
||
|
|
||
|
def __iter__(self):
|
||
|
return self
|
||
|
|
||
|
def __next__(self):
|
||
|
csv_value_list = []
|
||
|
row_string = self.file_interator.__next__()
|
||
|
|
||
|
while len(row_string) != 0:
|
||
|
if row_string.startswith(self.delimiter):
|
||
|
csv_value_list.append("")
|
||
|
row_string = row_string[1:]
|
||
|
continue
|
||
|
|
||
|
next_match = re.match(self._re_exp, row_string)
|
||
|
matches = next_match.groups()
|
||
|
if matches[0] is None:
|
||
|
latest_match = matches[1].strip("\r\n").strip("\n")
|
||
|
csv_value_list.append(latest_match.replace(self.quotechar * 2, self.quotechar))
|
||
|
else:
|
||
|
latest_match = matches[0].strip("\r\n").strip("\n")
|
||
|
csv_value_list.append(
|
||
|
latest_match[1:-1].replace(self.quotechar * 2, self.quotechar)
|
||
|
)
|
||
|
|
||
|
if len(row_string) != 0: # If anything is left in the list...
|
||
|
row_string = row_string[len(latest_match) :]
|
||
|
if row_string == self.delimiter:
|
||
|
csv_value_list.append("")
|
||
|
row_string = row_string[1:]
|
||
|
elif row_string == "\r\n" or row_string == "n":
|
||
|
row_string = ""
|
||
|
row_string = row_string[1:]
|
||
|
|
||
|
return csv_value_list
|
||
|
|
||
|
|
||
|
class writer:
|
||
|
"""Basic CSV writer class that behaves like CPython's ``csv.writer()``
|
||
|
:param csvfile: The open CSVfile to write to
|
||
|
:type csvfile: io.TextIOWrapper
|
||
|
:param str delimiter: (Optional) The CSV delimiter, default is comma (,)
|
||
|
:param str quotechar: (Optional) The CSV quote character for encapsulating special characters
|
||
|
including the delimiter, default is double quotation mark (")
|
||
|
"""
|
||
|
|
||
|
def __init__(self, csvfile, delimiter=",", quoterchar='"'):
|
||
|
|
||
|
self.file_iterator = csvfile
|
||
|
self.delimiter = delimiter
|
||
|
self.quotechar = quoterchar
|
||
|
self.newlinechar = "\r\n"
|
||
|
|
||
|
def writerow(self, seq):
|
||
|
"""Write a row to the CSV file
|
||
|
:param seq: The list of values to write, which must all be str or be able to
|
||
|
be cast to str
|
||
|
:type seq: Sequence[Any]
|
||
|
"""
|
||
|
|
||
|
str_seq = [str(entry) for entry in seq]
|
||
|
doub_quote_seq = [entry.replace(self.quotechar, self.quotechar * 2) for entry in str_seq]
|
||
|
quoted_seq = [self._apply_quotes(entry) for entry in doub_quote_seq]
|
||
|
parsed_str = (self.delimiter).join(quoted_seq)
|
||
|
self.file_iterator.write(parsed_str + self.newlinechar)
|
||
|
|
||
|
def writerows(self, rows):
|
||
|
"""Write multiple rows to the CSV file
|
||
|
:param rows: An iterable item that yields multiple rows to write (e.g., list)
|
||
|
:type rows: Iterable[Sequence[Any]]
|
||
|
"""
|
||
|
for row in rows:
|
||
|
self.writerow(row)
|
||
|
|
||
|
def _apply_quotes(self, entry):
|
||
|
"""Apply the quote character to entries as necessary
|
||
|
:param str entry: The entry to add the quote charcter to, if needed
|
||
|
"""
|
||
|
|
||
|
return (self.quotechar + entry + self.quotechar) if self.delimiter in entry else entry
|
||
|
|
||
|
|
||
|
# Ported from CPython's csv.py:
|
||
|
class DictReader:
|
||
|
"""CSV reader that maps rows to a dict according to given or inferred fieldnames,
|
||
|
it also accepts the delimiter and quotechar keywords
|
||
|
:param f: The open file to read from
|
||
|
:type f: io.TextIOWrapper
|
||
|
:param fieldnames: (Optional) The fieldnames for each of the columns, if none is given,
|
||
|
it will default to the whatever is in the first row of the CSV file
|
||
|
:type fieldnames: Sequence[str]
|
||
|
:param str restkey: (Optional) A key name for values that have no key (row is larger than
|
||
|
the length of fieldnames), default is None
|
||
|
:param restval: (Optional) A default value for keys that have no values (row is small
|
||
|
than the length of fieldnames, default is None
|
||
|
:type restval: Any
|
||
|
"""
|
||
|
|
||
|
def __init__(self, f, fieldnames=None, restkey=None, restval=None, **kwargs):
|
||
|
|
||
|
self.fieldnames = fieldnames
|
||
|
self.restkey = restkey
|
||
|
self.restval = restval
|
||
|
self.reader = reader(f, **kwargs)
|
||
|
self.line_num = 0
|
||
|
|
||
|
def __iter__(self):
|
||
|
return self
|
||
|
|
||
|
def __next__(self):
|
||
|
if self.line_num == 0:
|
||
|
if self.fieldnames is None:
|
||
|
self.fieldnames = next(self.reader)
|
||
|
row = next(self.reader)
|
||
|
|
||
|
row_dict = dict(zip(self.fieldnames, row))
|
||
|
length_fn = len(self.fieldnames)
|
||
|
length_row = len(row)
|
||
|
if length_fn < length_row:
|
||
|
row_dict[self.restkey] = row[length_fn:]
|
||
|
elif length_fn > length_row:
|
||
|
for key in self.fieldnames[length_row:]:
|
||
|
row_dict[key] = self.restval
|
||
|
self.line_num += 1
|
||
|
return row_dict
|
||
|
|
||
|
|
||
|
# Ported from CPython's csv.py
|
||
|
class DictWriter:
|
||
|
"""CSV writer that uses a dict to write the rows according fieldnames, it also accepts the
|
||
|
delimiter and quotechar keywords
|
||
|
:param f: The open file to write to
|
||
|
:type f: io.TextIOWrapper
|
||
|
:param fieldnames: The fieldnames for each of the comlumns
|
||
|
:type fieldnames: Sequence[str]
|
||
|
:param str restval: A default value for keys that have no values
|
||
|
:param str extrasaction: The action to perform if a key is encountered when parsing the dict
|
||
|
that is not included in the fieldnames parameter, either "raise" or "ignore". Ignore
|
||
|
raises a ValueError, and "ignore" simply ignore that key/value pair. Default behavior
|
||
|
is "raise"
|
||
|
"""
|
||
|
|
||
|
def __init__(self, f, fieldnames, restval="", extrasaction="raise", **kwargs):
|
||
|
self.fieldnames = fieldnames # list of keys for the dict
|
||
|
self.restval = restval # for writing short dicts
|
||
|
if extrasaction.lower() not in ("raise", "ignore"):
|
||
|
raise ValueError("extrasaction " "(%s)" " must be 'raise' or 'ignore'" % extrasaction)
|
||
|
self.extrasaction = extrasaction
|
||
|
self.writer = writer(f, **kwargs)
|
||
|
|
||
|
def writeheader(self):
|
||
|
"""Writes the header row to the CSV file"""
|
||
|
self.writerow(dict(zip(self.fieldnames, self.fieldnames)))
|
||
|
|
||
|
def _dict_to_tuple(self, rowdict):
|
||
|
if self.extrasaction == "raise":
|
||
|
wrong_fields = []
|
||
|
for field in rowdict.keys():
|
||
|
if field not in self.fieldnames:
|
||
|
wrong_fields.append(field)
|
||
|
if wrong_fields:
|
||
|
raise ValueError(
|
||
|
"dict contains fields not in fieldnames: "
|
||
|
+ ", ".join([repr(x) for x in wrong_fields])
|
||
|
)
|
||
|
return (rowdict.get(key, self.restval) for key in self.fieldnames)
|
||
|
|
||
|
def writerow(self, rowdict):
|
||
|
"""Writes a row to the CSV file
|
||
|
:param rowdict: The row to write as a dict, with keys of the DictWriter's
|
||
|
fieldnames parameter; values must be str or be able to be cast to str
|
||
|
:type rowdict: Dict[str, Any]
|
||
|
"""
|
||
|
return self.writer.writerow(self._dict_to_tuple(rowdict))
|
||
|
|
||
|
def writerows(self, rowdicts):
|
||
|
"""Writes multiple rows to the CSV files
|
||
|
:param rowdicts: An iterable item that yields multiple rows to write;
|
||
|
values in those rows must be str or be able to be cast to str
|
||
|
:type rowdicts: Iterable[Dict[str, Any]]
|
||
|
"""
|
||
|
return self.writer.writerows(map(self._dict_to_tuple, rowdicts))
|