forked from OSS/graphql
Copy subscription code
This commit is contained in:
parent
f808d0664f
commit
c7e586a125
@ -38,7 +38,6 @@ library
|
||||
Language.GraphQL.Error
|
||||
Language.GraphQL.Execute
|
||||
Language.GraphQL.Execute.Coerce
|
||||
Language.GraphQL.Executor
|
||||
Language.GraphQL.Execute.OrderedMap
|
||||
Language.GraphQL.Type
|
||||
Language.GraphQL.Type.In
|
||||
@ -52,6 +51,7 @@ library
|
||||
Language.GraphQL.Execute.Internal
|
||||
Language.GraphQL.Execute.Subscribe
|
||||
Language.GraphQL.Execute.Transform
|
||||
Language.GraphQL.Executor
|
||||
Language.GraphQL.Type.Definition
|
||||
Language.GraphQL.Type.Internal
|
||||
Language.GraphQL.Validate.Rules
|
||||
|
@ -11,12 +11,14 @@
|
||||
module Language.GraphQL.Executor
|
||||
( Error(..)
|
||||
, Operation(..)
|
||||
, QueryError(..)
|
||||
, Path(..)
|
||||
, ResponseEventStream
|
||||
, Response(..)
|
||||
, Segment(..)
|
||||
, executeRequest
|
||||
, execute
|
||||
) where
|
||||
|
||||
import Conduit (ConduitT, mapMC, (.|))
|
||||
import Control.Arrow (left)
|
||||
import Control.Monad.Catch
|
||||
( Exception(..)
|
||||
, MonadCatch(..)
|
||||
@ -54,6 +56,7 @@ import qualified Language.GraphQL.Type as Type
|
||||
import qualified Language.GraphQL.Type.Internal as Type.Internal
|
||||
import Language.GraphQL.Type.Schema (Schema, Type)
|
||||
import qualified Language.GraphQL.Type.Schema as Schema
|
||||
import Language.GraphQL.Error (Error(..), Response(..), Path(..))
|
||||
|
||||
data Replacement m = Replacement
|
||||
{ variableValues :: Type.Subs
|
||||
@ -87,7 +90,7 @@ instance MonadCatch m => MonadCatch (TransformT m) where
|
||||
TransformT $ catch stack $ runTransformT . handler
|
||||
|
||||
newtype ExecutorT m a = ExecutorT
|
||||
{ runExecutorT :: ReaderT (HashMap Full.Name (Type m)) (WriterT [Error] m) a
|
||||
{ runExecutorT :: ReaderT (HashMap Full.Name (Type m)) (WriterT (Seq Error) m) a
|
||||
}
|
||||
|
||||
instance Functor m => Functor (ExecutorT m) where
|
||||
@ -161,19 +164,6 @@ instance Exception FieldException where
|
||||
toException = graphQLExceptionToException
|
||||
fromException = graphQLExceptionFromException
|
||||
|
||||
data Segment = Segment String | Index Int
|
||||
|
||||
data Error = Error
|
||||
{ message :: String
|
||||
, locations :: [Full.Location]
|
||||
, path :: [Segment]
|
||||
}
|
||||
|
||||
data Response a = Response
|
||||
{ data' :: a
|
||||
, errors :: [Error]
|
||||
}
|
||||
|
||||
data QueryError
|
||||
= OperationNameRequired
|
||||
| OperationNotFound String
|
||||
@ -187,32 +177,33 @@ queryError :: QueryError -> Error
|
||||
queryError OperationNameRequired =
|
||||
Error{ message = "Operation name is required.", locations = [], path = [] }
|
||||
queryError (OperationNotFound operationName) =
|
||||
let queryErrorMessage = concat
|
||||
let queryErrorMessage = Text.concat
|
||||
[ "Operation \""
|
||||
, operationName
|
||||
, Text.pack operationName
|
||||
, "\" not found."
|
||||
]
|
||||
in Error{ message = queryErrorMessage, locations = [], path = [] }
|
||||
queryError (CoercionError variableDefinition) =
|
||||
let Full.VariableDefinition variableName _ _ location = variableDefinition
|
||||
queryErrorMessage = concat
|
||||
queryErrorMessage = Text.concat
|
||||
[ "Failed to coerce the variable \""
|
||||
, Text.unpack variableName
|
||||
, variableName
|
||||
, "\"."
|
||||
]
|
||||
in Error{ message = queryErrorMessage, locations = [location], path = [] }
|
||||
queryError (UnknownInputType variableDefinition) =
|
||||
let Full.VariableDefinition variableName variableTypeName _ location = variableDefinition
|
||||
queryErrorMessage = concat
|
||||
queryErrorMessage = Text.concat
|
||||
[ "Variable \""
|
||||
, Text.unpack variableName
|
||||
, variableName
|
||||
, "\" has unknown type \""
|
||||
, show variableTypeName
|
||||
, Text.pack $ show variableTypeName
|
||||
, "\"."
|
||||
]
|
||||
in Error{ message = queryErrorMessage, locations = [location], path = [] }
|
||||
|
||||
data Operation m = Operation Full.OperationType (Seq (Selection m))
|
||||
data Operation m
|
||||
= Operation Full.OperationType (Seq (Selection m)) Full.Location
|
||||
|
||||
data Selection m
|
||||
= FieldSelection (Field m)
|
||||
@ -252,12 +243,12 @@ document = foldr filterOperation ([], HashMap.empty)
|
||||
filterOperation _ accumulator = accumulator -- Type system definitions.
|
||||
|
||||
transform :: Monad m => Full.OperationDefinition -> TransformT m (Operation m)
|
||||
transform (Full.OperationDefinition operationType _ _ _ selectionSet' _) = do
|
||||
transform (Full.OperationDefinition operationType _ _ _ selectionSet' operationLocation) = do
|
||||
transformedSelections <- selectionSet selectionSet'
|
||||
pure $ Operation operationType transformedSelections
|
||||
transform (Full.SelectionSet selectionSet' _) = do
|
||||
pure $ Operation operationType transformedSelections operationLocation
|
||||
transform (Full.SelectionSet selectionSet' operationLocation) = do
|
||||
transformedSelections <- selectionSet selectionSet'
|
||||
pure $ Operation Full.Query transformedSelections
|
||||
pure $ Operation Full.Query transformedSelections operationLocation
|
||||
|
||||
selectionSet :: Monad m => Full.SelectionSet -> TransformT m (Seq (Selection m))
|
||||
selectionSet = selectionSetOpt . NonEmpty.toList
|
||||
@ -413,24 +404,34 @@ node :: Monad m => Full.Node Full.Value -> TransformT m (Maybe (Full.Node Input)
|
||||
node Full.Node{node = node', ..} =
|
||||
traverse Full.Node <$> input node' <*> pure location
|
||||
|
||||
execute :: (MonadCatch m, Coerce.VariableValue a, Coerce.Serialize b)
|
||||
=> Schema m -- ^ Resolvers.
|
||||
-> Maybe Text -- ^ Operation name.
|
||||
-> HashMap Full.Name a -- ^ Variable substitution function.
|
||||
-> Full.Document -- @GraphQL@ document.
|
||||
-> m (Either (ResponseEventStream m b) (Response b))
|
||||
execute schema' operationName subs document' =
|
||||
executeRequest schema' document' (Text.unpack <$> operationName) subs
|
||||
|
||||
executeRequest :: (MonadCatch m, Coerce.Serialize a, Coerce.VariableValue b)
|
||||
=> Schema m
|
||||
-> Full.Document
|
||||
-> Maybe String
|
||||
-> HashMap Full.Name b
|
||||
-> m (Response a)
|
||||
-> m (Either (ResponseEventStream m a) (Response a))
|
||||
executeRequest schema sourceDocument operationName variableValues = do
|
||||
operationAndVariables <- sequence buildOperation
|
||||
case operationAndVariables of
|
||||
Left queryError' -> pure
|
||||
$ Right
|
||||
$ Response Coerce.null $ pure $ queryError queryError'
|
||||
Right operation
|
||||
| Operation Full.Query topSelections <- operation ->
|
||||
executeQuery topSelections schema
|
||||
| Operation Full.Mutation topSelections <- operation ->
|
||||
executeMutation topSelections schema
|
||||
| Operation Full.Subscription topSelections <- operation ->
|
||||
subscribe topSelections schema
|
||||
| Operation Full.Query topSelections _operationLocation <- operation ->
|
||||
Right <$> executeQuery topSelections schema
|
||||
| Operation Full.Mutation topSelections operationLocation <- operation ->
|
||||
Right <$> executeMutation topSelections schema operationLocation
|
||||
| Operation Full.Subscription topSelections operationLocation <- operation ->
|
||||
either rightErrorResponse Left <$> subscribe topSelections schema operationLocation
|
||||
where
|
||||
schemaTypes = Schema.types schema
|
||||
(operationDefinitions, fragmentDefinitions') = document sourceDocument
|
||||
@ -450,6 +451,9 @@ executeRequest schema sourceDocument operationName variableValues = do
|
||||
$ runTransformT
|
||||
$ transform operationDefinition
|
||||
|
||||
rightErrorResponse :: Coerce.Serialize b => forall a. Error -> Either a (Response b)
|
||||
rightErrorResponse = Right . Response Coerce.null . pure
|
||||
|
||||
getOperation :: [Full.OperationDefinition] -> Maybe String -> Either QueryError Full.OperationDefinition
|
||||
getOperation [operation] Nothing = Right operation
|
||||
getOperation operations (Just givenOperationName)
|
||||
@ -470,36 +474,31 @@ executeQuery topSelections schema = do
|
||||
(data', errors) <- runWriterT
|
||||
$ flip runReaderT (Schema.types schema)
|
||||
$ runExecutorT
|
||||
$ executeSelectionSet topSelections queryType Type.Null []
|
||||
$ executeSelectionSet topSelections queryType Type.Null []
|
||||
pure $ Response data' errors
|
||||
|
||||
executeMutation :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> Seq (Selection m)
|
||||
-> Schema m
|
||||
-> Full.Location
|
||||
-> m (Response a)
|
||||
executeMutation topSelections schema
|
||||
executeMutation topSelections schema operationLocation
|
||||
| Just mutationType <- Schema.mutation schema = do
|
||||
(data', errors) <- runWriterT
|
||||
$ flip runReaderT (Schema.types schema)
|
||||
$ runExecutorT
|
||||
$ executeSelectionSet topSelections mutationType Type.Null []
|
||||
$ executeSelectionSet topSelections mutationType Type.Null []
|
||||
pure $ Response data' errors
|
||||
| otherwise = pure $ Response Coerce.null
|
||||
[Error "Schema doesn't define a mutation type." [] []]
|
||||
|
||||
-- TODO: Subscribe.
|
||||
subscribe :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> Seq (Selection m)
|
||||
-> Schema m
|
||||
-> m (Response a)
|
||||
subscribe _operation _schema =
|
||||
pure $ Response Coerce.null mempty
|
||||
| otherwise = pure
|
||||
$ Response Coerce.null
|
||||
$ Seq.singleton
|
||||
$ Error "Schema doesn't support mutations." [operationLocation] []
|
||||
|
||||
executeSelectionSet :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> Seq (Selection m)
|
||||
-> Out.ObjectType m
|
||||
-> Type.Value
|
||||
-> [Segment]
|
||||
-> [Path]
|
||||
-> ExecutorT m a
|
||||
executeSelectionSet selections objectType objectValue errorPath = do
|
||||
let groupedFieldSet = collectFields objectType selections
|
||||
@ -512,15 +511,15 @@ executeSelectionSet selections objectType objectValue errorPath = do
|
||||
go fields@(Field _ fieldName _ _ _ :| _) =
|
||||
traverse (executeField' fields) $ HashMap.lookup fieldName resolvers
|
||||
|
||||
fieldsSegment :: forall m. NonEmpty (Field m) -> Segment
|
||||
fieldsSegment :: forall m. NonEmpty (Field m) -> Path
|
||||
fieldsSegment (Field alias fieldName _ _ _ :| _) =
|
||||
Segment (Text.unpack $ fromMaybe fieldName alias)
|
||||
Segment (fromMaybe fieldName alias)
|
||||
|
||||
executeField :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> Type.Value
|
||||
-> NonEmpty (Field m)
|
||||
-> Out.Resolver m
|
||||
-> [Segment]
|
||||
-> [Path]
|
||||
-> ExecutorT m a
|
||||
executeField objectValue fields resolver errorPath =
|
||||
let Field _ fieldName inputArguments _ fieldLocation :| _ = fields
|
||||
@ -531,8 +530,8 @@ executeField objectValue fields resolver errorPath =
|
||||
-> GraphQLException
|
||||
-> ExecutorT m a
|
||||
exceptionHandler fieldLocation e =
|
||||
let newError = Error (displayException e) [fieldLocation] errorPath
|
||||
in ExecutorT (lift $ tell [newError]) >> pure Coerce.null
|
||||
let newError = Error (Text.pack $ displayException e) [fieldLocation] errorPath
|
||||
in ExecutorT (lift $ tell $ Seq.singleton newError) >> pure Coerce.null
|
||||
go fieldName inputArguments = do
|
||||
let (Out.Field _ fieldType argumentTypes, resolveFunction) =
|
||||
resolverField resolver
|
||||
@ -577,7 +576,7 @@ resolveAbstractType abstractType values'
|
||||
completeValue :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> Out.Type m
|
||||
-> NonEmpty (Field m)
|
||||
-> [Segment]
|
||||
-> [Path]
|
||||
-> Type.Value
|
||||
-> ExecutorT m a
|
||||
completeValue outputType _ _ Type.Null
|
||||
@ -648,7 +647,7 @@ throwFieldError = throwM . FieldException
|
||||
coerceArgumentValues :: MonadCatch m
|
||||
=> HashMap Full.Name In.Argument
|
||||
-> HashMap Full.Name (Full.Node Input)
|
||||
-> ExecutorT m Type.Subs
|
||||
-> m Type.Subs
|
||||
coerceArgumentValues argumentDefinitions argumentValues =
|
||||
HashMap.foldrWithKey c pure argumentDefinitions mempty
|
||||
where
|
||||
@ -759,3 +758,90 @@ constValue (Full.ConstObject o) =
|
||||
where
|
||||
constObjectField Full.ObjectField{value = value', ..} =
|
||||
(name, constValue $ Full.node value')
|
||||
|
||||
type ResponseEventStream m a = ConduitT () (Response a) m ()
|
||||
|
||||
subscribe :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> Seq (Selection m)
|
||||
-> Schema m
|
||||
-> Full.Location
|
||||
-> m (Either Error (ResponseEventStream m a))
|
||||
subscribe fields schema objectLocation
|
||||
| Just objectType <- Schema.subscription schema = do
|
||||
let types' = Schema.types schema
|
||||
sourceStream <-
|
||||
createSourceEventStream types' objectType objectLocation fields
|
||||
let traverser =
|
||||
mapSourceToResponseEvent types' objectType fields
|
||||
traverse traverser sourceStream
|
||||
| otherwise = pure $ Left
|
||||
$ Error "Schema doesn't support subscriptions." [] []
|
||||
|
||||
mapSourceToResponseEvent :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> HashMap Full.Name (Type m)
|
||||
-> Out.ObjectType m
|
||||
-> Seq (Selection m)
|
||||
-> Out.SourceEventStream m
|
||||
-> m (ResponseEventStream m a)
|
||||
mapSourceToResponseEvent types' subscriptionType fields sourceStream
|
||||
= pure
|
||||
$ sourceStream
|
||||
.| mapMC (executeSubscriptionEvent types' subscriptionType fields)
|
||||
|
||||
createSourceEventStream :: MonadCatch m
|
||||
=> HashMap Full.Name (Type m)
|
||||
-> Out.ObjectType m
|
||||
-> Full.Location
|
||||
-> Seq (Selection m)
|
||||
-> m (Either Error (Out.SourceEventStream m))
|
||||
createSourceEventStream _types subscriptionType objectLocation fields
|
||||
| [fieldGroup] <- OrderedMap.elems groupedFieldSet
|
||||
, Field _ fieldName arguments' _ errorLocation <- NonEmpty.head fieldGroup
|
||||
, Out.ObjectType _ _ _ fieldTypes <- subscriptionType
|
||||
, resolverT <- fieldTypes HashMap.! fieldName
|
||||
, Out.EventStreamResolver fieldDefinition _ resolver <- resolverT
|
||||
, Out.Field _ _fieldType argumentDefinitions <- fieldDefinition =
|
||||
case coerceArgumentValues argumentDefinitions arguments' of
|
||||
Left _ -> pure
|
||||
$ Left
|
||||
$ Error "Argument coercion failed." [errorLocation] []
|
||||
Right argumentValues -> left (singleError' [errorLocation])
|
||||
<$> resolveFieldEventStream Type.Null argumentValues resolver
|
||||
| otherwise = pure
|
||||
$ Left
|
||||
$ Error "Subscription contains more than one field." [objectLocation] []
|
||||
where
|
||||
groupedFieldSet = collectFields subscriptionType fields
|
||||
|
||||
singleError' :: [Full.Location] -> String -> Error
|
||||
singleError' errorLocations message = Error (Text.pack message) errorLocations []
|
||||
|
||||
resolveFieldEventStream :: MonadCatch m
|
||||
=> Type.Value
|
||||
-> Type.Subs
|
||||
-> Out.Subscribe m
|
||||
-> m (Either String (Out.SourceEventStream m))
|
||||
resolveFieldEventStream result args resolver =
|
||||
catch (Right <$> runReaderT resolver context) handleEventStreamError
|
||||
where
|
||||
handleEventStreamError :: MonadCatch m
|
||||
=> ResolverException
|
||||
-> m (Either String (Out.SourceEventStream m))
|
||||
handleEventStreamError = pure . Left . displayException
|
||||
context = Type.Context
|
||||
{ Type.arguments = Type.Arguments args
|
||||
, Type.values = result
|
||||
}
|
||||
|
||||
executeSubscriptionEvent :: (MonadCatch m, Coerce.Serialize a)
|
||||
=> HashMap Full.Name (Type m)
|
||||
-> Out.ObjectType m
|
||||
-> Seq (Selection m)
|
||||
-> Type.Value
|
||||
-> m (Response a)
|
||||
executeSubscriptionEvent types' objectType fields initialValue = do
|
||||
(data', errors) <- runWriterT
|
||||
$ flip runReaderT types'
|
||||
$ runExecutorT
|
||||
$ executeSelectionSet fields objectType initialValue []
|
||||
pure $ Response data' errors
|
||||
|
Loading…
x
Reference in New Issue
Block a user