#!/usr/bin/env python

# This will create golden files in a directory passed to it.
# A Test calls this internally to create the golden files
# So it can process them (so we don't have to checkin the files).

# Ensure msgpack-python and cbor are installed first, using:
#   sudo apt install python-dev (may not be necessary)
#   sudo apt install python-pip # or python3-pip
#   pip install --user msgpack-python msgpack-rpc-python cbor

# Ensure all "string" keys are utf strings (else encoded as bytes)

# NOTE(review): the reviewed copy of this file had lost its line breaks and
# a span of text (from inside the u"some&day>some..." literal up to the
# "if stopTimeSec > 0:" check). Everything carrying a NOTE(review) marker
# below was restored and must be confirmed against version control before
# golden files are regenerated from it.

from __future__ import print_function

import os
import sys
import threading

import cbor
import msgpack
import msgpackrpc

mylocaladdr = "127.0.0.1"  # localhost.localdomain localhost 127.0.0.1


def get_test_data_list():
    """Return the list of values that are serialized into golden files."""
    # get list with all primitive types, and a combo type
    primitives = [
        -8,
        -1616,
        -32323232,
        -6464646464646464,
        192,
        1616,
        32323232,
        6464646464646464,
        192,
        -3232.0,
        -6464646464.0,
        3232.0,
        6464.0,
        6464646464.0,
        160.0,
        1616.0,
        False,
        True,
        u"null",
        None,
        # NOTE(review): every entry from the tail of the next string literal
        # to the end of this function was restored -- confirm each value.
        u"some&day>some<day",
        1328176922000002000,
        u"",
        -2206187877999998000,
        u"bytestring",
        270,
        u"none",
        -2013855847999995777,
        #-6795364578871345152,
    ]
    composites = [
        {"true": True,
         "false": False},
        {"true": u"True",
         "false": False,
         "uint16(1616)": 1616},
        {"list": [1616, 32323232, True, -3232.0,
                  {"TRUE": True, "FALSE": False}, [True, False]],
         "int32": 32323232,
         "bool": True,
         "LONG STRING": u"123456789012345678901234567890123456789012345678901234567890",
         "SHORT STRING": u"1234567890"},
        {True: "true", 138: False, "false": 200},
    ]

    # Each primitive on its own, then all primitives as one nested list,
    # then the literal 1, then each composite value.
    values = []
    values.extend(primitives)
    values.append(primitives)
    values.append(1)
    values.extend(composites)
    return values


def build_test_data(destdir):
    """Write one msgpack and one cbor golden file per test value.

    destdir: existing directory that receives the golden files.

    NOTE(review): this whole function was restored; confirm the file naming
    scheme against the test that consumes these files.
    """
    for index, value in enumerate(get_test_data_list()):
        with open(os.path.join(destdir, str(index) + '.msgpack.golden'), 'wb') as f:
            f.write(msgpack.dumps(value))
        with open(os.path.join(destdir, str(index) + '.cbor.golden'), 'wb') as f:
            f.write(cbor.dumps(value))


def doRpcServer(port, stopTimeSec):
    """Run a msgpack-rpc echo server on mylocaladdr:port.

    port: port to listen on.
    stopTimeSec: when > 0, a threading.Timer calls server.stop() after this
        many seconds.
    """
    # NOTE(review): the handler class and the server set-up (down to
    # server.listen) were restored -- confirm the reply formats.
    def as_text(value):
        # Arguments may arrive as bytes or as text depending on the
        # installed msgpack version; only bytes need decoding.
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return value

    class EchoHandler(object):
        def Echo123(self, msg1, msg2, msg3):
            return ("1:%s 2:%s 3:%s" % (as_text(msg1), as_text(msg2), as_text(msg3)))

        def EchoStruct(self, msg):
            return ("%s" % msg)

    addr = msgpackrpc.Address(mylocaladdr, port)
    server = msgpackrpc.Server(EchoHandler())
    server.listen(addr)
    # run thread to stop it after stopTimeSec seconds if > 0
    if stopTimeSec > 0:
        def myStopRpcServer():
            server.stop()
        t = threading.Timer(stopTimeSec, myStopRpcServer)
        t.start()
    server.start()


def doRpcClientToPythonSvc(port):
    """Call Echo123 and EchoStruct on mylocaladdr:port and print each reply."""
    address = msgpackrpc.Address(mylocaladdr, port)
    client = msgpackrpc.Client(address, unpack_encoding='utf-8')
    print(client.call("Echo123", "A1", "B2", "C3"))
    print(client.call("EchoStruct", {"A" :"Aa", "B":"Bb", "C":"Cc"}))


# def doCheckSocket(port):
#     print(">>>> port: ", port, " <<<<<")
#     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#     result = sock.connect_ex(('127.0.0.1', port))
#     if result == 0:
#         print("\t>>>> Port is open")
#     else:
#         print("\t>>>> Port is not open")
#     sock.close()


def doRpcClientToGoSvc(port):
    """Call the TestRpcInt methods on mylocaladdr:port and print each reply.

    Unlike the Python-service client, Echo123 is passed its three strings
    as a single list argument.
    """
    # doCheckSocket(port)
    address = msgpackrpc.Address(mylocaladdr, port)
    client = msgpackrpc.Client(address, unpack_encoding='utf-8')
    print(client.call("TestRpcInt.Echo123", ["A1", "B2", "C3"]))
    print(client.call("TestRpcInt.EchoStruct", {"A" :"Aa", "B":"Bb", "C":"Cc"}))


def doMain(args):
    """Dispatch on the sub-command in args (sys.argv without the script name).

    testdata DESTDIR                 -> build_test_data(DESTDIR)
    rpc-server PORT STOPTIMESEC      -> doRpcServer(PORT, STOPTIMESEC)
    rpc-client-python-service PORT   -> doRpcClientToPythonSvc(PORT)
    rpc-client-go-service PORT       -> doRpcClientToGoSvc(PORT)

    Any other argument shape prints the usage line. Numeric arguments are
    converted with int(), so a non-numeric value raises ValueError.
    """
    if len(args) == 2 and args[0] == "testdata":
        build_test_data(args[1])
    elif len(args) == 3 and args[0] == "rpc-server":
        doRpcServer(int(args[1]), int(args[2]))
    elif len(args) == 2 and args[0] == "rpc-client-python-service":
        doRpcClientToPythonSvc(int(args[1]))
    elif len(args) == 2 and args[0] == "rpc-client-go-service":
        doRpcClientToGoSvc(int(args[1]))
    else:
        print("Usage: test.py " +
              "[testdata|rpc-server|rpc-client-python-service|rpc-client-go-service] ...")


if __name__ == "__main__":
    doMain(sys.argv[1:])