J'essaie de conserver une certaine date générée par un planificateur, puis de les lire via un point de terminaison de repos d'une manière de flux. J'ai un modèle très simple, un Feed
contenant uniquement uuid et du texte. J'ai également un référentiel qui étend ReactiveCouchbaseRepository
.
La persistance fonctionne lorsque j'utilise le CouchbaseRepository
.
voici mon Feed
:
Tue Apr 09 15:12:02 CEST 2019 There was an unexpected error (type=Internal Server Error, status=500). View feed/all does not exist. com.couchbase.client.java.error.ViewDoesNotExistException: View feed/all does not exist. at com.couchbase.client.java.view.ViewQueryResponseMapper$BuildViewResult.call(ViewQueryResponseMapper.java:211) at com.couchbase.client.java.view.ViewQueryResponseMapper$BuildViewResult.call(ViewQueryResponseMapper.java:185) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:113) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:281) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:216) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.FluxDefer] : reactor.core.publisher.Flux.defer(Flux.java:805) org.springframework.data.repository.util.ReactiveWrapperConverters$RxJava1ObservableToFluxConverter.convert(ReactiveWrapperConverters.java:682) Error has been observed by the following operator(s): |_ Flux.defer ââ¡Â¢ org.springframework.data.repository.util.ReactiveWrapperConverters$RxJava1ObservableToFluxConverter.convert(ReactiveWrapperConverters.java:682) |_ Flux.map ââ¡Â¢ org.springframework.http.codec.ServerSentEventHttpMessageWriter.encode(ServerSentEventHttpMessageWriter.java:119) |_ Mono.doOnError ââ¡Â¢ org.springframework.http.server.reactive.AbstractServerHttpResponse.writeAndFlushWith(AbstractServerHttpResponse.java:186) |_ Mono.flatMap ââ¡Â¢ org.springframework.web.reactive.DispatcherHandler.lambda$handleResult$5(DispatcherHandler.java:175) |_ Mono.onErrorResume ââ¡Â¢ org.springframework.web.reactive.DispatcherHandler.handleResult(DispatcherHandler.java:175) |_ Mono.flatMap ââ¡Â¢ org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:152) |_ Mono.defer ââ¡Â¢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119) |_ Mono.doAfterSuccessOrError ââ¡Â¢ org.springframework.boot.actuate.web.trace.reactive.HttpTraceWebFilter.filter(HttpTraceWebFilter.java:99) |_ Mono.flatMap ââ¡Â¢ org.springframework.boot.actuate.web.trace.reactive.HttpTraceWebFilter.filter(HttpTraceWebFilter.java:82) |_ Mono.defer ââ¡Â¢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119) |_ Mono.defer ââ¡Â¢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119) |_ Mono.doOnSuccess ââ¡Â¢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:84) |_ Mono.doOnError ââ¡Â¢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:84) |_ Mono.compose ââ¡Â¢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:76) |_ Mono.defer ââ¡Â¢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119) Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.java.document.json.JsonObject.class at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73) ... 16 more
et le référentiel:
@RestController public class MainController { @Autowired private FeedRepository feedRepository; @GetMapping(value = "/get/feed/live", produces = "text/event-stream") public Flux<Feed> getLiveFeeds(){ return feedRepository.findAll(); } }
ainsi que le planificateur:
.... @Autowired private FeedRepository feedRepository; @Scheduled(fixedRate = 1000) public void generate(){ feedRepository.save(new Feed(UUID.randomUUID().toString(), String.valueOf(System.currentTimeMillis()))); ...
voici mon point de terminaison:
@N1qlPrimaryIndexed @ViewIndexed(designDoc = "feed") public interface FeedRepository extends ReactiveCouchbaseRepository<Feed, String> { }
quand j'appelle le point de terminaison, j'obtiens ceci:
@Document public class Feed { @Id private String id; @Field private String text; ...
Merci pour votre aide.
MODIFIER 1: il y avait trois choses qui étaient fausses / manquantes pour persister:
@EnableReactiveCouchbaseRepositories
manquait (signalé dans l'une des réponses) AbstractReactiveCouchbaseConfiguration
mais une version standard feedRepository.save (new Feed (UUID.randomUUID (). ToString (), String.valueOf (System.currentTimeMillis ())))
manquait .subscribe () à la fin.
Maintenant, la partie persistante fonctionne. J'ai encore des problèmes avec la récupération.
edit 2
vient de découvrir ce message de débogage dans le journal lors de l'appel du point de terminaison:
2019-04-10 16: 27: 06.467 DEBUG 11172 --- [-computations-3] ccclient.java.view.ViewRetryHandler: Document de conception introuvable, l'erreur est {"errors": [{"error" : "not_found", "reason": "Design document _design / feed not found"}
3 Réponses :
Il semble que vous ayez oublié d'activer les dépôts réactifs à l'aide de l'annotation @EnableReactiveCouchbaseRepositories:
@SpringBootApplication @EnableScheduling @EnableReactiveCouchbaseRepositories public class ReactiveDemoApplication { public static void main(String[] args) { SpringApplication.run(ReactiveDemoApplication.class, args); } }
Vous pouvez en savoir plus ici:
J'ai ajouté l'annotation mais aucun nouvel élément n'apparaît dans le seau (il n'y a que les 49 initiaux que j'ai réussi à enregistrer avec le CouchbaseRepository
normal)
le document dit que la classe de configuration couchbase devrait étendre la AbstractReactiveCouchbaseConfiguration
, ce qui n'était pas le cas auparavant. Le changer n'a fait aucune différence.
Il semble que votre interface FeedRepository ne dispose pas du nom de la vue utilisée pour effectuer la recherche.
Essayez avec le nom de vue ci-dessous:
@N1qlPrimaryIndexed @ViewIndexed(designDoc = "feed", viewName = "all") public interface FeedRepository extends ReactiveCouchbaseRepository<Feed, String> { }
Pour moi, cela a fonctionné après avoir ajouté le nom de la vue.
un peu tard, mais j'espère que cela pourrait aider les autres.
Dans la programmation réactive, vous devez vous abonner à vos éditeurs, sinon le flux d'informations ne démarrera pas. Dans votre exemple, feedRepository.save (***)
n'est jamais exécuté car vous avez besoin de quelque chose comme feedRepository.save (**). Subscribe ()
.
Pour la deuxième question, comme vous demandez tous les éléments, vous devez spécifier une vue telle que {item} / all
, pour les versions plus récentes de Couchbase, vous pouvez spécifier une clé primaire comme index requis pour obtenir tous les éléments.
Pouvez-vous partager le code quelque part? parce que tout me va bien.
ici vous allez github.com/mooras/reactiveDemo
viens de répondre, laissez-moi savoir si cela fonctionne pour vous