-
Notifications
You must be signed in to change notification settings - Fork 269
Expand file tree
/
Copy pathAbstractConnectionProvider.java
More file actions
119 lines (101 loc) · 3.22 KB
/
AbstractConnectionProvider.java
File metadata and controls
119 lines (101 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package ua.naiksoftware.stomp.provider;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.util.Log;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import ua.naiksoftware.stomp.dto.LifecycleEvent;
/**
* Created by forresthopkinsa on 8/8/2017.
* <p>
* Created because there was a lot of shared code between JWS and OkHttp connection providers.
*/
public abstract class AbstractConnectionProvider implements ConnectionProvider {
private static final String TAG = AbstractConnectionProvider.class.getSimpleName();
@NonNull
private final PublishSubject<LifecycleEvent> lifecycleStream;
@NonNull
private final PublishSubject<String> messagesStream;
public AbstractConnectionProvider() {
lifecycleStream = PublishSubject.create();
messagesStream = PublishSubject.create();
}
@NonNull
@Override
public Observable<String> messages() {
return messagesStream.startWith(initSocket().toObservable());
}
/**
* Simply close socket.
* <p>
* For example:
* <pre>
* webSocket.close();
* </pre>
*/
protected abstract void rawDisconnect();
@Override
public Completable disconnect() {
return Completable
.fromAction(this::rawDisconnect);
}
private Completable initSocket() {
return Completable
.fromAction(this::createWebSocketConnection);
}
/**
* Most important method: connects to websocket and notifies program of messages.
* <p>
* See implementations in OkHttpConnectionProvider and WebSocketsConnectionProvider.
*/
protected abstract void createWebSocketConnection();
@NonNull
@Override
public Completable send(String stompMessage) {
return Completable.fromCallable(() -> {
if (getSocket() == null) {
throw new IllegalStateException("Not connected");
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
rawSend(stompMessage);
return null;
}
});
}
/**
* Just a simple message send.
* <p>
* For example:
* <pre>
* webSocket.send(stompMessage);
* </pre>
*
* @param stompMessage message to send
*/
protected abstract void rawSend(String stompMessage);
/**
* Get socket object.
* Used for null checking; this object is expected to be null when the connection is not yet established.
* <p>
* For example:
* <pre>
* return webSocket;
* </pre>
*/
@Nullable
protected abstract Object getSocket();
protected void emitLifecycleEvent(@NonNull LifecycleEvent lifecycleEvent) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
lifecycleStream.onNext(lifecycleEvent);
}
protected void emitMessage(String stompMessage) {
Log.d(TAG, "Receive STOMP message: " + stompMessage);
messagesStream.onNext(stompMessage);
}
@NonNull
@Override
public Observable<LifecycleEvent> lifecycle() {
return lifecycleStream;
}
}