当前位置: 动力学知识库 > 问答 > 编程问答 >

mongodb - mongo scala driver - missing results while reading due to asynchronous issue?

问题描述:

I am trying to use the new Mongo Scala Driver as below.

I noticed that some data were missing (79k vs 85k expected).

import org.bson.types.ObjectId

import org.mongodb.scala.model.Projections._

import org.mongodb.scala.{Document, FindObservable, MongoClient, MongoCollection, MongoDatabase}

object MongoDBItemRepository extends Logging {

val tenantField = "_id.tenant"

}

final class MongoDBItemRepository(

uri: String,

database: String,

collection: String,

tenant: Tenant

) extends ItemRepository {

import MongoDBItemRepository._

override def retrieveItemMetadata: Set[ItemMetadata] = {

logInfo(tenant, s"Retrieving Item Repository from $uri/$database/$collection...")

val mongoClient: MongoClient = MongoClient(uri)

val db: MongoDatabase = mongoClient.getDatabase(database)

val coll: MongoCollection[Document] = db.getCollection(collection)

val repositoryHandler = new MongoDBItemRepositoryHandler(tenant)

val doc: Document = Document(tenantField -> new ObjectId(tenant.id))

val observable: FindObservable[Document] = coll

.find(doc)

.projection(include(MongoDBItemRepositoryHandler.fieldsForProjection: _*))

observable.subscribe(repositoryHandler)

val itemRepository = repositoryHandler.awaitAndGet(observable.toFuture())

mongoClient.close

itemRepository

}

}

The observer:

import org.mongodb.scala.{Document, Observer}

import scala.collection.JavaConversions._

import scala.collection.mutable

import scala.concurrent.{Await, Future}

object MongoDBItemRepositoryHandler extends Logging {

// Some fields...

}

final class MongoDBItemRepositoryHandler(tenant: Tenant) extends Observer[Document] {

import MongoDBItemRepositoryHandler._

private val itemRepository: mutable.Set[ItemMetadata] = new mutable.HashSet[ItemMetadata]

private var count: Int = 0

override def onNext(d: Document): Unit = {

count += 1

// some logic omitted

itemRepository.add(ItemMetadata(...))

}

}

override def onError(e: Throwable): Unit = logError(tenant, "Item Repository retrieval failed", e)

override def onComplete(): Unit = logInfo(tenant, s"Item Repository retrieval succeeded ($count items)")

def awaitAndGet(future: Future[Seq[Document]]): Set[ItemMetadata] = {

Await.ready(future, scala.concurrent.duration.Duration.Inf)

itemRepository.toSet

}

}

With debug, despite the Awwait.ready call, I noticed that the onComplete method is never called (no log printed)

Some WARNs may happen after the (partial) retrieval :

2016-12-14 18:49:43 WARN connection:91 - Callback onResult call produced an error

java.lang.IllegalStateException: state should be: open

at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)

at com.mongodb.connection.DefaultServer.getDescription(DefaultServer.java:114)

at com.mongodb.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getServerDescription(AsyncClusterBinding.java:97)

at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:255)

at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:235)

at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)

at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor$1.onResult(DefaultServer.java:185)

at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)

at com.mongodb.connection.CommandProtocol$CommandResultCallback.callCallback(CommandProtocol.java:271)

at com.mongodb.connection.ResponseCallback.onResult(ResponseCallback.java:48)

at com.mongodb.connection.ResponseCallback.onResult(ResponseCallback.java:23)

at com.mongodb.connection.DefaultConnectionPool$PooledConnection$2.onResult(DefaultConnectionPool.java:470)

at com.mongodb.connection.DefaultConnectionPool$PooledConnection$2.onResult(DefaultConnectionPool.java:464)

at com.mongodb.connection.UsageTrackingInternalConnection$3.onResult(UsageTrackingInternalConnection.java:119)

at com.mongodb.connection.UsageTrackingInternalConnection$3.onResult(UsageTrackingInternalConnection.java:115)

at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)

at com.mongodb.connection.InternalStreamConnection.executeCallbackAndReceiveResponse(InternalStreamConnection.java:378)

at com.mongodb.connection.InternalStreamConnection.access$1700(InternalStreamConnection.java:66)

at com.mongodb.connection.InternalStreamConnection$ResponseBuffersCallback.onResult(InternalStreamConnection.java:420)

at com.mongodb.connection.InternalStreamConnection$ResponseBuffersCallback.onResult(InternalStreamConnection.java:389)

at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback.onSuccess(InternalStreamConnection.java:562)

at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback.access$2200(InternalStreamConnection.java:517)

at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback$ResponseBodyCallback.onResult(InternalStreamConnection.java:584)

at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback$ResponseBodyCallback.onResult(InternalStreamConnection.java:568)

at com.mongodb.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:447)

at com.mongodb.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:444)

at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:218)

at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:201)

at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)

at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:281)

at sun.nio.ch.WindowsAsynchronousSocketChannelImpl$ReadTask.completed(WindowsAsynchronousSocketChannelImpl.java:579)

at sun.nio.ch.Iocp$EventHandlerTask.run(Iocp.java:397)

at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

What am I doing wrong ?

MongoDB version: 3.2.6

Driver version: 1.2.1

Scala: 2.11.8

Thanks for your help

分享给朋友:
您可能感兴趣的文章:
随机阅读: