Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions dataproc/dataproc_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,37 @@
ChunkOfFiles []string
}


// writeResultsToSingleJSON handles stream writing to a single file
func writeResultsToSingleJSON(outputDir string, input <-chan string) {
outputPath := filepath.Join(outputDir, "all_replays.json")
f, err := os.Create(outputPath)
if err != nil {
log.Error("Failed to create output JSON file:", err)
// Drain channel to prevent blocking workers if file fails
for range input {}
return
}
defer f.Close()

// Start the JSON array
f.WriteString("[\n")

Check failure on line 43 in dataproc/dataproc_pipeline.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `f.WriteString` is not checked (errcheck)

first := true
for jsonString := range input {
if !first {
f.WriteString(",\n")

Check failure on line 48 in dataproc/dataproc_pipeline.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `f.WriteString` is not checked (errcheck)
}
f.WriteString(jsonString)

Check failure on line 50 in dataproc/dataproc_pipeline.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `f.WriteString` is not checked (errcheck)
first = false
}

// End the JSON array
f.WriteString("\n]")
log.Info("Successfully wrote combined JSON to ", outputPath)
}


// PipelineWrapper is an orchestrator that distributes work
// among available workers (threads)
func PipelineWrapper(
Expand All @@ -51,6 +82,21 @@
)
defer progressBar.Close()


// 1. Create the results channel
singleJsonResultChan := make(chan string, cliFlags.NumberOfThreads*4)

// 2. Start the single writer goroutine
var writerWg sync.WaitGroup
// Creating a single writer goroutine that will create a single JSON file
// with all of the replays as a JSON array.
writerWg.Add(1)
go func() {
defer writerWg.Done()
writeResultsToSingleJSON(cliFlags.OutputDirectory, singleJsonResultChan)
}()


// If it is specified by the user to perform the processing without
// multiprocessing GOMAXPROCS needs to be set to 1 in order to allow 1 thread:
runtime.GOMAXPROCS(cliFlags.NumberOfThreads)
Expand All @@ -59,13 +105,14 @@
// Adding a task for each of the supplied chunks to speed up the processing:
wg.Add(cliFlags.NumberOfThreads)


// Spin up workers waiting for chunks to process:
for i := 0; i < cliFlags.NumberOfThreads; i++ {
go func() {
defer wg.Done()
for {
channelContents, ok := <-channel
if !ok {
wg.Done()
return
}
MultiprocessingChunkPipeline(
Expand All @@ -76,6 +123,7 @@
foreignToEnglishMapping,
progressBar,
cliFlags,
singleJsonResultChan,
)
}
}()
Expand All @@ -91,6 +139,8 @@

close(channel)
wg.Wait()
close(singleJsonResultChan)
writerWg.Wait()
progressBar.Close()

log.Debug("Finished PipelineWrapper()")
Expand All @@ -107,6 +157,7 @@
englishToForeignMapping map[string]string,
progressBar *progressbar.ProgressBar,
cliFlags utils.CLIFlags,
singleJsonResultChan chan<- string,
) {

// Letting the orchestrator know that this processing task was finished:
Expand Down Expand Up @@ -204,8 +255,17 @@
return
}

if cliFlags.SingleJsonOutput {
singleJsonResultChan <- replayString
processedCounter++
processingInfoStruct.AddToProcessed(replayFile)
log.Info("Sent file to writer for single JSON output.")
return
}


// Saving output to zip archive:
if packageToZipBool {
if packageToZipBool{
// Append it to a list and when a package is created create a package summary and clear the list for next iterations
persistent_data.AddReplaySummToPackageSumm(
&replaySummary,
Expand All @@ -217,7 +277,8 @@
replayString,
replayFile,
compressionMethod,
writer)
writer,
)
if !savedSuccess {
compressionErrorCounter++
log.WithFields(log.Fields{
Expand All @@ -233,6 +294,7 @@
return
}


okSaveToDrive := file_utils.SaveReplayJSONFileToDrive(
replayString,
replayFile,
Expand Down
15 changes: 13 additions & 2 deletions dataproc/stringify_replay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dataproc

import (
"bytes"
"encoding/json"

"github.com/Kaszanas/SC2InfoExtractorGo/datastruct/replay_data"
Expand All @@ -12,12 +13,22 @@ func stringifyReplay(replayData *replay_data.CleanedReplay) (bool, string) {

log.Debug("Entered stringifyReplay()")

replayDataString, marshalErr := json.MarshalIndent(replayData, "", " ")

replayDataStringBytes, marshalErr := json.Marshal(replayData)
if marshalErr != nil {
log.Error("Error while marshaling the string representation of cleanReplayData.")
return false, ""
}

compactedOutput := new(bytes.Buffer)
compactErr := json.Compact(compactedOutput, replayDataStringBytes)
if compactErr != nil {
log.Error("Error while compacting the string representation of cleanReplayData.")
return false, ""
}

compactedString := compactedOutput.String()

log.Debug("Finished stringifyReplay()")
return true, string(replayDataString)
return true, compactedString
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func mainReturnWithCode() int {
"CLIflags.SkipDependencyDownload": CLIflags.SkipDependencyDownload,
"CLIflags.DependencyDirectory": CLIflags.DependencyDirectory,
"CLIflags.NumberOfPackages": CLIflags.NumberOfPackages,
"CLIflags.SingleJsonOutput": CLIflags.SingleJsonOutput,
"CLIflags.PerformIntegrityCheck": CLIflags.PerformIntegrityCheck,
"CLIflags.PerformValidityCheck": CLIflags.PerformValidityCheck,
"CLIflags.PerformCleanup": CLIflags.PerformCleanup,
Expand Down
22 changes: 16 additions & 6 deletions utils/flag_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type CLIFlags struct {
DependencyDirectory string
NumberOfThreads int
NumberOfPackages int
SingleJsonOutput bool
PerformIntegrityCheck bool
PerformValidityCheck bool
PerformCleanup bool
Expand Down Expand Up @@ -77,6 +78,14 @@ func ParseFlags() (CLIFlags, bool) {
zip packaging and output .json directly to drive.`,
)

singleJsonOutput := flag.Bool(
"single_json_output",
false,
`Flag specifying if the output should be a single JSON file instead of multiple zip packages.
If set to true, the output will be a single JSON file containing an array of all processed replays.
In such case the "number_of_packages" flag will be ignored and all processed replays will be saved in a single JSON file.`,
)

// Boolean Flags:
help := flag.Bool(
"help",
Expand Down Expand Up @@ -138,12 +147,12 @@ func ParseFlags() (CLIFlags, bool) {
// Misc flags:
logLevelFlag := flag.Int(
"log_level",
4,
`Specifies a log level from 1-7:
Panic - 1, Fatal - 2,
Error - 3, Warn - 4,
Info - 5, Debug - 6,
Trace - 7`,
3,
`Specifies a log level from 0-6:
Panic - 0, Fatal - 1,
Error - 2, Warn - 3,
Info - 4, Debug - 5,
Trace - 6`,
)
logDirectoryFlag := flag.String(
"log_dir",
Expand Down Expand Up @@ -197,6 +206,7 @@ func ParseFlags() (CLIFlags, bool) {
SkipDependencyDownload: *skipDependencyDownload,
DependencyDirectory: absolutePathDependencyDirectory,
NumberOfPackages: *numberOfPackagesFlag,
SingleJsonOutput: *singleJsonOutput,
PerformIntegrityCheck: *performIntegrityCheckFlag,
PerformValidityCheck: *performValidityCheckFlag,
PerformCleanup: *performCleanupFlag,
Expand Down
4 changes: 4 additions & 0 deletions utils/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func SetLogging(logPath string, logLevel int) (*os.File, bool) {
log.SetOutput(logFile)
log.Info("Set logging format, defined log file.")

logLevelString := log.Level(logLevel).String()
log.Info("Log level set to: " + logLevelString)


log.SetLevel(log.Level(logLevel))
log.Info("Set logging level.")

Expand Down
Loading