Skip to content

Commit 282be32

Browse files
committed
Prepared for stateless transactions
- also bugfix routing discovery query - rename constant for semantics - tmp bind query() - rename prop for semantics - README
1 parent 56f4e3b commit 282be32

19 files changed

Lines changed: 249 additions & 184 deletions

README.md

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,46 +39,52 @@ const driver = new Driver({});
3939
// Reactive
4040
driver.query('RETURN $foo', {foo: true}).subscribe({
4141
next: console.log,
42-
complete: driver.shutDown,
43-
error: driver.shutDown
42+
complete: () => driver.shutDown().toPromise(),
43+
error: (err) => {
44+
console.error(err);
45+
46+
driver.shutDown().toPromise();
47+
}
4448
})
4549

4650
// Promise
4751
driver.query('RETURN $foo', {foo: true})
4852
.toPromise()
4953
.then(console.log)
5054
.catch(console.error)
51-
.finally(driver.shutDown);
55+
.finally(() => driver.shutDown().toPromise());
5256
```
5357

5458
### Transactions
5559
Only for 4.X
5660
```Typescript
57-
import {flatMap, tap} from 'rxjs/operators';
61+
import {flatMap, reduce, tap} from 'rxjs/operators';
5862

59-
import {Driver, Num} from '.';
63+
import {Driver, List, Num, Result} from '.';
6064

61-
const driver = new Driver({});
65+
const driver = new Driver<Result>({});
6266

6367
// Reactive
6468
driver.transaction().pipe(
6569
flatMap((tx) => tx.query('CREATE (n {foo: $foo}) RETURN n', {foo: true}).pipe(
66-
tap(({data}) => data.length.greaterThan(Num.ZERO)
67-
? tx.commit()
68-
: tx.rollback()
69-
)
70+
reduce((agg, next) => agg.concat(next), List.of<Result>([])),
71+
tap(() => tx.rollback().toPromise())
7072
))
7173
).subscribe({
7274
next: console.log,
73-
complete: driver.shutDown,
74-
error: driver.shutDown
75+
complete: () => driver.shutDown().toPromise(),
76+
error: (err) => {
77+
console.error(err);
78+
79+
driver.shutDown().toPromise();
80+
}
7581
});
7682

7783
// Promise
7884
getResults()
7985
.then(console.log)
8086
.catch(console.error)
81-
.finally(driver.shutDown)
87+
.finally(() => driver.shutDown().toPromise())
8288

8389
async function getResults() {
8490
const tx = await driver.transaction().toPromise();
@@ -103,7 +109,7 @@ import {forkJoin} from 'rxjs';
103109
import {filter, reduce} from 'rxjs/operators';
104110
import _ from 'lodash'
105111

106-
import {Driver, DRIVER_RESULT_TYPE, List, Result} from './index';
112+
import {Driver, DRIVER_RESULT_TYPE, List, Result} from '.';
107113

108114
const driver = new Driver<Result>({
109115
useRouting: true,
@@ -120,8 +126,12 @@ const result = forkJoin(_.map(Array(10), () => query));
120126

121127
result.subscribe({
122128
next: console.log,
123-
error: console.error,
124-
complete: driver.shutDown
129+
complete: () => driver.shutDown().toPromise(),
130+
error: (err) => {
131+
console.error(err);
132+
133+
driver.shutDown().toPromise();
134+
}
125135
})
126136

127137
// Promise
@@ -130,15 +140,15 @@ const result = Promise.all(_.map(Array(10), () => query.toPromise()));
130140
result
131141
.then(console.log)
132142
.catch(console.error)
133-
.finally(driver.shutDown);
143+
.finally(() => driver.shutDown().toPromise())
134144
```
135145

136146
## Routing + Transactions
137147
Only for 4.X
138148
```TypeScript
139149
import {filter, reduce} from 'rxjs/operators';
140150

141-
import {DBMS_MEMBER_ROLE, Driver, DRIVER_RESULT_TYPE, List, Result} from './index';
151+
import {DBMS_DB_ROLE, Driver, DRIVER_RESULT_TYPE, List, Result} from '.';
142152

143153
const driver = new Driver<Result>({
144154
useRouting: true,
@@ -148,12 +158,12 @@ const driver = new Driver<Result>({
148158
getResults()
149159
.then(console.log)
150160
.catch(console.error)
151-
.finally(driver.shutDown)
161+
.finally(() => driver.shutDown().toPromise())
152162

153163
async function getResults() {
154164
// request WRITE transaction for db 'neo4j'
155-
const tx = await driver.transaction({role: DBMS_MEMBER_ROLE.LEADER, db: 'neo4j'}).toPromise();
156-
const q1 = await tx.query('CREATE (n {foo: $foo})', {foo: true}).pipe(
165+
const tx = await driver.transaction({role: DBMS_DB_ROLE.LEADER, db: 'neo4j'}).toPromise();
166+
const q1 = await tx.query('CREATE (n {foo: $foo}) RETURN n', {foo: true}).pipe(
157167
filter(({type}) => type === DRIVER_RESULT_TYPE.RECORD),
158168
reduce((agg, next) => agg.concat(next), List.of<Result>([]))
159169
).toPromise();
@@ -185,8 +195,12 @@ driver.query('MATCH (n) RETURN n')
185195
reduce((agg, next) => agg.concat(next), [])
186196
).subscribe({
187197
next: console.log,
188-
complete: driver.shutDown,
189-
error: driver.shutDown
198+
complete: () => driver.shutDown().toPromise(),
199+
error: (err) => {
200+
console.error(err);
201+
202+
driver.shutDown().toPromise();
203+
}
190204
})
191205
```
192206

src/connection/connection.class.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export default class Connection<Data extends any = any> extends Subject<IServerM
1313
public readonly id = uuid();
1414

1515
protected readonly socket: WebSocket;
16-
protected protocol: BOLT_PROTOCOLS = BOLT_PROTOCOLS.UNKNOWN;
16+
protected ourProtocol: BOLT_PROTOCOLS = BOLT_PROTOCOLS.UNKNOWN;
1717
protected didAuth: boolean = false;
1818
protected incomingData = new ArrayBuffer(0);
1919
protected readySubject = new AsyncSubject<void>();
@@ -37,8 +37,12 @@ export default class Connection<Data extends any = any> extends Subject<IServerM
3737
return `${this.config.host}:${this.config.port}`;
3838
}
3939

40+
public get protocol() {
41+
return this.ourProtocol;
42+
}
43+
4044
private get didHandshake() {
41-
return this.protocol !== BOLT_PROTOCOLS.UNKNOWN;
45+
return this.ourProtocol !== BOLT_PROTOCOLS.UNKNOWN;
4246
}
4347

4448
private get isReady() {
@@ -56,13 +60,13 @@ export default class Connection<Data extends any = any> extends Subject<IServerM
5660
map(() => {
5761
const {cmd, data, additionalData} = message;
5862
const allData = additionalData
59-
? [...data, ...additionalData(this.protocol)]
63+
? [...data, ...additionalData(this.ourProtocol)]
6064
: [...data];
6165

62-
this.socket.send(createMessage<Data>(this.protocol, cmd, allData, this.config.packer));
66+
this.socket.send(createMessage<Data>(this.ourProtocol, cmd, allData, this.config.packer));
6367
}),
6468
switchMap(() => this)
65-
)
69+
);
6670
}
6771

6872
@boundMethod
@@ -77,10 +81,10 @@ export default class Connection<Data extends any = any> extends Subject<IServerM
7781

7882
return this.terminationSubject.pipe(
7983
map(() => {
80-
this.socket.close()
84+
this.socket.close();
8185
}),
8286
mapTo(this)
83-
// @todo: not very graceful
87+
// @todo: not very graceful
8488
).toPromise();
8589
}
8690

@@ -123,7 +127,7 @@ export default class Connection<Data extends any = any> extends Subject<IServerM
123127

124128
@boundMethod
125129
private onChunk(view: DataView) {
126-
const {data} = unpackResponseData<Data>(this.protocol, view, this.config.unpacker);
130+
const {data} = unpackResponseData<Data>(this.ourProtocol, view, this.config.unpacker);
127131

128132
this.next({
129133
// @todo: cleanup
@@ -134,9 +138,9 @@ export default class Connection<Data extends any = any> extends Subject<IServerM
134138

135139
@boundMethod
136140
private onHandshake(data: DataView) {
137-
this.protocol = data.getInt32(0, false);
141+
this.ourProtocol = data.getInt32(0, false);
138142

139-
this.socket.send(getAuthMessage(this.protocol, this.config, this.config.packer));
143+
this.socket.send(getAuthMessage(this.ourProtocol, this.config, this.config.packer));
140144
}
141145

142146
@boundMethod

src/connection/connection.utils.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ function getCommandForProtocol(protocol: BOLT_PROTOCOLS, cmd: DriverCommand): nu
6060
return V3_BOLT_MESSAGES[cmd];
6161

6262
default: {
63-
throw new InvalidOperationError(`Protocol ${protocol} does not support transactions`)
63+
throw new InvalidOperationError(`Protocol ${protocol} does not support transactions`);
6464
}
6565
}
6666
}
@@ -106,10 +106,10 @@ export function createMessage<T extends any = any>(protocol: BOLT_PROTOCOLS, cmd
106106

107107
function getMessageHeader(size: number, signature: number) {
108108
if (size < 0x10) {
109-
return [HEADER_SIZE_LIMITS.TINY_STRUCT | size, signature]
109+
return [HEADER_SIZE_LIMITS.TINY_STRUCT | size, signature];
110110
}
111111

112-
throw new Error('Messages of size ' + size + ' are not supported')
112+
throw new Error('Messages of size ' + size + ' are not supported');
113113
}
114114

115115
export function getAuthMessage<T extends any = any>(protocol: BOLT_PROTOCOLS, params: IConnectionConfig<any>, packer?: Packer<T>) {

src/dev.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,19 @@
1-
import {DBMS_MEMBER_ROLE, Driver, List, Result} from '.';
1+
import {DBMS_DB_ROLE, Driver, List, Result} from '.';
22
import {reduce} from 'rxjs/operators';
33

44
const driver = new Driver<Result>({
55
useRouting: true,
6-
maxPoolSize: 10,
7-
connectionConfig: {
8-
port: 7697
9-
}
6+
maxPoolSize: 10
107
});
118

129
// Promise
1310
getResults()
1411
.then(console.log)
1512
.catch(console.error)
16-
.finally(driver.shutDown);
13+
.finally(() => driver.shutDown().toPromise());
1714

1815
async function getResults() {
19-
const tx = await driver.transaction({role: DBMS_MEMBER_ROLE.LEADER, db: 'neo4j'}).toPromise();
16+
const tx = await driver.transaction({role: DBMS_DB_ROLE.LEADER, db: 'neo4j'}).toPromise();
2017
const q1 = await tx.query('CREATE (n {foo: $foo}) RETURN n', {foo: true}).pipe(
2118
reduce((agg, next) => agg.concat(next), List.of<Result>([]))
2219
).toPromise();

src/driver/driver.abstract.ts

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,29 @@ import {InvalidOperationError} from '../errors';
2727
import {determineConnectionHosts} from './driver.utils';
2828

2929
export default abstract class DriverBase<Rec = any> {
30+
protected isReady = false;
31+
protected isShutDown = false;
32+
protected isSlave = false;
3033
protected discoveryTables: IDiscoveryTable[] = [];
31-
protected readonly connectionConfig: IConnectionConfig;
3234
protected connections: Connection[] = [];
3335
protected available: Connection[] = [];
34-
protected readySubject = new AsyncSubject<void>();
35-
protected availableConnections: BehaviorSubject<Connection[]> = new BehaviorSubject<Connection[]>([]);
36-
protected requestQueue: Subject<IRequest> = new Subject();
37-
protected processing: Observable<[IRequest, Connection]> = this.requestQueue.pipe(
36+
protected readonly connectionConfig: IConnectionConfig;
37+
protected readonly connectionSubject: BehaviorSubject<Connection[]> = new BehaviorSubject<Connection[]>([]);
38+
protected readonly readySubject = new AsyncSubject<void>();
39+
protected readonly availableConnections: BehaviorSubject<Connection[]> = new BehaviorSubject<Connection[]>([]);
40+
protected readonly requestQueue: Subject<IRequest> = new Subject();
41+
protected readonly processing: Observable<[IRequest, Connection]> = this.requestQueue.pipe(
3842
flatMap((request) => this.requestAvailableConnection(request.meta).pipe(
3943
map((connections): [IRequest, Connection] => [request, connections[0]])
4044
)),
4145
);
42-
protected isReady = false;
43-
protected isShutDown = false;
44-
protected isBound = false;
4546

4647
constructor(protected readonly config: IDriverConfig) {
4748
this.connectionConfig = _.merge({}, DEFAULT_CONNECTION_CONFIG, this.config.connectionConfig);
4849

50+
// @boundMethod does not like inheritance
51+
this.query = this.query.bind(this);
52+
4953
if (!this.config.useRouting) {
5054
this.isReady = true;
5155

@@ -55,7 +59,7 @@ export default abstract class DriverBase<Rec = any> {
5559
}
5660
}
5761

58-
//@boundMethod
62+
// @boundMethod does not like inheritance
5963
query<Res = Rec>(cypher: string, params: any = {}, meta: IQueryMeta = {}): Observable<Res> {
6064
const {pullN = -1, db} = meta; // @todo: transaction session ID
6165

@@ -66,7 +70,7 @@ export default abstract class DriverBase<Rec = any> {
6670
data: [cypher, params],
6771
additionalData: (protocol) => {
6872
if (protocol < BOLT_PROTOCOLS.V3) {
69-
return []
73+
return [];
7074
}
7175

7276
return db && protocol > BOLT_PROTOCOLS.V3
@@ -90,26 +94,26 @@ export default abstract class DriverBase<Rec = any> {
9094
throw new InvalidOperationError('Transaction pending');
9195
};
9296

93-
commit() {
97+
commit<Res = Rec>(): Observable<Res> {
9498
throw new InvalidOperationError('No transaction pending');
9599
}
96100

97-
rollback() {
101+
rollback<Res = Rec>(): Observable<Res> {
98102
throw new InvalidOperationError('No transaction pending');
99103
}
100104

101-
abstract shutDown(): Promise<this>;
105+
abstract shutDown(): Observable<this>;
102106

103107
@boundMethod
104108
protected sendMessages<Res>(messages: IClientMessage[] = [], meta?: IRequestMeta): Observable<Res> {
105109
return this.readySubject.pipe(
106-
flatMap(() => this.getConnectionForRequest(messages, meta)),
110+
flatMap(() => this.waitForConnection(messages, meta)),
107111
flatMap(([request, connection]) => this.executeRequests<Res>(request, connection))
108112
);
109113
}
110114

111115
@boundMethod
112-
protected getConnectionForRequest(messages: IClientMessage[], meta?: IRequestMeta): Observable<[IRequest, Connection]> {
116+
protected waitForConnection(messages: IClientMessage[], meta?: IRequestMeta): Observable<[IRequest, Connection]> {
113117
if (this.isShutDown) {
114118
throw new InvalidOperationError('Driver is shut down');
115119
}
@@ -130,7 +134,6 @@ export default abstract class DriverBase<Rec = any> {
130134
take(1),
131135
tap(([, connection]) => this.occupyConnection(connection)),
132136
);
133-
134137
}
135138

136139
@boundMethod
@@ -277,8 +280,8 @@ export default abstract class DriverBase<Rec = any> {
277280

278281
const validAvailable = _.filter(this.available, connectionPredicate);
279282

280-
if (this.isBound || arrayHasItems(validAvailable)) {
281-
return this.availableConnections
283+
if (this.isSlave || arrayHasItems(validAvailable)) {
284+
return this.availableConnections;
282285
}
283286

284287
if (this.connections.length < this.config.maxPoolSize) {
@@ -290,19 +293,21 @@ export default abstract class DriverBase<Rec = any> {
290293
];
291294

292295
this.releaseConnection(connection);
296+
this.connectionSubject.next(this.connections);
293297
this.availableConnections.next(this.available);
294298
}
295299

296300
return this.availableConnections.pipe(
297301
skipWhile((connections) => !_.some(connections, connectionPredicate))
298-
)
302+
);
299303
}
300304

301305
@boundMethod
302306
protected terminateConnection(connection: Connection): Promise<Connection> {
303307
this.occupyConnection(connection);
304308

305309
this.connections = _.filter(this.connections, ({id}) => id !== connection.id);
310+
this.connectionSubject.next(this.connections);
306311

307312
return connection.terminate();
308313
}

0 commit comments

Comments
 (0)