diff --git a/src/BaselineOfPharoZeroMQ/BaselineOfPharoZeroMQ.class.st b/src/BaselineOfPharoZeroMQ/BaselineOfPharoZeroMQ.class.st new file mode 100644 index 0000000..df0a7df --- /dev/null +++ b/src/BaselineOfPharoZeroMQ/BaselineOfPharoZeroMQ.class.st @@ -0,0 +1,15 @@ +Class { + #name : #BaselineOfPharoZeroMQ, + #superclass : #Object, + #category : #BaselineOfPharoZeroMQ +} + +{ #category : #baselines } +BaselineOfPharoZeroMQ >> baseline: spec [ + + + spec for: #common do: [ + + "Packages" + spec package: 'PharoZeroMQ' ] +] diff --git a/src/BaselineOfPharoZeroMQ/package.st b/src/BaselineOfPharoZeroMQ/package.st new file mode 100644 index 0000000..9db2606 --- /dev/null +++ b/src/BaselineOfPharoZeroMQ/package.st @@ -0,0 +1 @@ +Package { #name : #BaselineOfPharoZeroMQ } diff --git a/src/ZMQ/LibZMQ.class.st b/src/ZMQ/LibZMQ.class.st new file mode 100644 index 0000000..265e058 --- /dev/null +++ b/src/ZMQ/LibZMQ.class.st @@ -0,0 +1,28 @@ +" +ZMQ Library contains the name of the file that has the library. +Exclusively called in ZMQLibraryInterface. To know the version of this lib, you can find out by doing: +ZMQLibraryInterface version. + +This library uses UFFI calls. +" +Class { + #name : #LibZMQ, + #superclass : #FFILibrary, + #category : #'ZMQ-Libs' +} + +{ #category : #'accessing platform' } +LibZMQ >> macModuleName [ + ^ 'libzmq.dylib' +] + +{ #category : #'accessing platform' } +LibZMQ >> unixModuleName [ + ^ 'libzmq.so' +] + +{ #category : #'accessing platform' } +LibZMQ >> win32ModuleName [ + "While this is not a 'libc' properly, msvcrt has the functions we are defining here" + ^ 'zmq.dll' +] diff --git a/src/ZMQ/LibZMQThreaded.class.st b/src/ZMQ/LibZMQThreaded.class.st new file mode 100644 index 0000000..dfabdc4 --- /dev/null +++ b/src/ZMQ/LibZMQThreaded.class.st @@ -0,0 +1,41 @@ +" +I am LibZMQ but with a Thread Worker. Warning, I do not support every socket type, being used with an non-thread safe socket might lead to undesired behavior regarding your image. +I am used in only a few methods, there is no point in calling me for short tasks. + +To understand how you should use the TFWorker, please check TFTestLibraryUsingWorker. +Collaborators : ZMQLibraryInterface interacts with me . + + +" +Class { + #name : #LibZMQThreaded, + #superclass : #LibZMQ, + #classInstVars : [ + 'tfWorker' + ], + #category : #'ZMQ-Libs' +} + +{ #category : #accessing } +LibZMQThreaded class >> tfWorker [ + + ^ tfWorker +] + +{ #category : #accessing } +LibZMQThreaded class >> tfWorker: anObject [ + + tfWorker := anObject +] + +{ #category : #'accessing platform' } +LibZMQThreaded >> calloutAPIClass [ + + ^ TFCalloutAPI +] + +{ #category : #'accessing platform' } +LibZMQThreaded >> runner [ + + ^ self class tfWorker +] diff --git a/src/ZMQ/ManifestZMQ.class.st b/src/ZMQ/ManifestZMQ.class.st new file mode 100644 index 0000000..249a512 --- /dev/null +++ b/src/ZMQ/ManifestZMQ.class.st @@ -0,0 +1,17 @@ +" +I store metadata for this package. These meta data are used by other tools such as the SmalllintManifestChecker and the critics Browser +" +Class { + #name : #ManifestZMQ, + #superclass : #PackageManifest, + #category : #'ZMQ-Manifest' +} + +{ #category : #'meta-data' } +ManifestZMQ class >> description [ + + ^ 'ZMQ port from SMOCK. + ZMQ is a library that lets you discuss between 2 interfaces. With Pharo, you can send and receive messages with 2 different images. To do that, it uses a socket system, here is implemented a PUB-SUB and a REQ-REP pattern. + For more information on the library and the use of its functions, visit : + https://zguide.zeromq.org/ ' +] diff --git a/src/ZMQ/ZMQConstants.class.st b/src/ZMQ/ZMQConstants.class.st new file mode 100644 index 0000000..2d4b24f --- /dev/null +++ b/src/ZMQ/ZMQConstants.class.st @@ -0,0 +1,50 @@ +" +This class initializes the option values for sockets. This pool is shared among each class of ZMQ-Core, Libs and Samples. It is initialized in the new: instance method of ZMQSocket. + + +" +Class { + #name : #ZMQConstants, + #superclass : #SharedPool, + #classVars : [ + 'ZMQ_BACKLOG', + 'ZMQ_DONTWAIT', + 'ZMQ_IMMEDIATE', + 'ZMQ_LAST_ENDPOINT', + 'ZMQ_PUB', + 'ZMQ_RCVMORE', + 'ZMQ_REP', + 'ZMQ_REQ', + 'ZMQ_REQ_RELAXED', + 'ZMQ_ROUTING_ID', + 'ZMQ_SUB', + 'ZMQ_SUBSCRIBE', + 'ZMQ_UNSUBSCRIBE', + 'ZMQ_WAIT' + ], + #category : #'ZMQ-Core' +} + +{ #category : #'private - initialization' } +ZMQConstants class >> initConstants [ + "Available socket types" + + ZMQ_PUB := 1. + ZMQ_REP := 4. + ZMQ_REQ := 3. + ZMQ_SUB := 2. + + "Socket options, see zmq.h Socket option list" + ZMQ_SUBSCRIBE := 6. + ZMQ_UNSUBSCRIBE := 7. + ZMQ_LAST_ENDPOINT := 32. + ZMQ_BACKLOG := 19. + ZMQ_REQ_RELAXED := 53. + ZMQ_IMMEDIATE := 39. + ZMQ_RCVMORE := 13. + ZMQ_ROUTING_ID := 5. + + "Flags " + ZMQ_DONTWAIT := 1. + ZMQ_WAIT := 0 +] diff --git a/src/ZMQ/ZMQContext.class.st b/src/ZMQ/ZMQContext.class.st new file mode 100644 index 0000000..7ea25fc --- /dev/null +++ b/src/ZMQ/ZMQContext.class.st @@ -0,0 +1,44 @@ +" +A ZMQ context is a container for all sockets in a single process. You should create and use exactly one context in your process, if you create 2, these will be like separate ZMQ instances. + +Instance Variables : +pointer : , points to a ZMQ Context +" +Class { + #name : #ZMQContext, + #superclass : #FFIExternalObject, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Core' +} + +{ #category : #'instance creation' } +ZMQContext class >> new [ + + ^ self new: 1 +] + +{ #category : #'instance creation' } +ZMQContext class >> new: intIoThreads [ + + | context | + context := (ZMQInterface default ctxInit: intIoThreads). + ^ context +] + +{ #category : #dependencies } +ZMQContext >> destroy [ + + | ret | + handle ifNotNil: [ ret := ZMQInterface default ctxDestroy: handle ]. + ret = -1 ifTrue: [ + self error: 'ZMQ Context Error, unsuccessful destruction' ]. + ^ self +] + +{ #category : #dependencies } +ZMQContext >> release [ + + self destroy +] diff --git a/src/ZMQ/ZMQInterface.class.st b/src/ZMQ/ZMQInterface.class.st new file mode 100644 index 0000000..3fb0079 --- /dev/null +++ b/src/ZMQ/ZMQInterface.class.st @@ -0,0 +1,259 @@ +" +ZMQ Interface is used by ZMQSocket as a bridge to ZMQLibraryInterface wich contains the FFI calls to ZMQ lib. + + +" +Class { + #name : #ZMQInterface, + #superclass : #Object, + #classVars : [ + 'default' + ], + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Core' +} + +{ #category : #'instance creation' } +ZMQInterface class >> default [ + + default ifNil: [ default := self new ]. + ^ default +] + +{ #category : #'api - context' } +ZMQInterface >> ctxDestroy: aContextPointer [ + + ^[ZMQLibraryInterface zmq_term: aContextPointer] on: Error + do: [self error: 'ZMQ context destruction error'] +] + +{ #category : #'api - context' } +ZMQInterface >> ctxInit: intIoThreads [ + + ^[ZMQLibraryInterface zmq_init: intIoThreads] on: Error + do: [self error: 'ZMQ context creation error'] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aSocketPtr bind: anURLString [ + + ^[ZMQLibraryInterface zmq_bind: aSocketPtr with: anURLString] + on: Error + do: [self error: 'ZMQ socket bind(',anURLString,') error'] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aSocketPtr connect: anURLString [ + + ^[ZMQLibraryInterface zmq_connect: aSocketPtr with: anURLString] + on: Error + do: [self error: 'ZMQ socket connect(',anURLString,') error'] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aSocketPtr disconnect: anURLString [ + + ^[ZMQLibraryInterface zmq_disconnect: aSocketPtr with: anURLString] + on: Error + do: [self error: 'ZMQ socket disconnect(',anURLString,') error'] +] + +{ #category : #'api - socket - options deprecated' } +ZMQInterface >> socket: aSocketPtr getOption: anOptionKey [ + + | aBuffer aSize | + aSize := FFIArray newType: #size_t size: 1. + aSize at: 1 put: 100. + "aBuffer := (FFITypeArray ofType: 'char' size: (aSize at: 1)) new." + aBuffer := FFIArray newType: #char size: 100. + + + ^ [ + ZMQLibraryInterface + zmq_getsockopt: aSocketPtr + with: anOptionKey + with: aBuffer + with: aSize ] + on: Error + do: [ "'ZMQ_BACKLOG' = anOptionKey ifTrue: [ + ^ self getOptionBacklogFor: aSocketPtr ]. + 'ZMQ_TYPE' = anOptionKey ifTrue: [ + ^ self getOptionBacklogFor: aSocketPtr ]. +" + self error: + (ZMQLibraryInterface zmq_strerror: ZMQLibraryInterface zmq_errno) ] +] + +{ #category : #'api - socket - options' } +ZMQInterface >> socket: aSocketPtr option: aOptionInt value: aBacklogValue [ + | len | + len := aBacklogValue size. + ^[ZMQLibraryInterface zmq_setsockopt: aSocketPtr with: aOptionInt with: aBacklogValue with: len.] + on: Error + do: [self error: 'ZMQ socket Option(',aOptionInt printString,' error'] +] + +{ #category : #'api - socket - options deprecated' } +ZMQInterface >> socket: aSocketPtr optionName: anOptionName optValue: aBuffer [ + + | len | + len := aBuffer size. + + ^ [ + ZMQLibraryInterface + zmq_getsockopt: aSocketPtr + with: anOptionName + with: aBuffer + with: len ] + on: Error + do: [ + self error: 'ZMQ socket getOption (' , anOptionName , ') error' ] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aSocketPtr recv: aBuffer len: aMaxSize flags: flags [ + "Receive a message, with max size len + buffer is a ValueHolder + return the message size" + + | ret | + [ "zmq receive " + ret := ZMQLibraryInterface + zmq_recv: aSocketPtr + with: aBuffer + with: aMaxSize + with: flags. + + ^ ret ] + on: Error + do: [ :ex | + self error: + (ZMQLibraryInterface zmq_strerror: ZMQLibraryInterface zmq_errno) ] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aSocketPtr recvMsg: buffer flags: flags [ + "Receive a message, with max size len + buffer is a ValueHolder + return the message size" + + | zmqMsg | + buffer class = ValueHolder ifFalse: [ + self error: 'ZMQ socket recv error: buffer should be a ValueHolder' ]. + + ^ [ + | msg ba nbytes baList isEndMessage isReadableMsg | + "memory allocation" + baList := OrderedCollection new. + msg := ZMQ_msg_t new. + + + isEndMessage := false. + [ isEndMessage ] whileFalse: [ + ZMQLibraryInterface zmq_msg_init: msg. + "zmq recveive message" + ZMQLibraryInterface zmq_recvmsg: aSocketPtr msg: msg with: flags. + nbytes := ZMQLibraryInterface zmq_msg_size: msg. + isReadableMsg := true. + + "test null pointer" + nbytes = -1 ifTrue: [ + isEndMessage := (ZMQLibraryInterface zmq_msg_more: msg) = 0. + ZMQLibraryInterface zmq_msg_close: msg. + "reponse vide" + baList add: (ByteArray new: 0). + isReadableMsg := false ]. + + isReadableMsg ifTrue: [ "Conversion as a byte array" + nbytes = 0 + ifTrue: [ ba := ByteArray new: nbytes ] + ifFalse: [ + zmqMsg := ZMQLibraryInterface zmq_msg_data: msg. + ba := zmqMsg getHandle copyFrom: 1 to: nbytes ]. + + "is the last part or not ?" + isEndMessage := (ZMQLibraryInterface zmq_msg_more: msg) = 0. + + "free memory" + ZMQLibraryInterface zmq_msg_close: msg. + baList add: ba ] ]. + "received data store in buffer valueHolder" + buffer contents: baList. + ^ self ] + on: Error + do: [ :ex | self error: 'ZMQ socket recv error' ] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aSocketPtr send: buffer flags: flags [ + + | len | + len := buffer size. + ^[ZMQLibraryInterface zmq_send: aSocketPtr with: buffer with: len with: flags] + on: Error + do: [self error: 'ZMQ socket send error'] +] + +{ #category : #'api - socket - options deprecated' } +ZMQInterface >> socket: pointer setOption: aKeyOption optValue: aValueOption [ + + | aSize | + "aSize := (FFITypeArray ofType: 'int' size: 1) new. + aSize at: 1 put: 100." + aSize := aValueOption class = ByteString + ifTrue: [ aValueOption size ] + ifFalse: [ + (FFITypeArray ofType: 'size_t' size: 1) new + at: 1 + put: (aValueOption at: 1) ]. + + ^ [ + ZMQLibraryInterface + zmq_setsockopt: pointer + with: aKeyOption + with: aValueOption + with: aSize ] + on: Error + do: [ + self error: + (ZMQLibraryInterface zmq_strerror: ZMQLibraryInterface zmq_errno) ] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aPointerContext type: anIntegerType [ + + + ^ [ + ZMQLibraryInterface zmq_socket: aPointerContext type: anIntegerType ] + on: Error + do: [ + self error: + 'ZMQ type: ' , anIntegerType printString + , ' socket creation error' ] +] + +{ #category : #'api - socket' } +ZMQInterface >> socket: aSocketPtr unbind: anURLString [ + + + ^ [ ZMQLibraryInterface zmq_unbind: aSocketPtr with: anURLString ] + on: Error + do: [ self error: 'ZMQ socket unbind(' , anURLString , ') error' ] +] + +{ #category : #'api - socket' } +ZMQInterface >> socketClose: socketPtr [ + + + ^ [ ZMQLibraryInterface zmq_close: socketPtr ] + on: Error + do: [ self error: 'ZMQ socket destruction error' ] +] + +{ #category : #accessing } +ZMQInterface >> version [ + + ^ZMQLibraryInterface version +] diff --git a/src/ZMQ/ZMQLibraryInterface.class.st b/src/ZMQ/ZMQLibraryInterface.class.st new file mode 100644 index 0000000..2237465 --- /dev/null +++ b/src/ZMQ/ZMQLibraryInterface.class.st @@ -0,0 +1,193 @@ +" +Accessing interface to ZMQ's functions. The C functions are called with FFI and TFFI. ZMQInterface ensues form this class. +" +Class { + #name : #ZMQLibraryInterface, + #superclass : #Object, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Core' +} + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> cleanUp [ + +] + +{ #category : #'library path' } +ZMQLibraryInterface class >> ffiLibraryName [ + ^ LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> version [ + + | tmp1 tmp2 tmp3 tmp4 tmp5 | + tmp1 := 0 pointer. + tmp2 := tmp1 size. + tmp3 := 0 pointer. + tmp4 := 0 pointer. + self zmq_version: tmp1 with: tmp3 with: tmp4. + tmp5 := String new writeStream. + tmp5 + nextPutAll: (tmp1 integerAt: 1 size: tmp2 signed: true) printString; + nextPut: $.; + nextPutAll: (tmp3 integerAt: 1 size: tmp2 signed: true) printString; + nextPut: $.; + nextPutAll: (tmp4 integerAt: 1 size: tmp2 signed: true) printString. + ^ tmp5 contents +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_bind: s with: addr [ + ^self ffiCall: #(int zmq_bind(void *s, const char *addr)) module: LibZMQ + +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_close: s [ + + ^ self + ffiCall: #( int zmq_close #( void * s ) ) + module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_connect: aSocket with: anAddress [ + + ^ self + ffiCall: #( int zmq_connect #( void * aSocket , const char * anAddress ) ) + module: LibZMQ + +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_disconnect: s with: addr [ + ^self ffiCall: #(zmq_disconnect (void *s, const char *addr)) module: LibZMQ + +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_errno [ + ^self ffiCall: #(int zmq_errno (void)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_getsockopt: s with: option with: optval with: optvallen [ + + ^ self + ffiCall: + #( int zmq_getsockopt #( void * s , int option , void * optval + , size_t * optvallen ) ) + module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_init: io_threads [ + + ^ self + ffiCall: #( ZMQContext zmq_init #( int io_threads ) ) + module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_msg_close: msg [ + ^self ffiCall: #(int zmq_msg_close(ZMQ_msg_t * msg)) module: LibZMQ + +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_msg_data: msg [ + ^self ffiCall: #( void * zmq_msg_data(ZMQ_msg_t * msg)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_msg_init: msg [ + ^self ffiCall: #( int zmq_msg_init(ZMQ_msg_t * msg)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_msg_init: s msg: msg with: flags [ + ^self zmq_msg_init: msg +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_msg_more: message [ + ^self ffiCall: #( int zmq_msg_more (ZMQ_msg_t *message)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_msg_size: msg [ + ^self ffiCall: #( size_t zmq_msg_size(ZMQ_msg_t * msg)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_recv: socket with: buffer with: len with: flags [ + + | lib | + lib := LibZMQ. + flags == ZMQ_WAIT ifTrue: [ lib := LibZMQThreaded ]. + + ^ self + ffiCall: + #( int zmq_recv #( void * socket , void * buffer , size_t len , int + flags ) ) + module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_recvmsg: s msg: msg with: flags [ + + | lib | + lib := LibZMQ. + flags == ZMQ_WAIT ifTrue: [ lib := LibZMQThreaded ]. + ^ self + ffiCall: + #( int zmq_recvmsg #( void * s , ZMQ_msg_t * msg , int flags ) ) + module: lib +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_send: socket with: buffer with: len with: flags [ + ^self ffiCall: #( int zmq_send(void * socket, const void * buffer, size_t len, int flags)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_setsockopt: s with: option with: optval with: optvallen [ + ^self ffiCall: #(int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_socket: context type: type [ + + ^ self + ffiCall: #( ZMQSocket zmq_socket #( void * context , int type ) ) + module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_strerror: errnum [ + ^self ffiCall: #(const char *zmq_strerror (int errnum)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_term: context [ + ^self ffiCall: #(int zmq_term (void *context)) module: LibZMQ +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_unbind: s with: addr [ + ^self ffiCall: #(int zmq_unbind(void *s, const char *addr)) module: LibZMQ + +] + +{ #category : #'procedures-zmq' } +ZMQLibraryInterface class >> zmq_version: major with: minor with: patch [ + ^self ffiCall: #(void zmq_version (int *major, int *minor, int *patch)) module: LibZMQ +] + +{ #category : #accessing } +ZMQLibraryInterface >> ffiLibraryName [ + ^ self class ffiLibraryName +] diff --git a/src/ZMQ/ZMQSampleAbstract.class.st b/src/ZMQ/ZMQSampleAbstract.class.st new file mode 100644 index 0000000..d7970de --- /dev/null +++ b/src/ZMQ/ZMQSampleAbstract.class.st @@ -0,0 +1,347 @@ +" +ZMQSampleAbstract contains all the common methods to the different messaging patterns that you'll find as its subclasses. + +Responsibility part: I can send and receive messages from a server connected on the same IP/port address as I am. + +Collaborators Part: I mainly rely on ZMQ-Core . + +Thank's to me, my subclasses can send and receive messages. They receive them via threads. Also, they can clean themselves up so that a socket can be destroyed. +I am in charge of initializing ZMQConstants if it isn't done prior to me. + +As I am an Abstract method, there is no use to create a instance of myself. You can still check the comments and methods of my subclasses if you want to understand how you can use me. + + Instance Variables: +thread: , used to receive messages +currentMsg: , gives the last message received/sent +threadIsWorking: , is there a normal thread working +threadWorker: , Used if a blocking function is in used, nil otherwise +threaded: , true if you use a Worker but false by default. +topic: , contains topics to which the class subscribes, '' if not told otherwise. +" +Class { + #name : #ZMQSampleAbstract, + #superclass : #Object, + #instVars : [ + 'thread', + 'currentMsg', + 'threadIsWorking', + 'threadWorker', + 'threaded', + 'topicSender', + 'topicReceiver' + ], + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleAbstract class >> defaultIP [ + + ^ '127.0.0.1' +] + +{ #category : #accessing } +ZMQSampleAbstract >> addTopicReceiver: aString [ + + topicReceiver addLast: aString asString +] + +{ #category : #accessing } +ZMQSampleAbstract >> addTopicsReceiver: aCollection [ +aCollection do: [ :e| self addTopicReceiver: e ] +] + +{ #category : #cleanup } +ZMQSampleAbstract >> cleanUp [ +"Empties a socket when it's not nil" + self stopThreadAndWorker. + self socketReceiver ifNotNil: [ + self socketReceiver release. + self socketReceiver: nil ]. + self socketSender ifNotNil: [ + self socketSender release. + self socketSender: nil ]. + ^ self +] + +{ #category : #accessing } +ZMQSampleAbstract >> currentMsg [ + + ^ currentMsg +] + +{ #category : #accessing } +ZMQSampleAbstract >> currentMsg: anObject [ + + currentMsg := anObject +] + +{ #category : #'start-stop' } +ZMQSampleAbstract >> initialize [ + "Initializes variables" + + threadIsWorking := false. + thread := nil. + topicSender := String new. + topicReceiver := OrderedCollection new. + ZMQConstants initConstants +] + +{ #category : #accessing } +ZMQSampleAbstract >> msgDontWait [ + "Non-blocking method to receive messages" + + ^ self socketReceiver receiveMsgDontWait +] + +{ #category : #accessing } +ZMQSampleAbstract >> msgWait [ +"Blocking method to receive messages" + ^ self socketReceiver receiveMsg +] + +{ #category : #'accessing - children' } +ZMQSampleAbstract >> processName [ + + ^ self subclassResponsibility +] + +{ #category : #accessing } +ZMQSampleAbstract >> pushMessage: aText [ + "Puts a given aText in the Transcript" + + Transcript + show: aText; + show: String cr. + ^ self +] + +{ #category : #accessing } +ZMQSampleAbstract >> rcvDontWait [ + "Non-blocking method to receive messages" + + | lastText cmsg actual_topic aSize | + aSize := 100. + lastText := self socketReceiver recv: aSize flags: ZMQ_DONTWAIT. + lastText = (ByteArray new: aSize withAll: 0) ifTrue: [^self]. + cmsg := lastText asString. + cmsg == '' ifTrue: [ ^ self ]. + + 1 to: topicReceiver size do: [ :e | + actual_topic := topicReceiver at: e. + (actual_topic , '*' match: cmsg) ifTrue: [ + cmsg := cmsg allButFirst: actual_topic size. + currentMsg := actual_topic -> cmsg. + self pushMessage: + 'Correspondant ' , currentMsg key , ' - ' , currentMsg value. + ^ self ] ] +] + +{ #category : #accessing } +ZMQSampleAbstract >> receiveData [ + "Receive data in a non-blocking way" + + | lastText cmsg actual_topic | + lastText := self msgDontWait. + lastText size = 1 ifFalse: [ ^ self ]. + cmsg := lastText first. + cmsg := cmsg asString. + cmsg isEmpty ifTrue: [ ^ self ]. + topicReceiver ifEmpty: [ topicReceiver addLast: '' ]. + + 1 to: topicReceiver size do: [ :e | + actual_topic := topicReceiver at: e. + (actual_topic , '*' match: cmsg) ifTrue: [ + cmsg := cmsg allButFirst: actual_topic size. + currentMsg := actual_topic -> cmsg. + self pushMessage: + 'Correspondant ' , currentMsg key , ' - ' , currentMsg value. + ^ self ] ] +] + +{ #category : #accessing } +ZMQSampleAbstract >> receiveDataWithWorker [ + "Receive data in a blocking way using a Worker Thread" + + | lastText cmsg | + lastText := self msgWait. + lastText size = 1 ifFalse: [ ^ self ]. + cmsg := lastText first. + cmsg := cmsg asString. + cmsg isEmpty ifFalse: [ self pushMessage: cmsg ] +] + +{ #category : #accessing } +ZMQSampleAbstract >> send: aMessage [ + " send aMessage from a socket sender to a socket receiver" + + topicSender ifNil: [ topicSender := '' ]. + currentMsg := topicSender -> aMessage. + self socketSender send: topicSender , aMessage. + self pushMessage: 'You ' , currentMsg key , ' - ' , currentMsg value. + ^ self +] + +{ #category : #'accessing - children' } +ZMQSampleAbstract >> socketReceiver [ + + ^ self subclassResponsibility +] + +{ #category : #'accessing - children' } +ZMQSampleAbstract >> socketReceiver:aSocket [ + + ^ self subclassResponsibility +] + +{ #category : #'accessing - children' } +ZMQSampleAbstract >> socketSender [ + + ^ self subclassResponsibility +] + +{ #category : #'accessing - children' } +ZMQSampleAbstract >> socketSender: aSocket [ + + ^ self subclassResponsibility +] + +{ #category : #'start-stop' } +ZMQSampleAbstract >> startThread [ +" Used to receive messages in a non-blocking way" + | startedBloc | + self stopThread. + + threadIsWorking := true. + startedBloc := [ + [ threadIsWorking ] whileTrue: [ + self receiveData. + 100 milliSeconds wait ] ]. + thread := startedBloc + forkAt: Processor userBackgroundPriority + named: self processName , ' thread' +] + +{ #category : #'start-stop' } +ZMQSampleAbstract >> startThreadRcvBuffer [ + " Used to receive messages in a non-blocking way with a buffer" + + | startedBloc | + self stopThread. + threadIsWorking := true. + startedBloc := [ + [ threadIsWorking ] whileTrue: [ + self rcvDontWait. + 100 milliSeconds wait ] ]. + thread := startedBloc + forkAt: Processor userBackgroundPriority + named: self processName , ' RcvBuffer' +] + +{ #category : #'start-stop' } +ZMQSampleAbstract >> startThreadWorker [ +" Used to receive messages in a blocking way in a TFWorker" + | startedBloc | + self stopThreadAndWorker. + threadWorker := TFWorker named: 'ZMQ'. + threadWorker doInitialize. + LibZMQThreaded tfWorker: threadWorker. + threadIsWorking := true. + startedBloc := [ + [ threadIsWorking ] whileTrue: [ + self receiveDataWithWorker. + 100 milliSeconds wait ] ]. + thread := startedBloc + forkAt: Processor userBackgroundPriority + named: self processName , ' worker' +] + +{ #category : #accessing } +ZMQSampleAbstract >> started [ + + ^ threadIsWorking +] + +{ #category : #accessing } +ZMQSampleAbstract >> started: anObject [ + + threadIsWorking := anObject +] + +{ #category : #'start-stop' } +ZMQSampleAbstract >> stopThread [ +"Stops a thread" + threadIsWorking := false. + + thread ifNotNil: [ + thread terminate. + thread := nil ] +] + +{ #category : #'start-stop' } +ZMQSampleAbstract >> stopThreadAndWorker [ +" Stops a Worker if it exists, and the normal thread" + self stopThread. + threadWorker ifNotNil: [ + threadWorker release. + threadWorker := nil ] +] + +{ #category : #accessing } +ZMQSampleAbstract >> thread [ + + ^ thread +] + +{ #category : #accessing } +ZMQSampleAbstract >> thread: anObject [ + + thread := anObject +] + +{ #category : #accessing } +ZMQSampleAbstract >> threaded [ +threaded ifNil: [ threaded := false ]. + ^ threaded +] + +{ #category : #accessing } +ZMQSampleAbstract >> threaded: anObject [ + + threaded := anObject +] + +{ #category : #accessing } +ZMQSampleAbstract >> topicReceiver [ + + ^ topicReceiver +] + +{ #category : #accessing } +ZMQSampleAbstract >> topicReceiver: aString [ + "Discards all current existing topics to replace them with aString" + + topicReceiver := OrderedCollection new + add: aString asString; + yourself +] + +{ #category : #accessing } +ZMQSampleAbstract >> topicSender [ + + ^topicSender +] + +{ #category : #accessing } +ZMQSampleAbstract >> topicSender: aString [ + + topicSender := aString +] + +{ #category : #accessing } +ZMQSampleAbstract >> topicsReceiver: aCollection [ + +topicReceiver := aCollection. +] diff --git a/src/ZMQ/ZMQSampleAbstractPUBSUB.class.st b/src/ZMQ/ZMQSampleAbstractPUBSUB.class.st new file mode 100644 index 0000000..31d1f8e --- /dev/null +++ b/src/ZMQ/ZMQSampleAbstractPUBSUB.class.st @@ -0,0 +1,56 @@ +" +I am an Abstract class that gathers common accessors of the different sockets owned by Client and Server in a PUB-SUB pattern. +There is no use in creating an instance of myself. For an example, look at my subclasses. + +Instance Variables +socketP: of type Publisher, accessible through socketSender +socketS: of type Subscriber, accessible through socketReceiver + +" +Class { + #name : #ZMQSampleAbstractPUBSUB, + #superclass : #ZMQSampleAbstract, + #instVars : [ + 'socketP', + 'socketS' + ], + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #connecting } +ZMQSampleAbstractPUBSUB >> connectPublisher [ + ^ self subclassResponsibility +] + +{ #category : #'accessing - children' } +ZMQSampleAbstractPUBSUB >> processName [ + + ^ self subclassResponsibility +] + +{ #category : #accessing } +ZMQSampleAbstractPUBSUB >> socketReceiver [ +^socketS +] + +{ #category : #accessing } +ZMQSampleAbstractPUBSUB >> socketReceiver: aSocketValue [ + + socketS := aSocketValue + +] + +{ #category : #accessing } +ZMQSampleAbstractPUBSUB >> socketSender [ + + ^ socketP +] + +{ #category : #accessing } +ZMQSampleAbstractPUBSUB >> socketSender: aSocketValue [ + + socketP := aSocketValue +] diff --git a/src/ZMQ/ZMQSampleAbstractTest.class.st b/src/ZMQ/ZMQSampleAbstractTest.class.st new file mode 100644 index 0000000..9d683e8 --- /dev/null +++ b/src/ZMQ/ZMQSampleAbstractTest.class.st @@ -0,0 +1,47 @@ +Class { + #name : #ZMQSampleAbstractTest, + #superclass : #TestCase, + #instVars : [ + 'serverOrClient' + ], + #category : #'ZMQ-Tests' +} + +{ #category : #running } +ZMQSampleAbstractTest >> setUp [ + + super setUp. + serverOrClient := ZMQSampleAbstract new. + ZMQConstants initConstants +] + +{ #category : #running } +ZMQSampleAbstractTest >> tearDown [ + + serverOrClient := nil. + super tearDown +] + +{ #category : #tests } +ZMQSampleAbstractTest >> testSampleInitialize [ + + self assert: serverOrClient thread equals: nil. + self assert: serverOrClient started equals: false +] + +{ #category : #tests } +ZMQSampleAbstractTest >> testSampleTopics [ + + | topics | + topics := OrderedCollection new. + 1 to: (Random new nextInteger: 10) do: [ :c | + topics addLast: (Random new nextInteger: 1000) asString ]. + + serverOrClient addTopicsReceiver: topics . + self assert: serverOrClient topicReceiver equals: topics. + topics := Random new nextInteger: 1000 . + serverOrClient topicReceiver: topics. + self + assert: serverOrClient topicReceiver + equals: (OrderedCollection new add: topics asString ; yourself) +] diff --git a/src/ZMQ/ZMQSampleClientPUBSUB.class.st b/src/ZMQ/ZMQSampleClientPUBSUB.class.st new file mode 100644 index 0000000..c0ae774 --- /dev/null +++ b/src/ZMQ/ZMQSampleClientPUBSUB.class.st @@ -0,0 +1,73 @@ +" +ZMQSampleClientPUBSUB provides an example on how to use ZMQ, on the client side, using a PUB-SUB pattern. + +Responsibility part: I can send and receive messages from a server connected on the same IP/port address as I am. Each socket is unidirectional. I can subscribe to a list of different topics. + +Collaborators Part: I mainly rely on ZMQ-Core and ZMQSampleAbstract + +Here is an example on how to use this class : +client := ZMQSampleClientPUBSUB new. ""Creates instance"" +client connectSubscriber. ""Connects sockets"" +client connectPublisher. +client send: 'Hello World'. ""Sends message to clients who subscribe to this subject or to none."" +client cleanUp. ""Destroys the socket"" + + +" +Class { + #name : #ZMQSampleClientPUBSUB, + #superclass : #ZMQSampleAbstractPUBSUB, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUB class >> defaultPortPUB [ + + ^ 31002 +] + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUB class >> defaultPortSUB [ + + ^ 31001 +] + +{ #category : #connecting } +ZMQSampleClientPUBSUB >> connectPublisher [ + "Socket connection as a Publisher" + + self socketSender: (ZMQSocket new: ZMQ_PUB). + self socketSender connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortPUB printString +] + +{ #category : #connecting } +ZMQSampleClientPUBSUB >> connectSubscriber [ + "Socket connection as a Subscriber" + + self socketReceiver: (ZMQSocket new: ZMQ_SUB). + self socketReceiver connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortSUB printString. + + topicReceiver ifEmpty: [ topicReceiver addLast: '' ]. + + "Subscribe to value's argument topics. Can only receive messages beginning by said values" + 1 to: topicReceiver size do: [ :e | + self socketReceiver + optionKey: ZMQ_SUBSCRIBE + optValue: (topicReceiver at: e) ]. + +" self socketReceiver optionKey: ZMQ_BACKLOG optValue: '100'." + + self threaded + ifFalse: [ self startThread ] + ifTrue: [ self startThreadWorker ] +] + +{ #category : #accessing } +ZMQSampleClientPUBSUB >> processName [ +^'ZMQ Client' +] diff --git a/src/ZMQ/ZMQSampleClientPUBSUBGetOption.class.st b/src/ZMQ/ZMQSampleClientPUBSUBGetOption.class.st new file mode 100644 index 0000000..41b4f84 --- /dev/null +++ b/src/ZMQ/ZMQSampleClientPUBSUBGetOption.class.st @@ -0,0 +1,63 @@ +Class { + #name : #ZMQSampleClientPUBSUBGetOption, + #superclass : #ZMQSampleAbstractPUBSUB, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUBGetOption class >> defaultPortPUB [ + + ^ 31002 +] + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUBGetOption class >> defaultPortSUB [ + + ^ 31001 +] + +{ #category : #connecting } +ZMQSampleClientPUBSUBGetOption >> connectPublisher [ + "Socket connection as a Publisher" + + self socketSender: (ZMQSocket new: ZMQ_PUB). + self socketSender connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortPUB printString +] + +{ #category : #connecting } +ZMQSampleClientPUBSUBGetOption >> connectSubscriber [ + "Socket connection as a Subscriber" + + + self socketReceiver: (ZMQSocket new: ZMQ_SUB). + self socketReceiver connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortSUB printString. + + topicReceiver ifEmpty: [ topicReceiver addLast: '' ]. + + "Subscribe to value's argument topics. Can only receive messages beginning by said values" + 1 to: topicReceiver size do: [ :e | + self socketReceiver + optionKey: ZMQ_SUBSCRIBE + optValue: (topicReceiver at: e) ]. + + + self threaded + ifFalse: [ self startThread ] + ifTrue: [ self startThreadWorker ] +] + +{ #category : #connecting } +ZMQSampleClientPUBSUBGetOption >> getOptNumber: anOptionNumber [ + "Socket connection as a Subscriber" +^ self socketSender optionKey: 19. +] + +{ #category : #accessing } +ZMQSampleClientPUBSUBGetOption >> processName [ +^'ZMQ Client' +] diff --git a/src/ZMQ/ZMQSampleClientPUBSUBRcvBuff.class.st b/src/ZMQ/ZMQSampleClientPUBSUBRcvBuff.class.st new file mode 100644 index 0000000..04889b4 --- /dev/null +++ b/src/ZMQ/ZMQSampleClientPUBSUBRcvBuff.class.st @@ -0,0 +1,68 @@ +" +ZMQSampleClientPUBSUBRcvBuff provides an example on how to use ZMQ, on the client side, using a PUB-SUB pattern with a receive method that uses a buffer as a message receiver. + +Responsibility part: I can send and receive messages from a server connected on the same IP/port address as I am. Each socket is unidirectional. I can subscribe to a list of different topics. + +Collaborators Part: I mainly rely on ZMQ-Core and ZMQSampleAbstract + +Here is an example on how to use this class : +client := ZMQSampleClientPUBSUBRcvBuff new. ""Creates instance"" +client connectSubscriberRcvBuffer. ""Connects sockets"" +client connectPublisher. +client send: 'Hello World'. ""Sends message to clients who subscribe to this subject or to none."" +client cleanUp. ""Destroys the socket"" +" +Class { + #name : #ZMQSampleClientPUBSUBRcvBuff, + #superclass : #ZMQSampleAbstractPUBSUB, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUBRcvBuff class >> defaultPortPUB [ + + ^ 31002 +] + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUBRcvBuff class >> defaultPortSUB [ + + ^ 31001 +] + +{ #category : #connecting } +ZMQSampleClientPUBSUBRcvBuff >> connectPublisher [ + "Socket connection as a Publisher" + + self socketSender: (ZMQSocket new: ZMQ_PUB). + self socketSender connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortPUB printString +] + +{ #category : #connecting } +ZMQSampleClientPUBSUBRcvBuff >> connectSubscriberRcvBuffer [ + "Socket connection as a Subscriber" + + self socketReceiver: (ZMQSocket new: ZMQ_SUB). + self socketReceiver connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortSUB printString. + + topicReceiver ifEmpty: [ topicReceiver addLast: '' ]. + + "Subscribe to value's argument topics. Can only receive messages beginning by said values" + 1 to: topicReceiver size do: [ :e | + self socketReceiver + optionKey: ZMQ_SUBSCRIBE + optValue: (topicReceiver at: e) ]. + + self startThreadRcvBuffer +] + +{ #category : #accessing } +ZMQSampleClientPUBSUBRcvBuff >> processName [ + + ^ 'ZMQ Client recv Buffer' +] diff --git a/src/ZMQ/ZMQSampleClientPUBSUBSetOption.class.st b/src/ZMQ/ZMQSampleClientPUBSUBSetOption.class.st new file mode 100644 index 0000000..6a47445 --- /dev/null +++ b/src/ZMQ/ZMQSampleClientPUBSUBSetOption.class.st @@ -0,0 +1,59 @@ +Class { + #name : #ZMQSampleClientPUBSUBSetOption, + #superclass : #ZMQSampleAbstractPUBSUB, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUBSetOption class >> defaultPortPUB [ + + ^ 31002 +] + +{ #category : #'accessing - defaults' } +ZMQSampleClientPUBSUBSetOption class >> defaultPortSUB [ + + ^ 31001 +] + +{ #category : #connecting } +ZMQSampleClientPUBSUBSetOption >> connectPublisher [ + "Socket connection as a Publisher" + + self socketSender: (ZMQSocket new: ZMQ_PUB). + self socketSender connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortPUB printString +] + +{ #category : #connecting } +ZMQSampleClientPUBSUBSetOption >> connectSubscriber [ + "Socket connection as a Subscriber" + + | bytes | + self socketReceiver: (ZMQSocket new: ZMQ_SUB). + self socketReceiver connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortSUB printString. + + topicReceiver ifEmpty: [ topicReceiver addLast: '' ]. + + "Subscribe to value's argument topics. Can only receive messages beginning by said values" + 1 to: topicReceiver size do: [ :e | + self socketReceiver + optionKey: ZMQ_SUBSCRIBE + optValue: (topicReceiver at: e) ]. + bytes := (FFITypeArray ofType: 'int' size: 1) new. + bytes at: 1 put: 100. + self socketReceiver optionKey: ZMQ_BACKLOG optValue: bytes. + + self threaded + ifFalse: [ self startThread ] + ifTrue: [ self startThreadWorker ] +] + +{ #category : #accessing } +ZMQSampleClientPUBSUBSetOption >> processName [ +^'ZMQ Client' +] diff --git a/src/ZMQ/ZMQSampleClientPUBSUBTest.class.st b/src/ZMQ/ZMQSampleClientPUBSUBTest.class.st new file mode 100644 index 0000000..6d02052 --- /dev/null +++ b/src/ZMQ/ZMQSampleClientPUBSUBTest.class.st @@ -0,0 +1,49 @@ +Class { + #name : #ZMQSampleClientPUBSUBTest, + #superclass : #TestCase, + #instVars : [ + 'client' + ], + #category : #'ZMQ-Tests' +} + +{ #category : #running } +ZMQSampleClientPUBSUBTest >> setUp [ + super setUp. + client := ZMQSampleClientPUBSUB new. + ZMQConstants initConstants . +] + +{ #category : #running } +ZMQSampleClientPUBSUBTest >> tearDown [ +client cleanUp. + client := nil. + super tearDown +] + +{ #category : #running } +ZMQSampleClientPUBSUBTest >> testClientConnectPublisher [ + + client connectPublisher. + self deny: client socketSender equals: nil. + self assert: client socketSender url equals: 'tcp://127.0.0.1:31002' +] + +{ #category : #running } +ZMQSampleClientPUBSUBTest >> testClientConnectSubscriber [ + + client connectSubscriber. + self deny: client socketReceiver equals: nil. + self assert: client socketReceiver url equals: 'tcp://127.0.0.1:31001' +] + +{ #category : #running } +ZMQSampleClientPUBSUBTest >> testServerCleanUp [ + + client connectPublisher. + client connectSubscriber. + client cleanUp. + + self assert: client socketReceiver equals: nil. + self assert: client socketSender equals: nil +] diff --git a/src/ZMQ/ZMQSampleReply.class.st b/src/ZMQ/ZMQSampleReply.class.st new file mode 100644 index 0000000..da42647 --- /dev/null +++ b/src/ZMQ/ZMQSampleReply.class.st @@ -0,0 +1,74 @@ +" +This class is a ZMQ Server with a Reply pattern for its socket. + +Once connected to a client in Request pattern, I can wait for its response and when received, send messages to it. I'm connected to the same port as the server's Request. I can't send 2 messages in a row, I have to wait for a reply from my Client. +I can't subscribe to any topic, I will receive every message sent to my url. + +Collaborators Part: I mainly rely on ZMQ-Core and ZMQSampleAbstract + +Example to initialize one of my instances : +server := ZMQSampleReply new. +server connectREP. +server send: 'Hello World'. +server cleanUp. + +Instance Variables +socketREP: , type Reply accessible through socketReceiver and socketSender +" +Class { + #name : #ZMQSampleReply, + #superclass : #ZMQSampleAbstract, + #instVars : [ + 'socketREP' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleReply class >> defaultPortREP [ + + ^ 31002 +] + +{ #category : #connecting } +ZMQSampleReply >> connectReply [ + "Socket connectionas a server to a request client" + + self socketSender: (ZMQSocket new: ZMQ_REP). + self socketSender bind: 'tcp://' , self class defaultIP , ':' + , self class defaultPortREP printString. + self threaded + ifFalse: [ self startThread ] + ifTrue: [ self startThreadWorker ] +] + +{ #category : #accessing } +ZMQSampleReply >> processName [ + + ^ 'ZMQ Reply' +] + +{ #category : #accessing } +ZMQSampleReply >> socketReceiver [ + + + ^ socketREP +] + +{ #category : #accessing } +ZMQSampleReply >> socketReceiver: aSocketValue [ + + socketREP := aSocketValue +] + +{ #category : #accessing } +ZMQSampleReply >> socketSender [ + + ^ socketREP +] + +{ #category : #accessing } +ZMQSampleReply >> socketSender: aSocketValue [ + + socketREP := aSocketValue +] diff --git a/src/ZMQ/ZMQSampleReplyTest.class.st b/src/ZMQ/ZMQSampleReplyTest.class.st new file mode 100644 index 0000000..dac313b --- /dev/null +++ b/src/ZMQ/ZMQSampleReplyTest.class.st @@ -0,0 +1,40 @@ +Class { + #name : #ZMQSampleReplyTest, + #superclass : #TestCase, + #instVars : [ + 'reply' + ], + #category : #'ZMQ-Tests' +} + +{ #category : #running } +ZMQSampleReplyTest >> setUp [ + + super setUp. + reply := ZMQSampleReply new. + ZMQConstants initConstants +] + +{ #category : #running } +ZMQSampleReplyTest >> tearDown [ +reply cleanUp. + reply := nil. + super tearDown +] + +{ #category : #tests } +ZMQSampleReplyTest >> testServerCleanUp [ + + reply connectReply. + reply cleanUp. + + self assert: reply socketReceiver equals: nil +] + +{ #category : #tests } +ZMQSampleReplyTest >> testServerConnectReply [ + + reply connectReply. + self deny: reply socketReceiver equals: nil. + self assert: reply socketReceiver url equals: 'tcp://127.0.0.1:31002' +] diff --git a/src/ZMQ/ZMQSampleRequest.class.st b/src/ZMQ/ZMQSampleRequest.class.st new file mode 100644 index 0000000..e5bfdbe --- /dev/null +++ b/src/ZMQ/ZMQSampleRequest.class.st @@ -0,0 +1,74 @@ +" +This class is a ZMQ Client with a Request pattern for its socket. + +Once connected to a server in Reply pattern, I can send messages to it and wait for its response. I'm connected to the same port as the server's Reply. I can't send 2 messages in a row, I have to wait for a reply from my Server. +Collaborators Part: I mainly rely on ZMQ-Core and ZMQSampleAbstract +I can't subscribe to any topic, I will receive every message sent to my url. + + Example to initialize one of my instances : +client := ZMQSampleRequest new. +client connectREQ. +client send: 'Hello World'. +client cleanUp. + + + Instance Variables +socketREQ: , type Request accessible through socketReceiver and socketSender +" +Class { + #name : #ZMQSampleRequest, + #superclass : #ZMQSampleAbstract, + #instVars : [ + 'socketREQ' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleRequest class >> defaultPortREQ [ + + ^ 31002 +] + +{ #category : #connecting } +ZMQSampleRequest >> connectRequest [ + "Socket connection as a client to a reply server" + + self socketSender: (ZMQSocket new: ZMQ_REQ). + self socketSender connect: 'tcp://' , self class defaultIP , ':' + , self class defaultPortREQ printString. + + + self threaded + ifFalse: [ self startThread ] + ifTrue: [ self startThreadWorker ] +] + +{ #category : #accessing } +ZMQSampleRequest >> processName [ +^ 'ZMQ Request' +] + +{ #category : #accessing } +ZMQSampleRequest >> socketReceiver [ + + ^ socketREQ +] + +{ #category : #accessing } +ZMQSampleRequest >> socketReceiver: aSocketValue [ + + socketREQ := aSocketValue +] + +{ #category : #accessing } +ZMQSampleRequest >> socketSender [ + + ^ socketREQ +] + +{ #category : #action } +ZMQSampleRequest >> socketSender: aSocketValue [ + + socketREQ := aSocketValue +] diff --git a/src/ZMQ/ZMQSampleRequestTest.class.st b/src/ZMQ/ZMQSampleRequestTest.class.st new file mode 100644 index 0000000..adb4ee4 --- /dev/null +++ b/src/ZMQ/ZMQSampleRequestTest.class.st @@ -0,0 +1,41 @@ +Class { + #name : #ZMQSampleRequestTest, + #superclass : #TestCase, + #instVars : [ + 'request' + ], + #category : #'ZMQ-Tests' +} + +{ #category : #running } +ZMQSampleRequestTest >> setUp [ + + super setUp. + request := ZMQSampleRequest new. + ZMQConstants initConstants +] + +{ #category : #running } +ZMQSampleRequestTest >> tearDown [ + +request cleanUp. + request := nil. + super tearDown +] + +{ #category : #running } +ZMQSampleRequestTest >> testSampleConnectRequest [ + + request connectRequest. + self deny: request socketSender equals: nil. + self assert: request socketSender url equals: 'tcp://127.0.0.1:31002' +] + +{ #category : #running } +ZMQSampleRequestTest >> testServerCleanUp [ + + request connectRequest. + request cleanUp. + + self assert: request socketSender equals: nil +] diff --git a/src/ZMQ/ZMQSampleServerPUBSUB.class.st b/src/ZMQ/ZMQSampleServerPUBSUB.class.st new file mode 100644 index 0000000..6bab398 --- /dev/null +++ b/src/ZMQ/ZMQSampleServerPUBSUB.class.st @@ -0,0 +1,79 @@ +" +ZMQSamplServer provides an example on how to use ZMQ, on the server side, using a SUB/PUB patterns. + +Responsibility part: I can send and receive messages from clients connected on the same IP/port address as I am. + +Collaborators Part: I mainly rely on ZMQ-Core and ZMQSampleAbstract + + + Here is an example on how to use this class : +serv := ZMQSampleServerPUBSUB new. ""Creates instance"" +serv connectSubscriber. ""Connects sockets"" +serv connectPublisher. +serv send: 'Hello World'. ""Sends message to clients who subscribe to this subject or to none."" +serv cleanUp. ""Destroys the socket"" + + + Instance Variables +socketP: of type Publisher, accessible through socketSender +socketS: of type Subscriber, accessible through socketReceiver + + + +" +Class { + #name : #ZMQSampleServerPUBSUB, + #superclass : #ZMQSampleAbstractPUBSUB, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleServerPUBSUB class >> defaultPortPUB [ + + ^ 31001 +] + +{ #category : #'accessing - defaults' } +ZMQSampleServerPUBSUB class >> defaultPortSUB [ + + ^ 31002 +] + +{ #category : #connecting } +ZMQSampleServerPUBSUB >> connectPublisher [ + "Socket connection as Publisher" + + self socketSender: (ZMQSocket new: ZMQ_PUB). + self socketSender bind: 'tcp://' , self class defaultIP , ':' + , self class defaultPortPUB printString +] + +{ #category : #connecting } +ZMQSampleServerPUBSUB >> connectSubscriber [ + "Socket connection as Subscriber" + + self socketReceiver: (ZMQSocket new: ZMQ_SUB). + self socketReceiver bind: 'tcp://' , self class defaultIP , ':' + , self class defaultPortSUB printString. + + topicReceiver ifEmpty: [ topicReceiver addLast: '' ]. + + "Subscribe to value's argument topics. Can only receive messages beginning by said values" + 1 to: topicReceiver size do: [ :e | + self socketReceiver + optionKey: ZMQ_SUBSCRIBE + optValue: (topicReceiver at: e) ]. + + self threaded + ifFalse: [ self startThread ] + ifTrue: [ self startThreadWorker ] +] + +{ #category : #accessing } +ZMQSampleServerPUBSUB >> processName [ + + ^ 'ZMQ Server' +] diff --git a/src/ZMQ/ZMQSampleServerPUBSUBRcvBuff.class.st b/src/ZMQ/ZMQSampleServerPUBSUBRcvBuff.class.st new file mode 100644 index 0000000..6affd72 --- /dev/null +++ b/src/ZMQ/ZMQSampleServerPUBSUBRcvBuff.class.st @@ -0,0 +1,72 @@ +" +ZMQSampleServerPUBSUBRcvBuff provides an example on how to use ZMQ, on the server side, using a SUB/PUB patternswith a receive method that uses a buffer as a message receiver. + +Responsibility part: I can send and receive messages from clients connected on the same IP/port address as I am. + +Collaborators Part: I mainly rely on ZMQ-Core and ZMQSampleAbstract + +Here is an example on how to use this class : +serv := ZMQSampleServerPUBSUBRcvBuff new. ""Creates instance"" +serv connectSubscriberRcvBuffer. ""Connects sockets"" +serv connectPublisher. +serv send: 'Hello World'. ""Sends message to clients who subscribe to this subject or to none."" +serv cleanUp. ""Destroys the socket"" + +Instance Variables +socketP: of type Publisher, accessible through socketSender +socketS: of type Subscriber, accessible through socketReceiver +" +Class { + #name : #ZMQSampleServerPUBSUBRcvBuff, + #superclass : #ZMQSampleAbstractPUBSUB, + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Samples' +} + +{ #category : #'accessing - defaults' } +ZMQSampleServerPUBSUBRcvBuff class >> defaultPortPUB [ + + ^ 31001 +] + +{ #category : #'accessing - defaults' } +ZMQSampleServerPUBSUBRcvBuff class >> defaultPortSUB [ + + ^ 31002 +] + +{ #category : #connecting } +ZMQSampleServerPUBSUBRcvBuff >> connectPublisher [ + "Socket connection as Publisher" + + self socketSender: (ZMQSocket new: ZMQ_PUB). + self socketSender bind: 'tcp://' , self class defaultIP , ':' + , self class defaultPortPUB printString +] + +{ #category : #connecting } +ZMQSampleServerPUBSUBRcvBuff >> connectSubscriberRcvBuffer [ + "Socket connection as a Subscriber" + + self socketReceiver: (ZMQSocket new: ZMQ_SUB). + self socketReceiver bind: 'tcp://' , self class defaultIP , ':' + , self class defaultPortSUB printString. + + topicReceiver ifEmpty: [ topicReceiver addLast: '' ]. + + "Subscribe to value's argument topics : here topicReceiver. Can only receive messages beginning by said values" + 1 to: topicReceiver size do: [ :e | + self socketReceiver + optionKey: ZMQ_SUBSCRIBE + optValue: (topicReceiver at: e) ]. + + self startThreadRcvBuffer +] + +{ #category : #'accessing - children' } +ZMQSampleServerPUBSUBRcvBuff >> processName [ + + ^ 'ZMQ Server recv Buffer' +] diff --git a/src/ZMQ/ZMQSampleServerPUBSUBTest.class.st b/src/ZMQ/ZMQSampleServerPUBSUBTest.class.st new file mode 100644 index 0000000..c220013 --- /dev/null +++ b/src/ZMQ/ZMQSampleServerPUBSUBTest.class.st @@ -0,0 +1,51 @@ +Class { + #name : #ZMQSampleServerPUBSUBTest, + #superclass : #TestCase, + #instVars : [ + 'server' + ], + #category : #'ZMQ-Tests' +} + +{ #category : #running } +ZMQSampleServerPUBSUBTest >> setUp [ + + super setUp. + server := ZMQSampleServerPUBSUB new. + ZMQConstants initConstants . + +] + +{ #category : #running } +ZMQSampleServerPUBSUBTest >> tearDown [ + server cleanUp. + server := nil. + super tearDown +] + +{ #category : #'test server' } +ZMQSampleServerPUBSUBTest >> testServerCleanUp [ + + server connectPublisher. + server connectSubscriber. + server cleanUp. + + self assert: server socketReceiver equals: nil. + self assert: server socketSender equals: nil +] + +{ #category : #'test server' } +ZMQSampleServerPUBSUBTest >> testServerConnectPublisher [ + + server connectPublisher. + self deny: server socketSender equals: nil. + self assert: server socketSender url equals: 'tcp://127.0.0.1:31001' +] + +{ #category : #'test server' } +ZMQSampleServerPUBSUBTest >> testServerConnectSubscriber [ + + server connectSubscriber. + self deny: server socketReceiver equals: nil. + self assert: server socketReceiver url equals: 'tcp://127.0.0.1:31002' +] diff --git a/src/ZMQ/ZMQSocket.class.st b/src/ZMQ/ZMQSocket.class.st new file mode 100644 index 0000000..118b256 --- /dev/null +++ b/src/ZMQ/ZMQSocket.class.st @@ -0,0 +1,302 @@ +" +ZMQ Socket contains all actions that you can do with a socket. Warning, if ou want to use synchronous calls with a TFWorker, not all sockets supports multi-threading and the image might crash or freeze during the use. +REP-REQ, PUB-SUB are not thread safe. + +Collaborators Part: I mainly rely on ZMQInterface and ZMQConstants. +To understand how to use it, please refer to ZMQ-Sample. + +Instance Variables: + +pointer:, points to the socket's address in memory +context: +url:, url to bind/connect a socket to +type:, a number that describes the type of socket, see ZMQConstants for available values + +" +Class { + #name : #ZMQSocket, + #superclass : #FFIExternalObject, + #instVars : [ + 'context', + 'url', + 'type' + ], + #pools : [ + 'ZMQConstants' + ], + #category : #'ZMQ-Core' +} + +{ #category : #'instance creation' } +ZMQSocket class >> bind: anURLString type: aSocketType [ + + | socket | + socket := self context: (ZMQContext new: 1) type: aSocketType. + socket bind: anURLString. + ^socket +] + +{ #category : #'instance creation' } +ZMQSocket class >> connect: anURLString type: aSocketType [ + + | socket | + socket := self context: (ZMQContext new: 1) type: aSocketType. + socket connect: anURLString. + ^socket +] + +{ #category : #'instance creation' } +ZMQSocket class >> context: aContext type: aSocketType [ + + | socket ztype | + ztype := aSocketType isSymbol + ifTrue: [ ZMQInterface default resolveType: aSocketType ] + ifFalse: [ aSocketType ]. + socket := (ZMQInterface default socket: aContext getHandle type: ztype). + socket context: aContext. + socket type: ztype. + ^ socket +] + +{ #category : #'instance creation' } +ZMQSocket class >> new: aSocketType [ + ZMQConstants initConstants . + ^self type: aSocketType +] + +{ #category : #'instance creation' } +ZMQSocket class >> type: aSocketType [ + + ^self context: (ZMQContext new: 1) type: aSocketType +] + +{ #category : #'api - connect' } +ZMQSocket >> bind: anURLString [ + + | ret | + url := anURLString. + handle isNull ifFalse: [ + ret := ZMQInterface default socket: handle bind: anURLString . + self zmqErrorHandler: ret] ifTrue: [ self error: 'Null socket error' ]. + ^ self +] + +{ #category : #'api - connect' } +ZMQSocket >> cleanUp [ + + "type = ZMQ_REP ifTrue: [ self unbind: url ]. + type = ZMQ_REQ ifTrue: [ self disconnect: url ]." + "ZMQSampleClientPUBSUB ifTrue: [ self unbind: url ]. + self class = ZMQSampleServerPUBSUB ifTrue: [ self disconnect: url ]." + " This next line can't work as we don't know if its server or client plus it works without it and whats above" + "type = ZMQ_SUB ifTrue: [ self disconnect: url ]." + + self close. + context destroy +] + +{ #category : #'api - connect' } +ZMQSocket >> close [ + + | ret | + handle isNull ifFalse: [ ret := ZMQInterface default socketClose: handle. + self zmqErrorHandler: ret.] +ifTrue:[ self error: 'Null socket error' ]. + ^ self +] + +{ #category : #'api - connect' } +ZMQSocket >> connect: anURLString [ + + | ret | + url := anURLString. + handle isNull ifFalse: [ + ret := ZMQInterface default socket: handle connect: anURLString. + self zmqErrorHandler: ret.] + ifTrue: [ self error: 'Null socket error' ]. + ^ ret +] + +{ #category : #accessing } +ZMQSocket >> context [ + + ^context +] + +{ #category : #accessing } +ZMQSocket >> context: anObject [ + + context := anObject +] + +{ #category : #'api - connect' } +ZMQSocket >> disconnect: anURLString [ + + | ret | + handle isNull ifFalse: [ + ret := ZMQInterface default socket: handle disconnect: anURLString. + self zmqErrorHandler: ret.] + ifTrue: [ self error: 'Null socket error' ]. + ^ self +] + +{ #category : #'api - options' } +ZMQSocket >> optionKey: aKeyOption [ + + | ret | + handle isNull ifFalse: [ + ret := ZMQInterface default socket: handle getOption: aKeyOption ]. + self zmqErrorHandler: ret. + ^ ret +] + +{ #category : #'api - options' } +ZMQSocket >> optionKey: aKeyOption optValue: aValueOption [ + + | ret | + handle isNull ifFalse: [ + ret := ZMQInterface default + socket: handle + setOption: aKeyOption + optValue: aValueOption ]. + self zmqErrorHandler: ret. + ^ ret +] + +{ #category : #'api - messaging' } +ZMQSocket >> receive [ + + ^ self recv: 1000 flags: ZMQ_WAIT +] + +{ #category : #'api - messaging' } +ZMQSocket >> receiveDontWait [ + + ^ self recv: 1000 flags: ZMQ_DONTWAIT +] + +{ #category : #'api - messaging' } +ZMQSocket >> receiveMsg [ + + | ret | + ret := self recvMsgWithFlags: ZMQ_WAIT . + self zmqErrorHandler: ret. + ^ ret +] + +{ #category : #'api - messaging' } +ZMQSocket >> receiveMsgDontWait [ + + | ret | + ret := self recvMsgWithFlags: ZMQ_DONTWAIT . + self zmqErrorHandler: ret. + ^ ret +] + +{ #category : #'api - messaging' } +ZMQSocket >> recv: aMaxSize [ + + ^ self recv: aMaxSize flags: ZMQ_WAIT +] + +{ #category : #'api - messaging' } +ZMQSocket >> recv: aMaxSize flags: flags [ + + | aBuffer | + aBuffer := ByteArray new: aMaxSize. + ZMQInterface default + socket: handle + recv: aBuffer + len: aMaxSize + flags: flags. + + + ^ aBuffer contents +] + +{ #category : #'api - messaging' } +ZMQSocket >> recvMsgWithFlags: flags [ + + | message ret | + message := ValueHolder new. + ret := ZMQInterface default + socket: handle + recvMsg: message + flags: flags. + + ret = -1 ifTrue: [ + self error: ' on receive:' + , (ZMQLibraryInterface zmq_strerror: ZMQLibraryInterface zmq_errno) ]. + ^ message contents +] + +{ #category : #'initialize - release' } +ZMQSocket >> release [ + super release. + self cleanUp +] + +{ #category : #'api - messaging' } +ZMQSocket >> send: aStringOrByteArray [ + + | ret | + ret := self send: aStringOrByteArray flags: ZMQ_WAIT . + self zmqErrorHandler: ret. + ^ self +] + +{ #category : #'api - messaging' } +ZMQSocket >> send: aStringOrByteArray flags: flags [ + + | ret | + handle isNull ifFalse: [ + ret := ZMQInterface default + socket: handle + send: aStringOrByteArray + flags: flags ]. + self zmqErrorHandler: ret. + ^ ret +] + +{ #category : #accessing } +ZMQSocket >> type [ + + ^ type +] + +{ #category : #accessing } +ZMQSocket >> type: aType [ + + type := aType +] + +{ #category : #'api - connect' } +ZMQSocket >> unbind: anURLString [ + + | ret | + handle isNull + ifFalse: [ + ret := ZMQInterface default socket: handle unbind: anURLString. + self zmqErrorHandler: ret ] + ifTrue: [ self error: 'Null socket error' ]. + ^ self +] + +{ #category : #accessing } +ZMQSocket >> url [ + + ^ url +] + +{ #category : #accessing } +ZMQSocket >> url: anObject [ + + url := anObject +] + +{ #category : #'error handling' } +ZMQSocket >> zmqErrorHandler: aReturn [ +aReturn = -1 ifTrue: [ + self error: (ZMQLibraryInterface zmq_strerror: ZMQLibraryInterface zmq_errno) ]. + ^ aReturn +] diff --git a/src/ZMQ/ZMQ_msg_t.class.st b/src/ZMQ/ZMQ_msg_t.class.st new file mode 100644 index 0000000..eaecfd8 --- /dev/null +++ b/src/ZMQ/ZMQ_msg_t.class.st @@ -0,0 +1,42 @@ +" +FFI declaration of a zmq_msg_t. This is the format used to transmit messages. +Don't access directly a message through this Class. + + +OFFSET__ UChar64: +" +Class { + #name : #'ZMQ_msg_t', + #superclass : #FFIExternalStructure, + #classVars : [ + 'OFFSET__', + 'UChar64' + ], + #category : #'ZMQ-Core' +} + +{ #category : #fields } +ZMQ_msg_t class >> fieldsDesc [ +" self rebuildFieldAccessors " +^#( + UChar64 _; + ) +] + +{ #category : #'class initialization' } +ZMQ_msg_t class >> initialize [ + UChar64 := FFITypeArray ofType: 'uchar' size: 64. + +] + +{ #category : #'accessing structure variables' } +ZMQ_msg_t >> _ [ + "This method was automatically generated" + ^(FFITypeArray ofType: #FFIUInt8 size: 64) fromHandle: (handle copyFrom: OFFSET__ to: OFFSET__ + 63) +] + +{ #category : #'accessing structure variables' } +ZMQ_msg_t >> _: anObject [ + "This method was automatically generated" + handle replaceFrom: OFFSET__ to: OFFSET__ + 63 with: anObject getHandle startingAt: 1 +] diff --git a/src/ZMQ/package.st b/src/ZMQ/package.st new file mode 100644 index 0000000..2d510eb --- /dev/null +++ b/src/ZMQ/package.st @@ -0,0 +1 @@ +Package { #name : #ZMQ }