-
-
Notifications
You must be signed in to change notification settings - Fork 144
[Chat] Add streaming support & persistence to storage through accumulator callback chaining #928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[Chat] Add streaming support & persistence to storage through accumulator callback chaining #928
Conversation
a1f4c83 to
8a71916
Compare
7d97953 to
5b388c1
Compare
ab56dc7 to
301e15c
Compare
301e15c to
377d7d6
Compare
377d7d6 to
62503c1
Compare
|
Had some trouble with Git as I never worked with a forked repo like this before, hence the force pushes.. However, the branch history should now be fixed, up-to-date & mergeable. |
src/chat/src/Chat.php
Outdated
|
|
||
| return new AccumulatingStreamResult($result, function (AssistantMessage $assistantMessage) use ($messages) { | ||
| $messages->add($assistantMessage); | ||
| $this->store->save($messages); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be done in a single line with $this->store->save($messages->with($assistantMessage));
This way, we could use an arrow function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review! For future PRs, it'd be helpful to consolidate these kinds of stylistic/opinion-level comments into a single pass when possible. Going through several iterations for non-functional feedback can slow things down. Just a thought though, I'm happy to help. But that'd keep contributors more motivated 🙂
| if ($result instanceof StreamResult || $result instanceof ToolboxStreamResult) { | ||
| if (!$this->store instanceof StreamableStoreInterface) { | ||
| throw new RuntimeException($this->store::class.' does not support streaming.'); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that we should throw an exception here, maybe just returning the complete stream at once?
Or maybe we can just store the message "as it" in the store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fairly certain throwing is the right call here. Returning the stream "as is" would work, but it's likely to confuse developers and hide the fact that streaming and session storage are incompatible by nature. As a developer, I'd rather get an early exception that forces me to fix a faulty configuration than a silently degraded solution. I think it'll also prevent a lot of GitHub issues down the road.
| yield 'Bitter Sweet'; | ||
| yield ' '; | ||
| yield 'Symfony'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i see what you did here - nice 🎶 😂
|
Hey @marco-jouwweb, sorry for the delay and thanks for the reminder in Slack - the main issues why this PR is dragging on is that your jumping on a difficult topic 😄 Streaming is one of the more mind bending this we have here, and I feel like adding another result on top is not the right way to go - that's mostly a gut feeling, but i would favor going into the lower level to solve this. regarding the my last point would be: to me it's quite questionable if and how this PR even works - you would need to switch streaming on somehow - which is usually done by action items from my point of view:
Let me know if you have follow up questions - cheers! |
Description
The
Chat::submit()function didn't support streaming. I have added support for this through:AccumulatingStreamResultwrapper which allows callback chaining and accumulates the full message (callbacks are fired onceGeneratorin innerStreamResultis exhausted, aka when stream has completed)StreamResultclasses with this wrapper inside theChat::submit()functionStoragewith a newAssistantMessagebuilt from the accumulated message (as non-stream responses are also automatically added to storage inChat::submit())Other changes:
StreamableStoreInterfacewhich indicates aStoreimplementation supports streaming. Currently added to all stores exceptSessionStorage(due to inherent issues with headers being already sent; deferred updates through callbacks go against nature of header/session lifecycles in Symfony; discussed with @Guikingone)Chat::submit()(metadata was not transferred to newAssistantMessagewe created)Chat::submit()combination