oracle客户端配置

oracle客户端安装

 
下载32位客户端
https://www.oracle.com/database/technologies/instant-client/linux-x86-32-downloads.html



需要安装
oracle客户端,instantclient-basic-linux.x64-11.2.0.4.0.zip

其中 oracle客户端不仅是必须的,而且还需要把它的库文件加载到系统中

vim /etc/ld.so.conf
/opt/app/instantclient_11_2
然后执行ldconfig将库文件直接加载到运行的OS中
ldconfig

上面的操作等价于
rpm -ivh oracle-instantclient-basic-21.8.0.0.0-1.x86_64.rpm
当前(2023-01)最新的客户端是21版本,经测试它可以连接11g,
安装这个后,就可以连接所有的oracle数据库了


还有两个必要条件:
oracle的端口不能被防火墙禁用,默认是禁用的,要么关闭防火墙,要么防火墙开放其端口;
oracle监听程序不能限制IP,默认只限本地访问;

防火墙问题会报超时 cx_Oracle.DatabaseError: ORA-12170: TNS: 连接超时

监听
D:\app\Administrator\product\11.2.0\dbhome_1\NETWORK\ADMIN 
默认:

LISTENER =
  (DESCRIPTION_LIST =
    (DESCRIPTION =
      (ADDRESS = (PROTOCOL = IPC)(KEY = EXTPROC1521))
      (ADDRESS = (PROTOCOL = TCP)(HOST = localhost)(PORT = 1521))
    )
  )

修改为:

LISTENER =
  (DESCRIPTION_LIST =
    (DESCRIPTION =
      (ADDRESS = (PROTOCOL = IPC)(KEY = EXTPROC1521))
      (ADDRESS = (PROTOCOL = TCP)(HOST = 0.0.0.0)(PORT = 1521))
    )
  )

实例启动
win11右键我的电脑-->服务,可以找到类似OracleServiceORCL的服务
可以右键服务启动
也可以在cmd命令行执行
net start OracleServiceCASE
net stop OracleServiceCASE

win11重启监听
lsnrctl stop
lsnrctl start
lsnrctl status

 

    

 

    

 


 

  

 


python连接oracle示例

首次设置密码

 
from tpf.db import reset_passwd
reset_passwd({'db1.username': 'automng',
  'db1.password': '00000000',
  'db1.host': '101.43.140.111',
  'db1.port': 13301,
  'db1.database': 'db1',
  'db1.charset': 'utf8',
  'report.username': 'aaa',
  'report.password': 'aaabbbcccddd',
  'report.url': '10.11.111.11:1521/orcl'})
  
    

 
from tpf.db import update_passwd
update_passwd({
    "aml.username":"qqqq",
    "aml.password":"qqqqqq",
    "aml.url":"11.11.11.11:1521/dbname"})
  

获取已有密码配置,补充后可以再更新

 
from tpf.db import getPwd
getPwd()
    

 
from tpf.db import OracleDb
db = OracleDb(name="aml")
db.select("select 1 from dual")

 
-------------------------------------------------------------------------------  

 
(pyai) [root@ml aiwks]# ldd --version
ldd (GNU libc) 2.17
Copyright (C) 2012 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
Written by Roland McGrath and Ulrich Drepper.

    
instantclient_23_7 要求gcc最低为2.27 

export LD_LIBRARY_PATH=/ai/app/instantclient_12_2:$LD_LIBRARY_PATH

 

    

 

    

本例运行环境为linux,需要提前安装oracle客户端,前面已进行安装

cx_Oracle包依赖oracle客户端中的这些库文件

import cx_Oracle

conn = cx_Oracle.connect('user','password','ip:port/DB',encoding="UTF-8") 
cur = conn.cursor() 

query = cur.execute("select * from tablename where rownum <= 10")
col = [c[0] for c in cur.description] #获取列名
wq = query.fetchall() #获取数据

cur.close()
conn.close()

import pandas as pd
data = pd.DataFrame(wq,columns=col)

自动关闭连接

Connections should be released when they are no longer needed by calling Connection.close(). Alternatively, you may prefer to let connections be automatically cleaned up when references to them go out of scope. This lets cx_Oracle close dependent resources in the correct order. One other approach is the use of a “with” block, for example:
with cx_Oracle.connect(user=user, password=password,
    dsn="dbhost.example.com/orclpdb1",encoding="UTF-8") as connection:
    cursor = connection.cursor()
    cursor.execute("insert into SomeTable values (:1, :2)",(1, "Some string"))
    connection.commit()

 

    

 
https://www.oracle.com/database/technologies/instant-client/downloads.html

rpm必须使用root用户安装,zip普通用户解压即可

注意这里使用的是64位客户端,同时强调一下,之前安装的rpm是32位的
rsync -rltDv /mnt/d/soft/instantclient-basic-linux.x64-12.2.0.1.0.zip ./
unzip instantclient-basic-linux.x64-12.2.0.1.0.zip
    
export LD_LIBRARY_PATH=~/app/python/instantclient_12_2:$LD_LIBRARY_PATH

export LD_LIBRARY_PATH=/data/jupyter/instantclient_21_15:$LD_LIBRARY_PATH

python中设置LD_LIBRARY_PATH, 系统中设置后则不需要python重复设备

 
import os  

# 设置LD_LIBRARY_PATH环境变量  
# 注意:这里我们假设instantclient_12_2是你的Oracle Instant Client的路径  
oracle_client_path = "/home/ai-aml/app/python/instantclient_12_2"  
os.environ['LD_LIBRARY_PATH'] = oracle_client_path + ':' + os.environ.get('LD_LIBRARY_PATH', '')  
  
# 验证设置是否成功  
print("LD_LIBRARY_PATH:", os.environ['LD_LIBRARY_PATH'])  
  
# 现在你可以尝试导入cx_Oracle并连接到Oracle数据库了  
import cx_Oracle  
  
# 你的数据库连接代码...
    

  
如果设置了 LD_LIBRARY_PATH,则不需要下面的设置了,但在windows上可能还需要
# 如果报DPI-1047  找不到client的lib  用一下两句
#cx.init_oracle_client(lib_dir=r'/home/ai-aml/app/python/instantclient_12_2')
  

 


 


 
https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html

llm@ii:~$ ldd --version
ldd (Ubuntu GLIBC 2.39-0ubuntu8.4) 2.39
Copyright (C) 2024 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
Written by Roland McGrath and Ulrich Drepper.

Version 23.8.0.0.0 (Requires glibc 2.28)
https://download.oracle.com/otn_software/linux/instantclient/2370000/instantclient-basic-linux.x64-23.7.0.25.01.zip

 
unzip instantclient-basic-linux.x64-23.7.0.25.01.zip

export LD_LIBRARY_PATH=/wks/app/instantclient_23_7:$LD_LIBRARY_PATH

 
import cx_Oracle

conn = cx_Oracle.connect('user','password','ip:port/DB',encoding="UTF-8") 
cur = conn.cursor() 

query = cur.execute("select 1 from dual")
col = [c[0] for c in cur.description] #获取列名
wq = query.fetchall() #获取数据

cur.close()
conn.close()

import pandas as pd
data = pd.DataFrame(wq,columns=col)

  

 
DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libaio.so.1: cannot open shared object file: No such file or directory". See https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html for help

root@ii:~# find /usr/ -name libaio*
/usr/share/doc/libaio1t64
/usr/share/doc/libaio-dev
/usr/lib/x86_64-linux-gnu/libaio.so
/usr/lib/x86_64-linux-gnu/libaio.so.1t64
/usr/lib/x86_64-linux-gnu/libaio.so.1t64.0.2
/usr/lib/x86_64-linux-gnu/libaio.a
/usr/include/libaio.h

root@ii:/usr/lib/x86_64-linux-gnu# ll libaio.so
lrwxrwxrwx 1 root root 18 Oct  2  2024 libaio.so -> libaio.so.1t64.0.2
root@ii:/usr/lib/x86_64-linux-gnu# ln -s libaio.so.1t64.0.2 libaio.so.1
root@ii:/usr/lib/x86_64-linux-gnu# ll  libaio.so.1
lrwxrwxrwx 1 root root 18 Jun  8 16:43 libaio.so.1 -> libaio.so.1t64.0.2




DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libaio.so.1: cannot open shared object file: No such file or directory"

 
find /usr/ -name libaio*

root@qisan:/usr/lib/x86_64-linux-gnu# ll libaio*
-rw-r--r-- 1 root root 22022 Oct  2  2024 libaio.a
lrwxrwxrwx 1 root root    18 Oct  2  2024 libaio.so -> libaio.so.1t64.0.2
lrwxrwxrwx 1 root root    18 Oct  2  2024 libaio.so.1t64 -> libaio.so.1t64.0.2
-rw-r--r-- 1 root root 14336 Oct  2  2024 libaio.so.1t64.0.2
  

 
ln -s libaio.so.1t64.0.2 libaio.so.1
  

 

  

 


python连接pg
rpm -qa | grep python-psycopg2

在线安装
yum install python-psycopg2          
apt-get install python3-psycopg2
apt install libpq-dev 

离线安装rpm 
yum search python-psycopg2
mkdir -p /opt/soft/pg/rpm
yum install --downloadonly --downloaddir=/opt/soft/pg/rpm python-psycopg2.x86_64

离线下载
pip3 download -d /opt/soft/pg/py psycopg2 -i https://pypi.tuna.tsinghua.edu.cn/simple

离线安装 
pip3 install /opt/soft/pg/py/psycopg2

python定时连接数据库

pip install func_timeout

import time 
from func_timeout import func_set_timeout

@func_set_timeout(7)
def select_data(time_limit=6):
    for num in range(0, time_limit):
        time.sleep(1)
        print(num)
sqlite3

 
apt update 
apt-get install sqlite3

which sqlite3
/usr/bin/sqlite3

mkdir /wks/app/sqlite3
chown -R llm:llm /wks/app/sqlite3/

    

 
  root@ii:/# sqlite3 --version
  3.45.1 2024-01-30 16:01:20 e876e51a0ed5c5b3126f52e532044363a014bc594cfefa87ffb5b82257ccalt1 (64-bit)
  root@ii:/# sqlite3 /wks/app/sqlite3/vo.db
  SQLite version 3.45.1 2024-01-30 16:01:20
  Enter ".help" for usage hints.
  sqlite> -- 创建一个表
  CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    age INTEGER
  );
  
  -- 插入数据
  INSERT INTO users (name, age) VALUES ('Alice', 25);
  INSERT INTO users (name, age) VALUES ('Bob', 30);
  
  -- 查询数据
  SELECT * FROM users;
  1|Alice|25
  2|Bob|30
  sqlite>
    

 

    

 
pip install pysqlite3
    

 
from tpf.db import SqLite
sdb = SqLite()

sql= '''
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    age INTEGER
)
'''
sdb.exec(sql)

#插入数据
sdb.exec("INSERT INTO users (name, age) VALUES ('Alice', 25)")
sdb.exec("INSERT INTO users (name, age) VALUES ('Bob', 30)")

sql = "SELECT * FROM users"
df = sdb.select(sql, columns=["id","name","age"])
df

 

    

 


 

  

 


db2

 
pip install ibm_db
或
micromamba install ibm_db

 

    

 

    

 
```
import ibm_db

conn_str = (
    "DATABASE=sample;"       # 数据库名
    "HOSTNAME=192.168.1.10;" # DB2 服务器 IP
    "PORT=50000;"
    "PROTOCOL=TCPIP;"
    "UID=db2inst1;"          # 用户名
    "PWD=secret;"            # 密码
)

conn = ibm_db.connect(conn_str, "", "")
print("已连接到 DB2")

sql = "SELECT * FROM department FETCH FIRST 5 ROWS ONLY"
stmt = ibm_db.exec_immediate(conn, sql)
row = ibm_db.fetch_assoc(stmt)
while row:
    print(row)
    row = ibm_db.fetch_assoc(stmt)

ibm_db.close(conn)
```
    

 

    

 

    

 
```
import ibm_db, ibm_db_dbi      # 提供 DB-API 连接
import pandas as pd

# 1. 拼连接串
conn_str = (
    "DATABASE=sample;"         # 数据库名
    "HOSTNAME=192.168.1.10;"   # DB2 服务器 IP
    "PORT=50000;"
    "PROTOCOL=TCPIP;"
    "UID=db2inst1;"            # 用户名
    "PWD=secret"               # 密码
)

# 2. 建立 DB-API 连接
conn = ibm_db_dbi.Connection(ibm_db.connect(conn_str, "", ""))

# 3. 用 pandas 直接读
sql = "SELECT deptno, deptname, mgrno FROM department FETCH FIRST 10 ROWS ONLY"
df = pd.read_sql_query(sql, conn)

print(df.head())
conn.close()
```

 

  

 


Gauss

 
def select_byday(self, sql, time_col='dt_time', start_date='2025-01-01', end_date='2025-01-01',
  deal_func=None, show_progress=True, **func_params):
  """高斯数据库按天查询数据并进行处理

  从start_date到end_date,逐天查询数据,每天查询完成后执行deal_func处理

  Args:
  sql: SQL查询语句,应包含时间列占位符 {time_col_condition}
  time_col: 时间列名,默认为 'dt_time'
  start_date: 开始日期,格式 'YYYY-MM-DD'
  end_date: 结束日期,格式 'YYYY-MM-DD'
  deal_func: 数据处理函数,接收DataFrame参数
  show_progress: 是否显示处理进度
  **func_params: 传递给deal_func的额外参数

  Returns:
  如果deal_func为None,返回所有查询结果的合并DataFrame
  如果deal_func不为None,返回每天处理结果的列表

  Example:
  # 基本用法 - 逐天查询并合并结果
  result = gdb.select_byday(
  "SELECT * FROM sales WHERE {time_col_condition}",
  time_col='create_time',
  start_date='2025-01-01',
  end_date='2025-01-07'
  )

  # 带数据处理的用法 - 逐天处理
  def process_daily_data(df, **params):
  # 每日数据处理逻辑
  return {
  'date': df['create_time'].dt.date.iloc[0],
  'total_amount': df['amount'].sum(),
  'order_count': len(df)
  }

  results = gdb.select_byday(
  "SELECT * FROM sales WHERE {time_col_condition}",
  start_date='2025-01-01',
  end_date='2025-01-31',
  deal_func=process_daily_data
  )
  """
  import datetime

  # 生成从start_date到end_date的日期列表
  start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
  end_dt = datetime.datetime.strptime(end_date, '%Y-%m-%d')

  if start_dt > end_dt:
  raise ValueError(f"开始日期 {start_date} 不能晚于结束日期 {end_date}")

  # 计算总天数
  total_days = (end_dt - start_dt).days + 1
  print(f"开始按天查询数据,从 {start_date} 到 {end_date},共 {total_days} 天")

  all_results = []
  current_dt = start_dt
  day_count = 0

  # 逐天循环查询
  while current_dt <= end_dt:
  day_count += 1
  current_date_str = current_dt.strftime('%Y-%m-%d')

  if show_progress:
  print(f"\n处理第 {day_count}/{total_days} 天: {current_date_str}")

  try:
  # 构建单天的查询条件
  time_condition = f"DATE({time_col}) = '{current_date_str}'"

  # 替换SQL中的时间条件占位符
  final_sql = sql.format(time_col_condition=time_condition)

  if show_progress:
  print(f"查询SQL: {final_sql}")

  # 查询当天的数据
  daily_df = self.select(final_sql, show_sql=False)

  if daily_df.empty:
  if show_progress:
      print(f"  - {current_date_str} 无数据")
  daily_result = None
  else:
  if show_progress:
      print(f"  - 查询到 {len(daily_df)} 条记录")

  # 如果提供了数据处理函数,则调用处理函数
  if deal_func is not None and callable(deal_func):
      try:
          daily_result = deal_func(daily_df, **func_params)
          if show_progress:
              print(f"  - 数据处理完成: {type(daily_result)}")
      except Exception as e:
          print(f"  - 数据处理失败: {e}")
          daily_result = None
  else:
      # 没有处理函数,直接返回查询结果
      daily_result = daily_df

  all_results.append({
  'date': current_date_str,
  'data': daily_result,
  'count': len(daily_df) if not daily_df.empty else 0
  })

  except Exception as e:
  print(f"  - 处理日期 {current_date_str} 时出错: {e}")
  all_results.append({
  'date': current_date_str,
  'data': None,
  'count': 0,
  'error': str(e)
  })

  # 移动到下一天
  current_dt += datetime.timedelta(days=1)

  # 汇总结果
  successful_days = [r for r in all_results if r.get('error') is None]
  failed_days = [r for r in all_results if r.get('error') is not None]
  total_records = sum(r['count'] for r in all_results)

  print(f"\n查询完成:")
  print(f"  - 总天数: {total_days}")
  print(f"  - 成功处理: {len(successful_days)} 天")
  print(f"  - 处理失败: {len(failed_days)} 天")
  print(f"  - 总记录数: {total_records}")

  # 返回结果
  if deal_func is not None and callable(deal_func):
  # 如果有处理函数,返回处理结果列表
  results = [r['data'] for r in successful_days if r['data'] is not None]
  return results
  else:
  # 如果没有处理函数,合并所有DataFrame
  valid_dfs = [r['data'] for r in successful_days if r['data'] is not None and not r['data'].empty]
  if valid_dfs:
  combined_df = pd.concat(valid_dfs, ignore_index=True)
  print(f"  - 合并后数据: {len(combined_df)} 条记录")
  return combined_df
  else:
  print("  - 无有效数据")
  return pd.DataFrame()


    

 

    

 


 

  

 


参考
oracle客户端下载-夸克网盘 cx-oracle官方API