@@ -518,15 +518,15 @@ class RemoteLogger(Logger):
518518
519519 def __init__ (
520520 self ,
521- url ,
521+ url , # Not used yet
522+ useServerCertificate ,
522523 name = "Pilot" ,
523524 debugFlag = False ,
524525 pilotOutput = "pilot.out" ,
525526 isPilotLoggerOn = True ,
526527 pilotUUID = "unknown" ,
527528 flushInterval = 10 ,
528529 bufsize = 1000 ,
529- wnVO = "unknown" ,
530530 ):
531531 """
532532 c'tor
@@ -538,34 +538,48 @@ def __init__(
538538 self .pilotUUID = pilotUUID
539539 self .wnVO = wnVO
540540 self .isPilotLoggerOn = isPilotLoggerOn
541- sendToURL = partial (sendMessage , url , pilotUUID , wnVO , "sendMessage" )
541+ sendToURL = partial (sendMessage , useServerCertificate , pilotUUID )
542542 self .buffer = FixedSizeBuffer (sendToURL , bufsize = bufsize , autoflush = flushInterval )
543543
544- def debug (self , msg , header = True , _sendPilotLog = False ):
545- # TODO: Send pilot log remotely?
544+ def format_to_json (self , level , message ):
545+
546+ escaped = json .dumps (message )[1 :- 1 ] # remove outer quotes
547+
548+ # Split on escaped newlines
549+ splitted_message = escaped .split ("\\ n" )
550+
551+ output = []
552+ for mess in splitted_message :
553+ if mess :
554+ output .append ({
555+ "timestamp" : datetime .utcnow ().strftime ("%Y-%m-%dT%H:%M:%S.%fZ" ),
556+ "severity" : level ,
557+ "message" : mess ,
558+ "scope" : self .name
559+ })
560+ return output
561+
562+ def debug (self , msg , header = True ):
546563 super (RemoteLogger , self ).debug (msg , header )
547564 if (
548565 self .isPilotLoggerOn and self .debugFlag
549566 ): # the -d flag activates this debug flag in CommandBase via PilotParams
550- self .sendMessage (self .messageTemplate . format (level = "DEBUG" , message = msg ))
567+ self .sendMessage (self .format_to_json (level = "DEBUG" , message = msg ))
551568
552- def error (self , msg , header = True , _sendPilotLog = False ):
553- # TODO: Send pilot log remotely?
569+ def error (self , msg , header = True ):
554570 super (RemoteLogger , self ).error (msg , header )
555571 if self .isPilotLoggerOn :
556- self .sendMessage (self .messageTemplate . format (level = "ERROR" , message = msg ))
572+ self .sendMessage (self .format_to_json (level = "ERROR" , message = msg ))
557573
558- def warn (self , msg , header = True , _sendPilotLog = False ):
559- # TODO: Send pilot log remotely?
574+ def warn (self , msg , header = True ):
560575 super (RemoteLogger , self ).warn (msg , header )
561576 if self .isPilotLoggerOn :
562- self .sendMessage (self .messageTemplate . format (level = "WARNING" , message = msg ))
577+ self .sendMessage (self .format_to_json (level = "WARNING" , message = msg ))
563578
564- def info (self , msg , header = True , _sendPilotLog = False ):
565- # TODO: Send pilot log remotely?
579+ def info (self , msg , header = True ):
566580 super (RemoteLogger , self ).info (msg , header )
567581 if self .isPilotLoggerOn :
568- self .sendMessage (self .messageTemplate . format (level = "INFO" , message = msg ))
582+ self .sendMessage (self .format_to_json (level = "INFO" , message = msg ))
569583
570584 def sendMessage (self , msg ):
571585 """
@@ -577,7 +591,7 @@ def sendMessage(self, msg):
577591 :rtype: None
578592 """
579593 try :
580- self .buffer .write (msg + " \n " )
594+ self .buffer .write (msg )
581595 except Exception as err :
582596 super (RemoteLogger , self ).error ("Message not sent" )
583597 super (RemoteLogger , self ).error (str (err ))
@@ -622,34 +636,31 @@ def __init__(self, senderFunc, bufsize=1000, autoflush=10):
622636 self ._timer .start ()
623637 else :
624638 self ._timer = None
625- self .output = StringIO ()
639+ self .output = []
626640 self .bufsize = bufsize
627641 self ._nlines = 0
628642 self .senderFunc = senderFunc
629643
630644 @synchronized
631- def write (self , text ):
645+ def write (self , content_json ):
632646 """
633647 Write text to a string buffer. Newline characters are counted and number of lines in the buffer
634648 is increased accordingly.
635649
636- :param text: text string to write
637- :type text: str
650+ :param content_json: Json to send, format following format_to_json
651+ :type content_json: list[dict]
638652 :return: None
639653 :rtype: None
640654 """
641- # reopen the buffer in a case we had to flush a partially filled buffer
642- if self .output .closed :
643- self .output = StringIO ()
644- self .output .write (text )
645- self ._nlines += max (1 , text .count ("\n " ))
655+
656+ self .output .extend (content_json )
657+
658+ try :
659+ self ._nlines += max (1 , len (content_json ))
660+ except Exception :
661+ raise ValueError (content_json )
646662 self .sendFullBuffer ()
647663
648- @synchronized
649- def getValue (self ):
650- content = self .output .getvalue ()
651- return content
652-
653664 @synchronized
654665 def sendFullBuffer (self ):
655666 """
@@ -659,22 +670,19 @@ def sendFullBuffer(self):
659670
660671 if self ._nlines >= self .bufsize :
661672 self .flush ()
662- self .output = StringIO ()
673+ self .output = []
663674
664675 @synchronized
665- def flush (self ):
676+ def flush (self , force = False ):
666677 """
667678 Flush the buffer and send log records to a remote server. The buffer is closed as well.
668679
669680 :return: None
670681 :rtype: None
671682 """
672- if not self .output .closed and self ._nlines > 0 :
673- self .output .flush ()
674- buf = self .getValue ()
675- self .senderFunc (buf )
683+ if force or (self .output and self ._nlines > 0 ):
684+ self .senderFunc (self .output )
676685 self ._nlines = 0
677- self .output .close ()
678686
679687 def cancelTimer (self ):
680688 """
@@ -687,40 +695,32 @@ def cancelTimer(self):
687695 self ._timer .cancel ()
688696
689697
690- def sendMessage (url , pilotUUID , wnVO , method , rawMessage ):
691- """
692- Invoke a remote method on a Tornado server and pass a JSON message to it.
693-
694- :param str url: Server URL
695- :param str pilotUUID: pilot unique ID
696- :param str wnVO: VO name, relevant only if not contained in a proxy
697- :param str method: a method to be invoked
698- :param str rawMessage: a message to be sent, in JSON format
699- :return: None.
700- """
701- caPath = os .getenv ("X509_CERT_DIR" )
702- cert = os .getenv ("X509_USER_PROXY" )
703-
704- context = ssl .create_default_context ()
705- context .load_verify_locations (capath = caPath )
698+ def sendMessage (useServerCertificate , pilotUUID , rawMessage = []):
699+ cfg = []
700+ if useServerCertificate :
701+ cfg .append ("-o /DIRAC/Security/UseServerCertificate=yes" )
706702
707- message = json .dumps ((json .dumps (rawMessage ), pilotUUID , wnVO ))
703+ formatted_logs = json .dumps (rawMessage )
704+
705+ # Escape single quotes in JSON string for safe shell quoting
706+ safe_logs = formatted_logs .replace ("'" , "'\\ ''" )
708707
709- try :
710- context .load_cert_chain (cert ) # this is a proxy
711- raw_data = {"method" : method , "args" : message }
712- except IsADirectoryError : # assuming it'a dir containing cert and key
713- context .load_cert_chain (os .path .join (cert , "hostcert.pem" ), os .path .join (cert , "hostkey.pem" ))
714- raw_data = {"method" : method , "args" : message , "extraCredentials" : '"hosts"' }
715-
716- if sys .version_info .major == 3 :
717- data = urlencode (raw_data ).encode ("utf-8" ) # encode to bytes ! for python3
718- else :
719- # Python2
720- data = urlencode (raw_data )
708+ cmd = "dirac-admin-send-pilot-logs %s '%s' %s -d" % (
709+ pilotUUID ,
710+ safe_logs ,
711+ " " .join (cfg ),
712+ )
721713
722- res = urlopen (url , data , context = context )
723- res .close ()
714+ FNULL = open (os .devnull , 'w' )
715+ _p = subprocess .Popen (
716+ cmd ,
717+ shell = True ,
718+ stdout = FNULL ,
719+ stderr = FNULL ,
720+ close_fds = False
721+ )
722+ _p .wait ()
723+ FNULL .close ()
724724
725725
726726class CommandBase (object ):
@@ -750,12 +750,12 @@ def __init__(self, pilotParams):
750750 # remote logger
751751 self .log = RemoteLogger (
752752 loggerURL ,
753- self .__class__ .__name__ ,
753+ useServerCertificate = pilotParams .useServerCertificate ,
754+ name = self .__class__ .__name__ ,
754755 pilotUUID = pilotParams .pilotUUID ,
755756 debugFlag = self .debugFlag ,
756757 flushInterval = interval ,
757758 bufsize = bufsize ,
758- wnVO = pilotParams .wnVO ,
759759 )
760760
761761 self .log .isPilotLoggerOn = isPilotLoggerOn
@@ -805,8 +805,12 @@ def executeAndGetOutput(self, cmd, environDict=None):
805805 else :
806806 sys .stdout .write (outChunk )
807807 sys .stdout .flush ()
808- if hasattr (self .log , "buffer" ) and self .log .isPilotLoggerOn :
809- self .log .buffer .write (outChunk )
808+ if hasattr (self .log , "url" ):
809+ # It's a remote logger
810+ self .log .buffer .write (self .log .format_to_json ( # type: ignore
811+ "COMMAND" ,
812+ outChunk
813+ ))
810814 outData += outChunk
811815 # If no data was read on any of the pipes then the process has finished
812816 if not dataWasRead :
0 commit comments