[소스코드] 리쉐니에 스마트센서 OPC UA Client Examples (Python) - OPCuaGetVibrationDataStruct.py > Download

본문 바로가기
사이트 내 전체검색

전체메뉴

Download
Home > Information > Download
Q&A
Please leave a question.
more
CUSTOMER CENTER
TEL
070-8285-2269
Mail
reshenie@naver.com

[소스코드] 리쉐니에 스마트센서 OPC UA Client Examples (Python) - OPCuaGetVibrationD…

페이지 정보

작성자 최고관리자 댓글 0건 조회 438회 작성일 20-05-03 09:39

본문

OPCuaGetVibrationDataStruct.py Uses Python-opcua to retrieve and plot the vibration structure containing the most recent vibration measurement data in the time and frequency domain.

 

import time

import sys

import pytz

import logging

import datetime

import itertools

from urllib.parse import urlparse

import matplotlib.pyplot as plt


from opcua import ua, Client



class OpcUaClient(object):

    CONNECT_TIMEOUT = 15  # [sec]

    RETRY_DELAY = 10  # [sec]

    MAX_RETRIES = 3  # [-]


    class Decorators(object):

        @staticmethod

        def autoConnectingClient(wrappedMethod):

            def wrapper(obj, *args, **kwargs):

                for retry in range(OpcUaClient.MAX_RETRIES):

                    try:

                        return wrappedMethod(obj, *args, **kwargs)

                    except ua.uaerrors.BadNoMatch:

                        raise

                    except Exception:

                        pass

                    try:

                        obj._logger.warn('(Re)connecting to OPC-UA service.')

                        obj.reconnect()

                    except ConnectionRefusedError:

                        obj._logger.warn(

                            'Connection refused. Retry in 10s.'.format(

                                OpcUaClient.RETRY_DELAY

                            )

                        )

                        time.sleep(OpcUaClient.RETRY_DELAY)

                else:  # So the exception is exposed.

                    obj.reconnect()

                    return wrappedMethod(obj, *args, **kwargs)

            return wrapper


    def __init__(self, serverUrl):

        self._logger = logging.getLogger(self.__class__.__name__)

        self._client = Client(

            serverUrl.geturl(),

            timeout=self.CONNECT_TIMEOUT

        )


    def __enter__(self):

        self.connect()

        return self


    def __exit__(self, exc_type, exc_value, traceback):

        self.disconnect()

        self._client = None


    @property

    @Decorators.autoConnectingClient

    def sensorList(self):

        return self.objectsNode.get_children()


    @property

    @Decorators.autoConnectingClient

    def objectsNode(self):

        path = [ua.QualifiedName(name='Objects', namespaceidx=0)]

        return self._client.get_root_node().get_child(path)


    def connect(self):

        self._client.connect()

        self._client.load_type_definitions()


    def disconnect(self):

        try:

            self._client.disconnect()

        except Exception:

            pass


    def reconnect(self):

        self.disconnect()

        self.connect()


    @Decorators.autoConnectingClient

    def get_browse_name(self, uaNode):

        return uaNode.get_browse_name()


    @Decorators.autoConnectingClient

    def get_node_class(self, uaNode):

        return uaNode.get_node_class()


    @Decorators.autoConnectingClient

    def get_namespace_index(self, uri):

        return self._client.get_namespace_index(uri)


    @Decorators.autoConnectingClient

    def get_child(self, uaNode, path):

        return uaNode.get_child(path)


    @Decorators.autoConnectingClient

    def read_raw_history(self,

                         uaNode,

                         starttime=None,

                         endtime=None,

                         numvalues=0,

                         cont=None):

        details = ua.ReadRawModifiedDetails()

        details.IsReadModified = False

        details.StartTime = starttime or ua.get_win_epoch()

        details.EndTime = endtime or ua.get_win_epoch()

        details.NumValuesPerNode = numvalues

        details.ReturnBounds = True

        result = OpcUaClient._history_read(uaNode, details, cont)

        assert(result.StatusCode.is_good())

        return result.HistoryData.DataValues, result.ContinuationPoint


    @staticmethod

    def _history_read(uaNode, details, cont):

        valueid = ua.HistoryReadValueId()

        valueid.NodeId = uaNode.nodeid

        valueid.IndexRange = ''

        valueid.ContinuationPoint = cont


        params = ua.HistoryReadParameters()

        params.HistoryReadDetails = details

        params.TimestampsToReturn = ua.TimestampsToReturn.Both

        params.ReleaseContinuationPoints = False

        params.NodesToRead.append(valueid)

        result = uaNode.server.history_read(params)[0]

        return result



class DataAcquisition(object):

    LOGGER = logging.getLogger('DataAcquisition')

    AXES = ('x', 'y', 'z')

    ORDINATES = ('accel', 'veloc')

    DOMAINS = ('time', 'freq')

    MAX_VALUES_PER_ENDNODE = 100  # Num values per endnode

    MAX_VALUES_PER_REQUEST = 2  # Num values per history request


    @staticmethod

    def selected_to_workbook(serverUrl,

                             macIdsToCollect,

                             starttime,

                             endtime):

        with OpcUaClient(serverUrl) as client:

            for sensorNode in client.sensorList:

                assert(client._client.uaclient._uasocket.timeout == 15)

                macId = client.get_browse_name(sensorNode).Name

                if macId not in macIdsToCollect:

                    DataAcquisition.LOGGER.info(

                        'Skipping sensor {:s}'.format(macId)

                    )

                    continue

                tagPath = ua.QualifiedName(

                    'deviceTag',

                    sensorNode.nodeid.NamespaceIndex

                )

                DataAcquisition.LOGGER.info(

                    'Processing sensor {:s} ({:s})'.format(

                        macId,

                        client.get_child(sensorNode, tagPath).get_value()

                    )

                )

                DataAcquisition.get_sensor_data(

                        client,

                        sensorNode,

                        starttime,

                        endtime

                )


    @staticmethod

    def get_sensor_data(serverUrl, macId, browseName, starttime, endtime):

        allValues = []

        allDates = []

        with OpcUaClient(serverUrl) as client:

            assert(client._client.uaclient._uasocket.timeout == 15)

            sensorNode = DataAcquisition.get_sensor_node(

                    client,

                    macId,

                    browseName

            )

            for path in DataAcquisition.endnodes_path_generator(sensorNode):

                DataAcquisition.LOGGER.info(

                        'Browsing {:s} -> {:s}'.format(

                                macId,

                                sensorNode.get_browse_name().Name

                            )

                )

                endNode = client.get_child(sensorNode, path)

                (values, dates) = DataAcquisition.get_endnode_data(

                        client,

                        endNode,

                        starttime,

                        endtime

                )

                allValues.extend(values)

                allDates.extend(dates)

        return (allValues, allDates)


    @staticmethod

    def endnodes_path_generator(sensorNode):

        for (axis, ordinate, domain) in \

                itertools.product(DataAcquisition.AXES,

                                  DataAcquisition.ORDINATES,

                                  DataAcquisition.DOMAINS):

            # browseName: e.g. xAccelTime

            browseName = ''.join([

                axis, ordinate.capitalize(), domain.capitalize()

            ])

            nsIdx = sensorNode.nodeid.NamespaceIndex  # iQunet namespace index

            path = [

                ua.QualifiedName(axis, nsIdx),        # e.g. 'x'

                ua.QualifiedName(ordinate, nsIdx),    # e.g. 'accel'

                ua.QualifiedName(browseName, nsIdx),  # e.g. 'xAccelTime'

            ]

            yield path


    @staticmethod

    def get_sensor_node(client, macId, browseName):

        nsIdx = client.get_namespace_index(

                'http://www.iqunet.com'

        )  # iQunet namespace index

        bpath = [

                ua.QualifiedName(name=macId, namespaceidx=nsIdx),

                ua.QualifiedName(name=browseName, namespaceidx=nsIdx)

                ]

        sensorNode = client.objectsNode.get_child(bpath)

        return sensorNode


    @staticmethod

    def get_endnode_data(client, endNode, starttime, endtime):

        dvList = DataAcquisition.download_endnode(

                client,

                endNode,

                starttime,

                endtime

        )

        dates, values = ([], [])

        for dv in dvList:

            dates.append(dv.SourceTimestamp.strftime('%Y-%m-%d %H:%M:%S'))

            values.append(dv.Value.Value.y_ordinate)


        # If no starttime is given, results of read_raw_history are reversed.

        if starttime is None:

            values.reverse()

            dates.reverse()

 

댓글목록

등록된 댓글이 없습니다.

Company : Reshenie (Manufacturing Engineering Group)
Address : R. 406, Ace Gwanggyo Tower 3rd, 1286, Uidong, Yeongtong-gu, Suwon-si, Gyeonggi-do, S. Korea
TEL : 070-8285-2269 E-mail : reshenie@naver.com
Copyright © 2018 Reshenie All rights reserved.