Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/graphql/GraphQLUnusualConfiguration.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package graphql;

import graphql.execution.ResponseMapFactory;
import graphql.execution.incremental.IncrementalExecutionContextKeys;
import graphql.introspection.GoodFaithIntrospection;
import graphql.parser.ParserOptions;
import graphql.schema.PropertyDataFetcherHelper;
Expand Down Expand Up @@ -337,6 +338,15 @@ public IncrementalSupportConfig enableIncrementalSupport(boolean enable) {
contextConfig.put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, enable);
return this;
}

/**
* This controls whether @defer field execution starts as early as possible.
*/
@ExperimentalApi
public IncrementalSupportConfig enableEarlyIncrementalFieldExecution(boolean enable) {
contextConfig.put(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, enable);
return this;
}
}

public static class DataloaderConfig extends BaseContextConfig {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/graphql/execution/ExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import graphql.execution.directives.QueryDirectives;
import graphql.execution.directives.QueryDirectivesImpl;
import graphql.execution.incremental.DeferredExecutionSupport;
import graphql.execution.incremental.IncrementalExecutionContextKeys;
import graphql.execution.instrumentation.ExecuteObjectInstrumentationContext;
import graphql.execution.instrumentation.FieldFetchingInstrumentationContext;
import graphql.execution.instrumentation.Instrumentation;
Expand Down Expand Up @@ -325,7 +326,16 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi
Object fieldValueInfo = resolveFieldWithInfo(executionContext, newParameters);
futures.addObject(fieldValueInfo);
}

}

if (executionContext.hasIncrementalSupport()
&& deferredExecutionSupport.deferredFieldsCount() > 0
&& executionContext.getGraphQLContext().getBoolean(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, false)) {

executionContext.getIncrementalCallState().startDrainingNow();
}

return futures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,8 @@ public Publisher<DelayedIncrementalPartialResult> startDeferredCalls() {
return publisher.get();
}

public void startDrainingNow() {
drainIncrementalCalls();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package graphql.execution.incremental;


import graphql.Internal;
import org.jspecify.annotations.NullMarked;

/**
* GraphQLContext keys for controlling incremental execution behavior.
*/
@Internal
@NullMarked
public final class IncrementalExecutionContextKeys {
private IncrementalExecutionContextKeys() {
}

/**
* Enables eager start of @defer processing so defered work runs before the initial result is computed.
* Defaults to false.
* <p>
* Expects a boolean value.
*/
public static final String ENABLE_EAGER_DEFER_START = "__GJ_enable_eager_defer_start";

}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to create a new entry in the unusual config - lets put it in

graphql.GraphQLUnusualConfiguration.IncrementalSupportConfig

        /**
         * This controls whether @defer field execution starts as early as possible.
         */
        @ExperimentalApi
        public IncrementalSupportConfig enableEarlyFieldExecution(boolean enable) {
            contextConfig.put(YOUR_KEY_HERE, enable);
            return this;
        }

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import graphql.ExecutionResult
import graphql.ExperimentalApi
import graphql.GraphQL
import graphql.GraphqlErrorBuilder
import graphql.GraphQLContext
import graphql.TestUtil
import graphql.execution.DataFetcherResult
import graphql.execution.pubsub.CapturingSubscriber
Expand All @@ -27,6 +28,8 @@ import spock.lang.Specification
import spock.lang.Unroll

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring
Expand Down Expand Up @@ -1726,6 +1729,151 @@ class DeferExecutionSupportIntegrationTest extends Specification {

}

def "eager defer starts before initial result completes when ENABLE_EAGER_DEFER_START"() {
given:
def deferStarted = new CountDownLatch(1)
def allowDeferredComplete = new CountDownLatch(1)

def runtimeWiring = RuntimeWiring.newRuntimeWiring()
.type(newTypeWiring("Query")
.dataFetcher("post", resolve([id: "1001"]))
)
.type(newTypeWiring("Query").dataFetcher("hello", resolve("world", 4000)))
.type(newTypeWiring("Post").dataFetcher("summary", { env ->
deferStarted.countDown()
allowDeferredComplete.await(2, TimeUnit.SECONDS)
CompletableFuture.completedFuture("A summary")
} as DataFetcher))
.type(newTypeWiring("Item").typeResolver(itemTypeResolver()))
.build()

def schema = TestUtil.schema(schemaSpec, runtimeWiring)
.transform({ b -> b.additionalDirective(Directives.DeferDirective) })
def testGraphQL = GraphQL.newGraphQL(schema).build()

def ctx = GraphQLContext.newContext().build()
ctx.put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, true)
ctx.put(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, true)

def query = '''
query {
hello
... @defer { post { summary } }
}
'''

when:
def executionInput = ExecutionInput.newExecutionInput()
.graphQLContext([(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT): true, (IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START): true])
.query(query)
.build()
def execFuture = CompletableFuture.supplyAsync {
testGraphQL.execute(executionInput)
}

then:
// Deferred fetcher starts while initial result is still computing
assert deferStarted.await(2000, TimeUnit.MILLISECONDS)
assert !execFuture.isDone()

when:
allowDeferredComplete.countDown()
def initialResult = execFuture.join() as IncrementalExecutionResult

then:
assert initialResult.toSpecification() == [
data : [hello: "world"],
hasNext: true
]

when:
def incrementalResults = getIncrementalResults(initialResult)

then:
incrementalResults == [
[
hasNext : false,
incremental: [
[
path: [],
data: [post: [summary: "A summary"]]
]
]
]
]
}


def "incremental starts only after initial result when eager start disabled"() {
given:
def deferStarted = new CountDownLatch(1)
def allowDeferredComplete = new CountDownLatch(1)

def runtimeWiring = RuntimeWiring.newRuntimeWiring()
.type(newTypeWiring("Query")
.dataFetcher("post", resolve([id: "1001"]))
)
.type(newTypeWiring("Query").dataFetcher("hello", resolve("world", 300)))
.type(newTypeWiring("Post").dataFetcher("summary", { env ->
deferStarted.countDown()
allowDeferredComplete.await(2, TimeUnit.SECONDS)
CompletableFuture.completedFuture("A summary")
} as DataFetcher))
.type(newTypeWiring("Item").typeResolver(itemTypeResolver()))
.build()

def schema = TestUtil.schema(schemaSpec, runtimeWiring)
.transform({ b -> b.additionalDirective(Directives.DeferDirective) })
def testGraphQL = GraphQL.newGraphQL(schema).build()

def query = '''
query {
hello
... @defer { post { summary } }
}
'''

when:
def executionInput = ExecutionInput.newExecutionInput()
.graphQLContext([(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT): true]) // no eager flag
.query(query)
.build()
def execFuture = CompletableFuture.supplyAsync {
testGraphQL.execute(executionInput)
}

then:
assert !deferStarted.await(100, TimeUnit.MILLISECONDS)
assert !execFuture.isDone()

when:
def initialResult = execFuture.join() as IncrementalExecutionResult

then:
assert initialResult.toSpecification() == [
data : [hello: "world"],
hasNext: true
]
assert deferStarted.count == 1 // still not started, no subscriber yet

when:
allowDeferredComplete.countDown()
def incrementalResults = getIncrementalResults(initialResult)

then:
incrementalResults == [
[
hasNext : false,
incremental: [
[
path: [],
data: [post: [summary: "A summary"]]
]
]
]
]
}


private ExecutionResult executeQuery(String query) {
return this.executeQuery(query, true, [:])
Expand Down
Loading