// 2023-12-18 13:12:25 +08:00 (stray revision timestamp, commented out so the file parses)
'use strict' ;
const net = require ( 'net' ) ,
tls = require ( 'tls' ) ,
fs = require ( 'fs' ) ,
dgram = require ( 'dgram' ) ,
EventParser = require ( '../entities/EventParser.js' ) ,
// 2024-01-16 21:26:16 +08:00 (stray revision timestamp, commented out so the file parses)
Message = require ( 'js-message' ) ,
// 2023-12-18 13:12:25 +08:00 (stray revision timestamp, commented out so the file parses)
Events = require ( 'event-pubsub' ) ;
// Module-level event parser used by emit, broadcast and gotData. The Server
// constructor reassigns it with a parser built from that server's config, so
// the most recently constructed Server determines the parser in use.
let eventParser = new EventParser ( ) ;
/**
 * IPC server. Depending on its flags and config it is backed by a `net`
 * server, a `tls` server or a `dgram` socket (see startServer), and it
 * re-publishes socket activity through the event API inherited from Events.
 */
class Server extends Events {
    /**
     * @param {string} path - Socket path to listen on; when `port` is set it
     *     is passed to listen/bind as the host/address instead.
     * @param {Object} config - Server settings. Fields read in this file:
     *     unlink, rawBuffer, encoding, tls, maxConnections, readableAll,
     *     writableAll.
     * @param {Function} log - Logger, called with one or more message parts.
     * @param {number} [port] - Port to listen on; when falsy the server
     *     listens on `path` alone.
     */
    constructor(path, config, log, port) {
        super();

        Object.assign(this, {
            config: config,
            path: path,
            port: port,
            // Both flags start off; nothing in this file turns them on, so
            // presumably the caller sets one before start() for UDP mode.
            udp4: false,
            udp6: false,
            log: log,
            // `false` until startServer creates the underlying server.
            server: false,
            sockets: [],
            emit: emit,
            broadcast: broadcast
        });

        // Replaces the module-level parser with one built from this config.
        eventParser = new EventParser(this.config);

        this.on('close', serverClosed.bind(this));
    }

    /**
     * Announce that the server has started.
     *
     * @param {*} socket - Forwarded unchanged as the 'start' event payload.
     */
    onStart(socket) {
        this.trigger('start', socket);
    }

    /**
     * Close the underlying server. Does nothing (apart from logging) when the
     * server was never started, instead of throwing on `false.close()`.
     */
    stop() {
        if (!this.server) {
            this.log('Socket Server not started, nothing to stop');
            return;
        }

        this.server.close();
    }

    /**
     * Start listening. Refuses to start without a path. When `config.unlink`
     * is set, the path is unlinked first and the server is started from the
     * unlink callback, whatever the outcome of the unlink.
     */
    start() {
        if (!this.path) {
            this.log('Socket Server Path not specified, refusing to start');
            return;
        }

        if (this.config.unlink) {
            fs.unlink(this.path, startServer.bind(this));
            return;
        }

        startServer.bind(this)();
    }
}
/**
 * Send one event to a single socket. Runs with `this` set to the Server (it
 * is assigned as the `emit` property in the Server constructor).
 *
 * In raw-buffer mode (`config.rawBuffer`) `type` itself is the payload and is
 * encoded with `config.encoding`; `data` is not sent. Otherwise `type` and
 * `data` are wrapped in a Message and serialised by the module-level
 * eventParser.
 *
 * In UDP mode the target must carry `address` and `port`; without them the
 * event is re-dispatched through `this.broadcast`.
 *
 * @param {Object} socket - Target: an object with `write` (stream mode) or an
 *     `{address, port}` descriptor (UDP mode).
 * @param {string} type - Event type, or the raw payload in raw-buffer mode.
 * @param {*} data - Event payload (unused in raw-buffer mode).
 */
function emit(socket, type, data) {
    this.log('dispatching event to socket', ' : ', type, data);

    let message;

    if (this.config.rawBuffer) {
        this.log(this.config.encoding);
        message = Buffer.from(type, this.config.encoding);
    } else {
        // Only build the Message when it is actually going to be serialised.
        const event = new Message();
        event.type = type;
        event.data = data;
        message = eventParser.format(event);
    }

    if (this.udp4 || this.udp6) {
        if (!socket.address || !socket.port) {
            this.log('Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets');
            this.broadcast(type, data);
            return;
        }

        // In UDP mode the server's own `write` (UDPWrite) takes the target.
        this.server.write(message, socket);
        return;
    }

    socket.write(message);
}
/**
 * Send one event to every socket in `this.sockets`. Runs with `this` set to
 * the Server (it is assigned as the `broadcast` property in the constructor).
 *
 * In raw-buffer mode (`config.rawBuffer`) `type` itself is the payload and is
 * encoded with `config.encoding`; otherwise `type` and `data` are wrapped in
 * a Message and serialised by the module-level eventParser.
 *
 * @param {string} type - Event type, or the raw payload in raw-buffer mode.
 * @param {*} data - Event payload (unused in raw-buffer mode).
 */
function broadcast(type, data) {
    this.log('broadcasting event to all known sockets listening to ', this.path, ' : ', ((this.port) ? this.port : ''), type, data);

    let message;

    if (this.config.rawBuffer) {
        message = Buffer.from(type, this.config.encoding);
    } else {
        // Only build the Message when it is actually going to be serialised.
        const event = new Message();
        event.type = type;
        event.data = data;
        message = eventParser.format(event);
    }

    if (this.udp4 || this.udp6) {
        // Index 0 is skipped on purpose. In UDP mode serverCreated pushes the
        // dgram server socket itself when listening starts, so presumably
        // slot 0 is that socket and only later entries are peers.
        for (const peer of this.sockets.slice(1)) {
            this.server.write(message, peer);
        }
        return;
    }

    for (const socket of this.sockets) {
        socket.write(message);
    }
}
/**
 * 'close' handler registered by the Server constructor. Finds the first
 * entry in `this.sockets` that is not readable, destroys it if it can be
 * destroyed, removes it from the list and publishes 'socket.disconnected'
 * with the socket and its id (`false` when the socket has no id).
 *
 * Only one entry is removed per call; readable sockets are left untouched.
 * Empty (falsy) slots are removed too instead of throwing.
 */
function serverClosed() {
    for (let i = 0, count = this.sockets.length; i < count; i++) {
        const socket = this.sockets[i];

        if (socket && socket.readable) {
            continue;
        }

        // Guard the id lookup: the slot may be empty, and reading `.id` from
        // a missing socket would throw.
        const destroyedSocketId = (socket && socket.id) ? socket.id : false;

        this.log('socket disconnected', destroyedSocketId.toString());

        if (socket && socket.destroy) {
            socket.destroy();
        }

        this.sockets.splice(i, 1);

        this.publish('socket.disconnected', socket, destroyedSocketId);

        // Stop after the first removal; indices past `i` have shifted.
        return;
    }
}
/**
 * 'data' handler for a connection. Runs with `this` set to the Server.
 *
 * Raw-buffer mode (`config.rawBuffer`): the chunk is converted to a Buffer
 * using `config.encoding` and published as a 'data' event straight away.
 *
 * Otherwise chunks are accumulated on the peer object (`ipcBuffer`) until the
 * accumulated text ends with the parser's delimiter; the text is then split
 * by the module-level eventParser and every event is published under its own
 * type.
 *
 * @param {Object} socket - The socket the handler was bound to.
 * @param {string|Buffer} data - The received chunk.
 * @param {Object} [UDPSocket] - In UDP mode, the extra argument emitted with
 *     'data' (serverCreated passes the datagram's rinfo); it is used as the
 *     peer object instead of `socket`.
 */
function gotData(socket, data, UDPSocket) {
    const sock = (this.udp4 || this.udp6) ? UDPSocket : socket;

    if (this.config.rawBuffer) {
        const raw = Buffer.from(data, this.config.encoding);
        this.publish('data', raw, sock);
        return;
    }

    if (!sock.ipcBuffer) {
        sock.ipcBuffer = '';
    }

    // Appending to a string coerces the chunk to text.
    const buffered = (sock.ipcBuffer += data);

    // Compare the whole delimiter against the tail, not just the last
    // character, so a delimiter longer than one character is recognised too.
    // Assumes eventParser.delimiter is a string - TODO confirm in EventParser.
    if (!buffered.endsWith(eventParser.delimiter)) {
        this.log('Messages are large, You may want to consider smaller messages.');
        return;
    }

    sock.ipcBuffer = '';

    for (const rawEvent of eventParser.parse(buffered)) {
        const message = new Message();
        message.load(rawEvent);

        // Only set the sock id if it is specified.
        if (message.data && message.data.id) {
            sock.id = message.data.id;
        }

        this.log('received event of : ', message.type, message.data);

        this.publish(message.type, message.data, sock);
    }
}
/**
 * Re-publish a socket-level close on the server as a 'close' event. Runs
 * with `this` set to the Server.
 *
 * @param {*} socket - Forwarded unchanged as the event payload.
 */
function socketClosed(socket) {
    const eventName = 'close';

    this.publish(eventName, socket);
}
/**
 * Register a new connection with the server. Runs with `this` set to the
 * Server; used as the connection listener of the net/tls server and called
 * with the dgram server socket itself once a UDP server is listening.
 *
 * Tracks the socket in `this.sockets`, applies `config.encoding` when the
 * socket supports `setEncoding`, wires its 'close', 'error', 'data' and
 * 'message' events, then publishes 'connect'.
 *
 * @param {Object} socket - The new connection (or the UDP server socket).
 */
function serverCreated(socket) {
    this.sockets.push(socket);

    if (socket.setEncoding) {
        socket.setEncoding(this.config.encoding);
    }

    this.log('## socket connection to server detected ##');

    // NOTE(review): the handler is bound to `this` only, so socketClosed
    // receives whatever argument the emitter passes with 'close', not
    // `socket` - confirm this is intended.
    socket.on('close', socketClosed.bind(this));

    socket.on('error', (err) => {
        this.log('server socket error', err);
        this.publish('error', err);
    });

    socket.on('data', gotData.bind(this, socket));

    // Datagram path: normalise each UDP 'message' into a 'data' event that
    // carries the sender info as an extra argument for gotData.
    socket.on('message', (msg, rinfo) => {
        if (!rinfo) {
            return;
        }

        this.log('Received UDP message from ', rinfo.address, rinfo.port);

        // Raw mode is keyed on `rawBuffer`, as in the rest of this file, so
        // raw datagrams stay Buffers instead of being stringified. The older
        // `rawSocket` key is still honoured for backward compatibility.
        const keepRaw = this.config.rawBuffer || this.config.rawSocket;
        const data = keepRaw
            ? Buffer.from(msg, this.config.encoding)
            : msg.toString();

        socket.emit('data', data, rinfo);
    });

    this.publish('connect', socket);
}
/**
 * Create the underlying server and start listening. Runs with `this` set to
 * the Server.
 *
 * Server type:
 *   - neither udp4 nor udp6 set: a `net` server, or a TLS server (via
 *     startTLSServer) when `config.tls` is truthy;
 *   - otherwise: a `dgram` socket of the matching family, given a `write`
 *     method (UDPWrite) and registered through serverCreated once listening.
 *
 * Listening target:
 *   - no port: listens on `this.path` as a local socket; on Windows the path
 *     is first rewritten into a named-pipe path;
 *   - port and stream mode: `listen(port, path)`;
 *   - port and UDP mode: `bind(port, path)`, after which onStart is called
 *     immediately with `{address, port}`.
 */
function startServer() {
    this.log(
        'starting server on ', this.path,
        ((this.port) ? ` : ${this.port} ` : '')
    );

    const isUDP = Boolean(this.udp4 || this.udp6);

    if (!isUDP) {
        this.log('starting TLS server', this.config.tls);
        if (!this.config.tls) {
            this.server = net.createServer(serverCreated.bind(this));
        } else {
            startTLSServer.bind(this)();
        }
    } else {
        this.server = dgram.createSocket((this.udp4) ? 'udp4' : 'udp6');
        this.server.write = UDPWrite.bind(this);
        this.server.on('listening', () => {
            serverCreated.bind(this)(this.server);
        });
    }

    this.server.on('error', (err) => {
        this.log('server error', err);
        this.publish('error', err);
    });

    this.server.maxConnections = this.config.maxConnections;

    if (!this.port) {
        this.log('starting server as', 'Unix || Windows Socket');

        if (process.platform === 'win32') {
            // Windows has no filesystem sockets: flatten the path into a
            // single name and place it under the \\.\pipe\ namespace.
            this.path = this.path.replace(/^\//, '');
            this.path = this.path.replace(/\//g, '-');
            this.path = `\\\\.\\pipe\\${this.path}`;
        }

        this.server.listen(
            {
                path: this.path,
                readableAll: this.config.readableAll,
                writableAll: this.config.writableAll
            },
            this.onStart.bind(this)
        );
        return;
    }

    if (!isUDP) {
        this.log('starting server as', (this.config.tls ? 'TLS' : 'TCP'));
        this.server.listen(this.port, this.path, this.onStart.bind(this));
        return;
    }

    this.log('starting server as', ((this.udp4) ? 'udp4' : 'udp6'));

    this.server.bind(this.port, this.path);

    // Reported right after bind() is issued, without waiting for 'listening'.
    this.onStart({
        address: this.path,
        port: this.port
    });
}
/**
 * Build `this.server` as a TLS server. Runs with `this` set to the Server.
 *
 * Reads the key and certificate named by `config.tls.private` and
 * `config.tls.public`, falling back to the bundled files under
 * `../local-node-ipc-certs` relative to this module. Optionally loads
 * `config.tls.dhparam` and every file in `config.tls.trustedConnections`
 * (a single path or an array of paths) as CA certificates.
 *
 * `config.tls` is filled in place (`key`, `cert`, `ca`; `dhparam` is replaced
 * by the file contents) and then passed to `tls.createServer` as options.
 * Files are read synchronously; read errors propagate to the caller.
 */
function startTLSServer() {
    this.log('starting TLS server', this.config.tls);

    const tlsConfig = this.config.tls;

    const keyPath = tlsConfig.private
        ? tlsConfig.private
        : `${__dirname}/../local-node-ipc-certs/private/server.key`;
    tlsConfig.key = fs.readFileSync(keyPath);

    const certPath = tlsConfig.public
        ? tlsConfig.public
        : `${__dirname}/../local-node-ipc-certs/server.pub`;
    tlsConfig.cert = fs.readFileSync(certPath);

    if (tlsConfig.dhparam) {
        tlsConfig.dhparam = fs.readFileSync(tlsConfig.dhparam);
    }

    if (tlsConfig.trustedConnections) {
        // A single path is accepted as shorthand for a one-element list.
        if (typeof tlsConfig.trustedConnections === 'string') {
            tlsConfig.trustedConnections = [tlsConfig.trustedConnections];
        }

        tlsConfig.ca = [];
        for (const caPath of tlsConfig.trustedConnections) {
            tlsConfig.ca.push(fs.readFileSync(caPath));
        }
    }

    this.server = tls.createServer(tlsConfig, serverCreated.bind(this));
}
/**
 * `write` implementation installed on the dgram server socket. Runs with
 * `this` set to the Server. Encodes `message` with `config.encoding` and
 * sends it as one datagram to the given peer.
 *
 * Send failures are logged and published as an 'error' event carrying the
 * error object.
 *
 * @param {string|Buffer} message - Payload to send.
 * @param {{address: string, port: number}} socket - Destination peer.
 */
function UDPWrite(message, socket) {
    const data = Buffer.from(message, this.config.encoding);

    this.server.send(
        data,
        0,
        data.length,
        socket.port,
        socket.address,
        // Arrow function: the callback needs the server as `this`.
        (err) => {
            if (!err) {
                return;
            }

            this.log('error writing data to socket', err);
            this.publish('error', err);
        }
    );
}
// The Server class is this module's only export.
module . exports = Server ;