文件绝对路径
import os

file_name = 'a.conf'
# abspath() resolves the relative name against the current working directory.
absolute_path = os.path.abspath(file_name)
print(absolute_path)  # e.g. /opt/tpf/aitpf/cmd/a.conf
import chardet
file_path = "E:\\tpf\\aitpf\\bigmodel\\agent\\prompts\\tools\\excel_analyser.txt"


def detect_encoding(file_path):
    """Return the encoding chardet reports for the raw bytes of ``file_path``.

    The whole file is read in binary mode and handed to ``chardet.detect``;
    the ``'encoding'`` entry of its result is returned unchanged.
    """
    with open(file_path, 'rb') as raw:
        guess = chardet.detect(raw.read())
    return guess['encoding']


# file_path = 'your_file_path'
encoding = detect_encoding(file_path)
print("encoding:", encoding)

# Re-open the file as text using the detected encoding.
with open(file_path, 'r', encoding=encoding) as f:
    content = f.read()
print(content)
|
|
|
|
|
|
|
|
# Write three lines; the context manager closes the file even if a write fails.
with open('a.txt', 'w') as file:
    file.write('a\n')
    file.write('b\n')
    file.write('c\n')

# Read the whole file back in one call and keep the result.
with open('a.txt', 'r') as f:
    content = f.read()  # 'a\nb\nc\n'
a+ 追加(可读可写);w+ 可读可写,写覆盖;w 只写,写覆盖;r 只读,文件不存在时报错。示例:open(file_name, "r", encoding="utf-8")(注意:encoding 必须以关键字参数传入,open 的第三个位置参数是 buffering)
文件读写方法
f.read() 读全部内容 f.readline() 逐行读 f.readlines() 所有行 f.write() 写 f.writelines() 写列表
#-- coding: UTF-8 --
from ai.params import csv_path1
# Approach 1: iterate over the file object directly, one line per iteration.
# The with-statement closes the handle; a bare open() in the for header never does.
with open(csv_path1) as f:
    for line in f:
        print(line)
# Approach 2: pull lines one at a time with readline(); '' signals end of file.
with open(csv_path1) as f:
    while True:
        line = f.readline()
        if not line:
            break
        print(line, end='\n')
# Approach 3: readlines(hint) returns a list of whole lines and stops once the
# accumulated size exceeds the hint (10 here).
with open(csv_path1) as f:
    lines = f.readlines(10)
# Looping over an empty list is a no-op, so no emptiness check is needed.
for line in lines:
    print(line)
utf8,utf-8,UTF8,UTF-8都可以
cat a.txt
a
with open(file="a.txt", mode='r', encoding='utf8') as f:
    first_line = f.readline()
    # The second readline() on a one-line file returns ''.
    line = f.readline()
print(first_line)
print(f"--{line}--")
# Prints nothing: calling a str method on the empty string is fine and does not raise.
print(line.strip())
单位:字节
import os
from pathlib import Path

fpath = "aa.log"

# 1) os.stat(): st_size is the size in bytes.
stt = os.stat(fpath)
print(stt.st_size)  # 5

# 2) pathlib.Path.stat()
f = Path(fpath)
size = f.stat().st_size
print(size)  # 5

# 3) os.path.getsize()
size = os.path.getsize(fpath)
print(size)  # 5
import os
def to_mysql(value_list):
    """Placeholder sink for one batch of values: it only prints the batch."""
    print(value_list)
base_dir = "need_todbc"
# Full paths of every entry directly under base_dir.
files = [os.path.join(base_dir, entry) for entry in os.listdir(base_dir)]
max_count_todb = 7
value_list_tmp = []

for file in files:
    if not file.endswith(".txt"):
        continue
    with open(file, mode='r', encoding='utf8') as f:
        for line in f:
            value_list_tmp.append(line.strip())
            # Hand over a full batch as soon as it reaches the limit.
            if len(value_list_tmp) >= max_count_todb:
                to_mysql(value_list_tmp)
                value_list_tmp = []

# NOTE(review): the original indentation was lost; the final flush is assumed
# to run once after all files, so batches may span file boundaries.
if value_list_tmp:
    to_mysql(value_list_tmp)
    value_list_tmp = []
批次处理,并可中断可反复读取
import os
def deal_func(value_list):
    """Default batch handler: it only prints the batch it receives."""
    print(value_list)
def batch_read(base_dir="need_todbc", save_file="readed_file.txt", batch_size=5, handler=None):
    """Process the ``.txt`` files under ``base_dir`` in batches, resumably.

    Each line is stripped and collected; every ``batch_size`` lines are passed
    to ``handler`` as a list, and the remainder of a file is passed when that
    file ends, so a batch never spans two files. After a file is fully handled
    its path is appended to ``save_file``; paths already listed there are
    skipped, so an interrupted run can be restarted.

    Args:
        base_dir: Directory whose direct entries are scanned.
        save_file: Ledger of already-processed paths, one per line.
        batch_size: Maximum number of lines per handler call.
        handler: Callable receiving each batch; defaults to the module-level
            ``deal_func``.

    Raises:
        FileNotFoundError: If ``base_dir`` does not exist.
    """
    if handler is None:
        handler = deal_func

    files = [os.path.join(base_dir, name) for name in os.listdir(base_dir)]

    # Paths completed by earlier runs; a set gives O(1) membership tests.
    done = set()
    if os.path.exists(save_file):
        with open(save_file, 'r') as ledger:
            done = {entry.strip() for entry in ledger}

    with open(save_file, 'a+') as write_file:
        for file in files:
            if not file.endswith(".txt") or file in done:
                continue
            # A fresh batch per file: the old shared row counter was not reset
            # after the remainder flush, which shortened the next file's first batch.
            batch = []
            with open(file, mode='r', encoding='utf8') as f:
                for line in f:
                    batch.append(line.strip())
                    if len(batch) >= batch_size:
                        handler(batch)
                        batch = []
            if batch:
                handler(batch)
            # Record the file only after all of it was handled, and flush so the
            # ledger entry survives an interruption.
            done.add(file)
            write_file.write(f"{file}\n")
            write_file.flush()
batch_read(base_dir="need_todbc", save_file = "readed_file.txt")
import os
def last_line(filename, last_n=2):
    """Return the last ``last_n`` lines of a file, stripped of surrounding whitespace.

    The file is opened in binary mode and read backwards from the end in
    windows that double in size (16, 32, 64, ... bytes) until the window holds
    more than ``last_n`` lines. The extra line is required because the first
    line of a window may be cut in the middle; it is discarded. If the file is
    smaller than the window, the whole file is read instead.

    Args:
        filename: Path of the file to read; its content is decoded as UTF-8.
        last_n: Number of trailing lines wanted.

    Returns:
        A list of at most ``last_n`` strings (fewer if the file is shorter),
        an empty list if ``last_n`` is not positive, or ``None`` if the file is
        empty or does not exist.
    """
    try:
        filesize = os.path.getsize(filename)
        if filesize == 0:
            return None
        if last_n <= 0:
            return []
        # Seeking relative to the end of the file requires binary mode.
        with open(filename, 'rb') as fp:
            window = 16
            while window < filesize:
                fp.seek(-window, os.SEEK_END)
                lines = fp.readlines()
                # Strictly more than last_n: lines[0] may be a partial line.
                if len(lines) > last_n:
                    return [raw.decode("utf8").strip() for raw in lines[-last_n:]]
                window *= 2
            # The window covers the whole file: read it from the start so every
            # line is complete.
            fp.seek(0)
            lines = fp.readlines()
            return [raw.decode("utf8").strip() for raw in lines[-last_n:]]
    except FileNotFoundError:
        print(filename + ' not found!')
        return None
strip可去除首尾的字符
$ cat /opt/tpf/aiwks/datasets/text/a.cvs
{aa=1,bb=2}
from ai.params import csv_path1
with open(csv_path1) as f:
    lines = f.readlines(64)

for line in lines:
    # Drop spaces and the trailing newline BEFORE stripping the braces;
    # otherwise the closing '}' is shielded by the newline and stays in the
    # last value.
    body = line.replace(" ", "").strip().strip('{}')
    if not body:
        continue  # blank line or empty braces: nothing to parse
    row = {}
    for elem in body.split(","):
        # partition() tolerates '=' inside the value and items without '='.
        key, sep, value = elem.partition("=")
        if sep:
            row[key] = value
    print(row)
pandas读取csv zip压缩文件
import zipfile
import pandas as pd
# Context managers close both the archive and the member stream, even if
# pandas fails while parsing.
with zipfile.ZipFile("data/aa.zip", 'r') as zp:
    names = zp.namelist()
    with zp.open(names[0]) as f:
        data = pd.read_csv(f)
python读取tar.gz文件
import tarfile
import pandas as pd
with tarfile.open('data/aa.tar.gz') as tar:
    name = tar.getnames()  # member names inside the archive, as a list
    # NOTE(review): extractall() trusts the member paths stored in the archive;
    # only use it on archives from a trusted source.
    tar.extractall('./tmp')  # directory the members are extracted into
df = pd.read_csv('./tmp/' + name[0])
python读取zip压缩文件
import zipfile
import pandas as pd
class ZipReader(object):
    """Read-only helper around a zip archive.

    The archive is opened in ``__init__`` and stays open until ``close()`` is
    called or the instance is used as a context manager:

        with ZipReader("data/aa.zip") as zr:
            for frame in zr.read_csvs():
                ...
    """

    def __init__(self, zip_path):
        """Open the archive at ``zip_path`` for reading."""
        self.zip = zipfile.ZipFile(zip_path, 'r')

    def close(self):
        """Close the underlying archive."""
        self.zip.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def get_filecount(self):
        """Return the number of members in the archive."""
        return len(self.zip.namelist())

    def get_files(self):
        """Yield the content of each member as a list of decoded lines."""
        for name in self.zip.namelist():
            yield self.read_lines(name)

    def read_lines(self, name):
        """Return the lines of member ``name`` as a list of decoded strings."""
        # The member stream is closed as soon as its lines are read.
        with self.zip.open(name) as member:
            return [line.decode() for line in member.readlines()]

    def get_filenames(self):
        """Return the names of all members in the archive."""
        return self.zip.namelist()

    def extract_to(self, path):
        """Extract every member into ``path`` and return ``path``."""
        self.zip.extractall(path)
        return path

    def read_cvs(self, name):
        """Read member ``name`` as CSV and return a pandas DataFrame.

        The misspelled name is kept for existing callers; ``read_csv`` is the
        correctly spelled alias.
        """
        with self.zip.open(name) as member:
            return pd.read_csv(member)

    read_csv = read_cvs

    def read_csvs(self):
        """Yield one DataFrame per member; every member is parsed as CSV."""
        for name in self.zip.namelist():
            yield self.read_cvs(name)
if __name__ == "__main__":
    reader = ZipReader(zip_path="data/aa.zip")
    print("文件个数:", reader.get_filecount())
    print("文件名列表:", reader.zip.namelist())
    all_contents = list(reader.get_files())
    print("文件内容(所有文件):", str(all_contents))
    print("解压路径:", reader.extract_to("./test"))
    # Only the first parsed CSV is inspected; the loop stops right after it.
    for data in reader.read_csvs():
        print(type(data))  # class 'pandas.core.frame.DataFrame'
        break
统计指定目录下特定格式文件的个数
import datetime
import os
def file_counts(filePath="/tmp/logs", nearent_days=7, expect_count=7):
    """Count recent ``.zip`` files whose names start with a ``YYYY-MM-DD`` date.

    The days examined are yesterday back to ``nearent_days`` days ago; today is
    not included. Any day whose count differs from ``expect_count`` is printed
    as ``<date> <count>``, followed by a summary line with the total.

    Args:
        filePath: Directory to scan (direct entries only).
        nearent_days: How many past days to examine.
        expect_count: Expected number of files per day.

    Returns:
        The total number of matching files over all examined days.

    Raises:
        FileNotFoundError: If ``filePath`` does not exist.
    """
    # List the directory and take the reference time once, so every day is
    # checked against the same snapshot.
    now = datetime.datetime.now()
    zip_names = [name for name in os.listdir(filePath) if name.endswith(".zip")]

    file_num = 0
    for i in range(1, nearent_days + 1):
        day = (now - datetime.timedelta(days=i)).strftime('%Y-%m-%d')
        day_num = sum(1 for name in zip_names if name.startswith(day))
        file_num += day_num
        if day_num != expect_count:
            print(day, day_num)
    print(f'最近{nearent_days}天文件总数为:{file_num}个--------')
    return file_num
file_counts()
in or not in
"go" in "good" # True "bad" not in "good" # True
使用切片
a = "a,a,a,"
a[:-1]  # 'a,a,a'  (drops the last character)
writelines
import time
start_time = time.time()  # start of the measured section
for i in range(1000):
    with open('example.txt', 'w') as file:
        # Build all 1000 strings first, then write them with one writelines() call.
        lines = ['{}'.format(n) for n in range(1000)]
        file.writelines(lines)
end_time = time.time()  # end of the measured section
elapsed_seconds = end_time - start_time
tim_ms = round(elapsed_seconds * 1000, 2)
print("程序运行时间为", tim_ms, "毫秒")
程序运行时间为 2629.4 毫秒
import time
start_time = time.time()  # start of the measured section
# The file is reopened (and truncated) for every single write.
for i in range(10000):
    with open('example2.txt', 'w') as file:
        text = '{}'.format(i)
        file.write(text)
end_time = time.time()  # end of the measured section
elapsed_seconds = end_time - start_time
tim_ms = round(elapsed_seconds * 1000, 2)
print("程序运行时间为", tim_ms, "毫秒")
程序运行时间为 21376.61 毫秒 这是1W次循环的时间,下面还测试一下无打开关闭的时间
import time
# The context manager closes the file even if a write fails. As before,
# opening is outside the measured section and closing is inside it.
with open('example3.txt', 'a') as file:
    start_time = time.time()  # start of the measured section
    for i in range(1000000):
        file.write('{}'.format(i))
end_time = time.time()  # end of the measured section
tim_ms = round((end_time - start_time) * 1000, 2)
print("程序运行时间为", tim_ms, "毫秒")
程序运行时间为 449.95 毫秒 看来打开关闭在数据量大的情况下,相当耗时
def write(obj, file_path):
    """Save ``str(obj)`` to ``file_path`` as UTF-8 text, overwriting the file.

    Storing the plain string form keeps the file readable and searchable when
    opened in an editor.
    """
    with open(file_path, "w", encoding="utf-8") as out:
        out.write(str(obj))
def read(file_path, safe=False):
    """Read a UTF-8 text file and turn its content back into a Python object.

    Args:
        file_path: File containing the string form of a Python object.
        safe: When true, parse with ``ast.literal_eval``, which accepts only
            Python literals (strings, numbers, tuples, lists, dicts, sets,
            booleans, ``None``) and never executes code. The default keeps the
            original ``eval`` behaviour.

    Returns:
        The object obtained by evaluating the file content.

    Raises:
        ValueError, SyntaxError: With ``safe=True``, if the content is not a
            plain Python literal.
    """
    with open(file_path, 'r', encoding="utf-8") as f:
        text = f.read()
    if safe:
        import ast
        return ast.literal_eval(text)
    # WARNING: eval() executes arbitrary code found in the file. Use the
    # default mode only on files this program wrote itself; pass safe=True for
    # anything that could have been modified by someone else.
    c = eval(text)
    return c
r 只读 按字符,文件必须存在 r+ 读写 按字符,文件必须存在 rb 只读 按字节,文件必须存在 rb+ 读写 按字节,文件必须存在 w 只写 按字符,文件不存在则创建,存在则覆盖 w+ 读写 按字符,文件不存在则创建,存在则覆盖 wb 只写 按字节,文件不存在则创建,存在则覆盖 wb+ 读写 按字节,文件不存在则创建,存在则覆盖
f.read()
# Binary mode returns bytes; the context manager closes the file even if
# reading fails.
with open("a.txt", mode="rb") as f:
    print(f.read())  # read the whole file in one call
b'a\nd\nd \na\n\xe5\x9c\xa8\n\xe5\x9c\xa8\xe8\xa6\x81\xe5\xb7\xa5\n'
f.read(size)
一次读取指定的字节数
Python读写文件
python中文件读写mode参数