--
-- APISIX Plugin: claude2openai
-- Reference implementation: Higress ai-proxy provider/claude_to_openai.go
--
-- Features:
-- 1. Request phase: convert Claude Messages API requests into OpenAI Chat Completions requests
-- 2. Response phase: convert OpenAI Chat Completions responses into Claude Messages API responses
--    Supports: non-streaming, streaming, thinking, tool_use, multimodal images
--

local core = require("apisix.core")
local cjson = require("cjson.safe")
local ngx = ngx
local pairs = pairs
local ipairs = ipairs
local type = type
local table_insert = table.insert
local table_concat = table.concat
local string_sub = string.sub
local string_find = string.find
local ngx_now = ngx.now
local plugin_name = "jiankunking-claude2openai"
local schema = {
    type = "object",
    properties = {
        target_model = {
            type = "string",
            description = "Override the model name sent to OpenAI backend"
        },
    },
}
local _M = {
    version = 0.1,
    priority = 2650,  -- runs before consumer-restriction (2640) and key-auth (2500)
    name = plugin_name,
    schema = schema,
}
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
------------------------------------------------------------------------
-- Utility functions
------------------------------------------------------------------------

-- OpenAI finish_reason -> Claude stop_reason (see Higress openAIFinishReasonToClaude)
local function openai_finish_reason_to_claude(reason)
    if not reason or reason == cjson.null then
        return "end_turn"
    end
    local mapping = {
        stop = "end_turn",
        length = "max_tokens",
        tool_calls = "tool_use",
        content_filter = "end_turn",
    }
    return mapping[reason] or reason
end

-- Strip the dynamic "cch" field from x-anthropic-billing-header text so that
-- prompt caching stays effective (see Higress stripCchFromBillingHeader).
-- The cch value changes on every request; leaving it in would invalidate the cache.
local function strip_cch_from_billing_header(text)
    if type(text) ~= "string" then
        return text
    end
    local prefix = "x-anthropic-billing-header:"
    if string_sub(text, 1, #prefix) ~= prefix then
        return text
    end
    -- repeatedly remove every "; cch=xxx" segment
    local result = text
    while true do
        local cch_start = string_find(result, "; cch=", 1, true)
        if not cch_start then
            break
        end
        local after = cch_start + 2  -- skip "; "
        local semi_pos = string_find(result, ";", after, true)
        if not semi_pos then
            -- cch is the last segment: truncate
            result = string_sub(result, 1, cch_start - 1)
        else
            -- more segments follow: cut out the "; cch=xxx" part
            result = string_sub(result, 1, cch_start - 1) .. string_sub(result, semi_pos)
        end
    end
    return result
end

-- Extract the plain-text parts from a Claude content blocks array
local function extract_text_parts(content_array)
    local parts = {}
    if type(content_array) ~= "table" then
        return parts
    end
    for _, block in ipairs(content_array) do
        if block.type == "text" and block.text then
            table_insert(parts, block.text)
        end
    end
    return parts
end
------------------------------------------------------------------------
-- Request conversion: Claude Messages -> OpenAI Chat Completions
-- See Higress ConvertClaudeRequestToOpenAI
------------------------------------------------------------------------

-- Convert a Claude content blocks array into OpenAI format.
-- Returns { text_parts, tool_calls, tool_results, openai_contents }
local function convert_content_array(content_array)
    local result = {
        text_parts = {},
        tool_calls = {},
        tool_results = {},
        openai_contents = {},
    }
    if type(content_array) ~= "table" then
        return result
    end

    for _, block in ipairs(content_array) do
        if block.type == "text" and block.text then
            local processed_text = strip_cch_from_billing_header(block.text)
            table_insert(result.text_parts, processed_text)
            local openai_content = { type = "text", text = processed_text }
            -- pass cache_control through to support prompt caching
            if block.cache_control then
                openai_content.cache_control = block.cache_control
            end
            table_insert(result.openai_contents, openai_content)

        elseif (block.type == "image" or block.type == "document") and block.source then
            -- Claude: {type:"image"/"document", source:{type:"base64", media_type:"...", data:"..."}}
            -- OpenAI: {type:"image_url", image_url:{url:"data:...;base64,..."}}
            local img_obj
            if block.source.type == "base64" then
                img_obj = {
                    type = "image_url",
                    image_url = {
                        url = "data:" .. (block.source.media_type or "image/png")
                              .. ";base64," .. block.source.data
                    }
                }
            elseif block.source.type == "url" then
                img_obj = { type = "image_url", image_url = { url = block.source.url } }
            end
            if img_obj then
                if block.cache_control then
                    img_obj.cache_control = block.cache_control
                end
                table_insert(result.openai_contents, img_obj)
            end

        elseif block.type == "thinking" and block.thinking then
            -- thinking block in a Claude assistant history message -> OpenAI thinking block
            local thinking_obj = {
                type = "thinking",
                thinking = block.thinking,
                signature = block.signature or ""
            }
            table_insert(result.openai_contents, thinking_obj)

        elseif block.type == "redacted_thinking" then
            -- Claude redacted_thinking block -> pass through
            table_insert(result.openai_contents, {
                type = "redacted_thinking",
                data = block.data or ""
            })

        elseif block.type == "tool_use" then
            -- Claude tool_use -> OpenAI tool_calls
            if block.id and block.name then
                table_insert(result.tool_calls, {
                    id = block.id,
                    type = "function",
                    ["function"] = {
                        name = block.name,
                        arguments = cjson.encode(block.input or {}) or "{}"
                    }
                })
                core.log.debug("[Claude->OpenAI] Converted tool_use to tool_call: ", block.name)
            end

        elseif block.type == "tool_result" then
            table_insert(result.tool_results, block)
            core.log.debug("[Claude->OpenAI] Found tool_result for tool_use_id: ",
                           block.tool_use_id or "")
        end
    end

    return result
end
local function convert_claude_request_to_openai(claude_body, conf)
    core.log.debug("[Claude->OpenAI] Original Claude request body: ", cjson.encode(claude_body))

    local openai_body = {
        model = conf.target_model or claude_body.model,
        max_tokens = claude_body.max_tokens,
        temperature = claude_body.temperature,
        top_p = claude_body.top_p,
        stop = claude_body.stop_sequences,
        stream = claude_body.stream,
    }

    if openai_body.stream then
        openai_body.stream_options = { include_usage = true }
    end

    local messages = {}

    -- system: Claude top-level field -> OpenAI system message (prepended to messages)
    if claude_body.system then
        if type(claude_body.system) == "string" then
            table_insert(messages, {
                role = "system",
                content = strip_cch_from_billing_header(claude_body.system)
            })
        elseif type(claude_body.system) == "table" then
            -- system may be a content blocks array.
            -- Following litellm: filter out blocks starting with "x-anthropic-billing-header:"
            -- and skip empty text blocks as well.
            local billing_prefix = "x-anthropic-billing-header:"
            local sys_contents = {}
            for _, block in ipairs(claude_body.system) do
                if block.type == "text" and block.text then
                    if block.text == "" then
                        -- skip empty text (the Anthropic API rejects it)
                    elseif string_sub(block.text, 1, #billing_prefix) == billing_prefix then
                        -- skip billing header metadata
                        core.log.debug("[Claude->OpenAI] Skipping billing header system block")
                    else
                        local sys_block = {
                            type = "text",
                            text = strip_cch_from_billing_header(block.text)
                        }
                        if block.cache_control then
                            sys_block.cache_control = block.cache_control
                        end
                        table_insert(sys_contents, sys_block)
                    end
                end
            end
            if #sys_contents > 0 then
                table_insert(messages, { role = "system", content = sys_contents })
            end
        end
    end
    -- convert messages
    if claude_body.messages then
        for _, msg in ipairs(claude_body.messages) do
            if type(msg.content) == "string" then
                -- plain text
                table_insert(messages, { role = msg.role, content = msg.content })
            elseif type(msg.content) == "table" then
                local conv = convert_content_array(msg.content)

                -- has tool_calls (tool_use in an assistant message)
                if #conv.tool_calls > 0 then
                    local openai_msg = {
                        role = msg.role,
                        tool_calls = conv.tool_calls,
                    }
                    if #conv.text_parts > 0 then
                        openai_msg.content = table_concat(conv.text_parts, "\n\n")
                    else
                        openai_msg.content = cjson.null
                    end
                    table_insert(messages, openai_msg)
                end

                -- has tool_results (tool_result in a user message)
                if #conv.tool_results > 0 then
                    for _, tr in ipairs(conv.tool_results) do
                        local tool_content
                        if type(tr.content) == "string" then
                            tool_content = tr.content
                        elseif type(tr.content) == "table" then
                            -- Following litellm: tool_result.content may mix text and image parts
                            local content_parts = {}
                            local has_non_text = false
                            for _, c in ipairs(tr.content) do
                                if type(c) == "string" then
                                    table_insert(content_parts, { type = "text", text = c })
                                elseif type(c) == "table" then
                                    if c.type == "text" then
                                        table_insert(content_parts, { type = "text", text = c.text or "" })
                                    elseif (c.type == "image" or c.type == "document") and c.source then
                                        has_non_text = true
                                        if c.source.type == "base64" then
                                            table_insert(content_parts, {
                                                type = "image_url",
                                                image_url = {
                                                    url = "data:" .. (c.source.media_type or "image/png")
                                                          .. ";base64," .. c.source.data
                                                }
                                            })
                                        elseif c.source.type == "url" then
                                            table_insert(content_parts, {
                                                type = "image_url",
                                                image_url = { url = c.source.url }
                                            })
                                        end
                                    end
                                end
                            end
                            -- text only: merge into a plain string (better compatibility)
                            if not has_non_text then
                                local texts = {}
                                for _, p in ipairs(content_parts) do
                                    if p.type == "text" then
                                        table_insert(texts, p.text)
                                    end
                                end
                                tool_content = table_concat(texts, "\n")
                            else
                                -- mixed content (text + image): keep the array form
                                tool_content = content_parts
                            end
                        else
                            tool_content = ""
                        end
                        table_insert(messages, {
                            role = "tool",
                            tool_call_id = tr.tool_use_id,
                            content = tool_content
                        })
                    end
                    -- text blocks may sit alongside the tool_result blocks
                    if #conv.text_parts > 0 then
                        table_insert(messages, {
                            role = msg.role,
                            content = table_concat(conv.text_parts, "\n\n")
                        })
                    end
                end

                -- plain content (no tool_calls and no tool_results)
                if #conv.tool_calls == 0 and #conv.tool_results == 0 then
                    -- a single text block simplifies to a plain string
                    if #conv.openai_contents == 1 and conv.openai_contents[1].type == "text" then
                        table_insert(messages, {
                            role = msg.role,
                            content = conv.openai_contents[1].text
                        })
                    else
                        table_insert(messages, { role = msg.role, content = conv.openai_contents })
                    end
                end
            end
        end
    end

    openai_body.messages = messages
    -- tools: Claude {name, description, input_schema} -> OpenAI {type, function}
    if claude_body.tools then
        local openai_tools = {}
        for _, tool in ipairs(claude_body.tools) do
            table_insert(openai_tools, {
                type = "function",
                ["function"] = {
                    name = tool.name,
                    description = tool.description,
                    parameters = tool.input_schema
                }
            })
        end
        openai_body.tools = openai_tools
    end

    -- tool_choice conversion
    if claude_body.tool_choice then
        local tc = claude_body.tool_choice
        if tc.type == "auto" then
            openai_body.tool_choice = "auto"
        elseif tc.type == "any" then
            openai_body.tool_choice = "required"
        elseif tc.type == "tool" and tc.name then
            openai_body.tool_choice = {
                type = "function",
                ["function"] = { name = tc.name }
            }
        end
        -- parallel_tool_calls
        if tc.disable_parallel_tool_use ~= nil then
            openai_body.parallel_tool_calls = not tc.disable_parallel_tool_use
        end
    end

    -- thinking: Claude {type:"enabled", budget_tokens:N} -> OpenAI reasoning parameters
    if claude_body.thinking then
        core.log.debug("[Claude->OpenAI] Found thinking config: type=", claude_body.thinking.type,
                       ", budget_tokens=", claude_body.thinking.budget_tokens or 0)
        if claude_body.thinking.type == "enabled" then
            local budget = claude_body.thinking.budget_tokens or 8192
            openai_body.reasoning_max_tokens = budget
            if budget < 4096 then
                openai_body.reasoning_effort = "low"
            elseif budget < 16384 then
                openai_body.reasoning_effort = "medium"
            else
                openai_body.reasoning_effort = "high"
            end
            core.log.debug("[Claude->OpenAI] Converted thinking config: budget_tokens=", budget,
                           ", reasoning_effort=", openai_body.reasoning_effort,
                           ", reasoning_max_tokens=", openai_body.reasoning_max_tokens)
        end
    end

    -- output_format: Claude {type:"json_schema", schema:{...}} -> OpenAI response_format
    -- (see litellm translate_anthropic_output_format_to_openai)
    if claude_body.output_format and type(claude_body.output_format) == "table" then
        if claude_body.output_format.type == "json_schema" and claude_body.output_format.schema then
            openai_body.response_format = {
                type = "json_schema",
                json_schema = {
                    name = "structured_output",
                    schema = claude_body.output_format.schema,
                    strict = true,
                }
            }
            core.log.debug("[Claude->OpenAI] Converted output_format to response_format: json_schema")
        end
    end

    core.log.debug("[Claude->OpenAI] Converted OpenAI request body: ", cjson.encode(openai_body))
    return openai_body
end
------------------------------------------------------------------------
-- Non-streaming response conversion: OpenAI -> Claude
-- See Higress ConvertOpenAIResponseToClaude
------------------------------------------------------------------------

local function convert_openai_response_to_claude(openai_body, conf)
    core.log.debug("[OpenAI->Claude] Original OpenAI response body: ", cjson.encode(openai_body))

    -- When upstream returns an error object, pass the real error message through.
    -- Note: some models (e.g. GLM) may omit the error field entirely, while others
    -- return null, so cjson.null must be excluded explicitly.
    if openai_body and openai_body.error and openai_body.error ~= cjson.null then
        local err_msg = "Unknown upstream error"
        local err_type = "api_error"
        if type(openai_body.error) == "table" then
            err_msg = openai_body.error.message or cjson.encode(openai_body.error)
            err_type = openai_body.error.type or "api_error"
        elseif type(openai_body.error) == "string" then
            err_msg = openai_body.error
        end
        core.log.debug("[OpenAI->Claude] Upstream returned error: ", err_msg)
        return {
            type = "error",
            error = {
                type = err_type,
                message = err_msg,
            }
        }
    end

    if not openai_body or not openai_body.choices
            or type(openai_body.choices) ~= "table" or #openai_body.choices == 0 then
        return {
            type = "error",
            error = { type = "api_error", message = "Empty response from upstream" }
        }
    end

    local choice = openai_body.choices[1]
    local msg = choice.message or {}
    local content = {}
    -- reasoning/thinking content (accept both reasoning_content and reasoning field names)
    -- Note: some models (e.g. GLM) return "reasoning_content": null; cjson.null is
    -- truthy in Lua, so filter with type()
    local rc = msg.reasoning_content
    local r = msg.reasoning
    local reasoning = (type(rc) == "string" and rc ~= "" and rc)
                      or (type(r) == "string" and r ~= "" and r) or nil
    if reasoning then
        core.log.debug("[OpenAI->Claude] Added thinking content: ", string_sub(reasoning, 1, 200))
        table_insert(content, {
            type = "thinking",
            thinking = reasoning,
            signature = "",  -- the Claude protocol requires a signature field
        })
    end

    -- text content
    -- Note: some models (e.g. GLM) may return "content": null; filter cjson.null with type()
    if type(msg.content) == "string" and msg.content ~= "" then
        table_insert(content, { type = "text", text = msg.content })
    end

    -- tool_calls -> tool_use content blocks
    if msg.tool_calls and type(msg.tool_calls) == "table" then
        for _, tc in ipairs(msg.tool_calls) do
            local input = {}
            if tc["function"] and tc["function"].arguments then
                input = cjson.decode(tc["function"].arguments) or {}
            end
            table_insert(content, {
                type = "tool_use",
                id = tc.id,
                name = tc["function"] and tc["function"].name or "",
                input = input
            })
        end
    end

    -- fall back to an empty text block when content is empty
    if #content == 0 then
        table_insert(content, { type = "text", text = "" })
    end

    -- build the Claude response
    local stop_reason = openai_finish_reason_to_claude(choice.finish_reason)

    -- build usage (following litellm: input_tokens = prompt_tokens - cached_tokens)
    local claude_usage = {
        input_tokens = 0,
        output_tokens = 0,
    }
    if type(openai_body.usage) == "table" then
        local prompt_tokens = openai_body.usage.prompt_tokens or 0
        local cached_tokens = 0
        if type(openai_body.usage.prompt_tokens_details) == "table"
                and openai_body.usage.prompt_tokens_details.cached_tokens then
            cached_tokens = openai_body.usage.prompt_tokens_details.cached_tokens
        end
        -- Claude protocol: input_tokens counts only the uncached tokens
        claude_usage.input_tokens = prompt_tokens - cached_tokens
        claude_usage.output_tokens = openai_body.usage.completion_tokens or 0
        -- cache token fields
        if cached_tokens > 0 then
            claude_usage.cache_read_input_tokens = cached_tokens
        end
        -- cache_creation_input_tokens (when upstream reports it)
        if type(openai_body.usage.prompt_tokens_details) == "table"
                and openai_body.usage.prompt_tokens_details.cache_creation_input_tokens then
            claude_usage.cache_creation_input_tokens =
                openai_body.usage.prompt_tokens_details.cache_creation_input_tokens
        end
    end

    local claude_resp = {
        id = openai_body.id or ("msg_" .. ngx_now()),
        type = "message",
        role = "assistant",
        content = content,
        model = openai_body.model or "unknown",
        stop_reason = stop_reason,
        stop_sequence = cjson.null,
        usage = claude_usage,
    }

    core.log.debug("[OpenAI->Claude] Converted Claude response body: ", cjson.encode(claude_resp))
    return claude_resp
end
------------------------------------------------------------------------
-- Streaming response conversion: OpenAI SSE -> Claude SSE
-- See Higress ConvertOpenAIStreamResponseToClaude + buildClaudeStreamResponse
--
-- Key design points (aligned with Higress):
-- 1. nextContentIndex assigns content block indexes incrementally
-- 2. pendingStopReason holds the stop_reason until usage arrives, then both are sent
-- 3. tool_calls use an activeToolIndex + cachedArguments serialization scheme
-- 4. thinking blocks carry a signature field
------------------------------------------------------------------------

-- Create streaming converter state (mirrors the state fields of Higress ClaudeToOpenAIConverter)
local function new_stream_ctx()
    return {
        message_start_sent = false,
        message_stop_sent = false,
        message_id = nil,
        -- content block indexes are assigned incrementally
        next_content_index = 0,
        -- thinking block
        thinking_started = false,
        thinking_stopped = false,
        thinking_index = -1,
        -- text block
        text_started = false,
        text_stopped = false,
        text_index = -1,
        -- tool call state tracking
        tool_call_states = {},  -- map[openai_index] = {id, name, claude_index, started, stopped, cached_args}
        active_tool_index = nil,  -- index of the currently active tool call
        -- cached stop_reason
        pending_stop_reason = nil,
        -- usage
        output_tokens = 0,
    }
end
-- Start a tool call content block (see Higress startToolCall)
local function start_tool_call(sctx, tool_state, conf)
    local events = {}

    -- close the thinking block
    if sctx.thinking_started and not sctx.thinking_stopped then
        sctx.thinking_stopped = true
        core.log.debug("[OpenAI->Claude] Closing thinking content block before tool use")
        table_insert(events, {
            event = "content_block_stop",
            data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
        })
    end

    -- close the text block
    if sctx.text_started and not sctx.text_stopped then
        sctx.text_stopped = true
        core.log.debug("[OpenAI->Claude] Closing text content block before tool use")
        table_insert(events, {
            event = "content_block_stop",
            data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
        })
    end

    -- assign the Claude content index
    tool_state.claude_index = sctx.next_content_index
    sctx.next_content_index = sctx.next_content_index + 1
    tool_state.started = true

    core.log.debug("[OpenAI->Claude] Started tool call: Claude index=", tool_state.claude_index,
                   ", id=", tool_state.id, ", name=", tool_state.name)

    -- content_block_start
    table_insert(events, {
        event = "content_block_start",
        data = cjson.encode({
            type = "content_block_start",
            index = tool_state.claude_index,
            content_block = {
                type = "tool_use",
                id = tool_state.id,
                name = tool_state.name,
                input = {}
            }
        })
    })

    -- emit the cached arguments
    if tool_state.cached_args and tool_state.cached_args ~= "" then
        core.log.debug("[OpenAI->Claude] Outputting cached arguments for tool: ", tool_state.cached_args)
        table_insert(events, {
            event = "content_block_delta",
            data = cjson.encode({
                type = "content_block_delta",
                index = tool_state.claude_index,
                delta = { type = "input_json_delta", partial_json = tool_state.cached_args }
            })
        })
    end

    return events
end
-- Process a single OpenAI SSE data chunk and return a list of Claude SSE events
-- (see Higress buildClaudeStreamResponse)
local function build_claude_stream_events(sctx, openai_chunk, conf)
    local events = {}

    local choice
    if openai_chunk.choices and type(openai_chunk.choices) == "table" and #openai_chunk.choices > 0 then
        choice = openai_chunk.choices[1]
    else
        choice = { index = 0, delta = {} }
    end
    local delta = choice.delta or {}

    -- log what the current chunk contains
    local has_role = type(delta.role) == "string" and delta.role ~= ""
    local has_content = type(delta.content) == "string" and delta.content ~= ""
    local has_finish = choice.finish_reason ~= nil and choice.finish_reason ~= cjson.null
    local has_usage = type(openai_chunk.usage) == "table"
    local has_reasoning = (type(delta.reasoning_content) == "string" and delta.reasoning_content ~= "")
                          or (type(delta.reasoning) == "string" and delta.reasoning ~= "")
    local has_tool_calls = delta.tool_calls ~= nil and delta.tool_calls ~= cjson.null
    core.log.debug("[OpenAI->Claude] Processing OpenAI chunk - Role: ", tostring(has_role),
                   ", Content: ", tostring(has_content),
                   ", Reasoning: ", tostring(has_reasoning),
                   ", ToolCalls: ", tostring(has_tool_calls),
                   ", FinishReason: ", tostring(has_finish),
                   ", Usage: ", tostring(has_usage))
    -- message_start (sent only once)
    -- Note: some models (e.g. GLM) send role as null (cjson.null) in later chunks; filter with type()
    if type(delta.role) == "string" and delta.role ~= "" and not sctx.message_start_sent then
        sctx.message_start_sent = true
        sctx.message_id = openai_chunk.id

        core.log.debug("[OpenAI->Claude] Generated message_start event for id: ", openai_chunk.id)

        local msg_start = {
            type = "message_start",
            message = {
                id = openai_chunk.id or ("msg_" .. ngx_now()),
                type = "message",
                role = "assistant",
                content = {},
                model = openai_chunk.model or "unknown",
                stop_reason = cjson.null,
                stop_sequence = cjson.null,
                usage = {
                    input_tokens = 0,
                    output_tokens = 0,
                    cache_creation_input_tokens = 0,
                    cache_read_input_tokens = 0,
                }
            }
        }
        -- the first chunk may already carry usage
        if type(openai_chunk.usage) == "table" then
            msg_start.message.usage.input_tokens = openai_chunk.usage.prompt_tokens or 0
        end
        table_insert(events, { event = "message_start", data = cjson.encode(msg_start) })
    elseif type(delta.role) == "string" and delta.role ~= "" and sctx.message_start_sent then
        -- skip duplicate role messages from OpenRouter
        core.log.debug("[OpenAI->Claude] Skipping duplicate role message for id: ", openai_chunk.id)
    end
    -- reasoning content (thinking)
    -- Note: some models (e.g. Kimi-K2.5) interleave reasoning_content and content in the stream:
    -- a chunk may carry both, and reasoning_content may still arrive after content has appeared.
    -- So the thinking block must not be closed as soon as content shows up; closing is deferred
    -- to the finish_reason handling.
    -- Note: some models (e.g. GLM) send "reasoning_content": null on every chunk; cjson.null is
    -- truthy and ~= "" holds for it, so filter with type() == "string"
    local rc = delta.reasoning_content
    local r = delta.reasoning
    local reasoning = (type(rc) == "string" and rc ~= "" and rc)
                      or (type(r) == "string" and r ~= "" and r) or nil
    if reasoning then
        if not sctx.thinking_started then
            sctx.thinking_index = sctx.next_content_index
            sctx.next_content_index = sctx.next_content_index + 1
            sctx.thinking_started = true
            table_insert(events, {
                event = "content_block_start",
                data = cjson.encode({
                    type = "content_block_start",
                    index = sctx.thinking_index,
                    content_block = { type = "thinking", thinking = "", signature = "" }
                })
            })
        end
        table_insert(events, {
            event = "content_block_delta",
            data = cjson.encode({
                type = "content_block_delta",
                index = sctx.thinking_index,
                delta = { type = "thinking_delta", thinking = reasoning }
            })
        })
    end

    -- text content
    -- Do not close the thinking block here: later chunks may still carry reasoning_content
    -- (Kimi-K2.5 interleaving). The thinking block is closed in the finish_reason handling.
    -- Note: some models (e.g. GLM) send "content": null on every chunk; filter cjson.null
    -- with type() == "string"
    if type(delta.content) == "string" and delta.content ~= "" then
        if not sctx.text_started then
            sctx.text_index = sctx.next_content_index
            sctx.next_content_index = sctx.next_content_index + 1
            sctx.text_started = true
            table_insert(events, {
                event = "content_block_start",
                data = cjson.encode({
                    type = "content_block_start",
                    index = sctx.text_index,
                    content_block = { type = "text", text = "" }
                })
            })
        end
        table_insert(events, {
            event = "content_block_delta",
            data = cjson.encode({
                type = "content_block_delta",
                index = sctx.text_index,
                delta = { type = "text_delta", text = delta.content }
            })
        })
    end
    -- tool_calls (see Higress's activeToolIndex + cachedArguments scheme)
    if delta.tool_calls and type(delta.tool_calls) == "table" then
        -- make sure message_start has been sent
        if not sctx.message_start_sent then
            sctx.message_start_sent = true
            sctx.message_id = openai_chunk.id
            core.log.debug("[OpenAI->Claude] Generated message_start event before tool calls for id: ",
                           openai_chunk.id)
            table_insert(events, {
                event = "message_start",
                data = cjson.encode({
                    type = "message_start",
                    message = {
                        id = openai_chunk.id or ("msg_" .. ngx_now()),
                        type = "message",
                        role = "assistant",
                        content = {},
                        model = openai_chunk.model or "unknown",
                        stop_reason = cjson.null,
                        stop_sequence = cjson.null,
                        usage = { input_tokens = 0, output_tokens = 0 }
                    }
                })
            })
        end

        for _, tc in ipairs(delta.tool_calls) do
            local idx = tc.index or 0

            -- a new tool call (has both id and name)
            if tc.id and tc.id ~= "" and tc["function"] and tc["function"].name
                    and tc["function"].name ~= "" then
                core.log.debug("[OpenAI->Claude] Processing tool call delta: index=", idx,
                               ", id=", tc.id, ", name=", tc["function"].name,
                               ", args=", tc["function"].arguments or "")
                if not sctx.tool_call_states[idx] then
                    sctx.tool_call_states[idx] = {
                        id = tc.id,
                        name = tc["function"].name,
                        claude_index = -1,
                        started = false,
                        stopped = false,
                        cached_args = "",
                    }
                end

                -- no tool call is active yet: start this one immediately
                if sctx.active_tool_index == nil then
                    sctx.active_tool_index = idx
                    local tc_events = start_tool_call(sctx, sctx.tool_call_states[idx], conf)
                    for _, e in ipairs(tc_events) do
                        table_insert(events, e)
                    end
                end
            end

            -- incremental arguments
            if tc["function"] and tc["function"].arguments and tc["function"].arguments ~= "" then
                local state = sctx.tool_call_states[idx]
                if state then
                    state.cached_args = (state.cached_args or "") .. tc["function"].arguments
                    core.log.debug("[OpenAI->Claude] Cached arguments for tool index ", idx, ": ",
                                   tc["function"].arguments, " (total: ", state.cached_args, ")")
                    -- only the active tool call streams its delta in real time
                    if sctx.active_tool_index == idx and state.started then
                        core.log.debug("[OpenAI->Claude] Generated input_json_delta event for active tool index ",
                                       idx, ": ", tc["function"].arguments)
                        table_insert(events, {
                            event = "content_block_delta",
                            data = cjson.encode({
                                type = "content_block_delta",
                                index = state.claude_index,
                                delta = { type = "input_json_delta", partial_json = tc["function"].arguments }
                            })
                        })
                    end
                end
            end
        end
    end
    -- finish_reason handling (v0.8: exclude cjson.null -- a decoded JSON null is a truthy userdata in Lua)
    if choice.finish_reason and choice.finish_reason ~= cjson.null then
        local claude_reason = openai_finish_reason_to_claude(choice.finish_reason)
        core.log.debug("[OpenAI->Claude] Processing finish_reason: ", choice.finish_reason, " -> ", claude_reason)

        -- close all active content blocks
        if sctx.thinking_started and not sctx.thinking_stopped then
            sctx.thinking_stopped = true
            core.log.debug("[OpenAI->Claude] Generated thinking content_block_stop event at index ", sctx.thinking_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
            })
        end
        if sctx.text_started and not sctx.text_stopped then
            sctx.text_stopped = true
            core.log.debug("[OpenAI->Claude] Generated text content_block_stop event at index ", sctx.text_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
            })
        end

        -- start any tool calls that have not started yet, then close all tool
        -- calls (sorted by index)
        local sorted_indices = {}
        for k, _ in pairs(sctx.tool_call_states) do
            table_insert(sorted_indices, k)
        end
        table.sort(sorted_indices)

        for _, tidx in ipairs(sorted_indices) do
            local ts = sctx.tool_call_states[tidx]
            if not ts.started then
                core.log.debug("[OpenAI->Claude] Starting remaining tool call at finish: index=",
                               tidx, ", id=", ts.id, ", name=", ts.name)
                sctx.active_tool_index = tidx
                local tc_events = start_tool_call(sctx, ts, conf)
                for _, e in ipairs(tc_events) do
                    table_insert(events, e)
                end
                sctx.active_tool_index = nil
            end
        end
        for _, tidx in ipairs(sorted_indices) do
            local ts = sctx.tool_call_states[tidx]
            if ts.started and not ts.stopped then
                ts.stopped = true
                core.log.debug("[OpenAI->Claude] Generated content_block_stop for tool at index ",
                               tidx, ", Claude index ", ts.claude_index)
                table_insert(events, {
                    event = "content_block_stop",
                    data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
                })
            end
        end

        -- cache the stop_reason and emit it together with usage once it arrives
        -- (the Claude protocol requires them in one message_delta)
        sctx.pending_stop_reason = claude_reason
        core.log.debug("[OpenAI->Claude] Cached stop_reason: ", claude_reason, ", waiting for usage")
    end
    -- usage handling (it may arrive in the same chunk as finish_reason, or in a later chunk)
    if type(openai_chunk.usage) == "table" then
        core.log.debug("[OpenAI->Claude] Processing usage info - input: ",
                       openai_chunk.usage.prompt_tokens or 0,
                       ", output: ", openai_chunk.usage.completion_tokens or 0)

        -- models such as GLM may send a usage-only chunk without a finish_reason
        -- (choices is an empty array), leaving content blocks unclosed. Close all
        -- still-open blocks before sending message_delta.
        if sctx.thinking_started and not sctx.thinking_stopped then
            sctx.thinking_stopped = true
            core.log.debug("[OpenAI->Claude] Closing thinking block before usage, index=", sctx.thinking_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
            })
        end
        if sctx.text_started and not sctx.text_stopped then
            sctx.text_stopped = true
            core.log.debug("[OpenAI->Claude] Closing text block before usage, index=", sctx.text_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
            })
        end
        -- close any tool call blocks that are still open
        local sorted_indices = {}
        for k, _ in pairs(sctx.tool_call_states) do
            table_insert(sorted_indices, k)
        end
        table.sort(sorted_indices)
        for _, tidx in ipairs(sorted_indices) do
            local ts = sctx.tool_call_states[tidx]
            if ts.started and not ts.stopped then
                ts.stopped = true
                core.log.debug("[OpenAI->Claude] Closing tool block before usage, index=", ts.claude_index)
                table_insert(events, {
                    event = "content_block_stop",
                    data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
                })
            end
        end

        -- following litellm: input_tokens = prompt_tokens - cached_tokens
        local prompt_tokens = openai_chunk.usage.prompt_tokens or 0
        local cached_tokens = 0
        if type(openai_chunk.usage.prompt_tokens_details) == "table"
           and openai_chunk.usage.prompt_tokens_details.cached_tokens then
            cached_tokens = openai_chunk.usage.prompt_tokens_details.cached_tokens
        end
        local stream_usage = {
            input_tokens = prompt_tokens - cached_tokens,
            output_tokens = openai_chunk.usage.completion_tokens or 0,
        }
        if cached_tokens > 0 then
            stream_usage.cache_read_input_tokens = cached_tokens
        end
        if type(openai_chunk.usage.prompt_tokens_details) == "table"
           and openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens then
            stream_usage.cache_creation_input_tokens =
                openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens
        end
        local msg_delta = {
            type = "message_delta",
            delta = {
                stop_sequence = cjson.null,
            },
            usage = stream_usage,
        }
        -- merge in the cached stop_reason; if none is cached (GLM usage-only
        -- chunk scenario), default to end_turn
        if sctx.pending_stop_reason then
            msg_delta.delta.stop_reason = sctx.pending_stop_reason
            core.log.debug("[OpenAI->Claude] Combining cached stop_reason ",
                           sctx.pending_stop_reason, " with usage")
            sctx.pending_stop_reason = nil
        else
            msg_delta.delta.stop_reason = "end_turn"
            core.log.debug("[OpenAI->Claude] No cached stop_reason, defaulting to end_turn")
        end
        table_insert(events, { event = "message_delta", data = cjson.encode(msg_delta) })
        core.log.debug("[OpenAI->Claude] Generated message_delta event with usage and stop_reason")

        -- message_stop
        if not sctx.message_stop_sent then
            sctx.message_stop_sent = true
            core.log.debug("[OpenAI->Claude] Generated message_stop event")
            table_insert(events, { event = "message_stop", data = cjson.encode({ type = "message_stop" }) })
        end
    end

    return events
end
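-- Worked example of the tool-call path above (hypothetical payloads, for
-- documentation only; not executed). An OpenAI streaming chunk such as
--   data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,
--          "id":"call_1","function":{"name":"get_weather","arguments":""}}]}}]}
-- starts a Claude tool_use content block via start_tool_call, and each later
-- arguments fragment for the active tool becomes
--   event: content_block_delta
--   data: {"type":"content_block_delta","index":<claude_index>,
--          "delta":{"type":"input_json_delta","partial_json":"{\"city\":"}}
-- A finish_reason of "tool_calls" only caches the Claude stop_reason; it is
-- emitted later, together with usage, as a single message_delta event.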
-- Handle the [DONE] signal: close every block that is still open, emit the
-- terminating events, and reset the state
local function build_claude_stream_done(sctx, conf)
    core.log.debug("[OpenAI->Claude] Processing [DONE] message, finalizing stream")
    local events = {}

    -- close all content blocks that are still open
    if sctx.thinking_started and not sctx.thinking_stopped then
        sctx.thinking_stopped = true
        table_insert(events, {
            event = "content_block_stop",
            data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
        })
    end
    if sctx.text_started and not sctx.text_stopped then
        sctx.text_stopped = true
        table_insert(events, {
            event = "content_block_stop",
            data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
        })
    end
    for _, ts in pairs(sctx.tool_call_states) do
        if ts.started and not ts.stopped then
            ts.stopped = true
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
            })
        end
    end

    -- flush a cached stop_reason that was never sent (no usage chunk arrived)
    if sctx.pending_stop_reason then
        table_insert(events, {
            event = "message_delta",
            data = cjson.encode({
                type = "message_delta",
                delta = { stop_reason = sctx.pending_stop_reason, stop_sequence = cjson.null },
                usage = { output_tokens = 0 }
            })
        })
        sctx.pending_stop_reason = nil
    end

    -- message_stop
    if sctx.message_start_sent and not sctx.message_stop_sent then
        sctx.message_stop_sent = true
        table_insert(events, { event = "message_stop", data = cjson.encode({ type = "message_stop" }) })
    end

    -- reset all state so a reused connection does not inherit stale data
    -- (mirrors Higress's post-[DONE] reset logic)
    sctx.message_start_sent = false
    sctx.message_stop_sent = false
    sctx.message_id = nil
    sctx.next_content_index = 0
    sctx.thinking_started = false
    sctx.thinking_stopped = false
    sctx.thinking_index = -1
    sctx.text_started = false
    sctx.text_stopped = false
    sctx.text_index = -1
    sctx.tool_call_states = {}
    sctx.active_tool_index = nil
    sctx.pending_stop_reason = nil
    sctx.output_tokens = 0

    core.log.debug("[OpenAI->Claude] Reset converter state for next request")
    return events
end
------------------------------------------------------------------------
-- APISIX plugin hooks
------------------------------------------------------------------------
-- rewrite phase: convert headers before key-auth (priority 2500) runs,
-- so authentication can succeed
function _M.rewrite(conf, ctx)
    local uri = ngx.var.uri
    if uri ~= "/v1/messages" then
        core.log.debug("[APISIX] rewrite phase skipped, uri=", uri,
                       ", method=", ngx.req.get_method(),
                       ", x-api-key=", ngx.req.get_headers()["x-api-key"] and "present" or "nil",
                       ", authorization=", ngx.req.get_headers()["authorization"] and "present" or "nil")
        return
    end

    core.log.debug("[APISIX] rewrite phase triggered, uri=", uri, ", method=", ngx.req.get_method())

    -- Header conversion: x-api-key -> Authorization: Bearer
    -- This must happen in the rewrite phase; otherwise key-auth cannot see the
    -- Authorization header during its own rewrite phase
    local api_key = ngx.req.get_headers()["x-api-key"]
    if api_key then
        core.log.debug("[Claude->OpenAI] Converting x-api-key to Authorization header")
        ngx.req.set_header("Authorization", "Bearer " .. api_key)
        ngx.req.clear_header("x-api-key")
    end
end
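-- Example of the header conversion above (hypothetical request, documentation
-- only). A Claude client sends:
--   POST /v1/messages
--   x-api-key: sk-xxx
-- After this phase the request carries:
--   Authorization: Bearer sk-xxx
-- and no x-api-key header, so key-auth running later in the same phase can
-- authenticate it like any OpenAI-style request.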
-- access phase: rewrite the request body and URI, and clean up remaining headers
function _M.access(conf, ctx)
    local uri = ngx.var.uri
    if uri ~= "/v1/messages" then
        core.log.debug("[APISIX] access phase skipped, uri=", uri)
        return
    end

    core.log.debug("[APISIX] access phase triggered, uri=", uri)

    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
        -- the body may have been spooled to a temp file (large request bodies)
        local file_name = ngx.req.get_body_file()
        if file_name then
            local f = io.open(file_name, "r")
            if f then
                body = f:read("*a")
                f:close()
            end
        end
        if not body then
            core.log.error("failed to read request body, body_file=", file_name or "nil")
            return
        end
    end

    local claude_body, decode_err = cjson.decode(body)
    if not claude_body then
        core.log.error("failed to decode Claude request body, err=", decode_err or "unknown",
                       ", body_prefix=", string_sub(body, 1, 200))
        return
    end

    -- remember the stream flag
    ctx.claude2openai_stream = claude_body.stream or false

    -- convert the request body
    local openai_body = convert_claude_request_to_openai(claude_body, conf)
    local encoded_body, encode_err = cjson.encode(openai_body)
    if not encoded_body then
        core.log.error("failed to encode OpenAI request body, err=", encode_err or "unknown")
        return
    end
    ngx.req.set_body_data(encoded_body)

    -- rewrite the URI
    ngx.req.set_uri("/v1/chat/completions")

    -- strip Claude-specific headers
    ngx.req.clear_header("anthropic-version")
    ngx.req.clear_header("anthropic-beta")

    ctx.claude2openai_converted = true
    core.log.debug("[c2o v0.8] request converted, stream=", ctx.claude2openai_stream,
                   ", model=", claude_body.model or "nil", ", ctx_id=", tostring(ctx))
end
-- response header phase
function _M.header_filter(conf, ctx)
    if not ctx.claude2openai_converted then
        return
    end

    core.log.debug("[APISIX] header_filter phase, stream=", ctx.claude2openai_stream,
                   ", upstream_status=", ngx.status,
                   ", upstream_content_type=", ngx.header["Content-Type"] or "nil",
                   ", upstream_content_length=", ngx.header["Content-Length"] or "nil")

    -- log when the upstream returns a non-2xx status
    if ngx.status >= 400 then
        core.log.error("upstream returned error status=", ngx.status)
    end

    if ctx.claude2openai_stream then
        ngx.header["Content-Type"] = "text/event-stream"
        -- Critical: buffering must be disabled for SSE streaming responses.
        -- Otherwise nginx buffers the upstream data and body_filter receives
        -- chunks split in the middle of SSE data lines, losing large amounts of data
        ngx.header["X-Accel-Buffering"] = "no"
    else
        ngx.header["Content-Type"] = "application/json"
    end
    -- the response body will be rewritten, so drop Content-Length
    ngx.header["Content-Length"] = nil
end
-- response body phase
-- Note: in ai-proxy-multi bypass mode, lua_body_filter has already performed
-- the conversion, and its output goes through ngx_print, which triggers the
-- nginx body_filter again. We must detect that and skip, to avoid converting
-- the already-converted Claude SSE data a second time.
function _M.body_filter(conf, ctx)
    if not ctx.claude2openai_converted then
        return
    end

    -- if lua_body_filter is already handling the stream conversion, skip the
    -- nginx body_filter so its Claude SSE output is not converted twice
    if ctx.claude2openai_lua_body_filter_active then
        return
    end

    local chunk = ngx.arg[1]
    local eof = ngx.arg[2]

    core.log.debug("[APISIX] body_filter phase v0.6 (nginx), stream=", ctx.claude2openai_stream,
                   ", chunk_len=", chunk and #chunk or 0, ", eof=", eof, ", status=", ngx.status)
    if ctx.claude2openai_stream then
        -- streaming response: convert chunk by chunk
        if not ctx.claude2openai_sctx then
            ctx.claude2openai_sctx = new_stream_ctx()
            ctx.claude2openai_line_buf = "" -- SSE line buffer for chunks split at body_filter boundaries
        end
        local sctx = ctx.claude2openai_sctx

        local output_parts = {}

        -- if the upstream answers a streaming request with a non-streamed error
        -- JSON (e.g. 4xx/5xx), pass the real error straight through
        if not ctx.claude2openai_stream_error_checked then
            ctx.claude2openai_stream_error_checked = true
            if ngx.status >= 400 and chunk and chunk ~= "" then
                core.log.error("upstream error response (stream), status=", ngx.status,
                               ", body=", string_sub(chunk, 1, 4096))
                local err_resp, _ = cjson.decode(chunk)
                if err_resp and err_resp.error then
                    local err_msg = "Unknown upstream error"
                    if type(err_resp.error) == "table" then
                        err_msg = err_resp.error.message or cjson.encode(err_resp.error)
                    elseif type(err_resp.error) == "string" then
                        err_msg = err_resp.error
                    end
                    core.log.debug("[OpenAI->Claude] Stream request got upstream error: ", err_msg)
                    local claude_err = cjson.encode({
                        type = "error",
                        error = {
                            type = err_resp.error.type or "api_error",
                            message = err_msg,
                        }
                    })
                    ngx.arg[1] = claude_err
                    return
                end
            end
        end

        if chunk and chunk ~= "" then
            core.log.debug("[OpenAI->Claude] Original OpenAI streaming chunk len=", #chunk)

            -- Line buffering: the nginx body_filter may split chunks in the
            -- middle of an SSE data line, which would drop the incomplete JSON
            -- line. Prepend the incomplete tail saved from the previous chunk.
            local buf = chunk
            if ctx.claude2openai_line_buf and ctx.claude2openai_line_buf ~= "" then
                buf = ctx.claude2openai_line_buf .. buf
                core.log.debug("[OpenAI->Claude] Prepended buffered line fragment, len=",
                               #ctx.claude2openai_line_buf)
                ctx.claude2openai_line_buf = ""
            end

            -- if the chunk does not end with a newline, its last line may be incomplete
            local last_char = string_sub(buf, -1)
            local complete_buf, trailing
            if last_char == "\n" or last_char == "\r" then
                complete_buf = buf
                trailing = nil
            else
                -- find the last newline; everything after it is buffered for next time
                local last_nl = nil
                for i = #buf, 1, -1 do
                    local c = string_sub(buf, i, i)
                    if c == "\n" or c == "\r" then
                        last_nl = i
                        break
                    end
                end
                if last_nl then
                    complete_buf = string_sub(buf, 1, last_nl)
                    trailing = string_sub(buf, last_nl + 1)
                else
                    -- no newline anywhere in buf: buffer it all
                    ctx.claude2openai_line_buf = buf
                    core.log.debug("[OpenAI->Claude] Entire chunk buffered (no newline), len=", #buf)
                    complete_buf = nil
                    trailing = nil
                end
            end

            if trailing and trailing ~= "" then
                ctx.claude2openai_line_buf = trailing
                core.log.debug("[OpenAI->Claude] Buffered trailing incomplete line, len=", #trailing)
            end

            if complete_buf and complete_buf ~= "" then
                for line in complete_buf:gmatch("[^\r\n]+") do
                    if line:sub(1, 6) == "data: " then
                        local data = line:sub(7)
                        if data == "[DONE]" then
                            local done_events = build_claude_stream_done(sctx, conf)
                            for _, evt in ipairs(done_events) do
                                table_insert(output_parts,
                                             "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                            end
                        else
                            local openai_chunk, chunk_err = cjson.decode(data)
                            if openai_chunk then
                                -- an SSE data payload carrying an error object
                                -- is converted to a Claude error event
                                if openai_chunk.error then
                                    local err_msg = "Unknown upstream error"
                                    if type(openai_chunk.error) == "table" then
                                        err_msg = openai_chunk.error.message or cjson.encode(openai_chunk.error)
                                    elseif type(openai_chunk.error) == "string" then
                                        err_msg = openai_chunk.error
                                    end
                                    core.log.debug("[OpenAI->Claude] SSE chunk contains error: ", err_msg)
                                    local claude_err = cjson.encode({
                                        type = "error",
                                        error = {
                                            type = openai_chunk.error.type or "api_error",
                                            message = err_msg,
                                        }
                                    })
                                    table_insert(output_parts, "event: error\ndata: " .. claude_err .. "\n\n")
                                else
                                    local ok, err = pcall(function()
                                        local events = build_claude_stream_events(sctx, openai_chunk, conf)
                                        core.log.debug("[OpenAI->Claude] Generated ", #events,
                                                       " Claude stream events from OpenAI chunk")
                                        for i, evt in ipairs(events) do
                                            core.log.debug("[OpenAI->Claude] Stream event [", i, "/",
                                                           #events, "]: ", evt.data)
                                            table_insert(output_parts,
                                                         "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                                        end
                                    end)
                                    if not ok then
                                        core.log.error("stream event conversion error, err=", err,
                                                       ", data_prefix=", string_sub(data, 1, 200))
                                    end
                                end
                            else
                                core.log.error("failed to decode stream chunk, err=", chunk_err or "unknown",
                                               ", data_prefix=", string_sub(data, 1, 200))
                            end
                        end
                    end
                end
            end -- complete_buf
        end

        -- at EOF, process whatever is left in the line buffer
        if eof and ctx.claude2openai_line_buf and ctx.claude2openai_line_buf ~= "" then
            local remaining = ctx.claude2openai_line_buf
            ctx.claude2openai_line_buf = ""
            core.log.debug("[OpenAI->Claude] Processing remaining buffered line at EOF, len=", #remaining)
            for line in remaining:gmatch("[^\r\n]+") do
                if line:sub(1, 6) == "data: " then
                    local data = line:sub(7)
                    if data == "[DONE]" then
                        local done_events = build_claude_stream_done(sctx, conf)
                        for _, evt in ipairs(done_events) do
                            table_insert(output_parts,
                                         "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                        end
                    else
                        local openai_chunk, chunk_err = cjson.decode(data)
                        if openai_chunk then
                            local ok, err = pcall(function()
                                local events = build_claude_stream_events(sctx, openai_chunk, conf)
                                for _, evt in ipairs(events) do
                                    table_insert(output_parts,
                                                 "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                                end
                            end)
                            if not ok then
                                core.log.error("stream event conversion error at EOF, err=", err)
                            end
                        else
                            core.log.error("failed to decode buffered stream chunk at EOF, err=",
                                           chunk_err or "unknown")
                        end
                    end
                end
            end
        end

        if eof and not sctx.message_stop_sent then
            core.log.debug("[OpenAI->Claude] EOF reached without message_stop, sending final events")
            local done_events = build_claude_stream_done(sctx, conf)
            for _, evt in ipairs(done_events) do
                table_insert(output_parts, "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
            end
        end

        ngx.arg[1] = #output_parts > 0 and table_concat(output_parts) or ""
        if #output_parts > 0 then
            core.log.debug("[OpenAI->Claude] Converted Claude streaming chunk: ", ngx.arg[1])
        end
    else
        -- non-streaming response: collect the full response body, then convert
        if not ctx.claude2openai_resp_chunks then
            ctx.claude2openai_resp_chunks = {}
        end

        if chunk and chunk ~= "" then
            table_insert(ctx.claude2openai_resp_chunks, chunk)
            if not eof then
                ngx.arg[1] = nil
                return
            end
        end

        if eof then
            local full_body = table_concat(ctx.claude2openai_resp_chunks)
            core.log.debug("[OpenAI->Claude] non-stream response complete, body_len=", #full_body,
                           ", status=", ngx.status)
            if ngx.status >= 400 then
                core.log.error("upstream error response, status=", ngx.status,
                               ", body=", string_sub(full_body, 1, 4096))
            end
            local openai_resp, resp_err = cjson.decode(full_body)
            if openai_resp then
                local claude_resp = convert_openai_response_to_claude(openai_resp, conf)
                local encoded, enc_err = cjson.encode(claude_resp)
                if encoded then
                    ngx.arg[1] = encoded
                else
                    core.log.error("failed to encode Claude response, err=", enc_err or "unknown")
                    ngx.arg[1] = full_body
                end
            else
                core.log.error("failed to decode upstream response, err=", resp_err or "unknown",
                               ", body_prefix=", string_sub(full_body, 1, 300))
                ngx.arg[1] = full_body
            end
        end
    end
end
------------------------------------------------------------------------
-- lua_body_filter: called by ai-proxy-multi's lua_response_filter
-- Signature: (conf, ctx, headers, body) -> (code, new_body)
-- body is a raw SSE chunk that ai-proxy-multi read from the upstream body_reader()
-- Returning nil, new_body replaces the output content
--
-- Critical: body_reader() returns data per HTTP chunk, and a chunk boundary
-- can cut an SSE data line in the middle. Incomplete lines that span chunks
-- must be handled with a line buffer, or events will be lost.
------------------------------------------------------------------------

-- Process one complete SSE data line; returns the converted Claude SSE event fragments
local function process_sse_line(sctx, line, conf)
    local parts = {}
    if line:sub(1, 6) ~= "data: " then
        return parts
    end
    local data = line:sub(7)
    if data == "[DONE]" then
        local done_events = build_claude_stream_done(sctx, conf)
        for _, evt in ipairs(done_events) do
            table_insert(parts, "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
        end
    else
        local openai_chunk, chunk_err = cjson.decode(data)
        if openai_chunk then
            if openai_chunk.error then
                local err_msg = "Unknown upstream error"
                if type(openai_chunk.error) == "table" then
                    err_msg = openai_chunk.error.message or cjson.encode(openai_chunk.error)
                elseif type(openai_chunk.error) == "string" then
                    err_msg = openai_chunk.error
                end
                local claude_err = cjson.encode({
                    type = "error",
                    error = {
                        type = openai_chunk.error.type or "api_error",
                        message = err_msg,
                    }
                })
                table_insert(parts, "event: error\ndata: " .. claude_err .. "\n\n")
            else
                local ok, err = pcall(function()
                    local events = build_claude_stream_events(sctx, openai_chunk, conf)
                    for _, evt in ipairs(events) do
                        table_insert(parts, "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                    end
                end)
                if not ok then
                    core.log.warn("[c2o v0.8] convert err: ", err)
                end
            end
        else
            core.log.warn("[c2o v0.8] decode err: ", chunk_err, " data=", string_sub(data, 1, 200))
        end
    end
    return parts
end
function _M.lua_body_filter(conf, ctx, headers, body)
    core.log.debug("[c2o v0.8] ENTER converted=", tostring(ctx.claude2openai_converted),
                   " stream=", tostring(ctx.claude2openai_stream),
                   " body_len=", body and #body or 0, " ctx_id=", tostring(ctx))
    if not ctx.claude2openai_converted then
        return
    end
    if not ctx.claude2openai_stream then
        return
    end

    -- mark lua_body_filter as active so the nginx body_filter does not process
    -- the output a second time
    ctx.claude2openai_lua_body_filter_active = true

    -- streaming response conversion
    if not ctx.claude2openai_sctx then
        ctx.claude2openai_sctx = new_stream_ctx()
        ctx.claude2openai_lua_line_buf = "" -- line buffer for incomplete SSE lines spanning chunks
        core.log.debug("[c2o v0.8] INIT stream ctx")
    end
    local sctx = ctx.claude2openai_sctx

    if not body or body == "" then
        return nil, ""
    end

    core.log.debug("[c2o v0.8] IN len=", #body, " first80=", string_sub(body, 1, 80))

    local output_parts = {}

    -- prepend the incomplete line left over from the previous chunk
    local buf = body
    if ctx.claude2openai_lua_line_buf and ctx.claude2openai_lua_line_buf ~= "" then
        buf = ctx.claude2openai_lua_line_buf .. buf
        core.log.debug("[c2o v0.8] prepend buf len=", #ctx.claude2openai_lua_line_buf)
        ctx.claude2openai_lua_line_buf = ""
    end

    -- if buf does not end with a newline, its last line may be incomplete
    local last_char = string_sub(buf, -1)
    local complete_buf, trailing
    if last_char == "\n" or last_char == "\r" then
        complete_buf = buf
        trailing = nil
    else
        -- find the last newline
        local last_nl = nil
        for i = #buf, 1, -1 do
            local c = string_sub(buf, i, i)
            if c == "\n" or c == "\r" then
                last_nl = i
                break
            end
        end
        if last_nl then
            complete_buf = string_sub(buf, 1, last_nl)
            trailing = string_sub(buf, last_nl + 1)
        else
            -- no newline anywhere in buf: buffer it all for the next call
            ctx.claude2openai_lua_line_buf = buf
            core.log.debug("[c2o v0.8] no newline, buffered all len=", #buf)
            return nil, ""
        end
    end

    if trailing and trailing ~= "" then
        ctx.claude2openai_lua_line_buf = trailing
        core.log.debug("[c2o v0.8] trailing buf len=", #trailing,
                       " content=", string_sub(trailing, 1, 80))
    end

    -- parse the complete SSE lines one by one
    for line in complete_buf:gmatch("[^\r\n]+") do
        local parts = process_sse_line(sctx, line, conf)
        for _, p in ipairs(parts) do
            table_insert(output_parts, p)
        end
    end

    local out = table_concat(output_parts)
    core.log.debug("[c2o v0.8] OUT len=", #out, " events=", #output_parts,
                   " first100=", string_sub(out, 1, 100))
    return nil, out
end
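-- Example of the line buffering in lua_body_filter (hypothetical chunk
-- boundaries, documentation only). If the upstream reader yields:
--   call 1: 'data: {"choices":[{"del'
--   call 2: 'ta":{"content":"Hi"}}]}\n\n'
-- then call 1 finds no newline, buffers the whole fragment, and returns
-- nil, ""; call 2 prepends the buffered fragment, sees the trailing newline,
-- and hands the reassembled "data: ..." line to process_sse_line. Without the
-- buffer, both fragments would fail cjson.decode and the event would be lost.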
return _M