mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-05-03 00:37:48 +08:00
fix(agent): ensure database connections are properly closed in ExeSQL tool (#13427)
## Summary Fix a database connection and cursor resource leak in the ExeSQL agent tool. When SQL execution raises an exception (for example syntax error or missing table), the existing code path skips `cursor.close()` and `db.close()`, causing database connections to accumulate over time. This can eventually lead to connection exhaustion in long-running agent workflows. ## Root Cause The cleanup logic for database cursors and connections is placed after the SQL execution loop without `try/finally` protection. If an exception occurs during `cursor.execute()`, `fetchmany()`, or result processing, the cleanup code is not reached and the connection remains open. The same issue also exists in the IBM DB2 execution path where `ibm_db.close(conn)` may be skipped when exceptions occur. ## Fix - Wrap SQL execution logic in `try/finally` blocks to guarantee resource cleanup. - Ensure `cursor.close()` and `db.close()` are always executed. - Add explicit `db.close()` when `db.cursor()` creation fails. - Remove redundant close calls in early-return branches since `finally` now handles cleanup. ## Impact - No change to normal execution behavior. - Ensures database resources are always released when errors occur. - Prevents connection leaks in long-running workflows. - Only affects `agent/tools/exesql.py`. ## Testing Manual test scenarios: 1. Valid SQL execution 2. SQL syntax error 3. Query against a non-existing table 4. Execution cancellation during query In all scenarios the database cursor and connection are properly closed. Code quality checks: - `ruff check` passed - No new warnings introduced
This commit is contained in:
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@ -195,43 +196,43 @@ class ExeSQL(ToolBase, ABC):
|
||||
except Exception as e:
|
||||
raise Exception("Database Connection Failed! \n" + str(e))
|
||||
|
||||
sql_res = []
|
||||
formalized_content = []
|
||||
for single_sql in sqls:
|
||||
if self.check_if_canceled("ExeSQL processing"):
|
||||
ibm_db.close(conn)
|
||||
return
|
||||
|
||||
single_sql = single_sql.replace("```", "").strip()
|
||||
if not single_sql:
|
||||
continue
|
||||
single_sql = re.sub(r"\[ID:[0-9]+\]", "", single_sql)
|
||||
|
||||
stmt = ibm_db.exec_immediate(conn, single_sql)
|
||||
rows = []
|
||||
row = ibm_db.fetch_assoc(stmt)
|
||||
while row and len(rows) < self._param.max_records:
|
||||
try:
|
||||
sql_res = []
|
||||
formalized_content = []
|
||||
for single_sql in sqls:
|
||||
if self.check_if_canceled("ExeSQL processing"):
|
||||
ibm_db.close(conn)
|
||||
return
|
||||
rows.append(row)
|
||||
|
||||
single_sql = single_sql.replace("```", "").strip()
|
||||
if not single_sql:
|
||||
continue
|
||||
single_sql = re.sub(r"\[ID:[0-9]+\]", "", single_sql)
|
||||
|
||||
stmt = ibm_db.exec_immediate(conn, single_sql)
|
||||
rows = []
|
||||
row = ibm_db.fetch_assoc(stmt)
|
||||
while row and len(rows) < self._param.max_records:
|
||||
if self.check_if_canceled("ExeSQL processing"):
|
||||
return
|
||||
rows.append(row)
|
||||
row = ibm_db.fetch_assoc(stmt)
|
||||
|
||||
if not rows:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
continue
|
||||
if not rows:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
continue
|
||||
|
||||
df = pd.DataFrame(rows)
|
||||
for col in df.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(df[col]):
|
||||
df[col] = df[col].dt.strftime("%Y-%m-%d")
|
||||
df = pd.DataFrame(rows)
|
||||
for col in df.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(df[col]):
|
||||
df[col] = df[col].dt.strftime("%Y-%m-%d")
|
||||
|
||||
df = df.where(pd.notnull(df), None)
|
||||
df = df.where(pd.notnull(df), None)
|
||||
|
||||
sql_res.append(convert_decimals(df.to_dict(orient="records")))
|
||||
formalized_content.append(df.to_markdown(index=False, floatfmt=".6f"))
|
||||
|
||||
ibm_db.close(conn)
|
||||
sql_res.append(convert_decimals(df.to_dict(orient="records")))
|
||||
formalized_content.append(df.to_markdown(index=False, floatfmt=".6f"))
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
ibm_db.close(conn)
|
||||
|
||||
self.set_output("json", sql_res)
|
||||
self.set_output("formalized_content", "\n\n".join(formalized_content))
|
||||
@ -239,42 +240,45 @@ class ExeSQL(ToolBase, ABC):
|
||||
try:
|
||||
cursor = db.cursor()
|
||||
except Exception as e:
|
||||
with contextlib.suppress(Exception):
|
||||
db.close()
|
||||
raise Exception("Database Connection Failed! \n" + str(e))
|
||||
|
||||
sql_res = []
|
||||
formalized_content = []
|
||||
for single_sql in sqls:
|
||||
if self.check_if_canceled("ExeSQL processing"):
|
||||
try:
|
||||
sql_res = []
|
||||
formalized_content = []
|
||||
for single_sql in sqls:
|
||||
if self.check_if_canceled("ExeSQL processing"):
|
||||
return
|
||||
|
||||
single_sql = single_sql.replace('```', '').strip()
|
||||
if not single_sql:
|
||||
continue
|
||||
single_sql = re.sub(r"\[ID:[0-9]+\]", "", single_sql)
|
||||
cursor.execute(single_sql)
|
||||
if cursor.rowcount == 0:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
break
|
||||
if self._param.db_type == 'mssql':
|
||||
single_res = pd.DataFrame.from_records(cursor.fetchmany(self._param.max_records),
|
||||
columns=[desc[0] for desc in cursor.description])
|
||||
else:
|
||||
single_res = pd.DataFrame([i for i in cursor.fetchmany(self._param.max_records)])
|
||||
single_res.columns = [i[0] for i in cursor.description]
|
||||
|
||||
for col in single_res.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(single_res[col]):
|
||||
single_res[col] = single_res[col].dt.strftime('%Y-%m-%d')
|
||||
|
||||
single_res = single_res.where(pd.notnull(single_res), None)
|
||||
|
||||
sql_res.append(convert_decimals(single_res.to_dict(orient='records')))
|
||||
formalized_content.append(single_res.to_markdown(index=False, floatfmt=".6f"))
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
cursor.close()
|
||||
with contextlib.suppress(Exception):
|
||||
db.close()
|
||||
return
|
||||
|
||||
single_sql = single_sql.replace('```','')
|
||||
if not single_sql:
|
||||
continue
|
||||
single_sql = re.sub(r"\[ID:[0-9]+\]", "", single_sql)
|
||||
cursor.execute(single_sql)
|
||||
if cursor.rowcount == 0:
|
||||
sql_res.append({"content": "No record in the database!"})
|
||||
break
|
||||
if self._param.db_type == 'mssql':
|
||||
single_res = pd.DataFrame.from_records(cursor.fetchmany(self._param.max_records),
|
||||
columns=[desc[0] for desc in cursor.description])
|
||||
else:
|
||||
single_res = pd.DataFrame([i for i in cursor.fetchmany(self._param.max_records)])
|
||||
single_res.columns = [i[0] for i in cursor.description]
|
||||
|
||||
for col in single_res.columns:
|
||||
if pd.api.types.is_datetime64_any_dtype(single_res[col]):
|
||||
single_res[col] = single_res[col].dt.strftime('%Y-%m-%d')
|
||||
|
||||
single_res = single_res.where(pd.notnull(single_res), None)
|
||||
|
||||
sql_res.append(convert_decimals(single_res.to_dict(orient='records')))
|
||||
formalized_content.append(single_res.to_markdown(index=False, floatfmt=".6f"))
|
||||
|
||||
cursor.close()
|
||||
db.close()
|
||||
|
||||
self.set_output("json", sql_res)
|
||||
self.set_output("formalized_content", "\n\n".join(formalized_content))
|
||||
|
||||
Reference in New Issue
Block a user