[소스코드] 리쉐니에 스마트센서 OPC UA Client Examples (Python) - OPCuaGetDeviceList.py > Download

본문 바로가기
사이트 내 전체검색

전체메뉴

Download
Home > Information > Download
Q&A
Please leave a question.
more
CUSTOMER CENTER
TEL
070-8285-2269
Mail
reshenie@naver.com

[소스코드] 리쉐니에 스마트센서 OPC UA Client Examples (Python) - OPCuaGetDeviceList…

페이지 정보

작성자 최고관리자 댓글 0건 조회 368회 작성일 20-05-03 09:36

본문

OPCuaGetDeviceList.py uses python-opcua to retrieve the list of devices connected to your server.

 

import functools
import logging
import time
from urllib.parse import urlparse

from opcua import ua, Client



class OpcUaClient(object):
    """Wrapper around ``opcua.Client`` whose calls reconnect and retry.

    Methods decorated with ``Decorators.autoConnectingClient`` are retried
    up to ``MAX_RETRIES`` times, reconnecting between attempts.  Instances
    are context managers: entering connects, leaving disconnects.
    """

    CONNECT_TIMEOUT = 15  # [sec] passed to opcua.Client as ``timeout``
    RETRY_DELAY = 10  # [sec] pause after a refused (re)connection
    MAX_RETRIES = 3  # [-] guarded attempts before the final, unguarded one

    class Decorators(object):
        """Namespace for the decorators applied to OpcUaClient methods."""

        @staticmethod
        def autoConnectingClient(wrappedMethod):
            """Wrap ``wrappedMethod`` so failures trigger reconnect + retry.

            ``ua.uaerrors.BadNoMatch`` is re-raised immediately, without
            reconnecting.  Any other exception causes a reconnect; a refused
            connection is followed by a ``RETRY_DELAY`` pause.  After
            ``MAX_RETRIES`` failed attempts one more reconnect and call are
            made without any handler, so their exception reaches the caller.
            """

            # Preserve the wrapped method's name and docstring.
            @functools.wraps(wrappedMethod)
            def wrapper(obj, *args, **kwargs):
                for _ in range(OpcUaClient.MAX_RETRIES):
                    try:
                        return wrappedMethod(obj, *args, **kwargs)
                    except ua.uaerrors.BadNoMatch:
                        raise
                    except Exception:
                        # Keep the failure visible at debug level instead
                        # of discarding it silently.
                        obj._logger.debug(
                            'OPC-UA call %s failed; reconnecting.',
                            wrappedMethod.__name__,
                            exc_info=True
                        )
                    try:
                        obj._logger.warning(
                            '(Re)connecting to OPC-UA service.'
                        )
                        obj.reconnect()
                    except ConnectionRefusedError:
                        # The delay is taken from RETRY_DELAY rather than
                        # being hard-coded in the message.
                        obj._logger.warning(
                            'Connection refused. Retry in %ss.',
                            OpcUaClient.RETRY_DELAY
                        )
                        time.sleep(OpcUaClient.RETRY_DELAY)
                # Every guarded attempt failed: try once more without a
                # handler so the underlying exception is exposed.
                obj.reconnect()
                return wrappedMethod(obj, *args, **kwargs)

            return wrapper

    def __init__(self, serverUrl):
        """Create the underlying client without connecting.

        Args:
            serverUrl: object providing ``geturl()`` (for example the
                result of ``urllib.parse.urlparse``) for the endpoint.
        """
        self._logger = logging.getLogger(self.__class__.__name__)
        self._client = Client(
            serverUrl.geturl(),
            timeout=self.CONNECT_TIMEOUT
        )

    def __enter__(self):
        """Connect and return this instance."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Disconnect and drop the underlying client."""
        self.disconnect()
        self._client = None

    @property
    @Decorators.autoConnectingClient
    def sensorList(self):
        """Children of the node returned by ``objectsNode``."""
        return self.objectsNode.get_children()

    @property
    @Decorators.autoConnectingClient
    def objectsNode(self):
        """The ``Objects`` child (namespace 0) of the root node."""
        path = [ua.QualifiedName(name='Objects', namespaceidx=0)]
        return self._client.get_root_node().get_child(path)

    def connect(self):
        """Connect the client, then call ``load_type_definitions()``."""
        self._client.connect()
        self._client.load_type_definitions()

    def disconnect(self):
        """Disconnect the client; errors are logged, never raised."""
        try:
            self._client.disconnect()
        except Exception:
            # Best effort: a failed disconnect must not stop a reconnect.
            self._logger.debug('Ignoring disconnect error.', exc_info=True)

    def reconnect(self):
        """Disconnect (best effort) and connect again."""
        self.disconnect()
        self.connect()

    @Decorators.autoConnectingClient
    def get_browse_name(self, uaNode):
        """Return ``uaNode.get_browse_name()`` with auto-reconnect."""
        return uaNode.get_browse_name()

    @Decorators.autoConnectingClient
    def get_node_class(self, uaNode):
        """Return ``uaNode.get_node_class()`` with auto-reconnect."""
        return uaNode.get_node_class()

    @Decorators.autoConnectingClient
    def get_child(self, uaNode, path):
        """Return ``uaNode.get_child(path)`` with auto-reconnect."""
        return uaNode.get_child(path)

    @Decorators.autoConnectingClient
    def read_raw_history(self,
                         uaNode,
                         starttime=None,
                         endtime=None,
                         numvalues=0,
                         cont=None):
        """Read raw history for ``uaNode``.

        Args:
            uaNode: node whose history is read.
            starttime: start of the range; ``ua.get_win_epoch()`` if falsy.
            endtime: end of the range; ``ua.get_win_epoch()`` if falsy.
            numvalues: stored in ``NumValuesPerNode`` of the request.
            cont: continuation point from a previous call, or None.

        Returns:
            Tuple ``(data values, continuation point)`` from the result.

        Raises:
            AssertionError: if the result's status code is not good.
        """
        details = ua.ReadRawModifiedDetails()
        details.IsReadModified = False
        details.StartTime = starttime or ua.get_win_epoch()
        details.EndTime = endtime or ua.get_win_epoch()
        details.NumValuesPerNode = numvalues
        details.ReturnBounds = True
        result = OpcUaClient._history_read(uaNode, details, cont)
        if not result.StatusCode.is_good():
            # Explicit raise instead of ``assert`` so the check survives
            # ``python -O``; the exception type is unchanged for callers.
            raise AssertionError(
                'History read failed: {}'.format(result.StatusCode)
            )
        return result.HistoryData.DataValues, result.ContinuationPoint

    @staticmethod
    def _history_read(uaNode, details, cont):
        """Send one history-read request for ``uaNode``.

        Builds a ``HistoryReadValueId`` for the node and returns the first
        entry of ``uaNode.server.history_read(...)``.
        """
        valueid = ua.HistoryReadValueId()
        valueid.NodeId = uaNode.nodeid
        valueid.IndexRange = ''
        valueid.ContinuationPoint = cont

        params = ua.HistoryReadParameters()
        params.HistoryReadDetails = details
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
        params.ReleaseContinuationPoints = False
        params.NodesToRead.append(valueid)
        result = uaNode.server.history_read(params)[0]
        return result



class DataAcquisition(object):
    """Helpers that query an OPC-UA server for its connected devices."""

    LOGGER = logging.getLogger('DataAcquisition')

    @staticmethod
    def get_device_list(serverUrl):
        """Return a dict mapping device browse names to device tags.

        Connects with ``OpcUaClient`` and walks ``client.sensorList``.
        Nodes are kept when their node class is ``ua.NodeClass.Object``
        and their browse name does not contain "server" (case-insensitive).
        The value is the node's ``deviceTag`` child formatted with
        ``'{:s}'``; if that lookup or formatting fails, ``'Device'`` is
        stored instead.

        Args:
            serverUrl: endpoint object handed unchanged to ``OpcUaClient``.

        Returns:
            dict of ``{browse name: device tag or 'Device'}``.
        """
        deviceList = dict()
        with OpcUaClient(serverUrl) as client:
            for sensorNode in client.sensorList:
                macId = client.get_browse_name(sensorNode).Name
                # ``==`` instead of ``is``: equality does not depend on the
                # library handing back the very same enum object.
                isObject = (
                    client.get_node_class(sensorNode) == ua.NodeClass.Object
                )
                if not isObject or "server" in macId.lower():
                    continue
                try:
                    tagPath = ua.QualifiedName(
                        'deviceTag',
                        sensorNode.nodeid.NamespaceIndex
                    )
                    deviceTag = client.get_child(
                        sensorNode, tagPath
                    ).get_value()
                    deviceList[macId] = '{:s}'.format(deviceTag)
                except Exception:
                    # Best effort: any failure to read or format the tag
                    # falls back to a generic label, but leave a trace.
                    DataAcquisition.LOGGER.debug(
                        'No usable deviceTag for %s; using default.',
                        macId,
                        exc_info=True
                    )
                    deviceList[macId] = 'Device'
        return deviceList



if __name__ == "__main__":
    # Show INFO-level messages in general, but only warnings and above
    # from the "opcua" logger.
    logging.basicConfig(level=logging.INFO)
    opcuaLogger = logging.getLogger("opcua")
    opcuaLogger.setLevel(logging.WARNING)

    # Replace xx.xx.xx.xx with the IP address of your server.
    serverIP = "xx.xx.xx.xx"
    endpoint = 'opc.tcp://{:s}:4840'.format(serverIP)
    serverUrl = urlparse(endpoint)

    # Query the server and print the resulting device dictionary.
    print(DataAcquisition.get_device_list(serverUrl=serverUrl))

댓글목록

등록된 댓글이 없습니다.

Company : Reshenie (Manufacturing Engineering Group)
Address : R. 406, Ace Gwanggyo Tower 3rd, 1286, Uidong, Yeongtong-gu, Suwon-si, Gyeonggi-do, S. Korea
TEL : 070-8285-2269 E-mail : reshenie@naver.com
Copyright © 2018 Reshenie All rights reserved.