-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathFileServer.scala
More file actions
126 lines (111 loc) · 4.82 KB
/
FileServer.scala
File metadata and controls
126 lines (111 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package alpakka.env
import com.github.blemale.scaffeine.{Cache, Scaffeine}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.StatusCodes.*
import org.apache.pekko.http.scaladsl.model.{HttpResponse, MediaTypes, StatusCode, StatusCodes}
import org.apache.pekko.http.scaladsl.server.Directives.{logRequestResult, path, *}
import org.apache.pekko.http.scaladsl.server.{ExceptionHandler, Route}
import org.slf4j.{Logger, LoggerFactory}
import java.io.File
import java.nio.file.Paths
import scala.concurrent.duration.*
import scala.util.{Failure, Success}
/**
* HTTP FileServer to test: [[sample.stream_shared_state.LocalFileCacheCaffeine]]
* Simulates a quirky legacy file download server encountered in real life
*
* The client can request these types of response:
* - HTTP 200 response: /download/[id]
* - Non-idempotent response: /downloadni/[id]
* Allows only one download file request per id, answer with HTTP 404 on subsequent requests
* - Flaky response: /downloadflaky/[id]
* Reply with additional random failures on requests with certain IDs
*
* Uses a cache to remember the "one download per id" behaviour
* Note that pekko-http would also support server-side caching (by wrapping caffeine in caching directives):
* https://pekko.apache.org/docs/pekko-http/current/common/caching.html
*/
object FileServer extends App {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
implicit val system: ActorSystem = ActorSystem()
import system.dispatcher
val (address, port) = ("127.0.0.1", 6001)
server(address, port)
def server(address: String, port: Int): Unit = {
val resourceFileName = "payload.zip"
val payloadFile = new File(Paths.get("src/main/resources", resourceFileName).toAbsolutePath.toString)
val cache: Cache[String, String] =
Scaffeine()
.recordStats()
.expireAfterWrite(1.hour)
.maximumSize(500)
.build[String, String]()
val exceptionHandler = ExceptionHandler {
case ex: RuntimeException =>
extractUri { uri =>
logger.error(s"Request to $uri could not be handled normally message: ${ex.getMessage}")
//cache.invalidate(id)
complete(HttpResponse(InternalServerError, entity = "Runtime ex occurred"))
}
}
def routes: Route = handleExceptions(exceptionHandler) {
logRequestResult("FileServer") {
path("download" / Segment) { id =>
logger.info(s"TRACE_ID: $id Server received download request")
get {
getFromFile(payloadFile, MediaTypes.`application/zip`)
}
} ~ path("downloadflaky" / Segment) { id =>
logger.info(s"TRACE_ID: $id Server received flaky download request")
get {
if (id.toInt % 10 == 0) { // 10, 20, 30
complete(randomErrorHttpStatusCode)
} else if (id.toInt % 5 == 0) { // 5, 15, 25
// Causes TimeoutException on client if sleep time > 5 sec
randomSleeper()
getFromFile(payloadFile, MediaTypes.`application/zip`)
} else {
getFromFile(payloadFile, MediaTypes.`application/zip`)
}
}
} ~ path("downloadni" / Segment) { id =>
logger.info(s"TRACE_ID: $id Server received non-idempotent request")
if (cache.getIfPresent(id).isDefined) {
logger.warn(s"TRACE_ID: $id Only one download file request per TRACE_ID allowed. Reply with 404")
complete(StatusCodes.NotFound)
} else {
cache.put(id, "downloading") // to simulate blocking on concurrent requests
get {
randomSleeper()
val response = getFromFile(payloadFile, MediaTypes.`application/zip`)
cache.put(id, "downloaded")
response
}
}
}
}
}
val bindingFuture = Http().newServerAt(address, port).bindFlow(routes)
bindingFuture.onComplete {
case Success(b) =>
logger.info(s"Server started, listening on: ${b.localAddress}")
case Failure(e) =>
logger.info(s"Server could not bind to $address:$port. Exception message: ${e.getMessage}")
system.terminate()
}
}
def randomSleeper(): Unit = {
val (start, end) = (1000, 10000)
val rnd = new scala.util.Random
val sleepTime = start + rnd.nextInt((end - start) + 1)
logger.debug(s" -> Sleep for $sleepTime ms")
Thread.sleep(sleepTime.toLong)
}
def randomErrorHttpStatusCode: StatusCode = {
val statusCodes = Seq(StatusCodes.NotFound, StatusCodes.InternalServerError, StatusCodes.ServiceUnavailable)
val statusCode = statusCodes(scala.util.Random.nextInt(statusCodes.size))
logger.info(s" -> Complete with HTTP status code: $statusCode")
statusCode
}
}