Skip to content

Commit

Permalink
two step retain event
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed May 17, 2021
1 parent a5948c0 commit 1a19370
Show file tree
Hide file tree
Showing 22 changed files with 331 additions and 154 deletions.
33 changes: 18 additions & 15 deletions allot/ndb.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
'use strict'

const conf_db_file = process.env.DB_FILE
|| console.log('DB_FILE must be defined') || process.exit(1)

const sqlite3 = require('sqlite3')
const sqlite = require('sqlite')
const autobahn = require('autobahn')
Expand All @@ -13,17 +16,17 @@ const gateMass = new Map()

let maxId = makeEmpty(new Date())

const runQuorum = new QuorumEdge((bundleId, value) => {
console.log('SYNC:', bundleId, '=>', value)
const runQuorum = new QuorumEdge((applicantId, value) => {
console.log('SYNC:', applicantId, '=>', value)
for (let [,ss] of syncMass) {
ss.publish('syncId', [], {maxId, bundleId, syncId: value})
ss.publish('syncId', [], {maxId, applicantId, syncId: value})
}
}, mergeMin)

const readyQuorum = new QuorumEdge((bundleId, syncId) => {
console.log('READY:', bundleId, '=>', syncId)
const readyQuorum = new QuorumEdge((applicantId, syncId) => {
console.log('READY:', applicantId, '=>', syncId)
for (let [,gg] of gateMass) {
gg.done(bundleId, syncId)
gg.commitSegment(applicantId, syncId)
}
}, mergeMin)

Expand All @@ -39,13 +42,13 @@ function mkSync(uri, ssId) {

session.subscribe('runId', (args, kwargs, opts) => {
console.log('runId', ssId, kwargs)
runQuorum.vote(ssId, kwargs.bundleId, kwargs.runId)
runQuorum.vote(ssId, kwargs.applicantId, kwargs.runId)
maxId = mergeMax(maxId, kwargs.runId)
})

session.subscribe('readyId', (args, kwargs, opts) => {
console.log('readyId', ssId, kwargs)
readyQuorum.vote(ssId, kwargs.bundleId, kwargs.readyId)
readyQuorum.vote(ssId, kwargs.applicantId, kwargs.readyId)
})
}

Expand All @@ -59,13 +62,13 @@ function mkSync(uri, ssId) {
connection.open()
}

function mkGate(uri, gateId) {
function mkGate(uri, gateId, history) {
console.log('connect to gate:', uri)
const connection = new autobahn.Connection({url: uri, realm: 'gate'})
const connection = new autobahn.Connection({url: uri, realm: 'sys'})

connection.onopen = function (session, details) {
session.log('Session open '+gateId)
gateMass.set(gateId, new EntrySession(session, syncMass))
gateMass.set(gateId, new EntrySession(session, syncMass, history))
}

connection.onclose = function (reason, details) {
Expand All @@ -78,7 +81,7 @@ function mkGate(uri, gateId) {

async function main () {
const db = await sqlite.open({
filename: '../dbfiles/msgdb.sqlite',
filename: conf_db_file,
driver: sqlite3.Database
})

Expand All @@ -89,9 +92,9 @@ async function main () {
mkSync('ws:https://127.0.0.1:9012/wamp', 2)
mkSync('ws:https://127.0.0.1:9013/wamp', 3)

mkGate('ws:https://127.0.0.1:9021/wamp', 1)
mkGate('ws:https://127.0.0.1:9022/wamp', 2)
mkGate('ws:https://127.0.0.1:9023/wamp', 3)
mkGate('ws:https://127.0.0.1:9021/wamp', 1, history)
// mkGate('ws:https://127.0.0.1:9022/wamp', 2, history)
// mkGate('ws:https://127.0.0.1:9023/wamp', 3, history)
}

main().then(() => {
Expand Down
18 changes: 9 additions & 9 deletions allot/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ console.log('Listening WAMP port:', conf_wamp_port)
makeId.update(new Date())
setInterval(()=>{makeId.update(new Date())}, 7000)

const mkQuorum = new QuorumEdge((bundleId, value) => {
const mkQuorum = new QuorumEdge((applicantId, value) => {
const id = makeId.makeId()
console.log('CREATE-ID!', bundleId, '=>', id)
api.publish(['runId'], {kv: {bundleId: bundleId, runId: id}})
}, ()=>{return null})
console.log('CREATE-ID!', applicantId, '=>', id)
api.publish(['runId'], {kv: {applicantId, runId: id}})
}, () => null)

const syncQuorum = new QuorumEdge((bundleId, value) => {
console.log('QSYNC!', bundleId, '=>', value)
api.publish(['readyId'], {kv: {bundleId: bundleId, readyId: value}})
const syncQuorum = new QuorumEdge((applicantId, value) => {
console.log('QSYNC!', applicantId, '=>', value)
api.publish(['readyId'], {kv: {applicantId, readyId: value}})
}, mergeMin)

realm.on(MSG.SESSION_JOIN, (session) => {
Expand All @@ -45,11 +45,11 @@ realm.on(MSG.SESSION_LEAVE, (session) => {

api.subscribe(['mkId'], (data, opt) => {
console.log('MAKE-ID', data, opt)
mkQuorum.vote(opt.sid, data.kwargs.bundleId, null)
mkQuorum.vote(opt.sid, data.kwargs.applicantId, null)
})

api.subscribe(['syncId'], (data, opt) => {
console.log('SYNC-ID', data, opt)
makeId.shift(data.kwargs.maxId)
syncQuorum.vote(opt.sid, data.kwargs.bundleId, data.kwargs.syncId)
syncQuorum.vote(opt.sid, data.kwargs.applicantId, data.kwargs.syncId)
})
86 changes: 70 additions & 16 deletions lib/allot/entry_session.js
Original file line number Diff line number Diff line change
@@ -1,52 +1,106 @@
'use strict'

const { keyId } = require("../tools")

class HistorySegment {
constructor () {
this.content = []
}

addEvent (event) {
this.content.push(event)
}
}

class EntrySession {
constructor (wampSession, syncMass) {
constructor (wampSession, syncMass, history) {
this.wampSession = wampSession
this.stack = []
this.waitForValue = undefined
this.stackApplicantId = []
this.curApplicantId = undefined
this.syncMass = syncMass
this.history = history
this.segmentToWrite = new Map()

wampSession.subscribe('mkId', (publishArgs, kwargs, opts) => {
this.sync(kwargs.bundleId)
console.log('mkId', kwargs)
this.queueSync(kwargs.applicantId)
})

wampSession.subscribe('ping', (publishArgs, kwargs, opts) => {
console.log('ping', kwargs)
wampSession.publish('pong', publishArgs, kwargs)
})

wampSession.subscribe('mqlog', (publishArgs, kwargs, opts) => {
this.delayEvent(kwargs)
})
}

mkDbId (segmentId, offset) {
return segmentId.dt+keyId(segmentId.id)+keyId(offset)
}

sendToSync(bundleId) {
console.log('mkId', bundleId)
delayEvent (event) {
let segment = this.segmentToWrite.get(event.applicantId)
if (!segment) {
console.error("applicantId not found to delay [", event.applicantId, "]", event)
segment = new HistorySegment()
this.segmentToWrite.set(event.applicantId, segment)
}
segment.addEvent(event)
}

sendToSync (applicantId) {
console.log('sendToSync applicantId[', applicantId, "]")
for (let [,ss] of this.syncMass) {
ss.publish('mkId', [], {bundleId})
ss.publish('mkId', [], {applicantId})
}
if (!this.segmentToWrite.has(applicantId)) {
this.segmentToWrite.set(applicantId, new HistorySegment())
}
}

checkLine() {
if (this.waitForValue) {
if (this.curApplicantId) {
return false
}
this.waitForValue = this.stack.shift()
if (this.waitForValue) {
this.sendToSync(this.waitForValue)
this.curApplicantId = this.stackApplicantId.shift()
if (this.curApplicantId) {
this.sendToSync(this.curApplicantId)
return true
}
return false
}

sync(bundleId) {
this.stack.push(bundleId)
queueSync (applicantId) {
this.stackApplicantId.push(applicantId)
return this.checkLine()
}

done(bundleId, syncId) {
if (this.waitForValue === bundleId) {
this.wampSession.publish('readyId', [], {bundleId, syncId})
commitSegment (applicantId, segmentId) {
// check is applicant of mine session
if (this.curApplicantId === applicantId) {
this.dbSaveSegment(applicantId, segmentId)
this.wampSession.publish('readyId', [], {applicantId, segmentId})
this.curApplicantId = undefined
return this.checkLine()
}
return false
}

dbSaveSegment (applicantId, segmentId) {
console.log("dbSaveSegment", applicantId, segmentId)
let segment = this.segmentToWrite.get(applicantId)
if (!segment) {
console.error("applicantId not found in segments [", applicantId, "]")
return
}
let offset = 1
for (let row of segment.content) {
this.history.saveEventHistory(this.mkDbId(segmentId, offset), undefined, row.realm, row.uri, row.data)
offset++
}
}
}

exports.EntrySession = EntrySession
2 changes: 1 addition & 1 deletion lib/allot/makeid.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { keyDate } = require('./tools')
const { keyDate } = require('../tools')

function mergeMax(a, b) {
if (a.dt > b.dt) {
Expand Down
54 changes: 44 additions & 10 deletions lib/allot/netbinder.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,83 @@ const { ReactEngine, ReactBinder } = require('../binder')
const NET_REALM_NAME = 'sys'

class HistorySegment {
constructor (segmentId) {
constructor (applicantId) {
this.content = []
this.segmentId = segmentId
this.applicantId = applicantId
}

addActor (actor) {
this.content.push(actor)
return this.segmentId
return this.applicantId
}
}

class NetBinder extends ReactBinder {
constructor (router) {
super()
this.curSegment = null
this.segmentId = 0
this.applicantId = 0
this.segments = new Map()

const realm = new BaseRealm(router, new BaseEngine())
router.addRealm(NET_REALM_NAME, realm)
this.api = realm.foxApi()

this.api.subscribe(['pong'], (data, opt) => {
this.curSegment = null
console.log('PONG', data, opt)
})

this.api.subscribe(['readyId'], (data, opt) => {
console.log('READY-ID', data, opt)
this.segmentCommited(data.kwargs, opt)
})

this.api.subscribe(['dispatch'], (data, opt) => {
console.log('DISPATCH', data, opt)
})
}

getSegment () {
if (this.curSegment) {
return this.curSegment
}
this.segmentId++
this.curSegment = new HistorySegment(this.segmentId)
this.segments.set(this.segmentId, this.curSegment)
this.applicantId++
this.curSegment = new HistorySegment(this.applicantId)
this.segments.set(this.applicantId, this.curSegment)
this.api.publish(['mkId'], {kv:{
applicantId: this.applicantId
}})
this.api.publish(['ping'], {kv:{
applicantId: this.applicantId
}})
return this.curSegment
}

keepHistory (engine, actor) {
findSegment(applicantId) {
return this.segments.get(applicantId)
}

segmentCommited (applicant, opt) {
console.log('segmentCommited', applicant, opt)
let segment = this.findSegment(applicant.applicantId)
if (!segment) {
return
}
for (let actor of segment.content) {
actor.confirm()
}
// segmentCommited { applicantId: 181, segmentId: { dt: '2104230438', id: 18 } }
// segmentCommited { applicantId: 181, segmentId: { dt: '2104230438', id: 18 } }
// segmentCommited { applicantId: 182, segmentId: { dt: '2104230438', id: 19 } }
// segmentCommited { applicantId: 182, segmentId: { dt: '2104230438', id: 19 } }
}

keepUpdateHistory (engine, actor) {
let segment = this.getSegment()
let id = segment.addActor(actor)

this.api.publish(['mqlog'], {kv:{
segmengId: id,
applicantId: id,
realm: engine.getRealmName(),
data: makeDataSerializable(actor.getData()),
uri: actor.getUri(),
Expand Down
18 changes: 16 additions & 2 deletions lib/binder.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,28 @@ class ReactEngine extends BaseEngine {
this.qConfirm = new DeferMap()
}

keepHistory (actor) {
this.binder.keepHistory(this, actor)
keepEventHistory (actor) {
return this.binder.keepEventHistory(this, actor)
}

keepUpdateHistory (actor) {
return this.binder.keepUpdateHistory(this, actor)
}

getHistoryAfter (after, uri, cbRow) {
return this.binder.getHistoryAfter(this, after, uri, cbRow)
}

doPush (actor) {
this.keepEventHistory(actor).then(() => {
if (actor.getOpt().retain) {
this.updateKvFromActor(actor)
} else {
actor.confirm(actor.msg)
}
})
}

cleanupSession(sessionId) {
return Promise.all([
super.cleanupSession(sessionId),
Expand Down
9 changes: 3 additions & 6 deletions lib/fox_router.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,9 @@ class FoxRouter extends Router {
}

createRealm () {
const engine = new MemEngine()
engine.registerKeyValueEngine(['#'], new MemKeyValueStorage())
return new BaseRealm(
this,
engine
)
const realm = new BaseRealm(this, new MemEngine())
realm.registerKeyValueEngine(['#'], new MemKeyValueStorage())
return realm
}
}

Expand Down
Loading

0 comments on commit 1a19370

Please sign in to comment.