Files
ragflow/test/testcases/test_web_api/test_common.py
buua436 f703169117 Refa: migrate document preview/download to RESTful API (#14633)
### What problem does this PR solve?

migrate document preview/download to RESTful API

### Type of change
- [x] Refactoring
2026-05-08 13:26:13 +08:00

647 lines
23 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
from pathlib import Path
from uuid import uuid4
import requests
from configs import HOST_ADDRESS, VERSION
from requests_toolbelt import MultipartEncoder
from utils.file_utils import create_txt_file
HEADERS = {"Content-Type": "application/json"}
DATASETS_URL = f"/api/{VERSION}/datasets"
CHUNK_APP_URL = f"/{VERSION}/chunk"
CHUNK_API_URL = f"/api/{VERSION}/datasets/{{dataset_id}}/documents/{{document_id}}/chunks"
# SESSION_WITH_CHAT_ASSISTANT_API_URL = "/api/v1/chats/{chat_id}/sessions"
# SESSION_WITH_AGENT_API_URL = "/api/v1/agents/{agent_id}/sessions"
MEMORY_API_URL = f"/api/{VERSION}/memories"
MESSAGE_API_URL = f"/api/{VERSION}/messages"
API_APP_URL = f"/{VERSION}/api"
SYSTEM_APP_URL = f"/{VERSION}/system"
SYSTEM_API_URL = f"/api/{VERSION}/system"
LLM_APP_URL = f"/{VERSION}/llm"
PLUGIN_APP_URL = f"/api/{VERSION}/plugin"
SEARCHES_URL = f"/api/{VERSION}/searches"
CHATS_URL = f"/api/{VERSION}/chats"
def _http_debug_enabled():
return os.getenv("TEST_HTTP_DEBUG") == "1"
def _redact_payload(payload):
if not isinstance(payload, dict):
return payload
redacted = {}
for key, value in payload.items():
if any(token in key.lower() for token in ("api_key", "password", "token", "secret", "authorization")):
redacted[key] = "***redacted***"
else:
redacted[key] = value
return redacted
def _log_http_debug(method, url, req_id, payload, status, text, resp_json, elapsed_ms):
if not _http_debug_enabled():
return
payload_summary = _redact_payload(payload)
print(f"[HTTP DEBUG] {method} {url} req_id={req_id} elapsed_ms={elapsed_ms:.1f}")
print(f"[HTTP DEBUG] request_payload={json.dumps(payload_summary, default=str)}")
print(f"[HTTP DEBUG] status={status}")
print(f"[HTTP DEBUG] response_text={text}")
print(f"[HTTP DEBUG] response_json={json.dumps(resp_json, default=str) if resp_json is not None else None}")
def api_stats(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{API_APP_URL}/stats", headers=headers, auth=auth, params=params)
return res.json()
# SYSTEM APP
def system_new_token(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{SYSTEM_API_URL}/tokens", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def system_token_list(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{SYSTEM_API_URL}/tokens", headers=headers, auth=auth, params=params)
return res.json()
def system_delete_token(auth, token, *, headers=HEADERS):
res = requests.delete(url=f"{HOST_ADDRESS}{SYSTEM_API_URL}/tokens/{token}", headers=headers, auth=auth)
return res.json()
def system_status(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{SYSTEM_API_URL}/status", headers=headers, auth=auth, params=params)
return res.json()
def system_version(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{SYSTEM_API_URL}/version", headers=headers, auth=auth, params=params)
return res.json()
def system_config(auth=None, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{SYSTEM_API_URL}/config", headers=headers, auth=auth, params=params)
return res.json()
# LLM APP
def llm_factories(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{LLM_APP_URL}/factories", headers=headers, auth=auth, params=params)
return res.json()
def llm_list(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{LLM_APP_URL}/list", headers=headers, auth=auth, params=params)
return res.json()
# PLUGIN APP
def plugin_llm_tools(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{PLUGIN_APP_URL}/tools", headers=headers, auth=auth, params=params)
return res.json()
# SEARCH APP
def search_create(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{SEARCHES_URL}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def search_update(auth, search_id, payload=None, *, headers=HEADERS, data=None):
res = requests.put(url=f"{HOST_ADDRESS}{SEARCHES_URL}/{search_id}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def search_detail(auth, search_id, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{SEARCHES_URL}/{search_id}", headers=headers, auth=auth)
return res.json()
def search_list(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{SEARCHES_URL}", headers=headers, auth=auth, params=params)
return res.json()
def search_rm(auth, search_id, *, headers=HEADERS):
res = requests.delete(url=f"{HOST_ADDRESS}{SEARCHES_URL}/{search_id}", headers=headers, auth=auth)
return res.json()
# CHAT APP
def create_chat(auth, payload=None, *, headers=HEADERS, data=None):
if payload is None:
payload = {}
res = requests.post(url=f"{HOST_ADDRESS}{CHATS_URL}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def list_chats(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{CHATS_URL}", headers=headers, auth=auth, params=params)
return res.json()
def delete_chat(auth, chat_id, *, headers=HEADERS):
res = requests.delete(url=f"{HOST_ADDRESS}{CHATS_URL}/{chat_id}", headers=headers, auth=auth)
return res.json()
def delete_chats(auth, payload=None, *, headers=HEADERS, data=None):
if payload is None:
payload = {"delete_all": True}
res = requests.delete(url=f"{HOST_ADDRESS}{CHATS_URL}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def batch_create_chats(auth, num):
ids = []
for i in range(num):
res = create_chat(auth, {"name": f"chat_{uuid4().hex}_{i}"})
ids.append(res["data"]["id"])
return ids
# KB APP
def create_dataset(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{DATASETS_URL}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def list_datasets(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}", headers=headers, auth=auth, params=params)
return res.json()
def update_dataset(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.put(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def delete_datasets(auth, payload=None, *, headers=HEADERS, data=None):
"""
Delete datasets.
The endpoint is DELETE /api/{VERSION}/datasets with payload {"ids": [...]}
This is the standard SDK REST API endpoint for dataset deletion.
"""
res = requests.delete(url=f"{HOST_ADDRESS}{DATASETS_URL}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def detail_kb(auth, dataset_id, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}", headers=headers, auth=auth)
return res.json()
def kb_get_meta(auth, dataset_ids, *, headers=HEADERS):
params = {"dataset_ids": dataset_ids}
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/metadata/flattened", headers=headers, auth=auth, params=params)
return res.json()
def kb_basic_info(auth, dataset_id, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/ingestions/summary", headers=headers, auth=auth)
return res.json()
def kb_update_metadata_setting(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.put(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/metadata/config", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def kb_list_pipeline_logs(auth, dataset_id, params=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/ingestions"
res = requests.get(url=url, headers=headers, auth=auth, params=params)
return res.json()
def kb_list_pipeline_dataset_logs(auth, dataset_id, params=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/ingestions"
res = requests.get(url=url, headers=headers, auth=auth, params=params)
return res.json()
def kb_pipeline_log_detail(auth, dataset_id, log_id, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/ingestions/{log_id}", headers=headers, auth=auth)
return res.json()
def list_tags_from_kbs(auth, dataset_ids, *, headers=HEADERS):
params = {"dataset_ids": dataset_ids}
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/tags/aggregation", headers=headers, auth=auth, params=params)
return res.json()
def list_tags(auth, dataset_id, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/tags", headers=headers, auth=auth)
return res.json()
def rm_tags(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.delete(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/tags", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def rename_tags(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.put(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/tags", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def batch_create_datasets(auth, num):
ids = []
for i in range(num):
res = create_dataset(auth, {"name": f"kb_{i}"})
ids.append(res["data"]["id"])
return ids
# DOCUMENT APP
def upload_documents(auth, payload=None, files_path=None, *, filename_override=None):
# New endpoint: /api/v1/datasets/{kb_id}/documents
kb_id = payload.get("kb_id") if payload else None
url = f"{HOST_ADDRESS}/api/{VERSION}/datasets/{kb_id}/documents"
if files_path is None:
files_path = []
fields = []
file_objects = []
try:
# Note: kb_id is now in the URL path, not in the form data
if payload:
for k, v in payload.items():
if k != "kb_id": # Skip kb_id as it's in the URL
fields.append((k, str(v)))
for fp in files_path:
p = Path(fp)
f = p.open("rb")
filename = filename_override if filename_override is not None else p.name
fields.append(("file", (filename, f)))
file_objects.append(f)
m = MultipartEncoder(fields=fields)
res = requests.post(
url=url,
headers={"Content-Type": m.content_type},
auth=auth,
data=m,
)
return res.json()
finally:
for f in file_objects:
f.close()
def upload_info(auth, files_path=None, *, url=None):
"""
Call the /api/v1/documents/upload endpoint to get upload info.
This is used to get file metadata before actually uploading to a dataset.
Args:
auth: Authentication object
files_path: List of file paths to upload (optional)
url: URL to fetch file from (optional, can be used alone or with files_path to test mixed input rejection)
Returns:
Response JSON with upload info
"""
url_endpoint = f"{HOST_ADDRESS}/api/{VERSION}/documents/upload"
fields = []
file_objects = []
try:
if files_path:
for fp in files_path:
p = Path(fp)
f = p.open("rb")
fields.append(("file", (p.name, f)))
file_objects.append(f)
# Add url as query parameter if provided
if url:
url_endpoint = f"{url_endpoint}?url={url}"
# Handle empty fields (no files) - create empty MultipartEncoder
if not fields:
fields = [("empty", ("", ""))]
m = MultipartEncoder(fields=fields)
res = requests.post(
url=url_endpoint,
headers={"Content-Type": m.content_type},
auth=auth,
data=m,
)
return res.json()
finally:
for f in file_objects:
f.close()
def create_document(auth, payload=None, *, headers=HEADERS, data=None):
kb_id = payload.get("kb_id") if payload else None
request_payload = dict(payload or {})
request_payload.pop("kb_id", None)
res = requests.post(
url=f"{HOST_ADDRESS}{DATASETS_URL}/{kb_id}/documents?type=empty",
headers=headers,
auth=auth,
json=request_payload,
data=data,
)
return res.json()
def list_documents(auth, params=None, payload=None, *, headers=HEADERS, data=None):
kb_id = params.get("kb_id") if params else None
url = f"{HOST_ADDRESS}{DATASETS_URL}/{kb_id}/documents"
if payload is None:
payload = {}
res = requests.get(url=url, headers=headers, auth=auth, params=params, json=payload, data=data)
return res.json()
def delete_document(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
# New API: DELETE /api/v1/datasets/<dataset_id>/documents
url = f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/documents"
res = requests.delete(url=url, headers=headers, auth=auth, json=payload, data=data)
return res.json()
def parse_documents(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}/api/{VERSION}/documents/ingest", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def document_filter(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/documents?type=filter", params=payload, headers=headers, auth=auth, data=data)
return res.json()
def document_infos(auth, dataset_id, params=None, payload=None, *, headers=HEADERS, data=None):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/documents", params=params, json=payload, headers=headers, auth=auth, data=data)
return res.json()
def document_metadata_summary(auth, payload=None, *, headers=HEADERS, data=None):
dataset_id = (payload or {}).get("kb_id")
doc_ids = (payload or {}).get("doc_ids")
if not dataset_id:
return {"code": 101, "message": "KB ID is required"}
params = {}
if doc_ids:
params["doc_ids"] = ",".join(doc_ids)
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/metadata/summary", headers=headers, auth=auth, params=params, data=data)
return res.json()
def document_get(auth, document_id, *, headers=HEADERS, data=None):
res = requests.get(url=f"{HOST_ADDRESS}/api/{VERSION}/documents/{document_id}/preview", headers=headers, auth=auth, data=data)
return res
def document_download(auth, attachment_id, *, ext="markdown", headers=HEADERS, data=None):
res = requests.get(
url=f"{HOST_ADDRESS}/api/{VERSION}/documents/{attachment_id}/download",
headers=headers,
auth=auth,
params={"ext": ext},
data=data,
)
return res
def document_metadata_update(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
"""New unified API for updating document metadata.
Uses PATCH method at /api/v1/datasets/{dataset_id}/documents/metadatas
"""
res = requests.patch(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/documents/metadatas", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def document_update_metadata_setting(auth, dataset_id, doc_id, payload=None, *, headers=HEADERS, data=None):
res = requests.put(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/documents/{doc_id}/metadata/config", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def document_change_status(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
"""
Batch update document status within a dataset.
Args:
auth: Authentication credentials
dataset_id: ID of the dataset
payload: Request body containing doc_ids and status
"""
res = requests.post(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/documents/batch-update-status", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def document_update(auth, dataset_id, doc_id, payload=None, *, headers=HEADERS, data=None):
"""Update document via PATCH /api/v1/datasets/<dataset_id>/documents/<doc_id>"""
res = requests.patch(url=f"{HOST_ADDRESS}{DATASETS_URL}/{dataset_id}/documents/{doc_id}", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def document_thumbnails(auth, params=None, *, headers=HEADERS, data=None):
"""Get document thumbnails.
Args:
auth: Authentication object
params: Query parameters (e.g., {"doc_ids": ["doc1", "doc2"]})
"""
res = requests.get(url=f"{HOST_ADDRESS}/api/v1/thumbnails", params=params, headers=headers, auth=auth, data=data)
return res.json()
def bulk_upload_documents(auth, kb_id, num, tmp_path):
fps = []
for i in range(num):
fp = create_txt_file(tmp_path / f"ragflow_test_upload_{i}.txt")
fps.append(fp)
res = upload_documents(auth, {"kb_id": kb_id}, fps)
document_ids = []
for document in res["data"]:
document_ids.append(document["id"])
return document_ids
# CHUNK MANAGEMENT
def add_chunk(auth, dataset_id, document_id, payload=None, *, headers=HEADERS, data=None):
url = f"{HOST_ADDRESS}{CHUNK_API_URL}".format(dataset_id=dataset_id, document_id=document_id)
res = requests.post(url=url, headers=headers, auth=auth, json=payload, data=data)
return res.json()
def list_chunks(auth, dataset_id, document_id, params=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{CHUNK_API_URL}".format(dataset_id=dataset_id, document_id=document_id)
res = requests.get(url=url, headers=headers, auth=auth, params=params)
return res.json()
def get_chunk(auth, dataset_id, document_id, chunk_id, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{CHUNK_API_URL}/{chunk_id}".format(dataset_id=dataset_id, document_id=document_id)
res = requests.get(url=url, headers=headers, auth=auth)
return res.json()
def update_chunk(auth, dataset_id, document_id, chunk_id, payload=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{CHUNK_API_URL}/{chunk_id}".format(dataset_id=dataset_id, document_id=document_id)
res = requests.patch(url=url, headers=headers, auth=auth, json=payload)
return res.json()
def switch_chunks(auth, dataset_id, document_id, payload=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{CHUNK_API_URL}".format(dataset_id=dataset_id, document_id=document_id)
res = requests.patch(url=url, headers=headers, auth=auth, json=payload)
return res.json()
def delete_chunks(auth, dataset_id, document_id, payload=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{CHUNK_API_URL}".format(dataset_id=dataset_id, document_id=document_id)
res = requests.delete(url=url, headers=headers, auth=auth, json=payload)
return res.json()
def batch_add_chunks(auth, dataset_id, document_id, num):
chunk_ids = []
for i in range(num):
res = add_chunk(auth, dataset_id, document_id, {"content": f"chunk test {i}"})
chunk_ids.append(res["data"]["chunk"]["id"])
return chunk_ids
# MEMORY APP
def create_memory(auth, payload=None):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}"
res = requests.post(url=url, headers=HEADERS, auth=auth, json=payload)
return res.json()
def update_memory(auth, memory_id:str, payload=None):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/{memory_id}"
res = requests.put(url=url, headers=HEADERS, auth=auth, json=payload)
return res.json()
def delete_memory(auth, memory_id:str):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/{memory_id}"
res = requests.delete(url=url, headers=HEADERS, auth=auth)
return res.json()
def list_memory(auth, params=None):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}"
if params:
query_parts = []
for key, value in params.items():
if isinstance(value, list):
for item in value:
query_parts.append(f"{key}={item}")
else:
query_parts.append(f"{key}={value}")
query_string = "&".join(query_parts)
url = f"{url}?{query_string}"
res = requests.get(url=url, headers=HEADERS, auth=auth)
return res.json()
def get_memory_config(auth, memory_id:str):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/{memory_id}/config"
res = requests.get(url=url, headers=HEADERS, auth=auth)
return res.json()
def list_memory_message(auth, memory_id, params=None):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/{memory_id}"
if params:
query_parts = []
for key, value in params.items():
if isinstance(value, list):
for item in value:
query_parts.append(f"{key}={item}")
else:
query_parts.append(f"{key}={value}")
query_string = "&".join(query_parts)
url = f"{url}?{query_string}"
res = requests.get(url=url, headers=HEADERS, auth=auth)
return res.json()
def add_message(auth, payload=None):
url = f"{HOST_ADDRESS}{MESSAGE_API_URL}"
res = requests.post(url=url, headers=HEADERS, auth=auth, json=payload)
return res.json()
def forget_message(auth, memory_id: str, message_id: int):
url = f"{HOST_ADDRESS}{MESSAGE_API_URL}/{memory_id}:{message_id}"
res = requests.delete(url=url, headers=HEADERS, auth=auth)
return res.json()
def update_message_status(auth, memory_id: str, message_id: int, status: bool):
url = f"{HOST_ADDRESS}{MESSAGE_API_URL}/{memory_id}:{message_id}"
payload = {"status": status}
res = requests.put(url=url, headers=HEADERS, auth=auth, json=payload)
return res.json()
def search_message(auth, params=None):
url = f"{HOST_ADDRESS}{MESSAGE_API_URL}/search"
if params:
query_parts = []
for key, value in params.items():
if isinstance(value, list):
for item in value:
query_parts.append(f"{key}={item}")
else:
query_parts.append(f"{key}={value}")
query_string = "&".join(query_parts)
url = f"{url}?{query_string}"
res = requests.get(url=url, headers=HEADERS, auth=auth)
return res.json()
def get_recent_message(auth, params=None):
url = f"{HOST_ADDRESS}{MESSAGE_API_URL}"
if params:
query_parts = []
for key, value in params.items():
if isinstance(value, list):
for item in value:
query_parts.append(f"{key}={item}")
else:
query_parts.append(f"{key}={value}")
query_string = "&".join(query_parts)
url = f"{url}?{query_string}"
res = requests.get(url=url, headers=HEADERS, auth=auth)
return res.json()
def get_message_content(auth, memory_id: str, message_id: int):
url = f"{HOST_ADDRESS}{MESSAGE_API_URL}/{memory_id}:{message_id}/content"
res = requests.get(url=url, headers=HEADERS, auth=auth)
return res.json()