0%

APISIX Claude 与 OpenAI 协议互转插件:支持流式、思考、工具调用与多模态

APISIX Claude 与 OpenAI 协议互转插件:支持流式、思考、工具调用与多模态

AI时代高效设计开发:Anthropic → OpenAI 协议转换插件实战

一、参考已有实现

复用 Higress、LiteLLM 等相似项目的代码实现,让 AI 直接参考。

二、准备测试用例

整理 CURL 示例等场景,让 AI 自行校验验证结果。

三、明确功能边界

阶段 转换内容
请求 Claude Messages API → OpenAI Chat Completions
响应 OpenAI Chat Completions → Claude Messages API
支持 非流式/流式、thinking、tool_use、多模态图片

四、Vibe Coding

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
--
-- APISIX Plugin: claude2openai
-- 参考实现: Higress ai-proxy provider/claude_to_openai.go
--
-- 功能:
-- 1. 请求阶段:将 Claude Messages API 请求转换为 OpenAI Chat Completions 请求
-- 2. 响应阶段:将 OpenAI Chat Completions 响应转换为 Claude Messages API 响应
-- 支持:非流式、流式、thinking、tool_use、多模态图片
--

local core = require("apisix.core")
local cjson = require("cjson.safe")
local ngx = ngx
local pairs = pairs
local ipairs = ipairs
local type = type
local table_insert = table.insert
local table_concat = table.concat
local string_sub = string.sub
local string_find = string.find
local ngx_now = ngx.now

-- Plugin identity and the JSON schema for per-route configuration.
local plugin_name = "jiankunking-claude2openai"

-- Config schema: only `target_model` is accepted; when set it overrides the
-- model name forwarded to the OpenAI-compatible backend.
local schema = {
type = "object",
properties = {
target_model = {
type = "string",
description = "Override the model name sent to OpenAI backend"
},
},
}

-- APISIX plugin descriptor.
local _M = {
version = 0.1,
priority = 2650, -- runs before consumer-restriction (2640) and key-auth (2500)
name = plugin_name,
schema = schema,
}

-- Validate the per-route plugin configuration against `schema`.
-- Delegates to core.schema.check, which returns true on success or
-- false plus an error message on failure.
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end

------------------------------------------------------------------------
-- 工具函数
------------------------------------------------------------------------

-- OpenAI finish_reason -> Claude stop_reason (参考 Higress openAIFinishReasonToClaude)
-- Map an OpenAI finish_reason onto the Claude stop_reason vocabulary
-- (ref: Higress openAIFinishReasonToClaude).
-- nil / JSON null -> "end_turn"; unknown reasons pass through unchanged.
local function openai_finish_reason_to_claude(reason)
    if not reason or reason == cjson.null then
        return "end_turn"
    elseif reason == "stop" or reason == "content_filter" then
        return "end_turn"
    elseif reason == "length" then
        return "max_tokens"
    elseif reason == "tool_calls" then
        return "tool_use"
    else
        return reason
    end
end

-- 移除 x-anthropic-billing-header 中的动态 cch 字段以支持 prompt caching
-- 参考 Higress stripCchFromBillingHeader
-- cch 值每次请求都会变化,如果不移除会导致缓存失效
-- Drop every "; cch=<value>" segment from an "x-anthropic-billing-header:"
-- line (ref: Higress stripCchFromBillingHeader).
-- The cch value changes on every request, so leaving it in would defeat
-- prompt caching. Non-string input, and strings that do not start with the
-- billing-header prefix, are returned untouched.
local function strip_cch_from_billing_header(text)
    if type(text) ~= "string" then
        return text
    end
    local prefix = "x-anthropic-billing-header:"
    if text:sub(1, #prefix) ~= prefix then
        return text
    end
    -- repeatedly splice out "; cch=..." segments (plain find, no patterns)
    local out = text
    repeat
        local pos = out:find("; cch=", 1, true)
        if pos then
            -- look for the next ";" after the "; " separator
            local next_semi = out:find(";", pos + 2, true)
            if next_semi then
                -- more fields follow: cut out just the "; cch=xxx" segment
                out = out:sub(1, pos - 1) .. out:sub(next_semi)
            else
                -- cch was the trailing field: truncate it
                out = out:sub(1, pos - 1)
            end
        end
    until not pos
    return out
end

-- Claude content blocks 中提取纯文本部分
-- Collect the `text` field of every text block in a Claude content array.
-- Returns an empty list when the input is not a table.
local function extract_text_parts(content_array)
    local texts = {}
    if type(content_array) == "table" then
        for _, item in ipairs(content_array) do
            if item.type == "text" and item.text then
                texts[#texts + 1] = item.text
            end
        end
    end
    return texts
end

------------------------------------------------------------------------
-- 请求转换: Claude Messages -> OpenAI Chat Completions
-- 参考 Higress ConvertClaudeRequestToOpenAI
------------------------------------------------------------------------

-- 转换 Claude content blocks 数组为 OpenAI 格式
-- 返回 { text_parts, tool_calls, tool_results, openai_contents }
-- Convert a Claude content-block array into OpenAI-shaped pieces.
-- Returns a table with four lists:
--   text_parts      - plain text strings (billing-header cch stripped)
--   tool_calls      - OpenAI tool_calls entries built from Claude tool_use blocks
--   tool_results    - raw Claude tool_result blocks (converted later by caller)
--   openai_contents - OpenAI multimodal content parts (text/image/thinking/...)
-- Non-table input yields all-empty lists.
local function convert_content_array(content_array)
local result = {
text_parts = {},
tool_calls = {},
tool_results = {},
openai_contents = {},
}
if type(content_array) ~= "table" then return result end

for _, block in ipairs(content_array) do
if block.type == "text" and block.text then
local processed_text = strip_cch_from_billing_header(block.text)
table_insert(result.text_parts, processed_text)
local openai_content = {
type = "text",
text = processed_text
}
-- pass cache_control through so prompt caching keeps working
if block.cache_control then
openai_content.cache_control = block.cache_control
end
table_insert(result.openai_contents, openai_content)

elseif (block.type == "image" or block.type == "document") and block.source then
-- Claude: {type:"image"/"document", source:{type:"base64", media_type:"...", data:"..."}}
-- OpenAI: {type:"image_url", image_url:{url:"data:...;base64,..."}}
local img_obj
if block.source.type == "base64" then
img_obj = {
type = "image_url",
image_url = {
url = "data:" .. (block.source.media_type or "image/png")
.. ";base64," .. block.source.data
}
}
elseif block.source.type == "url" then
img_obj = {
type = "image_url",
image_url = { url = block.source.url }
}
end
-- unknown source types are silently dropped (img_obj stays nil)
if img_obj then
if block.cache_control then
img_obj.cache_control = block.cache_control
end
table_insert(result.openai_contents, img_obj)
end

elseif block.type == "thinking" and block.thinking then
-- thinking block from assistant history -> OpenAI thinking block
local thinking_obj = {
type = "thinking",
thinking = block.thinking,
signature = block.signature or ""
}
table_insert(result.openai_contents, thinking_obj)

elseif block.type == "redacted_thinking" then
-- redacted_thinking block -> passed through as-is
table_insert(result.openai_contents, {
type = "redacted_thinking",
data = block.data or ""
})

elseif block.type == "tool_use" then
-- Claude tool_use -> OpenAI tool_calls entry; blocks missing id/name are skipped
if block.id and block.name then
table_insert(result.tool_calls, {
id = block.id,
type = "function",
["function"] = {
name = block.name,
arguments = cjson.encode(block.input or {}) or "{}"
}
})
core.log.debug("[Claude->OpenAI] Converted tool_use to tool_call: ", block.name)
end

elseif block.type == "tool_result" then
-- kept raw; the caller turns these into role="tool" messages
table_insert(result.tool_results, block)
core.log.debug("[Claude->OpenAI] Found tool_result for tool_use_id: ", block.tool_use_id or "")
end
end

return result
end

-- Convert a Claude Messages API request into an OpenAI Chat Completions
-- request (ref: Higress ConvertClaudeRequestToOpenAI).
-- Covers: system prompt (string or content blocks), text/image/document/
-- thinking/tool_use/tool_result blocks, tools, tool_choice, the thinking
-- config and output_format.
-- @tparam table claude_body decoded Claude request body
-- @tparam table conf plugin config; conf.target_model overrides the model name
-- @treturn table OpenAI Chat Completions request body
local function convert_claude_request_to_openai(claude_body, conf)
    core.log.debug("[Claude->OpenAI] Original Claude request body: ", cjson.encode(claude_body))

    local openai_body = {
        model = conf.target_model or claude_body.model,
        max_tokens = claude_body.max_tokens,
        temperature = claude_body.temperature,
        top_p = claude_body.top_p,
        stop = claude_body.stop_sequences,
        stream = claude_body.stream,
    }

    -- ask the upstream to append usage to the final streamed chunk
    if openai_body.stream then
        openai_body.stream_options = { include_usage = true }
    end

    local messages = {}

    -- system: Claude top-level field -> OpenAI system message (goes first)
    if claude_body.system then
        if type(claude_body.system) == "string" then
            table_insert(messages, { role = "system", content = strip_cch_from_billing_header(claude_body.system) })
        elseif type(claude_body.system) == "table" then
            -- system may be a content-block array
            -- ref litellm: drop billing-header blocks; skip empty text blocks
            -- (the Anthropic API rejects empty text)
            local billing_prefix = "x-anthropic-billing-header:"
            local sys_contents = {}
            for _, block in ipairs(claude_body.system) do
                if block.type == "text" and block.text then
                    if block.text == "" then
                        -- skip empty text
                    elseif string_sub(block.text, 1, #billing_prefix) == billing_prefix then
                        -- BUGFIX: previously compared string_sub(block.text, 1, 28)
                        -- against this 27-char prefix, which could only match when
                        -- the text was exactly the bare prefix; billing-header
                        -- blocks therefore leaked through to the upstream.
                        core.log.debug("[Claude->OpenAI] Skipping billing header system block")
                    else
                        local sys_block = {
                            type = "text",
                            text = strip_cch_from_billing_header(block.text)
                        }
                        if block.cache_control then
                            sys_block.cache_control = block.cache_control
                        end
                        table_insert(sys_contents, sys_block)
                    end
                end
            end
            if #sys_contents > 0 then
                table_insert(messages, { role = "system", content = sys_contents })
            end
        end
    end

    -- convert the conversation messages
    if claude_body.messages then
        for _, msg in ipairs(claude_body.messages) do
            if type(msg.content) == "string" then
                -- plain text message
                table_insert(messages, { role = msg.role, content = msg.content })
            elseif type(msg.content) == "table" then
                local conv = convert_content_array(msg.content)

                -- assistant message carrying tool_use -> OpenAI tool_calls
                if #conv.tool_calls > 0 then
                    local openai_msg = {
                        role = msg.role,
                        tool_calls = conv.tool_calls,
                    }
                    if #conv.text_parts > 0 then
                        openai_msg.content = table_concat(conv.text_parts, "\n\n")
                    else
                        openai_msg.content = cjson.null
                    end
                    table_insert(messages, openai_msg)
                end

                -- user message carrying tool_result -> role="tool" messages
                if #conv.tool_results > 0 then
                    for _, tr in ipairs(conv.tool_results) do
                        local tool_content
                        if type(tr.content) == "string" then
                            tool_content = tr.content
                        elseif type(tr.content) == "table" then
                            -- ref litellm: tool_result.content may mix text and images
                            local content_parts = {}
                            local has_non_text = false
                            for _, c in ipairs(tr.content) do
                                if type(c) == "string" then
                                    table_insert(content_parts, { type = "text", text = c })
                                elseif type(c) == "table" then
                                    if c.type == "text" then
                                        table_insert(content_parts, { type = "text", text = c.text or "" })
                                    elseif (c.type == "image" or c.type == "document") and c.source then
                                        has_non_text = true
                                        if c.source.type == "base64" then
                                            table_insert(content_parts, {
                                                type = "image_url",
                                                image_url = {
                                                    url = "data:" .. (c.source.media_type or "image/png")
                                                        .. ";base64," .. c.source.data
                                                }
                                            })
                                        elseif c.source.type == "url" then
                                            table_insert(content_parts, {
                                                type = "image_url",
                                                image_url = { url = c.source.url }
                                            })
                                        end
                                    end
                                end
                            end
                            -- text-only results collapse to a plain string (better compatibility)
                            if not has_non_text then
                                local texts = {}
                                for _, p in ipairs(content_parts) do
                                    if p.type == "text" then table_insert(texts, p.text) end
                                end
                                tool_content = table_concat(texts, "\n")
                            else
                                -- mixed text + image: keep the array form
                                tool_content = content_parts
                            end
                        else
                            tool_content = ""
                        end
                        table_insert(messages, {
                            role = "tool",
                            tool_call_id = tr.tool_use_id,
                            content = tool_content
                        })
                    end
                    -- text blocks sitting beside tool_result become a separate message
                    if #conv.text_parts > 0 then
                        table_insert(messages, {
                            role = msg.role,
                            content = table_concat(conv.text_parts, "\n\n")
                        })
                    end
                end

                -- ordinary content (no tool_calls and no tool_results)
                if #conv.tool_calls == 0 and #conv.tool_results == 0 then
                    -- a single text part collapses to a plain string
                    if #conv.openai_contents == 1 and conv.openai_contents[1].type == "text" then
                        table_insert(messages, {
                            role = msg.role,
                            content = conv.openai_contents[1].text
                        })
                    else
                        table_insert(messages, {
                            role = msg.role,
                            content = conv.openai_contents
                        })
                    end
                end
            end
        end
    end

    openai_body.messages = messages

    -- tools: Claude {name, description, input_schema} -> OpenAI {type, function}
    if claude_body.tools then
        local openai_tools = {}
        for _, tool in ipairs(claude_body.tools) do
            table_insert(openai_tools, {
                type = "function",
                ["function"] = {
                    name = tool.name,
                    description = tool.description,
                    parameters = tool.input_schema
                }
            })
        end
        openai_body.tools = openai_tools
    end

    -- tool_choice mapping
    if claude_body.tool_choice then
        local tc = claude_body.tool_choice
        if tc.type == "auto" then
            openai_body.tool_choice = "auto"
        elseif tc.type == "any" then
            openai_body.tool_choice = "required"
        elseif tc.type == "tool" and tc.name then
            openai_body.tool_choice = {
                type = "function",
                ["function"] = { name = tc.name }
            }
        end
        -- parallel_tool_calls
        if tc.disable_parallel_tool_use ~= nil then
            openai_body.parallel_tool_calls = not tc.disable_parallel_tool_use
        end
    end

    -- thinking: Claude {type:"enabled", budget_tokens:N} -> OpenAI reasoning params
    if claude_body.thinking then
        core.log.debug("[Claude->OpenAI] Found thinking config: type=", claude_body.thinking.type,
            ", budget_tokens=", claude_body.thinking.budget_tokens or 0)
        if claude_body.thinking.type == "enabled" then
            local budget = claude_body.thinking.budget_tokens or 8192
            openai_body.reasoning_max_tokens = budget
            if budget < 4096 then
                openai_body.reasoning_effort = "low"
            elseif budget < 16384 then
                openai_body.reasoning_effort = "medium"
            else
                openai_body.reasoning_effort = "high"
            end
            core.log.debug("[Claude->OpenAI] Converted thinking config: budget_tokens=", budget,
                ", reasoning_effort=", openai_body.reasoning_effort,
                ", reasoning_max_tokens=", openai_body.reasoning_max_tokens)
        end
    end

    -- output_format: Claude {type:"json_schema", schema:{...}} -> OpenAI response_format
    -- ref litellm translate_anthropic_output_format_to_openai
    if claude_body.output_format and type(claude_body.output_format) == "table" then
        if claude_body.output_format.type == "json_schema" and claude_body.output_format.schema then
            openai_body.response_format = {
                type = "json_schema",
                json_schema = {
                    name = "structured_output",
                    schema = claude_body.output_format.schema,
                    strict = true,
                }
            }
            core.log.debug("[Claude->OpenAI] Converted output_format to response_format: json_schema")
        end
    end

    core.log.debug("[Claude->OpenAI] Converted OpenAI request body: ", cjson.encode(openai_body))
    return openai_body
end

------------------------------------------------------------------------
-- 非流式响应转换: OpenAI -> Claude
-- 参考 Higress ConvertOpenAIResponseToClaude
------------------------------------------------------------------------

-- Convert a non-streaming OpenAI Chat Completions response into a Claude
-- Messages API response (ref: Higress ConvertOpenAIResponseToClaude).
-- Upstream errors are surfaced as a Claude-style {type="error", ...} body.
local function convert_openai_response_to_claude(openai_body, conf)
core.log.debug("[OpenAI->Claude] Original OpenAI response body: ", cjson.encode(openai_body))

-- if the upstream returned an error object, pass the real message through
-- note: some models (e.g. GLM) omit `error` while others send JSON null;
-- cjson.null is truthy in Lua, so it must be excluded explicitly
if openai_body and openai_body.error and openai_body.error ~= cjson.null then
local err_msg = "Unknown upstream error"
if type(openai_body.error) == "table" then
err_msg = openai_body.error.message or cjson.encode(openai_body.error)
elseif type(openai_body.error) == "string" then
err_msg = openai_body.error
end
core.log.debug("[OpenAI->Claude] Upstream returned error: ", err_msg)
return {
type = "error",
error = {
type = openai_body.error.type or "api_error",
message = err_msg,
}
}
end

-- no choices at all -> synthesize a Claude error response
if not openai_body or not openai_body.choices
or type(openai_body.choices) ~= "table" or #openai_body.choices == 0 then
return {
type = "error",
error = { type = "api_error", message = "Empty response from upstream" }
}
end

local choice = openai_body.choices[1]
local msg = choice.message or {}
local content = {}

-- reasoning/thinking content (accepts both reasoning_content and reasoning)
-- note: GLM-style models return "reasoning_content": null; cjson.null is
-- truthy, so type() must be used to filter it out
local rc = msg.reasoning_content
local r = msg.reasoning
local reasoning = (type(rc) == "string" and rc ~= "" and rc)
or (type(r) == "string" and r ~= "" and r)
or nil
if reasoning then
core.log.debug("[OpenAI->Claude] Added thinking content: ", string_sub(reasoning, 1, 200))
table_insert(content, {
type = "thinking",
thinking = reasoning,
signature = "", -- the Claude protocol requires a signature field
})
end

-- text content
-- note: "content" may be JSON null for some models; filter cjson.null via type()
if type(msg.content) == "string" and msg.content ~= "" then
table_insert(content, {
type = "text",
text = msg.content
})
end

-- tool_calls -> tool_use content blocks
if msg.tool_calls and type(msg.tool_calls) == "table" then
for _, tc in ipairs(msg.tool_calls) do
local input = {}
if tc["function"] and tc["function"].arguments then
input = cjson.decode(tc["function"].arguments) or {}
end
table_insert(content, {
type = "tool_use",
id = tc.id,
name = tc["function"] and tc["function"].name or "",
input = input
})
end
end

-- fall back to a single empty text block so content is never empty
if #content == 0 then
table_insert(content, { type = "text", text = "" })
end

-- build the Claude response
local stop_reason = openai_finish_reason_to_claude(choice.finish_reason)

-- build usage (ref litellm: input_tokens = prompt_tokens - cached_tokens)
local claude_usage = {
input_tokens = 0,
output_tokens = 0,
}
if type(openai_body.usage) == "table" then
local prompt_tokens = openai_body.usage.prompt_tokens or 0
local cached_tokens = 0
if type(openai_body.usage.prompt_tokens_details) == "table"
and openai_body.usage.prompt_tokens_details.cached_tokens then
cached_tokens = openai_body.usage.prompt_tokens_details.cached_tokens
end
-- Claude protocol: input_tokens counts only the uncached tokens
claude_usage.input_tokens = prompt_tokens - cached_tokens
claude_usage.output_tokens = openai_body.usage.completion_tokens or 0
-- cache token fields
if cached_tokens > 0 then
claude_usage.cache_read_input_tokens = cached_tokens
end
-- cache_creation_input_tokens (only when the upstream reports it)
if type(openai_body.usage.prompt_tokens_details) == "table"
and openai_body.usage.prompt_tokens_details.cache_creation_input_tokens then
claude_usage.cache_creation_input_tokens = openai_body.usage.prompt_tokens_details.cache_creation_input_tokens
end
end

local claude_resp = {
id = openai_body.id or ("msg_" .. ngx_now()),
type = "message",
role = "assistant",
content = content,
model = openai_body.model or "unknown",
stop_reason = stop_reason,
stop_sequence = cjson.null,
usage = claude_usage,
}

core.log.debug("[OpenAI->Claude] Converted Claude response body: ", cjson.encode(claude_resp))
return claude_resp
end

------------------------------------------------------------------------
-- 流式响应转换: OpenAI SSE -> Claude SSE
-- 参考 Higress ConvertOpenAIStreamResponseToClaude + buildClaudeStreamResponse
--
-- 关键设计(对齐 Higress):
-- 1. nextContentIndex 动态递增分配 content block index
-- 2. pendingStopReason 缓存 stop_reason 直到收到 usage 后一起发送
-- 3. tool_calls 支持 activeToolIndex + cachedArguments 序列化机制
-- 4. thinking block 输出 signature 字段
------------------------------------------------------------------------

-- 创建流式转换器状态(对应 Higress ClaudeToOpenAIConverter 的状态字段)
-- Build fresh per-stream conversion state (mirrors the state fields of
-- Higress's ClaudeToOpenAIConverter). One instance per streamed response.
local function new_stream_ctx()
    local ctx = {}
    -- message lifecycle flags
    ctx.message_start_sent = false
    ctx.message_stop_sent = false
    ctx.message_id = nil
    -- next Claude content-block index to hand out (monotonically increasing)
    ctx.next_content_index = 0
    -- thinking block bookkeeping
    ctx.thinking_started = false
    ctx.thinking_stopped = false
    ctx.thinking_index = -1
    -- text block bookkeeping
    ctx.text_started = false
    ctx.text_stopped = false
    ctx.text_index = -1
    -- tool-call tracking: map[openai_index] = {id, name, claude_index, started, stopped, cached_args}
    ctx.tool_call_states = {}
    ctx.active_tool_index = nil -- tool call currently streaming its arguments
    -- stop_reason held back until usage arrives
    ctx.pending_stop_reason = nil
    -- usage accumulator
    ctx.output_tokens = 0
    return ctx
end

-- 启动一个 tool call content block(参考 Higress startToolCall)
-- Open a tool_use content block for `tool_state`, first closing any still-open
-- thinking/text blocks (ref: Higress startToolCall).
-- Mutates sctx (index allocation, stop flags) and tool_state (claude_index,
-- started); returns the ordered list of Claude SSE events to emit.
local function start_tool_call(sctx, tool_state, conf)
    local events = {}
    -- small emitter: append one SSE event with a JSON-encoded payload
    local function push(name, payload)
        events[#events + 1] = { event = name, data = cjson.encode(payload) }
    end

    -- a still-open thinking block must be closed before tool_use starts
    if sctx.thinking_started and not sctx.thinking_stopped then
        sctx.thinking_stopped = true
        core.log.debug("[OpenAI->Claude] Closing thinking content block before tool use")
        push("content_block_stop", { type = "content_block_stop", index = sctx.thinking_index })
    end

    -- likewise for a still-open text block
    if sctx.text_started and not sctx.text_stopped then
        sctx.text_stopped = true
        core.log.debug("[OpenAI->Claude] Closing text content block before tool use")
        push("content_block_stop", { type = "content_block_stop", index = sctx.text_index })
    end

    -- allocate the next Claude content index for this tool block
    tool_state.claude_index = sctx.next_content_index
    sctx.next_content_index = sctx.next_content_index + 1
    tool_state.started = true

    core.log.debug("[OpenAI->Claude] Started tool call: Claude index=", tool_state.claude_index,
        ", id=", tool_state.id, ", name=", tool_state.name)

    -- open the tool_use content block
    push("content_block_start", {
        type = "content_block_start",
        index = tool_state.claude_index,
        content_block = {
            type = "tool_use",
            id = tool_state.id,
            name = tool_state.name,
            input = {}
        }
    })

    -- flush any arguments buffered before this block was started
    if tool_state.cached_args and tool_state.cached_args ~= "" then
        core.log.debug("[OpenAI->Claude] Outputting cached arguments for tool: ", tool_state.cached_args)
        push("content_block_delta", {
            type = "content_block_delta",
            index = tool_state.claude_index,
            delta = { type = "input_json_delta", partial_json = tool_state.cached_args }
        })
    end

    return events
end

-- 处理单个 OpenAI SSE data chunk,返回 Claude SSE events 列表
-- 参考 Higress buildClaudeStreamResponse
local function build_claude_stream_events(sctx, openai_chunk, conf)
local events = {}

local choice
if openai_chunk.choices and type(openai_chunk.choices) == "table" and #openai_chunk.choices > 0 then
choice = openai_chunk.choices[1]
else
choice = { index = 0, delta = {} }
end
local delta = choice.delta or {}

-- 日志:分析当前 chunk 包含的内容
local has_role = type(delta.role) == "string" and delta.role ~= ""
local has_content = type(delta.content) == "string" and delta.content ~= ""
local has_finish = choice.finish_reason ~= nil and choice.finish_reason ~= cjson.null
local has_usage = type(openai_chunk.usage) == "table"
local has_reasoning = (type(delta.reasoning_content) == "string" and delta.reasoning_content ~= "") or (type(delta.reasoning) == "string" and delta.reasoning ~= "")
local has_tool_calls = delta.tool_calls ~= nil and delta.tool_calls ~= cjson.null
core.log.debug("[OpenAI->Claude] Processing OpenAI chunk - Role: ", tostring(has_role),
", Content: ", tostring(has_content), ", Reasoning: ", tostring(has_reasoning),
", ToolCalls: ", tostring(has_tool_calls),
", FinishReason: ", tostring(has_finish), ", Usage: ", tostring(has_usage))

-- message_start(只发一次)
-- 注意:GLM 等模型后续 chunk 中 role 为 null(cjson.null),必须用 type() 过滤
if type(delta.role) == "string" and delta.role ~= "" and not sctx.message_start_sent then
sctx.message_start_sent = true
sctx.message_id = openai_chunk.id

core.log.debug("[OpenAI->Claude] Generated message_start event for id: ", openai_chunk.id)

local msg_start = {
type = "message_start",
message = {
id = openai_chunk.id or ("msg_" .. ngx_now()),
type = "message",
role = "assistant",
content = {},
model = openai_chunk.model or "unknown",
stop_reason = cjson.null,
stop_sequence = cjson.null,
usage = {
input_tokens = 0,
output_tokens = 0,
cache_creation_input_tokens = 0,
cache_read_input_tokens = 0,
}
}
}
-- 如果首个 chunk 就带 usage
if type(openai_chunk.usage) == "table" then
msg_start.message.usage.input_tokens = openai_chunk.usage.prompt_tokens or 0
end
table_insert(events, { event = "message_start", data = cjson.encode(msg_start) })
elseif type(delta.role) == "string" and delta.role ~= "" and sctx.message_start_sent then
-- Skip duplicate role messages from OpenRouter
core.log.debug("[OpenAI->Claude] Skipping duplicate role message for id: ", openai_chunk.id)
end

-- reasoning content(thinking)
-- 注意:Kimi-K2.5 等模型的流式输出可能交错发送 reasoning_content 和 content
-- 即某个 chunk 同时包含两者,或者 content 出现后仍有后续 reasoning_content。
-- 因此不能在收到 content 时立即关闭 thinking block,而是延迟到 finish_reason 时关闭。
-- 注意:GLM 等模型每个 chunk 都带 "reasoning_content": null,cjson.null 是 truthy 且 ~= "" 为 true,
-- 必须用 type() == "string" 过滤
local rc = delta.reasoning_content
local r = delta.reasoning
local reasoning = (type(rc) == "string" and rc ~= "" and rc)
or (type(r) == "string" and r ~= "" and r)
or nil
if reasoning then
if not sctx.thinking_started then
sctx.thinking_index = sctx.next_content_index
sctx.next_content_index = sctx.next_content_index + 1
sctx.thinking_started = true
table_insert(events, {
event = "content_block_start",
data = cjson.encode({
type = "content_block_start",
index = sctx.thinking_index,
content_block = { type = "thinking", thinking = "", signature = "" }
})
})
end
table_insert(events, {
event = "content_block_delta",
data = cjson.encode({
type = "content_block_delta",
index = sctx.thinking_index,
delta = { type = "thinking_delta", thinking = reasoning }
})
})
end

-- text content
-- 不在此处关闭 thinking block,因为后续 chunk 可能还有 reasoning_content(Kimi-K2.5 交错行为)。
-- thinking block 的关闭统一在 finish_reason 处理中完成
-- 注意:GLM 等模型每个 chunk 都带 "content": null,必须用 type() == "string" 过滤 cjson.null
if type(delta.content) == "string" and delta.content ~= "" then
if not sctx.text_started then
sctx.text_index = sctx.next_content_index
sctx.next_content_index = sctx.next_content_index + 1
sctx.text_started = true
table_insert(events, {
event = "content_block_start",
data = cjson.encode({
type = "content_block_start",
index = sctx.text_index,
content_block = { type = "text", text = "" }
})
})
end
table_insert(events, {
event = "content_block_delta",
data = cjson.encode({
type = "content_block_delta",
index = sctx.text_index,
delta = { type = "text_delta", text = delta.content }
})
})
end

-- tool_calls(参考 Higress 的 activeToolIndex + cachedArguments 机制)
if delta.tool_calls and type(delta.tool_calls) == "table" then
-- 确保 message_start 已发送
if not sctx.message_start_sent then
sctx.message_start_sent = true
sctx.message_id = openai_chunk.id
core.log.debug("[OpenAI->Claude] Generated message_start event before tool calls for id: ", openai_chunk.id)
table_insert(events, {
event = "message_start",
data = cjson.encode({
type = "message_start",
message = {
id = openai_chunk.id or ("msg_" .. ngx_now()),
type = "message", role = "assistant", content = {},
model = openai_chunk.model or "unknown",
stop_reason = cjson.null, stop_sequence = cjson.null,
usage = { input_tokens = 0, output_tokens = 0 }
}
})
})
end

for _, tc in ipairs(delta.tool_calls) do
local idx = tc.index or 0

-- 新 tool call(有 id 和 name)
if tc.id and tc.id ~= "" and tc["function"] and tc["function"].name and tc["function"].name ~= "" then
core.log.debug("[OpenAI->Claude] Processing tool call delta: index=", idx,
", id=", tc.id, ", name=", tc["function"].name,
", args=", tc["function"].arguments or "")
if not sctx.tool_call_states[idx] then
sctx.tool_call_states[idx] = {
id = tc.id,
name = tc["function"].name,
claude_index = -1,
started = false,
stopped = false,
cached_args = "",
}
end

-- 如果没有活跃的 tool call,立即启动
if sctx.active_tool_index == nil then
sctx.active_tool_index = idx
local tc_events = start_tool_call(sctx, sctx.tool_call_states[idx], conf)
for _, e in ipairs(tc_events) do
table_insert(events, e)
end
end
end

-- arguments 增量
if tc["function"] and tc["function"].arguments and tc["function"].arguments ~= "" then
local state = sctx.tool_call_states[idx]
if state then
state.cached_args = (state.cached_args or "") .. tc["function"].arguments
core.log.debug("[OpenAI->Claude] Cached arguments for tool index ", idx,
": ", tc["function"].arguments, " (total: ", state.cached_args, ")")
-- 只有活跃的 tool call 才实时输出 delta
if sctx.active_tool_index == idx and state.started then
core.log.debug("[OpenAI->Claude] Generated input_json_delta event for active tool index ", idx,
": ", tc["function"].arguments)
table_insert(events, {
event = "content_block_delta",
data = cjson.encode({
type = "content_block_delta",
index = state.claude_index,
delta = { type = "input_json_delta", partial_json = tc["function"].arguments }
})
})
end
end
end
end
end

-- finish_reason 处理 (v0.8: 排除 cjson.null,JSON null 在 Lua 中是 truthy userdata)
if choice.finish_reason and choice.finish_reason ~= cjson.null then
local claude_reason = openai_finish_reason_to_claude(choice.finish_reason)
core.log.debug("[OpenAI->Claude] Processing finish_reason: ", choice.finish_reason, " -> ", claude_reason)

-- 关闭所有活跃的 content blocks
if sctx.thinking_started and not sctx.thinking_stopped then
sctx.thinking_stopped = true
core.log.debug("[OpenAI->Claude] Generated thinking content_block_stop event at index ", sctx.thinking_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
})
end
if sctx.text_started and not sctx.text_stopped then
sctx.text_stopped = true
core.log.debug("[OpenAI->Claude] Generated text content_block_stop event at index ", sctx.text_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
})
end

-- 启动所有未启动的 tool calls,然后关闭所有 tool calls(按 index 排序)
local sorted_indices = {}
for k, _ in pairs(sctx.tool_call_states) do
table_insert(sorted_indices, k)
end
table.sort(sorted_indices)

for _, tidx in ipairs(sorted_indices) do
local ts = sctx.tool_call_states[tidx]
if not ts.started then
core.log.debug("[OpenAI->Claude] Starting remaining tool call at finish: index=", tidx,
", id=", ts.id, ", name=", ts.name)
sctx.active_tool_index = tidx
local tc_events = start_tool_call(sctx, ts, conf)
for _, e in ipairs(tc_events) do table_insert(events, e) end
sctx.active_tool_index = nil
end
end
for _, tidx in ipairs(sorted_indices) do
local ts = sctx.tool_call_states[tidx]
if ts.started and not ts.stopped then
ts.stopped = true
core.log.debug("[OpenAI->Claude] Generated content_block_stop for tool at index ", tidx,
", Claude index ", ts.claude_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
})
end
end

-- 缓存 stop_reason,等 usage 到了一起发(Claude 协议要求)
sctx.pending_stop_reason = claude_reason
core.log.debug("[OpenAI->Claude] Cached stop_reason: ", claude_reason, ", waiting for usage")
end

-- usage 处理(可能和 finish_reason 在同一个 chunk,也可能在后续 chunk)?
if type(openai_chunk.usage) == "table" then
core.log.debug("[OpenAI->Claude] Processing usage info - input: ",
openai_chunk.usage.prompt_tokens or 0, ", output: ",
openai_chunk.usage.completion_tokens or 0)

-- GLM 等模型可能在 usage-only chunk 中不带 finish_reason(choices 为空数组),
-- 导致 content blocks 未被关闭。在发送 message_delta 之前补关所有未关闭的 blocks。
if sctx.thinking_started and not sctx.thinking_stopped then
sctx.thinking_stopped = true
core.log.debug("[OpenAI->Claude] Closing thinking block before usage, index=", sctx.thinking_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
})
end
if sctx.text_started and not sctx.text_stopped then
sctx.text_stopped = true
core.log.debug("[OpenAI->Claude] Closing text block before usage, index=", sctx.text_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
})
end
-- 关闭所有未关闭的 tool call blocks
local sorted_indices = {}
for k, _ in pairs(sctx.tool_call_states) do
table_insert(sorted_indices, k)
end
table.sort(sorted_indices)
for _, tidx in ipairs(sorted_indices) do
local ts = sctx.tool_call_states[tidx]
if ts.started and not ts.stopped then
ts.stopped = true
core.log.debug("[OpenAI->Claude] Closing tool block before usage, index=", ts.claude_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
})
end
end

-- 参考 litellm: input_tokens = prompt_tokens - cached_tokens
local prompt_tokens = openai_chunk.usage.prompt_tokens or 0
local cached_tokens = 0
if type(openai_chunk.usage.prompt_tokens_details) == "table"
and openai_chunk.usage.prompt_tokens_details.cached_tokens then
cached_tokens = openai_chunk.usage.prompt_tokens_details.cached_tokens
end
local stream_usage = {
input_tokens = prompt_tokens - cached_tokens,
output_tokens = openai_chunk.usage.completion_tokens or 0,
}
if cached_tokens > 0 then
stream_usage.cache_read_input_tokens = cached_tokens
end
if type(openai_chunk.usage.prompt_tokens_details) == "table"
and openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens then
stream_usage.cache_creation_input_tokens = openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens
end
local msg_delta = {
type = "message_delta",
delta = {
stop_sequence = cjson.null,
},
usage = stream_usage,
}
-- 合并缓存的 stop_reason;如果没有缓存(GLM usage-only chunk 场景),默认 end_turn
if sctx.pending_stop_reason then
msg_delta.delta.stop_reason = sctx.pending_stop_reason
core.log.debug("[OpenAI->Claude] Combining cached stop_reason ", sctx.pending_stop_reason, " with usage")
sctx.pending_stop_reason = nil
else
msg_delta.delta.stop_reason = "end_turn"
core.log.debug("[OpenAI->Claude] No cached stop_reason, defaulting to end_turn")
end
table_insert(events, { event = "message_delta", data = cjson.encode(msg_delta) })
core.log.debug("[OpenAI->Claude] Generated message_delta event with usage and stop_reason")

-- message_stop
if not sctx.message_stop_sent then
sctx.message_stop_sent = true
core.log.debug("[OpenAI->Claude] Generated message_stop event")
table_insert(events, {
event = "message_stop",
data = cjson.encode({ type = "message_stop" })
})
end
end

return events
end

-- Handle the SSE [DONE] sentinel: close every content block that is still
-- open, flush a cached stop_reason (for streams where no usage chunk ever
-- arrived), emit message_stop, then reset all per-stream state so connection
-- reuse starts clean (mirrors Higress' post-[DONE] reset behavior).
-- Returns a list of { event = <name>, data = <json string> } records.
local function build_claude_stream_done(sctx, conf)
    core.log.debug("[OpenAI->Claude] Processing [DONE] message, finalizing stream")
    local events = {}

    -- Append one Claude SSE event to the output list.
    local function emit(name, payload)
        table_insert(events, { event = name, data = cjson.encode(payload) })
    end

    -- Close the thinking block if it was started but never stopped.
    if sctx.thinking_started and not sctx.thinking_stopped then
        sctx.thinking_stopped = true
        emit("content_block_stop",
             { type = "content_block_stop", index = sctx.thinking_index })
    end
    -- Same for the plain text block.
    if sctx.text_started and not sctx.text_stopped then
        sctx.text_stopped = true
        emit("content_block_stop",
             { type = "content_block_stop", index = sctx.text_index })
    end
    -- And for every tool_use block still open.
    for _, ts in pairs(sctx.tool_call_states) do
        if ts.started and not ts.stopped then
            ts.stopped = true
            emit("content_block_stop",
                 { type = "content_block_stop", index = ts.claude_index })
        end
    end

    -- A stop_reason may still be pending when the upstream never sent a
    -- usage chunk; deliver it now in a minimal message_delta.
    if sctx.pending_stop_reason then
        emit("message_delta", {
            type = "message_delta",
            delta = { stop_reason = sctx.pending_stop_reason, stop_sequence = cjson.null },
            usage = { output_tokens = 0 }
        })
        sctx.pending_stop_reason = nil
    end

    -- message_stop is only valid after message_start, and only once.
    if sctx.message_start_sent and not sctx.message_stop_sent then
        sctx.message_stop_sent = true
        emit("message_stop", { type = "message_stop" })
    end

    -- Reset all converter state to prevent pollution across keep-alive reuse.
    sctx.message_start_sent = false
    sctx.message_stop_sent = false
    sctx.message_id = nil
    sctx.next_content_index = 0
    sctx.thinking_started = false
    sctx.thinking_stopped = false
    sctx.thinking_index = -1
    sctx.text_started = false
    sctx.text_stopped = false
    sctx.text_index = -1
    sctx.tool_call_states = {}
    sctx.active_tool_index = nil
    sctx.pending_stop_reason = nil
    sctx.output_tokens = 0

    core.log.debug("[OpenAI->Claude] Reset converter state for next request")
    return events
end

------------------------------------------------------------------------
-- APISIX 插件钩子
------------------------------------------------------------------------

-- rewrite phase: must run before key-auth (priority 2500) so that the
-- header conversion below is visible to authentication.
function _M.rewrite(conf, ctx)
    local uri = ngx.var.uri
    local headers = ngx.req.get_headers()

    if uri ~= "/v1/messages" then
        core.log.debug("[APISIX] rewrite phase skipped, uri=", uri,
            ", method=", ngx.req.get_method(),
            ", x-api-key=", headers["x-api-key"] and "present" or "nil",
            ", authorization=", headers["authorization"] and "present" or "nil")
        return
    end

    core.log.debug("[APISIX] rewrite phase triggered, uri=", uri,
        ", method=", ngx.req.get_method())

    -- Header conversion: x-api-key -> Authorization: Bearer <key>.
    -- This must happen in the rewrite phase; key-auth also reads headers in
    -- rewrite and would otherwise never see the Authorization header.
    local api_key = headers["x-api-key"]
    if api_key then
        core.log.debug("[Claude->OpenAI] Converting x-api-key to Authorization header")
        ngx.req.set_header("Authorization", "Bearer " .. api_key)
        ngx.req.clear_header("x-api-key")
    end
end

-- access phase: rewrite the request body (Claude Messages -> OpenAI Chat
-- Completions), rewrite the URI, and strip Claude-specific headers.
-- On any read/decode/encode failure the function logs and returns without
-- marking the request as converted, leaving it to pass through unchanged.
function _M.access(conf, ctx)
    local uri = ngx.var.uri
    if uri ~= "/v1/messages" then
        core.log.debug("[APISIX] access phase skipped, uri=", uri)
        return
    end

    core.log.debug("[APISIX] access phase triggered, uri=", uri)

    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
        -- The body may have been spilled to a temp file (large request bodies).
        local file_name = ngx.req.get_body_file()
        if file_name then
            local f = io.open(file_name, "r")
            if f then
                body = f:read("*a")
                f:close()
            end
        end
        if not body then
            core.log.error("failed to read request body, body_file=",
                file_name or "nil")
            return
        end
    end

    -- NOTE(review): this nil-check pattern assumes cjson is the "safe"
    -- variant (returns nil, err instead of raising) — confirm the require
    -- at the top of the file.
    local claude_body, decode_err = cjson.decode(body)
    if not claude_body then
        core.log.error("failed to decode Claude request body, err=",
            decode_err or "unknown", ", body_prefix=", string_sub(body, 1, 200))
        return
    end

    -- Remember the stream flag; later phases branch on it.
    ctx.claude2openai_stream = claude_body.stream or false

    -- Convert the request body to OpenAI Chat Completions shape.
    local openai_body = convert_claude_request_to_openai(claude_body, conf)
    local encoded_body, encode_err = cjson.encode(openai_body)
    if not encoded_body then
        core.log.error("failed to encode OpenAI request body, err=",
            encode_err or "unknown")
        return
    end
    ngx.req.set_body_data(encoded_body)

    -- Rewrite the URI to the OpenAI-compatible endpoint.
    ngx.req.set_uri("/v1/chat/completions")

    -- Remove Claude-specific headers before proxying upstream.
    ngx.req.clear_header("anthropic-version")
    ngx.req.clear_header("anthropic-beta")

    -- Flag consumed by header_filter/body_filter/lua_body_filter below.
    ctx.claude2openai_converted = true
    core.log.debug("[c2o v0.8] request converted, stream=", ctx.claude2openai_stream,
        ", model=", claude_body.model or "nil",
        ", ctx_id=", tostring(ctx))
end

-- header_filter phase: adjust response headers for the rewritten body.
function _M.header_filter(conf, ctx)
    if not ctx.claude2openai_converted then
        return
    end

    core.log.debug("[APISIX] header_filter phase, stream=", ctx.claude2openai_stream,
        ", upstream_status=", ngx.status,
        ", upstream_content_type=", ngx.header["Content-Type"] or "nil",
        ", upstream_content_length=", ngx.header["Content-Length"] or "nil")

    -- Make upstream error statuses visible in the error log.
    if ngx.status >= 400 then
        core.log.error("upstream returned error status=", ngx.status)
    end

    if not ctx.claude2openai_stream then
        ngx.header["Content-Type"] = "application/json"
    else
        ngx.header["Content-Type"] = "text/event-stream"
        -- Critical for SSE: buffering must be off, otherwise nginx buffers
        -- upstream data and body_filter receives chunks split in the middle
        -- of SSE data lines, losing large amounts of data.
        ngx.header["X-Accel-Buffering"] = "no"
    end

    -- The body is rewritten downstream, so the original length is invalid.
    ngx.header["Content-Length"] = nil
end

-- body_filter phase: rewrite the response body.
-- NOTE: under ai-proxy-multi bypass mode, lua_body_filter has already done
-- the conversion, and its output (emitted via ngx_print) triggers this
-- nginx body_filter again. That case must be detected and skipped, or the
-- already-converted Claude SSE data would be processed a second time.
function _M.body_filter(conf, ctx)
    if not ctx.claude2openai_converted then return end

    -- If lua_body_filter is handling the stream conversion, skip the nginx
    -- body_filter to avoid double-converting its Claude SSE output.
    if ctx.claude2openai_lua_body_filter_active then
        return
    end

    local chunk = ngx.arg[1]
    local eof = ngx.arg[2]

    core.log.debug("[APISIX] body_filter phase v0.6 (nginx), stream=", ctx.claude2openai_stream,
        ", chunk_len=", chunk and #chunk or 0, ", eof=", eof,
        ", status=", ngx.status)

    if ctx.claude2openai_stream then
        -- Streaming response: convert chunk by chunk.
        if not ctx.claude2openai_sctx then
            ctx.claude2openai_sctx = new_stream_ctx()
            ctx.claude2openai_line_buf = "" -- SSE line buffer for chunks split at body_filter boundaries
        end
        local sctx = ctx.claude2openai_sctx

        local output_parts = {}

        -- A streaming request may still get a non-stream error JSON from the
        -- upstream (4xx/5xx); pass the real error straight through.
        if not ctx.claude2openai_stream_error_checked then
            ctx.claude2openai_stream_error_checked = true
            if ngx.status >= 400 and chunk and chunk ~= "" then
                core.log.error("upstream error response (stream), status=", ngx.status,
                    ", body=", string_sub(chunk, 1, 4096))
                local err_resp, _ = cjson.decode(chunk)
                if err_resp and err_resp.error then
                    local err_msg = "Unknown upstream error"
                    if type(err_resp.error) == "table" then
                        err_msg = err_resp.error.message or cjson.encode(err_resp.error)
                    elseif type(err_resp.error) == "string" then
                        err_msg = err_resp.error
                    end
                    core.log.debug("[OpenAI->Claude] Stream request got upstream error: ", err_msg)
                    -- When error is a string, `err_resp.error.type` resolves to
                    -- nil through the string metatable, so "api_error" is used.
                    local claude_err = cjson.encode({
                        type = "error",
                        error = {
                            type = err_resp.error.type or "api_error",
                            message = err_msg,
                        }
                    })
                    ngx.arg[1] = claude_err
                    return
                end
            end
        end

        if chunk and chunk ~= "" then
            core.log.debug("[OpenAI->Claude] Original OpenAI streaming chunk len=", #chunk)

            -- Line buffering: the nginx body_filter may split a chunk in the
            -- middle of an SSE data line, which would drop incomplete JSON
            -- lines. Prepend the previous incomplete fragment to this chunk.
            local buf = chunk
            if ctx.claude2openai_line_buf and ctx.claude2openai_line_buf ~= "" then
                buf = ctx.claude2openai_line_buf .. buf
                core.log.debug("[OpenAI->Claude] Prepended buffered line fragment, len=",
                    #ctx.claude2openai_line_buf)
                ctx.claude2openai_line_buf = ""
            end

            -- If the chunk does not end with a newline, its last line may be
            -- incomplete.
            local last_char = string_sub(buf, -1)
            local complete_buf, trailing
            if last_char == "\n" or last_char == "\r" then
                complete_buf = buf
                trailing = nil
            else
                -- Find the position of the last newline; everything after it
                -- is buffered for the next invocation.
                local last_nl = nil
                for i = #buf, 1, -1 do
                    local c = string_sub(buf, i, i)
                    if c == "\n" or c == "\r" then
                        last_nl = i
                        break
                    end
                end
                if last_nl then
                    complete_buf = string_sub(buf, 1, last_nl)
                    trailing = string_sub(buf, last_nl + 1)
                else
                    -- No newline anywhere in buf: buffer the whole thing.
                    ctx.claude2openai_line_buf = buf
                    core.log.debug("[OpenAI->Claude] Entire chunk buffered (no newline), len=", #buf)
                    complete_buf = nil
                    trailing = nil
                end
            end

            if trailing and trailing ~= "" then
                ctx.claude2openai_line_buf = trailing
                core.log.debug("[OpenAI->Claude] Buffered trailing incomplete line, len=", #trailing)
            end

            if complete_buf and complete_buf ~= "" then
                for line in complete_buf:gmatch("[^\r\n]+") do
                    if line:sub(1, 6) == "data: " then
                        local data = line:sub(7)
                        if data == "[DONE]" then
                            local done_events = build_claude_stream_done(sctx, conf)
                            for _, evt in ipairs(done_events) do
                                table_insert(output_parts,
                                    "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                            end
                        else
                            local openai_chunk, chunk_err = cjson.decode(data)
                            if openai_chunk then
                                -- If the SSE data carries an error object,
                                -- convert it to Claude's error event format.
                                if openai_chunk.error then
                                    local err_msg = "Unknown upstream error"
                                    if type(openai_chunk.error) == "table" then
                                        err_msg = openai_chunk.error.message or cjson.encode(openai_chunk.error)
                                    elseif type(openai_chunk.error) == "string" then
                                        err_msg = openai_chunk.error
                                    end
                                    core.log.debug("[OpenAI->Claude] SSE chunk contains error: ", err_msg)
                                    local claude_err = cjson.encode({
                                        type = "error",
                                        error = {
                                            type = openai_chunk.error.type or "api_error",
                                            message = err_msg,
                                        }
                                    })
                                    table_insert(output_parts, "event: error\ndata: " .. claude_err .. "\n\n")
                                else
                                    -- Guard conversion with pcall so one bad
                                    -- chunk cannot abort the whole stream.
                                    local ok, err = pcall(function()
                                        local events = build_claude_stream_events(sctx, openai_chunk, conf)
                                        core.log.debug("[OpenAI->Claude] Generated ", #events, " Claude stream events from OpenAI chunk")
                                        for i, evt in ipairs(events) do
                                            core.log.debug("[OpenAI->Claude] Stream event [", i, "/", #events, "]: ", evt.data)
                                            table_insert(output_parts,
                                                "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                                        end
                                    end)
                                    if not ok then
                                        core.log.error("stream event conversion error, err=",
                                            err, ", data_prefix=", string_sub(data, 1, 200))
                                    end
                                end
                            else
                                core.log.error("failed to decode stream chunk, err=",
                                    chunk_err or "unknown", ", data_prefix=", string_sub(data, 1, 200))
                            end
                        end
                    end
                end
            end -- complete_buf
        end

        -- At EOF, drain whatever is left in the line buffer.
        if eof and ctx.claude2openai_line_buf and ctx.claude2openai_line_buf ~= "" then
            local remaining = ctx.claude2openai_line_buf
            ctx.claude2openai_line_buf = ""
            core.log.debug("[OpenAI->Claude] Processing remaining buffered line at EOF, len=", #remaining)
            for line in remaining:gmatch("[^\r\n]+") do
                if line:sub(1, 6) == "data: " then
                    local data = line:sub(7)
                    if data == "[DONE]" then
                        local done_events = build_claude_stream_done(sctx, conf)
                        for _, evt in ipairs(done_events) do
                            table_insert(output_parts,
                                "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                        end
                    else
                        local openai_chunk, chunk_err = cjson.decode(data)
                        if openai_chunk then
                            local ok, err = pcall(function()
                                local events = build_claude_stream_events(sctx, openai_chunk, conf)
                                for _, evt in ipairs(events) do
                                    table_insert(output_parts,
                                        "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
                                end
                            end)
                            if not ok then
                                core.log.error("stream event conversion error at EOF, err=", err)
                            end
                        else
                            core.log.error("failed to decode buffered stream chunk at EOF, err=",
                                chunk_err or "unknown")
                        end
                    end
                end
            end
        end

        -- If the upstream closed without [DONE], still finalize the stream.
        if eof and not sctx.message_stop_sent then
            core.log.debug("[OpenAI->Claude] EOF reached without message_stop, sending final events")
            local done_events = build_claude_stream_done(sctx, conf)
            for _, evt in ipairs(done_events) do
                table_insert(output_parts,
                    "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
            end
        end

        ngx.arg[1] = #output_parts > 0 and table_concat(output_parts) or ""
        if #output_parts > 0 then
            core.log.debug("[OpenAI->Claude] Converted Claude streaming chunk: ", ngx.arg[1])
        end

    else
        -- Non-stream response: collect the complete body, then convert.
        if not ctx.claude2openai_resp_chunks then
            ctx.claude2openai_resp_chunks = {}
        end

        if chunk and chunk ~= "" then
            table_insert(ctx.claude2openai_resp_chunks, chunk)
            if not eof then
                -- Swallow intermediate chunks; emit everything at EOF.
                ngx.arg[1] = nil
                return
            end
        end

        if eof then
            local full_body = table_concat(ctx.claude2openai_resp_chunks)
            core.log.debug("[OpenAI->Claude] non-stream response complete, body_len=", #full_body,
                ", status=", ngx.status)
            if ngx.status >= 400 then
                core.log.error("upstream error response, status=", ngx.status,
                    ", body=", string_sub(full_body, 1, 4096))
            end
            local openai_resp, resp_err = cjson.decode(full_body)
            if openai_resp then
                local claude_resp = convert_openai_response_to_claude(openai_resp, conf)
                local encoded, enc_err = cjson.encode(claude_resp)
                if encoded then
                    ngx.arg[1] = encoded
                else
                    core.log.error("failed to encode Claude response, err=",
                        enc_err or "unknown")
                    ngx.arg[1] = full_body
                end
            else
                -- Decode failure: pass the raw upstream body through untouched.
                core.log.error("failed to decode upstream response, err=",
                    resp_err or "unknown", ", body_prefix=", string_sub(full_body, 1, 300))
                ngx.arg[1] = full_body
            end
        end
    end
end

------------------------------------------------------------------------
-- lua_body_filter: 供 ai-proxy-multi 的 lua_response_filter 调用
-- 签名: (conf, ctx, headers, body) -> (code, new_body)
-- body 是 ai-proxy-multi 从上游body_reader() 读取的原始 SSE chunk
-- 返回 nil, new_body 替换输出内容
--
-- 关键:body_reader() 按 HTTP chunk 返回数据,chunk 边界可能在 SSE data 行
-- 中间截断,必须用行缓冲处理跨 chunk 的不完整行,否则会丢失事件。
------------------------------------------------------------------------

-- Convert one complete SSE "data:" line into Claude SSE event fragments.
-- Returns a list of ready-to-send "event: ...\ndata: ...\n\n" strings;
-- lines that are not data lines yield an empty list.
local function process_sse_line(sctx, line, conf)
    local out = {}
    if line:sub(1, 6) ~= "data: " then
        return out
    end

    local payload = line:sub(7)

    -- [DONE]: finalize the stream (close blocks, message_stop, state reset).
    if payload == "[DONE]" then
        for _, evt in ipairs(build_claude_stream_done(sctx, conf)) do
            table_insert(out,
                "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
        end
        return out
    end

    local openai_chunk, chunk_err = cjson.decode(payload)
    if not openai_chunk then
        core.log.warn("[c2o v0.8] decode err: ", chunk_err,
            " data=", string_sub(payload, 1, 200))
        return out
    end

    -- In-band error object: map to Claude's error event shape.
    if openai_chunk.error then
        local err_msg = "Unknown upstream error"
        if type(openai_chunk.error) == "table" then
            err_msg = openai_chunk.error.message or cjson.encode(openai_chunk.error)
        elseif type(openai_chunk.error) == "string" then
            err_msg = openai_chunk.error
        end
        local claude_err = cjson.encode({
            type = "error",
            error = {
                type = openai_chunk.error.type or "api_error",
                message = err_msg,
            }
        })
        table_insert(out, "event: error\ndata: " .. claude_err .. "\n\n")
        return out
    end

    -- Normal chunk: convert, guarded so one bad chunk cannot kill the stream.
    local ok, err = pcall(function()
        for _, evt in ipairs(build_claude_stream_events(sctx, openai_chunk, conf)) do
            table_insert(out,
                "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
        end
    end)
    if not ok then
        core.log.warn("[c2o v0.8] convert err: ", err)
    end
    return out
end

-- lua_body_filter: invoked by ai-proxy-multi's lua_response_filter.
-- Signature: (conf, ctx, headers, body) -> (code, new_body); returning
-- nil, new_body replaces the output content. `body` is a raw SSE chunk read
-- from the upstream body_reader(), so chunk boundaries may fall mid-line —
-- incomplete SSE lines are carried across calls in a line buffer.
function _M.lua_body_filter(conf, ctx, headers, body)
    core.log.debug("[c2o v0.8] ENTER converted=",
        tostring(ctx.claude2openai_converted),
        " stream=", tostring(ctx.claude2openai_stream),
        " body_len=", body and #body or 0,
        " ctx_id=", tostring(ctx))
    if not ctx.claude2openai_converted then return end
    if not ctx.claude2openai_stream then
        return
    end

    -- Mark lua_body_filter as active to stop the nginx body_filter from
    -- processing the converted output a second time.
    ctx.claude2openai_lua_body_filter_active = true

    -- Streaming response conversion: lazily create the per-request state.
    if not ctx.claude2openai_sctx then
        ctx.claude2openai_sctx = new_stream_ctx()
        ctx.claude2openai_lua_line_buf = "" -- line buffer: carries incomplete SSE lines across chunks
        core.log.debug("[c2o v0.8] INIT stream ctx")
    end
    local sctx = ctx.claude2openai_sctx

    if not body or body == "" then return nil, "" end

    core.log.debug("[c2o v0.8] IN len=", #body,
        " first80=", string_sub(body, 1, 80))

    local output_parts = {}

    -- Prepend the incomplete line left over from the previous chunk.
    local buf = body
    if ctx.claude2openai_lua_line_buf and ctx.claude2openai_lua_line_buf ~= "" then
        buf = ctx.claude2openai_lua_line_buf .. buf
        core.log.debug("[c2o v0.8] prepend buf len=", #ctx.claude2openai_lua_line_buf)
        ctx.claude2openai_lua_line_buf = ""
    end

    -- If buf does not end with a newline, its last line may be incomplete.
    local last_char = string_sub(buf, -1)
    local complete_buf, trailing
    if last_char == "\n" or last_char == "\r" then
        complete_buf = buf
        trailing = nil
    else
        -- Locate the last newline character.
        local last_nl = nil
        for i = #buf, 1, -1 do
            local c = string_sub(buf, i, i)
            if c == "\n" or c == "\r" then
                last_nl = i
                break
            end
        end
        if last_nl then
            complete_buf = string_sub(buf, 1, last_nl)
            trailing = string_sub(buf, last_nl + 1)
        else
            -- No newline at all: buffer the whole chunk for next time.
            ctx.claude2openai_lua_line_buf = buf
            core.log.debug("[c2o v0.8] no newline, buffered all len=", #buf)
            return nil, ""
        end
    end

    if trailing and trailing ~= "" then
        ctx.claude2openai_lua_line_buf = trailing
        core.log.debug("[c2o v0.8] trailing buf len=", #trailing,
            " content=", string_sub(trailing, 1, 80))
    end

    -- Parse the complete SSE lines one by one.
    for line in complete_buf:gmatch("[^\r\n]+") do
        local parts = process_sse_line(sctx, line, conf)
        for _, p in ipairs(parts) do
            table_insert(output_parts, p)
        end
    end

    local out = table_concat(output_parts)
    core.log.debug("[c2o v0.8] OUT len=", #out,
        " events=", #output_parts,
        " first100=", string_sub(out, 1, 100))
    return nil, out
end

return _M

欢迎关注我的其它发布渠道