Ich habe eine Micro, die an den mongodb changestream Meldungen einer Sammlung hören (i: e - mycollection) und die Änderungs Stream-Daten in einer anderen Sammlung aktualisieren (i; e - yourcollection)
Für Skalierbarkeit, habe ich 10 Instanz meiner Anwendung, die die gleichen changestream Meldungen lesen.
Ex -
Angenommen, in einer Zeit 100 Datensätze aktualisiert werden in „mycollection“ Sammlungen. so wird sich ändern Strom Benachrichtigung der 100 Datensätze erhalten.
1. Instanz sollte 1 gelesen - 20 Datensätze aus der changestream und zu aktualisieren, die in anderen Sammlung aufzeichnet (i: e - yourcollection)
40 Datensätze aus der changestream und zu aktualisieren, die in der Sammlung „yourcollection“ zeichnet - 2. Instanz sollte 21 aktualisieren.
60 Datensätze aus der changestream und zu aktualisieren, die in der Sammlung „yourcollection“ zeichnet - 3nd Instanz sollte 41 aktualisieren.
bald...
keine zwei Instanzen gleiche duplicte Daten aus dem Wechselstrom erhalten.
gibt es eine Möglichkeit, die oben genannten Anforderungen zu implementieren.
Code Snapshot
unten sind meine Codes, jeder Prozess doppelte Daten zu bekommen. jeder Vorschlag, dieses Szenario zu behandeln?
public String startProcess () {
MongoClient mongoClient = new MongoClient(new MongoClientURI(mongodb://localhost:27017,localhost:27018,localhost:27019/user?replSet=simpliReplica));
MongoDatabase database = mongoClient.getDatabase(mypoc);
MongoCollection<Document> collection = database.getCollection(user);
try {
Block<ChangeStreamDocument<Document>> printBlock = new Block<ChangeStreamDocument<Document>>() {
public void apply(final ChangeStreamDocument<Document> changeStreamDocument) {
System.out.println( MyService::: changeStreamDocument.getFullDocument());
}
};
// collection.watch - Establishes a Change Stream on a collection.This will identify any changes happening to the collection.
collection.watch(asList(Aggregates.match(Filters.in(operationType, asList(insert, update, replace, delete)))))
.fullDocument(FullDocument.UPDATE_LOOKUP).forEach(printBlock);
} catch (IOException e) {
e.printStackTrace();
}
}
}
Thannks Dillip