PEP-8 and PyLint
authorEdouard Tisserant
Thu, 19 Apr 2018 14:53:42 +0200
changeset 1997 d9e8fb47340f
parent 1996 4ae9c4447947
child 1998 12b20698e4f3
PEP-8 and PyLint
Beremiz_service.py
ProjectController.py
canfestival/canfestival.py
connectors/PYRO/__init__.py
runtime/PLCObject.py
--- a/Beremiz_service.py	Thu Apr 19 13:09:41 2018 +0200
+++ b/Beremiz_service.py	Thu Apr 19 14:53:42 2018 +0200
@@ -400,7 +400,6 @@
     return res
 
 
-
 class Server(object):
     def __init__(self, servicename, ip_addr, port,
                  workdir, argv,
@@ -437,7 +436,6 @@
 
         sys.stdout.flush()
 
-
     def PyroLoop(self, when_ready):
         while self.continueloop:
             Pyro.config.PYRO_MULTITHREADED = 0
@@ -449,7 +447,7 @@
             # unwanted diconnection when IDE is kept busy for long periods
             self.daemon.setTimeout(60)
 
-            uri = self.daemon.connect(self.plcobj, "PLCObject")
+            self.daemon.connect(self.plcobj, "PLCObject")
 
             if self._to_be_published():
                 self.servicepublisher = ServicePublisher.ServicePublisher()
@@ -484,8 +482,6 @@
                 self.plcobj.StartPLC()
         self.plcobj.StatusChange()
 
-
-
 if enabletwisted:
     import warnings
     with warnings.catch_warnings():
@@ -652,19 +648,19 @@
     if havetwisted:
         # reactor._installSignalHandlersAgain()
         def ui_thread_target():
-            # FIXME: had to disable SignaHandlers install because 
+            # FIXME: had to disable SignaHandlers install because
             # signal not working in non-main thread
             reactor.run(installSignalHandlers=False)
-    else :
+    else:
         ui_thread_target = app.MainLoop
 
-    ui_thread = Thread(target = ui_thread_target)
+    ui_thread = Thread(target=ui_thread_target)
     ui_thread.start()
 
     # This order ui loop to unblock main thread when ready.
     if havetwisted:
-        reactor.callLater(0,ui_thread_started.release)
-    else :
+        reactor.callLater(0, ui_thread_started.release)
+    else:
         wx.CallAfter(ui_thread_started.release)
 
     # Wait for ui thread to be effective
--- a/ProjectController.py	Thu Apr 19 13:09:41 2018 +0200
+++ b/ProjectController.py	Thu Apr 19 14:53:42 2018 +0200
@@ -37,7 +37,7 @@
 import re
 import tempfile
 from types import ListType
-from threading import Timer, Lock, Thread
+from threading import Timer
 from datetime import datetime
 from weakref import WeakKeyDictionary
 from itertools import izip
@@ -1439,7 +1439,6 @@
                                         values_buffer.append((value, forced))
                             self.DebugTicks.append(debug_tick)
 
-
         buffers, self.DebugValuesBuffers = (self.DebugValuesBuffers,
                                             [list() for dummy in xrange(len(self.TracedIECPath))])
 
@@ -1556,26 +1555,22 @@
         if IECPath not in self.IECdebug_datas:
             return
 
-
         # If no entry exist, create a new one with a fresh WeakKeyDictionary
         IECdebug_data = self.IECdebug_datas.get(IECPath, None)
         IECdebug_data[2] = "Forced"
         IECdebug_data[3] = fvalue
 
-
         self.ReArmDebugRegisterTimer()
 
     def ReleaseDebugIECVariable(self, IECPath):
         if IECPath not in self.IECdebug_datas:
             return
 
-
         # If no entry exist, create a new one with a fresh WeakKeyDictionary
         IECdebug_data = self.IECdebug_datas.get(IECPath, None)
         IECdebug_data[2] = "Registered"
         IECdebug_data[3] = None
 
-
         self.ReArmDebugRegisterTimer()
 
     def CallWeakcallables(self, IECPath, function_name, *cargs):
@@ -1599,7 +1594,6 @@
             return -1, "No runtime connected!"
         return self._connector.RemoteExec(script, **kwargs)
 
-
     def DispatchDebugValuesProc(self, event):
         debug_ticks, buffers = self.SnapshotAndResetDebugValuesBuffers()
         start_time = time.time()
--- a/canfestival/canfestival.py	Thu Apr 19 13:09:41 2018 +0200
+++ b/canfestival/canfestival.py	Thu Apr 19 14:53:42 2018 +0200
@@ -38,17 +38,16 @@
     LOCATION_CONFNODE, \
     LOCATION_VAR_MEMORY
 
-base_folder = paths.AbsParentDir(__file__, 2)
-CanFestivalPath = os.path.join(base_folder, "CanFestival-3")
-sys.path.append(os.path.join(CanFestivalPath, "objdictgen"))
-
+base_folder = paths.AbsParentDir(__file__, 2)  # noqa
+CanFestivalPath = os.path.join(base_folder, "CanFestival-3")  # noqa
+sys.path.append(os.path.join(CanFestivalPath, "objdictgen"))  # noqa
+
+# pylint: disable=wrong-import-position
 from nodelist import NodeList
-
 from nodemanager import NodeManager
 import gen_cfile
 import eds_utils
 import canfestival_config as local_canfestival_config  # pylint: disable=import-error
-
 from commondialogs import CreateNodeDialog
 from subindextable import IECTypeConversion, SizeConversion
 from canfestival import config_utils
--- a/connectors/PYRO/__init__.py	Thu Apr 19 13:09:41 2018 +0200
+++ b/connectors/PYRO/__init__.py	Thu Apr 19 14:53:42 2018 +0200
@@ -145,6 +145,7 @@
         "GetPLCstatus": ("Broken", None),
         "RemoteExec": (-1, "RemoteExec script failed!")
     }
+
     class PyroProxyProxy(object):
         """
         A proxy proxy class to handle Beremiz Pyro interface specific behavior.
--- a/runtime/PLCObject.py	Thu Apr 19 13:09:41 2018 +0200
+++ b/runtime/PLCObject.py	Thu Apr 19 14:53:42 2018 +0200
@@ -24,7 +24,7 @@
 
 from __future__ import absolute_import
 import thread
-from threading import Timer, Thread, Lock, Semaphore, Event, Condition
+from threading import Thread, Lock, Semaphore, Event, Condition
 import ctypes
 import os
 import sys
@@ -65,19 +65,19 @@
     """
     job to be executed by a worker
     """
-    def __init__(self,call,*args,**kwargs):
-        self.job = (call,args,kwargs)
+    def __init__(self, call, *args, **kwargs):
+        self.job = (call, args, kwargs)
         self.result = None
         self.success = False
         self.exc_info = None
 
     def do(self):
         """
-        do the job by executing the call, and deal with exceptions 
-        """
-        try :
+        do the job by executing the call, and deal with exceptions
+        """
+        try:
             call, args, kwargs = self.job
-            self.result = call(*args,**kwargs)
+            self.result = call(*args, **kwargs)
             self.success = True
         except Exception:
             self.success = False
@@ -98,13 +98,13 @@
         self.free = Condition(self.mutex)
         self.job = None
 
-    def runloop(self,*args,**kwargs):
+    def runloop(self, *args, **kwargs):
         """
         meant to be called by worker thread (blocking)
         """
         self._threadID = thread.get_ident()
         if args or kwargs:
-            job(*args,**kwargs).do()
+            job(*args, **kwargs).do()
             # result is ignored
         self.mutex.acquire()
         while not self._finish:
@@ -115,16 +115,16 @@
             else:
                 self.free.notify()
         self.mutex.release()
-    
+
     def call(self, *args, **kwargs):
         """
         creates a job, execute it in worker thread, and deliver result.
-        if job execution raise exception, re-raise same exception 
+        if job execution raise exception, re-raise same exception
         meant to be called by non-worker threads, but this is accepted.
         blocking until job done
         """
 
-        _job = job(*args,**kwargs)
+        _job = job(*args, **kwargs)
 
         if self._threadID == thread.get_ident() or self._threadID is None:
             # if caller is worker thread execute immediately
@@ -147,7 +147,7 @@
             return _job.result
         else:
             raise _job.exc_info[0], _job.exc_info[1], _job.exc_info[2]
-        
+
     def quit(self):
         """
         unblocks main thread, and terminate execution of runloop()
@@ -164,10 +164,10 @@
 
 
 def RunInMain(func):
-    def func_wrapper(*args,**kwargs):
+    def func_wrapper(*args, **kwargs):
         return MainWorker.call(func, *args, **kwargs)
     return func_wrapper
-    
+
 
 class PLCObject(pyro.ObjBase):
     def __init__(self, server):
@@ -193,7 +193,7 @@
 
     # First task of worker -> no @RunInMain
     def AutoLoad(self):
-        # Get the last transfered PLC 
+        # Get the last transfered PLC
         try:
             self.CurrentPLCFilename = open(
                 self._GetMD5FileName(),
@@ -226,7 +226,7 @@
         if self._ResetLogCount is not None:
             self._ResetLogCount()
 
-    # used internaly 
+    # used internaly
     def GetLogCount(self, level):
         if self._GetLogCount is not None:
             return int(self._GetLogCount(level))
@@ -414,7 +414,7 @@
             if exp is not None:
                 self.LogMessage(0, '\n'.join(traceback.format_exception(*exp)))
 
-    # used internaly 
+    # used internaly
     def PythonRuntimeInit(self):
         MethodNames = ["init", "start", "stop", "cleanup"]
         self.python_runtime_vars = globals().copy()
@@ -466,7 +466,7 @@
 
         self.PythonRuntimeCall("init")
 
-    # used internaly 
+    # used internaly
     def PythonRuntimeCleanup(self):
         if self.python_runtime_vars is not None:
             self.PythonRuntimeCall("cleanup")
@@ -544,10 +544,9 @@
             NewFileName = md5sum + lib_ext
             extra_files_log = os.path.join(self.workingdir, "extra_files.txt")
 
-            old_PLC_filename = os.path.join(self.workingdir, \
-                                            self.CurrentPLCFilename) \
-                               if self.CurrentPLCFilename is not None \
-                               else None
+            old_PLC_filename = os.path.join(self.workingdir, self.CurrentPLCFilename) \
+                if self.CurrentPLCFilename is not None \
+                else None
             new_PLC_filename = os.path.join(self.workingdir, NewFileName)
 
             # Some platform (i.e. Xenomai) don't like reloading same .so file
@@ -559,7 +558,6 @@
             self.LogMessage("NewPLC (%s)" % md5sum)
             self.PLCStatus = "Empty"
 
-
             try:
                 if replace_PLC_shared_object:
                     os.remove(old_PLC_filename)
@@ -635,7 +633,6 @@
         else:
             self._suspendDebug(True)
 
-
     def _TracesSwap(self):
         self.LastSwapTrace = time()
         if self.TraceThread is None and self.PLCStatus == "Started":
@@ -666,14 +663,14 @@
 
             res = self._GetDebugData(ctypes.byref(tick),
                                      ctypes.byref(size),
-                                     ctypes.byref(buff)) 
+                                     ctypes.byref(buff))
             if res == 0:
                 if size.value:
                     TraceBuffer = ctypes.string_at(buff.value, size.value)
                 self._FreeDebugData()
 
             self.PLClibraryLock.release()
-                
+
             # leave thread if GetDebugData isn't happy.
             if res != 0:
                 break